summaryrefslogtreecommitdiffstats
path: root/python/samba/policies.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 17:20:00 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 17:20:00 +0000
commit8daa83a594a2e98f39d764422bfbdbc62c9efd44 (patch)
tree4099e8021376c7d8c05bdf8503093d80e9c7bad0 /python/samba/policies.py
parentInitial commit. (diff)
downloadsamba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.tar.xz
samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.zip
Adding upstream version 2:4.20.0+dfsg.upstream/2%4.20.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'python/samba/policies.py')
-rw-r--r--python/samba/policies.py388
1 files changed, 388 insertions, 0 deletions
diff --git a/python/samba/policies.py b/python/samba/policies.py
new file mode 100644
index 0000000..4539232
--- /dev/null
+++ b/python/samba/policies.py
@@ -0,0 +1,388 @@
+# Utilities for working with policies in SYSVOL Registry.pol files
+#
+# Copyright (C) David Mulder <dmulder@samba.org> 2022
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from io import StringIO
+import ldb
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.dcerpc import preg
+from samba.netcmd.common import netcmd_finddc
+from samba.netcmd.gpcommon import (
+ create_directory_hier,
+ smb_connection,
+ get_gpo_dn
+)
+from samba import NTSTATUSError
+from numbers import Number
+from samba.registry import str_regtype
+from samba.ntstatus import (
+ NT_STATUS_OBJECT_NAME_INVALID,
+ NT_STATUS_OBJECT_NAME_NOT_FOUND,
+ NT_STATUS_OBJECT_PATH_NOT_FOUND,
+ NT_STATUS_INVALID_PARAMETER
+)
+from samba.gp_parse.gp_ini import GPTIniParser
+from samba.common import get_string
+from samba.dcerpc import security
+from samba.ntacls import dsacl2fsacl
+from samba.dcerpc.misc import REG_BINARY, REG_MULTI_SZ, REG_SZ, GUID
+
+GPT_EMPTY = \
+"""
+[General]
+Version=0
+"""
+
+class RegistryGroupPolicies(object):
+ def __init__(self, gpo, lp, creds, samdb, host=None):
+ self.gpo = gpo
+ self.lp = lp
+ self.creds = creds
+ self.samdb = samdb
+ realm = self.lp.get('realm')
+ self.pol_dir = '\\'.join([realm.lower(), 'Policies', gpo, '%s'])
+ self.pol_file = '\\'.join([self.pol_dir, 'Registry.pol'])
+ self.policy_dn = get_gpo_dn(self.samdb, self.gpo)
+
+ if host and host.startswith('ldap://'):
+ dc_hostname = host[7:]
+ else:
+ dc_hostname = netcmd_finddc(self.lp, self.creds)
+
+ self.conn = smb_connection(dc_hostname,
+ 'sysvol',
+ lp=self.lp,
+ creds=self.creds)
+
+ # Get new security descriptor
+ ds_sd_flags = (security.SECINFO_OWNER |
+ security.SECINFO_GROUP |
+ security.SECINFO_DACL)
+ msg = self.samdb.search(base=self.policy_dn, scope=ldb.SCOPE_BASE,
+ attrs=['nTSecurityDescriptor'])[0]
+ ds_sd_ndr = msg['nTSecurityDescriptor'][0]
+ ds_sd = ndr_unpack(security.descriptor, ds_sd_ndr).as_sddl()
+
+ # Create a file system security descriptor
+ domain_sid = security.dom_sid(self.samdb.get_domain_sid())
+ sddl = dsacl2fsacl(ds_sd, domain_sid)
+ self.fs_sd = security.descriptor.from_sddl(sddl, domain_sid)
+
+ def __load_registry_pol(self, pol_file):
+ try:
+ pol_data = ndr_unpack(preg.file, self.conn.loadfile(pol_file))
+ except NTSTATUSError as e:
+ if e.args[0] in [NT_STATUS_OBJECT_NAME_INVALID,
+ NT_STATUS_OBJECT_NAME_NOT_FOUND,
+ NT_STATUS_OBJECT_PATH_NOT_FOUND]:
+ pol_data = preg.file() # The file doesn't exist
+ else:
+ raise
+ return pol_data
+
+ def __save_file(self, file_dir, file_name, data):
+ create_directory_hier(self.conn, file_dir)
+ self.conn.savefile(file_name, data)
+ self.conn.set_acl(file_name, self.fs_sd)
+
+ def __save_registry_pol(self, pol_dir, pol_file, pol_data):
+ self.__save_file(pol_dir, pol_file, ndr_pack(pol_data))
+
+ def __validate_json(self, json_input, remove=False):
+ if type(json_input) != list:
+ raise SyntaxError('JSON not formatted correctly')
+ for entry in json_input:
+ if type(entry) != dict:
+ raise SyntaxError('JSON not formatted correctly')
+ keys = ['keyname', 'valuename', 'class']
+ if not remove:
+ keys.extend(['data', 'type'])
+ if not all([k in entry for k in keys]):
+ raise SyntaxError('JSON not formatted correctly')
+
+ def __determine_data_type(self, entry):
+ if isinstance(entry['type'], Number):
+ return entry['type']
+ else:
+ for i in range(12):
+ if str_regtype(i) == entry['type'].upper():
+ return i
+ raise TypeError('Unknown type %s' % entry['type'])
+
+ def __set_data(self, rtype, data):
+ # JSON can't store bytes, and have to be set via an int array
+ if rtype == REG_BINARY and type(data) == list:
+ return bytes(data)
+ elif rtype == REG_MULTI_SZ and type(data) == list:
+ data = ('\x00').join(data) + '\x00\x00'
+ return data.encode('utf-16-le')
+ elif rtype == REG_SZ and type(data) == str:
+ return data.encode('utf-8')
+ return data
+
+ def __pol_replace(self, pol_data, entry):
+ for e in pol_data.entries:
+ if e.keyname == entry['keyname'] and \
+ e.valuename == entry['valuename']:
+ e.data = self.__set_data(e.type, entry['data'])
+ break
+ else:
+ e = preg.entry()
+ e.keyname = entry['keyname']
+ e.valuename = entry['valuename']
+ e.type = self.__determine_data_type(entry)
+ e.data = self.__set_data(e.type, entry['data'])
+ entries = list(pol_data.entries)
+ entries.append(e)
+ pol_data.entries = entries
+ pol_data.num_entries = len(entries)
+
+ def __pol_remove(self, pol_data, entry):
+ entries = []
+ for e in pol_data.entries:
+ if not (e.keyname == entry['keyname'] and
+ e.valuename == entry['valuename']):
+ entries.append(e)
+ pol_data.entries = entries
+ pol_data.num_entries = len(entries)
+
+ def increment_gpt_ini(self, machine_changed=False, user_changed=False):
+ if not machine_changed and not user_changed:
+ return
+ GPT_INI = self.pol_dir % 'GPT.INI'
+ try:
+ data = self.conn.loadfile(GPT_INI)
+ except NTSTATUSError as e:
+ if e.args[0] in [NT_STATUS_OBJECT_NAME_INVALID,
+ NT_STATUS_OBJECT_NAME_NOT_FOUND,
+ NT_STATUS_OBJECT_PATH_NOT_FOUND]:
+ data = GPT_EMPTY
+ else:
+ raise
+ parser = GPTIniParser()
+ parser.parse(data)
+ version = 0
+ machine_version = 0
+ user_version = 0
+ if parser.ini_conf.has_option('General', 'Version'):
+ version = int(parser.ini_conf.get('General',
+ 'Version').encode('utf-8'))
+ machine_version = version & 0x0000FFFF
+ user_version = version >> 16
+ if machine_changed:
+ machine_version += 1
+ if user_changed:
+ user_version += 1
+ version = (user_version << 16) + machine_version
+
+ # Set the new version in the GPT.INI
+ if not parser.ini_conf.has_section('General'):
+ parser.ini_conf.add_section('General')
+ parser.ini_conf.set('General', 'Version', str(version))
+ with StringIO() as out_data:
+ parser.ini_conf.write(out_data)
+ out_data.seek(0)
+ self.__save_file(self.pol_dir % '', GPT_INI,
+ out_data.read().encode('utf-8'))
+
+ # Set the new versionNumber on the ldap object
+ m = ldb.Message()
+ m.dn = self.policy_dn
+ m['new_value'] = ldb.MessageElement(str(version), ldb.FLAG_MOD_REPLACE,
+ 'versionNumber')
+ self.samdb.modify(m)
+
+ def __validate_extension_registration(self, ext_name, ext_attr):
+ try:
+ ext_name_guid = GUID(ext_name)
+ except NTSTATUSError as e:
+ if e.args[0] == NT_STATUS_INVALID_PARAMETER:
+ raise SyntaxError('Extension name not formatted correctly')
+ raise
+ if ext_attr not in ['gPCMachineExtensionNames',
+ 'gPCUserExtensionNames']:
+ raise SyntaxError('Extension attribute incorrect')
+ return '{%s}' % ext_name_guid
+
+ def register_extension_name(self, ext_name, ext_attr):
+ ext_name = self.__validate_extension_registration(ext_name, ext_attr)
+ res = self.samdb.search(base=self.policy_dn, scope=ldb.SCOPE_BASE,
+ attrs=[ext_attr])
+ if len(res) == 0 or ext_attr not in res[0]:
+ ext_names = '[]'
+ else:
+ ext_names = get_string(res[0][ext_attr][-1])
+ if ext_name not in ext_names:
+ ext_names = '[' + ext_names.strip('[]') + ext_name + ']'
+ else:
+ return
+
+ m = ldb.Message()
+ m.dn = self.policy_dn
+ m['new_value'] = ldb.MessageElement(ext_names, ldb.FLAG_MOD_REPLACE,
+ ext_attr)
+ self.samdb.modify(m)
+
+ def unregister_extension_name(self, ext_name, ext_attr):
+ ext_name = self.__validate_extension_registration(ext_name, ext_attr)
+ res = self.samdb.search(base=self.policy_dn, scope=ldb.SCOPE_BASE,
+ attrs=[ext_attr])
+ if len(res) == 0 or ext_attr not in res[0]:
+ return
+ else:
+ ext_names = get_string(res[0][ext_attr][-1])
+ if ext_name in ext_names:
+ ext_names = ext_names.replace(ext_name, '')
+ else:
+ return
+
+ m = ldb.Message()
+ m.dn = self.policy_dn
+ m['new_value'] = ldb.MessageElement(ext_names, ldb.FLAG_MOD_REPLACE,
+ ext_attr)
+ self.samdb.modify(m)
+
+ def remove_s(self, json_input):
+ """remove_s
+ json_input: JSON list of entries to remove from GPO
+
+ Example json_input:
+ [
+ {
+ "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage",
+ "valuename": "StartPage",
+ "class": "USER",
+ },
+ {
+ "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage",
+ "valuename": "URL",
+ "class": "USER",
+ },
+ ]
+ """
+ self.__validate_json(json_input, remove=True)
+ user_pol_data = self.__load_registry_pol(self.pol_file % 'User')
+ machine_pol_data = self.__load_registry_pol(self.pol_file % 'Machine')
+
+ machine_changed = False
+ user_changed = False
+ for entry in json_input:
+ cls = entry['class'].lower()
+ if cls == 'machine' or cls == 'both':
+ machine_changed = True
+ self.__pol_remove(machine_pol_data, entry)
+ if cls == 'user' or cls == 'both':
+ user_changed = True
+ self.__pol_remove(user_pol_data, entry)
+ if user_changed:
+ self.__save_registry_pol(self.pol_dir % 'User',
+ self.pol_file % 'User',
+ user_pol_data)
+ if machine_changed:
+ self.__save_registry_pol(self.pol_dir % 'Machine',
+ self.pol_file % 'Machine',
+ machine_pol_data)
+ self.increment_gpt_ini(machine_changed, user_changed)
+
+ def merge_s(self, json_input):
+ """merge_s
+ json_input: JSON list of entries to merge into GPO
+
+ Example json_input:
+ [
+ {
+ "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage",
+ "valuename": "StartPage",
+ "class": "USER",
+ "type": "REG_SZ",
+ "data": "homepage"
+ },
+ {
+ "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage",
+ "valuename": "URL",
+ "class": "USER",
+ "type": "REG_SZ",
+ "data": "google.com"
+ },
+ ]
+ """
+ self.__validate_json(json_input)
+ user_pol_data = self.__load_registry_pol(self.pol_file % 'User')
+ machine_pol_data = self.__load_registry_pol(self.pol_file % 'Machine')
+
+ machine_changed = False
+ user_changed = False
+ for entry in json_input:
+ cls = entry['class'].lower()
+ if cls == 'machine' or cls == 'both':
+ machine_changed = True
+ self.__pol_replace(machine_pol_data, entry)
+ if cls == 'user' or cls == 'both':
+ user_changed = True
+ self.__pol_replace(user_pol_data, entry)
+ if user_changed:
+ self.__save_registry_pol(self.pol_dir % 'User',
+ self.pol_file % 'User',
+ user_pol_data)
+ if machine_changed:
+ self.__save_registry_pol(self.pol_dir % 'Machine',
+ self.pol_file % 'Machine',
+ machine_pol_data)
+ self.increment_gpt_ini(machine_changed, user_changed)
+
+ def replace_s(self, json_input):
+ """replace_s
+ json_input: JSON list of entries to replace entries in GPO
+
+ Example json_input:
+ [
+ {
+ "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage",
+ "valuename": "StartPage",
+ "class": "USER",
+ "data": "homepage"
+ },
+ {
+ "keyname": "Software\\Policies\\Mozilla\\Firefox\\Homepage",
+ "valuename": "URL",
+ "class": "USER",
+ "data": "google.com"
+ },
+ ]
+ """
+ self.__validate_json(json_input)
+ user_pol_data = preg.file()
+ machine_pol_data = preg.file()
+
+ machine_changed = False
+ user_changed = False
+ for entry in json_input:
+ cls = entry['class'].lower()
+ if cls == 'machine' or cls == 'both':
+ machine_changed = True
+ self.__pol_replace(machine_pol_data, entry)
+ if cls == 'user' or cls == 'both':
+ user_changed = True
+ self.__pol_replace(user_pol_data, entry)
+ if user_changed:
+ self.__save_registry_pol(self.pol_dir % 'User',
+ self.pol_file % 'User',
+ user_pol_data)
+ if machine_changed:
+ self.__save_registry_pol(self.pol_dir % 'Machine',
+ self.pol_file % 'Machine',
+ machine_pol_data)
+ self.increment_gpt_ini(machine_changed, user_changed)