"""Tests for pgcli's SQL parsing helpers.

Covers four helpers imported from ``pgcli.packages.parseutils``:

* ``extract_tables`` -- the tests compare its result with tuples of 4-tuples.
  Judging by the cases below the fields are ``(schema, name, alias,
  is_function)``; confirm against the implementation before relying on this.
* ``find_prev_keyword`` -- unpacked as ``(token, stripped_text)`` where
  ``token`` exposes ``.value``.
* ``is_open_quote`` -- truthy when the text ends inside an unterminated quote.
* ``is_destructive`` -- called with a ``warning_level`` keyword argument.
"""

import pytest

from pgcli.packages.parseutils import is_destructive
from pgcli.packages.parseutils.tables import extract_tables
from pgcli.packages.parseutils.utils import find_prev_keyword, is_open_quote


# ---------------------------------------------------------------------------
# extract_tables: plain SELECT statements
# ---------------------------------------------------------------------------


def test_empty_string():
    """An empty statement yields an empty tuple."""
    tables = extract_tables("")
    assert tables == ()


def test_simple_select_single_table():
    """A bare table name has no schema, no alias and is not a function."""
    tables = extract_tables("select * from abc")
    assert tables == ((None, "abc", None, False),)


@pytest.mark.parametrize(
    "sql", ['select * from "abc"."def"', 'select * from abc."def"']
)
def test_simple_select_single_table_schema_qualified_quoted_table(sql):
    """A quoted table under a schema: the third slot is expected to be '"def"'."""
    tables = extract_tables(sql)
    assert tables == (("abc", "def", '"def"', False),)


@pytest.mark.parametrize("sql", ["select * from abc.def", 'select * from "abc".def'])
def test_simple_select_single_table_schema_qualified(sql):
    """Quoting only the schema does not change the expected result."""
    tables = extract_tables(sql)
    assert tables == (("abc", "def", None, False),)


def test_simple_select_single_table_double_quoted():
    """The expected name keeps its case and drops the double quotes."""
    tables = extract_tables('select * from "Abc"')
    assert tables == ((None, "Abc", None, False),)


def test_simple_select_multiple_tables():
    # Compared as a set: the order of the returned entries is not asserted.
    tables = extract_tables("select * from abc, def")
    assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)}


def test_simple_select_multiple_tables_double_quoted():
    tables = extract_tables('select * from "Abc", "Def"')
    assert set(tables) == {(None, "Abc", None, False), (None, "Def", None, False)}


def test_simple_select_single_table_deouble_quoted_aliased():
    tables = extract_tables('select * from "Abc" a')
    assert tables == ((None, "Abc", "a", False),)


def test_simple_select_multiple_tables_deouble_quoted_aliased():
    tables = extract_tables('select * from "Abc" a, "Def" d')
    assert set(tables) == {(None, "Abc", "a", False), (None, "Def", "d", False)}


def test_simple_select_multiple_tables_schema_qualified():
    tables = extract_tables("select * from abc.def, ghi.jkl")
    assert set(tables) == {("abc", "def", None, False), ("ghi", "jkl", None, False)}


def test_simple_select_with_cols_single_table():
    """An explicit column list does not affect the extracted tables."""
    tables = extract_tables("select a,b from abc")
    assert tables == ((None, "abc", None, False),)


def test_simple_select_with_cols_single_table_schema_qualified():
    tables = extract_tables("select a,b from abc.def")
    assert tables == (("abc", "def", None, False),)


def test_simple_select_with_cols_multiple_tables():
    tables = extract_tables("select a,b from abc, def")
    assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)}


def test_simple_select_with_cols_multiple_qualified_tables():
    tables = extract_tables("select a,b from abc.def, def.ghi")
    assert set(tables) == {("abc", "def", None, False), ("def", "ghi", None, False)}


# ---------------------------------------------------------------------------
# extract_tables: incomplete statements (as typed mid-completion)
# ---------------------------------------------------------------------------


