summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorDavid Mulder <dmulder@suse.com>2022-05-04 15:01:22 -0600
committerAndreas Schneider <asn@cryptomilk.org>2022-05-13 14:46:29 +0000
commitd3e0eec03cd93dcceaec7328ba8252bfa78f968e (patch)
treefd6adc363f8cf91a707009148806a742201eaac4 /python
parent53a55428948e20f6ce42fbf39f99190ef55fb81f (diff)
downloadsamba-d3e0eec03cd93dcceaec7328ba8252bfa78f968e.tar.gz
samba-d3e0eec03cd93dcceaec7328ba8252bfa78f968e.tar.bz2
samba-d3e0eec03cd93dcceaec7328ba8252bfa78f968e.zip
gpo: Remove sscep depends from Cert Auto Enroll
Certificate Auto Enrollment currently depends on sscep to retrieve the root certificate chain. This isn't necessary, since this can be accomplished with a simple GET. Signed-off-by: David Mulder <dmulder@suse.com> Reviewed-by: Andreas Schneider <asn@samba.org>
Diffstat (limited to 'python')
-rw-r--r--python/samba/gp_cert_auto_enroll_ext.py109
-rwxr-xr-xpython/samba/tests/bin/sscep19
-rw-r--r--python/samba/tests/gpo.py58
3 files changed, 125 insertions, 61 deletions
diff --git a/python/samba/gp_cert_auto_enroll_ext.py b/python/samba/gp_cert_auto_enroll_ext.py
index 7b604e5065d..7cf19b8c839 100644
--- a/python/samba/gp_cert_auto_enroll_ext.py
+++ b/python/samba/gp_cert_auto_enroll_ext.py
@@ -16,6 +16,7 @@
import os
import operator
+import requests
from samba.gpclass import gp_pol_ext
from samba import Ldb
from ldb import SCOPE_SUBTREE, SCOPE_BASE
@@ -25,10 +26,19 @@ import base64
from shutil import which
from subprocess import Popen, PIPE
import re
-from glob import glob
import json
from samba.gp.util.logging import log
import struct
+try:
+ from cryptography.hazmat.primitives.serialization.pkcs7 import \
+ load_der_pkcs7_certificates
+except ModuleNotFoundError:
+ def load_der_pkcs7_certificates(x): return []
+ log.error('python cryptography missing pkcs7 support. '
+ 'Certificate chain parsing will fail')
+from cryptography.hazmat.primitives.serialization import Encoding
+from cryptography.x509 import load_der_x509_certificate
+from cryptography.hazmat.backends import default_backend
cert_wrap = b"""
-----BEGIN CERTIFICATE-----
@@ -180,43 +190,72 @@ def get_supported_templates(server):
return out.strip().split()
return []
+
+def getca(ca_name, url, trust_dir):
+ """Fetch Certificate Chain from the CA."""
+ root_cert = os.path.join(trust_dir, '%s.crt' % ca_name)
+ root_certs = []
+
+ try:
+ r = requests.get(url=url, params={'operation': 'GetCACert',
+ 'message': 'CAIdentifier'})
+ except requests.exceptions.ConnectionError:
+ log.warn('Failed to establish a new connection')
+ r = None
+ if r is None or r.content == b'':
+ log.warn('Failed to fetch the root certificate chain.')
+ log.warn('Ensure you have installed and configured the'
+ ' Network Device Enrollment Service.')
+ return root_certs
+
+ if r.headers['Content-Type'] == 'application/x-x509-ca-cert':
+ # Older versions of load_der_x509_certificate require a backend param
+ try:
+ cert = load_der_x509_certificate(r.content)
+ except TypeError:
+ cert = load_der_x509_certificate(r.content, default_backend())
+ cert_data = cert.public_bytes(Encoding.PEM)
+ with open(root_cert, 'wb') as w:
+ w.write(cert_data)
+ root_certs.append(root_cert)
+ elif r.headers['Content-Type'] == 'application/x-x509-ca-ra-cert':
+ certs = load_der_pkcs7_certificates(r.content)
+ for i in range(0, len(certs)):
+ cert = certs[i].public_bytes(Encoding.PEM)
+ dest = '%s.%d' % (root_cert, i)
+ with open(dest, 'wb') as w:
+ w.write(cert)
+ root_certs.append(dest)
+ else:
+ log.warn('getca: Wrong (or missing) MIME content type')
+
+ return root_certs
+
+
def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
# Install the root certificate chain
data = {'files': [], 'templates': []}
- sscep = which('sscep')
- if sscep is not None:
- url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % \
- ca['hostname']
- root_cert = os.path.join(trust_dir, '%s.crt' % ca['name'])
- ret = Popen([sscep, 'getca', '-F', 'sha1', '-c',
- root_cert, '-u', url]).wait()
- if ret != 0:
- log.warn('sscep failed to fetch the root certificate chain.')
- log.warn('Ensure you have installed and configured the' +
- ' Network Device Enrollment Service.')
- root_certs = glob('%s*' % root_cert)
- data['files'].extend(root_certs)
- for src in root_certs:
- # Symlink the certs to global trust dir
- dst = os.path.join(global_trust_dir, os.path.basename(src))
- try:
- os.symlink(src, dst)
- data['files'].append(dst)
- except PermissionError:
- log.warn('Failed to symlink root certificate to the' +
- ' admin trust anchors')
- except FileNotFoundError:
- log.warn('Failed to symlink root certificate to the' +
- ' admin trust anchors.' +
- ' The directory was not found', global_trust_dir)
- except FileExistsError:
- # If we're simply downloading a renewed cert, the symlink
- # already exists. Ignore the FileExistsError. Preserve the
- # existing symlink in the unapply data.
- data['files'].append(dst)
- else:
- log.warn('sscep is not installed, which prevents the installation' +
- ' of the root certificate chain.')
+ url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname']
+ root_certs = getca(ca['name'], url, trust_dir)
+ data['files'].extend(root_certs)
+ for src in root_certs:
+ # Symlink the certs to global trust dir
+ dst = os.path.join(global_trust_dir, os.path.basename(src))
+ try:
+ os.symlink(src, dst)
+ data['files'].append(dst)
+ except PermissionError:
+ log.warn('Failed to symlink root certificate to the'
+ ' admin trust anchors')
+ except FileNotFoundError:
+ log.warn('Failed to symlink root certificate to the'
+ ' admin trust anchors.'
+ ' The directory was not found', global_trust_dir)
+ except FileExistsError:
+ # If we're simply downloading a renewed cert, the symlink
+ # already exists. Ignore the FileExistsError. Preserve the
+ # existing symlink in the unapply data.
+ data['files'].append(dst)
update = which('update-ca-certificates')
if update is not None:
Popen([update]).wait()
diff --git a/python/samba/tests/bin/sscep b/python/samba/tests/bin/sscep
deleted file mode 100755
index d0d88926766..00000000000
--- a/python/samba/tests/bin/sscep
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/usr/bin/python3
-import optparse
-import os, sys, re
-
-sys.path.insert(0, "bin/python")
-
-if __name__ == "__main__":
- parser = optparse.OptionParser('sscep <cmd> [options]')
- parser.add_option('-F')
- parser.add_option('-c')
- parser.add_option('-u')
-
- (opts, args) = parser.parse_args()
- assert len(args) == 1
- assert args[0] == 'getca'
- assert opts.F == 'sha1'
- # Create dummy root cert (empty file)
- with open(opts.c, 'w') as w:
- pass
diff --git a/python/samba/tests/gpo.py b/python/samba/tests/gpo.py
index 7d3cb878b93..e1f81e8a50d 100644
--- a/python/samba/tests/gpo.py
+++ b/python/samba/tests/gpo.py
@@ -41,8 +41,7 @@ from samba.vgp_motd_ext import vgp_motd_ext
from samba.vgp_issue_ext import vgp_issue_ext
from samba.vgp_access_ext import vgp_access_ext
from samba.gp_gnome_settings_ext import gp_gnome_settings_ext
-from samba.gp_cert_auto_enroll_ext import gp_cert_auto_enroll_ext, \
- octet_string_to_objectGUID
+from samba import gp_cert_auto_enroll_ext as cae
from samba.gp_firefox_ext import gp_firefox_ext
from samba.gp_chromium_ext import gp_chromium_ext
from samba.gp_firewalld_ext import gp_firewalld_ext
@@ -67,6 +66,51 @@ import ldb as _ldb
from samba.auth import system_session
import json
from shutil import which
+import requests
+from cryptography import x509
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.serialization import Encoding
+from datetime import datetime, timedelta
+
+def dummy_certificate():
+ name = x509.Name([
+ x509.NameAttribute(x509.NameOID.COMMON_NAME,
+ os.environ.get('SERVER'))
+ ])
+ cons = x509.BasicConstraints(ca=True, path_length=0)
+ now = datetime.utcnow()
+
+ key = rsa.generate_private_key(public_exponent=65537, key_size=2048,
+ backend=default_backend())
+
+ cert = (
+ x509.CertificateBuilder()
+ .subject_name(name)
+ .issuer_name(name)
+ .public_key(key.public_key())
+ .serial_number(1000)
+ .not_valid_before(now)
+ .not_valid_after(now + timedelta(seconds=300))
+ .add_extension(cons, False)
+ .sign(key, hashes.SHA256(), default_backend())
+ )
+
+ return cert.public_bytes(encoding=Encoding.DER)
+
+# Dummy requests structure for Certificate Auto Enrollment
+class dummy_requests(object):
+ @staticmethod
+ def get(url=None, params=None):
+ dummy = requests.Response()
+ dummy._content = dummy_certificate()
+ dummy.headers = {'Content-Type': 'application/x-x509-ca-cert'}
+ return dummy
+
+ class exceptions(object):
+ ConnectionError = Exception
+cae.requests = dummy_requests
realm = os.environ.get('REALM')
policies = realm + '/POLICIES'
@@ -8708,8 +8752,8 @@ class GPOTests(tests.TestCase):
machine_creds.set_machine_account()
# Initialize the group policy extension
- ext = gp_cert_auto_enroll_ext(self.lp, machine_creds,
- machine_creds.get_username(), store)
+ ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds,
+ machine_creds.get_username(), store)
ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
if ads.connect():
@@ -9069,8 +9113,8 @@ class GPOTests(tests.TestCase):
machine_creds.set_machine_account()
# Initialize the group policy extension
- ext = gp_cert_auto_enroll_ext(self.lp, machine_creds,
- machine_creds.get_username(), store)
+ ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds,
+ machine_creds.get_username(), store)
ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
if ads.connect():
@@ -9093,7 +9137,7 @@ class GPOTests(tests.TestCase):
_ldb.SCOPE_BASE, '(objectClass=*)', ['objectGUID'])
self.assertTrue(len(res2) == 1, 'objectGUID not found')
objectGUID = b'{%s}' % \
- octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper().encode()
+ cae.octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper().encode()
parser = GPPolParser()
parser.load_xml(etree.fromstring(advanced_enroll_reg_pol.strip() % \
(objectGUID, objectGUID, objectGUID, objectGUID)))