# -*- coding: utf-8 eval: (blacken-mode 1) -*-
# SPDX-License-Identifier: GPL-2.0-or-later
#
# October 1 2021, Christian Hopps <chopps@labn.net>
#
# Copyright (c) 2021-2022, LabN Consulting, L.L.C.
#
# pylint: disable=protected-access
"""A module that defines objects for standalone use."""
import asyncio
import errno
import getpass
import ipaddress
import logging
import os
import random
import re
import shlex
import socket
import subprocess
import time

from pathlib import Path

from . import cli
from .base import BaseMunet
from .base import Bridge
from .base import Commander
from .base import LinuxNamespace
from .base import MunetError
from .base import Timeout
from .base import _async_get_exec_path
from .base import _get_exec_path
from .base import cmd_error
from .base import commander
from .base import fsafe_name
from .base import get_exec_path_host
from .config import config_subst
from .config import config_to_dict_with_key
from .config import find_matching_net_config
from .config import find_with_kv
from .config import merge_kind_config
from .watchlog import WatchLog


class L3ContainerNotRunningError(MunetError):
    """Exception if no running container exists."""


def get_loopback_ips(c, nid):
    if ip := c.get("ip"):
        if ip == "auto":
            return [ipaddress.ip_interface("10.255.0.0/32") + nid]
        if isinstance(ip, str):
            return [ipaddress.ip_interface(ip)]
        return [ipaddress.ip_interface(x) for x in ip]
    return []
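
# Example (illustrative): with config {"ip": "auto"} and node id 3 this returns
# [IPv4Interface("10.255.0.3/32")]; an explicit address string or list in the
# config is returned wrapped with ipaddress.ip_interface().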


def make_ip_network(net, inc):
    n = ipaddress.ip_network(net)
    return ipaddress.ip_network(
        (n.network_address + inc * n.num_addresses, n.prefixlen)
    )
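
# Example (illustrative): make_ip_network("10.0.0.0/24", 2) returns
# IPv4Network("10.0.2.0/24"), i.e., the prefix shifted forward by 2 x 256 addresses.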


def make_ip_interface(ia, inc):
    ia = ipaddress.ip_interface(ia)
    # Adding an integer moves the address but collapses the result to a /32
    # (or /128 for IPv6); fix this.
    ia = ia + ia.network.num_addresses * inc
    # Re-apply a usable prefix length (assumes /24 for IPv4 and /64 for IPv6).
    ia = ipaddress.ip_interface(str(ia).replace("/32", "/24").replace("/128", "/64"))
    return ia
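
# Example (illustrative): make_ip_interface("10.0.0.254/24", 1) returns
# IPv4Interface("10.0.1.254/24") -- the address advanced by one /24 worth of
# addresses with the /24 prefix length re-applied.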


def get_ip_network(c, brid, ipv6=False):
    ip = c.get("ipv6" if ipv6 else "ip")
    if ip and str(ip) != "auto":
        try:
            ifip = ipaddress.ip_interface(ip)
            if ifip.ip == ifip.network.network_address:
                return ifip.network
            return ifip
        except ValueError:
            return ipaddress.ip_network(ip)
    if ipv6:
        return make_ip_interface("fc00::fe/64", brid)
    return make_ip_interface("10.0.0.254/24", brid)
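
# Example (illustrative): get_ip_network({}, 3) auto-numbers bridge 3 as
# IPv4Interface("10.0.3.254/24"), while get_ip_network({"ip": "192.168.10.0/24"}, 3)
# returns IPv4Network("192.168.10.0/24") since the host bits are zero.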


def parse_pciaddr(devaddr):
    comp = re.match(
        r"(?:([0-9A-Fa-f]{4}):)?([0-9A-Fa-f]{2}):([0-9A-Fa-f]{2})\.([0-7])", devaddr
    )
    # match.groups() is a tuple; use a list so the optional domain can be defaulted
    comp = list(comp.groups())
    if comp[0] is None:
        comp[0] = "0000"
    return [int(x, 16) for x in comp]
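
# Example (illustrative): parse_pciaddr("0000:17:00.1") == [0, 0x17, 0, 1]
# (domain, bus, slot, function); a missing domain defaults to 0000.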


def read_int_value(path):
    with open(path, encoding="ascii") as f:
        return int(f.read())


def read_str_value(path):
    with open(path, encoding="ascii") as f:
        return f.read().strip()


def read_sym_basename(path):
    return os.path.basename(os.readlink(path))
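
# Example (illustrative): these helpers are used against sysfs, e.g.
# read_str_value("/sys/bus/pci/devices/0000:17:00.1/vendor") might return "0x8086",
# and read_sym_basename(".../driver") the bound driver name such as "vfio-pci".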


async def to_thread(func):
    """to_thread for python < 3.9."""
    try:
        return await asyncio.to_thread(func)
    except AttributeError:
        logging.warning("Using backport to_thread")
        return await asyncio.get_running_loop().run_in_executor(None, func)


def convert_ranges_to_bitmask(ranges):
    bitmask = 0
    for r in ranges.split(","):
        if "-" not in r:
            bitmask |= 1 << int(r)
        else:
            x, y = (int(x) for x in r.split("-"))
            for b in range(x, y + 1):
                bitmask |= 1 << b
    return bitmask
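
# Example (illustrative): convert_ranges_to_bitmask("0-2,5") == 0b100111 (39),
# i.e., a cpuset-style range string turned into a bitmask with bits 0-2 and 5 set.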


class L2Bridge(Bridge):
    """A linux bridge with no IP network address."""

    def __init__(self, name=None, unet=None, logger=None, mtu=None, config=None):
        """Create a linux Bridge."""
        super().__init__(name=name, unet=unet, logger=logger, mtu=mtu)

        self.config = config if config else {}

    async def _async_delete(self):
        self.logger.debug("%s: deleting", self)
        await super()._async_delete()


class L3Bridge(Bridge):
    """A linux bridge with associated IP network address."""

    def __init__(self, name=None, unet=None, logger=None, mtu=None, config=None):
        """Create a linux Bridge."""
        super().__init__(name=name, unet=unet, logger=logger, mtu=mtu)

        self.config = config if config else {}

        self.ip_interface = get_ip_network(self.config, self.id)
        if hasattr(self.ip_interface, "network"):
            self.ip_address = self.ip_interface.ip
            self.ip_network = self.ip_interface.network
            self.cmd_raises(f"ip addr add {self.ip_interface} dev {name}")
        else:
            self.ip_address = None
            self.ip_network = self.ip_interface

        self.logger.debug("%s: set IPv4 network address to %s", self, self.ip_interface)
        self.cmd_raises("sysctl -w net.ipv4.ip_forward=1")

        self.ip6_interface = None
        if self.unet.ipv6_enable:
            self.ip6_interface = get_ip_network(self.config, self.id, ipv6=True)
            if hasattr(self.ip6_interface, "network"):
                self.ip6_address = self.ip6_interface.ip
                self.ip6_network = self.ip6_interface.network
                self.cmd_raises(f"ip addr add {self.ip6_interface} dev {name}")
            else:
                self.ip6_address = None
                self.ip6_network = self.ip6_interface

            self.logger.debug(
                "%s: set IPv6 network address to %s", self, self.ip6_interface
            )
            self.cmd_raises("sysctl -w net.ipv6.conf.all.forwarding=1")

        self.is_nat = self.config.get("nat", False)
        if self.is_nat:
            self.cmd_raises(
                "iptables -t nat -A POSTROUTING "
                f"-s {self.ip_network} ! -d {self.ip_network} "
                f"! -o {self.name} -j MASQUERADE"
            )

    def get_intf_addr(self, ifname, ipv6=False):
        # None is a valid ifname; we have the same address for all interfaces,
        # so just make sure they aren't asking for one we don't have.
        if ifname is not None and ifname not in self.intfs:
            return None
        return self.ip6_interface if ipv6 else self.ip_interface

    async def _async_delete(self):
        self.logger.debug("%s: deleting", self)

        if self.config.get("nat", False):
            self.cmd_status(
                "iptables -t nat -D POSTROUTING "
                f"-s {self.ip_network} ! -d {self.ip_network} "
                f"! -o {self.name} -j MASQUERADE"
            )
        await super()._async_delete()


class NodeMixin:
    """Node attributes and functionality."""

    next_ord = 1

    @classmethod
    def _get_next_ord(cls):
        # Do not use `cls` here b/c that would make the counter class specific;
        # all node classes share a single ordinal counter.
        n = L3NodeMixin.next_ord
        L3NodeMixin.next_ord = n + 1
        return n

    def __init__(self, *args, config=None, **kwargs):
        """Create a Node."""
        super().__init__(*args, **kwargs)

        self.config = config if config else {}
        config = self.config

        self.id = int(config["id"]) if "id" in config else self._get_next_ord()

        self.cmd_p = None
        self.container_id = None
        self.cleanup_called = False

        # Clear and create rundir early
        assert self.unet is not None
        self.rundir = self.unet.rundir.joinpath(self.name)
        commander.cmd_raises(f"rm -rf {self.rundir}")
        commander.cmd_raises(f"mkdir -p {self.rundir}")

    def _shebang_prep(self, config_key):
        cmd = self.config.get(config_key, "").strip()
        if not cmd:
            return []

        script_name = fsafe_name(config_key)

        # shell_cmd is a union and can be boolean or string
        shell_cmd = self.config.get("shell", "/bin/bash")
        if not isinstance(shell_cmd, str):
            if shell_cmd:
                # i.e., "shell: true"
                shell_cmd = "/bin/bash"
            else:
                # i.e., "shell: false"
                shell_cmd = ""

        # If we have a shell_cmd we create a shebang script file here; for
        # containers, run_cmd volume mounts it into the container.
        if shell_cmd:
            # Create cleanup cmd file
            cmd = cmd.replace("%CONFIGDIR%", str(self.unet.config_dirname))
            cmd = cmd.replace("%RUNDIR%", str(self.rundir))
            cmd = cmd.replace("%NAME%", str(self.name))
            cmd += "\n"

            # Write out our cleanup cmd file at this time too.
            cmdpath = os.path.join(self.rundir, f"{script_name}.shebang")
            with open(cmdpath, mode="w+", encoding="utf-8") as cmdfile:
                cmdfile.write(f"#!{shell_cmd}\n")
                cmdfile.write(cmd)
                cmdfile.flush()
            commander.cmd_raises(f"chmod 755 {cmdpath}")

            if self.container_id:
                # XXX this counts on it being mounted in container, ugly
                cmds = [f"/tmp/{script_name}.shebang"]
            else:
                cmds = [cmdpath]
        else:
            cmds = []
            if isinstance(cmd, str):
                cmds.extend(shlex.split(cmd))
            else:
                cmds.extend(cmd)
            cmds = [
                x.replace("%CONFIGDIR%", str(self.unet.config_dirname)) for x in cmds
            ]
            cmds = [x.replace("%RUNDIR%", str(self.rundir)) for x in cmds]
            cmds = [x.replace("%NAME%", str(self.name)) for x in cmds]

        return cmds
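
    # Example (illustrative): with config {"cmd": "echo hi %NAME%", "shell": false}
    # _shebang_prep("cmd") returns ["echo", "hi", "<node-name>"]; with a shell it
    # instead writes a "#!<shell>" script under rundir and returns its path.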

    async def _async_shebang_cmd(self, config_key, warn=True):
        cmds = self._shebang_prep(config_key)
        if not cmds:
            return 0

        rc, o, e = await self.async_cmd_status(cmds, warn=warn)
        if not rc and warn and (o or e):
            self.logger.info(
                f"async_shebang_cmd ({config_key}): %s", cmd_error(rc, o, e)
            )
        elif rc and warn:
            self.logger.warning(
                f"async_shebang_cmd ({config_key}): %s", cmd_error(rc, o, e)
            )
        else:
            self.logger.debug(
                f"async_shebang_cmd ({config_key}): %s", cmd_error(rc, o, e)
            )

        return rc

    def has_run_cmd(self) -> bool:
        return bool(self.config.get("cmd", "").strip())

    async def get_proc_child_pid(self, p):
        # commander is right for both unshare inline (our proc pidns)
        # and non-inline (root pidns).

        # This doesn't work b/c we can't get back to the root pidns

        rootcmd = self.unet.rootcmd
        pgrep = rootcmd.get_exec_path("pgrep")
        spid = str(p.pid)
        for _ in Timeout(4):
            if p.returncode is not None:
                self.logger.debug("%s: proc %s exited before getting child", self, p)
                return None

            rc, o, e = await rootcmd.async_cmd_status(
                [pgrep, "-o", "-P", spid], warn=False
            )
            if rc == 0:
                return int(o.strip())

            await asyncio.sleep(0.1)
            self.logger.debug(
                "%s: no child of proc %s: %s", self, p, cmd_error(rc, o, e)
            )
        self.logger.warning("%s: timeout getting child pid of proc %s", self, p)
        return None

    async def run_cmd(self):
        """Run the configured commands for this node."""
        self.logger.debug(
            "[rundir %s exists %s]", self.rundir, os.path.exists(self.rundir)
        )

        cmds = self._shebang_prep("cmd")
        if not cmds:
            return

        stdout = open(os.path.join(self.rundir, "cmd.out"), "wb")
        stderr = open(os.path.join(self.rundir, "cmd.err"), "wb")
        self.cmd_pid = None
        self.cmd_p = await self.async_popen(
            cmds,
            stdin=subprocess.DEVNULL,
            stdout=stdout,
            stderr=stderr,
            start_new_session=True,  # allows us to signal all children to exit
        )

        # If our process is actually the child of an nsenter fetch its pid.
        if self.nsenter_fork:
            self.cmd_pid = await self.get_proc_child_pid(self.cmd_p)

        self.logger.debug(
            "%s: async_popen %s => %s (cmd_pid %s)",
            self,
            cmds,
            self.cmd_p.pid,
            self.cmd_pid,
        )

        self.pytest_hook_run_cmd(stdout, stderr)

        return self.cmd_p

    async def _async_cleanup_cmd(self):
        """Run the configured cleanup commands for this node.

        This function is called by subclass' async_cleanup_cmd
        """
        self.cleanup_called = True

        return await self._async_shebang_cmd("cleanup-cmd")

    def has_cleanup_cmd(self) -> bool:
        return bool(self.config.get("cleanup-cmd", "").strip())

    async def async_cleanup_cmd(self):
        """Run the configured cleanup commands for this node."""
        return await self._async_cleanup_cmd()

    def has_ready_cmd(self) -> bool:
        return bool(self.config.get("ready-cmd", "").strip())

    async def async_ready_cmd(self):
        """Run the configured ready commands for this node."""
        return not await self._async_shebang_cmd("ready-cmd", warn=False)

    def cmd_completed(self, future):
        self.logger.debug("%s: cmd completed callback", self)
        try:
            status = future.result()
            self.logger.debug(
                "%s: node cmd_p completed result: %s cmd: %s", self, status, self.cmd_p
            )
            self.cmd_pid = None
            self.cmd_p = None
        except asyncio.CancelledError:
            # Should we stop the container if we have one?
            self.logger.debug("%s: node cmd_p.wait() canceled", future)

    def pytest_hook_run_cmd(self, stdout, stderr):
        """Handle pytest options related to running the node cmd.

        This function does things such as launching windows that tail the given
        files if requested by the user.

        Args:
            stdout: file-like object with a ``name`` attribute, or a path to a file.
            stderr: file-like object with a ``name`` attribute, or a path to a file.
        """
        if not self.unet:
            return

        outopt = self.unet.cfgopt.getoption("--stdout")
        outopt = outopt if outopt is not None else ""
        if outopt == "all" or self.name in outopt.split(","):
            outname = stdout.name if hasattr(stdout, "name") else stdout
            self.run_in_window(f"tail -F {outname}", title=f"O:{self.name}")

        if stderr:
            erropt = self.unet.cfgopt.getoption("--stderr")
            erropt = erropt if erropt is not None else ""
            if erropt == "all" or self.name in erropt.split(","):
                errname = stderr.name if hasattr(stderr, "name") else stderr
                self.run_in_window(f"tail -F {errname}", title=f"E:{self.name}")

    def pytest_hook_open_shell(self):
        if not self.unet:
            return

        gdbcmd = self.config.get("gdb-cmd")
        shellopt = self.unet.cfgopt.getoption("--gdb", "")
        should_gdb = gdbcmd and (shellopt == "all" or self.name in shellopt.split(","))
        use_emacs = self.unet.cfgopt.getoption("--gdb-use-emacs", False)

        if should_gdb and not use_emacs:
            cmds = self.config.get("gdb-target-cmds", [])
            for cmd in cmds:
                gdbcmd += f" '-ex={cmd}'"

            bps = self.unet.cfgopt.getoption("--gdb-breakpoints", "").split(",")
            for bp in bps:
                if bp:
                    gdbcmd += f" '-ex=b {bp}'"

            cmds = self.config.get("gdb-run-cmds", [])
            for cmd in cmds:
                gdbcmd += f" '-ex={cmd}'"

            self.run_in_window(gdbcmd, ns_only=True)
        elif should_gdb and use_emacs:
            gdbcmd = gdbcmd.replace("gdb ", "gdb -i=mi ")
            ecbin = self.get_exec_path("emacsclient")
            # output = self.cmd_raises(
            #     [ecbin, "--eval", f"(gdb \"{gdbcmd} -ex='p 123456'\")"]
            # )
            _ = self.cmd_raises([ecbin, "--eval", f'(gdb "{gdbcmd}")'])

            # can't figure out how to wait until symbols are loaded, until we do we just
            # have to wait "long enough" for the symbol load to finish :/
            # for _ in range(100):
            #     output = self.cmd_raises(
            #         [
            #             ecbin,
            #             "--eval",
            #             f"gdb-first-prompt",
            #         ]
            #     )
            #     if output == "nil\n":
            #         break
            #     time.sleep(0.25)

            time.sleep(10)

            cmds = self.config.get("gdb-target-cmds", [])
            for cmd in cmds:
                # we may want to quote quotes in the cmd string
                self.cmd_raises(
                    [
                        ecbin,
                        "--eval",
                        f'(gud-gdb-run-command-fetch-lines "{cmd}" "*gud-gdb*")',
                    ]
                )

            bps = self.unet.cfgopt.getoption("--gdb-breakpoints", "").split(",")
            for bp in bps:
                cmd = f"br {bp}"
                self.cmd_raises(
                    [
                        ecbin,
                        "--eval",
                        f'(gud-gdb-run-command-fetch-lines "{cmd}" "*gud-gdb*")',
                    ]
                )

            cmds = self.config.get("gdb-run-cmds", [])
            for cmd in cmds:
                # we may want to quote quotes in the cmd string
                self.cmd_raises(
                    [
                        ecbin,
                        "--eval",
                        f'(gud-gdb-run-command-fetch-lines "{cmd}" "*gud-gdb*")',
                    ]
                )
                gdbcmd += f" '-ex={cmd}'"

        shellopt = self.unet.cfgopt.getoption("--shell")
        shellopt = shellopt if shellopt else ""
        if shellopt == "all" or self.name in shellopt.split(","):
            self.run_in_window("bash")

    async def _async_delete(self):
        self.logger.debug("%s: NodeMixin sub-class _async_delete", self)

        if self.cmd_p:
            await self.async_cleanup_proc(self.cmd_p, self.cmd_pid)
            self.cmd_p = None

        # Next call users "cleanup_cmd:"
        try:
            if not self.cleanup_called:
                await self.async_cleanup_cmd()
        except Exception as error:
            self.logger.warning(
                "Got an error during delete from async_cleanup_cmd: %s", error
            )

        # delete the LinuxNamespace/InterfaceMixin
        await super()._async_delete()


class SSHRemote(NodeMixin, Commander):
    """SSHRemote a node representing an ssh connection to something."""

    def __init__(
        self,
        name,
        server,
        port=22,
        user=None,
        password=None,
        idfile=None,
        **kwargs,
    ):
        super().__init__(name, **kwargs)

        self.logger.debug("%s: creating", self)

        # Things done in LinuxNamespace that we need to replicate here.
        self.rundir = self.unet.rundir.joinpath(self.name)
        self.unet.cmd_raises(f"rm -rf {self.rundir}")
        self.unet.cmd_raises(f"mkdir -p {self.rundir}")

        self.mgmt_ip = None
        self.mgmt_ip6 = None

        self.port = port

        if user:
            self.user = user
        elif "SUDO_USER" in os.environ:
            self.user = os.environ["SUDO_USER"]
        else:
            self.user = getpass.getuser()
        self.password = password
        self.idfile = idfile

        self.server = f"{self.user}@{server}"

        # Setup our base `pre-cmd` values
        #
        # We should maybe add environment variable transfer here, in particular
        # MUNET_NODENAME. The problem is that SendEnv variables have to be
        # explicitly accepted by the server config.
        self.__base_cmd = [
            get_exec_path_host("sudo"),
            "-E",
            f"-u{self.user}",
            get_exec_path_host("ssh"),
        ]
        if port != 22:
            self.__base_cmd.append(f"-p{port}")
        self.__base_cmd.append("-q")
        self.__base_cmd.append("-oStrictHostKeyChecking=no")
        self.__base_cmd.append("-oUserKnownHostsFile=/dev/null")
        if self.idfile:
            self.__base_cmd.append(f"-i{self.idfile}")
        # Would be nice but has to be accepted by server config so not very useful.
        # self.__base_cmd.append("-oSendVar='TEST'")
        self.__base_cmd_pty = list(self.__base_cmd)
        self.__base_cmd_pty.append("-t")
        self.__base_cmd.append(self.server)
        self.__base_cmd_pty.append(self.server)
        # self.set_pre_cmd(pre_cmd, pre_cmd_tty)

        self.logger.info("%s: created", self)

    def has_ready_cmd(self) -> bool:
        return bool(self.config.get("ready-cmd", "").strip())

    def _get_pre_cmd(self, use_str, use_pty, ns_only=False, **kwargs):
        pre_cmd = []
        if self.unet:
            pre_cmd = self.unet._get_pre_cmd(False, use_pty, ns_only=False, **kwargs)
        if ns_only:
            return pre_cmd

        # XXX grab the env from kwargs and add to podman exec
        # env = kwargs.get("env", {})
        if use_pty:
            pre_cmd = pre_cmd + self.__base_cmd_pty
        else:
            pre_cmd = pre_cmd + self.__base_cmd
        return shlex.join(pre_cmd) if use_str else list(pre_cmd)

    def _get_cmd_as_list(self, cmd):
        """Given a list or string return a list form for execution.

        If cmd is a string then [cmd] is returned, for most other
        node types ["bash", "-c", cmd] is returned but in our case
        ssh is the shell.

        Args:
            cmd: list or string representing the command to execute.
            str_shell: if True and `cmd` is a string then run the
              command using bash -c
        Returns:
            list of commands to execute.
        """
        return [cmd] if isinstance(cmd, str) else cmd
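
    # Example (illustrative): _get_cmd_as_list("ls -l /tmp") returns ["ls -l /tmp"]
    # so the whole string is handed to ssh as a single argument and the remote
    # shell performs the word splitting.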


# Would maybe like to refactor this into L3 and Node
class L3NodeMixin(NodeMixin):
    """A linux namespace with IP attributes."""

    def __init__(self, *args, unet=None, **kwargs):
        """Create an L3Node."""
        # logging.warning(
        #     "L3NodeMixin: config %s unet %s kwargs %s", config, unet, kwargs
        # )
        super().__init__(*args, unet=unet, **kwargs)

        self.mgmt_ip = None  # set in parser.py
        self.mgmt_ip6 = None  # set in parser.py
        self.host_intfs = {}
        self.phy_intfs = {}
        self.phycount = 0
        self.phy_odrivers = {}
        self.tapmacs = {}
        self.watched_logs = {}

        self.intf_tc_count = 0

        # super().__init__(name=name, **kwargs)

        self.mount_volumes()

        # -----------------------
        # Setup node's networking
        # -----------------------
        if not unet.ipv6_enable:
            # Disable IPv6
            self.cmd_raises("sysctl -w net.ipv6.conf.all.autoconf=0")
            self.cmd_raises("sysctl -w net.ipv6.conf.all.disable_ipv6=1")
        else:
            self.cmd_raises("sysctl -w net.ipv6.conf.all.autoconf=1")
            self.cmd_raises("sysctl -w net.ipv6.conf.all.disable_ipv6=0")

        self.next_p2p_network = ipaddress.ip_network(f"10.254.{self.id}.0/31")
        self.next_p2p_network6 = ipaddress.ip_network(f"fcff:ffff:{self.id:02x}::/127")

        self.loopback_ip = None
        self.loopback_ips = get_loopback_ips(self.config, self.id)
        self.loopback_ip = self.loopback_ips[0] if self.loopback_ips else None
        if self.loopback_ip:
            self.cmd_raises_nsonly(f"ip addr add {self.loopback_ip} dev lo")
            self.cmd_raises_nsonly("ip link set lo up")
            for i, ip in enumerate(self.loopback_ips[1:]):
                self.cmd_raises_nsonly(f"ip addr add {ip} dev lo:{i}")

        # -------------------
        # Setup node's rundir
        # -------------------

        # Not host path based, but we assume same
        self.set_ns_cwd(self.rundir)

        # Save the namespace pid
        with open(os.path.join(self.rundir, "nspid"), "w", encoding="ascii") as f:
            f.write(f"{self.pid}\n")

        with open(os.path.join(self.rundir, "nspids"), "w", encoding="ascii") as f:
            f.write(f'{" ".join([str(x) for x in self.pids])}\n')

        # Create a hosts file to map our name
        hosts_file = os.path.join(self.rundir, "hosts.txt")
        with open(hosts_file, "w", encoding="ascii") as hf:
            hf.write(
                f"""127.0.0.1\tlocalhost {self.name}
::1\tip6-localhost ip6-loopback
fe00::0\tip6-localnet
ff00::0\tip6-mcastprefix
ff02::1\tip6-allnodes
ff02::2\tip6-allrouters
"""
            )
        if hasattr(self, "bind_mount"):
            self.bind_mount(hosts_file, "/etc/hosts")

    def add_watch_log(self, path, watchfor_re=None):
        """Add a WatchLog to this nodes watched logs.

        Args:
            path: If relative is relative to the nodes ``rundir``
            watchfor_re: Regular expression to watch the log for and raise an exception
                         if found.

        Return:
            The watching task if request or None otherwise.
        """
        path = Path(path)
        if not path.is_absolute():
            path = self.rundir.joinpath(path)

        wl = WatchLog(path)
        self.watched_logs[wl.path] = wl
        task = wl.raise_if_match_task(watchfor_re) if watchfor_re else None
        return task
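
    # Example (illustrative, hypothetical file name and pattern):
    #   task = node.add_watch_log("err.log", r"Assertion .* failed")
    # The returned task raises if the pattern ever appears in the rundir-relative log.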

    async def console(
        self,
        concmd,
        prompt=r"(^|\r?\n)[^#\$]*[#\$] ",
        is_bourne=True,
        user=None,
        password=None,
        expects=None,
        sends=None,
        use_pty=False,
        will_echo=False,
        logfile_prefix="console",
        trace=True,
        **kwargs,
    ):
        """Create a REPL (read-eval-print-loop) driving a console.

        Args:
            concmd: string or list to popen with, or an already open socket
            prompt: the REPL prompt to look for, the function returns when seen
            is_bourne: True if the console is a bourne shell
            user: user name to log in with
            password: password to log in with
            expects: a list of regexps to look for in addition to the prompt and
                the standard user ("ogin:") and password ("[Pp]assword:") prompts.
            sends: what to send when an element of `expects` matches. Can be the
                empty string to send nothing.
            use_pty: true for pty based expect, otherwise uses popen (pipes/files)
            will_echo: bash is buggy in that it echoes to non-tty unlike any other
                sh/ksh; set this value to true if running bash
            logfile_prefix: prefix for 3 logfiles opened to track the console i/o
            trace: trace the send/expect sequence
            **kwargs: kwargs passed on to _spawn.
        """
        lfname = os.path.join(self.rundir, f"{logfile_prefix}-log.txt")
        logfile = open(lfname, "a+", encoding="utf-8")
        logfile.write("-- start logging for: '{}' --\n".format(concmd))

        lfname = os.path.join(self.rundir, f"{logfile_prefix}-read-log.txt")
        logfile_read = open(lfname, "a+", encoding="utf-8")
        logfile_read.write("-- start read logging for: '{}' --\n".format(concmd))

        lfname = os.path.join(self.rundir, f"{logfile_prefix}-send-log.txt")
        logfile_send = open(lfname, "a+", encoding="utf-8")
        logfile_send.write("-- start send logging for: '{}' --\n".format(concmd))

        expects = [] if expects is None else expects
        sends = [] if sends is None else sends
        if user:
            expects.append("ogin:")
            sends.append(user + "\n")
        if password is not None:
            expects.append("assword:")
            sends.append(password + "\n")
        repl = await self.shell_spawn(
            concmd,
            prompt,
            expects=expects,
            sends=sends,
            use_pty=use_pty,
            will_echo=will_echo,
            is_bourne=is_bourne,
            logfile=logfile,
            logfile_read=logfile_read,
            logfile_send=logfile_send,
            trace=trace,
            **kwargs,
        )
        return repl

    async def monitor(
        self,
        sockpath,
        prompt=r"\(qemu\) ",
    ):
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.connect(sockpath)

        pfx = os.path.basename(sockpath)

        lfname = os.path.join(self.rundir, f"{pfx}-log.txt")
        logfile = open(lfname, "a+", encoding="utf-8")
        logfile.write("-- start logging for: '{}' --\n".format(sock))

        lfname = os.path.join(self.rundir, f"{pfx}-read-log.txt")
        logfile_read = open(lfname, "a+", encoding="utf-8")
        logfile_read.write("-- start read logging for: '{}' --\n".format(sock))

        p = self.spawn(sock, prompt, logfile=logfile, logfile_read=logfile_read)
        from .base import ShellWrapper  # pylint: disable=C0415

        p.send("\n")
        return ShellWrapper(p, prompt, None, will_echo=True, escape_ansi=True)

    def mount_volumes(self):
        for m in self.config.get("volumes", []):
            if isinstance(m, str):
                s = m.split(":", 1)
                if len(s) == 1:
                    self.tmpfs_mount(s[0])
                else:
                    spath = s[0]
                    if spath[0] == ".":
                        spath = os.path.abspath(
                            os.path.join(self.unet.config_dirname, spath)
                        )
                    self.bind_mount(spath, s[1])
                continue
            raise NotImplementedError("complex mounts for non-containers")

    def get_ifname(self, netname):
        for c in self.config["connections"]:
            if c["to"] == netname:
                return c["name"]
        return None
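
    # Example (illustrative): with a connection config entry
    # {"to": "net0", "name": "eth0"}, get_ifname("net0") returns "eth0".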

    def set_lan_addr(self, switch, cconf):
        if ip := cconf.get("ip"):
            ipaddr = ipaddress.ip_interface(ip)
            assert ipaddr.version == 4
        elif self.unet.autonumber and "ip" not in cconf:
            self.logger.debug(
                "%s: prefixlen of switch %s is %s",
                self,
                switch.name,
                switch.ip_network.prefixlen,
            )
            n = switch.ip_network
            ipaddr = ipaddress.ip_interface((n.network_address + self.id, n.prefixlen))
        else:
            ipaddr = None

        if ip := cconf.get("ipv6"):
            ip6addr = ipaddress.ip_interface(ip)
            assert ip6addr.version == 6
        elif self.unet.ipv6_enable and self.unet.autonumber and "ipv6" not in cconf:
            self.logger.debug(
                "%s: prefixlen of switch %s is %s",
                self,
                switch.name,
                switch.ip6_network.prefixlen,
            )
            n = switch.ip6_network
            ip6addr = ipaddress.ip_interface((n.network_address + self.id, n.prefixlen))
        else:
            ip6addr = None

        dns_network = self.unet.topoconf.get("dns-network")
        for ip in (ipaddr, ip6addr):
            if not ip:
                continue
            ipcmd = "ip " if ip.version == 4 else "ip -6 "
            if dns_network and dns_network == switch.name:
                if ip.version == 4:
                    self.mgmt_ip = ip.ip
                else:
                    self.mgmt_ip6 = ip.ip
            ifname = cconf["name"]
            self.set_intf_addr(ifname, ip)
            self.logger.debug("%s: adding %s to lan intf %s", self, ip, ifname)
            if not self.is_vm:
                self.intf_ip_cmd(ifname, ipcmd + f"addr add {ip} dev {ifname}")
                if hasattr(switch, "is_nat") and switch.is_nat:
                    swaddr = (
                        switch.ip_address if ip.version == 4 else switch.ip6_address
                    )
                    self.cmd_raises(ipcmd + f"route add default via {swaddr}")
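
    # Example (illustrative): with autonumbering on a switch whose network is
    # 10.0.1.0/24, node id 2 gets 10.0.1.2/24 on its connecting interface (and a
    # default route via the switch address when the switch is a NAT bridge).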

    def _set_p2p_addr(self, other, cconf, occonf, ipv6=False):
        ipkey = "ipv6" if ipv6 else "ip"
        ipaddr = ipaddress.ip_interface(cconf[ipkey]) if cconf.get(ipkey) else None
        oipaddr = ipaddress.ip_interface(occonf[ipkey]) if occonf.get(ipkey) else None
        self.logger.debug(
            "%s: set_p2p_addr %s %s %s", self, other.name, ipaddr, oipaddr
        )

        if not ipaddr and not oipaddr:
            if self.unet.autonumber:
                if ipv6:
                    n = self.next_p2p_network6
                    self.next_p2p_network6 = make_ip_network(n, 1)
                else:
                    n = self.next_p2p_network
                    self.next_p2p_network = make_ip_network(n, 1)

                ipaddr = ipaddress.ip_interface(n)
                oipaddr = ipaddress.ip_interface((ipaddr.ip + 1, n.prefixlen))
            else:
                return

        if ipaddr:
            ifname = cconf["name"]
            self.set_intf_addr(ifname, ipaddr)
            self.logger.debug("%s: adding %s to p2p intf %s", self, ipaddr, ifname)
            if "physical" not in cconf and not self.is_vm:
                self.intf_ip_cmd(ifname, f"ip addr add {ipaddr} dev {ifname}")

        if oipaddr:
            oifname = occonf["name"]
            other.set_intf_addr(oifname, oipaddr)
            self.logger.debug(
                "%s: adding %s to other p2p intf %s", other, oipaddr, oifname
            )
            if "physical" not in occonf and not other.is_vm:
                other.intf_ip_cmd(oifname, f"ip addr add {oipaddr} dev {oifname}")
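
    # Example (illustrative): the first autonumbered p2p link on node id 1 uses
    # 10.254.1.0/31 locally and 10.254.1.1/31 on the peer (fcff:ffff:01::/127 for
    # IPv6).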

    def set_p2p_addr(self, other, cconf, occonf):
        self._set_p2p_addr(other, cconf, occonf, ipv6=False)
        if self.unet.ipv6_enable:
            self._set_p2p_addr(other, cconf, occonf, ipv6=True)

    async def add_host_intf(self, hname, lname, mtu=None):
        if hname in self.host_intfs:
            return
        self.host_intfs[hname] = lname

        # See if this interface is missing (present under a different name) and fix it
        rc, o, _ = self.unet.rootcmd.cmd_status("ip -o link show")
        m = re.search(rf"\d+:\s+(\S+):.*altname {re.escape(hname)}\W", o)
        if m:
            # need to rename
            dname = m.group(1)
            self.logger.info("Fixing misnamed %s to %s", dname, hname)
            self.unet.rootcmd.cmd_status(
                f"ip link property del dev {dname} altname {hname}"
            )
            self.unet.rootcmd.cmd_status(f"ip link set {dname} name {hname}")

        rc, o, _ = self.unet.rootcmd.cmd_status("ip -o link show")
        m = re.search(rf"\d+:\s+{re.escape(hname)}:.*", o)
        if m:
            self.unet.rootcmd.cmd_nostatus(f"ip link set {hname} down ")
            self.unet.rootcmd.cmd_raises(f"ip link set {hname} netns {self.pid}")
        # Wait for interface to show up in namespace
        for retry in range(0, 10):
            rc, o, _ = self.cmd_status(f"ip -o link show {hname}")
            if not rc:
                if re.search(rf"\d+: {re.escape(hname)}:.*", o):
                    break
            if retry > 0:
                await asyncio.sleep(1)
        self.cmd_raises(f"ip link set {hname} name {lname}")
        if mtu:
            self.cmd_raises(f"ip link set {lname} mtu {mtu}")
        self.cmd_raises(f"ip link set {lname} up")

    async def rem_host_intf(self, hname):
        lname = self.host_intfs[hname]
        self.cmd_raises(f"ip link set {lname} down")
        self.cmd_raises(f"ip link set {lname} name {hname}")
        self.cmd_status(f"ip link set netns 1 dev {hname}")
        # The above sometimes fails and we're not sure why
        # logging.error(
        #     "XXX after setns %s",
        #     self.unet.rootcmd.cmd_nostatus(f"ip link show {hname}"),
        # )
        del self.host_intfs[hname]

    async def add_phy_intf(self, devaddr, lname):
        """Add a physical inteface (i.e. mv it to vfio-pci driver.

        This is primarily useful for Qemu, but also for things like TREX or DPDK
        """
        if devaddr in self.phy_intfs:
            return
        self.phy_intfs[devaddr] = lname
        index = len(self.phy_intfs)

        _, _, off, fun = parse_pciaddr(devaddr)
        doffset = off * 8 + fun

        is_virtual = self.unet.rootcmd.path_exists(
            f"/sys/bus/pci/devices/{devaddr}/physfn"
        )
        if is_virtual:
            pfname = self.unet.rootcmd.cmd_raises(
                f"ls -1 /sys/bus/pci/devices/{devaddr}/physfn/net"
            ).strip()
            pdevaddr = read_sym_basename(f"/sys/bus/pci/devices/{devaddr}/physfn")
            _, _, poff, pfun = parse_pciaddr(pdevaddr)
            poffset = poff * 8 + pfun

            offset = read_int_value(
                f"/sys/bus/pci/devices/{devaddr}/physfn/sriov_offset"
            )
            stride = read_int_value(
                f"/sys/bus/pci/devices/{devaddr}/physfn/sriov_stride"
            )
            vf = (doffset - offset - poffset) // stride
            mac = f"02:cc:cc:cc:{index:02x}:{self.id:02x}"
            # Some devices require the parent to be up (e.g., ixgbe)
            self.unet.rootcmd.cmd_raises(f"ip link set {pfname} up")
            self.unet.rootcmd.cmd_raises(f"ip link set {pfname} vf {vf} mac {mac}")
            self.unet.rootcmd.cmd_status(f"ip link set {pfname} vf {vf} trust on")
            self.tapmacs[devaddr] = mac

        self.logger.info("Adding physical PCI device %s as %s", devaddr, lname)

        # Get interface name and set to down if present
        ec, ifname, _ = self.unet.rootcmd.cmd_status(
            f"ls /sys/bus/pci/devices/{devaddr}/net/", warn=False
        )
        ifname = ifname.strip()
        if not ec and ifname:
            # XXX Should only do this if the device is up, and then likewise bring it
            # back up on exit: self.phy_intfs_hostname[devaddr] = ifname
            self.logger.info(
                "Setting physical PCI device %s named %s down", devaddr, ifname
            )
            self.unet.rootcmd.cmd_status(
                f"ip link set {ifname} down 2> /dev/null || true"
            )

        # Get the current bound driver, and unbind
        try:
            driver = read_sym_basename(f"/sys/bus/pci/devices/{devaddr}/driver")
            driver = driver.strip()
        except Exception:
            driver = ""
        if driver:
            if driver == "vfio-pci":
                self.logger.info(
                    "Physical PCI device %s already bound to vfio-pci", devaddr
                )
                return

            self.logger.info(
                "Unbinding physical PCI device %s from driver %s", devaddr, driver
            )
            self.phy_odrivers[devaddr] = driver
            self.unet.rootcmd.cmd_raises(
                f"echo {devaddr} | timeout 10 tee /sys/bus/pci/drivers/{driver}/unbind"
            )

        # Add the device vendor and device id to vfio-pci in case it's the first time
        vendor = read_str_value(f"/sys/bus/pci/devices/{devaddr}/vendor")
        devid = read_str_value(f"/sys/bus/pci/devices/{devaddr}/device")
        self.logger.info("Adding device IDs %s:%s to vfio-pci", vendor, devid)
        ec, _, _ = self.unet.rootcmd.cmd_status(
            f"echo {vendor} {devid} > /sys/bus/pci/drivers/vfio-pci/new_id", warn=False
        )

        for retry in range(0, 10):
            if self.unet.rootcmd.path_exists(
                f"/sys/bus/pci/drivers/vfio-pci/{devaddr}"
            ):
                break
            if retry > 0:
                await asyncio.sleep(1)

            # Bind to vfio-pci if wasn't added with new_id
            self.logger.info("Binding physical PCI device %s to vfio-pci", devaddr)
            ec, _, _ = self.unet.rootcmd.cmd_status(
                f"echo {devaddr} > /sys/bus/pci/drivers/vfio-pci/bind"
            )

    async def rem_phy_intf(self, devaddr):
        """Remove a physical inteface (i.e. mv it away from vfio-pci driver.

        This is primarily useful for Qemu, but also for things like TREX or DPDK
        """
        lname = self.phy_intfs.get(devaddr, "")
        if lname:
            del self.phy_intfs[devaddr]

        # ifname = self.phy_intfs_hostname.get(devaddr, "")
        # if ifname
        #     del self.phy_intfs_hostname[devaddr]

        driver = self.phy_odrivers.get(devaddr, "")
        if not driver:
            self.logger.info(
                "Physical PCI device %s was bound to vfio-pci on entry", devaddr
            )
            return

        self.logger.info(
            "Unbinding physical PCI device %s from driver vfio-pci", devaddr
        )
        self.unet.rootcmd.cmd_status(
            f"echo {devaddr} | timeout 10 tee /sys/bus/pci/drivers/vfio-pci/unbind"
        )

        self.logger.info("Binding physical PCI device %s to driver %s", devaddr, driver)
        ec, _, _ = self.unet.rootcmd.cmd_status(
            f"echo {devaddr} > /sys/bus/pci/drivers/{driver}/bind"
        )
        if not ec:
            del self.phy_odrivers[devaddr]

    async def _async_delete(self):
        self.logger.debug("%s: L3NodeMixin sub-class _async_delete", self)

        # XXX do we need to run the cleanup command before these infra changes?

        # remove any hostintf interfaces
        for hname in list(self.host_intfs):
            await self.rem_host_intf(hname)

        # delete the LinuxNamespace/InterfaceMixin
        await super()._async_delete()

        # remove any physical interfaces, needs to come after normal exits
        for devaddr in list(self.phy_intfs):
            await self.rem_phy_intf(devaddr)


class L3NamespaceNode(L3NodeMixin, LinuxNamespace):
    """A namespace L3 node."""

    def __init__(self, name, pid=True, **kwargs):
        # logging.warning(
        #     "L3NamespaceNode: name %s MRO: %s kwargs %s",
        #     name,
        #     L3NamespaceNode.mro(),
        #     kwargs,
        # )
        super().__init__(name, pid=pid, **kwargs)
        super().pytest_hook_open_shell()

    async def _async_delete(self):
        self.logger.debug("%s: deleting", self)
        await super()._async_delete()


class L3ContainerNode(L3NodeMixin, LinuxNamespace):
    """An container (podman) based L3 node."""

    def __init__(self, name, config, **kwargs):
        """Create a Container Node."""
        self.cont_exec_paths = {}
        self.container_id = None
        self.container_image = config["image"]
        self.extra_mounts = []
        assert self.container_image

        self.cmd_p = None
        self.cmd_pid = None
        self.__base_cmd = []
        self.__base_cmd_pty = []

        # don't we have a mutini or cat process?
        super().__init__(
            name=name,
            config=config,
            # pid=True,
            # cgroup=True,
            # private_mounts=["/sys/fs/cgroup:/sys/fs/cgroup"],
            **kwargs,
        )

    @property
    def is_container(self):
        return True

    def get_exec_path(self, binary):
        """Return the full path to the binary executable inside the image.

        `binary` :: binary name or list of binary names
        """
        return _get_exec_path(binary, self.cmd_status, self.cont_exec_paths)

    async def async_get_exec_path(self, binary):
        """Return the full path to the binary executable inside the image.

        `binary` :: binary name or list of binary names
        """
        path = await _async_get_exec_path(
            binary, self.async_cmd_status, self.cont_exec_paths
        )
        return path

    def get_exec_path_host(self, binary):
        """Return the full path to the binary executable on the host.

        `binary` :: binary name or list of binary names
        """
        return get_exec_path_host(binary)

    def _get_pre_cmd(self, use_str, use_pty, ns_only=False, root_level=False, **kwargs):
        if ns_only:
            return super()._get_pre_cmd(
                use_str, use_pty, ns_only=True, root_level=root_level, **kwargs
            )
        if not self.cmd_p:
            if self.container_id:
                s = f"{self}: Running command in namespace b/c container exited"
                self.logger.warning("%s", s)
                raise L3ContainerNotRunningError(s)
            self.logger.debug("%s: Running command in namespace b/c no container", self)
            return super()._get_pre_cmd(
                use_str, use_pty, ns_only=True, root_level=root_level, **kwargs
            )

        # We need to enter our namespaces when running the podman command
        pre_cmd = super()._get_pre_cmd(
            False, use_pty, ns_only=True, root_level=root_level, **kwargs
        )

        # XXX grab the env from kwargs and add to podman exec
        # env = kwargs.get("env", {})
        if use_pty:
            pre_cmd = pre_cmd + self.__base_cmd_pty
        else:
            pre_cmd = pre_cmd + self.__base_cmd
        return shlex.join(pre_cmd) if use_str else pre_cmd

    def tmpfs_mount(self, inner):
        # eventually would be nice to support live mounting
        assert not self.container_id
        self.logger.debug("Mounting tmpfs on %s", inner)
        self.extra_mounts.append(f"--mount=type=tmpfs,destination={inner}")

    def bind_mount(self, outer, inner):
        # eventually would be nice to support live mounting
        assert not self.container_id
        # First bind the mount in the parent; this allows things like /etc/hosts to
        # work correctly when running "nsonly" commands.
        super().bind_mount(outer, inner)
        # Then arrange for binding in the container as well.
        self.logger.debug("Bind mounting %s on %s", outer, inner)
        if not self.test_nsonly("-e", outer):
            self.cmd_raises_nsonly(f"mkdir -p {outer}")
        self.extra_mounts.append(f"--mount=type=bind,src={outer},dst={inner}")

    def mount_volumes(self):
        args = []
        for m in self.config.get("volumes", []):
            if isinstance(m, str):
                s = m.split(":", 1)
                if len(s) == 1:
                    args.append("--mount=type=tmpfs,destination=" + m)
                else:
                    spath = s[0]
                    spath = os.path.abspath(
                        os.path.join(
                            os.path.dirname(self.unet.config["config_pathname"]), spath
                        )
                    )
                    if not self.test_nsonly("-e", spath):
                        self.cmd_raises_nsonly(f"mkdir -p {spath}")
                    args.append(f"--mount=type=bind,src={spath},dst={s[1]}")
                continue

        for m in self.config.get("mounts", []):
            margs = ["type=" + m["type"]]
            for k, v in m.items():
                if k == "type":
                    continue
                if v:
                    if k in ("src", "source"):
                        v = os.path.abspath(
                            os.path.join(
                                os.path.dirname(self.unet.config["config_pathname"]), v
                            )
                        )
                        if not self.test_nsonly("-e", v):
                            self.cmd_raises_nsonly(f"mkdir -p {v}")
                    margs.append(f"{k}={v}")
                else:
                    margs.append(f"{k}")
            args.append("--mount=" + ",".join(margs))

        if args:
            # Need to work on a way to mount into live container too
            self.extra_mounts += args

    def has_run_cmd(self) -> bool:
        return True

    async def run_cmd(self):
        """Run the configured commands for this node."""
        self.logger.debug("%s: starting container", self.name)
        self.logger.debug(
            "[rundir %s exists %s]", self.rundir, os.path.exists(self.rundir)
        )

        self.container_id = f"{self.name}-{os.getpid()}"
        proc_path = self.unet.proc_path if self.unet else "/proc"
        cmds = [
            get_exec_path_host("podman"),
            "run",
            f"--name={self.container_id}",
            # f"--net=ns:/proc/{self.pid}/ns/net",
            f"--net=ns:{proc_path}/{self.pid}/ns/net",
            f"--hostname={self.name}",
            f"--add-host={self.name}:127.0.0.1",
            # We can't use --rm here b/c podman fails on "stop".
            # u"--rm",
        ]

        if self.config.get("init", True):
            cmds.append("--init")

        if self.config.get("privileged", False):
            cmds.append("--privileged")
            # If we don't do this then the host file system is remounted read-only on
            # exit!
            cmds.append("--systemd=false")
        else:
            cmds.extend(
                [
                    # "--cap-add=SYS_ADMIN",
                    "--cap-add=NET_ADMIN",
                    "--cap-add=NET_RAW",
                ]
            )

        # Add volumes:
        if self.extra_mounts:
            cmds += self.extra_mounts

        # Add environment variables:
        envdict = self.config.get("env", {})
        if envdict is None:
            envdict = {}
        for k, v in envdict.items():
            cmds.append(f"--env={k}={v}")

        # Update capabilities
        cmds += [f"--cap-add={x}" for x in self.config.get("cap-add", [])]
        cmds += [f"--cap-drop={x}" for x in self.config.get("cap-drop", [])]
        # cmds += [f"--expose={x.split(':')[0]}" for x in self.config.get("ports", [])]
        cmds += [f"--publish={x}" for x in self.config.get("ports", [])]

        # Add extra flags from user:
        if "podman" in self.config:
            for x in self.config["podman"].get("extra-args", []):
                cmds.append(x.strip())

        # shell_cmd is a union and can be boolean or string
        shell_cmd = self.config.get("shell", "/bin/bash")
        if not isinstance(shell_cmd, str):
            if shell_cmd:
                shell_cmd = "/bin/bash"
            else:
                shell_cmd = ""

        # Create shebang files, filled later on
        for key in ("cleanup-cmd", "ready-cmd"):
            shebang_cmd = self.config.get(key, "").strip()
            if shell_cmd and shebang_cmd:
                script_name = fsafe_name(key)
                # Will write the file contents out when the command is run
                shebang_cmdpath = os.path.join(self.rundir, f"{script_name}.shebang")
                await self.async_cmd_raises_nsonly(f"touch {shebang_cmdpath}")
                await self.async_cmd_raises_nsonly(f"chmod 755 {shebang_cmdpath}")
                cmds += [
                    # How can we override this?
                    # u'--entrypoint=""',
                    f"--volume={shebang_cmdpath}:/tmp/{script_name}.shebang",
                ]

        cmd = self.config.get("cmd", "").strip()

        # See if we have a custom update for this `kind`
        if kind := self.config.get("kind", None):
            if kind in kind_run_cmd_update:
                cmds, cmd = await kind_run_cmd_update[kind](self, shell_cmd, cmds, cmd)

        # Create running command file
        if shell_cmd and cmd:
            assert isinstance(cmd, str)
            # make cmd \n terminated for script
            cmd = cmd.rstrip()
            cmd = cmd.replace("%CONFIGDIR%", str(self.unet.config_dirname))
            cmd = cmd.replace("%RUNDIR%", str(self.rundir))
            cmd = cmd.replace("%NAME%", str(self.name))
            cmd += "\n"
            cmdpath = os.path.join(self.rundir, "cmd.shebang")
            with open(cmdpath, mode="w+", encoding="utf-8") as cmdfile:
                cmdfile.write(f"#!{shell_cmd}\n")
                cmdfile.write(cmd)
                cmdfile.flush()
            self.cmd_raises_nsonly(f"chmod 755 {cmdpath}")
            cmds += [
                # How can we override this?
                # u'--entrypoint=""',
                f"--volume={cmdpath}:/tmp/cmds.shebang",
                self.container_image,
                "/tmp/cmds.shebang",
            ]
        else:
            # `cmd` is a direct run (no shell) cmd
            cmds.append(self.container_image)
            if cmd:
                if isinstance(cmd, str):
                    cmds.extend(shlex.split(cmd))
                else:
                    cmds.extend(cmd)

            cmds = [
                x.replace("%CONFIGDIR%", str(self.unet.config_dirname)) for x in cmds
            ]
            cmds = [x.replace("%RUNDIR%", str(self.rundir)) for x in cmds]
            cmds = [x.replace("%NAME%", str(self.name)) for x in cmds]

        stdout = open(os.path.join(self.rundir, "cmd.out"), "wb")
        stderr = open(os.path.join(self.rundir, "cmd.err"), "wb")
        # Using nsonly avoids using `podman exec` to execute the cmds.
        self.cmd_p = await self.async_popen_nsonly(
            cmds,
            stdin=subprocess.DEVNULL,
            stdout=stdout,
            stderr=stderr,
            start_new_session=True,  # keeps main tty signals away from podman
        )

        # If our process is actually the child of an nsenter fetch its pid.
        if self.nsenter_fork:
            self.cmd_pid = await self.get_proc_child_pid(self.cmd_p)

        self.logger.debug(
            "%s: async_popen => %s (%s)", self, self.cmd_p.pid, self.cmd_pid
        )

        self.pytest_hook_run_cmd(stdout, stderr)

        # ---------------------------------------
        # Now let's wait until container shows up
        # ---------------------------------------
        timeout = Timeout(30)
        while self.cmd_p.returncode is None and not timeout.is_expired():
            o = await self.async_cmd_raises_nsonly(
                f"podman ps -q -f name={self.container_id}"
            )
            if o.strip():
                break
            elapsed = int(timeout.elapsed())
            if elapsed <= 3:
                await asyncio.sleep(0.1)
            else:
                self.logger.info("%s: run_cmd taking more than %ss", self, elapsed)
                await asyncio.sleep(1)
        if self.cmd_p.returncode is not None:
            # leave self.container_id set to cause exception on use
            self.logger.warning(
                "%s: run_cmd exited quickly (%ss) rc: %s",
                self,
                timeout.elapsed(),
                self.cmd_p.returncode,
            )
        elif timeout.is_expired():
            self.logger.critical(
                "%s: timeout (%ss) waiting for container to start",
                self.name,
                timeout.elapsed(),
            )
            assert not timeout.is_expired()

        #
        # Set our precmd for executing in the container
        #
        self.__base_cmd = [
            get_exec_path_host("podman"),
            "exec",
            f"-eMUNET_RUNDIR={self.unet.rundir}",
            f"-eMUNET_NODENAME={self.name}",
            "-i",
        ]
        self.__base_cmd_pty = list(self.__base_cmd)  # copy list to pty
        self.__base_cmd.append(self.container_id)  # end regular list
        self.__base_cmd_pty.append("-t")  # add pty flags
        self.__base_cmd_pty.append(self.container_id)  # end pty list
        # self.set_pre_cmd(self.__base_cmd, self.__base_cmd_pty)  # set both pre_cmd

        self.logger.info("%s: started container", self.name)

        self.pytest_hook_open_shell()

        return self.cmd_p

    async def async_cleanup_cmd(self):
        """Run the configured cleanup commands for this node."""
        self.cleanup_called = True

        if "cleanup-cmd" not in self.config:
            return

        if not self.cmd_p:
            self.logger.warning("async_cleanup_cmd: container no longer running")
            return

        return await self._async_cleanup_cmd()

    def cmd_completed(self, future):
        try:
            log = self.logger.debug if self.deleting else self.logger.warning
            n = future.result()
            if self.deleting:
                log("contianer `cmd:` result: %s", n)
            else:
                log(
                    "contianer `cmd:` exited early, "
                    "try adding `tail -f /dev/null` to `cmd:`, result: %s",
                    n,
                )
        except asyncio.CancelledError as error:
            # Should we stop the container if we have one? or since we are canceled
            # we know we will be deleting soon?
            self.logger.warning(
                "node container cmd wait() canceled: %s:%s", future, error
            )
        self.cmd_p = None

    async def _async_delete(self):
        self.logger.debug("%s: deleting", self)

        if contid := self.container_id:
            try:
                if not self.cleanup_called:
                    self.logger.debug("calling user cleanup cmd")
                    await self.async_cleanup_cmd()
            except Exception as error:
                self.logger.warning(
                    "Got an error during delete from async_cleanup_cmd: %s", error
                )

            # Clear the container_id field; we now want to act like a namespace.
            self.container_id = None

            o = ""
            e = ""
            if self.cmd_p:
                self.logger.debug("podman stop on container: %s", contid)
                if (rc := self.cmd_p.returncode) is None:
                    rc, o, e = await self.async_cmd_status_nsonly(
                        [get_exec_path_host("podman"), "stop", "--time=2", contid]
                    )
                if rc and rc < 128:
                    self.logger.warning(
                        "%s: podman stop on cmd failed: %s",
                        self,
                        cmd_error(rc, o, e),
                    )
                else:
                    # It's gone
                    self.cmd_p = None

            # now remove the container
            self.logger.debug("podman rm on container: %s", contid)
            rc, o, e = await self.async_cmd_status_nsonly(
                [get_exec_path_host("podman"), "rm", contid]
            )
            if rc:
                self.logger.warning(
                    "%s: podman rm failed: %s", self, cmd_error(rc, o, e)
                )
            else:
                self.logger.debug(
                    "podman removed container %s: %s", contid, cmd_error(rc, o, e)
                )

        await super()._async_delete()


class L3QemuVM(L3NodeMixin, LinuxNamespace):
    """An VM (qemu) based L3 node."""

    def __init__(self, name, config, **kwargs):
        """Create a Container Node."""
        self.cont_exec_paths = {}
        self.launch_p = None
        self.launch_pid = None
        self.qemu_config = config["qemu"]
        self.extra_mounts = []
        assert self.qemu_config
        self.cmdrepl = None
        self.conrepl = None
        self.is_kvm = False
        self.monrepl = None
        self.tapfds = {}
        self.cpu_thread_map = {}

        self.tapnames = {}

        self.use_ssh = False
        self.__base_cmd = []
        self.__base_cmd_pty = []

        super().__init__(name=name, config=config, pid=False, **kwargs)

        self.sockdir = self.rundir.joinpath("s")
        self.cmd_raises(f"mkdir -p {self.sockdir}")

        self.qemu_config = config_subst(
            self.qemu_config,
            name=self.name,
            rundir=os.path.join(self.rundir, self.name),
            configdir=self.unet.config_dirname,
        )
        self.ssh_keyfile = self.qemu_config.get("sshkey")

    @property
    def is_vm(self):
        return True

    def __setup_ssh(self):
        if not self.ssh_keyfile:
            self.logger.warning("%s: No sshkey config", self)
            return False
        if not self.mgmt_ip and not self.mgmt_ip6:
            self.logger.warning("%s: No mgmt IP to ssh to", self)
            return False
        mgmt_ip = self.mgmt_ip if self.mgmt_ip else self.mgmt_ip6

        #
        # Since we have a keyfile we shouldn't need to sudo
        # self.user = os.environ.get("SUDO_USER", "")
        # if not self.user:
        #     self.user = getpass.getuser()
        # self.__base_cmd = [
        #     get_exec_path_host("sudo"),
        #     "-E",
        #     f"-u{self.user}",
        #     get_exec_path_host("ssh"),
        # ]
        #
        port = 22
        self.__base_cmd = [get_exec_path_host("ssh")]
        if port != 22:
            self.__base_cmd.append(f"-p{port}")
        self.__base_cmd.append("-i")
        self.__base_cmd.append(self.ssh_keyfile)
        self.__base_cmd.append("-q")
        self.__base_cmd.append("-oStrictHostKeyChecking=no")
        self.__base_cmd.append("-oUserKnownHostsFile=/dev/null")
        # Would be nice but has to be accepted by server config so not very useful.
        # self.__base_cmd.append("-oSendVar='TEST'")
        self.__base_cmd_pty = list(self.__base_cmd)
        self.__base_cmd_pty.append("-t")

        user = self.qemu_config.get("sshuser", "root")
        self.__base_cmd.append(f"{user}@{mgmt_ip}")
        self.__base_cmd.append("--")
        self.__base_cmd_pty.append(f"{user}@{mgmt_ip}")
        # self.__base_cmd_pty.append("--")
        return True

    def _get_cmd_as_list(self, cmd):
        """Given a list or string return a list form for execution.

        If cmd is a string then [cmd] is returned; for most other
        node types ["bash", "-c", cmd] would be returned, but in our
        case ssh acts as the shell.

        Args:
            cmd: list or string representing the command to execute.
        Returns:
            list of commands to execute.
        """
        if self.use_ssh and self.launch_p:
            return [cmd] if isinstance(cmd, str) else cmd
        return super()._get_cmd_as_list(cmd)

    def _get_pre_cmd(self, use_str, use_pty, ns_only=False, root_level=False, **kwargs):
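        """Return the pre-command, preferring ssh into the VM when available."""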
        if ns_only:
            return super()._get_pre_cmd(
                use_str, use_pty, ns_only=True, root_level=root_level, **kwargs
            )

        if not self.launch_p:
            self.logger.debug("%s: Running command in namespace b/c no VM", self)
            return super()._get_pre_cmd(
                use_str, use_pty, ns_only=True, root_level=root_level, **kwargs
            )

        if not self.use_ssh:
            self.logger.debug(
                "%s: Running command in namespace b/c no SSH configured", self
            )
            return super()._get_pre_cmd(
                use_str, use_pty, ns_only=True, root_level=root_level, **kwargs
            )

        pre_cmd = self.unet._get_pre_cmd(use_str, use_pty, ns_only=True)

        # This is going to run in the process namespaces.
        # We really want it to run in the munet namespace which will
        # be different unless unshare_inline was used.
        #
        # XXX grab the env from kwargs and add to podman exec
        # env = kwargs.get("env", {})
        if use_pty:
            pre_cmd = pre_cmd + self.__base_cmd_pty
        else:
            pre_cmd = pre_cmd + self.__base_cmd
        return shlex.join(pre_cmd) if use_str else pre_cmd

    async def moncmd(self):
        """Uses internal REPL to send cmmand to qemu monitor and get reply."""

    def tmpfs_mount(self, inner):
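        """Arrange for a tmpfs to be mounted at `inner` inside the VM."""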
        # eventually would be nice to support live mounting
        self.logger.debug("Mounting tmpfs on %s", inner)
        self.extra_mounts.append(("", inner, ""))

    #
    # bind_mount is actually being used to mount into the namespace
    #
    # def bind_mount(self, outer, inner):
    #     # eventually would be nice to support live mounting
    #     assert not self.container_id
    #     if self.test_host("-f", outer):
    #         self.logger.warning("Can't bind mount files with L3QemuVM: %s", outer)
    #         return
    #     self.logger.debug("Bind mounting %s on %s", outer, inner)
    #     if not self.test_host("-e", outer):
    #         self.cmd_raises(f"mkdir -p {outer}")
    #     self.extra_mounts.append((outer, inner, ""))

    def mount_volumes(self):
        """Mount volumes from the config."""
        args = []
        for m in self.config.get("volumes", []):
            if not isinstance(m, str):
                continue
            s = m.split(":", 1)
            if len(s) == 1:
                args.append(("", s[0], ""))
            else:
                spath = s[0]
                spath = os.path.abspath(
                    os.path.join(
                        os.path.dirname(self.unet.config["config_pathname"]), spath
                    )
                )
                if not self.test_nsonly("-e", spath):
                    self.cmd_raises_nsonly(f"mkdir -p {spath}")
                args.append((spath, s[1], ""))

        for m in self.config.get("mounts", []):
            src = m.get("src", m.get("source", ""))
            if src:
                src = os.path.abspath(
                    os.path.join(
                        os.path.dirname(self.unet.config["config_pathname"]), src
                    )
                )
                if not self.test_nsonly("-e", src):
                    self.cmd_raises_nsonly(f"mkdir -p {src}")
            dst = m.get("dst", m.get("destination"))
            assert dst, "destination path required for mount"

            margs = []
            for k, v in m.items():
                if k in ["destination", "dst", "source", "src"]:
                    continue
                if k == "type":
                    assert v in ["bind", "tmpfs"]
                    continue
                if not v:
                    margs.append(k)
                else:
                    margs.append(f"{k}={v}")
            args.append((src, dst, ",".join(margs)))

        if args:
            self.extra_mounts += args

    async def run_cmd(self):
        """Run the configured commands for this node inside VM."""
        self.logger.debug(
            "[rundir %s exists %s]", self.rundir, os.path.exists(self.rundir)
        )

        cmd = self.config.get("cmd", "").strip()
        if not cmd:
            self.logger.debug("%s: no `cmd` to run", self)
            return None

        shell_cmd = self.config.get("shell", "/bin/bash")
        if not isinstance(shell_cmd, str):
            if shell_cmd:
                shell_cmd = "/bin/bash"
            else:
                shell_cmd = ""

        if shell_cmd:
            cmd = cmd.rstrip()
            cmd = f"#!{shell_cmd}\n" + cmd
            cmd = cmd.replace("%CONFIGDIR%", str(self.unet.config_dirname))
            cmd = cmd.replace("%RUNDIR%", str(self.rundir))
            cmd = cmd.replace("%NAME%", str(self.name))
            cmd += "\n"

            # Write a copy to the rundir
            cmdpath = os.path.join(self.rundir, "cmd.shebang")
            with open(cmdpath, mode="w+", encoding="utf-8") as cmdfile:
                cmdfile.write(cmd)
            commander.cmd_raises(f"chmod 755 {cmdpath}")

            # Now write a copy inside the VM
            self.conrepl.cmd_status("cat > /tmp/cmd.shebang << EOF\n" + cmd + "\nEOF")
            self.conrepl.cmd_status("chmod 755 /tmp/cmd.shebang")
            cmds = "/tmp/cmd.shebang"
        else:
            cmd = cmd.replace("%CONFIGDIR%", str(self.unet.config_dirname))
            cmd = cmd.replace("%RUNDIR%", str(self.rundir))
            cmd = cmd.replace("%NAME%", str(self.name))
            cmds = cmd

        # class future_proc:
        #     """Treat awaitable minimally as a proc."""
        #     def __init__(self, aw):
        #         self.aw = aw
        #         # XXX would be nice to have a real value here
        #         self.returncode = 0
        #     async def wait(self):
        #         if self.aw:
        #             return await self.aw
        #         return None

        class now_proc:
            """Treat awaitable minimally as a proc."""

            def __init__(self, output):
                self.output = output
                self.returncode = 0

            async def wait(self):
                return self.output

        if self.cmdrepl:
            # self.cmd_p = future_proc(
            #     # We need our own console here b/c this is async and not returning
            #     # immediately
            #     # self.cmdrepl.run_command(cmds, timeout=120, async_=True)
            #     self.cmdrepl.run_command(cmds, timeout=120)
            # )

            # When run_command supports async_ arg we can use the above...
            self.cmd_p = now_proc(self.cmdrepl.run_command(cmds, timeout=120))

            # stdout and stderr are combined into the logfile from the spawned repl
            stdout = os.path.join(self.rundir, "_cmdcon-log.txt")
            self.pytest_hook_run_cmd(stdout, None)
        else:
            # If we only have a console we can't run in parallel, so run to completion
            self.cmd_p = now_proc(self.conrepl.run_command(cmds, timeout=120))

        return self.cmd_p

    # InterfaceMixin override
    # We need a name unique in the shared namespace.
    def get_ns_ifname(self, ifname):
        return self.name + ifname

    async def add_host_intf(self, hname, lname, mtu=None):
        # L3QemuVM needs its own add_host_intf for macvtap. We need to create the
        # tap in the host and then move that interface so that the ifindex/devfile
        # are different.

        if hname in self.host_intfs:
            return

        self.host_intfs[hname] = lname
        index = len(self.host_intfs)

        tapindex = self.unet.tapcount
        self.unet.tapcount = self.unet.tapcount + 1

        tapname = f"tap{tapindex}"
        self.tapnames[hname] = tapname

        mac = f"02:bb:bb:bb:{index:02x}:{self.id:02x}"
        self.tapmacs[hname] = mac

        self.unet.rootcmd.cmd_raises(
            f"ip link add link {hname} name {tapname} type macvtap"
        )
        if mtu:
            self.unet.rootcmd.cmd_raises(f"ip link set {tapname} mtu {mtu}")
        self.unet.rootcmd.cmd_raises(f"ip link set {tapname} address {mac} up")
        ifindex = self.unet.rootcmd.cmd_raises(
            f"cat /sys/class/net/{tapname}/ifindex"
        ).strip()
        # self.unet.rootcmd.cmd_raises(f"ip link set {tapname} netns {self.pid}")

        tapfile = f"/dev/tap{ifindex}"
        fd = os.open(tapfile, os.O_RDWR)
        self.tapfds[hname] = fd
        self.logger.info(
            "%s: Add host intf: created macvtap interface %s (%s) on %s fd %s",
            self,
            tapname,
            tapfile,
            hname,
            fd,
        )

    async def rem_host_intf(self, hname):
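        """Remove the host macvtap interface previously created for `hname`."""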
        tapname = self.tapnames[hname]
        self.unet.rootcmd.cmd_raises(f"ip link set {tapname} down")
        self.unet.rootcmd.cmd_raises(f"ip link delete {tapname} type macvtap")
        del self.tapnames[hname]
        del self.host_intfs[hname]

    async def create_tap(self, index, ifname, mtu=None, driver="virtio-net-pci"):
        # XXX we shouldn't be doing a tap on a bridge with a veth;
        # we should just be using a tap created earlier which was connected to the
        # bridge. Except we need to handle the case of p2p qemu <-> namespace.
        #
        ifname = self.get_ns_ifname(ifname)
        brname = f"{self.name}br{index}"

        tapindex = self.unet.tapcount
        self.unet.tapcount += 1

        mac = f"02:aa:aa:aa:{index:02x}:{self.id:02x}"
        # nic = "tap,model=virtio-net-pci"
        # qemu -net nic,model=virtio,addr=1a:46:0b:ca:bc:7b -net tap,fd=3 3<>/dev/tap11
        self.cmd_raises(f"ip address flush dev {ifname}")
        self.cmd_raises(f"ip tuntap add tap{tapindex} mode tap")
        self.cmd_raises(f"ip link add name {brname} type bridge")
        self.cmd_raises(f"ip link set dev {ifname} master {brname}")
        self.cmd_raises(f"ip link set dev tap{tapindex} master {brname}")
        if mtu:
            self.cmd_raises(f"ip link set dev tap{tapindex} mtu {mtu}")
            self.cmd_raises(f"ip link set dev {ifname} mtu {mtu}")
        self.cmd_raises(f"ip link set dev tap{tapindex} up")
        self.cmd_raises(f"ip link set dev {ifname} up")
        self.cmd_raises(f"ip link set dev {brname} up")
        dev = f"{driver},netdev=n{index},mac={mac}"
        return [
            "-netdev",
            f"tap,id=n{index},ifname=tap{tapindex},script=no,downscript=no",
            "-device",
            dev,
        ]

    async def mount_mounts(self):
        """Mount any shared directories."""
        self.logger.info("Mounting shared directories")
        con = self.conrepl
        for i, m in enumerate(self.extra_mounts):
            outer, mp, uargs = m
            if not outer:
                con.cmd_raises(f"mkdir -p {mp}")
                margs = f"-o {uargs}" if uargs else ""
                con.cmd_raises(f"mount {margs} -t tmpfs tmpfs {mp}")
                continue

            uargs = "" if uargs is None else uargs
            margs = "trans=virtio"
            if uargs:
                margs += f",{uargs}"
            self.logger.info("Mounting %s on %s with %s", outer, mp, margs)
            con.cmd_raises(f"mkdir -p {mp}")
            con.cmd_raises(f"mount -t 9p -o {margs} shared{i} {mp}")

    async def renumber_interfaces(self):
        """Re-number the interfaces.

        After the VM comes up we need to renumber its interfaces from the inside.
        """
        self.logger.info("Renumbering interfaces")
        con = self.conrepl
        con.cmd_raises("sysctl -w net.ipv4.ip_forward=1")
        if self.unet.ipv6_enable:
            self.cmd_raises("sysctl -w net.ipv6.conf.all.forwarding=1")
        for ifname in sorted(self.intfs):
            conn = find_with_kv(self.config.get("connections"), "name", ifname)
            to = conn["to"]
            switch = self.unet.switches.get(to)
            mtu = conn.get("mtu")
            if not mtu and switch:
                mtu = switch.config.get("mtu")
            if mtu:
                con.cmd_raises(f"ip link set {ifname} mtu {mtu}")
            con.cmd_raises(f"ip link set {ifname} up")
            # In case there was some preconfig e.g., cloud-init
            con.cmd_raises(f"ip -4 addr flush dev {ifname}")
            sw_is_nat = switch and hasattr(switch, "is_nat") and switch.is_nat
            if ifaddr := self.get_intf_addr(ifname, ipv6=False):
                con.cmd_raises(f"ip addr add {ifaddr} dev {ifname}")
                if sw_is_nat:
                    # In case there was some preconfig e.g., cloud-init
                    con.cmd_raises("ip route flush exact default")
                    con.cmd_raises(f"ip route add default via {switch.ip_address}")
            if ifaddr := self.get_intf_addr(ifname, ipv6=True):
                con.cmd_raises(f"ip -6 addr add {ifaddr} dev {ifname}")
                if sw_is_nat:
                    # In case there was some preconfig e.g., cloud-init
                    con.cmd_raises("ip -6 route flush exact default")
                    con.cmd_raises(f"ip -6 route add default via {switch.ip6_address}")
        con.cmd_raises("ip link set lo up")

        # This is already mounted now
        # if self.unet.cfgopt.getoption("--coverage"):
        #     con.cmd_raises("mount -t debugfs none /sys/kernel/debug")

    async def gather_coverage_data(self):
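        """Gather gcov coverage data from inside the VM."""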
        con = self.conrepl

        gcda = "/sys/kernel/debug/gcov"
        tmpdir = con.cmd_raises("mktemp -d").strip()
        dest = "/gcov-data.tgz"
        con.cmd_raises(rf"find {gcda} -type d -exec mkdir -p {tmpdir}/{{}} \;")
        con.cmd_raises(
            rf"find {gcda} -name '*.gcda' -exec sh -c 'cat < $0 > {tmpdir}/$0' {{}} \;"
        )
        con.cmd_raises(
            rf"find {gcda} -name '*.gcno' -exec sh -c 'cp -d $0 {tmpdir}/$0' {{}} \;"
        )
        con.cmd_raises(rf"tar cf - -C {tmpdir} sys | gzip -c > {dest}")
        con.cmd_raises(rf"rm -rf {tmpdir}")
        self.logger.info("Saved coverage data in VM at %s", dest)
        if self.use_ssh:
            ldest = os.path.join(self.rundir, "gcov-data.tgz")
            self.cmd_raises(["/bin/cat", dest], stdout=open(ldest, "wb"))
            self.logger.info("Saved coverage data on host at %s", ldest)

    async def _opencons(
        self,
        *cnames,
        prompt=None,
        is_bourne=True,
        user="root",
        password="",
        expects=None,
        sends=None,
        timeout=-1,
    ):
        """Open consoles based on socket file names."""
        timeo = Timeout(timeout)
        cons = []
        for cname in cnames:
            sockpath = os.path.join(self.sockdir, cname)
            connected = False
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            while self.launch_p.returncode is None and not timeo.is_expired():
                try:
                    sock.connect(sockpath)
                    connected = True
                    break
                except OSError as error:
                    if error.errno == errno.ENOENT:
                        self.logger.debug("waiting for console socket: %s", sockpath)
                    else:
                        self.logger.warning(
                            "can't open console socket: %s", error.strerror
                        )
                        raise
                elapsed = int(timeo.elapsed())
                if elapsed <= 3:
                    await asyncio.sleep(0.25)
                else:
                    self.logger.info(
                        "%s: launch (qemu) taking more than %ss", self, elapsed
                    )
                    await asyncio.sleep(1)

            if connected:
                if prompt is None:
                    prompt = r"(^|\r\n)[^#\$]*[#\$] "
                cons.append(
                    await self.console(
                        sock,
                        prompt=prompt,
                        is_bourne=is_bourne,
                        user=user,
                        password=password,
                        use_pty=False,
                        logfile_prefix=cname,
                        will_echo=True,
                        expects=expects,
                        sends=sends,
                        timeout=timeout,
                        trace=True,
                    )
                )
            elif self.launch_p.returncode is not None:
                self.logger.warning(
                    "%s: launch (qemu) exited quickly (%ss) rc: %s",
                    self,
                    timeo.elapsed(),
                    self.launch_p.returncode,
                )
                raise Exception("Qemu launch exited early")
            elif timeo.is_expired():
                self.logger.critical(
                    "%s: timeout (%ss) waiting for qemu to start",
                    self,
                    timeo.elapsed(),
                )
                assert not timeo.is_expired()

        return cons

    async def set_cpu_affinity(self, afflist):
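        """Set each vcpu thread's CPU affinity from the given per-vcpu list."""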
        for i, aff in enumerate(afflist):
            if not aff:
                continue
            # affmask = convert_ranges_to_bitmask(aff)
            if i not in self.cpu_thread_map:
                logging.warning("affinity %s given for missing vcpu %s", aff, i)
                continue
            logging.info("setting vcpu %s affinity to %s", i, aff)
            tid = self.cpu_thread_map[i]
            self.cmd_raises_nsonly(f"taskset -cp {aff} {tid}")

    async def launch(self):
        """Launch qemu."""
        self.logger.info("%s: Launch Qemu", self)

        qc = self.qemu_config
        cc = qc.get("console", {})
        bootd = "d" if "iso" in qc else "c"
        # args = [get_exec_path_host("qemu-system-x86_64"),
        #         "-nodefaults", "-boot", bootd]
        args = [get_exec_path_host("qemu-system-x86_64"), "-boot", bootd]

        args += ["-machine", "q35"]

        if qc.get("kvm"):
            rc, _, e = await self.async_cmd_status_nsonly("ls -l /dev/kvm")
            if rc:
                self.logger.warning("Can't enable KVM no /dev/kvm: %s", e)
            else:
                # [args += ["-enable-kvm", "-cpu", "host"]
                # uargs += ["-accel", "kvm", "-cpu", "Icelake-Server-v5"]
                args += ["-accel", "kvm", "-cpu", "host"]

        if ncpu := qc.get("ncpu"):
            # args += ["-smp", f"sockets={ncpu}"]
            args += ["-smp", f"cores={ncpu}"]
            # args += ["-smp", f"{ncpu},sockets={ncpu},cores=1,threads=1"]

        args.extend(["-m", str(qc.get("memory", "512M"))])

        if "bios" in qc:
            if qc["bios"] == "open-firmware":
                args.extend(["-bios", "/usr/share/qemu/OVMF.fd"])
            else:
                args.extend(["-bios", qc["bios"]])
        if "kernel" in qc:
            args.extend(["-kernel", qc["kernel"]])
        if "initrd" in qc:
            args.extend(["-initrd", qc["initrd"]])
        if "iso" in qc:
            args.extend(["-cdrom", qc["iso"]])

        # we only pass -append if we have a kernel
        if "kernel" in qc:
            args.append("-append")
            root = qc.get("root", "/dev/ram0")
            # Only 1 serial console; the other ports (ttyS[123] hvc[01]) should have
            # gettys in inittab
            append = f"root={root} rw console=ttyS0"
            if "cmdline-extra" in qc:
                append += f" {qc['cmdline-extra']}"
            args.append(append)

        if "extra-args" in qc:
            if isinstance(qc["extra-args"], list):
                args.extend(qc["extra-args"])
            else:
                args.extend(shlex.split(qc["extra-args"]))

        # Walk the list of connections in order so we attach them the same way
        pass_fds = []
        nnics = 0
        pciaddr = 3
        for index, conn in enumerate(self.config["connections"]):
            devaddr = conn.get("physical", "")
            hostintf = conn.get("hostintf", "")
            if devaddr:
                # if devaddr in self.tapmacs:
                #     mac = f",mac={self.tapmacs[devaddr]}"
                # else:
                #     mac = ""
                args += ["-device", f"vfio-pci,host={devaddr},addr={pciaddr}"]
            elif hostintf:
                fd = self.tapfds[hostintf]
                mac = self.tapmacs[hostintf]
                args += [
                    "-nic",
                    f"tap,model=virtio-net-pci,mac={mac},fd={fd},addr={pciaddr}",
                ]
                pass_fds.append(fd)
                nnics += 1
            elif not hostintf:
                driver = conn.get("driver", "virtio-net-pci")
                mtu = conn.get("mtu")
                if not mtu and conn["to"] in self.unet.switches:
                    mtu = self.unet.switches[conn["to"]].config.get("mtu")
                tapargs = await self.create_tap(
                    index, conn["name"], mtu=mtu, driver=driver
                )
                tapargs[-1] += f",addr={pciaddr}"
                args += tapargs
                nnics += 1
            pciaddr += 1
        if not nnics:
            args += ["-nic", "none"]

        dtpl = qc.get("disk-template")
        diskpath = disk = qc.get("disk")
        if dtpl and not disk:
            disk = qc["disk"] = f"{self.name}-{os.path.basename(dtpl)}"
            diskpath = os.path.join(self.rundir, disk)
            if self.path_exists(diskpath):
                logging.debug("Disk '%s' file exists, using.", diskpath)
            else:
                dtplpath = os.path.abspath(
                    os.path.join(
                        os.path.dirname(self.unet.config["config_pathname"]), dtpl
                    )
                )
                logging.info("Create disk '%s' from template '%s'", diskpath, dtplpath)
                self.cmd_raises(
                    f"qemu-img create -f qcow2 -F qcow2 -b {dtplpath} {diskpath}"
                )

        if diskpath:
            args.extend(
                ["-drive", f"file={diskpath},if=none,id=sata-disk0,format=qcow2"]
            )
            args.extend(["-device", "ahci,id=ahci"])
            args.extend(["-device", "ide-hd,bus=ahci.0,drive=sata-disk0"])

        use_stdio = cc.get("stdio", True)
        has_cmd = self.config.get("cmd")
        use_cmdcon = has_cmd and use_stdio

        #
        # Any extra serial/console ports beyond the first require entries in
        # inittab so that a getty runs on them; modify inittab accordingly.
        #
        # Use -serial stdio for output only, and as the first serial console,
        # which the kernel uses for printk, as it has serious issues with
        # dropped input chars for some reason.
        #
        # 4 serial ports (max); we'll add extra ports using virtual consoles.
        _sd = self.sockdir
        if use_stdio:
            args += ["-serial", "stdio"]
        args += ["-serial", f"unix:{_sd}/_console,server,nowait"]
        if use_cmdcon:
            args += [
                "-serial",
                f"unix:{_sd}/_cmdcon,server,nowait",
            ]
        args += [
            "-serial",
            f"unix:{_sd}/console,server,nowait",
            # 2 virtual consoles - /dev/hvc[01]
            # Requires CONFIG_HVC_DRIVER=y CONFIG_VIRTIO_CONSOLE=y
            "-device",
            "virtio-serial",  # serial console bus
            "-chardev",
            f"socket,path={_sd}/vcon0,server=on,wait=off,id=vcon0",
            "-chardev",
            f"socket,path={_sd}/vcon1,server=on,wait=off,id=vcon1",
            "-device",
            "virtconsole,chardev=vcon0",
            "-device",
            "virtconsole,chardev=vcon1",
            # 2 monitors
            "-monitor",
            f"unix:{_sd}/_monitor,server,nowait",
            "-monitor",
            f"unix:{_sd}/monitor,server,nowait",
            "-gdb",
            f"unix:{_sd}/gdbserver,server,nowait",
        ]

        for i, m in enumerate(self.extra_mounts):
            args += [
                "-virtfs",
                f"local,path={m[0]},mount_tag=shared{i},security_model=passthrough",
            ]

        args += ["-nographic"]

        #
        # Launch Qemu
        #

        stdout = open(os.path.join(self.rundir, "qemu.out"), "wb")
        stderr = open(os.path.join(self.rundir, "qemu.err"), "wb")
        self.launch_p = await self.async_popen_nsonly(
            args,
            stdin=subprocess.DEVNULL,
            stdout=stdout,
            stderr=stderr,
            pass_fds=pass_fds,
            # Don't want keyboard interrupt etc. to pass to the child.
            # start_new_session=True,
            preexec_fn=os.setsid,
        )

        if self.nsenter_fork:
            self.launch_pid = await self.get_proc_child_pid(self.launch_p)

        self.pytest_hook_run_cmd(stdout, stderr)

        # We've passed these on, so don't need these open here anymore.
        for fd in pass_fds:
            os.close(fd)

        self.logger.debug(
            "%s: popen => %s (%s)", self, self.launch_p.pid, self.launch_pid
        )

        confiles = ["_console"]
        if use_cmdcon:
            confiles.append("_cmdcon")

        #
        # Connect to the console socket, retrying
        #
        prompt = cc.get("prompt")
        cons = await self._opencons(
            *confiles,
            prompt=prompt,
            is_bourne=not bool(prompt),
            user=cc.get("user", "root"),
            password=cc.get("password", ""),
            expects=cc.get("expects"),
            sends=cc.get("sends"),
            timeout=int(cc.get("timeout", 60)),
        )
        self.conrepl = cons[0]
        if use_cmdcon:
            self.cmdrepl = cons[1]
        self.monrepl = await self.monitor(os.path.join(self.sockdir, "_monitor"))

        # the monitor output has super annoying ANSI escapes in it

        output = self.monrepl.cmd_nostatus("info status")
        self.logger.debug("VM status: %s", output)

        output = self.monrepl.cmd_nostatus("info kvm")
        self.logger.debug("KVM status: %s", output)

        #
        # Set thread affinity
        #
        output = self.monrepl.cmd_nostatus("info cpus")
        matches = re.findall(r"CPU #(\d+): *thread_id=(\d+)", output)
        self.cpu_thread_map = {int(k): int(v) for k, v in matches}
        if cpuaff := self.qemu_config.get("cpu-affinity"):
            await self.set_cpu_affinity(cpuaff)

        self.is_kvm = "disabled" not in output

        if qc.get("unix-os", True):
            await self.renumber_interfaces()

        if self.extra_mounts:
            await self.mount_mounts()

        self.use_ssh = bool(self.ssh_keyfile)
        if self.use_ssh:
            self.use_ssh = self.__setup_ssh()

        self.pytest_hook_open_shell()

        return self.launch_p

    def launch_completed(self, future):
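        """Callback for the qemu launch wait() task; disable ssh use on exit."""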
        self.logger.debug("%s: launch (qemu) completed called", self)
        self.use_ssh = False
        try:
            n = future.result()
            self.logger.debug("%s: node launch (qemu) completed result: %s", self, n)
        except asyncio.CancelledError as error:
            self.logger.debug(
                "%s: node launch (qemu) cmd wait() canceled: %s", future, error
            )

    async def async_cleanup_cmd(self):
        """Run the configured cleanup commands for this node."""
        self.cleanup_called = True

        if "cleanup-cmd" not in self.config:
            return

        if not self.launch_p:
            self.logger.warning("async_cleanup_cmd: qemu no longer running")
            return

        raise NotImplementedError("Needs to be like run_cmd")
        # return await self._async_cleanup_cmd()

    async def _async_delete(self):
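        """Clean up the running cmd and cleanup-cmd, terminate qemu, then delete."""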
        self.logger.debug("%s: deleting", self)

        # Need to cleanup early b/c it is running on the VM
        if self.cmd_p:
            await self.async_cleanup_proc(self.cmd_p, self.cmd_pid)
            self.cmd_p = None

        try:
            # Need to cleanup early b/c it is running on the VM
            if not self.cleanup_called:
                await self.async_cleanup_cmd()
        except Exception as error:
            self.logger.warning(
                "Got an error during delete from async_cleanup_cmd: %s", error
            )

        try:
            if not self.launch_p:
                self.logger.warning("async_delete: qemu is not running")
            else:
                await self.async_cleanup_proc(self.launch_p, self.launch_pid)
        except Exception as error:
            self.logger.warning("%s: failed to cleanup qemu process: %s", self, error)

        await super()._async_delete()


class Munet(BaseMunet):
    """Munet."""

    def __init__(
        self,
        rundir=None,
        config=None,
        pid=True,
        logger=None,
        **kwargs,
    ):
        # logging.warning("Munet")

        if not rundir:
            rundir = "/tmp/munet"

        if logger is None:
            logger = logging.getLogger("munet.unet")

        super().__init__("munet", pid=pid, rundir=rundir, logger=logger, **kwargs)

        self.built = False
        self.tapcount = 0

        self.cmd_raises(f"mkdir -p {self.rundir} && chmod 755 {self.rundir}")
        self.set_ns_cwd(self.rundir)

        if not config:
            config = {}
        self.config = config
        if "config_pathname" in config:
            self.config_pathname = os.path.realpath(config["config_pathname"])
            self.config_dirname = os.path.dirname(self.config_pathname)
        else:
            self.config_pathname = ""
            self.config_dirname = ""

        # Done in BaseMunet now
        # # We need some way to actually get back to the root namespace
        # if not self.isolated:
        #     self.rootcmd = commander
        # else:
        #     spid = str(pid)
        #     nsflags = (f"--mount={self.proc_path / spid / 'ns/mnt'}",
        #                f"--net={self.proc_path / spid / 'ns/net'}",
        #                f"--uts={self.proc_path / spid / 'ns/uts'}",
        #                f"--ipc={self.proc_path / spid / 'ns/ipc'}",
        #                f"--cgroup={self.proc_path / spid / 'ns/cgroup'}",
        #                f"--pid={self.proc_path / spid / 'ns/net'}",
        #     self.rootcmd = SharedNamespace("host", pid=1, nsflags=nsflags)

        # Save the namespace pid
        with open(os.path.join(self.rundir, "nspid"), "w", encoding="ascii") as f:
            f.write(f"{self.pid}\n")

        with open(os.path.join(self.rundir, "nspids"), "w", encoding="ascii") as f:
            f.write(f'{" ".join([str(x) for x in self.pids])}\n')

        hosts_file = os.path.join(self.rundir, "hosts.txt")
        with open(hosts_file, "w", encoding="ascii") as hf:
            hf.write(
                f"""127.0.0.1\tlocalhost {self.name}
::1\tip6-localhost ip6-loopback
fe00::0\tip6-localnet
ff00::0\tip6-mcastprefix
ff02::1\tip6-allnodes
ff02::2\tip6-allrouters
"""
            )
        self.bind_mount(hosts_file, "/etc/hosts")

        # Common CLI commands for any topology
        cdict = {
            "commands": [
                {
                    "name": "pcap",
                    "format": "pcap NETWORK",
                    "help": (
                        "capture packets from NETWORK into file capture-NETWORK.pcap"
                        " the command is run within a new window which also shows"
                        " packet summaries. NETWORK can also be an interface specified"
                        " as HOST:INTF. To capture inside the host namespace."
                    ),
                    "exec": "tshark -s 9200 -i {0} -P -w capture-{0}.pcap",
                    "top-level": True,
                    "new-window": {"background": True},
                },
                {
                    "name": "nsterm",
                    "format": "nsterm HOST [HOST ...]",
                    "help": (
                        "open terminal[s] in the namespace only"
                        " (outside containers or VM), * for all"
                    ),
                    "exec": "bash",
                    "new-window": {"ns_only": True},
                },
                {
                    "name": "term",
                    "format": "term HOST [HOST ...]",
                    "help": "open terminal[s] (TMUX or XTerm) on HOST[S], * for all",
                    "exec": "bash",
                    "new-window": True,
                },
                {
                    "name": "xterm",
                    "format": "xterm HOST [HOST ...]",
                    "help": "open XTerm[s] on HOST[S], * for all",
                    "exec": "bash",
                    "new-window": {
                        "forcex": True,
                    },
                },
                {
                    "name": "sh",
                    "format": "[HOST ...] sh <SHELL-COMMAND>",
                    "help": "execute <SHELL-COMMAND> on hosts",
                    "exec": "{}",
                },
                {
                    "name": "shi",
                    "format": "[HOST ...] shi <INTERACTIVE-COMMAND>",
                    "help": "execute <INTERACTIVE-COMMAND> on HOST[s]",
                    "exec": "{}",
                    "interactive": True,
                },
                {
                    "name": "stdout",
                    "exec": (
                        "[ -e %RUNDIR%/qemu.out ] && tail -F %RUNDIR%/qemu.out "
                        "|| tail -F %RUNDIR%/cmd.out"
                    ),
                    "format": "stdout HOST [HOST ...]",
                    "help": "tail -f on the stdout of the qemu/cmd for this node",
                    "new-window": True,
                },
                {
                    "name": "stderr",
                    "exec": (
                        "[ -e %RUNDIR%/qemu.err ] && tail -F %RUNDIR%/qemu.err "
                        "|| tail -F %RUNDIR%/cmd.err"
                    ),
                    "format": "stderr HOST [HOST ...]",
                    "help": "tail -f on the stdout of the qemu/cmd for this node",
                    "new-window": True,
                },
            ]
        }

        cli.add_cli_config(self, cdict)

        if "cli" in config:
            cli.add_cli_config(self, config["cli"])

        if "topology" not in self.config:
            self.config["topology"] = {}

        self.topoconf = self.config["topology"]
        self.ipv6_enable = self.topoconf.get("ipv6-enable", False)

        if self.isolated:
            if not self.ipv6_enable:
                # Disable IPv6
                self.cmd_raises("sysctl -w net.ipv6.conf.all.autoconf=0")
                self.cmd_raises("sysctl -w net.ipv6.conf.all.disable_ipv6=1")
            else:
                self.cmd_raises("sysctl -w net.ipv6.conf.all.autoconf=1")
                self.cmd_raises("sysctl -w net.ipv6.conf.all.disable_ipv6=0")

        # we really need overlay, but overlay-layers (used by overlay-images)
        # counts on things being present in overlay so this temp stuff doesn't work.
        # if self.isolated:
        #     # Let's hide podman details
        #     self.tmpfs_mount("/var/lib/containers/storage/overlay-containers")

        shellopt = self.cfgopt.getoption("--shell")
        shellopt = shellopt if shellopt else ""
        if shellopt == "all" or "." in shellopt.split(","):
            self.run_in_window("bash")

    def __del__(self):
        """Catch case of build object but not async_deleted."""
        if hasattr(self, "built"):
            if not self.deleting:
                logging.critical(
                    "Munet object deleted without calling `async_delete` for cleanup."
                )
        s = super()
        if hasattr(s, "__del__"):
            s.__del__(self)

    async def _async_build(self, logger=None):
        """Build the topology based on config."""
        if self.built:
            self.logger.warning("%s: is already built", self)
            return

        self.built = True

        # Allow for all networks to be auto-numbered
        topoconf = self.topoconf
        autonumber = self.autonumber
        ipv6_enable = self.ipv6_enable

        # ---------------------------------------------
        # Merge Kinds and perform variable substitution
        # ---------------------------------------------

        kinds = self.config.get("kinds", {})

        for name, conf in config_to_dict_with_key(topoconf, "networks", "name").items():
            if kind := conf.get("kind"):
                if kconf := kinds[kind]:
                    conf = merge_kind_config(kconf, conf)
            conf = config_subst(
                conf, name=name, rundir=self.rundir, configdir=self.config_dirname
            )
            if "ip" not in conf and autonumber:
                conf["ip"] = "auto"
            if "ipv6" not in conf and autonumber and ipv6_enable:
                conf["ipv6"] = "auto"
            topoconf["networks"][name] = conf
            self.add_network(name, conf, logger=logger)

        for name, conf in config_to_dict_with_key(topoconf, "nodes", "name").items():
            if kind := conf.get("kind"):
                if kconf := kinds[kind]:
                    conf = merge_kind_config(kconf, conf)

            config_to_dict_with_key(
                conf, "env", "name"
            )  # convert list of env objects to dict

            conf = config_subst(
                conf,
                name=name,
                rundir=os.path.join(self.rundir, name),
                configdir=self.config_dirname,
            )
            topoconf["nodes"][name] = conf
            self.add_l3_node(name, conf, logger=logger)

        # ------------------
        # Create connections
        # ------------------

        # Go through all connections and name them so they are sane to the user;
        # otherwise when we do p2p links the names/ordinals skip around oddly.
        for name, node in self.hosts.items():
            nconf = node.config
            if "connections" not in nconf:
                continue
            nconns = []
            for cconf in nconf["connections"]:
                # Replace string only with a dictionary
                if isinstance(cconf, str):
                    splitconf = cconf.split(":", 1)
                    cconf = {"to": splitconf[0]}
                    if len(splitconf) == 2:
                        cconf["name"] = splitconf[1]
                # Allocate a name if not already assigned
                if "name" not in cconf:
                    cconf["name"] = node.get_next_intf_name()
                nconns.append(cconf)
            nconf["connections"] = nconns

        for name, node in self.hosts.items():
            nconf = node.config
            if "connections" not in nconf:
                continue
            for cconf in nconf["connections"]:
                # Eventually can add support for unconnected intf here.
                if "to" not in cconf:
                    continue
                to = cconf["to"]
                if to in self.switches:
                    switch = self.switches[to]
                    swconf = find_matching_net_config(name, cconf, switch.config)
                    await self.add_native_link(switch, node, swconf, cconf)
                elif cconf["name"] not in node.intfs:
                    # Only add the p2p interface if not already there.
                    other = self.hosts[to]
                    oconf = find_matching_net_config(name, cconf, other.config)
                    await self.add_native_link(node, other, cconf, oconf)

    @property
    def autonumber(self):
        return self.topoconf.get("networks-autonumber", False)

    @autonumber.setter
    def autonumber(self, value):
        self.topoconf["networks-autonumber"] = bool(value)

    async def add_native_link(self, node1, node2, c1=None, c2=None):
        """Add a link between switch and node or 2 nodes."""
        isp2p = False

        c1 = {} if c1 is None else c1
        c2 = {} if c2 is None else c2

        if node1.name in self.switches:
            assert node2.name in self.hosts
        elif node2.name in self.switches:
            assert node1.name in self.hosts
            node1, node2 = node2, node1
            c1, c2 = c2, c1
        else:
            # p2p link
            assert node1.name in self.hosts
            assert node2.name in self.hosts
            isp2p = True

        if "name" not in c1:
            c1["name"] = node1.get_next_intf_name()
        if1 = c1["name"]

        if "name" not in c2:
            c2["name"] = node2.get_next_intf_name()
        if2 = c2["name"]

        do_add_link = True
        for n, c in ((node1, c1), (node2, c2)):
            if "hostintf" in c:
                await n.add_host_intf(c["hostintf"], c["name"], mtu=c.get("mtu"))
                do_add_link = False
            elif "physical" in c:
                await n.add_phy_intf(c["physical"], c["name"])
                do_add_link = False
        if do_add_link:
            assert "hostintf" not in c1
            assert "hostintf" not in c2
            assert "physical" not in c1
            assert "physical" not in c2

            if isp2p:
                mtu1 = c1.get("mtu")
                mtu2 = c2.get("mtu")
                mtu = mtu1 if mtu1 else mtu2
                if mtu1 and mtu2 and mtu1 != mtu2:
                    self.logger.error("mtus differ for add_link %s != %s", mtu1, mtu2)
            else:
                mtu = c2.get("mtu")

            super().add_link(node1, node2, if1, if2, mtu=mtu)

        if isp2p:
            node1.set_p2p_addr(node2, c1, c2)
        else:
            node2.set_lan_addr(node1, c2)

        if "physical" not in c1 and not node1.is_vm:
            node1.set_intf_constraints(if1, **c1)
        if "physical" not in c2 and not node2.is_vm:
            node2.set_intf_constraints(if2, **c2)

    def add_l3_node(self, name, config=None, **kwargs):
        """Add a node to munet."""
        if config and config.get("image"):
            cls = L3ContainerNode
        elif config and config.get("qemu"):
            cls = L3QemuVM
        elif config and config.get("server"):
            cls = SSHRemote
            kwargs["server"] = config["server"]
            kwargs["port"] = int(config.get("server-port", 22))
            if "ssh-identity-file" in config:
                kwargs["idfile"] = config.get("ssh-identity-file")
            if "ssh-user" in config:
                kwargs["user"] = config.get("ssh-user")
            if "ssh-password" in config:
                kwargs["password"] = config.get("ssh-password")
        else:
            cls = L3NamespaceNode
        return super().add_host(name, cls=cls, config=config, **kwargs)

    def add_network(self, name, config=None, **kwargs):
        """Add a l2 or l3 switch to munet."""
        if config is None:
            config = {}

        cls = L3Bridge if config.get("ip") else L2Bridge
        mtu = kwargs.get("mtu", config.get("mtu"))
        return super().add_switch(name, cls=cls, config=config, mtu=mtu, **kwargs)

    async def run(self):
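        """Start captures, launch VMs, run configured `cmd`s, and wait for ready.

        Returns the list of tasks created to wait on the launched processes.
        """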
        tasks = []

        hosts = self.hosts.values()
        launch_nodes = [x for x in hosts if hasattr(x, "launch")]
        launch_nodes = [x for x in launch_nodes if x.config.get("qemu")]
        run_nodes = [x for x in hosts if hasattr(x, "has_run_cmd") and x.has_run_cmd()]
        ready_nodes = [
            x for x in hosts if hasattr(x, "has_ready_cmd") and x.has_ready_cmd()
        ]

        pcapopt = self.cfgopt.getoption("--pcap")
        pcapopt = pcapopt if pcapopt else ""
        if pcapopt == "all":
            pcapopt = ",".join(self.switches.keys())
        if pcapopt:
            for pcap in pcapopt.split(","):
                if ":" in pcap:
                    host, intf = pcap.split(":")
                    pcap = f"{host}-{intf}"
                    host = self.hosts[host]
                else:
                    host = self
                    intf = pcap
                host.run_in_window(
                    f"tshark -s 9200 -i {intf} -P -w capture-{pcap}.pcap",
                    background=True,
                    title=f"cap:{pcap}",
                )

        if launch_nodes:
            # would like an info-level log when verbose here.
            logging.debug("Launching nodes")
            await asyncio.gather(*[x.launch() for x in launch_nodes])

        logging.debug("Launched nodes -- Queueing Waits")

        # Watch for launched processes to exit
        for node in launch_nodes:
            task = asyncio.create_task(
                node.launch_p.wait(), name=f"Node-{node.name}-launch"
            )
            task.add_done_callback(node.launch_completed)
            tasks.append(task)

        logging.debug("Wait complete queued, running cmd")

        if run_nodes:
            # would like an info-level log when verbose here.
            logging.debug("Running `cmd` on nodes")
            await asyncio.gather(*[x.run_cmd() for x in run_nodes])

        logging.debug("Ran cmds -- Queueing Waits")

        # Watch for run_cmd processes to exit
        for node in run_nodes:
            task = asyncio.create_task(node.cmd_p.wait(), name=f"Node-{node.name}-cmd")
            task.add_done_callback(node.cmd_completed)
            tasks.append(task)

        logging.debug("Wait complete queued, waiting for ready")

        # Wait for nodes to be ready
        if ready_nodes:

            async def wait_until_ready(x):
                while not await x.async_ready_cmd():
                    logging.debug("Waiting for ready on: %s", x)
                    await asyncio.sleep(0.25)
                logging.debug("%s is ready!", x)

            logging.debug("Waiting for ready on nodes: %s", ready_nodes)
            _, pending = await asyncio.wait(
                [asyncio.create_task(wait_until_ready(x)) for x in ready_nodes],
                timeout=30,
            )
            if pending:
                logging.warning("Timeout waiting for ready: %s", pending)
                for nr in pending:
                    nr.cancel()
                raise asyncio.TimeoutError()
            logging.debug("All nodes ready")

        logging.debug("All done returning tasks: %s", tasks)

        return tasks

    async def _async_delete(self):
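        """Gather coverage if requested, optionally pause, then delete the topology."""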
        from .testing.util import async_pause_test  # pylint: disable=C0415

        self.logger.debug("%s: deleting.", self)

        if self.cfgopt.getoption("--coverage"):
            nodes = (
                x for x in self.hosts.values() if hasattr(x, "gather_coverage_data")
            )
            try:
                await asyncio.gather(*(x.gather_coverage_data() for x in nodes))
            except Exception as error:
                logging.warning("Error gathering coverage data: %s", error)

        pause = bool(self.cfgopt.getoption("--pause-at-end"))
        pause = pause or bool(self.cfgopt.getoption("--pause"))
        if pause:
            try:
                await async_pause_test("Before MUNET delete")
            except KeyboardInterrupt:
                print("^C...continuing")
            except Exception as error:
                self.logger.error("\n...continuing after error: %s", error)

        # XXX should we cancel launch and run tasks?

        try:
            await super()._async_delete()
        except Exception as error:
            self.logger.error("Error cleaning up: %s", error, exc_info=True)
            raise


async def run_cmd_update_ceos(node, shell_cmd, cmds, cmd):
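    """Update container cmds/cmd for a cEOS node (flash, startup-config, mac, env)."""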
    cmd = cmd.strip()
    if shell_cmd or cmd != "/sbin/init":
        return cmds, cmd

    #
    # Add flash dir and mount it
    #
    flashdir = os.path.join(node.rundir, "flash")
    node.cmd_raises_nsonly(f"mkdir -p {flashdir} && chmod 775 {flashdir}")
    cmds += [f"--volume={flashdir}:/mnt/flash"]

    #
    # Startup config (if not already present)
    #
    if startup_config := node.config.get("startup-config", None):
        dest = os.path.join(flashdir, "startup-config")
        if os.path.exists(dest):
            node.logger.info("Skipping copy of startup-config, already present")
        else:
            source = os.path.join(node.unet.config_dirname, startup_config)
            node.cmd_raises_nsonly(f"cp {source} {dest} && chmod 664 {dest}")

    #
    # System mac address (if not already present)
    #
    dest = os.path.join(flashdir, "system_mac_address")
    if os.path.exists(dest):
        node.logger.info("Skipping system-mac generation, already present")
    else:
        random_arista_mac = "00:1c:73:%02x:%02x:%02x" % (
            random.randint(0, 255),
            random.randint(0, 255),
            random.randint(0, 255),
        )
        system_mac = node.config.get("system-mac", random_arista_mac)
        with open(dest, "w", encoding="ascii") as f:
            f.write(system_mac + "\n")
        node.cmd_raises_nsonly(f"chmod 664 {dest}")

    args = []

    # Pass special args for the environment variables
    if "env" in node.config:
        args += [f"systemd.setenv={k}={v}" for k, v in node.config["env"].items()]

    return cmds, [cmd] + args


# XXX this is only used by the container code
kind_run_cmd_update = {"ceos": run_cmd_update_ceos}