diff options
Diffstat (limited to '')
-rw-r--r-- | tests/parseutils/test_ctes.py | 137 | ||||
-rw-r--r-- | tests/parseutils/test_function_metadata.py | 19 | ||||
-rw-r--r-- | tests/parseutils/test_parseutils.py | 280 |
3 files changed, 436 insertions, 0 deletions
diff --git a/tests/parseutils/test_ctes.py b/tests/parseutils/test_ctes.py new file mode 100644 index 0000000..3e89cca --- /dev/null +++ b/tests/parseutils/test_ctes.py @@ -0,0 +1,137 @@ +import pytest +from sqlparse import parse +from pgcli.packages.parseutils.ctes import ( + token_start_pos, + extract_ctes, + extract_column_names as _extract_column_names, +) + + +def extract_column_names(sql): + p = parse(sql)[0] + return _extract_column_names(p) + + +def test_token_str_pos(): + sql = "SELECT * FROM xxx" + p = parse(sql)[0] + idx = p.token_index(p.tokens[-1]) + assert token_start_pos(p.tokens, idx) == len("SELECT * FROM ") + + sql = "SELECT * FROM \nxxx" + p = parse(sql)[0] + idx = p.token_index(p.tokens[-1]) + assert token_start_pos(p.tokens, idx) == len("SELECT * FROM \n") + + +def test_single_column_name_extraction(): + sql = "SELECT abc FROM xxx" + assert extract_column_names(sql) == ("abc",) + + +def test_aliased_single_column_name_extraction(): + sql = "SELECT abc def FROM xxx" + assert extract_column_names(sql) == ("def",) + + +def test_aliased_expression_name_extraction(): + sql = "SELECT 99 abc FROM xxx" + assert extract_column_names(sql) == ("abc",) + + +def test_multiple_column_name_extraction(): + sql = "SELECT abc, def FROM xxx" + assert extract_column_names(sql) == ("abc", "def") + + +def test_missing_column_name_handled_gracefully(): + sql = "SELECT abc, 99 FROM xxx" + assert extract_column_names(sql) == ("abc",) + + sql = "SELECT abc, 99, def FROM xxx" + assert extract_column_names(sql) == ("abc", "def") + + +def test_aliased_multiple_column_name_extraction(): + sql = "SELECT abc def, ghi jkl FROM xxx" + assert extract_column_names(sql) == ("def", "jkl") + + +def test_table_qualified_column_name_extraction(): + sql = "SELECT abc.def, ghi.jkl FROM xxx" + assert extract_column_names(sql) == ("def", "jkl") + + +@pytest.mark.parametrize( + "sql", + [ + "INSERT INTO foo (x, y, z) VALUES (5, 6, 7) RETURNING x, y", + "DELETE FROM foo WHERE x > y RETURNING x, y", + "UPDATE foo SET x = 9 RETURNING x, y", + ], +) +def test_extract_column_names_from_returning_clause(sql): + assert extract_column_names(sql) == ("x", "y") + + +def test_simple_cte_extraction(): + sql = "WITH a AS (SELECT abc FROM xxx) SELECT * FROM a" + start_pos = len("WITH a AS ") + stop_pos = len("WITH a AS (SELECT abc FROM xxx)") + ctes, remainder = extract_ctes(sql) + + assert tuple(ctes) == (("a", ("abc",), start_pos, stop_pos),) + assert remainder.strip() == "SELECT * FROM a" + + +def test_cte_extraction_around_comments(): + sql = """--blah blah blah + WITH a AS (SELECT abc def FROM x) + SELECT * FROM a""" + start_pos = len( + """--blah blah blah + WITH a AS """ + ) + stop_pos = len( + """--blah blah blah + WITH a AS (SELECT abc def FROM x)""" + ) + + ctes, remainder = extract_ctes(sql) + assert tuple(ctes) == (("a", ("def",), start_pos, stop_pos),) + assert remainder.strip() == "SELECT * FROM a" + + +def test_multiple_cte_extraction(): + sql = """WITH + x AS (SELECT abc, def FROM x), + y AS (SELECT ghi, jkl FROM y) + SELECT * FROM a, b""" + + start1 = len( + """WITH + x AS """ + ) + + stop1 = len( + """WITH + x AS (SELECT abc, def FROM x)""" + ) + + start2 = len( + """WITH + x AS (SELECT abc, def FROM x), + y AS """ + ) + + stop2 = len( + """WITH + x AS (SELECT abc, def FROM x), + y AS (SELECT ghi, jkl FROM y)""" + ) + + ctes, remainder = extract_ctes(sql) + assert tuple(ctes) == ( + ("x", ("abc", "def"), start1, stop1), + ("y", ("ghi", "jkl"), start2, stop2), + ) diff --git a/tests/parseutils/test_function_metadata.py b/tests/parseutils/test_function_metadata.py new file mode 100644 index 0000000..0350e2a --- /dev/null +++ b/tests/parseutils/test_function_metadata.py @@ -0,0 +1,19 @@ +from pgcli.packages.parseutils.meta import FunctionMetadata + + +def test_function_metadata_eq(): + f1 = FunctionMetadata( + "s", "f", ["x"], ["integer"], [], "int", False, False, False, False, None + ) + f2 = FunctionMetadata( + "s", "f", ["x"], ["integer"], [], "int", False, False, False, False, None + ) + f3 = FunctionMetadata( + "s", "g", ["x"], ["integer"], [], "int", False, False, False, False, None + ) + assert f1 == f2 + assert f1 != f3 + assert not (f1 != f2) + assert not (f1 == f3) + assert hash(f1) == hash(f2) + assert hash(f1) != hash(f3) diff --git a/tests/parseutils/test_parseutils.py b/tests/parseutils/test_parseutils.py new file mode 100644 index 0000000..5a375d7 --- /dev/null +++ b/tests/parseutils/test_parseutils.py @@ -0,0 +1,280 @@ +import pytest +from pgcli.packages.parseutils import is_destructive +from pgcli.packages.parseutils.tables import extract_tables +from pgcli.packages.parseutils.utils import find_prev_keyword, is_open_quote + + +def test_empty_string(): + tables = extract_tables("") + assert tables == () + + +def test_simple_select_single_table(): + tables = extract_tables("select * from abc") + assert tables == ((None, "abc", None, False),) + + +@pytest.mark.parametrize( + "sql", ['select * from "abc"."def"', 'select * from abc."def"'] +) +def test_simple_select_single_table_schema_qualified_quoted_table(sql): + tables = extract_tables(sql) + assert tables == (("abc", "def", '"def"', False),) + + +@pytest.mark.parametrize("sql", ["select * from abc.def", 'select * from "abc".def']) +def test_simple_select_single_table_schema_qualified(sql): + tables = extract_tables(sql) + assert tables == (("abc", "def", None, False),) + + +def test_simple_select_single_table_double_quoted(): + tables = extract_tables('select * from "Abc"') + assert tables == ((None, "Abc", None, False),) + + +def test_simple_select_multiple_tables(): + tables = extract_tables("select * from abc, def") + assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)} + + +def test_simple_select_multiple_tables_double_quoted(): + tables = extract_tables('select * from "Abc", "Def"') + assert set(tables) == {(None, "Abc", None, False), (None, "Def", None, False)} + + +def test_simple_select_single_table_deouble_quoted_aliased(): + tables = extract_tables('select * from "Abc" a') + assert tables == ((None, "Abc", "a", False),) + + +def test_simple_select_multiple_tables_deouble_quoted_aliased(): + tables = extract_tables('select * from "Abc" a, "Def" d') + assert set(tables) == {(None, "Abc", "a", False), (None, "Def", "d", False)} + + +def test_simple_select_multiple_tables_schema_qualified(): + tables = extract_tables("select * from abc.def, ghi.jkl") + assert set(tables) == {("abc", "def", None, False), ("ghi", "jkl", None, False)} + + +def test_simple_select_with_cols_single_table(): + tables = extract_tables("select a,b from abc") + assert tables == ((None, "abc", None, False),) + + +def test_simple_select_with_cols_single_table_schema_qualified(): + tables = extract_tables("select a,b from abc.def") + assert tables == (("abc", "def", None, False),) + + +def test_simple_select_with_cols_multiple_tables(): + tables = extract_tables("select a,b from abc, def") + assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)} + + +def test_simple_select_with_cols_multiple_qualified_tables(): + tables = extract_tables("select a,b from abc.def, def.ghi") + assert set(tables) == {("abc", "def", None, False), ("def", "ghi", None, False)} + + +def test_select_with_hanging_comma_single_table(): + tables = extract_tables("select a, from abc") + assert tables == ((None, "abc", None, False),) + + +def test_select_with_hanging_comma_multiple_tables(): + tables = extract_tables("select a, from abc, def") + assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)} + + +def test_select_with_hanging_period_multiple_tables(): + tables = extract_tables("SELECT t1. FROM tabl1 t1, tabl2 t2") + assert set(tables) == {(None, "tabl1", "t1", False), (None, "tabl2", "t2", False)} + + +def test_simple_insert_single_table(): + tables = extract_tables('insert into abc (id, name) values (1, "def")') + + # sqlparse mistakenly assigns an alias to the table + # AND mistakenly identifies the field list as + # assert tables == ((None, 'abc', 'abc', False),) + + assert tables == ((None, "abc", "abc", False),) + + +@pytest.mark.xfail +def test_simple_insert_single_table_schema_qualified(): + tables = extract_tables('insert into abc.def (id, name) values (1, "def")') + assert tables == (("abc", "def", None, False),) + + +def test_simple_update_table_no_schema(): + tables = extract_tables("update abc set id = 1") + assert tables == ((None, "abc", None, False),) + + +def test_simple_update_table_with_schema(): + tables = extract_tables("update abc.def set id = 1") + assert tables == (("abc", "def", None, False),) + + +@pytest.mark.parametrize("join_type", ["", "INNER", "LEFT", "RIGHT OUTER"]) +def test_join_table(join_type): + sql = f"SELECT * FROM abc a {join_type} JOIN def d ON a.id = d.num" + tables = extract_tables(sql) + assert set(tables) == {(None, "abc", "a", False), (None, "def", "d", False)} + + +def test_join_table_schema_qualified(): + tables = extract_tables("SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num") + assert set(tables) == {("abc", "def", "x", False), ("ghi", "jkl", "y", False)} + + +def test_incomplete_join_clause(): + sql = """select a.x, b.y + from abc a join bcd b + on a.id = """ + tables = extract_tables(sql) + assert tables == ((None, "abc", "a", False), (None, "bcd", "b", False)) + + +def test_join_as_table(): + tables = extract_tables("SELECT * FROM my_table AS m WHERE m.a > 5") + assert tables == ((None, "my_table", "m", False),) + + +def test_multiple_joins(): + sql = """select * from t1 + inner join t2 ON + t1.id = t2.t1_id + inner join t3 ON + t2.id = t3.""" + tables = extract_tables(sql) + assert tables == ( + (None, "t1", None, False), + (None, "t2", None, False), + (None, "t3", None, False), + ) + + +def test_subselect_tables(): + sql = "SELECT * FROM (SELECT FROM abc" + tables = extract_tables(sql) + assert tables == ((None, "abc", None, False),) + + +@pytest.mark.parametrize("text", ["SELECT * FROM foo.", "SELECT 123 AS foo"]) +def test_extract_no_tables(text): + tables = extract_tables(text) + assert tables == tuple() + + +@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"]) +def test_simple_function_as_table(arg_list): + tables = extract_tables(f"SELECT * FROM foo({arg_list})") + assert tables == ((None, "foo", None, True),) + + +@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"]) +def test_simple_schema_qualified_function_as_table(arg_list): + tables = extract_tables(f"SELECT * FROM foo.bar({arg_list})") + assert tables == (("foo", "bar", None, True),) + + +@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"]) +def test_simple_aliased_function_as_table(arg_list): + tables = extract_tables(f"SELECT * FROM foo({arg_list}) bar") + assert tables == ((None, "foo", "bar", True),) + + +def test_simple_table_and_function(): + tables = extract_tables("SELECT * FROM foo JOIN bar()") + assert set(tables) == {(None, "foo", None, False), (None, "bar", None, True)} + + +def test_complex_table_and_function(): + tables = extract_tables( + """SELECT * FROM foo.bar baz + JOIN bar.qux(x, y, z) quux""" + ) + assert set(tables) == {("foo", "bar", "baz", False), ("bar", "qux", "quux", True)} + + +def test_find_prev_keyword_using(): + q = "select * from tbl1 inner join tbl2 using (col1, " + kw, q2 = find_prev_keyword(q) + assert kw.value == "(" and q2 == "select * from tbl1 inner join tbl2 using (" + + +@pytest.mark.parametrize( + "sql", + [ + "select * from foo where bar", + "select * from foo where bar = 1 and baz or ", + "select * from foo where bar = 1 and baz between qux and ", + ], +) +def test_find_prev_keyword_where(sql): + kw, stripped = find_prev_keyword(sql) + assert kw.value == "where" and stripped == "select * from foo where" + + +@pytest.mark.parametrize( + "sql", ["create table foo (bar int, baz ", "select * from foo() as bar (baz "] +) +def test_find_prev_keyword_open_parens(sql): + kw, _ = find_prev_keyword(sql) + assert kw.value == "(" + + +@pytest.mark.parametrize( + "sql", + [ + "", + "$$ foo $$", + "$$ 'foo' $$", + '$$ "foo" $$', + "$$ $a$ $$", + "$a$ $$ $a$", + "foo bar $$ baz $$", + ], +) +def test_is_open_quote__closed(sql): + assert not is_open_quote(sql) + + +@pytest.mark.parametrize( + "sql", + [ + "$$", + ";;;$$", + "foo $$ bar $$; foo $$", + "$$ foo $a$", + "foo 'bar baz", + "$a$ foo ", + '$$ "foo" ', + "$$ $a$ ", + "foo bar $$ baz", + ], +) +def test_is_open_quote__open(sql): + assert is_open_quote(sql) + + +@pytest.mark.parametrize( + ("sql", "warning_level", "expected"), + [ + ("update abc set x = 1", "all", True), + ("update abc set x = 1 where y = 2", "all", True), + ("update abc set x = 1", "moderate", True), + ("update abc set x = 1 where y = 2", "moderate", False), + ("select x, y, z from abc", "all", False), + ("drop abc", "all", True), + ("alter abc", "all", True), + ("delete abc", "all", True), + ("truncate abc", "all", True), + ], +) +def test_is_destructive(sql, warning_level, expected): + assert is_destructive(sql, warning_level=warning_level) == expected |