From f8e5c55a036f0e2e2a958e30456270f3f9eba933 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 27 Apr 2024 14:52:13 +0200 Subject: Adding upstream version 1.9.5p2. Signed-off-by: Daniel Baumann --- plugins/python/example_policy_plugin.py | 172 ++++++++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) create mode 100644 plugins/python/example_policy_plugin.py (limited to 'plugins/python/example_policy_plugin.py') diff --git a/plugins/python/example_policy_plugin.py b/plugins/python/example_policy_plugin.py new file mode 100644 index 0000000..dfb15ca --- /dev/null +++ b/plugins/python/example_policy_plugin.py @@ -0,0 +1,172 @@ +import sudo + +import errno +import sys +import os +import pwd +import grp +import shutil + + +VERSION = 1.0 + + +class SudoPolicyPlugin(sudo.Plugin): + """Example sudo policy plugin + + Demonstrates how to use the sudo policy plugin API. All functions are added + as an example on their syntax, but note that most of them are optional + (except check_policy). + + On detailed description of the functions refer to sudo_plugin manual (man + sudo_plugin). + + Most functions can express error or reject through their "int" return value + as documented in the manual. The sudo module also has constants for these: + sudo.RC.ACCEPT / sudo.RC.OK 1 + sudo.RC.REJECT 0 + sudo.RC.ERROR -1 + sudo.RC.USAGE_ERROR -2 + + If the plugin encounters an error, instead of just returning sudo.RC.ERROR + result code it can also add a message describing the problem. + This can be done by raising the special exception: + raise sudo.PluginError("Message") + This added message will be used by the audit plugins. + + If the function returns "None" (for example does not call return), it will + be considered sudo.RC.OK. If an exception other than sudo.PluginError is + raised, its backtrace will be shown to the user and the plugin function + returns sudo.RC.ERROR. If that is not acceptable, catch it. + """ + + _allowed_commands = ("id", "whoami") + _safe_password = "12345" + + # -- Plugin API functions -- + + def __init__(self, user_env: tuple, settings: tuple, + version: str, **kwargs): + """The constructor matches the C sudo plugin API open() call + + Other variables you can currently use as arguments are: + user_info: tuple + plugin_options: tuple + + For their detailed description, see the open() call of the C plugin API + in the sudo manual ("man sudo"). + """ + if not version.startswith("1."): + raise sudo.PluginError( + "This plugin plugin is not compatible with python plugin" + "API version {}".format(version)) + + self.user_env = sudo.options_as_dict(user_env) + self.settings = sudo.options_as_dict(settings) + + def check_policy(self, argv: tuple, env_add: tuple): + cmd = argv[0] + # Example for a simple reject: + if not self._is_command_allowed(cmd): + sudo.log_error("You are not allowed to run this command!") + return sudo.RC.REJECT + + raise sudo.PluginError("You are not allowed to run this command!") + + # The environment the command will be executed with (we allow any here) + user_env_out = sudo.options_from_dict(self.user_env) + env_add + + command_info_out = sudo.options_from_dict({ + "command": self._find_on_path(cmd), # Absolute path of command + "runas_uid": self._runas_uid(), # The user id + "runas_gid": self._runas_gid(), # The group id + }) + + return (sudo.RC.ACCEPT, command_info_out, argv, user_env_out) + + def init_session(self, user_pwd: tuple, user_env: tuple): + """Perform session setup + + Beware that user_pwd can be None if user is not present in the password + database. Otherwise it is a tuple convertible to pwd.struct_passwd. + """ + # conversion example: + user_pwd = pwd.struct_passwd(user_pwd) if user_pwd else None + + # This is how you change the user_env: + return (sudo.RC.OK, user_env + ("PLUGIN_EXAMPLE_ENV=1",)) + + # If you do not want to change user_env, you can just return (or None): + # return sudo.RC.OK + + def list(self, argv: tuple, is_verbose: int, user: str): + cmd = argv[0] if argv else None + as_user_text = "as user '{}'".format(user) if user else "" + + if cmd: + allowed_text = "" if self._is_command_allowed(cmd) else "NOT " + sudo.log_info("You are {}allowed to execute command '{}'{}" + .format(allowed_text, cmd, as_user_text)) + + if not cmd or is_verbose: + sudo.log_info("Only the following commands are allowed:", + ", ".join(self._allowed_commands), as_user_text) + + def validate(self): + pass # we have no cache + + def invalidate(self, remove: int): + pass # we have no cache + + def show_version(self, is_verbose: int): + sudo.log_info("Python Example Policy Plugin " + "version: {}".format(VERSION)) + if is_verbose: + sudo.log_info("Python interpreter version:", sys.version) + + def close(self, exit_status: int, error: int) -> None: + if error == 0: + sudo.log_info("The command returned with exit_status {}".format( + exit_status)) + else: + error_name = errno.errorcode.get(error, "???") + sudo.log_error( + "Failed to execute command, execve syscall returned " + "{} ({})".format(error, error_name)) + + # -- Helper functions -- + + def _is_command_allowed(self, cmd): + return os.path.basename(cmd) in self._allowed_commands + + def _find_on_path(self, cmd): + if os.path.isabs(cmd): + return cmd + + path = self.user_env.get("PATH", "/usr/bin:/bin") + absolute_cmd = shutil.which(cmd, path=path) + if not absolute_cmd: + raise sudo.PluginError("Can not find cmd '{}' on PATH".format(cmd)) + return absolute_cmd + + def _runas_pwd(self): + runas_user = self.settings.get("runas_user") or "root" + try: + return pwd.getpwnam(runas_user) + except KeyError: + raise sudo.PluginError("Could not find user " + "'{}'".format(runas_user)) + + def _runas_uid(self): + return self._runas_pwd().pw_uid + + def _runas_gid(self): + runas_group = self.settings.get("runas_group") + if runas_group is None: + return self._runas_pwd().pw_gid + + try: + return grp.getgrnam(runas_group).gr_gid + except KeyError: + raise sudo.PluginError( + "Could not find group '{}'".format(runas_group)) -- cgit v1.2.3