import re from textwrap import dedent import psycopg import pytest from unittest.mock import patch, MagicMock from pgspecial.main import PGSpecial, NO_QUERY from utils import run, dbtest, requires_json, requires_jsonb from pgcli.main import PGCli, exception_formatter as main_exception_formatter from pgcli.packages.parseutils.meta import FunctionMetadata def function_meta_data( func_name, schema_name="public", arg_names=None, arg_types=None, arg_modes=None, return_type=None, is_aggregate=False, is_window=False, is_set_returning=False, is_extension=False, arg_defaults=None, ): return FunctionMetadata( schema_name, func_name, arg_names, arg_types, arg_modes, return_type, is_aggregate, is_window, is_set_returning, is_extension, arg_defaults, ) @dbtest def test_conn(executor): run(executor, """create table test(a text)""") run(executor, """insert into test values('abc')""") assert run(executor, """select * from test""", join=True) == dedent( """\ +-----+ | a | |-----| | abc | +-----+ SELECT 1""" ) @dbtest def test_copy(executor): executor_copy = executor.copy() run(executor_copy, """create table test(a text)""") run(executor_copy, """insert into test values('abc')""") assert run(executor_copy, """select * from test""", join=True) == dedent( """\ +-----+ | a | |-----| | abc | +-----+ SELECT 1""" ) @dbtest def test_bools_are_treated_as_strings(executor): run(executor, """create table test(a boolean)""") run(executor, """insert into test values(True)""") assert run(executor, """select * from test""", join=True) == dedent( """\ +------+ | a | |------| | True | +------+ SELECT 1""" ) @dbtest def test_expanded_slash_G(executor, pgspecial): # Tests whether we reset the expanded output after a \G. run(executor, """create table test(a boolean)""") run(executor, """insert into test values(True)""") results = run(executor, r"""select * from test \G""", pgspecial=pgspecial) assert pgspecial.expanded_output == False @dbtest def test_schemata_table_views_and_columns_query(executor): run(executor, "create table a(x text, y text)") run(executor, "create table b(z text)") run(executor, "create view d as select 1 as e") run(executor, "create schema schema1") run(executor, "create table schema1.c (w text DEFAULT 'meow')") run(executor, "create schema schema2") # schemata # don't enforce all members of the schemas since they may include postgres # temporary schemas assert set(executor.schemata()) >= { "public", "pg_catalog", "information_schema", "schema1", "schema2", } assert executor.search_path() == ["pg_catalog", "public"] # tables assert set(executor.tables()) >= { ("public", "a"), ("public", "b"), ("schema1", "c"), } assert set(executor.table_columns()) >= { ("public", "a", "x", "text", False, None), ("public", "a", "y", "text", False, None), ("public", "b", "z", "text", False, None), ("schema1", "c", "w", "text", True, "'meow'::text"), } # views assert set(executor.views()) >= {("public", "d")} assert set(executor.view_columns()) >= { ("public", "d", "e", "integer", False, None) } @dbtest def test_foreign_key_query(executor): run(executor, "create schema schema1") run(executor, "create schema schema2") run(executor, "create table schema1.parent(parentid int PRIMARY KEY)") run( executor, "create table schema2.child(childid int PRIMARY KEY, motherid int REFERENCES schema1.parent)", ) assert set(executor.foreignkeys()) >= { ("schema1", "parent", "parentid", "schema2", "child", "motherid") } @dbtest def test_functions_query(executor): run( executor, """create function func1() returns int language sql as $$select 1$$""", ) run(executor, "create schema schema1") run( executor, """create function schema1.func2() returns int language sql as $$select 2$$""", ) run( executor, """create function func3() returns table(x int, y int) language sql as $$select 1, 2 from generate_series(1,5)$$;""", ) run( executor, """create function func4(x int) returns setof int language sql as $$select generate_series(1,5)$$;""", ) funcs = set(executor.functions()) assert funcs >= { function_meta_data(func_name="func1", return_type="integer"), function_meta_data( func_name="func3", arg_names=["x", "y"], arg_types=["integer", "integer"], arg_modes=["t", "t"], return_type="record", is_set_returning=True, ), function_meta_data( schema_name="public", func_name="func4", arg_names=("x",), arg_types=("integer",), return_type="integer", is_set_returning=True, ), function_meta_data( schema_name="schema1", func_name="func2", return_type="integer" ), } @dbtest def test_datatypes_query(executor): run(executor, "create type foo AS (a int, b text)") types = list(executor.datatypes()) assert types == [("public", "foo")] @dbtest def test_database_list(executor): databases = executor.databases() assert "_test_db" in databases @dbtest def test_invalid_syntax(executor, exception_formatter): result = run( executor, "invalid syntax!", exception_formatter=lambda x: main_exception_formatter(x, verbose_errors=False), ) assert 'syntax error at or near "invalid"' in result[0] assert "SQLSTATE" not in result[0] @dbtest def test_invalid_syntax_verbose(executor): result = run( executor, "invalid syntax!", exception_formatter=lambda x: main_exception_formatter(x, verbose_errors=True), ) fields = r""" Severity: ERROR Severity \(non-localized\): ERROR SQLSTATE code: 42601 Message: syntax error at or near "invalid" Position: 1 File: scan\.l Line: \d+ Routine: scanner_yyerror """.strip() assert re.search(fields, result[0]) @dbtest def test_invalid_column_name(executor, exception_formatter): result = run( executor, "select invalid command", exception_formatter=exception_formatter ) assert 'column "invalid" does not exist' in result[0] @pytest.fixture(params=[True, False]) def expanded(request): return request.param @dbtest def test_unicode_support_in_output(executor, expanded): run(executor, "create table unicodechars(t text)") run(executor, "insert into unicodechars (t) values ('é')") # See issue #24, this raises an exception without proper handling assert "é" in run( executor, "select * from unicodechars", join=True, expanded=expanded ) @dbtest def test_not_is_special(executor, pgspecial): """is_special is set to false for database queries.""" query = "select 1" result = list(executor.run(query, pgspecial=pgspecial)) success, is_special = result[0][5:] assert success == True assert is_special == False @dbtest def test_execute_from_file_no_arg(executor, pgspecial): r"""\i without a filename returns an error.""" result = list(executor.run(r"\i", pgspecial=pgspecial)) status, sql, success, is_special = result[0][3:] assert "missing required argument" in status assert success == False assert is_special == True @dbtest @patch("pgcli.main.os") def test_execute_from_file_io_error(os, executor, pgspecial): r"""\i with an os_error returns an error.""" # Inject an OSError. os.path.expanduser.side_effect = OSError("test") # Check the result. result = list(executor.run(r"\i test", pgspecial=pgspecial)) status, sql, success, is_special = result[0][3:] assert status == "test" assert success == False assert is_special == True @dbtest def test_execute_from_commented_file_that_executes_another_file( executor, pgspecial, tmpdir ): # https://github.com/dbcli/pgcli/issues/1336 sqlfile1 = tmpdir.join("test01.sql") sqlfile1.write("-- asdf \n\\h") sqlfile2 = tmpdir.join("test00.sql") sqlfile2.write("--An useless comment;\nselect now();\n-- another useless comment") rcfile = str(tmpdir.join("rcfile")) print(rcfile) cli = PGCli(pgexecute=executor, pgclirc_file=rcfile) assert cli != None statement = "--comment\n\\h" result = run(executor, statement, pgspecial=cli.pgspecial) assert result != None assert result[0].find("ALTER TABLE") @dbtest def test_execute_commented_first_line_and_special(executor, pgspecial, tmpdir): # just some base cases that should work also statement = "--comment\nselect now();" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("now") >= 0 statement = "/*comment*/\nselect now();" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("now") >= 0 # https://github.com/dbcli/pgcli/issues/1362 statement = "--comment\n\\h" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("ALTER") >= 0 assert result[1].find("ABORT") >= 0 statement = "--comment1\n--comment2\n\\h" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("ALTER") >= 0 assert result[1].find("ABORT") >= 0 statement = "/*comment*/\n\h;" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("ALTER") >= 0 assert result[1].find("ABORT") >= 0 statement = """/*comment1 comment2*/ \h""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("ALTER") >= 0 assert result[1].find("ABORT") >= 0 statement = """/*comment1 comment2*/ /*comment 3 comment4*/ \\h""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("ALTER") >= 0 assert result[1].find("ABORT") >= 0 statement = " /*comment*/\n\h;" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("ALTER") >= 0 assert result[1].find("ABORT") >= 0 statement = "/*comment\ncomment line2*/\n\h;" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("ALTER") >= 0 assert result[1].find("ABORT") >= 0 statement = " /*comment\ncomment line2*/\n\h;" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("ALTER") >= 0 assert result[1].find("ABORT") >= 0 statement = """\\h /*comment4 */""" result = run(executor, statement, pgspecial=pgspecial) print(result) assert result != None assert result[0].find("No help") >= 0 # TODO: we probably don't want to do this but sqlparse is not parsing things well # we relly want it to find help but right now, sqlparse isn't dropping the /*comment*/ # style comments after command statement = """/*comment1*/ \h /*comment4 */""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[0].find("No help") >= 0 # TODO: same for this one statement = """/*comment1 comment3 comment2*/ \\h /*comment4 comment5 comment6*/""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[0].find("No help") >= 0 @dbtest def test_execute_commented_first_line_and_normal(executor, pgspecial, tmpdir): # https://github.com/dbcli/pgcli/issues/1403 # just some base cases that should work also statement = "--comment\nselect now();" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("now") >= 0 statement = "/*comment*/\nselect now();" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[1].find("now") >= 0 # this simulates the original error (1403) without having to add/drop tables # since it was just an error on reading input files and not the actual # command itself # test that the statement works statement = """VALUES (1, 'one'), (2, 'two'), (3, 'three');""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[5].find("three") >= 0 # test the statement with a \n in the middle statement = """VALUES (1, 'one'),\n (2, 'two'), (3, 'three');""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[5].find("three") >= 0 # test the statement with a newline in the middle statement = """VALUES (1, 'one'), (2, 'two'), (3, 'three');""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[5].find("three") >= 0 # now add a single comment line statement = """--comment\nVALUES (1, 'one'), (2, 'two'), (3, 'three');""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[5].find("three") >= 0 # doing without special char \n statement = """--comment VALUES (1,'one'), (2, 'two'), (3, 'three');""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[5].find("three") >= 0 # two comment lines statement = """--comment\n--comment2\nVALUES (1,'one'), (2, 'two'), (3, 'three');""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[5].find("three") >= 0 # doing without special char \n statement = """--comment --comment2 VALUES (1,'one'), (2, 'two'), (3, 'three'); """ result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[5].find("three") >= 0 # multiline comment + newline in middle of the statement statement = """/*comment comment2 comment3*/ VALUES (1,'one'), (2, 'two'), (3, 'three');""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[5].find("three") >= 0 # multiline comment + newline in middle of the statement # + comments after the statement statement = """/*comment comment2 comment3*/ VALUES (1,'one'), (2, 'two'), (3, 'three'); --comment4 --comment5""" result = run(executor, statement, pgspecial=pgspecial) assert result != None assert result[5].find("three") >= 0 @dbtest def test_multiple_queries_same_line(executor): result = run(executor, "select 'foo'; select 'bar'") assert len(result) == 12 # 2 * (output+status) * 3 lines assert "foo" in result[3] assert "bar" in result[9] @dbtest def test_multiple_queries_with_special_command_same_line(executor, pgspecial): result = run(executor, r"select 'foo'; \d", pgspecial=pgspecial) assert len(result) == 11 # 2 * (output+status) * 3 lines assert "foo" in result[3] # This is a lame check. :( assert "Schema" in result[7] @dbtest def test_multiple_queries_same_line_syntaxerror(executor, exception_formatter): result = run( executor, "select 'fooé'; invalid syntax é", exception_formatter=exception_formatter, ) assert "fooé" in result[3] assert 'syntax error at or near "invalid"' in result[-1] @pytest.fixture def pgspecial(): return PGCli().pgspecial @dbtest def test_special_command_help(executor, pgspecial): result = run(executor, "\\?", pgspecial=pgspecial)[1].split("|") assert "Command" in result[1] assert "Description" in result[2] @dbtest def test_bytea_field_support_in_output(executor): run(executor, "create table binarydata(c bytea)") run(executor, "insert into binarydata (c) values (decode('DEADBEEF', 'hex'))") assert "\\xdeadbeef" in run(executor, "select * from binarydata", join=True) @dbtest def test_unicode_support_in_unknown_type(executor): assert "日本語" in run(executor, "SELECT '日本語' AS japanese;", join=True) @dbtest def test_unicode_support_in_enum_type(executor): run(executor, "CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy', '日本語')") run(executor, "CREATE TABLE person (name TEXT, current_mood mood)") run(executor, "INSERT INTO person VALUES ('Moe', '日本語')") assert "日本語" in run(executor, "SELECT * FROM person", join=True) @requires_json def test_json_renders_without_u_prefix(executor, expanded): run(executor, "create table jsontest(d json)") run(executor, """insert into jsontest (d) values ('{"name": "Éowyn"}')""") result = run( executor, "SELECT d FROM jsontest LIMIT 1", join=True, expanded=expanded ) assert '{"name": "Éowyn"}' in result @requires_jsonb def test_jsonb_renders_without_u_prefix(executor, expanded): run(executor, "create table jsonbtest(d jsonb)") run(executor, """insert into jsonbtest (d) values ('{"name": "Éowyn"}')""") result = run( executor, "SELECT d FROM jsonbtest LIMIT 1", join=True, expanded=expanded ) assert '{"name": "Éowyn"}' in result @dbtest def test_date_time_types(executor): run(executor, "SET TIME ZONE UTC") assert ( run(executor, "SELECT (CAST('00:00:00' AS time))", join=True).split("\n")[3] == "| 00:00:00 |" ) assert ( run(executor, "SELECT (CAST('00:00:00+14:59' AS timetz))", join=True).split( "\n" )[3] == "| 00:00:00+14:59 |" ) assert ( run(executor, "SELECT (CAST('4713-01-01 BC' AS date))", join=True).split("\n")[ 3 ] == "| 4713-01-01 BC |" ) assert ( run( executor, "SELECT (CAST('4713-01-01 00:00:00 BC' AS timestamp))", join=True ).split("\n")[3] == "| 4713-01-01 00:00:00 BC |" ) assert ( run( executor, "SELECT (CAST('4713-01-01 00:00:00+00 BC' AS timestamptz))", join=True, ).split("\n")[3] == "| 4713-01-01 00:00:00+00 BC |" ) assert ( run( executor, "SELECT (CAST('-123456789 days 12:23:56' AS interval))", join=True ).split("\n")[3] == "| -123456789 days, 12:23:56 |" ) @dbtest @pytest.mark.parametrize("value", ["10000000", "10000000.0", "10000000000000"]) def test_large_numbers_render_directly(executor, value): run(executor, "create table numbertest(a numeric)") run(executor, f"insert into numbertest (a) values ({value})") assert value in run(executor, "select * from numbertest", join=True) @dbtest @pytest.mark.parametrize("command", ["di", "dv", "ds", "df", "dT"]) @pytest.mark.parametrize("verbose", ["", "+"]) @pytest.mark.parametrize("pattern", ["", "x", "*.*", "x.y", "x.*", "*.y"]) def test_describe_special(executor, command, verbose, pattern, pgspecial): # We don't have any tests for the output of any of the special commands, # but we can at least make sure they run without error sql = r"\{command}{verbose} {pattern}".format(**locals()) list(executor.run(sql, pgspecial=pgspecial)) @dbtest @pytest.mark.parametrize("sql", ["invalid sql", "SELECT 1; select error;"]) def test_raises_with_no_formatter(executor, sql): with pytest.raises(psycopg.ProgrammingError): list(executor.run(sql)) @dbtest def test_on_error_resume(executor, exception_formatter): sql = "select 1; error; select 1;" result = list( executor.run(sql, on_error_resume=True, exception_formatter=exception_formatter) ) assert len(result) == 3 @dbtest def test_on_error_stop(executor, exception_formatter): sql = "select 1; error; select 1;" result = list( executor.run( sql, on_error_resume=False, exception_formatter=exception_formatter ) ) assert len(result) == 2 # @dbtest # def test_unicode_notices(executor): # sql = "DO language plpgsql $$ BEGIN RAISE NOTICE '有人更改'; END $$;" # result = list(executor.run(sql)) # assert result[0][0] == u'NOTICE: 有人更改\n' @dbtest def test_nonexistent_function_definition(executor): with pytest.raises(RuntimeError): result = executor.view_definition("there_is_no_such_function") @dbtest def test_function_definition(executor): run( executor, """ CREATE OR REPLACE FUNCTION public.the_number_three() RETURNS int LANGUAGE sql AS $function$ select 3; $function$ """, ) result = executor.function_definition("the_number_three") @dbtest def test_function_notice_order(executor): run( executor, """ CREATE OR REPLACE FUNCTION demo_order() RETURNS VOID AS $$ BEGIN RAISE NOTICE 'first'; RAISE NOTICE 'second'; RAISE NOTICE 'third'; RAISE NOTICE 'fourth'; RAISE NOTICE 'fifth'; RAISE NOTICE 'sixth'; END; $$ LANGUAGE plpgsql; """, ) executor.function_definition("demo_order") result = run(executor, "select demo_order()") assert "first\nsecond\nthird\nfourth\nfifth\nsixth" in result[0] assert "+------------+" in result[1] assert "| demo_order |" in result[2] assert "|------------|" in result[3] assert "| |" in result[4] assert "+------------+" in result[5] assert "SELECT 1" in result[6] @dbtest def test_view_definition(executor): run(executor, "create table tbl1 (a text, b numeric)") run(executor, "create view vw1 AS SELECT * FROM tbl1") run(executor, "create materialized view mvw1 AS SELECT * FROM tbl1") result = executor.view_definition("vw1") assert 'VIEW "public"."vw1" AS' in result assert "FROM tbl1" in result # import pytest; pytest.set_trace() result = executor.view_definition("mvw1") assert "MATERIALIZED VIEW" in result @dbtest def test_nonexistent_view_definition(executor): with pytest.raises(RuntimeError): result = executor.view_definition("there_is_no_such_view") with pytest.raises(RuntimeError): result = executor.view_definition("mvw1") @dbtest def test_short_host(executor): with patch.object(executor, "host", "localhost"): assert executor.short_host == "localhost" with patch.object(executor, "host", "localhost.example.org"): assert executor.short_host == "localhost" with patch.object( executor, "host", "localhost1.example.org,localhost2.example.org" ): assert executor.short_host == "localhost1" with patch.object(executor, "host", "ec2-11-222-333-444.compute-1.amazonaws.com"): assert executor.short_host == "ec2-11-222-333-444" with patch.object(executor, "host", "1.2.3.4"): assert executor.short_host == "1.2.3.4" class VirtualCursor: """Mock a cursor to virtual database like pgbouncer.""" def __init__(self): self.protocol_error = False self.protocol_message = "" self.description = None self.status = None self.statusmessage = "Error" def execute(self, *args, **kwargs): self.protocol_error = True self.protocol_message = "Command not supported" @dbtest def test_exit_without_active_connection(executor): quit_handler = MagicMock() pgspecial = PGSpecial() pgspecial.register( quit_handler, "\\q", "\\q", "Quit pgcli.", arg_type=NO_QUERY, case_sensitive=True, aliases=(":q",), ) with patch.object( executor.conn, "cursor", side_effect=psycopg.InterfaceError("I'm broken!") ): # we should be able to quit the app, even without active connection run(executor, "\\q", pgspecial=pgspecial) quit_handler.assert_called_once() # an exception should be raised when running a query without active connection with pytest.raises(psycopg.InterfaceError): run(executor, "select 1", pgspecial=pgspecial) @dbtest def test_virtual_database(executor): virtual_connection = MagicMock() virtual_connection.cursor.return_value = VirtualCursor() with patch.object(executor, "conn", virtual_connection): result = run(executor, "select 1") assert "Command not supported" in result