# gp_drive_maps_user_ext samba gpo policy
# Copyright (C) David Mulder 2020
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import os
import json
from samba.gp.gpclass import gp_xml_ext, gp_misc_applier, drop_privileges, \
    expand_pref_variables
from subprocess import Popen, PIPE
from samba.gp.gp_scripts_ext import fetch_crontab, install_user_crontab
from samba.gp.util.logging import log
from samba.gp import gp_scripts_ext

# Override the `intro` text of gp_scripts_ext so that it names this
# extension. Presumably the crontab helpers imported above use it as the
# header of the generated crontab -- confirm in gp_scripts_ext.
# NOTE(review): the line breaks inside this literal were reconstructed from
# a whitespace-collapsed copy; verify the trailing blank line.
gp_scripts_ext.intro = '''
### autogenerated by samba
#
# This file is generated by the gp_drive_maps_user_ext Group Policy
# Client Side Extension. To modify the contents of this file,
# modify the appropriate Group Policy objects which apply
# to this machine. DO NOT MODIFY THIS FILE DIRECTLY.
#

'''

# Values of the Properties 'action' attribute, grouped by the gio operation
# this extension performs for them.
_MOUNT_ACTIONS = ('C', 'R', 'U')
_UNMOUNT_ACTION = 'D'


def _drive_uri(path):
    """Build the 'smb:' URI for a drive path, converting '\\' to '/'."""
    return 'smb:{}'.format(path.replace('\\', '/'))


def _gio_command(action, uri):
    """Return the gio command line for a drive action.

    Returns an empty string when the action is neither a mount action
    ('C', 'R', 'U') nor the unmount action ('D').
    """
    if action in _MOUNT_ACTIONS:
        return 'gio mount {}'.format(uri)
    if action == _UNMOUNT_ACTION:
        return 'gio mount {} --unmount'.format(uri)
    return ''


def mount_drive(uri):
    """Mount uri with 'gio mount'.

    Raises SystemError carrying gio's stderr output (bytes) when gio wrote
    anything to stderr other than an 'already mounted' notice.
    """
    log.debug('Mounting drive', uri)
    _, err = Popen(['gio', 'mount', uri],
                   stdout=PIPE, stderr=PIPE).communicate()
    # An already mounted location is the desired end state, not a failure.
    if err and b'Location is already mounted' not in err:
        raise SystemError(err)


def unmount_drive(uri):
    """Unmount uri with 'gio mount --unmount' and return gio's exit status."""
    log.debug('Unmounting drive', uri)
    return Popen(['gio', 'mount', uri, '--unmount']).wait()


