summaryrefslogtreecommitdiffstats
path: root/tests/parseutils/test_parseutils.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/parseutils/test_parseutils.py')
-rw-r--r--tests/parseutils/test_parseutils.py310
1 files changed, 310 insertions, 0 deletions
diff --git a/tests/parseutils/test_parseutils.py b/tests/parseutils/test_parseutils.py
new file mode 100644
index 0000000..349cbd0
--- /dev/null
+++ b/tests/parseutils/test_parseutils.py
@@ -0,0 +1,310 @@
+import pytest
+from pgcli.packages.parseutils import (
+ is_destructive,
+ parse_destructive_warning,
+ BASE_KEYWORDS,
+ ALL_KEYWORDS,
+)
+from pgcli.packages.parseutils.tables import extract_tables
+from pgcli.packages.parseutils.utils import find_prev_keyword, is_open_quote
+
+
+def test_empty_string():
+ tables = extract_tables("")
+ assert tables == ()
+
+
+def test_simple_select_single_table():
+ tables = extract_tables("select * from abc")
+ assert tables == ((None, "abc", None, False),)
+
+
+@pytest.mark.parametrize(
+ "sql", ['select * from "abc"."def"', 'select * from abc."def"']
+)
+def test_simple_select_single_table_schema_qualified_quoted_table(sql):
+ tables = extract_tables(sql)
+ assert tables == (("abc", "def", '"def"', False),)
+
+
+@pytest.mark.parametrize("sql", ["select * from abc.def", 'select * from "abc".def'])
+def test_simple_select_single_table_schema_qualified(sql):
+ tables = extract_tables(sql)
+ assert tables == (("abc", "def", None, False),)
+
+
+def test_simple_select_single_table_double_quoted():
+ tables = extract_tables('select * from "Abc"')
+ assert tables == ((None, "Abc", None, False),)
+
+
+def test_simple_select_multiple_tables():
+ tables = extract_tables("select * from abc, def")
+ assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)}
+
+
+def test_simple_select_multiple_tables_double_quoted():
+ tables = extract_tables('select * from "Abc", "Def"')
+ assert set(tables) == {(None, "Abc", None, False), (None, "Def", None, False)}
+
+
+def test_simple_select_single_table_deouble_quoted_aliased():
+ tables = extract_tables('select * from "Abc" a')
+ assert tables == ((None, "Abc", "a", False),)
+
+
+def test_simple_select_multiple_tables_deouble_quoted_aliased():
+ tables = extract_tables('select * from "Abc" a, "Def" d')
+ assert set(tables) == {(None, "Abc", "a", False), (None, "Def", "d", False)}
+
+
+def test_simple_select_multiple_tables_schema_qualified():
+ tables = extract_tables("select * from abc.def, ghi.jkl")
+ assert set(tables) == {("abc", "def", None, False), ("ghi", "jkl", None, False)}
+
+
+def test_simple_select_with_cols_single_table():
+ tables = extract_tables("select a,b from abc")
+ assert tables == ((None, "abc", None, False),)
+
+
+def test_simple_select_with_cols_single_table_schema_qualified():
+ tables = extract_tables("select a,b from abc.def")
+ assert tables == (("abc", "def", None, False),)
+
+
+def test_simple_select_with_cols_multiple_tables():
+ tables = extract_tables("select a,b from abc, def")
+ assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)}
+
+
+def test_simple_select_with_cols_multiple_qualified_tables():
+ tables = extract_tables("select a,b from abc.def, def.ghi")
+ assert set(tables) == {("abc", "def", None, False), ("def", "ghi", None, False)}
+
+
+def test_select_with_hanging_comma_single_table():
+ tables = extract_tables("select a, from abc")
+ assert tables == ((None, "abc", None, False),)
+
+
+def test_select_with_hanging_comma_multiple_tables():
+ tables = extract_tables("select a, from abc, def")
+ assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)}
+
+
+def test_select_with_hanging_period_multiple_tables():
+ tables = extract_tables("SELECT t1. FROM tabl1 t1, tabl2 t2")
+ assert set(tables) == {(None, "tabl1", "t1", False), (None, "tabl2", "t2", False)}
+
+
+def test_simple_insert_single_table():
+ tables = extract_tables('insert into abc (id, name) values (1, "def")')
+
+ # sqlparse mistakenly assigns an alias to the table
+ # AND mistakenly identifies the field list as
+ # assert tables == ((None, 'abc', 'abc', False),)
+
+ assert tables == ((None, "abc", "abc", False),)
+
+
+@pytest.mark.xfail
+def test_simple_insert_single_table_schema_qualified():
+ tables = extract_tables('insert into abc.def (id, name) values (1, "def")')
+ assert tables == (("abc", "def", None, False),)
+
+
+def test_simple_update_table_no_schema():
+ tables = extract_tables("update abc set id = 1")
+ assert tables == ((None, "abc", None, False),)
+
+
+def test_simple_update_table_with_schema():
+ tables = extract_tables("update abc.def set id = 1")
+ assert tables == (("abc", "def", None, False),)
+
+
+@pytest.mark.parametrize("join_type", ["", "INNER", "LEFT", "RIGHT OUTER"])
+def test_join_table(join_type):
+ sql = f"SELECT * FROM abc a {join_type} JOIN def d ON a.id = d.num"
+ tables = extract_tables(sql)
+ assert set(tables) == {(None, "abc", "a", False), (None, "def", "d", False)}
+
+
+def test_join_table_schema_qualified():
+ tables = extract_tables("SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num")
+ assert set(tables) == {("abc", "def", "x", False), ("ghi", "jkl", "y", False)}
+
+
+def test_incomplete_join_clause():
+ sql = """select a.x, b.y
+ from abc a join bcd b
+ on a.id = """
+ tables = extract_tables(sql)
+ assert tables == ((None, "abc", "a", False), (None, "bcd", "b", False))
+
+
+def test_join_as_table():
+ tables = extract_tables("SELECT * FROM my_table AS m WHERE m.a > 5")
+ assert tables == ((None, "my_table", "m", False),)
+
+
+def test_multiple_joins():
+ sql = """select * from t1
+ inner join t2 ON
+ t1.id = t2.t1_id
+ inner join t3 ON
+ t2.id = t3."""
+ tables = extract_tables(sql)
+ assert tables == (
+ (None, "t1", None, False),
+ (None, "t2", None, False),
+ (None, "t3", None, False),
+ )
+
+
+def test_subselect_tables():
+ sql = "SELECT * FROM (SELECT FROM abc"
+ tables = extract_tables(sql)
+ assert tables == ((None, "abc", None, False),)
+
+
+@pytest.mark.parametrize("text", ["SELECT * FROM foo.", "SELECT 123 AS foo"])
+def test_extract_no_tables(text):
+ tables = extract_tables(text)
+ assert tables == tuple()
+
+
+@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"])
+def test_simple_function_as_table(arg_list):
+ tables = extract_tables(f"SELECT * FROM foo({arg_list})")
+ assert tables == ((None, "foo", None, True),)
+
+
+@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"])
+def test_simple_schema_qualified_function_as_table(arg_list):
+ tables = extract_tables(f"SELECT * FROM foo.bar({arg_list})")
+ assert tables == (("foo", "bar", None, True),)
+
+
+@pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"])
+def test_simple_aliased_function_as_table(arg_list):
+ tables = extract_tables(f"SELECT * FROM foo({arg_list}) bar")
+ assert tables == ((None, "foo", "bar", True),)
+
+
+def test_simple_table_and_function():
+ tables = extract_tables("SELECT * FROM foo JOIN bar()")
+ assert set(tables) == {(None, "foo", None, False), (None, "bar", None, True)}
+
+
+def test_complex_table_and_function():
+ tables = extract_tables(
+ """SELECT * FROM foo.bar baz
+ JOIN bar.qux(x, y, z) quux"""
+ )
+ assert set(tables) == {("foo", "bar", "baz", False), ("bar", "qux", "quux", True)}
+
+
+def test_find_prev_keyword_using():
+ q = "select * from tbl1 inner join tbl2 using (col1, "
+ kw, q2 = find_prev_keyword(q)
+ assert kw.value == "(" and q2 == "select * from tbl1 inner join tbl2 using ("
+
+
+@pytest.mark.parametrize(
+ "sql",
+ [
+ "select * from foo where bar",
+ "select * from foo where bar = 1 and baz or ",
+ "select * from foo where bar = 1 and baz between qux and ",
+ ],
+)
+def test_find_prev_keyword_where(sql):
+ kw, stripped = find_prev_keyword(sql)
+ assert kw.value == "where" and stripped == "select * from foo where"
+
+
+@pytest.mark.parametrize(
+ "sql", ["create table foo (bar int, baz ", "select * from foo() as bar (baz "]
+)
+def test_find_prev_keyword_open_parens(sql):
+ kw, _ = find_prev_keyword(sql)
+ assert kw.value == "("
+
+
+@pytest.mark.parametrize(
+ "sql",
+ [
+ "",
+ "$$ foo $$",
+ "$$ 'foo' $$",
+ '$$ "foo" $$',
+ "$$ $a$ $$",
+ "$a$ $$ $a$",
+ "foo bar $$ baz $$",
+ ],
+)
+def test_is_open_quote__closed(sql):
+ assert not is_open_quote(sql)
+
+
+@pytest.mark.parametrize(
+ "sql",
+ [
+ "$$",
+ ";;;$$",
+ "foo $$ bar $$; foo $$",
+ "$$ foo $a$",
+ "foo 'bar baz",
+ "$a$ foo ",
+ '$$ "foo" ',
+ "$$ $a$ ",
+ "foo bar $$ baz",
+ ],
+)
+def test_is_open_quote__open(sql):
+ assert is_open_quote(sql)
+
+
+@pytest.mark.parametrize(
+ ("sql", "keywords", "expected"),
+ [
+ ("update abc set x = 1", ALL_KEYWORDS, True),
+ ("update abc set x = 1 where y = 2", ALL_KEYWORDS, True),
+ ("update abc set x = 1", BASE_KEYWORDS, True),
+ ("update abc set x = 1 where y = 2", BASE_KEYWORDS, False),
+ ("select x, y, z from abc", ALL_KEYWORDS, False),
+ ("drop abc", ALL_KEYWORDS, True),
+ ("alter abc", ALL_KEYWORDS, True),
+ ("delete abc", ALL_KEYWORDS, True),
+ ("truncate abc", ALL_KEYWORDS, True),
+ ("insert into abc values (1, 2, 3)", ALL_KEYWORDS, False),
+ ("insert into abc values (1, 2, 3)", BASE_KEYWORDS, False),
+ ("insert into abc values (1, 2, 3)", ["insert"], True),
+ ("insert into abc values (1, 2, 3)", ["insert"], True),
+ ],
+)
+def test_is_destructive(sql, keywords, expected):
+ assert is_destructive(sql, keywords) == expected
+
+
+@pytest.mark.parametrize(
+ ("warning_level", "expected"),
+ [
+ ("true", ALL_KEYWORDS),
+ ("false", []),
+ ("all", ALL_KEYWORDS),
+ ("moderate", BASE_KEYWORDS),
+ ("off", []),
+ ("", []),
+ (None, []),
+ (ALL_KEYWORDS, ALL_KEYWORDS),
+ (BASE_KEYWORDS, BASE_KEYWORDS),
+ ("insert", ["insert"]),
+ ("drop,alter,delete", ["drop", "alter", "delete"]),
+ (["drop", "alter", "delete"], ["drop", "alter", "delete"]),
+ ],
+)
+def test_parse_destructive_warning(warning_level, expected):
+ assert parse_destructive_warning(warning_level) == expected