summaryrefslogtreecommitdiffstats
path: root/tests/topotests/munet/testing
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/munet/testing')
-rw-r--r--tests/topotests/munet/testing/__init__.py1
-rw-r--r--tests/topotests/munet/testing/fixtures.py447
-rw-r--r--tests/topotests/munet/testing/hooks.py225
-rw-r--r--tests/topotests/munet/testing/util.py110
4 files changed, 783 insertions, 0 deletions
diff --git a/tests/topotests/munet/testing/__init__.py b/tests/topotests/munet/testing/__init__.py
new file mode 100644
index 0000000..63cbfab
--- /dev/null
+++ b/tests/topotests/munet/testing/__init__.py
@@ -0,0 +1 @@
+"""Sub-package supporting munet use in pytest."""
diff --git a/tests/topotests/munet/testing/fixtures.py b/tests/topotests/munet/testing/fixtures.py
new file mode 100644
index 0000000..25039df
--- /dev/null
+++ b/tests/topotests/munet/testing/fixtures.py
@@ -0,0 +1,447 @@
+# -*- coding: utf-8 eval: (blacken-mode 1) -*-
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# April 22 2022, Christian Hopps <chopps@gmail.com>
+#
+# Copyright (c) 2022, LabN Consulting, L.L.C
+#
+"""A module that implements pytest fixtures.
+
+To use in your project, in your conftest.py add:
+
+ from munet.testing.fixtures import *
+"""
+import contextlib
+import logging
+import os
+
+from pathlib import Path
+from typing import Union
+
+import pytest
+import pytest_asyncio
+
+from ..base import BaseMunet
+from ..base import Bridge
+from ..base import get_event_loop
+from ..cleanup import cleanup_current
+from ..cleanup import cleanup_previous
+from ..native import L3NodeMixin
+from ..parser import async_build_topology
+from ..parser import get_config
+from .util import async_pause_test
+from .util import pause_test
+
+
+@contextlib.asynccontextmanager
+async def achdir(ndir: Union[str, Path], desc=""):
+ odir = os.getcwd()
+ os.chdir(ndir)
+ if desc:
+ logging.debug("%s: chdir from %s to %s", desc, odir, ndir)
+ try:
+ yield
+ finally:
+ if desc:
+ logging.debug("%s: chdir back from %s to %s", desc, ndir, odir)
+ os.chdir(odir)
+
+
+@contextlib.contextmanager
+def chdir(ndir: Union[str, Path], desc=""):
+ odir = os.getcwd()
+ os.chdir(ndir)
+ if desc:
+ logging.debug("%s: chdir from %s to %s", desc, odir, ndir)
+ try:
+ yield
+ finally:
+ if desc:
+ logging.debug("%s: chdir back from %s to %s", desc, ndir, odir)
+ os.chdir(odir)
+
+
+def get_test_logdir(nodeid=None, module=False):
+ """Get log directory relative pathname."""
+ xdist_worker = os.getenv("PYTEST_XDIST_WORKER", "")
+ mode = os.getenv("PYTEST_XDIST_MODE", "no")
+
+ # nodeid: all_protocol_startup/test_all_protocol_startup.py::test_router_running
+ # may be missing "::testname" if module is True
+ if not nodeid:
+ nodeid = os.environ["PYTEST_CURRENT_TEST"].split(" ")[0]
+
+ cur_test = nodeid.replace("[", "_").replace("]", "_")
+ if module:
+ idx = cur_test.rfind("::")
+ path = cur_test if idx == -1 else cur_test[:idx]
+ testname = ""
+ else:
+ path, testname = cur_test.split("::")
+ testname = testname.replace("/", ".")
+ path = path[:-3].replace("/", ".")
+
+ # We use different logdir paths based on how xdist is running.
+ if mode == "each":
+ if module:
+ return os.path.join(path, "worker-logs", xdist_worker)
+ return os.path.join(path, testname, xdist_worker)
+ assert mode in ("no", "load", "loadfile", "loadscope"), f"Unknown dist mode {mode}"
+ return path if module else os.path.join(path, testname)
+
+
+def _push_log_handler(desc, logpath):
+ logpath = os.path.abspath(logpath)
+ logging.debug("conftest: adding %s logging at %s", desc, logpath)
+ root_logger = logging.getLogger()
+ handler = logging.FileHandler(logpath, mode="w")
+ fmt = logging.Formatter("%(asctime)s %(levelname)5s: %(message)s")
+ handler.setFormatter(fmt)
+ root_logger.addHandler(handler)
+ return handler
+
+
+def _pop_log_handler(handler):
+ root_logger = logging.getLogger()
+ logging.debug("conftest: removing logging handler %s", handler)
+ root_logger.removeHandler(handler)
+
+
+@contextlib.contextmanager
+def log_handler(desc, logpath):
+ handler = _push_log_handler(desc, logpath)
+ try:
+ yield
+ finally:
+ _pop_log_handler(handler)
+
+
+# =================
+# Sessions Fixtures
+# =================
+
+
+@pytest.fixture(autouse=True, scope="session")
+def session_autouse():
+ if "PYTEST_TOPOTEST_WORKER" not in os.environ:
+ is_worker = False
+ elif not os.environ["PYTEST_TOPOTEST_WORKER"]:
+ is_worker = False
+ else:
+ is_worker = True
+
+ if not is_worker:
+ # This is unfriendly to multi-instance
+ cleanup_previous()
+
+ # We never pop as we want to keep logging
+ _push_log_handler("session", "/tmp/unet-test/pytest-session.log")
+
+ yield
+
+ if not is_worker:
+ cleanup_current()
+
+
+# ===============
+# Module Fixtures
+# ===============
+
+
+@pytest.fixture(autouse=True, scope="module")
+def module_autouse(request):
+ logpath = get_test_logdir(request.node.name, True)
+ logpath = os.path.join("/tmp/unet-test", logpath, "pytest-exec.log")
+ with log_handler("module", logpath):
+ sdir = os.path.dirname(os.path.realpath(request.fspath))
+ with chdir(sdir, "module autouse fixture"):
+ yield
+
+ if BaseMunet.g_unet:
+ raise Exception("Base Munet was not cleaned up/deleted")
+
+
+@pytest.fixture(scope="module")
+def event_loop():
+ """Create an instance of the default event loop for the session."""
+ loop = get_event_loop()
+ try:
+ logging.info("event_loop_fixture: yielding with new event loop watcher")
+ yield loop
+ finally:
+ loop.close()
+
+
+@pytest.fixture(scope="module")
+def rundir_module():
+ d = os.path.join("/tmp/unet-test", get_test_logdir(module=True))
+ logging.debug("conftest: test module rundir %s", d)
+ return d
+
+
+async def _unet_impl(
+ _rundir, _pytestconfig, unshare=None, top_level_pidns=None, param=None
+):
+ try:
+ # Default is not to unshare inline if not specified otherwise
+ unshare_default = False
+ pidns_default = True
+ if isinstance(param, (tuple, list)):
+ pidns_default = bool(param[2]) if len(param) > 2 else True
+ unshare_default = bool(param[1]) if len(param) > 1 else False
+ param = str(param[0])
+ elif isinstance(param, bool):
+ unshare_default = param
+ param = None
+ if unshare is None:
+ unshare = unshare_default
+ if top_level_pidns is None:
+ top_level_pidns = pidns_default
+
+ logging.info("unet fixture: basename=%s unshare_inline=%s", param, unshare)
+ _unet = await async_build_topology(
+ config=get_config(basename=param) if param else None,
+ rundir=_rundir,
+ unshare_inline=unshare,
+ top_level_pidns=top_level_pidns,
+ pytestconfig=_pytestconfig,
+ )
+ except Exception as error:
+ logging.debug(
+ "unet fixture: unet build failed: %s\nparam: %s",
+ error,
+ param,
+ exc_info=True,
+ )
+ pytest.skip(
+ f"unet fixture: unet build failed: {error}", allow_module_level=True
+ )
+ raise
+
+ try:
+ tasks = await _unet.run()
+ except Exception as error:
+ logging.debug("unet fixture: unet run failed: %s", error, exc_info=True)
+ await _unet.async_delete()
+ pytest.skip(f"unet fixture: unet run failed: {error}", allow_module_level=True)
+ raise
+
+ logging.debug("unet fixture: containers running")
+
+ # Pytest is supposed to always return even if exceptions
+ try:
+ yield _unet
+ except Exception as error:
+ logging.error("unet fixture: yield unet unexpected exception: %s", error)
+
+ logging.debug("unet fixture: module done, deleting unet")
+ await _unet.async_delete()
+
+ # No one ever awaits these so cancel them
+ logging.debug("unet fixture: cleanup")
+ for task in tasks:
+ task.cancel()
+
+ # Reset the class variables so auto number is predictable
+ logging.debug("unet fixture: resetting ords to 1")
+ L3NodeMixin.next_ord = 1
+ Bridge.next_ord = 1
+
+
+@pytest.fixture(scope="module")
+async def unet(request, rundir_module, pytestconfig): # pylint: disable=W0621
+ """A unet creating fixutre.
+
+ The request param is either the basename of the config file or a tuple of the form:
+ (basename, unshare, top_level_pidns), with the second and third elements boolean and
+ optional, defaulting to False, True.
+ """
+ param = request.param if hasattr(request, "param") else None
+ sdir = os.path.dirname(os.path.realpath(request.fspath))
+ async with achdir(sdir, "unet fixture"):
+ async for x in _unet_impl(rundir_module, pytestconfig, param=param):
+ yield x
+
+
+@pytest.fixture(scope="module")
+async def unet_share(request, rundir_module, pytestconfig): # pylint: disable=W0621
+ """A unet creating fixutre.
+
+ This share variant keeps munet from unsharing the process to a new namespace so that
+ root level commands and actions are execute on the host, normally they are executed
+ in the munet namespace which allowing things like scapy inline in tests to work.
+
+ The request param is either the basename of the config file or a tuple of the form:
+ (basename, top_level_pidns), the second value is a boolean.
+ """
+ param = request.param if hasattr(request, "param") else None
+ if isinstance(param, (tuple, list)):
+ param = (param[0], False, param[1])
+ sdir = os.path.dirname(os.path.realpath(request.fspath))
+ async with achdir(sdir, "unet_share fixture"):
+ async for x in _unet_impl(
+ rundir_module, pytestconfig, unshare=False, param=param
+ ):
+ yield x
+
+
+@pytest.fixture(scope="module")
+async def unet_unshare(request, rundir_module, pytestconfig): # pylint: disable=W0621
+ """A unet creating fixutre.
+
+ This unshare variant has the top level munet unshare the process inline so that
+ root level commands and actions are execute in a new namespace. This allows things
+ like scapy inline in tests to work.
+
+ The request param is either the basename of the config file or a tuple of the form:
+ (basename, top_level_pidns), the second value is a boolean.
+ """
+ param = request.param if hasattr(request, "param") else None
+ if isinstance(param, (tuple, list)):
+ param = (param[0], True, param[1])
+ sdir = os.path.dirname(os.path.realpath(request.fspath))
+ async with achdir(sdir, "unet_unshare fixture"):
+ async for x in _unet_impl(
+ rundir_module, pytestconfig, unshare=True, param=param
+ ):
+ yield x
+
+
+# =================
+# Function Fixtures
+# =================
+
+
+@pytest.fixture(autouse=True, scope="function")
+async def function_autouse(request):
+ async with achdir(
+ os.path.dirname(os.path.realpath(request.fspath)), "func.fixture"
+ ):
+ yield
+
+
+@pytest.fixture(autouse=True)
+async def check_for_pause(request, pytestconfig):
+ # When we unshare inline we can't pause in the pytest_runtest_makereport hook
+ # so do it here.
+ if BaseMunet.g_unet and BaseMunet.g_unet.unshare_inline:
+ pause = bool(pytestconfig.getoption("--pause"))
+ if pause:
+ await async_pause_test(f"XXX before test '{request.node.name}'")
+ yield
+
+
+@pytest.fixture(scope="function")
+def stepf(pytestconfig):
+ class Stepnum:
+ """Track the stepnum in closure."""
+
+ num = 0
+
+ def inc(self):
+ self.num += 1
+
+ pause = pytestconfig.getoption("pause")
+ stepnum = Stepnum()
+
+ def stepfunction(desc=""):
+ desc = f": {desc}" if desc else ""
+ if pause:
+ pause_test(f"before step {stepnum.num}{desc}")
+ logging.info("STEP %s%s", stepnum.num, desc)
+ stepnum.inc()
+
+ return stepfunction
+
+
+@pytest_asyncio.fixture(scope="function")
+async def astepf(pytestconfig):
+ class Stepnum:
+ """Track the stepnum in closure."""
+
+ num = 0
+
+ def inc(self):
+ self.num += 1
+
+ pause = pytestconfig.getoption("pause")
+ stepnum = Stepnum()
+
+ async def stepfunction(desc=""):
+ desc = f": {desc}" if desc else ""
+ if pause:
+ await async_pause_test(f"before step {stepnum.num}{desc}")
+ logging.info("STEP %s%s", stepnum.num, desc)
+ stepnum.inc()
+
+ return stepfunction
+
+
+@pytest.fixture(scope="function")
+def rundir():
+ d = os.path.join("/tmp/unet-test", get_test_logdir(module=False))
+ logging.debug("conftest: test function rundir %s", d)
+ return d
+
+
+# Configure logging
+@pytest.hookimpl(hookwrapper=True, tryfirst=True)
+def pytest_runtest_setup(item):
+ d = os.path.join(
+ "/tmp/unet-test", get_test_logdir(nodeid=item.nodeid, module=False)
+ )
+ config = item.config
+ logging_plugin = config.pluginmanager.get_plugin("logging-plugin")
+ filename = Path(d, "pytest-exec.log")
+ logging_plugin.set_log_path(str(filename))
+ logging.debug("conftest: test function setup: rundir %s", d)
+ yield
+
+
+@pytest.fixture
+async def unet_perfunc(request, rundir, pytestconfig): # pylint: disable=W0621
+ param = request.param if hasattr(request, "param") else None
+ async for x in _unet_impl(rundir, pytestconfig, param=param):
+ yield x
+
+
+@pytest.fixture
+async def unet_perfunc_unshare(request, rundir, pytestconfig): # pylint: disable=W0621
+ """Build unet per test function with an optional topology basename parameter.
+
+ The fixture can be parameterized to choose different config files.
+ For example, use as follows to run the test with unet_perfunc configured
+ first with a config file named `cfg1.yaml` then with config file `cfg2.yaml`
+ (where the actual files could end with `json` or `toml` rather than `yaml`).
+
+ @pytest.mark.parametrize(
+ "unet_perfunc", ["cfg1", "cfg2]", indirect=["unet_perfunc"]
+ )
+ def test_example(unet_perfunc)
+ """
+ param = request.param if hasattr(request, "param") else None
+ async for x in _unet_impl(rundir, pytestconfig, unshare=True, param=param):
+ yield x
+
+
+@pytest.fixture
+async def unet_perfunc_share(request, rundir, pytestconfig): # pylint: disable=W0621
+ """Build unet per test function with an optional topology basename parameter.
+
+ This share variant keeps munet from unsharing the process to a new namespace so that
+ root level commands and actions are execute on the host, normally they are executed
+ in the munet namespace which allowing things like scapy inline in tests to work.
+
+ The fixture can be parameterized to choose different config files. For example, use
+ as follows to run the test with unet_perfunc configured first with a config file
+ named `cfg1.yaml` then with config file `cfg2.yaml` (where the actual files could
+ end with `json` or `toml` rather than `yaml`).
+
+ @pytest.mark.parametrize(
+ "unet_perfunc", ["cfg1", "cfg2]", indirect=["unet_perfunc"]
+ )
+ def test_example(unet_perfunc)
+ """
+ param = request.param if hasattr(request, "param") else None
+ async for x in _unet_impl(rundir, pytestconfig, unshare=False, param=param):
+ yield x
diff --git a/tests/topotests/munet/testing/hooks.py b/tests/topotests/munet/testing/hooks.py
new file mode 100644
index 0000000..9b6a49a
--- /dev/null
+++ b/tests/topotests/munet/testing/hooks.py
@@ -0,0 +1,225 @@
+# -*- coding: utf-8 eval: (blacken-mode 1) -*-
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# April 22 2022, Christian Hopps <chopps@gmail.com>
+#
+# Copyright (c) 2022, LabN Consulting, L.L.C
+#
+"""A module that implements pytest hooks.
+
+To use in your project, in your conftest.py add:
+
+ from munet.testing.hooks import *
+"""
+import logging
+import os
+import sys
+import traceback
+
+import pytest
+
+from ..base import BaseMunet # pylint: disable=import-error
+from ..cli import cli # pylint: disable=import-error
+from .util import pause_test
+
+
+# ===================
+# Hooks (non-fixture)
+# ===================
+
+
+def pytest_addoption(parser):
+ parser.addoption(
+ "--cli-on-error",
+ action="store_true",
+ help="CLI on test failure",
+ )
+
+ parser.addoption(
+ "--coverage",
+ action="store_true",
+ help="Enable coverage gathering if supported",
+ )
+
+ parser.addoption(
+ "--gdb",
+ default="",
+ metavar="HOST[,HOST...]",
+ help="Comma-separated list of nodes to launch gdb on, or 'all'",
+ )
+ parser.addoption(
+ "--gdb-breakpoints",
+ default="",
+ metavar="BREAKPOINT[,BREAKPOINT...]",
+ help="Comma-separated list of breakpoints",
+ )
+ parser.addoption(
+ "--gdb-use-emacs",
+ action="store_true",
+ help="Use emacsclient to run gdb instead of a shell",
+ )
+
+ parser.addoption(
+ "--pcap",
+ default="",
+ metavar="NET[,NET...]",
+ help="Comma-separated list of networks to capture packets on, or 'all'",
+ )
+
+ parser.addoption(
+ "--pause",
+ action="store_true",
+ help="Pause after each test",
+ )
+ parser.addoption(
+ "--pause-at-end",
+ action="store_true",
+ help="Pause before taking munet down",
+ )
+ parser.addoption(
+ "--pause-on-error",
+ action="store_true",
+ help="Pause after (disables default when --shell or -vtysh given)",
+ )
+ parser.addoption(
+ "--no-pause-on-error",
+ dest="pause_on_error",
+ action="store_false",
+ help="Do not pause after (disables default when --shell or -vtysh given)",
+ )
+
+ parser.addoption(
+ "--shell",
+ default="",
+ metavar="NODE[,NODE...]",
+ help="Comma-separated list of nodes to spawn shell on, or 'all'",
+ )
+
+ parser.addoption(
+ "--stdout",
+ default="",
+ metavar="NODE[,NODE...]",
+ help="Comma-separated list of nodes to open tail-f stdout window on, or 'all'",
+ )
+
+ parser.addoption(
+ "--stderr",
+ default="",
+ metavar="NODE[,NODE...]",
+ help="Comma-separated list of nodes to open tail-f stderr window on, or 'all'",
+ )
+
+
+def pytest_configure(config):
+ if "PYTEST_XDIST_WORKER" not in os.environ:
+ os.environ["PYTEST_XDIST_MODE"] = config.getoption("dist", "no")
+ os.environ["PYTEST_IS_WORKER"] = ""
+ is_xdist = os.environ["PYTEST_XDIST_MODE"] != "no"
+ is_worker = False
+ else:
+ os.environ["PYTEST_IS_WORKER"] = os.environ["PYTEST_XDIST_WORKER"]
+ is_xdist = True
+ is_worker = True
+
+ # Turn on live logging if user specified verbose and the config has a CLI level set
+ if config.getoption("--verbose") and not is_xdist and not config.getini("log_cli"):
+ if config.getoption("--log-cli-level", None) is None:
+ # By setting the CLI option to the ini value it enables log_cli=1
+ cli_level = config.getini("log_cli_level")
+ if cli_level is not None:
+ config.option.log_cli_level = cli_level
+
+ have_tmux = bool(os.getenv("TMUX", ""))
+ have_screen = not have_tmux and bool(os.getenv("STY", ""))
+ have_xterm = not have_tmux and not have_screen and bool(os.getenv("DISPLAY", ""))
+ have_windows = have_tmux or have_screen or have_xterm
+ have_windows_pause = have_tmux or have_xterm
+ xdist_no_windows = is_xdist and not is_worker and not have_windows_pause
+
+ for winopt in ["--shell", "--stdout", "--stderr"]:
+ b = config.getoption(winopt)
+ if b and xdist_no_windows:
+ pytest.exit(
+ f"{winopt} use requires byobu/TMUX/XTerm "
+ f"under dist {os.environ['PYTEST_XDIST_MODE']}"
+ )
+ elif b and not is_xdist and not have_windows:
+ pytest.exit(f"{winopt} use requires byobu/TMUX/SCREEN/XTerm")
+
+
+def pytest_runtest_makereport(item, call):
+ """Pause or invoke CLI as directed by config."""
+ isatty = sys.stdout.isatty()
+
+ pause = bool(item.config.getoption("--pause"))
+ skipped = False
+
+ if call.excinfo is None:
+ error = False
+ elif call.excinfo.typename == "Skipped":
+ skipped = True
+ error = False
+ pause = False
+ else:
+ error = True
+ modname = item.parent.module.__name__
+ exval = call.excinfo.value
+ logging.error(
+ "test %s/%s failed: %s: stdout: '%s' stderr: '%s'",
+ modname,
+ item.name,
+ exval,
+ exval.stdout if hasattr(exval, "stdout") else "NA",
+ exval.stderr if hasattr(exval, "stderr") else "NA",
+ )
+ if not pause:
+ pause = item.config.getoption("--pause-on-error")
+
+ if error and isatty and item.config.getoption("--cli-on-error"):
+ if not BaseMunet.g_unet:
+ logging.error("Could not launch CLI b/c no munet exists yet")
+ else:
+ print(f"\nCLI-ON-ERROR: {call.excinfo.typename}")
+ print(f"CLI-ON-ERROR:\ntest {modname}/{item.name} failed: {exval}")
+ if hasattr(exval, "stdout") and exval.stdout:
+ print("stdout: " + exval.stdout.replace("\n", "\nstdout: "))
+ if hasattr(exval, "stderr") and exval.stderr:
+ print("stderr: " + exval.stderr.replace("\n", "\nstderr: "))
+ cli(BaseMunet.g_unet)
+
+ if pause:
+ if skipped:
+ item.skip_more_pause = True
+ elif hasattr(item, "skip_more_pause"):
+ pass
+ elif call.when == "setup":
+ if error:
+ item.skip_more_pause = True
+
+ # we can't asyncio.run() (which pause does) if we are unhsare_inline
+ # at this point, count on an autouse fixture to pause instead in this
+ # case
+ if not BaseMunet.g_unet or not BaseMunet.g_unet.unshare_inline:
+ pause_test(f"before test '{item.nodeid}'")
+
+ # check for a result to try and catch setup (or module setup) failure
+ # e.g., after a module level fixture fails, we do not want to pause on every
+ # skipped test.
+ elif call.when == "teardown" and call.excinfo:
+ logging.warning(
+ "Caught exception during teardown: %s\n:Traceback:\n%s",
+ call.excinfo,
+ "".join(traceback.format_tb(call.excinfo.tb)),
+ )
+ pause_test(f"after teardown after test '{item.nodeid}'")
+ elif call.when == "teardown" and call.result:
+ pause_test(f"after test '{item.nodeid}'")
+ elif error:
+ item.skip_more_pause = True
+ print(f"\nPAUSE-ON-ERROR: {call.excinfo.typename}")
+ print(f"PAUSE-ON-ERROR:\ntest {modname}/{item.name} failed: {exval}")
+ if hasattr(exval, "stdout") and exval.stdout:
+ print("stdout: " + exval.stdout.replace("\n", "\nstdout: "))
+ if hasattr(exval, "stderr") and exval.stderr:
+ print("stderr: " + exval.stderr.replace("\n", "\nstderr: "))
+ pause_test(f"PAUSE-ON-ERROR: '{item.nodeid}'")
diff --git a/tests/topotests/munet/testing/util.py b/tests/topotests/munet/testing/util.py
new file mode 100644
index 0000000..a1a94bc
--- /dev/null
+++ b/tests/topotests/munet/testing/util.py
@@ -0,0 +1,110 @@
+# -*- coding: utf-8 eval: (blacken-mode 1) -*-
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# April 22 2022, Christian Hopps <chopps@gmail.com>
+#
+# Copyright (c) 2022, LabN Consulting, L.L.C
+#
+"""Utility functions useful when using munet testing functionailty in pytest."""
+import asyncio
+import datetime
+import functools
+import logging
+import sys
+import time
+
+from ..base import BaseMunet
+from ..cli import async_cli
+
+
+# =================
+# Utility Functions
+# =================
+
+
+async def async_pause_test(desc=""):
+ isatty = sys.stdout.isatty()
+ if not isatty:
+ desc = f" for {desc}" if desc else ""
+ logging.info("NO PAUSE on non-tty terminal%s", desc)
+ return
+
+ while True:
+ if desc:
+ print(f"\n== PAUSING: {desc} ==")
+ try:
+ user = input('PAUSED, "cli" for CLI, "pdb" to debug, "Enter" to continue: ')
+ except EOFError:
+ print("^D...continuing")
+ break
+ user = user.strip()
+ if user == "cli":
+ await async_cli(BaseMunet.g_unet)
+ elif user == "pdb":
+ breakpoint() # pylint: disable=W1515
+ elif user:
+ print(f'Unrecognized input: "{user}"')
+ else:
+ break
+
+
+def pause_test(desc=""):
+ asyncio.run(async_pause_test(desc))
+
+
+def retry(retry_timeout, initial_wait=0, expected=True):
+ """decorator: retry while functions return is not None or raises an exception.
+
+ * `retry_timeout`: Retry for at least this many seconds; after waiting
+ initial_wait seconds
+ * `initial_wait`: Sleeps for this many seconds before first executing function
+ * `expected`: if False then the return logic is inverted, except for exceptions,
+ (i.e., a non None ends the retry loop, and returns that value)
+ """
+
+ def _retry(func):
+ @functools.wraps(func)
+ def func_retry(*args, **kwargs):
+ retry_sleep = 2
+
+ # Allow the wrapped function's args to override the fixtures
+ _retry_timeout = kwargs.pop("retry_timeout", retry_timeout)
+ _expected = kwargs.pop("expected", expected)
+ _initial_wait = kwargs.pop("initial_wait", initial_wait)
+ retry_until = datetime.datetime.now() + datetime.timedelta(
+ seconds=_retry_timeout + _initial_wait
+ )
+
+ if initial_wait > 0:
+ logging.info("Waiting for [%s]s as initial delay", initial_wait)
+ time.sleep(initial_wait)
+
+ while True:
+ seconds_left = (retry_until - datetime.datetime.now()).total_seconds()
+ try:
+ ret = func(*args, **kwargs)
+ if _expected and ret is None:
+ logging.debug("Function succeeds")
+ return ret
+ logging.debug("Function returned %s", ret)
+ except Exception as error:
+ logging.info("Function raised exception: %s", str(error))
+ ret = error
+
+ if seconds_left < 0:
+ logging.info("Retry timeout of %ds reached", _retry_timeout)
+ if isinstance(ret, Exception):
+ raise ret
+ return ret
+
+ logging.info(
+ "Sleeping %ds until next retry with %.1f retry time left",
+ retry_sleep,
+ seconds_left,
+ )
+ time.sleep(retry_sleep)
+
+ func_retry._original = func # pylint: disable=W0212
+ return func_retry
+
+ return _retry