diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 03:06:41 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 03:06:41 +0000 |
commit | 708c091a8b4db6a55be1c96ae33ee0da632b269f (patch) | |
tree | aac9e87c59cb8bc7e3cd429e9200c3ca017cb591 /tests/test_smart_completion_multiple_schemata.py | |
parent | Initial commit. (diff) | |
download | pgcli-708c091a8b4db6a55be1c96ae33ee0da632b269f.tar.xz pgcli-708c091a8b4db6a55be1c96ae33ee0da632b269f.zip |
Adding upstream version 4.0.1.upstream/4.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/test_smart_completion_multiple_schemata.py')
-rw-r--r-- | tests/test_smart_completion_multiple_schemata.py | 757 |
1 files changed, 757 insertions, 0 deletions
diff --git a/tests/test_smart_completion_multiple_schemata.py b/tests/test_smart_completion_multiple_schemata.py new file mode 100644 index 0000000..5c9c9af --- /dev/null +++ b/tests/test_smart_completion_multiple_schemata.py @@ -0,0 +1,757 @@ +import itertools +from metadata import ( + MetaData, + alias, + name_join, + fk_join, + join, + schema, + table, + function, + wildcard_expansion, + column, + get_result, + result_set, + qual, + no_qual, + parametrize, +) +from utils import completions_to_set + +metadata = { + "tables": { + "public": { + "users": ["id", "email", "first_name", "last_name"], + "orders": ["id", "ordered_date", "status", "datestamp"], + "select": ["id", "localtime", "ABC"], + }, + "custom": { + "users": ["id", "phone_number"], + "Users": ["userid", "username"], + "products": ["id", "product_name", "price"], + "shipments": ["id", "address", "user_id"], + }, + "Custom": {"projects": ["projectid", "name"]}, + "blog": { + "entries": ["entryid", "entrytitle", "entrytext"], + "tags": ["tagid", "name"], + "entrytags": ["entryid", "tagid"], + "entacclog": ["entryid", "username", "datestamp"], + }, + }, + "functions": { + "public": [ + ["func1", [], [], [], "", False, False, False, False], + ["func2", [], [], [], "", False, False, False, False], + ], + "custom": [ + ["func3", [], [], [], "", False, False, False, False], + [ + "set_returning_func", + ["x"], + ["integer"], + ["o"], + "integer", + False, + False, + True, + False, + ], + ], + "Custom": [["func4", [], [], [], "", False, False, False, False]], + "blog": [ + [ + "extract_entry_symbols", + ["_entryid", "symbol"], + ["integer", "text"], + ["i", "o"], + "", + False, + False, + True, + False, + ], + [ + "enter_entry", + ["_title", "_text", "entryid"], + ["text", "text", "integer"], + ["i", "i", "o"], + "", + False, + False, + False, + False, + ], + ], + }, + "datatypes": {"public": ["typ1", "typ2"], "custom": ["typ3", "typ4"]}, + "foreignkeys": { + "custom": [("public", "users", "id", "custom", "shipments", "user_id")], + "blog": [ + ("blog", "entries", "entryid", "blog", "entacclog", "entryid"), + ("blog", "entries", "entryid", "blog", "entrytags", "entryid"), + ("blog", "tags", "tagid", "blog", "entrytags", "tagid"), + ], + }, + "defaults": { + "public": { + ("orders", "id"): "nextval('orders_id_seq'::regclass)", + ("orders", "datestamp"): "now()", + ("orders", "status"): "'PENDING'::text", + } + }, +} + +testdata = MetaData(metadata) +cased_schemas = [schema(x) for x in ("public", "blog", "CUSTOM", '"Custom"')] +casing = ( + "SELECT", + "Orders", + "User_Emails", + "CUSTOM", + "Func1", + "Entries", + "Tags", + "EntryTags", + "EntAccLog", + "EntryID", + "EntryTitle", + "EntryText", +) +completers = testdata.get_completers(casing) + + +@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual)) +@parametrize("table", ["users", '"users"']) +def test_suggested_column_names_from_shadowed_visible_table(completer, table): + result = get_result(completer, "SELECT FROM " + table, len("SELECT ")) + assert completions_to_set(result) == completions_to_set( + testdata.columns_functions_and_keywords("users") + ) + + +@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual)) +@parametrize( + "text", + [ + "SELECT from custom.users", + "WITH users as (SELECT 1 AS foo) SELECT from custom.users", + ], +) +def test_suggested_column_names_from_qualified_shadowed_table(completer, text): + result = get_result(completer, text, position=text.find(" ") + 1) + assert completions_to_set(result) == completions_to_set( + testdata.columns_functions_and_keywords("users", "custom") + ) + + +@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual)) +@parametrize("text", ["WITH users as (SELECT 1 AS foo) SELECT from users"]) +def test_suggested_column_names_from_cte(completer, text): + result = completions_to_set(get_result(completer, text, text.find(" ") + 1)) + assert result == completions_to_set( + [column("foo")] + testdata.functions_and_keywords() + ) + + +@parametrize("completer", completers(casing=False)) +@parametrize( + "text", + [ + "SELECT * FROM users JOIN custom.shipments ON ", + """SELECT * + FROM public.users + JOIN custom.shipments ON """, + ], +) +def test_suggested_join_conditions(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [ + alias("users"), + alias("shipments"), + name_join("shipments.id = users.id"), + fk_join("shipments.user_id = users.id"), + ] + ) + + +@parametrize("completer", completers(filtr=True, casing=False, aliasing=False)) +@parametrize( + ("query", "tbl"), + itertools.product( + ( + "SELECT * FROM public.{0} RIGHT OUTER JOIN ", + """SELECT * + FROM {0} + JOIN """, + ), + ("users", '"users"', "Users"), + ), +) +def test_suggested_joins(completer, query, tbl): + result = get_result(completer, query.format(tbl)) + assert completions_to_set(result) == completions_to_set( + testdata.schemas_and_from_clause_items() + + [join(f"custom.shipments ON shipments.user_id = {tbl}.id")] + ) + + +@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual)) +def test_suggested_column_names_from_schema_qualifed_table(completer): + result = get_result(completer, "SELECT from custom.products", len("SELECT ")) + assert completions_to_set(result) == completions_to_set( + testdata.columns_functions_and_keywords("products", "custom") + ) + + +@parametrize( + "text", + [ + "INSERT INTO orders(", + "INSERT INTO orders (", + "INSERT INTO public.orders(", + "INSERT INTO public.orders (", + ], +) +@parametrize("completer", completers(filtr=True, casing=False)) +def test_suggested_columns_with_insert(completer, text): + assert completions_to_set(get_result(completer, text)) == completions_to_set( + testdata.columns("orders") + ) + + +@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual)) +def test_suggested_column_names_in_function(completer): + result = get_result( + completer, "SELECT MAX( from custom.products", len("SELECT MAX(") + ) + assert completions_to_set(result) == completions_to_set( + testdata.columns_functions_and_keywords("products", "custom") + ) + + +@parametrize("completer", completers(casing=False, aliasing=False)) +@parametrize( + "text", + ["SELECT * FROM Custom.", "SELECT * FROM custom.", 'SELECT * FROM "custom".'], +) +@parametrize("use_leading_double_quote", [False, True]) +def test_suggested_table_names_with_schema_dot( + completer, text, use_leading_double_quote +): + if use_leading_double_quote: + text += '"' + start_position = -1 + else: + start_position = 0 + + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.from_clause_items("custom", start_position) + ) + + +@parametrize("completer", completers(casing=False, aliasing=False)) +@parametrize("text", ['SELECT * FROM "Custom".']) +@parametrize("use_leading_double_quote", [False, True]) +def test_suggested_table_names_with_schema_dot2( + completer, text, use_leading_double_quote +): + if use_leading_double_quote: + text += '"' + start_position = -1 + else: + start_position = 0 + + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.from_clause_items("Custom", start_position) + ) + + +@parametrize("completer", completers(filtr=True, casing=False)) +def test_suggested_column_names_with_qualified_alias(completer): + result = get_result(completer, "SELECT p. from custom.products p", len("SELECT p.")) + assert completions_to_set(result) == completions_to_set( + testdata.columns("products", "custom") + ) + + +@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual)) +def test_suggested_multiple_column_names(completer): + result = get_result( + completer, "SELECT id, from custom.products", len("SELECT id, ") + ) + assert completions_to_set(result) == completions_to_set( + testdata.columns_functions_and_keywords("products", "custom") + ) + + +@parametrize("completer", completers(filtr=True, casing=False)) +def test_suggested_multiple_column_names_with_alias(completer): + result = get_result( + completer, "SELECT p.id, p. from custom.products p", len("SELECT u.id, u.") + ) + assert completions_to_set(result) == completions_to_set( + testdata.columns("products", "custom") + ) + + +@parametrize("completer", completers(filtr=True, casing=False)) +@parametrize( + "text", + [ + "SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON ", + "SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON JOIN public.orders z ON z.id > y.id", + ], +) +def test_suggestions_after_on(completer, text): + position = len( + "SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON " + ) + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set( + [ + alias("x"), + alias("y"), + name_join("y.price = x.price"), + name_join("y.product_name = x.product_name"), + name_join("y.id = x.id"), + ] + ) + + +@parametrize("completer", completers()) +def test_suggested_aliases_after_on_right_side(completer): + text = "SELECT x.id, y.product_name FROM custom.products x JOIN custom.products y ON x.id = " + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set([alias("x"), alias("y")]) + + +@parametrize("completer", completers(filtr=True, casing=False, aliasing=False)) +def test_table_names_after_from(completer): + text = "SELECT * FROM " + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.schemas_and_from_clause_items() + ) + + +@parametrize("completer", completers(filtr=True, casing=False)) +def test_schema_qualified_function_name(completer): + text = "SELECT custom.func" + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [ + function("func3()", -len("func")), + function("set_returning_func()", -len("func")), + ] + ) + + +@parametrize("completer", completers(filtr=True, casing=False, aliasing=False)) +def test_schema_qualified_function_name_after_from(completer): + text = "SELECT * FROM custom.set_r" + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [ + function("set_returning_func()", -len("func")), + ] + ) + + +@parametrize("completer", completers(filtr=True, casing=False, aliasing=False)) +def test_unqualified_function_name_not_returned(completer): + text = "SELECT * FROM set_r" + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set([]) + + +@parametrize("completer", completers(filtr=True, casing=False, aliasing=False)) +def test_unqualified_function_name_in_search_path(completer): + completer.search_path = ["public", "custom"] + text = "SELECT * FROM set_r" + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [ + function("set_returning_func()", -len("func")), + ] + ) + + +@parametrize("completer", completers(filtr=True, casing=False)) +@parametrize( + "text", + [ + "SELECT 1::custom.", + "CREATE TABLE foo (bar custom.", + "CREATE FUNCTION foo (bar INT, baz custom.", + "ALTER TABLE foo ALTER COLUMN bar TYPE custom.", + ], +) +def test_schema_qualified_type_name(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set(testdata.types("custom")) + + +@parametrize("completer", completers(filtr=True, casing=False)) +def test_suggest_columns_from_aliased_set_returning_function(completer): + result = get_result( + completer, "select f. from custom.set_returning_func() f", len("select f.") + ) + assert completions_to_set(result) == completions_to_set( + testdata.columns("set_returning_func", "custom", "functions") + ) + + +@parametrize("completer", completers(filtr=True, casing=False, qualify=no_qual)) +@parametrize( + "text", + [ + "SELECT * FROM custom.set_returning_func()", + "SELECT * FROM Custom.set_returning_func()", + "SELECT * FROM Custom.Set_Returning_Func()", + ], +) +def test_wildcard_column_expansion_with_function(completer, text): + position = len("SELECT *") + + completions = get_result(completer, text, position) + + col_list = "x" + expected = [wildcard_expansion(col_list)] + + assert expected == completions + + +@parametrize("completer", completers(filtr=True, casing=False)) +def test_wildcard_column_expansion_with_alias_qualifier(completer): + text = "SELECT p.* FROM custom.products p" + position = len("SELECT p.*") + + completions = get_result(completer, text, position) + + col_list = "id, p.product_name, p.price" + expected = [wildcard_expansion(col_list)] + + assert expected == completions + + +@parametrize("completer", completers(filtr=True, casing=False)) +@parametrize( + "text", + [ + """ + SELECT count(1) FROM users; + CREATE FUNCTION foo(custom.products _products) returns custom.shipments + LANGUAGE SQL + AS $foo$ + SELECT 1 FROM custom.shipments; + INSERT INTO public.orders(*) values(-1, now(), 'preliminary'); + SELECT 2 FROM custom.users; + $foo$; + SELECT count(1) FROM custom.shipments; + """, + "INSERT INTO public.orders(*", + "INSERT INTO public.Orders(*", + "INSERT INTO public.orders (*", + "INSERT INTO public.Orders (*", + "INSERT INTO orders(*", + "INSERT INTO Orders(*", + "INSERT INTO orders (*", + "INSERT INTO Orders (*", + "INSERT INTO public.orders(*)", + "INSERT INTO public.Orders(*)", + "INSERT INTO public.orders (*)", + "INSERT INTO public.Orders (*)", + "INSERT INTO orders(*)", + "INSERT INTO Orders(*)", + "INSERT INTO orders (*)", + "INSERT INTO Orders (*)", + ], +) +def test_wildcard_column_expansion_with_insert(completer, text): + position = text.index("*") + 1 + completions = get_result(completer, text, position) + + expected = [wildcard_expansion("ordered_date, status")] + assert expected == completions + + +@parametrize("completer", completers(filtr=True, casing=False)) +def test_wildcard_column_expansion_with_table_qualifier(completer): + text = 'SELECT "select".* FROM public."select"' + position = len('SELECT "select".*') + + completions = get_result(completer, text, position) + + col_list = 'id, "select"."localtime", "select"."ABC"' + expected = [wildcard_expansion(col_list)] + + assert expected == completions + + +@parametrize("completer", completers(filtr=True, casing=False, qualify=qual)) +def test_wildcard_column_expansion_with_two_tables(completer): + text = 'SELECT * FROM public."select" JOIN custom.users ON true' + position = len("SELECT *") + + completions = get_result(completer, text, position) + + cols = ( + '"select".id, "select"."localtime", "select"."ABC", ' + "users.id, users.phone_number" + ) + expected = [wildcard_expansion(cols)] + assert completions == expected + + +@parametrize("completer", completers(filtr=True, casing=False)) +def test_wildcard_column_expansion_with_two_tables_and_parent(completer): + text = 'SELECT "select".* FROM public."select" JOIN custom.users u ON true' + position = len('SELECT "select".*') + + completions = get_result(completer, text, position) + + col_list = 'id, "select"."localtime", "select"."ABC"' + expected = [wildcard_expansion(col_list)] + + assert expected == completions + + +@parametrize("completer", completers(filtr=True, casing=False)) +@parametrize( + "text", + [ + "SELECT U. FROM custom.Users U", + "SELECT U. FROM custom.USERS U", + "SELECT U. FROM custom.users U", + 'SELECT U. FROM "custom".Users U', + 'SELECT U. FROM "custom".USERS U', + 'SELECT U. FROM "custom".users U', + ], +) +def test_suggest_columns_from_unquoted_table(completer, text): + position = len("SELECT U.") + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set( + testdata.columns("users", "custom") + ) + + +@parametrize("completer", completers(filtr=True, casing=False)) +@parametrize( + "text", ['SELECT U. FROM custom."Users" U', 'SELECT U. FROM "custom"."Users" U'] +) +def test_suggest_columns_from_quoted_table(completer, text): + position = len("SELECT U.") + result = get_result(completer, text, position) + assert completions_to_set(result) == completions_to_set( + testdata.columns("Users", "custom") + ) + + +texts = ["SELECT * FROM ", "SELECT * FROM public.Orders O CROSS JOIN "] + + +@parametrize("completer", completers(filtr=True, casing=False, aliasing=False)) +@parametrize("text", texts) +def test_schema_or_visible_table_completion(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.schemas_and_from_clause_items() + ) + + +@parametrize("completer", completers(aliasing=True, casing=False, filtr=True)) +@parametrize("text", texts) +def test_table_aliases(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + testdata.schemas() + + [ + table("users u"), + table("orders o" if text == "SELECT * FROM " else "orders o2"), + table('"select" s'), + function("func1() f"), + function("func2() f"), + ] + ) + + +@parametrize("completer", completers(aliasing=True, casing=True, filtr=True)) +@parametrize("text", texts) +def test_aliases_with_casing(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + cased_schemas + + [ + table("users u"), + table("Orders O" if text == "SELECT * FROM " else "Orders O2"), + table('"select" s'), + function("Func1() F"), + function("func2() f"), + ] + ) + + +@parametrize("completer", completers(aliasing=False, casing=True, filtr=True)) +@parametrize("text", texts) +def test_table_casing(completer, text): + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + cased_schemas + + [ + table("users"), + table("Orders"), + table('"select"'), + function("Func1()"), + function("func2()"), + ] + ) + + +@parametrize("completer", completers(aliasing=False, casing=True)) +def test_alias_search_without_aliases2(completer): + text = "SELECT * FROM blog.et" + result = get_result(completer, text) + assert result[0] == table("EntryTags", -2) + + +@parametrize("completer", completers(aliasing=False, casing=True)) +def test_alias_search_without_aliases1(completer): + text = "SELECT * FROM blog.e" + result = get_result(completer, text) + assert result[0] == table("Entries", -1) + + +@parametrize("completer", completers(aliasing=True, casing=True)) +def test_alias_search_with_aliases2(completer): + text = "SELECT * FROM blog.et" + result = get_result(completer, text) + assert result[0] == table("EntryTags ET", -2) + + +@parametrize("completer", completers(aliasing=True, casing=True)) +def test_alias_search_with_aliases1(completer): + text = "SELECT * FROM blog.e" + result = get_result(completer, text) + assert result[0] == table("Entries E", -1) + + +@parametrize("completer", completers(aliasing=True, casing=True)) +def test_join_alias_search_with_aliases1(completer): + text = "SELECT * FROM blog.Entries E JOIN blog.e" + result = get_result(completer, text) + assert result[:2] == [ + table("Entries E2", -1), + join("EntAccLog EAL ON EAL.EntryID = E.EntryID", -1), + ] + + +@parametrize("completer", completers(aliasing=False, casing=True)) +def test_join_alias_search_without_aliases1(completer): + text = "SELECT * FROM blog.Entries JOIN blog.e" + result = get_result(completer, text) + assert result[:2] == [ + table("Entries", -1), + join("EntAccLog ON EntAccLog.EntryID = Entries.EntryID", -1), + ] + + +@parametrize("completer", completers(aliasing=True, casing=True)) +def test_join_alias_search_with_aliases2(completer): + text = "SELECT * FROM blog.Entries E JOIN blog.et" + result = get_result(completer, text) + assert result[0] == join("EntryTags ET ON ET.EntryID = E.EntryID", -2) + + +@parametrize("completer", completers(aliasing=False, casing=True)) +def test_join_alias_search_without_aliases2(completer): + text = "SELECT * FROM blog.Entries JOIN blog.et" + result = get_result(completer, text) + assert result[0] == join("EntryTags ON EntryTags.EntryID = Entries.EntryID", -2) + + +@parametrize("completer", completers()) +def test_function_alias_search_without_aliases(completer): + text = "SELECT blog.ees" + result = get_result(completer, text) + first = result[0] + assert first.start_position == -3 + assert first.text == "extract_entry_symbols()" + assert first.display_text == "extract_entry_symbols(_entryid)" + + +@parametrize("completer", completers()) +def test_function_alias_search_with_aliases(completer): + text = "SELECT blog.ee" + result = get_result(completer, text) + first = result[0] + assert first.start_position == -2 + assert first.text == "enter_entry(_title := , _text := )" + assert first.display_text == "enter_entry(_title, _text)" + + +@parametrize("completer", completers(filtr=True, casing=True, qualify=no_qual)) +def test_column_alias_search(completer): + result = get_result(completer, "SELECT et FROM blog.Entries E", len("SELECT et")) + cols = ("EntryText", "EntryTitle", "EntryID") + assert result[:3] == [column(c, -2) for c in cols] + + +@parametrize("completer", completers(casing=True)) +def test_column_alias_search_qualified(completer): + result = get_result( + completer, "SELECT E.ei FROM blog.Entries E", len("SELECT E.ei") + ) + cols = ("EntryID", "EntryTitle") + assert result[:3] == [column(c, -2) for c in cols] + + +@parametrize("completer", completers(casing=False, filtr=False, aliasing=False)) +def test_schema_object_order(completer): + result = get_result(completer, "SELECT * FROM u") + assert result[:3] == [ + table(t, pos=-1) for t in ("users", 'custom."Users"', "custom.users") + ] + + +@parametrize("completer", completers(casing=False, filtr=False, aliasing=False)) +def test_all_schema_objects(completer): + text = "SELECT * FROM " + result = get_result(completer, text) + assert completions_to_set(result) >= completions_to_set( + [table(x) for x in ("orders", '"select"', "custom.shipments")] + + [function(x + "()") for x in ("func2",)] + ) + + +@parametrize("completer", completers(filtr=False, aliasing=False, casing=True)) +def test_all_schema_objects_with_casing(completer): + text = "SELECT * FROM " + result = get_result(completer, text) + assert completions_to_set(result) >= completions_to_set( + [table(x) for x in ("Orders", '"select"', "CUSTOM.shipments")] + + [function(x + "()") for x in ("func2",)] + ) + + +@parametrize("completer", completers(casing=False, filtr=False, aliasing=True)) +def test_all_schema_objects_with_aliases(completer): + text = "SELECT * FROM " + result = get_result(completer, text) + assert completions_to_set(result) >= completions_to_set( + [table(x) for x in ("orders o", '"select" s', "custom.shipments s")] + + [function(x) for x in ("func2() f",)] + ) + + +@parametrize("completer", completers(casing=False, filtr=False, aliasing=True)) +def test_set_schema(completer): + text = "SET SCHEMA " + result = get_result(completer, text) + assert completions_to_set(result) == completions_to_set( + [schema("'blog'"), schema("'Custom'"), schema("'custom'"), schema("'public'")] + ) |