diff options
Diffstat (limited to 'tests/topotests/munet/base.py')
-rw-r--r-- | tests/topotests/munet/base.py | 3111 |
1 files changed, 3111 insertions, 0 deletions
diff --git a/tests/topotests/munet/base.py b/tests/topotests/munet/base.py new file mode 100644 index 0000000..06ca4de --- /dev/null +++ b/tests/topotests/munet/base.py @@ -0,0 +1,3111 @@ +# -*- coding: utf-8 eval: (blacken-mode 1) -*- +# SPDX-License-Identifier: GPL-2.0-or-later +# +# July 9 2021, Christian Hopps <chopps@labn.net> +# +# Copyright 2021, LabN Consulting, L.L.C. +# +"""A module that implements core functionality for library or standalone use.""" +import asyncio +import datetime +import errno +import ipaddress +import logging +import os +import platform +import re +import readline +import shlex +import signal +import subprocess +import sys +import tempfile +import time as time_mod + +from collections import defaultdict +from pathlib import Path +from typing import Union + +from . import config as munet_config +from . import linux + + +try: + import pexpect + + from pexpect.fdpexpect import fdspawn + from pexpect.popen_spawn import PopenSpawn + + have_pexpect = True +except ImportError: + have_pexpect = False + +PEXPECT_PROMPT = "PEXPECT_PROMPT>" +PEXPECT_CONTINUATION_PROMPT = "PEXPECT_PROMPT+" + +root_hostname = subprocess.check_output("hostname") +our_pid = os.getpid() + + +detailed_cmd_logging = False + + +class MunetError(Exception): + """A generic munet error.""" + + +class CalledProcessError(subprocess.CalledProcessError): + """Improved logging subclass of subprocess.CalledProcessError.""" + + def __str__(self): + o = self.output.strip() if self.output else "" + e = self.stderr.strip() if self.stderr else "" + s = f"returncode: {self.returncode} command: {self.cmd}" + o = "\n\tstdout: " + o if o else "" + e = "\n\tstderr: " + e if e else "" + return s + o + e + + def __repr__(self): + o = self.output.strip() if self.output else "" + e = self.stderr.strip() if self.stderr else "" + return f"munet.base.CalledProcessError({self.returncode}, {self.cmd}, {o}, {e})" + + +class Timeout: + """An object to passively monitor for timeouts.""" + + def __init__(self, delta): + self.delta = datetime.timedelta(seconds=delta) + self.started_on = datetime.datetime.now() + self.expires_on = self.started_on + self.delta + + def elapsed(self): + elapsed = datetime.datetime.now() - self.started_on + return elapsed.total_seconds() + + def is_expired(self): + return datetime.datetime.now() > self.expires_on + + def remaining(self): + remaining = self.expires_on - datetime.datetime.now() + return remaining.total_seconds() + + def __iter__(self): + return self + + def __next__(self): + remaining = self.remaining() + if remaining <= 0: + raise StopIteration() + return remaining + + +def fsafe_name(name): + return "".join(x if x.isalnum() else "_" for x in name) + + +def indent(s): + return "\t" + s.replace("\n", "\n\t") + + +def shell_quote(command): + """Return command wrapped in single quotes.""" + if sys.version_info[0] >= 3: + return shlex.quote(command) + return "'" + command.replace("'", "'\"'\"'") + "'" + + +def cmd_error(rc, o, e): + s = f"rc {rc}" + o = "\n\tstdout: " + o.strip() if o and o.strip() else "" + e = "\n\tstderr: " + e.strip() if e and e.strip() else "" + return s + o + e + + +def shorten(s): + s = s.strip() + i = s.find("\n") + if i > 0: + s = s[: i - 1] + if not s.endswith("..."): + s += "..." + if len(s) > 72: + s = s[:69] + if not s.endswith("..."): + s += "..." + return s + + +def comm_result(rc, o, e): + s = f"\n\treturncode {rc}" if rc else "" + o = "\n\tstdout: " + shorten(o) if o and o.strip() else "" + e = "\n\tstderr: " + shorten(e) if e and e.strip() else "" + return s + o + e + + +def proc_str(p): + if hasattr(p, "args"): + args = p.args if isinstance(p.args, str) else " ".join(p.args) + else: + args = "" + return f"proc pid: {p.pid} args: {args}" + + +def proc_error(p, o, e): + if hasattr(p, "args"): + args = p.args if isinstance(p.args, str) else " ".join(p.args) + else: + args = "" + + s = f"rc {p.returncode} pid {p.pid}" + a = "\n\targs: " + args if args else "" + o = "\n\tstdout: " + (o.strip() if o and o.strip() else "*empty*") + e = "\n\tstderr: " + (e.strip() if e and e.strip() else "*empty*") + return s + a + o + e + + +def comm_error(p): + rc = p.poll() + assert rc is not None + if not hasattr(p, "saved_output"): + p.saved_output = p.communicate() + return proc_error(p, *p.saved_output) + + +async def acomm_error(p): + rc = p.returncode + assert rc is not None + if not hasattr(p, "saved_output"): + p.saved_output = await p.communicate() + return proc_error(p, *p.saved_output) + + +def get_kernel_version(): + kvs = ( + subprocess.check_output("uname -r", shell=True, text=True).strip().split("-", 1) + ) + kv = kvs[0].split(".") + kv = [int(x) for x in kv] + return kv + + +def convert_number(value) -> int: + """Convert a number value with a possible suffix to an integer. + + >>> convert_number("100k") == 100 * 1024 + True + >>> convert_number("100M") == 100 * 1000 * 1000 + True + >>> convert_number("100Gi") == 100 * 1024 * 1024 * 1024 + True + >>> convert_number("55") == 55 + True + """ + if value is None: + raise ValueError("Invalid value None for convert_number") + rate = str(value) + base = 1000 + if rate[-1] == "i": + base = 1024 + rate = rate[:-1] + suffix = "KMGTPEZY" + index = suffix.find(rate[-1]) + if index == -1: + base = 1024 + index = suffix.lower().find(rate[-1]) + if index != -1: + rate = rate[:-1] + return int(rate) * base ** (index + 1) + + +def is_file_like(fo): + return isinstance(fo, int) or hasattr(fo, "fileno") + + +def get_tc_bits_value(user_value): + value = convert_number(user_value) / 1000 + return f"{value:03f}kbit" + + +def get_tc_bytes_value(user_value): + # Raw numbers are bytes in tc + return convert_number(user_value) + + +def get_tmp_dir(uniq): + return os.path.join(tempfile.mkdtemp(), uniq) + + +async def _async_get_exec_path(binary, cmdf, cache): + if isinstance(binary, str): + bins = [binary] + else: + bins = binary + for b in bins: + if b in cache: + return cache[b] + + rc, output, _ = await cmdf("which " + b, warn=False) + if not rc: + cache[b] = os.path.abspath(output.strip()) + return cache[b] + return None + + +def _get_exec_path(binary, cmdf, cache): + if isinstance(binary, str): + bins = [binary] + else: + bins = binary + for b in bins: + if b in cache: + return cache[b] + + rc, output, _ = cmdf("which " + b, warn=False) + if not rc: + cache[b] = os.path.abspath(output.strip()) + return cache[b] + return None + + +def get_event_loop(): + """Configure and return our non-thread using event loop. + + This function configures a new child watcher to not use threads. + Threads cannot be used when we inline unshare a PID namespace. + """ + policy = asyncio.get_event_loop_policy() + loop = policy.get_event_loop() + owatcher = policy.get_child_watcher() + logging.debug( + "event_loop_fixture: global policy %s, current loop %s, current watcher %s", + policy, + loop, + owatcher, + ) + + policy.set_child_watcher(None) + owatcher.close() + + try: + watcher = asyncio.PidfdChildWatcher() # pylint: disable=no-member + except Exception: + watcher = asyncio.SafeChildWatcher() + loop = policy.get_event_loop() + + logging.debug( + "event_loop_fixture: attaching new watcher %s to loop and setting in policy", + watcher, + ) + watcher.attach_loop(loop) + policy.set_child_watcher(watcher) + policy.set_event_loop(loop) + assert asyncio.get_event_loop_policy().get_child_watcher() is watcher + + return loop + + +class Commander: # pylint: disable=R0904 + """An object that can execute commands.""" + + tmux_wait_gen = 0 + + def __init__(self, name, logger=None, unet=None, **kwargs): + """Create a Commander. + + Args: + name: name of the commander object + logger: logger to use for logging commands a defualt is supplied if this + is None + unet: unet that owns this object, only used by Commander in run_in_window, + otherwise can be None. + """ + # del kwargs # deal with lint warning + # logging.warning("Commander: name %s kwargs %s", name, kwargs) + + self.name = name + self.unet = unet + self.deleting = False + self.last = None + self.exec_paths = {} + + if not logger: + logname = f"munet.{self.__class__.__name__.lower()}.{name}" + self.logger = logging.getLogger(logname) + self.logger.setLevel(logging.DEBUG) + else: + self.logger = logger + + super().__init__(**kwargs) + + @property + def is_vm(self): + return False + + @property + def is_container(self): + return False + + def set_logger(self, logfile): + self.logger = logging.getLogger(__name__ + ".commander." + self.name) + self.logger.setLevel(logging.DEBUG) + if isinstance(logfile, str): + handler = logging.FileHandler(logfile, mode="w") + else: + handler = logging.StreamHandler(logfile) + + fmtstr = "%(asctime)s.%(msecs)03d %(levelname)s: {}({}): %(message)s".format( + self.__class__.__name__, self.name + ) + handler.setFormatter(logging.Formatter(fmt=fmtstr)) + self.logger.addHandler(handler) + + def _get_pre_cmd(self, use_str, use_pty, **kwargs): + """Get the pre-user-command values. + + The values returned here should be what is required to cause the user's command + to execute in the correct context (e.g., namespace, container, sshremote). + """ + del kwargs + del use_pty + return "" if use_str else [] + + def __str__(self): + return f"{self.__class__.__name__}({self.name})" + + async def async_get_exec_path(self, binary): + """Return the full path to the binary executable. + + `binary` :: binary name or list of binary names + """ + return await _async_get_exec_path( + binary, self.async_cmd_status_nsonly, self.exec_paths + ) + + def get_exec_path(self, binary): + """Return the full path to the binary executable. + + `binary` :: binary name or list of binary names + """ + return _get_exec_path(binary, self.cmd_status_nsonly, self.exec_paths) + + def get_exec_path_host(self, binary): + """Return the full path to the binary executable. + + If the object is actually a derived class (e.g., a container) this method will + return the exec path for the native namespace rather than the container. The + path is the one which the other xxx_host methods will use. + + `binary` :: binary name or list of binary names + """ + return get_exec_path_host(binary) + + def test(self, flags, arg): + """Run test binary, with flags and arg.""" + test_path = self.get_exec_path(["test"]) + rc, _, _ = self.cmd_status([test_path, flags, arg], warn=False) + return not rc + + def test_nsonly(self, flags, arg): + """Run test binary, with flags and arg.""" + test_path = self.get_exec_path(["test"]) + rc, _, _ = self.cmd_status_nsonly([test_path, flags, arg], warn=False) + return not rc + + def path_exists(self, path): + """Check if path exists.""" + return self.test("-e", path) + + async def cleanup_pid(self, pid, kill_pid=None): + """Signal a pid to exit with escalating forcefulness.""" + if kill_pid is None: + kill_pid = pid + + for sn in (signal.SIGHUP, signal.SIGKILL): + self.logger.debug( + "%s: %s %s (wait %s)", self, signal.Signals(sn).name, kill_pid, pid + ) + + os.kill(kill_pid, sn) + + # No need to wait after this. + if sn == signal.SIGKILL: + return + + # try each signal, waiting 15 seconds for exit before advancing + wait_sec = 30 + self.logger.debug("%s: waiting %ss for pid to exit", self, wait_sec) + for _ in Timeout(wait_sec): + try: + status = os.waitpid(pid, os.WNOHANG) + if status == (0, 0): + await asyncio.sleep(0.1) + else: + self.logger.debug("pid %s exited status %s", pid, status) + return + except OSError as error: + if error.errno == errno.ECHILD: + self.logger.debug("%s: pid %s was reaped", self, pid) + else: + self.logger.warning( + "%s: error waiting on pid %s: %s", self, pid, error + ) + return + self.logger.debug("%s: timeout waiting on pid %s to exit", self, pid) + + def _get_sub_args(self, cmd_list, defaults, use_pty=False, ns_only=False, **kwargs): + """Returns pre-command, cmd, and default keyword args.""" + assert not isinstance(cmd_list, str) + + defaults["shell"] = False + pre_cmd_list = self._get_pre_cmd(False, use_pty, ns_only=ns_only, **kwargs) + cmd_list = [str(x) for x in cmd_list] + + # os_env = {k: v for k, v in os.environ.items() if k.startswith("MUNET")} + # env = {**os_env, **(kwargs["env"] if "env" in kwargs else {})} + env = {**(kwargs["env"] if "env" in kwargs else os.environ)} + if "MUNET_NODENAME" not in env: + env["MUNET_NODENAME"] = self.name + kwargs["env"] = env + + defaults.update(kwargs) + + return pre_cmd_list, cmd_list, defaults + + def _common_prologue(self, async_exec, method, cmd, skip_pre_cmd=False, **kwargs): + cmd_list = self._get_cmd_as_list(cmd) + if method == "_spawn": + defaults = { + "encoding": "utf-8", + "codec_errors": "ignore", + } + else: + defaults = { + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + } + if not async_exec: + defaults["encoding"] = "utf-8" + + pre_cmd_list, cmd_list, defaults = self._get_sub_args( + cmd_list, defaults, **kwargs + ) + + use_pty = kwargs.get("use_pty", False) + if method == "_spawn": + # spawn doesn't take "shell" keyword arg + if "shell" in defaults: + del defaults["shell"] + # this is required to avoid receiving a STOPPED signal on expect! + if not use_pty: + defaults["preexec_fn"] = os.setsid + defaults["env"]["PS1"] = "$ " + + if not detailed_cmd_logging: + pre_cmd_str = shlex.join(pre_cmd_list) if not skip_pre_cmd else "" + if "nsenter" in pre_cmd_str: + self.logger.debug('%s("%s")', method, shlex.join(cmd_list)) + elif pre_cmd_str: + self.logger.debug( + '%s("%s") [precmd: %s]', method, shlex.join(cmd_list), pre_cmd_str + ) + else: + self.logger.debug('%s("%s") [no precmd]', method, shlex.join(cmd_list)) + else: + self.logger.debug( + '%s: %s %s("%s", pre_cmd: "%s" use_pty: %s kwargs: %.120s)', + self, + "XXX" if method == "_spawn" else "", + method, + cmd_list, + pre_cmd_list if not skip_pre_cmd else "", + use_pty, + defaults, + ) + + actual_cmd_list = cmd_list if skip_pre_cmd else pre_cmd_list + cmd_list + return actual_cmd_list, defaults + + async def _async_popen(self, method, cmd, **kwargs): + """Create a new asynchronous subprocess.""" + acmd, kwargs = self._common_prologue(True, method, cmd, **kwargs) + p = await asyncio.create_subprocess_exec(*acmd, **kwargs) + return p, acmd + + def _popen(self, method, cmd, **kwargs): + """Create a subprocess.""" + acmd, kwargs = self._common_prologue(False, method, cmd, **kwargs) + p = subprocess.Popen(acmd, **kwargs) + return p, acmd + + def _fdspawn(self, fo, **kwargs): + defaults = {} + defaults.update(kwargs) + + if "echo" in defaults: + del defaults["echo"] + + if "encoding" not in defaults: + defaults["encoding"] = "utf-8" + if "codec_errors" not in defaults: + defaults["codec_errors"] = "ignore" + encoding = defaults["encoding"] + + self.logger.debug("%s: _fdspawn(%s, kwargs: %s)", self, fo, defaults) + + p = fdspawn(fo, **defaults) + + # We don't have TTY like conversions of LF to CRLF + p.crlf = os.linesep.encode(encoding) + + # we own the socket now detach the file descriptor to keep it from closing + if hasattr(fo, "detach"): + fo.detach() + + return p + + def _spawn(self, cmd, skip_pre_cmd=False, use_pty=False, echo=False, **kwargs): + logging.debug( + '%s: XXX _spawn: cmd "%s" skip_pre_cmd %s use_pty %s echo %s kwargs %s', + self, + cmd, + skip_pre_cmd, + use_pty, + echo, + kwargs, + ) + actual_cmd, defaults = self._common_prologue( + False, "_spawn", cmd, skip_pre_cmd=skip_pre_cmd, use_pty=use_pty, **kwargs + ) + + self.logger.debug( + '%s: XXX %s("%s", use_pty %s echo %s defaults: %s)', + self, + "PopenSpawn" if not use_pty else "pexpect.spawn", + actual_cmd, + use_pty, + echo, + defaults, + ) + + # We don't specify a timeout it defaults to 30s is that OK? + if not use_pty: + p = PopenSpawn(actual_cmd, **defaults) + else: + p = pexpect.spawn(actual_cmd[0], actual_cmd[1:], echo=echo, **defaults) + return p, actual_cmd + + def spawn( + self, + cmd, + spawned_re, + expects=(), + sends=(), + use_pty=False, + logfile=None, + logfile_read=None, + logfile_send=None, + trace=None, + **kwargs, + ): + """Create a spawned send/expect process. + + Args: + cmd: list of args to exec/popen with, or an already open socket + spawned_re: what to look for to know when done, `spawn` returns when seen + expects: a list of regex other than `spawned_re` to look for. Commonly, + "ogin:" or "[Pp]assword:"r. + sends: what to send when an element of `expects` matches. So e.g., the + username or password if thats what corresponding expect matched. Can + be the empty string to send nothing. + use_pty: true for pty based expect, otherwise uses popen (pipes/files) + trace: if true then log send/expects + **kwargs - kwargs passed on the _spawn. + + Returns: + A pexpect process. + + Raises: + pexpect.TIMEOUT, pexpect.EOF as documented in `pexpect` + CalledProcessError if EOF is seen and `cmd` exited then + raises a CalledProcessError to indicate the failure. + """ + if is_file_like(cmd): + assert not use_pty + ac = "*socket*" + p = self._fdspawn(cmd, **kwargs) + else: + p, ac = self._spawn(cmd, use_pty=use_pty, **kwargs) + + if logfile: + p.logfile = logfile + if logfile_read: + p.logfile_read = logfile_read + if logfile_send: + p.logfile_send = logfile_send + + # for spawned shells (i.e., a direct command an not a console) + # this is wrong and will cause 2 prompts + if not use_pty: + # This isn't very nice looking + p.echo = False + if not is_file_like(cmd): + p.isalive = lambda: p.proc.poll() is None + if not hasattr(p, "close"): + p.close = p.wait + + # Do a quick check to see if we got the prompt right away, otherwise we may be + # at a console so we send a \n to re-issue the prompt + index = p.expect([spawned_re, pexpect.TIMEOUT, pexpect.EOF], timeout=0.1) + if index == 0: + assert p.match is not None + self.logger.debug( + "%s: got spawned_re quick: '%s' matching '%s'", + self, + p.match.group(0), + spawned_re, + ) + return p + + # Now send a CRLF to cause the prompt (or whatever else) to re-issue + p.send("\n") + try: + patterns = [spawned_re, *expects] + + self.logger.debug("%s: expecting: %s", self, patterns) + + while index := p.expect(patterns): + if trace: + assert p.match is not None + self.logger.debug( + "%s: got expect: '%s' matching %d '%s', sending '%s'", + self, + p.match.group(0), + index, + patterns[index], + sends[index - 1], + ) + if sends[index - 1]: + p.send(sends[index - 1]) + + self.logger.debug("%s: expecting again: %s", self, patterns) + self.logger.debug( + "%s: got spawned_re: '%s' matching '%s'", + self, + p.match.group(0), + spawned_re, + ) + return p + except pexpect.TIMEOUT: + self.logger.error( + "%s: TIMEOUT looking for spawned_re '%s' expect buffer so far:\n%s", + self, + spawned_re, + indent(p.buffer), + ) + raise + except pexpect.EOF as eoferr: + if p.isalive(): + raise + rc = p.status + before = indent(p.before) + error = CalledProcessError(rc, ac, output=before) + self.logger.error( + "%s: EOF looking for spawned_re '%s' before EOF:\n%s", + self, + spawned_re, + before, + ) + p.close() + raise error from eoferr + + async def shell_spawn( + self, + cmd, + prompt, + expects=(), + sends=(), + use_pty=False, + will_echo=False, + is_bourne=True, + init_newline=False, + **kwargs, + ): + """Create a shell REPL (read-eval-print-loop). + + Args: + cmd: shell and list of args to popen with, or an already open socket + prompt: the REPL prompt to look for, the function returns when seen + expects: a list of regex other than `spawned_re` to look for. Commonly, + "ogin:" or "[Pp]assword:"r. + sends: what to send when an element of `expects` matches. So e.g., the + username or password if thats what corresponding expect matched. Can + be the empty string to send nothing. + is_bourne: if False then do not modify shell prompt for internal + parser friently format, and do not expect continuation prompts. + init_newline: send an initial newline for non-bourne shell spawns, otherwise + expect the prompt simply from running the command + use_pty: true for pty based expect, otherwise uses popen (pipes/files) + will_echo: bash is buggy in that it echo's to non-tty unlike any other + sh/ksh, set this value to true if running back + **kwargs - kwargs passed on the _spawn. + """ + combined_prompt = r"({}|{})".format(re.escape(PEXPECT_PROMPT), prompt) + + assert not is_file_like(cmd) or not use_pty + p = self.spawn( + cmd, + combined_prompt, + expects=expects, + sends=sends, + use_pty=use_pty, + echo=False, + **kwargs, + ) + assert not p.echo + + if not is_bourne: + if init_newline: + p.send("\n") + return ShellWrapper(p, prompt, will_echo=will_echo) + + ps1 = PEXPECT_PROMPT + ps2 = PEXPECT_CONTINUATION_PROMPT + + # Avoid problems when =/usr/bin/env= prints the values + ps1p = ps1[:5] + "${UNSET_V}" + ps1[5:] + ps2p = ps2[:5] + "${UNSET_V}" + ps2[5:] + + ps1 = re.escape(ps1) + ps2 = re.escape(ps2) + + extra = "PAGER=cat; export PAGER; TERM=dumb; unset HISTFILE; set +o emacs +o vi" + pchg = "PS1='{0}' PS2='{1}' PROMPT_COMMAND=''\n".format(ps1p, ps2p) + p.send(pchg) + return ShellWrapper(p, ps1, ps2, extra_init_cmd=extra, will_echo=will_echo) + + def popen(self, cmd, **kwargs): + """Creates a pipe with the given `command`. + + Args: + cmd: `str` or `list` of command to open a pipe with. + **kwargs: kwargs is eventually passed on to Popen. If `command` is a string + then will be invoked with `bash -c`, otherwise `command` is a list and + will be invoked without a shell. + + Returns: + a subprocess.Popen object. + """ + return self._popen("popen", cmd, **kwargs)[0] + + def popen_nsonly(self, cmd, **kwargs): + """Creates a pipe with the given `command`. + + Args: + cmd: `str` or `list` of command to open a pipe with. + **kwargs: kwargs is eventually passed on to Popen. If `command` is a string + then will be invoked with `bash -c`, otherwise `command` is a list and + will be invoked without a shell. + + Returns: + a subprocess.Popen object. + """ + return self._popen("popen_nsonly", cmd, ns_only=True, **kwargs)[0] + + async def async_popen(self, cmd, **kwargs): + """Creates a pipe with the given `command`. + + Args: + cmd: `str` or `list` of command to open a pipe with. + **kwargs: kwargs is eventually passed on to create_subprocess_exec. If + `command` is a string then will be invoked with `bash -c`, otherwise + `command` is a list and will be invoked without a shell. + + Returns: + a asyncio.subprocess.Process object. + """ + p, _ = await self._async_popen("async_popen", cmd, **kwargs) + return p + + async def async_popen_nsonly(self, cmd, **kwargs): + """Creates a pipe with the given `command`. + + Args: + cmd: `str` or `list` of command to open a pipe with. + **kwargs: kwargs is eventually passed on to create_subprocess_exec. If + `command` is a string then will be invoked with `bash -c`, otherwise + `command` is a list and will be invoked without a shell. + + Returns: + a asyncio.subprocess.Process object. + """ + p, _ = await self._async_popen( + "async_popen_nsonly", cmd, ns_only=True, **kwargs + ) + return p + + async def async_cleanup_proc(self, p, pid=None): + """Terminate a process started with a popen call. + + Args: + p: return value from :py:`async_popen`, :py:`popen`, et al. + pid: pid to signal instead of p.pid, typically a child of + cmd_p == nsenter. + + Returns: + None on success, the ``p`` if multiple timeouts occur even + after a SIGKILL sent. + """ + if not p: + return None + + if p.returncode is not None: + if isinstance(p, subprocess.Popen): + o, e = p.communicate() + else: + o, e = await p.communicate() + self.logger.debug( + "%s: cmd_p already exited status: %s", self, proc_error(p, o, e) + ) + return None + + if pid is None: + pid = p.pid + + self.logger.debug("%s: terminate process: %s (pid %s)", self, proc_str(p), pid) + try: + # This will SIGHUP and wait a while then SIGKILL and return immediately + await self.cleanup_pid(p.pid, pid) + + # Wait another 2 seconds after the possible SIGKILL above for the + # parent nsenter to cleanup and exit + wait_secs = 2 + if isinstance(p, subprocess.Popen): + o, e = p.communicate(timeout=wait_secs) + else: + o, e = await asyncio.wait_for(p.communicate(), timeout=wait_secs) + self.logger.debug( + "%s: cmd_p exited after kill, status: %s", self, proc_error(p, o, e) + ) + except (asyncio.TimeoutError, subprocess.TimeoutExpired): + self.logger.warning("%s: SIGKILL timeout", self) + return p + except Exception as error: + self.logger.warning( + "%s: kill unexpected exception: %s", self, error, exc_info=True + ) + return p + return None + + @staticmethod + def _cmd_status_input(stdin): + pinput = None + if isinstance(stdin, (bytes, str)): + pinput = stdin + stdin = subprocess.PIPE + return pinput, stdin + + def _cmd_status_finish(self, p, c, ac, o, e, raises, warn): + rc = p.returncode + self.last = (rc, ac, c, o, e) + if not rc: + resstr = comm_result(rc, o, e) + if resstr: + self.logger.debug("%s", resstr) + else: + if warn: + self.logger.warning("%s: proc failed: %s", self, proc_error(p, o, e)) + if raises: + # error = Exception("stderr: {}".format(stderr)) + # This annoyingly doesnt' show stderr when printed normally + raise CalledProcessError(rc, ac, o, e) + return rc, o, e + + def _cmd_status(self, cmds, raises=False, warn=True, stdin=None, **kwargs): + """Execute a command.""" + pinput, stdin = Commander._cmd_status_input(stdin) + p, actual_cmd = self._popen("cmd_status", cmds, stdin=stdin, **kwargs) + o, e = p.communicate(pinput) + return self._cmd_status_finish(p, cmds, actual_cmd, o, e, raises, warn) + + async def _async_cmd_status( + self, cmds, raises=False, warn=True, stdin=None, text=None, **kwargs + ): + """Execute a command.""" + pinput, stdin = Commander._cmd_status_input(stdin) + p, actual_cmd = await self._async_popen( + "async_cmd_status", cmds, stdin=stdin, **kwargs + ) + + if text is False: + encoding = None + else: + encoding = kwargs.get("encoding", "utf-8") + + if encoding is not None and isinstance(pinput, str): + pinput = pinput.encode(encoding) + o, e = await p.communicate(pinput) + if encoding is not None: + o = o.decode(encoding) if o is not None else o + e = e.decode(encoding) if e is not None else e + return self._cmd_status_finish(p, cmds, actual_cmd, o, e, raises, warn) + + def _get_cmd_as_list(self, cmd): + """Given a list or string return a list form for execution. + + If `cmd` is a string then the returned list uses bash and looks + like this: ["/bin/bash", "-c", cmd]. Some node types override + this function if they utilize a different shell as to return + a different list of values. + + Args: + cmd: list or string representing the command to execute. + + Returns: + list of commands to execute. + """ + if not isinstance(cmd, str): + cmds = cmd + else: + # Make sure the code doesn't think `cd` will work. + assert not re.match(r"cd(\s*|\s+(\S+))$", cmd) + cmds = ["/bin/bash", "-c", cmd] + return cmds + + def cmd_nostatus(self, cmd, **kwargs): + """Run given command returning output[s]. + + Args: + cmd: `str` or `list` of the command to execute. If a string is given + it is run using a shell, otherwise the list is executed directly + as the binary and arguments. + **kwargs: kwargs is eventually passed on to Popen. If `command` is a string + then will be invoked with `bash -c`, otherwise `command` is a list and + will be invoked without a shell. + + Returns: + if "stderr" is in kwargs and not equal to subprocess.STDOUT, then + both stdout and stderr are returned, otherwise stderr is combined + with stdout and only stdout is returned. + """ + # + # This method serves as the basis for all derived sync cmd variations, so to + # override sync cmd behavior simply override this function and *not* the other + # variations, unless you are changing only that variation's behavior + # + + # XXX change this back to _cmd_status instead of cmd_status when we + # consolidate and cleanup the container overrides of *cmd_* functions + + cmds = cmd + if "stderr" in kwargs and kwargs["stderr"] != subprocess.STDOUT: + _, o, e = self.cmd_status(cmds, **kwargs) + return o, e + if "stderr" in kwargs: + del kwargs["stderr"] + _, o, _ = self.cmd_status(cmds, stderr=subprocess.STDOUT, **kwargs) + return o + + def cmd_status(self, cmd, **kwargs): + """Run given command returning status and outputs. + + Args: + cmd: `str` or `list` of the command to execute. If a string is given + it is run using a shell, otherwise the list is executed directly + as the binary and arguments. + **kwargs: kwargs is eventually passed on to Popen. If `command` is a string + then will be invoked with `bash -c`, otherwise `command` is a list and + will be invoked without a shell. + + Returns: + (status, output, error) are returned + status: the returncode of the command. + output: stdout as a string from the command. + error: stderr as a string from the command. + """ + # + # This method serves as the basis for all derived sync cmd variations, so to + # override sync cmd behavior simply override this function and *not* the other + # variations, unless you are changing only that variation's behavior + # + return self._cmd_status(cmd, **kwargs) + + def cmd_raises(self, cmd, **kwargs): + """Execute a command. Raise an exception on errors. + + Args: + cmd: `str` or `list` of the command to execute. If a string is given + it is run using a shell, otherwise the list is executed directly + as the binary and arguments. + **kwargs: kwargs is eventually passed on to Popen. If `command` is a string + then will be invoked with `bash -c`, otherwise `command` is a list and + will be invoked without a shell. + + Returns: + output: stdout as a string from the command. + + Raises: + CalledProcessError: on non-zero exit status + """ + _, stdout, _ = self._cmd_status(cmd, raises=True, **kwargs) + return stdout + + def cmd_nostatus_nsonly(self, cmd, **kwargs): + # Make sure the command runs on the host and not in any container. + return self.cmd_nostatus(cmd, ns_only=True, **kwargs) + + def cmd_status_nsonly(self, cmd, **kwargs): + # Make sure the command runs on the host and not in any container. + return self._cmd_status(cmd, ns_only=True, **kwargs) + + def cmd_raises_nsonly(self, cmd, **kwargs): + # Make sure the command runs on the host and not in any container. + _, stdout, _ = self._cmd_status(cmd, raises=True, ns_only=True, **kwargs) + return stdout + + async def async_cmd_status(self, cmd, **kwargs): + """Run given command returning status and outputs. + + Args: + cmd: `str` or `list` of the command to execute. If a string is given + it is run using a shell, otherwise the list is executed directly + as the binary and arguments. + **kwargs: kwargs is eventually passed on to create_subprocess_exec. If + `cmd` is a string then will be invoked with `bash -c`, otherwise + `cmd` is a list and will be invoked without a shell. + + Returns: + (status, output, error) are returned + status: the returncode of the command. + output: stdout as a string from the command. + error: stderr as a string from the command. + """ + # + # This method serves as the basis for all derived async cmd variations, so to + # override async cmd behavior simply override this function and *not* the other + # variations, unless you are changing only that variation's behavior + # + return await self._async_cmd_status(cmd, **kwargs) + + async def async_cmd_nostatus(self, cmd, **kwargs): + """Run given command returning output[s]. + + Args: + cmd: `str` or `list` of the command to execute. If a string is given + it is run using a shell, otherwise the list is executed directly + as the binary and arguments. + **kwargs: kwargs is eventually passed on to create_subprocess_exec. If + `cmd` is a string then will be invoked with `bash -c`, otherwise + `cmd` is a list and will be invoked without a shell. + + Returns: + if "stderr" is in kwargs and not equal to subprocess.STDOUT, then + both stdout and stderr are returned, otherwise stderr is combined + with stdout and only stdout is returned. + + """ + # XXX change this back to _async_cmd_status instead of cmd_status when we + # consolidate and cleanup the container overrides of *cmd_* functions + + cmds = cmd + if "stderr" in kwargs and kwargs["stderr"] != subprocess.STDOUT: + _, o, e = await self._async_cmd_status(cmds, **kwargs) + return o, e + if "stderr" in kwargs: + del kwargs["stderr"] + _, o, _ = await self._async_cmd_status(cmds, stderr=subprocess.STDOUT, **kwargs) + return o + + async def async_cmd_raises(self, cmd, **kwargs): + """Execute a command. Raise an exception on errors. + + Args: + cmd: `str` or `list` of the command to execute. If a string is given + it is run using a shell, otherwise the list is executed directly + as the binary and arguments. + **kwargs: kwargs is eventually passed on to create_subprocess_exec. If + `cmd` is a string then will be invoked with `bash -c`, otherwise + `cmd` is a list and will be invoked without a shell. + + Returns: + output: stdout as a string from the command. + + Raises: + CalledProcessError: on non-zero exit status + """ + _, stdout, _ = await self._async_cmd_status(cmd, raises=True, **kwargs) + return stdout + + async def async_cmd_status_nsonly(self, cmd, **kwargs): + # Make sure the command runs on the host and not in any container. + return await self._async_cmd_status(cmd, ns_only=True, **kwargs) + + async def async_cmd_raises_nsonly(self, cmd, **kwargs): + # Make sure the command runs on the host and not in any container. + _, stdout, _ = await self._async_cmd_status( + cmd, raises=True, ns_only=True, **kwargs + ) + return stdout + + def cmd_legacy(self, cmd, **kwargs): + """Execute a command with stdout and stderr joined, *IGNORES ERROR*.""" + defaults = {"stderr": subprocess.STDOUT} + defaults.update(kwargs) + _, stdout, _ = self._cmd_status(cmd, raises=False, **defaults) + return stdout + + # Run a command in a new window (gnome-terminal, screen, tmux, xterm) + def run_in_window( + self, + cmd, + wait_for=False, + background=False, + name=None, + title=None, + forcex=False, + new_window=False, + tmux_target=None, + ns_only=False, + ): + """Run a command in a new window (TMUX, Screen or XTerm). + + Args: + cmd: string to execute. + wait_for: True to wait for exit from command or `str` as channel neme to + signal on exit, otherwise False + background: Do not change focus to new window. + title: Title for new pane (tmux) or window (xterm). + name: Name of the new window (tmux) + forcex: Force use of X11. + new_window: Open new window (instead of pane) in TMUX + tmux_target: Target for tmux pane. + + Returns: + the pane/window identifier from TMUX (depends on `new_window`) + """ + channel = None + if isinstance(wait_for, str): + channel = wait_for + elif wait_for is True: + channel = "{}-wait-{}".format(our_pid, Commander.tmux_wait_gen) + Commander.tmux_wait_gen += 1 + + if forcex or ("TMUX" not in os.environ and "STY" not in os.environ): + root_level = False + else: + root_level = True + + # SUDO: The important thing to note is that with all these methods we are + # executing on the users windowing system, so even though we are normally + # running as root, we will not be when the command is dispatched. Also + # in the case of SCREEN and X11 we need to sudo *back* to the user as well + # This is also done by SSHRemote by defualt so we should *not* sudo back + # if we are SSHRemote. + + # XXX need to test ssh in screen + # XXX need to test ssh in Xterm + sudo_path = get_exec_path_host(["sudo"]) + # This first test case seems same as last but using list instead of string? + if self.is_vm and self.use_ssh: # pylint: disable=E1101 + if isinstance(cmd, str): + cmd = shlex.split(cmd) + cmd = ["/usr/bin/env", f"MUNET_NODENAME={self.name}"] + cmd + + # get the ssh cmd + cmd = self._get_pre_cmd(False, True, ns_only=ns_only) + [shlex.join(cmd)] + unet = self.unet # pylint: disable=E1101 + uns_cmd = unet._get_pre_cmd( # pylint: disable=W0212 + False, True, ns_only=True, root_level=root_level + ) + # get the nsenter for munet + nscmd = [ + sudo_path, + *uns_cmd, + *cmd, + ] + else: + # This is the command to execute to be inside the namespace. + # We are getting into trouble with quoting. + # Why aren't we passing in MUNET_RUNDIR? + cmd = f"/usr/bin/env MUNET_NODENAME={self.name} {cmd}" + # We need sudo b/c we are executing as the user inside the window system. + sudo_path = get_exec_path_host(["sudo"]) + nscmd = ( + sudo_path + + " " + + self._get_pre_cmd(True, True, ns_only=ns_only, root_level=root_level) + + " " + + cmd + ) + + if "TMUX" in os.environ and not forcex: + cmd = [get_exec_path_host("tmux")] + if new_window: + cmd.append("new-window") + cmd.append("-P") + if name: + cmd.append("-n") + cmd.append(name) + if tmux_target: + cmd.append("-t") + cmd.append(tmux_target) + else: + cmd.append("split-window") + cmd.append("-P") + cmd.append("-h") + if not tmux_target: + tmux_target = os.getenv("TMUX_PANE", "") + if background: + cmd.append("-d") + if tmux_target: + cmd.append("-t") + cmd.append(tmux_target) + + # nscmd is always added as single string argument + if not isinstance(nscmd, str): + nscmd = shlex.join(nscmd) + if title: + nscmd = f"printf '\033]2;{title}\033\\'; {nscmd}" + if channel: + nscmd = f'trap "tmux wait -S {channel}; exit 0" EXIT; {nscmd}' + cmd.append(nscmd) + + elif "STY" in os.environ and not forcex: + # wait for not supported in screen for now + channel = None + cmd = [get_exec_path_host("screen")] + if not os.path.exists( + "/run/screen/S-{}/{}".format(os.environ["USER"], os.environ["STY"]) + ): + # XXX not appropriate for ssh + cmd = ["sudo", "-Eu", os.environ["SUDO_USER"]] + cmd + + if title: + cmd.append("-t") + cmd.append(title) + + if isinstance(nscmd, str): + nscmd = shlex.split(nscmd) + cmd.extend(nscmd) + elif "DISPLAY" in os.environ: + cmd = [get_exec_path_host("xterm")] + if "SUDO_USER" in os.environ: + # Do this b/c making things work as root with xauth seems hard + cmd = [ + get_exec_path_host("sudo"), + "-Eu", + os.environ["SUDO_USER"], + ] + cmd + if title: + cmd.append("-T") + cmd.append(title) + + cmd.append("-e") + if isinstance(nscmd, str): + cmd.extend(shlex.split(nscmd)) + else: + cmd.extend(nscmd) + + # if channel: + # return self.cmd_raises(cmd, skip_pre_cmd=True) + # else: + p = commander.popen( + cmd, + # skip_pre_cmd=True, + stdin=None, + shell=False, + ) + # We should reap the child and report the error then. + time_mod.sleep(2) + if p.poll() is not None: + self.logger.error("%s: Failed to launch xterm: %s", self, comm_error(p)) + return p + else: + self.logger.error( + "DISPLAY, STY, and TMUX not in environment, can't open window" + ) + raise Exception("Window requestd but TMUX, Screen and X11 not available") + + # pane_info = self.cmd_raises(cmd, skip_pre_cmd=True, ns_only=True).strip() + # We are prepending the nsenter command, so use unet.rootcmd + pane_info = commander.cmd_raises(cmd).strip() + + # Re-adjust the layout + if "TMUX" in os.environ: + cmd = [ + get_exec_path_host("tmux"), + "select-layout", + "-t", + pane_info if not tmux_target else tmux_target, + "tiled", + ] + commander.cmd_status(cmd) + + # Wait here if we weren't handed the channel to wait for + if channel and wait_for is True: + cmd = [get_exec_path_host("tmux"), "wait", channel] + # commander.cmd_status(cmd, skip_pre_cmd=True) + commander.cmd_status(cmd) + + return pane_info + + def delete(self): + """Calls self.async_delete within an exec loop.""" + asyncio.run(self.async_delete()) + + async def _async_delete(self): + """Delete this objects resources. + + This is the actual implementation of the resource cleanup, each class + should cleanup it's own resources, generally catching and reporting, + but not reraising any exceptions for it's own cleanup, then it should + invoke `super()._async_delete() without catching any exceptions raised + therein. See other examples in `base.py` or `native.py` + """ + self.logger.info("%s: deleted", self) + + async def async_delete(self): + """Delete the Commander (or derived object). + + The actual implementation for any class should be in `_async_delete` + new derived classes should look at the documentation for that function. + """ + try: + self.deleting = True + await self._async_delete() + except Exception as error: + self.logger.error("%s: error while deleting: %s", self, error) + + +class InterfaceMixin: + """A mixin class to support interface functionality.""" + + def __init__(self, *args, **kwargs): + # del kwargs # get rid of lint + # logging.warning("InterfaceMixin: args: %s kwargs: %s", args, kwargs) + + self._intf_addrs = defaultdict(lambda: [None, None]) + self.net_intfs = {} + self.next_intf_index = 0 + self.basename = "eth" + # self.basename = name + "-eth" + super().__init__(*args, **kwargs) + + @property + def intfs(self): + return sorted(self._intf_addrs.keys()) + + @property + def networks(self): + return sorted(self.net_intfs.keys()) + + def get_intf_addr(self, ifname, ipv6=False): + if ifname not in self._intf_addrs: + return None + return self._intf_addrs[ifname][bool(ipv6)] + + def set_intf_addr(self, ifname, ifaddr): + ifaddr = ipaddress.ip_interface(ifaddr) + self._intf_addrs[ifname][ifaddr.version == 6] = ifaddr + + def net_addr(self, netname, ipv6=False): + if netname not in self.net_intfs: + return None + return self.get_intf_addr(self.net_intfs[netname], ipv6=ipv6) + + def set_intf_basename(self, basename): + self.basename = basename + + def get_next_intf_name(self): + while True: + ifname = self.basename + str(self.next_intf_index) + self.next_intf_index += 1 + if ifname not in self._intf_addrs: + break + return ifname + + def get_ns_ifname(self, ifname): + """Return a namespace unique interface name. + + This function is primarily overriden by L3QemuVM, IOW by any class + that doesn't create it's own network namespace and will share that + with the root (unet) namespace. + + Args: + ifname: the interface name. + + Returns: + A name unique to the namespace of this object. By defualt the assumption + is the ifname is namespace unique. + """ + return ifname + + def register_interface(self, ifname): + if ifname not in self._intf_addrs: + self._intf_addrs[ifname] = [None, None] + + def register_network(self, netname, ifname): + if netname in self.net_intfs: + assert self.net_intfs[netname] == ifname + else: + self.net_intfs[netname] = ifname + + def get_linux_tc_args(self, ifname, config): + """Get interface constraints (jitter, delay, rate) for linux TC. + + The keys and their values are as follows: + + delay (int): number of microseconds + jitter (int): number of microseconds + jitter-correlation (float): % correlation to previous (default 10%) + loss (float): % of loss + loss-correlation (float): % correlation to previous (default 0%) + rate (int or str): bits per second, string allows for use of + {KMGTKiMiGiTi} prefixes "i" means K == 1024 otherwise K == 1000 + """ + del ifname # unused + + netem_args = "" + + def get_number(c, v, d=None): + if v not in c or c[v] is None: + return d + return convert_number(c[v]) + + delay = get_number(config, "delay") + if delay is not None: + netem_args += f" delay {delay}usec" + + jitter = get_number(config, "jitter") + if jitter is not None: + if not delay: + raise ValueError("jitter but no delay specified") + jitter_correlation = get_number(config, "jitter-correlation", 10) + netem_args += f" {jitter}usec {jitter_correlation}%" + + loss = get_number(config, "loss") + if loss is not None: + loss_correlation = get_number(config, "loss-correlation", 0) + if loss_correlation: + netem_args += f" loss {loss}% {loss_correlation}%" + else: + netem_args += f" loss {loss}%" + + if (o_rate := config.get("rate")) is None: + return netem_args, "" + + # + # This comment is not correct, but is trying to talk through/learn the + # machinery. + # + # tokens arrive at `rate` into token buffer. + # limit - number of bytes that can be queued waiting for tokens + # -or- + # latency - maximum amount of time a packet may sit in TBF queue + # + # So this just allows receiving faster than rate for latency amount of + # time, before dropping. + # + # latency = sizeofbucket(limit) / rate (peakrate?) + # + # 32kbit + # -------- = latency = 320ms + # 100kbps + # + # -but then- + # burst ([token] buffer) the largest number of instantaneous + # tokens available (i.e, bucket size). + + tbf_args = "" + DEFLIMIT = 1518 * 1 + DEFBURST = 1518 * 2 + try: + tc_rate = o_rate["rate"] + tc_rate = convert_number(tc_rate) + limit = convert_number(o_rate.get("limit", DEFLIMIT)) + burst = convert_number(o_rate.get("burst", DEFBURST)) + except (KeyError, TypeError): + tc_rate = convert_number(o_rate) + limit = convert_number(DEFLIMIT) + burst = convert_number(DEFBURST) + tbf_args += f" rate {tc_rate/1000}kbit" + if delay: + # give an extra 1/10 of buffer space to handle delay + tbf_args += f" limit {limit} burst {burst}" + else: + tbf_args += f" limit {limit} burst {burst}" + + return netem_args, tbf_args + + def set_intf_constraints(self, ifname, **constraints): + """Set interface outbound constraints. + + Set outbound constraints (jitter, delay, rate) for an interface. All arguments + may also be passed as a string and will be converted to numerical format. All + arguments are also optional. If not specified then that existing constraint will + be cleared. + + Args: + ifname: the name of the interface + delay (int): number of microseconds. + jitter (int): number of microseconds. + jitter-correlation (float): Percent correlation to previous (default 10%). + loss (float): Percent of loss. + loss-correlation (float): Percent correlation to previous (default 25%). + rate (int): bits per second, string allows for use of + {KMGTKiMiGiTi} prefixes "i" means K == 1024 otherwise K == 1000. + """ + nsifname = self.get_ns_ifname(ifname) + netem_args, tbf_args = self.get_linux_tc_args(nsifname, constraints) + count = 1 + selector = f"root handle {count}:" + if netem_args: + self.cmd_raises( + f"tc qdisc add dev {nsifname} {selector} netem {netem_args}" + ) + count += 1 + selector = f"parent {count-1}: handle {count}" + # Place rate limit after delay otherwise limit/burst too complex + if tbf_args: + self.cmd_raises(f"tc qdisc add dev {nsifname} {selector} tbf {tbf_args}") + + self.cmd_raises(f"tc qdisc show dev {nsifname}") + + +class LinuxNamespace(Commander, InterfaceMixin): + """A linux Namespace. + + An object that creates and executes commands in a linux namespace + """ + + def __init__( + self, + name, + net=True, + mount=True, + uts=True, + cgroup=False, + ipc=False, + pid=False, + time=False, + user=False, + unshare_inline=False, + set_hostname=True, + private_mounts=None, + **kwargs, + ): + """Create a new linux namespace. + + Args: + name: Internal name for the namespace. + net: Create network namespace. + mount: Create network namespace. + uts: Create UTS (hostname) namespace. + cgroup: Create cgroup namespace. + ipc: Create IPC namespace. + pid: Create PID namespace, also mounts new /proc. + time: Create time namespace. + user: Create user namespace, also keeps capabilities. + set_hostname: Set the hostname to `name`, uts must also be True. + private_mounts: List of strings of the form + "[/external/path:]/internal/path. If no external path is specified a + tmpfs is mounted on the internal path. Any paths specified are first + passed to `mkdir -p`. + unshare_inline: Unshare the process itself rather than using a proxy. + logger: Passed to superclass. + """ + # logging.warning("LinuxNamespace: name %s kwargs %s", name, kwargs) + + super().__init__(name, **kwargs) + + unet = self.unet + + self.logger.debug("%s: creating", self) + + self.cwd = os.path.abspath(os.getcwd()) + + self.nsflags = [] + self.ifnetns = {} + self.uflags = 0 + self.p_ns_fds = None + self.p_ns_fnames = None + self.pid_ns = False + self.init_pid = None + self.unshare_inline = unshare_inline + self.nsenter_fork = True + + # + # Collect the namespaces to unshare + # + if hasattr(self, "proc_path") and self.proc_path: # pylint: disable=no-member + pp = Path(self.proc_path) # pylint: disable=no-member + else: + pp = unet.proc_path if unet else Path("/proc") + pp = pp.joinpath("%P%", "ns") + + flags = "" + uflags = 0 + nslist = [] + nsflags = [] + if cgroup: + nselm = "cgroup" + nslist.append(nselm) + nsflags.append(f"--{nselm}={pp / nselm}") + flags += "C" + uflags |= linux.CLONE_NEWCGROUP + if ipc: + nselm = "ipc" + nslist.append(nselm) + nsflags.append(f"--{nselm}={pp / nselm}") + flags += "i" + uflags |= linux.CLONE_NEWIPC + if mount or pid: + # We need a new mount namespace for pid + nselm = "mnt" + nslist.append(nselm) + nsflags.append(f"--mount={pp / nselm}") + mount = True + flags += "m" + uflags |= linux.CLONE_NEWNS + if net: + nselm = "net" + nslist.append(nselm) + nsflags.append(f"--{nselm}={pp / nselm}") + # if pid: + # os.system(f"touch /tmp/netns-{name}") + # cmd.append(f"--net=/tmp/netns-{name}") + # else: + flags += "n" + uflags |= linux.CLONE_NEWNET + if pid: + self.pid_ns = True + # We look for this b/c the unshare pid will share with /sibn/init + nselm = "pid_for_children" + nslist.append(nselm) + nsflags.append(f"--pid={pp / nselm}") + flags += "p" + uflags |= linux.CLONE_NEWPID + if time: + nselm = "time" + # XXX time_for_children? + nslist.append(nselm) + nsflags.append(f"--{nselm}={pp / nselm}") + flags += "T" + uflags |= linux.CLONE_NEWTIME + if user: + nselm = "user" + nslist.append(nselm) + nsflags.append(f"--{nselm}={pp / nselm}") + flags += "U" + uflags |= linux.CLONE_NEWUSER + if uts: + nselm = "uts" + nslist.append(nselm) + nsflags.append(f"--{nselm}={pp / nselm}") + flags += "u" + uflags |= linux.CLONE_NEWUTS + + assert flags, "LinuxNamespace with no namespaces requested" + + # Should look path up using resources maybe... + mutini_path = get_our_script_path("mutini") + if not mutini_path: + mutini_path = get_our_script_path("mutini.py") + assert mutini_path + cmd = [mutini_path, f"--unshare-flags={flags}", "-v"] + fname = fsafe_name(self.name) + "-mutini.log" + fname = (unet or self).rundir.joinpath(fname) + stdout = open(fname, "w", encoding="utf-8") + stderr = subprocess.STDOUT + + # + # Save the current namespace info to compare against later + # + + if not unet: + nsdict = {x: os.readlink(f"/proc/self/ns/{x}") for x in nslist} + else: + nsdict = { + x: os.readlink(f"{unet.proc_path}/{unet.pid}/ns/{x}") for x in nslist + } + + # + # (A) Basically we need to save the pid of the unshare call for nsenter. + # + # For `unet is not None` (node case) the level this exists at is based on wether + # unet is using a forking nsenter or not. So if unet.nsenter_fork == True then + # we need the child pid of the p.pid (child of pid returned to us), otherwise + # unet.nsenter_fork == False and we just use p.pid as it will be unshare after + # nsenter exec's it. + # + # For the `unet is None` (unet case) the unshare is at the top level or + # non-existent so we always save the returned p.pid. If we are unshare_inline we + # won't have a __pre_cmd but we can save our child_pid to kill later, otherwise + # we set unet.pid to None b/c there's literally nothing to do. + # + # --------------------------------------------------------------------------- + # Breakdown for nested (non-unet) namespace creation, and what PID + # to use for __pre_cmd nsenter use. + # --------------------------------------------------------------------------- + # + # tl;dr + # - for non-inline unshare: Use BBB with pid_for_children, unless none/none + # #then (AAA) returned + # - for inline unshare: use returned pid (AAA) with pid_for_children + # + # All commands use unet.popen to launch the unshare of mutini or cat. + # mutini for PID unshare, otherwise cat. AAA is the returned pid BBB is the + # child of the returned. + # + # Unshare Variant + # --------------- + # + # Here we are running mutini if we are creating new pid namespace workspace, + # cat otherwise. + # + # [PID+PID] pid tree looks like this: + # + # PID NSPID PPID PGID + # uuu - N/A uuu main unet process + # AAA - uuu AAA nsenter (forking, from unet) (in unet namespaces -pid) + # BBB - AAA AAA unshare --fork --kill-child (forking) + # CCC 1 BBB CCC mutini (non-forking since it is pid 1 in new namespace) + # + # Use BBB if we use pid_for_children, CCC for all + # + # [PID+none] For non-pid workspace creation (but unet pid) we use cat and pid + # tree looks like this: + # + # PID PPID PGID + # uuu N/A uuu main unet process + # AAA uuu AAA nsenter (forking) (in unet namespaces -pid) + # BBB AAA AAA unshare -> cat (from unshare non-forking) + # + # Use BBB for all + # + # [none+PID] For pid workspace creation (but NOT unet pid) we use mutini and pid + # tree looks like this: + # + # PID NSPID PPID PGID + # uuu - N/A uuu main unet process + # AAA - uuu AAA nsenter -> unshare --fork --kill-child + # BBB 1 AAA AAA mutini (non-forking since it is pid 1 in new namespace) + # + # Use AAA if we use pid_for_children, BBB for all + # + # [none+none] For non-pid workspace and non-pid unet we use cat and pid tree + # looks like this: + # + # PID PPID PGID + # uuu N/A uuu main unet process + # AAA uuu AAA nsenter -> unshare -> cat + # + # Use AAA for all, there's no BBB + # + # Inline-Unshare Variant + # ---------------------- + # + # For unshare_inline and new PID namespace we have unshared all but our PID + # namespace, but our children end up in the new namespace so the fork popen + # does is good enough. + # + # [PID+PID] pid tree looks like this: + # + # PID NSPID PPID PGID + # uuu - N/A uuu main unet process + # AAA - uuu AAA unshare --fork --kill-child (forking) + # BBB 1 AAA BBB mutini + # + # Use AAA if we use pid_for_children, BBB for all + # + # [PID+none] For non-pid workspace creation (but unet pid) we use cat and pid + # tree looks like this: + # + # PID PPID PGID + # uuu N/A uuu main unet process + # AAA uuu AAA unshare -> cat + # + # Use AAA for all + # + # [none+PID] For pid workspace creation (but NOT unet pid) we use mutini and pid + # tree looks like this: + # + # PID NSPID PPID PGID + # uuu - N/A uuu main unet process + # AAA - uuu AAA unshare --fork --kill-child + # BBB 1 AAA BBB mutini + # + # Use AAA if we use pid_for_children, BBB for all + # + # [none+none] For non-pid workspace and non-pid unet we use cat and pid tree + # looks like this: + # + # PID PPID PGID + # uuu N/A uuu main unet process + # AAA uuu AAA unshare -> cat + # + # Use AAA for all. + # + # + # --------------------------------------------------------------------------- + # Breakdown for unet namespace creation, and what PID to use for __pre_cmd + # --------------------------------------------------------------------------- + # + # tl;dr: save returned PID or nothing. + # - for non-inline unshare: Use AAA with pid_for_children (returned pid) + # - for inline unshare: no __precmd as the fork in popen is enough. + # + # Use commander to launch the unshare mutini/cat (for PID/none + # workspace PID) for non-inline case. AAA is the returned pid BBB is the child + # of the returned. + # + # Unshare Variant + # --------------- + # + # Here we are running mutini if we are creating new pid namespace workspace, + # cat otherwise. + # + # [PID] for unet pid creation pid tree looks like this: + # + # PID NSPID PPID PGID + # uuu - N/A uuu main unet process + # AAA - uuu AAA unshare --fork --kill-child (forking) + # BBB 1 AAA BBB mutini + # + # Use AAA if we use pid_for_children, BBB for all + # + # [none] for unet non-pid, pid tree looks like this: + # + # PID PPID PGID + # uuu N/A uuu main unet process + # AAA uuu AAA unshare -> cat + # + # Use AAA for all + # + # Inline-Unshare Variant + # ----------------------- + # + # For unshare_inline and new PID namespace we have unshared all but our PID + # namespace, but our children end up in the new namespace so the fork in popen + # does is good enough. + # + # [PID] for unet pid creation pid tree looks like this: + # + # PID NSPID PPID PGID + # uuu - N/A uuu main unet process + # AAA 1 uuu AAA mutini + # + # Save p / p.pid, but don't configure any nsenter, uneeded. + # + # Use nothing as the fork when doing a popen is enough to be in all the right + # namepsaces. + # + # [none] for unet non-pid, pid tree looks like this: + # + # PID PPID PGID + # uuu N/A uuu main unet process + # + # Nothing, no __pre_cmd. + # + # + + self.ppid = os.getppid() + self.unshare_inline = unshare_inline + if unshare_inline: + assert unet is None + self.uflags = uflags + # + # Open file descriptors for current namespaces for later resotration. + # + try: + kversion = [int(x) for x in platform.release().split("-")[0].split(".")] + kvok = kversion[0] > 5 or (kversion[0] == 5 and kversion[1] >= 8) + except ValueError: + kvok = False + if ( + not kvok + or sys.version_info[0] < 3 + or (sys.version_info[0] == 3 and sys.version_info[1] < 9) + ): + # get list of namespace file descriptors before we unshare + self.p_ns_fds = [] + self.p_ns_fnames = [] + tmpflags = uflags + for i in range(0, 64): + v = 1 << i + if (tmpflags & v) == 0: + continue + tmpflags &= ~v + if v in linux.namespace_files: + path = os.path.join("/proc/self", linux.namespace_files[v]) + if os.path.exists(path): + self.p_ns_fds.append(os.open(path, 0)) + self.p_ns_fnames.append(f"{path} -> {os.readlink(path)}") + self.logger.debug( + "%s: saving old namespace fd %s (%s)", + self, + self.p_ns_fnames[-1], + self.p_ns_fds[-1], + ) + if not tmpflags: + break + else: + self.p_ns_fds = None + self.p_ns_fnames = None + self.ppid_fd = linux.pidfd_open(self.ppid) + + self.logger.debug( + "%s: unshare to new namespaces %s", + self, + linux.clone_flag_string(uflags), + ) + + linux.unshare(uflags) + + if not pid: + p = None + self.pid = None + self.nsenter_fork = False + else: + # Need to fork to create the PID namespace, but we need to continue + # running from the parent so that things like pytest work. We'll execute + # a mutini process to manage the child init 1 duties. + # + # We (the parent pid) can no longer create threads, due to that being + # restricted by the kernel. See EINVAL in clone(2). + # + p = commander.popen( + [mutini_path, "-v"], + stdin=subprocess.PIPE, + stdout=stdout, + stderr=stderr, + text=True, + # new session/pgid so signals don't propagate + start_new_session=True, + shell=False, + ) + self.pid = p.pid + self.nsenter_fork = False + else: + # Using cat and a stdin PIPE is nice as it will exit when we do. However, + # we also detach it from the pgid so that signals do not propagate to it. + # This is b/c it would exit early (e.g., ^C) then, at least the main munet + # proc which has no other processes like frr daemons running, will take the + # main network namespace with it, which will remove the bridges and the + # veth pair (because the bridge side veth is deleted). + self.logger.debug("%s: creating namespace process: %s", self, cmd) + + # Use the parent unet process if we have one this will cause us to inherit + # the namespaces correctly even in the non-inline case. + parent = self.unet if self.unet else commander + + p = parent.popen( + cmd, + stdin=subprocess.PIPE, + stdout=stdout, + stderr=stderr, + text=True, + start_new_session=not unet, + shell=False, + ) + + # The pid number returned is in the global pid namespace. For unshare_inline + # this can be unfortunate b/c our /proc has been remounted in our new pid + # namespace and won't contain global pid namespace pids. To solve for this + # we get all the pid values for the process below. + + # See (A) above for when we need the child pid. + self.logger.debug("%s: namespace process: %s", self, proc_str(p)) + self.pid = p.pid + if unet and unet.nsenter_fork: + assert not unet.unshare_inline + # Need child pid of p.pid + pgrep = unet.rootcmd.get_exec_path("pgrep") + # a sing fork was done + child_pid = unet.rootcmd.cmd_raises([pgrep, "-o", "-P", str(p.pid)]) + self.pid = int(child_pid.strip()) + self.logger.debug("%s: child of namespace process: %s", self, pid) + + self.p = p + + # Let's always have a valid value. + if self.pid is None: + self.pid = our_pid + + # + # Let's find all our pids in the nested PID namespaces + # + if unet: + proc_path = unet.proc_path + else: + proc_path = self.proc_path if hasattr(self, "proc_path") else "/proc" + proc_path = f"{proc_path}/{self.pid}" + + pid_status = open(f"{proc_path}/status", "r", encoding="ascii").read() + m = re.search(r"\nNSpid:((?:\t[0-9]+)+)\n", pid_status) + self.pids = [int(x) for x in m.group(1).strip().split("\t")] + assert self.pids[0] == self.pid + + self.logger.debug("%s: namespace scoped pids: %s", self, self.pids) + + # ----------------------------------------------- + # Now let's wait until unshare completes it's job + # ----------------------------------------------- + timeout = Timeout(30) + if self.pid is not None and self.pid != our_pid: + while (not p or not p.poll()) and not timeout.is_expired(): + # check new namespace values against old (nsdict), unshare + # can actually take a bit to complete. + for fname in tuple(nslist): + # self.pid will be the global pid b/c we didn't unshare_inline + nspath = f"{proc_path}/ns/{fname}" + try: + nsf = os.readlink(nspath) + except OSError as error: + self.logger.debug( + "unswitched: error (ok) checking %s: %s", nspath, error + ) + continue + if nsdict[fname] != nsf: + self.logger.debug( + "switched: original %s current %s", nsdict[fname], nsf + ) + nslist.remove(fname) + elif unshare_inline: + logging.warning( + "unshare_inline not unshared %s == %s", nsdict[fname], nsf + ) + else: + self.logger.debug( + "unswitched: current %s elapsed: %s", nsf, timeout.elapsed() + ) + if not nslist: + self.logger.debug( + "all done waiting for unshare after %s", timeout.elapsed() + ) + break + + elapsed = int(timeout.elapsed()) + if elapsed <= 3: + time_mod.sleep(0.1) + else: + self.logger.info( + "%s: unshare taking more than %ss: %s", self, elapsed, nslist + ) + time_mod.sleep(1) + + if p is not None and p.poll(): + self.logger.error("%s: namespace process failed: %s", self, comm_error(p)) + assert p.poll() is None, "unshare failed" + + # + # Setup the pre-command to enter the target namespace from the running munet + # process using self.pid + # + + if pid: + nsenter_fork = True + elif unet and unet.nsenter_fork: + # if unet created a pid namespace we need to enter it since we aren't + # entering a child pid namespace we created for the node. Otherwise + # we have a /proc remounted under unet, but our process is running in + # the root pid namepsace + nselm = "pid_for_children" + nsflags.append(f"--pid={pp / nselm}") + nsenter_fork = True + else: + # We dont need a fork. + nsflags.append("-F") + nsenter_fork = False + + # Save nsenter values if running from root namespace + # we need this for the unshare_inline case when run externally (e.g., from + # within tmux server). + root_nsflags = [x.replace("%P%", str(self.pid)) for x in nsflags] + self.__root_base_pre_cmd = ["/usr/bin/nsenter", *root_nsflags] + self.__root_pre_cmd = list(self.__root_base_pre_cmd) + + if unshare_inline: + assert unet is None + # We have nothing to do here since our process is now in the correct + # namespaces and children will inherit from us, even the PID namespace will + # be corrent b/c commands are run by first forking. + self.nsenter_fork = False + self.nsflags = [] + self.__base_pre_cmd = [] + else: + # We will use nsenter + self.nsenter_fork = nsenter_fork + self.nsflags = nsflags + self.__base_pre_cmd = list(self.__root_base_pre_cmd) + + self.__pre_cmd = list(self.__base_pre_cmd) + + # Always mark new mount namespaces as recursive private + if mount: + # if self.p is None and not pid: + self.cmd_raises_nsonly("mount --make-rprivate /") + + # We need to remount the procfs for the new PID namespace, since we aren't using + # unshare(1) which does that for us. + if pid and unshare_inline: + assert mount + self.cmd_raises_nsonly("mount -t proc proc /proc") + + # We do not want cmd_status in child classes (e.g., container) for + # the remaining setup calls in this __init__ function. + + if net: + # Remount /sys to pickup any changes in the network, but keep root + # /sys/fs/cgroup. This pattern could be made generic and supported for any + # overlapping mounts + if mount: + tmpmnt = f"/tmp/cgm-{self.pid}" + self.cmd_status_nsonly( + f"mkdir {tmpmnt} && mount --rbind /sys/fs/cgroup {tmpmnt}" + ) + rc = o = e = None + for i in range(0, 10): + rc, o, e = self.cmd_status_nsonly( + "mount -t sysfs sysfs /sys", warn=False + ) + if not rc: + break + self.logger.debug( + "got error mounting new sysfs will retry: %s", + cmd_error(rc, o, e), + ) + time_mod.sleep(1) + else: + raise Exception(cmd_error(rc, o, e)) + + self.cmd_status_nsonly( + f"mount --move {tmpmnt} /sys/fs/cgroup && rmdir {tmpmnt}" + ) + + # Original micronet code + # self.cmd_raises_nsonly("mount -t sysfs sysfs /sys") + # self.cmd_raises_nsonly( + # "mount -o rw,nosuid,nodev,noexec,relatime " + # "-t cgroup2 cgroup /sys/fs/cgroup" + # ) + + # Set the hostname to the namespace name + if uts and set_hostname: + self.cmd_status_nsonly("hostname " + self.name) + nroot = subprocess.check_output("hostname") + if unshare_inline or (unet and unet.unshare_inline): + assert ( + root_hostname != nroot + ), f'hostname unchanged from "{nroot}" wanted "{self.name}"' + else: + # Assert that we didn't just change the host hostname + assert ( + root_hostname == nroot + ), f'root hostname "{root_hostname}" changed to "{nroot}"!' + + if private_mounts: + if isinstance(private_mounts, str): + private_mounts = [private_mounts] + for m in private_mounts: + s = m.split(":", 1) + if len(s) == 1: + self.tmpfs_mount(s[0]) + else: + self.bind_mount(s[0], s[1]) + + # this will fail if running inside the namespace with PID + if pid: + o = self.cmd_nostatus_nsonly("ls -l /proc/1/ns") + else: + o = self.cmd_nostatus_nsonly("ls -l /proc/self/ns") + + self.logger.debug("namespaces:\n %s", o) + + # will cache the path, which is important in delete to avoid running a shell + # which can hang during cleanup + self.ip_path = get_exec_path_host("ip") + if net: + self.cmd_status_nsonly([self.ip_path, "link", "set", "lo", "up"]) + + self.logger.info("%s: created", self) + + def _get_pre_cmd(self, use_str, use_pty, ns_only=False, root_level=False, **kwargs): + """Get the pre-user-command values. + + The values returned here should be what is required to cause the user's command + to execute in the correct context (e.g., namespace, container, sshremote). + """ + del kwargs + del ns_only + del use_pty + pre_cmd = self.__root_pre_cmd if root_level else self.__pre_cmd + return shlex.join(pre_cmd) if use_str else list(pre_cmd) + + def tmpfs_mount(self, inner): + self.logger.debug("Mounting tmpfs on %s", inner) + self.cmd_raises("mkdir -p " + inner) + self.cmd_raises("mount -n -t tmpfs tmpfs " + inner) + + def bind_mount(self, outer, inner): + self.logger.debug("Bind mounting %s on %s", outer, inner) + if commander.test("-f", outer): + self.cmd_raises(f"mkdir -p {os.path.dirname(inner)} && touch {inner}") + else: + if not commander.test("-e", outer): + commander.cmd_raises_nsonly(f"mkdir -p {outer}") + self.cmd_raises(f"mkdir -p {inner}") + self.cmd_raises("mount --rbind {} {} ".format(outer, inner)) + + def add_netns(self, ns): + self.logger.debug("Adding network namespace %s", ns) + + if os.path.exists("/run/netns/{}".format(ns)): + self.logger.warning("%s: Removing existing nsspace %s", self, ns) + try: + self.delete_netns(ns) + except Exception as ex: + self.logger.warning( + "%s: Couldn't remove existing nsspace %s: %s", + self, + ns, + str(ex), + exc_info=True, + ) + self.cmd_raises_nsonly([self.ip_path, "netns", "add", ns]) + + def delete_netns(self, ns): + self.logger.debug("Deleting network namespace %s", ns) + self.cmd_raises_nsonly([self.ip_path, "netns", "delete", ns]) + + def set_intf_netns(self, intf, ns, up=False): + # In case a user hard-codes 1 thinking it "resets" + ns = str(ns) + if ns == "1": + ns = str(self.pid) + + self.logger.debug("Moving interface %s to namespace %s", intf, ns) + + cmd = [self.ip_path, "link", "set", intf, "netns", ns] + if up: + cmd.append("up") + self.intf_ip_cmd(intf, cmd) + if ns == str(self.pid): + # If we are returning then remove from dict + if intf in self.ifnetns: + del self.ifnetns[intf] + else: + self.ifnetns[intf] = ns + + def reset_intf_netns(self, intf): + self.logger.debug("Moving interface %s to default namespace", intf) + self.set_intf_netns(intf, str(self.pid)) + + def intf_ip_cmd(self, intf, cmd): + """Run an ip command, considering an interface's possible namespace.""" + if intf in self.ifnetns: + if isinstance(cmd, list): + assert cmd[0].endswith("ip") + cmd[1:1] = ["-n", self.ifnetns[intf]] + else: + assert cmd.startswith("ip ") + cmd = "ip -n " + self.ifnetns[intf] + cmd[2:] + self.cmd_raises_nsonly(cmd) + + def intf_tc_cmd(self, intf, cmd): + """Run a tc command, considering an interface's possible namespace.""" + if intf in self.ifnetns: + if isinstance(cmd, list): + assert cmd[0].endswith("tc") + cmd[1:1] = ["-n", self.ifnetns[intf]] + else: + assert cmd.startswith("tc ") + cmd = "tc -n " + self.ifnetns[intf] + cmd[2:] + self.cmd_raises_nsonly(cmd) + + def set_ns_cwd(self, cwd: Union[str, Path]): + """Common code for changing pre_cmd and pre_nscmd.""" + self.logger.debug("%s: new CWD %s", self, cwd) + self.__root_pre_cmd = self.__root_base_pre_cmd + ["--wd=" + str(cwd)] + if self.__pre_cmd: + self.__pre_cmd = self.__base_pre_cmd + ["--wd=" + str(cwd)] + elif self.unshare_inline: + os.chdir(cwd) + + async def _async_delete(self): + if type(self) == LinuxNamespace: # pylint: disable=C0123 + self.logger.info("%s: deleting", self) + else: + self.logger.debug("%s: LinuxNamespace sub-class deleting", self) + + # Signal pid namespace proc to exit + if ( + (self.p is None or self.p.pid != self.pid) + and self.pid + and self.pid != our_pid + ): + self.logger.debug( + "cleanup pid on separate pid %s from proc pid %s", + self.pid, + self.p.pid if self.p else None, + ) + await self.cleanup_pid(self.pid) + + if self.p is not None: + self.logger.debug("cleanup proc pid %s", self.p.pid) + await self.async_cleanup_proc(self.p) + + # return to the previous namespace, need to do this in case anothe munet + # is being created, especially when it plans to inherit the parent's (host) + # namespace. + if self.uflags: + logging.info("restoring from inline unshare: cwd: %s", os.getcwd()) + # This only works in linux>=5.8 + if self.p_ns_fds is None: + self.logger.debug( + "%s: restoring namespaces %s", + self, + linux.clone_flag_string(self.uflags), + ) + # fd = linux.pidfd_open(self.ppid) + fd = self.ppid_fd + retry = 3 + for i in range(0, retry): + try: + linux.setns(fd, self.uflags) + except OSError as error: + self.logger.warning( + "%s: could not reset to old namespace fd %s: %s", + self, + fd, + error, + ) + if i == retry - 1: + raise + time_mod.sleep(1) + os.close(fd) + else: + while self.p_ns_fds: + fd = self.p_ns_fds.pop() + fname = self.p_ns_fnames.pop() + self.logger.debug( + "%s: restoring namespace from fd %s (%s)", self, fname, fd + ) + retry = 3 + for i in range(0, retry): + try: + linux.setns(fd, 0) + break + except OSError as error: + self.logger.warning( + "%s: could not reset to old namespace fd %s (%s): %s", + self, + fname, + fd, + error, + ) + if i == retry - 1: + raise + time_mod.sleep(1) + os.close(fd) + self.p_ns_fds = None + self.p_ns_fnames = None + logging.info("restored from unshare: cwd: %s", os.getcwd()) + + self.__root_base_pre_cmd = ["/bin/false"] + self.__base_pre_cmd = ["/bin/false"] + self.__root_pre_cmd = ["/bin/false"] + self.__pre_cmd = ["/bin/false"] + + await super()._async_delete() + + +class SharedNamespace(Commander): + """Share another namespace. + + An object that executes commands in an existing pid's linux namespace + """ + + def __init__(self, name, pid=None, nsflags=None, **kwargs): + """Share a linux namespace. + + Args: + name: Internal name for the namespace. + pid: PID of the process to share with. + nsflags: nsenter flags to pass to inherit namespaces from + """ + super().__init__(name, **kwargs) + + self.logger.debug("%s: Creating", self) + + self.cwd = os.path.abspath(os.getcwd()) + self.pid = pid if pid is not None else our_pid + + nsflags = (x.replace("%P%", str(self.pid)) for x in nsflags) if nsflags else [] + self.__base_pre_cmd = ["/usr/bin/nsenter", *nsflags] if nsflags else [] + self.__pre_cmd = self.__base_pre_cmd + self.ip_path = self.get_exec_path("ip") + + def _get_pre_cmd(self, use_str, use_pty, ns_only=False, root_level=False, **kwargs): + """Get the pre-user-command values. + + The values returned here should be what is required to cause the user's command + to execute in the correct context (e.g., namespace, container, sshremote). + """ + del kwargs + del ns_only + del use_pty + assert not root_level + return shlex.join(self.__pre_cmd) if use_str else list(self.__pre_cmd) + + def set_ns_cwd(self, cwd: Union[str, Path]): + """Common code for changing pre_cmd and pre_nscmd.""" + self.logger.debug("%s: new CWD %s", self, cwd) + self.__pre_cmd = self.__base_pre_cmd + ["--wd=" + str(cwd)] + + +class Bridge(SharedNamespace, InterfaceMixin): + """A linux bridge.""" + + next_ord = 1 + + @classmethod + def _get_next_id(cls): + # Do not use `cls` here b/c that makes the variable class specific + n = Bridge.next_ord + Bridge.next_ord = n + 1 + return n + + def __init__(self, name=None, mtu=None, unet=None, **kwargs): + """Create a linux Bridge.""" + self.id = self._get_next_id() + if not name: + name = "br{}".format(self.id) + + unet_pid = our_pid if unet.pid is None else unet.pid + + super().__init__(name, pid=unet_pid, nsflags=unet.nsflags, unet=unet, **kwargs) + + self.set_intf_basename(self.name + "-e") + + self.mtu = mtu + + self.logger.debug("Bridge: Creating") + + assert len(self.name) <= 16 # Make sure fits in IFNAMSIZE + self.cmd_raises(f"ip link delete {name} || true") + self.cmd_raises(f"ip link add {name} type bridge") + if self.mtu: + self.cmd_raises(f"ip link set {name} mtu {self.mtu}") + self.cmd_raises(f"ip link set {name} up") + + self.logger.debug("%s: Created, Running", self) + + def get_ifname(self, netname): + return self.net_intfs[netname] if netname in self.net_intfs else None + + async def _async_delete(self): + """Stop the bridge (i.e., delete the linux resources).""" + if type(self) == Bridge: # pylint: disable=C0123 + self.logger.info("%s: deleting", self) + else: + self.logger.debug("%s: Bridge sub-class deleting", self) + + rc, o, e = await self.async_cmd_status( + [self.ip_path, "link", "show", self.name], + stdin=subprocess.DEVNULL, + start_new_session=True, + warn=False, + ) + if not rc: + rc, o, e = await self.async_cmd_status( + [self.ip_path, "link", "delete", self.name], + stdin=subprocess.DEVNULL, + start_new_session=True, + warn=False, + ) + if rc: + self.logger.error( + "%s: error deleting bridge %s: %s", + self, + self.name, + cmd_error(rc, o, e), + ) + await super()._async_delete() + + +class BaseMunet(LinuxNamespace): + """Munet.""" + + def __init__( + self, + name="munet", + isolated=True, + pid=True, + rundir=None, + pytestconfig=None, + **kwargs, + ): + """Create a Munet.""" + # logging.warning("BaseMunet: %s", name) + + self.hosts = {} + self.switches = {} + self.links = {} + self.macs = {} + self.rmacs = {} + self.isolated = isolated + + self.cli_server = None + self.cli_sockpath = None + self.cli_histfile = None + self.cli_in_window_cmds = {} + self.cli_run_cmds = {} + + # + # We need a directory for various files + # + if not rundir: + rundir = "/tmp/munet" + self.rundir = Path(rundir) + + # + # Always having a global /proc is required to keep things from exploding + # complexity with nested new pid namespaces.. + # + if pid: + self.proc_path = Path(tempfile.mkdtemp(suffix="-proc", prefix="mu-")) + logging.debug("%s: mounting /proc on proc_path %s", name, self.proc_path) + linux.mount("proc", str(self.proc_path), "proc") + else: + self.proc_path = Path("/proc") + + # + # Now create a root level commander that works regardless of whether we inline + # unshare or not. Save it in the global variable as well + # + + if not self.isolated: + self.rootcmd = commander + elif not pid: + nsflags = ( + f"--mount={self.proc_path / '1/ns/mnt'}", + f"--net={self.proc_path / '1/ns/net'}", + f"--uts={self.proc_path / '1/ns/uts'}", + # f"--ipc={self.proc_path / '1/ns/ipc'}", + # f"--time={self.proc_path / '1/ns/time'}", + # f"--cgroup={self.proc_path / '1/ns/cgroup'}", + ) + self.rootcmd = SharedNamespace("root", pid=1, nsflags=nsflags) + else: + # XXX user + nsflags = ( + # XXX Backing up PID namespace just doesn't work. + # f"--pid={self.proc_path / '1/ns/pid_for_children'}", + f"--mount={self.proc_path / '1/ns/mnt'}", + f"--net={self.proc_path / '1/ns/net'}", + f"--uts={self.proc_path / '1/ns/uts'}", + # f"--ipc={self.proc_path / '1/ns/ipc'}", + # f"--time={self.proc_path / '1/ns/time'}", + # f"--cgroup={self.proc_path / '1/ns/cgroup'}", + ) + self.rootcmd = SharedNamespace("root", pid=1, nsflags=nsflags) + global roothost # pylint: disable=global-statement + + roothost = self.rootcmd + + self.cfgopt = munet_config.ConfigOptionsProxy(pytestconfig) + + super().__init__( + name, mount=True, net=isolated, uts=isolated, pid=pid, unet=None, **kwargs + ) + + # This allows us to cleanup any leftover running munet's + if "MUNET_PID" in os.environ: + if os.environ["MUNET_PID"] != str(our_pid): + logging.error( + "Found env MUNET_PID != our pid %s, instead its %s, changing", + our_pid, + os.environ["MUNET_PID"], + ) + os.environ["MUNET_PID"] = str(our_pid) + + # this is for testing purposes do not use + if not BaseMunet.g_unet: + BaseMunet.g_unet = self + + self.logger.debug("%s: Creating", self) + + def __getitem__(self, key): + if key in self.switches: + return self.switches[key] + return self.hosts[key] + + def add_host(self, name, cls=LinuxNamespace, **kwargs): + """Add a host to munet.""" + self.logger.debug("%s: add_host %s(%s)", self, cls.__name__, name) + + self.hosts[name] = cls(name, unet=self, **kwargs) + + # Create a new mounted FS for tracking nested network namespaces creatd by the + # user with `ip netns add` + + # XXX why is this failing with podman??? + # self.hosts[name].tmpfs_mount("/run/netns") + + return self.hosts[name] + + def add_link(self, node1, node2, if1, if2, mtu=None, **intf_constraints): + """Add a link between switch and node or 2 nodes. + + If constraints are given they are applied to each endpoint. See + `InterfaceMixin::set_intf_constraints()` for more info. + """ + isp2p = False + + try: + name1 = node1.name + except AttributeError: + if node1 in self.switches: + node1 = self.switches[node1] + else: + node1 = self.hosts[node1] + name1 = node1.name + + try: + name2 = node2.name + except AttributeError: + if node2 in self.switches: + node2 = self.switches[node2] + else: + node2 = self.hosts[node2] + name2 = node2.name + + if name1 in self.switches: + assert name2 in self.hosts + elif name2 in self.switches: + assert name1 in self.hosts + name1, name2 = name2, name1 + if1, if2 = if2, if1 + else: + # p2p link + assert name1 in self.hosts + assert name2 in self.hosts + isp2p = True + + lname = "{}:{}-{}:{}".format(name1, if1, name2, if2) + self.logger.debug("%s: add_link %s%s", self, lname, " p2p" if isp2p else "") + self.links[lname] = (name1, if1, name2, if2) + + # And create the veth now. + if isp2p: + lhost, rhost = self.hosts[name1], self.hosts[name2] + lifname = "i1{:x}".format(lhost.pid) + + # Done at root level + nsif1 = lhost.get_ns_ifname(if1) + nsif2 = rhost.get_ns_ifname(if2) + + # Use pids[-1] to get the unet scoped pid for hosts + self.cmd_raises_nsonly( + f"ip link add {lifname} type veth peer name {nsif2}" + f" netns {rhost.pids[-1]}" + ) + self.cmd_raises_nsonly(f"ip link set {lifname} netns {lhost.pids[-1]}") + + lhost.cmd_raises_nsonly("ip link set {} name {}".format(lifname, nsif1)) + if mtu: + lhost.cmd_raises_nsonly("ip link set {} mtu {}".format(nsif1, mtu)) + lhost.cmd_raises_nsonly("ip link set {} up".format(nsif1)) + lhost.register_interface(if1) + + if mtu: + rhost.cmd_raises_nsonly("ip link set {} mtu {}".format(nsif2, mtu)) + rhost.cmd_raises_nsonly("ip link set {} up".format(nsif2)) + rhost.register_interface(if2) + else: + switch = self.switches[name1] + rhost = self.hosts[name2] + + nsif1 = switch.get_ns_ifname(if1) + nsif2 = rhost.get_ns_ifname(if2) + + if mtu is None: + mtu = switch.mtu + + if len(nsif1) > 16: + self.logger.error('"%s" len %s > 16', nsif1, len(nsif1)) + elif len(nsif2) > 16: + self.logger.error('"%s" len %s > 16', nsif2, len(nsif2)) + assert len(nsif1) <= 16 and len(nsif2) <= 16 # Make sure fits in IFNAMSIZE + + self.logger.debug("%s: Creating veth pair for link %s", self, lname) + + # Use pids[-1] to get the unet scoped pid for hosts + # switch is already in our namespace so nothing to convert. + self.cmd_raises_nsonly( + f"ip link add {nsif1} type veth peer name {nsif2}" + f" netns {rhost.pids[-1]}" + ) + + if mtu: + # if switch.mtu: + # # the switch interface should match the switch config + # switch.cmd_raises_nsonly( + # "ip link set {} mtu {}".format(if1, switch.mtu) + # ) + switch.cmd_raises_nsonly("ip link set {} mtu {}".format(nsif1, mtu)) + rhost.cmd_raises_nsonly("ip link set {} mtu {}".format(nsif2, mtu)) + + switch.register_interface(if1) + rhost.register_interface(if2) + rhost.register_network(switch.name, if2) + + switch.cmd_raises_nsonly(f"ip link set {nsif1} master {switch.name}") + + switch.cmd_raises_nsonly(f"ip link set {nsif1} up") + rhost.cmd_raises_nsonly(f"ip link set {nsif2} up") + + # Cache the MAC values, and reverse mapping + self.get_mac(name1, nsif1) + self.get_mac(name2, nsif2) + + # Setup interface constraints if provided + if intf_constraints: + node1.set_intf_constraints(if1, **intf_constraints) + node2.set_intf_constraints(if2, **intf_constraints) + + def add_switch(self, name, cls=Bridge, **kwargs): + """Add a switch to munet.""" + self.logger.debug("%s: add_switch %s(%s)", self, cls.__name__, name) + self.switches[name] = cls(name, unet=self, **kwargs) + return self.switches[name] + + def get_mac(self, name, ifname): + if name in self.hosts: + dev = self.hosts[name] + else: + dev = self.switches[name] + + nsifname = self.get_ns_ifname(ifname) + + if (name, ifname) not in self.macs: + _, output, _ = dev.cmd_status_nsonly("ip -o link show " + nsifname) + m = re.match(".*link/(loopback|ether) ([0-9a-fA-F:]+) .*", output) + mac = m.group(2) + self.macs[(name, ifname)] = mac + self.rmacs[mac] = (name, ifname) + + return self.macs[(name, ifname)] + + async def _delete_link(self, lname): + rname, rif = self.links[lname][2:4] + host = self.hosts[rname] + nsrif = host.get_ns_ifname(rif) + + self.logger.debug("%s: Deleting veth pair for link %s", self, lname) + rc, o, e = await host.async_cmd_status_nsonly( + [self.ip_path, "link", "delete", nsrif], + stdin=subprocess.DEVNULL, + start_new_session=True, + warn=False, + ) + if rc: + self.logger.error("Err del veth pair %s: %s", lname, cmd_error(rc, o, e)) + + async def _delete_links(self): + # for x in self.links: + # await self._delete_link(x) + return await asyncio.gather(*[self._delete_link(x) for x in self.links]) + + async def _async_delete(self): + """Delete the munet topology.""" + # logger = self.logger if False else logging + logger = self.logger + if type(self) == BaseMunet: # pylint: disable=C0123 + logger.info("%s: deleting.", self) + else: + logger.debug("%s: BaseMunet sub-class deleting.", self) + + logger.debug("Deleting links") + try: + await self._delete_links() + except Exception as error: + logger.error("%s: error deleting links: %s", self, error, exc_info=True) + + logger.debug("Deleting hosts and bridges") + try: + # Delete hosts and switches, wait for them all to complete + # even if there is an exception. + htask = [x.async_delete() for x in self.hosts.values()] + stask = [x.async_delete() for x in self.switches.values()] + await asyncio.gather(*htask, *stask, return_exceptions=True) + except Exception as error: + logger.error( + "%s: error deleting hosts and switches: %s", self, error, exc_info=True + ) + + self.links = {} + self.hosts = {} + self.switches = {} + + try: + if self.cli_server: + self.cli_server.cancel() + self.cli_server = None + if self.cli_sockpath: + await self.async_cmd_status( + "rm -rf " + os.path.dirname(self.cli_sockpath) + ) + self.cli_sockpath = None + except Exception as error: + logger.error( + "%s: error cli server or sockpaths: %s", self, error, exc_info=True + ) + + try: + if self.cli_histfile: + readline.write_history_file(self.cli_histfile) + self.cli_histfile = None + except Exception as error: + logger.error( + "%s: error saving history file: %s", self, error, exc_info=True + ) + + # XXX for some reason setns during the delete is changing our dir to /. + cwd = os.getcwd() + + try: + await super()._async_delete() + except Exception as error: + logger.error( + "%s: error deleting parent classes: %s", self, error, exc_info=True + ) + os.chdir(cwd) + + try: + if self.proc_path and str(self.proc_path) != "/proc": + logger.debug("%s: umount, remove proc_path %s", self, self.proc_path) + linux.umount(str(self.proc_path), 0) + os.rmdir(self.proc_path) + except Exception as error: + logger.warning( + "%s: error umount and removing proc_path %s: %s", + self, + self.proc_path, + error, + exc_info=True, + ) + try: + linux.umount(str(self.proc_path), linux.MNT_DETACH) + except Exception as error2: + logger.error( + "%s: error umount with detach proc_path %s: %s", + self, + self.proc_path, + error2, + exc_info=True, + ) + + if BaseMunet.g_unet == self: + BaseMunet.g_unet = None + + +BaseMunet.g_unet = None + +if True: # pylint: disable=using-constant-test + + class ShellWrapper: + """A Read-Execute-Print-Loop (REPL) interface. + + A newline or prompt changing command should be sent to the + spawned child prior to creation as the `prompt` will be `expect`ed + """ + + def __init__( + self, + spawn, + prompt, + continuation_prompt=None, + extra_init_cmd=None, + will_echo=False, + escape_ansi=False, + ): + self.echo = will_echo + self.escape = ( + re.compile(r"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]") if escape_ansi else None + ) + + logging.debug( + 'ShellWraper: XXX prompt "%s" will_echo %s child.echo %s', + prompt, + will_echo, + spawn.echo, + ) + + self.child = spawn + if self.child.echo: + logging.info("Setting child to echo") + self.child.setecho(False) + self.child.waitnoecho() + assert not self.child.echo + + self.prompt = prompt + self.cont_prompt = continuation_prompt + + # Use expect_exact if we can as it should be faster + self.expects = [prompt] + if re.escape(prompt) == prompt and hasattr(self.child, "expect_exact"): + self._expectf = self.child.expect_exact + else: + self._expectf = self.child.expect + if continuation_prompt: + self.expects.append(continuation_prompt) + if re.escape(continuation_prompt) != continuation_prompt: + self._expectf = self.child.expect + + if extra_init_cmd: + self.expect_prompt() + self.child.sendline(extra_init_cmd) + self.expect_prompt() + + def expect_prompt(self, timeout=-1): + return self._expectf(self.expects, timeout=timeout) + + def run_command(self, command, timeout=-1): + """Pexpect REPLWrapper compatible run_command. + + This will split `command` into lines and feed each one to the shell. + + Args: + command: string of commands separated by newlines, a trailing + newline will cause and empty line to be sent. + timeout: pexpect timeout value. + """ + lines = command.splitlines() + if command[-1] == "\n": + lines.append("") + output = "" + index = 0 + for line in lines: + self.child.sendline(line) + index = self.expect_prompt(timeout=timeout) + output += self.child.before + + if index: + if hasattr(self.child, "kill"): + self.child.kill(signal.SIGINT) + else: + self.child.send("\x03") + self.expect_prompt(timeout=30 if self.child.timeout is None else -1) + raise ValueError("Continuation prompt found at end of commands") + + if self.escape: + output = self.escape.sub("", output) + + return output + + def cmd_nostatus(self, cmd, timeout=-1): + r"""Execute a shell command. + + Returns: + (strip/cleaned \r) output + """ + output = self.run_command(cmd, timeout) + output = output.replace("\r\n", "\n") + if self.echo: + # remove the command + idx = output.find(cmd) + if idx == -1: + logging.warning( + "Didn't find command ('%s') in expected output ('%s')", + cmd, + output, + ) + else: + # Remove up to and including the command from the output stream + output = output[idx + len(cmd) :] + + return output.replace("\r", "").strip() + + def cmd_status(self, cmd, timeout=-1): + r"""Execute a shell command. + + Returns: + status and (strip/cleaned \r) output + """ + # Run the command getting the output + output = self.cmd_nostatus(cmd, timeout) + + # Now get the status + scmd = "echo $?" + rcstr = self.run_command(scmd) + rcstr = rcstr.replace("\r\n", "\n") + if self.echo: + # remove the command + idx = rcstr.find(scmd) + if idx == -1: + if self.echo: + logging.warning( + "Didn't find status ('%s') in expected output ('%s')", + scmd, + rcstr, + ) + try: + rc = int(rcstr) + except Exception: + rc = 255 + else: + rcstr = rcstr[idx + len(scmd) :].strip() + try: + rc = int(rcstr) + except ValueError as error: + logging.error( + "%s: error with expected status output: %s: %s", + self, + error, + rcstr, + exc_info=True, + ) + rc = 255 + return rc, output + + def cmd_raises(self, cmd, timeout=-1): + r"""Execute a shell command. + + Returns: + (strip/cleaned \r) ouptut + + Raises: + CalledProcessError: on non-zero exit status + """ + rc, output = self.cmd_status(cmd, timeout) + if rc: + raise CalledProcessError(rc, cmd, output) + return output + + +# --------------------------- +# Root level utility function +# --------------------------- + + +def get_exec_path(binary): + return commander.get_exec_path(binary) + + +def get_exec_path_host(binary): + return commander.get_exec_path(binary) + + +def get_our_script_path(script): + # would be nice to find this w/o using a path lookup + sdir = os.path.dirname(os.path.abspath(__file__)) + spath = os.path.join(sdir, script) + if os.path.exists(spath): + return spath + return get_exec_path(script) + + +commander = Commander("munet") +roothost = None |