summaryrefslogtreecommitdiffstats
path: root/tests/topotests/munet/mutest/userapi.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/topotests/munet/mutest/userapi.py')
-rw-r--r--tests/topotests/munet/mutest/userapi.py102
1 files changed, 96 insertions, 6 deletions
diff --git a/tests/topotests/munet/mutest/userapi.py b/tests/topotests/munet/mutest/userapi.py
index 7967dd0..abc63af 100644
--- a/tests/topotests/munet/mutest/userapi.py
+++ b/tests/topotests/munet/mutest/userapi.py
@@ -65,8 +65,11 @@ import json
import logging
import pprint
import re
+import subprocess
+import sys
import time
+from argparse import Namespace
from pathlib import Path
from typing import Any
from typing import Union
@@ -76,6 +79,51 @@ from deepdiff import DeepDiff as json_cmp
from munet.base import Commander
+class ScriptError(Exception):
+ """An unrecoverable script failure."""
+
+
+class CLIOnErrorError(Exception):
+ """Enter CLI after error."""
+
+
+def pause_test(desc=""):
+ isatty = sys.stdout.isatty()
+ if not isatty:
+ desc = f" for {desc}" if desc else ""
+ logging.info("NO PAUSE on non-tty terminal%s", desc)
+ return
+
+ while True:
+ if desc:
+ print(f"\n== PAUSING: {desc} ==")
+ try:
+ user = input('PAUSED, "cli" for CLI, "pdb" to debug, "Enter" to continue: ')
+ except EOFError:
+ print("^D...continuing")
+ break
+ user = user.strip()
+ if user == "cli":
+ raise CLIOnErrorError()
+ if user == "pdb":
+ breakpoint() # pylint: disable=W1515
+ elif user:
+ print(f'Unrecognized input: "{user}"')
+ else:
+ break
+
+
+def act_on_result(success, args, desc=""):
+ if args.pause:
+ pause_test(desc)
+ elif success:
+ return
+ if args.cli_on_error:
+ raise CLIOnErrorError()
+ if args.pause_on_error:
+ pause_test(desc)
+
+
class TestCaseInfo:
"""Object to hold nestable TestCase Results."""
@@ -140,6 +188,7 @@ class TestCase:
name: str,
path: Path,
targets: dict,
+ args: Namespace,
output_logger: logging.Logger = None,
result_logger: logging.Logger = None,
full_summary: bool = False,
@@ -157,6 +206,7 @@ class TestCase:
self.__in_section = False
self.targets = targets
+ self.args = args
self.last = ""
self.last_m = None
@@ -285,7 +335,10 @@ class TestCase:
# Extract any docstring as a title.
if print_header:
- title = locals()[f"_{name}"].__doc__.lstrip()
+ title = locals()[f"_{name}"].__doc__
+ if title is None:
+ title = ""
+ title = title.lstrip()
if self.__short_doc_header and (title := title.lstrip()):
if (idx := title.find("\n")) != -1:
title = title[:idx].strip()
@@ -299,6 +352,10 @@ class TestCase:
# Here's where we can do async in the future if we want.
# result = await locals()[f"_{name}"](_ok_result)
+ except ScriptError as error:
+ return error
+ except CLIOnErrorError:
+ raise
except Exception as error:
logging.error(
"Unexpected exception executing %s: %s", name, error, exc_info=True
@@ -381,7 +438,9 @@ class TestCase:
target: the target to execute the command on.
cmd: string to execut on the target.
"""
- out = self.targets[target].cmd_nostatus(cmd, warn=False)
+ out = self.targets[target].cmd_nostatus(
+ cmd, stdin=subprocess.DEVNULL, warn=False
+ )
self.last = out = out.rstrip()
report = out if out else "<no output>"
self.logf("COMMAND OUTPUT:\n%s", report)
@@ -398,12 +457,14 @@ class TestCase:
target: the target to execute the command on.
cmd: string to execute on the target.
"""
- out = self.targets[target].cmd_nostatus(cmd, warn=False)
+ out = self.targets[target].cmd_nostatus(
+ cmd, stdin=subprocess.DEVNULL, warn=False
+ )
self.last = out = out.rstrip()
try:
js = json.loads(out)
except Exception as error:
- js = {}
+ js = None
self.olog.warning(
"JSON load failed. Check command output is in JSON format: %s",
error,
@@ -482,20 +543,33 @@ class TestCase:
exact_match: if True then the json must exactly match.
"""
js = self._command_json(target, cmd)
+ if js is None:
+ # Always fail on bad json, even if user expected failure
+ # return expect_fail, {}
+ return False, {}
+
try:
+ # Convert to string to validate the input is valid JSON
+ if not isinstance(match, str):
+ match = json.dumps(match)
expect = json.loads(match)
except Exception as error:
expect = {}
self.olog.warning(
"JSON load failed. Check match value is in JSON format: %s", error
)
+ # Always fail on bad json, even if user expected failure
+ # return expect_fail, {}
+ return False, {}
if exact_match:
deep_diff = json_cmp(expect, js)
# Convert DeepDiff completely into dicts or lists at all levels
json_diff = json.loads(deep_diff.to_json())
else:
- deep_diff = json_cmp(expect, js, ignore_order=True)
+ deep_diff = json_cmp(
+ expect, js, ignore_order=True, cutoff_intersection_for_pairs=1
+ )
# Convert DeepDiff completely into dicts or lists at all levels
json_diff = json.loads(deep_diff.to_json())
# Remove new fields in json object from diff
@@ -570,6 +644,7 @@ class TestCase:
"""
path = Path(pathname)
path = self.info.path.parent.joinpath(path)
+ do_cli = False
self.oplogf(
"include: new path: %s create section: %s currently __in_section: %s",
@@ -589,7 +664,12 @@ class TestCase:
self.info.path = path
self.oplogf("include: swapped info path: new %s old %s", path, old_path)
- self.__exec_script(path, print_header=new_section, add_newline=new_section)
+ try:
+ e = self.__exec_script(
+ path, print_header=new_section, add_newline=new_section
+ )
+ except CLIOnErrorError:
+ do_cli = True
if new_section:
# Something within the section creating include has also created a section
@@ -616,6 +696,11 @@ class TestCase:
self.info.path = old_path
self.oplogf("include: restored info path: %s", old_path)
+ if do_cli:
+ raise CLIOnErrorError()
+ if e:
+ raise ScriptError(e)
+
def __end_section(self):
self.oplogf("__end_section: __in_section: %s", self.__in_section)
info = self.__pop_execinfo()
@@ -719,6 +804,7 @@ class TestCase:
)
if desc:
self.__post_result(target, success, desc)
+ act_on_result(success, self.args, desc)
return success, ret
def test_step(self, expr_or_value: Any, desc: str, target: str = "") -> bool:
@@ -728,6 +814,7 @@ class TestCase:
"""
success = bool(expr_or_value)
self.__post_result(target, success, desc)
+ act_on_result(success, self.args, desc)
return success
def match_step_json(
@@ -760,6 +847,7 @@ class TestCase:
)
if desc:
self.__post_result(target, success, desc)
+ act_on_result(success, self.args, desc)
return success, ret
def wait_step(
@@ -808,6 +896,7 @@ class TestCase:
)
if desc:
self.__post_result(target, success, desc)
+ act_on_result(success, self.args, desc)
return success, ret
def wait_step_json(
@@ -846,6 +935,7 @@ class TestCase:
)
if desc:
self.__post_result(target, success, desc)
+ act_on_result(success, self.args, desc)
return success, ret