diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/samba/tests/gpo.py | 126 |
1 files changed, 120 insertions, 6 deletions
diff --git a/python/samba/tests/gpo.py b/python/samba/tests/gpo.py index 81bf3b8c03b..9177eef5afa 100644 --- a/python/samba/tests/gpo.py +++ b/python/samba/tests/gpo.py @@ -103,17 +103,21 @@ def dummy_certificate(): # Dummy requests structure for Certificate Auto Enrollment class dummy_requests(object): - @staticmethod - def get(url=None, params=None): + class exceptions(object): + ConnectionError = Exception + + def __init__(self, want_exception=False): + self.want_exception = want_exception + + def get(self, url=None, params=None): + if self.want_exception: + raise self.exceptions.ConnectionError + dummy = requests.Response() dummy._content = dummy_certificate() dummy.headers = {'Content-Type': 'application/x-x509-ca-cert'} return dummy - class exceptions(object): - ConnectionError = Exception -cae.requests = dummy_requests - realm = os.environ.get('REALM') policies = realm + '/POLICIES' realm = realm.lower() @@ -6906,6 +6910,114 @@ class GPOTests(tests.TestCase): # Unstage the Registry.pol file unstage_file(reg_pol) + def test_gp_cert_auto_enroll_ext_without_ndes(self): + local_path = self.lp.cache_path('gpo_cache') + guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}' + reg_pol = os.path.join(local_path, policies, guid, + 'MACHINE/REGISTRY.POL') + cache_dir = self.lp.get('cache directory') + store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb')) + + machine_creds = Credentials() + machine_creds.guess(self.lp) + machine_creds.set_machine_account() + + # Initialize the group policy extension + cae.requests = dummy_requests(want_exception=True) + ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds, + machine_creds.get_username(), store) + + gpos = get_gpo_list(self.server, machine_creds, self.lp, + machine_creds.get_username()) + + # Stage the Registry.pol file with test data + parser = GPPolParser() + parser.load_xml(etree.fromstring(auto_enroll_reg_pol.strip())) + ret = stage_file(reg_pol, ndr_pack(parser.pol_file)) + self.assertTrue(ret, 'Could not create the target %s' % reg_pol) + + # Write the dummy CA entry, Enrollment Services, and Templates Entries + admin_creds = Credentials() + admin_creds.set_username(os.environ.get('DC_USERNAME')) + admin_creds.set_password(os.environ.get('DC_PASSWORD')) + admin_creds.set_realm(os.environ.get('REALM')) + hostname = get_dc_hostname(machine_creds, self.lp) + url = 'ldap://%s' % hostname + ldb = Ldb(url=url, session_info=system_session(), + lp=self.lp, credentials=admin_creds) + # Write the dummy CA + confdn = 'CN=Public Key Services,CN=Services,CN=Configuration,%s' % base_dn + ca_cn = '%s-CA' % hostname.replace('.', '-') + certa_dn = 'CN=%s,CN=Certification Authorities,%s' % (ca_cn, confdn) + ldb.add({'dn': certa_dn, + 'objectClass': 'certificationAuthority', + 'authorityRevocationList': ['XXX'], + 'cACertificate': dummy_certificate(), + 'certificateRevocationList': ['XXX'], + }) + # Write the dummy pKIEnrollmentService + enroll_dn = 'CN=%s,CN=Enrollment Services,%s' % (ca_cn, confdn) + ldb.add({'dn': enroll_dn, + 'objectClass': 'pKIEnrollmentService', + 'cACertificate': dummy_certificate(), + 'certificateTemplates': ['Machine'], + 'dNSHostName': hostname, + }) + # Write the dummy pKICertificateTemplate + template_dn = 'CN=Machine,CN=Certificate Templates,%s' % confdn + ldb.add({'dn': template_dn, + 'objectClass': 'pKICertificateTemplate', + }) + + with TemporaryDirectory() as dname: + try: + ext.process_group_policy([], gpos, dname, dname) + except Exception as e: + self.fail(str(e)) + + ca_crt = os.path.join(dname, '%s.crt' % ca_cn) + self.assertTrue(os.path.exists(ca_crt), + 'Root CA certificate was not requested') + machine_crt = os.path.join(dname, '%s.Machine.crt' % ca_cn) + self.assertTrue(os.path.exists(machine_crt), + 'Machine certificate was not requested') + machine_key = os.path.join(dname, '%s.Machine.key' % ca_cn) + self.assertTrue(os.path.exists(machine_key), + 'Machine key was not generated') + + # Verify RSOP does not fail + ext.rsop([g for g in gpos if g.name == guid][0]) + + # Check that a call to gpupdate --rsop also succeeds + ret = rsop(self.lp) + self.assertEqual(ret, 0, 'gpupdate --rsop failed!') + + # Remove policy + gp_db = store.get_gplog(machine_creds.get_username()) + del_gpos = get_deleted_gpos_list(gp_db, []) + ext.process_group_policy(del_gpos, [], dname) + self.assertFalse(os.path.exists(ca_crt), + 'Root CA certificate was not removed') + self.assertFalse(os.path.exists(machine_crt), + 'Machine certificate was not removed') + self.assertFalse(os.path.exists(machine_key), + 'Machine key was not removed') + out, _ = Popen(['getcert', 'list-cas'], stdout=PIPE).communicate() + self.assertNotIn(get_bytes(ca_cn), out, 'CA was not removed') + out, _ = Popen(['getcert', 'list'], stdout=PIPE).communicate() + self.assertNotIn(b'Machine', out, + 'Machine certificate not removed') + self.assertNotIn(b'Workstation', out, + 'Workstation certificate not removed') + + # Remove the dummy CA, pKIEnrollmentService, and pKICertificateTemplate + ldb.delete(certa_dn) + ldb.delete(enroll_dn) + ldb.delete(template_dn) + + # Unstage the Registry.pol file + unstage_file(reg_pol) + def test_gp_cert_auto_enroll_ext(self): local_path = self.lp.cache_path('gpo_cache') guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}' @@ -6919,6 +7031,7 @@ class GPOTests(tests.TestCase): machine_creds.set_machine_account() # Initialize the group policy extension + cae.requests = dummy_requests() ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds, machine_creds.get_username(), store) @@ -7517,6 +7630,7 @@ class GPOTests(tests.TestCase): machine_creds.set_machine_account() # Initialize the group policy extension + cae.requests = dummy_requests() ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds, machine_creds.get_username(), store) |
