summaryrefslogtreecommitdiffstats
path: root/tests/topotests/munet/testing/fixtures.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/munet/testing/fixtures.py')
-rw-r--r--tests/topotests/munet/testing/fixtures.py447
1 files changed, 447 insertions, 0 deletions
diff --git a/tests/topotests/munet/testing/fixtures.py b/tests/topotests/munet/testing/fixtures.py
new file mode 100644
index 0000000..25039df
--- /dev/null
+++ b/tests/topotests/munet/testing/fixtures.py
@@ -0,0 +1,447 @@
+# -*- coding: utf-8 eval: (blacken-mode 1) -*-
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# April 22 2022, Christian Hopps <chopps@gmail.com>
+#
+# Copyright (c) 2022, LabN Consulting, L.L.C
+#
+"""A module that implements pytest fixtures.
+
+To use in your project, in your conftest.py add:
+
+ from munet.testing.fixtures import *
+"""
+import contextlib
+import logging
+import os
+
+from pathlib import Path
+from typing import Union
+
+import pytest
+import pytest_asyncio
+
+from ..base import BaseMunet
+from ..base import Bridge
+from ..base import get_event_loop
+from ..cleanup import cleanup_current
+from ..cleanup import cleanup_previous
+from ..native import L3NodeMixin
+from ..parser import async_build_topology
+from ..parser import get_config
+from .util import async_pause_test
+from .util import pause_test
+
+
+@contextlib.asynccontextmanager
+async def achdir(ndir: Union[str, Path], desc=""):
+ odir = os.getcwd()
+ os.chdir(ndir)
+ if desc:
+ logging.debug("%s: chdir from %s to %s", desc, odir, ndir)
+ try:
+ yield
+ finally:
+ if desc:
+ logging.debug("%s: chdir back from %s to %s", desc, ndir, odir)
+ os.chdir(odir)
+
+
+@contextlib.contextmanager
+def chdir(ndir: Union[str, Path], desc=""):
+ odir = os.getcwd()
+ os.chdir(ndir)
+ if desc:
+ logging.debug("%s: chdir from %s to %s", desc, odir, ndir)
+ try:
+ yield
+ finally:
+ if desc:
+ logging.debug("%s: chdir back from %s to %s", desc, ndir, odir)
+ os.chdir(odir)
+
+
+def get_test_logdir(nodeid=None, module=False):
+ """Get log directory relative pathname."""
+ xdist_worker = os.getenv("PYTEST_XDIST_WORKER", "")
+ mode = os.getenv("PYTEST_XDIST_MODE", "no")
+
+ # nodeid: all_protocol_startup/test_all_protocol_startup.py::test_router_running
+ # may be missing "::testname" if module is True
+ if not nodeid:
+ nodeid = os.environ["PYTEST_CURRENT_TEST"].split(" ")[0]
+
+ cur_test = nodeid.replace("[", "_").replace("]", "_")
+ if module:
+ idx = cur_test.rfind("::")
+ path = cur_test if idx == -1 else cur_test[:idx]
+ testname = ""
+ else:
+ path, testname = cur_test.split("::")
+ testname = testname.replace("/", ".")
+ path = path[:-3].replace("/", ".")
+
+ # We use different logdir paths based on how xdist is running.
+ if mode == "each":
+ if module:
+ return os.path.join(path, "worker-logs", xdist_worker)
+ return os.path.join(path, testname, xdist_worker)
+ assert mode in ("no", "load", "loadfile", "loadscope"), f"Unknown dist mode {mode}"
+ return path if module else os.path.join(path, testname)
+
+
+def _push_log_handler(desc, logpath):
+ logpath = os.path.abspath(logpath)
+ logging.debug("conftest: adding %s logging at %s", desc, logpath)
+ root_logger = logging.getLogger()
+ handler = logging.FileHandler(logpath, mode="w")
+ fmt = logging.Formatter("%(asctime)s %(levelname)5s: %(message)s")
+ handler.setFormatter(fmt)
+ root_logger.addHandler(handler)
+ return handler
+
+
+def _pop_log_handler(handler):
+ root_logger = logging.getLogger()
+ logging.debug("conftest: removing logging handler %s", handler)
+ root_logger.removeHandler(handler)
+
+
+@contextlib.contextmanager
+def log_handler(desc, logpath):
+ handler = _push_log_handler(desc, logpath)
+ try:
+ yield
+ finally:
+ _pop_log_handler(handler)
+
+
+# =================
+# Sessions Fixtures
+# =================
+
+
+@pytest.fixture(autouse=True, scope="session")
+def session_autouse():
+ if "PYTEST_TOPOTEST_WORKER" not in os.environ:
+ is_worker = False
+ elif not os.environ["PYTEST_TOPOTEST_WORKER"]:
+ is_worker = False
+ else:
+ is_worker = True
+
+ if not is_worker:
+ # This is unfriendly to multi-instance
+ cleanup_previous()
+
+ # We never pop as we want to keep logging
+ _push_log_handler("session", "/tmp/unet-test/pytest-session.log")
+
+ yield
+
+ if not is_worker:
+ cleanup_current()
+
+
+# ===============
+# Module Fixtures
+# ===============
+
+
+@pytest.fixture(autouse=True, scope="module")
+def module_autouse(request):
+ logpath = get_test_logdir(request.node.name, True)
+ logpath = os.path.join("/tmp/unet-test", logpath, "pytest-exec.log")
+ with log_handler("module", logpath):
+ sdir = os.path.dirname(os.path.realpath(request.fspath))
+ with chdir(sdir, "module autouse fixture"):
+ yield
+
+ if BaseMunet.g_unet:
+ raise Exception("Base Munet was not cleaned up/deleted")
+
+
+@pytest.fixture(scope="module")
+def event_loop():
+ """Create an instance of the default event loop for the session."""
+ loop = get_event_loop()
+ try:
+ logging.info("event_loop_fixture: yielding with new event loop watcher")
+ yield loop
+ finally:
+ loop.close()
+
+
+@pytest.fixture(scope="module")
+def rundir_module():
+ d = os.path.join("/tmp/unet-test", get_test_logdir(module=True))
+ logging.debug("conftest: test module rundir %s", d)
+ return d
+
+
+async def _unet_impl(
+ _rundir, _pytestconfig, unshare=None, top_level_pidns=None, param=None
+):
+ try:
+ # Default is not to unshare inline if not specified otherwise
+ unshare_default = False
+ pidns_default = True
+ if isinstance(param, (tuple, list)):
+ pidns_default = bool(param[2]) if len(param) > 2 else True
+ unshare_default = bool(param[1]) if len(param) > 1 else False
+ param = str(param[0])
+ elif isinstance(param, bool):
+ unshare_default = param
+ param = None
+ if unshare is None:
+ unshare = unshare_default
+ if top_level_pidns is None:
+ top_level_pidns = pidns_default
+
+ logging.info("unet fixture: basename=%s unshare_inline=%s", param, unshare)
+ _unet = await async_build_topology(
+ config=get_config(basename=param) if param else None,
+ rundir=_rundir,
+ unshare_inline=unshare,
+ top_level_pidns=top_level_pidns,
+ pytestconfig=_pytestconfig,
+ )
+ except Exception as error:
+ logging.debug(
+ "unet fixture: unet build failed: %s\nparam: %s",
+ error,
+ param,
+ exc_info=True,
+ )
+ pytest.skip(
+ f"unet fixture: unet build failed: {error}", allow_module_level=True
+ )
+ raise
+
+ try:
+ tasks = await _unet.run()
+ except Exception as error:
+ logging.debug("unet fixture: unet run failed: %s", error, exc_info=True)
+ await _unet.async_delete()
+ pytest.skip(f"unet fixture: unet run failed: {error}", allow_module_level=True)
+ raise
+
+ logging.debug("unet fixture: containers running")
+
+ # Pytest is supposed to always return even if exceptions
+ try:
+ yield _unet
+ except Exception as error:
+ logging.error("unet fixture: yield unet unexpected exception: %s", error)
+
+ logging.debug("unet fixture: module done, deleting unet")
+ await _unet.async_delete()
+
+ # No one ever awaits these so cancel them
+ logging.debug("unet fixture: cleanup")
+ for task in tasks:
+ task.cancel()
+
+ # Reset the class variables so auto number is predictable
+ logging.debug("unet fixture: resetting ords to 1")
+ L3NodeMixin.next_ord = 1
+ Bridge.next_ord = 1
+
+
+@pytest.fixture(scope="module")
+async def unet(request, rundir_module, pytestconfig): # pylint: disable=W0621
+ """A unet creating fixutre.
+
+ The request param is either the basename of the config file or a tuple of the form:
+ (basename, unshare, top_level_pidns), with the second and third elements boolean and
+ optional, defaulting to False, True.
+ """
+ param = request.param if hasattr(request, "param") else None
+ sdir = os.path.dirname(os.path.realpath(request.fspath))
+ async with achdir(sdir, "unet fixture"):
+ async for x in _unet_impl(rundir_module, pytestconfig, param=param):
+ yield x
+
+
+@pytest.fixture(scope="module")
+async def unet_share(request, rundir_module, pytestconfig): # pylint: disable=W0621
+ """A unet creating fixutre.
+
+ This share variant keeps munet from unsharing the process to a new namespace so that
+ root level commands and actions are execute on the host, normally they are executed
+ in the munet namespace which allowing things like scapy inline in tests to work.
+
+ The request param is either the basename of the config file or a tuple of the form:
+ (basename, top_level_pidns), the second value is a boolean.
+ """
+ param = request.param if hasattr(request, "param") else None
+ if isinstance(param, (tuple, list)):
+ param = (param[0], False, param[1])
+ sdir = os.path.dirname(os.path.realpath(request.fspath))
+ async with achdir(sdir, "unet_share fixture"):
+ async for x in _unet_impl(
+ rundir_module, pytestconfig, unshare=False, param=param
+ ):
+ yield x
+
+
+@pytest.fixture(scope="module")
+async def unet_unshare(request, rundir_module, pytestconfig): # pylint: disable=W0621
+ """A unet creating fixutre.
+
+ This unshare variant has the top level munet unshare the process inline so that
+ root level commands and actions are execute in a new namespace. This allows things
+ like scapy inline in tests to work.
+
+ The request param is either the basename of the config file or a tuple of the form:
+ (basename, top_level_pidns), the second value is a boolean.
+ """
+ param = request.param if hasattr(request, "param") else None
+ if isinstance(param, (tuple, list)):
+ param = (param[0], True, param[1])
+ sdir = os.path.dirname(os.path.realpath(request.fspath))
+ async with achdir(sdir, "unet_unshare fixture"):
+ async for x in _unet_impl(
+ rundir_module, pytestconfig, unshare=True, param=param
+ ):
+ yield x
+
+
+# =================
+# Function Fixtures
+# =================
+
+
+@pytest.fixture(autouse=True, scope="function")
+async def function_autouse(request):
+ async with achdir(
+ os.path.dirname(os.path.realpath(request.fspath)), "func.fixture"
+ ):
+ yield
+
+
+@pytest.fixture(autouse=True)
+async def check_for_pause(request, pytestconfig):
+ # When we unshare inline we can't pause in the pytest_runtest_makereport hook
+ # so do it here.
+ if BaseMunet.g_unet and BaseMunet.g_unet.unshare_inline:
+ pause = bool(pytestconfig.getoption("--pause"))
+ if pause:
+ await async_pause_test(f"XXX before test '{request.node.name}'")
+ yield
+
+
+@pytest.fixture(scope="function")
+def stepf(pytestconfig):
+ class Stepnum:
+ """Track the stepnum in closure."""
+
+ num = 0
+
+ def inc(self):
+ self.num += 1
+
+ pause = pytestconfig.getoption("pause")
+ stepnum = Stepnum()
+
+ def stepfunction(desc=""):
+ desc = f": {desc}" if desc else ""
+ if pause:
+ pause_test(f"before step {stepnum.num}{desc}")
+ logging.info("STEP %s%s", stepnum.num, desc)
+ stepnum.inc()
+
+ return stepfunction
+
+
+@pytest_asyncio.fixture(scope="function")
+async def astepf(pytestconfig):
+ class Stepnum:
+ """Track the stepnum in closure."""
+
+ num = 0
+
+ def inc(self):
+ self.num += 1
+
+ pause = pytestconfig.getoption("pause")
+ stepnum = Stepnum()
+
+ async def stepfunction(desc=""):
+ desc = f": {desc}" if desc else ""
+ if pause:
+ await async_pause_test(f"before step {stepnum.num}{desc}")
+ logging.info("STEP %s%s", stepnum.num, desc)
+ stepnum.inc()
+
+ return stepfunction
+
+
+@pytest.fixture(scope="function")
+def rundir():
+ d = os.path.join("/tmp/unet-test", get_test_logdir(module=False))
+ logging.debug("conftest: test function rundir %s", d)
+ return d
+
+
+# Configure logging
+@pytest.hookimpl(hookwrapper=True, tryfirst=True)
+def pytest_runtest_setup(item):
+ d = os.path.join(
+ "/tmp/unet-test", get_test_logdir(nodeid=item.nodeid, module=False)
+ )
+ config = item.config
+ logging_plugin = config.pluginmanager.get_plugin("logging-plugin")
+ filename = Path(d, "pytest-exec.log")
+ logging_plugin.set_log_path(str(filename))
+ logging.debug("conftest: test function setup: rundir %s", d)
+ yield
+
+
+@pytest.fixture
+async def unet_perfunc(request, rundir, pytestconfig): # pylint: disable=W0621
+ param = request.param if hasattr(request, "param") else None
+ async for x in _unet_impl(rundir, pytestconfig, param=param):
+ yield x
+
+
+@pytest.fixture
+async def unet_perfunc_unshare(request, rundir, pytestconfig): # pylint: disable=W0621
+ """Build unet per test function with an optional topology basename parameter.
+
+ The fixture can be parameterized to choose different config files.
+ For example, use as follows to run the test with unet_perfunc configured
+ first with a config file named `cfg1.yaml` then with config file `cfg2.yaml`
+ (where the actual files could end with `json` or `toml` rather than `yaml`).
+
+ @pytest.mark.parametrize(
+ "unet_perfunc", ["cfg1", "cfg2]", indirect=["unet_perfunc"]
+ )
+ def test_example(unet_perfunc)
+ """
+ param = request.param if hasattr(request, "param") else None
+ async for x in _unet_impl(rundir, pytestconfig, unshare=True, param=param):
+ yield x
+
+
+@pytest.fixture
+async def unet_perfunc_share(request, rundir, pytestconfig): # pylint: disable=W0621
+ """Build unet per test function with an optional topology basename parameter.
+
+ This share variant keeps munet from unsharing the process to a new namespace so that
+ root level commands and actions are execute on the host, normally they are executed
+ in the munet namespace which allowing things like scapy inline in tests to work.
+
+ The fixture can be parameterized to choose different config files. For example, use
+ as follows to run the test with unet_perfunc configured first with a config file
+ named `cfg1.yaml` then with config file `cfg2.yaml` (where the actual files could
+ end with `json` or `toml` rather than `yaml`).
+
+ @pytest.mark.parametrize(
+ "unet_perfunc", ["cfg1", "cfg2]", indirect=["unet_perfunc"]
+ )
+ def test_example(unet_perfunc)
+ """
+ param = request.param if hasattr(request, "param") else None
+ async for x in _unet_impl(rundir, pytestconfig, unshare=False, param=param):
+ yield x