summaryrefslogtreecommitdiffstats
path: root/tests/topotests/munet/cli.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/munet/cli.py')
-rw-r--r--tests/topotests/munet/cli.py962
1 files changed, 962 insertions, 0 deletions
diff --git a/tests/topotests/munet/cli.py b/tests/topotests/munet/cli.py
new file mode 100644
index 0000000..f631073
--- /dev/null
+++ b/tests/topotests/munet/cli.py
@@ -0,0 +1,962 @@
+# -*- coding: utf-8 eval: (blacken-mode 1) -*-
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# July 24 2021, Christian Hopps <chopps@labn.net>
+#
+# Copyright 2021, LabN Consulting, L.L.C.
+#
+"""A module that implements a CLI."""
+import argparse
+import asyncio
+import functools
+import logging
+import multiprocessing
+import os
+import pty
+import re
+import readline
+import select
+import shlex
+import socket
+import subprocess
+import sys
+import tempfile
+import termios
+import tty
+
+
+try:
+ from . import linux
+ from .config import list_to_dict_with_key
+except ImportError:
+ # We cannot use relative imports and still run this module directly as a script, and
+ # there are some use cases where we want to run this file as a script.
+ sys.path.append(os.path.dirname(os.path.realpath(__file__)))
+ import linux
+
+ from config import list_to_dict_with_key
+
+
+ENDMARKER = b"\x00END\x00"
+
+logger = logging.getLogger(__name__)
+
+
+def lineiter(sock):
+ s = ""
+ while True:
+ sb = sock.recv(256)
+ if not sb:
+ return
+
+ s += sb.decode("utf-8")
+ i = s.find("\n")
+ if i != -1:
+ yield s[:i]
+ s = s[i + 1 :]
+
+
+# Would be nice to convert to async, but really not needed as used
+def spawn(unet, host, cmd, iow, ns_only):
+ if sys.stdin.isatty():
+ old_tty = termios.tcgetattr(sys.stdin)
+ tty.setraw(sys.stdin.fileno())
+
+ try:
+ master_fd, slave_fd = pty.openpty()
+
+ ns = unet.hosts[host] if host and host != unet else unet
+ popenf = ns.popen_nsonly if ns_only else ns.popen
+
+ # use os.setsid() make it run in a new process group, or bash job
+ # control will not be enabled
+ p = popenf(
+ cmd,
+ # _common_prologue, later in call chain, only does this for use_pty == False
+ preexec_fn=os.setsid,
+ stdin=slave_fd,
+ stdout=slave_fd,
+ stderr=slave_fd,
+ universal_newlines=True,
+ use_pty=True,
+ # XXX this is actually implementing "run on host" for real
+ # skip_pre_cmd=ns_only,
+ )
+ iow.write("\r")
+ iow.flush()
+
+ while p.poll() is None:
+ r, _, _ = select.select([sys.stdin, master_fd], [], [], 0.25)
+ if sys.stdin in r:
+ d = os.read(sys.stdin.fileno(), 10240)
+ os.write(master_fd, d)
+ elif master_fd in r:
+ o = os.read(master_fd, 10240)
+ if o:
+ iow.write(o.decode("utf-8", "ignore"))
+ iow.flush()
+ finally:
+ # restore tty settings back
+ if sys.stdin.isatty():
+ termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
+
+
+def is_host_regex(restr):
+ return len(restr) > 2 and restr[0] == "/" and restr[-1] == "/"
+
+
+def get_host_regex(restr):
+ if len(restr) < 3 or restr[0] != "/" or restr[-1] != "/":
+ return None
+ return re.compile(restr[1:-1])
+
+
+def host_in(restr, names):
+ """Determine if matcher is a regex that matches one of names."""
+ if not (regexp := get_host_regex(restr)):
+ return restr in names
+ for name in names:
+ if regexp.fullmatch(name):
+ return True
+ return False
+
+
+def expand_host(restr, names):
+ """Expand name or regexp into list of hosts."""
+ hosts = []
+ regexp = get_host_regex(restr)
+ if not regexp:
+ assert restr in names
+ hosts.append(restr)
+ else:
+ for name in names:
+ if regexp.fullmatch(name):
+ hosts.append(name)
+ return sorted(hosts)
+
+
+def expand_hosts(restrs, names):
+ """Expand list of host names or regex into list of hosts."""
+ hosts = []
+ for restr in restrs:
+ hosts += expand_host(restr, names)
+ return sorted(hosts)
+
+
+def host_cmd_split(unet, line, toplevel):
+ all_hosts = set(unet.hosts)
+ csplit = line.split()
+ i = 0
+ banner = False
+ for i, e in enumerate(csplit):
+ if is_re := is_host_regex(e):
+ banner = True
+ if not host_in(e, all_hosts):
+ if not is_re:
+ break
+ else:
+ i += 1
+
+ if i == 0 and csplit and csplit[0] == "*":
+ hosts = sorted(all_hosts)
+ csplit = csplit[1:]
+ banner = True
+ elif i == 0 and csplit and csplit[0] == ".":
+ hosts = [unet]
+ csplit = csplit[1:]
+ else:
+ hosts = expand_hosts(csplit[:i], all_hosts)
+ csplit = csplit[i:]
+
+ if not hosts and not csplit[:i]:
+ if toplevel:
+ hosts = [unet]
+ else:
+ hosts = sorted(all_hosts)
+ banner = True
+
+ if not csplit:
+ return hosts, "", "", True
+
+ i = line.index(csplit[0])
+ i += len(csplit[0])
+ return hosts, csplit[0], line[i:].strip(), banner
+
+
+def win_cmd_host_split(unet, cmd, kinds, defall):
+ if kinds:
+ all_hosts = {
+ x for x in unet.hosts if unet.hosts[x].config.get("kind", "") in kinds
+ }
+ else:
+ all_hosts = set(unet.hosts)
+
+ csplit = cmd.split()
+ i = 0
+ for i, e in enumerate(csplit):
+ if not host_in(e, all_hosts):
+ if not is_host_regex(e):
+ break
+ else:
+ i += 1
+
+ if i == 0 and csplit and csplit[0] == "*":
+ hosts = sorted(all_hosts)
+ csplit = csplit[1:]
+ elif i == 0 and csplit and csplit[0] == ".":
+ hosts = [unet]
+ csplit = csplit[1:]
+ else:
+ hosts = expand_hosts(csplit[:i], all_hosts)
+
+ if not hosts and defall and not csplit[:i]:
+ hosts = sorted(all_hosts)
+
+ # Filter hosts based on cmd
+ cmd = " ".join(csplit[i:])
+ return hosts, cmd
+
+
+def proc_readline(fd, prompt, histfile):
+ """Read a line of input from user while running in a sub-process."""
+ # How do we change the command though, that's what's displayed in ps normally
+ linux.set_process_name("Munet CLI")
+ try:
+ # For some reason sys.stdin is fileno == 16 and useless
+ sys.stdin = os.fdopen(0)
+ histfile = init_history(None, histfile)
+ line = input(prompt)
+ readline.write_history_file(histfile)
+ if line is None:
+ os.write(fd, b"\n")
+ os.write(fd, bytes(f":{str(line)}\n", encoding="utf-8"))
+ except EOFError:
+ os.write(fd, b"\n")
+ except KeyboardInterrupt:
+ os.write(fd, b"I\n")
+ except Exception as error:
+ os.write(fd, bytes(f"E{str(error)}\n", encoding="utf-8"))
+
+
+async def async_input_reader(rfd):
+ """Read a line of input from the user input sub-process pipe."""
+ rpipe = os.fdopen(rfd, mode="r")
+ reader = asyncio.StreamReader()
+
+ def protocol_factory():
+ return asyncio.StreamReaderProtocol(reader)
+
+ loop = asyncio.get_event_loop()
+ transport, _ = await loop.connect_read_pipe(protocol_factory, rpipe)
+ o = await reader.readline()
+ transport.close()
+
+ o = o.decode("utf-8").strip()
+ if not o:
+ return None
+ if o[0] == "I":
+ raise KeyboardInterrupt()
+ if o[0] == "E":
+ raise Exception(o[1:])
+ assert o[0] == ":"
+ return o[1:]
+
+
+#
+# A lot of work to add async `input` handling without creating a thread. We cannot use
+# threads when unshare_inline is used with pid namespace per kernel clone(2)
+# restriction.
+#
+async def async_input(prompt, histfile):
+ """Asynchronously read a line from the user."""
+ rfd, wfd = os.pipe()
+ p = multiprocessing.Process(target=proc_readline, args=(wfd, prompt, histfile))
+ p.start()
+ logging.debug("started async_input input process: %s", p)
+ try:
+ return await async_input_reader(rfd)
+ finally:
+ logging.debug("joining async_input input process")
+ p.join()
+
+
+def make_help_str(unet):
+ w = sorted([x if x else "" for x in unet.cli_in_window_cmds])
+ ww = unet.cli_in_window_cmds
+ u = sorted([x if x else "" for x in unet.cli_run_cmds])
+ uu = unet.cli_run_cmds
+
+ s = (
+ """
+Basic Commands:
+ cli :: open a secondary CLI window
+ help :: this help
+ hosts :: list hosts
+ quit :: quit the cli
+
+ HOST can be a host or one of the following:
+ - '*' for all hosts
+ - '.' for the parent munet
+ - a regex specified between '/' (e.g., '/rtr.*/')
+
+New Window Commands:\n"""
+ + "\n".join([f" {ww[v][0]}\t:: {ww[v][1]}" for v in w])
+ + """\nInline Commands:\n"""
+ + "\n".join([f" {uu[v][0]}\t:: {uu[v][1]}" for v in u])
+ + "\n"
+ )
+ return s
+
+
+def get_shcmd(unet, host, kinds, execfmt, ucmd):
+ if host is None:
+ h = None
+ kind = None
+ elif host is unet or host == "":
+ h = unet
+ kind = ""
+ else:
+ h = unet.hosts[host]
+ kind = h.config.get("kind", "")
+ if kinds and kind not in kinds:
+ return ""
+ if not isinstance(execfmt, str):
+ execfmt = execfmt.get(kind, {}).get("exec", "")
+ if not execfmt:
+ return ""
+
+ # Do substitutions for {} in string
+ numfmt = len(re.findall(r"{\d*}", execfmt))
+ if numfmt > 1:
+ ucmd = execfmt.format(*shlex.split(ucmd))
+ elif numfmt:
+ ucmd = execfmt.format(ucmd)
+ elif len(re.findall(r"{[a-zA-Z_][0-9a-zA-Z_\.]*}", execfmt)):
+ if execfmt.endswith('"'):
+ fstring = "f'''" + execfmt + "'''"
+ else:
+ fstring = 'f"""' + execfmt + '"""'
+ ucmd = eval( # pylint: disable=W0123
+ fstring,
+ globals(),
+ {"host": h, "unet": unet, "user_input": ucmd},
+ )
+ else:
+ # No variable or usercmd substitution at all.
+ ucmd = execfmt
+
+ # Do substitution for munet variables
+ ucmd = ucmd.replace("%CONFIGDIR%", str(unet.config_dirname))
+ if host is None or host is unet:
+ ucmd = ucmd.replace("%RUNDIR%", str(unet.rundir))
+ return ucmd.replace("%NAME%", ".")
+ ucmd = ucmd.replace("%RUNDIR%", str(os.path.join(unet.rundir, host)))
+ if h.mgmt_ip:
+ ucmd = ucmd.replace("%IPADDR%", str(h.mgmt_ip))
+ elif h.mgmt_ip6:
+ ucmd = ucmd.replace("%IPADDR%", str(h.mgmt_ip6))
+ if h.mgmt_ip6:
+ ucmd = ucmd.replace("%IP6ADDR%", str(h.mgmt_ip6))
+ return ucmd.replace("%NAME%", str(host))
+
+
+async def run_command(
+ unet,
+ outf,
+ line,
+ execfmt,
+ banner,
+ hosts,
+ toplevel,
+ kinds,
+ ns_only=False,
+ interactive=False,
+):
+ """Runs a command on a set of hosts.
+
+ Runs `execfmt`. Prior to executing the string the following transformations are
+ performed on it.
+
+ `execfmt` may also be a dictionary of dicitonaries keyed on kind with `exec` holding
+ the kind's execfmt string.
+
+ - if `{}` is present then `str.format` is called to replace `{}` with any extra
+ input values after the command and hosts are removed from the input.
+ - else if any `{digits}` are present then `str.format` is called to replace
+ `{digits}` with positional args obtained from the addittional user input
+ first passed to `shlex.split`.
+ - else f-string style interpolation is performed on the string with
+ the local variables `host` (the current node object or None),
+ `unet` (the Munet object), and `user_input` (the additional command input)
+ defined.
+
+ The output is sent to `outf`. If `ns_only` is True then the `execfmt` is
+ run using `Commander.cmd_status_nsonly` otherwise it is run with
+ `Commander.cmd_status`.
+ """
+ if kinds:
+ logging.info("Filtering hosts to kinds: %s", kinds)
+ hosts = [x for x in hosts if unet.hosts[x].config.get("kind", "") in kinds]
+ logging.info("Filtered hosts: %s", hosts)
+
+ if not hosts:
+ if not toplevel:
+ return
+ hosts = [unet]
+
+ # if unknowns := [x for x in hosts if x not in unet.hosts]:
+ # outf.write("%% Unknown host[s]: %s\n" % ", ".join(unknowns))
+ # return
+
+ # if sys.stdin.isatty() and interactive:
+ if interactive:
+ for host in hosts:
+ shcmd = get_shcmd(unet, host, kinds, execfmt, line)
+ if not shcmd:
+ continue
+ if len(hosts) > 1 or banner:
+ outf.write(f"------ Host: {host} ------\n")
+ spawn(unet, host if not toplevel else unet, shcmd, outf, ns_only)
+ if len(hosts) > 1 or banner:
+ outf.write(f"------- End: {host} ------\n")
+ outf.write("\n")
+ return
+
+ aws = []
+ for host in hosts:
+ shcmd = get_shcmd(unet, host, kinds, execfmt, line)
+ if not shcmd:
+ continue
+ if toplevel:
+ ns = unet
+ else:
+ ns = unet.hosts[host] if host and host != unet else unet
+ if ns_only:
+ cmdf = ns.async_cmd_status_nsonly
+ else:
+ cmdf = ns.async_cmd_status
+ aws.append(cmdf(shcmd, warn=False, stderr=subprocess.STDOUT))
+
+ results = await asyncio.gather(*aws, return_exceptions=True)
+ for host, result in zip(hosts, results):
+ if isinstance(result, Exception):
+ o = str(result) + "\n"
+ rc = -1
+ else:
+ rc, o, _ = result
+ if len(hosts) > 1 or banner:
+ outf.write(f"------ Host: {host} ------\n")
+ if rc:
+ outf.write(f"*** non-zero exit status: {rc}\n")
+ outf.write(o)
+ if len(hosts) > 1 or banner:
+ outf.write(f"------- End: {host} ------\n")
+
+
+cli_builtins = ["cli", "help", "hosts", "quit"]
+
+
+class Completer:
+ """A completer class for the CLI."""
+
+ def __init__(self, unet):
+ self.unet = unet
+
+ def complete(self, text, state):
+ line = readline.get_line_buffer()
+ tokens = line.split()
+ # print(f"\nXXX: tokens: {tokens} text: '{text}' state: {state}'\n")
+
+ first_token = not tokens or (text and len(tokens) == 1)
+
+ # If we have already have a builtin command we are done
+ if tokens and tokens[0] in cli_builtins:
+ return [None]
+
+ cli_run_cmds = set(self.unet.cli_run_cmds.keys())
+ top_run_cmds = {x for x in cli_run_cmds if self.unet.cli_run_cmds[x][3]}
+ cli_run_cmds -= top_run_cmds
+ cli_win_cmds = set(self.unet.cli_in_window_cmds.keys())
+ hosts = set(self.unet.hosts.keys())
+ is_window_cmd = bool(tokens) and tokens[0] in cli_win_cmds
+ done_set = set()
+ if bool(tokens):
+ if text:
+ done_set = set(tokens[:-1])
+ else:
+ done_set = set(tokens)
+
+ # Determine the domain for completions
+ if not tokens or first_token:
+ all_cmds = (
+ set(cli_builtins) | hosts | cli_run_cmds | cli_win_cmds | top_run_cmds
+ )
+ elif is_window_cmd:
+ all_cmds = hosts
+ elif tokens and tokens[0] in top_run_cmds:
+ # nothing to complete if a top level command
+ pass
+ elif not bool(done_set & cli_run_cmds):
+ all_cmds = hosts | cli_run_cmds
+
+ if not text:
+ completes = all_cmds
+ else:
+ # print(f"\nXXX: all_cmds: {all_cmds} text: '{text}'\n")
+ completes = {x + " " for x in all_cmds if x.startswith(text)}
+
+ # print(f"\nXXX: completes: {completes} text: '{text}' state: {state}'\n")
+ # remove any completions already present
+ completes -= done_set
+ completes = sorted(completes) + [None]
+ return completes[state]
+
+
+async def doline(
+ unet, line, outf, background=False, notty=False
+): # pylint: disable=R0911
+ line = line.strip()
+ m = re.fullmatch(r"^(\S+)(?:\s+(.*))?$", line)
+ if not m:
+ return True
+
+ cmd = m.group(1)
+ nline = m.group(2) if m.group(2) else ""
+
+ if cmd in ("q", "quit"):
+ return False
+
+ if cmd == "help":
+ outf.write(make_help_str(unet))
+ return True
+ if cmd in ("h", "hosts"):
+ outf.write(f"% Hosts:\t{' '.join(sorted(unet.hosts.keys()))}\n")
+ return True
+ if cmd == "cli":
+ await remote_cli(
+ unet,
+ "secondary> ",
+ "Secondary CLI",
+ background,
+ )
+ return True
+
+ #
+ # In window commands
+ #
+
+ if cmd in unet.cli_in_window_cmds:
+ execfmt, toplevel, kinds, kwargs = unet.cli_in_window_cmds[cmd][2:]
+
+ # if toplevel:
+ # ucmd = " ".join(nline.split())
+ # else:
+ hosts, ucmd = win_cmd_host_split(unet, nline, kinds, False)
+ if not hosts:
+ if not toplevel:
+ return True
+ hosts = [unet]
+
+ if isinstance(execfmt, str):
+ found_brace = "{}" in execfmt
+ else:
+ found_brace = False
+ for d in execfmt.values():
+ if "{}" in d["exec"]:
+ found_brace = True
+ break
+ if not found_brace and ucmd and not toplevel:
+ # CLI command does not expect user command so treat as hosts of which some
+ # must be unknown
+ unknowns = [x for x in ucmd.split() if x not in unet.hosts]
+ outf.write(f"% Unknown host[s]: {' '.join(unknowns)}\n")
+ return True
+
+ try:
+ if not hosts and toplevel:
+ hosts = [unet]
+
+ for host in hosts:
+ shcmd = get_shcmd(unet, host, kinds, execfmt, ucmd)
+ if toplevel or host == unet:
+ unet.run_in_window(shcmd, **kwargs)
+ else:
+ unet.hosts[host].run_in_window(shcmd, **kwargs)
+ except Exception as error:
+ outf.write(f"% Error: {error}\n")
+ return True
+
+ #
+ # Inline commands
+ #
+
+ toplevel = unet.cli_run_cmds[cmd][3] if cmd in unet.cli_run_cmds else False
+ # if toplevel:
+ # logging.debug("top-level: cmd: '%s' nline: '%s'", cmd, nline)
+ # hosts = None
+ # banner = False
+ # else:
+
+ hosts, cmd, nline, banner = host_cmd_split(unet, line, toplevel)
+ hoststr = "munet" if hosts == [unet] else f"{hosts}"
+ logging.debug("hosts: '%s' cmd: '%s' nline: '%s'", hoststr, cmd, nline)
+
+ if cmd in unet.cli_run_cmds:
+ pass
+ elif "" in unet.cli_run_cmds:
+ nline = f"{cmd} {nline}"
+ cmd = ""
+ else:
+ outf.write(f"% Unknown command: {cmd} {nline}\n")
+ return True
+
+ execfmt, toplevel, kinds, ns_only, interactive = unet.cli_run_cmds[cmd][2:]
+ if interactive and notty:
+ outf.write("% Error: interactive command must be run from primary CLI\n")
+ return True
+
+ await run_command(
+ unet,
+ outf,
+ nline,
+ execfmt,
+ banner,
+ hosts,
+ toplevel,
+ kinds,
+ ns_only,
+ interactive,
+ )
+
+ return True
+
+
+async def cli_client(sockpath, prompt="munet> "):
+ """Implement the user-facing CLI for a remote munet reached by a socket."""
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.settimeout(10)
+ sock.connect(sockpath)
+
+ # Go into full non-blocking mode now
+ sock.settimeout(None)
+
+ print("\n--- Munet CLI Starting ---\n\n")
+ while True:
+ line = input(prompt)
+ if line is None:
+ return
+
+ # Need to put \n back
+ line += "\n"
+
+ # Send the CLI command
+ sock.send(line.encode("utf-8"))
+
+ def bendswith(b, sentinel):
+ slen = len(sentinel)
+ return len(b) >= slen and b[-slen:] == sentinel
+
+ # Collect the output
+ rb = b""
+ while not bendswith(rb, ENDMARKER):
+ lb = sock.recv(4096)
+ if not lb:
+ return
+ rb += lb
+
+ # Remove the marker
+ rb = rb[: -len(ENDMARKER)]
+
+ # Write the output
+ sys.stdout.write(rb.decode("utf-8", "ignore"))
+
+
+async def local_cli(unet, outf, prompt, histfile, background):
+ """Implement the user-side CLI for local munet."""
+ assert unet is not None
+ completer = Completer(unet)
+ readline.parse_and_bind("tab: complete")
+ readline.set_completer(completer.complete)
+
+ print("\n--- Munet CLI Starting ---\n\n")
+ while True:
+ try:
+ line = await async_input(prompt, histfile)
+ if line is None:
+ return
+
+ if not await doline(unet, line, outf, background):
+ return
+ except KeyboardInterrupt:
+ outf.write("%% Caught KeyboardInterrupt\nUse ^D or 'quit' to exit")
+
+
+def init_history(unet, histfile):
+ try:
+ if histfile is None:
+ histfile = os.path.expanduser("~/.munet-history.txt")
+ if not os.path.exists(histfile):
+ if unet:
+ unet.cmd("touch " + histfile)
+ else:
+ subprocess.run("touch " + histfile, shell=True, check=True)
+ if histfile:
+ readline.read_history_file(histfile)
+ return histfile
+ except Exception as error:
+ logging.warning("init_history failed: %s", error)
+ return None
+
+
+async def cli_client_connected(unet, background, reader, writer):
+ """Handle CLI commands inside the munet process from a socket."""
+ # # Go into full non-blocking mode now
+ # client.settimeout(None)
+ logging.debug("cli client connected")
+ while True:
+ line = await reader.readline()
+ if not line:
+ logging.debug("client closed cli connection")
+ break
+ line = line.decode("utf-8").strip()
+
+ class EncodingFile:
+ """Wrap a writer to encode in utf-8."""
+
+ def __init__(self, writer):
+ self.writer = writer
+
+ def write(self, x):
+ self.writer.write(x.encode("utf-8", "ignore"))
+
+ def flush(self):
+ self.writer.flush()
+
+ if not await doline(unet, line, EncodingFile(writer), background, notty=True):
+ logging.debug("server closing cli connection")
+ return
+
+ writer.write(ENDMARKER)
+ await writer.drain()
+
+
+async def remote_cli(unet, prompt, title, background):
+ """Open a CLI in a new window."""
+ try:
+ if not unet.cli_sockpath:
+ sockpath = os.path.join(tempfile.mkdtemp("-sockdir", "pty-"), "cli.sock")
+ ccfunc = functools.partial(cli_client_connected, unet, background)
+ s = await asyncio.start_unix_server(ccfunc, path=sockpath)
+ unet.cli_server = asyncio.create_task(s.serve_forever(), name="cli-task")
+ unet.cli_sockpath = sockpath
+ logging.info("server created on :\n%s\n", sockpath)
+
+ # Open a new window with a new CLI
+ python_path = await unet.async_get_exec_path(["python3", "python"])
+ us = os.path.realpath(__file__)
+ cmd = f"{python_path} {us}"
+ if unet.cli_histfile:
+ cmd += " --histfile=" + unet.cli_histfile
+ if prompt:
+ cmd += f" --prompt='{prompt}'"
+ cmd += " " + unet.cli_sockpath
+ unet.run_in_window(cmd, title=title, background=False)
+ except Exception as error:
+ logging.error("cli server: unexpected exception: %s", error)
+
+
+def add_cli_in_window_cmd(
+ unet, name, helpfmt, helptxt, execfmt, toplevel, kinds, **kwargs
+):
+ """Adds a CLI command to the CLI.
+
+ The command `cmd` is added to the commands executable by the user from the CLI. See
+ `base.Commander.run_in_window` for the arguments that can be passed in `args` and
+ `kwargs` to this function.
+
+ Args:
+ unet: unet object
+ name: command string (no spaces)
+ helpfmt: format of command to display in help (left side)
+ helptxt: help string for command (right side)
+ execfmt: interpreter `cmd` to pass to `host.run_in_window()`, if {} present then
+ allow for user commands to be entered and inserted. May also be a dict of dict
+ keyed on kind with sub-key of "exec" providing the `execfmt` string for that
+ kind.
+ toplevel: run command in common top-level namespaec not inside hosts
+ kinds: limit CLI command to nodes which match list of kinds.
+ **kwargs: keyword args to pass to `host.run_in_window()`
+ """
+ unet.cli_in_window_cmds[name] = (helpfmt, helptxt, execfmt, toplevel, kinds, kwargs)
+
+
+def add_cli_run_cmd(
+ unet,
+ name,
+ helpfmt,
+ helptxt,
+ execfmt,
+ toplevel,
+ kinds,
+ ns_only=False,
+ interactive=False,
+):
+ """Adds a CLI command to the CLI.
+
+ The command `cmd` is added to the commands executable by the user from the CLI.
+ See `run_command` above in the `doline` function and for the arguments that can
+ be passed in to this function.
+
+ Args:
+ unet: unet object
+ name: command string (no spaces)
+ helpfmt: format of command to display in help (left side)
+ helptxt: help string for command (right side)
+ execfmt: format string to insert user cmds into for execution. May also be a
+ dict of dict keyed on kind with sub-key of "exec" providing the `execfmt`
+ string for that kind.
+ toplevel: run command in common top-level namespaec not inside hosts
+ kinds: limit CLI command to nodes which match list of kinds.
+ ns_only: Should execute the command on the host vs in the node namespace.
+ interactive: Should execute the command inside an allocated pty (interactive)
+ """
+ unet.cli_run_cmds[name] = (
+ helpfmt,
+ helptxt,
+ execfmt,
+ toplevel,
+ kinds,
+ ns_only,
+ interactive,
+ )
+
+
+def add_cli_config(unet, config):
+ """Adds CLI commands based on config.
+
+ All exec strings will have %CONFIGDIR%, %NAME% and %RUNDIR% replaced with the
+ corresponding config directory and the current nodes `name` and `rundir`.
+ Additionally, the exec string will have f-string style interpolation performed
+ with the local variables `host` (node object or None), `unet` (Munet object) and
+ `user_input` (if provided to the CLI command) defined.
+
+ The format of the config dictionary can be seen in the following example.
+ The first list entry represents the default command because it has no `name` key.
+
+ commands:
+ - help: "run the given FRR command using vtysh"
+ format: "[HOST ...] FRR-CLI-COMMAND"
+ exec: "vtysh -c {}"
+ ns-only: false # the default
+ interactive: false # the default
+ - name: "vtysh"
+ help: "Open a FRR CLI inside new terminal[s] on the given HOST[s]"
+ format: "vtysh HOST [HOST ...]"
+ exec: "vtysh"
+ new-window: true
+ - name: "capture"
+ help: "Capture packets on a given network"
+ format: "pcap NETWORK"
+ exec: "tshark -s 9200 -i {0} -w /tmp/capture-{0}.pcap"
+ new-window: true
+ top-level: true # run in top-level container namespace, above hosts
+
+ The `new_window` key can also be a dictionary which will be passed as keyward
+ arguments to the `Commander.run_in_window()` function.
+
+ Args:
+ unet: unet object
+ config: dictionary of cli config
+ """
+ for cli_cmd in config.get("commands", []):
+ name = cli_cmd.get("name", None)
+ helpfmt = cli_cmd.get("format", "")
+ helptxt = cli_cmd.get("help", "")
+ execfmt = list_to_dict_with_key(cli_cmd.get("exec-kind"), "kind")
+ if not execfmt:
+ execfmt = cli_cmd.get("exec", "bash -c '{}'")
+ toplevel = cli_cmd.get("top-level", False)
+ kinds = cli_cmd.get("kinds", [])
+ stdargs = (unet, name, helpfmt, helptxt, execfmt, toplevel, kinds)
+ new_window = cli_cmd.get("new-window", None)
+ if isinstance(new_window, dict):
+ add_cli_in_window_cmd(*stdargs, **new_window)
+ elif bool(new_window):
+ add_cli_in_window_cmd(*stdargs)
+ else:
+ # on-host is deprecated it really implemented "ns-only"
+ add_cli_run_cmd(
+ *stdargs,
+ cli_cmd.get("ns-only", cli_cmd.get("on-host")),
+ cli_cmd.get("interactive", False),
+ )
+
+
+def cli(
+ unet,
+ histfile=None,
+ sockpath=None,
+ force_window=False,
+ title=None,
+ prompt=None,
+ background=True,
+):
+ asyncio.run(
+ async_cli(unet, histfile, sockpath, force_window, title, prompt, background)
+ )
+
+
+async def async_cli(
+ unet,
+ histfile=None,
+ sockpath=None,
+ force_window=False,
+ title=None,
+ prompt=None,
+ background=True,
+):
+ if prompt is None:
+ prompt = "munet> "
+
+ if force_window or not sys.stdin.isatty():
+ await remote_cli(unet, prompt, title, background)
+
+ if not unet:
+ logger.debug("client-cli using sockpath %s", sockpath)
+
+ try:
+ if sockpath:
+ await cli_client(sockpath, prompt)
+ else:
+ await local_cli(unet, sys.stdout, prompt, histfile, background)
+ except KeyboardInterrupt:
+ print("\n...^C exiting CLI")
+ except EOFError:
+ pass
+ except Exception as ex:
+ logger.critical("cli: got exception: %s", ex, exc_info=True)
+ raise
+
+
+if __name__ == "__main__":
+ # logging.basicConfig(level=logging.DEBUG, filename="/tmp/topotests/cli-client.log")
+ logging.basicConfig(level=logging.DEBUG)
+ logger = logging.getLogger("cli-client")
+ logger.info("Start logging cli-client")
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--histfile", help="file to user for history")
+ parser.add_argument("--prompt", help="prompt string to use")
+ parser.add_argument("socket", help="path to pair of sockets to communicate over")
+ cli_args = parser.parse_args()
+
+ cli_prompt = cli_args.prompt if cli_args.prompt else "munet> "
+ asyncio.run(
+ async_cli(
+ None,
+ cli_args.histfile,
+ cli_args.socket,
+ prompt=cli_prompt,
+ background=False,
+ )
+ )