def test_select_with_hanging_comma_single_table():
    """A dangling comma in the column list still yields the table."""
    tables = extract_tables("select a, from abc")
    assert tables == ((None, "abc", None, False),)


def test_select_with_hanging_comma_multiple_tables():
    tables = extract_tables("select a, from abc, def")
    assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)}


def test_select_with_hanging_period_multiple_tables():
    """A dangling ``t1.`` qualifier still yields both aliased tables."""
    tables = extract_tables("SELECT t1. FROM tabl1 t1, tabl2 t2")
    assert set(tables) == {
        (None, "tabl1", "t1", False),
        (None, "tabl2", "t2", False),
    }


# ---------------------------------------------------------------------------
# extract_tables: INSERT / UPDATE
# ---------------------------------------------------------------------------


def test_simple_insert_single_table():
    tables = extract_tables('insert into abc (id, name) values (1, "def")')

    # sqlparse mistakenly assigns an alias to the table (the original note
    # adds that it also mis-identifies the field list), so the alias slot is
    # expected to be "abc" here rather than None. This pins current behavior.
    assert tables == ((None, "abc", "abc", False),)


@pytest.mark.xfail
def test_simple_insert_single_table_schema_qualified():
    """Marked xfail: the schema-qualified INSERT case is not expected to pass."""
    tables = extract_tables('insert into abc.def (id, name) values (1, "def")')
    assert tables == (("abc", "def", None, False),)


def test_simple_update_table_no_schema():
    tables = extract_tables("update abc set id = 1")
    assert tables == ((None, "abc", None, False),)


def test_simple_update_table_with_schema():
    tables = extract_tables("update abc.def set id = 1")
    assert tables == (("abc", "def", None, False),)


# ---------------------------------------------------------------------------
# extract_tables: joins and sub-selects
# ---------------------------------------------------------------------------


@pytest.mark.parametrize("join_type", ["", "INNER", "LEFT", "RIGHT OUTER"])
def test_join_table(join_type):
    """Both sides of a join are expected, whatever the join type."""
    sql = f"SELECT * FROM abc a {join_type} JOIN def d ON a.id = d.num"
    tables = extract_tables(sql)
    assert set(tables) == {(None, "abc", "a", False), (None, "def", "d", False)}


def test_join_table_schema_qualified():
    tables = extract_tables("SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num")
    assert set(tables) == {("abc", "def", "x", False), ("ghi", "jkl", "y", False)}


def test_incomplete_join_clause():
    """An unfinished ON condition still yields both tables, in order."""
    sql = "select a.x, b.y from abc a join bcd b on a.id = "
    tables = extract_tables(sql)
    assert tables == ((None, "abc", "a", False), (None, "bcd", "b", False))


def test_join_as_table():
    """An explicit ``AS`` alias is expected in the alias slot."""
    tables = extract_tables("SELECT * FROM my_table AS m WHERE m.a > 5")
    assert tables == ((None, "my_table", "m", False),)


def test_multiple_joins():
    """Chained joins yield every table, in statement order."""
    sql = (
        "select * from t1 "
        "inner join t2 ON t1.id = t2.t1_id "
        "inner join t3 ON t2.id = t3."
    )
    tables = extract_tables(sql)
    assert tables == (
        (None, "t1", None, False),
        (None, "t2", None, False),
        (None, "t3", None, False),
    )


def test_subselect_tables():
    """The table inside an unclosed sub-select is expected."""
    sql = "SELECT * FROM (SELECT FROM abc"
    tables = extract_tables(sql)
    assert tables == ((None, "abc", None, False),)


@pytest.mark.parametrize("text", ["SELECT * FROM foo.", "SELECT 123 AS foo"])
def test_extract_no_tables(text):
    """A dangling schema qualifier or a column alias yields no tables."""
    tables = extract_tables(text)
    assert tables == ()


# ---------------------------------------------------------------------------
# extract_tables: functions used as tables
# ---------------------------------------------------------------------------


