summaryrefslogtreecommitdiffstats
path: root/tests/features/steps/wrappers.py
blob: 618051709acc82d0eb87a8cdc56a2afb9e1e8c0f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import re
import pexpect
from pgcli.main import COLOR_CODE_REGEX
import textwrap

try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO


def expect_exact(context, expected, timeout):
    """Wait for ``expected`` in the CLI output; fail with a readable report.

    Delegates to ``context.cli.expect_exact``.  If that call times out, the
    pexpect timeout is replaced by a plain ``Exception`` whose message shows
    the expected value, the output captured before the timeout (with ANSI
    control sequences removed) and the full session log.

    Args:
        context: Test context providing ``cli`` (the object whose
            ``expect_exact`` is called and whose ``before`` holds the output
            seen so far) and ``logfile`` (a buffer exposing ``getvalue()``).
        expected: Forwarded unchanged to ``context.cli.expect_exact``.
        timeout: Forwarded as the ``timeout`` keyword argument.

    Raises:
        Exception: If ``pexpect.TIMEOUT`` is raised while waiting.  Any other
            exception from the underlying call propagates unchanged.
    """
    try:
        context.cli.expect_exact(expected, timeout=timeout)
    except pexpect.TIMEOUT:
        # Fall through and raise below, outside the handler, so the report
        # is not chained to the original timeout traceback.
        pass
    else:
        return

    # Strip ANSI CSI sequences (colors, cursor movement, erase-line, ...).
    # Parameter bytes, intermediate bytes and the single final byte are
    # matched separately so that ordinary text following a sequence is kept
    # (a letter-inclusive repeated class would swallow it).
    actual = re.sub(r"\x1b\[[0-?]*[ -/]*[@-~]", "", context.cli.before)
    raise Exception(
        textwrap.dedent(
            """\
            Expected:
            ---
            {0!r}
            ---
            Actual:
            ---
            {1!r}
            ---
            Full log:
            ---
            {2!r}
            ---
        """
        ).format(expected, actual, context.logfile.getvalue())
    )


def expect_pager(context, expected, timeout):
    """Expect ``expected`` framed by the configured pager boundary lines.

    ``expected`` may be a single value or a list of alternatives.  Each
    alternative is wrapped as ``<boundary>\\r\\n<text><boundary>\\r\\n`` using
    ``context.conf['pager_boundary']`` and the resulting list is handed to
    ``expect_exact`` with the given ``timeout``.
    """
    if isinstance(expected, list):
        alternatives = expected
    else:
        alternatives = [expected]

    framed = []
    for text in alternatives:
        boundary = context.conf["pager_boundary"]
        framed.append(f"{boundary}\r\n{text}{boundary}\r\n")

    expect_exact(context, framed, timeout=timeout)


def run_cli(context, run_args=None, prompt_check=True, currentdb=None):
    """Spawn the CLI under pexpect and attach the session to ``context``.

    The command line is the configured ``cli_command`` followed by
    ``run_args``, joined with single spaces, and is started in
    ``context.package_root``.  The function then stores a fresh ``StringIO``
    as ``context.logfile`` (also installed as the session's ``logfile``),
    resets ``context.exit_sent`` to ``False``, sets ``context.currentdb`` to
    ``currentdb`` (or the configured ``dbname`` when that is falsy) and sends
    the "pset pager always" backslash-command.  When ``prompt_check`` is true
    it finally waits for the prompt via ``wait_prompt``.
    """
    extra_args = run_args or []
    command_line = " ".join([context.conf.get("cli_command")] + extra_args)

    context.cli = pexpect.spawnu(command_line, cwd=context.package_root)

    # One buffer shared by the context and the spawned session.
    session_log = StringIO()
    context.logfile = session_log
    context.cli.logfile = session_log

    context.exit_sent = False
    context.currentdb = currentdb if currentdb else context.conf["dbname"]

    context.cli.sendline(r"\pset pager always")
    if not prompt_check:
        return
    wait_prompt(context)


def wait_prompt(context, timeout=3):
    """Make sure the prompt for ``context.currentdb`` is displayed.

    Waits, via ``expect_exact``, for ``"<currentdb>> "`` or ``"<currentdb>>"``.
    ``pexpect.EOF`` is also listed as an accepted pattern, so end-of-output
    from the process is not reported as a failure here.

    Args:
        context: Test context with ``currentdb`` set and a running ``cli``.
        timeout: Seconds to wait before ``expect_exact`` reports a failure.
            Defaults to 3, the previously hard-coded value.
    """
    prompt = f"{context.currentdb}>"
    expect_exact(context, [prompt + " ", prompt, pexpect.EOF], timeout=timeout)