summaryrefslogtreecommitdiffstats
path: root/tests/topotests/munet/linux.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/munet/linux.py')
-rw-r--r--tests/topotests/munet/linux.py267
1 files changed, 267 insertions, 0 deletions
diff --git a/tests/topotests/munet/linux.py b/tests/topotests/munet/linux.py
new file mode 100644
index 0000000..417f745
--- /dev/null
+++ b/tests/topotests/munet/linux.py
@@ -0,0 +1,267 @@
+# -*- coding: utf-8 eval: (blacken-mode 1) -*-
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# June 10 2022, Christian Hopps <chopps@labn.net>
+#
+# Copyright (c) 2022, LabN Consulting, L.L.C.
+#
+"""A module that gives access to linux unshare system call."""
+
+import ctypes # pylint: disable=C0415
+import ctypes.util # pylint: disable=C0415
+import errno
+import functools
+import os
+
+
+libc = None
+
+
+def raise_oserror(enum):
+ s = errno.errorcode[enum] if enum in errno.errorcode else str(enum)
+ error = OSError(s)
+ error.errno = enum
+ error.strerror = s
+ raise error
+
+
+def _load_libc():
+ global libc # pylint: disable=W0601,W0603
+ if libc:
+ return
+ lcpath = ctypes.util.find_library("c")
+ libc = ctypes.CDLL(lcpath, use_errno=True)
+
+
+def pause():
+ if not libc:
+ _load_libc()
+ libc.pause()
+
+
+MS_RDONLY = 1
+MS_NOSUID = 1 << 1
+MS_NODEV = 1 << 2
+MS_NOEXEC = 1 << 3
+MS_SYNCHRONOUS = 1 << 4
+MS_REMOUNT = 1 << 5
+MS_MANDLOCK = 1 << 6
+MS_DIRSYNC = 1 << 7
+MS_NOSYMFOLLOW = 1 << 8
+MS_NOATIME = 1 << 10
+MS_NODIRATIME = 1 << 11
+MS_BIND = 1 << 12
+MS_MOVE = 1 << 13
+MS_REC = 1 << 14
+MS_SILENT = 1 << 15
+MS_POSIXACL = 1 << 16
+MS_UNBINDABLE = 1 << 17
+MS_PRIVATE = 1 << 18
+MS_SLAVE = 1 << 19
+MS_SHARED = 1 << 20
+MS_RELATIME = 1 << 21
+MS_KERNMOUNT = 1 << 22
+MS_I_VERSION = 1 << 23
+MS_STRICTATIME = 1 << 24
+MS_LAZYTIME = 1 << 25
+
+
+def mount(source, target, fs, flags=0, options=""):
+ if not libc:
+ _load_libc()
+ libc.mount.argtypes = (
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.c_ulong,
+ ctypes.c_char_p,
+ )
+ fsenc = fs.encode() if fs else None
+ optenc = options.encode() if options else None
+ ret = libc.mount(source.encode(), target.encode(), fsenc, flags, optenc)
+ if ret < 0:
+ err = ctypes.get_errno()
+ raise OSError(
+ err,
+ f"Error mounting {source} ({fs}) on {target}"
+ f" with options '{options}': {os.strerror(err)}",
+ )
+
+
+# unmout options
+MNT_FORCE = 0x1
+MNT_DETACH = 0x2
+MNT_EXPIRE = 0x4
+UMOUNT_NOFOLLOW = 0x8
+
+
+def umount(target, options):
+ if not libc:
+ _load_libc()
+ libc.umount.argtypes = (ctypes.c_char_p, ctypes.c_uint)
+
+ ret = libc.umount(target.encode(), int(options))
+ if ret < 0:
+ err = ctypes.get_errno()
+ raise OSError(
+ err,
+ f"Error umounting {target} with options '{options}': {os.strerror(err)}",
+ )
+
+
+def pidfd_open(pid, flags=0):
+ if hasattr(os, "pidfd_open") and os.pidfd_open is not pidfd_open:
+ return os.pidfd_open(pid, flags) # pylint: disable=no-member
+
+ if not libc:
+ _load_libc()
+
+ try:
+ pfof = libc.pidfd_open
+ except AttributeError:
+ __NR_pidfd_open = 434
+ _pidfd_open = libc.syscall
+ _pidfd_open.restype = ctypes.c_int
+ _pidfd_open.argtypes = ctypes.c_long, ctypes.c_uint, ctypes.c_uint
+ pfof = functools.partial(_pidfd_open, __NR_pidfd_open)
+
+ fd = pfof(int(pid), int(flags))
+ if fd == -1:
+ raise_oserror(ctypes.get_errno())
+
+ return fd
+
+
+if not hasattr(os, "pidfd_open"):
+ os.pidfd_open = pidfd_open
+
+
+def setns(fd, nstype): # noqa: D402
+ """See setns(2) manpage."""
+ if not libc:
+ _load_libc()
+
+ if libc.setns(int(fd), int(nstype)) == -1:
+ raise_oserror(ctypes.get_errno())
+
+
+def unshare(flags): # noqa: D402
+ """See unshare(2) manpage."""
+ if not libc:
+ _load_libc()
+
+ if libc.unshare(int(flags)) == -1:
+ raise_oserror(ctypes.get_errno())
+
+
+CLONE_NEWTIME = 0x00000080
+CLONE_VM = 0x00000100
+CLONE_FS = 0x00000200
+CLONE_FILES = 0x00000400
+CLONE_SIGHAND = 0x00000800
+CLONE_PIDFD = 0x00001000
+CLONE_PTRACE = 0x00002000
+CLONE_VFORK = 0x00004000
+CLONE_PARENT = 0x00008000
+CLONE_THREAD = 0x00010000
+CLONE_NEWNS = 0x00020000
+CLONE_SYSVSEM = 0x00040000
+CLONE_SETTLS = 0x00080000
+CLONE_PARENT_SETTID = 0x00100000
+CLONE_CHILD_CLEARTID = 0x00200000
+CLONE_DETACHED = 0x00400000
+CLONE_UNTRACED = 0x00800000
+CLONE_CHILD_SETTID = 0x01000000
+CLONE_NEWCGROUP = 0x02000000
+CLONE_NEWUTS = 0x04000000
+CLONE_NEWIPC = 0x08000000
+CLONE_NEWUSER = 0x10000000
+CLONE_NEWPID = 0x20000000
+CLONE_NEWNET = 0x40000000
+CLONE_IO = 0x80000000
+
+clone_flag_names = {
+ CLONE_NEWTIME: "CLONE_NEWTIME",
+ CLONE_VM: "CLONE_VM",
+ CLONE_FS: "CLONE_FS",
+ CLONE_FILES: "CLONE_FILES",
+ CLONE_SIGHAND: "CLONE_SIGHAND",
+ CLONE_PIDFD: "CLONE_PIDFD",
+ CLONE_PTRACE: "CLONE_PTRACE",
+ CLONE_VFORK: "CLONE_VFORK",
+ CLONE_PARENT: "CLONE_PARENT",
+ CLONE_THREAD: "CLONE_THREAD",
+ CLONE_NEWNS: "CLONE_NEWNS",
+ CLONE_SYSVSEM: "CLONE_SYSVSEM",
+ CLONE_SETTLS: "CLONE_SETTLS",
+ CLONE_PARENT_SETTID: "CLONE_PARENT_SETTID",
+ CLONE_CHILD_CLEARTID: "CLONE_CHILD_CLEARTID",
+ CLONE_DETACHED: "CLONE_DETACHED",
+ CLONE_UNTRACED: "CLONE_UNTRACED",
+ CLONE_CHILD_SETTID: "CLONE_CHILD_SETTID",
+ CLONE_NEWCGROUP: "CLONE_NEWCGROUP",
+ CLONE_NEWUTS: "CLONE_NEWUTS",
+ CLONE_NEWIPC: "CLONE_NEWIPC",
+ CLONE_NEWUSER: "CLONE_NEWUSER",
+ CLONE_NEWPID: "CLONE_NEWPID",
+ CLONE_NEWNET: "CLONE_NEWNET",
+ CLONE_IO: "CLONE_IO",
+}
+
+
+def clone_flag_string(flags):
+ ns = [v for k, v in clone_flag_names.items() if k & flags]
+ if ns:
+ return "|".join(ns)
+ return "None"
+
+
+namespace_files = {
+ CLONE_NEWUSER: "ns/user",
+ CLONE_NEWCGROUP: "ns/cgroup",
+ CLONE_NEWIPC: "ns/ipc",
+ CLONE_NEWUTS: "ns/uts",
+ CLONE_NEWNET: "ns/net",
+ CLONE_NEWPID: "ns/pid_for_children",
+ CLONE_NEWNS: "ns/mnt",
+ CLONE_NEWTIME: "ns/time_for_children",
+}
+
+PR_SET_PDEATHSIG = 1
+PR_GET_PDEATHSIG = 2
+PR_SET_NAME = 15
+PR_GET_NAME = 16
+
+
+def set_process_name(name):
+ if not libc:
+ _load_libc()
+
+ # Why does uncommenting this cause failure?
+ # libc.prctl.argtypes = (
+ # ctypes.c_int,
+ # ctypes.c_ulong,
+ # ctypes.c_ulong,
+ # ctypes.c_ulong,
+ # ctypes.c_ulong,
+ # )
+
+ s = ctypes.create_string_buffer(bytes(name, encoding="ascii"))
+ sr = ctypes.byref(s)
+ libc.prctl(PR_SET_NAME, sr, 0, 0, 0)
+
+
+def set_parent_death_signal(signum):
+ if not libc:
+ _load_libc()
+
+ # Why does uncommenting this cause failure?
+ libc.prctl.argtypes = (
+ ctypes.c_int,
+ ctypes.c_ulong,
+ ctypes.c_ulong,
+ ctypes.c_ulong,
+ ctypes.c_ulong,
+ )
+
+ libc.prctl(PR_SET_PDEATHSIG, signum, 0, 0, 0)