@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"])
def test_simple_function_as_table(arg_list):
    """A call in FROM is expected with the last slot True, for any arguments."""
    tables = extract_tables(f"SELECT * FROM foo({arg_list})")
    assert tables == ((None, "foo", None, True),)


@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"])
def test_simple_schema_qualified_function_as_table(arg_list):
    tables = extract_tables(f"SELECT * FROM foo.bar({arg_list})")
    assert tables == (("foo", "bar", None, True),)


@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"])
def test_simple_aliased_function_as_table(arg_list):
    tables = extract_tables(f"SELECT * FROM foo({arg_list}) bar")
    assert tables == ((None, "foo", "bar", True),)


def test_simple_table_and_function():
    tables = extract_tables("SELECT * FROM foo JOIN bar()")
    assert set(tables) == {(None, "foo", None, False), (None, "bar", None, True)}


def test_complex_table_and_function():
    tables = extract_tables("SELECT * FROM foo.bar baz JOIN bar.qux(x, y, z) quux")
    assert set(tables) == {("foo", "bar", "baz", False), ("bar", "qux", "quux", True)}


# ---------------------------------------------------------------------------
# find_prev_keyword
# ---------------------------------------------------------------------------


def test_find_prev_keyword_using():
    """Inside a USING list the expected token is the open paren."""
    q = "select * from tbl1 inner join tbl2 using (col1, "
    kw, q2 = find_prev_keyword(q)
    # Separate asserts so a failure report names the half that broke.
    assert kw.value == "("
    assert q2 == "select * from tbl1 inner join tbl2 using ("


@pytest.mark.parametrize(
    "sql",
    [
        "select * from foo where bar",
        "select * from foo where bar = 1 and baz or ",
        "select * from foo where bar = 1 and baz between qux and ",
    ],
)
def test_find_prev_keyword_where(sql):
    """``where`` is expected even when the text ends in and/or/between."""
    kw, stripped = find_prev_keyword(sql)
    # Separate asserts so a failure report names the half that broke.
    assert kw.value == "where"
    assert stripped == "select * from foo where"


@pytest.mark.parametrize(
    "sql", ["create table foo (bar int, baz ", "select * from foo() as bar (baz "]
)
def test_find_prev_keyword_open_parens(sql):
    kw, _ = find_prev_keyword(sql)
    assert kw.value == "("


# ---------------------------------------------------------------------------
# is_open_quote
# ---------------------------------------------------------------------------


@pytest.mark.parametrize(
    "sql",
    [
        "",
        "$$ foo $$",
        "$$ 'foo' $$",
        '$$ "foo" $$',
        "$$ $a$ $$",
        "$a$ $$ $a$",
        "foo bar $$ baz $$",
    ],
)
def test_is_open_quote__closed(sql):
    """Inputs whose quotes are all terminated."""
    assert not is_open_quote(sql)


@pytest.mark.parametrize(
    "sql",
    [
        "$$",
        ";;;$$",
        "foo $$ bar $$; foo $$",
        "$$ foo $a$",
        "foo 'bar baz",
        "$a$ foo ",
        '$$ "foo" ',
        "$$ $a$ ",
        "foo bar $$ baz",
    ],
)
def test_is_open_quote__open(sql):
    """Inputs that end inside an unterminated quote."""
    assert is_open_quote(sql)


# ---------------------------------------------------------------------------
# is_destructive
# ---------------------------------------------------------------------------


@pytest.mark.parametrize(
    ("sql", "warning_level", "expected"),
    [
        ("update abc set x = 1", "all", True),
        ("update abc set x = 1 where y = 2", "all", True),
        ("update abc set x = 1", "moderate", True),
        # At "moderate" an UPDATE with a WHERE clause is expected not to be flagged.
        ("update abc set x = 1 where y = 2", "moderate", False),
        ("select x, y, z from abc", "all", False),
        ("drop abc", "all", True),
        ("alter abc", "all", True),
        ("delete abc", "all", True),
        ("truncate abc", "all", True),
    ],
)
def test_is_destructive(sql, warning_level, expected):
    assert is_destructive(sql, warning_level=warning_level) == expected