125 lines
4 KiB
Python
125 lines
4 KiB
Python
# SPDX-License-Identifier: ISC
# Source: https://vincentbernat.github.io/lldpd/
# Copyright (c) 2008-2017, Vincent Bernat <vincent@bernat.im>
|
|
|
|
import contextlib
|
|
import ctypes
|
|
import errno
|
|
import os
|
|
import signal
|
|
|
|
# All allowed namespace types, mapped to the flag bit that is OR-ed into the
# flags argument of clone() for that namespace type.
NAMESPACE_FLAGS = {
    'mnt': 0x00020000,   # CLONE_NEWNS
    'uts': 0x04000000,   # CLONE_NEWUTS
    'ipc': 0x08000000,   # CLONE_NEWIPC
    'user': 0x10000000,  # CLONE_NEWUSER
    'pid': 0x20000000,   # CLONE_NEWPID
    'net': 0x40000000,   # CLONE_NEWNET
}

# Size in bytes (1 MiB) of the stack buffer handed to the cloned child.
STACKSIZE = 1024 * 1024

# C library handle; use_errno=True makes ctypes.get_errno() report the errno
# set by calls made through this handle.
libc = ctypes.CDLL('libc.so.6', use_errno=True)
|
|
|
|
|
|
@contextlib.contextmanager
def keep_directory():
    """Restore the current directory on exit.

    The working directory is recorded with ``os.getcwd()`` when the block is
    entered and ``os.chdir()`` is called with that same path when the block
    is left, whether it finishes normally or raises.
    """
    pwd = os.getcwd()
    try:
        yield
    finally:
        # Runs on both normal exit and exception, so the caller always ends
        # up back at the recorded path.
        os.chdir(pwd)
