diff options
Diffstat (limited to 'tests/topotests/munet/testing/fixtures.py')
-rw-r--r-- | tests/topotests/munet/testing/fixtures.py | 447 |
1 files changed, 447 insertions, 0 deletions
diff --git a/tests/topotests/munet/testing/fixtures.py b/tests/topotests/munet/testing/fixtures.py new file mode 100644 index 0000000..25039df --- /dev/null +++ b/tests/topotests/munet/testing/fixtures.py @@ -0,0 +1,447 @@ +# -*- coding: utf-8 eval: (blacken-mode 1) -*- +# SPDX-License-Identifier: GPL-2.0-or-later +# +# April 22 2022, Christian Hopps <chopps@gmail.com> +# +# Copyright (c) 2022, LabN Consulting, L.L.C +# +"""A module that implements pytest fixtures. + +To use in your project, in your conftest.py add: + + from munet.testing.fixtures import * +""" +import contextlib +import logging +import os + +from pathlib import Path +from typing import Union + +import pytest +import pytest_asyncio + +from ..base import BaseMunet +from ..base import Bridge +from ..base import get_event_loop +from ..cleanup import cleanup_current +from ..cleanup import cleanup_previous +from ..native import L3NodeMixin +from ..parser import async_build_topology +from ..parser import get_config +from .util import async_pause_test +from .util import pause_test + + +@contextlib.asynccontextmanager +async def achdir(ndir: Union[str, Path], desc=""): + odir = os.getcwd() + os.chdir(ndir) + if desc: + logging.debug("%s: chdir from %s to %s", desc, odir, ndir) + try: + yield + finally: + if desc: + logging.debug("%s: chdir back from %s to %s", desc, ndir, odir) + os.chdir(odir) + + +@contextlib.contextmanager +def chdir(ndir: Union[str, Path], desc=""): + odir = os.getcwd() + os.chdir(ndir) + if desc: + logging.debug("%s: chdir from %s to %s", desc, odir, ndir) + try: + yield + finally: + if desc: + logging.debug("%s: chdir back from %s to %s", desc, ndir, odir) + os.chdir(odir) + + +def get_test_logdir(nodeid=None, module=False): + """Get log directory relative pathname.""" + xdist_worker = os.getenv("PYTEST_XDIST_WORKER", "") + mode = os.getenv("PYTEST_XDIST_MODE", "no") + + # nodeid: all_protocol_startup/test_all_protocol_startup.py::test_router_running + # may be missing "::testname" if module is True + if not nodeid: + nodeid = os.environ["PYTEST_CURRENT_TEST"].split(" ")[0] + + cur_test = nodeid.replace("[", "_").replace("]", "_") + if module: + idx = cur_test.rfind("::") + path = cur_test if idx == -1 else cur_test[:idx] + testname = "" + else: + path, testname = cur_test.split("::") + testname = testname.replace("/", ".") + path = path[:-3].replace("/", ".") + + # We use different logdir paths based on how xdist is running. + if mode == "each": + if module: + return os.path.join(path, "worker-logs", xdist_worker) + return os.path.join(path, testname, xdist_worker) + assert mode in ("no", "load", "loadfile", "loadscope"), f"Unknown dist mode {mode}" + return path if module else os.path.join(path, testname) + + +def _push_log_handler(desc, logpath): + logpath = os.path.abspath(logpath) + logging.debug("conftest: adding %s logging at %s", desc, logpath) + root_logger = logging.getLogger() + handler = logging.FileHandler(logpath, mode="w") + fmt = logging.Formatter("%(asctime)s %(levelname)5s: %(message)s") + handler.setFormatter(fmt) + root_logger.addHandler(handler) + return handler + + +def _pop_log_handler(handler): + root_logger = logging.getLogger() + logging.debug("conftest: removing logging handler %s", handler) + root_logger.removeHandler(handler) + + +@contextlib.contextmanager +def log_handler(desc, logpath): + handler = _push_log_handler(desc, logpath) + try: + yield + finally: + _pop_log_handler(handler) + + +# ================= +# Sessions Fixtures +# ================= + + +@pytest.fixture(autouse=True, scope="session") +def session_autouse(): + if "PYTEST_TOPOTEST_WORKER" not in os.environ: + is_worker = False + elif not os.environ["PYTEST_TOPOTEST_WORKER"]: + is_worker = False + else: + is_worker = True + + if not is_worker: + # This is unfriendly to multi-instance + cleanup_previous() + + # We never pop as we want to keep logging + _push_log_handler("session", "/tmp/unet-test/pytest-session.log") + + yield + + if not is_worker: + cleanup_current() + + +# =============== +# Module Fixtures +# =============== + + +@pytest.fixture(autouse=True, scope="module") +def module_autouse(request): + logpath = get_test_logdir(request.node.name, True) + logpath = os.path.join("/tmp/unet-test", logpath, "pytest-exec.log") + with log_handler("module", logpath): + sdir = os.path.dirname(os.path.realpath(request.fspath)) + with chdir(sdir, "module autouse fixture"): + yield + + if BaseMunet.g_unet: + raise Exception("Base Munet was not cleaned up/deleted") + + +@pytest.fixture(scope="module") +def event_loop(): + """Create an instance of the default event loop for the session.""" + loop = get_event_loop() + try: + logging.info("event_loop_fixture: yielding with new event loop watcher") + yield loop + finally: + loop.close() + + +@pytest.fixture(scope="module") +def rundir_module(): + d = os.path.join("/tmp/unet-test", get_test_logdir(module=True)) + logging.debug("conftest: test module rundir %s", d) + return d + + +async def _unet_impl( + _rundir, _pytestconfig, unshare=None, top_level_pidns=None, param=None +): + try: + # Default is not to unshare inline if not specified otherwise + unshare_default = False + pidns_default = True + if isinstance(param, (tuple, list)): + pidns_default = bool(param[2]) if len(param) > 2 else True + unshare_default = bool(param[1]) if len(param) > 1 else False + param = str(param[0]) + elif isinstance(param, bool): + unshare_default = param + param = None + if unshare is None: + unshare = unshare_default + if top_level_pidns is None: + top_level_pidns = pidns_default + + logging.info("unet fixture: basename=%s unshare_inline=%s", param, unshare) + _unet = await async_build_topology( + config=get_config(basename=param) if param else None, + rundir=_rundir, + unshare_inline=unshare, + top_level_pidns=top_level_pidns, + pytestconfig=_pytestconfig, + ) + except Exception as error: + logging.debug( + "unet fixture: unet build failed: %s\nparam: %s", + error, + param, + exc_info=True, + ) + pytest.skip( + f"unet fixture: unet build failed: {error}", allow_module_level=True + ) + raise + + try: + tasks = await _unet.run() + except Exception as error: + logging.debug("unet fixture: unet run failed: %s", error, exc_info=True) + await _unet.async_delete() + pytest.skip(f"unet fixture: unet run failed: {error}", allow_module_level=True) + raise + + logging.debug("unet fixture: containers running") + + # Pytest is supposed to always return even if exceptions + try: + yield _unet + except Exception as error: + logging.error("unet fixture: yield unet unexpected exception: %s", error) + + logging.debug("unet fixture: module done, deleting unet") + await _unet.async_delete() + + # No one ever awaits these so cancel them + logging.debug("unet fixture: cleanup") + for task in tasks: + task.cancel() + + # Reset the class variables so auto number is predictable + logging.debug("unet fixture: resetting ords to 1") + L3NodeMixin.next_ord = 1 + Bridge.next_ord = 1 + + +@pytest.fixture(scope="module") +async def unet(request, rundir_module, pytestconfig): # pylint: disable=W0621 + """A unet creating fixutre. + + The request param is either the basename of the config file or a tuple of the form: + (basename, unshare, top_level_pidns), with the second and third elements boolean and + optional, defaulting to False, True. + """ + param = request.param if hasattr(request, "param") else None + sdir = os.path.dirname(os.path.realpath(request.fspath)) + async with achdir(sdir, "unet fixture"): + async for x in _unet_impl(rundir_module, pytestconfig, param=param): + yield x + + +@pytest.fixture(scope="module") +async def unet_share(request, rundir_module, pytestconfig): # pylint: disable=W0621 + """A unet creating fixutre. + + This share variant keeps munet from unsharing the process to a new namespace so that + root level commands and actions are execute on the host, normally they are executed + in the munet namespace which allowing things like scapy inline in tests to work. + + The request param is either the basename of the config file or a tuple of the form: + (basename, top_level_pidns), the second value is a boolean. + """ + param = request.param if hasattr(request, "param") else None + if isinstance(param, (tuple, list)): + param = (param[0], False, param[1]) + sdir = os.path.dirname(os.path.realpath(request.fspath)) + async with achdir(sdir, "unet_share fixture"): + async for x in _unet_impl( + rundir_module, pytestconfig, unshare=False, param=param + ): + yield x + + +@pytest.fixture(scope="module") +async def unet_unshare(request, rundir_module, pytestconfig): # pylint: disable=W0621 + """A unet creating fixutre. + + This unshare variant has the top level munet unshare the process inline so that + root level commands and actions are execute in a new namespace. This allows things + like scapy inline in tests to work. + + The request param is either the basename of the config file or a tuple of the form: + (basename, top_level_pidns), the second value is a boolean. + """ + param = request.param if hasattr(request, "param") else None + if isinstance(param, (tuple, list)): + param = (param[0], True, param[1]) + sdir = os.path.dirname(os.path.realpath(request.fspath)) + async with achdir(sdir, "unet_unshare fixture"): + async for x in _unet_impl( + rundir_module, pytestconfig, unshare=True, param=param + ): + yield x + + +# ================= +# Function Fixtures +# ================= + + +@pytest.fixture(autouse=True, scope="function") +async def function_autouse(request): + async with achdir( + os.path.dirname(os.path.realpath(request.fspath)), "func.fixture" + ): + yield + + +@pytest.fixture(autouse=True) +async def check_for_pause(request, pytestconfig): + # When we unshare inline we can't pause in the pytest_runtest_makereport hook + # so do it here. + if BaseMunet.g_unet and BaseMunet.g_unet.unshare_inline: + pause = bool(pytestconfig.getoption("--pause")) + if pause: + await async_pause_test(f"XXX before test '{request.node.name}'") + yield + + +@pytest.fixture(scope="function") +def stepf(pytestconfig): + class Stepnum: + """Track the stepnum in closure.""" + + num = 0 + + def inc(self): + self.num += 1 + + pause = pytestconfig.getoption("pause") + stepnum = Stepnum() + + def stepfunction(desc=""): + desc = f": {desc}" if desc else "" + if pause: + pause_test(f"before step {stepnum.num}{desc}") + logging.info("STEP %s%s", stepnum.num, desc) + stepnum.inc() + + return stepfunction + + +@pytest_asyncio.fixture(scope="function") +async def astepf(pytestconfig): + class Stepnum: + """Track the stepnum in closure.""" + + num = 0 + + def inc(self): + self.num += 1 + + pause = pytestconfig.getoption("pause") + stepnum = Stepnum() + + async def stepfunction(desc=""): + desc = f": {desc}" if desc else "" + if pause: + await async_pause_test(f"before step {stepnum.num}{desc}") + logging.info("STEP %s%s", stepnum.num, desc) + stepnum.inc() + + return stepfunction + + +@pytest.fixture(scope="function") +def rundir(): + d = os.path.join("/tmp/unet-test", get_test_logdir(module=False)) + logging.debug("conftest: test function rundir %s", d) + return d + + +# Configure logging +@pytest.hookimpl(hookwrapper=True, tryfirst=True) +def pytest_runtest_setup(item): + d = os.path.join( + "/tmp/unet-test", get_test_logdir(nodeid=item.nodeid, module=False) + ) + config = item.config + logging_plugin = config.pluginmanager.get_plugin("logging-plugin") + filename = Path(d, "pytest-exec.log") + logging_plugin.set_log_path(str(filename)) + logging.debug("conftest: test function setup: rundir %s", d) + yield + + +@pytest.fixture +async def unet_perfunc(request, rundir, pytestconfig): # pylint: disable=W0621 + param = request.param if hasattr(request, "param") else None + async for x in _unet_impl(rundir, pytestconfig, param=param): + yield x + + +@pytest.fixture +async def unet_perfunc_unshare(request, rundir, pytestconfig): # pylint: disable=W0621 + """Build unet per test function with an optional topology basename parameter. + + The fixture can be parameterized to choose different config files. + For example, use as follows to run the test with unet_perfunc configured + first with a config file named `cfg1.yaml` then with config file `cfg2.yaml` + (where the actual files could end with `json` or `toml` rather than `yaml`). + + @pytest.mark.parametrize( + "unet_perfunc", ["cfg1", "cfg2]", indirect=["unet_perfunc"] + ) + def test_example(unet_perfunc) + """ + param = request.param if hasattr(request, "param") else None + async for x in _unet_impl(rundir, pytestconfig, unshare=True, param=param): + yield x + + +@pytest.fixture +async def unet_perfunc_share(request, rundir, pytestconfig): # pylint: disable=W0621 + """Build unet per test function with an optional topology basename parameter. + + This share variant keeps munet from unsharing the process to a new namespace so that + root level commands and actions are execute on the host, normally they are executed + in the munet namespace which allowing things like scapy inline in tests to work. + + The fixture can be parameterized to choose different config files. For example, use + as follows to run the test with unet_perfunc configured first with a config file + named `cfg1.yaml` then with config file `cfg2.yaml` (where the actual files could + end with `json` or `toml` rather than `yaml`). + + @pytest.mark.parametrize( + "unet_perfunc", ["cfg1", "cfg2]", indirect=["unet_perfunc"] + ) + def test_example(unet_perfunc) + """ + param = request.param if hasattr(request, "param") else None + async for x in _unet_impl(rundir, pytestconfig, unshare=False, param=param): + yield x |