#!/usr/bin/env python3 # Unix SMB/CIFS implementation. # Copyright (C) Stefan Metzmacher 2020 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import sys import os sys.path.insert(0, 'bin/python') os.environ['PYTHONUNBUFFERED'] = '1' from functools import partial import ldb from samba import generate_random_password, ntstatus from samba.dcerpc import netlogon, security from samba.hresult import HRES_SEC_E_LOGON_DENIED import samba.tests.krb5.kcrypto as kcrypto from samba.tests.krb5.kdc_base_test import KDCBaseTest from samba.tests.krb5.rfc4120_constants import ( AES128_CTS_HMAC_SHA1_96, AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5, DES3_CBC_MD5, DES3_CBC_SHA1, DES_CBC_CRC, DES_CBC_MD5, KDC_ERR_ETYPE_NOSUPP, KDC_ERR_POLICY, KDC_ERR_PREAUTH_REQUIRED, KRB_ERROR, NT_PRINCIPAL, NT_SRV_INST, ) import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1 global_asn1_print = False global_hexdump = False class ProtectedUsersTests(KDCBaseTest): def setUp(self): super().setUp() self.do_asn1_print = global_asn1_print self.do_hexdump = global_hexdump samdb = self.get_samdb() # Get the old ‘minPwdAge’. minPwdAge = samdb.get_minPwdAge() # Reset the ‘minPwdAge’ as it was before. self.addCleanup(samdb.set_minPwdAge, minPwdAge) # Set it temporarily to ‘0’. samdb.set_minPwdAge('0') # Get account credentials for testing. def _get_creds(self, protected, account_type=KDCBaseTest.AccountType.USER, ntlm=False, member_of=None, supported_enctypes=None, cached=True): opts = { 'kerberos_enabled': not ntlm, } members = () if protected: samdb = self.get_samdb() protected_users_group = (f'') members += (protected_users_group,) if member_of is not None: members += (member_of,) if members: opts['member_of'] = members if supported_enctypes is not None: opts['supported_enctypes'] = supported_enctypes return self.get_cached_creds(account_type=account_type, opts=opts, use_cache=cached) # Test NTLM authentication with a normal account. Authentication should # succeed. def test_ntlm_not_protected(self): client_creds = self._get_creds(protected=False, ntlm=True, cached=False) self._connect(client_creds, simple_bind=False) # Test NTLM authentication with a protected account. Authentication should # fail, as Protected User accounts cannot use NTLM authentication. def test_ntlm_protected(self): client_creds = self._get_creds(protected=True, ntlm=True, cached=False) self._connect(client_creds, simple_bind=False, expect_error=f'{HRES_SEC_E_LOGON_DENIED:08X}') # Test that the Protected Users restrictions still apply when the user is a # member of a group that is itself a member of Protected Users. def test_ntlm_protected_nested(self): samdb = self.get_samdb() group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) protected_users_group = (f'') self.add_to_group(group_dn, ldb.Dn(samdb, protected_users_group), 'member', expect_attr=False) client_creds = self._get_creds(protected=False, ntlm=True, member_of=group_dn) self._connect(client_creds, simple_bind=False, expect_error=f'{HRES_SEC_E_LOGON_DENIED:08X}') # Test SAMR password changes for unprotected and protected accounts. def test_samr_change_password_not_protected(self): # Use a non-cached account so that it is not locked out for other # tests. client_creds = self._get_creds(protected=False, cached=False) self._test_samr_change_password( client_creds, expect_error=None) def test_samr_change_password_protected(self): # Use a non-cached account so that it is not locked out for other # tests. client_creds = self._get_creds(protected=True, cached=False) self._test_samr_change_password( client_creds, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) # Test interactive SamLogon with an unprotected account. def test_samlogon_interactive_not_protected(self): client_creds = self._get_creds(protected=False, ntlm=True) self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation) # Test interactive SamLogon with a protected account. def test_samlogon_interactive_protected(self): client_creds = self._get_creds(protected=True, ntlm=True) self._test_samlogon( creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) # Test network SamLogon with an unprotected account. def test_samlogon_network_not_protected(self): client_creds = self._get_creds(protected=False, ntlm=True) self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) # Test network SamLogon with a protected account. def test_samlogon_network_protected(self): client_creds = self._get_creds(protected=True, ntlm=True) self._test_samlogon( creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) # Test that changing the password of an account in the Protected Users # group still generates an NT hash. def test_protected_nt_hash(self): # Use a non-cached account, as we are changing the password. client_creds = self._get_creds(protected=True, cached=False) client_dn = client_creds.get_dn() new_password = generate_random_password(32, 32) utf16pw = f'"{new_password}"'.encode('utf-16-le') samdb = self.get_samdb() msg = ldb.Message(client_dn) msg['unicodePwd'] = ldb.MessageElement(utf16pw, ldb.FLAG_MOD_REPLACE, 'unicodePwd') samdb.modify(msg) client_creds.set_password(new_password) expected_etypes = { kcrypto.Enctype.AES256, kcrypto.Enctype.AES128, } if self.expect_nt_hash: expected_etypes.add(kcrypto.Enctype.RC4) self.get_keys(client_creds, expected_etypes=expected_etypes) # Test that DES-CBC-CRC cannot be used whether or not the user is # protected. def test_des_cbc_crc_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, etype=DES_CBC_CRC, expect_error=True) def test_des_cbc_crc_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=DES_CBC_CRC, expect_error=True, rc4_support=False) # Test that DES-CBC-MD5 cannot be used whether or not the user is # protected. def test_des_cbc_md5_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, etype=DES_CBC_MD5, expect_error=True) def test_des_cbc_md5_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=DES_CBC_MD5, expect_error=True, rc4_support=False) # Test that DES3-CBC-MD5 cannot be used whether or not the user is # protected. def test_des3_cbc_md5_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, etype=DES3_CBC_MD5, expect_error=True) def test_des3_cbc_md5_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=DES3_CBC_MD5, expect_error=True, rc4_support=False) # Test that DES3-CBC-SHA1 cannot be used whether or not the user is # protected. def test_des3_cbc_sha1_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, etype=DES3_CBC_SHA1, expect_error=True) def test_des3_cbc_sha1_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=DES3_CBC_SHA1, expect_error=True, rc4_support=False) # Test that RC4 may only be used if the user is not protected. def test_rc4_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, etype=ARCFOUR_HMAC_MD5) def test_rc4_protected_aes256_preauth(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=ARCFOUR_HMAC_MD5, preauth_etype=AES256_CTS_HMAC_SHA1_96, rc4_support=False) def test_rc4_protected_rc4_preauth(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=ARCFOUR_HMAC_MD5, preauth_etype=ARCFOUR_HMAC_MD5, expect_error=True, rc4_support=False, expect_edata=False) # Test that AES256 can always be used. def test_aes256_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, etype=AES256_CTS_HMAC_SHA1_96) def test_aes256_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=AES256_CTS_HMAC_SHA1_96, rc4_support=False) def test_aes256_rc4_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, etype=(AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)) def test_aes256_rc4_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=(AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5), rc4_support=False) def test_rc4_aes256_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, etype=(ARCFOUR_HMAC_MD5, AES256_CTS_HMAC_SHA1_96)) def test_rc4_aes256_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=(ARCFOUR_HMAC_MD5, AES256_CTS_HMAC_SHA1_96), rc4_support=False) # Test that AES128 can always be used. def test_aes128_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, etype=AES128_CTS_HMAC_SHA1_96) def test_aes128_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=AES128_CTS_HMAC_SHA1_96, rc4_support=False) def test_aes128_rc4_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, etype=(AES128_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)) def test_aes128_rc4_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=(AES128_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5), rc4_support=False) def test_rc4_aes128_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, etype=(ARCFOUR_HMAC_MD5, AES128_CTS_HMAC_SHA1_96)) def test_rc4_aes128_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, etype=(ARCFOUR_HMAC_MD5, AES128_CTS_HMAC_SHA1_96), rc4_support=False) # Test also with computer accounts. def test_rc4_mac_not_protected(self): client_creds = self._get_creds( protected=False, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, etype=ARCFOUR_HMAC_MD5) def test_rc4_mac_protected_aes256_preauth(self): client_creds = self._get_creds( protected=True, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, etype=ARCFOUR_HMAC_MD5, preauth_etype=AES256_CTS_HMAC_SHA1_96, rc4_support=False) def test_rc4_mac_protected_rc4_preauth(self): client_creds = self._get_creds( protected=True, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, etype=ARCFOUR_HMAC_MD5, preauth_etype=ARCFOUR_HMAC_MD5, expect_error=True, rc4_support=False, expect_edata=False) def test_aes256_rc4_mac_not_protected(self): client_creds = self._get_creds( protected=False, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, etype=(AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)) def test_aes256_rc4_mac_protected(self): client_creds = self._get_creds( protected=True, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, etype=(AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5), rc4_support=False) def test_rc4_aes256_mac_not_protected(self): client_creds = self._get_creds( protected=False, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, etype=(ARCFOUR_HMAC_MD5, AES256_CTS_HMAC_SHA1_96)) def test_rc4_aes256_mac_protected(self): client_creds = self._get_creds( protected=True, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, etype=(ARCFOUR_HMAC_MD5, AES256_CTS_HMAC_SHA1_96), rc4_support=False) def test_aes128_rc4_mac_not_protected(self): client_creds = self._get_creds( protected=False, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, etype=(AES128_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)) def test_aes128_rc4_mac_protected(self): client_creds = self._get_creds( protected=True, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, etype=(AES128_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5), rc4_support=False) def test_rc4_aes128_mac_not_protected(self): client_creds = self._get_creds( protected=False, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, etype=(ARCFOUR_HMAC_MD5, AES128_CTS_HMAC_SHA1_96)) def test_rc4_aes128_mac_protected(self): client_creds = self._get_creds( protected=True, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, etype=(ARCFOUR_HMAC_MD5, AES128_CTS_HMAC_SHA1_96), rc4_support=False) # Test that RC4 can only be used as a preauth etype if the user is not # protected. def test_ts_rc4_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, preauth_etype=ARCFOUR_HMAC_MD5) def test_ts_rc4_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, preauth_etype=ARCFOUR_HMAC_MD5, expect_error=True, rc4_support=False, expect_edata=False) # Test that the etype restrictions still apply if the user is a member of a # group that is itself in the Protected Users group. def test_ts_rc4_protected_nested(self): samdb = self.get_samdb() group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) protected_users_group = (f'') self.add_to_group(group_dn, ldb.Dn(samdb, protected_users_group), 'member', expect_attr=False) client_creds = self._get_creds(protected=False, member_of=group_dn) self._test_etype(client_creds, preauth_etype=ARCFOUR_HMAC_MD5, expect_error=True, rc4_support=False, expect_edata=False) # Test that AES256 can always be used as a preauth etype. def test_ts_aes256_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, preauth_etype=AES256_CTS_HMAC_SHA1_96) def test_ts_aes256_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, preauth_etype=AES256_CTS_HMAC_SHA1_96, rc4_support=False) # Test that AES128 can always be used as a preauth etype. def test_ts_aes128_not_protected(self): client_creds = self._get_creds(protected=False) self._test_etype(client_creds, preauth_etype=AES128_CTS_HMAC_SHA1_96) def test_ts_aes128_protected(self): client_creds = self._get_creds(protected=True) self._test_etype(client_creds, preauth_etype=AES128_CTS_HMAC_SHA1_96, rc4_support=False) # Test also with machine accounts. def test_ts_rc4_mac_not_protected(self): client_creds = self._get_creds( protected=False, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, preauth_etype=ARCFOUR_HMAC_MD5) def test_ts_rc4_mac_protected(self): client_creds = self._get_creds( protected=True, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, preauth_etype=ARCFOUR_HMAC_MD5, expect_error=True, rc4_support=False, expect_edata=False) def test_ts_aes256_mac_not_protected(self): client_creds = self._get_creds( protected=False, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, preauth_etype=AES256_CTS_HMAC_SHA1_96) def test_ts_aes256_mac_protected(self): client_creds = self._get_creds( protected=True, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, preauth_etype=AES256_CTS_HMAC_SHA1_96, rc4_support=False) def test_ts_aes128_mac_not_protected(self): client_creds = self._get_creds( protected=False, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, preauth_etype=AES128_CTS_HMAC_SHA1_96) def test_ts_aes128_mac_protected(self): client_creds = self._get_creds( protected=True, account_type=self.AccountType.COMPUTER) self._test_etype(client_creds, preauth_etype=AES128_CTS_HMAC_SHA1_96, rc4_support=False) # Test that the restrictions do not apply to accounts acting as services, # and that RC4 service tickets can still be obtained. def test_service_rc4_only_not_protected(self): client_creds = self.get_client_creds() service_creds = self._get_creds(protected=False, account_type=self.AccountType.COMPUTER, supported_enctypes=kcrypto.Enctype.RC4) tgt = self.get_tgt(client_creds) self.get_service_ticket(tgt, service_creds) def test_service_rc4_only_protected(self): client_creds = self.get_client_creds() service_creds = self._get_creds(protected=True, account_type=self.AccountType.COMPUTER, supported_enctypes=kcrypto.Enctype.RC4) tgt = self.get_tgt(client_creds) self.get_service_ticket(tgt, service_creds) # Test that requesting a ticket with a short lifetime results in a ticket # with that lifetime. def test_tgt_lifetime_shorter_not_protected(self): client_creds = self._get_creds(protected=False) till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._test_etype(client_creds, preauth_etype=AES256_CTS_HMAC_SHA1_96, till=till) self.check_ticket_times(tgt, expected_end=till) def test_tgt_lifetime_shorter_protected(self): client_creds = self._get_creds(protected=True) till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._test_etype(client_creds, preauth_etype=AES256_CTS_HMAC_SHA1_96, till=till, rc4_support=False) self.check_ticket_times(tgt, expected_end=till, expected_renew_time=till) # Test that requesting a ticket with a long lifetime produces a ticket with # that lifetime, unless the user is protected, whereupon the lifetime will # be capped at four hours. def test_tgt_lifetime_longer_not_protected(self): client_creds = self._get_creds(protected=False) till = self.get_KerberosTime(offset=6 * 60 * 60) # 6 hours tgt = self._test_etype(client_creds, preauth_etype=AES256_CTS_HMAC_SHA1_96, till=till) self.check_ticket_times(tgt, expected_end=till) def test_tgt_lifetime_longer_protected(self): client_creds = self._get_creds(protected=True) till = self.get_KerberosTime(offset=6 * 60 * 60) # 6 hours tgt = self._test_etype(client_creds, preauth_etype=AES256_CTS_HMAC_SHA1_96, till=till, rc4_support=False) expected_life = 4 * 60 * 60 # 4 hours self.check_ticket_times(tgt, expected_life=expected_life, expected_renew_life=expected_life) # Test that the lifetime of a service ticket is capped to the lifetime of # the TGT. def test_ticket_lifetime_not_protected(self): client_creds = self._get_creds(protected=False) till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._test_etype( client_creds, preauth_etype=AES256_CTS_HMAC_SHA1_96, till=till) self.check_ticket_times(tgt, expected_end=till) service_creds = self.get_service_creds() till2 = self.get_KerberosTime(offset=10 * 60 * 60) # 10 hours ticket = self.get_service_ticket(tgt, service_creds, till=till2) self.check_ticket_times(ticket, expected_end=till) def test_ticket_lifetime_protected(self): client_creds = self._get_creds(protected=True) till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._test_etype( client_creds, preauth_etype=AES256_CTS_HMAC_SHA1_96, till=till, rc4_support=False) self.check_ticket_times(tgt, expected_end=till, expected_renew_time=till) service_creds = self.get_service_creds() till2 = self.get_KerberosTime(offset=10 * 60 * 60) # 10 hours ticket = self.get_service_ticket(tgt, service_creds, till=till2) self.check_ticket_times(ticket, expected_end=till) # Test that a request for a forwardable ticket will only be fulfilled if # the user is not protected. def test_forwardable_as_not_protected(self): client_creds = self._get_creds(protected=False) self._get_tgt_check_flags(client_creds, kdc_options='forwardable', expected_flags='forwardable') def test_forwardable_as_protected(self): client_creds = self._get_creds(protected=True) self._get_tgt_check_flags(client_creds, kdc_options='forwardable', unexpected_flags='forwardable', rc4_support=False) # Test that a request for a proxiable ticket will only be fulfilled if the # user is not protected. def test_proxiable_as_not_protected(self): client_creds = self._get_creds(protected=False) self._get_tgt_check_flags(client_creds, kdc_options='proxiable', expected_flags='proxiable') def test_proxiable_as_protected(self): client_creds = self._get_creds(protected=True) self._get_tgt_check_flags(client_creds, kdc_options='proxiable', unexpected_flags='proxiable', rc4_support=False) # An alternate test for Protected Users that passes if we get a policy # error rather than a ticket that is not proxiable. def test_proxiable_as_protected_policy_error(self): client_creds = self._get_creds(protected=True) self._get_tgt_check_flags(client_creds, kdc_options='proxiable', unexpected_flags='proxiable', rc4_support=False, expect_error=True) # Test that if we have a forwardable TGT, then we can use it to obtain a # forwardable service ticket, whether or not the account is protected. def test_forwardable_tgs_not_protected(self): client_creds = self._get_creds(protected=False) tgt = self.get_tgt(client_creds) tgt = self.modified_ticket( tgt, modify_fn=partial(self.modify_ticket_flag, flag='forwardable', value=True), checksum_keys=self.get_krbtgt_checksum_key()) service_creds = self.get_service_creds() self.get_service_ticket( tgt, service_creds, kdc_options='forwardable', expected_flags=krb5_asn1.TicketFlags('forwardable')) def test_forwardable_tgs_protected(self): client_creds = self._get_creds(protected=True) tgt = self.get_tgt(client_creds, rc4_support=False) tgt = self.modified_ticket( tgt, modify_fn=partial(self.modify_ticket_flag, flag='forwardable', value=True), checksum_keys=self.get_krbtgt_checksum_key()) service_creds = self.get_service_creds() self.get_service_ticket( tgt, service_creds, kdc_options='forwardable', expected_flags=krb5_asn1.TicketFlags('forwardable'), rc4_support=False) # Test that if we have a proxiable TGT, then we can use it to obtain a # forwardable service ticket, whether or not the account is protected. def test_proxiable_tgs_not_protected(self): client_creds = self._get_creds(protected=False) tgt = self.get_tgt(client_creds) tgt = self.modified_ticket( tgt, modify_fn=partial(self.modify_ticket_flag, flag='proxiable', value=True), checksum_keys=self.get_krbtgt_checksum_key()) service_creds = self.get_service_creds() self.get_service_ticket( tgt, service_creds, kdc_options='proxiable', expected_flags=krb5_asn1.TicketFlags('proxiable')) def test_proxiable_tgs_protected(self): client_creds = self._get_creds(protected=True) tgt = self.get_tgt(client_creds, rc4_support=False) tgt = self.modified_ticket( tgt, modify_fn=partial(self.modify_ticket_flag, flag='proxiable', value=True), checksum_keys=self.get_krbtgt_checksum_key()) service_creds = self.get_service_creds() self.get_service_ticket( tgt, service_creds, kdc_options='proxiable', expected_flags=krb5_asn1.TicketFlags('proxiable'), rc4_support=False) def check_ticket_times(self, ticket_creds, expected_end=None, expected_life=None, expected_renew_time=None, expected_renew_life=None): ticket = ticket_creds.ticket_private authtime = ticket['authtime'] starttime = ticket.get('starttime', authtime) endtime = ticket['endtime'] renew_till = ticket.get('renew-till', None) starttime = self.get_EpochFromKerberosTime(starttime) if expected_end is None: self.assertIsNotNone(expected_life, 'did not supply expected endtime or lifetime') expected_end = self.get_KerberosTime(epoch=starttime, offset=expected_life) else: self.assertIsNone(expected_life, 'supplied both expected endtime and lifetime') self.assertEqual(expected_end, endtime.decode('ascii')) if renew_till is None: self.assertIsNone(expected_renew_time) self.assertIsNone(expected_renew_life) else: if expected_renew_life is not None: self.assertIsNone( expected_renew_time, 'supplied both expected renew time and lifetime') expected_renew_time = self.get_KerberosTime( epoch=starttime, offset=expected_renew_life) if expected_renew_time is not None: self.assertEqual(expected_renew_time, renew_till.decode('ascii')) def _test_etype(self, creds, expect_error=False, etype=None, preauth_etype=None, till=None, rc4_support=True, expect_edata=None): if etype is None: etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5) elif isinstance(etype, int): etype = (etype,) user_name = creds.get_username() realm = creds.get_realm() salt = creds.get_salt() cname = self.PrincipalName_create(name_type=NT_PRINCIPAL, names=user_name.split('/')) sname = self.PrincipalName_create(name_type=NT_SRV_INST, names=['krbtgt', realm]) expected_sname = self.PrincipalName_create( name_type=NT_SRV_INST, names=['krbtgt', realm.upper()]) expected_cname = cname if till is None: till = self.get_KerberosTime(offset=36000) renew_time = till krbtgt_creds = self.get_krbtgt_creds() ticket_decryption_key = ( self.TicketDecryptionKey_from_creds(krbtgt_creds)) expected_etypes = krbtgt_creds.tgs_supported_enctypes kdc_options = krb5_asn1.KDCOptions('renewable') expected_flags = krb5_asn1.TicketFlags('renewable') expected_error = KDC_ERR_ETYPE_NOSUPP if expect_error else 0 if preauth_etype is None: if expected_error: expected_error_mode = KDC_ERR_PREAUTH_REQUIRED, expected_error else: expected_error_mode = KDC_ERR_PREAUTH_REQUIRED rep, kdc_exchange_dict = self._test_as_exchange( creds=creds, cname=cname, realm=realm, sname=sname, till=till, renew_time=renew_time, expected_error_mode=expected_error_mode, expected_crealm=realm, expected_cname=expected_cname, expected_srealm=realm, expected_sname=sname, expected_salt=salt, expected_flags=expected_flags, expected_supported_etypes=expected_etypes, etypes=etype, padata=None, kdc_options=kdc_options, ticket_decryption_key=ticket_decryption_key, rc4_support=rc4_support, expect_edata=expect_edata) self.assertIsNotNone(rep) self.assertEqual(KRB_ERROR, rep['msg-type']) error_code = rep['error-code'] if expected_error: self.assertIn(error_code, expected_error_mode) if error_code == expected_error: return else: self.assertEqual(expected_error_mode, error_code) etype_info2 = kdc_exchange_dict['preauth_etype_info2'] preauth_key = self.PasswordKey_from_etype_info2(creds, etype_info2[0], creds.get_kvno()) else: preauth_key = self.PasswordKey_from_creds(creds, preauth_etype) ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key) padata = [ts_enc_padata] expected_realm = realm.upper() rep, kdc_exchange_dict = self._test_as_exchange( creds=creds, cname=cname, realm=realm, sname=sname, till=till, renew_time=renew_time, expected_error_mode=expected_error, expected_crealm=expected_realm, expected_cname=expected_cname, expected_srealm=expected_realm, expected_sname=expected_sname, expected_salt=salt, expected_flags=expected_flags, expected_supported_etypes=expected_etypes, etypes=etype, padata=padata, kdc_options=kdc_options, preauth_key=preauth_key, ticket_decryption_key=ticket_decryption_key, rc4_support=rc4_support, expect_edata=expect_edata) if expect_error: self.check_error_rep(rep, expected_error) return None self.check_as_reply(rep) ticket_creds = kdc_exchange_dict['rep_ticket_creds'] return ticket_creds def _get_tgt_check_flags(self, creds, kdc_options, rc4_support=True, expect_error=False, expected_flags=None, unexpected_flags=None): user_name = creds.get_username() realm = creds.get_realm() salt = creds.get_salt() etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5) cname = self.PrincipalName_create(name_type=NT_PRINCIPAL, names=user_name.split('/')) sname = self.PrincipalName_create(name_type=NT_SRV_INST, names=['krbtgt', realm]) expected_sname = self.PrincipalName_create( name_type=NT_SRV_INST, names=['krbtgt', realm.upper()]) expected_cname = cname till = self.get_KerberosTime(offset=36000) krbtgt_creds = self.get_krbtgt_creds() ticket_decryption_key = ( self.TicketDecryptionKey_from_creds(krbtgt_creds)) expected_etypes = krbtgt_creds.tgs_supported_enctypes kdc_options = krb5_asn1.KDCOptions(kdc_options) if expected_flags is not None: expected_flags = krb5_asn1.TicketFlags(expected_flags) if unexpected_flags is not None: unexpected_flags = krb5_asn1.TicketFlags(unexpected_flags) rep, kdc_exchange_dict = self._test_as_exchange( creds=creds, cname=cname, realm=realm, sname=sname, till=till, expected_error_mode=KDC_ERR_PREAUTH_REQUIRED, expected_crealm=realm, expected_cname=expected_cname, expected_srealm=realm, expected_sname=sname, expected_salt=salt, expected_flags=expected_flags, unexpected_flags=unexpected_flags, expected_supported_etypes=expected_etypes, etypes=etype, padata=None, kdc_options=kdc_options, ticket_decryption_key=ticket_decryption_key, rc4_support=rc4_support) self.check_pre_authentication(rep) etype_info2 = kdc_exchange_dict['preauth_etype_info2'] preauth_key = self.PasswordKey_from_etype_info2(creds, etype_info2[0], creds.get_kvno()) ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key) padata = [ts_enc_padata] expected_realm = realm.upper() expected_error = KDC_ERR_POLICY if expect_error else 0 rep, kdc_exchange_dict = self._test_as_exchange( creds=creds, cname=cname, realm=realm, sname=sname, till=till, expected_error_mode=expected_error, expected_crealm=expected_realm, expected_cname=expected_cname, expected_srealm=expected_realm, expected_sname=expected_sname, expected_salt=salt, expected_flags=expected_flags, unexpected_flags=unexpected_flags, expected_supported_etypes=expected_etypes, etypes=etype, padata=padata, kdc_options=kdc_options, preauth_key=preauth_key, ticket_decryption_key=ticket_decryption_key, rc4_support=rc4_support) if expect_error: self.check_error_rep(rep, expected_error) return None self.check_as_reply(rep) ticket_creds = kdc_exchange_dict['rep_ticket_creds'] return ticket_creds if __name__ == '__main__': global_asn1_print = False global_hexdump = False import unittest unittest.main()