summaryrefslogtreecommitdiffstats
path: root/tests/topotests/munet/linux.py
blob: 519c55f84d8237db7384d05edb7e236a5ee1959b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# -*- coding: utf-8 eval: (blacken-mode 1) -*-
# SPDX-License-Identifier: GPL-2.0-or-later
#
# June 10 2022, Christian Hopps <chopps@labn.net>
#
# Copyright (c) 2022, LabN Consulting, L.L.C.
#
"""A module that gives access to linux unshare system call."""

import ctypes  # pylint: disable=C0415
import ctypes.util  # pylint: disable=C0415
import errno
import functools
import os


# Handle to the C library; stays None until _load_libc() assigns it.
libc = None


def raise_oserror(enum):
    """Raise an OSError for the errno value ``enum``.

    The exception message and ``strerror`` are the symbolic errno name when
    ``errno.errorcode`` knows it, otherwise ``str(enum)``; ``errno`` is set to
    ``enum`` itself.
    """
    name = errno.errorcode.get(enum, str(enum))
    exc = OSError(name)
    exc.strerror = name
    exc.errno = enum
    raise exc


def _load_libc():
    """Load the C library into the module-level ``libc`` handle, once.

    Does nothing when ``libc`` is already set.  The library is opened with
    ``use_errno=True`` so callers can read the error code of a failed call
    with ``ctypes.get_errno()``.
    """
    global libc  # pylint: disable=W0601,W0603
    if not libc:
        libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)


def pause():
    """Call the C library's ``pause()``; see pause(2) manpage."""
    # _load_libc() returns immediately when the library is already loaded.
    _load_libc()
    libc.pause()


# Bit flags for the ``flags`` argument of mount() below.  Each constant is a
# single bit; note that no constant is defined here for bit 9.
MS_RDONLY = 1
MS_NOSUID = 1 << 1
MS_NODEV = 1 << 2
MS_NOEXEC = 1 << 3
MS_SYNCHRONOUS = 1 << 4
MS_REMOUNT = 1 << 5
MS_MANDLOCK = 1 << 6
MS_DIRSYNC = 1 << 7
MS_NOSYMFOLLOW = 1 << 8
MS_NOATIME = 1 << 10
MS_NODIRATIME = 1 << 11
MS_BIND = 1 << 12
MS_MOVE = 1 << 13
MS_REC = 1 << 14
MS_SILENT = 1 << 15
MS_POSIXACL = 1 << 16
MS_UNBINDABLE = 1 << 17
MS_PRIVATE = 1 << 18
MS_SLAVE = 1 << 19
MS_SHARED = 1 << 20
MS_RELATIME = 1 << 21
MS_KERNMOUNT = 1 << 22
MS_I_VERSION = 1 << 23
MS_STRICTATIME = 1 << 24
MS_LAZYTIME = 1 << 25


def mount(source, target, fs, flags=0, options=""):
    """Mount ``source`` on ``target`` through the C library's ``mount()``.

    Args:
        source: what to mount, as a string.
        target: the mount point, as a string.
        fs: filesystem type string; a falsy value is passed to libc as NULL.
        flags: integer mount flags (see the ``MS_*`` constants).
        options: option string; a falsy value is passed to libc as NULL.

    Raises:
        OSError: when the libc call returns a negative value; the exception
            carries the errno reported by ``ctypes.get_errno()``.
    """
    _load_libc()  # no-op when the library is already loaded

    c_mount = libc.mount
    # (source, target, fstype, flags, data)
    c_mount.argtypes = (ctypes.c_char_p,) * 3 + (ctypes.c_ulong, ctypes.c_char_p)

    fs_bytes = fs.encode() if fs else None
    opt_bytes = options.encode() if options else None
    rc = c_mount(source.encode(), target.encode(), fs_bytes, flags, opt_bytes)
    if rc >= 0:
        return

    err = ctypes.get_errno()
    raise OSError(
        err,
        f"Error mounting {source} ({fs}) on {target}"
        f" with options '{options}': {os.strerror(err)}",
    )


# umount options: bit flags for the ``options`` argument of umount() below.
MNT_FORCE = 0x1
MNT_DETACH = 0x2
MNT_EXPIRE = 0x4
UMOUNT_NOFOLLOW = 0x8


def umount(target, options):
    """Unmount ``target``, honouring the given unmount flags.

    Args:
        target: the mount point, as a string.
        options: integer flags (``MNT_FORCE``, ``MNT_DETACH``, ``MNT_EXPIRE``,
            ``UMOUNT_NOFOLLOW`` or-ed together, or 0).

    Raises:
        OSError: when the libc call returns a negative value; the exception
            carries the errno reported by ``ctypes.get_errno()``.
    """
    if not libc:
        _load_libc()
    # libc's umount() takes only the target, so any flags passed to it are
    # silently dropped.  umount2() is the variant that accepts flags.
    libc.umount2.argtypes = (ctypes.c_char_p, ctypes.c_int)

    ret = libc.umount2(target.encode(), int(options))
    if ret < 0:
        err = ctypes.get_errno()
        raise OSError(
            err,
            f"Error umounting {target} with options '{options}': {os.strerror(err)}",
        )


