diff options
Diffstat (limited to 'test/features')
-rw-r--r-- | test/features/db_utils.py | 35 | ||||
-rw-r--r-- | test/features/environment.py | 133 | ||||
-rw-r--r-- | test/features/fixture_utils.py | 5 | ||||
-rw-r--r-- | test/features/steps/auto_vertical.py | 33 | ||||
-rw-r--r-- | test/features/steps/basic_commands.py | 64 | ||||
-rw-r--r-- | test/features/steps/connection.py | 46 | ||||
-rw-r--r-- | test/features/steps/crud_database.py | 80 | ||||
-rw-r--r-- | test/features/steps/crud_table.py | 70 | ||||
-rw-r--r-- | test/features/steps/iocommands.py | 78 | ||||
-rw-r--r-- | test/features/steps/named_queries.py | 51 | ||||
-rw-r--r-- | test/features/steps/specials.py | 10 | ||||
-rw-r--r-- | test/features/steps/utils.py | 4 | ||||
-rw-r--r-- | test/features/steps/wrappers.py | 72 |
13 files changed, 294 insertions, 387 deletions
diff --git a/test/features/db_utils.py b/test/features/db_utils.py index be550e9..175cc1b 100644 --- a/test/features/db_utils.py +++ b/test/features/db_utils.py @@ -1,8 +1,7 @@ import pymysql -def create_db(hostname='localhost', port=3306, username=None, - password=None, dbname=None): +def create_db(hostname="localhost", port=3306, username=None, password=None, dbname=None): """Create test database. :param hostname: string @@ -14,17 +13,12 @@ def create_db(hostname='localhost', port=3306, username=None, """ cn = pymysql.connect( - host=hostname, - port=port, - user=username, - password=password, - charset='utf8mb4', - cursorclass=pymysql.cursors.DictCursor + host=hostname, port=port, user=username, password=password, charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor ) with cn.cursor() as cr: - cr.execute('drop database if exists ' + dbname) - cr.execute('create database ' + dbname) + cr.execute("drop database if exists " + dbname) + cr.execute("create database " + dbname) cn.close() @@ -44,20 +38,13 @@ def create_cn(hostname, port, password, username, dbname): """ cn = pymysql.connect( - host=hostname, - port=port, - user=username, - password=password, - db=dbname, - charset='utf8mb4', - cursorclass=pymysql.cursors.DictCursor + host=hostname, port=port, user=username, password=password, db=dbname, charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor ) return cn -def drop_db(hostname='localhost', port=3306, username=None, - password=None, dbname=None): +def drop_db(hostname="localhost", port=3306, username=None, password=None, dbname=None): """Drop database. :param hostname: string @@ -68,17 +55,11 @@ def drop_db(hostname='localhost', port=3306, username=None, """ cn = pymysql.connect( - host=hostname, - port=port, - user=username, - password=password, - db=dbname, - charset='utf8mb4', - cursorclass=pymysql.cursors.DictCursor + host=hostname, port=port, user=username, password=password, db=dbname, charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor ) with cn.cursor() as cr: - cr.execute('drop database if exists ' + dbname) + cr.execute("drop database if exists " + dbname) close_cn(cn) diff --git a/test/features/environment.py b/test/features/environment.py index 1ea0f08..a3d3764 100644 --- a/test/features/environment.py +++ b/test/features/environment.py @@ -9,96 +9,72 @@ import pexpect from steps.wrappers import run_cli, wait_prompt -test_log_file = os.path.join(os.environ['HOME'], '.mycli.test.log') +test_log_file = os.path.join(os.environ["HOME"], ".mycli.test.log") -SELF_CONNECTING_FEATURES = ( - 'test/features/connection.feature', -) +SELF_CONNECTING_FEATURES = ("test/features/connection.feature",) -MY_CNF_PATH = os.path.expanduser('~/.my.cnf') -MY_CNF_BACKUP_PATH = f'{MY_CNF_PATH}.backup' -MYLOGIN_CNF_PATH = os.path.expanduser('~/.mylogin.cnf') -MYLOGIN_CNF_BACKUP_PATH = f'{MYLOGIN_CNF_PATH}.backup' +MY_CNF_PATH = os.path.expanduser("~/.my.cnf") +MY_CNF_BACKUP_PATH = f"{MY_CNF_PATH}.backup" +MYLOGIN_CNF_PATH = os.path.expanduser("~/.mylogin.cnf") +MYLOGIN_CNF_BACKUP_PATH = f"{MYLOGIN_CNF_PATH}.backup" def get_db_name_from_context(context): - return context.config.userdata.get( - 'my_test_db', None - ) or "mycli_behave_tests" - + return context.config.userdata.get("my_test_db", None) or "mycli_behave_tests" def before_all(context): """Set env parameters.""" - os.environ['LINES'] = "100" - os.environ['COLUMNS'] = "100" - os.environ['EDITOR'] = 'ex' - os.environ['LC_ALL'] = 'en_US.UTF-8' - os.environ['PROMPT_TOOLKIT_NO_CPR'] = '1' - os.environ['MYCLI_HISTFILE'] = os.devnull + os.environ["LINES"] = "100" + os.environ["COLUMNS"] = "100" + os.environ["EDITOR"] = "ex" + os.environ["LC_ALL"] = "en_US.UTF-8" + os.environ["PROMPT_TOOLKIT_NO_CPR"] = "1" + os.environ["MYCLI_HISTFILE"] = os.devnull - test_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) - login_path_file = os.path.join(test_dir, 'mylogin.cnf') -# os.environ['MYSQL_TEST_LOGIN_FILE'] = login_path_file + # test_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) + # login_path_file = os.path.join(test_dir, "mylogin.cnf") + # os.environ['MYSQL_TEST_LOGIN_FILE'] = login_path_file - context.package_root = os.path.abspath( - os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + context.package_root = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) - os.environ["COVERAGE_PROCESS_START"] = os.path.join(context.package_root, - '.coveragerc') + os.environ["COVERAGE_PROCESS_START"] = os.path.join(context.package_root, ".coveragerc") context.exit_sent = False - vi = '_'.join([str(x) for x in sys.version_info[:3]]) + vi = "_".join([str(x) for x in sys.version_info[:3]]) db_name = get_db_name_from_context(context) - db_name_full = '{0}_{1}'.format(db_name, vi) + db_name_full = "{0}_{1}".format(db_name, vi) # Store get params from config/environment variables context.conf = { - 'host': context.config.userdata.get( - 'my_test_host', - os.getenv('PYTEST_HOST', 'localhost') - ), - 'port': context.config.userdata.get( - 'my_test_port', - int(os.getenv('PYTEST_PORT', '3306')) - ), - 'user': context.config.userdata.get( - 'my_test_user', - os.getenv('PYTEST_USER', 'root') - ), - 'pass': context.config.userdata.get( - 'my_test_pass', - os.getenv('PYTEST_PASSWORD', None) - ), - 'cli_command': context.config.userdata.get( - 'my_cli_command', None) or - sys.executable + ' -c "import coverage ; coverage.process_startup(); import mycli.main; mycli.main.cli()"', - 'dbname': db_name, - 'dbname_tmp': db_name_full + '_tmp', - 'vi': vi, - 'pager_boundary': '---boundary---', + "host": context.config.userdata.get("my_test_host", os.getenv("PYTEST_HOST", "localhost")), + "port": context.config.userdata.get("my_test_port", int(os.getenv("PYTEST_PORT", "3306"))), + "user": context.config.userdata.get("my_test_user", os.getenv("PYTEST_USER", "root")), + "pass": context.config.userdata.get("my_test_pass", os.getenv("PYTEST_PASSWORD", None)), + "cli_command": context.config.userdata.get("my_cli_command", None) + or sys.executable + ' -c "import coverage ; coverage.process_startup(); import mycli.main; mycli.main.cli()"', + "dbname": db_name, + "dbname_tmp": db_name_full + "_tmp", + "vi": vi, + "pager_boundary": "---boundary---", } _, my_cnf = mkstemp() - with open(my_cnf, 'w') as f: + with open(my_cnf, "w") as f: f.write( - '[client]\n' - 'pager={0} {1} {2}\n'.format( - sys.executable, os.path.join(context.package_root, - 'test/features/wrappager.py'), - context.conf['pager_boundary']) + "[client]\n" "pager={0} {1} {2}\n".format( + sys.executable, os.path.join(context.package_root, "test/features/wrappager.py"), context.conf["pager_boundary"] + ) ) - context.conf['defaults-file'] = my_cnf - context.conf['myclirc'] = os.path.join(context.package_root, 'test', - 'myclirc') + context.conf["defaults-file"] = my_cnf + context.conf["myclirc"] = os.path.join(context.package_root, "test", "myclirc") - context.cn = dbutils.create_db(context.conf['host'], context.conf['port'], - context.conf['user'], - context.conf['pass'], - context.conf['dbname']) + context.cn = dbutils.create_db( + context.conf["host"], context.conf["port"], context.conf["user"], context.conf["pass"], context.conf["dbname"] + ) context.fixture_data = fixutils.read_fixture_files() @@ -106,12 +82,10 @@ def before_all(context): def after_all(context): """Unset env parameters.""" dbutils.close_cn(context.cn) - dbutils.drop_db(context.conf['host'], context.conf['port'], - context.conf['user'], context.conf['pass'], - context.conf['dbname']) + dbutils.drop_db(context.conf["host"], context.conf["port"], context.conf["user"], context.conf["pass"], context.conf["dbname"]) # Restore env vars. - #for k, v in context.pgenv.items(): + # for k, v in context.pgenv.items(): # if k in os.environ and v is None: # del os.environ[k] # elif v: @@ -123,8 +97,8 @@ def before_step(context, _): def before_scenario(context, arg): - with open(test_log_file, 'w') as f: - f.write('') + with open(test_log_file, "w") as f: + f.write("") if arg.location.filename not in SELF_CONNECTING_FEATURES: run_cli(context) wait_prompt(context) @@ -140,23 +114,18 @@ def after_scenario(context, _): """Cleans up after each test complete.""" with open(test_log_file) as f: for line in f: - if 'error' in line.lower(): - raise RuntimeError(f'Error in log file: {line}') + if "error" in line.lower(): + raise RuntimeError(f"Error in log file: {line}") - if hasattr(context, 'cli') and not context.exit_sent: + if hasattr(context, "cli") and not context.exit_sent: # Quit nicely. if not context.atprompt: - user = context.conf['user'] - host = context.conf['host'] + user = context.conf["user"] + host = context.conf["host"] dbname = context.currentdb - context.cli.expect_exact( - '{0}@{1}:{2}>'.format( - user, host, dbname - ), - timeout=5 - ) - context.cli.sendcontrol('c') - context.cli.sendcontrol('d') + context.cli.expect_exact("{0}@{1}:{2}>".format(user, host, dbname), timeout=5) + context.cli.sendcontrol("c") + context.cli.sendcontrol("d") context.cli.expect_exact(pexpect.EOF, timeout=5) if os.path.exists(MY_CNF_BACKUP_PATH): diff --git a/test/features/fixture_utils.py b/test/features/fixture_utils.py index f85e0f6..514e41f 100644 --- a/test/features/fixture_utils.py +++ b/test/features/fixture_utils.py @@ -1,5 +1,4 @@ import os -import io def read_fixture_lines(filename): @@ -20,9 +19,9 @@ def read_fixture_files(): fixture_dict = {} current_dir = os.path.dirname(__file__) - fixture_dir = os.path.join(current_dir, 'fixture_data/') + fixture_dir = os.path.join(current_dir, "fixture_data/") for filename in os.listdir(fixture_dir): - if filename not in ['.', '..']: + if filename not in [".", ".."]: fullname = os.path.join(fixture_dir, filename) fixture_dict[filename] = read_fixture_lines(fullname) diff --git a/test/features/steps/auto_vertical.py b/test/features/steps/auto_vertical.py index e1cb26f..ad20067 100644 --- a/test/features/steps/auto_vertical.py +++ b/test/features/steps/auto_vertical.py @@ -6,41 +6,42 @@ import wrappers from utils import parse_cli_args_to_dict -@when('we run dbcli with {arg}') +@when("we run dbcli with {arg}") def step_run_cli_with_arg(context, arg): wrappers.run_cli(context, run_args=parse_cli_args_to_dict(arg)) -@when('we execute a small query') +@when("we execute a small query") def step_execute_small_query(context): - context.cli.sendline('select 1') + context.cli.sendline("select 1") -@when('we execute a large query') +@when("we execute a large query") def step_execute_large_query(context): - context.cli.sendline( - 'select {}'.format(','.join([str(n) for n in range(1, 50)]))) + context.cli.sendline("select {}".format(",".join([str(n) for n in range(1, 50)]))) -@then('we see small results in horizontal format') +@then("we see small results in horizontal format") def step_see_small_results(context): - wrappers.expect_pager(context, dedent("""\ + wrappers.expect_pager( + context, + dedent("""\ +---+\r | 1 |\r +---+\r | 1 |\r +---+\r \r - """), timeout=5) - wrappers.expect_exact(context, '1 row in set', timeout=2) + """), + timeout=5, + ) + wrappers.expect_exact(context, "1 row in set", timeout=2) -@then('we see large results in vertical format') +@then("we see large results in vertical format") def step_see_large_results(context): - rows = ['{n:3}| {n}'.format(n=str(n)) for n in range(1, 50)] - expected = ('***************************[ 1. row ]' - '***************************\r\n' + - '{}\r\n'.format('\r\n'.join(rows) + '\r\n')) + rows = ["{n:3}| {n}".format(n=str(n)) for n in range(1, 50)] + expected = "***************************[ 1. row ]" "***************************\r\n" + "{}\r\n".format("\r\n".join(rows) + "\r\n") wrappers.expect_pager(context, expected, timeout=10) - wrappers.expect_exact(context, '1 row in set', timeout=2) + wrappers.expect_exact(context, "1 row in set", timeout=2) diff --git a/test/features/steps/basic_commands.py b/test/features/steps/basic_commands.py index 425ef67..ec1e47a 100644 --- a/test/features/steps/basic_commands.py +++ b/test/features/steps/basic_commands.py @@ -5,18 +5,18 @@ to call the step in "*.feature" file. """ -from behave import when +from behave import when, then from textwrap import dedent import tempfile import wrappers -@when('we run dbcli') +@when("we run dbcli") def step_run_cli(context): wrappers.run_cli(context) -@when('we wait for prompt') +@when("we wait for prompt") def step_wait_prompt(context): wrappers.wait_prompt(context) @@ -24,77 +24,75 @@ def step_wait_prompt(context): @when('we send "ctrl + d"') def step_ctrl_d(context): """Send Ctrl + D to hopefully exit.""" - context.cli.sendcontrol('d') + context.cli.sendcontrol("d") context.exit_sent = True -@when('we send "\?" command') +@when(r'we send "\?" command') def step_send_help(context): - """Send \? + r"""Send \? to see help. """ - context.cli.sendline('\\?') - wrappers.expect_exact( - context, context.conf['pager_boundary'] + '\r\n', timeout=5) + context.cli.sendline("\\?") + wrappers.expect_exact(context, context.conf["pager_boundary"] + "\r\n", timeout=5) -@when(u'we send source command') +@when("we send source command") def step_send_source_command(context): with tempfile.NamedTemporaryFile() as f: - f.write(b'\?') + f.write(b"\\?") f.flush() - context.cli.sendline('\. {0}'.format(f.name)) - wrappers.expect_exact( - context, context.conf['pager_boundary'] + '\r\n', timeout=5) + context.cli.sendline("\\. {0}".format(f.name)) + wrappers.expect_exact(context, context.conf["pager_boundary"] + "\r\n", timeout=5) -@when(u'we run query to check application_name') +@when("we run query to check application_name") def step_check_application_name(context): context.cli.sendline( "SELECT 'found' FROM performance_schema.session_connect_attrs WHERE attr_name = 'program_name' AND attr_value = 'mycli'" ) -@then(u'we see found') +@then("we see found") def step_see_found(context): wrappers.expect_exact( context, - context.conf['pager_boundary'] + '\r' + dedent(''' + context.conf["pager_boundary"] + + "\r" + + dedent(""" +-------+\r | found |\r +-------+\r | found |\r +-------+\r \r - ''') + context.conf['pager_boundary'], - timeout=5 + """) + + context.conf["pager_boundary"], + timeout=5, ) -@then(u'we confirm the destructive warning') -def step_confirm_destructive_command(context): +@then("we confirm the destructive warning") +def step_confirm_destructive_command(context): # noqa """Confirm destructive command.""" - wrappers.expect_exact( - context, 'You\'re about to run a destructive command.\r\nDo you want to proceed? (y/n):', timeout=2) - context.cli.sendline('y') + wrappers.expect_exact(context, "You're about to run a destructive command.\r\nDo you want to proceed? (y/n):", timeout=2) + context.cli.sendline("y") -@when(u'we answer the destructive warning with "{confirmation}"') -def step_confirm_destructive_command(context, confirmation): +@when('we answer the destructive warning with "{confirmation}"') +def step_confirm_destructive_command(context, confirmation): # noqa """Confirm destructive command.""" - wrappers.expect_exact( - context, 'You\'re about to run a destructive command.\r\nDo you want to proceed? (y/n):', timeout=2) + wrappers.expect_exact(context, "You're about to run a destructive command.\r\nDo you want to proceed? (y/n):", timeout=2) context.cli.sendline(confirmation) -@then(u'we answer the destructive warning with invalid "{confirmation}" and see text "{text}"') -def step_confirm_destructive_command(context, confirmation, text): +@then('we answer the destructive warning with invalid "{confirmation}" and see text "{text}"') +def step_confirm_destructive_command(context, confirmation, text): # noqa """Confirm destructive command.""" - wrappers.expect_exact( - context, 'You\'re about to run a destructive command.\r\nDo you want to proceed? (y/n):', timeout=2) + wrappers.expect_exact(context, "You're about to run a destructive command.\r\nDo you want to proceed? (y/n):", timeout=2) context.cli.sendline(confirmation) wrappers.expect_exact(context, text, timeout=2) # we must exit the Click loop, or the feature will hang - context.cli.sendline('n') + context.cli.sendline("n") diff --git a/test/features/steps/connection.py b/test/features/steps/connection.py index e16dd86..80d0653 100644 --- a/test/features/steps/connection.py +++ b/test/features/steps/connection.py @@ -1,9 +1,7 @@ import io import os -import shlex from behave import when, then -import pexpect import wrappers from test.features.steps.utils import parse_cli_args_to_dict @@ -12,60 +10,44 @@ from test.utils import HOST, PORT, USER, PASSWORD from mycli.config import encrypt_mylogin_cnf -TEST_LOGIN_PATH = 'test_login_path' +TEST_LOGIN_PATH = "test_login_path" @when('we run mycli with arguments "{exact_args}" without arguments "{excluded_args}"') @when('we run mycli without arguments "{excluded_args}"') -def step_run_cli_without_args(context, excluded_args, exact_args=''): - wrappers.run_cli( - context, - run_args=parse_cli_args_to_dict(exact_args), - exclude_args=parse_cli_args_to_dict(excluded_args).keys() - ) +def step_run_cli_without_args(context, excluded_args, exact_args=""): + wrappers.run_cli(context, run_args=parse_cli_args_to_dict(exact_args), exclude_args=parse_cli_args_to_dict(excluded_args).keys()) @then('status contains "{expression}"') def status_contains(context, expression): - wrappers.expect_exact(context, f'{expression}', timeout=5) + wrappers.expect_exact(context, f"{expression}", timeout=5) # Normally, the shutdown after scenario waits for the prompt. # But we may have changed the prompt, depending on parameters, # so let's wait for its last character - context.cli.expect_exact('>') + context.cli.expect_exact(">") context.atprompt = True -@when('we create my.cnf file') +@when("we create my.cnf file") def step_create_my_cnf_file(context): - my_cnf = ( - '[client]\n' - f'host = {HOST}\n' - f'port = {PORT}\n' - f'user = {USER}\n' - f'password = {PASSWORD}\n' - ) - with open(MY_CNF_PATH, 'w') as f: + my_cnf = "[client]\n" f"host = {HOST}\n" f"port = {PORT}\n" f"user = {USER}\n" f"password = {PASSWORD}\n" + with open(MY_CNF_PATH, "w") as f: f.write(my_cnf) -@when('we create mylogin.cnf file') +@when("we create mylogin.cnf file") def step_create_mylogin_cnf_file(context): - os.environ.pop('MYSQL_TEST_LOGIN_FILE', None) - mylogin_cnf = ( - f'[{TEST_LOGIN_PATH}]\n' - f'host = {HOST}\n' - f'port = {PORT}\n' - f'user = {USER}\n' - f'password = {PASSWORD}\n' - ) - with open(MYLOGIN_CNF_PATH, 'wb') as f: + os.environ.pop("MYSQL_TEST_LOGIN_FILE", None) + mylogin_cnf = f"[{TEST_LOGIN_PATH}]\n" f"host = {HOST}\n" f"port = {PORT}\n" f"user = {USER}\n" f"password = {PASSWORD}\n" + with open(MYLOGIN_CNF_PATH, "wb") as f: input_file = io.StringIO(mylogin_cnf) f.write(encrypt_mylogin_cnf(input_file).read()) -@then('we are logged in') +@then("we are logged in") def we_are_logged_in(context): db_name = get_db_name_from_context(context) - context.cli.expect_exact(f'{db_name}>', timeout=5) + context.cli.expect_exact(f"{db_name}>", timeout=5) context.atprompt = True diff --git a/test/features/steps/crud_database.py b/test/features/steps/crud_database.py index 841f37d..56ff114 100644 --- a/test/features/steps/crud_database.py +++ b/test/features/steps/crud_database.py @@ -11,105 +11,99 @@ import wrappers from behave import when, then -@when('we create database') +@when("we create database") def step_db_create(context): """Send create database.""" - context.cli.sendline('create database {0};'.format( - context.conf['dbname_tmp'])) + context.cli.sendline("create database {0};".format(context.conf["dbname_tmp"])) - context.response = { - 'database_name': context.conf['dbname_tmp'] - } + context.response = {"database_name": context.conf["dbname_tmp"]} -@when('we drop database') +@when("we drop database") def step_db_drop(context): """Send drop database.""" - context.cli.sendline('drop database {0};'.format( - context.conf['dbname_tmp'])) + context.cli.sendline("drop database {0};".format(context.conf["dbname_tmp"])) -@when('we connect to test database') +@when("we connect to test database") def step_db_connect_test(context): """Send connect to database.""" - db_name = context.conf['dbname'] + db_name = context.conf["dbname"] context.currentdb = db_name - context.cli.sendline('use {0};'.format(db_name)) + context.cli.sendline("use {0};".format(db_name)) -@when('we connect to quoted test database') +@when("we connect to quoted test database") def step_db_connect_quoted_tmp(context): """Send connect to database.""" - db_name = context.conf['dbname'] + db_name = context.conf["dbname"] context.currentdb = db_name - context.cli.sendline('use `{0}`;'.format(db_name)) + context.cli.sendline("use `{0}`;".format(db_name)) -@when('we connect to tmp database') +@when("we connect to tmp database") def step_db_connect_tmp(context): """Send connect to database.""" - db_name = context.conf['dbname_tmp'] + db_name = context.conf["dbname_tmp"] context.currentdb = db_name - context.cli.sendline('use {0}'.format(db_name)) + context.cli.sendline("use {0}".format(db_name)) -@when('we connect to dbserver') +@when("we connect to dbserver") def step_db_connect_dbserver(context): """Send connect to database.""" - context.currentdb = 'mysql' - context.cli.sendline('use mysql') + context.currentdb = "mysql" + context.cli.sendline("use mysql") -@then('dbcli exits') +@then("dbcli exits") def step_wait_exit(context): """Make sure the cli exits.""" wrappers.expect_exact(context, pexpect.EOF, timeout=5) -@then('we see dbcli prompt') +@then("we see dbcli prompt") def step_see_prompt(context): """Wait to see the prompt.""" - user = context.conf['user'] - host = context.conf['host'] + user = context.conf["user"] + host = context.conf["host"] dbname = context.currentdb - wrappers.wait_prompt(context, '{0}@{1}:{2}> '.format(user, host, dbname)) + wrappers.wait_prompt(context, "{0}@{1}:{2}> ".format(user, host, dbname)) -@then('we see help output') +@then("we see help output") def step_see_help(context): - for expected_line in context.fixture_data['help_commands.txt']: + for expected_line in context.fixture_data["help_commands.txt"]: wrappers.expect_exact(context, expected_line, timeout=1) -@then('we see database created') +@then("we see database created") def step_see_db_created(context): """Wait to see create database output.""" - wrappers.expect_exact(context, 'Query OK, 1 row affected', timeout=2) + wrappers.expect_exact(context, "Query OK, 1 row affected", timeout=2) -@then('we see database dropped') +@then("we see database dropped") def step_see_db_dropped(context): """Wait to see drop database output.""" - wrappers.expect_exact(context, 'Query OK, 0 rows affected', timeout=2) + wrappers.expect_exact(context, "Query OK, 0 rows affected", timeout=2) -@then('we see database dropped and no default database') +@then("we see database dropped and no default database") def step_see_db_dropped_no_default(context): """Wait to see drop database output.""" - user = context.conf['user'] - host = context.conf['host'] - database = '(none)' + user = context.conf["user"] + host = context.conf["host"] + database = "(none)" context.currentdb = None - wrappers.expect_exact(context, 'Query OK, 0 rows affected', timeout=2) - wrappers.wait_prompt(context, '{0}@{1}:{2}>'.format(user, host, database)) + wrappers.expect_exact(context, "Query OK, 0 rows affected", timeout=2) + wrappers.wait_prompt(context, "{0}@{1}:{2}>".format(user, host, database)) -@then('we see database connected') +@then("we see database connected") def step_see_db_connected(context): """Wait to see drop database output.""" - wrappers.expect_exact( - context, 'You are now connected to database "', timeout=2) + wrappers.expect_exact(context, 'You are now connected to database "', timeout=2) wrappers.expect_exact(context, '"', timeout=2) - wrappers.expect_exact(context, ' as user "{0}"'.format( - context.conf['user']), timeout=2) + wrappers.expect_exact(context, ' as user "{0}"'.format(context.conf["user"]), timeout=2) diff --git a/test/features/steps/crud_table.py b/test/features/steps/crud_table.py index f715f0c..48a6408 100644 --- a/test/features/steps/crud_table.py +++ b/test/features/steps/crud_table.py @@ -10,103 +10,109 @@ from behave import when, then from textwrap import dedent -@when('we create table') +@when("we create table") def step_create_table(context): """Send create table.""" - context.cli.sendline('create table a(x text);') + context.cli.sendline("create table a(x text);") -@when('we insert into table') +@when("we insert into table") def step_insert_into_table(context): """Send insert into table.""" - context.cli.sendline('''insert into a(x) values('xxx');''') + context.cli.sendline("""insert into a(x) values('xxx');""") -@when('we update table') +@when("we update table") def step_update_table(context): """Send insert into table.""" - context.cli.sendline('''update a set x = 'yyy' where x = 'xxx';''') + context.cli.sendline("""update a set x = 'yyy' where x = 'xxx';""") -@when('we select from table') +@when("we select from table") def step_select_from_table(context): """Send select from table.""" - context.cli.sendline('select * from a;') + context.cli.sendline("select * from a;") -@when('we delete from table') +@when("we delete from table") def step_delete_from_table(context): """Send deete from table.""" - context.cli.sendline('''delete from a where x = 'yyy';''') + context.cli.sendline("""delete from a where x = 'yyy';""") -@when('we drop table') +@when("we drop table") def step_drop_table(context): """Send drop table.""" - context.cli.sendline('drop table a;') + context.cli.sendline("drop table a;") -@then('we see table created') +@then("we see table created") def step_see_table_created(context): """Wait to see create table output.""" - wrappers.expect_exact(context, 'Query OK, 0 rows affected', timeout=2) + wrappers.expect_exact(context, "Query OK, 0 rows affected", timeout=2) -@then('we see record inserted') +@then("we see record inserted") def step_see_record_inserted(context): """Wait to see insert output.""" - wrappers.expect_exact(context, 'Query OK, 1 row affected', timeout=2) + wrappers.expect_exact(context, "Query OK, 1 row affected", timeout=2) -@then('we see record updated') +@then("we see record updated") def step_see_record_updated(context): """Wait to see update output.""" - wrappers.expect_exact(context, 'Query OK, 1 row affected', timeout=2) + wrappers.expect_exact(context, "Query OK, 1 row affected", timeout=2) -@then('we see data selected') +@then("we see data selected") def step_see_data_selected(context): """Wait to see select output.""" wrappers.expect_pager( - context, dedent("""\ + context, + dedent("""\ +-----+\r | x |\r +-----+\r | yyy |\r +-----+\r \r - """), timeout=2) - wrappers.expect_exact(context, '1 row in set', timeout=2) + """), + timeout=2, + ) + wrappers.expect_exact(context, "1 row in set", timeout=2) -@then('we see record deleted') +@then("we see record deleted") def step_see_data_deleted(context): """Wait to see delete output.""" - wrappers.expect_exact(context, 'Query OK, 1 row affected', timeout=2) + wrappers.expect_exact(context, "Query OK, 1 row affected", timeout=2) -@then('we see table dropped') +@then("we see table dropped") def step_see_table_dropped(context): """Wait to see drop output.""" - wrappers.expect_exact(context, 'Query OK, 0 rows affected', timeout=2) + wrappers.expect_exact(context, "Query OK, 0 rows affected", timeout=2) -@when('we select null') +@when("we select null") def step_select_null(context): """Send select null.""" - context.cli.sendline('select null;') + context.cli.sendline("select null;") -@then('we see null selected') +@then("we see null selected") def step_see_null_selected(context): """Wait to see null output.""" wrappers.expect_pager( - context, dedent("""\ + context, + dedent("""\ +--------+\r | NULL |\r +--------+\r | <null> |\r +--------+\r \r - """), timeout=2) - wrappers.expect_exact(context, '1 row in set', timeout=2) + """), + timeout=2, + ) + wrappers.expect_exact(context, "1 row in set", timeout=2) diff --git a/test/features/steps/iocommands.py b/test/features/steps/iocommands.py index bbabf43..07d5c77 100644 --- a/test/features/steps/iocommands.py +++ b/test/features/steps/iocommands.py @@ -5,101 +5,93 @@ from behave import when, then from textwrap import dedent -@when('we start external editor providing a file name') +@when("we start external editor providing a file name") def step_edit_file(context): """Edit file with external editor.""" - context.editor_file_name = os.path.join( - context.package_root, 'test_file_{0}.sql'.format(context.conf['vi'])) + context.editor_file_name = os.path.join(context.package_root, "test_file_{0}.sql".format(context.conf["vi"])) if os.path.exists(context.editor_file_name): os.remove(context.editor_file_name) - context.cli.sendline('\e {0}'.format( - os.path.basename(context.editor_file_name))) - wrappers.expect_exact( - context, 'Entering Ex mode. Type "visual" to go to Normal mode.', timeout=2) - wrappers.expect_exact(context, '\r\n:', timeout=2) + context.cli.sendline("\\e {0}".format(os.path.basename(context.editor_file_name))) + wrappers.expect_exact(context, 'Entering Ex mode. Type "visual" to go to Normal mode.', timeout=2) + wrappers.expect_exact(context, "\r\n:", timeout=2) @when('we type "{query}" in the editor') def step_edit_type_sql(context, query): - context.cli.sendline('i') + context.cli.sendline("i") context.cli.sendline(query) - context.cli.sendline('.') - wrappers.expect_exact(context, '\r\n:', timeout=2) + context.cli.sendline(".") + wrappers.expect_exact(context, "\r\n:", timeout=2) -@when('we exit the editor') +@when("we exit the editor") def step_edit_quit(context): - context.cli.sendline('x') + context.cli.sendline("x") wrappers.expect_exact(context, "written", timeout=2) @then('we see "{query}" in prompt') def step_edit_done_sql(context, query): - for match in query.split(' '): + for match in query.split(" "): wrappers.expect_exact(context, match, timeout=5) # Cleanup the command line. - context.cli.sendcontrol('c') + context.cli.sendcontrol("c") # Cleanup the edited file. if context.editor_file_name and os.path.exists(context.editor_file_name): os.remove(context.editor_file_name) -@when(u'we tee output') +@when("we tee output") def step_tee_ouptut(context): - context.tee_file_name = os.path.join( - context.package_root, 'tee_file_{0}.sql'.format(context.conf['vi'])) + context.tee_file_name = os.path.join(context.package_root, "tee_file_{0}.sql".format(context.conf["vi"])) if os.path.exists(context.tee_file_name): os.remove(context.tee_file_name) - context.cli.sendline('tee {0}'.format( - os.path.basename(context.tee_file_name))) + context.cli.sendline("tee {0}".format(os.path.basename(context.tee_file_name))) -@when(u'we select "select {param}"') +@when('we select "select {param}"') def step_query_select_number(context, param): - context.cli.sendline(u'select {}'.format(param)) - wrappers.expect_pager(context, dedent(u"""\ + context.cli.sendline("select {}".format(param)) + wrappers.expect_pager( + context, + dedent( + """\ +{dashes}+\r | {param} |\r +{dashes}+\r | {param} |\r +{dashes}+\r \r - """.format(param=param, dashes='-' * (len(param) + 2)) - ), timeout=5) - wrappers.expect_exact(context, '1 row in set', timeout=2) + """.format(param=param, dashes="-" * (len(param) + 2)) + ), + timeout=5, + ) + wrappers.expect_exact(context, "1 row in set", timeout=2) -@then(u'we see result "{result}"') +@then('we see result "{result}"') def step_see_result(context, result): - wrappers.expect_exact( - context, - u"| {} |".format(result), - timeout=2 - ) + wrappers.expect_exact(context, "| {} |".format(result), timeout=2) -@when(u'we query "{query}"') +@when('we query "{query}"') def step_query(context, query): context.cli.sendline(query) -@when(u'we notee output') +@when("we notee output") def step_notee_output(context): - context.cli.sendline('notee') + context.cli.sendline("notee") -@then(u'we see 123456 in tee output') +@then("we see 123456 in tee output") def step_see_123456_in_ouput(context): with open(context.tee_file_name) as f: - assert '123456' in f.read() + assert "123456" in f.read() if os.path.exists(context.tee_file_name): os.remove(context.tee_file_name) -@then(u'delimiter is set to "{delimiter}"') +@then('delimiter is set to "{delimiter}"') def delimiter_is_set(context, delimiter): - wrappers.expect_exact( - context, - u'Changed delimiter to {}'.format(delimiter), - timeout=2 - ) + wrappers.expect_exact(context, "Changed delimiter to {}".format(delimiter), timeout=2) diff --git a/test/features/steps/named_queries.py b/test/features/steps/named_queries.py index bc1f866..93d68ba 100644 --- a/test/features/steps/named_queries.py +++ b/test/features/steps/named_queries.py @@ -9,82 +9,79 @@ import wrappers from behave import when, then -@when('we save a named query') +@when("we save a named query") def step_save_named_query(context): """Send \fs command.""" - context.cli.sendline('\\fs foo SELECT 12345') + context.cli.sendline("\\fs foo SELECT 12345") -@when('we use a named query') +@when("we use a named query") def step_use_named_query(context): """Send \f command.""" - context.cli.sendline('\\f foo') + context.cli.sendline("\\f foo") -@when('we delete a named query') +@when("we delete a named query") def step_delete_named_query(context): """Send \fd command.""" - context.cli.sendline('\\fd foo') + context.cli.sendline("\\fd foo") -@then('we see the named query saved') +@then("we see the named query saved") def step_see_named_query_saved(context): """Wait to see query saved.""" - wrappers.expect_exact(context, 'Saved.', timeout=2) + wrappers.expect_exact(context, "Saved.", timeout=2) -@then('we see the named query executed') +@then("we see the named query executed") def step_see_named_query_executed(context): """Wait to see select output.""" - wrappers.expect_exact(context, 'SELECT 12345', timeout=2) + wrappers.expect_exact(context, "SELECT 12345", timeout=2) -@then('we see the named query deleted') +@then("we see the named query deleted") def step_see_named_query_deleted(context): """Wait to see query deleted.""" - wrappers.expect_exact(context, 'foo: Deleted', timeout=2) + wrappers.expect_exact(context, "foo: Deleted", timeout=2) -@when('we save a named query with parameters') +@when("we save a named query with parameters") def step_save_named_query_with_parameters(context): """Send \fs command for query with parameters.""" context.cli.sendline('\\fs foo_args SELECT $1, "$2", "$3"') -@when('we use named query with parameters') +@when("we use named query with parameters") def step_use_named_query_with_parameters(context): """Send \f command with parameters.""" context.cli.sendline('\\f foo_args 101 second "third value"') -@then('we see the named query with parameters executed') +@then("we see the named query with parameters executed") def step_see_named_query_with_parameters_executed(context): """Wait to see select output.""" - wrappers.expect_exact( - context, 'SELECT 101, "second", "third value"', timeout=2) + wrappers.expect_exact(context, 'SELECT 101, "second", "third value"', timeout=2) -@when('we use named query with too few parameters') +@when("we use named query with too few parameters") def step_use_named_query_with_too_few_parameters(context): """Send \f command with missing parameters.""" - context.cli.sendline('\\f foo_args 101') + context.cli.sendline("\\f foo_args 101") -@then('we see the named query with parameters fail with missing parameters') +@then("we see the named query with parameters fail with missing parameters") def step_see_named_query_with_parameters_fail_with_missing_parameters(context): """Wait to see select output.""" - wrappers.expect_exact( - context, 'missing substitution for $2 in query:', timeout=2) + wrappers.expect_exact(context, "missing substitution for $2 in query:", timeout=2) -@when('we use named query with too many parameters') +@when("we use named query with too many parameters") def step_use_named_query_with_too_many_parameters(context): """Send \f command with extra parameters.""" - context.cli.sendline('\\f foo_args 101 102 103 104') + context.cli.sendline("\\f foo_args 101 102 103 104") -@then('we see the named query with parameters fail with extra parameters') +@then("we see the named query with parameters fail with extra parameters") def step_see_named_query_with_parameters_fail_with_extra_parameters(context): """Wait to see select output.""" - wrappers.expect_exact( - context, 'query does not have substitution parameter $4:', timeout=2) + wrappers.expect_exact(context, "query does not have substitution parameter $4:", timeout=2) diff --git a/test/features/steps/specials.py b/test/features/steps/specials.py index e8b99e3..1b50a00 100644 --- a/test/features/steps/specials.py +++ b/test/features/steps/specials.py @@ -9,10 +9,10 @@ import wrappers from behave import when, then -@when('we refresh completions') +@when("we refresh completions") def step_refresh_completions(context): """Send refresh command.""" - context.cli.sendline('rehash') + context.cli.sendline("rehash") @then('we see text "{text}"') @@ -20,8 +20,8 @@ def step_see_text(context, text): """Wait to see given text message.""" wrappers.expect_exact(context, text, timeout=2) -@then('we see completions refresh started') + +@then("we see completions refresh started") def step_see_refresh_started(context): """Wait to see refresh output.""" - wrappers.expect_exact( - context, 'Auto-completion refresh started in the background.', timeout=2) + wrappers.expect_exact(context, "Auto-completion refresh started in the background.", timeout=2) diff --git a/test/features/steps/utils.py b/test/features/steps/utils.py index 1ae63d2..873f9d4 100644 --- a/test/features/steps/utils.py +++ b/test/features/steps/utils.py @@ -4,8 +4,8 @@ import shlex def parse_cli_args_to_dict(cli_args: str): args_dict = {} for arg in shlex.split(cli_args): - if '=' in arg: - key, value = arg.split('=') + if "=" in arg: + key, value = arg.split("=") args_dict[key] = value else: args_dict[arg] = None diff --git a/test/features/steps/wrappers.py b/test/features/steps/wrappers.py index 6408f23..f9325c6 100644 --- a/test/features/steps/wrappers.py +++ b/test/features/steps/wrappers.py @@ -18,10 +18,9 @@ def expect_exact(context, expected, timeout): timedout = True if timedout: # Strip color codes out of the output. - actual = re.sub(r'\x1b\[([0-9A-Za-z;?])+[m|K]?', - '', context.cli.before) + actual = re.sub(r"\x1b\[([0-9A-Za-z;?])+[m|K]?", "", context.cli.before) raise Exception( - textwrap.dedent('''\ + textwrap.dedent("""\ Expected: --- {0!r} @@ -34,17 +33,12 @@ def expect_exact(context, expected, timeout): --- {2!r} --- - ''').format( - expected, - actual, - context.logfile.getvalue() - ) + """).format(expected, actual, context.logfile.getvalue()) ) def expect_pager(context, expected, timeout): - expect_exact(context, "{0}\r\n{1}{0}\r\n".format( - context.conf['pager_boundary'], expected), timeout=timeout) + expect_exact(context, "{0}\r\n{1}{0}\r\n".format(context.conf["pager_boundary"], expected), timeout=timeout) def run_cli(context, run_args=None, exclude_args=None): @@ -63,55 +57,49 @@ def run_cli(context, run_args=None, exclude_args=None): else: rendered_args.append(key) - if conf.get('host', None): - add_arg('host', '-h', conf['host']) - if conf.get('user', None): - add_arg('user', '-u', conf['user']) - if conf.get('pass', None): - add_arg('pass', '-p', conf['pass']) - if conf.get('port', None): - add_arg('port', '-P', str(conf['port'])) - if conf.get('dbname', None): - add_arg('dbname', '-D', conf['dbname']) - if conf.get('defaults-file', None): - add_arg('defaults_file', '--defaults-file', conf['defaults-file']) - if conf.get('myclirc', None): - add_arg('myclirc', '--myclirc', conf['myclirc']) - if conf.get('login_path'): - add_arg('login_path', '--login-path', conf['login_path']) + if conf.get("host", None): + add_arg("host", "-h", conf["host"]) + if conf.get("user", None): + add_arg("user", "-u", conf["user"]) + if conf.get("pass", None): + add_arg("pass", "-p", conf["pass"]) + if conf.get("port", None): + add_arg("port", "-P", str(conf["port"])) + if conf.get("dbname", None): + add_arg("dbname", "-D", conf["dbname"]) + if conf.get("defaults-file", None): + add_arg("defaults_file", "--defaults-file", conf["defaults-file"]) + if conf.get("myclirc", None): + add_arg("myclirc", "--myclirc", conf["myclirc"]) + if conf.get("login_path"): + add_arg("login_path", "--login-path", conf["login_path"]) for arg_name, arg_value in conf.items(): - if arg_name.startswith('-'): + if arg_name.startswith("-"): add_arg(arg_name, arg_name, arg_value) try: - cli_cmd = context.conf['cli_command'] + cli_cmd = context.conf["cli_command"] except KeyError: - cli_cmd = ( - '{0!s} -c "' - 'import coverage ; ' - 'coverage.process_startup(); ' - 'import mycli.main; ' - 'mycli.main.cli()' - '"' - ).format(sys.executable) + cli_cmd = ('{0!s} -c "' "import coverage ; " "coverage.process_startup(); " "import mycli.main; " "mycli.main.cli()" '"').format( + sys.executable + ) cmd_parts = [cli_cmd] + rendered_args - cmd = ' '.join(cmd_parts) + cmd = " ".join(cmd_parts) context.cli = pexpect.spawnu(cmd, cwd=context.package_root) context.logfile = StringIO() context.cli.logfile = context.logfile context.exit_sent = False - context.currentdb = context.conf['dbname'] + context.currentdb = context.conf["dbname"] def wait_prompt(context, prompt=None): """Make sure prompt is displayed.""" if prompt is None: - user = context.conf['user'] - host = context.conf['host'] + user = context.conf["user"] + host = context.conf["host"] dbname = context.currentdb - prompt = '{0}@{1}:{2}>'.format( - user, host, dbname), + prompt = ("{0}@{1}:{2}>".format(user, host, dbname),) expect_exact(context, prompt, timeout=5) context.atprompt = True |