summaryrefslogtreecommitdiffstats
path: root/plugins/python/example_policy_plugin.py
blob: dfb15ca44084965ddef1205d7a29f4ea3b8201ca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import sudo

import errno
import sys
import os
import pwd
import grp
import shutil


# Version of this example plugin; reported to the user by show_version().
VERSION = 1.0


class SudoPolicyPlugin(sudo.Plugin):
    """Example sudo policy plugin

    Demonstrates how to use the sudo policy plugin API. All functions are added
    as an example of their syntax, but note that most of them are optional
    (except check_policy).

    For a detailed description of the functions, refer to the sudo_plugin
    manual (man sudo_plugin).

    Most functions can express error or reject through their "int" return value
    as documented in the manual. The sudo module also has constants for these:
        sudo.RC.ACCEPT / sudo.RC.OK  1
        sudo.RC.REJECT               0
        sudo.RC.ERROR               -1
        sudo.RC.USAGE_ERROR         -2

    If the plugin encounters an error, instead of just returning sudo.RC.ERROR
    result code it can also add a message describing the problem.
    This can be done by raising the special exception:
        raise sudo.PluginError("Message")
    This added message will be used by the audit plugins.

    If the function returns "None" (for example does not call return), it will
    be considered sudo.RC.OK. If an exception other than sudo.PluginError is
    raised, its backtrace will be shown to the user and the plugin function
    returns sudo.RC.ERROR. If that is not acceptable, catch it.
    """

    # Base names of the commands this example policy lets the user run.
    _allowed_commands = ("id", "whoami")
    # NOTE(review): not referenced by any method of this class; kept unchanged
    # in case something outside this file reads it.
    _safe_password = "12345"

    # -- Plugin API functions --

    def __init__(self, user_env: tuple, settings: tuple,
                 version: str, **kwargs):
        """The constructor matches the C sudo plugin API open() call

        Stores user_env and settings on the instance as dictionaries so the
        other methods can look options up by key.

        Other variables you can currently use as arguments are:
            user_info: tuple
            plugin_options: tuple

        For their detailed description, see the open() call of the C plugin API
        in the sudo plugin manual ("man sudo_plugin").

        Raises sudo.PluginError if the API version is not a "1.x" version.
        """
        if not version.startswith("1."):
            raise sudo.PluginError(
                "This plugin is not compatible with python plugin "
                "API version {}".format(version))

        self.user_env = sudo.options_as_dict(user_env)
        self.settings = sudo.options_as_dict(settings)

    def check_policy(self, argv: tuple, env_add: tuple):
        """Decide whether the command in argv may be executed

        Returns sudo.RC.REJECT if the command is not one of the allowed
        commands. Otherwise returns the tuple
            (sudo.RC.ACCEPT, command_info, argv, user_env)
        where command_info holds the absolute command path and the uid/gid to
        run as, and user_env is the invoking user's environment extended with
        env_add.

        Raises sudo.PluginError if no command was given, if the command cannot
        be found on PATH, or if the runas user/group does not exist.
        """
        if not argv:
            # Without this guard argv[0] would fail with a bare IndexError.
            raise sudo.PluginError("No command was specified")

        cmd = argv[0]

        # Example for a simple reject. To report an error with a message
        # instead, one could raise:
        #   sudo.PluginError("You are not allowed to run this command!")
        if not self._is_command_allowed(cmd):
            sudo.log_error("You are not allowed to run this command!")
            return sudo.RC.REJECT

        # The environment the command will be executed with (we allow any here)
        user_env_out = (sudo.options_from_dict(self.user_env)
                        + tuple(env_add or ()))

        command_info_out = sudo.options_from_dict({
            "command": self._find_on_path(cmd),  # Absolute path of command
            "runas_uid": self._runas_uid(),      # The user id
            "runas_gid": self._runas_gid(),      # The group id
        })

        return (sudo.RC.ACCEPT, command_info_out, argv, user_env_out)

    def init_session(self, user_pwd: tuple, user_env: tuple):
        """Perform session setup

        Beware that user_pwd can be None if user is not present in the password
        database. Otherwise it is a tuple convertible to pwd.struct_passwd.

        Returns (sudo.RC.OK, new_user_env), where new_user_env is user_env
        with "PLUGIN_EXAMPLE_ENV=1" appended.
        """
        # conversion example (the converted value is not used any further):
        user_pwd = pwd.struct_passwd(user_pwd) if user_pwd else None

        # This is how you change the user_env:
        return (sudo.RC.OK, user_env + ("PLUGIN_EXAMPLE_ENV=1",))

        # If you do not want to change user_env, you can just return (or None):
        # return sudo.RC.OK

    def list(self, argv: tuple, is_verbose: int, user: str):
        """Tell the user what they are allowed to run

        If argv names a command, reports whether that command is allowed.
        If no command is given, or is_verbose is set, the full list of allowed
        commands is printed. When user is set, it is mentioned in the output.
        """
        cmd = argv[0] if argv else None
        as_user_text = "as user '{}'".format(user) if user else ""

        if cmd:
            allowed_text = "" if self._is_command_allowed(cmd) else "NOT "
            # Separate the quoted command from the "as user" suffix.
            suffix = " " + as_user_text if as_user_text else ""
            sudo.log_info("You are {}allowed to execute command '{}'{}"
                          .format(allowed_text, cmd, suffix))

        if not cmd or is_verbose:
            parts = ["Only the following commands are allowed:",
                     ", ".join(self._allowed_commands)]
            if as_user_text:
                # Skip the argument entirely when there is nothing to say.
                parts.append(as_user_text)
            sudo.log_info(*parts)

    def validate(self):
        """Nothing to do: this plugin keeps no cache."""

    def invalidate(self, remove: int):
        """Nothing to do: this plugin keeps no cache."""

    def show_version(self, is_verbose: int):
        """Print the plugin version, plus the Python version if verbose."""
        sudo.log_info("Python Example Policy Plugin "
                      "version: {}".format(VERSION))
        if is_verbose:
            sudo.log_info("Python interpreter version:", sys.version)

    def close(self, exit_status: int, error: int) -> None:
        """Report how the command finished

        If error is 0 the command's exit_status is logged; otherwise error is
        treated as an errno value and logged together with its symbolic name.
        """
        if error == 0:
            sudo.log_info("The command returned with exit_status {}".format(
                exit_status))
        else:
            error_name = errno.errorcode.get(error, "???")
            sudo.log_error(
                "Failed to execute command, execve syscall returned "
                "{} ({})".format(error, error_name))

    # -- Helper functions --

    def _is_command_allowed(self, cmd):
        """Return True if the base name of cmd is an allowed command."""
        # NOTE(review): only the last path component is compared, so any path
        # ending in an allowed name (e.g. "/tmp/id") is accepted. Acceptable
        # for an example, but a real policy should match resolved paths.
        return os.path.basename(cmd) in self._allowed_commands

    def _find_on_path(self, cmd):
        """Return the absolute path of cmd

        An absolute cmd is returned as is. Otherwise it is searched for on the
        invoking user's PATH ("/usr/bin:/bin" if PATH is not set).

        Raises sudo.PluginError if the command cannot be found.
        """
        if os.path.isabs(cmd):
            return cmd

        path = self.user_env.get("PATH", "/usr/bin:/bin")
        absolute_cmd = shutil.which(cmd, path=path)
        if not absolute_cmd:
            raise sudo.PluginError("Can not find cmd '{}' on PATH".format(cmd))
        return absolute_cmd

    def _runas_pwd(self):
        """Return the password database entry of the user to run as

        The user comes from the "runas_user" setting and defaults to "root".

        Raises sudo.PluginError if the user does not exist.
        """
        runas_user = self.settings.get("runas_user") or "root"
        try:
            return pwd.getpwnam(runas_user)
        except KeyError:
            # The KeyError carries no extra information for the user.
            raise sudo.PluginError("Could not find user "
                                   "'{}'".format(runas_user)) from None

    def _runas_uid(self):
        """Return the numeric user id of the user to run as."""
        return self._runas_pwd().pw_uid

    def _runas_gid(self):
        """Return the numeric group id to run as

        Uses the "runas_group" setting if it is set to a non-empty value,
        otherwise the primary group of the runas user.

        Raises sudo.PluginError if the group (or the runas user) does not
        exist.
        """
        runas_group = self.settings.get("runas_group")
        # Treat an empty value like a missing one, as _runas_pwd() does for
        # "runas_user".
        if not runas_group:
            return self._runas_pwd().pw_gid

        try:
            return grp.getgrnam(runas_group).gr_gid
        except KeyError:
            raise sudo.PluginError(
                "Could not find group '{}'".format(runas_group)) from None