diff options
Diffstat (limited to 'python/samba/tests/krb5/raw_testcase.py')
-rw-r--r-- | python/samba/tests/krb5/raw_testcase.py | 6221 |
1 files changed, 6221 insertions, 0 deletions
diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py new file mode 100644 index 0000000..90d286a --- /dev/null +++ b/python/samba/tests/krb5/raw_testcase.py @@ -0,0 +1,6221 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Isaac Boukris 2020 +# Copyright (C) Stefan Metzmacher 2020 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import sys +import socket +import struct +import time +import datetime +import random +import binascii +import itertools +import collections +import math + +from enum import Enum +from pprint import pprint + +from cryptography import x509 +from cryptography.hazmat.primitives import asymmetric, hashes +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend + +from pyasn1.codec.der.decoder import decode as pyasn1_der_decode +from pyasn1.codec.der.encoder import encode as pyasn1_der_encode +from pyasn1.codec.native.decoder import decode as pyasn1_native_decode +from pyasn1.codec.native.encoder import encode as pyasn1_native_encode + +from pyasn1.codec.ber.encoder import BitStringEncoder +import pyasn1.type.univ + +from pyasn1.error import PyAsn1Error + +from samba import unix2nttime +from samba.credentials import Credentials +from samba.dcerpc import claims, krb5pac, netlogon, samr, security +from samba.gensec import FEATURE_SEAL +from samba.ndr import ndr_pack, ndr_unpack +from samba.dcerpc.misc import ( + SEC_CHAN_WKSTA, + SEC_CHAN_BDC, +) + +import samba.tests +from samba.tests import TestCase + +import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1 +from samba.tests.krb5.rfc4120_constants import ( + AD_IF_RELEVANT, + AD_WIN2K_PAC, + FX_FAST_ARMOR_AP_REQUEST, + KDC_ERR_CLIENT_REVOKED, + KDC_ERR_GENERIC, + KDC_ERR_POLICY, + KDC_ERR_PREAUTH_FAILED, + KDC_ERR_SKEW, + KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS, + KERB_ERR_TYPE_EXTENDED, + KRB_AP_REP, + KRB_AP_REQ, + KRB_AS_REP, + KRB_AS_REQ, + KRB_ERROR, + KRB_PRIV, + KRB_TGS_REP, + KRB_TGS_REQ, + KU_AP_REQ_AUTH, + KU_AP_REQ_ENC_PART, + KU_AS_FRESHNESS, + KU_AS_REP_ENC_PART, + KU_AS_REQ, + KU_ENC_CHALLENGE_KDC, + KU_FAST_ENC, + KU_FAST_FINISHED, + KU_FAST_REP, + KU_FAST_REQ_CHKSUM, + KU_KRB_PRIV, + KU_NON_KERB_CKSUM_SALT, + KU_NON_KERB_SALT, + KU_PKINIT_AS_REQ, + KU_TGS_REP_ENC_PART_SESSION, + KU_TGS_REP_ENC_PART_SUB_KEY, + KU_TGS_REQ_AUTH, + KU_TGS_REQ_AUTH_CKSUM, + KU_TGS_REQ_AUTH_DAT_SESSION, + KU_TGS_REQ_AUTH_DAT_SUBKEY, + KU_TICKET, + NT_PRINCIPAL, + NT_SRV_INST, + NT_WELLKNOWN, + PADATA_AS_FRESHNESS, + PADATA_ENCRYPTED_CHALLENGE, + PADATA_ENC_TIMESTAMP, + PADATA_ETYPE_INFO, + PADATA_ETYPE_INFO2, + PADATA_FOR_USER, + PADATA_FX_COOKIE, + PADATA_FX_ERROR, + PADATA_FX_FAST, + PADATA_GSS, + PADATA_KDC_REQ, + PADATA_PAC_OPTIONS, + PADATA_PAC_REQUEST, + PADATA_PKINIT_KX, + PADATA_PK_AS_REP, + PADATA_PK_AS_REP_19, + PADATA_PK_AS_REQ, + PADATA_PW_SALT, + PADATA_REQ_ENC_PA_REP, + PADATA_SUPPORTED_ETYPES, +) +import samba.tests.krb5.kcrypto as kcrypto + + +def BitStringEncoder_encodeValue32( + self, value, asn1Spec, encodeFun, **options): + # + # BitStrings like KDCOptions or TicketFlags should at least + # be 32-Bit on the wire + # + if asn1Spec is not None: + # TODO: try to avoid ASN.1 schema instantiation + value = asn1Spec.clone(value) + + valueLength = len(value) + if valueLength % 8: + alignedValue = value << (8 - valueLength % 8) + else: + alignedValue = value + + substrate = alignedValue.asOctets() + length = len(substrate) + # We need at least 32-Bit / 4-Bytes + if length < 4: + padding = 4 - length + else: + padding = 0 + ret = b'\x00' + substrate + (b'\x00' * padding) + return ret, False, True + + +BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32 + + +def BitString_NamedValues_prettyPrint(self, scope=0): + ret = "%s" % self.asBinary() + bits = [] + highest_bit = 32 + for byte in self.asNumbers(): + for bit in [7, 6, 5, 4, 3, 2, 1, 0]: + mask = 1 << bit + if byte & mask: + val = 1 + else: + val = 0 + bits.append(val) + if len(bits) < highest_bit: + for bitPosition in range(len(bits), highest_bit): + bits.append(0) + indent = " " * scope + delim = ": (\n%s " % indent + for bitPosition in range(highest_bit): + if bitPosition in self.prettyPrintNamedValues: + name = self.prettyPrintNamedValues[bitPosition] + elif bits[bitPosition] != 0: + name = "unknown-bit-%u" % bitPosition + else: + continue + ret += "%s%s:%u" % (delim, name, bits[bitPosition]) + delim = ",\n%s " % indent + ret += "\n%s)" % indent + return ret + + +krb5_asn1.TicketFlags.prettyPrintNamedValues =\ + krb5_asn1.TicketFlagsValues.namedValues +krb5_asn1.TicketFlags.namedValues =\ + krb5_asn1.TicketFlagsValues.namedValues +krb5_asn1.TicketFlags.prettyPrint =\ + BitString_NamedValues_prettyPrint +krb5_asn1.KDCOptions.prettyPrintNamedValues =\ + krb5_asn1.KDCOptionsValues.namedValues +krb5_asn1.KDCOptions.namedValues =\ + krb5_asn1.KDCOptionsValues.namedValues +krb5_asn1.KDCOptions.prettyPrint =\ + BitString_NamedValues_prettyPrint +krb5_asn1.APOptions.prettyPrintNamedValues =\ + krb5_asn1.APOptionsValues.namedValues +krb5_asn1.APOptions.namedValues =\ + krb5_asn1.APOptionsValues.namedValues +krb5_asn1.APOptions.prettyPrint =\ + BitString_NamedValues_prettyPrint +krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\ + krb5_asn1.PACOptionFlagsValues.namedValues +krb5_asn1.PACOptionFlags.namedValues =\ + krb5_asn1.PACOptionFlagsValues.namedValues +krb5_asn1.PACOptionFlags.prettyPrint =\ + BitString_NamedValues_prettyPrint + + +def Integer_NamedValues_prettyPrint(self, scope=0): + intval = int(self) + if intval in self.prettyPrintNamedValues: + name = self.prettyPrintNamedValues[intval] + else: + name = "<__unknown__>" + ret = "%d (0x%x) %s" % (intval, intval, name) + return ret + + +krb5_asn1.NameType.prettyPrintNamedValues =\ + krb5_asn1.NameTypeValues.namedValues +krb5_asn1.NameType.prettyPrint =\ + Integer_NamedValues_prettyPrint +krb5_asn1.AuthDataType.prettyPrintNamedValues =\ + krb5_asn1.AuthDataTypeValues.namedValues +krb5_asn1.AuthDataType.prettyPrint =\ + Integer_NamedValues_prettyPrint +krb5_asn1.PADataType.prettyPrintNamedValues =\ + krb5_asn1.PADataTypeValues.namedValues +krb5_asn1.PADataType.prettyPrint =\ + Integer_NamedValues_prettyPrint +krb5_asn1.EncryptionType.prettyPrintNamedValues =\ + krb5_asn1.EncryptionTypeValues.namedValues +krb5_asn1.EncryptionType.prettyPrint =\ + Integer_NamedValues_prettyPrint +krb5_asn1.ChecksumType.prettyPrintNamedValues =\ + krb5_asn1.ChecksumTypeValues.namedValues +krb5_asn1.ChecksumType.prettyPrint =\ + Integer_NamedValues_prettyPrint +krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\ + krb5_asn1.KerbErrorDataTypeValues.namedValues +krb5_asn1.KerbErrorDataType.prettyPrint =\ + Integer_NamedValues_prettyPrint + + +class Krb5EncryptionKey: + __slots__ = [ + 'ctype', + 'etype', + 'key', + 'kvno', + ] + + def __init__(self, key, kvno): + EncTypeChecksum = { + kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256, + kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128, + kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5, + } + self.key = key + self.etype = key.enctype + self.ctype = EncTypeChecksum[self.etype] + self.kvno = kvno + + def __str__(self): + return "etype=%d ctype=%d kvno=%d key=%s" % ( + self.etype, self.ctype, self.kvno, self.key) + + def encrypt(self, usage, plaintext): + ciphertext = kcrypto.encrypt(self.key, usage, plaintext) + return ciphertext + + def decrypt(self, usage, ciphertext): + plaintext = kcrypto.decrypt(self.key, usage, ciphertext) + return plaintext + + def make_zeroed_checksum(self, ctype=None): + if ctype is None: + ctype = self.ctype + + checksum_len = kcrypto.checksum_len(ctype) + return bytes(checksum_len) + + def make_checksum(self, usage, plaintext, ctype=None): + if ctype is None: + ctype = self.ctype + cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext) + return cksum + + def verify_checksum(self, usage, plaintext, ctype, cksum): + if self.ctype != ctype: + raise AssertionError(f'key checksum type ({self.ctype}) != ' + f'checksum type ({ctype})') + + kcrypto.verify_checksum(ctype, + self.key, + usage, + plaintext, + cksum) + + def export_obj(self): + EncryptionKey_obj = { + 'keytype': self.etype, + 'keyvalue': self.key.contents, + } + return EncryptionKey_obj + + +class RodcPacEncryptionKey(Krb5EncryptionKey): + __slots__ = ['rodc_id'] + + def __init__(self, key, kvno, rodc_id=None): + super().__init__(key, kvno) + + if rodc_id is None: + kvno = self.kvno + if kvno is not None: + kvno >>= 16 + kvno &= (1 << 16) - 1 + + rodc_id = kvno or None + + if rodc_id is not None: + self.rodc_id = rodc_id.to_bytes(2, byteorder='little') + else: + self.rodc_id = b'' + + def make_rodc_zeroed_checksum(self, ctype=None): + checksum = super().make_zeroed_checksum(ctype) + return checksum + bytes(len(self.rodc_id)) + + def make_rodc_checksum(self, usage, plaintext, ctype=None): + checksum = super().make_checksum(usage, plaintext, ctype) + return checksum + self.rodc_id + + def verify_rodc_checksum(self, usage, plaintext, ctype, cksum): + if self.rodc_id: + cksum, cksum_rodc_id = cksum[:-2], cksum[-2:] + + if self.rodc_id != cksum_rodc_id: + raise AssertionError(f'{self.rodc_id.hex()} != ' + f'{cksum_rodc_id.hex()}') + + super().verify_checksum(usage, + plaintext, + ctype, + cksum) + + +class ZeroedChecksumKey(RodcPacEncryptionKey): + def make_checksum(self, usage, plaintext, ctype=None): + return self.make_zeroed_checksum(ctype) + + def make_rodc_checksum(self, usage, plaintext, ctype=None): + return self.make_rodc_zeroed_checksum(ctype) + + +class WrongLengthChecksumKey(RodcPacEncryptionKey): + __slots__ = ['_length'] + + def __init__(self, key, kvno, length): + super().__init__(key, kvno) + + self._length = length + + @classmethod + def _adjust_to_length(cls, checksum, length): + diff = length - len(checksum) + if diff > 0: + checksum += bytes(diff) + elif diff < 0: + checksum = checksum[:length] + + return checksum + + def make_zeroed_checksum(self, ctype=None): + return bytes(self._length) + + def make_checksum(self, usage, plaintext, ctype=None): + checksum = super().make_checksum(usage, plaintext, ctype) + return self._adjust_to_length(checksum, self._length) + + def make_rodc_zeroed_checksum(self, ctype=None): + return bytes(self._length) + + def make_rodc_checksum(self, usage, plaintext, ctype=None): + checksum = super().make_rodc_checksum(usage, plaintext, ctype) + return self._adjust_to_length(checksum, self._length) + + +class KerberosCredentials(Credentials): + __slots__ = [ + '_private_key', + 'account_type', + 'ap_supported_enctypes', + 'as_supported_enctypes', + 'dn', + 'forced_keys', + 'forced_salt', + 'kvno', + 'sid', + 'spn', + 'tgs_supported_enctypes', + 'upn', + ] + + non_etype_bits = ( + security.KERB_ENCTYPE_FAST_SUPPORTED) | ( + security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED) | ( + security.KERB_ENCTYPE_CLAIMS_SUPPORTED) | ( + security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED) | ( + security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK) + + def __init__(self): + super().__init__() + all_enc_types = 0 + all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5 + all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 + all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96 + + self.as_supported_enctypes = all_enc_types + self.tgs_supported_enctypes = all_enc_types + self.ap_supported_enctypes = all_enc_types + + self.kvno = None + self.forced_keys = {} + + self.forced_salt = None + + self.dn = None + self.upn = None + self.spn = None + self.sid = None + self.account_type = None + + self._private_key = None + + def set_as_supported_enctypes(self, value): + self.as_supported_enctypes = int(value) + + def set_tgs_supported_enctypes(self, value): + self.tgs_supported_enctypes = int(value) + + def set_ap_supported_enctypes(self, value): + self.ap_supported_enctypes = int(value) + + etype_map = collections.OrderedDict([ + (kcrypto.Enctype.AES256, + security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96), + (kcrypto.Enctype.AES128, + security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96), + (kcrypto.Enctype.RC4, + security.KERB_ENCTYPE_RC4_HMAC_MD5), + (kcrypto.Enctype.DES_MD5, + security.KERB_ENCTYPE_DES_CBC_MD5), + (kcrypto.Enctype.DES_CRC, + security.KERB_ENCTYPE_DES_CBC_CRC) + ]) + + @classmethod + def etypes_to_bits(cls, etypes): + bits = 0 + for etype in etypes: + bit = cls.etype_map[etype] + if bits & bit: + raise ValueError(f'Got duplicate etype: {etype}') + bits |= bit + + return bits + + @classmethod + def bits_to_etypes(cls, bits): + etypes = () + for etype, bit in cls.etype_map.items(): + if bit & bits: + bits &= ~bit + etypes += (etype,) + + bits &= ~cls.non_etype_bits + if bits != 0: + raise ValueError(f'Unsupported etype bits: {bits}') + + return etypes + + def get_as_krb5_etypes(self): + return self.bits_to_etypes(self.as_supported_enctypes) + + def get_tgs_krb5_etypes(self): + return self.bits_to_etypes(self.tgs_supported_enctypes) + + def get_ap_krb5_etypes(self): + return self.bits_to_etypes(self.ap_supported_enctypes) + + def set_kvno(self, kvno): + # Sign-extend from 32 bits. + if kvno & 1 << 31: + kvno |= -1 << 31 + self.kvno = kvno + + def get_kvno(self): + return self.kvno + + def set_forced_key(self, etype, hexkey): + etype = int(etype) + contents = binascii.a2b_hex(hexkey) + key = kcrypto.Key(etype, contents) + self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno) + + # Also set the NT hash of computer accounts for which we don’t know the + # password. + if etype == kcrypto.Enctype.RC4 and self.get_password() is None: + nt_hash = samr.Password() + nt_hash.hash = list(contents) + + self.set_nt_hash(nt_hash) + + def get_forced_key(self, etype): + etype = int(etype) + return self.forced_keys.get(etype) + + def set_forced_salt(self, salt): + self.forced_salt = bytes(salt) + + def get_forced_salt(self): + return self.forced_salt + + def get_salt(self): + if self.forced_salt is not None: + return self.forced_salt + + upn = self.get_upn() + if upn is not None: + salt_name = upn.rsplit('@', 1)[0].replace('/', '') + else: + salt_name = self.get_username() + + secure_schannel_type = self.get_secure_channel_type() + if secure_schannel_type in [SEC_CHAN_WKSTA,SEC_CHAN_BDC]: + salt_name = self.get_username().lower() + if salt_name[-1] == '$': + salt_name = salt_name[:-1] + salt_string = '%shost%s.%s' % ( + self.get_realm().upper(), + salt_name, + self.get_realm().lower()) + else: + salt_string = self.get_realm().upper() + salt_name + + return salt_string.encode('utf-8') + + def set_dn(self, dn): + self.dn = dn + + def get_dn(self): + return self.dn + + def set_spn(self, spn): + self.spn = spn + + def get_spn(self): + return self.spn + + def set_upn(self, upn): + self.upn = upn + + def get_upn(self): + return self.upn + + def set_sid(self, sid): + self.sid = sid + + def get_sid(self): + return self.sid + + def get_rid(self): + sid = self.get_sid() + if sid is None: + return None + + _, rid = sid.rsplit('-', 1) + return int(rid) + + def set_type(self, account_type): + self.account_type = account_type + + def get_type(self): + return self.account_type + + def update_password(self, password): + self.set_password(password) + self.set_kvno(self.get_kvno() + 1) + + def get_private_key(self): + if self._private_key is None: + # Generate a new keypair. + self._private_key = asymmetric.rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + + return self._private_key + + def get_public_key(self): + return self.get_private_key().public_key() + + +class KerberosTicketCreds: + __slots__ = [ + 'cname', + 'crealm', + 'decryption_key', + 'encpart_private', + 'session_key', + 'sname', + 'srealm', + 'ticket_private', + 'ticket', + ] + + def __init__(self, ticket, session_key, + crealm=None, cname=None, + srealm=None, sname=None, + decryption_key=None, + ticket_private=None, + encpart_private=None): + self.ticket = ticket + self.session_key = session_key + self.crealm = crealm + self.cname = cname + self.srealm = srealm + self.sname = sname + self.decryption_key = decryption_key + self.ticket_private = ticket_private + self.encpart_private = encpart_private + + def set_sname(self, sname): + self.ticket['sname'] = sname + self.sname = sname + + +class PkInit(Enum): + NOT_USED = object() + PUBLIC_KEY = object() + DIFFIE_HELLMAN = object() + + +class RawKerberosTest(TestCase): + """A raw Kerberos Test case.""" + + class KpasswdMode(Enum): + SET = object() + CHANGE = object() + + # The location of a SID within the PAC + class SidType(Enum): + BASE_SID = object() # in info3.base.groups + EXTRA_SID = object() # in info3.sids + RESOURCE_SID = object() # in resource_groups + PRIMARY_GID = object() # the (sole) primary group + + def __repr__(self): + return self.__str__() + + pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM, + krb5pac.PAC_TYPE_KDC_CHECKSUM, + krb5pac.PAC_TYPE_TICKET_CHECKSUM, + krb5pac.PAC_TYPE_FULL_CHECKSUM} + + etypes_to_test = ( + {"value": -1111, "name": "dummy", }, + {"value": kcrypto.Enctype.AES256, "name": "aes128", }, + {"value": kcrypto.Enctype.AES128, "name": "aes256", }, + {"value": kcrypto.Enctype.RC4, "name": "rc4", }, + ) + + expect_padata_outer = object() + + setup_etype_test_permutations_done = False + + @classmethod + def setup_etype_test_permutations(cls): + if cls.setup_etype_test_permutations_done: + return + + res = [] + + num_idxs = len(cls.etypes_to_test) + permutations = [] + for num in range(1, num_idxs + 1): + chunk = list(itertools.permutations(range(num_idxs), num)) + for e in chunk: + el = list(e) + permutations.append(el) + + for p in permutations: + name = None + etypes = () + for idx in p: + n = cls.etypes_to_test[idx]["name"] + if name is None: + name = n + else: + name += "_%s" % n + etypes += (cls.etypes_to_test[idx]["value"],) + + r = {"name": name, "etypes": etypes, } + res.append(r) + + cls.etype_test_permutations = res + cls.setup_etype_test_permutations_done = True + + @classmethod + def etype_test_permutation_name_idx(cls): + cls.setup_etype_test_permutations() + res = [] + idx = 0 + for e in cls.etype_test_permutations: + r = (e['name'], idx) + idx += 1 + res.append(r) + return res + + def etype_test_permutation_by_idx(self, idx): + e = self.etype_test_permutations[idx] + return (e['name'], e['etypes']) + + @classmethod + def setUpClass(cls): + super().setUpClass() + + cls.host = samba.tests.env_get_var_value('SERVER') + cls.dc_host = samba.tests.env_get_var_value('DC_SERVER') + + # A dictionary containing credentials that have already been + # obtained. + cls.creds_dict = {} + + kdc_fast_support = samba.tests.env_get_var_value('FAST_SUPPORT', + allow_missing=True) + if kdc_fast_support is None: + kdc_fast_support = '0' + cls.kdc_fast_support = bool(int(kdc_fast_support)) + + kdc_claims_support = samba.tests.env_get_var_value('CLAIMS_SUPPORT', + allow_missing=True) + if kdc_claims_support is None: + kdc_claims_support = '0' + cls.kdc_claims_support = bool(int(kdc_claims_support)) + + kdc_compound_id_support = samba.tests.env_get_var_value( + 'COMPOUND_ID_SUPPORT', + allow_missing=True) + if kdc_compound_id_support is None: + kdc_compound_id_support = '0' + cls.kdc_compound_id_support = bool(int(kdc_compound_id_support)) + + tkt_sig_support = samba.tests.env_get_var_value('TKT_SIG_SUPPORT', + allow_missing=True) + if tkt_sig_support is None: + tkt_sig_support = '1' + cls.tkt_sig_support = bool(int(tkt_sig_support)) + + full_sig_support = samba.tests.env_get_var_value('FULL_SIG_SUPPORT', + allow_missing=True) + if full_sig_support is None: + full_sig_support = '1' + cls.full_sig_support = bool(int(full_sig_support)) + + expect_pac = samba.tests.env_get_var_value('EXPECT_PAC', + allow_missing=True) + if expect_pac is None: + expect_pac = '1' + cls.expect_pac = bool(int(expect_pac)) + + expect_extra_pac_buffers = samba.tests.env_get_var_value( + 'EXPECT_EXTRA_PAC_BUFFERS', + allow_missing=True) + if expect_extra_pac_buffers is None: + expect_extra_pac_buffers = '1' + cls.expect_extra_pac_buffers = bool(int(expect_extra_pac_buffers)) + + cname_checking = samba.tests.env_get_var_value('CHECK_CNAME', + allow_missing=True) + if cname_checking is None: + cname_checking = '1' + cls.cname_checking = bool(int(cname_checking)) + + padata_checking = samba.tests.env_get_var_value('CHECK_PADATA', + allow_missing=True) + if padata_checking is None: + padata_checking = '1' + cls.padata_checking = bool(int(padata_checking)) + + kadmin_is_tgs = samba.tests.env_get_var_value('KADMIN_IS_TGS', + allow_missing=True) + if kadmin_is_tgs is None: + kadmin_is_tgs = '0' + cls.kadmin_is_tgs = bool(int(kadmin_is_tgs)) + + default_etypes = samba.tests.env_get_var_value('DEFAULT_ETYPES', + allow_missing=True) + if default_etypes is not None: + default_etypes = int(default_etypes) + cls.default_etypes = default_etypes + + forced_rc4 = samba.tests.env_get_var_value('FORCED_RC4', + allow_missing=True) + if forced_rc4 is None: + forced_rc4 = '0' + cls.forced_rc4 = bool(int(forced_rc4)) + + expect_nt_hash = samba.tests.env_get_var_value('EXPECT_NT_HASH', + allow_missing=True) + if expect_nt_hash is None: + expect_nt_hash = '1' + cls.expect_nt_hash = bool(int(expect_nt_hash)) + + expect_nt_status = samba.tests.env_get_var_value('EXPECT_NT_STATUS', + allow_missing=True) + if expect_nt_status is None: + expect_nt_status = '1' + cls.expect_nt_status = bool(int(expect_nt_status)) + + crash_windows = samba.tests.env_get_var_value('CRASH_WINDOWS', + allow_missing=True) + if crash_windows is None: + crash_windows = '1' + cls.crash_windows = bool(int(crash_windows)) + + def setUp(self): + super().setUp() + self.do_asn1_print = False + self.do_hexdump = False + + strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING', + allow_missing=True) + if strict_checking is None: + strict_checking = '1' + self.strict_checking = bool(int(strict_checking)) + + self.s = None + + self.unspecified_kvno = object() + + def tearDown(self): + self._disconnect("tearDown") + super().tearDown() + + def _disconnect(self, reason): + if self.s is None: + return + self.s.close() + self.s = None + if self.do_hexdump: + sys.stderr.write("disconnect[%s]\n" % reason) + + def _connect_tcp(self, host, port=None): + if port is None: + port = 88 + try: + self.a = socket.getaddrinfo(host, port, socket.AF_UNSPEC, + socket.SOCK_STREAM, socket.SOL_TCP, + 0) + self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2]) + self.s.settimeout(10) + self.s.connect(self.a[0][4]) + except socket.error: + self.s.close() + raise + + def connect(self, host, port=None): + self.assertNotConnected() + self._connect_tcp(host, port) + if self.do_hexdump: + sys.stderr.write("connected[%s]\n" % host) + + def env_get_var(self, varname, prefix, + fallback_default=True, + allow_missing=False): + val = None + if prefix is not None: + allow_missing_prefix = allow_missing or fallback_default + val = samba.tests.env_get_var_value( + '%s_%s' % (prefix, varname), + allow_missing=allow_missing_prefix) + else: + fallback_default = True + if val is None and fallback_default: + val = samba.tests.env_get_var_value(varname, + allow_missing=allow_missing) + return val + + def _get_krb5_creds_from_env(self, prefix, + default_username=None, + allow_missing_password=False, + allow_missing_keys=True, + require_strongest_key=False): + c = KerberosCredentials() + c.guess() + + domain = self.env_get_var('DOMAIN', prefix) + realm = self.env_get_var('REALM', prefix) + allow_missing_username = default_username is not None + username = self.env_get_var('USERNAME', prefix, + fallback_default=False, + allow_missing=allow_missing_username) + if username is None: + username = default_username + password = self.env_get_var('PASSWORD', prefix, + fallback_default=False, + allow_missing=allow_missing_password) + c.set_domain(domain) + c.set_realm(realm) + c.set_username(username) + if password is not None: + c.set_password(password) + as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES', + prefix, allow_missing=True) + if as_supported_enctypes is not None: + c.set_as_supported_enctypes(as_supported_enctypes) + tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES', + prefix, allow_missing=True) + if tgs_supported_enctypes is not None: + c.set_tgs_supported_enctypes(tgs_supported_enctypes) + ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES', + prefix, allow_missing=True) + if ap_supported_enctypes is not None: + c.set_ap_supported_enctypes(ap_supported_enctypes) + + if require_strongest_key: + kvno_allow_missing = False + if password is None: + aes256_allow_missing = False + else: + aes256_allow_missing = True + else: + kvno_allow_missing = allow_missing_keys + aes256_allow_missing = allow_missing_keys + kvno = self.env_get_var('KVNO', prefix, + fallback_default=False, + allow_missing=kvno_allow_missing) + if kvno is not None: + c.set_kvno(int(kvno)) + aes256_key = self.env_get_var('AES256_KEY_HEX', prefix, + fallback_default=False, + allow_missing=aes256_allow_missing) + if aes256_key is not None: + c.set_forced_key(kcrypto.Enctype.AES256, aes256_key) + aes128_key = self.env_get_var('AES128_KEY_HEX', prefix, + fallback_default=False, + allow_missing=True) + if aes128_key is not None: + c.set_forced_key(kcrypto.Enctype.AES128, aes128_key) + rc4_key = self.env_get_var('RC4_KEY_HEX', prefix, + fallback_default=False, allow_missing=True) + if rc4_key is not None: + c.set_forced_key(kcrypto.Enctype.RC4, rc4_key) + + if not allow_missing_keys: + self.assertTrue(c.forced_keys, + 'Please supply %s encryption keys ' + 'in environment' % prefix) + + return c + + def _get_krb5_creds(self, + prefix, + default_username=None, + allow_missing_password=False, + allow_missing_keys=True, + require_strongest_key=False, + fallback_creds_fn=None): + if prefix in self.creds_dict: + return self.creds_dict[prefix] + + # We don't have the credentials already + creds = None + env_err = None + try: + # Try to obtain them from the environment + creds = self._get_krb5_creds_from_env( + prefix, + default_username=default_username, + allow_missing_password=allow_missing_password, + allow_missing_keys=allow_missing_keys, + require_strongest_key=require_strongest_key) + except Exception as err: + # An error occurred, so save it for later + env_err = err + else: + self.assertIsNotNone(creds) + # Save the obtained credentials + self.creds_dict[prefix] = creds + return creds + + if fallback_creds_fn is not None: + try: + # Try to use the fallback method + creds = fallback_creds_fn() + except Exception as err: + print("ERROR FROM ENV: %r" % (env_err)) + print("FALLBACK-FN: %s" % (fallback_creds_fn)) + print("FALLBACK-ERROR: %r" % (err)) + else: + self.assertIsNotNone(creds) + # Save the obtained credentials + self.creds_dict[prefix] = creds + return creds + + # Both methods failed, so raise the exception from the + # environment method + raise env_err + + def get_user_creds(self, + allow_missing_password=False, + allow_missing_keys=True): + c = self._get_krb5_creds(prefix=None, + allow_missing_password=allow_missing_password, + allow_missing_keys=allow_missing_keys) + return c + + def get_service_creds(self, + allow_missing_password=False, + allow_missing_keys=True): + c = self._get_krb5_creds(prefix='SERVICE', + allow_missing_password=allow_missing_password, + allow_missing_keys=allow_missing_keys) + return c + + def get_client_creds(self, + allow_missing_password=False, + allow_missing_keys=True): + c = self._get_krb5_creds(prefix='CLIENT', + allow_missing_password=allow_missing_password, + allow_missing_keys=allow_missing_keys) + return c + + def get_server_creds(self, + allow_missing_password=False, + allow_missing_keys=True): + c = self._get_krb5_creds(prefix='SERVER', + allow_missing_password=allow_missing_password, + allow_missing_keys=allow_missing_keys) + return c + + def get_admin_creds(self, + allow_missing_password=False, + allow_missing_keys=True): + c = self._get_krb5_creds(prefix='ADMIN', + allow_missing_password=allow_missing_password, + allow_missing_keys=allow_missing_keys) + c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL) + c.set_workstation('') + return c + + def get_rodc_krbtgt_creds(self, + require_keys=True, + require_strongest_key=False): + if require_strongest_key: + self.assertTrue(require_keys) + c = self._get_krb5_creds(prefix='RODC_KRBTGT', + allow_missing_password=True, + allow_missing_keys=not require_keys, + require_strongest_key=require_strongest_key) + return c + + def get_krbtgt_creds(self, + require_keys=True, + require_strongest_key=False): + if require_strongest_key: + self.assertTrue(require_keys) + c = self._get_krb5_creds(prefix='KRBTGT', + default_username='krbtgt', + allow_missing_password=True, + allow_missing_keys=not require_keys, + require_strongest_key=require_strongest_key) + return c + + def get_anon_creds(self): + c = Credentials() + c.set_anonymous() + return c + + # Overridden by KDCBaseTest. At this level we don't know what actual + # enctypes are supported, so the best we can do is go by whether NT hashes + # are expected and whether the account is a workstation or not. This + # matches the behaviour that tests expect by default. + def get_default_enctypes(self, creds): + self.assertIsNotNone(creds) + + default_enctypes = [ + kcrypto.Enctype.AES256, + kcrypto.Enctype.AES128, + ] + + if self.expect_nt_hash or creds.get_workstation(): + default_enctypes.append(kcrypto.Enctype.RC4) + + return default_enctypes + + def asn1_dump(self, name, obj, asn1_print=None): + if asn1_print is None: + asn1_print = self.do_asn1_print + if asn1_print: + if name is not None: + sys.stderr.write("%s:\n%s" % (name, obj)) + else: + sys.stderr.write("%s" % (obj)) + + def hex_dump(self, name, blob, hexdump=None): + if hexdump is None: + hexdump = self.do_hexdump + if hexdump: + sys.stderr.write( + "%s: %d\n%s" % (name, len(blob), self.hexdump(blob))) + + def der_decode( + self, + blob, + asn1Spec=None, + native_encode=True, + asn1_print=None, + hexdump=None): + if asn1Spec is not None: + class_name = type(asn1Spec).__name__.split(':')[0] + else: + class_name = "<None-asn1Spec>" + self.hex_dump(class_name, blob, hexdump=hexdump) + obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec) + self.asn1_dump(None, obj, asn1_print=asn1_print) + if native_encode: + obj = pyasn1_native_encode(obj) + return obj + + def der_encode( + self, + obj, + asn1Spec=None, + native_decode=True, + asn1_print=None, + hexdump=None): + if native_decode: + obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec) + class_name = type(obj).__name__.split(':')[0] + if class_name is not None: + self.asn1_dump(None, obj, asn1_print=asn1_print) + blob = pyasn1_der_encode(obj) + if class_name is not None: + self.hex_dump(class_name, blob, hexdump=hexdump) + return blob + + def send_pdu(self, req, asn1_print=None, hexdump=None): + k5_pdu = self.der_encode( + req, native_decode=False, asn1_print=asn1_print, hexdump=False) + self.send_msg(k5_pdu, hexdump=hexdump) + + def send_msg(self, msg, hexdump=None): + header = struct.pack('>I', len(msg)) + req_pdu = header + req_pdu += msg + self.hex_dump("send_msg", header, hexdump=hexdump) + self.hex_dump("send_msg", msg, hexdump=hexdump) + + try: + while True: + sent = self.s.send(req_pdu, 0) + if sent == len(req_pdu): + return + req_pdu = req_pdu[sent:] + except socket.error as e: + self._disconnect("send_msg: %s" % e) + raise + + def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None): + rep_pdu = None + try: + if timeout is not None: + self.s.settimeout(timeout) + rep_pdu = self.s.recv(num_recv, 0) + self.s.settimeout(10) + if len(rep_pdu) == 0: + self._disconnect("recv_raw: EOF") + return None + self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump) + except socket.timeout: + self.s.settimeout(10) + sys.stderr.write("recv_raw: TIMEOUT\n") + except socket.error as e: + self._disconnect("recv_raw: %s" % e) + raise + return rep_pdu + + def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None): + raw_pdu = self.recv_raw( + num_recv=4, hexdump=hexdump, timeout=timeout) + if raw_pdu is None: + return None + header = struct.unpack(">I", raw_pdu[0:4]) + k5_len = header[0] + if k5_len == 0: + return "" + missing = k5_len + rep_pdu = b'' + while missing > 0: + raw_pdu = self.recv_raw( + num_recv=missing, hexdump=hexdump, timeout=timeout) + self.assertGreaterEqual(len(raw_pdu), 1) + rep_pdu += raw_pdu + missing = k5_len - len(rep_pdu) + return rep_pdu + + def recv_reply(self, asn1_print=None, hexdump=None, timeout=None): + rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print, + hexdump=hexdump, + timeout=timeout) + if not rep_pdu: + return None, rep_pdu + k5_raw = self.der_decode( + rep_pdu, + asn1Spec=None, + native_encode=False, + asn1_print=False, + hexdump=False) + pvno = k5_raw['field-0'] + self.assertEqual(pvno, 5) + msg_type = k5_raw['field-1'] + self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR]) + if msg_type == KRB_AS_REP: + asn1Spec = krb5_asn1.AS_REP() + elif msg_type == KRB_TGS_REP: + asn1Spec = krb5_asn1.TGS_REP() + elif msg_type == KRB_ERROR: + asn1Spec = krb5_asn1.KRB_ERROR() + rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec, + asn1_print=asn1_print, hexdump=False) + return (rep, rep_pdu) + + def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None): + (rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print, + hexdump=hexdump, + timeout=timeout) + return rep + + def assertIsConnected(self): + self.assertIsNotNone(self.s, msg="Not connected") + + def assertNotConnected(self): + self.assertIsNone(self.s, msg="Is connected") + + def send_recv_transaction( + self, + req, + asn1_print=None, + hexdump=None, + timeout=None, + to_rodc=False): + host = self.host if to_rodc else self.dc_host + self.connect(host) + try: + self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump) + rep = self.recv_pdu( + asn1_print=asn1_print, hexdump=hexdump, timeout=timeout) + except Exception: + self._disconnect("transaction failed") + raise + self._disconnect("transaction done") + return rep + + def getElementValue(self, obj, elem): + return obj.get(elem) + + def assertElementMissing(self, obj, elem): + v = self.getElementValue(obj, elem) + self.assertIsNone(v) + + def assertElementPresent(self, obj, elem, expect_empty=False): + v = self.getElementValue(obj, elem) + self.assertIsNotNone(v) + if self.strict_checking: + if isinstance(v, collections.abc.Container): + if expect_empty: + self.assertEqual(0, len(v)) + else: + self.assertNotEqual(0, len(v)) + + def assertElementEqual(self, obj, elem, value): + v = self.getElementValue(obj, elem) + self.assertIsNotNone(v) + self.assertEqual(v, value) + + def assertElementEqualUTF8(self, obj, elem, value): + v = self.getElementValue(obj, elem) + self.assertIsNotNone(v) + self.assertEqual(v, bytes(value, 'utf8')) + + def assertPrincipalEqual(self, princ1, princ2): + self.assertEqual(princ1['name-type'], princ2['name-type']) + self.assertEqual( + len(princ1['name-string']), + len(princ2['name-string']), + msg="princ1=%s != princ2=%s" % (princ1, princ2)) + for idx in range(len(princ1['name-string'])): + self.assertEqual( + princ1['name-string'][idx], + princ2['name-string'][idx], + msg="princ1=%s != princ2=%s" % (princ1, princ2)) + + def assertElementEqualPrincipal(self, obj, elem, value): + v = self.getElementValue(obj, elem) + self.assertIsNotNone(v) + v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName()) + self.assertPrincipalEqual(v, value) + + def assertElementKVNO(self, obj, elem, value): + v = self.getElementValue(obj, elem) + if value == "autodetect": + value = v + if value is not None: + self.assertIsNotNone(v) + # The value on the wire should never be 0 + self.assertNotEqual(v, 0) + # unspecified_kvno means we don't know the kvno, + # but want to enforce its presence + if value is not self.unspecified_kvno: + value = int(value) + self.assertNotEqual(value, 0) + self.assertEqual(v, value) + else: + self.assertIsNone(v) + + def assertElementFlags(self, obj, elem, expected, unexpected): + v = self.getElementValue(obj, elem) + self.assertIsNotNone(v) + if expected is not None: + self.assertIsInstance(expected, krb5_asn1.TicketFlags) + for i, flag in enumerate(expected): + if flag == 1: + self.assertEqual('1', v[i], + f"'{expected.namedValues[i]}' " + f"expected in {v}") + if unexpected is not None: + self.assertIsInstance(unexpected, krb5_asn1.TicketFlags) + for i, flag in enumerate(unexpected): + if flag == 1: + self.assertEqual('0', v[i], + f"'{unexpected.namedValues[i]}' " + f"unexpected in {v}") + + def assertSequenceElementsEqual(self, expected, got, *, + require_strict=None, + unchecked=None, + require_ordered=True): + if self.strict_checking and require_ordered and not unchecked: + self.assertEqual(expected, got) + else: + fail_msg = f'expected: {expected} got: {got}' + + ignored = set() + if unchecked: + ignored.update(unchecked) + if require_strict and not self.strict_checking: + ignored.update(require_strict) + + if ignored: + fail_msg += f' (ignoring: {ignored})' + expected = (x for x in expected if x not in ignored) + got = (x for x in got if x not in ignored) + + self.assertCountEqual(expected, got, fail_msg) + + def get_KerberosTimeWithUsec(self, epoch=None, offset=None): + if epoch is None: + epoch = time.time() + if offset is not None: + epoch = epoch + int(offset) + dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc) + return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond) + + def get_KerberosTime(self, epoch=None, offset=None): + (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset) + return s + + def get_EpochFromKerberosTime(self, kerberos_time): + if isinstance(kerberos_time, bytes): + kerberos_time = kerberos_time.decode() + + epoch = datetime.datetime.strptime(kerberos_time, + '%Y%m%d%H%M%SZ') + epoch = epoch.replace(tzinfo=datetime.timezone.utc) + epoch = int(epoch.timestamp()) + + return epoch + + def get_Nonce(self): + nonce_min = 0x7f000000 + nonce_max = 0x7fffffff + v = random.randint(nonce_min, nonce_max) + return v + + def get_pa_dict(self, pa_data): + pa_dict = {} + + if pa_data is not None: + for pa in pa_data: + pa_type = pa['padata-type'] + if pa_type in pa_dict: + raise RuntimeError(f'Duplicate type {pa_type}') + pa_dict[pa_type] = pa['padata-value'] + + return pa_dict + + def SessionKey_create(self, etype, contents, kvno=None): + key = kcrypto.Key(etype, contents) + return RodcPacEncryptionKey(key, kvno) + + def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None, + params=None): + self.assertIsNotNone(pwd) + self.assertIsNotNone(salt) + key = kcrypto.string_to_key(etype, pwd, salt, params=params) + return RodcPacEncryptionKey(key, kvno) + + def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None): + e = etype_info2['etype'] + salt = etype_info2.get('salt') + _params = etype_info2.get('s2kparams') + return self.PasswordKey_from_etype(creds, e, + kvno=kvno, + salt=salt) + + def PasswordKey_from_creds(self, creds, etype): + kvno = creds.get_kvno() + salt = creds.get_salt() + return self.PasswordKey_from_etype(creds, etype, + kvno=kvno, + salt=salt) + + def PasswordKey_from_etype(self, creds, etype, kvno=None, salt=None): + if etype == kcrypto.Enctype.RC4: + nthash = creds.get_nt_hash() + return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno) + + password = creds.get_password().encode('utf-8') + return self.PasswordKey_create( + etype=etype, pwd=password, salt=salt, kvno=kvno) + + def TicketDecryptionKey_from_creds(self, creds, etype=None): + + if etype is None: + etypes = creds.get_tgs_krb5_etypes() + if etypes and etypes[0] not in (kcrypto.Enctype.DES_CRC, + kcrypto.Enctype.DES_MD5): + etype = etypes[0] + else: + etype = kcrypto.Enctype.RC4 + + forced_key = creds.get_forced_key(etype) + if forced_key is not None: + return forced_key + + kvno = creds.get_kvno() + + fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] " + "nor a password specified, " % ( + creds.get_username(), etype, kvno)) + + if etype == kcrypto.Enctype.RC4: + nthash = creds.get_nt_hash() + self.assertIsNotNone(nthash, msg=fail_msg) + return self.SessionKey_create(etype=etype, + contents=nthash, + kvno=kvno) + + password = creds.get_password() + self.assertIsNotNone(password, msg=fail_msg) + salt = creds.get_salt() + return self.PasswordKey_create(etype=etype, + pwd=password, + salt=salt, + kvno=kvno) + + def RandomKey(self, etype): + e = kcrypto._get_enctype_profile(etype) + contents = samba.generate_random_bytes(e.keysize) + return self.SessionKey_create(etype=etype, contents=contents) + + def EncryptionKey_import(self, EncryptionKey_obj): + return self.SessionKey_create(EncryptionKey_obj['keytype'], + EncryptionKey_obj['keyvalue']) + + def EncryptedData_create(self, key, usage, plaintext): + # EncryptedData ::= SEQUENCE { + # etype [0] Int32 -- EncryptionType --, + # kvno [1] Int32 OPTIONAL, + # cipher [2] OCTET STRING -- ciphertext + # } + ciphertext = key.encrypt(usage, plaintext) + EncryptedData_obj = { + 'etype': key.etype, + 'cipher': ciphertext + } + if key.kvno is not None: + EncryptedData_obj['kvno'] = key.kvno + return EncryptedData_obj + + def Checksum_create(self, key, usage, plaintext, ctype=None): + # Checksum ::= SEQUENCE { + # cksumtype [0] Int32, + # checksum [1] OCTET STRING + # } + if ctype is None: + ctype = key.ctype + checksum = key.make_checksum(usage, plaintext, ctype=ctype) + Checksum_obj = { + 'cksumtype': ctype, + 'checksum': checksum, + } + return Checksum_obj + + @classmethod + def PrincipalName_create(cls, name_type, names): + # PrincipalName ::= SEQUENCE { + # name-type [0] Int32, + # name-string [1] SEQUENCE OF KerberosString + # } + PrincipalName_obj = { + 'name-type': name_type, + 'name-string': names, + } + return PrincipalName_obj + + def AuthorizationData_create(self, ad_type, ad_data): + # AuthorizationData ::= SEQUENCE { + # ad-type [0] Int32, + # ad-data [1] OCTET STRING + # } + AUTH_DATA_obj = { + 'ad-type': ad_type, + 'ad-data': ad_data + } + return AUTH_DATA_obj + + def PA_DATA_create(self, padata_type, padata_value): + # PA-DATA ::= SEQUENCE { + # -- NOTE: first tag is [1], not [0] + # padata-type [1] Int32, + # padata-value [2] OCTET STRING -- might be encoded AP-REQ + # } + PA_DATA_obj = { + 'padata-type': padata_type, + 'padata-value': padata_value, + } + return PA_DATA_obj + + def PA_ENC_TS_ENC_create(self, ts, usec): + # PA-ENC-TS-ENC ::= SEQUENCE { + # patimestamp[0] KerberosTime, -- client's time + # pausec[1] krb5int32 OPTIONAL + # } + PA_ENC_TS_ENC_obj = { + 'patimestamp': ts, + 'pausec': usec, + } + return PA_ENC_TS_ENC_obj + + def PA_PAC_OPTIONS_create(self, options): + # PA-PAC-OPTIONS ::= SEQUENCE { + # options [0] PACOptionFlags + # } + PA_PAC_OPTIONS_obj = { + 'options': options + } + return PA_PAC_OPTIONS_obj + + def KRB_FAST_ARMOR_create(self, armor_type, armor_value): + # KrbFastArmor ::= SEQUENCE { + # armor-type [0] Int32, + # armor-value [1] OCTET STRING, + # ... + # } + KRB_FAST_ARMOR_obj = { + 'armor-type': armor_type, + 'armor-value': armor_value + } + return KRB_FAST_ARMOR_obj + + def KRB_FAST_REQ_create(self, fast_options, padata, req_body): + # KrbFastReq ::= SEQUENCE { + # fast-options [0] FastOptions, + # padata [1] SEQUENCE OF PA-DATA, + # req-body [2] KDC-REQ-BODY, + # ... + # } + KRB_FAST_REQ_obj = { + 'fast-options': fast_options, + 'padata': padata, + 'req-body': req_body + } + return KRB_FAST_REQ_obj + + def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req): + # KrbFastArmoredReq ::= SEQUENCE { + # armor [0] KrbFastArmor OPTIONAL, + # req-checksum [1] Checksum, + # enc-fast-req [2] EncryptedData -- KrbFastReq -- + # } + KRB_FAST_ARMORED_REQ_obj = { + 'req-checksum': req_checksum, + 'enc-fast-req': enc_fast_req + } + if armor is not None: + KRB_FAST_ARMORED_REQ_obj['armor'] = armor + return KRB_FAST_ARMORED_REQ_obj + + def PA_FX_FAST_REQUEST_create(self, armored_data): + # PA-FX-FAST-REQUEST ::= CHOICE { + # armored-data [0] KrbFastArmoredReq, + # ... + # } + PA_FX_FAST_REQUEST_obj = { + 'armored-data': armored_data + } + return PA_FX_FAST_REQUEST_obj + + def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True): + # KERB-PA-PAC-REQUEST ::= SEQUENCE { + # include-pac[0] BOOLEAN --If TRUE, and no pac present, + # -- include PAC. + # --If FALSE, and PAC present, + # -- remove PAC. + # } + KERB_PA_PAC_REQUEST_obj = { + 'include-pac': include_pac, + } + if not pa_data_create: + return KERB_PA_PAC_REQUEST_obj + pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj, + asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST()) + pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac) + return pa_data + + def get_pa_pac_options(self, options): + pac_options = self.PA_PAC_OPTIONS_create(options) + pac_options = self.der_encode(pac_options, + asn1Spec=krb5_asn1.PA_PAC_OPTIONS()) + pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options) + + return pac_options + + def KDC_REQ_BODY_create(self, + kdc_options, + cname, + realm, + sname, + from_time, + till_time, + renew_time, + nonce, + etypes, + addresses, + additional_tickets, + EncAuthorizationData, + EncAuthorizationData_key, + EncAuthorizationData_usage, + asn1_print=None, + hexdump=None): + # KDC-REQ-BODY ::= SEQUENCE { + # kdc-options [0] KDCOptions, + # cname [1] PrincipalName OPTIONAL + # -- Used only in AS-REQ --, + # realm [2] Realm + # -- Server's realm + # -- Also client's in AS-REQ --, + # sname [3] PrincipalName OPTIONAL, + # from [4] KerberosTime OPTIONAL, + # till [5] KerberosTime, + # rtime [6] KerberosTime OPTIONAL, + # nonce [7] UInt32, + # etype [8] SEQUENCE OF Int32 + # -- EncryptionType + # -- in preference order --, + # addresses [9] HostAddresses OPTIONAL, + # enc-authorization-data [10] EncryptedData OPTIONAL + # -- AuthorizationData --, + # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL + # -- NOTE: not empty + # } + if EncAuthorizationData is not None: + enc_ad_plain = self.der_encode( + EncAuthorizationData, + asn1Spec=krb5_asn1.AuthorizationData(), + asn1_print=asn1_print, + hexdump=hexdump) + enc_ad = self.EncryptedData_create(EncAuthorizationData_key, + EncAuthorizationData_usage, + enc_ad_plain) + else: + enc_ad = None + KDC_REQ_BODY_obj = { + 'kdc-options': kdc_options, + 'realm': realm, + 'till': till_time, + 'nonce': nonce, + 'etype': etypes, + } + if cname is not None: + KDC_REQ_BODY_obj['cname'] = cname + if sname is not None: + KDC_REQ_BODY_obj['sname'] = sname + if from_time is not None: + KDC_REQ_BODY_obj['from'] = from_time + if renew_time is not None: + KDC_REQ_BODY_obj['rtime'] = renew_time + if addresses is not None: + KDC_REQ_BODY_obj['addresses'] = addresses + if enc_ad is not None: + KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad + if additional_tickets is not None: + KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets + return KDC_REQ_BODY_obj + + def KDC_REQ_create(self, + msg_type, + padata, + req_body, + asn1Spec=None, + asn1_print=None, + hexdump=None): + # KDC-REQ ::= SEQUENCE { + # -- NOTE: first tag is [1], not [0] + # pvno [1] INTEGER (5) , + # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --), + # padata [3] SEQUENCE OF PA-DATA OPTIONAL + # -- NOTE: not empty --, + # req-body [4] KDC-REQ-BODY + # } + # + KDC_REQ_obj = { + 'pvno': 5, + 'msg-type': msg_type, + 'req-body': req_body, + } + if padata is not None: + KDC_REQ_obj['padata'] = padata + if asn1Spec is not None: + KDC_REQ_decoded = pyasn1_native_decode( + KDC_REQ_obj, asn1Spec=asn1Spec) + else: + KDC_REQ_decoded = None + return KDC_REQ_obj, KDC_REQ_decoded + + def AS_REQ_create(self, + padata, # optional + kdc_options, # required + cname, # optional + realm, # required + sname, # optional + from_time, # optional + till_time, # required + renew_time, # optional + nonce, # required + etypes, # required + addresses, # optional + additional_tickets, + native_decoded_only=True, + asn1_print=None, + hexdump=None): + # KDC-REQ ::= SEQUENCE { + # -- NOTE: first tag is [1], not [0] + # pvno [1] INTEGER (5) , + # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --), + # padata [3] SEQUENCE OF PA-DATA OPTIONAL + # -- NOTE: not empty --, + # req-body [4] KDC-REQ-BODY + # } + # + # KDC-REQ-BODY ::= SEQUENCE { + # kdc-options [0] KDCOptions, + # cname [1] PrincipalName OPTIONAL + # -- Used only in AS-REQ --, + # realm [2] Realm + # -- Server's realm + # -- Also client's in AS-REQ --, + # sname [3] PrincipalName OPTIONAL, + # from [4] KerberosTime OPTIONAL, + # till [5] KerberosTime, + # rtime [6] KerberosTime OPTIONAL, + # nonce [7] UInt32, + # etype [8] SEQUENCE OF Int32 + # -- EncryptionType + # -- in preference order --, + # addresses [9] HostAddresses OPTIONAL, + # enc-authorization-data [10] EncryptedData OPTIONAL + # -- AuthorizationData --, + # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL + # -- NOTE: not empty + # } + KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create( + kdc_options, + cname, + realm, + sname, + from_time, + till_time, + renew_time, + nonce, + etypes, + addresses, + additional_tickets, + EncAuthorizationData=None, + EncAuthorizationData_key=None, + EncAuthorizationData_usage=None, + asn1_print=asn1_print, + hexdump=hexdump) + obj, decoded = self.KDC_REQ_create( + msg_type=KRB_AS_REQ, + padata=padata, + req_body=KDC_REQ_BODY_obj, + asn1Spec=krb5_asn1.AS_REQ(), + asn1_print=asn1_print, + hexdump=hexdump) + if native_decoded_only: + return decoded + return decoded, obj + + def AP_REQ_create(self, ap_options, ticket, authenticator): + # AP-REQ ::= [APPLICATION 14] SEQUENCE { + # pvno [0] INTEGER (5), + # msg-type [1] INTEGER (14), + # ap-options [2] APOptions, + # ticket [3] Ticket, + # authenticator [4] EncryptedData -- Authenticator + # } + AP_REQ_obj = { + 'pvno': 5, + 'msg-type': KRB_AP_REQ, + 'ap-options': ap_options, + 'ticket': ticket, + 'authenticator': authenticator, + } + return AP_REQ_obj + + def Authenticator_create( + self, crealm, cname, cksum, cusec, ctime, subkey, seq_number, + authorization_data): + # -- Unencrypted authenticator + # Authenticator ::= [APPLICATION 2] SEQUENCE { + # authenticator-vno [0] INTEGER (5), + # crealm [1] Realm, + # cname [2] PrincipalName, + # cksum [3] Checksum OPTIONAL, + # cusec [4] Microseconds, + # ctime [5] KerberosTime, + # subkey [6] EncryptionKey OPTIONAL, + # seq-number [7] UInt32 OPTIONAL, + # authorization-data [8] AuthorizationData OPTIONAL + # } + Authenticator_obj = { + 'authenticator-vno': 5, + 'crealm': crealm, + 'cname': cname, + 'cusec': cusec, + 'ctime': ctime, + } + if cksum is not None: + Authenticator_obj['cksum'] = cksum + if subkey is not None: + Authenticator_obj['subkey'] = subkey + if seq_number is not None: + Authenticator_obj['seq-number'] = seq_number + if authorization_data is not None: + Authenticator_obj['authorization-data'] = authorization_data + return Authenticator_obj + + def PKAuthenticator_create(self, + cusec, + ctime, + nonce, + *, + pa_checksum=None, + freshness_token=None, + kdc_name=None, + kdc_realm=None, + win2k_variant=False): + if win2k_variant: + self.assertIsNone(pa_checksum) + self.assertIsNone(freshness_token) + self.assertIsNotNone(kdc_name) + self.assertIsNotNone(kdc_realm) + else: + self.assertIsNone(kdc_name) + self.assertIsNone(kdc_realm) + + pk_authenticator_obj = { + 'cusec': cusec, + 'ctime': ctime, + 'nonce': nonce, + } + if pa_checksum is not None: + pk_authenticator_obj['paChecksum'] = pa_checksum + if freshness_token is not None: + pk_authenticator_obj['freshnessToken'] = freshness_token + if kdc_name is not None: + pk_authenticator_obj['kdcName'] = kdc_name + if kdc_realm is not None: + pk_authenticator_obj['kdcRealm'] = kdc_realm + + return pk_authenticator_obj + + def TGS_REQ_create(self, + padata, # optional + cusec, + ctime, + ticket, + kdc_options, # required + cname, # optional + realm, # required + sname, # optional + from_time, # optional + till_time, # required + renew_time, # optional + nonce, # required + etypes, # required + addresses, # optional + EncAuthorizationData, + EncAuthorizationData_key, + additional_tickets, + ticket_session_key, + authenticator_subkey=None, + body_checksum_type=None, + native_decoded_only=True, + asn1_print=None, + hexdump=None): + # KDC-REQ ::= SEQUENCE { + # -- NOTE: first tag is [1], not [0] + # pvno [1] INTEGER (5) , + # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --), + # padata [3] SEQUENCE OF PA-DATA OPTIONAL + # -- NOTE: not empty --, + # req-body [4] KDC-REQ-BODY + # } + # + # KDC-REQ-BODY ::= SEQUENCE { + # kdc-options [0] KDCOptions, + # cname [1] PrincipalName OPTIONAL + # -- Used only in AS-REQ --, + # realm [2] Realm + # -- Server's realm + # -- Also client's in AS-REQ --, + # sname [3] PrincipalName OPTIONAL, + # from [4] KerberosTime OPTIONAL, + # till [5] KerberosTime, + # rtime [6] KerberosTime OPTIONAL, + # nonce [7] UInt32, + # etype [8] SEQUENCE OF Int32 + # -- EncryptionType + # -- in preference order --, + # addresses [9] HostAddresses OPTIONAL, + # enc-authorization-data [10] EncryptedData OPTIONAL + # -- AuthorizationData --, + # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL + # -- NOTE: not empty + # } + + if authenticator_subkey is not None: + EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY + else: + EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION + + req_body = self.KDC_REQ_BODY_create( + kdc_options=kdc_options, + cname=None, + realm=realm, + sname=sname, + from_time=from_time, + till_time=till_time, + renew_time=renew_time, + nonce=nonce, + etypes=etypes, + addresses=addresses, + additional_tickets=additional_tickets, + EncAuthorizationData=EncAuthorizationData, + EncAuthorizationData_key=EncAuthorizationData_key, + EncAuthorizationData_usage=EncAuthorizationData_usage) + req_body_blob = self.der_encode(req_body, + asn1Spec=krb5_asn1.KDC_REQ_BODY(), + asn1_print=asn1_print, hexdump=hexdump) + + req_body_checksum = self.Checksum_create(ticket_session_key, + KU_TGS_REQ_AUTH_CKSUM, + req_body_blob, + ctype=body_checksum_type) + + subkey_obj = None + if authenticator_subkey is not None: + subkey_obj = authenticator_subkey.export_obj() + seq_number = random.randint(0, 0xfffffffe) + authenticator = self.Authenticator_create( + crealm=realm, + cname=cname, + cksum=req_body_checksum, + cusec=cusec, + ctime=ctime, + subkey=subkey_obj, + seq_number=seq_number, + authorization_data=None) + authenticator = self.der_encode( + authenticator, + asn1Spec=krb5_asn1.Authenticator(), + asn1_print=asn1_print, + hexdump=hexdump) + + authenticator = self.EncryptedData_create( + ticket_session_key, KU_TGS_REQ_AUTH, authenticator) + + ap_options = krb5_asn1.APOptions('0') + ap_req = self.AP_REQ_create(ap_options=str(ap_options), + ticket=ticket, + authenticator=authenticator) + ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(), + asn1_print=asn1_print, hexdump=hexdump) + pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req) + if padata is not None: + padata.append(pa_tgs_req) + else: + padata = [pa_tgs_req] + + obj, decoded = self.KDC_REQ_create( + msg_type=KRB_TGS_REQ, + padata=padata, + req_body=req_body, + asn1Spec=krb5_asn1.TGS_REQ(), + asn1_print=asn1_print, + hexdump=hexdump) + if native_decoded_only: + return decoded + return decoded, obj + + def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None): + # PA-S4U2Self ::= SEQUENCE { + # name [0] PrincipalName, + # realm [1] Realm, + # cksum [2] Checksum, + # auth [3] GeneralString + # } + cksum_data = name['name-type'].to_bytes(4, byteorder='little') + for n in name['name-string']: + cksum_data += n.encode() + cksum_data += realm.encode() + cksum_data += "Kerberos".encode() + cksum = self.Checksum_create(tgt_session_key, + KU_NON_KERB_CKSUM_SALT, + cksum_data, + ctype) + + PA_S4U2Self_obj = { + 'name': name, + 'realm': realm, + 'cksum': cksum, + 'auth': "Kerberos", + } + pa_s4u2self = self.der_encode( + PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self()) + return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self) + + def ChangePasswdDataMS_create(self, + new_password, + target_princ=None, + target_realm=None): + ChangePasswdDataMS_obj = { + 'newpasswd': new_password, + } + if target_princ is not None: + ChangePasswdDataMS_obj['targname'] = target_princ + if target_realm is not None: + ChangePasswdDataMS_obj['targrealm'] = target_realm + + change_password_data = self.der_encode( + ChangePasswdDataMS_obj, asn1Spec=krb5_asn1.ChangePasswdDataMS()) + + return change_password_data + + def KRB_PRIV_create(self, + subkey, + user_data, + s_address, + timestamp=None, + usec=None, + seq_number=None, + r_address=None): + EncKrbPrivPart_obj = { + 'user-data': user_data, + 's-address': s_address, + } + if timestamp is not None: + EncKrbPrivPart_obj['timestamp'] = timestamp + if usec is not None: + EncKrbPrivPart_obj['usec'] = usec + if seq_number is not None: + EncKrbPrivPart_obj['seq-number'] = seq_number + if r_address is not None: + EncKrbPrivPart_obj['r-address'] = r_address + + enc_krb_priv_part = self.der_encode( + EncKrbPrivPart_obj, asn1Spec=krb5_asn1.EncKrbPrivPart()) + + enc_data = self.EncryptedData_create(subkey, + KU_KRB_PRIV, + enc_krb_priv_part) + + KRB_PRIV_obj = { + 'pvno': 5, + 'msg-type': KRB_PRIV, + 'enc-part': enc_data, + } + + krb_priv = self.der_encode( + KRB_PRIV_obj, asn1Spec=krb5_asn1.KRB_PRIV()) + + return krb_priv + + def ContentInfo_create(self, content_type, content): + content_info_obj = { + 'contentType': content_type, + 'content': content, + } + + return content_info_obj + + def EncapsulatedContentInfo_create(self, content_type, content): + encapsulated_content_info_obj = { + 'eContentType': content_type, + 'eContent': content, + } + + return encapsulated_content_info_obj + + def SignedData_create(self, + digest_algorithms, + encap_content_info, + signer_infos, + *, + version=None, + certificates=None, + crls=None): + def is_cert_version_present(version): + return certificates is not None and any( + version in cert for cert in certificates) + + def is_crl_version_present(version): + return crls is not None and any( + version in crl for crl in crls) + + def is_signer_info_version_present(version): + return signer_infos is not None and any( + signer_info['version'] == version + for signer_info in signer_infos) + + def data_version(): + # per RFC5652 5.1: + if is_cert_version_present('other') or ( + is_crl_version_present('other')): + return 5 + + if is_cert_version_present('v2AttrCert'): + return 4 + + if is_cert_version_present('v1AttrCert') or ( + is_signer_info_version_present(3)) or ( + encap_content_info['eContentType'] != krb5_asn1.id_data + ): + return 3 + + return 1 + + if version is None: + version = data_version() + + signed_data_obj = { + 'version': version, + 'digestAlgorithms': digest_algorithms, + 'encapContentInfo': encap_content_info, + 'signerInfos': signer_infos, + } + + if certificates is not None: + signed_data_obj['certificates'] = certificates + if crls is not None: + signed_data_obj['crls'] = crls + + return signed_data_obj + + def AuthPack_create(self, + pk_authenticator, + *, + client_public_value=None, + supported_cms_types=None, + client_dh_nonce=None, + win2k_variant=False): + if win2k_variant: + self.assertIsNone(supported_cms_types) + self.assertIsNone(client_dh_nonce) + + auth_pack_obj = { + 'pkAuthenticator': pk_authenticator, + } + + if client_public_value is not None: + auth_pack_obj['clientPublicValue'] = client_public_value + if supported_cms_types is not None: + auth_pack_obj['supportedCMSTypes'] = supported_cms_types + if client_dh_nonce is not None: + auth_pack_obj['clientDHNonce'] = client_dh_nonce + + return auth_pack_obj + + def PK_AS_REQ_create(self, + signed_auth_pack, + *, + trusted_certifiers=None, + kdc_pk_id=None, + kdc_cert=None, + encryption_cert=None, + win2k_variant=False): + if win2k_variant: + self.assertIsNone(kdc_pk_id) + asn1_spec = krb5_asn1.PA_PK_AS_REQ_Win2k + else: + self.assertIsNone(kdc_cert) + self.assertIsNone(encryption_cert) + asn1_spec = krb5_asn1.PA_PK_AS_REQ + + content_info_obj = self.ContentInfo_create( + krb5_asn1.id_signedData, signed_auth_pack) + content_info = self.der_encode(content_info_obj, + asn1Spec=krb5_asn1.ContentInfo()) + + pk_as_req_obj = { + 'signedAuthPack': content_info, + } + + if trusted_certifiers is not None: + pk_as_req_obj['trustedCertifiers'] = trusted_certifiers + if kdc_pk_id is not None: + pk_as_req_obj['kdcPkId'] = kdc_pk_id + if kdc_cert is not None: + pk_as_req_obj['kdcCert'] = kdc_cert + if encryption_cert is not None: + pk_as_req_obj['encryptionCert'] = encryption_cert + + return self.der_encode(pk_as_req_obj, asn1Spec=asn1_spec()) + + def SignerInfo_create(self, + signer_id, + digest_algorithm, + signature_algorithm, + signature, + *, + version=None, + signed_attrs=None, + unsigned_attrs=None): + if version is None: + # per RFC5652 5.3: + if 'issuerAndSerialNumber' in signer_id: + version = 1 + elif 'subjectKeyIdentifier' in signer_id: + version = 3 + else: + self.fail(f'unknown signer ID version ({signer_id})') + + signer_info_obj = { + 'version': version, + 'sid': signer_id, + 'digestAlgorithm': digest_algorithm, + 'signatureAlgorithm': signature_algorithm, + 'signature': signature, + } + + if signed_attrs is not None: + signer_info_obj['signedAttrs'] = signed_attrs + if unsigned_attrs is not None: + signer_info_obj['unsignedAttrs'] = unsigned_attrs + + return signer_info_obj + + def SignerIdentifier_create(self, *, + issuer_and_serial_number=None, + subject_key_id=None): + if issuer_and_serial_number is not None: + return {'issuerAndSerialNumber': issuer_and_serial_number} + + if subject_key_id is not None: + return {'subjectKeyIdentifier': subject_key_id} + + self.fail('identifier not specified') + + def AlgorithmIdentifier_create(self, + algorithm, + *, + parameters=None): + algorithm_id_obj = { + 'algorithm': algorithm, + } + + if parameters is not None: + algorithm_id_obj['parameters'] = parameters + + return algorithm_id_obj + + def SubjectPublicKeyInfo_create(self, + algorithm, + public_key): + return { + 'algorithm': algorithm, + 'subjectPublicKey': public_key, + } + + def ValidationParms_create(self, + seed, + pgen_counter): + return { + 'seed': seed, + 'pgenCounter': pgen_counter, + } + + def DomainParameters_create(self, + p, + g, + *, + q=None, + j=None, + validation_parms=None): + domain_params_obj = { + 'p': p, + 'g': g, + } + + if q is not None: + domain_params_obj['q'] = q + if j is not None: + domain_params_obj['j'] = j + if validation_parms is not None: + domain_params_obj['validationParms'] = validation_parms + + return domain_params_obj + + def length_in_bytes(self, value): + """Return the length in bytes of an integer once it is encoded as + bytes.""" + + self.assertGreaterEqual(value, 0, 'value must be positive') + self.assertIsInstance(value, int) + + length_in_bits = max(1, math.log2(value + 1)) + length_in_bytes = math.ceil(length_in_bits / 8) + return length_in_bytes + + def bytes_from_int(self, value, *, length=None): + """Return an integer encoded big-endian into bytes of an optionally + specified length. + """ + if length is None: + length = self.length_in_bytes(value) + return value.to_bytes(length, 'big') + + def int_from_bytes(self, data): + """Return an integer decoded from bytes in big-endian format.""" + return int.from_bytes(data, 'big') + + def int_from_bit_string(self, string): + """Return an integer decoded from a bitstring.""" + return int(string, base=2) + + def bit_string_from_int(self, value): + """Return a bitstring encoding of an integer.""" + + string = f'{value:b}' + + # The bitstring must be padded to a multiple of 8 bits in length, or + # pyasn1 will interpret it incorrectly (as if the padding bits were + # present, but on the wrong end). + length = len(string) + padding_len = math.ceil(length / 8) * 8 - length + return '0' * padding_len + string + + def bit_string_from_bytes(self, data): + """Return a bitstring encoding of bytes in big-endian format.""" + value = self.int_from_bytes(data) + return self.bit_string_from_int(value) + + def bytes_from_bit_string(self, string): + """Return big-endian format bytes encoded from a bitstring.""" + value = self.int_from_bit_string(string) + length = math.ceil(len(string) / 8) + return value.to_bytes(length, 'big') + + def asn1_length(self, data): + """Return the ASN.1 encoding of the length of some data.""" + + length = len(data) + + self.assertGreater(length, 0) + if length < 0x80: + return bytes([length]) + + encoding_len = self.length_in_bytes(length) + self.assertLess(encoding_len, 0x80, + 'item is too long to be ASN.1 encoded') + + data = self.bytes_from_int(length, length=encoding_len) + return bytes([0x80 | encoding_len]) + data + + @staticmethod + def octetstring2key(x, enctype): + """This implements the function defined in RFC4556 3.2.3.1 “Using + Diffie-Hellman Key Exchange”.""" + + seedsize = kcrypto.seedsize(enctype) + seed = b'' + + # A counter that cycles through the bytes 0x00–0xff. + counter = itertools.cycle(map(lambda x: bytes([x]), + range(256))) + + while len(seed) < seedsize: + digest = hashes.Hash(hashes.SHA1(), default_backend()) + digest.update(next(counter) + x) + seed += digest.finalize() + + key = kcrypto.random_to_key(enctype, seed[:seedsize]) + return RodcPacEncryptionKey(key, kvno=None) + + def unpad(self, data): + """Return unpadded data.""" + padding_len = data[-1] + expected_padding = bytes([padding_len]) * padding_len + self.assertEqual(expected_padding, data[-padding_len:], + 'invalid padding bytes') + + return data[:-padding_len] + + def try_decode(self, data, module=None): + """Try to decode some data of unknown type with various known ASN.1 + schemata (optionally restricted to those from a particular module) and + print any results that seem promising. For use when debugging. + """ + + if module is None: + # Try a couple of known ASN.1 modules. + self.try_decode(data, krb5_asn1) + self.try_decode(data, pyasn1.type.univ) + + # It’s helpful to stop and give the user a chance to examine the + # results. + self.fail('decoding done') + + names = dir(module) + for name in names: + item = getattr(module, name) + if not callable(item): + continue + + try: + decoded = self.der_decode(data, asn1Spec=item()) + except Exception: + # Initiating the schema or decoding the ASN.1 failed for + # whatever reason. + pass + else: + # Decoding succeeded: print the structure to be examined. + print(f'\t{name}') + pprint(decoded) + + def cipher_from_algorithm(self, algorithm): + if algorithm == str(krb5_asn1.aes256_CBC_PAD): + return algorithms.AES + + if algorithm == str(krb5_asn1.des_EDE3_CBC): + return algorithms.TripleDES + + self.fail(f'unknown cipher algorithm {algorithm}') + + def hash_from_algorithm(self, algorithm): + # Let someone pass in an ObjectIdentifier. + algorithm = str(algorithm) + + if algorithm == str(krb5_asn1.id_sha1): + return hashes.SHA1 + + if algorithm == str(krb5_asn1.sha1WithRSAEncryption): + return hashes.SHA1 + + if algorithm == str(krb5_asn1.rsaEncryption): + return hashes.SHA1 + + if algorithm == str(krb5_asn1.id_pkcs1_sha256WithRSAEncryption): + return hashes.SHA256 + + if algorithm == str(krb5_asn1.id_sha512): + return hashes.SHA512 + + self.fail(f'unknown hash algorithm {algorithm}') + + def hash_from_algorithm_id(self, algorithm_id): + self.assertIsInstance(algorithm_id, dict) + + hash = self.hash_from_algorithm(algorithm_id['algorithm']) + + parameters = algorithm_id.get('parameters') + if self.strict_checking: + self.assertIsNotNone(parameters) + if parameters is not None: + self.assertEqual(b'\x05\x00', parameters) + + return hash + + def create_freshness_token(self, + epoch=None, + *, + offset=None, + krbtgt_creds=None): + timestamp, usec = self.get_KerberosTimeWithUsec(epoch, offset) + + # Encode the freshness token as PA-ENC-TS-ENC. + ts_enc = self.PA_ENC_TS_ENC_create(timestamp, usec) + ts_enc = self.der_encode(ts_enc, asn1Spec=krb5_asn1.PA_ENC_TS_ENC()) + + if krbtgt_creds is None: + krbtgt_creds = self.get_krbtgt_creds() + krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) + + # Encrypt the freshness token. + freshness = self.EncryptedData_create(krbtgt_key, KU_AS_FRESHNESS, ts_enc) + + freshness_token = self.der_encode(freshness, + asn1Spec=krb5_asn1.EncryptedData()) + + # Prepend a couple of zero bytes. + freshness_token = bytes(2) + freshness_token + + return freshness_token + + def kpasswd_create(self, + subkey, + user_data, + version, + seq_number, + ap_req, + local_address, + remote_address): + self.assertIsNotNone(self.s, 'call self.connect() first') + + timestamp, usec = self.get_KerberosTimeWithUsec() + + krb_priv = self.KRB_PRIV_create(subkey, + user_data, + s_address=local_address, + timestamp=timestamp, + usec=usec, + seq_number=seq_number, + r_address=remote_address) + + size = 6 + len(ap_req) + len(krb_priv) + self.assertLess(size, 0x10000) + + msg = bytearray() + msg.append(size >> 8) + msg.append(size & 0xff) + msg.append(version >> 8) + msg.append(version & 0xff) + msg.append(len(ap_req) >> 8) + msg.append(len(ap_req) & 0xff) + # Note: for sets, there could be a little-endian four-byte length here. + + msg.extend(ap_req) + msg.extend(krb_priv) + + return msg + + def get_enc_part(self, obj, key, usage): + self.assertElementEqual(obj, 'pvno', 5) + + enc_part = obj['enc-part'] + self.assertElementEqual(enc_part, 'etype', key.etype) + self.assertElementKVNO(enc_part, 'kvno', key.kvno) + + enc_part = key.decrypt(usage, enc_part['cipher']) + + return enc_part + + def kpasswd_exchange(self, + ticket, + new_password, + expected_code, + expected_msg, + mode, + target_princ=None, + target_realm=None, + ap_options=None, + send_seq_number=True): + if mode is self.KpasswdMode.SET: + version = 0xff80 + user_data = self.ChangePasswdDataMS_create(new_password, + target_princ, + target_realm) + elif mode is self.KpasswdMode.CHANGE: + self.assertIsNone(target_princ, + 'target_princ only valid for pw set') + self.assertIsNone(target_realm, + 'target_realm only valid for pw set') + + version = 1 + user_data = new_password.encode('utf-8') + else: + self.fail(f'invalid mode {mode}') + + subkey = self.RandomKey(kcrypto.Enctype.AES256) + + if ap_options is None: + ap_options = '0' + ap_options = str(krb5_asn1.APOptions(ap_options)) + + kdc_exchange_dict = { + 'tgt': ticket, + 'authenticator_subkey': subkey, + 'auth_data': None, + 'ap_options': ap_options, + } + + if send_seq_number: + seq_number = random.randint(0, 0xfffffffe) + else: + seq_number = None + + ap_req = self.generate_ap_req(kdc_exchange_dict, + None, + req_body=None, + armor=False, + usage=KU_AP_REQ_AUTH, + seq_number=seq_number) + + self.connect(self.host, port=464) + self.assertIsNotNone(self.s) + + family = self.s.family + + if family == socket.AF_INET: + addr_type = 2 # IPv4 + elif family == socket.AF_INET6: + addr_type = 24 # IPv6 + else: + self.fail(f'unknown family {family}') + + def create_address(ip): + return { + 'addr-type': addr_type, + 'address': socket.inet_pton(family, ip), + } + + local_ip = self.s.getsockname()[0] + local_address = create_address(local_ip) + + # remote_ip = self.s.getpeername()[0] + # remote_address = create_address(remote_ip) + + # TODO: due to a bug (?), MIT Kerberos will not accept the request + # unless r-address is set to our _local_ address. Heimdal, on the other + # hand, requires the r-address is set to the remote address (as + # expected). To avoid problems, avoid sending r-address for now. + remote_address = None + + msg = self.kpasswd_create(subkey, + user_data, + version, + seq_number, + ap_req, + local_address, + remote_address) + + self.send_msg(msg) + rep_pdu = self.recv_pdu_raw() + + self._disconnect('transaction done') + + self.assertIsNotNone(rep_pdu) + + header = rep_pdu[:6] + reply = rep_pdu[6:] + + reply_len = (header[0] << 8) | header[1] + reply_version = (header[2] << 8) | header[3] + ap_rep_len = (header[4] << 8) | header[5] + + self.assertEqual(reply_len, len(rep_pdu)) + self.assertEqual(1, reply_version) # KRB5_KPASSWD_VERS_CHANGEPW + self.assertLess(ap_rep_len, reply_len) + + self.assertNotEqual(0x7e, rep_pdu[1]) + self.assertNotEqual(0x5e, rep_pdu[1]) + + if ap_rep_len: + # We received an AP-REQ and KRB-PRIV as a response. This may or may + # not indicate an error, depending on the status code. + ap_rep = reply[:ap_rep_len] + krb_priv = reply[ap_rep_len:] + + key = ticket.session_key + + ap_rep = self.der_decode(ap_rep, asn1Spec=krb5_asn1.AP_REP()) + self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP) + enc_part = self.get_enc_part(ap_rep, key, KU_AP_REQ_ENC_PART) + enc_part = self.der_decode( + enc_part, asn1Spec=krb5_asn1.EncAPRepPart()) + + self.assertElementPresent(enc_part, 'ctime') + self.assertElementPresent(enc_part, 'cusec') + # self.assertElementMissing(enc_part, 'subkey') # TODO + # self.assertElementPresent(enc_part, 'seq-number') # TODO + + try: + krb_priv = self.der_decode(krb_priv, asn1Spec=krb5_asn1.KRB_PRIV()) + except PyAsn1Error: + self.fail() + + self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV) + priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV) + priv_enc_part = self.der_decode( + priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart()) + + self.assertElementMissing(priv_enc_part, 'timestamp') + self.assertElementMissing(priv_enc_part, 'usec') + # self.assertElementPresent(priv_enc_part, 'seq-number') # TODO + # self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO + # self.assertElementMissing(priv_enc_part, 'r-address') # TODO + + result_data = priv_enc_part['user-data'] + else: + # We received a KRB-ERROR as a response, indicating an error. + krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR()) + + sname = self.PrincipalName_create( + name_type=NT_PRINCIPAL, + names=['kadmin', 'changepw']) + realm = self.get_krbtgt_creds().get_realm().upper() + + self.assertElementEqual(krb_error, 'pvno', 5) + self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR) + self.assertElementMissing(krb_error, 'ctime') + self.assertElementMissing(krb_error, 'usec') + self.assertElementPresent(krb_error, 'stime') + self.assertElementPresent(krb_error, 'susec') + + error_code = krb_error['error-code'] + if isinstance(expected_code, int): + self.assertEqual(error_code, expected_code) + else: + self.assertIn(error_code, expected_code) + + self.assertElementMissing(krb_error, 'crealm') + self.assertElementMissing(krb_error, 'cname') + self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8')) + self.assertElementEqualPrincipal(krb_error, 'sname', sname) + self.assertElementMissing(krb_error, 'e-text') + + result_data = krb_error['e-data'] + + status = result_data[:2] + message = result_data[2:] + + status_code = (status[0] << 8) | status[1] + if isinstance(expected_code, int): + self.assertEqual(status_code, expected_code) + else: + self.assertIn(status_code, expected_code) + + if not message: + self.assertEqual(0, status_code, + 'got an error result, but no message') + return + + # Check the first character of the message. + if message[0]: + if isinstance(expected_msg, bytes): + self.assertEqual(message, expected_msg) + else: + self.assertIn(message, expected_msg) + else: + # We got AD password policy information. + self.assertEqual(30, len(message)) + + (empty_bytes, + min_length, + history_length, + properties, + expire_time, + min_age) = struct.unpack('>HIIIQQ', message) + + def _generic_kdc_exchange(self, + kdc_exchange_dict, # required + cname=None, # optional + realm=None, # required + sname=None, # optional + from_time=None, # optional + till_time=None, # required + renew_time=None, # optional + etypes=None, # required + addresses=None, # optional + additional_tickets=None, # optional + EncAuthorizationData=None, # optional + EncAuthorizationData_key=None, # optional + EncAuthorizationData_usage=None): # optional + + check_error_fn = kdc_exchange_dict['check_error_fn'] + check_rep_fn = kdc_exchange_dict['check_rep_fn'] + generate_fast_fn = kdc_exchange_dict['generate_fast_fn'] + generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn'] + generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn'] + generate_padata_fn = kdc_exchange_dict['generate_padata_fn'] + callback_dict = kdc_exchange_dict['callback_dict'] + req_msg_type = kdc_exchange_dict['req_msg_type'] + req_asn1Spec = kdc_exchange_dict['req_asn1Spec'] + rep_msg_type = kdc_exchange_dict['rep_msg_type'] + + expected_error_mode = kdc_exchange_dict['expected_error_mode'] + kdc_options = kdc_exchange_dict['kdc_options'] + + pac_request = kdc_exchange_dict['pac_request'] + pac_options = kdc_exchange_dict['pac_options'] + + # Parameters specific to the inner request body + inner_req = kdc_exchange_dict['inner_req'] + + # Parameters specific to the outer request body + outer_req = kdc_exchange_dict['outer_req'] + + if till_time is None: + till_time = self.get_KerberosTime(offset=36000) + + if 'nonce' in kdc_exchange_dict: + nonce = kdc_exchange_dict['nonce'] + else: + nonce = self.get_Nonce() + kdc_exchange_dict['nonce'] = nonce + + req_body = self.KDC_REQ_BODY_create( + kdc_options=kdc_options, + cname=cname, + realm=realm, + sname=sname, + from_time=from_time, + till_time=till_time, + renew_time=renew_time, + nonce=nonce, + etypes=etypes, + addresses=addresses, + additional_tickets=additional_tickets, + EncAuthorizationData=EncAuthorizationData, + EncAuthorizationData_key=EncAuthorizationData_key, + EncAuthorizationData_usage=EncAuthorizationData_usage) + + inner_req_body = dict(req_body) + if inner_req is not None: + for key, value in inner_req.items(): + if value is not None: + inner_req_body[key] = value + else: + del inner_req_body[key] + if outer_req is not None: + for key, value in outer_req.items(): + if value is not None: + req_body[key] = value + else: + del req_body[key] + + additional_padata = [] + if pac_request is not None: + pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request) + additional_padata.append(pa_pac_request) + if pac_options is not None: + pa_pac_options = self.get_pa_pac_options(pac_options) + additional_padata.append(pa_pac_options) + + if req_msg_type == KRB_AS_REQ: + tgs_req = None + tgs_req_padata = None + else: + self.assertEqual(KRB_TGS_REQ, req_msg_type) + + tgs_req = self.generate_ap_req(kdc_exchange_dict, + callback_dict, + req_body, + armor=False) + tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req) + + if generate_fast_padata_fn is not None: + self.assertIsNotNone(generate_fast_fn) + # This can alter req_body... + fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict, + callback_dict, + req_body) + else: + fast_padata = [] + + if generate_fast_armor_fn is not None: + self.assertIsNotNone(generate_fast_fn) + fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict, + callback_dict, + None, + armor=True) + + fast_armor_type = kdc_exchange_dict['fast_armor_type'] + fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type, + fast_ap_req) + else: + fast_armor = None + + if generate_padata_fn is not None: + # This can alter req_body... + outer_padata, req_body = generate_padata_fn(kdc_exchange_dict, + callback_dict, + req_body) + self.assertIsNotNone(outer_padata) + self.assertNotIn(PADATA_KDC_REQ, + [pa['padata-type'] for pa in outer_padata], + 'Don\'t create TGS-REQ manually') + else: + outer_padata = None + + if generate_fast_fn is not None: + armor_key = kdc_exchange_dict['armor_key'] + self.assertIsNotNone(armor_key) + + if req_msg_type == KRB_AS_REQ: + checksum_blob = self.der_encode( + req_body, + asn1Spec=krb5_asn1.KDC_REQ_BODY()) + else: + self.assertEqual(KRB_TGS_REQ, req_msg_type) + checksum_blob = tgs_req + + checksum = self.Checksum_create(armor_key, + KU_FAST_REQ_CHKSUM, + checksum_blob) + + fast_padata += additional_padata + fast = generate_fast_fn(kdc_exchange_dict, + callback_dict, + inner_req_body, + fast_padata, + fast_armor, + checksum) + else: + fast = None + + padata = [] + + if tgs_req_padata is not None: + padata.append(tgs_req_padata) + + if fast is not None: + padata.append(fast) + + if outer_padata is not None: + padata += outer_padata + + if fast is None: + padata += additional_padata + + if not padata: + padata = None + + kdc_exchange_dict['req_padata'] = padata + kdc_exchange_dict['fast_padata'] = fast_padata + kdc_exchange_dict['req_body'] = inner_req_body + + req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type, + padata=padata, + req_body=req_body, + asn1Spec=req_asn1Spec()) + + kdc_exchange_dict['req_obj'] = req_obj + + to_rodc = kdc_exchange_dict['to_rodc'] + + rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc) + self.assertIsNotNone(rep) + + msg_type = self.getElementValue(rep, 'msg-type') + self.assertIsNotNone(msg_type) + + expected_msg_type = None + if check_error_fn is not None: + expected_msg_type = KRB_ERROR + self.assertIsNone(check_rep_fn) + self.assertNotEqual(0, len(expected_error_mode)) + self.assertNotIn(0, expected_error_mode) + if check_rep_fn is not None: + expected_msg_type = rep_msg_type + self.assertIsNone(check_error_fn) + self.assertEqual(0, len(expected_error_mode)) + self.assertIsNotNone(expected_msg_type) + if msg_type == KRB_ERROR: + error_code = self.getElementValue(rep, 'error-code') + fail_msg = f'Got unexpected error: {error_code}' + else: + fail_msg = f'Expected to fail with error: {expected_error_mode}' + self.assertEqual(msg_type, expected_msg_type, fail_msg) + + if msg_type == KRB_ERROR: + return check_error_fn(kdc_exchange_dict, + callback_dict, + rep) + + return check_rep_fn(kdc_exchange_dict, callback_dict, rep) + + def as_exchange_dict(self, + creds=None, + client_cert=None, + expected_crealm=None, + expected_cname=None, + expected_anon=False, + expected_srealm=None, + expected_sname=None, + expected_account_name=None, + expected_groups=None, + unexpected_groups=None, + expected_upn_name=None, + expected_sid=None, + expected_requester_sid=None, + expected_domain_sid=None, + expected_device_domain_sid=None, + expected_supported_etypes=None, + expected_flags=None, + unexpected_flags=None, + ticket_decryption_key=None, + expect_ticket_checksum=None, + expect_full_checksum=None, + generate_fast_fn=None, + generate_fast_armor_fn=None, + generate_fast_padata_fn=None, + fast_armor_type=FX_FAST_ARMOR_AP_REQUEST, + generate_padata_fn=None, + check_error_fn=None, + check_rep_fn=None, + check_kdc_private_fn=None, + check_patypes=True, + callback_dict=None, + expected_error_mode=0, + expect_status=None, + expected_status=None, + expected_salt=None, + authenticator_subkey=None, + preauth_key=None, + armor_key=None, + armor_tgt=None, + armor_subkey=None, + auth_data=None, + kdc_options='', + inner_req=None, + outer_req=None, + pac_request=None, + pac_options=None, + ap_options=None, + fast_ap_options=None, + strict_edata_checking=True, + using_pkinit=PkInit.NOT_USED, + pk_nonce=None, + expect_edata=None, + expect_pac=True, + expect_client_claims=None, + expect_device_info=None, + expect_device_claims=None, + expect_upn_dns_info_ex=None, + expect_pac_attrs=None, + expect_pac_attrs_pac_request=None, + expect_requester_sid=None, + rc4_support=True, + expected_client_claims=None, + unexpected_client_claims=None, + expected_device_claims=None, + unexpected_device_claims=None, + expect_resource_groups_flag=None, + expected_device_groups=None, + expected_extra_pac_buffers=None, + to_rodc=False): + if expected_error_mode == 0: + expected_error_mode = () + elif not isinstance(expected_error_mode, collections.abc.Container): + expected_error_mode = (expected_error_mode,) + + kdc_exchange_dict = { + 'req_msg_type': KRB_AS_REQ, + 'req_asn1Spec': krb5_asn1.AS_REQ, + 'rep_msg_type': KRB_AS_REP, + 'rep_asn1Spec': krb5_asn1.AS_REP, + 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart, + 'creds': creds, + 'client_cert': client_cert, + 'expected_crealm': expected_crealm, + 'expected_cname': expected_cname, + 'expected_anon': expected_anon, + 'expected_srealm': expected_srealm, + 'expected_sname': expected_sname, + 'expected_account_name': expected_account_name, + 'expected_groups': expected_groups, + 'unexpected_groups': unexpected_groups, + 'expected_upn_name': expected_upn_name, + 'expected_sid': expected_sid, + 'expected_requester_sid': expected_requester_sid, + 'expected_domain_sid': expected_domain_sid, + 'expected_device_domain_sid': expected_device_domain_sid, + 'expected_supported_etypes': expected_supported_etypes, + 'expected_flags': expected_flags, + 'unexpected_flags': unexpected_flags, + 'ticket_decryption_key': ticket_decryption_key, + 'expect_ticket_checksum': expect_ticket_checksum, + 'expect_full_checksum': expect_full_checksum, + 'generate_fast_fn': generate_fast_fn, + 'generate_fast_armor_fn': generate_fast_armor_fn, + 'generate_fast_padata_fn': generate_fast_padata_fn, + 'fast_armor_type': fast_armor_type, + 'generate_padata_fn': generate_padata_fn, + 'check_error_fn': check_error_fn, + 'check_rep_fn': check_rep_fn, + 'check_kdc_private_fn': check_kdc_private_fn, + 'check_patypes': check_patypes, + 'callback_dict': callback_dict, + 'expected_error_mode': expected_error_mode, + 'expect_status': expect_status, + 'expected_status': expected_status, + 'expected_salt': expected_salt, + 'authenticator_subkey': authenticator_subkey, + 'preauth_key': preauth_key, + 'armor_key': armor_key, + 'armor_tgt': armor_tgt, + 'armor_subkey': armor_subkey, + 'auth_data': auth_data, + 'kdc_options': kdc_options, + 'inner_req': inner_req, + 'outer_req': outer_req, + 'pac_request': pac_request, + 'pac_options': pac_options, + 'ap_options': ap_options, + 'fast_ap_options': fast_ap_options, + 'strict_edata_checking': strict_edata_checking, + 'using_pkinit': using_pkinit, + 'pk_nonce': pk_nonce, + 'expect_edata': expect_edata, + 'expect_pac': expect_pac, + 'expect_client_claims': expect_client_claims, + 'expect_device_info': expect_device_info, + 'expect_device_claims': expect_device_claims, + 'expect_upn_dns_info_ex': expect_upn_dns_info_ex, + 'expect_pac_attrs': expect_pac_attrs, + 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request, + 'expect_requester_sid': expect_requester_sid, + 'rc4_support': rc4_support, + 'expected_client_claims': expected_client_claims, + 'unexpected_client_claims': unexpected_client_claims, + 'expected_device_claims': expected_device_claims, + 'unexpected_device_claims': unexpected_device_claims, + 'expect_resource_groups_flag': expect_resource_groups_flag, + 'expected_device_groups': expected_device_groups, + 'expected_extra_pac_buffers': expected_extra_pac_buffers, + 'to_rodc': to_rodc + } + if callback_dict is None: + callback_dict = {} + + return kdc_exchange_dict + + def tgs_exchange_dict(self, + creds=None, + expected_crealm=None, + expected_cname=None, + expected_anon=False, + expected_srealm=None, + expected_sname=None, + expected_account_name=None, + expected_groups=None, + unexpected_groups=None, + expected_upn_name=None, + expected_sid=None, + expected_requester_sid=None, + expected_domain_sid=None, + expected_device_domain_sid=None, + expected_supported_etypes=None, + expected_flags=None, + unexpected_flags=None, + ticket_decryption_key=None, + expect_ticket_checksum=None, + expect_full_checksum=None, + generate_fast_fn=None, + generate_fast_armor_fn=None, + generate_fast_padata_fn=None, + fast_armor_type=FX_FAST_ARMOR_AP_REQUEST, + generate_padata_fn=None, + check_error_fn=None, + check_rep_fn=None, + check_kdc_private_fn=None, + check_patypes=True, + expected_error_mode=0, + expect_status=None, + expected_status=None, + callback_dict=None, + tgt=None, + armor_key=None, + armor_tgt=None, + armor_subkey=None, + authenticator_subkey=None, + auth_data=None, + body_checksum_type=None, + kdc_options='', + inner_req=None, + outer_req=None, + pac_request=None, + pac_options=None, + ap_options=None, + fast_ap_options=None, + strict_edata_checking=True, + expect_edata=None, + expect_pac=True, + expect_client_claims=None, + expect_device_info=None, + expect_device_claims=None, + expect_upn_dns_info_ex=None, + expect_pac_attrs=None, + expect_pac_attrs_pac_request=None, + expect_requester_sid=None, + expected_proxy_target=None, + expected_transited_services=None, + rc4_support=True, + expected_client_claims=None, + unexpected_client_claims=None, + expected_device_claims=None, + unexpected_device_claims=None, + expect_resource_groups_flag=None, + expected_device_groups=None, + expected_extra_pac_buffers=None, + to_rodc=False): + if expected_error_mode == 0: + expected_error_mode = () + elif not isinstance(expected_error_mode, collections.abc.Container): + expected_error_mode = (expected_error_mode,) + + kdc_exchange_dict = { + 'req_msg_type': KRB_TGS_REQ, + 'req_asn1Spec': krb5_asn1.TGS_REQ, + 'rep_msg_type': KRB_TGS_REP, + 'rep_asn1Spec': krb5_asn1.TGS_REP, + 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart, + 'creds': creds, + 'expected_crealm': expected_crealm, + 'expected_cname': expected_cname, + 'expected_anon': expected_anon, + 'expected_srealm': expected_srealm, + 'expected_sname': expected_sname, + 'expected_account_name': expected_account_name, + 'expected_groups': expected_groups, + 'unexpected_groups': unexpected_groups, + 'expected_upn_name': expected_upn_name, + 'expected_sid': expected_sid, + 'expected_requester_sid': expected_requester_sid, + 'expected_domain_sid': expected_domain_sid, + 'expected_device_domain_sid': expected_device_domain_sid, + 'expected_supported_etypes': expected_supported_etypes, + 'expected_flags': expected_flags, + 'unexpected_flags': unexpected_flags, + 'ticket_decryption_key': ticket_decryption_key, + 'expect_ticket_checksum': expect_ticket_checksum, + 'expect_full_checksum': expect_full_checksum, + 'generate_fast_fn': generate_fast_fn, + 'generate_fast_armor_fn': generate_fast_armor_fn, + 'generate_fast_padata_fn': generate_fast_padata_fn, + 'fast_armor_type': fast_armor_type, + 'generate_padata_fn': generate_padata_fn, + 'check_error_fn': check_error_fn, + 'check_rep_fn': check_rep_fn, + 'check_kdc_private_fn': check_kdc_private_fn, + 'check_patypes': check_patypes, + 'callback_dict': callback_dict, + 'expected_error_mode': expected_error_mode, + 'expect_status': expect_status, + 'expected_status': expected_status, + 'tgt': tgt, + 'body_checksum_type': body_checksum_type, + 'armor_key': armor_key, + 'armor_tgt': armor_tgt, + 'armor_subkey': armor_subkey, + 'auth_data': auth_data, + 'authenticator_subkey': authenticator_subkey, + 'kdc_options': kdc_options, + 'inner_req': inner_req, + 'outer_req': outer_req, + 'pac_request': pac_request, + 'pac_options': pac_options, + 'ap_options': ap_options, + 'fast_ap_options': fast_ap_options, + 'strict_edata_checking': strict_edata_checking, + 'expect_edata': expect_edata, + 'expect_pac': expect_pac, + 'expect_client_claims': expect_client_claims, + 'expect_device_info': expect_device_info, + 'expect_device_claims': expect_device_claims, + 'expect_upn_dns_info_ex': expect_upn_dns_info_ex, + 'expect_pac_attrs': expect_pac_attrs, + 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request, + 'expect_requester_sid': expect_requester_sid, + 'expected_proxy_target': expected_proxy_target, + 'expected_transited_services': expected_transited_services, + 'rc4_support': rc4_support, + 'expected_client_claims': expected_client_claims, + 'unexpected_client_claims': unexpected_client_claims, + 'expected_device_claims': expected_device_claims, + 'unexpected_device_claims': unexpected_device_claims, + 'expect_resource_groups_flag': expect_resource_groups_flag, + 'expected_device_groups': expected_device_groups, + 'expected_extra_pac_buffers': expected_extra_pac_buffers, + 'to_rodc': to_rodc + } + if callback_dict is None: + callback_dict = {} + + return kdc_exchange_dict + + def generic_check_kdc_rep(self, + kdc_exchange_dict, + callback_dict, + rep): + + expected_crealm = kdc_exchange_dict['expected_crealm'] + expected_anon = kdc_exchange_dict['expected_anon'] + expected_srealm = kdc_exchange_dict['expected_srealm'] + expected_sname = kdc_exchange_dict['expected_sname'] + ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key'] + check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn'] + rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec'] + msg_type = kdc_exchange_dict['rep_msg_type'] + armor_key = kdc_exchange_dict['armor_key'] + + self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP + padata = self.getElementValue(rep, 'padata') + if self.strict_checking: + self.assertElementEqualUTF8(rep, 'crealm', expected_crealm) + if self.cname_checking: + if expected_anon: + expected_cname = self.PrincipalName_create( + name_type=NT_WELLKNOWN, + names=['WELLKNOWN', 'ANONYMOUS']) + else: + expected_cname = kdc_exchange_dict['expected_cname'] + self.assertElementEqualPrincipal(rep, 'cname', expected_cname) + self.assertElementPresent(rep, 'ticket') + ticket = self.getElementValue(rep, 'ticket') + ticket_encpart = None + ticket_cipher = None + self.assertIsNotNone(ticket) + if ticket is not None: # Never None, but gives indentation + self.assertElementEqual(ticket, 'tkt-vno', 5) + self.assertElementEqualUTF8(ticket, 'realm', expected_srealm) + self.assertElementEqualPrincipal(ticket, 'sname', expected_sname) + self.assertElementPresent(ticket, 'enc-part') + ticket_encpart = self.getElementValue(ticket, 'enc-part') + self.assertIsNotNone(ticket_encpart) + if ticket_encpart is not None: # Never None, but gives indentation + self.assertElementPresent(ticket_encpart, 'etype') + + kdc_options = kdc_exchange_dict['kdc_options'] + pos = len(tuple(krb5_asn1.KDCOptions('enc-tkt-in-skey'))) - 1 + expect_kvno = (pos >= len(kdc_options) + or kdc_options[pos] != '1') + if expect_kvno: + # 'unspecified' means present, with any value != 0 + self.assertElementKVNO(ticket_encpart, 'kvno', + self.unspecified_kvno) + else: + # For user-to-user, don't expect a kvno. + self.assertElementMissing(ticket_encpart, 'kvno') + + self.assertElementPresent(ticket_encpart, 'cipher') + ticket_cipher = self.getElementValue(ticket_encpart, 'cipher') + self.assertElementPresent(rep, 'enc-part') + encpart = self.getElementValue(rep, 'enc-part') + encpart_cipher = None + self.assertIsNotNone(encpart) + if encpart is not None: # Never None, but gives indentation + self.assertElementPresent(encpart, 'etype') + self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect') + self.assertElementPresent(encpart, 'cipher') + encpart_cipher = self.getElementValue(encpart, 'cipher') + + if self.padata_checking: + self.check_reply_padata(kdc_exchange_dict, + callback_dict, + encpart, + padata) + + ticket_checksum = None + + # Get the decryption key for the encrypted part + encpart_decryption_key, encpart_decryption_usage = ( + self.get_preauth_key(kdc_exchange_dict)) + + pa_dict = self.get_pa_dict(padata) + + pk_as_rep = pa_dict.get(PADATA_PK_AS_REP) + if pk_as_rep is not None: + pk_as_rep_asn1_spec = krb5_asn1.PA_PK_AS_REP + reply_key_pack_asn1_spec = krb5_asn1.ReplyKeyPack + pk_win2k = False + else: + pk_as_rep = pa_dict.get(PADATA_PK_AS_REP_19) + pk_as_rep_asn1_spec = krb5_asn1.PA_PK_AS_REP_Win2k + reply_key_pack_asn1_spec = krb5_asn1.ReplyKeyPack_Win2k + pk_win2k = True + if pk_as_rep is not None: + pk_as_rep = self.der_decode(pk_as_rep, + asn1Spec=pk_as_rep_asn1_spec()) + + using_pkinit = kdc_exchange_dict['using_pkinit'] + if using_pkinit is PkInit.PUBLIC_KEY: + content_info = self.der_decode( + pk_as_rep['encKeyPack'], + asn1Spec=krb5_asn1.ContentInfo()) + self.assertEqual(str(krb5_asn1.id_envelopedData), + content_info['contentType']) + + content = self.der_decode(content_info['content'], + asn1Spec=krb5_asn1.EnvelopedData()) + + self.assertEqual(0, content['version']) + originator_info = content['originatorInfo'] + self.assertFalse(originator_info.get('certs')) + self.assertFalse(originator_info.get('crls')) + self.assertFalse(content.get('unprotectedAttrs')) + + encrypted_content_info = content['encryptedContentInfo'] + recipient_infos = content['recipientInfos'] + + self.assertEqual(1, len(recipient_infos)) + ktri = recipient_infos[0]['ktri'] + + if self.strict_checking: + self.assertEqual(0, ktri['version']) + + private_key = encpart_decryption_key + self.assertIsInstance(private_key, + asymmetric.rsa.RSAPrivateKey) + + client_subject_key_id = ( + x509.SubjectKeyIdentifier.from_public_key( + private_key.public_key())) + + # Check that the client certificate is named as the recipient. + ktri_rid = ktri['rid'] + try: + issuer_and_serial_number = ktri_rid[ + 'issuerAndSerialNumber'] + except KeyError: + subject_key_id = ktri_rid['subjectKeyIdentifier'] + self.assertEqual(subject_key_id, + client_subject_key_id.digest) + else: + client_certificate = kdc_exchange_dict['client_cert'] + + self.assertIsNotNone(issuer_and_serial_number['issuer']) + self.assertEqual(issuer_and_serial_number['serialNumber'], + client_certificate.serial_number) + + key_encryption_algorithm = ktri['keyEncryptionAlgorithm'] + self.assertEqual(str(krb5_asn1.rsaEncryption), + key_encryption_algorithm['algorithm']) + if self.strict_checking: + self.assertEqual( + b'\x05\x00', + key_encryption_algorithm.get('parameters')) + + encrypted_key = ktri['encryptedKey'] + + # Decrypt the key. + pad_len = 256 - len(encrypted_key) + if pad_len: + encrypted_key = bytes(pad_len) + encrypted_key + decrypted_key = private_key.decrypt( + encrypted_key, + padding=asymmetric.padding.PKCS1v15()) + + self.assertEqual(str(krb5_asn1.id_signedData), + encrypted_content_info['contentType']) + + encrypted_content = encrypted_content_info['encryptedContent'] + encryption_algorithm = encrypted_content_info[ + 'contentEncryptionAlgorithm'] + + cipher_algorithm = self.cipher_from_algorithm(encryption_algorithm['algorithm']) + + # This will serve as the IV. + parameters = self.der_decode( + encryption_algorithm['parameters'], + asn1Spec=krb5_asn1.CMSCBCParameter()) + + # Decrypt the content. + cipher = Cipher(cipher_algorithm(decrypted_key), + modes.CBC(parameters), + default_backend()) + decryptor = cipher.decryptor() + decrypted_content = decryptor.update(encrypted_content) + decrypted_content += decryptor.finalize() + + # The padding doesn’t fully comply to PKCS7 with a specified + # blocksize, so we must unpad the data ourselves. + decrypted_content = self.unpad(decrypted_content) + + signed_data = None + signed_data_rfc2315 = None + + first_tag = decrypted_content[0] + if first_tag == 0x30: # ASN.1 SEQUENCE tag + signed_data = decrypted_content + else: + # Windows encodes the ASN.1 incorrectly, neglecting to add + # the SEQUENCE tag. We’ll have to prepend it ourselves in + # order for the decoding to work. + encoded_len = self.asn1_length(decrypted_content) + decrypted_content = bytes([0x30]) + encoded_len + ( + decrypted_content) + + if first_tag == 0x02: # ASN.1 INTEGER tag + + # The INTEGER tag indicates that the data is encoded + # with the earlier variant of the SignedData ASN.1 + # schema specified in RFC2315, as per [MS-PKCA] 2.2.4 + # (PA-PK-AS-REP). + signed_data_rfc2315 = decrypted_content + + elif first_tag == 0x06: # ASN.1 OBJECT IDENTIFIER tag + + # The OBJECT IDENTIFIER tag indicates that the data is + # encoded as SignedData and wrapped in a ContentInfo + # structure, which we shall have to decode first. This + # seems to be the case when the supportedCMSTypes field + # in the client’s AuthPack is missing or empty. + + content_info = self.der_decode( + decrypted_content, + asn1Spec=krb5_asn1.ContentInfo()) + self.assertEqual(str(krb5_asn1.id_signedData), + content_info['contentType']) + signed_data = content_info['content'] + else: + self.fail(f'got reply with unknown initial tag ' + f'({first_tag})') + + if signed_data is not None: + signed_data = self.der_decode( + signed_data, asn1Spec=krb5_asn1.SignedData()) + + encap_content_info = signed_data['encapContentInfo'] + + content_type = encap_content_info['eContentType'] + content = encap_content_info['eContent'] + elif signed_data_rfc2315 is not None: + signed_data = self.der_decode( + signed_data_rfc2315, + asn1Spec=krb5_asn1.SignedData_RFC2315()) + + encap_content_info = signed_data['contentInfo'] + + content_type = encap_content_info['contentType'] + content = self.der_decode( + encap_content_info['content'], + asn1Spec=pyasn1.type.univ.OctetString()) + else: + self.fail('we must have got SignedData') + + self.assertEqual(str(krb5_asn1.id_pkinit_rkeyData), + content_type) + reply_key_pack = self.der_decode( + content, asn1Spec=reply_key_pack_asn1_spec()) + + req_obj = kdc_exchange_dict['req_obj'] + req_asn1Spec = kdc_exchange_dict['req_asn1Spec'] + req_obj = self.der_encode(req_obj, + asn1Spec=req_asn1Spec()) + + reply_key = reply_key_pack['replyKey'] + + # Reply the encpart decryption key with the decrypted key from + # the reply. + encpart_decryption_key = self.SessionKey_create( + etype=reply_key['keytype'], + contents=reply_key['keyvalue'], + kvno=None) + + if not pk_win2k: + as_checksum = reply_key_pack['asChecksum'] + + # Verify the checksum over the AS request body. + kcrypto.verify_checksum(as_checksum['cksumtype'], + encpart_decryption_key.key, + KU_PKINIT_AS_REQ, + req_obj, + as_checksum['checksum']) + elif using_pkinit is PkInit.DIFFIE_HELLMAN: + content_info = self.der_decode( + pk_as_rep['dhInfo']['dhSignedData'], + asn1Spec=krb5_asn1.ContentInfo()) + self.assertEqual(str(krb5_asn1.id_signedData), + content_info['contentType']) + + signed_data = self.der_decode(content_info['content'], + asn1Spec=krb5_asn1.SignedData()) + + encap_content_info = signed_data['encapContentInfo'] + content = encap_content_info['eContent'] + + self.assertEqual(str(krb5_asn1.id_pkinit_DHKeyData), + encap_content_info['eContentType']) + + dh_key_info = self.der_decode( + content, asn1Spec=krb5_asn1.KDCDHKeyInfo()) + + self.assertNotIn('dhKeyExpiration', dh_key_info) + + dh_private_key = encpart_decryption_key + self.assertIsInstance(dh_private_key, + asymmetric.dh.DHPrivateKey) + + self.assertElementEqual(dh_key_info, 'nonce', + kdc_exchange_dict['pk_nonce']) + + dh_public_key_data = self.bytes_from_bit_string( + dh_key_info['subjectPublicKey']) + dh_public_key_decoded = self.der_decode( + dh_public_key_data, asn1Spec=krb5_asn1.DHPublicKey()) + + dh_numbers = dh_private_key.parameters().parameter_numbers() + + public_numbers = asymmetric.dh.DHPublicNumbers( + dh_public_key_decoded, dh_numbers) + dh_public_key = public_numbers.public_key(default_backend()) + + # Perform the Diffie-Hellman key exchange. + shared_secret = dh_private_key.exchange(dh_public_key) + + # Pad the shared secret out to the length of ‘p’. + p_len = self.length_in_bytes(dh_numbers.p) + padding_len = p_len - len(shared_secret) + self.assertGreaterEqual(padding_len, 0) + padded_shared_secret = bytes(padding_len) + shared_secret + + reply_key_enc_type = self.expected_etype(kdc_exchange_dict) + + # At the moment, we don’t specify a nonce in the request, so we + # can assume these are empty. + client_nonce = b'' + server_nonce = b'' + + ciphertext = padded_shared_secret + client_nonce + server_nonce + + # Replace the encpart decryption key with the key derived from + # the Diffie-Hellman key exchange. + encpart_decryption_key = self.octetstring2key( + ciphertext, reply_key_enc_type) + else: + self.fail(f'invalid value for using_pkinit: {using_pkinit}') + + self.assertEqual(3, signed_data['version']) + + digest_algorithms = signed_data['digestAlgorithms'] + self.assertEqual(1, len(digest_algorithms)) + digest_algorithm = digest_algorithms[0] + # Ensure the hash algorithm is valid. + _ = self.hash_from_algorithm_id(digest_algorithm) + + self.assertFalse(signed_data.get('crls')) + + signer_infos = signed_data['signerInfos'] + self.assertEqual(1, len(signer_infos)) + signer_info = signer_infos[0] + + self.assertEqual(1, signer_info['version']) + + # Get the certificate presented by the KDC. + kdc_certificates = signed_data['certificates'] + self.assertEqual(1, len(kdc_certificates)) + kdc_certificate = self.der_encode( + kdc_certificates[0], asn1Spec=krb5_asn1.CertificateChoices()) + kdc_certificate = x509.load_der_x509_certificate(kdc_certificate, + default_backend()) + + # Verify that the KDC’s certificate is named as the signer. + sid = signer_info['sid'] + try: + issuer_and_serial_number = sid['issuerAndSerialNumber'] + except KeyError: + extension = kdc_certificate.extensions.get_extension_for_oid( + x509.oid.ExtensionOID.SUBJECT_KEY_IDENTIFIER) + cert_subject_key_id = extension.value.digest + self.assertEqual(sid['subjectKeyIdentifier'], cert_subject_key_id) + else: + self.assertIsNotNone(issuer_and_serial_number['issuer']) + self.assertEqual(issuer_and_serial_number['serialNumber'], + kdc_certificate.serial_number) + + digest_algorithm = signer_info['digestAlgorithm'] + digest_hash_fn = self.hash_from_algorithm_id(digest_algorithm) + + signed_attrs = signer_info['signedAttrs'] + self.assertEqual(2, len(signed_attrs)) + + signed_attr0 = signed_attrs[0] + self.assertEqual(str(krb5_asn1.id_contentType), + signed_attr0['type']) + signed_attr0_values = signed_attr0['values'] + self.assertEqual(1, len(signed_attr0_values)) + signed_attr0_value = self.der_decode( + signed_attr0_values[0], + asn1Spec=krb5_asn1.ContentType()) + if using_pkinit is PkInit.DIFFIE_HELLMAN: + self.assertEqual(str(krb5_asn1.id_pkinit_DHKeyData), + signed_attr0_value) + else: + self.assertEqual(str(krb5_asn1.id_pkinit_rkeyData), + signed_attr0_value) + + signed_attr1 = signed_attrs[1] + self.assertEqual(str(krb5_asn1.id_messageDigest), + signed_attr1['type']) + signed_attr1_values = signed_attr1['values'] + self.assertEqual(1, len(signed_attr1_values)) + message_digest = self.der_decode(signed_attr1_values[0], + krb5_asn1.MessageDigest()) + + signature_algorithm = signer_info['signatureAlgorithm'] + hash_fn = self.hash_from_algorithm_id(signature_algorithm) + + # Compute the hash of the content to be signed. With the + # Diffie-Hellman key exchange, this signature is over the type + # KDCDHKeyInfo; otherwise, it is over the type ReplyKeyPack. + digest = hashes.Hash(digest_hash_fn(), default_backend()) + digest.update(content) + digest = digest.finalize() + + # Verify the hash. Note: this is a non–constant time comparison. + self.assertEqual(digest, message_digest) + + # Re-encode the attributes ready for verifying the signature. + cms_attrs = self.der_encode(signed_attrs, + asn1Spec=krb5_asn1.CMSAttributes()) + + # Verify the signature. + kdc_public_key = kdc_certificate.public_key() + kdc_public_key.verify( + signer_info['signature'], + cms_attrs, + asymmetric.padding.PKCS1v15(), + hash_fn()) + + self.assertFalse(signer_info.get('unsignedAttrs')) + + if armor_key is not None: + if PADATA_FX_FAST in pa_dict: + fx_fast_data = pa_dict[PADATA_FX_FAST] + fast_response = self.check_fx_fast_data(kdc_exchange_dict, + fx_fast_data, + armor_key, + finished=True) + + if 'strengthen-key' in fast_response: + strengthen_key = self.EncryptionKey_import( + fast_response['strengthen-key']) + encpart_decryption_key = ( + self.generate_strengthen_reply_key( + strengthen_key, + encpart_decryption_key)) + + fast_finished = fast_response.get('finished') + if fast_finished is not None: + ticket_checksum = fast_finished['ticket-checksum'] + + self.check_rep_padata(kdc_exchange_dict, + callback_dict, + fast_response['padata'], + error_code=0) + + ticket_private = None + if ticket_decryption_key is not None: + self.assertElementEqual(ticket_encpart, 'etype', + ticket_decryption_key.etype) + self.assertElementKVNO(ticket_encpart, 'kvno', + ticket_decryption_key.kvno) + ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET, + ticket_cipher) + ticket_private = self.der_decode( + ticket_decpart, + asn1Spec=krb5_asn1.EncTicketPart()) + + encpart_private = None + self.assertIsNotNone(encpart_decryption_key) + if encpart_decryption_key is not None: + self.assertElementEqual(encpart, 'etype', + encpart_decryption_key.etype) + if self.strict_checking: + self.assertElementKVNO(encpart, 'kvno', + encpart_decryption_key.kvno) + rep_decpart = encpart_decryption_key.decrypt( + encpart_decryption_usage, + encpart_cipher) + # MIT KDC encodes both EncASRepPart and EncTGSRepPart with + # application tag 26 + try: + encpart_private = self.der_decode( + rep_decpart, + asn1Spec=rep_encpart_asn1Spec()) + except Exception: + encpart_private = self.der_decode( + rep_decpart, + asn1Spec=krb5_asn1.EncTGSRepPart()) + + kdc_exchange_dict['reply_key'] = encpart_decryption_key + + self.assertIsNotNone(check_kdc_private_fn) + if check_kdc_private_fn is not None: + check_kdc_private_fn(kdc_exchange_dict, callback_dict, + rep, ticket_private, encpart_private, + ticket_checksum) + + return rep + + def check_fx_fast_data(self, + kdc_exchange_dict, + fx_fast_data, + armor_key, + finished=False, + expect_strengthen_key=True): + fx_fast_data = self.der_decode(fx_fast_data, + asn1Spec=krb5_asn1.PA_FX_FAST_REPLY()) + + enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep'] + self.assertEqual(enc_fast_rep['etype'], armor_key.etype) + + fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher']) + + fast_response = self.der_decode(fast_rep, + asn1Spec=krb5_asn1.KrbFastResponse()) + + if expect_strengthen_key and self.strict_checking: + self.assertIn('strengthen-key', fast_response) + + if finished: + self.assertIn('finished', fast_response) + + # Ensure that the nonce matches the nonce in the body of the request + # (RFC6113 5.4.3). + nonce = kdc_exchange_dict['nonce'] + self.assertEqual(nonce, fast_response['nonce']) + + return fast_response + + def generic_check_kdc_private(self, + kdc_exchange_dict, + callback_dict, + rep, + ticket_private, + encpart_private, + ticket_checksum): + kdc_options = kdc_exchange_dict['kdc_options'] + canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1 + canonicalize = (canon_pos < len(kdc_options) + and kdc_options[canon_pos] == '1') + renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1 + renewable = (renewable_pos < len(kdc_options) + and kdc_options[renewable_pos] == '1') + renew_pos = len(tuple(krb5_asn1.KDCOptions('renew'))) - 1 + renew = (renew_pos < len(kdc_options) + and kdc_options[renew_pos] == '1') + expect_renew_till = renewable or renew + + expected_crealm = kdc_exchange_dict['expected_crealm'] + expected_cname = kdc_exchange_dict['expected_cname'] + expected_srealm = kdc_exchange_dict['expected_srealm'] + expected_sname = kdc_exchange_dict['expected_sname'] + ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key'] + + rep_msg_type = kdc_exchange_dict['rep_msg_type'] + + expected_flags = kdc_exchange_dict.get('expected_flags') + unexpected_flags = kdc_exchange_dict.get('unexpected_flags') + + ticket = self.getElementValue(rep, 'ticket') + + if ticket_checksum is not None: + armor_key = kdc_exchange_dict['armor_key'] + self.verify_ticket_checksum(ticket, ticket_checksum, armor_key) + + to_rodc = kdc_exchange_dict['to_rodc'] + if to_rodc: + krbtgt_creds = self.get_rodc_krbtgt_creds() + else: + krbtgt_creds = self.get_krbtgt_creds() + krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) + + krbtgt_keys = [krbtgt_key] + if not self.strict_checking: + krbtgt_key_rc4 = self.TicketDecryptionKey_from_creds( + krbtgt_creds, + etype=kcrypto.Enctype.RC4) + krbtgt_keys.append(krbtgt_key_rc4) + + if self.expect_pac and self.is_tgs(expected_sname): + expect_pac = True + else: + expect_pac = kdc_exchange_dict['expect_pac'] + + ticket_session_key = None + if ticket_private is not None: + self.assertElementFlags(ticket_private, 'flags', + expected_flags, + unexpected_flags) + self.assertElementPresent(ticket_private, 'key') + ticket_key = self.getElementValue(ticket_private, 'key') + self.assertIsNotNone(ticket_key) + if ticket_key is not None: # Never None, but gives indentation + self.assertElementPresent(ticket_key, 'keytype') + self.assertElementPresent(ticket_key, 'keyvalue') + ticket_session_key = self.EncryptionKey_import(ticket_key) + self.assertElementEqualUTF8(ticket_private, 'crealm', + expected_crealm) + if self.cname_checking: + self.assertElementEqualPrincipal(ticket_private, 'cname', + expected_cname) + self.assertElementPresent(ticket_private, 'transited') + self.assertElementPresent(ticket_private, 'authtime') + if self.strict_checking: + self.assertElementPresent(ticket_private, 'starttime') + self.assertElementPresent(ticket_private, 'endtime') + if self.strict_checking: + if expect_renew_till: + self.assertElementPresent(ticket_private, 'renew-till') + else: + self.assertElementMissing(ticket_private, 'renew-till') + if self.strict_checking: + self.assertElementMissing(ticket_private, 'caddr') + if expect_pac is not None: + if expect_pac: + self.assertElementPresent(ticket_private, + 'authorization-data', + expect_empty=not expect_pac) + else: + # It is more correct to not have an authorization-data + # present than an empty one. + # + # https://github.com/krb5/krb5/pull/1225#issuecomment-995104193 + v = self.getElementValue(ticket_private, + 'authorization-data') + if v is not None: + self.assertElementPresent(ticket_private, + 'authorization-data', + expect_empty=True) + + encpart_session_key = None + if encpart_private is not None: + self.assertElementPresent(encpart_private, 'key') + encpart_key = self.getElementValue(encpart_private, 'key') + self.assertIsNotNone(encpart_key) + if encpart_key is not None: # Never None, but gives indentation + self.assertElementPresent(encpart_key, 'keytype') + self.assertElementPresent(encpart_key, 'keyvalue') + encpart_session_key = self.EncryptionKey_import(encpart_key) + self.assertElementPresent(encpart_private, 'last-req') + expected_nonce = kdc_exchange_dict.get('pk_nonce') + if not expected_nonce: + expected_nonce = kdc_exchange_dict['nonce'] + self.assertElementEqual(encpart_private, 'nonce', + expected_nonce) + if rep_msg_type == KRB_AS_REP: + if self.strict_checking: + self.assertElementPresent(encpart_private, + 'key-expiration') + else: + self.assertElementMissing(encpart_private, + 'key-expiration') + self.assertElementFlags(encpart_private, 'flags', + expected_flags, + unexpected_flags) + self.assertElementPresent(encpart_private, 'authtime') + if self.strict_checking: + self.assertElementPresent(encpart_private, 'starttime') + self.assertElementPresent(encpart_private, 'endtime') + if self.strict_checking: + if expect_renew_till: + self.assertElementPresent(encpart_private, 'renew-till') + else: + self.assertElementMissing(encpart_private, 'renew-till') + self.assertElementEqualUTF8(encpart_private, 'srealm', + expected_srealm) + self.assertElementEqualPrincipal(encpart_private, 'sname', + expected_sname) + if self.strict_checking: + self.assertElementMissing(encpart_private, 'caddr') + + sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict) + + sent_enc_pa_rep = self.sent_enc_pa_rep(kdc_exchange_dict) + + enc_padata = self.getElementValue(encpart_private, + 'encrypted-pa-data') + if (canonicalize or '1' in sent_pac_options or ( + rep_msg_type == KRB_AS_REP and sent_enc_pa_rep)): + if self.strict_checking: + self.assertIsNotNone(enc_padata) + + if enc_padata is not None: + enc_pa_dict = self.get_pa_dict(enc_padata) + if self.strict_checking: + if canonicalize: + self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict) + else: + self.assertNotIn(PADATA_SUPPORTED_ETYPES, + enc_pa_dict) + + if '1' in sent_pac_options: + self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict) + else: + self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict) + + if rep_msg_type == KRB_AS_REP and sent_enc_pa_rep: + self.assertIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict) + else: + self.assertNotIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict) + + if PADATA_SUPPORTED_ETYPES in enc_pa_dict: + expected_supported_etypes = kdc_exchange_dict[ + 'expected_supported_etypes'] + + (supported_etypes,) = struct.unpack( + '<L', + enc_pa_dict[PADATA_SUPPORTED_ETYPES]) + + ignore_bits = (security.KERB_ENCTYPE_DES_CBC_CRC | + security.KERB_ENCTYPE_DES_CBC_MD5) + + self.assertEqual( + supported_etypes & ~ignore_bits, + expected_supported_etypes & ~ignore_bits, + f'PADATA_SUPPORTED_ETYPES: got: {supported_etypes} (0x{supported_etypes:X}), ' + f'expected: {expected_supported_etypes} (0x{expected_supported_etypes:X})') + + if PADATA_PAC_OPTIONS in enc_pa_dict: + pac_options = self.der_decode( + enc_pa_dict[PADATA_PAC_OPTIONS], + asn1Spec=krb5_asn1.PA_PAC_OPTIONS()) + + self.assertElementEqual(pac_options, 'options', + sent_pac_options) + + if PADATA_REQ_ENC_PA_REP in enc_pa_dict: + enc_pa_rep = enc_pa_dict[PADATA_REQ_ENC_PA_REP] + + enc_pa_rep = self.der_decode( + enc_pa_rep, + asn1Spec=krb5_asn1.Checksum()) + + reply_key = kdc_exchange_dict['reply_key'] + req_obj = kdc_exchange_dict['req_obj'] + req_asn1Spec = kdc_exchange_dict['req_asn1Spec'] + + req_obj = self.der_encode(req_obj, + asn1Spec=req_asn1Spec()) + + checksum = enc_pa_rep['checksum'] + ctype = enc_pa_rep['cksumtype'] + + reply_key.verify_checksum(KU_AS_REQ, + req_obj, + ctype, + checksum) + else: + if enc_padata is not None: + self.assertEqual(enc_padata, []) + + if ticket_session_key is not None and encpart_session_key is not None: + self.assertEqual(ticket_session_key.etype, + encpart_session_key.etype) + self.assertEqual(ticket_session_key.key.contents, + encpart_session_key.key.contents) + if encpart_session_key is not None: + session_key = encpart_session_key + else: + session_key = ticket_session_key + ticket_creds = KerberosTicketCreds( + ticket, + session_key, + crealm=expected_crealm, + cname=expected_cname, + srealm=expected_srealm, + sname=expected_sname, + decryption_key=ticket_decryption_key, + ticket_private=ticket_private, + encpart_private=encpart_private) + + if ticket_private is not None: + pac_data = self.get_ticket_pac(ticket_creds, expect_pac=expect_pac) + if expect_pac is True: + self.assertIsNotNone(pac_data) + elif expect_pac is False: + self.assertIsNone(pac_data) + + if pac_data is not None: + self.check_pac_buffers(pac_data, kdc_exchange_dict) + + expect_ticket_checksum = kdc_exchange_dict['expect_ticket_checksum'] + expect_full_checksum = kdc_exchange_dict['expect_full_checksum'] + if expect_ticket_checksum or expect_full_checksum: + self.assertIsNotNone(ticket_decryption_key) + + if ticket_decryption_key is not None: + service_ticket = (rep_msg_type == KRB_TGS_REP + and not self.is_tgs_principal(expected_sname)) + self.verify_ticket(ticket_creds, krbtgt_keys, + service_ticket=service_ticket, + expect_pac=expect_pac, + expect_ticket_checksum=expect_ticket_checksum + or self.tkt_sig_support, + expect_full_checksum=expect_full_checksum + or self.full_sig_support) + + kdc_exchange_dict['rep_ticket_creds'] = ticket_creds + + # Check the SIDs in a LOGON_INFO PAC buffer. + def check_logon_info_sids(self, logon_info_buffer, kdc_exchange_dict): + info3 = logon_info_buffer.info.info.info3 + logon_info = info3.base + resource_groups = logon_info_buffer.info.info.resource_groups + + expected_groups = kdc_exchange_dict['expected_groups'] + unexpected_groups = kdc_exchange_dict['unexpected_groups'] + expected_domain_sid = kdc_exchange_dict['expected_domain_sid'] + expected_sid = kdc_exchange_dict['expected_sid'] + + domain_sid = logon_info.domain_sid + if expected_domain_sid is not None: + self.assertEqual(expected_domain_sid, str(domain_sid)) + + if expected_sid is not None: + got_sid = f'{domain_sid}-{logon_info.rid}' + self.assertEqual(expected_sid, got_sid) + + if expected_groups is None and unexpected_groups is None: + # Nothing more to do. + return + + # Check the SIDs in the PAC. + + # Form a representation of the PAC, containing at first the primary + # GID. + primary_sid = f'{domain_sid}-{logon_info.primary_gid}' + pac_sids = { + (primary_sid, self.SidType.PRIMARY_GID, None), + } + + # Collect the Extra SIDs. + if info3.sids is not None: + self.assertTrue(logon_info.user_flags & ( + netlogon.NETLOGON_EXTRA_SIDS), + 'extra SIDs present, but EXTRA_SIDS flag not set') + self.assertTrue(info3.sids, 'got empty SIDs') + + for sid_attr in info3.sids: + got_sid = str(sid_attr.sid) + if unexpected_groups is not None: + self.assertNotIn(got_sid, unexpected_groups) + + pac_sid = (got_sid, + self.SidType.EXTRA_SID, + sid_attr.attributes) + self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID') + pac_sids.add(pac_sid) + else: + self.assertFalse(logon_info.user_flags & ( + netlogon.NETLOGON_EXTRA_SIDS), + 'no extra SIDs present, but EXTRA_SIDS flag set') + + # Collect the Base RIDs. + if logon_info.groups.rids is not None: + self.assertTrue(logon_info.groups.rids, 'got empty RIDs') + + for group in logon_info.groups.rids: + got_sid = f'{domain_sid}-{group.rid}' + if unexpected_groups is not None: + self.assertNotIn(got_sid, unexpected_groups) + + pac_sid = (got_sid, self.SidType.BASE_SID, group.attributes) + self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID') + pac_sids.add(pac_sid) + + # Collect the Resource SIDs. + expect_resource_groups_flag = kdc_exchange_dict[ + 'expect_resource_groups_flag'] + expect_set_reason = '' + expect_reset_reason = '' + if expect_resource_groups_flag is None: + expect_resource_groups_flag = ( + resource_groups.groups.rids is not None) + expect_set_reason = 'resource groups present, but ' + expect_reset_reason = 'no resource groups present, but ' + + if expect_resource_groups_flag: + self.assertTrue( + logon_info.user_flags & netlogon.NETLOGON_RESOURCE_GROUPS, + f'{expect_set_reason}RESOURCE_GROUPS flag unexpectedly reset') + else: + self.assertFalse( + logon_info.user_flags & netlogon.NETLOGON_RESOURCE_GROUPS, + f'{expect_reset_reason}RESOURCE_GROUPS flag unexpectedly set') + + if resource_groups.groups.rids is not None: + self.assertTrue(resource_groups.groups.rids, 'got empty RIDs') + + resource_group_sid = resource_groups.domain_sid + for resource_group in resource_groups.groups.rids: + got_sid = f'{resource_group_sid}-{resource_group.rid}' + if unexpected_groups is not None: + self.assertNotIn(got_sid, unexpected_groups) + + pac_sid = (got_sid, + self.SidType.RESOURCE_SID, + resource_group.attributes) + self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID') + pac_sids.add(pac_sid) + + # Compare the aggregated SIDs against the set of expected SIDs. + if expected_groups is not None: + if ... in expected_groups: + # The caller is only interested in asserting the + # presence of particular groups, and doesn't mind if + # other groups are present as well. + pac_sids.add(...) + self.assertLessEqual(expected_groups, pac_sids, + 'expected groups') + else: + # The caller wants to make sure the groups match + # exactly. + self.assertEqual(expected_groups, pac_sids, + 'expected != got') + + def check_device_info(self, device_info, kdc_exchange_dict): + armor_tgt = kdc_exchange_dict['armor_tgt'] + armor_auth_data = armor_tgt.ticket_private.get( + 'authorization-data') + self.assertIsNotNone(armor_auth_data, + 'missing authdata for armor TGT') + armor_pac_data = self.get_pac(armor_auth_data) + armor_pac = ndr_unpack(krb5pac.PAC_DATA, armor_pac_data) + for armor_pac_buffer in armor_pac.buffers: + if armor_pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO: + armor_info = armor_pac_buffer.info.info.info3 + break + else: + self.fail('missing logon info for armor PAC') + self.assertEqual(armor_info.base.rid, device_info.rid) + + device_domain_sid = kdc_exchange_dict['expected_device_domain_sid'] + expected_device_groups = kdc_exchange_dict['expected_device_groups'] + if kdc_exchange_dict['expect_device_info']: + self.assertIsNotNone(device_domain_sid) + self.assertIsNotNone(expected_device_groups) + + if device_domain_sid is not None: + self.assertEqual(device_domain_sid, str(device_info.domain_sid)) + else: + device_domain_sid = str(device_info.domain_sid) + + # Check the device info SIDs. + + # A representation of the device info groups. + primary_sid = f'{device_domain_sid}-{device_info.primary_gid}' + got_sids = { + (primary_sid, self.SidType.PRIMARY_GID, None), + } + + # Collect the groups. + if device_info.groups.rids is not None: + self.assertTrue(device_info.groups.rids, 'got empty RIDs') + + for group in device_info.groups.rids: + got_sid = f'{device_domain_sid}-{group.rid}' + + device_sid = (got_sid, self.SidType.BASE_SID, group.attributes) + self.assertNotIn(device_sid, got_sids, 'got duplicated SID') + got_sids.add(device_sid) + + # Collect the SIDs. + if device_info.sids is not None: + self.assertTrue(device_info.sids, 'got empty SIDs') + + for sid_attr in device_info.sids: + got_sid = str(sid_attr.sid) + + in_a_domain = sid_attr.sid.num_auths == 5 and ( + str(sid_attr.sid).startswith('S-1-5-21-')) + self.assertFalse(in_a_domain, + f'got unexpected SID for domain: {got_sid} ' + f'(should be in device_info.domain_groups)') + + device_sid = (got_sid, + self.SidType.EXTRA_SID, + sid_attr.attributes) + self.assertNotIn(device_sid, got_sids, 'got duplicated SID') + got_sids.add(device_sid) + + # Collect the domain groups. + if device_info.domain_groups is not None: + self.assertTrue(device_info.domain_groups, 'got empty domain groups') + + for domain_group in device_info.domain_groups: + self.assertTrue(domain_group, 'got empty domain group') + + got_domain_sids = set() + + resource_group_sid = domain_group.domain_sid + + in_a_domain = resource_group_sid.num_auths == 4 and ( + str(resource_group_sid).startswith('S-1-5-21-')) + self.assertTrue( + in_a_domain, + f'got unexpected domain SID for non-domain: {resource_group_sid} ' + f'(should be in device_info.sids)') + + for resource_group in domain_group.groups.rids: + got_sid = f'{resource_group_sid}-{resource_group.rid}' + + device_sid = (got_sid, + self.SidType.RESOURCE_SID, + resource_group.attributes) + self.assertNotIn(device_sid, got_domain_sids, 'got duplicated SID') + got_domain_sids.add(device_sid) + + got_domain_sids = frozenset(got_domain_sids) + self.assertNotIn(got_domain_sids, got_sids) + got_sids.add(got_domain_sids) + + # Compare the aggregated device SIDs against the set of expected device + # SIDs. + if expected_device_groups is not None: + self.assertEqual(expected_device_groups, got_sids, + 'expected != got') + + def check_pac_buffers(self, pac_data, kdc_exchange_dict): + pac = ndr_unpack(krb5pac.PAC_DATA, pac_data) + + rep_msg_type = kdc_exchange_dict['rep_msg_type'] + armor_tgt = kdc_exchange_dict['armor_tgt'] + + compound_id = rep_msg_type == KRB_TGS_REP and armor_tgt is not None + + expected_sname = kdc_exchange_dict['expected_sname'] + expect_client_claims = kdc_exchange_dict['expect_client_claims'] + expect_device_info = kdc_exchange_dict['expect_device_info'] + expect_device_claims = kdc_exchange_dict['expect_device_claims'] + + expected_types = [krb5pac.PAC_TYPE_LOGON_INFO, + krb5pac.PAC_TYPE_SRV_CHECKSUM, + krb5pac.PAC_TYPE_KDC_CHECKSUM, + krb5pac.PAC_TYPE_LOGON_NAME, + krb5pac.PAC_TYPE_UPN_DNS_INFO] + + kdc_options = kdc_exchange_dict['kdc_options'] + pos = len(tuple(krb5_asn1.KDCOptions('cname-in-addl-tkt'))) - 1 + constrained_delegation = (pos < len(kdc_options) + and kdc_options[pos] == '1') + if constrained_delegation: + expected_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION) + + require_strict = set() + unchecked = set() + if not self.tkt_sig_support: + require_strict.add(krb5pac.PAC_TYPE_TICKET_CHECKSUM) + if not self.full_sig_support: + require_strict.add(krb5pac.PAC_TYPE_FULL_CHECKSUM) + + expected_client_claims = kdc_exchange_dict['expected_client_claims'] + unexpected_client_claims = kdc_exchange_dict[ + 'unexpected_client_claims'] + + if self.kdc_claims_support and expect_client_claims: + expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO) + else: + self.assertFalse( + expected_client_claims, + 'expected client claims, but client claims not expected in ' + 'PAC') + self.assertFalse( + unexpected_client_claims, + 'unexpected client claims, but client claims not expected in ' + 'PAC') + + if expect_client_claims is None: + unchecked.add(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO) + + expected_device_claims = kdc_exchange_dict['expected_device_claims'] + unexpected_device_claims = kdc_exchange_dict['unexpected_device_claims'] + + expected_device_groups = kdc_exchange_dict['expected_device_groups'] + + if (self.kdc_claims_support and self.kdc_compound_id_support + and expect_device_claims and compound_id): + expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO) + else: + self.assertFalse( + expect_device_claims, + 'expected device claims buffer, but device claims not ' + 'expected in PAC') + self.assertFalse( + expected_device_claims, + 'expected device claims, but device claims not expected in ' + 'PAC') + self.assertFalse( + unexpected_device_claims, + 'unexpected device claims, but device claims not expected in ' + 'PAC') + + if expect_device_claims is None and compound_id: + unchecked.add(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO) + + if self.kdc_compound_id_support and compound_id and expect_device_info: + expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO) + else: + self.assertFalse(expect_device_info, + 'expected device info with no armor TGT or ' + 'for non-TGS request') + self.assertFalse(expected_device_groups, + 'expected device groups, but device info not ' + 'expected in PAC') + + if expect_device_info is None and compound_id: + unchecked.add(krb5pac.PAC_TYPE_DEVICE_INFO) + + if rep_msg_type == KRB_TGS_REP: + if not self.is_tgs_principal(expected_sname): + expected_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM) + expected_types.append(krb5pac.PAC_TYPE_FULL_CHECKSUM) + + expect_extra_pac_buffers = self.is_tgs(expected_sname) + + expect_pac_attrs = kdc_exchange_dict['expect_pac_attrs'] + + if expect_pac_attrs: + expect_pac_attrs_pac_request = kdc_exchange_dict[ + 'expect_pac_attrs_pac_request'] + else: + expect_pac_attrs_pac_request = kdc_exchange_dict[ + 'pac_request'] + + if expect_pac_attrs is None: + if self.expect_extra_pac_buffers: + expect_pac_attrs = expect_extra_pac_buffers + else: + require_strict.add(krb5pac.PAC_TYPE_ATTRIBUTES_INFO) + if expect_pac_attrs: + expected_types.append(krb5pac.PAC_TYPE_ATTRIBUTES_INFO) + + expect_requester_sid = kdc_exchange_dict['expect_requester_sid'] + expected_requester_sid = kdc_exchange_dict['expected_requester_sid'] + + if expect_requester_sid is None: + if self.expect_extra_pac_buffers: + expect_requester_sid = expect_extra_pac_buffers + else: + require_strict.add(krb5pac.PAC_TYPE_REQUESTER_SID) + if expected_requester_sid is not None: + expect_requester_sid = True + if expect_requester_sid: + expected_types.append(krb5pac.PAC_TYPE_REQUESTER_SID) + + sent_pk_as_req = self.sent_pk_as_req(kdc_exchange_dict) or ( + self.sent_pk_as_req_win2k(kdc_exchange_dict)) + if sent_pk_as_req: + expected_types.append(krb5pac.PAC_TYPE_CREDENTIAL_INFO) + + expected_extra_pac_buffers = kdc_exchange_dict['expected_extra_pac_buffers'] + if expected_extra_pac_buffers is not None: + expected_types.extend(expected_extra_pac_buffers) + + buffer_types = [pac_buffer.type + for pac_buffer in pac.buffers] + self.assertSequenceElementsEqual( + expected_types, buffer_types, + require_ordered=False, + require_strict=require_strict, + unchecked=unchecked) + + expected_account_name = kdc_exchange_dict['expected_account_name'] + expected_sid = kdc_exchange_dict['expected_sid'] + + expect_upn_dns_info_ex = kdc_exchange_dict['expect_upn_dns_info_ex'] + if expect_upn_dns_info_ex is None and ( + expected_account_name is not None + or expected_sid is not None): + expect_upn_dns_info_ex = True + + for pac_buffer in pac.buffers: + if pac_buffer.type == krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION: + expected_proxy_target = kdc_exchange_dict[ + 'expected_proxy_target'] + expected_transited_services = kdc_exchange_dict[ + 'expected_transited_services'] + + delegation_info = pac_buffer.info.info + + self.assertEqual(expected_proxy_target, + str(delegation_info.proxy_target)) + + transited_services = list(map( + str, delegation_info.transited_services)) + self.assertEqual(expected_transited_services, + transited_services) + + elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME: + expected_cname = kdc_exchange_dict['expected_cname'] + account_name = '/'.join(expected_cname['name-string']) + + self.assertEqual(account_name, pac_buffer.info.account_name) + + elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO: + info3 = pac_buffer.info.info.info3 + logon_info = info3.base + + if expected_account_name is not None: + self.assertEqual(expected_account_name, + str(logon_info.account_name)) + + self.check_logon_info_sids(pac_buffer, kdc_exchange_dict) + + elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO: + upn_dns_info = pac_buffer.info + upn_dns_info_ex = upn_dns_info.ex + + expected_realm = kdc_exchange_dict['expected_crealm'] + self.assertEqual(expected_realm, + upn_dns_info.dns_domain_name) + + expected_upn_name = kdc_exchange_dict['expected_upn_name'] + if expected_upn_name is not None: + self.assertEqual(expected_upn_name, + upn_dns_info.upn_name) + + if expect_upn_dns_info_ex: + self.assertIsNotNone(upn_dns_info_ex) + + if upn_dns_info_ex is not None: + if expected_account_name is not None: + self.assertEqual(expected_account_name, + upn_dns_info_ex.samaccountname) + + if expected_sid is not None: + self.assertEqual(expected_sid, + str(upn_dns_info_ex.objectsid)) + + elif (pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO + and expect_pac_attrs): + attr_info = pac_buffer.info + + self.assertEqual(2, attr_info.flags_length) + + flags = attr_info.flags + + requested_pac = bool(flags & 1) + given_pac = bool(flags & 2) + + self.assertEqual(expect_pac_attrs_pac_request is True, + requested_pac) + self.assertEqual(expect_pac_attrs_pac_request is None, + given_pac) + + elif (pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID + and expect_requester_sid): + requester_sid = pac_buffer.info.sid + + if expected_requester_sid is None: + expected_requester_sid = expected_sid + if expected_sid is not None: + self.assertEqual(expected_requester_sid, + str(requester_sid)) + + elif pac_buffer.type in {krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO, + krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO}: + remaining = pac_buffer.info.remaining + + if pac_buffer.type == krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO: + claims_type = 'client claims' + expected_claims = expected_client_claims + unexpected_claims = unexpected_client_claims + else: + claims_type = 'device claims' + expected_claims = expected_device_claims + unexpected_claims = unexpected_device_claims + + if not remaining: + # Windows may produce an empty claims buffer. + self.assertFalse(expected_claims, + f'expected {claims_type}, but the PAC ' + f'buffer was empty') + continue + + if expected_claims: + empty_msg = f', and {claims_type} were expected' + else: + empty_msg = f' for {claims_type} (should be missing)' + + claims_metadata_ndr = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, + remaining) + claims_metadata = claims_metadata_ndr.claims.metadata + self.assertIsNotNone(claims_metadata, + f'got empty CLAIMS_SET_METADATA_NDR ' + f'inner structure {empty_msg}') + + self.assertIsNotNone(claims_metadata.claims_set, + f'got empty CLAIMS_SET_METADATA ' + f'structure {empty_msg}') + + uncompressed_size = claims_metadata.uncompressed_claims_set_size + compression_format = claims_metadata.compression_format + + if uncompressed_size < ( + claims.CLAIM_LOWER_COMPRESSION_THRESHOLD): + self.assertEqual(claims.CLAIMS_COMPRESSION_FORMAT_NONE, + compression_format, + f'{claims_type} unexpectedly ' + f'compressed ({uncompressed_size} ' + f'bytes uncompressed)') + elif uncompressed_size >= ( + claims.CLAIM_UPPER_COMPRESSION_THRESHOLD): + self.assertEqual( + claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF, + compression_format, + f'{claims_type} unexpectedly not compressed ' + f'({uncompressed_size} bytes uncompressed)') + + claims_set = claims_metadata.claims_set.claims.claims + self.assertIsNotNone(claims_set, + f'got empty CLAIMS_SET_NDR inner ' + f'structure {empty_msg}') + + claims_arrays = claims_set.claims_arrays + self.assertIsNotNone(claims_arrays, + f'got empty CLAIMS_SET structure ' + f'{empty_msg}') + self.assertGreater(len(claims_arrays), 0, + f'got empty claims array {empty_msg}') + self.assertEqual(len(claims_arrays), + claims_set.claims_array_count, + f'{claims_type} arrays size mismatch') + + got_claims = {} + + for claims_array in claims_arrays: + claim_entries = claims_array.claim_entries + self.assertIsNotNone(claim_entries, + f'got empty CLAIMS_ARRAY structure ' + f'{empty_msg}') + self.assertGreater(len(claim_entries), 0, + f'got empty claim entries array ' + f'{empty_msg}') + self.assertEqual(len(claim_entries), + claims_array.claims_count, + f'{claims_type} entries array size ' + f'mismatch') + + for entry in claim_entries: + if unexpected_claims is not None: + self.assertNotIn(entry.id, unexpected_claims, + f'got unexpected {claims_type} ' + f'in PAC') + if expected_claims is None: + continue + + expected_claim = expected_claims.get(entry.id) + if expected_claim is None: + continue + + self.assertNotIn(entry.id, got_claims, + f'got duplicate {claims_type}') + + self.assertIsNotNone(entry.values.values, + f'got {claims_type} with no ' + f'values') + self.assertGreater(len(entry.values.values), 0, + f'got empty {claims_type} values ' + f'array') + self.assertEqual(len(entry.values.values), + entry.values.value_count, + f'{claims_type} values array size ' + f'mismatch') + + expected_claim_values = expected_claim.get('values') + self.assertIsNotNone(expected_claim_values, + f'got expected {claims_type} ' + f'with no values') + + values = type(expected_claim_values)( + entry.values.values) + + got_claims[entry.id] = { + 'source_type': claims_array.claims_source_type, + 'type': entry.type, + 'values': values, + } + + self.assertEqual(expected_claims, got_claims or None, + f'{claims_type} did not match expectations') + + elif pac_buffer.type == krb5pac.PAC_TYPE_DEVICE_INFO: + device_info = pac_buffer.info.info + + self.check_device_info(device_info, kdc_exchange_dict) + + elif pac_buffer.type == krb5pac.PAC_TYPE_CREDENTIAL_INFO: + credential_info = pac_buffer.info + + expected_etype = self.expected_etype(kdc_exchange_dict) + + self.assertEqual(0, credential_info.version) + self.assertEqual(expected_etype, + credential_info.encryption_type) + + encrypted_data = credential_info.encrypted_data + reply_key = kdc_exchange_dict['reply_key'] + + data = reply_key.decrypt(KU_NON_KERB_SALT, encrypted_data) + + credential_data_ndr = ndr_unpack( + krb5pac.PAC_CREDENTIAL_DATA_NDR, data) + + credential_data = credential_data_ndr.ctr.data + + self.assertEqual(1, credential_data.credential_count) + self.assertEqual(credential_data.credential_count, + len(credential_data.credentials)) + + package = credential_data.credentials[0] + self.assertEqual('NTLM', str(package.package_name)) + + ntlm_blob = bytes(package.credential) + + ntlm_package = ndr_unpack(krb5pac.PAC_CREDENTIAL_NTLM_SECPKG, + ntlm_blob) + + self.assertEqual(0, ntlm_package.version) + self.assertEqual(krb5pac.PAC_CREDENTIAL_NTLM_HAS_NT_HASH, + ntlm_package.flags) + + creds = kdc_exchange_dict['creds'] + nt_password = bytes(ntlm_package.nt_password.hash) + self.assertEqual(creds.get_nt_hash(), nt_password) + + lm_password = bytes(ntlm_package.lm_password.hash) + self.assertEqual(bytes(16), lm_password) + + def generic_check_kdc_error(self, + kdc_exchange_dict, + callback_dict, + rep, + inner=False): + + rep_msg_type = kdc_exchange_dict['rep_msg_type'] + + expected_anon = kdc_exchange_dict['expected_anon'] + expected_srealm = kdc_exchange_dict['expected_srealm'] + expected_sname = kdc_exchange_dict['expected_sname'] + expected_error_mode = kdc_exchange_dict['expected_error_mode'] + + sent_fast = self.sent_fast(kdc_exchange_dict) + + fast_armor_type = kdc_exchange_dict['fast_armor_type'] + + self.assertElementEqual(rep, 'pvno', 5) + self.assertElementEqual(rep, 'msg-type', KRB_ERROR) + error_code = self.getElementValue(rep, 'error-code') + self.assertIn(error_code, expected_error_mode) + if self.strict_checking: + self.assertElementMissing(rep, 'ctime') + self.assertElementMissing(rep, 'cusec') + self.assertElementPresent(rep, 'stime') + self.assertElementPresent(rep, 'susec') + # error-code checked above + if expected_anon and not inner: + expected_cname = self.PrincipalName_create( + name_type=NT_WELLKNOWN, + names=['WELLKNOWN', 'ANONYMOUS']) + self.assertElementEqualPrincipal(rep, 'cname', expected_cname) + elif self.strict_checking: + self.assertElementMissing(rep, 'cname') + if self.strict_checking: + self.assertElementMissing(rep, 'crealm') + self.assertElementEqualUTF8(rep, 'realm', expected_srealm) + self.assertElementEqualPrincipal(rep, 'sname', expected_sname) + self.assertElementMissing(rep, 'e-text') + expect_status = kdc_exchange_dict['expect_status'] + expected_status = kdc_exchange_dict['expected_status'] + expect_edata = kdc_exchange_dict['expect_edata'] + if expect_edata is None: + expect_edata = (error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS + and (not sent_fast or fast_armor_type is None + or fast_armor_type == FX_FAST_ARMOR_AP_REQUEST) + and not inner) + if inner and expect_edata is self.expect_padata_outer: + expect_edata = False + if not expect_edata: + self.assertFalse(expect_status) + if self.strict_checking or expect_status is False: + self.assertElementMissing(rep, 'e-data') + return rep + edata = self.getElementValue(rep, 'e-data') + if self.strict_checking or expect_status: + self.assertIsNotNone(edata) + if edata is not None: + try: + error_data = self.der_decode( + edata, + asn1Spec=krb5_asn1.KERB_ERROR_DATA()) + except PyAsn1Error: + if expect_status: + # The test requires that the KDC be declared to support + # NTSTATUS values in e-data to proceed. + self.assertTrue( + self.expect_nt_status, + 'expected status code (which, according to ' + 'EXPECT_NT_STATUS=0, the KDC does not support)') + + self.fail('expected to get status code') + + rep_padata = self.der_decode( + edata, asn1Spec=krb5_asn1.METHOD_DATA()) + self.assertGreater(len(rep_padata), 0) + + if sent_fast: + self.assertEqual(1, len(rep_padata)) + rep_pa_dict = self.get_pa_dict(rep_padata) + self.assertIn(PADATA_FX_FAST, rep_pa_dict) + + armor_key = kdc_exchange_dict['armor_key'] + self.assertIsNotNone(armor_key) + fast_response = self.check_fx_fast_data( + kdc_exchange_dict, + rep_pa_dict[PADATA_FX_FAST], + armor_key, + expect_strengthen_key=False) + + rep_padata = fast_response['padata'] + + etype_info2 = self.check_rep_padata(kdc_exchange_dict, + callback_dict, + rep_padata, + error_code) + + kdc_exchange_dict['preauth_etype_info2'] = etype_info2 + else: + self.assertTrue(self.expect_nt_status, + 'got status code, but EXPECT_NT_STATUS=0') + + if expect_status is not None: + self.assertTrue(expect_status, + 'got unexpected status code') + + self.assertEqual(KERB_ERR_TYPE_EXTENDED, + error_data['data-type']) + + extended_error = error_data['data-value'] + + self.assertEqual(12, len(extended_error)) + + status = int.from_bytes(extended_error[:4], 'little') + flags = int.from_bytes(extended_error[8:], 'little') + + self.assertEqual(expected_status, status) + + if rep_msg_type == KRB_TGS_REP: + self.assertEqual(3, flags) + else: + self.assertEqual(1, flags) + + return rep + + def check_reply_padata(self, + kdc_exchange_dict, + callback_dict, + encpart, + rep_padata): + expected_patypes = () + + sent_fast = self.sent_fast(kdc_exchange_dict) + rep_msg_type = kdc_exchange_dict['rep_msg_type'] + + if sent_fast: + expected_patypes += (PADATA_FX_FAST,) + elif rep_msg_type == KRB_AS_REP: + if self.sent_pk_as_req(kdc_exchange_dict): + expected_patypes += PADATA_PK_AS_REP, + elif self.sent_pk_as_req_win2k(kdc_exchange_dict): + expected_patypes += PADATA_PK_AS_REP_19, + else: + chosen_etype = self.getElementValue(encpart, 'etype') + self.assertIsNotNone(chosen_etype) + + if chosen_etype in {kcrypto.Enctype.AES256, + kcrypto.Enctype.AES128}: + expected_patypes += (PADATA_ETYPE_INFO2,) + + preauth_key = kdc_exchange_dict['preauth_key'] + self.assertIsInstance(preauth_key, Krb5EncryptionKey) + if preauth_key.etype == kcrypto.Enctype.RC4 and rep_padata is None: + rep_padata = () + elif rep_msg_type == KRB_TGS_REP: + if expected_patypes == () and rep_padata is None: + rep_padata = () + + if not self.strict_checking and rep_padata is None: + rep_padata = () + + self.assertIsNotNone(rep_padata) + got_patypes = tuple(pa['padata-type'] for pa in rep_padata) + self.assertSequenceElementsEqual(expected_patypes, got_patypes, + # Windows does not add this. + unchecked={PADATA_PKINIT_KX}) + + if len(expected_patypes) == 0: + return None + + pa_dict = self.get_pa_dict(rep_padata) + + etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2) + if etype_info2 is not None: + etype_info2 = self.der_decode(etype_info2, + asn1Spec=krb5_asn1.ETYPE_INFO2()) + self.assertEqual(len(etype_info2), 1) + elem = etype_info2[0] + + e = self.getElementValue(elem, 'etype') + self.assertEqual(e, chosen_etype) + salt = self.getElementValue(elem, 'salt') + self.assertIsNotNone(salt) + expected_salt = kdc_exchange_dict['expected_salt'] + if expected_salt is not None: + self.assertEqual(salt, expected_salt) + s2kparams = self.getElementValue(elem, 's2kparams') + if self.strict_checking: + self.assertIsNone(s2kparams) + + @staticmethod + def greatest_common_etype(etypes, proposed_etypes): + return max(filter(lambda e: e in etypes, proposed_etypes), + default=None) + + @staticmethod + def first_common_etype(etypes, proposed_etypes): + return next(filter(lambda e: e in etypes, proposed_etypes), None) + + def supported_aes_rc4_etypes(self, kdc_exchange_dict): + creds = kdc_exchange_dict['creds'] + supported_etypes = self.get_default_enctypes(creds) + + rc4_support = kdc_exchange_dict['rc4_support'] + + aes_etypes = set() + if kcrypto.Enctype.AES256 in supported_etypes: + aes_etypes.add(kcrypto.Enctype.AES256) + if kcrypto.Enctype.AES128 in supported_etypes: + aes_etypes.add(kcrypto.Enctype.AES128) + + rc4_etypes = set() + if rc4_support and kcrypto.Enctype.RC4 in supported_etypes: + rc4_etypes.add(kcrypto.Enctype.RC4) + + return aes_etypes, rc4_etypes + + def greatest_aes_rc4_etypes(self, kdc_exchange_dict): + req_body = kdc_exchange_dict['req_body'] + proposed_etypes = req_body['etype'] + + aes_etypes, rc4_etypes = self.supported_aes_rc4_etypes(kdc_exchange_dict) + + expected_aes = self.greatest_common_etype(aes_etypes, proposed_etypes) + expected_rc4 = self.greatest_common_etype(rc4_etypes, proposed_etypes) + + return expected_aes, expected_rc4 + + def expected_etype(self, kdc_exchange_dict): + req_body = kdc_exchange_dict['req_body'] + proposed_etypes = req_body['etype'] + + aes_etypes, rc4_etypes = self.supported_aes_rc4_etypes( + kdc_exchange_dict) + + return self.first_common_etype(aes_etypes | rc4_etypes, + proposed_etypes) + + def check_rep_padata(self, + kdc_exchange_dict, + callback_dict, + rep_padata, + error_code): + rep_msg_type = kdc_exchange_dict['rep_msg_type'] + + sent_fast = self.sent_fast(kdc_exchange_dict) + sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict) + + if rep_msg_type == KRB_TGS_REP: + self.assertTrue(sent_fast) + + rc4_support = kdc_exchange_dict['rc4_support'] + + expected_aes, expected_rc4 = self.greatest_aes_rc4_etypes( + kdc_exchange_dict) + + expect_etype_info2 = () + expect_etype_info = False + if expected_aes is not None: + expect_etype_info2 += (expected_aes,) + if expected_rc4 is not None: + if error_code != 0: + expect_etype_info2 += (expected_rc4,) + if expected_aes is None: + expect_etype_info = True + + if expect_etype_info: + self.assertGreater(len(expect_etype_info2), 0) + + sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict) + + check_patypes = kdc_exchange_dict['check_patypes'] + if check_patypes: + expected_patypes = () + if sent_fast and error_code != 0: + expected_patypes += (PADATA_FX_ERROR,) + expected_patypes += (PADATA_FX_COOKIE,) + + if rep_msg_type == KRB_TGS_REP: + if ('1' in sent_pac_options + and error_code not in (0, KDC_ERR_GENERIC)): + expected_patypes += (PADATA_PAC_OPTIONS,) + elif error_code != KDC_ERR_GENERIC: + if expect_etype_info: + expected_patypes += (PADATA_ETYPE_INFO,) + if len(expect_etype_info2) != 0: + expected_patypes += (PADATA_ETYPE_INFO2,) + + sent_freshness = self.sent_freshness(kdc_exchange_dict) + + if error_code not in (KDC_ERR_PREAUTH_FAILED, KDC_ERR_SKEW, + KDC_ERR_POLICY, KDC_ERR_CLIENT_REVOKED): + if sent_fast: + expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,) + else: + expected_patypes += (PADATA_ENC_TIMESTAMP,) + + if not sent_enc_challenge: + expected_patypes += (PADATA_PK_AS_REQ,) + if not sent_freshness: + expected_patypes += (PADATA_PK_AS_REP_19,) + + if sent_freshness: + expected_patypes += PADATA_AS_FRESHNESS, + + if (self.kdc_fast_support + and not sent_fast + and not sent_enc_challenge): + expected_patypes += (PADATA_FX_FAST,) + expected_patypes += (PADATA_FX_COOKIE,) + + require_strict = {PADATA_FX_COOKIE, + PADATA_FX_FAST, + PADATA_PAC_OPTIONS, + PADATA_PK_AS_REP_19, + PADATA_PK_AS_REQ, + PADATA_PKINIT_KX, + PADATA_GSS} + strict_edata_checking = kdc_exchange_dict['strict_edata_checking'] + if not strict_edata_checking: + require_strict.add(PADATA_ETYPE_INFO2) + require_strict.add(PADATA_ENCRYPTED_CHALLENGE) + + got_patypes = tuple(pa['padata-type'] for pa in rep_padata) + self.assertSequenceElementsEqual(expected_patypes, got_patypes, + require_strict=require_strict, + unchecked={PADATA_PW_SALT}) + + if not expected_patypes: + return None + + pa_dict = self.get_pa_dict(rep_padata) + + enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP) + if enc_timestamp is not None: + self.assertEqual(len(enc_timestamp), 0) + + pk_as_req = pa_dict.get(PADATA_PK_AS_REQ) + if pk_as_req is not None: + self.assertEqual(len(pk_as_req), 0) + + pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19) + if pk_as_rep19 is not None: + self.assertEqual(len(pk_as_rep19), 0) + + freshness_token = pa_dict.get(PADATA_AS_FRESHNESS) + if freshness_token is not None: + self.assertEqual(bytes(2), freshness_token[:2]) + + freshness = self.der_decode(freshness_token[2:], + asn1Spec=krb5_asn1.EncryptedData()) + + krbtgt_creds = self.get_krbtgt_creds() + krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) + + self.assertElementEqual(freshness, 'etype', krbtgt_key.etype) + self.assertElementKVNO(freshness, 'kvno', krbtgt_key.kvno) + + # Decrypt the freshness token. + ts_enc = krbtgt_key.decrypt(KU_AS_FRESHNESS, + freshness['cipher']) + + # Ensure that we can decode it as PA-ENC-TS-ENC. + ts_enc = self.der_decode(ts_enc, + asn1Spec=krb5_asn1.PA_ENC_TS_ENC()) + freshness_time = self.get_EpochFromKerberosTime( + ts_enc['patimestamp']) + freshness_time += ts_enc['pausec'] / 1e6 + + # Ensure that it is reasonably close to the current time (within + # five minutes, to allow for clock skew). + current_time = datetime.datetime.now( + datetime.timezone.utc).timestamp() + self.assertLess(current_time - 5 * 60, freshness_time) + self.assertLess(freshness_time, current_time + 5 * 60) + + kdc_exchange_dict['freshness_token'] = freshness_token + + fx_fast = pa_dict.get(PADATA_FX_FAST) + if fx_fast is not None: + self.assertEqual(len(fx_fast), 0) + + fast_cookie = pa_dict.get(PADATA_FX_COOKIE) + if fast_cookie is not None: + kdc_exchange_dict['fast_cookie'] = fast_cookie + + fast_error = pa_dict.get(PADATA_FX_ERROR) + if fast_error is not None: + fast_error = self.der_decode(fast_error, + asn1Spec=krb5_asn1.KRB_ERROR()) + self.generic_check_kdc_error(kdc_exchange_dict, + callback_dict, + fast_error, + inner=True) + + pac_options = pa_dict.get(PADATA_PAC_OPTIONS) + if pac_options is not None: + pac_options = self.der_decode( + pac_options, + asn1Spec=krb5_asn1.PA_PAC_OPTIONS()) + self.assertElementEqual(pac_options, 'options', sent_pac_options) + + enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE) + if enc_challenge is not None: + if not sent_enc_challenge: + self.assertEqual(len(enc_challenge), 0) + else: + armor_key = kdc_exchange_dict['armor_key'] + self.assertIsNotNone(armor_key) + + preauth_key, _ = self.get_preauth_key(kdc_exchange_dict) + + kdc_challenge_key = self.generate_kdc_challenge_key( + armor_key, preauth_key) + + # Ensure that the encrypted challenge FAST factor is supported + # (RFC6113 5.4.6). + if self.strict_checking: + self.assertNotEqual(len(enc_challenge), 0) + if len(enc_challenge) != 0: + encrypted_challenge = self.der_decode( + enc_challenge, + asn1Spec=krb5_asn1.EncryptedData()) + self.assertEqual(encrypted_challenge['etype'], + kdc_challenge_key.etype) + + challenge = kdc_challenge_key.decrypt( + KU_ENC_CHALLENGE_KDC, + encrypted_challenge['cipher']) + challenge = self.der_decode( + challenge, + asn1Spec=krb5_asn1.PA_ENC_TS_ENC()) + + # Retrieve the returned timestamp. + rep_patime = challenge['patimestamp'] + self.assertIn('pausec', challenge) + + # Ensure the returned time is within five minutes of the + # current time. + rep_time = self.get_EpochFromKerberosTime(rep_patime) + current_time = time.time() + + self.assertLess(current_time - 300, rep_time) + self.assertLess(rep_time, current_time + 300) + + etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2) + if etype_info2 is not None: + etype_info2 = self.der_decode(etype_info2, + asn1Spec=krb5_asn1.ETYPE_INFO2()) + self.assertGreaterEqual(len(etype_info2), 1) + if self.strict_checking: + self.assertEqual(len(etype_info2), len(expect_etype_info2)) + for i in range(0, len(etype_info2)): + e = self.getElementValue(etype_info2[i], 'etype') + if self.strict_checking: + self.assertEqual(e, expect_etype_info2[i]) + salt = self.getElementValue(etype_info2[i], 'salt') + if e == kcrypto.Enctype.RC4: + if self.strict_checking: + self.assertIsNone(salt) + else: + self.assertIsNotNone(salt) + expected_salt = kdc_exchange_dict['expected_salt'] + if expected_salt is not None: + self.assertEqual(salt, expected_salt) + s2kparams = self.getElementValue(etype_info2[i], 's2kparams') + if self.strict_checking: + self.assertIsNone(s2kparams) + + etype_info = pa_dict.get(PADATA_ETYPE_INFO) + if etype_info is not None: + etype_info = self.der_decode(etype_info, + asn1Spec=krb5_asn1.ETYPE_INFO()) + self.assertEqual(len(etype_info), 1) + e = self.getElementValue(etype_info[0], 'etype') + self.assertEqual(e, kcrypto.Enctype.RC4) + if rc4_support: + self.assertEqual(e, expect_etype_info2[0]) + salt = self.getElementValue(etype_info[0], 'salt') + if self.strict_checking: + self.assertIsNotNone(salt) + self.assertEqual(len(salt), 0) + + return etype_info2 + + def generate_simple_fast(self, + kdc_exchange_dict, + _callback_dict, + req_body, + fast_padata, + fast_armor, + checksum, + fast_options=''): + armor_key = kdc_exchange_dict['armor_key'] + + fast_req = self.KRB_FAST_REQ_create(fast_options, + fast_padata, + req_body) + fast_req = self.der_encode(fast_req, + asn1Spec=krb5_asn1.KrbFastReq()) + fast_req = self.EncryptedData_create(armor_key, + KU_FAST_ENC, + fast_req) + + fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor, + checksum, + fast_req) + + fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req) + fx_fast_request = self.der_encode( + fx_fast_request, + asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST()) + + fast_padata = self.PA_DATA_create(PADATA_FX_FAST, + fx_fast_request) + + return fast_padata + + def generate_ap_req(self, + kdc_exchange_dict, + _callback_dict, + req_body, + armor, + usage=None, + seq_number=None): + req_body_checksum = None + + if armor: + self.assertIsNone(req_body) + + tgt = kdc_exchange_dict['armor_tgt'] + authenticator_subkey = kdc_exchange_dict['armor_subkey'] + else: + tgt = kdc_exchange_dict['tgt'] + authenticator_subkey = kdc_exchange_dict['authenticator_subkey'] + + if req_body is not None: + body_checksum_type = kdc_exchange_dict['body_checksum_type'] + + req_body_blob = self.der_encode( + req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY()) + + req_body_checksum = self.Checksum_create( + tgt.session_key, + KU_TGS_REQ_AUTH_CKSUM, + req_body_blob, + ctype=body_checksum_type) + + auth_data = kdc_exchange_dict['auth_data'] + + subkey_obj = None + if authenticator_subkey is not None: + subkey_obj = authenticator_subkey.export_obj() + if seq_number is None: + seq_number = random.randint(0, 0xfffffffe) + (ctime, cusec) = self.get_KerberosTimeWithUsec() + authenticator_obj = self.Authenticator_create( + crealm=tgt.crealm, + cname=tgt.cname, + cksum=req_body_checksum, + cusec=cusec, + ctime=ctime, + subkey=subkey_obj, + seq_number=seq_number, + authorization_data=auth_data) + authenticator_blob = self.der_encode( + authenticator_obj, + asn1Spec=krb5_asn1.Authenticator()) + + if usage is None: + usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH + authenticator = self.EncryptedData_create(tgt.session_key, + usage, + authenticator_blob) + + if armor: + ap_options = kdc_exchange_dict['fast_ap_options'] + else: + ap_options = kdc_exchange_dict['ap_options'] + if ap_options is None: + ap_options = str(krb5_asn1.APOptions('0')) + ap_req_obj = self.AP_REQ_create(ap_options=ap_options, + ticket=tgt.ticket, + authenticator=authenticator) + ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ()) + + return ap_req + + def generate_simple_tgs_padata(self, + kdc_exchange_dict, + callback_dict, + req_body): + ap_req = self.generate_ap_req(kdc_exchange_dict, + callback_dict, + req_body, + armor=False) + pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req) + padata = [pa_tgs_req] + + return padata, req_body + + def get_preauth_key(self, kdc_exchange_dict): + msg_type = kdc_exchange_dict['rep_msg_type'] + + if msg_type == KRB_AS_REP: + key = kdc_exchange_dict['preauth_key'] + usage = KU_AS_REP_ENC_PART + else: # KRB_TGS_REP + authenticator_subkey = kdc_exchange_dict['authenticator_subkey'] + if authenticator_subkey is not None: + key = authenticator_subkey + usage = KU_TGS_REP_ENC_PART_SUB_KEY + else: + tgt = kdc_exchange_dict['tgt'] + key = tgt.session_key + usage = KU_TGS_REP_ENC_PART_SESSION + + self.assertIsNotNone(key) + + return key, usage + + def generate_armor_key(self, subkey, session_key): + armor_key = kcrypto.cf2(subkey.key, + session_key.key, + b'subkeyarmor', + b'ticketarmor') + armor_key = Krb5EncryptionKey(armor_key, None) + + return armor_key + + def generate_strengthen_reply_key(self, strengthen_key, reply_key): + strengthen_reply_key = kcrypto.cf2(strengthen_key.key, + reply_key.key, + b'strengthenkey', + b'replykey') + strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key, + reply_key.kvno) + + return strengthen_reply_key + + def generate_client_challenge_key(self, armor_key, longterm_key): + client_challenge_key = kcrypto.cf2(armor_key.key, + longterm_key.key, + b'clientchallengearmor', + b'challengelongterm') + client_challenge_key = Krb5EncryptionKey(client_challenge_key, None) + + return client_challenge_key + + def generate_kdc_challenge_key(self, armor_key, longterm_key): + kdc_challenge_key = kcrypto.cf2(armor_key.key, + longterm_key.key, + b'kdcchallengearmor', + b'challengelongterm') + kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None) + + return kdc_challenge_key + + def verify_ticket_checksum(self, ticket, expected_checksum, armor_key): + expected_type = expected_checksum['cksumtype'] + self.assertEqual(armor_key.ctype, expected_type) + + ticket_blob = self.der_encode(ticket, + asn1Spec=krb5_asn1.Ticket()) + checksum = self.Checksum_create(armor_key, + KU_FAST_FINISHED, + ticket_blob) + self.assertEqual(expected_checksum, checksum) + + def verify_ticket(self, ticket, krbtgt_keys, service_ticket, + expect_pac=True, + expect_ticket_checksum=True, + expect_full_checksum=None): + # Decrypt the ticket. + + key = ticket.decryption_key + enc_part = ticket.ticket['enc-part'] + + self.assertElementEqual(enc_part, 'etype', key.etype) + self.assertElementKVNO(enc_part, 'kvno', key.kvno) + + enc_part = key.decrypt(KU_TICKET, enc_part['cipher']) + enc_part = self.der_decode( + enc_part, asn1Spec=krb5_asn1.EncTicketPart()) + + # Fetch the authorization data from the ticket. + auth_data = enc_part.get('authorization-data') + if expect_pac: + self.assertIsNotNone(auth_data) + elif auth_data is None: + return + + # Get a copy of the authdata with an empty PAC, and the existing PAC + # (if present). + empty_pac = self.get_empty_pac() + auth_data, pac_data = self.replace_pac(auth_data, + empty_pac, + expect_pac=expect_pac) + if not expect_pac: + return + + # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the + # raw type to create a new PAC with zeroed signatures for + # verification. This is because on Windows, the resource_groups field + # is added to PAC_LOGON_INFO after the info3 field has been created, + # which results in a different ordering of pointer values than Samba + # (see commit 0e201ecdc53). Using the raw type avoids changing + # PAC_LOGON_INFO, so verification against Windows can work. We still + # need the PAC_DATA type to retrieve the actual checksums, because the + # signatures in the raw type may contain padding bytes. + pac = ndr_unpack(krb5pac.PAC_DATA, + pac_data) + raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW, + pac_data) + + checksums = {} + + full_checksum_buffer = None + + for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers): + buffer_type = pac_buffer.type + if buffer_type in self.pac_checksum_types: + self.assertNotIn(buffer_type, checksums, + f'Duplicate checksum type {buffer_type}') + + # Fetch the checksum and the checksum type from the PAC buffer. + checksum = pac_buffer.info.signature + ctype = pac_buffer.info.type + if ctype & 1 << 31: + ctype |= -1 << 31 + + checksums[buffer_type] = checksum, ctype + + if buffer_type == krb5pac.PAC_TYPE_FULL_CHECKSUM: + full_checksum_buffer = raw_pac_buffer + elif buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM: + # Zero the checksum field so that we can later verify the + # checksums. The ticket checksum field is not zeroed. + + signature = ndr_unpack( + krb5pac.PAC_SIGNATURE_DATA, + raw_pac_buffer.info.remaining) + signature.signature = bytes(len(checksum)) + raw_pac_buffer.info.remaining = ndr_pack( + signature) + + # Re-encode the PAC. + pac_data = ndr_pack(raw_pac) + + if full_checksum_buffer is not None: + signature = ndr_unpack( + krb5pac.PAC_SIGNATURE_DATA, + full_checksum_buffer.info.remaining) + signature.signature = bytes(len(checksum)) + full_checksum_buffer.info.remaining = ndr_pack( + signature) + + # Re-encode the PAC. + full_pac_data = ndr_pack(raw_pac) + + # Verify the signatures. + + server_checksum, server_ctype = checksums[ + krb5pac.PAC_TYPE_SRV_CHECKSUM] + key.verify_checksum(KU_NON_KERB_CKSUM_SALT, + pac_data, + server_ctype, + server_checksum) + + kdc_checksum, kdc_ctype = checksums[ + krb5pac.PAC_TYPE_KDC_CHECKSUM] + + if isinstance(krbtgt_keys, collections.abc.Container): + if self.strict_checking: + krbtgt_key = krbtgt_keys[0] + else: + krbtgt_key = next(key for key in krbtgt_keys + if key.ctype == kdc_ctype) + else: + krbtgt_key = krbtgt_keys + + krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT, + server_checksum, + kdc_ctype, + kdc_checksum) + + if not service_ticket: + self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums) + self.assertNotIn(krb5pac.PAC_TYPE_FULL_CHECKSUM, checksums) + else: + ticket_checksum, ticket_ctype = checksums.get( + krb5pac.PAC_TYPE_TICKET_CHECKSUM, + (None, None)) + if expect_ticket_checksum: + self.assertIsNotNone(ticket_checksum) + elif expect_ticket_checksum is False: + self.assertIsNone(ticket_checksum) + if ticket_checksum is not None: + enc_part['authorization-data'] = auth_data + enc_part = self.der_encode(enc_part, + asn1Spec=krb5_asn1.EncTicketPart()) + + krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT, + enc_part, + ticket_ctype, + ticket_checksum) + + full_checksum, full_ctype = checksums.get( + krb5pac.PAC_TYPE_FULL_CHECKSUM, + (None, None)) + if expect_full_checksum: + self.assertIsNotNone(full_checksum) + elif expect_full_checksum is False: + self.assertIsNone(full_checksum) + if full_checksum is not None: + krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT, + full_pac_data, + full_ctype, + full_checksum) + + def modified_ticket(self, + ticket, *, + new_ticket_key=None, + modify_fn=None, + modify_pac_fn=None, + exclude_pac=False, + allow_empty_authdata=False, + update_pac_checksums=None, + checksum_keys=None, + include_checksums=None): + if checksum_keys is None: + # A dict containing a key for each checksum type to be created in + # the PAC. + checksum_keys = {} + else: + checksum_keys = dict(checksum_keys) + + if include_checksums is None: + # A dict containing a value for each checksum type; True if the + # checksum type is to be included in the PAC, False if it is to be + # excluded, or None/not present if the checksum is to be included + # based on its presence in the original PAC. + include_checksums = {} + else: + include_checksums = dict(include_checksums) + + # Check that the values passed in by the caller make sense. + + self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types) + self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types) + + if update_pac_checksums is None: + update_pac_checksums = not exclude_pac + + if exclude_pac: + self.assertIsNone(modify_pac_fn) + self.assertFalse(update_pac_checksums) + + if not update_pac_checksums: + self.assertFalse(checksum_keys) + self.assertFalse(include_checksums) + + expect_pac = bool(modify_pac_fn) + + key = ticket.decryption_key + + if new_ticket_key is None: + # Use the same key to re-encrypt the ticket. + new_ticket_key = key + + if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys: + # If the server signature key is not present, fall back to the key + # used to encrypt the ticket. + checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key + + if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys: + # If the ticket signature key is not present, fall back to the key + # used for the KDC signature. + kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM) + if kdc_checksum_key is not None: + checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = ( + kdc_checksum_key) + + if krb5pac.PAC_TYPE_FULL_CHECKSUM not in checksum_keys: + # If the full signature key is not present, fall back to the key + # used for the KDC signature. + kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM) + if kdc_checksum_key is not None: + checksum_keys[krb5pac.PAC_TYPE_FULL_CHECKSUM] = ( + kdc_checksum_key) + + # Decrypt the ticket. + + enc_part = ticket.ticket['enc-part'] + + self.assertElementEqual(enc_part, 'etype', key.etype) + self.assertElementKVNO(enc_part, 'kvno', key.kvno) + + enc_part = key.decrypt(KU_TICKET, enc_part['cipher']) + enc_part = self.der_decode( + enc_part, asn1Spec=krb5_asn1.EncTicketPart()) + + # Modify the ticket here. + if callable(modify_fn): + enc_part = modify_fn(enc_part) + elif modify_fn: + for fn in modify_fn: + enc_part = fn(enc_part) + + auth_data = enc_part.get('authorization-data') + if expect_pac: + self.assertIsNotNone(auth_data) + if auth_data is not None: + new_pac = None + if exclude_pac: + need_to_call_replace_pac = True + elif not modify_pac_fn and not update_pac_checksums: + need_to_call_replace_pac = False + else: + need_to_call_replace_pac = True + # Get a copy of the authdata with an empty PAC, and the + # existing PAC (if present). + empty_pac = self.get_empty_pac() + empty_pac_auth_data, pac_data = self.replace_pac( + auth_data, + empty_pac, + expect_pac=expect_pac) + + if pac_data is not None: + pac = ndr_unpack(krb5pac.PAC_DATA, pac_data) + + # Modify the PAC here. + if callable(modify_pac_fn): + pac = modify_pac_fn(pac) + elif modify_pac_fn: + for fn in modify_pac_fn: + pac = fn(pac) + + if update_pac_checksums: + # Get the enc-part with an empty PAC, which is needed + # to create a ticket signature. + enc_part_to_sign = enc_part.copy() + enc_part_to_sign['authorization-data'] = ( + empty_pac_auth_data) + enc_part_to_sign = self.der_encode( + enc_part_to_sign, + asn1Spec=krb5_asn1.EncTicketPart()) + + self.update_pac_checksums(pac, + checksum_keys, + include_checksums, + enc_part_to_sign) + + # Re-encode the PAC. + pac_data = ndr_pack(pac) + new_pac = self.AuthorizationData_create(AD_WIN2K_PAC, + pac_data) + + # Replace the PAC in the authorization data and re-add it to the + # ticket enc-part. + if need_to_call_replace_pac: + auth_data, _ = self.replace_pac( + auth_data, new_pac, + expect_pac=expect_pac, + allow_empty_authdata=allow_empty_authdata) + enc_part['authorization-data'] = auth_data + + # Re-encrypt the ticket enc-part with the new key. + enc_part_new = self.der_encode(enc_part, + asn1Spec=krb5_asn1.EncTicketPart()) + enc_part_new = self.EncryptedData_create(new_ticket_key, + KU_TICKET, + enc_part_new) + + # Create a copy of the ticket with the new enc-part. + new_ticket = ticket.ticket.copy() + new_ticket['enc-part'] = enc_part_new + + new_ticket_creds = KerberosTicketCreds( + new_ticket, + session_key=ticket.session_key, + crealm=ticket.crealm, + cname=ticket.cname, + srealm=ticket.srealm, + sname=ticket.sname, + decryption_key=new_ticket_key, + ticket_private=enc_part, + encpart_private=ticket.encpart_private) + + return new_ticket_creds + + def update_pac_checksums(self, + pac, + checksum_keys, + include_checksums, + enc_part=None): + pac_buffers = pac.buffers + checksum_buffers = {} + + # Find the relevant PAC checksum buffers. + for pac_buffer in pac_buffers: + buffer_type = pac_buffer.type + if buffer_type in self.pac_checksum_types: + self.assertNotIn(buffer_type, checksum_buffers, + f'Duplicate checksum type {buffer_type}') + + checksum_buffers[buffer_type] = pac_buffer + + # Create any additional buffers that were requested but not + # present. Conversely, remove any buffers that were requested to be + # removed. + for buffer_type in self.pac_checksum_types: + if buffer_type in checksum_buffers: + if include_checksums.get(buffer_type) is False: + checksum_buffer = checksum_buffers.pop(buffer_type) + + pac.num_buffers -= 1 + pac_buffers.remove(checksum_buffer) + + elif include_checksums.get(buffer_type) is True: + info = krb5pac.PAC_SIGNATURE_DATA() + + checksum_buffer = krb5pac.PAC_BUFFER() + checksum_buffer.type = buffer_type + checksum_buffer.info = info + + pac_buffers.append(checksum_buffer) + pac.num_buffers += 1 + + checksum_buffers[buffer_type] = checksum_buffer + + # Fill the relevant checksum buffers. + for buffer_type, checksum_buffer in checksum_buffers.items(): + checksum_key = checksum_keys[buffer_type] + ctype = checksum_key.ctype & ((1 << 32) - 1) + + if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM: + self.assertIsNotNone(enc_part) + + signature = checksum_key.make_rodc_checksum( + KU_NON_KERB_CKSUM_SALT, + enc_part) + + elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM: + signature = checksum_key.make_zeroed_checksum() + + else: + signature = checksum_key.make_rodc_zeroed_checksum() + + checksum_buffer.info.signature = signature + checksum_buffer.info.type = ctype + + # Add the new checksum buffers to the PAC. + pac.buffers = pac_buffers + + # Calculate the full checksum and insert it into the PAC. + full_checksum_buffer = checksum_buffers.get( + krb5pac.PAC_TYPE_FULL_CHECKSUM) + if full_checksum_buffer is not None: + full_checksum_key = checksum_keys[krb5pac.PAC_TYPE_FULL_CHECKSUM] + + pac_data = ndr_pack(pac) + full_checksum = full_checksum_key.make_rodc_checksum( + KU_NON_KERB_CKSUM_SALT, + pac_data) + + full_checksum_buffer.info.signature = full_checksum + + # Calculate the server and KDC checksums and insert them into the PAC. + + server_checksum_buffer = checksum_buffers.get( + krb5pac.PAC_TYPE_SRV_CHECKSUM) + if server_checksum_buffer is not None: + server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] + + pac_data = ndr_pack(pac) + server_checksum = server_checksum_key.make_checksum( + KU_NON_KERB_CKSUM_SALT, + pac_data) + + server_checksum_buffer.info.signature = server_checksum + + kdc_checksum_buffer = checksum_buffers.get( + krb5pac.PAC_TYPE_KDC_CHECKSUM) + if kdc_checksum_buffer is not None: + if server_checksum_buffer is None: + # There's no server signature to make the checksum over, so + # just make the checksum over an empty bytes object. + server_checksum = bytes() + + kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM] + + kdc_checksum = kdc_checksum_key.make_rodc_checksum( + KU_NON_KERB_CKSUM_SALT, + server_checksum) + + kdc_checksum_buffer.info.signature = kdc_checksum + + def replace_pac(self, auth_data, new_pac, expect_pac=True, + allow_empty_authdata=False): + if new_pac is not None: + self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC) + self.assertElementPresent(new_pac, 'ad-data') + + new_auth_data = [] + + ad_relevant = None + old_pac = None + + for authdata_elem in auth_data: + if authdata_elem['ad-type'] == AD_IF_RELEVANT: + ad_relevant = self.der_decode( + authdata_elem['ad-data'], + asn1Spec=krb5_asn1.AD_IF_RELEVANT()) + + relevant_elems = [] + for relevant_elem in ad_relevant: + if relevant_elem['ad-type'] == AD_WIN2K_PAC: + self.assertIsNone(old_pac, 'Multiple PACs detected') + old_pac = relevant_elem['ad-data'] + + if new_pac is not None: + relevant_elems.append(new_pac) + else: + relevant_elems.append(relevant_elem) + if expect_pac: + self.assertIsNotNone(old_pac, 'Expected PAC') + + if relevant_elems or allow_empty_authdata: + ad_relevant = self.der_encode( + relevant_elems, + asn1Spec=krb5_asn1.AD_IF_RELEVANT()) + + authdata_elem = self.AuthorizationData_create( + AD_IF_RELEVANT, + ad_relevant) + else: + authdata_elem = None + + if authdata_elem is not None or allow_empty_authdata: + new_auth_data.append(authdata_elem) + + if expect_pac: + self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT') + + return new_auth_data, old_pac + + def get_pac(self, auth_data, expect_pac=True): + _, pac = self.replace_pac(auth_data, None, expect_pac) + return pac + + def get_ticket_pac(self, ticket, expect_pac=True): + auth_data = ticket.ticket_private.get('authorization-data') + if expect_pac: + self.assertIsNotNone(auth_data) + elif auth_data is None: + return None + + return self.get_pac(auth_data, expect_pac=expect_pac) + + def get_krbtgt_checksum_key(self): + krbtgt_creds = self.get_krbtgt_creds() + krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) + + return { + krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key + } + + def is_tgs_principal(self, principal): + if self.is_tgs(principal): + return True + + if self.kadmin_is_tgs and self.is_kadmin(principal): + return True + + return False + + def is_kadmin(self, principal): + name = principal['name-string'][0] + return name in ('kadmin', b'kadmin') + + def is_tgs(self, principal): + name_string = principal['name-string'] + if 1 <= len(name_string) <= 2: + return name_string[0] in ('krbtgt', b'krbtgt') + + return False + + def is_tgt(self, ticket): + sname = ticket.ticket['sname'] + return self.is_tgs(sname) + + def get_empty_pac(self): + return self.AuthorizationData_create(AD_WIN2K_PAC, bytes(1)) + + def get_outer_pa_dict(self, kdc_exchange_dict): + return self.get_pa_dict(kdc_exchange_dict['req_padata']) + + def get_fast_pa_dict(self, kdc_exchange_dict): + req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata']) + + if req_pa_dict: + return req_pa_dict + + return self.get_outer_pa_dict(kdc_exchange_dict) + + def sent_fast(self, kdc_exchange_dict): + outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict) + + return PADATA_FX_FAST in outer_pa_dict + + def sent_enc_challenge(self, kdc_exchange_dict): + fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) + + return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict + + def sent_enc_pa_rep(self, kdc_exchange_dict): + fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) + + return PADATA_REQ_ENC_PA_REP in fast_pa_dict + + def sent_pk_as_req(self, kdc_exchange_dict): + fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) + + return PADATA_PK_AS_REQ in fast_pa_dict + + def sent_pk_as_req_win2k(self, kdc_exchange_dict): + fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) + + return PADATA_PK_AS_REP_19 in fast_pa_dict + + def sent_freshness(self, kdc_exchange_dict): + fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) + + return PADATA_AS_FRESHNESS in fast_pa_dict + + def get_sent_pac_options(self, kdc_exchange_dict): + fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) + + if PADATA_PAC_OPTIONS not in fast_pa_dict: + return '' + + pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS], + asn1Spec=krb5_asn1.PA_PAC_OPTIONS()) + pac_options = pac_options['options'] + + # Mask out unsupported bits. + pac_options, remaining = pac_options[:4], pac_options[4:] + pac_options += '0' * len(remaining) + + return pac_options + + def get_krbtgt_sname(self): + krbtgt_creds = self.get_krbtgt_creds() + krbtgt_username = krbtgt_creds.get_username() + krbtgt_realm = krbtgt_creds.get_realm() + krbtgt_sname = self.PrincipalName_create( + name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm]) + + return krbtgt_sname + + def add_requester_sid(self, pac, sid): + pac_buffers = pac.buffers + + buffer_types = [pac_buffer.type for pac_buffer in pac_buffers] + self.assertNotIn(krb5pac.PAC_TYPE_REQUESTER_SID, buffer_types) + + requester_sid = krb5pac.PAC_REQUESTER_SID() + requester_sid.sid = security.dom_sid(sid) + + requester_sid_buffer = krb5pac.PAC_BUFFER() + requester_sid_buffer.type = krb5pac.PAC_TYPE_REQUESTER_SID + requester_sid_buffer.info = requester_sid + + pac_buffers.append(requester_sid_buffer) + + pac.buffers = pac_buffers + pac.num_buffers += 1 + + return pac + + def modify_lifetime(self, ticket, lifetime, requester_sid=None): + # Get the krbtgt key. + krbtgt_creds = self.get_krbtgt_creds() + + krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) + checksum_keys = { + krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key, + } + + current_time = time.time() + + # Set authtime and starttime to an hour in the past, to show that they + # do not affect ticket rejection. + start_time = self.get_KerberosTime(epoch=current_time, offset=-60 * 60) + + # Set the endtime of the ticket relative to our current time, so that + # the ticket has 'lifetime' seconds remaining to live. + end_time = self.get_KerberosTime(epoch=current_time, offset=lifetime) + + # Modify the times in the ticket. + def modify_ticket_times(enc_part): + enc_part['authtime'] = start_time + if 'starttime' in enc_part: + enc_part['starttime'] = start_time + + enc_part['endtime'] = end_time + + return enc_part + + # We have to set the times in both the ticket and the PAC, otherwise + # Heimdal will complain. + def modify_pac_time(pac): + pac_buffers = pac.buffers + + for pac_buffer in pac_buffers: + if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME: + logon_time = self.get_EpochFromKerberosTime(start_time) + pac_buffer.info.logon_time = unix2nttime(logon_time) + break + else: + self.fail('failed to find LOGON_NAME PAC buffer') + + pac.buffers = pac_buffers + + return pac + + def modify_pac_fn(pac): + if requester_sid is not None: + # Add a requester SID to show that the KDC will then accept + # this kpasswd ticket as if it were a TGT. + pac = self.add_requester_sid(pac, sid=requester_sid) + pac = modify_pac_time(pac) + return pac + + # Do the actual modification. + return self.modified_ticket(ticket, + new_ticket_key=krbtgt_key, + modify_fn=modify_ticket_times, + modify_pac_fn=modify_pac_fn, + checksum_keys=checksum_keys) + + def _test_as_exchange(self, + cname, + realm, + sname, + till, + expected_error_mode, + expected_crealm, + expected_cname, + expected_srealm, + expected_sname, + expected_salt, + etypes, + padata, + kdc_options, + creds=None, + renew_time=None, + expected_account_name=None, + expected_groups=None, + unexpected_groups=None, + expected_upn_name=None, + expected_sid=None, + expected_domain_sid=None, + expected_flags=None, + unexpected_flags=None, + expected_supported_etypes=None, + preauth_key=None, + ticket_decryption_key=None, + pac_request=None, + pac_options=None, + expect_pac=True, + expect_pac_attrs=None, + expect_pac_attrs_pac_request=None, + expect_requester_sid=None, + expect_client_claims=None, + expect_device_claims=None, + expected_client_claims=None, + unexpected_client_claims=None, + expected_device_claims=None, + unexpected_device_claims=None, + expect_edata=None, + expect_status=None, + expected_status=None, + rc4_support=True, + to_rodc=False): + + def _generate_padata_copy(_kdc_exchange_dict, + _callback_dict, + req_body): + return padata, req_body + + if not expected_error_mode: + check_error_fn = None + check_rep_fn = self.generic_check_kdc_rep + else: + check_error_fn = self.generic_check_kdc_error + check_rep_fn = None + + if padata is not None: + generate_padata_fn = _generate_padata_copy + else: + generate_padata_fn = None + + kdc_exchange_dict = self.as_exchange_dict( + creds=creds, + expected_crealm=expected_crealm, + expected_cname=expected_cname, + expected_srealm=expected_srealm, + expected_sname=expected_sname, + expected_account_name=expected_account_name, + expected_groups=expected_groups, + unexpected_groups=unexpected_groups, + expected_upn_name=expected_upn_name, + expected_sid=expected_sid, + expected_domain_sid=expected_domain_sid, + expected_supported_etypes=expected_supported_etypes, + ticket_decryption_key=ticket_decryption_key, + generate_padata_fn=generate_padata_fn, + check_error_fn=check_error_fn, + check_rep_fn=check_rep_fn, + check_kdc_private_fn=self.generic_check_kdc_private, + expected_error_mode=expected_error_mode, + expected_salt=expected_salt, + expected_flags=expected_flags, + unexpected_flags=unexpected_flags, + preauth_key=preauth_key, + kdc_options=str(kdc_options), + pac_request=pac_request, + pac_options=pac_options, + expect_pac=expect_pac, + expect_pac_attrs=expect_pac_attrs, + expect_pac_attrs_pac_request=expect_pac_attrs_pac_request, + expect_requester_sid=expect_requester_sid, + expect_client_claims=expect_client_claims, + expect_device_claims=expect_device_claims, + expected_client_claims=expected_client_claims, + unexpected_client_claims=unexpected_client_claims, + expected_device_claims=expected_device_claims, + unexpected_device_claims=unexpected_device_claims, + expect_edata=expect_edata, + expect_status=expect_status, + expected_status=expected_status, + rc4_support=rc4_support, + to_rodc=to_rodc) + + rep = self._generic_kdc_exchange(kdc_exchange_dict, + cname=cname, + realm=realm, + sname=sname, + till_time=till, + renew_time=renew_time, + etypes=etypes) + + return rep, kdc_exchange_dict |