From 8ffd3f30ec80895fe0f206b8dd3a25ea1457f9a5 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 19:40:12 +0200 Subject: Adding upstream version 3.5.0. Signed-off-by: Daniel Baumann --- tests/features/steps/wrappers.py | 74 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 tests/features/steps/wrappers.py (limited to 'tests/features/steps/wrappers.py') diff --git a/tests/features/steps/wrappers.py b/tests/features/steps/wrappers.py new file mode 100644 index 0000000..6180517 --- /dev/null +++ b/tests/features/steps/wrappers.py @@ -0,0 +1,74 @@ +import re +import pexpect +from pgcli.main import COLOR_CODE_REGEX +import textwrap + +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + + +def expect_exact(context, expected, timeout): + timedout = False + try: + context.cli.expect_exact(expected, timeout=timeout) + except pexpect.TIMEOUT: + timedout = True + if timedout: + # Strip color codes out of the output. + actual = re.sub(r"\x1b\[([0-9A-Za-z;?])+[m|K]?", "", context.cli.before) + raise Exception( + textwrap.dedent( + """\ + Expected: + --- + {0!r} + --- + Actual: + --- + {1!r} + --- + Full log: + --- + {2!r} + --- + """ + ).format(expected, actual, context.logfile.getvalue()) + ) + + +def expect_pager(context, expected, timeout): + formatted = expected if isinstance(expected, list) else [expected] + formatted = [ + f"{context.conf['pager_boundary']}\r\n{t}{context.conf['pager_boundary']}\r\n" + for t in formatted + ] + + expect_exact( + context, + formatted, + timeout=timeout, + ) + + +def run_cli(context, run_args=None, prompt_check=True, currentdb=None): + """Run the process using pexpect.""" + run_args = run_args or [] + cli_cmd = context.conf.get("cli_command") + cmd_parts = [cli_cmd] + run_args + cmd = " ".join(cmd_parts) + context.cli = pexpect.spawnu(cmd, cwd=context.package_root) + context.logfile = StringIO() + context.cli.logfile = context.logfile + context.exit_sent = False + context.currentdb = currentdb or context.conf["dbname"] + context.cli.sendline(r"\pset pager always") + if prompt_check: + wait_prompt(context) + + +def wait_prompt(context): + """Make sure prompt is displayed.""" + prompt_str = "{0}>".format(context.currentdb) + expect_exact(context, [prompt_str + " ", prompt_str, pexpect.EOF], timeout=3) -- cgit v1.2.3