diff options
Diffstat (limited to 'tests/topotests/munet/mutest/userapi.py')
-rw-r--r-- | tests/topotests/munet/mutest/userapi.py | 1111 |
1 files changed, 1111 insertions, 0 deletions
diff --git a/tests/topotests/munet/mutest/userapi.py b/tests/topotests/munet/mutest/userapi.py new file mode 100644 index 0000000..1df8c0d --- /dev/null +++ b/tests/topotests/munet/mutest/userapi.py @@ -0,0 +1,1111 @@ +# -*- coding: utf-8 eval: (blacken-mode 1) -*- +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Copyright 2017, 2022, LabN Consulting, L.L.C. +"""Mutest is a simple send/expect based testing framework. + +This module implements the basic send/expect functionality for mutest. The test +developer first creates a munet topology (:ref:`munet-config`) and then writes test +scripts ("test cases") which are composed of calls to the functions defined below +("steps"). In short these are: + +Send/Expect functions: + + - :py:func:`step` + + - :py:func:`step_json` + + - :py:func:`match_step` + + - :py:func:`match_step_json` + + - :py:func:`wait_step` + + - :py:func:`wait_step_json` + +Control/Utility functions: + + - :py:func:`script_dir` + + - :py:func:`include` + + - :py:func:`log` + + - :py:func:`test` + +Test scripts are located by the :command:`mutest` command by their name. The name of a +test script should take the form ``mutest_TESTNAME.py`` where ``TESTNAME`` is replaced +with a user chosen name for the test case. + +Here's a simple example test script which first checks that a specific forwarding entry +is in the FIB for the IP destination ``10.0.1.1``. Then it checks repeatedly for up to +10 seconds for a second forwarding entry in the FIB for the IP destination ``10.0.2.1``. + +.. code-block:: python + + match_step("r1", 'vtysh -c "show ip fib 10.0.1.1"', "Routing entry for 10.0.1.0/24", + "Check for FIB entry for 10.0.1.1") + + wait_step("r1", + 'vtysh -c "show ip fib 10.0.2.1"', + "Routing entry for 10.0.2.0/24", + desc="Check for FIB entry for 10.0.2.1", + timeout=10) + +Notice that the call arguments can be specified by their correct position in the list or +using keyword names, and they can also be specified over multiple lines if preferred. + +All of the functions are documented and defined below. +""" + +# pylint: disable=global-statement + +import functools +import json +import logging +import pprint +import re +import time + +from pathlib import Path +from typing import Any +from typing import Union + +from deepdiff import DeepDiff as json_cmp + +from munet.base import Commander + + +class TestCaseInfo: + """Object to hold nestable TestCase Results.""" + + def __init__(self, tag: str, name: str, path: Path): + self.path = path.absolute() + self.tag = tag + self.name = name + self.steps = 0 + self.passed = 0 + self.failed = 0 + self.start_time = time.time() + self.step_start_time = self.start_time + self.run_time = None + + def __repr__(self): + return ( + f"TestCaseInfo({self.tag} {self.name} steps {self.steps} " + f"p {self.passed} f {self.failed} path {self.path})" + ) + + +class TestCase: + """A mutest testcase. + + This is normally meant to be used internally by the mutest command to + implement the user API. See README-mutest.org for usage details on the + user API. + + Args: + tag: identity of the test in a run. (x.x...) + name: the name of the test case + path: the test file that is being executed. + targets: a dictionary of objects which implement ``cmd_nostatus(str)`` + output_logger: a logger for output and other messages from the test. + result_logger: a logger to output the results of test steps to. + full_summary: if True then print entire doctstring instead of + only the first line in the results report + + Attributes: + tag: identity of the test in a run + name: the name of the test + targets: dictionary of targets. + + steps: total steps executed so far. + passed: number of passing steps. + failed: number of failing steps. + + last: the last command output. + last_m: the last result of re.search during a matching step on the output with + newlines converted to spaces. + + :meta private: + """ + + # sum_hfmt = "{:5.5s} {:4.4s} {:>6.6s} {}" + # sum_dfmt = "{:5s} {:4.4s} {:^6.6s} {}" + sum_fmt = "%-8.8s %4.4s %{}s %6s %s" + + def __init__( + self, + tag: int, + name: str, + path: Path, + targets: dict, + output_logger: logging.Logger = None, + result_logger: logging.Logger = None, + full_summary: bool = False, + ): + + self.info = TestCaseInfo(tag, name, path) + self.__saved_info = [] + self.__short_doc_header = not full_summary + + self.__space_before_result = False + + # we are only ever in a section once, an include ends a section + # so are never in section+include, and another section ends a + # section, so we don't need __in_section to be save in the + # TestCaseInfo struct. + self.__in_section = False + + self.targets = targets + + self.last = "" + self.last_m = None + + self.rlog = result_logger + self.olog = output_logger + self.logf = functools.partial(self.olog.log, logging.INFO) + + oplog = logging.getLogger("mutest.oper") + self.oplogf = oplog.debug + self.oplogf("new TestCase: tag: %s name: %s path: %s", tag, name, path) + + # find the longerst target name and make target field that wide + nmax = max(len(x) for x in targets) + nmax = max(nmax, len("TARGET")) + self.sum_fmt = TestCase.sum_fmt.format(nmax) + + # Let's keep this out of summary for now + self.rlog.debug(self.sum_fmt, "NUMBER", "STAT", "TARGET", "TIME", "DESCRIPTION") + self.rlog.debug("-" * 70) + + @property + def tag(self): + return self.info.tag + + @property + def name(self): + return self.info.name + + @property + def steps(self): + return self.info.steps + + @property + def passed(self): + return self.info.passed + + @property + def failed(self): + return self.info.failed + + def execute(self): + """Execute the test case. + + :meta private: + """ + assert TestCase.g_tc is None + self.oplogf("execute") + try: + TestCase.g_tc = self + e = self.__exec_script(self.info.path, True, False) + except BaseException: + self.__end_test() + raise + return *self.__end_test(), e + + def __del__(self): + if TestCase.g_tc is self: + logging.error("Internal error, TestCase.__end_test() was not called!") + TestCase.g_tc = None + + def __push_execinfo(self, path: Path): + self.oplogf( + "__push_execinfo: path: %s current top is %s", + path, + pprint.pformat(self.info), + ) + newname = self.name + path.stem + self.info.steps += 1 + self.__saved_info.append(self.info) + tag = f"{self.info.tag}.{self.info.steps}" + self.info = TestCaseInfo(tag, newname, path) + self.oplogf("__push_execinfo: now on top: %s", pprint.pformat(self.info)) + + def __pop_execinfo(self): + # do something with tag? + finished_info = self.info + self.info = self.__saved_info.pop() + self.oplogf(" __pop_execinfo: poppped: %s", pprint.pformat(finished_info)) + self.oplogf(" __pop_execinfo: now on top: %s", pprint.pformat(self.info)) + return finished_info + + def __print_header(self, tag, header, add_newline=False): + # self.olog.info(self.sum_fmt, tag, "", "", "", header) + self.olog.info("== %s ==", f"TEST: {tag}. {header}") + if add_newline: + self.rlog.info("") + self.rlog.info("%s. %s", tag, header) + + def __exec_script(self, path, print_header, add_newline): + + # Below was the original method to avoid the global TestCase + # variable; however, we need global functions so we can import them + # into test scripts. Without imports pylint will complain about undefined + # functions and the resulting christmas tree of warnings is annoying. + # + # pylint: disable=possibly-unused-variable,exec-used,redefined-outer-name + # include = self.include + # log = self.logf + # match_step = self.match_step + # match_step_json = self.match_step_json + # step = self.step + # step_json = self.step_json + # test = self.test + # wait_step = self.wait_step + # wait_step_json = self.wait_step_json + + name = f"{path.stem}{self.tag}" + name = re.sub(r"\W|^(?=\d)", "_", name) + + _ok_result = "marker" + try: + self.oplogf("__exec_script: path %s", path) + script = open(path, "r", encoding="utf-8").read() + + # Load the script into a function. + script = script.strip() + s2 = ( + # f"async def _{name}(ok_result):\n" + f"def _{name}(ok_result):\n" + + " " + + script.replace("\n", "\n ") + + "\n return ok_result\n" + + "\n" + ) + exec(s2) + + # Extract any docstring as a title. + if print_header: + title = locals()[f"_{name}"].__doc__.lstrip() + if self.__short_doc_header and (title := title.lstrip()): + if (idx := title.find("\n")) != -1: + title = title[:idx].strip() + if not title: + title = f"Test from file: {self.info.path.name}" + self.__print_header(self.info.tag, title, add_newline) + self.__space_before_result = False + + # Execute the function. + result = locals()[f"_{name}"](_ok_result) + + # Here's where we can do async in the future if we want. + # result = await locals()[f"_{name}"](_ok_result) + except Exception as error: + logging.error( + "Unexpected exception executing %s: %s", name, error, exc_info=True + ) + return error + else: + if result is not _ok_result: + logging.info("%s returned early, result: %s", name, result) + else: + self.oplogf("__exec_script: name %s completed normally", name) + return None + + def __post_result(self, target, success, rstr, logstr=None): + self.oplogf( + "__post_result: target: %s success %s rstr %s", target, success, rstr + ) + if success: + self.info.passed += 1 + status = "PASS" + outlf = self.logf + reslf = self.rlog.info + else: + self.info.failed += 1 + status = "FAIL" + outlf = self.olog.warning + reslf = self.rlog.warning + + self.info.steps += 1 + if logstr is not None: + outlf("R:%d %s: %s" % (self.steps, status, logstr)) + + run_time = time.time() - self.info.step_start_time + + stepstr = f"{self.tag}.{self.steps}" + rtimes = _delta_time_str(run_time) + + if self.__space_before_result: + self.rlog.info("") + self.__space_before_result = False + + reslf(self.sum_fmt, stepstr, status, target, rtimes, rstr) + + # start counting for next step now + self.info.step_start_time = time.time() + + def __end_test(self) -> (int, int): + """End the test log final results. + + Returns: + number of steps, number passed, number failed, run time. + """ + self.oplogf("__end_test: __in_section: %s", self.__in_section) + if self.__in_section: + self.__end_section() + + passed, failed = self.info.passed, self.info.failed + + # No close for loggers + # self.olog.close() + # self.rlog.close() + self.olog = None + self.rlog = None + + assert ( + TestCase.g_tc == self + ), "TestCase global unexpectedly someon else in __end_test" + TestCase.g_tc = None + + self.info.run_time = time.time() - self.info.start_time + return passed, failed + + def _command( + self, + target: str, + cmd: str, + ) -> str: + """Execute a ``cmd`` and return result. + + Args: + target: the target to execute the command on. + cmd: string to execut on the target. + """ + out = self.targets[target].cmd_nostatus(cmd, warn=False) + self.last = out = out.rstrip() + report = out if out else "<no output>" + self.logf("COMMAND OUTPUT:\n%s", report) + return out + + def _command_json( + self, + target: str, + cmd: str, + ) -> dict: + """Execute a json ``cmd`` and return json result. + + Args: + target: the target to execute the command on. + cmd: string to execut on the target. + """ + out = self.targets[target].cmd_nostatus(cmd, warn=False) + self.last = out = out.rstrip() + try: + js = json.loads(out) + except Exception as error: + js = {} + self.olog.warning( + "JSON load failed. Check command output is in JSON format: %s", + error, + ) + self.logf("COMMAND OUTPUT:\n%s", out) + return js + + def _match_command( + self, + target: str, + cmd: str, + match: str, + expect_fail: bool, + flags: int, + ) -> (bool, Union[str, list]): + """Execute a ``cmd`` and check result. + + Args: + target: the target to execute the command on. + cmd: string to execute on the target. + match: regex to ``re.search()`` for in output. + expect_fail: if True then succeed when the regexp doesn't match. + flags: python regex flags to modify matching behavior + + Returns: + (success, matches): if the match fails then "matches" will be None, + otherwise if there were matching groups then groups() will be returned in + ``matches`` otherwise group(0) (i.e., the matching text). + """ + out = self._command(target, cmd) + search = re.search(match, out, flags) + self.last_m = search + if search is None: + success = expect_fail + ret = None + else: + success = not expect_fail + ret = search.groups() + if not ret: + ret = search.group(0) + + level = logging.DEBUG if success else logging.WARNING + self.olog.log(level, "matched:%s:", ret) + return success, ret + + def _match_command_json( + self, + target: str, + cmd: str, + match: Union[str, dict], + expect_fail: bool, + ) -> Union[str, dict]: + """Execute a json ``cmd`` and check result. + + Args: + target: the target to execute the command on. + cmd: string to execut on the target. + match: A json ``str`` or object (``dict``) to compare against the json + output from ``cmd``. + expect_fail: if True then succeed when the json doesn't match. + """ + js = self._command_json(target, cmd) + try: + expect = json.loads(match) + except Exception as error: + expect = {} + self.olog.warning( + "JSON load failed. Check match value is in JSON format: %s", error + ) + + if json_diff := json_cmp(expect, js): + success = expect_fail + if not success: + self.logf("JSON DIFF:%s:" % json_diff) + return success, json_diff + + success = not expect_fail + return success, js + + def _wait( + self, + target: str, + cmd: str, + match: Union[str, dict], + is_json: bool, + timeout: float, + interval: float, + expect_fail: bool, + flags: int, + ) -> Union[str, dict]: + """Execute a command repeatedly waiting for result until timeout.""" + startt = time.time() + endt = startt + timeout + + success = False + ret = None + while not success and time.time() < endt: + if is_json: + success, ret = self._match_command_json(target, cmd, match, expect_fail) + else: + success, ret = self._match_command( + target, cmd, match, expect_fail, flags + ) + if not success: + time.sleep(interval) + return success, ret + + # --------------------- + # Public APIs for User + # --------------------- + + def include(self, pathname: str, new_section: bool = False): + """See :py:func:`~munet.mutest.userapi.include`. + + :meta private: + """ + path = Path(pathname) + path = self.info.path.parent.joinpath(path) + + self.oplogf( + "include: new path: %s create section: %s currently __in_section: %s", + path, + new_section, + self.__in_section, + ) + + if new_section: + self.oplogf("include: starting new exec section") + self.__start_exec_section(path) + our_info = self.info + # Note we do *not* mark __in_section True + else: + # swap the current path inside the top info + old_path = self.info.path + self.info.path = path + self.oplogf("include: swapped info path: new %s old %s", path, old_path) + + self.__exec_script(path, print_header=new_section, add_newline=new_section) + + if new_section: + # Something within the section creating include has also created a section + # end it, sections do not cross section creating file boundaries + if self.__in_section: + self.oplogf( + "include done: path: %s __in_section calling __end_section", path + ) + self.__end_section() + + # We should now be back to the info we started with, b/c we don't actually + # start a new section (__in_section) that then could have been ended inside + # the included file. + assert our_info == self.info + + self.oplogf( + "include done: path: %s new_section calling __end_section", path + ) + self.__end_section() + else: + # The current top path could be anything due to multiple inline includes as + # well as section swap in and out. Forcibly return the top path to the file + # we are returning to + self.info.path = old_path + self.oplogf("include: restored info path: %s", old_path) + + def __end_section(self): + self.oplogf("__end_section: __in_section: %s", self.__in_section) + info = self.__pop_execinfo() + passed, failed = info.passed, info.failed + self.info.passed += passed + self.info.failed += failed + self.__space_before_result = True + self.oplogf("__end_section setting __in_section to False") + self.__in_section = False + + def __start_exec_section(self, path): + self.oplogf("__start_exec_section: __in_section: %s", self.__in_section) + if self.__in_section: + self.__end_section() + + self.__push_execinfo(path) + self.__space_before_result = False + self.oplogf("NOT setting __in_section to True") + assert not self.__in_section + + def section(self, desc: str): + """See :py:func:`~munet.mutest.userapi.section`. + + :meta private: + """ + self.oplogf("section: __in_section: %s", self.__in_section) + # Grab path before we pop the current info off the top + path = self.info.path + old_steps = self.info.steps + + if self.__in_section: + self.__end_section() + + self.__push_execinfo(path) + add_nl = self.info.steps <= old_steps + + self.__space_before_result = False + self.__in_section = True + self.oplogf(" section setting __in_section to True") + self.__print_header(self.info.tag, desc, add_nl) + + def step(self, target: str, cmd: str) -> str: + """See :py:func:`~munet.mutest.userapi.step`. + + :meta private: + """ + self.logf( + "#%s.%s:%s:STEP:%s:%s", + self.tag, + self.steps + 1, + self.info.path, + target, + cmd, + ) + return self._command(target, cmd) + + def step_json(self, target: str, cmd: str) -> dict: + """See :py:func:`~munet.mutest.userapi.step_json`. + + :meta private: + """ + self.logf( + "#%s.%s:%s:STEP_JSON:%s:%s", + self.tag, + self.steps + 1, + self.info.path, + target, + cmd, + ) + return self._command_json(target, cmd) + + def match_step( + self, + target: str, + cmd: str, + match: str, + desc: str = "", + expect_fail: bool = False, + flags: int = re.DOTALL, + ) -> (bool, Union[str, list]): + """See :py:func:`~munet.mutest.userapi.match_step`. + + :meta private: + """ + self.logf( + "#%s.%s:%s:MATCH_STEP:%s:%s:%s:%s:%s:%s", + self.tag, + self.steps + 1, + self.info.path, + target, + cmd, + match, + desc, + expect_fail, + flags, + ) + success, ret = self._match_command(target, cmd, match, expect_fail, flags) + if desc: + self.__post_result(target, success, desc) + return success, ret + + def test_step(self, expr_or_value: Any, desc: str, target: str = "") -> bool: + """See :py:func:`~munet.mutest.userapi.test`. + + :meta private: + """ + success = bool(expr_or_value) + self.__post_result(target, success, desc) + return success + + def match_step_json( + self, + target: str, + cmd: str, + match: Union[str, dict], + desc: str = "", + expect_fail: bool = False, + ) -> (bool, Union[str, dict]): + """See :py:func:`~munet.mutest.userapi.match_step_json`. + + :meta private: + """ + self.logf( + "#%s.%s:%s:MATCH_STEP_JSON:%s:%s:%s:%s:%s", + self.tag, + self.steps + 1, + self.info.path, + target, + cmd, + match, + desc, + expect_fail, + ) + success, ret = self._match_command_json(target, cmd, match, expect_fail) + if desc: + self.__post_result(target, success, desc) + return success, ret + + def wait_step( + self, + target: str, + cmd: str, + match: Union[str, dict], + desc: str = "", + timeout=10, + interval=0.5, + expect_fail: bool = False, + flags: int = re.DOTALL, + ) -> (bool, Union[str, list]): + """See :py:func:`~munet.mutest.userapi.wait_step`. + + :meta private: + """ + if interval is None: + interval = min(timeout / 20, 0.25) + self.logf( + "#%s.%s:%s:WAIT_STEP:%s:%s:%s:%s:%s:%s:%s:%s", + self.tag, + self.steps + 1, + self.info.path, + target, + cmd, + match, + timeout, + interval, + desc, + expect_fail, + flags, + ) + success, ret = self._wait( + target, cmd, match, False, timeout, interval, expect_fail, flags + ) + if desc: + self.__post_result(target, success, desc) + return success, ret + + def wait_step_json( + self, + target: str, + cmd: str, + match: Union[str, dict], + desc: str = "", + timeout=10, + interval=None, + expect_fail: bool = False, + ) -> (bool, Union[str, dict]): + """See :py:func:`~munet.mutest.userapi.wait_step_json`. + + :meta private: + """ + if interval is None: + interval = min(timeout / 20, 0.25) + self.logf( + "#%s.%s:%s:WAIT_STEP:%s:%s:%s:%s:%s:%s:%s", + self.tag, + self.steps + 1, + self.info.path, + target, + cmd, + match, + timeout, + interval, + desc, + expect_fail, + ) + success, ret = self._wait( + target, cmd, match, True, timeout, interval, expect_fail, 0 + ) + if desc: + self.__post_result(target, success, desc) + return success, ret + + +# A non-rentrant global to allow for simplified operations +TestCase.g_tc = None + +# pylint: disable=protected-access + + +def _delta_time_str(run_time: float) -> str: + if run_time < 0.0001: + return "0.0" + if run_time < 0.001: + return f"{run_time:1.4f}" + if run_time < 0.01: + return f"{run_time:2.3f}" + if run_time < 0.1: + return f"{run_time:3.2f}" + if run_time < 100: + return f"{run_time:4.1f}" + return f"{run_time:5f}s" + + +def section(desc: str): + """Start a new section for steps, with a description. + + This starts a new section of tests. The result is basically + the same as doing a non-inline include. The current test number + is used to form a new sub-set of test steps. So if the current + test number is 2.3, a section will now number subsequent steps + 2.3.1, 2.3.2, ... + + A subsequent :py:func:`section` or non-inline :py:func:`include` + call ends the current section and advances the base test number. + + Args: + desc: the description for the new section. + """ + TestCase.g_tc.section(desc) + + +def log(fmt, *args, **kwargs): + """Log a message in the testcase output log.""" + return TestCase.g_tc.logf(fmt, *args, **kwargs) + + +def include(pathname: str, new_section=False): + """Include a file as part of testcase. + + Args: + pathname: the file to include. + new_section: if a new section should be created, otherwise + commands are executed inline. + """ + return TestCase.g_tc.include(pathname, new_section) + + +def script_dir() -> Path: + """The pathname to the directory containing the current script file. + + When an include() is called the script_dir is updated to be current with the + includeded file, and is reverted to the previous value when the include completes. + """ + return TestCase.g_tc.info.path.parent + + +def get_target(name: str) -> Commander: + """Get the target object with the given ``name``.""" + return TestCase.g_tc.targets[name] + + +def step(target: str, cmd: str) -> str: + """Execute a ``cmd`` on a ``target`` and return the output. + + Args: + target: the target to execute the ``cmd`` on. + cmd: string to execute on the target. + + Returns: + Returns the ``str`` output of the ``cmd``. + """ + return TestCase.g_tc.step(target, cmd) + + +def step_json(target: str, cmd: str) -> dict: + """Execute a json ``cmd`` on a ``target`` and return the json object. + + Args: + target: the target to execute the ``cmd`` on. + cmd: string to execute on the target. + + Returns: + Returns the json object after parsing the ``cmd`` output. + + If json parse fails, a warning is logged and an empty ``dict`` is used. + """ + return TestCase.g_tc.step_json(target, cmd) + + +def test_step(expr_or_value: Any, desc: str, target: str = "") -> bool: + """Evaluates ``expr_or_value`` and posts a result base on it bool(expr). + + If ``expr_or_value`` evaluates to a positive result (i.e., True, non-zero, non-None, + non-empty string, non-empty list, etc..) then a PASS result is recorded, otherwise + record a FAIL is recorded. + + Args: + expr: an expression or value to evaluate + desc: description of this test step. + target: optional target to associate with this test in the result string. + + Returns: + A bool indicating the test PASS or FAIL result. + """ + return TestCase.g_tc.test_step(expr_or_value, desc, target) + + +def match_step( + target: str, + cmd: str, + match: str, + desc: str = "", + expect_fail: bool = False, + flags: int = re.DOTALL, +) -> (bool, Union[str, list]): + """Execute a ``cmd`` on a ``target`` check result. + + Execute ``cmd`` on ``target`` and check if the regexp in ``match`` + matches or doesn't match (according to the ``expect_fail`` value) the + ``cmd`` output. + + If the ``match`` regexp includes groups and if the match succeeds + the group values will be returned in a list, otherwise the command + output is returned. + + Args: + target: the target to execute the ``cmd`` on. + cmd: string to execut on the ``target``. + match: regex to match against output. + desc: description of test, if no description then no result is logged. + expect_fail: if True then succeed when the regexp doesn't match. + flags: python regex flags to modify matching behavior + + Returns: + Returns a 2-tuple. The first value is a bool indicating ``success``. + The second value will be a list from ``re.Match.groups()`` if non-empty, + otherwise ``re.Match.group(0)`` if there was a match otherwise None. + """ + return TestCase.g_tc.match_step(target, cmd, match, desc, expect_fail, flags) + + +def match_step_json( + target: str, + cmd: str, + match: Union[str, dict], + desc: str = "", + expect_fail: bool = False, +) -> (bool, Union[str, dict]): + """Execute a ``cmd`` on a ``target`` check result. + + Execute ``cmd`` on ``target`` and check if the json object in ``match`` + matches or doesn't match (according to the ``expect_fail`` value) the + json output from ``cmd``. + + Args: + target: the target to execute the ``cmd`` on. + cmd: string to execut on the ``target``. + match: A json ``str`` or object (``dict``) to compare against the json + output from ``cmd``. + desc: description of test, if no description then no result is logged. + expect_fail: if True then succeed if the a json doesn't match. + + Returns: + Returns a 2-tuple. The first value is a bool indicating ``success``. The + second value is a ``str`` diff if there is a difference found in the json + compare, otherwise the value is the json object (``dict``) from the ``cmd``. + + If json parse fails, a warning is logged and an empty ``dict`` is used. + """ + return TestCase.g_tc.match_step_json(target, cmd, match, desc, expect_fail) + + +def wait_step( + target: str, + cmd: str, + match: Union[str, dict], + desc: str = "", + timeout: float = 10.0, + interval: float = 0.5, + expect_fail: bool = False, + flags: int = re.DOTALL, +) -> (bool, Union[str, list]): + """Execute a ``cmd`` on a ``target`` repeatedly, looking for a result. + + Execute ``cmd`` on ``target``, every ``interval`` seconds for up to ``timeout`` + seconds until the output of ``cmd`` does or doesn't match (according to the + ``expect_fail`` value) the ``match`` value. + + Args: + target: the target to execute the ``cmd`` on. + cmd: string to execut on the ``target``. + match: regexp to match against output. + timeout: The number of seconds to repeat the ``cmd`` looking for a match + (or non-match if ``expect_fail`` is True). + interval: The number of seconds between running the ``cmd``. If not + specified the value is calculated from the timeout value so that on + average the cmd will execute 10 times. The minimum calculated interval + is .25s, shorter values can be passed explicitly. + desc: description of test, if no description then no result is logged. + expect_fail: if True then succeed when the regexp *doesn't* match. + flags: python regex flags to modify matching behavior + + Returns: + Returns a 2-tuple. The first value is a bool indicating ``success``. + The second value will be a list from ``re.Match.groups()`` if non-empty, + otherwise ``re.Match.group(0)`` if there was a match otherwise None. + """ + return TestCase.g_tc.wait_step( + target, cmd, match, desc, timeout, interval, expect_fail, flags + ) + + +def wait_step_json( + target: str, + cmd: str, + match: Union[str, dict], + desc: str = "", + timeout=10, + interval=None, + expect_fail: bool = False, +) -> (bool, Union[str, dict]): + """Execute a cmd repeatedly and wait for matching result. + + Execute ``cmd`` on ``target``, every ``interval`` seconds until + the output of ``cmd`` matches or doesn't match (according to the + ``expect_fail`` value) ``match``, for up to ``timeout`` seconds. + + ``match`` is a regular expression to search for in the output of ``cmd`` when + ``is_json`` is False. + + When ``is_json`` is True ``match`` must be a json object or a ``str`` which + parses into a json object. Likewise, the ``cmd`` output is parsed into a json + object and then a comparison is done between the two json objects. + + Args: + target: the target to execute the ``cmd`` on. + cmd: string to execut on the ``target``. + match: A json object or str representation of one to compare against json + output from ``cmd``. + desc: description of test, if no description then no result is logged. + timeout: The number of seconds to repeat the ``cmd`` looking for a match + (or non-match if ``expect_fail`` is True). + interval: The number of seconds between running the ``cmd``. If not + specified the value is calculated from the timeout value so that on + average the cmd will execute 10 times. The minimum calculated interval + is .25s, shorter values can be passed explicitly. + expect_fail: if True then succeed if the a json doesn't match. + + Returns: + Returns a 2-tuple. The first value is a bool indicating ``success``. + The second value is a ``str`` diff if there is a difference found in the + json compare, otherwise the value is a json object (dict) from the ``cmd`` + output. + + If json parse fails, a warning is logged and an empty ``dict`` is used. + """ + return TestCase.g_tc.wait_step_json( + target, cmd, match, desc, timeout, interval, expect_fail + ) + + +def luInclude(filename, CallOnFail=None): + """Backward compatible API, do not use in new tests.""" + return include(filename) + + +def luLast(usenl=False): + """Backward compatible API, do not use in new tests.""" + del usenl + return TestCase.g_tc.last_m + + +def luCommand( + target, + cmd, + regexp=".", + op="none", + result="", + ltime=10, + returnJson=False, + wait_time=0.5, +): + """Backward compatible API, do not use in new tests. + + Only non-json is verified to any degree of confidence by code inspection. + + For non-json should return match.group() if match else return bool(op == "fail"). + + For json if no diff return the json else diff return bool(op == "jsoncmp_fail") + bug if no json from output (fail parse) could maybe generate diff, which could + then return + """ + if op == "wait": + if returnJson: + return wait_step_json(target, cmd, regexp, result, ltime, wait_time) + + success, _ = wait_step(target, cmd, regexp, result, ltime, wait_time) + match = luLast() + if success and match is not None: + return match.group() + return success + + if op == "none": + if returnJson: + return step_json(target, cmd) + return step(target, cmd) + + if returnJson and op in ("jsoncmp_fail", "jsoncmp_pass"): + expect_fail = op == "jsoncmp_fail" + return match_step_json(target, cmd, regexp, result, expect_fail) + + assert not returnJson + assert op in ("fail", "pass") + expect_fail = op == "fail" + success, _ = match_step(target, cmd, regexp, result, expect_fail) + match = luLast() + if success and match is not None: + return match.group() + return success |