diff options
| author | Joseph Sutton <josephsutton@catalyst.net.nz> | 2023-07-03 14:49:43 +1200 |
|---|---|---|
| committer | Andrew Bartlett <abartlet@samba.org> | 2023-07-19 01:47:33 +0000 |
| commit | 7584e7a3a131795b7bb57c59c53754e9b4ab1855 (patch) | |
| tree | 9a9d820a87c8c64e4ff7588d14cca39c37a17ddc /python/samba | |
| parent | 7f9547fda793af65346708bbe14f8a4995d50a5a (diff) | |
| download | samba-7584e7a3a131795b7bb57c59c53754e9b4ab1855.tar.gz samba-7584e7a3a131795b7bb57c59c53754e9b4ab1855.tar.bz2 samba-7584e7a3a131795b7bb57c59c53754e9b4ab1855.zip | |
tests/krb5: Add helper methods for PK-INIT testing
Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python/samba')
| -rw-r--r-- | python/samba/tests/krb5/raw_testcase.py | 418 |
1 files changed, 418 insertions, 0 deletions
diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py index 63aca4eff3b..70607ff8493 100644 --- a/python/samba/tests/krb5/raw_testcase.py +++ b/python/samba/tests/krb5/raw_testcase.py @@ -25,8 +25,14 @@ import random import binascii import itertools import collections +import math from enum import Enum +from pprint import pprint + +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.ciphers import algorithms +from cryptography.hazmat.backends import default_backend from pyasn1.codec.der.decoder import decode as pyasn1_der_decode from pyasn1.codec.der.encoder import encode as pyasn1_der_encode @@ -34,6 +40,7 @@ from pyasn1.codec.native.decoder import decode as pyasn1_native_decode from pyasn1.codec.native.encoder import encode as pyasn1_native_encode from pyasn1.codec.ber.encoder import BitStringEncoder +import pyasn1.type.univ from pyasn1.error import PyAsn1Error @@ -1807,6 +1814,22 @@ class RawKerberosTest(TestCase): Authenticator_obj['authorization-data'] = authorization_data return Authenticator_obj + def PKAuthenticator_create(self, + cusec, + ctime, + nonce, + *, + pa_checksum=None): + pk_authenticator_obj = { + 'cusec': cusec, + 'ctime': ctime, + 'nonce': nonce, + } + if pa_checksum is not None: + pk_authenticator_obj['paChecksum'] = pa_checksum + + return pk_authenticator_obj + def TGS_REQ_create(self, padata, # optional cusec, @@ -2019,6 +2042,382 @@ class RawKerberosTest(TestCase): return krb_priv + def ContentInfo_create(self, content_type, content): + content_info_obj = { + 'contentType': content_type, + 'content': content, + } + + return content_info_obj + + def EncapsulatedContentInfo_create(self, content_type, content): + encapsulated_content_info_obj = { + 'eContentType': content_type, + 'eContent': content, + } + + return encapsulated_content_info_obj + + def SignedData_create(self, + digest_algorithms, + encap_content_info, + signer_infos, + *, + version=None, + certificates=None, + crls=None): + def is_cert_version_present(version): + return certificates is not None and any( + version in cert for cert in certificates) + + def is_crl_version_present(version): + return crls is not None and any( + version in crl for crl in crls) + + def is_signer_info_version_present(version): + return signer_infos is not None and any( + signer_info['version'] == version + for signer_info in signer_infos) + + def data_version(): + # per RFC5652 5.1: + if is_cert_version_present('other') or ( + is_crl_version_present('other')): + return 5 + + if is_cert_version_present('v2AttrCert'): + return 4 + + if is_cert_version_present('v1AttrCert') or ( + is_signer_info_version_present(3)) or ( + encap_content_info['eContentType'] != krb5_asn1.id_data + ): + return 3 + + return 1 + + if version is None: + version = data_version() + + signed_data_obj = { + 'version': version, + 'digestAlgorithms': digest_algorithms, + 'encapContentInfo': encap_content_info, + 'signerInfos': signer_infos, + } + + if certificates is not None: + signed_data_obj['certificates'] = certificates + if crls is not None: + signed_data_obj['crls'] = crls + + return signed_data_obj + + def AuthPack_create(self, + pk_authenticator, + *, + client_public_value=None, + supported_cms_types=None, + client_dh_nonce=None): + auth_pack_obj = { + 'pkAuthenticator': pk_authenticator, + } + + if client_public_value is not None: + auth_pack_obj['clientPublicValue'] = client_public_value + if supported_cms_types is not None: + auth_pack_obj['supportedCMSTypes'] = supported_cms_types + if client_dh_nonce is not None: + auth_pack_obj['clientDHNonce'] = client_dh_nonce + + return auth_pack_obj + + def PK_AS_REQ_create(self, + signed_auth_pack, + *, + trusted_certifiers=None, + kdc_pk_id=None): + content_info_obj = self.ContentInfo_create( + krb5_asn1.id_signedData, signed_auth_pack) + content_info = self.der_encode(content_info_obj, + asn1Spec=krb5_asn1.ContentInfo()) + + pk_as_req_obj = { + 'signedAuthPack': content_info, + } + + if trusted_certifiers is not None: + pk_as_req_obj['trustedCertifiers'] = trusted_certifiers + if kdc_pk_id is not None: + pk_as_req_obj['kdcPkId'] = kdc_pk_id + + return self.der_encode(pk_as_req_obj, + asn1Spec=krb5_asn1.PA_PK_AS_REQ()) + + def SignerInfo_create(self, + signer_id, + digest_algorithm, + signature_algorithm, + signature, + *, + version=None, + signed_attrs=None, + unsigned_attrs=None): + if version is None: + # per RFC5652 5.3: + if 'issuerAndSerialNumber' in signer_id: + version = 1 + elif 'subjectKeyIdentifier' in signer_id: + version = 3 + else: + self.fail(f'unknown signer ID version ({signer_id})') + + signer_info_obj = { + 'version': version, + 'sid': signer_id, + 'digestAlgorithm': digest_algorithm, + 'signatureAlgorithm': signature_algorithm, + 'signature': signature, + } + + if signed_attrs is not None: + signer_info_obj['signedAttrs'] = signed_attrs + if unsigned_attrs is not None: + signer_info_obj['unsignedAttrs'] = unsigned_attrs + + return signer_info_obj + + def SignerIdentifier_create(self, *, + issuer_and_serial_number=None, + subject_key_id=None): + if issuer_and_serial_number is not None: + return {'issuerAndSerialNumber': issuer_and_serial_number} + + if subject_key_id is not None: + return {'subjectKeyIdentifier': subject_key_id} + + self.fail('identifier not specified') + + def AlgorithmIdentifier_create(self, + algorithm, + *, + parameters=None): + algorithm_id_obj = { + 'algorithm': algorithm, + } + + if parameters is not None: + algorithm_id_obj['parameters'] = parameters + + return algorithm_id_obj + + def SubjectPublicKeyInfo_create(self, + algorithm, + public_key): + return { + 'algorithm': algorithm, + 'subjectPublicKey': public_key, + } + + def ValidationParms_create(self, + seed, + pgen_counter): + return { + 'seed': seed, + 'pgenCounter': pgen_counter, + } + + def DomainParameters_create(self, + p, + g, + *, + q=None, + j=None, + validation_parms=None): + domain_params_obj = { + 'p': p, + 'g': g, + } + + if q is not None: + domain_params_obj['q'] = q + if j is not None: + domain_params_obj['j'] = j + if validation_parms is not None: + domain_params_obj['validationParms'] = validation_parms + + return domain_params_obj + + def length_in_bytes(self, value): + """Return the length in bytes of an integer once it is encoded as + bytes.""" + + self.assertGreaterEqual(value, 0, 'value must be positive') + self.assertIsInstance(value, int) + + length_in_bits = max(1, math.log2(value + 1)) + length_in_bytes = math.ceil(length_in_bits / 8) + return length_in_bytes + + def bytes_from_int(self, value, *, length=None): + """Return an integer encoded big-endian into bytes of an optionally + specified length. + """ + if length is None: + length = self.length_in_bytes(value) + return value.to_bytes(length, 'big') + + def int_from_bytes(self, data): + """Return an integer decoded from bytes in big-endian format.""" + return int.from_bytes(data, 'big') + + def int_from_bit_string(self, string): + """Return an integer decoded from a bitstring.""" + return int(string, base=2) + + def bit_string_from_int(self, value): + """Return a bitstring encoding of an integer.""" + + string = f'{value:b}' + + # The bitstring must be padded to a multiple of 8 bits in length, or + # pyasn1 will interpret it incorrectly (as if the padding bits were + # present, but on the wrong end). + length = len(string) + padding_len = math.ceil(length / 8) * 8 - length + return '0' * padding_len + string + + def bit_string_from_bytes(self, data): + """Return a bitstring encoding of bytes in big-endian format.""" + value = self.int_from_bytes(data) + return self.bit_string_from_int(value) + + def bytes_from_bit_string(self, string): + """Return big-endian format bytes encoded from a bitstring.""" + value = self.int_from_bit_string(string) + length = math.ceil(len(string) / 8) + return value.to_bytes(length, 'big') + + def asn1_length(self, data): + """Return the ASN.1 encoding of the length of some data.""" + + length = len(data) + + self.assertGreater(length, 0) + if length < 0x80: + return bytes([length]) + + encoding_len = self.length_in_bytes(length) + self.assertLess(encoding_len, 0x80, + 'item is too long to be ASN.1 encoded') + + data = self.bytes_from_int(length, length=encoding_len) + return bytes([0x80 | encoding_len]) + data + + @staticmethod + def octetstring2key(x, enctype): + """This implements the function defined in RFC4556 3.2.3.1 “Using + Diffie-Hellman Key Exchange”.""" + + seedsize = kcrypto.seedsize(enctype) + seed = b'' + + # A counter that cycles through the bytes 0x00–0xff. + counter = itertools.cycle(map(lambda x: bytes([x]), + range(256))) + + while len(seed) < seedsize: + digest = hashes.Hash(hashes.SHA1(), default_backend()) + digest.update(next(counter) + x) + seed += digest.finalize() + + key = kcrypto.random_to_key(enctype, seed[:seedsize]) + return RodcPacEncryptionKey(key, kvno=None) + + def unpad(self, data): + """Return unpadded data.""" + padding_len = data[-1] + expected_padding = bytes([padding_len]) * padding_len + self.assertEqual(expected_padding, data[-padding_len:], + 'invalid padding bytes') + + return data[:-padding_len] + + def try_decode(self, data, module=None): + """Try to decode some data of unknown type with various known ASN.1 + schemata (optionally restricted to those from a particular module) and + print any results that seem promising. For use when debugging. + """ + + if module is None: + # Try a couple of known ASN.1 modules. + self.try_decode(data, krb5_asn1) + self.try_decode(data, pyasn1.type.univ) + + # It’s helpful to stop and give the user a chance to examine the + # results. + self.fail('decoding done') + + names = dir(module) + for name in names: + item = getattr(module, name) + if not callable(item): + continue + + try: + decoded = self.der_decode(data, asn1Spec=item()) + except Exception: + # Initiating the schema or decoding the ASN.1 failed for + # whatever reason. + pass + else: + # Decoding succeeded: print the structure to be examined. + print(f'\t{name}') + pprint(decoded) + + def cipher_from_algorithm(self, algorithm): + if algorithm == str(krb5_asn1.aes256_CBC_PAD): + return algorithms.AES + + if algorithm == str(krb5_asn1.des_EDE3_CBC): + return algorithms.TripleDES + + self.fail(f'unknown cipher algorithm {algorithm}') + + def hash_from_algorithm(self, algorithm): + # Let someone pass in an ObjectIdentifier. + algorithm = str(algorithm) + + if algorithm == str(krb5_asn1.id_sha1): + return hashes.SHA1 + + if algorithm == str(krb5_asn1.sha1WithRSAEncryption): + return hashes.SHA1 + + if algorithm == str(krb5_asn1.rsaEncryption): + return hashes.SHA1 + + if algorithm == str(krb5_asn1.id_pkcs1_sha256WithRSAEncryption): + return hashes.SHA256 + + if algorithm == str(krb5_asn1.id_sha512): + return hashes.SHA512 + + self.fail(f'unknown hash algorithm {algorithm}') + + def hash_from_algorithm_id(self, algorithm_id): + self.assertIsInstance(algorithm_id, dict) + + hash = self.hash_from_algorithm(algorithm_id['algorithm']) + + parameters = algorithm_id.get('parameters') + if self.strict_checking: + self.assertIsNotNone(parameters) + if parameters is not None: + self.assertEqual(b'\x05\x00', parameters) + + return hash + def kpasswd_create(self, subkey, user_data, @@ -4000,6 +4399,10 @@ class RawKerberosTest(TestCase): return max(filter(lambda e: e in etypes, proposed_etypes), default=None) + @staticmethod + def first_common_etype(etypes, proposed_etypes): + return next(filter(lambda e: e in etypes, proposed_etypes), None) + def supported_aes_rc4_etypes(self, kdc_exchange_dict): creds = kdc_exchange_dict['creds'] supported_etypes = self.get_default_enctypes(creds) @@ -4029,6 +4432,16 @@ class RawKerberosTest(TestCase): return expected_aes, expected_rc4 + def expected_etype(self, kdc_exchange_dict): + req_body = kdc_exchange_dict['req_body'] + proposed_etypes = req_body['etype'] + + aes_etypes, rc4_etypes = self.supported_aes_rc4_etypes( + kdc_exchange_dict) + + return self.first_common_etype(aes_etypes | rc4_etypes, + proposed_etypes) + def check_rep_padata(self, kdc_exchange_dict, callback_dict, @@ -4951,6 +5364,11 @@ class RawKerberosTest(TestCase): return PADATA_REQ_ENC_PA_REP in fast_pa_dict + def sent_pk_as_req(self, kdc_exchange_dict): + fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) + + return PADATA_PK_AS_REQ in fast_pa_dict + def get_sent_pac_options(self, kdc_exchange_dict): fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) |
