From 6884720fae8a2622b14e93d9e35ca5fcc2283b40 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 8 Feb 2021 11:31:05 +0100 Subject: Adding upstream version 3.1.0. Signed-off-by: Daniel Baumann --- tests/test_smart_completion_public_schema_only.py | 1112 +++++++++++++++++++++ 1 file changed, 1112 insertions(+) create mode 100644 tests/test_smart_completion_public_schema_only.py (limited to 'tests/test_smart_completion_public_schema_only.py') diff --git a/tests/test_smart_completion_public_schema_only.py b/tests/test_smart_completion_public_schema_only.py new file mode 100644 index 0000000..b935709 --- /dev/null +++ b/tests/test_smart_completion_public_schema_only.py @@ -0,0 +1,1112 @@ +from metadata import ( + MetaData, + alias, + name_join, + fk_join, + join, + keyword, + schema, + table, + view, + function, + column, + wildcard_expansion, + get_result, + result_set, + qual, + no_qual, + parametrize, +) +from prompt_toolkit.completion import Completion +from utils import completions_to_set + + +metadata = { + "tables": { + "users": ["id", "parentid", "email", "first_name", "last_name"], + "Users": ["userid", "username"], + "orders": ["id", "ordered_date", "status", "email"], + "select": ["id", "insert", "ABC"], + }, + "views": {"user_emails": ["id", "email"], "functions": ["function"]}, + "functions": [ + ["custom_fun", [], [], [], "", False, False, False, False], + ["_custom_fun", [], [], [], "", False, False, False, False], + ["custom_func1", [], [], [], "", False, False, False, False], + ["custom_func2", [], [], [], "", False, False, False, False], + [ + "set_returning_func", + ["x", "y"], + ["integer", "integer"], + ["b", "b"], + "", + False, + False, + True, + False, + ], + ], + "datatypes": ["custom_type1", "custom_type2"], + "foreignkeys": [ + ("public", "users", "id", "public", "users", "parentid"), + ("public", "users", "id", "public", "Users", "userid"), + ], +} + +metadata = dict((k, {"public": v}) for k, v in metadata.items()) + +testdata = MetaData(metadata) + +cased_users_col_names = ["ID", "PARENTID", "Email", "First_Name", "last_name"] +cased_users2_col_names = ["UserID", "UserName"] +cased_func_names = [ + "Custom_Fun", + "_custom_fun", + "Custom_Func1", + "custom_func2", + "set_returning_func", +] +cased_tbls = ["Users", "Orders"] +cased_views = ["User_Emails", "Functions"] +casing = ( + ["SELECT", "PUBLIC"] + + cased_func_names + + cased_tbls + + cased_views + + cased_users_col_names + + cased_users2_col_names +) +# Lists for use in assertions +cased_funcs = [ + function(f) + for f in ("Custom_Fun()", "_custom_fun()", "Custom_Func1()", "custom_func2()") +] + [function("set_returning_func(x := , y := )", display="set_returning_func(x, y)")] +cased_tbls = [table(t) for t in (cased_tbls + ['"Users"', '"select"'])] +cased_rels = [view(t) for t in cased_views] + cased_funcs + cased_tbls +cased_users_cols = [column(c) for c in cased_users_col_names] +aliased_rels = ( + [table(t) for t in ("users u", '"Users" U', "orders o", '"select" s')] + + [view("user_emails ue"), view("functions f")] + + [ + function(f) + for f in ( + "_custom_fun() cf", + "custom_fun() cf", + "custom_func1() cf", + "custom_func2() cf", + ) + ] + + [ + function( + "set_returning_func(x := , y := ) srf", + display="set_returning_func(x, y) srf", + ) + ] +) +cased_aliased_rels = ( + [table(t) for t in ("Users U", '"Users" U', "Orders O", '"select" s')] + + [view("User_Emails UE"), view("Functions F")] + + [ + function(f) + for f in ( + "_custom_fun() cf", + "Custom_Fun() CF", + "Custom_Func1() CF", + "custom_func2() cf", + ) + ] + + [ + function( + "set_returning_func(x := , y := ) srf", + display="set_returning_func(x, y) srf", + ) + ] +) +completers = testdata.get_completers(casing) + + +# Just to make sure that this doesn't crash +@parametrize("completer", completers()) +def test_function_column_name(completer): + for l in range( + len("SELECT * FROM Functions WHERE function:"), + len("SELECT * FROM Functions WHERE function:text") + 1, + ): + assert [] == get_result( + completer, "SELECT * FROM Functions WHERE function:text"[:l] + ) + + +@parametrize("action", ["ALTER", "DROP", "CREATE", "CREATE OR REPLACE"]) +@parametrize("completer", completers()) +def test_drop_alter_function(completer, action): + assert get_result(completer, action + " FUNCTION set_ret") == [ + function("set_returning_func(x integer, y integer)", -len("set_ret")) + ] + + +@parametrize("completer", completers()) +def test_empty_string_completion(completer): + result = get_result(completer, "") + assert completions_to_set( + testdata.keywords() + testdata.specials() + ) == completions_to_set(result) + + +@parametrize("completer", completers()) +def test_select_keyword_completion(completer): + result = get_result(completer, "SEL") + assert completions_to_set(result) == completions_to_set([keyword("SELECT", -3)]) + + +@parametrize("completer", completers()) +def test_builtin_function_name_completion(completer): + result = get_result(completer, "SELECT MA") + assert completions_to_set(result) == completions_to_set( + [ + function("MAKE_DATE", -2), + function("MAKE_INTERVAL", -2), + function("MAKE_TIME", -2), + function("MAKE_TIMESTAMP", -2), + function("MAKE_TIMESTAMPTZ", -2), + function("MASKLEN", -2), + function("MAX", -2), + keyword("MAXEXTENTS", -2), + keyword("MATERIALIZED VIEW", -2), + ] + ) + + +@parametrize("completer", completers()) +def test_builtin_function_matches_only_at_start(completer): + text = "SELECT IN" + + result = [c.text for c in get_result(completer, text)] + + assert "MIN" not in result + + +@parametrize("completer", completers(casing=False, aliasing=False)) +def test_user_function_name_completion(completer): + result = get_result(completer, "SELECT cu") + assert completions_to_set(result) == completions_to_set( + [ + function("custom_fun()", -2), + function("_custom_fun()", -2), + function("custom_func1()", -2), + function("custom_func2()", -2), + function("CURRENT_DATE", -2), + function("CURRENT_TIMESTAMP", -2), + function("CUME_DIST", -2), + function("CURRENT_TIME", -2), + keyword("CURRENT", -2), + ] + ) + + +@parametrize("completer", completers(casing=False, aliasing=False)) +def test_user_function_name_completion_matches_anywhere(completer): + result = get_result(completer, "SELECT om") + assert completions_to_set(result) == completions_to_set( + [ + function("custom_fun()", -2), + function("_custom_fun()", -2), + function("custom_func1()", -2), + function("custom_func2()", -2), + ] + ) + + +@parametrize("completer", completers(casing=True)) +def test_list_functions_for_special(completer): + result = get_result(completer, r"\df ") + assert completions_to_set(result) == completions_to_set( + [schema("PUBLIC")] + [function(f) for f in cased_func_names] + ) + + +@parametrize("completer", completers(casing=False, qualify=no_qual)) +def test_suggested_column_names_from_visible_table(completer): + result = get_result(completer, "SELECT from users", len("SELECT ")) + assert completions_to_set(result) == completions_to_set( + testdata.columns_functions_and_keywords("users") + ) + + +@parametrize("completer", completers(casing=True, qualify=no_qual)) +def test_suggested_cased_column_names(completer): + result = get_result(completer, "SELECT from users", len("SELECT ")) + assert completions_to_set(result) == completions_to_set( + cased_funcs + + cased_users_cols + + testdata.builtin_functions() + + testdata.keywords() + ) + + +@parametrize("completer", completers(casing=False, qualify=no_qual)) +@parametrize("text", ["SELECT from users", "INSERT INTO Orders SELECT from users"]) +def test_suggested_auto_qualified_column_names(text, completer): + position = text.index(" ") + 1 + cols = [column(c.lower()) for c in cased_users_col_names] + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set( + cols + testdata.functions_and_keywords() + ) + + +@parametrize("completer", completers(casing=False, qualify=qual)) +@parametrize( + "text", + [ + 'SELECT from users U NATURAL JOIN "Users"', + 'INSERT INTO Orders SELECT from users U NATURAL JOIN "Users"', + ], +) +def test_suggested_auto_qualified_column_names_two_tables(text, completer): + position = text.index(" ") + 1 + cols = [column("U." + c.lower()) for c in cased_users_col_names] + cols += [column('"Users".' + c.lower()) for c in cased_users2_col_names] + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set( + cols + testdata.functions_and_keywords() + ) + + +@parametrize("completer", completers(casing=True, qualify=["always"])) +@parametrize("text", ["UPDATE users SET ", "INSERT INTO users("]) +def test_no_column_qualification(text, completer): + cols = [column(c) for c in cased_users_col_names] + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set(cols) + + +@parametrize("completer", completers(casing=True, qualify=["always"])) +def test_suggested_cased_always_qualified_column_names(completer): + text = "SELECT from users" + position = len("SELECT ") + cols = [column("users." + c) for c in cased_users_col_names] + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set( + cased_funcs + cols + testdata.builtin_functions() + testdata.keywords() + ) + + +@parametrize("completer", completers(casing=False, qualify=no_qual)) +def test_suggested_column_names_in_function(completer): + result = get_result(completer, "SELECT MAX( from users", len("SELECT MAX(")) + assert completions_to_set(result) == completions_to_set( + (testdata.columns_functions_and_keywords("users")) + ) + + +@parametrize("completer", completers(casing=False)) +def test_suggested_column_names_with_table_dot(completer): + result = get_result(completer, "SELECT users. from users", len("SELECT users.")) + assert completions_to_set(result) == completions_to_set(testdata.columns("users")) + + +@parametrize("completer", completers(casing=False)) +def test_suggested_column_names_with_alias(completer): + result = get_result(completer, "SELECT u. from users u", len("SELECT u.")) + assert completions_to_set(result) == completions_to_set(testdata.columns("users")) + + +@parametrize("completer", completers(casing=False, qualify=no_qual)) +def test_suggested_multiple_column_names(completer): + result = get_result(completer, "SELECT id, from users u", len("SELECT id, ")) + assert completions_to_set(result) == completions_to_set( + (testdata.columns_functions_and_keywords("users")) + ) + + +@parametrize("completer", completers(casing=False)) +def test_suggested_multiple_column_names_with_alias(completer): + result = get_result( + completer, "SELECT u.id, u. from users u", len("SELECT u.id, u.") + ) + assert completions_to_set(result) == completions_to_set(testdata.columns("users")) + + +@parametrize("completer", completers(casing=True)) +def test_suggested_cased_column_names_with_alias(completer): + result = get_result( + completer, "SELECT u.id, u. from users u", len("SELECT u.id, u.") + ) + assert completions_to_set(result) == completions_to_set(cased_users_cols) + + +@parametrize("completer", completers(casing=False)) +def test_suggested_multiple_column_names_with_dot(completer): + result = get_result( + completer, + "SELECT users.id, users. from users u", + len("SELECT users.id, users."), + ) + assert completions_to_set(result) == completions_to_set(testdata.columns("users")) + + +@parametrize("completer", completers(casing=False)) +def test_suggest_columns_after_three_way_join(completer): + text = """SELECT * FROM users u1 + INNER JOIN users u2 ON u1.id = u2.id + INNER JOIN users u3 ON u2.id = u3.""" + result = get_result(completer, text) + assert column("id") in result + + +join_condition_texts = [ + 'INSERT INTO orders SELECT * FROM users U JOIN "Users" U2 ON ', + """INSERT INTO public.orders(orderid) + SELECT * FROM users U JOIN "Users" U2 ON """, + 'SELECT * FROM users U JOIN "Users" U2 ON ', + 'SELECT * FROM users U INNER join "Users" U2 ON ', + 'SELECT * FROM USERS U right JOIN "Users" U2 ON ', + 'SELECT * FROM users U LEFT JOIN "Users" U2 ON ', + 'SELECT * FROM Users U FULL JOIN "Users" U2 ON ', + 'SELECT * FROM users U right outer join "Users" U2 ON ', + 'SELECT * FROM Users U LEFT OUTER JOIN "Users" U2 ON ', + 'SELECT * FROM users U FULL OUTER JOIN "Users" U2 ON ', + """SELECT * + FROM users U + FULL OUTER JOIN "Users" U2 ON + """, +] + + +@parametrize("completer", completers(casing=False)) +@parametrize("text", join_condition_texts) +def test_suggested_join_conditions(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [alias("U"), alias("U2"), fk_join("U2.userid = U.id")] + ) + + +@parametrize("completer", completers(casing=True)) +@parametrize("text", join_condition_texts) +def test_cased_join_conditions(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [alias("U"), alias("U2"), fk_join("U2.UserID = U.ID")] + ) + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + [ + """SELECT * + FROM users + CROSS JOIN "Users" + NATURAL JOIN users u + JOIN "Users" u2 ON + """ + ], +) +def test_suggested_join_conditions_with_same_table_twice(completer, text): + result = get_result(completer, text) + assert result == [ + fk_join("u2.userid = u.id"), + fk_join("u2.userid = users.id"), + name_join('u2.userid = "Users".userid'), + name_join('u2.username = "Users".username'), + alias("u"), + alias("u2"), + alias("users"), + alias('"Users"'), + ] + + +@parametrize("completer", completers()) +@parametrize("text", ["SELECT * FROM users JOIN users u2 on foo."]) +def test_suggested_join_conditions_with_invalid_qualifier(completer, text): + result = get_result(completer, text) + assert result == [] + + +@parametrize("completer", completers(casing=False)) +@parametrize( + ("text", "ref"), + [ + ("SELECT * FROM users JOIN NonTable on ", "NonTable"), + ("SELECT * FROM users JOIN nontable nt on ", "nt"), + ], +) +def test_suggested_join_conditions_with_invalid_table(completer, text, ref): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [alias("users"), alias(ref)] + ) + + +@parametrize("completer", completers(casing=False, aliasing=False)) +@parametrize( + "text", + [ + 'SELECT * FROM "Users" u JOIN u', + 'SELECT * FROM "Users" u JOIN uid', + 'SELECT * FROM "Users" u JOIN userid', + 'SELECT * FROM "Users" u JOIN id', + ], +) +def test_suggested_joins_fuzzy(completer, text): + result = get_result(completer, text) + last_word = text.split()[-1] + expected = join("users ON users.id = u.userid", -len(last_word)) + assert expected in result + + +join_texts = [ + "SELECT * FROM Users JOIN ", + """INSERT INTO "Users" + SELECT * + FROM Users + INNER JOIN """, + """INSERT INTO public."Users"(username) + SELECT * + FROM Users + INNER JOIN """, + """SELECT * + FROM Users + INNER JOIN """, +] + + +@parametrize("completer", completers(casing=False, aliasing=False)) +@parametrize("text", join_texts) +def test_suggested_joins(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.schemas_and_from_clause_items() + + [ + join('"Users" ON "Users".userid = Users.id'), + join("users users2 ON users2.id = Users.parentid"), + join("users users2 ON users2.parentid = Users.id"), + ] + ) + + +@parametrize("completer", completers(casing=True, aliasing=False)) +@parametrize("text", join_texts) +def test_cased_joins(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [schema("PUBLIC")] + + cased_rels + + [ + join('"Users" ON "Users".UserID = Users.ID'), + join("Users Users2 ON Users2.ID = Users.PARENTID"), + join("Users Users2 ON Users2.PARENTID = Users.ID"), + ] + ) + + +@parametrize("completer", completers(casing=False, aliasing=True)) +@parametrize("text", join_texts) +def test_aliased_joins(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.schemas() + + aliased_rels + + [ + join('"Users" U ON U.userid = Users.id'), + join("users u ON u.id = Users.parentid"), + join("users u ON u.parentid = Users.id"), + ] + ) + + +@parametrize("completer", completers(casing=False, aliasing=False)) +@parametrize( + "text", + [ + 'SELECT * FROM public."Users" JOIN ', + 'SELECT * FROM public."Users" RIGHT OUTER JOIN ', + """SELECT * + FROM public."Users" + LEFT JOIN """, + ], +) +def test_suggested_joins_quoted_schema_qualified_table(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.schemas_and_from_clause_items() + + [join('public.users ON users.id = "Users".userid')] + ) + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + [ + "SELECT u.name, o.id FROM users u JOIN orders o ON ", + "SELECT u.name, o.id FROM users u JOIN orders o ON JOIN orders o2 ON", + ], +) +def test_suggested_aliases_after_on(completer, text): + position = len("SELECT u.name, o.id FROM users u JOIN orders o ON ") + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set( + [ + alias("u"), + name_join("o.id = u.id"), + name_join("o.email = u.email"), + alias("o"), + ] + ) + + +@parametrize("completer", completers()) +@parametrize( + "text", + [ + "SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ", + "SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = JOIN orders o2 ON", + ], +) +def test_suggested_aliases_after_on_right_side(completer, text): + position = len("SELECT u.name, o.id FROM users u JOIN orders o ON o.user_id = ") + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set([alias("u"), alias("o")]) + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + [ + "SELECT users.name, orders.id FROM users JOIN orders ON ", + "SELECT users.name, orders.id FROM users JOIN orders ON JOIN orders orders2 ON", + ], +) +def test_suggested_tables_after_on(completer, text): + position = len("SELECT users.name, orders.id FROM users JOIN orders ON ") + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set( + [ + name_join("orders.id = users.id"), + name_join("orders.email = users.email"), + alias("users"), + alias("orders"), + ] + ) + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + [ + "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = JOIN orders orders2 ON", + "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = ", + ], +) +def test_suggested_tables_after_on_right_side(completer, text): + position = len( + "SELECT users.name, orders.id FROM users JOIN orders ON orders.user_id = " + ) + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set( + [alias("users"), alias("orders")] + ) + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + [ + "SELECT * FROM users INNER JOIN orders USING (", + "SELECT * FROM users INNER JOIN orders USING(", + ], +) +def test_join_using_suggests_common_columns(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [column("id"), column("email")] + ) + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + [ + "SELECT * FROM users u1 JOIN users u2 USING (email) JOIN user_emails ue USING()", + "SELECT * FROM users u1 JOIN users u2 USING(email) JOIN user_emails ue USING ()", + "SELECT * FROM users u1 JOIN user_emails ue USING () JOIN users u2 ue USING(first_name, last_name)", + "SELECT * FROM users u1 JOIN user_emails ue USING() JOIN users u2 ue USING (first_name, last_name)", + ], +) +def test_join_using_suggests_from_last_table(completer, text): + position = text.index("()") + 1 + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set( + [column("id"), column("email")] + ) + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + [ + "SELECT * FROM users INNER JOIN orders USING (id,", + "SELECT * FROM users INNER JOIN orders USING(id,", + ], +) +def test_join_using_suggests_columns_after_first_column(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [column("id"), column("email")] + ) + + +@parametrize("completer", completers(casing=False, aliasing=False)) +@parametrize( + "text", + [ + "SELECT * FROM ", + "SELECT * FROM users CROSS JOIN ", + "SELECT * FROM users natural join ", + ], +) +def test_table_names_after_from(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.schemas_and_from_clause_items() + ) + assert [c.text for c in result] == [ + "public", + "orders", + '"select"', + "users", + '"Users"', + "functions", + "user_emails", + "_custom_fun()", + "custom_fun()", + "custom_func1()", + "custom_func2()", + "set_returning_func(x := , y := )", + ] + + +@parametrize("completer", completers(casing=False, qualify=no_qual)) +def test_auto_escaped_col_names(completer): + result = get_result(completer, 'SELECT from "select"', len("SELECT ")) + assert completions_to_set(result) == completions_to_set( + testdata.columns_functions_and_keywords("select") + ) + + +@parametrize("completer", completers(aliasing=False)) +def test_allow_leading_double_quote_in_last_word(completer): + result = get_result(completer, 'SELECT * from "sele') + + expected = table('"select"', -5) + + assert expected in result + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + [ + "SELECT 1::", + "CREATE TABLE foo (bar ", + "CREATE FUNCTION foo (bar INT, baz ", + "ALTER TABLE foo ALTER COLUMN bar TYPE ", + ], +) +def test_suggest_datatype(text, completer): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.schemas() + testdata.types() + testdata.builtin_datatypes() + ) + + +@parametrize("completer", completers(casing=False)) +def test_suggest_columns_from_escaped_table_alias(completer): + result = get_result(completer, 'select * from "select" s where s.') + assert completions_to_set(result) == completions_to_set(testdata.columns("select")) + + +@parametrize("completer", completers(casing=False, qualify=no_qual)) +def test_suggest_columns_from_set_returning_function(completer): + result = get_result(completer, "select from set_returning_func()", len("select ")) + assert completions_to_set(result) == completions_to_set( + testdata.columns_functions_and_keywords("set_returning_func", typ="functions") + ) + + +@parametrize("completer", completers(casing=False)) +def test_suggest_columns_from_aliased_set_returning_function(completer): + result = get_result( + completer, "select f. from set_returning_func() f", len("select f.") + ) + assert completions_to_set(result) == completions_to_set( + testdata.columns("set_returning_func", typ="functions") + ) + + +@parametrize("completer", completers(casing=False)) +def test_join_functions_using_suggests_common_columns(completer): + text = """SELECT * FROM set_returning_func() f1 + INNER JOIN set_returning_func() f2 USING (""" + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.columns("set_returning_func", typ="functions") + ) + + +@parametrize("completer", completers(casing=False)) +def test_join_functions_on_suggests_columns_and_join_conditions(completer): + text = """SELECT * FROM set_returning_func() f1 + INNER JOIN set_returning_func() f2 ON f1.""" + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [name_join("y = f2.y"), name_join("x = f2.x")] + + testdata.columns("set_returning_func", typ="functions") + ) + + +@parametrize("completer", completers()) +def test_learn_keywords(completer): + history = "CREATE VIEW v AS SELECT 1" + completer.extend_query_history(history) + + # Now that we've used `VIEW` once, it should be suggested ahead of other + # keywords starting with v. + text = "create v" + completions = get_result(completer, text) + assert completions[0].text == "VIEW" + + +@parametrize("completer", completers(casing=False, aliasing=False)) +def test_learn_table_names(completer): + history = "SELECT * FROM users; SELECT * FROM orders; SELECT * FROM users" + completer.extend_query_history(history) + + text = "SELECT * FROM " + completions = get_result(completer, text) + + # `users` should be higher priority than `orders` (used more often) + users = table("users") + orders = table("orders") + + assert completions.index(users) < completions.index(orders) + + +@parametrize("completer", completers(casing=False, qualify=no_qual)) +def test_columns_before_keywords(completer): + text = "SELECT * FROM orders WHERE s" + completions = get_result(completer, text) + + col = column("status", -1) + kw = keyword("SELECT", -1) + + assert completions.index(col) < completions.index(kw) + + +@parametrize("completer", completers(casing=False, qualify=no_qual)) +@parametrize( + "text", + [ + "SELECT * FROM users", + "INSERT INTO users SELECT * FROM users u", + """INSERT INTO users(id, parentid, email, first_name, last_name) + SELECT * + FROM users u""", + ], +) +def test_wildcard_column_expansion(completer, text): + position = text.find("*") + 1 + + completions = get_result(completer, text, position) + + col_list = "id, parentid, email, first_name, last_name" + expected = [wildcard_expansion(col_list)] + + assert expected == completions + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + [ + "SELECT u.* FROM users u", + "INSERT INTO public.users SELECT u.* FROM users u", + """INSERT INTO users(id, parentid, email, first_name, last_name) + SELECT u.* + FROM users u""", + ], +) +def test_wildcard_column_expansion_with_alias(completer, text): + position = text.find("*") + 1 + + completions = get_result(completer, text, position) + + col_list = "id, u.parentid, u.email, u.first_name, u.last_name" + expected = [wildcard_expansion(col_list)] + + assert expected == completions + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text,expected", + [ + ( + "SELECT users.* FROM users", + "id, users.parentid, users.email, users.first_name, users.last_name", + ), + ( + "SELECT Users.* FROM Users", + "id, Users.parentid, Users.email, Users.first_name, Users.last_name", + ), + ], +) +def test_wildcard_column_expansion_with_table_qualifier(completer, text, expected): + position = len("SELECT users.*") + + completions = get_result(completer, text, position) + + expected = [wildcard_expansion(expected)] + + assert expected == completions + + +@parametrize("completer", completers(casing=False, qualify=qual)) +def test_wildcard_column_expansion_with_two_tables(completer): + text = 'SELECT * FROM "select" JOIN users u ON true' + position = len("SELECT *") + + completions = get_result(completer, text, position) + + cols = ( + '"select".id, "select".insert, "select"."ABC", ' + "u.id, u.parentid, u.email, u.first_name, u.last_name" + ) + expected = [wildcard_expansion(cols)] + assert completions == expected + + +@parametrize("completer", completers(casing=False)) +def test_wildcard_column_expansion_with_two_tables_and_parent(completer): + text = 'SELECT "select".* FROM "select" JOIN users u ON true' + position = len('SELECT "select".*') + + completions = get_result(completer, text, position) + + col_list = 'id, "select".insert, "select"."ABC"' + expected = [wildcard_expansion(col_list)] + + assert expected == completions + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + ["SELECT U. FROM Users U", "SELECT U. FROM USERS U", "SELECT U. FROM users U"], +) +def test_suggest_columns_from_unquoted_table(completer, text): + position = len("SELECT U.") + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set(testdata.columns("users")) + + +@parametrize("completer", completers(casing=False)) +def test_suggest_columns_from_quoted_table(completer): + result = get_result(completer, 'SELECT U. FROM "Users" U', len("SELECT U.")) + assert completions_to_set(result) == completions_to_set(testdata.columns("Users")) + + +@parametrize("completer", completers(casing=False, aliasing=False)) +@parametrize("text", ["SELECT * FROM ", "SELECT * FROM Orders o CROSS JOIN "]) +def test_schema_or_visible_table_completion(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.schemas_and_from_clause_items() + ) + + +@parametrize("completer", completers(casing=False, aliasing=True)) +@parametrize("text", ["SELECT * FROM "]) +def test_table_aliases(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.schemas() + aliased_rels + ) + + +@parametrize("completer", completers(casing=False, aliasing=True)) +@parametrize("text", ["SELECT * FROM Orders o CROSS JOIN "]) +def test_duplicate_table_aliases(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.schemas() + + [ + table("orders o2"), + table("users u"), + table('"Users" U'), + table('"select" s'), + view("user_emails ue"), + view("functions f"), + function("_custom_fun() cf"), + function("custom_fun() cf"), + function("custom_func1() cf"), + function("custom_func2() cf"), + function( + "set_returning_func(x := , y := ) srf", + display="set_returning_func(x, y) srf", + ), + ] + ) + + +@parametrize("completer", completers(casing=True, aliasing=True)) +@parametrize("text", ["SELECT * FROM Orders o CROSS JOIN "]) +def test_duplicate_aliases_with_casing(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [ + schema("PUBLIC"), + table("Orders O2"), + table("Users U"), + table('"Users" U'), + table('"select" s'), + view("User_Emails UE"), + view("Functions F"), + function("_custom_fun() cf"), + function("Custom_Fun() CF"), + function("Custom_Func1() CF"), + function("custom_func2() cf"), + function( + "set_returning_func(x := , y := ) srf", + display="set_returning_func(x, y) srf", + ), + ] + ) + + +@parametrize("completer", completers(casing=True, aliasing=True)) +@parametrize("text", ["SELECT * FROM "]) +def test_aliases_with_casing(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [schema("PUBLIC")] + cased_aliased_rels + ) + + +@parametrize("completer", completers(casing=True, aliasing=False)) +@parametrize("text", ["SELECT * FROM "]) +def test_table_casing(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [schema("PUBLIC")] + cased_rels + ) + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + [ + "INSERT INTO users ()", + "INSERT INTO users()", + "INSERT INTO users () SELECT * FROM orders;", + "INSERT INTO users() SELECT * FROM users u cross join orders o", + ], +) +def test_insert(completer, text): + position = text.find("(") + 1 + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set(testdata.columns("users")) + + +@parametrize("completer", completers(casing=False, aliasing=False)) +def test_suggest_cte_names(completer): + text = """ + WITH cte1 AS (SELECT a, b, c FROM foo), + cte2 AS (SELECT d, e, f FROM bar) + SELECT * FROM + """ + result = get_result(completer, text) + expected = completions_to_set( + [ + Completion("cte1", 0, display_meta="table"), + Completion("cte2", 0, display_meta="table"), + ] + ) + assert expected <= completions_to_set(result) + + +@parametrize("completer", completers(casing=False, qualify=no_qual)) +def test_suggest_columns_from_cte(completer): + result = get_result( + completer, + "WITH cte AS (SELECT foo, bar FROM baz) SELECT FROM cte", + len("WITH cte AS (SELECT foo, bar FROM baz) SELECT "), + ) + expected = [ + Completion("foo", 0, display_meta="column"), + Completion("bar", 0, display_meta="column"), + ] + testdata.functions_and_keywords() + + assert completions_to_set(expected) == completions_to_set(result) + + +@parametrize("completer", completers(casing=False, qualify=no_qual)) +@parametrize( + "text", + [ + "WITH cte AS (SELECT foo FROM bar) SELECT * FROM cte WHERE cte.", + "WITH cte AS (SELECT foo FROM bar) SELECT * FROM cte c WHERE c.", + ], +) +def test_cte_qualified_columns(completer, text): + result = get_result(completer, text) + expected = [Completion("foo", 0, display_meta="column")] + assert completions_to_set(expected) == completions_to_set(result) + + +@parametrize( + "keyword_casing,expected,texts", + [ + ("upper", "SELECT", ("", "s", "S", "Sel")), + ("lower", "select", ("", "s", "S", "Sel")), + ("auto", "SELECT", ("", "S", "SEL", "seL")), + ("auto", "select", ("s", "sel", "SEl")), + ], +) +def test_keyword_casing_upper(keyword_casing, expected, texts): + for text in texts: + completer = testdata.get_completer({"keyword_casing": keyword_casing}) + completions = get_result(completer, text) + assert expected in [cpl.text for cpl in completions] + + +@parametrize("completer", completers()) +def test_keyword_after_alter(completer): + text = "ALTER TABLE users ALTER " + expected = Completion("COLUMN", start_position=0, display_meta="keyword") + completions = get_result(completer, text) + assert expected in completions + + +@parametrize("completer", completers()) +def test_set_schema(completer): + text = "SET SCHEMA " + result = get_result(completer, text) + expected = completions_to_set([schema("'public'")]) + assert completions_to_set(result) == expected + + +@parametrize("completer", completers()) +def test_special_name_completion(completer): + result = get_result(completer, "\\t") + assert completions_to_set(result) == completions_to_set( + [ + Completion( + text="\\timing", + start_position=-2, + display_meta="Toggle timing of commands.", + ) + ] + ) -- cgit v1.2.3