summaryrefslogtreecommitdiffstats
path: root/tests/integration/deckard/contrib/namespaces.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/integration/deckard/contrib/namespaces.py')
-rw-r--r--tests/integration/deckard/contrib/namespaces.py125
1 files changed, 125 insertions, 0 deletions
diff --git a/tests/integration/deckard/contrib/namespaces.py b/tests/integration/deckard/contrib/namespaces.py
new file mode 100644
index 0000000..16c2285
--- /dev/null
+++ b/tests/integration/deckard/contrib/namespaces.py
@@ -0,0 +1,125 @@
+# SPDX-License-Identifier: ISC
+# Source: https://vincentbernat.github.io/lldpd/
+# Copyright (c) 2008-2017, Vincent Bernat <vincent@bernat.im>
+
+import contextlib
+import ctypes
+import errno
+import os
+import signal
+
+# All allowed namespace types
+NAMESPACE_FLAGS = dict(mnt=0x00020000,
+ uts=0x04000000,
+ ipc=0x08000000,
+ user=0x10000000,
+ pid=0x20000000,
+ net=0x40000000)
+STACKSIZE = 1024*1024
+
+libc = ctypes.CDLL('libc.so.6', use_errno=True)
+
+
+@contextlib.contextmanager
+def keep_directory():
+ """Restore the current directory on exit."""
+ pwd = os.getcwd()
+ try:
+ yield
+ finally:
+ os.chdir(pwd)
+
+
+class LinuxNamespace:
+ """Combine several namespaces into one.
+ This gets a list of namespace types to create and combine into one. The
+ combined namespace can be used as a context manager to enter all the
+ created namespaces and exit them at the end.
+ """
+
+ def __init__(self, *namespaces):
+ self.namespaces = namespaces
+ for ns in namespaces:
+ assert ns in NAMESPACE_FLAGS
+
+ # Get a pipe to signal the future child to exit
+ self.pipe = os.pipe()
+
+ # First, create a child in the given namespaces
+ child = ctypes.CFUNCTYPE(ctypes.c_int)(self.child)
+ child_stack = ctypes.create_string_buffer(STACKSIZE)
+ child_stack_pointer = ctypes.c_void_p(
+ ctypes.cast(child_stack,
+ ctypes.c_void_p).value + STACKSIZE)
+ flags = signal.SIGCHLD
+ for ns in namespaces:
+ flags |= NAMESPACE_FLAGS[ns]
+ pid = libc.clone(child, child_stack_pointer, flags)
+ if pid == -1:
+ e = ctypes.get_errno()
+ raise OSError(e, os.strerror(e))
+
+ # If a user namespace, map UID 0 to the current one
+ if 'user' in namespaces:
+ uid_map = '0 {} 1'.format(os.getuid())
+ gid_map = '0 {} 1'.format(os.getgid())
+ with open('/proc/{}/uid_map'.format(pid), 'w') as f:
+ f.write(uid_map)
+ with open('/proc/{}/setgroups'.format(pid), 'w') as f:
+ f.write('deny')
+ with open('/proc/{}/gid_map'.format(pid), 'w') as f:
+ f.write(gid_map)
+
+ # Retrieve a file descriptor to this new namespace
+ self.next = [os.open('/proc/{}/ns/{}'.format(pid, x),
+ os.O_RDONLY) for x in namespaces]
+
+ # Keep a file descriptor to our old namespaces
+ self.previous = [os.open('/proc/self/ns/{}'.format(x),
+ os.O_RDONLY) for x in namespaces]
+
+ # Tell the child all is done and let it die
+ os.close(self.pipe[0])
+ if 'pid' not in namespaces:
+ os.close(self.pipe[1])
+ self.pipe = None
+ os.waitpid(pid, 0)
+
+ def __del__(self):
+ pass
+
+ def child(self):
+ """Cloned child.
+ Just be here until our parent extract the file descriptor from
+ us.
+ """
+ os.close(self.pipe[1])
+
+ while True:
+ try:
+ os.read(self.pipe[0], 1)
+ except OSError as e:
+ if e.errno in [errno.EAGAIN, errno.EINTR]:
+ continue
+ break
+
+ os._exit(0) # Adopted code. pylint: disable=protected-access
+
+ def fd(self, namespace):
+ """Return the file descriptor associated to a namespace"""
+ assert namespace in self.namespaces
+ return self.next[self.namespaces.index(namespace)]
+
+ def __enter__(self):
+ with keep_directory():
+ for n in self.next:
+ if libc.setns(n, 0) == -1:
+ ns = self.namespaces[self.next.index(n)] # noqa Adopted code. pylint: disable=unused-variable
+ e = ctypes.get_errno()
+ raise OSError(e, os.strerror(e))
+
+ def __exit__(self, *exc):
+ pass
+
+ def __repr__(self):
+ return 'Namespace({})'.format(", ".join(self.namespaces))