summaryrefslogtreecommitdiff
path: root/python/samba
diff options
context:
space:
mode:
authorJoseph Sutton <josephsutton@catalyst.net.nz>2023-07-03 14:49:43 +1200
committerAndrew Bartlett <abartlet@samba.org>2023-07-19 01:47:33 +0000
commit7584e7a3a131795b7bb57c59c53754e9b4ab1855 (patch)
tree9a9d820a87c8c64e4ff7588d14cca39c37a17ddc /python/samba
parent7f9547fda793af65346708bbe14f8a4995d50a5a (diff)
downloadsamba-7584e7a3a131795b7bb57c59c53754e9b4ab1855.tar.gz
samba-7584e7a3a131795b7bb57c59c53754e9b4ab1855.tar.bz2
samba-7584e7a3a131795b7bb57c59c53754e9b4ab1855.zip
tests/krb5: Add helper methods for PK-INIT testing
Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python/samba')
-rw-r--r--python/samba/tests/krb5/raw_testcase.py418
1 files changed, 418 insertions, 0 deletions
diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py
index 63aca4eff3b..70607ff8493 100644
--- a/python/samba/tests/krb5/raw_testcase.py
+++ b/python/samba/tests/krb5/raw_testcase.py
@@ -25,8 +25,14 @@ import random
import binascii
import itertools
import collections
+import math
from enum import Enum
+from pprint import pprint
+
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.ciphers import algorithms
+from cryptography.hazmat.backends import default_backend
from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
@@ -34,6 +40,7 @@ from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
from pyasn1.codec.ber.encoder import BitStringEncoder
+import pyasn1.type.univ
from pyasn1.error import PyAsn1Error
@@ -1807,6 +1814,22 @@ class RawKerberosTest(TestCase):
Authenticator_obj['authorization-data'] = authorization_data
return Authenticator_obj
+ def PKAuthenticator_create(self,
+ cusec,
+ ctime,
+ nonce,
+ *,
+ pa_checksum=None):
+ pk_authenticator_obj = {
+ 'cusec': cusec,
+ 'ctime': ctime,
+ 'nonce': nonce,
+ }
+ if pa_checksum is not None:
+ pk_authenticator_obj['paChecksum'] = pa_checksum
+
+ return pk_authenticator_obj
+
def TGS_REQ_create(self,
padata, # optional
cusec,
@@ -2019,6 +2042,382 @@ class RawKerberosTest(TestCase):
return krb_priv
+ def ContentInfo_create(self, content_type, content):
+ content_info_obj = {
+ 'contentType': content_type,
+ 'content': content,
+ }
+
+ return content_info_obj
+
+ def EncapsulatedContentInfo_create(self, content_type, content):
+ encapsulated_content_info_obj = {
+ 'eContentType': content_type,
+ 'eContent': content,
+ }
+
+ return encapsulated_content_info_obj
+
+ def SignedData_create(self,
+ digest_algorithms,
+ encap_content_info,
+ signer_infos,
+ *,
+ version=None,
+ certificates=None,
+ crls=None):
+ def is_cert_version_present(version):
+ return certificates is not None and any(
+ version in cert for cert in certificates)
+
+ def is_crl_version_present(version):
+ return crls is not None and any(
+ version in crl for crl in crls)
+
+ def is_signer_info_version_present(version):
+ return signer_infos is not None and any(
+ signer_info['version'] == version
+ for signer_info in signer_infos)
+
+ def data_version():
+ # per RFC5652 5.1:
+ if is_cert_version_present('other') or (
+ is_crl_version_present('other')):
+ return 5
+
+ if is_cert_version_present('v2AttrCert'):
+ return 4
+
+ if is_cert_version_present('v1AttrCert') or (
+ is_signer_info_version_present(3)) or (
+ encap_content_info['eContentType'] != krb5_asn1.id_data
+ ):
+ return 3
+
+ return 1
+
+ if version is None:
+ version = data_version()
+
+ signed_data_obj = {
+ 'version': version,
+ 'digestAlgorithms': digest_algorithms,
+ 'encapContentInfo': encap_content_info,
+ 'signerInfos': signer_infos,
+ }
+
+ if certificates is not None:
+ signed_data_obj['certificates'] = certificates
+ if crls is not None:
+ signed_data_obj['crls'] = crls
+
+ return signed_data_obj
+
+ def AuthPack_create(self,
+ pk_authenticator,
+ *,
+ client_public_value=None,
+ supported_cms_types=None,
+ client_dh_nonce=None):
+ auth_pack_obj = {
+ 'pkAuthenticator': pk_authenticator,
+ }
+
+ if client_public_value is not None:
+ auth_pack_obj['clientPublicValue'] = client_public_value
+ if supported_cms_types is not None:
+ auth_pack_obj['supportedCMSTypes'] = supported_cms_types
+ if client_dh_nonce is not None:
+ auth_pack_obj['clientDHNonce'] = client_dh_nonce
+
+ return auth_pack_obj
+
+ def PK_AS_REQ_create(self,
+ signed_auth_pack,
+ *,
+ trusted_certifiers=None,
+ kdc_pk_id=None):
+ content_info_obj = self.ContentInfo_create(
+ krb5_asn1.id_signedData, signed_auth_pack)
+ content_info = self.der_encode(content_info_obj,
+ asn1Spec=krb5_asn1.ContentInfo())
+
+ pk_as_req_obj = {
+ 'signedAuthPack': content_info,
+ }
+
+ if trusted_certifiers is not None:
+ pk_as_req_obj['trustedCertifiers'] = trusted_certifiers
+ if kdc_pk_id is not None:
+ pk_as_req_obj['kdcPkId'] = kdc_pk_id
+
+ return self.der_encode(pk_as_req_obj,
+ asn1Spec=krb5_asn1.PA_PK_AS_REQ())
+
+ def SignerInfo_create(self,
+ signer_id,
+ digest_algorithm,
+ signature_algorithm,
+ signature,
+ *,
+ version=None,
+ signed_attrs=None,
+ unsigned_attrs=None):
+ if version is None:
+ # per RFC5652 5.3:
+ if 'issuerAndSerialNumber' in signer_id:
+ version = 1
+ elif 'subjectKeyIdentifier' in signer_id:
+ version = 3
+ else:
+ self.fail(f'unknown signer ID version ({signer_id})')
+
+ signer_info_obj = {
+ 'version': version,
+ 'sid': signer_id,
+ 'digestAlgorithm': digest_algorithm,
+ 'signatureAlgorithm': signature_algorithm,
+ 'signature': signature,
+ }
+
+ if signed_attrs is not None:
+ signer_info_obj['signedAttrs'] = signed_attrs
+ if unsigned_attrs is not None:
+ signer_info_obj['unsignedAttrs'] = unsigned_attrs
+
+ return signer_info_obj
+
+ def SignerIdentifier_create(self, *,
+ issuer_and_serial_number=None,
+ subject_key_id=None):
+ if issuer_and_serial_number is not None:
+ return {'issuerAndSerialNumber': issuer_and_serial_number}
+
+ if subject_key_id is not None:
+ return {'subjectKeyIdentifier': subject_key_id}
+
+ self.fail('identifier not specified')
+
+ def AlgorithmIdentifier_create(self,
+ algorithm,
+ *,
+ parameters=None):
+ algorithm_id_obj = {
+ 'algorithm': algorithm,
+ }
+
+ if parameters is not None:
+ algorithm_id_obj['parameters'] = parameters
+
+ return algorithm_id_obj
+
+ def SubjectPublicKeyInfo_create(self,
+ algorithm,
+ public_key):
+ return {
+ 'algorithm': algorithm,
+ 'subjectPublicKey': public_key,
+ }
+
+ def ValidationParms_create(self,
+ seed,
+ pgen_counter):
+ return {
+ 'seed': seed,
+ 'pgenCounter': pgen_counter,
+ }
+
+ def DomainParameters_create(self,
+ p,
+ g,
+ *,
+ q=None,
+ j=None,
+ validation_parms=None):
+ domain_params_obj = {
+ 'p': p,
+ 'g': g,
+ }
+
+ if q is not None:
+ domain_params_obj['q'] = q
+ if j is not None:
+ domain_params_obj['j'] = j
+ if validation_parms is not None:
+ domain_params_obj['validationParms'] = validation_parms
+
+ return domain_params_obj
+
+ def length_in_bytes(self, value):
+ """Return the length in bytes of an integer once it is encoded as
+ bytes."""
+
+ self.assertGreaterEqual(value, 0, 'value must be positive')
+ self.assertIsInstance(value, int)
+
+ length_in_bits = max(1, math.log2(value + 1))
+ length_in_bytes = math.ceil(length_in_bits / 8)
+ return length_in_bytes
+
+ def bytes_from_int(self, value, *, length=None):
+ """Return an integer encoded big-endian into bytes of an optionally
+ specified length.
+ """
+ if length is None:
+ length = self.length_in_bytes(value)
+ return value.to_bytes(length, 'big')
+
+ def int_from_bytes(self, data):
+ """Return an integer decoded from bytes in big-endian format."""
+ return int.from_bytes(data, 'big')
+
+ def int_from_bit_string(self, string):
+ """Return an integer decoded from a bitstring."""
+ return int(string, base=2)
+
+ def bit_string_from_int(self, value):
+ """Return a bitstring encoding of an integer."""
+
+ string = f'{value:b}'
+
+ # The bitstring must be padded to a multiple of 8 bits in length, or
+ # pyasn1 will interpret it incorrectly (as if the padding bits were
+ # present, but on the wrong end).
+ length = len(string)
+ padding_len = math.ceil(length / 8) * 8 - length
+ return '0' * padding_len + string
+
+ def bit_string_from_bytes(self, data):
+ """Return a bitstring encoding of bytes in big-endian format."""
+ value = self.int_from_bytes(data)
+ return self.bit_string_from_int(value)
+
+ def bytes_from_bit_string(self, string):
+ """Return big-endian format bytes encoded from a bitstring."""
+ value = self.int_from_bit_string(string)
+ length = math.ceil(len(string) / 8)
+ return value.to_bytes(length, 'big')
+
+ def asn1_length(self, data):
+ """Return the ASN.1 encoding of the length of some data."""
+
+ length = len(data)
+
+ self.assertGreater(length, 0)
+ if length < 0x80:
+ return bytes([length])
+
+ encoding_len = self.length_in_bytes(length)
+ self.assertLess(encoding_len, 0x80,
+ 'item is too long to be ASN.1 encoded')
+
+ data = self.bytes_from_int(length, length=encoding_len)
+ return bytes([0x80 | encoding_len]) + data
+
+ @staticmethod
+ def octetstring2key(x, enctype):
+ """This implements the function defined in RFC4556 3.2.3.1 “Using
+ Diffie-Hellman Key Exchange”."""
+
+ seedsize = kcrypto.seedsize(enctype)
+ seed = b''
+
+ # A counter that cycles through the bytes 0x00–0xff.
+ counter = itertools.cycle(map(lambda x: bytes([x]),
+ range(256)))
+
+ while len(seed) < seedsize:
+ digest = hashes.Hash(hashes.SHA1(), default_backend())
+ digest.update(next(counter) + x)
+ seed += digest.finalize()
+
+ key = kcrypto.random_to_key(enctype, seed[:seedsize])
+ return RodcPacEncryptionKey(key, kvno=None)
+
+ def unpad(self, data):
+ """Return unpadded data."""
+ padding_len = data[-1]
+ expected_padding = bytes([padding_len]) * padding_len
+ self.assertEqual(expected_padding, data[-padding_len:],
+ 'invalid padding bytes')
+
+ return data[:-padding_len]
+
+ def try_decode(self, data, module=None):
+ """Try to decode some data of unknown type with various known ASN.1
+ schemata (optionally restricted to those from a particular module) and
+ print any results that seem promising. For use when debugging.
+ """
+
+ if module is None:
+ # Try a couple of known ASN.1 modules.
+ self.try_decode(data, krb5_asn1)
+ self.try_decode(data, pyasn1.type.univ)
+
+ # It’s helpful to stop and give the user a chance to examine the
+ # results.
+ self.fail('decoding done')
+
+ names = dir(module)
+ for name in names:
+ item = getattr(module, name)
+ if not callable(item):
+ continue
+
+ try:
+ decoded = self.der_decode(data, asn1Spec=item())
+ except Exception:
+ # Initiating the schema or decoding the ASN.1 failed for
+ # whatever reason.
+ pass
+ else:
+ # Decoding succeeded: print the structure to be examined.
+ print(f'\t{name}')
+ pprint(decoded)
+
+ def cipher_from_algorithm(self, algorithm):
+ if algorithm == str(krb5_asn1.aes256_CBC_PAD):
+ return algorithms.AES
+
+ if algorithm == str(krb5_asn1.des_EDE3_CBC):
+ return algorithms.TripleDES
+
+ self.fail(f'unknown cipher algorithm {algorithm}')
+
+ def hash_from_algorithm(self, algorithm):
+ # Let someone pass in an ObjectIdentifier.
+ algorithm = str(algorithm)
+
+ if algorithm == str(krb5_asn1.id_sha1):
+ return hashes.SHA1
+
+ if algorithm == str(krb5_asn1.sha1WithRSAEncryption):
+ return hashes.SHA1
+
+ if algorithm == str(krb5_asn1.rsaEncryption):
+ return hashes.SHA1
+
+ if algorithm == str(krb5_asn1.id_pkcs1_sha256WithRSAEncryption):
+ return hashes.SHA256
+
+ if algorithm == str(krb5_asn1.id_sha512):
+ return hashes.SHA512
+
+ self.fail(f'unknown hash algorithm {algorithm}')
+
+ def hash_from_algorithm_id(self, algorithm_id):
+ self.assertIsInstance(algorithm_id, dict)
+
+ hash = self.hash_from_algorithm(algorithm_id['algorithm'])
+
+ parameters = algorithm_id.get('parameters')
+ if self.strict_checking:
+ self.assertIsNotNone(parameters)
+ if parameters is not None:
+ self.assertEqual(b'\x05\x00', parameters)
+
+ return hash
+
def kpasswd_create(self,
subkey,
user_data,
@@ -4000,6 +4399,10 @@ class RawKerberosTest(TestCase):
return max(filter(lambda e: e in etypes, proposed_etypes),
default=None)
+ @staticmethod
+ def first_common_etype(etypes, proposed_etypes):
+ return next(filter(lambda e: e in etypes, proposed_etypes), None)
+
def supported_aes_rc4_etypes(self, kdc_exchange_dict):
creds = kdc_exchange_dict['creds']
supported_etypes = self.get_default_enctypes(creds)
@@ -4029,6 +4432,16 @@ class RawKerberosTest(TestCase):
return expected_aes, expected_rc4
+ def expected_etype(self, kdc_exchange_dict):
+ req_body = kdc_exchange_dict['req_body']
+ proposed_etypes = req_body['etype']
+
+ aes_etypes, rc4_etypes = self.supported_aes_rc4_etypes(
+ kdc_exchange_dict)
+
+ return self.first_common_etype(aes_etypes | rc4_etypes,
+ proposed_etypes)
+
def check_rep_padata(self,
kdc_exchange_dict,
callback_dict,
@@ -4951,6 +5364,11 @@ class RawKerberosTest(TestCase):
return PADATA_REQ_ENC_PA_REP in fast_pa_dict
+ def sent_pk_as_req(self, kdc_exchange_dict):
+ fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
+
+ return PADATA_PK_AS_REQ in fast_pa_dict
+
def get_sent_pac_options(self, kdc_exchange_dict):
fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)