|
|
|
|
|
|
class LinuxNamespace:
    """Combine several namespaces into one.

    This gets a list of namespace types to create and combine into one. The
    combined namespace can be used as a context manager to enter all the
    created namespaces.

    Leaving the ``with`` block does NOT switch back to the previous
    namespaces: ``__exit__`` is a no-op.

    Attributes set by ``__init__``:
        namespaces: tuple of the requested namespace type names.
        next: descriptors on the new namespaces, in the order of
            ``namespaces``.
        previous: descriptors on the namespaces the caller was in when the
            object was built, in the same order.
        pipe: ``None``, or (when a ``'pid'`` namespace was requested) the
            pipe whose write end keeps the cloned child alive.
    """

    def __init__(self, *namespaces):
        """Create the requested namespaces and open descriptors on them.

        Args:
            *namespaces: namespace type names; each must be a key of
                ``NAMESPACE_FLAGS``.

        Raises:
            ValueError: if a name is not a key of ``NAMESPACE_FLAGS``.
            OSError: if ``clone()`` fails, or if one of the ``/proc`` files
                cannot be written or opened. Every descriptor opened so far
                is closed and the child is reaped before re-raising.
        """
        # Validate with a real exception: ``assert`` disappears under -O.
        unknown = [ns for ns in namespaces if ns not in NAMESPACE_FLAGS]
        if unknown:
            raise ValueError(
                'unknown namespace type(s): {}'.format(
                    ', '.join(repr(ns) for ns in unknown)))

        self.namespaces = namespaces
        self.next = []
        self.previous = []

        # Get a pipe to signal the future child to exit
        self.pipe = os.pipe()

        # First, create a child in the given namespaces
        child = ctypes.CFUNCTYPE(ctypes.c_int)(self.child)
        child_stack = ctypes.create_string_buffer(STACKSIZE)
        # clone() is handed the address just past the end of the buffer.
        child_stack_pointer = ctypes.c_void_p(
            ctypes.cast(child_stack,
                        ctypes.c_void_p).value + STACKSIZE)
        flags = signal.SIGCHLD
        for ns in namespaces:
            flags |= NAMESPACE_FLAGS[ns]
        pid = libc.clone(child, child_stack_pointer, flags)
        if pid == -1:
            # Read errno before doing anything else, then release the pipe
            # so a failed construction does not leak descriptors.
            e = ctypes.get_errno()
            os.close(self.pipe[0])
            os.close(self.pipe[1])
            self.pipe = None
            raise OSError(e, os.strerror(e))

        try:
            # If a user namespace, map UID 0 to the current one
            if 'user' in namespaces:
                uid_map = '0 {} 1'.format(os.getuid())
                gid_map = '0 {} 1'.format(os.getgid())
                with open('/proc/{}/uid_map'.format(pid), 'w') as f:
                    f.write(uid_map)
                with open('/proc/{}/setgroups'.format(pid), 'w') as f:
                    f.write('deny')
                with open('/proc/{}/gid_map'.format(pid), 'w') as f:
                    f.write(gid_map)

            # Retrieve a file descriptor to this new namespace. Appending
            # one at a time lets a failure half-way still close the
            # descriptors that were already opened.
            for ns in namespaces:
                self.next.append(
                    os.open('/proc/{}/ns/{}'.format(pid, ns), os.O_RDONLY))

            # Keep a file descriptor to our old namespaces
            for ns in namespaces:
                self.previous.append(
                    os.open('/proc/self/ns/{}'.format(ns), os.O_RDONLY))
        except BaseException:
            self._abort(pid)
            raise

        # Tell the child all is done and let it die
        os.close(self.pipe[0])
        if 'pid' not in namespaces:
            os.close(self.pipe[1])
            self.pipe = None
            os.waitpid(pid, 0)
        # With a 'pid' namespace the write end stays open, so the child
        # keeps blocking in read() and is not reaped here.

    def _abort(self, pid):
        """Release everything acquired by a construction that failed."""
        for fd in self.next + self.previous:
            with contextlib.suppress(OSError):
                os.close(fd)
        self.next = []
        self.previous = []
        for fd in self.pipe:
            with contextlib.suppress(OSError):
                os.close(fd)
        self.pipe = None
        # Once our write end is closed the child's read() returns and the
        # child exits, so this wait cannot block forever.
        with contextlib.suppress(OSError):
            os.waitpid(pid, 0)

    def __del__(self):
        """Do nothing on finalisation.

        NOTE(review): the descriptors in ``next``/``previous`` and the pipe
        write end are never closed. Presumably deliberate, so that a
        namespace the process has entered is not torn down -- confirm.
        """
        pass

    def child(self):
        """Cloned child.

        Just be here until our parent extract the file descriptor from
        us.

        Blocks reading the pipe and exits the process with status 0 once
        the read returns or fails with anything other than EAGAIN/EINTR.
        """
        # Drop our copy of the write end so that the parent closing its
        # own copy is enough for read() below to return.
        os.close(self.pipe[1])

        while True:
            try:
                os.read(self.pipe[0], 1)
            except OSError as e:
                if e.errno in [errno.EAGAIN, errno.EINTR]:
                    continue
            break

        os._exit(0)  # Adopted code. pylint: disable=protected-access

    def fd(self, namespace):
        """Return the file descriptor associated to a namespace.

        Raises:
            ValueError: if ``namespace`` is not one of ``self.namespaces``.
        """
        if namespace not in self.namespaces:
            raise ValueError(
                'namespace {!r} is not part of this combination'.format(
                    namespace))
        return self.next[self.namespaces.index(namespace)]

    def __enter__(self):
        """Enter every namespace in creation order and return ``self``.

        The working directory path is restored afterwards by
        ``keep_directory()``.

        Raises:
            OSError: if ``setns()`` fails for one of the descriptors.
        """
        with keep_directory():
            for fd in self.next:
                if libc.setns(fd, 0) == -1:
                    e = ctypes.get_errno()
                    raise OSError(e, os.strerror(e))
        return self

    def __exit__(self, *exc):
        """Do nothing: the process stays in the entered namespaces.

        NOTE(review): ``self.previous`` is not used in the code visible
        here to switch back -- confirm this is intended.
        """
        pass

    def __repr__(self):
        return 'Namespace({})'.format(", ".join(self.namespaces))
|