From 9b77fda0d4171f68760c070895dc5700cb6d1e0f Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 6 Sep 2021 06:17:12 +0200 Subject: Merging upstream version 3.2.0. Signed-off-by: Daniel Baumann --- tests/test_pgexecute.py | 140 ++++++++++++++++++++++++++++-------------------- 1 file changed, 83 insertions(+), 57 deletions(-) (limited to 'tests/test_pgexecute.py') diff --git a/tests/test_pgexecute.py b/tests/test_pgexecute.py index 9273be9..109674c 100644 --- a/tests/test_pgexecute.py +++ b/tests/test_pgexecute.py @@ -2,7 +2,7 @@ from textwrap import dedent import psycopg2 import pytest -from mock import patch, MagicMock +from unittest.mock import patch, MagicMock from pgspecial.main import PGSpecial, NO_QUERY from utils import run, dbtest, requires_json, requires_jsonb @@ -89,7 +89,7 @@ def test_expanded_slash_G(executor, pgspecial): # Tests whether we reset the expanded output after a \G. run(executor, """create table test(a boolean)""") run(executor, """insert into test values(True)""") - results = run(executor, """select * from test \G""", pgspecial=pgspecial) + results = run(executor, r"""select * from test \G""", pgspecial=pgspecial) assert pgspecial.expanded_output == False @@ -105,31 +105,35 @@ def test_schemata_table_views_and_columns_query(executor): # schemata # don't enforce all members of the schemas since they may include postgres # temporary schemas - assert set(executor.schemata()) >= set( - ["public", "pg_catalog", "information_schema", "schema1", "schema2"] - ) + assert set(executor.schemata()) >= { + "public", + "pg_catalog", + "information_schema", + "schema1", + "schema2", + } assert executor.search_path() == ["pg_catalog", "public"] # tables - assert set(executor.tables()) >= set( - [("public", "a"), ("public", "b"), ("schema1", "c")] - ) - - assert set(executor.table_columns()) >= set( - [ - ("public", "a", "x", "text", False, None), - ("public", "a", "y", "text", False, None), - ("public", "b", "z", "text", False, None), - ("schema1", "c", "w", "text", True, "'meow'::text"), - ] - ) + assert set(executor.tables()) >= { + ("public", "a"), + ("public", "b"), + ("schema1", "c"), + } + + assert set(executor.table_columns()) >= { + ("public", "a", "x", "text", False, None), + ("public", "a", "y", "text", False, None), + ("public", "b", "z", "text", False, None), + ("schema1", "c", "w", "text", True, "'meow'::text"), + } # views - assert set(executor.views()) >= set([("public", "d")]) + assert set(executor.views()) >= {("public", "d")} - assert set(executor.view_columns()) >= set( - [("public", "d", "e", "integer", False, None)] - ) + assert set(executor.view_columns()) >= { + ("public", "d", "e", "integer", False, None) + } @dbtest @@ -142,9 +146,9 @@ def test_foreign_key_query(executor): "create table schema2.child(childid int PRIMARY KEY, motherid int REFERENCES schema1.parent)", ) - assert set(executor.foreignkeys()) >= set( - [("schema1", "parent", "parentid", "schema2", "child", "motherid")] - ) + assert set(executor.foreignkeys()) >= { + ("schema1", "parent", "parentid", "schema2", "child", "motherid") + } @dbtest @@ -175,30 +179,28 @@ def test_functions_query(executor): ) funcs = set(executor.functions()) - assert funcs >= set( - [ - function_meta_data(func_name="func1", return_type="integer"), - function_meta_data( - func_name="func3", - arg_names=["x", "y"], - arg_types=["integer", "integer"], - arg_modes=["t", "t"], - return_type="record", - is_set_returning=True, - ), - function_meta_data( - schema_name="public", - func_name="func4", - arg_names=("x",), - arg_types=("integer",), - return_type="integer", - is_set_returning=True, - ), - function_meta_data( - schema_name="schema1", func_name="func2", return_type="integer" - ), - ] - ) + assert funcs >= { + function_meta_data(func_name="func1", return_type="integer"), + function_meta_data( + func_name="func3", + arg_names=["x", "y"], + arg_types=["integer", "integer"], + arg_modes=["t", "t"], + return_type="record", + is_set_returning=True, + ), + function_meta_data( + schema_name="public", + func_name="func4", + arg_names=("x",), + arg_types=("integer",), + return_type="integer", + is_set_returning=True, + ), + function_meta_data( + schema_name="schema1", func_name="func2", return_type="integer" + ), + } @dbtest @@ -257,8 +259,8 @@ def test_not_is_special(executor, pgspecial): @dbtest def test_execute_from_file_no_arg(executor, pgspecial): - """\i without a filename returns an error.""" - result = list(executor.run("\i", pgspecial=pgspecial)) + r"""\i without a filename returns an error.""" + result = list(executor.run(r"\i", pgspecial=pgspecial)) status, sql, success, is_special = result[0][3:] assert "missing required argument" in status assert success == False @@ -268,12 +270,12 @@ def test_execute_from_file_no_arg(executor, pgspecial): @dbtest @patch("pgcli.main.os") def test_execute_from_file_io_error(os, executor, pgspecial): - """\i with an io_error returns an error.""" - # Inject an IOError. - os.path.expanduser.side_effect = IOError("test") + r"""\i with an os_error returns an error.""" + # Inject an OSError. + os.path.expanduser.side_effect = OSError("test") # Check the result. - result = list(executor.run("\i test", pgspecial=pgspecial)) + result = list(executor.run(r"\i test", pgspecial=pgspecial)) status, sql, success, is_special = result[0][3:] assert status == "test" assert success == False @@ -290,7 +292,7 @@ def test_multiple_queries_same_line(executor): @dbtest def test_multiple_queries_with_special_command_same_line(executor, pgspecial): - result = run(executor, "select 'foo'; \d", pgspecial=pgspecial) + result = run(executor, r"select 'foo'; \d", pgspecial=pgspecial) assert len(result) == 11 # 2 * (output+status) * 3 lines assert "foo" in result[3] # This is a lame check. :( @@ -408,7 +410,7 @@ def test_date_time_types(executor): @pytest.mark.parametrize("value", ["10000000", "10000000.0", "10000000000000"]) def test_large_numbers_render_directly(executor, value): run(executor, "create table numbertest(a numeric)") - run(executor, "insert into numbertest (a) values ({0})".format(value)) + run(executor, f"insert into numbertest (a) values ({value})") assert value in run(executor, "select * from numbertest", join=True) @@ -511,13 +513,28 @@ def test_short_host(executor): assert executor.short_host == "localhost1" -class BrokenConnection(object): +class BrokenConnection: """Mock a connection that failed.""" def cursor(self): raise psycopg2.InterfaceError("I'm broken!") +class VirtualCursor: + """Mock a cursor to virtual database like pgbouncer.""" + + def __init__(self): + self.protocol_error = False + self.protocol_message = "" + self.description = None + self.status = None + self.statusmessage = "Error" + + def execute(self, *args, **kwargs): + self.protocol_error = True + self.protocol_message = "Command not supported" + + @dbtest def test_exit_without_active_connection(executor): quit_handler = MagicMock() @@ -540,3 +557,12 @@ def test_exit_without_active_connection(executor): # an exception should be raised when running a query without active connection with pytest.raises(psycopg2.InterfaceError): run(executor, "select 1", pgspecial=pgspecial) + + +@dbtest +def test_virtual_database(executor): + virtual_connection = MagicMock() + virtual_connection.cursor.return_value = VirtualCursor() + with patch.object(executor, "conn", virtual_connection): + result = run(executor, "select 1") + assert "Command not supported" in result -- cgit v1.2.3