From 76d27bc43d56d7ef3ca0090fb199777888adf7c3 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 6 Sep 2021 06:17:09 +0200 Subject: Adding upstream version 3.2.0. Signed-off-by: Daniel Baumann --- tests/parseutils/test_parseutils.py | 59 ++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 24 deletions(-) (limited to 'tests/parseutils/test_parseutils.py') diff --git a/tests/parseutils/test_parseutils.py b/tests/parseutils/test_parseutils.py index 50bc889..5a375d7 100644 --- a/tests/parseutils/test_parseutils.py +++ b/tests/parseutils/test_parseutils.py @@ -1,4 +1,5 @@ import pytest +from pgcli.packages.parseutils import is_destructive from pgcli.packages.parseutils.tables import extract_tables from pgcli.packages.parseutils.utils import find_prev_keyword, is_open_quote @@ -34,12 +35,12 @@ def test_simple_select_single_table_double_quoted(): def test_simple_select_multiple_tables(): tables = extract_tables("select * from abc, def") - assert set(tables) == set([(None, "abc", None, False), (None, "def", None, False)]) + assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)} def test_simple_select_multiple_tables_double_quoted(): tables = extract_tables('select * from "Abc", "Def"') - assert set(tables) == set([(None, "Abc", None, False), (None, "Def", None, False)]) + assert set(tables) == {(None, "Abc", None, False), (None, "Def", None, False)} def test_simple_select_single_table_deouble_quoted_aliased(): @@ -49,14 +50,12 @@ def test_simple_select_single_table_deouble_quoted_aliased(): def test_simple_select_multiple_tables_deouble_quoted_aliased(): tables = extract_tables('select * from "Abc" a, "Def" d') - assert set(tables) == set([(None, "Abc", "a", False), (None, "Def", "d", False)]) + assert set(tables) == {(None, "Abc", "a", False), (None, "Def", "d", False)} def test_simple_select_multiple_tables_schema_qualified(): tables = extract_tables("select * from abc.def, ghi.jkl") - assert set(tables) == set( - [("abc", "def", None, False), ("ghi", "jkl", None, False)] - ) + assert set(tables) == {("abc", "def", None, False), ("ghi", "jkl", None, False)} def test_simple_select_with_cols_single_table(): @@ -71,14 +70,12 @@ def test_simple_select_with_cols_single_table_schema_qualified(): def test_simple_select_with_cols_multiple_tables(): tables = extract_tables("select a,b from abc, def") - assert set(tables) == set([(None, "abc", None, False), (None, "def", None, False)]) + assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)} def test_simple_select_with_cols_multiple_qualified_tables(): tables = extract_tables("select a,b from abc.def, def.ghi") - assert set(tables) == set( - [("abc", "def", None, False), ("def", "ghi", None, False)] - ) + assert set(tables) == {("abc", "def", None, False), ("def", "ghi", None, False)} def test_select_with_hanging_comma_single_table(): @@ -88,14 +85,12 @@ def test_select_with_hanging_comma_single_table(): def test_select_with_hanging_comma_multiple_tables(): tables = extract_tables("select a, from abc, def") - assert set(tables) == set([(None, "abc", None, False), (None, "def", None, False)]) + assert set(tables) == {(None, "abc", None, False), (None, "def", None, False)} def test_select_with_hanging_period_multiple_tables(): tables = extract_tables("SELECT t1. FROM tabl1 t1, tabl2 t2") - assert set(tables) == set( - [(None, "tabl1", "t1", False), (None, "tabl2", "t2", False)] - ) + assert set(tables) == {(None, "tabl1", "t1", False), (None, "tabl2", "t2", False)} def test_simple_insert_single_table(): @@ -126,14 +121,14 @@ def test_simple_update_table_with_schema(): @pytest.mark.parametrize("join_type", ["", "INNER", "LEFT", "RIGHT OUTER"]) def test_join_table(join_type): - sql = "SELECT * FROM abc a {0} JOIN def d ON a.id = d.num".format(join_type) + sql = f"SELECT * FROM abc a {join_type} JOIN def d ON a.id = d.num" tables = extract_tables(sql) - assert set(tables) == set([(None, "abc", "a", False), (None, "def", "d", False)]) + assert set(tables) == {(None, "abc", "a", False), (None, "def", "d", False)} def test_join_table_schema_qualified(): tables = extract_tables("SELECT * FROM abc.def x JOIN ghi.jkl y ON x.id = y.num") - assert set(tables) == set([("abc", "def", "x", False), ("ghi", "jkl", "y", False)]) + assert set(tables) == {("abc", "def", "x", False), ("ghi", "jkl", "y", False)} def test_incomplete_join_clause(): @@ -177,25 +172,25 @@ def test_extract_no_tables(text): @pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"]) def test_simple_function_as_table(arg_list): - tables = extract_tables("SELECT * FROM foo({0})".format(arg_list)) + tables = extract_tables(f"SELECT * FROM foo({arg_list})") assert tables == ((None, "foo", None, True),) @pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"]) def test_simple_schema_qualified_function_as_table(arg_list): - tables = extract_tables("SELECT * FROM foo.bar({0})".format(arg_list)) + tables = extract_tables(f"SELECT * FROM foo.bar({arg_list})") assert tables == (("foo", "bar", None, True),) @pytest.mark.parametrize("arg_list", ["", "arg1", "arg1, arg2, arg3"]) def test_simple_aliased_function_as_table(arg_list): - tables = extract_tables("SELECT * FROM foo({0}) bar".format(arg_list)) + tables = extract_tables(f"SELECT * FROM foo({arg_list}) bar") assert tables == ((None, "foo", "bar", True),) def test_simple_table_and_function(): tables = extract_tables("SELECT * FROM foo JOIN bar()") - assert set(tables) == set([(None, "foo", None, False), (None, "bar", None, True)]) + assert set(tables) == {(None, "foo", None, False), (None, "bar", None, True)} def test_complex_table_and_function(): @@ -203,9 +198,7 @@ def test_complex_table_and_function(): """SELECT * FROM foo.bar baz JOIN bar.qux(x, y, z) quux""" ) - assert set(tables) == set( - [("foo", "bar", "baz", False), ("bar", "qux", "quux", True)] - ) + assert set(tables) == {("foo", "bar", "baz", False), ("bar", "qux", "quux", True)} def test_find_prev_keyword_using(): @@ -267,3 +260,21 @@ def test_is_open_quote__closed(sql): ) def test_is_open_quote__open(sql): assert is_open_quote(sql) + + +@pytest.mark.parametrize( + ("sql", "warning_level", "expected"), + [ + ("update abc set x = 1", "all", True), + ("update abc set x = 1 where y = 2", "all", True), + ("update abc set x = 1", "moderate", True), + ("update abc set x = 1 where y = 2", "moderate", False), + ("select x, y, z from abc", "all", False), + ("drop abc", "all", True), + ("alter abc", "all", True), + ("delete abc", "all", True), + ("truncate abc", "all", True), + ], +) +def test_is_destructive(sql, warning_level, expected): + assert is_destructive(sql, warning_level=warning_level) == expected -- cgit v1.2.3