summaryrefslogtreecommitdiffstats
path: root/plugins/python/example_io_plugin.py
blob: dc4c6a76233ad43e5c984eebbdeb2d9c2c21cda1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import sudo

from os import path
import errno
import signal
import sys
import json


VERSION = 1.0


class SudoIOPlugin(sudo.Plugin):
    """Example sudo input/output plugin

    Demonstrates how to use the sudo IO plugin API. All functions are added as
    an example on their syntax, but note that all of them are optional.

    On detailed description of the functions refer to sudo_plugin manual (man
    sudo_plugin).

    Most functions can express error or reject through their "int" return value
    as documented in the manual. The sudo module also has constants for these:
        sudo.RC.ACCEPT / sudo.RC.OK  1
        sudo.RC.REJECT               0
        sudo.RC.ERROR               -1
        sudo.RC.USAGE_ERROR         -2

    If the plugin encounters an error, instead of just returning sudo.RC.ERROR
    result code it can also add a message describing the problem.
    This can be done by raising the special exception:
        raise sudo.PluginError("Message")
    This added message will be used by the audit plugins.

    If the function returns "None" (for example does not call return), it will
    be considered sudo.RC.OK. If an exception other than sudo.PluginError is
    raised, its backtrace will be shown to the user and the plugin function
    returns sudo.RC.ERROR. If that is not acceptable, catch it.
    """

    # -- Plugin API functions --

    def __init__(self, version: str,
                 plugin_options: tuple, **kwargs):
        """The constructor of the IO plugin.

        Other variables you can currently use as arguments are:
            user_env: tuple
            settings: tuple
            user_info: tuple

        For their detailed description, see the open() call of the C plugin API
        in the sudo manual ("man sudo").
        """
        if not version.startswith("1."):
            raise sudo.SudoException(
                "This plugin plugin is not compatible with python plugin"
                "API version {}".format(version))

        # convert tuple of "key=value"s to dict
        plugin_options = sudo.options_as_dict(plugin_options)

        log_path = plugin_options.get("LogPath", "/tmp")
        self._open_log_file(path.join(log_path, "sudo.log"))
        self._log("", "-- Plugin STARTED --")

    def __del__(self):
        if hasattr(self, "_log_file"):
            self._log("", "-- Plugin DESTROYED --")
            self._log_file.close()

    def open(self, argv: tuple,
             command_info: tuple) -> int:
        """Receives the command the user wishes to run.

        This function works the same as open() call of the C IO plugin API (see
        sudo manual), except that:
         - It only gets called before the user would execute some command (and
           not for a version query for example).
         - Other arguments of the C open() call are received through the
           constructor.
        """
        self._log("EXEC", " ".join(argv))
        self._log("EXEC info", json.dumps(command_info, indent=4))

        return sudo.RC.ACCEPT

    def log_ttyout(self, buf: str) -> int:
        return self._log("TTY OUT", buf.strip())

    def log_ttyin(self, buf: str) -> int:
        return self._log("TTY IN", buf.strip())

    def log_stdin(self, buf: str) -> int:
        return self._log("STD IN", buf.strip())

    def log_stdout(self, buf: str) -> int:
        return self._log("STD OUT", buf.strip())

    def log_stderr(self, buf: str) -> int:
        return self._log("STD ERR", buf.strip())

    def change_winsize(self, line: int, cols: int) -> int:
        self._log("WINSIZE", "{}x{}".format(line, cols))

    def log_suspend(self, signo: int) -> int:
        signal_description = self._signal_name(signo)

        self._log("SUSPEND", signal_description)

    def show_version(self, is_verbose: int) -> int:
        sudo.log_info("Python Example IO Plugin version: {}".format(VERSION))
        if is_verbose:
            sudo.log_info("Python interpreter version:", sys.version)

    def close(self, exit_status: int, error: int) -> None:
        """Called when a command execution finished.

        Works the same as close() from C API (see sudo_plugin manual), except
        that it only gets called if there was a command execution trial (open()
        returned with sudo.RC.ACCEPT).
        """
        if error == 0:
            self._log("CLOSE", "Command returned {}".format(exit_status))
        else:
            error_name = errno.errorcode.get(error, "???")
            self._log("CLOSE", "Failed to execute, execve returned {} ({})"
                               .format(error, error_name))

    # -- Helper functions --

    def _open_log_file(self, log_path):
        sudo.log_info("Example sudo python plugin will log to", log_path)
        self._log_file = open(log_path, "a")

    def _log(self, type, message):
        print(type, message, file=self._log_file)
        return sudo.RC.ACCEPT

    if hasattr(signal, "Signals"):
        def _signal_name(cls, signo: int):
            try:
                return signal.Signals(signo).name
            except ValueError:
                return "signal {}".format(signo)
    else:
        def _signal_name(cls, signo: int):
            for n, v in sorted(signal.__dict__.items()):
                if v != signo:
                    continue;
                if n.startswith("SIG") and not n.startswith("SIG_"):
                    return n
            return "signal {}".format(signo)