diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 15:26:00 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 15:26:00 +0000 |
commit | 830407e88f9d40d954356c3754f2647f91d5c06a (patch) | |
tree | d6a0ece6feea91f3c656166dbaa884ef8a29740e /tests/integration/deckard/contrib/namespaces.py | |
parent | Initial commit. (diff) | |
download | knot-resolver-830407e88f9d40d954356c3754f2647f91d5c06a.tar.xz knot-resolver-830407e88f9d40d954356c3754f2647f91d5c06a.zip |
Adding upstream version 5.6.0.upstream/5.6.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/integration/deckard/contrib/namespaces.py')
-rw-r--r-- | tests/integration/deckard/contrib/namespaces.py | 125 |
1 files changed, 125 insertions, 0 deletions
diff --git a/tests/integration/deckard/contrib/namespaces.py b/tests/integration/deckard/contrib/namespaces.py new file mode 100644 index 0000000..16c2285 --- /dev/null +++ b/tests/integration/deckard/contrib/namespaces.py @@ -0,0 +1,125 @@ +# SPDX-License-Identifier: ISC +# Source: https://vincentbernat.github.io/lldpd/ +# Copyright (c) 2008-2017, Vincent Bernat <vincent@bernat.im> + +import contextlib +import ctypes +import errno +import os +import signal + +# All allowed namespace types +NAMESPACE_FLAGS = dict(mnt=0x00020000, + uts=0x04000000, + ipc=0x08000000, + user=0x10000000, + pid=0x20000000, + net=0x40000000) +STACKSIZE = 1024*1024 + +libc = ctypes.CDLL('libc.so.6', use_errno=True) + + +@contextlib.contextmanager +def keep_directory(): + """Restore the current directory on exit.""" + pwd = os.getcwd() + try: + yield + finally: + os.chdir(pwd) + + +class LinuxNamespace: + """Combine several namespaces into one. + This gets a list of namespace types to create and combine into one. The + combined namespace can be used as a context manager to enter all the + created namespaces and exit them at the end. + """ + + def __init__(self, *namespaces): + self.namespaces = namespaces + for ns in namespaces: + assert ns in NAMESPACE_FLAGS + + # Get a pipe to signal the future child to exit + self.pipe = os.pipe() + + # First, create a child in the given namespaces + child = ctypes.CFUNCTYPE(ctypes.c_int)(self.child) + child_stack = ctypes.create_string_buffer(STACKSIZE) + child_stack_pointer = ctypes.c_void_p( + ctypes.cast(child_stack, + ctypes.c_void_p).value + STACKSIZE) + flags = signal.SIGCHLD + for ns in namespaces: + flags |= NAMESPACE_FLAGS[ns] + pid = libc.clone(child, child_stack_pointer, flags) + if pid == -1: + e = ctypes.get_errno() + raise OSError(e, os.strerror(e)) + + # If a user namespace, map UID 0 to the current one + if 'user' in namespaces: + uid_map = '0 {} 1'.format(os.getuid()) + gid_map = '0 {} 1'.format(os.getgid()) + with open('/proc/{}/uid_map'.format(pid), 'w') as f: + f.write(uid_map) + with open('/proc/{}/setgroups'.format(pid), 'w') as f: + f.write('deny') + with open('/proc/{}/gid_map'.format(pid), 'w') as f: + f.write(gid_map) + + # Retrieve a file descriptor to this new namespace + self.next = [os.open('/proc/{}/ns/{}'.format(pid, x), + os.O_RDONLY) for x in namespaces] + + # Keep a file descriptor to our old namespaces + self.previous = [os.open('/proc/self/ns/{}'.format(x), + os.O_RDONLY) for x in namespaces] + + # Tell the child all is done and let it die + os.close(self.pipe[0]) + if 'pid' not in namespaces: + os.close(self.pipe[1]) + self.pipe = None + os.waitpid(pid, 0) + + def __del__(self): + pass + + def child(self): + """Cloned child. + Just be here until our parent extract the file descriptor from + us. + """ + os.close(self.pipe[1]) + + while True: + try: + os.read(self.pipe[0], 1) + except OSError as e: + if e.errno in [errno.EAGAIN, errno.EINTR]: + continue + break + + os._exit(0) # Adopted code. pylint: disable=protected-access + + def fd(self, namespace): + """Return the file descriptor associated to a namespace""" + assert namespace in self.namespaces + return self.next[self.namespaces.index(namespace)] + + def __enter__(self): + with keep_directory(): + for n in self.next: + if libc.setns(n, 0) == -1: + ns = self.namespaces[self.next.index(n)] # noqa Adopted code. pylint: disable=unused-variable + e = ctypes.get_errno() + raise OSError(e, os.strerror(e)) + + def __exit__(self, *exc): + pass + + def __repr__(self): + return 'Namespace({})'.format(", ".join(self.namespaces)) |