summaryrefslogtreecommitdiffstats
path: root/tests/test_pgexecute.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_pgexecute.py')
-rw-r--r--tests/test_pgexecute.py140
1 files changed, 83 insertions, 57 deletions
diff --git a/tests/test_pgexecute.py b/tests/test_pgexecute.py
index 9273be9..109674c 100644
--- a/tests/test_pgexecute.py
+++ b/tests/test_pgexecute.py
@@ -2,7 +2,7 @@ from textwrap import dedent
import psycopg2
import pytest
-from mock import patch, MagicMock
+from unittest.mock import patch, MagicMock
from pgspecial.main import PGSpecial, NO_QUERY
from utils import run, dbtest, requires_json, requires_jsonb
@@ -89,7 +89,7 @@ def test_expanded_slash_G(executor, pgspecial):
# Tests whether we reset the expanded output after a \G.
run(executor, """create table test(a boolean)""")
run(executor, """insert into test values(True)""")
- results = run(executor, """select * from test \G""", pgspecial=pgspecial)
+ results = run(executor, r"""select * from test \G""", pgspecial=pgspecial)
assert pgspecial.expanded_output == False
@@ -105,31 +105,35 @@ def test_schemata_table_views_and_columns_query(executor):
# schemata
# don't enforce all members of the schemas since they may include postgres
# temporary schemas
- assert set(executor.schemata()) >= set(
- ["public", "pg_catalog", "information_schema", "schema1", "schema2"]
- )
+ assert set(executor.schemata()) >= {
+ "public",
+ "pg_catalog",
+ "information_schema",
+ "schema1",
+ "schema2",
+ }
assert executor.search_path() == ["pg_catalog", "public"]
# tables
- assert set(executor.tables()) >= set(
- [("public", "a"), ("public", "b"), ("schema1", "c")]
- )
-
- assert set(executor.table_columns()) >= set(
- [
- ("public", "a", "x", "text", False, None),
- ("public", "a", "y", "text", False, None),
- ("public", "b", "z", "text", False, None),
- ("schema1", "c", "w", "text", True, "'meow'::text"),
- ]
- )
+ assert set(executor.tables()) >= {
+ ("public", "a"),
+ ("public", "b"),
+ ("schema1", "c"),
+ }
+
+ assert set(executor.table_columns()) >= {
+ ("public", "a", "x", "text", False, None),
+ ("public", "a", "y", "text", False, None),
+ ("public", "b", "z", "text", False, None),
+ ("schema1", "c", "w", "text", True, "'meow'::text"),
+ }
# views
- assert set(executor.views()) >= set([("public", "d")])
+ assert set(executor.views()) >= {("public", "d")}
- assert set(executor.view_columns()) >= set(
- [("public", "d", "e", "integer", False, None)]
- )
+ assert set(executor.view_columns()) >= {
+ ("public", "d", "e", "integer", False, None)
+ }
@dbtest
@@ -142,9 +146,9 @@ def test_foreign_key_query(executor):
"create table schema2.child(childid int PRIMARY KEY, motherid int REFERENCES schema1.parent)",
)
- assert set(executor.foreignkeys()) >= set(
- [("schema1", "parent", "parentid", "schema2", "child", "motherid")]
- )
+ assert set(executor.foreignkeys()) >= {
+ ("schema1", "parent", "parentid", "schema2", "child", "motherid")
+ }
@dbtest
@@ -175,30 +179,28 @@ def test_functions_query(executor):
)
funcs = set(executor.functions())
- assert funcs >= set(
- [
- function_meta_data(func_name="func1", return_type="integer"),
- function_meta_data(
- func_name="func3",
- arg_names=["x", "y"],
- arg_types=["integer", "integer"],
- arg_modes=["t", "t"],
- return_type="record",
- is_set_returning=True,
- ),
- function_meta_data(
- schema_name="public",
- func_name="func4",
- arg_names=("x",),
- arg_types=("integer",),
- return_type="integer",
- is_set_returning=True,
- ),
- function_meta_data(
- schema_name="schema1", func_name="func2", return_type="integer"
- ),
- ]
- )
+ assert funcs >= {
+ function_meta_data(func_name="func1", return_type="integer"),
+ function_meta_data(
+ func_name="func3",
+ arg_names=["x", "y"],
+ arg_types=["integer", "integer"],
+ arg_modes=["t", "t"],
+ return_type="record",
+ is_set_returning=True,
+ ),
+ function_meta_data(
+ schema_name="public",
+ func_name="func4",
+ arg_names=("x",),
+ arg_types=("integer",),
+ return_type="integer",
+ is_set_returning=True,
+ ),
+ function_meta_data(
+ schema_name="schema1", func_name="func2", return_type="integer"
+ ),
+ }
@dbtest
@@ -257,8 +259,8 @@ def test_not_is_special(executor, pgspecial):
@dbtest
def test_execute_from_file_no_arg(executor, pgspecial):
- """\i without a filename returns an error."""
- result = list(executor.run("\i", pgspecial=pgspecial))
+ r"""\i without a filename returns an error."""
+ result = list(executor.run(r"\i", pgspecial=pgspecial))
status, sql, success, is_special = result[0][3:]
assert "missing required argument" in status
assert success == False
@@ -268,12 +270,12 @@ def test_execute_from_file_no_arg(executor, pgspecial):
@dbtest
@patch("pgcli.main.os")
def test_execute_from_file_io_error(os, executor, pgspecial):
- """\i with an io_error returns an error."""
- # Inject an IOError.
- os.path.expanduser.side_effect = IOError("test")
+ r"""\i with an os_error returns an error."""
+ # Inject an OSError.
+ os.path.expanduser.side_effect = OSError("test")
# Check the result.
- result = list(executor.run("\i test", pgspecial=pgspecial))
+ result = list(executor.run(r"\i test", pgspecial=pgspecial))
status, sql, success, is_special = result[0][3:]
assert status == "test"
assert success == False
@@ -290,7 +292,7 @@ def test_multiple_queries_same_line(executor):
@dbtest
def test_multiple_queries_with_special_command_same_line(executor, pgspecial):
- result = run(executor, "select 'foo'; \d", pgspecial=pgspecial)
+ result = run(executor, r"select 'foo'; \d", pgspecial=pgspecial)
assert len(result) == 11 # 2 * (output+status) * 3 lines
assert "foo" in result[3]
# This is a lame check. :(
@@ -408,7 +410,7 @@ def test_date_time_types(executor):
@pytest.mark.parametrize("value", ["10000000", "10000000.0", "10000000000000"])
def test_large_numbers_render_directly(executor, value):
run(executor, "create table numbertest(a numeric)")
- run(executor, "insert into numbertest (a) values ({0})".format(value))
+ run(executor, f"insert into numbertest (a) values ({value})")
assert value in run(executor, "select * from numbertest", join=True)
@@ -511,13 +513,28 @@ def test_short_host(executor):
assert executor.short_host == "localhost1"
-class BrokenConnection(object):
+class BrokenConnection:
"""Mock a connection that failed."""
def cursor(self):
raise psycopg2.InterfaceError("I'm broken!")
+class VirtualCursor:
+ """Mock a cursor to virtual database like pgbouncer."""
+
+ def __init__(self):
+ self.protocol_error = False
+ self.protocol_message = ""
+ self.description = None
+ self.status = None
+ self.statusmessage = "Error"
+
+ def execute(self, *args, **kwargs):
+ self.protocol_error = True
+ self.protocol_message = "Command not supported"
+
+
@dbtest
def test_exit_without_active_connection(executor):
quit_handler = MagicMock()
@@ -540,3 +557,12 @@ def test_exit_without_active_connection(executor):
# an exception should be raised when running a query without active connection
with pytest.raises(psycopg2.InterfaceError):
run(executor, "select 1", pgspecial=pgspecial)
+
+
+@dbtest
+def test_virtual_database(executor):
+ virtual_connection = MagicMock()
+ virtual_connection.cursor.return_value = VirtualCursor()
+ with patch.object(executor, "conn", virtual_connection):
+ result = run(executor, "select 1")
+ assert "Command not supported" in result