#!/usr/bin/env python3 # Unix SMB/CIFS implementation. # # Tests for Key Trust authentication # # Based on pkinit_tests.py # # Copyright (C) Gary Lockyer 2025 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import sys import os sys.path.insert(0, "bin/python") os.environ["PYTHONUNBUFFERED"] = "1" from datetime import datetime, timedelta from cryptography import x509 from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa, padding from cryptography.x509.oid import NameOID from samba.dcerpc import security from samba.domain.models import User from samba.key_credential_link import create_key_credential_link from samba.tests.krb5 import kcrypto from samba.tests.krb5.kdc_base_test import KDCBaseTest from samba.tests.krb5.raw_testcase import PkInit, RawKerberosTest from samba.tests.krb5.rfc4120_constants import ( DES_EDE3_CBC, KDC_ERR_CLIENT_NOT_TRUSTED, NT_PRINCIPAL, PADATA_PK_AS_REQ, ) import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1 SidType = RawKerberosTest.SidType global_asn1_print = False global_hexdump = False class KeyTrustTests(KDCBaseTest): @classmethod def setUpClass(cls): super().setUpClass() def setUp(self): super().setUp() self.do_asn1_print = global_asn1_print self.do_hexdump = global_hexdump def test_key_trust_user(self): """ Test key trust logon for a normal account """ client_creds = self._get_creds() target_creds = self.get_service_creds() certificate = self._create_certificate(client_creds) public_key = certificate.public_bytes(Encoding.DER) self._add_key_cred_link(client_creds, public_key) self._pkinit_req(client_creds, target_creds, certificate) def test_key_trust_user_mismatched_keys(self): """ Test key trust logon for a normal account, where the certificate public key does not match the msDS-KeyCredentialLink value """ client_creds = self._get_creds() target_creds = self.get_service_creds() certificate = self._create_certificate(client_creds) private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) public_key = private_key.public_key().public_bytes( encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo ) self._add_key_cred_link(client_creds, public_key) self._pkinit_req( client_creds, target_creds, certificate, expect_error=KDC_ERR_CLIENT_NOT_TRUSTED, ) def test_key_trust_user_no_keys(self): """ Test key trust logon for a normal account, with no msDS-KeyCredentialLink """ client_creds = self._get_creds() target_creds = self.get_service_creds() certificate = self._create_certificate(client_creds) self._pkinit_req( client_creds, target_creds, certificate, expect_error=KDC_ERR_CLIENT_NOT_TRUSTED, ) def test_key_trust_computer(self): """ Test key trust logon with a computer account. """ client_creds = self._get_creds(self.AccountType.COMPUTER) target_creds = self.get_service_creds() certificate = self._create_certificate(client_creds) public_key = certificate.public_bytes(Encoding.DER) self._add_key_cred_link(client_creds, public_key) self._pkinit_req(client_creds, target_creds, certificate) def test_key_trust_computer_mismatched_keys(self): """ Test key trust logon for a computer account, where the certificate public key does not match the msDS-KeyCredentialLink value """ client_creds = self._get_creds(self.AccountType.COMPUTER) target_creds = self.get_service_creds() certificate = self._create_certificate(client_creds) private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) public_key = private_key.public_key().public_bytes( encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo ) self._add_key_cred_link(client_creds, public_key) self._pkinit_req( client_creds, target_creds, certificate, expect_error=KDC_ERR_CLIENT_NOT_TRUSTED, ) def test_key_trust_computer_no_keys(self): """ Test key trust logon with a computer account, with no msDS-KeyCredentialLink """ client_creds = self._get_creds(self.AccountType.COMPUTER) target_creds = self.get_service_creds() certificate = self._create_certificate(client_creds) self._pkinit_req( client_creds, target_creds, certificate = certificate, expect_error=KDC_ERR_CLIENT_NOT_TRUSTED, ) def _get_creds(self, account_type=KDCBaseTest.AccountType.USER, use_cache=False): """ Return credentials with an account having a UPN for performing PK-INIT. Modified from the version in python/samba/tests/krb5/pkinit_tests.py """ samdb = self.get_samdb() realm = samdb.domain_dns_name().upper() opts = {"upn": f"{{account}}.{realm}@{realm}"} return self.get_cached_creds( account_type=account_type, opts=opts, use_cache=use_cache ) def _add_key_cred_link(self, creds, public_key): """ Update the msDS-KeyCredentialLink for the user specified in creds with the supplied public key """ link = create_key_credential_link(self.get_samdb(), creds.get_dn(), public_key) user = User.find(self.get_samdb(), creds.get_username()) self.assertIsNotNone(user) if user is not None: user.key_credential_link = link user.save(self.get_samdb()) def _create_certificate(self, creds): """ Create a new self signed certificate Modified from the version in python/samba/tests/krb5/pkinit_tests.py """ certificate_signature = hashes.SHA256 user_name = creds.get_username() builder = x509.CertificateBuilder() # Add the subject name. cert_name = f"{user_name}@{creds.get_realm().lower()}" subject_name = x509.Name( [ # This name can be anything; it isn’t needed to authorize the # user. The SubjectAlternativeName is used for that instead. x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "SambaState"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "SambaSelfTesting"), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "Users"), x509.NameAttribute(NameOID.COMMON_NAME, f"{cert_name}"), ] ) builder = builder.subject_name(subject_name) # The new certificate is self signed builder = builder.issuer_name(subject_name) one_day = timedelta(1, 0, 0) # Put the certificate start time in the past to avoid issues where the # KDC considers the certificate to be invalid due to clock skew. Note # that if the certificate predates the existence of the account in AD, # Windows will refuse authentication unless a strong mapping is # present (in the certificate, or in AD). # See https://support.microsoft.com/en-us/topic/kb5014754-certificate-based-authentication-changes-on-windows-domain-controllers-ad2c23b0-15d8-4340-a468-4d4f3b188f16#ID0EFR builder = builder.not_valid_before(datetime.today() - one_day) builder = builder.not_valid_after(datetime.today() + (one_day * 30)) builder = builder.serial_number(x509.random_serial_number()) public_key = creds.get_public_key() builder = builder.public_key(public_key) # Add the SubjectAlternativeName. Windows uses this to map the account # to the certificate. id_pkinit_ms_san = x509.ObjectIdentifier(str(krb5_asn1.id_pkinit_ms_san)) encoded_upn = self.der_encode(creds.get_upn(), asn1Spec=krb5_asn1.MS_UPN_SAN()) ms_upn_san = x509.OtherName(id_pkinit_ms_san, encoded_upn) builder = builder.add_extension( x509.SubjectAlternativeName([ms_upn_san]), critical=False, ) # The key identifier is used to identify the certificate. subject_key_id = x509.SubjectKeyIdentifier.from_public_key(public_key) builder = builder.add_extension( subject_key_id, critical=True, ) # Add the key usages for which this certificate is valid. builder = builder.add_extension( # Heimdal requires that the certificate be valid for digital # signatures. x509.KeyUsage(digital_signature=True, content_commitment=False, key_encipherment=False, data_encipherment=False, key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False, decipher_only=False), critical=True, ) certificate = builder.sign( private_key=creds.get_private_key(), algorithm=certificate_signature(), backend=default_backend(), ) return certificate def _pkinit_req( self, creds, target_creds, certificate, expect_error=0, ): """ Modified from the version in python/samba/tests/krb5/pkinit_tests.py """ signature_algorithm = krb5_asn1.sha1WithRSAEncryption signature_algorithm_id = self.AlgorithmIdentifier_create(signature_algorithm) digest_algorithm = krb5_asn1.id_sha1 digest_algorithm_id = self.AlgorithmIdentifier_create(digest_algorithm) private_key = creds.get_private_key() preauth_key = private_key pk_nonce = self.get_Nonce() def generate_pk_padata(_kdc_exchange_dict, _callback_dict, req_body): # Suppress unused parameter warnings _ = (_kdc_exchange_dict, _callback_dict) checksum_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY()) # Calculate the SHA1 checksum over the KDC-REQ-BODY. This checksum # is required to be present in the authenticator, and must be SHA1. digest = hashes.Hash(hashes.SHA1(), default_backend()) digest.update(checksum_blob) digest = digest.finalize() ctime, cusec = self.get_KerberosTimeWithUsec() # Create the authenticator, which shows that we had possession of # the private key at some point. authenticator_obj = self.PKAuthenticator_create( cusec, ctime, pk_nonce, pa_checksum=digest ) client_public_value = None # An optional set of algorithms supported by the client in # decreasing order of preference. For whatever reason, if this # field is missing or empty, Windows will respond with a slightly # differently encoded ReplyKeyPack, wrapping it first in a # ContentInfo structure. supported_cms_types = [ self.AlgorithmIdentifier_create( krb5_asn1.id_pkcs1_sha256WithRSAEncryption ), ] auth_pack_obj = self.AuthPack_create( authenticator_obj, client_public_value=client_public_value, supported_cms_types=supported_cms_types, ) asn1_spec = krb5_asn1.AuthPack auth_pack = self.der_encode(auth_pack_obj, asn1Spec=asn1_spec()) signature_hash = self.hash_from_algorithm(signature_algorithm) digest_hash = self.hash_from_algorithm(digest_algorithm) digest = hashes.Hash(digest_hash(), default_backend()) digest.update(auth_pack) digest = digest.finalize() encap_content_info_obj = self.EncapsulatedContentInfo_create( krb5_asn1.id_pkinit_authData, auth_pack ) subject_key_id = certificate.extensions.get_extension_for_oid( x509.ExtensionOID.SUBJECT_KEY_IDENTIFIER ) signer_identifier = self.SignerIdentifier_create( subject_key_id=subject_key_id.value.digest ) auth_data_attr = self.Attribute_create( krb5_asn1.id_pkinit_authData, [auth_pack] ) message_digest = self.der_encode(digest, asn1Spec=krb5_asn1.MessageDigest()) message_digest_attr = self.Attribute_create( krb5_asn1.id_messageDigest, [message_digest] ) signed_attrs = [ # Note: these attributes are optional. auth_data_attr, message_digest_attr, ] encoded_signed_attrs = self.der_encode( signed_attrs, asn1Spec=krb5_asn1.CMSAttributes() ) pad = padding.PKCS1v15() signed = private_key.sign( encoded_signed_attrs, padding=pad, algorithm=signature_hash() ) signer_info = self.SignerInfo_create( signer_identifier, digest_algorithm_id, signature_algorithm_id, signed, signed_attrs=signed_attrs, ) encoded_cert = certificate.public_bytes(serialization.Encoding.DER) decoded_cert = self.der_decode( encoded_cert, asn1Spec=krb5_asn1.CertificateChoices() ) signed_auth_pack = self.SignedData_create( [digest_algorithm_id], encap_content_info_obj, signer_infos=[signer_info], certificates=[decoded_cert], crls=None, ) signed_auth_pack = self.der_encode( signed_auth_pack, asn1Spec=krb5_asn1.SignedData() ) pk_as_req = self.PK_AS_REQ_create(signed_auth_pack) pa_type = PADATA_PK_AS_REQ padata = [self.PA_DATA_create(pa_type, pk_as_req)] return padata, req_body user_name = creds.get_username() cname = self.PrincipalName_create( name_type=NT_PRINCIPAL, names=user_name.split("/") ) target_name = target_creds.get_username() target_realm = target_creds.get_realm() target_sname = self.PrincipalName_create( name_type=NT_PRINCIPAL, names=["host", target_name[:-1]] ) expected_sname = self.PrincipalName_create( name_type=NT_PRINCIPAL, names=[target_name] ) if expect_error: check_error_fn = self.generic_check_kdc_error check_rep_fn = None expected_sname = target_sname else: check_error_fn = None check_rep_fn = self.generic_check_kdc_rep kdc_options = "forwardable,renewable,canonicalize,renewable-ok" kdc_options = krb5_asn1.KDCOptions(kdc_options) ticket_decryption_key = self.TicketDecryptionKey_from_creds(target_creds) expected_groups = None unexpected_groups = { ( security.SID_FRESH_PUBLIC_KEY_IDENTITY, SidType.EXTRA_SID, security.SE_GROUP_DEFAULT_FLAGS, ) } kdc_exchange_dict = self.as_exchange_dict( creds=creds, client_cert=certificate, expected_crealm=creds.get_realm(), expected_cname=cname, expected_srealm=target_realm, expected_sname=expected_sname, expected_supported_etypes=target_creds.tgs_supported_enctypes, expected_groups=expected_groups, unexpected_groups=unexpected_groups, ticket_decryption_key=ticket_decryption_key, generate_padata_fn=generate_pk_padata, check_error_fn=check_error_fn, check_rep_fn=check_rep_fn, check_kdc_private_fn=self.generic_check_kdc_private, expected_error_mode=expect_error, expected_salt=creds.get_salt(), preauth_key=preauth_key, kdc_options=str(kdc_options), using_pkinit=PkInit.PUBLIC_KEY, pk_nonce=pk_nonce, expect_edata=False, expect_matching_nt_hash_in_pac=True, ) till = self.get_KerberosTime(offset=36000) etypes = ( kcrypto.Enctype.AES256, kcrypto.Enctype.RC4, ) # DES-EDE3-CBC is required for public-key PK-INIT to work on Windows. etypes += (DES_EDE3_CBC,) rep = self._generic_kdc_exchange( kdc_exchange_dict, cname=cname, realm=target_realm, sname=target_sname, till_time=till, etypes=etypes, ) if expect_error: self.check_error_rep(rep, expect_error) return None self.check_as_reply(rep) return kdc_exchange_dict if __name__ == "__main__": global_asn1_print = False global_hexdump = False import unittest unittest.main()