diff options
| author | Joseph Sutton <josephsutton@catalyst.net.nz> | 2023-07-03 16:34:11 +1200 |
|---|---|---|
| committer | Andrew Bartlett <abartlet@samba.org> | 2023-07-19 01:47:33 +0000 |
| commit | ecc62bc120792ef8157b6f700b42dabdbb9518e5 (patch) | |
| tree | b4e094a10d77ef2cf59f8abff6f64fdfe8477d05 /python/samba | |
| parent | f7393da2c0724839ec8a0510daa114eb8d75a707 (diff) | |
| download | samba-ecc62bc120792ef8157b6f700b42dabdbb9518e5.tar.gz samba-ecc62bc120792ef8157b6f700b42dabdbb9518e5.tar.bz2 samba-ecc62bc120792ef8157b6f700b42dabdbb9518e5.zip | |
tests/krb5: Add tests for PK-INIT Freshness Extension (RFC 8070)
Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python/samba')
| -rwxr-xr-x | python/samba/tests/krb5/pkinit_tests.py | 376 | ||||
| -rw-r--r-- | python/samba/tests/krb5/raw_testcase.py | 79 | ||||
| -rw-r--r-- | python/samba/tests/krb5/rfc4120_constants.py | 3 |
3 files changed, 452 insertions, 6 deletions
diff --git a/python/samba/tests/krb5/pkinit_tests.py b/python/samba/tests/krb5/pkinit_tests.py index 2be2cf4aa44..1e1ff1af454 100755 --- a/python/samba/tests/krb5/pkinit_tests.py +++ b/python/samba/tests/krb5/pkinit_tests.py @@ -41,7 +41,14 @@ from samba.tests.krb5.raw_testcase import PkInit from samba.tests.krb5.rfc4120_constants import ( DES_EDE3_CBC, KDC_ERR_ETYPE_NOSUPP, + KDC_ERR_MODIFIED, + KDC_ERR_PREAUTH_EXPIRED, + KDC_ERR_PREAUTH_FAILED, + KDC_ERR_PREAUTH_REQUIRED, + KU_PA_ENC_TIMESTAMP, NT_PRINCIPAL, + PADATA_AS_FRESHNESS, + PADATA_ENC_TIMESTAMP, PADATA_PK_AS_REQ, ) import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1 @@ -255,6 +262,364 @@ class PkInitTests(KDCBaseTest): using_pkinit=PkInit.DIFFIE_HELLMAN, certificate_signature=hashes.SHA256) + def test_pkinit_freshness(self): + """Test public-key PK-INIT with the PKINIT Freshness Extension.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + # Perform the AS-REQ to get the freshness token. + kdc_exchange_dict = self._as_req(client_creds, target_creds, + freshness=b'', + expect_error=KDC_ERR_PREAUTH_REQUIRED, + expect_edata=True) + freshness_token = kdc_exchange_dict.get('freshness_token') + self.assertIsNotNone(freshness_token) + + # Include the freshness token in the PK-INIT request. + self._pkinit_req(client_creds, target_creds, + freshness_token=freshness_token) + + def test_pkinit_freshness_dh(self): + """Test Diffie-Hellman PK-INIT with the PKINIT Freshness Extension.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + kdc_exchange_dict = self._as_req(client_creds, target_creds, + freshness=b'', + expect_error=KDC_ERR_PREAUTH_REQUIRED, + expect_edata=True) + freshness_token = kdc_exchange_dict.get('freshness_token') + self.assertIsNotNone(freshness_token) + + self._pkinit_req(client_creds, target_creds, + using_pkinit=PkInit.DIFFIE_HELLMAN, + freshness_token=freshness_token) + + def test_pkinit_freshness_non_empty(self): + """Test sending a non-empty freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + kdc_exchange_dict = self._as_req( + client_creds, target_creds, + freshness=b'A genuine freshness token', + expect_error=KDC_ERR_PREAUTH_REQUIRED, + expect_edata=True) + freshness_token = kdc_exchange_dict.get('freshness_token') + self.assertIsNotNone(freshness_token) + + def test_pkinit_freshness_with_enc_ts(self): + """Test sending a freshness token and ENC-TS in the same request.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + kdc_exchange_dict = self._as_req(client_creds, target_creds, + freshness=b'', + send_enc_ts=True) + + # There should be no freshness token in the reply. + freshness_token = kdc_exchange_dict.get('freshness_token') + self.assertIsNone(freshness_token) + + def test_pkinit_freshness_current(self): + """Test public-key PK-INIT with an up-to-date freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + freshness_token = self.create_freshness_token() + + self._pkinit_req(client_creds, target_creds, + freshness_token=freshness_token) + + def test_pkinit_freshness_current_dh(self): + """Test Diffie-Hellman PK-INIT with an up-to-date freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + freshness_token = self.create_freshness_token() + + self._pkinit_req(client_creds, target_creds, + using_pkinit=PkInit.DIFFIE_HELLMAN, + freshness_token=freshness_token) + + def test_pkinit_freshness_old(self): + """Test public-key PK-INIT with an old freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + # Present a freshness token from fifteen minutes in the past. + fifteen_minutes = timedelta(minutes=15).total_seconds() + freshness_token = self.create_freshness_token(offset=-fifteen_minutes) + + # The request should be rejected. + self._pkinit_req(client_creds, target_creds, + freshness_token=freshness_token, + expect_error=KDC_ERR_PREAUTH_EXPIRED) + + def test_pkinit_freshness_old_dh(self): + """Test Diffie-Hellman PK-INIT with an old freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + # Present a freshness token from fifteen minutes in the past. + fifteen_minutes = timedelta(minutes=15).total_seconds() + freshness_token = self.create_freshness_token(offset=-fifteen_minutes) + + # The request should be rejected. + self._pkinit_req(client_creds, target_creds, + using_pkinit=PkInit.DIFFIE_HELLMAN, + freshness_token=freshness_token, + expect_error=KDC_ERR_PREAUTH_EXPIRED) + + def test_pkinit_freshness_future(self): + """Test public-key PK-INIT with a freshness token from the future.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + # Present a freshness token from fifteen minutes in the future. + fifteen_minutes = timedelta(minutes=15).total_seconds() + freshness_token = self.create_freshness_token(offset=fifteen_minutes) + + # The request should be rejected. + self._pkinit_req(client_creds, target_creds, + freshness_token=freshness_token, + expect_error=KDC_ERR_PREAUTH_EXPIRED) + + def test_pkinit_freshness_future_dh(self): + """Test Diffie-Hellman PK-INIT with a freshness token from the future. + """ + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + # Present a freshness token from fifteen minutes in the future. + fifteen_minutes = timedelta(minutes=15).total_seconds() + freshness_token = self.create_freshness_token(offset=fifteen_minutes) + + # The request should be rejected. + self._pkinit_req(client_creds, target_creds, + using_pkinit=PkInit.DIFFIE_HELLMAN, + freshness_token=freshness_token, + expect_error=KDC_ERR_PREAUTH_EXPIRED) + + def test_pkinit_freshness_invalid(self): + """Test public-key PK-INIT with an invalid freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + freshness_token = b'A genuine freshness token' + + # The request should be rejected. + self._pkinit_req(client_creds, target_creds, + freshness_token=freshness_token, + expect_error=KDC_ERR_MODIFIED) + + def test_pkinit_freshness_invalid_dh(self): + """Test Diffie-Hellman PK-INIT with an invalid freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + freshness_token = b'A genuine freshness token' + + # The request should be rejected. + self._pkinit_req(client_creds, target_creds, + using_pkinit=PkInit.DIFFIE_HELLMAN, + freshness_token=freshness_token, + expect_error=KDC_ERR_MODIFIED) + + def test_pkinit_freshness_rodc_ts(self): + """Test public-key PK-INIT with an RODC-issued freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds() + freshness_token = self.create_freshness_token( + krbtgt_creds=rodc_krbtgt_creds) + + # The token should be rejected. + self._pkinit_req(client_creds, target_creds, + freshness_token=freshness_token, + expect_error=KDC_ERR_PREAUTH_FAILED) + + def test_pkinit_freshness_rodc_dh(self): + """Test Diffie-Hellman PK-INIT with an RODC-issued freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds() + freshness_token = self.create_freshness_token( + krbtgt_creds=rodc_krbtgt_creds) + + # The token should be rejected. + self._pkinit_req(client_creds, target_creds, + using_pkinit=PkInit.DIFFIE_HELLMAN, + freshness_token=freshness_token, + expect_error=KDC_ERR_PREAUTH_FAILED) + + def test_pkinit_freshness_wrong_header(self): + """Test public-key PK-INIT with a modified freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + freshness_token = self.create_freshness_token() + + # Modify the leading two bytes of the freshness token. + freshness_token = b'@@' + freshness_token[2:] + + # Expect to get an error. + self._pkinit_req(client_creds, target_creds, + freshness_token=freshness_token, + expect_error=KDC_ERR_MODIFIED) + + def test_pkinit_freshness_wrong_header_dh(self): + """Test Diffie-Hellman PK-INIT with a modified freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + freshness_token = self.create_freshness_token() + + # Modify the leading two bytes of the freshness token. + freshness_token = b'@@' + freshness_token[2:] + + # Expect to get an error. + self._pkinit_req(client_creds, target_creds, + using_pkinit=PkInit.DIFFIE_HELLMAN, + freshness_token=freshness_token, + expect_error=KDC_ERR_MODIFIED) + + def test_pkinit_freshness_empty(self): + """Test public-key PK-INIT with an empty freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + # Expect to get an error. + self._pkinit_req(client_creds, target_creds, + freshness_token=b'', + expect_error=KDC_ERR_MODIFIED) + + def test_pkinit_freshness_empty_dh(self): + """Test Diffie-Hellman PK-INIT with an empty freshness token.""" + client_creds = self._get_creds() + target_creds = self.get_service_creds() + + # Expect to get an error. + self._pkinit_req(client_creds, target_creds, + using_pkinit=PkInit.DIFFIE_HELLMAN, + freshness_token=b'', + expect_error=KDC_ERR_MODIFIED) + + def _as_req(self, + creds, + target_creds, + *, + expect_error=0, + expect_edata=False, + etypes=None, + freshness=None, + send_enc_ts=False, + ): + if send_enc_ts: + preauth_key = self.PasswordKey_from_creds(creds, kcrypto.Enctype.AES256) + else: + preauth_key = None + + if freshness is not None or send_enc_ts: + def generate_padata_fn(_kdc_exchange_dict, + _callback_dict, + req_body): + padata = [] + + if freshness is not None: + freshness_padata = self.PA_DATA_create(PADATA_AS_FRESHNESS, + freshness) + padata.append(freshness_padata) + + if send_enc_ts: + patime, pausec = self.get_KerberosTimeWithUsec() + enc_ts = self.PA_ENC_TS_ENC_create(patime, pausec) + enc_ts = self.der_encode( + enc_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC()) + + enc_ts = self.EncryptedData_create(preauth_key, + KU_PA_ENC_TIMESTAMP, + enc_ts) + enc_ts = self.der_encode( + enc_ts, asn1Spec=krb5_asn1.EncryptedData()) + + enc_ts = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, enc_ts) + + padata.append(enc_ts) + + return padata, req_body + else: + generate_padata_fn = None + + user_name = creds.get_username() + cname = self.PrincipalName_create(name_type=NT_PRINCIPAL, + names=user_name.split('/')) + + target_name = target_creds.get_username() + target_realm = target_creds.get_realm() + + sname = self.PrincipalName_create(name_type=NT_PRINCIPAL, + names=['host', target_name[:-1]]) + + if expect_error: + check_error_fn = self.generic_check_kdc_error + check_rep_fn = None + + expected_sname = sname + else: + check_error_fn = None + check_rep_fn = self.generic_check_kdc_rep + + expected_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL, + names=[target_name]) + + kdc_options = ('forwardable,' + 'renewable,' + 'canonicalize,' + 'renewable-ok') + kdc_options = krb5_asn1.KDCOptions(kdc_options) + + ticket_decryption_key = self.TicketDecryptionKey_from_creds( + target_creds) + + kdc_exchange_dict = self.as_exchange_dict( + creds=creds, + expected_crealm=creds.get_realm(), + expected_cname=cname, + expected_srealm=target_realm, + expected_sname=expected_sname, + expected_supported_etypes=target_creds.tgs_supported_enctypes, + ticket_decryption_key=ticket_decryption_key, + generate_padata_fn=generate_padata_fn, + check_error_fn=check_error_fn, + check_rep_fn=check_rep_fn, + check_kdc_private_fn=self.generic_check_kdc_private, + expected_error_mode=expect_error, + expected_salt=creds.get_salt(), + preauth_key=preauth_key, + kdc_options=str(kdc_options), + expect_edata=expect_edata) + + till = self.get_KerberosTime(offset=36000) + + if etypes is None: + etypes = kcrypto.Enctype.AES256, kcrypto.Enctype.RC4, + + rep = self._generic_kdc_exchange(kdc_exchange_dict, + cname=cname, + realm=target_realm, + sname=sname, + till_time=till, + etypes=etypes) + if expect_error: + self.check_error_rep(rep, expect_error) + else: + self.check_as_reply(rep) + + return kdc_exchange_dict + def _pkinit_req(self, creds, target_creds, @@ -266,6 +631,7 @@ class PkInitTests(KDCBaseTest): supported_cms_types=None, signature_algorithm=None, certificate_signature=None, + freshness_token=None, ): self.assertIsNot(using_pkinit, PkInit.NOT_USED) @@ -491,10 +857,12 @@ class PkInitTests(KDCBaseTest): # Create the authenticator, which shows that we had possession of # the private key at some point. - authenticator_obj = self.PKAuthenticator_create(cusec, - ctime, - pk_nonce, - pa_checksum=digest) + authenticator_obj = self.PKAuthenticator_create( + cusec, + ctime, + pk_nonce, + pa_checksum=digest, + freshness_token=freshness_token) if using_pkinit is PkInit.DIFFIE_HELLMAN: dh_public_key = dh_private_key.public_key() diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py index bd76d902540..b7d2d2a42e5 100644 --- a/python/samba/tests/krb5/raw_testcase.py +++ b/python/samba/tests/krb5/raw_testcase.py @@ -79,6 +79,7 @@ from samba.tests.krb5.rfc4120_constants import ( KRB_TGS_REP, KRB_TGS_REQ, KU_AP_REQ_AUTH, + KU_AS_FRESHNESS, KU_AS_REP_ENC_PART, KU_AP_REQ_ENC_PART, KU_AS_REQ, @@ -101,6 +102,7 @@ from samba.tests.krb5.rfc4120_constants import ( NT_PRINCIPAL, NT_SRV_INST, NT_WELLKNOWN, + PADATA_AS_FRESHNESS, PADATA_ENCRYPTED_CHALLENGE, PADATA_ENC_TIMESTAMP, PADATA_ETYPE_INFO, @@ -1839,7 +1841,8 @@ class RawKerberosTest(TestCase): ctime, nonce, *, - pa_checksum=None): + pa_checksum=None, + freshness_token=None): pk_authenticator_obj = { 'cusec': cusec, 'ctime': ctime, @@ -1847,6 +1850,8 @@ class RawKerberosTest(TestCase): } if pa_checksum is not None: pk_authenticator_obj['paChecksum'] = pa_checksum + if freshness_token is not None: + pk_authenticator_obj['freshnessToken'] = freshness_token return pk_authenticator_obj @@ -2438,6 +2443,32 @@ class RawKerberosTest(TestCase): return hash + def create_freshness_token(self, + epoch=None, + *, + offset=None, + krbtgt_creds=None): + timestamp, usec = self.get_KerberosTimeWithUsec(epoch, offset) + + # Encode the freshness token as PA-ENC-TS-ENC. + ts_enc = self.PA_ENC_TS_ENC_create(timestamp, usec) + ts_enc = self.der_encode(ts_enc, asn1Spec=krb5_asn1.PA_ENC_TS_ENC()) + + if krbtgt_creds is None: + krbtgt_creds = self.get_krbtgt_creds() + krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) + + # Encrypt the freshness token. + freshness = self.EncryptedData_create(krbtgt_key, KU_AS_FRESHNESS, ts_enc) + + freshness_token = self.der_encode(freshness, + asn1Spec=krb5_asn1.EncryptedData()) + + # Prepend a couple of zero bytes. + freshness_token = bytes(2) + freshness_token + + return freshness_token + def kpasswd_create(self, subkey, user_data, @@ -4920,6 +4951,8 @@ class RawKerberosTest(TestCase): if len(expect_etype_info2) != 0: expected_patypes += (PADATA_ETYPE_INFO2,) + sent_freshness = self.sent_freshness(kdc_exchange_dict) + if error_code not in (KDC_ERR_PREAUTH_FAILED, KDC_ERR_SKEW, KDC_ERR_POLICY, KDC_ERR_CLIENT_REVOKED): if sent_fast: @@ -4929,7 +4962,11 @@ class RawKerberosTest(TestCase): if not sent_enc_challenge: expected_patypes += (PADATA_PK_AS_REQ,) - expected_patypes += (PADATA_PK_AS_REP_19,) + if not sent_freshness: + expected_patypes += (PADATA_PK_AS_REP_19,) + + if sent_freshness: + expected_patypes += PADATA_AS_FRESHNESS, if (self.kdc_fast_support and not sent_fast @@ -4970,6 +5007,39 @@ class RawKerberosTest(TestCase): if pk_as_rep19 is not None: self.assertEqual(len(pk_as_rep19), 0) + freshness_token = pa_dict.get(PADATA_AS_FRESHNESS) + if freshness_token is not None: + self.assertEqual(bytes(2), freshness_token[:2]) + + freshness = self.der_decode(freshness_token[2:], + asn1Spec=krb5_asn1.EncryptedData()) + + krbtgt_creds = self.get_krbtgt_creds() + krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) + + self.assertElementEqual(freshness, 'etype', krbtgt_key.etype) + self.assertElementKVNO(freshness, 'kvno', krbtgt_key.kvno) + + # Decrypt the freshness token. + ts_enc = krbtgt_key.decrypt(KU_AS_FRESHNESS, + freshness['cipher']) + + # Ensure that we can decode it as PA-ENC-TS-ENC. + ts_enc = self.der_decode(ts_enc, + asn1Spec=krb5_asn1.PA_ENC_TS_ENC()) + freshness_time = self.get_EpochFromKerberosTime( + ts_enc['patimestamp']) + freshness_time += ts_enc['pausec'] / 1e6 + + # Ensure that it is reasonably close to the current time (within + # five minutes, to allow for clock skew). + current_time = datetime.datetime.now( + datetime.timezone.utc).timestamp() + self.assertLess(current_time - 5 * 60, freshness_time) + self.assertLess(freshness_time, current_time + 5 * 60) + + kdc_exchange_dict['freshness_token'] = freshness_token + fx_fast = pa_dict.get(PADATA_FX_FAST) if fx_fast is not None: self.assertEqual(len(fx_fast), 0) @@ -5797,6 +5867,11 @@ class RawKerberosTest(TestCase): return PADATA_PK_AS_REQ in fast_pa_dict + def sent_freshness(self, kdc_exchange_dict): + fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) + + return PADATA_AS_FRESHNESS in fast_pa_dict + def get_sent_pac_options(self, kdc_exchange_dict): fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) diff --git a/python/samba/tests/krb5/rfc4120_constants.py b/python/samba/tests/krb5/rfc4120_constants.py index d9d8c30a4b3..583ffbaf6af 100644 --- a/python/samba/tests/krb5/rfc4120_constants.py +++ b/python/samba/tests/krb5/rfc4120_constants.py @@ -125,6 +125,7 @@ KDC_ERR_DIGEST_IN_CERT_NOT_ACCEPTED = 78 KDC_ERR_PA_CHECKSUM_MUST_BE_INCLUDED = 79 KDC_ERR_DIGEST_IN_SIGNED_DATA_NOT_ACCEPTED = 80 KDC_ERR_PUBLIC_KEY_ENCRYPTION_NOT_SUPPORTED = 81 +KDC_ERR_PREAUTH_EXPIRED = 90 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS = 93 # Kpasswd error codes @@ -234,6 +235,8 @@ KU_ENC_CHALLENGE_CLIENT = 54 KU_ENC_CHALLENGE_KDC = 55 KU_AS_REQ = 56 +KU_AS_FRESHNESS = 60 + # Armor types FX_FAST_ARMOR_AP_REQUEST = 1 |