def pidfd_open(pid, flags=0):
    """Return a file descriptor that refers to the process ``pid``.

    Delegates to ``os.pidfd_open`` when the ``os`` module has one that is not
    this very function (this module may install this function there at import
    time).  Otherwise it calls ``pidfd_open`` from the C library or, when libc
    does not export that symbol, issues the system call by number through
    ``syscall()``.

    Raises:
        OSError: via raise_oserror() when the libc-based call returns -1.
    """
    # Falling back to ourselves as the default folds "attribute missing" and
    # "attribute is this function" into the same identity test.
    os_impl = getattr(os, "pidfd_open", pidfd_open)
    if os_impl is not pidfd_open:
        return os_impl(pid, flags)

    _load_libc()  # no-op when the library is already loaded

    try:
        opener = libc.pidfd_open
    except AttributeError:
        # No libc wrapper available: call syscall(nr, pid, flags) directly.
        nr_pidfd_open = 434
        raw_syscall = libc.syscall
        raw_syscall.restype = ctypes.c_int
        raw_syscall.argtypes = (ctypes.c_long, ctypes.c_uint, ctypes.c_uint)
        opener = functools.partial(raw_syscall, nr_pidfd_open)

    fd = opener(int(pid), int(flags))
    if fd == -1:
        raise_oserror(ctypes.get_errno())

    return fd


# Runtime patch if kernel supports the call: when this Python's ``os`` module
# has no pidfd_open, install ours, provided the running kernel is 5.4 or newer.
if not hasattr(os, "pidfd_open"):
    import platform
    import re

    # Only the leading "major.minor" is relevant.  Parsing every dot-separated
    # component would reject releases with vendor suffixes such as "6.1.0+"
    # (wrongly disabling the patch) and would crash on a one-component release.
    kvmatch = re.match(r"(\d+)\.(\d+)", platform.release())
    if kvmatch:
        kversion = [int(x) for x in kvmatch.groups()]
        kvok = kversion[0] > 5 or (kversion[0] == 5 and kversion[1] >= 4)
    else:
        kvok = False
    if kvok:
        os.pidfd_open = pidfd_open


def setns(fd, nstype):  # noqa: D402
    """Call the C library's ``setns``; see setns(2) manpage.

    ``fd`` and ``nstype`` are converted with ``int()`` before the call.

    Raises:
        OSError: via raise_oserror() when the call returns -1.
    """
    _load_libc()  # no-op when the library is already loaded

    rc = libc.setns(int(fd), int(nstype))
    if rc == -1:
        raise_oserror(ctypes.get_errno())


def unshare(flags):  # noqa: D402
    """Call the C library's ``unshare``; see unshare(2) manpage.

    ``flags`` is converted with ``int()`` before the call.

    Raises:
        OSError: via raise_oserror() when the call returns -1.
    """
    _load_libc()  # no-op when the library is already loaded

    rc = libc.unshare(int(flags))
    if rc == -1:
        raise_oserror(ctypes.get_errno())


# CLONE_* flag bits, one bit per constant.  They are the keys of the
# clone_flag_names and namespace_files tables below.
CLONE_NEWTIME = 0x00000080
CLONE_VM = 0x00000100
CLONE_FS = 0x00000200
CLONE_FILES = 0x00000400
CLONE_SIGHAND = 0x00000800
CLONE_PIDFD = 0x00001000
CLONE_PTRACE = 0x00002000
CLONE_VFORK = 0x00004000
CLONE_PARENT = 0x00008000
CLONE_THREAD = 0x00010000
CLONE_NEWNS = 0x00020000
CLONE_SYSVSEM = 0x00040000
CLONE_SETTLS = 0x00080000
CLONE_PARENT_SETTID = 0x00100000
CLONE_CHILD_CLEARTID = 0x00200000
CLONE_DETACHED = 0x00400000
CLONE_UNTRACED = 0x00800000
CLONE_CHILD_SETTID = 0x01000000
CLONE_NEWCGROUP = 0x02000000
CLONE_NEWUTS = 0x04000000
CLONE_NEWIPC = 0x08000000
CLONE_NEWUSER = 0x10000000
CLONE_NEWPID = 0x20000000
CLONE_NEWNET = 0x40000000
CLONE_IO = 0x80000000

