diff options
Diffstat (limited to 'test/features/steps/wrappers.py')
-rw-r--r-- | test/features/steps/wrappers.py | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/test/features/steps/wrappers.py b/test/features/steps/wrappers.py new file mode 100644 index 0000000..565ca59 --- /dev/null +++ b/test/features/steps/wrappers.py @@ -0,0 +1,94 @@ +import re +import pexpect +import sys +import textwrap + +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + + +def expect_exact(context, expected, timeout): + timedout = False + try: + context.cli.expect_exact(expected, timeout=timeout) + except pexpect.exceptions.TIMEOUT: + timedout = True + if timedout: + # Strip color codes out of the output. + actual = re.sub(r'\x1b\[([0-9A-Za-z;?])+[m|K]?', + '', context.cli.before) + raise Exception( + textwrap.dedent('''\ + Expected: + --- + {0!r} + --- + Actual: + --- + {1!r} + --- + Full log: + --- + {2!r} + --- + ''').format( + expected, + actual, + context.logfile.getvalue() + ) + ) + + +def expect_pager(context, expected, timeout): + expect_exact(context, "{0}\r\n{1}{0}\r\n".format( + context.conf['pager_boundary'], expected), timeout=timeout) + + +def run_cli(context, run_args=None): + """Run the process using pexpect.""" + run_args = run_args or [] + if context.conf.get('host', None): + run_args.extend(('-h', context.conf['host'])) + if context.conf.get('user', None): + run_args.extend(('-u', context.conf['user'])) + if context.conf.get('pass', None): + run_args.extend(('-p', context.conf['pass'])) + if context.conf.get('dbname', None): + run_args.extend(('-D', context.conf['dbname'])) + if context.conf.get('defaults-file', None): + run_args.extend(('--defaults-file', context.conf['defaults-file'])) + if context.conf.get('myclirc', None): + run_args.extend(('--myclirc', context.conf['myclirc'])) + try: + cli_cmd = context.conf['cli_command'] + except KeyError: + cli_cmd = ( + '{0!s} -c "' + 'import coverage ; ' + 'coverage.process_startup(); ' + 'import mycli.main; ' + 'mycli.main.cli()' + '"' + ).format(sys.executable) + + cmd_parts = [cli_cmd] + run_args + cmd = ' '.join(cmd_parts) + context.cli = pexpect.spawnu(cmd, cwd=context.package_root) + context.logfile = StringIO() + context.cli.logfile = context.logfile + context.exit_sent = False + context.currentdb = context.conf['dbname'] + + +def wait_prompt(context, prompt=None): + """Make sure prompt is displayed.""" + if prompt is None: + user = context.conf['user'] + host = context.conf['host'] + dbname = context.currentdb + prompt = '{0}@{1}:{2}> '.format( + user, host, dbname), + expect_exact(context, prompt, timeout=5) + context.atprompt = True |