diff options
Diffstat (limited to 'python/samba/gp/gp_cert_auto_enroll_ext.py')
-rw-r--r-- | python/samba/gp/gp_cert_auto_enroll_ext.py | 572 |
1 files changed, 572 insertions, 0 deletions
diff --git a/python/samba/gp/gp_cert_auto_enroll_ext.py b/python/samba/gp/gp_cert_auto_enroll_ext.py new file mode 100644 index 0000000..9b743cb --- /dev/null +++ b/python/samba/gp/gp_cert_auto_enroll_ext.py @@ -0,0 +1,572 @@ +# gp_cert_auto_enroll_ext samba group policy +# Copyright (C) David Mulder <dmulder@suse.com> 2021 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import operator +import requests +from samba.gp.gpclass import gp_pol_ext, gp_applier, GPOSTATE +from samba import Ldb +from ldb import SCOPE_SUBTREE, SCOPE_BASE +from samba.auth import system_session +from samba.gp.gpclass import get_dc_hostname +import base64 +from shutil import which +from subprocess import Popen, PIPE +import re +import json +from samba.gp.util.logging import log +import struct +try: + from cryptography.hazmat.primitives.serialization.pkcs7 import \ + load_der_pkcs7_certificates +except ModuleNotFoundError: + def load_der_pkcs7_certificates(x): return [] + log.error('python cryptography missing pkcs7 support. ' + 'Certificate chain parsing will fail') +from cryptography.hazmat.primitives.serialization import Encoding +from cryptography.x509 import load_der_x509_certificate +from cryptography.hazmat.backends import default_backend +from samba.common import get_string + +cert_wrap = b""" +-----BEGIN CERTIFICATE----- +%s +-----END CERTIFICATE-----""" +endpoint_re = '(https|HTTPS)://(?P<server>[a-zA-Z0-9.-]+)/ADPolicyProvider' + \ + '_CEP_(?P<auth>[a-zA-Z]+)/service.svc/CEP' + +global_trust_dirs = ['/etc/pki/trust/anchors', # SUSE + '/etc/pki/ca-trust/source/anchors', # RHEL/Fedora + '/usr/local/share/ca-certificates'] # Debian/Ubuntu + +def octet_string_to_objectGUID(data): + """Convert an octet string to an objectGUID.""" + return '%s-%s-%s-%s-%s' % ('%02x' % struct.unpack('<L', data[0:4])[0], + '%02x' % struct.unpack('<H', data[4:6])[0], + '%02x' % struct.unpack('<H', data[6:8])[0], + '%02x' % struct.unpack('>H', data[8:10])[0], + '%02x%02x' % struct.unpack('>HL', data[10:])) + + +def group_and_sort_end_point_information(end_point_information): + """Group and Sort End Point Information. + + [MS-CAESO] 4.4.5.3.2.3 + In this step autoenrollment processes the end point information by grouping + it by CEP ID and sorting in the order with which it will use the end point + to access the CEP information. + """ + # Create groups of the CertificateEnrollmentPolicyEndPoint instances that + # have the same value of the EndPoint.PolicyID datum. + end_point_groups = {} + for e in end_point_information: + if e['PolicyID'] not in end_point_groups.keys(): + end_point_groups[e['PolicyID']] = [] + end_point_groups[e['PolicyID']].append(e) + + # Sort each group by following these rules: + for end_point_group in end_point_groups.values(): + # Sort the CertificateEnrollmentPolicyEndPoint instances in ascending + # order based on the EndPoint.Cost value. + end_point_group.sort(key=lambda e: e['Cost']) + + # For instances that have the same EndPoint.Cost: + cost_list = [e['Cost'] for e in end_point_group] + costs = set(cost_list) + for cost in costs: + i = cost_list.index(cost) + j = len(cost_list)-operator.indexOf(reversed(cost_list), cost)-1 + if i == j: + continue + + # Sort those that have EndPoint.Authentication equal to Kerberos + # first. Then sort those that have EndPoint.Authentication equal to + # Anonymous. The rest of the CertificateEnrollmentPolicyEndPoint + # instances follow in an arbitrary order. + def sort_auth(e): + # 0x2 - Kerberos + if e['AuthFlags'] == 0x2: + return 0 + # 0x1 - Anonymous + elif e['AuthFlags'] == 0x1: + return 1 + else: + return 2 + end_point_group[i:j+1] = sorted(end_point_group[i:j+1], + key=sort_auth) + return list(end_point_groups.values()) + +def obtain_end_point_information(entries): + """Obtain End Point Information. + + [MS-CAESO] 4.4.5.3.2.2 + In this step autoenrollment initializes the + CertificateEnrollmentPolicyEndPoints table. + """ + end_point_information = {} + section = 'Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\' + for e in entries: + if not e.keyname.startswith(section): + continue + name = e.keyname.replace(section, '') + if name not in end_point_information.keys(): + end_point_information[name] = {} + end_point_information[name][e.valuename] = e.data + for ca in end_point_information.values(): + m = re.match(endpoint_re, ca['URL']) + if m: + name = '%s-CA' % m.group('server').replace('.', '-') + ca['name'] = name + ca['hostname'] = m.group('server') + ca['auth'] = m.group('auth') + elif ca['URL'].lower() != 'ldap:': + edata = { 'endpoint': ca['URL'] } + log.error('Failed to parse the endpoint', edata) + return {} + end_point_information = \ + group_and_sort_end_point_information(end_point_information.values()) + return end_point_information + +def fetch_certification_authorities(ldb): + """Initialize CAs. + + [MS-CAESO] 4.4.5.3.1.2 + """ + result = [] + basedn = ldb.get_default_basedn() + # Autoenrollment MUST do an LDAP search for the CA information + # (pKIEnrollmentService) objects under the following container: + dn = 'CN=Enrollment Services,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn + attrs = ['cACertificate', 'cn', 'dNSHostName'] + expr = '(objectClass=pKIEnrollmentService)' + res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs) + if len(res) == 0: + return result + for es in res: + data = { 'name': get_string(es['cn'][0]), + 'hostname': get_string(es['dNSHostName'][0]), + 'cACertificate': get_string(base64.b64encode(es['cACertificate'][0])) + } + result.append(data) + return result + +def fetch_template_attrs(ldb, name, attrs=None): + if attrs is None: + attrs = ['msPKI-Minimal-Key-Size'] + basedn = ldb.get_default_basedn() + dn = 'CN=Certificate Templates,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn + expr = '(cn=%s)' % name + res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs) + if len(res) == 1 and 'msPKI-Minimal-Key-Size' in res[0]: + return dict(res[0]) + else: + return {'msPKI-Minimal-Key-Size': ['2048']} + +def format_root_cert(cert): + return cert_wrap % re.sub(b"(.{64})", b"\\1\n", cert.encode(), 0, re.DOTALL) + +def find_cepces_submit(): + certmonger_dirs = [os.environ.get("PATH"), '/usr/lib/certmonger', + '/usr/libexec/certmonger'] + return which('cepces-submit', path=':'.join(certmonger_dirs)) + +def get_supported_templates(server): + cepces_submit = find_cepces_submit() + if not cepces_submit: + log.error('Failed to find cepces-submit') + return [] + + env = os.environ + env['CERTMONGER_OPERATION'] = 'GET-SUPPORTED-TEMPLATES' + p = Popen([cepces_submit, '--server=%s' % server, '--auth=Kerberos'], + env=env, stdout=PIPE, stderr=PIPE) + out, err = p.communicate() + if p.returncode != 0: + data = {'Error': err.decode()} + log.error('Failed to fetch the list of supported templates.', data) + return out.strip().split() + + +def getca(ca, url, trust_dir): + """Fetch Certificate Chain from the CA.""" + root_cert = os.path.join(trust_dir, '%s.crt' % ca['name']) + root_certs = [] + + try: + r = requests.get(url=url, params={'operation': 'GetCACert', + 'message': 'CAIdentifier'}) + except requests.exceptions.ConnectionError: + log.warn('Could not connect to Network Device Enrollment Service.') + r = None + if r is None or r.content == b'' or r.headers['Content-Type'] == 'text/html': + log.warn('Unable to fetch root certificates (requires NDES).') + if 'cACertificate' in ca: + log.warn('Installing the server certificate only.') + der_certificate = base64.b64decode(ca['cACertificate']) + try: + cert = load_der_x509_certificate(der_certificate) + except TypeError: + cert = load_der_x509_certificate(der_certificate, + default_backend()) + cert_data = cert.public_bytes(Encoding.PEM) + with open(root_cert, 'wb') as w: + w.write(cert_data) + root_certs.append(root_cert) + return root_certs + + if r.headers['Content-Type'] == 'application/x-x509-ca-cert': + # Older versions of load_der_x509_certificate require a backend param + try: + cert = load_der_x509_certificate(r.content) + except TypeError: + cert = load_der_x509_certificate(r.content, default_backend()) + cert_data = cert.public_bytes(Encoding.PEM) + with open(root_cert, 'wb') as w: + w.write(cert_data) + root_certs.append(root_cert) + elif r.headers['Content-Type'] == 'application/x-x509-ca-ra-cert': + certs = load_der_pkcs7_certificates(r.content) + for i in range(0, len(certs)): + cert = certs[i].public_bytes(Encoding.PEM) + filename, extension = root_cert.rsplit('.', 1) + dest = '%s.%d.%s' % (filename, i, extension) + with open(dest, 'wb') as w: + w.write(cert) + root_certs.append(dest) + else: + log.warn('getca: Wrong (or missing) MIME content type') + + return root_certs + + +def find_global_trust_dir(): + """Return the global trust dir using known paths from various Linux distros.""" + for trust_dir in global_trust_dirs: + if os.path.isdir(trust_dir): + return trust_dir + return global_trust_dirs[0] + +def update_ca_command(): + """Return the command to update the CA trust store.""" + return which('update-ca-certificates') or which('update-ca-trust') + +def changed(new_data, old_data): + """Return True if any key present in both dicts has changed.""" + return any((new_data[k] != old_data[k] if k in old_data else False) + for k in new_data.keys()) + +def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'): + """Install the root certificate chain.""" + data = dict({'files': [], 'templates': []}, **ca) + url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname'] + + log.info("Try to get root or server certificates") + + root_certs = getca(ca, url, trust_dir) + data['files'].extend(root_certs) + global_trust_dir = find_global_trust_dir() + for src in root_certs: + # Symlink the certs to global trust dir + dst = os.path.join(global_trust_dir, os.path.basename(src)) + try: + os.symlink(src, dst) + data['files'].append(dst) + log.info("Created symlink: %s -> %s" % (src, dst)) + except PermissionError: + log.warn('Failed to symlink root certificate to the' + ' admin trust anchors') + except FileNotFoundError: + log.warn('Failed to symlink root certificate to the' + ' admin trust anchors.' + ' The directory was not found', global_trust_dir) + except FileExistsError: + # If we're simply downloading a renewed cert, the symlink + # already exists. Ignore the FileExistsError. Preserve the + # existing symlink in the unapply data. + data['files'].append(dst) + + update = update_ca_command() + log.info("Running %s" % (update)) + if update is not None: + ret = Popen([update]).wait() + if ret != 0: + log.error('Failed to run %s' % (update)) + + # Setup Certificate Auto Enrollment + getcert = which('getcert') + cepces_submit = find_cepces_submit() + if getcert is not None and cepces_submit is not None: + p = Popen([getcert, 'add-ca', '-c', ca['name'], '-e', + '%s --server=%s --auth=%s' % (cepces_submit, + ca['hostname'], auth)], + stdout=PIPE, stderr=PIPE) + out, err = p.communicate() + log.debug(out.decode()) + if p.returncode != 0: + if p.returncode == 2: + log.info('The CA [%s] already exists' % ca['name']) + else: + data = {'Error': err.decode(), 'CA': ca['name']} + log.error('Failed to add Certificate Authority', data) + + supported_templates = get_supported_templates(ca['hostname']) + for template in supported_templates: + attrs = fetch_template_attrs(ldb, template) + nickname = '%s.%s' % (ca['name'], template.decode()) + keyfile = os.path.join(private_dir, '%s.key' % nickname) + certfile = os.path.join(trust_dir, '%s.crt' % nickname) + p = Popen([getcert, 'request', '-c', ca['name'], + '-T', template.decode(), + '-I', nickname, '-k', keyfile, '-f', certfile, + '-g', attrs['msPKI-Minimal-Key-Size'][0]], + stdout=PIPE, stderr=PIPE) + out, err = p.communicate() + log.debug(out.decode()) + if p.returncode != 0: + if p.returncode == 2: + log.info('The template [%s] already exists' % (nickname)) + else: + data = {'Error': err.decode(), 'Certificate': nickname} + log.error('Failed to request certificate', data) + + data['files'].extend([keyfile, certfile]) + data['templates'].append(nickname) + if update is not None: + ret = Popen([update]).wait() + if ret != 0: + log.error('Failed to run %s' % (update)) + else: + log.warn('certmonger and cepces must be installed for ' + + 'certificate auto enrollment to work') + return json.dumps(data) + +class gp_cert_auto_enroll_ext(gp_pol_ext, gp_applier): + def __str__(self): + return r'Cryptography\AutoEnrollment' + + def unapply(self, guid, attribute, value): + ca_cn = base64.b64decode(attribute) + data = json.loads(value) + getcert = which('getcert') + if getcert is not None: + Popen([getcert, 'remove-ca', '-c', ca_cn]).wait() + for nickname in data['templates']: + Popen([getcert, 'stop-tracking', '-i', nickname]).wait() + for f in data['files']: + if os.path.exists(f): + if os.path.exists(f): + os.unlink(f) + self.cache_remove_attribute(guid, attribute) + + def apply(self, guid, ca, applier_func, *args, **kwargs): + attribute = base64.b64encode(ca['name'].encode()).decode() + # If the policy has changed, unapply, then apply new policy + old_val = self.cache_get_attribute_value(guid, attribute) + old_data = json.loads(old_val) if old_val is not None else {} + templates = ['%s.%s' % (ca['name'], t.decode()) for t in get_supported_templates(ca['hostname'])] \ + if old_val is not None else [] + new_data = { 'templates': templates, **ca } + if changed(new_data, old_data) or self.cache_get_apply_state() == GPOSTATE.ENFORCE: + self.unapply(guid, attribute, old_val) + # If policy is already applied and unchanged, skip application + if old_val is not None and not changed(new_data, old_data) and \ + self.cache_get_apply_state() != GPOSTATE.ENFORCE: + return + + # Apply the policy and log the changes + data = applier_func(*args, **kwargs) + self.cache_add_attribute(guid, attribute, data) + + def process_group_policy(self, deleted_gpo_list, changed_gpo_list, + trust_dir=None, private_dir=None): + if trust_dir is None: + trust_dir = self.lp.cache_path('certs') + if private_dir is None: + private_dir = self.lp.private_path('certs') + if not os.path.exists(trust_dir): + os.mkdir(trust_dir, mode=0o755) + if not os.path.exists(private_dir): + os.mkdir(private_dir, mode=0o700) + + for guid, settings in deleted_gpo_list: + if str(self) in settings: + for ca_cn_enc, data in settings[str(self)].items(): + self.unapply(guid, ca_cn_enc, data) + + for gpo in changed_gpo_list: + if gpo.file_sys_path: + section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment' + pol_file = 'MACHINE/Registry.pol' + path = os.path.join(gpo.file_sys_path, pol_file) + pol_conf = self.parse(path) + if not pol_conf: + continue + for e in pol_conf.entries: + if e.keyname == section and e.valuename == 'AEPolicy': + # This policy applies as specified in [MS-CAESO] 4.4.5.1 + if e.data & 0x8000: + continue # The policy is disabled + enroll = e.data & 0x1 == 0x1 + manage = e.data & 0x2 == 0x2 + retrive_pending = e.data & 0x4 == 0x4 + if enroll: + ca_names = self.__enroll(gpo.name, + pol_conf.entries, + trust_dir, private_dir) + + # Cleanup any old CAs that have been removed + ca_attrs = [base64.b64encode(n.encode()).decode() + for n in ca_names] + self.clean(gpo.name, keep=ca_attrs) + else: + # If enrollment has been disabled for this GPO, + # remove any existing policy + ca_attrs = \ + self.cache_get_all_attribute_values(gpo.name) + self.clean(gpo.name, remove=list(ca_attrs.keys())) + + def __read_cep_data(self, guid, ldb, end_point_information, + trust_dir, private_dir): + """Read CEP Data. + + [MS-CAESO] 4.4.5.3.2.4 + In this step autoenrollment initializes instances of the + CertificateEnrollmentPolicy by accessing end points associated with CEP + groups created in the previous step. + """ + # For each group created in the previous step: + for end_point_group in end_point_information: + # Pick an arbitrary instance of the + # CertificateEnrollmentPolicyEndPoint from the group + e = end_point_group[0] + + # If this instance does not have the AutoEnrollmentEnabled flag set + # in the EndPoint.Flags, continue with the next group. + if not e['Flags'] & 0x10: + continue + + # If the current group contains a + # CertificateEnrollmentPolicyEndPoint instance with EndPoint.URI + # equal to "LDAP": + if any([e['URL'] == 'LDAP:' for e in end_point_group]): + # Perform an LDAP search to read the value of the objectGuid + # attribute of the root object of the forest root domain NC. If + # any errors are encountered, continue with the next group. + res = ldb.search('', SCOPE_BASE, '(objectClass=*)', + ['rootDomainNamingContext']) + if len(res) != 1: + continue + res2 = ldb.search(res[0]['rootDomainNamingContext'][0], + SCOPE_BASE, '(objectClass=*)', + ['objectGUID']) + if len(res2) != 1: + continue + + # Compare the value read in the previous step to the + # EndPoint.PolicyId datum CertificateEnrollmentPolicyEndPoint + # instance. If the values do not match, continue with the next + # group. + objectGUID = '{%s}' % \ + octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper() + if objectGUID != e['PolicyID']: + continue + + # For each CertificateEnrollmentPolicyEndPoint instance for that + # group: + ca_names = [] + for ca in end_point_group: + # If EndPoint.URI equals "LDAP": + if ca['URL'] == 'LDAP:': + # This is a basic configuration. + cas = fetch_certification_authorities(ldb) + for _ca in cas: + self.apply(guid, _ca, cert_enroll, _ca, ldb, trust_dir, + private_dir) + ca_names.append(_ca['name']) + # If EndPoint.URI starts with "HTTPS//": + elif ca['URL'].lower().startswith('https://'): + self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir, + private_dir, auth=ca['auth']) + ca_names.append(ca['name']) + else: + edata = { 'endpoint': ca['URL'] } + log.error('Unrecognized endpoint', edata) + return ca_names + + def __enroll(self, guid, entries, trust_dir, private_dir): + url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp) + ldb = Ldb(url=url, session_info=system_session(), + lp=self.lp, credentials=self.creds) + + ca_names = [] + end_point_information = obtain_end_point_information(entries) + if len(end_point_information) > 0: + ca_names.extend(self.__read_cep_data(guid, ldb, + end_point_information, + trust_dir, private_dir)) + else: + cas = fetch_certification_authorities(ldb) + for ca in cas: + self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir, + private_dir) + ca_names.append(ca['name']) + return ca_names + + def rsop(self, gpo): + output = {} + pol_file = 'MACHINE/Registry.pol' + section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment' + if gpo.file_sys_path: + path = os.path.join(gpo.file_sys_path, pol_file) + pol_conf = self.parse(path) + if not pol_conf: + return output + for e in pol_conf.entries: + if e.keyname == section and e.valuename == 'AEPolicy': + enroll = e.data & 0x1 == 0x1 + if e.data & 0x8000 or not enroll: + continue + output['Auto Enrollment Policy'] = {} + url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp) + ldb = Ldb(url=url, session_info=system_session(), + lp=self.lp, credentials=self.creds) + end_point_information = \ + obtain_end_point_information(pol_conf.entries) + cas = fetch_certification_authorities(ldb) + if len(end_point_information) > 0: + cas2 = [ep for sl in end_point_information for ep in sl] + if any([ca['URL'] == 'LDAP:' for ca in cas2]): + cas.extend(cas2) + else: + cas = cas2 + for ca in cas: + if 'URL' in ca and ca['URL'] == 'LDAP:': + continue + policy = 'Auto Enrollment Policy' + cn = ca['name'] + if policy not in output: + output[policy] = {} + output[policy][cn] = {} + if 'cACertificate' in ca: + output[policy][cn]['CA Certificate'] = \ + format_root_cert(ca['cACertificate']).decode() + output[policy][cn]['Auto Enrollment Server'] = \ + ca['hostname'] + supported_templates = \ + get_supported_templates(ca['hostname']) + output[policy][cn]['Templates'] = \ + [t.decode() for t in supported_templates] + return output |