# Maps each CLONE_* flag bit to its symbolic name; clone_flag_string() walks
# this table (in insertion order) to render a flags value as text.
clone_flag_names = {
    CLONE_NEWTIME: "CLONE_NEWTIME",
    CLONE_VM: "CLONE_VM",
    CLONE_FS: "CLONE_FS",
    CLONE_FILES: "CLONE_FILES",
    CLONE_SIGHAND: "CLONE_SIGHAND",
    CLONE_PIDFD: "CLONE_PIDFD",
    CLONE_PTRACE: "CLONE_PTRACE",
    CLONE_VFORK: "CLONE_VFORK",
    CLONE_PARENT: "CLONE_PARENT",
    CLONE_THREAD: "CLONE_THREAD",
    CLONE_NEWNS: "CLONE_NEWNS",
    CLONE_SYSVSEM: "CLONE_SYSVSEM",
    CLONE_SETTLS: "CLONE_SETTLS",
    CLONE_PARENT_SETTID: "CLONE_PARENT_SETTID",
    CLONE_CHILD_CLEARTID: "CLONE_CHILD_CLEARTID",
    CLONE_DETACHED: "CLONE_DETACHED",
    CLONE_UNTRACED: "CLONE_UNTRACED",
    CLONE_CHILD_SETTID: "CLONE_CHILD_SETTID",
    CLONE_NEWCGROUP: "CLONE_NEWCGROUP",
    CLONE_NEWUTS: "CLONE_NEWUTS",
    CLONE_NEWIPC: "CLONE_NEWIPC",
    CLONE_NEWUSER: "CLONE_NEWUSER",
    CLONE_NEWPID: "CLONE_NEWPID",
    CLONE_NEWNET: "CLONE_NEWNET",
    CLONE_IO: "CLONE_IO",
}


def clone_flag_string(flags):
    """Render ``flags`` as the names of its set CLONE_* bits joined by "|".

    Names appear in the order of the ``clone_flag_names`` table.  Returns the
    string "None" when no bit known to that table is set.
    """
    joined = "|".join(
        name for bit, name in clone_flag_names.items() if bit & flags
    )
    # An empty join means no known bit matched.
    return joined or "None"


# Maps each namespace-related CLONE_NEW* flag to a relative "ns/..." path.
# NOTE(review): presumably relative to a /proc/<pid> directory -- confirm
# against the callers, which are outside this file.
namespace_files = {
    CLONE_NEWUSER: "ns/user",
    CLONE_NEWCGROUP: "ns/cgroup",
    CLONE_NEWIPC: "ns/ipc",
    CLONE_NEWUTS: "ns/uts",
    CLONE_NEWNET: "ns/net",
    CLONE_NEWPID: "ns/pid_for_children",
    CLONE_NEWNS: "ns/mnt",
    CLONE_NEWTIME: "ns/time_for_children",
}

# Option codes passed as the first argument of prctl().  Only the two SET
# codes are used by the helpers in this file.
PR_SET_PDEATHSIG = 1
PR_GET_PDEATHSIG = 2
PR_SET_NAME = 15
PR_GET_NAME = 16


def set_process_name(name):
    """Set the caller's name with ``prctl(PR_SET_NAME, ...)``; see prctl(2).

    Args:
        name: the new name; it must be ASCII-encodable.

    Raises:
        UnicodeEncodeError: when ``name`` contains non-ASCII characters.

    The return value of ``prctl`` is not checked.
    """
    if not libc:
        _load_libc()

    # libc.prctl is one function object shared by every user of ``libc``, and
    # set_parent_death_signal() declares arguments 2-5 as c_ulong on it.  Once
    # that has happened ctypes refuses a byref() pointer for argument 2 (it is
    # not an integer), so this function used to fail depending on call order.
    # Declare the same prototype here and pass the buffer's address as a plain
    # integer, which is valid regardless of which function ran first.
    libc.prctl.argtypes = (
        ctypes.c_int,
        ctypes.c_ulong,
        ctypes.c_ulong,
        ctypes.c_ulong,
        ctypes.c_ulong,
    )

    # ``buf`` must stay referenced until prctl() returns: only its address is
    # handed to the C call.
    buf = ctypes.create_string_buffer(bytes(name, encoding="ascii"))
    libc.prctl(PR_SET_NAME, ctypes.addressof(buf), 0, 0, 0)


def set_parent_death_signal(signum):
    """Call ``prctl(PR_SET_PDEATHSIG, signum, 0, 0, 0)``; see prctl(2).

    Args:
        signum: the signal number passed as the second prctl argument.

    The return value of ``prctl`` is not checked.
    """
    _load_libc()  # no-op when the library is already loaded

    prctl = libc.prctl
    # Option code is an int; the four remaining arguments are unsigned longs.
    prctl.argtypes = (ctypes.c_int,) + (ctypes.c_ulong,) * 4
    prctl(PR_SET_PDEATHSIG, signum, 0, 0, 0)