diff options
Diffstat (limited to 'tests/pq')
-rw-r--r-- | tests/pq/__init__.py | 0 | ||||
-rw-r--r-- | tests/pq/test_async.py | 210 | ||||
-rw-r--r-- | tests/pq/test_conninfo.py | 48 | ||||
-rw-r--r-- | tests/pq/test_copy.py | 174 | ||||
-rw-r--r-- | tests/pq/test_escaping.py | 188 | ||||
-rw-r--r-- | tests/pq/test_exec.py | 146 | ||||
-rw-r--r-- | tests/pq/test_misc.py | 83 | ||||
-rw-r--r-- | tests/pq/test_pgconn.py | 585 | ||||
-rw-r--r-- | tests/pq/test_pgresult.py | 207 | ||||
-rw-r--r-- | tests/pq/test_pipeline.py | 161 | ||||
-rw-r--r-- | tests/pq/test_pq.py | 57 |
11 files changed, 1859 insertions, 0 deletions
diff --git a/tests/pq/__init__.py b/tests/pq/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/pq/__init__.py diff --git a/tests/pq/test_async.py b/tests/pq/test_async.py new file mode 100644 index 0000000..2c3de98 --- /dev/null +++ b/tests/pq/test_async.py @@ -0,0 +1,210 @@ +from select import select + +import pytest + +import psycopg +from psycopg import pq +from psycopg.generators import execute + + +def execute_wait(pgconn): + return psycopg.waiting.wait(execute(pgconn), pgconn.socket) + + +def test_send_query(pgconn): + # This test shows how to process an async query in all its glory + pgconn.nonblocking = 1 + + # Long query to make sure we have to wait on send + pgconn.send_query( + b"/* %s */ select 'x' as f from pg_sleep(0.01); select 1 as foo;" + % (b"x" * 1_000_000) + ) + + # send loop + waited_on_send = 0 + while True: + f = pgconn.flush() + if f == 0: + break + + waited_on_send += 1 + + rl, wl, xl = select([pgconn.socket], [pgconn.socket], []) + assert not (rl and wl) + if wl: + continue # call flush again() + if rl: + pgconn.consume_input() + continue + + # TODO: this check is not reliable, it fails on travis sometimes + # assert waited_on_send + + # read loop + results = [] + while True: + pgconn.consume_input() + if pgconn.is_busy(): + select([pgconn.socket], [], []) + continue + res = pgconn.get_result() + if res is None: + break + assert res.status == pq.ExecStatus.TUPLES_OK + results.append(res) + + assert len(results) == 2 + assert results[0].nfields == 1 + assert results[0].fname(0) == b"f" + assert results[0].get_value(0, 0) == b"x" + assert results[1].nfields == 1 + assert results[1].fname(0) == b"foo" + assert results[1].get_value(0, 0) == b"1" + + +def test_send_query_compact_test(pgconn): + # Like the above test but use psycopg facilities for compactness + pgconn.send_query( + b"/* %s */ select 'x' as f from pg_sleep(0.01); select 1 as foo;" + % (b"x" * 1_000_000) + ) + results = execute_wait(pgconn) + + assert len(results) == 2 + assert results[0].nfields == 1 + assert results[0].fname(0) == b"f" + assert results[0].get_value(0, 0) == b"x" + assert results[1].nfields == 1 + assert results[1].fname(0) == b"foo" + assert results[1].get_value(0, 0) == b"1" + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.send_query(b"select 1") + + +def test_single_row_mode(pgconn): + pgconn.send_query(b"select generate_series(1,2)") + pgconn.set_single_row_mode() + + results = execute_wait(pgconn) + assert len(results) == 3 + + res = results[0] + assert res.status == pq.ExecStatus.SINGLE_TUPLE + assert res.ntuples == 1 + assert res.get_value(0, 0) == b"1" + + res = results[1] + assert res.status == pq.ExecStatus.SINGLE_TUPLE + assert res.ntuples == 1 + assert res.get_value(0, 0) == b"2" + + res = results[2] + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.ntuples == 0 + + +def test_send_query_params(pgconn): + pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"]) + (res,) = execute_wait(pgconn) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == b"8" + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.send_query_params(b"select $1", [b"1"]) + + +def test_send_prepare(pgconn): + pgconn.send_prepare(b"prep", b"select $1::int + $2::int") + (res,) = execute_wait(pgconn) + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + pgconn.send_query_prepared(b"prep", [b"3", b"5"]) + (res,) = execute_wait(pgconn) + assert res.get_value(0, 0) == b"8" + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.send_prepare(b"prep", b"select $1::int + $2::int") + with pytest.raises(psycopg.OperationalError): + pgconn.send_query_prepared(b"prep", [b"3", b"5"]) + + +def test_send_prepare_types(pgconn): + pgconn.send_prepare(b"prep", b"select $1 + $2", [23, 23]) + (res,) = execute_wait(pgconn) + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + pgconn.send_query_prepared(b"prep", [b"3", b"5"]) + (res,) = execute_wait(pgconn) + assert res.get_value(0, 0) == b"8" + + +def test_send_prepared_binary_in(pgconn): + val = b"foo\00bar" + pgconn.send_prepare(b"", b"select length($1::bytea), length($2::bytea)") + (res,) = execute_wait(pgconn) + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + pgconn.send_query_prepared(b"", [val, val], param_formats=[0, 1]) + (res,) = execute_wait(pgconn) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == b"3" + assert res.get_value(0, 1) == b"7" + + with pytest.raises(ValueError): + pgconn.exec_params(b"select $1::bytea", [val], param_formats=[1, 1]) + + +@pytest.mark.parametrize("fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")]) +def test_send_prepared_binary_out(pgconn, fmt, out): + val = b"foo\00bar" + pgconn.send_prepare(b"", b"select $1::bytea") + (res,) = execute_wait(pgconn) + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + pgconn.send_query_prepared(b"", [val], param_formats=[1], result_format=fmt) + (res,) = execute_wait(pgconn) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == out + + +def test_send_describe_prepared(pgconn): + pgconn.send_prepare(b"prep", b"select $1::int8 + $2::int8 as fld") + (res,) = execute_wait(pgconn) + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + pgconn.send_describe_prepared(b"prep") + (res,) = execute_wait(pgconn) + assert res.nfields == 1 + assert res.ntuples == 0 + assert res.fname(0) == b"fld" + assert res.ftype(0) == 20 + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.send_describe_prepared(b"prep") + + +@pytest.mark.crdb_skip("server-side cursor") +def test_send_describe_portal(pgconn): + res = pgconn.exec_( + b""" + begin; + declare cur cursor for select * from generate_series(1,10) foo; + """ + ) + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + pgconn.send_describe_portal(b"cur") + (res,) = execute_wait(pgconn) + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + assert res.nfields == 1 + assert res.fname(0) == b"foo" + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.send_describe_portal(b"cur") diff --git a/tests/pq/test_conninfo.py b/tests/pq/test_conninfo.py new file mode 100644 index 0000000..64d8b8f --- /dev/null +++ b/tests/pq/test_conninfo.py @@ -0,0 +1,48 @@ +import pytest + +import psycopg +from psycopg import pq + + +def test_defaults(monkeypatch): + monkeypatch.setenv("PGPORT", "15432") + defs = pq.Conninfo.get_defaults() + assert len(defs) > 20 + port = [d for d in defs if d.keyword == b"port"][0] + assert port.envvar == b"PGPORT" + assert port.compiled == b"5432" + assert port.val == b"15432" + assert port.label == b"Database-Port" + assert port.dispchar == b"" + assert port.dispsize == 6 + + +@pytest.mark.libpq(">= 10") +def test_conninfo_parse(): + infos = pq.Conninfo.parse( + b"postgresql://host1:123,host2:456/somedb" + b"?target_session_attrs=any&application_name=myapp" + ) + info = {i.keyword: i.val for i in infos if i.val is not None} + assert info[b"host"] == b"host1,host2" + assert info[b"port"] == b"123,456" + assert info[b"dbname"] == b"somedb" + assert info[b"application_name"] == b"myapp" + + +@pytest.mark.libpq("< 10") +def test_conninfo_parse_96(): + conninfo = pq.Conninfo.parse( + b"postgresql://other@localhost/otherdb" + b"?connect_timeout=10&application_name=myapp" + ) + info = {i.keyword: i.val for i in conninfo if i.val is not None} + assert info[b"host"] == b"localhost" + assert info[b"dbname"] == b"otherdb" + assert info[b"application_name"] == b"myapp" + + +def test_conninfo_parse_bad(): + with pytest.raises(psycopg.OperationalError) as e: + pq.Conninfo.parse(b"bad_conninfo=") + assert "bad_conninfo" in str(e.value) diff --git a/tests/pq/test_copy.py b/tests/pq/test_copy.py new file mode 100644 index 0000000..383d272 --- /dev/null +++ b/tests/pq/test_copy.py @@ -0,0 +1,174 @@ +import pytest + +import psycopg +from psycopg import pq + +pytestmark = pytest.mark.crdb_skip("copy") + +sample_values = "values (10::int, 20::int, 'hello'::text), (40, NULL, 'world')" + +sample_tabledef = "col1 int primary key, col2 int, data text" + +sample_text = b"""\ +10\t20\thello +40\t\\N\tworld +""" + +sample_binary_value = """ +5047 434f 5059 0aff 0d0a 00 +00 0000 0000 0000 00 +00 0300 0000 0400 0000 0a00 0000 0400 0000 1400 0000 0568 656c 6c6f + +0003 0000 0004 0000 0028 ffff ffff 0000 0005 776f 726c 64 + +ff ff +""" + +sample_binary_rows = [ + bytes.fromhex("".join(row.split())) for row in sample_binary_value.split("\n\n") +] + +sample_binary = b"".join(sample_binary_rows) + + +def test_put_data_no_copy(pgconn): + with pytest.raises(psycopg.OperationalError): + pgconn.put_copy_data(b"wat") + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.put_copy_data(b"wat") + + +def test_put_end_no_copy(pgconn): + with pytest.raises(psycopg.OperationalError): + pgconn.put_copy_end() + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.put_copy_end() + + +def test_copy_out(pgconn): + ensure_table(pgconn, sample_tabledef) + res = pgconn.exec_(b"copy copy_in from stdin") + assert res.status == pq.ExecStatus.COPY_IN + + for i in range(10): + data = [] + for j in range(20): + data.append( + f"""\ +{i * 20 + j}\t{j}\t{'X' * (i * 20 + j)} +""" + ) + rv = pgconn.put_copy_data("".join(data).encode("ascii")) + assert rv > 0 + + rv = pgconn.put_copy_end() + assert rv > 0 + + res = pgconn.get_result() + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + res = pgconn.exec_( + b"select min(col1), max(col1), count(*), max(length(data)) from copy_in" + ) + assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message + assert res.get_value(0, 0) == b"0" + assert res.get_value(0, 1) == b"199" + assert res.get_value(0, 2) == b"200" + assert res.get_value(0, 3) == b"199" + + +def test_copy_out_err(pgconn): + ensure_table(pgconn, sample_tabledef) + res = pgconn.exec_(b"copy copy_in from stdin") + assert res.status == pq.ExecStatus.COPY_IN + + for i in range(10): + data = [] + for j in range(20): + data.append( + f"""\ +{i * 20 + j}\thardly a number\tnope +""" + ) + rv = pgconn.put_copy_data("".join(data).encode("ascii")) + assert rv > 0 + + rv = pgconn.put_copy_end() + assert rv > 0 + + res = pgconn.get_result() + assert res.status == pq.ExecStatus.FATAL_ERROR + assert b"hardly a number" in res.error_message + + res = pgconn.exec_(b"select count(*) from copy_in") + assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message + assert res.get_value(0, 0) == b"0" + + +def test_copy_out_error_end(pgconn): + ensure_table(pgconn, sample_tabledef) + res = pgconn.exec_(b"copy copy_in from stdin") + assert res.status == pq.ExecStatus.COPY_IN + + for i in range(10): + data = [] + for j in range(20): + data.append( + f"""\ +{i * 20 + j}\t{j}\t{'X' * (i * 20 + j)} +""" + ) + rv = pgconn.put_copy_data("".join(data).encode("ascii")) + assert rv > 0 + + rv = pgconn.put_copy_end(b"nuttengoggenio") + assert rv > 0 + + res = pgconn.get_result() + assert res.status == pq.ExecStatus.FATAL_ERROR + assert b"nuttengoggenio" in res.error_message + + res = pgconn.exec_(b"select count(*) from copy_in") + assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message + assert res.get_value(0, 0) == b"0" + + +def test_get_data_no_copy(pgconn): + with pytest.raises(psycopg.OperationalError): + pgconn.get_copy_data(0) + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.get_copy_data(0) + + +@pytest.mark.parametrize("format", [pq.Format.TEXT, pq.Format.BINARY]) +def test_copy_out_read(pgconn, format): + stmt = f"copy ({sample_values}) to stdout (format {format.name})" + res = pgconn.exec_(stmt.encode("ascii")) + assert res.status == pq.ExecStatus.COPY_OUT + assert res.binary_tuples == format + + if format == pq.Format.TEXT: + want = [row + b"\n" for row in sample_text.splitlines()] + else: + want = sample_binary_rows + + for row in want: + nbytes, data = pgconn.get_copy_data(0) + assert nbytes == len(data) + assert data == row + + assert pgconn.get_copy_data(0) == (-1, b"") + + res = pgconn.get_result() + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + +def ensure_table(pgconn, tabledef, name="copy_in"): + pgconn.exec_(f"drop table if exists {name}".encode("ascii")) + pgconn.exec_(f"create table {name} ({tabledef})".encode("ascii")) diff --git a/tests/pq/test_escaping.py b/tests/pq/test_escaping.py new file mode 100644 index 0000000..ad88d8a --- /dev/null +++ b/tests/pq/test_escaping.py @@ -0,0 +1,188 @@ +import pytest + +import psycopg +from psycopg import pq + +from ..fix_crdb import crdb_scs_off + + +@pytest.mark.parametrize( + "data, want", + [ + (b"", b"''"), + (b"hello", b"'hello'"), + (b"foo'bar", b"'foo''bar'"), + (b"foo\\bar", b" E'foo\\\\bar'"), + ], +) +def test_escape_literal(pgconn, data, want): + esc = pq.Escaping(pgconn) + out = esc.escape_literal(data) + assert out == want + + +@pytest.mark.parametrize("scs", ["on", crdb_scs_off("off")]) +def test_escape_literal_1char(pgconn, scs): + res = pgconn.exec_(f"set standard_conforming_strings to {scs}".encode("ascii")) + assert res.status == pq.ExecStatus.COMMAND_OK + esc = pq.Escaping(pgconn) + special = {b"'": b"''''", b"\\": b" E'\\\\'"} + for c in range(1, 128): + data = bytes([c]) + rv = esc.escape_literal(data) + exp = special.get(data) or b"'%s'" % data + assert rv == exp + + +def test_escape_literal_noconn(pgconn): + esc = pq.Escaping() + with pytest.raises(psycopg.OperationalError): + esc.escape_literal(b"hi") + + esc = pq.Escaping(pgconn) + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + esc.escape_literal(b"hi") + + +@pytest.mark.parametrize( + "data, want", + [ + (b"", b'""'), + (b"hello", b'"hello"'), + (b'foo"bar', b'"foo""bar"'), + (b"foo\\bar", b'"foo\\bar"'), + ], +) +def test_escape_identifier(pgconn, data, want): + esc = pq.Escaping(pgconn) + out = esc.escape_identifier(data) + assert out == want + + +@pytest.mark.parametrize("scs", ["on", crdb_scs_off("off")]) +def test_escape_identifier_1char(pgconn, scs): + res = pgconn.exec_(f"set standard_conforming_strings to {scs}".encode("ascii")) + assert res.status == pq.ExecStatus.COMMAND_OK + esc = pq.Escaping(pgconn) + special = {b'"': b'""""', b"\\": b'"\\"'} + for c in range(1, 128): + data = bytes([c]) + rv = esc.escape_identifier(data) + exp = special.get(data) or b'"%s"' % data + assert rv == exp + + +def test_escape_identifier_noconn(pgconn): + esc = pq.Escaping() + with pytest.raises(psycopg.OperationalError): + esc.escape_identifier(b"hi") + + esc = pq.Escaping(pgconn) + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + esc.escape_identifier(b"hi") + + +@pytest.mark.parametrize( + "data, want", + [ + (b"", b""), + (b"hello", b"hello"), + (b"foo'bar", b"foo''bar"), + (b"foo\\bar", b"foo\\bar"), + ], +) +def test_escape_string(pgconn, data, want): + esc = pq.Escaping(pgconn) + out = esc.escape_string(data) + assert out == want + + +@pytest.mark.parametrize("scs", ["on", crdb_scs_off("off")]) +def test_escape_string_1char(pgconn, scs): + esc = pq.Escaping(pgconn) + res = pgconn.exec_(f"set standard_conforming_strings to {scs}".encode("ascii")) + assert res.status == pq.ExecStatus.COMMAND_OK + special = {b"'": b"''", b"\\": b"\\" if scs == "on" else b"\\\\"} + for c in range(1, 128): + data = bytes([c]) + rv = esc.escape_string(data) + exp = special.get(data) or b"%s" % data + assert rv == exp + + +@pytest.mark.parametrize( + "data, want", + [ + (b"", b""), + (b"hello", b"hello"), + (b"foo'bar", b"foo''bar"), + # This libpq function behaves unpredictably when not passed a conn + (b"foo\\bar", (b"foo\\\\bar", b"foo\\bar")), + ], +) +def test_escape_string_noconn(data, want): + esc = pq.Escaping() + out = esc.escape_string(data) + if isinstance(want, bytes): + assert out == want + else: + assert out in want + + +def test_escape_string_badconn(pgconn): + esc = pq.Escaping(pgconn) + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + esc.escape_string(b"hi") + + +def test_escape_string_badenc(pgconn): + res = pgconn.exec_(b"set client_encoding to 'UTF8'") + assert res.status == pq.ExecStatus.COMMAND_OK + data = "\u20ac".encode()[:-1] + esc = pq.Escaping(pgconn) + with pytest.raises(psycopg.OperationalError): + esc.escape_string(data) + + +@pytest.mark.parametrize("data", [b"hello\00world", b"\00\00\00\00"]) +def test_escape_bytea(pgconn, data): + exp = rb"\x" + b"".join(b"%02x" % c for c in data) + esc = pq.Escaping(pgconn) + rv = esc.escape_bytea(data) + assert rv == exp + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + esc.escape_bytea(data) + + +def test_escape_noconn(pgconn): + data = bytes(range(256)) + esc = pq.Escaping() + escdata = esc.escape_bytea(data) + res = pgconn.exec_params(b"select '%s'::bytea" % escdata, [], result_format=1) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == data + + +def test_escape_1char(pgconn): + esc = pq.Escaping(pgconn) + for c in range(256): + rv = esc.escape_bytea(bytes([c])) + exp = rb"\x%02x" % c + assert rv == exp + + +@pytest.mark.parametrize("data", [b"hello\00world", b"\00\00\00\00"]) +def test_unescape_bytea(pgconn, data): + enc = rb"\x" + b"".join(b"%02x" % c for c in data) + esc = pq.Escaping(pgconn) + rv = esc.unescape_bytea(enc) + assert rv == data + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + esc.unescape_bytea(data) diff --git a/tests/pq/test_exec.py b/tests/pq/test_exec.py new file mode 100644 index 0000000..86c30c0 --- /dev/null +++ b/tests/pq/test_exec.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 + +import pytest + +import psycopg +from psycopg import pq + + +def test_exec_none(pgconn): + with pytest.raises(TypeError): + pgconn.exec_(None) + + +def test_exec(pgconn): + res = pgconn.exec_(b"select 'hel' || 'lo'") + assert res.get_value(0, 0) == b"hello" + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.exec_(b"select 'hello'") + + +def test_exec_params(pgconn): + res = pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"]) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == b"8" + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"]) + + +def test_exec_params_empty(pgconn): + res = pgconn.exec_params(b"select 8::int", []) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == b"8" + + +def test_exec_params_types(pgconn): + res = pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700, 23]) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == b"8" + assert res.ftype(0) == 1700 + assert res.get_value(0, 1) == b"8" + assert res.ftype(1) == 23 + + with pytest.raises(ValueError): + pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700]) + + +def test_exec_params_nulls(pgconn): + res = pgconn.exec_params(b"select $1::text, $2::text, $3::text", [b"hi", b"", None]) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == b"hi" + assert res.get_value(0, 1) == b"" + assert res.get_value(0, 2) is None + + +def test_exec_params_binary_in(pgconn): + val = b"foo\00bar" + res = pgconn.exec_params( + b"select length($1::bytea), length($2::bytea)", + [val, val], + param_formats=[0, 1], + ) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == b"3" + assert res.get_value(0, 1) == b"7" + + with pytest.raises(ValueError): + pgconn.exec_params(b"select $1::bytea", [val], param_formats=[1, 1]) + + +@pytest.mark.parametrize("fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")]) +def test_exec_params_binary_out(pgconn, fmt, out): + val = b"foo\00bar" + res = pgconn.exec_params( + b"select $1::bytea", [val], param_formats=[1], result_format=fmt + ) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == out + + +def test_prepare(pgconn): + res = pgconn.prepare(b"prep", b"select $1::int + $2::int") + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + res = pgconn.exec_prepared(b"prep", [b"3", b"5"]) + assert res.get_value(0, 0) == b"8" + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.prepare(b"prep", b"select $1::int + $2::int") + with pytest.raises(psycopg.OperationalError): + pgconn.exec_prepared(b"prep", [b"3", b"5"]) + + +def test_prepare_types(pgconn): + res = pgconn.prepare(b"prep", b"select $1 + $2", [23, 23]) + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + res = pgconn.exec_prepared(b"prep", [b"3", b"5"]) + assert res.get_value(0, 0) == b"8" + + +def test_exec_prepared_binary_in(pgconn): + val = b"foo\00bar" + res = pgconn.prepare(b"", b"select length($1::bytea), length($2::bytea)") + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + res = pgconn.exec_prepared(b"", [val, val], param_formats=[0, 1]) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == b"3" + assert res.get_value(0, 1) == b"7" + + with pytest.raises(ValueError): + pgconn.exec_params(b"select $1::bytea", [val], param_formats=[1, 1]) + + +@pytest.mark.parametrize("fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")]) +def test_exec_prepared_binary_out(pgconn, fmt, out): + val = b"foo\00bar" + res = pgconn.prepare(b"", b"select $1::bytea") + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + res = pgconn.exec_prepared(b"", [val], param_formats=[1], result_format=fmt) + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.get_value(0, 0) == out + + +@pytest.mark.crdb_skip("server-side cursor") +def test_describe_portal(pgconn): + res = pgconn.exec_( + b""" + begin; + declare cur cursor for select * from generate_series(1,10) foo; + """ + ) + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + res = pgconn.describe_portal(b"cur") + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + assert res.nfields == 1 + assert res.fname(0) == b"foo" + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.describe_portal(b"cur") diff --git a/tests/pq/test_misc.py b/tests/pq/test_misc.py new file mode 100644 index 0000000..599758f --- /dev/null +++ b/tests/pq/test_misc.py @@ -0,0 +1,83 @@ +import pytest + +import psycopg +from psycopg import pq + + +def test_error_message(pgconn): + res = pgconn.exec_(b"wat") + assert res.status == pq.ExecStatus.FATAL_ERROR + msg = pq.error_message(pgconn) + assert "wat" in msg + assert msg == pq.error_message(res) + primary = res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY) + assert primary.decode("ascii") in msg + + with pytest.raises(TypeError): + pq.error_message(None) # type: ignore[arg-type] + + res.clear() + assert pq.error_message(res) == "no details available" + pgconn.finish() + assert "NULL" in pq.error_message(pgconn) + + +@pytest.mark.crdb_skip("encoding") +def test_error_message_encoding(pgconn): + res = pgconn.exec_(b"set client_encoding to latin9") + assert res.status == pq.ExecStatus.COMMAND_OK + + res = pgconn.exec_('select 1 from "foo\u20acbar"'.encode("latin9")) + assert res.status == pq.ExecStatus.FATAL_ERROR + + msg = pq.error_message(pgconn) + assert "foo\u20acbar" in msg + + msg = pq.error_message(res) + assert "foo\ufffdbar" in msg + + msg = pq.error_message(res, encoding="latin9") + assert "foo\u20acbar" in msg + + msg = pq.error_message(res, encoding="ascii") + assert "foo\ufffdbar" in msg + + +def test_make_empty_result(pgconn): + pgconn.exec_(b"wat") + res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR) + assert res.status == pq.ExecStatus.FATAL_ERROR + assert b"wat" in res.error_message + + pgconn.finish() + res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR) + assert res.status == pq.ExecStatus.FATAL_ERROR + assert res.error_message == b"" + + +def test_result_set_attrs(pgconn): + res = pgconn.make_empty_result(pq.ExecStatus.COPY_OUT) + assert res.status == pq.ExecStatus.COPY_OUT + + attrs = [ + pq.PGresAttDesc(b"an_int", 0, 0, 0, 23, 0, 0), + pq.PGresAttDesc(b"a_num", 0, 0, 0, 1700, 0, 0), + pq.PGresAttDesc(b"a_bin_text", 0, 0, 1, 25, 0, 0), + ] + res.set_attributes(attrs) + assert res.nfields == 3 + + assert res.fname(0) == b"an_int" + assert res.fname(1) == b"a_num" + assert res.fname(2) == b"a_bin_text" + + assert res.fformat(0) == 0 + assert res.fformat(1) == 0 + assert res.fformat(2) == 1 + + assert res.ftype(0) == 23 + assert res.ftype(1) == 1700 + assert res.ftype(2) == 25 + + with pytest.raises(psycopg.OperationalError): + res.set_attributes(attrs) diff --git a/tests/pq/test_pgconn.py b/tests/pq/test_pgconn.py new file mode 100644 index 0000000..0566151 --- /dev/null +++ b/tests/pq/test_pgconn.py @@ -0,0 +1,585 @@ +import os +import sys +import ctypes +import logging +import weakref +from select import select + +import pytest + +import psycopg +from psycopg import pq +import psycopg.generators + +from ..utils import gc_collect + + +def test_connectdb(dsn): + conn = pq.PGconn.connect(dsn.encode()) + assert conn.status == pq.ConnStatus.OK, conn.error_message + + +def test_connectdb_error(): + conn = pq.PGconn.connect(b"dbname=psycopg_test_not_for_real") + assert conn.status == pq.ConnStatus.BAD + + +@pytest.mark.parametrize("baddsn", [None, 42]) +def test_connectdb_badtype(baddsn): + with pytest.raises(TypeError): + pq.PGconn.connect(baddsn) + + +def test_connect_async(dsn): + conn = pq.PGconn.connect_start(dsn.encode()) + conn.nonblocking = 1 + while True: + assert conn.status != pq.ConnStatus.BAD + rv = conn.connect_poll() + if rv == pq.PollingStatus.OK: + break + elif rv == pq.PollingStatus.READING: + select([conn.socket], [], []) + elif rv == pq.PollingStatus.WRITING: + select([], [conn.socket], []) + else: + assert False, rv + + assert conn.status == pq.ConnStatus.OK + + conn.finish() + with pytest.raises(psycopg.OperationalError): + conn.connect_poll() + + +@pytest.mark.crdb("skip", reason="connects to any db name") +def test_connect_async_bad(dsn): + parsed_dsn = {e.keyword: e.val for e in pq.Conninfo.parse(dsn.encode()) if e.val} + parsed_dsn[b"dbname"] = b"psycopg_test_not_for_real" + dsn = b" ".join(b"%s='%s'" % item for item in parsed_dsn.items()) + conn = pq.PGconn.connect_start(dsn) + while True: + assert conn.status != pq.ConnStatus.BAD, conn.error_message + rv = conn.connect_poll() + if rv == pq.PollingStatus.FAILED: + break + elif rv == pq.PollingStatus.READING: + select([conn.socket], [], []) + elif rv == pq.PollingStatus.WRITING: + select([], [conn.socket], []) + else: + assert False, rv + + assert conn.status == pq.ConnStatus.BAD + + +def test_finish(pgconn): + assert pgconn.status == pq.ConnStatus.OK + pgconn.finish() + assert pgconn.status == pq.ConnStatus.BAD + pgconn.finish() + assert pgconn.status == pq.ConnStatus.BAD + + +@pytest.mark.slow +def test_weakref(dsn): + conn = pq.PGconn.connect(dsn.encode()) + w = weakref.ref(conn) + conn.finish() + del conn + gc_collect() + assert w() is None + + +@pytest.mark.skipif( + sys.platform == "win32" + and os.environ.get("CI") == "true" + and pq.__impl__ != "python", + reason="can't figure out how to make ctypes run, don't care", +) +def test_pgconn_ptr(pgconn, libpq): + assert isinstance(pgconn.pgconn_ptr, int) + + f = libpq.PQserverVersion + f.argtypes = [ctypes.c_void_p] + f.restype = ctypes.c_int + ver = f(pgconn.pgconn_ptr) + assert ver == pgconn.server_version + + pgconn.finish() + assert pgconn.pgconn_ptr is None + + +def test_info(dsn, pgconn): + info = pgconn.info + assert len(info) > 20 + dbname = [d for d in info if d.keyword == b"dbname"][0] + assert dbname.envvar == b"PGDATABASE" + assert dbname.label == b"Database-Name" + assert dbname.dispchar == b"" + assert dbname.dispsize == 20 + + parsed = pq.Conninfo.parse(dsn.encode()) + # take the name and the user either from params or from env vars + name = [ + o.val or os.environ.get(o.envvar.decode(), "").encode() + for o in parsed + if o.keyword == b"dbname" and o.envvar + ][0] + user = [ + o.val or os.environ.get(o.envvar.decode(), "").encode() + for o in parsed + if o.keyword == b"user" and o.envvar + ][0] + assert dbname.val == (name or user) + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.info + + +@pytest.mark.crdb_skip("pg_terminate_backend") +def test_reset(pgconn): + assert pgconn.status == pq.ConnStatus.OK + pgconn.exec_(b"select pg_terminate_backend(pg_backend_pid())") + assert pgconn.status == pq.ConnStatus.BAD + pgconn.reset() + assert pgconn.status == pq.ConnStatus.OK + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.reset() + + assert pgconn.status == pq.ConnStatus.BAD + + +@pytest.mark.crdb_skip("pg_terminate_backend") +def test_reset_async(pgconn): + assert pgconn.status == pq.ConnStatus.OK + pgconn.exec_(b"select pg_terminate_backend(pg_backend_pid())") + assert pgconn.status == pq.ConnStatus.BAD + pgconn.reset_start() + while True: + rv = pgconn.reset_poll() + if rv == pq.PollingStatus.READING: + select([pgconn.socket], [], []) + elif rv == pq.PollingStatus.WRITING: + select([], [pgconn.socket], []) + else: + break + + assert rv == pq.PollingStatus.OK + assert pgconn.status == pq.ConnStatus.OK + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.reset_start() + + with pytest.raises(psycopg.OperationalError): + pgconn.reset_poll() + + +def test_ping(dsn): + rv = pq.PGconn.ping(dsn.encode()) + assert rv == pq.Ping.OK + + rv = pq.PGconn.ping(b"port=9999") + assert rv == pq.Ping.NO_RESPONSE + + +def test_db(pgconn): + name = [o.val for o in pgconn.info if o.keyword == b"dbname"][0] + assert pgconn.db == name + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.db + + +def test_user(pgconn): + user = [o.val for o in pgconn.info if o.keyword == b"user"][0] + assert pgconn.user == user + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.user + + +def test_password(pgconn): + # not in info + assert isinstance(pgconn.password, bytes) + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.password + + +def test_host(pgconn): + # might be not in info + assert isinstance(pgconn.host, bytes) + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.host + + +@pytest.mark.libpq(">= 12") +def test_hostaddr(pgconn): + # not in info + assert isinstance(pgconn.hostaddr, bytes), pgconn.hostaddr + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.hostaddr + + +@pytest.mark.libpq("< 12") +def test_hostaddr_missing(pgconn): + with pytest.raises(psycopg.NotSupportedError): + pgconn.hostaddr + + +def test_port(pgconn): + port = [o.val for o in pgconn.info if o.keyword == b"port"][0] + assert pgconn.port == port + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.port + + +@pytest.mark.libpq("< 14") +def test_tty(pgconn): + tty = [o.val for o in pgconn.info if o.keyword == b"tty"][0] + assert pgconn.tty == tty + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.tty + + +@pytest.mark.libpq(">= 14") +def test_tty_noop(pgconn): + assert not any(o.val for o in pgconn.info if o.keyword == b"tty") + assert pgconn.tty == b"" + + +def test_transaction_status(pgconn): + assert pgconn.transaction_status == pq.TransactionStatus.IDLE + pgconn.exec_(b"begin") + assert pgconn.transaction_status == pq.TransactionStatus.INTRANS + pgconn.send_query(b"select 1") + assert pgconn.transaction_status == pq.TransactionStatus.ACTIVE + psycopg.waiting.wait(psycopg.generators.execute(pgconn), pgconn.socket) + assert pgconn.transaction_status == pq.TransactionStatus.INTRANS + pgconn.finish() + assert pgconn.transaction_status == pq.TransactionStatus.UNKNOWN + + +def test_parameter_status(dsn, monkeypatch): + monkeypatch.setenv("PGAPPNAME", "psycopg tests") + pgconn = pq.PGconn.connect(dsn.encode()) + assert pgconn.parameter_status(b"application_name") == b"psycopg tests" + assert pgconn.parameter_status(b"wat") is None + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.parameter_status(b"application_name") + + +@pytest.mark.crdb_skip("encoding") +def test_encoding(pgconn): + res = pgconn.exec_(b"set client_encoding to latin1") + assert res.status == pq.ExecStatus.COMMAND_OK + assert pgconn.parameter_status(b"client_encoding") == b"LATIN1" + + res = pgconn.exec_(b"set client_encoding to 'utf-8'") + assert res.status == pq.ExecStatus.COMMAND_OK + assert pgconn.parameter_status(b"client_encoding") == b"UTF8" + + res = pgconn.exec_(b"set client_encoding to wat") + assert res.status == pq.ExecStatus.FATAL_ERROR + assert pgconn.parameter_status(b"client_encoding") == b"UTF8" + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.parameter_status(b"client_encoding") + + +def test_protocol_version(pgconn): + assert pgconn.protocol_version == 3 + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.protocol_version + + +def test_server_version(pgconn): + assert pgconn.server_version >= 90400 + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.server_version + + +def test_socket(pgconn): + socket = pgconn.socket + assert socket > 0 + pgconn.exec_(f"select pg_terminate_backend({pgconn.backend_pid})".encode()) + # TODO: on my box it raises OperationalError as it should. Not on Travis, + # so let's see if at least an ok value comes out of it. + try: + assert pgconn.socket == socket + except psycopg.OperationalError: + pass + + +def test_error_message(pgconn): + assert pgconn.error_message == b"" + res = pgconn.exec_(b"wat") + assert res.status == pq.ExecStatus.FATAL_ERROR + msg = pgconn.error_message + assert b"wat" in msg + pgconn.finish() + assert b"NULL" in pgconn.error_message # TODO: i10n? + + +def test_backend_pid(pgconn): + assert isinstance(pgconn.backend_pid, int) + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.backend_pid + + +def test_needs_password(pgconn): + # assume connection worked so an eventually needed password wasn't missing + assert pgconn.needs_password is False + pgconn.finish() + pgconn.needs_password + + +def test_used_password(pgconn, dsn, monkeypatch): + assert isinstance(pgconn.used_password, bool) + + # Assume that if a password was passed then it was needed. + # Note that the server may still need a password passed via pgpass + # so it may be that has_password is false but still a password was + # requested by the server and passed by libpq. + info = pq.Conninfo.parse(dsn.encode()) + has_password = ( + "PGPASSWORD" in os.environ + or [i for i in info if i.keyword == b"password"][0].val is not None + ) + if has_password: + assert pgconn.used_password + + pgconn.finish() + pgconn.used_password + + +def test_ssl_in_use(pgconn): + assert isinstance(pgconn.ssl_in_use, bool) + + # If connecting via socket then ssl is not in use + if pgconn.host.startswith(b"/"): + assert not pgconn.ssl_in_use + else: + sslmode = [i.val for i in pgconn.info if i.keyword == b"sslmode"][0] + if sslmode not in (b"disable", b"allow", b"prefer"): + assert pgconn.ssl_in_use + + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + pgconn.ssl_in_use + + +def test_set_single_row_mode(pgconn): + with pytest.raises(psycopg.OperationalError): + pgconn.set_single_row_mode() + + pgconn.send_query(b"select 1") + pgconn.set_single_row_mode() + + +def test_cancel(pgconn): + cancel = pgconn.get_cancel() + cancel.cancel() + cancel.cancel() + pgconn.finish() + cancel.cancel() + with pytest.raises(psycopg.OperationalError): + pgconn.get_cancel() + + +def test_cancel_free(pgconn): + cancel = pgconn.get_cancel() + cancel.free() + with pytest.raises(psycopg.OperationalError): + cancel.cancel() + cancel.free() + + +@pytest.mark.crdb_skip("notify") +def test_notify(pgconn): + assert pgconn.notifies() is None + + pgconn.exec_(b"listen foo") + pgconn.exec_(b"listen bar") + pgconn.exec_(b"notify foo, '1'") + pgconn.exec_(b"notify bar, '2'") + pgconn.exec_(b"notify foo, '3'") + + n = pgconn.notifies() + assert n.relname == b"foo" + assert n.be_pid == pgconn.backend_pid + assert n.extra == b"1" + + n = pgconn.notifies() + assert n.relname == b"bar" + assert n.be_pid == pgconn.backend_pid + assert n.extra == b"2" + + n = pgconn.notifies() + assert n.relname == b"foo" + assert n.be_pid == pgconn.backend_pid + assert n.extra == b"3" + + assert pgconn.notifies() is None + + +@pytest.mark.crdb_skip("do") +def test_notice_nohandler(pgconn): + pgconn.exec_(b"set client_min_messages to notice") + res = pgconn.exec_( + b"do $$begin raise notice 'hello notice'; end$$ language plpgsql" + ) + assert res.status == pq.ExecStatus.COMMAND_OK + + +@pytest.mark.crdb_skip("do") +def test_notice(pgconn): + msgs = [] + + def callback(res): + assert res.status == pq.ExecStatus.NONFATAL_ERROR + msgs.append(res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY)) + + pgconn.exec_(b"set client_min_messages to notice") + pgconn.notice_handler = callback + res = pgconn.exec_( + b"do $$begin raise notice 'hello notice'; end$$ language plpgsql" + ) + + assert res.status == pq.ExecStatus.COMMAND_OK + assert msgs and msgs[0] == b"hello notice" + + +@pytest.mark.crdb_skip("do") +def test_notice_error(pgconn, caplog): + caplog.set_level(logging.WARNING, logger="psycopg") + + def callback(res): + raise Exception("hello error") + + pgconn.exec_(b"set client_min_messages to notice") + pgconn.notice_handler = callback + res = pgconn.exec_( + b"do $$begin raise notice 'hello notice'; end$$ language plpgsql" + ) + + assert res.status == pq.ExecStatus.COMMAND_OK + assert len(caplog.records) == 1 + rec = caplog.records[0] + assert rec.levelno == logging.ERROR + assert "hello error" in rec.message + + +@pytest.mark.libpq("< 14") +@pytest.mark.skipif("sys.platform != 'linux'") +def test_trace_pre14(pgconn, tmp_path): + tracef = tmp_path / "trace" + with tracef.open("w") as f: + pgconn.trace(f.fileno()) + with pytest.raises(psycopg.NotSupportedError): + pgconn.set_trace_flags(0) + pgconn.exec_(b"select 1") + pgconn.untrace() + pgconn.exec_(b"select 2") + traces = tracef.read_text() + assert "select 1" in traces + assert "select 2" not in traces + + +@pytest.mark.libpq(">= 14") +@pytest.mark.skipif("sys.platform != 'linux'") +def test_trace(pgconn, tmp_path): + tracef = tmp_path / "trace" + with tracef.open("w") as f: + pgconn.trace(f.fileno()) + pgconn.set_trace_flags(pq.Trace.SUPPRESS_TIMESTAMPS | pq.Trace.REGRESS_MODE) + pgconn.exec_(b"select 1::int4 as foo") + pgconn.untrace() + pgconn.exec_(b"select 2::int4 as foo") + traces = [line.split("\t") for line in tracef.read_text().splitlines()] + assert traces == [ + ["F", "26", "Query", ' "select 1::int4 as foo"'], + ["B", "28", "RowDescription", ' 1 "foo" NNNN 0 NNNN 4 -1 0'], + ["B", "11", "DataRow", " 1 1 '1'"], + ["B", "13", "CommandComplete", ' "SELECT 1"'], + ["B", "5", "ReadyForQuery", " I"], + ] + + +@pytest.mark.skipif("sys.platform == 'linux'") +def test_trace_nonlinux(pgconn): + with pytest.raises(psycopg.NotSupportedError): + pgconn.trace(1) + + +@pytest.mark.libpq(">= 10") +def test_encrypt_password(pgconn): + enc = pgconn.encrypt_password(b"psycopg2", b"ashesh", b"md5") + assert enc == b"md594839d658c28a357126f105b9cb14cfc" + + +@pytest.mark.libpq(">= 10") +def test_encrypt_password_scram(pgconn): + enc = pgconn.encrypt_password(b"psycopg2", b"ashesh", b"scram-sha-256") + assert enc.startswith(b"SCRAM-SHA-256$") + + +@pytest.mark.libpq(">= 10") +def test_encrypt_password_badalgo(pgconn): + with pytest.raises(psycopg.OperationalError): + assert pgconn.encrypt_password(b"psycopg2", b"ashesh", b"wat") + + +@pytest.mark.libpq(">= 10") +@pytest.mark.crdb_skip("password_encryption") +def test_encrypt_password_query(pgconn): + res = pgconn.exec_(b"set password_encryption to 'md5'") + assert res.status == pq.ExecStatus.COMMAND_OK, pgconn.error_message.decode() + enc = pgconn.encrypt_password(b"psycopg2", b"ashesh") + assert enc == b"md594839d658c28a357126f105b9cb14cfc" + + res = pgconn.exec_(b"set password_encryption to 'scram-sha-256'") + assert res.status == pq.ExecStatus.COMMAND_OK + enc = pgconn.encrypt_password(b"psycopg2", b"ashesh") + assert enc.startswith(b"SCRAM-SHA-256$") + + +@pytest.mark.libpq(">= 10") +def test_encrypt_password_closed(pgconn): + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + assert pgconn.encrypt_password(b"psycopg2", b"ashesh") + + +@pytest.mark.libpq("< 10") +def test_encrypt_password_not_supported(pgconn): + # it might even be supported, but not worth the lifetime + with pytest.raises(psycopg.NotSupportedError): + pgconn.encrypt_password(b"psycopg2", b"ashesh", b"md5") + + with pytest.raises(psycopg.NotSupportedError): + pgconn.encrypt_password(b"psycopg2", b"ashesh", b"scram-sha-256") + + +def test_str(pgconn, dsn): + assert "[IDLE]" in str(pgconn) + pgconn.finish() + assert "[BAD]" in str(pgconn) + + pgconn2 = pq.PGconn.connect_start(dsn.encode()) + assert "[" in str(pgconn2) + assert "[IDLE]" not in str(pgconn2) diff --git a/tests/pq/test_pgresult.py b/tests/pq/test_pgresult.py new file mode 100644 index 0000000..3ad818d --- /dev/null +++ b/tests/pq/test_pgresult.py @@ -0,0 +1,207 @@ +import ctypes +import pytest + +from psycopg import pq + + +@pytest.mark.parametrize( + "command, status", + [ + (b"", "EMPTY_QUERY"), + (b"select 1", "TUPLES_OK"), + (b"set timezone to utc", "COMMAND_OK"), + (b"wat", "FATAL_ERROR"), + ], +) +def test_status(pgconn, command, status): + res = pgconn.exec_(command) + assert res.status == getattr(pq.ExecStatus, status) + assert status in repr(res) + + +def test_clear(pgconn): + res = pgconn.exec_(b"select 1") + assert res.status == pq.ExecStatus.TUPLES_OK + res.clear() + assert res.status == pq.ExecStatus.FATAL_ERROR + res.clear() + assert res.status == pq.ExecStatus.FATAL_ERROR + + +def test_pgresult_ptr(pgconn, libpq): + res = pgconn.exec_(b"select 1") + assert isinstance(res.pgresult_ptr, int) + + f = libpq.PQcmdStatus + f.argtypes = [ctypes.c_void_p] + f.restype = ctypes.c_char_p + assert f(res.pgresult_ptr) == b"SELECT 1" + + res.clear() + assert res.pgresult_ptr is None + + +def test_error_message(pgconn): + res = pgconn.exec_(b"select 1") + assert res.error_message == b"" + res = pgconn.exec_(b"select wat") + assert b"wat" in res.error_message + res.clear() + assert res.error_message == b"" + + +def test_error_field(pgconn): + res = pgconn.exec_(b"select wat") + # https://github.com/cockroachdb/cockroach/issues/81794 + assert ( + res.error_field(pq.DiagnosticField.SEVERITY_NONLOCALIZED) + or res.error_field(pq.DiagnosticField.SEVERITY) + ) == b"ERROR" + assert res.error_field(pq.DiagnosticField.SQLSTATE) == b"42703" + assert b"wat" in res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY) + res.clear() + assert res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY) is None + + +@pytest.mark.parametrize("n", range(4)) +def test_ntuples(pgconn, n): + res = pgconn.exec_params(b"select generate_series(1, $1)", [str(n).encode("ascii")]) + assert res.ntuples == n + res.clear() + assert res.ntuples == 0 + + +def test_nfields(pgconn): + res = pgconn.exec_(b"select wat") + assert res.nfields == 0 + res = pgconn.exec_(b"select 1, 2, 3") + assert res.nfields == 3 + res.clear() + assert res.nfields == 0 + + +def test_fname(pgconn): + res = pgconn.exec_(b'select 1 as foo, 2 as "BAR"') + assert res.fname(0) == b"foo" + assert res.fname(1) == b"BAR" + assert res.fname(2) is None + assert res.fname(-1) is None + res.clear() + assert res.fname(0) is None + + +@pytest.mark.crdb("skip", reason="ftable") +def test_ftable_and_col(pgconn): + res = pgconn.exec_( + b""" + drop table if exists t1, t2; + create table t1 as select 1 as f1; + create table t2 as select 2 as f2, 3 as f3; + """ + ) + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + res = pgconn.exec_( + b"select f1, f3, 't1'::regclass::oid, 't2'::regclass::oid from t1, t2" + ) + assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message + + assert res.ftable(0) == int(res.get_value(0, 2).decode("ascii")) + assert res.ftable(1) == int(res.get_value(0, 3).decode("ascii")) + assert res.ftablecol(0) == 1 + assert res.ftablecol(1) == 2 + res.clear() + assert res.ftable(0) == 0 + assert res.ftablecol(0) == 0 + + +@pytest.mark.parametrize("fmt", (0, 1)) +def test_fformat(pgconn, fmt): + res = pgconn.exec_params(b"select 1", [], result_format=fmt) + assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message + assert res.fformat(0) == fmt + assert res.binary_tuples == fmt + res.clear() + assert res.fformat(0) == 0 + assert res.binary_tuples == 0 + + +def test_ftype(pgconn): + res = pgconn.exec_(b"select 1::int4, 1::numeric, 1::text") + assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message + assert res.ftype(0) == 23 + assert res.ftype(1) == 1700 + assert res.ftype(2) == 25 + res.clear() + assert res.ftype(0) == 0 + + +def test_fmod(pgconn): + res = pgconn.exec_(b"select 1::int, 1::numeric(10), 1::numeric(10,2)") + assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message + assert res.fmod(0) == -1 + assert res.fmod(1) == 0xA0004 + assert res.fmod(2) == 0xA0006 + res.clear() + assert res.fmod(0) == 0 + + +def test_fsize(pgconn): + res = pgconn.exec_(b"select 1::int4, 1::bigint, 1::text") + assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message + assert res.fsize(0) == 4 + assert res.fsize(1) == 8 + assert res.fsize(2) == -1 + res.clear() + assert res.fsize(0) == 0 + + +def test_get_value(pgconn): + res = pgconn.exec_(b"select 'a', '', NULL") + assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message + assert res.get_value(0, 0) == b"a" + assert res.get_value(0, 1) == b"" + assert res.get_value(0, 2) is None + res.clear() + assert res.get_value(0, 0) is None + + +def test_nparams_types(pgconn): + res = pgconn.prepare(b"", b"select $1::int4, $2::text") + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + res = pgconn.describe_prepared(b"") + assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message + + assert res.nparams == 2 + assert res.param_type(0) == 23 + assert res.param_type(1) == 25 + + res.clear() + assert res.nparams == 0 + assert res.param_type(0) == 0 + + +def test_command_status(pgconn): + res = pgconn.exec_(b"select 1") + assert res.command_status == b"SELECT 1" + res = pgconn.exec_(b"set timezone to utc") + assert res.command_status == b"SET" + res.clear() + assert res.command_status is None + + +def test_command_tuples(pgconn): + res = pgconn.exec_(b"set timezone to utf8") + assert res.command_tuples is None + res = pgconn.exec_(b"select * from generate_series(1, 10)") + assert res.command_tuples == 10 + res.clear() + assert res.command_tuples is None + + +def test_oid_value(pgconn): + res = pgconn.exec_(b"select 1") + assert res.oid_value == 0 + res.clear() + assert res.oid_value == 0 diff --git a/tests/pq/test_pipeline.py b/tests/pq/test_pipeline.py new file mode 100644 index 0000000..00cd54a --- /dev/null +++ b/tests/pq/test_pipeline.py @@ -0,0 +1,161 @@ +import pytest + +import psycopg +from psycopg import pq + + +@pytest.mark.libpq("< 14") +def test_old_libpq(pgconn): + assert pgconn.pipeline_status == 0 + with pytest.raises(psycopg.NotSupportedError): + pgconn.enter_pipeline_mode() + with pytest.raises(psycopg.NotSupportedError): + pgconn.exit_pipeline_mode() + with pytest.raises(psycopg.NotSupportedError): + pgconn.pipeline_sync() + with pytest.raises(psycopg.NotSupportedError): + pgconn.send_flush_request() + + +@pytest.mark.libpq(">= 14") +def test_work_in_progress(pgconn): + assert not pgconn.nonblocking + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"select $1", [b"1"]) + with pytest.raises(psycopg.OperationalError, match="cannot exit pipeline mode"): + pgconn.exit_pipeline_mode() + + +@pytest.mark.libpq(">= 14") +def test_multi_pipelines(pgconn): + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"select $1", [b"1"], param_types=[25]) + pgconn.pipeline_sync() + pgconn.send_query_params(b"select $1", [b"2"], param_types=[25]) + pgconn.pipeline_sync() + + # result from first query + result1 = pgconn.get_result() + assert result1 is not None + assert result1.status == pq.ExecStatus.TUPLES_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # first sync result + sync_result = pgconn.get_result() + assert sync_result is not None + assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC + + # result from second query + result2 = pgconn.get_result() + assert result2 is not None + assert result2.status == pq.ExecStatus.TUPLES_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # second sync result + sync_result = pgconn.get_result() + assert sync_result is not None + assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC + + # pipeline still ON + assert pgconn.pipeline_status == pq.PipelineStatus.ON + + pgconn.exit_pipeline_mode() + + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + + assert result1.get_value(0, 0) == b"1" + assert result2.get_value(0, 0) == b"2" + + +@pytest.mark.libpq(">= 14") +def test_flush_request(pgconn): + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"select $1", [b"1"], param_types=[25]) + pgconn.send_flush_request() + r = pgconn.get_result() + assert r.status == pq.ExecStatus.TUPLES_OK + assert r.get_value(0, 0) == b"1" + pgconn.exit_pipeline_mode() + + +@pytest.fixture +def table(pgconn): + tablename = "pipeline" + pgconn.exec_(f"create table {tablename} (s text)".encode("ascii")) + yield tablename + pgconn.exec_(f"drop table if exists {tablename}".encode("ascii")) + + +@pytest.mark.libpq(">= 14") +def test_pipeline_abort(pgconn, table): + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"insert into pipeline values ($1)", [b"1"]) + pgconn.send_query_params(b"select no_such_function($1)", [b"1"]) + pgconn.send_query_params(b"insert into pipeline values ($1)", [b"2"]) + pgconn.pipeline_sync() + pgconn.send_query_params(b"insert into pipeline values ($1)", [b"3"]) + pgconn.pipeline_sync() + + # result from first INSERT + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.COMMAND_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # error result from second query (SELECT) + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.FATAL_ERROR + + # NULL signals end of result + assert pgconn.get_result() is None + + # pipeline should be aborted, due to previous error + assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED + + # result from second INSERT, aborted due to previous error + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.PIPELINE_ABORTED + + # NULL signals end of result + assert pgconn.get_result() is None + + # pipeline is still aborted + assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED + + # sync result + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.PIPELINE_SYNC + + # aborted flag is clear, pipeline is on again + assert pgconn.pipeline_status == pq.PipelineStatus.ON + + # result from the third INSERT + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.COMMAND_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # second sync result + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.PIPELINE_SYNC + + # NULL signals end of result + assert pgconn.get_result() is None + + pgconn.exit_pipeline_mode() diff --git a/tests/pq/test_pq.py b/tests/pq/test_pq.py new file mode 100644 index 0000000..076c3b6 --- /dev/null +++ b/tests/pq/test_pq.py @@ -0,0 +1,57 @@ +import os + +import pytest + +import psycopg +from psycopg import pq + +from ..utils import check_libpq_version + + +def test_version(): + rv = pq.version() + assert rv > 90500 + assert rv < 200000 # you are good for a while + + +def test_build_version(): + assert pq.__build_version__ and pq.__build_version__ >= 70400 + + +@pytest.mark.skipif("not os.environ.get('PSYCOPG_TEST_WANT_LIBPQ_BUILD')") +def test_want_built_version(): + want = os.environ["PSYCOPG_TEST_WANT_LIBPQ_BUILD"] + got = pq.__build_version__ + assert not check_libpq_version(got, want) + + +@pytest.mark.skipif("not os.environ.get('PSYCOPG_TEST_WANT_LIBPQ_IMPORT')") +def test_want_import_version(): + want = os.environ["PSYCOPG_TEST_WANT_LIBPQ_IMPORT"] + got = pq.version() + assert not check_libpq_version(got, want) + + +# Note: These tests are here because test_pipeline.py tests are all skipped +# when pipeline mode is not supported. + + +@pytest.mark.libpq(">= 14") +def test_pipeline_supported(conn): + assert psycopg.Pipeline.is_supported() + assert psycopg.AsyncPipeline.is_supported() + + with conn.pipeline(): + pass + + +@pytest.mark.libpq("< 14") +def test_pipeline_not_supported(conn): + assert not psycopg.Pipeline.is_supported() + assert not psycopg.AsyncPipeline.is_supported() + + with pytest.raises(psycopg.NotSupportedError) as exc: + with conn.pipeline(): + pass + + assert "too old" in str(exc.value) |