class gp_drive_maps_user_ext(gp_xml_ext, gp_misc_applier):
    """Apply the drive maps of USER/Preferences/Drives/Drives.xml via gio.

    Each drive is mounted or unmounted immediately and, unless it carries a
    FilterRunOnce filter, an '@hourly' entry repeating the operation is
    installed in the user's crontab.
    """

    def parse_value(self, val):
        """Parse a cached value, JSON-decoding 'props' and 'run_once'."""
        vals = super().parse_value(val)
        for key in ('props', 'run_once'):
            if key in vals:
                vals[key] = json.loads(vals[key])
        return vals

    def unapply(self, guid, uri, val):
        """Undo a previously applied drive map and drop it from the cache.

        The drive is unmounted if the cached action had mounted it, and the
        cached crontab entry, if any, is removed from the user's crontab.
        """
        vals = self.parse_value(val)
        if 'props' in vals and vals['props']['action'] in _MOUNT_ACTIONS:
            unmount_drive(uri)
        cron_entry = vals.get('crontab')
        # Only read the user's crontab when there is an entry to look for.
        if cron_entry is not None:
            others, entries = fetch_crontab(self.username)
            if cron_entry in entries:
                entries.remove(cron_entry)
                install_user_crontab(self.username, others, entries)
        self.cache_remove_attribute(guid, uri)

    def apply(self, guid, uri, props, run_once, entry):
        """Mount or unmount uri according to props['action'] and cache it.

        props is the attribute mapping of the drive's Properties element.
        Unless run_once is set, entry is added to the user's crontab when
        it is not already present.
        """
        old_val = self.cache_get_attribute_value(guid, uri)
        val = self.generate_value(props=json.dumps(props),
                                  run_once=json.dumps(run_once),
                                  crontab=entry)

        # The policy has changed, unapply it first
        # NOTE(review): any cached value is unapplied here, whether or not
        # it differs from the new one.
        if old_val:
            self.unapply(guid, uri, old_val)

        action = props['action']
        if action in _MOUNT_ACTIONS:
            mount_drive(uri)
        elif action == _UNMOUNT_ACTION:
            unmount_drive(uri)

        # An empty entry (unrecognised action) must not become a blank
        # crontab line.
        if not run_once and entry:
            others, entries = fetch_crontab(self.username)
            if entry not in entries:
                entries.append(entry)
                install_user_crontab(self.username, others, entries)
        self.cache_add_attribute(guid, uri, val)

    def __str__(self):
        return 'Preferences/Drives'

    def process_group_policy(self, deleted_gpo_list, changed_gpo_list):
        """Unapply drive maps of deleted GPOs, then apply the changed ones.

        For every changed GPO the drives found in Drives.xml are applied
        and self.clean() is called with the URIs that should be kept.
        """
        for guid, settings in deleted_gpo_list:
            if str(self) in settings:
                for uri, val in settings[str(self)].items():
                    self.unapply(guid, uri, val)

        for gpo in changed_gpo_list:
            if not gpo.file_sys_path:
                continue
            xml = 'USER/Preferences/Drives/Drives.xml'
            path = os.path.join(gpo.file_sys_path, xml)
            xml_conf = drop_privileges('root', self.parse, path)
            if not xml_conf:
                continue
            # Loop invariant: base path used to expand preference variables.
            gptpath = os.path.join(gpo.file_sys_path, 'USER')
            attrs = []
            for drive in xml_conf.findall('Drive'):
                prop = drive.find('Properties')
                if prop is None:
                    log.warning('Drive is missing Properties', drive.attrib)
                    continue
                if prop.attrib.get('thisDrive') == 'HIDE':
                    log.warning('Drive is hidden', prop.attrib)
                    continue  # Don't mount a hidden drive
                if 'path' not in prop.attrib or 'action' not in prop.attrib:
                    # A malformed drive must not abort the remaining drives.
                    log.warning('Drive is missing path or action',
                                prop.attrib)
                    continue

                # Compare against None: an element's truth value reflects
                # whether it has children, not whether it was found.
                filters = drive.find('Filters')
                run_once = (filters is not None and
                            filters.find('FilterRunOnce') is not None)

                uri = _drive_uri(prop.attrib['path'])
                # Ensure we expand the preference variables, or fail if we
                # are unable to (the uri is invalid if we fail).
                try:
                    uri = expand_pref_variables(uri, gptpath, self.lp,
                                                username=self.username)
                except NameError as e:
                    # If we fail expanding variables, then the URI is
                    # invalid and we can't continue processing this drive
                    # map. We can continue processing other drives, as they
                    # may succeed. This is not a critical error, since some
                    # Windows specific policies won't apply here.
                    log.warning('Failed to expand drive map variables: %s'
                                % e, prop.attrib)
                    continue
                attrs.append(uri)

                entry = ''
                if not run_once:
                    command = _gio_command(prop.attrib['action'], uri)
                    if command:
                        entry = '@hourly {}'.format(command)
                self.apply(gpo.name, uri, prop.attrib, run_once, entry)
            self.clean(gpo.name, keep=attrs)

    def rsop(self, gpo):
        """Return a dict mapping drive labels to their gio commands.

        Hidden drives, drives without Properties, and drives lacking a
        label, path or action are left out. Preference variables in the
        path are not expanded here.
        """
        output = {}
        if not gpo.file_sys_path:
            return output
        xml = 'USER/Preferences/Drives/Drives.xml'
        path = os.path.join(gpo.file_sys_path, xml)
        xml_conf = self.parse(path)
        if not xml_conf:
            return output
        for drive in xml_conf.findall('Drive'):
            prop = drive.find('Properties')
            if prop is None:
                continue
            attrib = prop.attrib
            if attrib.get('thisDrive') == 'HIDE':
                continue
            if any(key not in attrib for key in ('label', 'path', 'action')):
                continue
            command = _gio_command(attrib['action'], _drive_uri(attrib['path']))
            if command:
                output[attrib['label']] = command
        return output