diff options
| author | Joseph Sutton <josephsutton@catalyst.net.nz> | 2021-09-13 22:13:24 +1200 |
|---|---|---|
| committer | Andrew Bartlett <abartlet@samba.org> | 2021-09-15 07:59:31 +0000 |
| commit | 3cc9e77f38f6698aa01abca4285a520c7c0cd2ac (patch) | |
| tree | 71afbb27137a7c10282463943a5a191639bdaf6f /python | |
| parent | af633992e31e839cdd7f77740c1f25d129be2f79 (diff) | |
| download | samba-3cc9e77f38f6698aa01abca4285a520c7c0cd2ac.tar.gz samba-3cc9e77f38f6698aa01abca4285a520c7c0cd2ac.tar.bz2 samba-3cc9e77f38f6698aa01abca4285a520c7c0cd2ac.zip | |
tests/krb5: Allow replicating accounts to the RODC
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14642
Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Diffstat (limited to 'python')
| -rw-r--r-- | python/samba/tests/krb5/kdc_base_test.py | 141 |
1 files changed, 139 insertions, 2 deletions
diff --git a/python/samba/tests/krb5/kdc_base_test.py b/python/samba/tests/krb5/kdc_base_test.py index 3681d26bb83..56102cfc4ea 100644 --- a/python/samba/tests/krb5/kdc_base_test.py +++ b/python/samba/tests/krb5/kdc_base_test.py @@ -31,8 +31,9 @@ from samba import generate_random_password from samba.auth import system_session from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS from samba.dcerpc import drsblobs, drsuapi, misc, krb5pac, krb5ccache, security -from samba.drs_utils import drsuapi_connect +from samba.drs_utils import drs_Replicate, drsuapi_connect from samba.dsdb import ( + DSDB_SYNTAX_BINARY_DN, DS_DOMAIN_FUNCTION_2000, DS_DOMAIN_FUNCTION_2008, DS_GUID_COMPUTERS_CONTAINER, @@ -45,7 +46,7 @@ from samba.dsdb import ( ) from samba.ndr import ndr_pack, ndr_unpack from samba import net -from samba.samdb import SamDB +from samba.samdb import SamDB, dsdb_Dn from samba.tests import delete_force import samba.tests.krb5.kcrypto as kcrypto @@ -104,12 +105,20 @@ class KDCBaseTest(RawKerberosTest): cls.account_cache = {} + cls.ldb_cleanups = [] + @classmethod def tearDownClass(cls): # Clean up any accounts created by create_account. This is # done in tearDownClass() rather than tearDown(), so that # accounts need only be created once for permutation tests. if cls._ldb is not None: + for cleanup in reversed(cls.ldb_cleanups): + try: + cls._ldb.modify(cleanup) + except ldb.LdbError: + pass + for dn in cls.accounts: delete_force(cls._ldb, dn) super().tearDownClass() @@ -250,6 +259,76 @@ class KDCBaseTest(RawKerberosTest): return (creds, dn) + def replicate_account_to_rodc(self, dn): + samdb = self.get_samdb() + rodc_samdb = self.get_rodc_samdb() + + repl_val = f'{samdb.get_dsServiceName()}:{dn}:SECRETS_ONLY' + + msg = ldb.Message() + msg.dn = ldb.Dn(rodc_samdb, '') + msg['replicateSingleObject'] = ldb.MessageElement( + repl_val, + ldb.FLAG_MOD_REPLACE, + 'replicateSingleObject') + + try: + # Try replication using the replicateSingleObject rootDSE + # operation. + rodc_samdb.modify(msg) + except ldb.LdbError as err: + enum, estr = err.args + self.assertEqual(enum, ldb.ERR_UNWILLING_TO_PERFORM) + self.assertIn('rootdse_modify: unknown attribute to change!', + estr) + + # If that method wasn't supported, we may be in the rodc:local test + # environment, where we can try replicating to the local database. + + lp = self.get_lp() + + rodc_creds = Credentials() + rodc_creds.guess(lp) + rodc_creds.set_machine_account(lp) + + local_samdb = SamDB(url=None, session_info=system_session(), + credentials=rodc_creds, lp=lp) + + destination_dsa_guid = misc.GUID(local_samdb.get_ntds_GUID()) + + repl = drs_Replicate(f'ncacn_ip_tcp:{self.dc_host}[seal]', + lp, rodc_creds, + local_samdb, destination_dsa_guid) + + source_dsa_invocation_id = misc.GUID(samdb.invocation_id) + + repl.replicate(dn, + source_dsa_invocation_id, + destination_dsa_guid, + exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET, + rodc=True) + + def check_revealed(self, dn, rodc_dn, revealed=True): + samdb = self.get_samdb() + + res = samdb.search(base=rodc_dn, + scope=ldb.SCOPE_BASE, + attrs=['msDS-RevealedUsers']) + + revealed_users = res[0].get('msDS-RevealedUsers') + if revealed_users is None: + self.assertFalse(revealed) + return + + revealed_dns = set(str(dsdb_Dn(samdb, str(user), + syntax_oid=DSDB_SYNTAX_BINARY_DN).dn) + for user in revealed_users) + + if revealed: + self.assertIn(str(dn), revealed_dns) + else: + self.assertNotIn(str(dn), revealed_dns) + def get_secrets(self, samdb, dn, destination_dsa_guid, source_dsa_invocation_id): @@ -375,6 +454,29 @@ class KDCBaseTest(RawKerberosTest): creds.set_tgs_supported_enctypes(supported_enctypes) creds.set_ap_supported_enctypes(supported_enctypes) + def add_to_group(self, account_dn, group_dn, group_attr): + samdb = self.get_samdb() + + res = samdb.search(base=group_dn, + scope=ldb.SCOPE_BASE, + attrs=[group_attr]) + orig_msg = res[0] + + members = list(orig_msg[group_attr]) + members.append(account_dn) + + msg = ldb.Message() + msg.dn = group_dn + msg[group_attr] = ldb.MessageElement(members, + ldb.FLAG_MOD_REPLACE, + group_attr) + + cleanup = samdb.msg_diff(msg, orig_msg) + self.ldb_cleanups.append(cleanup) + samdb.modify(msg) + + return cleanup + def get_cached_creds(self, *, machine_account, opts=None): @@ -382,6 +484,9 @@ class KDCBaseTest(RawKerberosTest): opts = {} opts_default = { + 'allowed_replication': False, + 'denied_replication': False, + 'revealed_to_rodc': False, 'no_auth_data_required': False, 'supported_enctypes': None, 'not_delegated': False, @@ -407,6 +512,9 @@ class KDCBaseTest(RawKerberosTest): def create_account_opts(self, *, machine_account, + allowed_replication, + denied_replication, + revealed_to_rodc, no_auth_data_required, supported_enctypes, not_delegated, @@ -420,6 +528,9 @@ class KDCBaseTest(RawKerberosTest): self.assertFalse(trusted_to_auth_for_delegation) samdb = self.get_samdb() + rodc_samdb = self.get_rodc_samdb() + + rodc_dn = self.get_server_dn(rodc_samdb) user_name = self.account_base + str(self.account_id) type(self).account_id += 1 @@ -475,6 +586,32 @@ class KDCBaseTest(RawKerberosTest): creds.set_tgs_supported_enctypes(tgs_enctypes) + # Handle secret replication to the RODC. + + if allowed_replication or revealed_to_rodc: + # Allow replicating this account's secrets if requested, or allow + # it only temporarily if we're about to replicate them. + allowed_cleanup = self.add_to_group( + dn, rodc_dn, + 'msDS-RevealOnDemandGroup') + + if revealed_to_rodc: + # Replicate this account's secrets to the RODC. + self.replicate_account_to_rodc(dn) + + if not allowed_replication: + # If we don't want replicating secrets to be allowed for this + # account, disable it again. + samdb.modify(allowed_cleanup) + + self.check_revealed(dn, + rodc_dn, + revealed=revealed_to_rodc) + + if denied_replication: + # Deny replicating this account's secrets to the RODC. + self.add_to_group(dn, rodc_dn, 'msDS-NeverRevealGroup') + return creds def get_client_creds(self, |
