summaryrefslogtreecommitdiffstats
path: root/tests/pq
diff options
context:
space:
mode:
Diffstat (limited to 'tests/pq')
-rw-r--r--tests/pq/__init__.py0
-rw-r--r--tests/pq/test_async.py210
-rw-r--r--tests/pq/test_conninfo.py48
-rw-r--r--tests/pq/test_copy.py174
-rw-r--r--tests/pq/test_escaping.py188
-rw-r--r--tests/pq/test_exec.py146
-rw-r--r--tests/pq/test_misc.py83
-rw-r--r--tests/pq/test_pgconn.py585
-rw-r--r--tests/pq/test_pgresult.py207
-rw-r--r--tests/pq/test_pipeline.py161
-rw-r--r--tests/pq/test_pq.py57
11 files changed, 1859 insertions, 0 deletions
diff --git a/tests/pq/__init__.py b/tests/pq/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/pq/__init__.py
diff --git a/tests/pq/test_async.py b/tests/pq/test_async.py
new file mode 100644
index 0000000..2c3de98
--- /dev/null
+++ b/tests/pq/test_async.py
@@ -0,0 +1,210 @@
+from select import select
+
+import pytest
+
+import psycopg
+from psycopg import pq
+from psycopg.generators import execute
+
+
+def execute_wait(pgconn):
+ return psycopg.waiting.wait(execute(pgconn), pgconn.socket)
+
+
+def test_send_query(pgconn):
+ # This test shows how to process an async query in all its glory
+ pgconn.nonblocking = 1
+
+ # Long query to make sure we have to wait on send
+ pgconn.send_query(
+ b"/* %s */ select 'x' as f from pg_sleep(0.01); select 1 as foo;"
+ % (b"x" * 1_000_000)
+ )
+
+ # send loop
+ waited_on_send = 0
+ while True:
+ f = pgconn.flush()
+ if f == 0:
+ break
+
+ waited_on_send += 1
+
+ rl, wl, xl = select([pgconn.socket], [pgconn.socket], [])
+ assert not (rl and wl)
+ if wl:
+ continue # call flush again()
+ if rl:
+ pgconn.consume_input()
+ continue
+
+ # TODO: this check is not reliable, it fails on travis sometimes
+ # assert waited_on_send
+
+ # read loop
+ results = []
+ while True:
+ pgconn.consume_input()
+ if pgconn.is_busy():
+ select([pgconn.socket], [], [])
+ continue
+ res = pgconn.get_result()
+ if res is None:
+ break
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ results.append(res)
+
+ assert len(results) == 2
+ assert results[0].nfields == 1
+ assert results[0].fname(0) == b"f"
+ assert results[0].get_value(0, 0) == b"x"
+ assert results[1].nfields == 1
+ assert results[1].fname(0) == b"foo"
+ assert results[1].get_value(0, 0) == b"1"
+
+
+def test_send_query_compact_test(pgconn):
+ # Like the above test but use psycopg facilities for compactness
+ pgconn.send_query(
+ b"/* %s */ select 'x' as f from pg_sleep(0.01); select 1 as foo;"
+ % (b"x" * 1_000_000)
+ )
+ results = execute_wait(pgconn)
+
+ assert len(results) == 2
+ assert results[0].nfields == 1
+ assert results[0].fname(0) == b"f"
+ assert results[0].get_value(0, 0) == b"x"
+ assert results[1].nfields == 1
+ assert results[1].fname(0) == b"foo"
+ assert results[1].get_value(0, 0) == b"1"
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_query(b"select 1")
+
+
+def test_single_row_mode(pgconn):
+ pgconn.send_query(b"select generate_series(1,2)")
+ pgconn.set_single_row_mode()
+
+ results = execute_wait(pgconn)
+ assert len(results) == 3
+
+ res = results[0]
+ assert res.status == pq.ExecStatus.SINGLE_TUPLE
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"1"
+
+ res = results[1]
+ assert res.status == pq.ExecStatus.SINGLE_TUPLE
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"2"
+
+ res = results[2]
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.ntuples == 0
+
+
+def test_send_query_params(pgconn):
+ pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"8"
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_query_params(b"select $1", [b"1"])
+
+
+def test_send_prepare(pgconn):
+ pgconn.send_prepare(b"prep", b"select $1::int + $2::int")
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(b"prep", [b"3", b"5"])
+ (res,) = execute_wait(pgconn)
+ assert res.get_value(0, 0) == b"8"
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_prepare(b"prep", b"select $1::int + $2::int")
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_query_prepared(b"prep", [b"3", b"5"])
+
+
+def test_send_prepare_types(pgconn):
+ pgconn.send_prepare(b"prep", b"select $1 + $2", [23, 23])
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(b"prep", [b"3", b"5"])
+ (res,) = execute_wait(pgconn)
+ assert res.get_value(0, 0) == b"8"
+
+
+def test_send_prepared_binary_in(pgconn):
+ val = b"foo\00bar"
+ pgconn.send_prepare(b"", b"select length($1::bytea), length($2::bytea)")
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(b"", [val, val], param_formats=[0, 1])
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"3"
+ assert res.get_value(0, 1) == b"7"
+
+ with pytest.raises(ValueError):
+ pgconn.exec_params(b"select $1::bytea", [val], param_formats=[1, 1])
+
+
+@pytest.mark.parametrize("fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")])
+def test_send_prepared_binary_out(pgconn, fmt, out):
+ val = b"foo\00bar"
+ pgconn.send_prepare(b"", b"select $1::bytea")
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(b"", [val], param_formats=[1], result_format=fmt)
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == out
+
+
+def test_send_describe_prepared(pgconn):
+ pgconn.send_prepare(b"prep", b"select $1::int8 + $2::int8 as fld")
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_describe_prepared(b"prep")
+ (res,) = execute_wait(pgconn)
+ assert res.nfields == 1
+ assert res.ntuples == 0
+ assert res.fname(0) == b"fld"
+ assert res.ftype(0) == 20
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_describe_prepared(b"prep")
+
+
+@pytest.mark.crdb_skip("server-side cursor")
+def test_send_describe_portal(pgconn):
+ res = pgconn.exec_(
+ b"""
+ begin;
+ declare cur cursor for select * from generate_series(1,10) foo;
+ """
+ )
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_describe_portal(b"cur")
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+ assert res.nfields == 1
+ assert res.fname(0) == b"foo"
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_describe_portal(b"cur")
diff --git a/tests/pq/test_conninfo.py b/tests/pq/test_conninfo.py
new file mode 100644
index 0000000..64d8b8f
--- /dev/null
+++ b/tests/pq/test_conninfo.py
@@ -0,0 +1,48 @@
+import pytest
+
+import psycopg
+from psycopg import pq
+
+
+def test_defaults(monkeypatch):
+ monkeypatch.setenv("PGPORT", "15432")
+ defs = pq.Conninfo.get_defaults()
+ assert len(defs) > 20
+ port = [d for d in defs if d.keyword == b"port"][0]
+ assert port.envvar == b"PGPORT"
+ assert port.compiled == b"5432"
+ assert port.val == b"15432"
+ assert port.label == b"Database-Port"
+ assert port.dispchar == b""
+ assert port.dispsize == 6
+
+
+@pytest.mark.libpq(">= 10")
+def test_conninfo_parse():
+ infos = pq.Conninfo.parse(
+ b"postgresql://host1:123,host2:456/somedb"
+ b"?target_session_attrs=any&application_name=myapp"
+ )
+ info = {i.keyword: i.val for i in infos if i.val is not None}
+ assert info[b"host"] == b"host1,host2"
+ assert info[b"port"] == b"123,456"
+ assert info[b"dbname"] == b"somedb"
+ assert info[b"application_name"] == b"myapp"
+
+
+@pytest.mark.libpq("< 10")
+def test_conninfo_parse_96():
+ conninfo = pq.Conninfo.parse(
+ b"postgresql://other@localhost/otherdb"
+ b"?connect_timeout=10&application_name=myapp"
+ )
+ info = {i.keyword: i.val for i in conninfo if i.val is not None}
+ assert info[b"host"] == b"localhost"
+ assert info[b"dbname"] == b"otherdb"
+ assert info[b"application_name"] == b"myapp"
+
+
+def test_conninfo_parse_bad():
+ with pytest.raises(psycopg.OperationalError) as e:
+ pq.Conninfo.parse(b"bad_conninfo=")
+ assert "bad_conninfo" in str(e.value)
diff --git a/tests/pq/test_copy.py b/tests/pq/test_copy.py
new file mode 100644
index 0000000..383d272
--- /dev/null
+++ b/tests/pq/test_copy.py
@@ -0,0 +1,174 @@
+import pytest
+
+import psycopg
+from psycopg import pq
+
+pytestmark = pytest.mark.crdb_skip("copy")
+
+sample_values = "values (10::int, 20::int, 'hello'::text), (40, NULL, 'world')"
+
+sample_tabledef = "col1 int primary key, col2 int, data text"
+
+sample_text = b"""\
+10\t20\thello
+40\t\\N\tworld
+"""
+
+sample_binary_value = """
+5047 434f 5059 0aff 0d0a 00
+00 0000 0000 0000 00
+00 0300 0000 0400 0000 0a00 0000 0400 0000 1400 0000 0568 656c 6c6f
+
+0003 0000 0004 0000 0028 ffff ffff 0000 0005 776f 726c 64
+
+ff ff
+"""
+
+sample_binary_rows = [
+ bytes.fromhex("".join(row.split())) for row in sample_binary_value.split("\n\n")
+]
+
+sample_binary = b"".join(sample_binary_rows)
+
+
+def test_put_data_no_copy(pgconn):
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.put_copy_data(b"wat")
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.put_copy_data(b"wat")
+
+
+def test_put_end_no_copy(pgconn):
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.put_copy_end()
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.put_copy_end()
+
+
+def test_copy_out(pgconn):
+ ensure_table(pgconn, sample_tabledef)
+ res = pgconn.exec_(b"copy copy_in from stdin")
+ assert res.status == pq.ExecStatus.COPY_IN
+
+ for i in range(10):
+ data = []
+ for j in range(20):
+ data.append(
+ f"""\
+{i * 20 + j}\t{j}\t{'X' * (i * 20 + j)}
+"""
+ )
+ rv = pgconn.put_copy_data("".join(data).encode("ascii"))
+ assert rv > 0
+
+ rv = pgconn.put_copy_end()
+ assert rv > 0
+
+ res = pgconn.get_result()
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ res = pgconn.exec_(
+ b"select min(col1), max(col1), count(*), max(length(data)) from copy_in"
+ )
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
+ assert res.get_value(0, 0) == b"0"
+ assert res.get_value(0, 1) == b"199"
+ assert res.get_value(0, 2) == b"200"
+ assert res.get_value(0, 3) == b"199"
+
+
+def test_copy_out_err(pgconn):
+ ensure_table(pgconn, sample_tabledef)
+ res = pgconn.exec_(b"copy copy_in from stdin")
+ assert res.status == pq.ExecStatus.COPY_IN
+
+ for i in range(10):
+ data = []
+ for j in range(20):
+ data.append(
+ f"""\
+{i * 20 + j}\thardly a number\tnope
+"""
+ )
+ rv = pgconn.put_copy_data("".join(data).encode("ascii"))
+ assert rv > 0
+
+ rv = pgconn.put_copy_end()
+ assert rv > 0
+
+ res = pgconn.get_result()
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+ assert b"hardly a number" in res.error_message
+
+ res = pgconn.exec_(b"select count(*) from copy_in")
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
+ assert res.get_value(0, 0) == b"0"
+
+
+def test_copy_out_error_end(pgconn):
+ ensure_table(pgconn, sample_tabledef)
+ res = pgconn.exec_(b"copy copy_in from stdin")
+ assert res.status == pq.ExecStatus.COPY_IN
+
+ for i in range(10):
+ data = []
+ for j in range(20):
+ data.append(
+ f"""\
+{i * 20 + j}\t{j}\t{'X' * (i * 20 + j)}
+"""
+ )
+ rv = pgconn.put_copy_data("".join(data).encode("ascii"))
+ assert rv > 0
+
+ rv = pgconn.put_copy_end(b"nuttengoggenio")
+ assert rv > 0
+
+ res = pgconn.get_result()
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+ assert b"nuttengoggenio" in res.error_message
+
+ res = pgconn.exec_(b"select count(*) from copy_in")
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
+ assert res.get_value(0, 0) == b"0"
+
+
+def test_get_data_no_copy(pgconn):
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.get_copy_data(0)
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.get_copy_data(0)
+
+
+@pytest.mark.parametrize("format", [pq.Format.TEXT, pq.Format.BINARY])
+def test_copy_out_read(pgconn, format):
+ stmt = f"copy ({sample_values}) to stdout (format {format.name})"
+ res = pgconn.exec_(stmt.encode("ascii"))
+ assert res.status == pq.ExecStatus.COPY_OUT
+ assert res.binary_tuples == format
+
+ if format == pq.Format.TEXT:
+ want = [row + b"\n" for row in sample_text.splitlines()]
+ else:
+ want = sample_binary_rows
+
+ for row in want:
+ nbytes, data = pgconn.get_copy_data(0)
+ assert nbytes == len(data)
+ assert data == row
+
+ assert pgconn.get_copy_data(0) == (-1, b"")
+
+ res = pgconn.get_result()
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+
+def ensure_table(pgconn, tabledef, name="copy_in"):
+ pgconn.exec_(f"drop table if exists {name}".encode("ascii"))
+ pgconn.exec_(f"create table {name} ({tabledef})".encode("ascii"))
diff --git a/tests/pq/test_escaping.py b/tests/pq/test_escaping.py
new file mode 100644
index 0000000..ad88d8a
--- /dev/null
+++ b/tests/pq/test_escaping.py
@@ -0,0 +1,188 @@
+import pytest
+
+import psycopg
+from psycopg import pq
+
+from ..fix_crdb import crdb_scs_off
+
+
+@pytest.mark.parametrize(
+ "data, want",
+ [
+ (b"", b"''"),
+ (b"hello", b"'hello'"),
+ (b"foo'bar", b"'foo''bar'"),
+ (b"foo\\bar", b" E'foo\\\\bar'"),
+ ],
+)
+def test_escape_literal(pgconn, data, want):
+ esc = pq.Escaping(pgconn)
+ out = esc.escape_literal(data)
+ assert out == want
+
+
+@pytest.mark.parametrize("scs", ["on", crdb_scs_off("off")])
+def test_escape_literal_1char(pgconn, scs):
+ res = pgconn.exec_(f"set standard_conforming_strings to {scs}".encode("ascii"))
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ esc = pq.Escaping(pgconn)
+ special = {b"'": b"''''", b"\\": b" E'\\\\'"}
+ for c in range(1, 128):
+ data = bytes([c])
+ rv = esc.escape_literal(data)
+ exp = special.get(data) or b"'%s'" % data
+ assert rv == exp
+
+
+def test_escape_literal_noconn(pgconn):
+ esc = pq.Escaping()
+ with pytest.raises(psycopg.OperationalError):
+ esc.escape_literal(b"hi")
+
+ esc = pq.Escaping(pgconn)
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ esc.escape_literal(b"hi")
+
+
+@pytest.mark.parametrize(
+ "data, want",
+ [
+ (b"", b'""'),
+ (b"hello", b'"hello"'),
+ (b'foo"bar', b'"foo""bar"'),
+ (b"foo\\bar", b'"foo\\bar"'),
+ ],
+)
+def test_escape_identifier(pgconn, data, want):
+ esc = pq.Escaping(pgconn)
+ out = esc.escape_identifier(data)
+ assert out == want
+
+
+@pytest.mark.parametrize("scs", ["on", crdb_scs_off("off")])
+def test_escape_identifier_1char(pgconn, scs):
+ res = pgconn.exec_(f"set standard_conforming_strings to {scs}".encode("ascii"))
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ esc = pq.Escaping(pgconn)
+ special = {b'"': b'""""', b"\\": b'"\\"'}
+ for c in range(1, 128):
+ data = bytes([c])
+ rv = esc.escape_identifier(data)
+ exp = special.get(data) or b'"%s"' % data
+ assert rv == exp
+
+
+def test_escape_identifier_noconn(pgconn):
+ esc = pq.Escaping()
+ with pytest.raises(psycopg.OperationalError):
+ esc.escape_identifier(b"hi")
+
+ esc = pq.Escaping(pgconn)
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ esc.escape_identifier(b"hi")
+
+
+@pytest.mark.parametrize(
+ "data, want",
+ [
+ (b"", b""),
+ (b"hello", b"hello"),
+ (b"foo'bar", b"foo''bar"),
+ (b"foo\\bar", b"foo\\bar"),
+ ],
+)
+def test_escape_string(pgconn, data, want):
+ esc = pq.Escaping(pgconn)
+ out = esc.escape_string(data)
+ assert out == want
+
+
+@pytest.mark.parametrize("scs", ["on", crdb_scs_off("off")])
+def test_escape_string_1char(pgconn, scs):
+ esc = pq.Escaping(pgconn)
+ res = pgconn.exec_(f"set standard_conforming_strings to {scs}".encode("ascii"))
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ special = {b"'": b"''", b"\\": b"\\" if scs == "on" else b"\\\\"}
+ for c in range(1, 128):
+ data = bytes([c])
+ rv = esc.escape_string(data)
+ exp = special.get(data) or b"%s" % data
+ assert rv == exp
+
+
+@pytest.mark.parametrize(
+ "data, want",
+ [
+ (b"", b""),
+ (b"hello", b"hello"),
+ (b"foo'bar", b"foo''bar"),
+ # This libpq function behaves unpredictably when not passed a conn
+ (b"foo\\bar", (b"foo\\\\bar", b"foo\\bar")),
+ ],
+)
+def test_escape_string_noconn(data, want):
+ esc = pq.Escaping()
+ out = esc.escape_string(data)
+ if isinstance(want, bytes):
+ assert out == want
+ else:
+ assert out in want
+
+
+def test_escape_string_badconn(pgconn):
+ esc = pq.Escaping(pgconn)
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ esc.escape_string(b"hi")
+
+
+def test_escape_string_badenc(pgconn):
+ res = pgconn.exec_(b"set client_encoding to 'UTF8'")
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ data = "\u20ac".encode()[:-1]
+ esc = pq.Escaping(pgconn)
+ with pytest.raises(psycopg.OperationalError):
+ esc.escape_string(data)
+
+
+@pytest.mark.parametrize("data", [b"hello\00world", b"\00\00\00\00"])
+def test_escape_bytea(pgconn, data):
+ exp = rb"\x" + b"".join(b"%02x" % c for c in data)
+ esc = pq.Escaping(pgconn)
+ rv = esc.escape_bytea(data)
+ assert rv == exp
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ esc.escape_bytea(data)
+
+
+def test_escape_noconn(pgconn):
+ data = bytes(range(256))
+ esc = pq.Escaping()
+ escdata = esc.escape_bytea(data)
+ res = pgconn.exec_params(b"select '%s'::bytea" % escdata, [], result_format=1)
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == data
+
+
+def test_escape_1char(pgconn):
+ esc = pq.Escaping(pgconn)
+ for c in range(256):
+ rv = esc.escape_bytea(bytes([c]))
+ exp = rb"\x%02x" % c
+ assert rv == exp
+
+
+@pytest.mark.parametrize("data", [b"hello\00world", b"\00\00\00\00"])
+def test_unescape_bytea(pgconn, data):
+ enc = rb"\x" + b"".join(b"%02x" % c for c in data)
+ esc = pq.Escaping(pgconn)
+ rv = esc.unescape_bytea(enc)
+ assert rv == data
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ esc.unescape_bytea(data)
diff --git a/tests/pq/test_exec.py b/tests/pq/test_exec.py
new file mode 100644
index 0000000..86c30c0
--- /dev/null
+++ b/tests/pq/test_exec.py
@@ -0,0 +1,146 @@
+#!/usr/bin/env python3
+
+import pytest
+
+import psycopg
+from psycopg import pq
+
+
+def test_exec_none(pgconn):
+ with pytest.raises(TypeError):
+ pgconn.exec_(None)
+
+
+def test_exec(pgconn):
+ res = pgconn.exec_(b"select 'hel' || 'lo'")
+ assert res.get_value(0, 0) == b"hello"
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.exec_(b"select 'hello'")
+
+
+def test_exec_params(pgconn):
+ res = pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"8"
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
+
+
+def test_exec_params_empty(pgconn):
+ res = pgconn.exec_params(b"select 8::int", [])
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"8"
+
+
+def test_exec_params_types(pgconn):
+ res = pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700, 23])
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"8"
+ assert res.ftype(0) == 1700
+ assert res.get_value(0, 1) == b"8"
+ assert res.ftype(1) == 23
+
+ with pytest.raises(ValueError):
+ pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700])
+
+
+def test_exec_params_nulls(pgconn):
+ res = pgconn.exec_params(b"select $1::text, $2::text, $3::text", [b"hi", b"", None])
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"hi"
+ assert res.get_value(0, 1) == b""
+ assert res.get_value(0, 2) is None
+
+
+def test_exec_params_binary_in(pgconn):
+ val = b"foo\00bar"
+ res = pgconn.exec_params(
+ b"select length($1::bytea), length($2::bytea)",
+ [val, val],
+ param_formats=[0, 1],
+ )
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"3"
+ assert res.get_value(0, 1) == b"7"
+
+ with pytest.raises(ValueError):
+ pgconn.exec_params(b"select $1::bytea", [val], param_formats=[1, 1])
+
+
+@pytest.mark.parametrize("fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")])
+def test_exec_params_binary_out(pgconn, fmt, out):
+ val = b"foo\00bar"
+ res = pgconn.exec_params(
+ b"select $1::bytea", [val], param_formats=[1], result_format=fmt
+ )
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == out
+
+
+def test_prepare(pgconn):
+ res = pgconn.prepare(b"prep", b"select $1::int + $2::int")
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ res = pgconn.exec_prepared(b"prep", [b"3", b"5"])
+ assert res.get_value(0, 0) == b"8"
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.prepare(b"prep", b"select $1::int + $2::int")
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.exec_prepared(b"prep", [b"3", b"5"])
+
+
+def test_prepare_types(pgconn):
+ res = pgconn.prepare(b"prep", b"select $1 + $2", [23, 23])
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ res = pgconn.exec_prepared(b"prep", [b"3", b"5"])
+ assert res.get_value(0, 0) == b"8"
+
+
+def test_exec_prepared_binary_in(pgconn):
+ val = b"foo\00bar"
+ res = pgconn.prepare(b"", b"select length($1::bytea), length($2::bytea)")
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ res = pgconn.exec_prepared(b"", [val, val], param_formats=[0, 1])
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"3"
+ assert res.get_value(0, 1) == b"7"
+
+ with pytest.raises(ValueError):
+ pgconn.exec_params(b"select $1::bytea", [val], param_formats=[1, 1])
+
+
+@pytest.mark.parametrize("fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")])
+def test_exec_prepared_binary_out(pgconn, fmt, out):
+ val = b"foo\00bar"
+ res = pgconn.prepare(b"", b"select $1::bytea")
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ res = pgconn.exec_prepared(b"", [val], param_formats=[1], result_format=fmt)
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == out
+
+
+@pytest.mark.crdb_skip("server-side cursor")
+def test_describe_portal(pgconn):
+ res = pgconn.exec_(
+ b"""
+ begin;
+ declare cur cursor for select * from generate_series(1,10) foo;
+ """
+ )
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ res = pgconn.describe_portal(b"cur")
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+ assert res.nfields == 1
+ assert res.fname(0) == b"foo"
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.describe_portal(b"cur")
diff --git a/tests/pq/test_misc.py b/tests/pq/test_misc.py
new file mode 100644
index 0000000..599758f
--- /dev/null
+++ b/tests/pq/test_misc.py
@@ -0,0 +1,83 @@
+import pytest
+
+import psycopg
+from psycopg import pq
+
+
+def test_error_message(pgconn):
+ res = pgconn.exec_(b"wat")
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+ msg = pq.error_message(pgconn)
+ assert "wat" in msg
+ assert msg == pq.error_message(res)
+ primary = res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY)
+ assert primary.decode("ascii") in msg
+
+ with pytest.raises(TypeError):
+ pq.error_message(None) # type: ignore[arg-type]
+
+ res.clear()
+ assert pq.error_message(res) == "no details available"
+ pgconn.finish()
+ assert "NULL" in pq.error_message(pgconn)
+
+
+@pytest.mark.crdb_skip("encoding")
+def test_error_message_encoding(pgconn):
+ res = pgconn.exec_(b"set client_encoding to latin9")
+ assert res.status == pq.ExecStatus.COMMAND_OK
+
+ res = pgconn.exec_('select 1 from "foo\u20acbar"'.encode("latin9"))
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+
+ msg = pq.error_message(pgconn)
+ assert "foo\u20acbar" in msg
+
+ msg = pq.error_message(res)
+ assert "foo\ufffdbar" in msg
+
+ msg = pq.error_message(res, encoding="latin9")
+ assert "foo\u20acbar" in msg
+
+ msg = pq.error_message(res, encoding="ascii")
+ assert "foo\ufffdbar" in msg
+
+
+def test_make_empty_result(pgconn):
+ pgconn.exec_(b"wat")
+ res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR)
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+ assert b"wat" in res.error_message
+
+ pgconn.finish()
+ res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR)
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+ assert res.error_message == b""
+
+
+def test_result_set_attrs(pgconn):
+ res = pgconn.make_empty_result(pq.ExecStatus.COPY_OUT)
+ assert res.status == pq.ExecStatus.COPY_OUT
+
+ attrs = [
+ pq.PGresAttDesc(b"an_int", 0, 0, 0, 23, 0, 0),
+ pq.PGresAttDesc(b"a_num", 0, 0, 0, 1700, 0, 0),
+ pq.PGresAttDesc(b"a_bin_text", 0, 0, 1, 25, 0, 0),
+ ]
+ res.set_attributes(attrs)
+ assert res.nfields == 3
+
+ assert res.fname(0) == b"an_int"
+ assert res.fname(1) == b"a_num"
+ assert res.fname(2) == b"a_bin_text"
+
+ assert res.fformat(0) == 0
+ assert res.fformat(1) == 0
+ assert res.fformat(2) == 1
+
+ assert res.ftype(0) == 23
+ assert res.ftype(1) == 1700
+ assert res.ftype(2) == 25
+
+ with pytest.raises(psycopg.OperationalError):
+ res.set_attributes(attrs)
diff --git a/tests/pq/test_pgconn.py b/tests/pq/test_pgconn.py
new file mode 100644
index 0000000..0566151
--- /dev/null
+++ b/tests/pq/test_pgconn.py
@@ -0,0 +1,585 @@
+import os
+import sys
+import ctypes
+import logging
+import weakref
+from select import select
+
+import pytest
+
+import psycopg
+from psycopg import pq
+import psycopg.generators
+
+from ..utils import gc_collect
+
+
+def test_connectdb(dsn):
+ conn = pq.PGconn.connect(dsn.encode())
+ assert conn.status == pq.ConnStatus.OK, conn.error_message
+
+
+def test_connectdb_error():
+ conn = pq.PGconn.connect(b"dbname=psycopg_test_not_for_real")
+ assert conn.status == pq.ConnStatus.BAD
+
+
+@pytest.mark.parametrize("baddsn", [None, 42])
+def test_connectdb_badtype(baddsn):
+ with pytest.raises(TypeError):
+ pq.PGconn.connect(baddsn)
+
+
+def test_connect_async(dsn):
+ conn = pq.PGconn.connect_start(dsn.encode())
+ conn.nonblocking = 1
+ while True:
+ assert conn.status != pq.ConnStatus.BAD
+ rv = conn.connect_poll()
+ if rv == pq.PollingStatus.OK:
+ break
+ elif rv == pq.PollingStatus.READING:
+ select([conn.socket], [], [])
+ elif rv == pq.PollingStatus.WRITING:
+ select([], [conn.socket], [])
+ else:
+ assert False, rv
+
+ assert conn.status == pq.ConnStatus.OK
+
+ conn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ conn.connect_poll()
+
+
+@pytest.mark.crdb("skip", reason="connects to any db name")
+def test_connect_async_bad(dsn):
+ parsed_dsn = {e.keyword: e.val for e in pq.Conninfo.parse(dsn.encode()) if e.val}
+ parsed_dsn[b"dbname"] = b"psycopg_test_not_for_real"
+ dsn = b" ".join(b"%s='%s'" % item for item in parsed_dsn.items())
+ conn = pq.PGconn.connect_start(dsn)
+ while True:
+ assert conn.status != pq.ConnStatus.BAD, conn.error_message
+ rv = conn.connect_poll()
+ if rv == pq.PollingStatus.FAILED:
+ break
+ elif rv == pq.PollingStatus.READING:
+ select([conn.socket], [], [])
+ elif rv == pq.PollingStatus.WRITING:
+ select([], [conn.socket], [])
+ else:
+ assert False, rv
+
+ assert conn.status == pq.ConnStatus.BAD
+
+
+def test_finish(pgconn):
+ assert pgconn.status == pq.ConnStatus.OK
+ pgconn.finish()
+ assert pgconn.status == pq.ConnStatus.BAD
+ pgconn.finish()
+ assert pgconn.status == pq.ConnStatus.BAD
+
+
+@pytest.mark.slow
+def test_weakref(dsn):
+ conn = pq.PGconn.connect(dsn.encode())
+ w = weakref.ref(conn)
+ conn.finish()
+ del conn
+ gc_collect()
+ assert w() is None
+
+
+@pytest.mark.skipif(
+ sys.platform == "win32"
+ and os.environ.get("CI") == "true"
+ and pq.__impl__ != "python",
+ reason="can't figure out how to make ctypes run, don't care",
+)
+def test_pgconn_ptr(pgconn, libpq):
+ assert isinstance(pgconn.pgconn_ptr, int)
+
+ f = libpq.PQserverVersion
+ f.argtypes = [ctypes.c_void_p]
+ f.restype = ctypes.c_int
+ ver = f(pgconn.pgconn_ptr)
+ assert ver == pgconn.server_version
+
+ pgconn.finish()
+ assert pgconn.pgconn_ptr is None
+
+
+def test_info(dsn, pgconn):
+ info = pgconn.info
+ assert len(info) > 20
+ dbname = [d for d in info if d.keyword == b"dbname"][0]
+ assert dbname.envvar == b"PGDATABASE"
+ assert dbname.label == b"Database-Name"
+ assert dbname.dispchar == b""
+ assert dbname.dispsize == 20
+
+ parsed = pq.Conninfo.parse(dsn.encode())
+ # take the name and the user either from params or from env vars
+ name = [
+ o.val or os.environ.get(o.envvar.decode(), "").encode()
+ for o in parsed
+ if o.keyword == b"dbname" and o.envvar
+ ][0]
+ user = [
+ o.val or os.environ.get(o.envvar.decode(), "").encode()
+ for o in parsed
+ if o.keyword == b"user" and o.envvar
+ ][0]
+ assert dbname.val == (name or user)
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.info
+
+
+@pytest.mark.crdb_skip("pg_terminate_backend")
+def test_reset(pgconn):
+ assert pgconn.status == pq.ConnStatus.OK
+ pgconn.exec_(b"select pg_terminate_backend(pg_backend_pid())")
+ assert pgconn.status == pq.ConnStatus.BAD
+ pgconn.reset()
+ assert pgconn.status == pq.ConnStatus.OK
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.reset()
+
+ assert pgconn.status == pq.ConnStatus.BAD
+
+
+@pytest.mark.crdb_skip("pg_terminate_backend")
+def test_reset_async(pgconn):
+ assert pgconn.status == pq.ConnStatus.OK
+ pgconn.exec_(b"select pg_terminate_backend(pg_backend_pid())")
+ assert pgconn.status == pq.ConnStatus.BAD
+ pgconn.reset_start()
+ while True:
+ rv = pgconn.reset_poll()
+ if rv == pq.PollingStatus.READING:
+ select([pgconn.socket], [], [])
+ elif rv == pq.PollingStatus.WRITING:
+ select([], [pgconn.socket], [])
+ else:
+ break
+
+ assert rv == pq.PollingStatus.OK
+ assert pgconn.status == pq.ConnStatus.OK
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.reset_start()
+
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.reset_poll()
+
+
+def test_ping(dsn):
+ rv = pq.PGconn.ping(dsn.encode())
+ assert rv == pq.Ping.OK
+
+ rv = pq.PGconn.ping(b"port=9999")
+ assert rv == pq.Ping.NO_RESPONSE
+
+
+def test_db(pgconn):
+ name = [o.val for o in pgconn.info if o.keyword == b"dbname"][0]
+ assert pgconn.db == name
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.db
+
+
+def test_user(pgconn):
+ user = [o.val for o in pgconn.info if o.keyword == b"user"][0]
+ assert pgconn.user == user
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.user
+
+
+def test_password(pgconn):
+ # not in info
+ assert isinstance(pgconn.password, bytes)
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.password
+
+
+def test_host(pgconn):
+ # might be not in info
+ assert isinstance(pgconn.host, bytes)
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.host
+
+
+@pytest.mark.libpq(">= 12")
+def test_hostaddr(pgconn):
+ # not in info
+ assert isinstance(pgconn.hostaddr, bytes), pgconn.hostaddr
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.hostaddr
+
+
+@pytest.mark.libpq("< 12")
+def test_hostaddr_missing(pgconn):
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.hostaddr
+
+
+def test_port(pgconn):
+ port = [o.val for o in pgconn.info if o.keyword == b"port"][0]
+ assert pgconn.port == port
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.port
+
+
+@pytest.mark.libpq("< 14")
+def test_tty(pgconn):
+ tty = [o.val for o in pgconn.info if o.keyword == b"tty"][0]
+ assert pgconn.tty == tty
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.tty
+
+
+@pytest.mark.libpq(">= 14")
+def test_tty_noop(pgconn):
+ assert not any(o.val for o in pgconn.info if o.keyword == b"tty")
+ assert pgconn.tty == b""
+
+
+def test_transaction_status(pgconn):
+ assert pgconn.transaction_status == pq.TransactionStatus.IDLE
+ pgconn.exec_(b"begin")
+ assert pgconn.transaction_status == pq.TransactionStatus.INTRANS
+ pgconn.send_query(b"select 1")
+ assert pgconn.transaction_status == pq.TransactionStatus.ACTIVE
+ psycopg.waiting.wait(psycopg.generators.execute(pgconn), pgconn.socket)
+ assert pgconn.transaction_status == pq.TransactionStatus.INTRANS
+ pgconn.finish()
+ assert pgconn.transaction_status == pq.TransactionStatus.UNKNOWN
+
+
+def test_parameter_status(dsn, monkeypatch):
+ monkeypatch.setenv("PGAPPNAME", "psycopg tests")
+ pgconn = pq.PGconn.connect(dsn.encode())
+ assert pgconn.parameter_status(b"application_name") == b"psycopg tests"
+ assert pgconn.parameter_status(b"wat") is None
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.parameter_status(b"application_name")
+
+
+@pytest.mark.crdb_skip("encoding")
+def test_encoding(pgconn):
+ res = pgconn.exec_(b"set client_encoding to latin1")
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ assert pgconn.parameter_status(b"client_encoding") == b"LATIN1"
+
+ res = pgconn.exec_(b"set client_encoding to 'utf-8'")
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ assert pgconn.parameter_status(b"client_encoding") == b"UTF8"
+
+ res = pgconn.exec_(b"set client_encoding to wat")
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+ assert pgconn.parameter_status(b"client_encoding") == b"UTF8"
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.parameter_status(b"client_encoding")
+
+
+def test_protocol_version(pgconn):
+ assert pgconn.protocol_version == 3
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.protocol_version
+
+
+def test_server_version(pgconn):
+ assert pgconn.server_version >= 90400
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.server_version
+
+
+def test_socket(pgconn):
+ socket = pgconn.socket
+ assert socket > 0
+ pgconn.exec_(f"select pg_terminate_backend({pgconn.backend_pid})".encode())
+ # TODO: on my box it raises OperationalError as it should. Not on Travis,
+ # so let's see if at least an ok value comes out of it.
+ try:
+ assert pgconn.socket == socket
+ except psycopg.OperationalError:
+ pass
+
+
+def test_error_message(pgconn):
+ assert pgconn.error_message == b""
+ res = pgconn.exec_(b"wat")
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+ msg = pgconn.error_message
+ assert b"wat" in msg
+ pgconn.finish()
+ assert b"NULL" in pgconn.error_message # TODO: i10n?
+
+
+def test_backend_pid(pgconn):
+ assert isinstance(pgconn.backend_pid, int)
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.backend_pid
+
+
+def test_needs_password(pgconn):
+ # assume connection worked so an eventually needed password wasn't missing
+ assert pgconn.needs_password is False
+ pgconn.finish()
+ pgconn.needs_password
+
+
+def test_used_password(pgconn, dsn, monkeypatch):
+ assert isinstance(pgconn.used_password, bool)
+
+ # Assume that if a password was passed then it was needed.
+ # Note that the server may still need a password passed via pgpass
+ # so it may be that has_password is false but still a password was
+ # requested by the server and passed by libpq.
+ info = pq.Conninfo.parse(dsn.encode())
+ has_password = (
+ "PGPASSWORD" in os.environ
+ or [i for i in info if i.keyword == b"password"][0].val is not None
+ )
+ if has_password:
+ assert pgconn.used_password
+
+ pgconn.finish()
+ pgconn.used_password
+
+
+def test_ssl_in_use(pgconn):
+ assert isinstance(pgconn.ssl_in_use, bool)
+
+ # If connecting via socket then ssl is not in use
+ if pgconn.host.startswith(b"/"):
+ assert not pgconn.ssl_in_use
+ else:
+ sslmode = [i.val for i in pgconn.info if i.keyword == b"sslmode"][0]
+ if sslmode not in (b"disable", b"allow", b"prefer"):
+ assert pgconn.ssl_in_use
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.ssl_in_use
+
+
+def test_set_single_row_mode(pgconn):
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.set_single_row_mode()
+
+ pgconn.send_query(b"select 1")
+ pgconn.set_single_row_mode()
+
+
+def test_cancel(pgconn):
+ cancel = pgconn.get_cancel()
+ cancel.cancel()
+ cancel.cancel()
+ pgconn.finish()
+ cancel.cancel()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.get_cancel()
+
+
+def test_cancel_free(pgconn):
+ cancel = pgconn.get_cancel()
+ cancel.free()
+ with pytest.raises(psycopg.OperationalError):
+ cancel.cancel()
+ cancel.free()
+
+
+@pytest.mark.crdb_skip("notify")
+def test_notify(pgconn):
+ assert pgconn.notifies() is None
+
+ pgconn.exec_(b"listen foo")
+ pgconn.exec_(b"listen bar")
+ pgconn.exec_(b"notify foo, '1'")
+ pgconn.exec_(b"notify bar, '2'")
+ pgconn.exec_(b"notify foo, '3'")
+
+ n = pgconn.notifies()
+ assert n.relname == b"foo"
+ assert n.be_pid == pgconn.backend_pid
+ assert n.extra == b"1"
+
+ n = pgconn.notifies()
+ assert n.relname == b"bar"
+ assert n.be_pid == pgconn.backend_pid
+ assert n.extra == b"2"
+
+ n = pgconn.notifies()
+ assert n.relname == b"foo"
+ assert n.be_pid == pgconn.backend_pid
+ assert n.extra == b"3"
+
+ assert pgconn.notifies() is None
+
+
+@pytest.mark.crdb_skip("do")
+def test_notice_nohandler(pgconn):
+ pgconn.exec_(b"set client_min_messages to notice")
+ res = pgconn.exec_(
+ b"do $$begin raise notice 'hello notice'; end$$ language plpgsql"
+ )
+ assert res.status == pq.ExecStatus.COMMAND_OK
+
+
+@pytest.mark.crdb_skip("do")
+def test_notice(pgconn):
+ msgs = []
+
+ def callback(res):
+ assert res.status == pq.ExecStatus.NONFATAL_ERROR
+ msgs.append(res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY))
+
+ pgconn.exec_(b"set client_min_messages to notice")
+ pgconn.notice_handler = callback
+ res = pgconn.exec_(
+ b"do $$begin raise notice 'hello notice'; end$$ language plpgsql"
+ )
+
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ assert msgs and msgs[0] == b"hello notice"
+
+
+@pytest.mark.crdb_skip("do")
+def test_notice_error(pgconn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+
+ def callback(res):
+ raise Exception("hello error")
+
+ pgconn.exec_(b"set client_min_messages to notice")
+ pgconn.notice_handler = callback
+ res = pgconn.exec_(
+ b"do $$begin raise notice 'hello notice'; end$$ language plpgsql"
+ )
+
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ assert len(caplog.records) == 1
+ rec = caplog.records[0]
+ assert rec.levelno == logging.ERROR
+ assert "hello error" in rec.message
+
+
+@pytest.mark.libpq("< 14")
+@pytest.mark.skipif("sys.platform != 'linux'")
+def test_trace_pre14(pgconn, tmp_path):
+ tracef = tmp_path / "trace"
+ with tracef.open("w") as f:
+ pgconn.trace(f.fileno())
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.set_trace_flags(0)
+ pgconn.exec_(b"select 1")
+ pgconn.untrace()
+ pgconn.exec_(b"select 2")
+ traces = tracef.read_text()
+ assert "select 1" in traces
+ assert "select 2" not in traces
+
+
+@pytest.mark.libpq(">= 14")
+@pytest.mark.skipif("sys.platform != 'linux'")
+def test_trace(pgconn, tmp_path):
+ tracef = tmp_path / "trace"
+ with tracef.open("w") as f:
+ pgconn.trace(f.fileno())
+ pgconn.set_trace_flags(pq.Trace.SUPPRESS_TIMESTAMPS | pq.Trace.REGRESS_MODE)
+ pgconn.exec_(b"select 1::int4 as foo")
+ pgconn.untrace()
+ pgconn.exec_(b"select 2::int4 as foo")
+ traces = [line.split("\t") for line in tracef.read_text().splitlines()]
+ assert traces == [
+ ["F", "26", "Query", ' "select 1::int4 as foo"'],
+ ["B", "28", "RowDescription", ' 1 "foo" NNNN 0 NNNN 4 -1 0'],
+ ["B", "11", "DataRow", " 1 1 '1'"],
+ ["B", "13", "CommandComplete", ' "SELECT 1"'],
+ ["B", "5", "ReadyForQuery", " I"],
+ ]
+
+
+@pytest.mark.skipif("sys.platform == 'linux'")
+def test_trace_nonlinux(pgconn):
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.trace(1)
+
+
+@pytest.mark.libpq(">= 10")
+def test_encrypt_password(pgconn):
+ enc = pgconn.encrypt_password(b"psycopg2", b"ashesh", b"md5")
+ assert enc == b"md594839d658c28a357126f105b9cb14cfc"
+
+
+@pytest.mark.libpq(">= 10")
+def test_encrypt_password_scram(pgconn):
+ enc = pgconn.encrypt_password(b"psycopg2", b"ashesh", b"scram-sha-256")
+ assert enc.startswith(b"SCRAM-SHA-256$")
+
+
+@pytest.mark.libpq(">= 10")
+def test_encrypt_password_badalgo(pgconn):
+ with pytest.raises(psycopg.OperationalError):
+ assert pgconn.encrypt_password(b"psycopg2", b"ashesh", b"wat")
+
+
+@pytest.mark.libpq(">= 10")
+@pytest.mark.crdb_skip("password_encryption")
+def test_encrypt_password_query(pgconn):
+ res = pgconn.exec_(b"set password_encryption to 'md5'")
+ assert res.status == pq.ExecStatus.COMMAND_OK, pgconn.error_message.decode()
+ enc = pgconn.encrypt_password(b"psycopg2", b"ashesh")
+ assert enc == b"md594839d658c28a357126f105b9cb14cfc"
+
+ res = pgconn.exec_(b"set password_encryption to 'scram-sha-256'")
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ enc = pgconn.encrypt_password(b"psycopg2", b"ashesh")
+ assert enc.startswith(b"SCRAM-SHA-256$")
+
+
+@pytest.mark.libpq(">= 10")
+def test_encrypt_password_closed(pgconn):
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ assert pgconn.encrypt_password(b"psycopg2", b"ashesh")
+
+
+@pytest.mark.libpq("< 10")
+def test_encrypt_password_not_supported(pgconn):
+ # it might even be supported, but not worth the lifetime
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.encrypt_password(b"psycopg2", b"ashesh", b"md5")
+
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.encrypt_password(b"psycopg2", b"ashesh", b"scram-sha-256")
+
+
+def test_str(pgconn, dsn):
+ assert "[IDLE]" in str(pgconn)
+ pgconn.finish()
+ assert "[BAD]" in str(pgconn)
+
+ pgconn2 = pq.PGconn.connect_start(dsn.encode())
+ assert "[" in str(pgconn2)
+ assert "[IDLE]" not in str(pgconn2)
diff --git a/tests/pq/test_pgresult.py b/tests/pq/test_pgresult.py
new file mode 100644
index 0000000..3ad818d
--- /dev/null
+++ b/tests/pq/test_pgresult.py
@@ -0,0 +1,207 @@
+import ctypes
+import pytest
+
+from psycopg import pq
+
+
+@pytest.mark.parametrize(
+ "command, status",
+ [
+ (b"", "EMPTY_QUERY"),
+ (b"select 1", "TUPLES_OK"),
+ (b"set timezone to utc", "COMMAND_OK"),
+ (b"wat", "FATAL_ERROR"),
+ ],
+)
+def test_status(pgconn, command, status):
+ res = pgconn.exec_(command)
+ assert res.status == getattr(pq.ExecStatus, status)
+ assert status in repr(res)
+
+
+def test_clear(pgconn):
+ res = pgconn.exec_(b"select 1")
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ res.clear()
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+ res.clear()
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+
+
+def test_pgresult_ptr(pgconn, libpq):
+ res = pgconn.exec_(b"select 1")
+ assert isinstance(res.pgresult_ptr, int)
+
+ f = libpq.PQcmdStatus
+ f.argtypes = [ctypes.c_void_p]
+ f.restype = ctypes.c_char_p
+ assert f(res.pgresult_ptr) == b"SELECT 1"
+
+ res.clear()
+ assert res.pgresult_ptr is None
+
+
+def test_error_message(pgconn):
+ res = pgconn.exec_(b"select 1")
+ assert res.error_message == b""
+ res = pgconn.exec_(b"select wat")
+ assert b"wat" in res.error_message
+ res.clear()
+ assert res.error_message == b""
+
+
+def test_error_field(pgconn):
+ res = pgconn.exec_(b"select wat")
+ # https://github.com/cockroachdb/cockroach/issues/81794
+ assert (
+ res.error_field(pq.DiagnosticField.SEVERITY_NONLOCALIZED)
+ or res.error_field(pq.DiagnosticField.SEVERITY)
+ ) == b"ERROR"
+ assert res.error_field(pq.DiagnosticField.SQLSTATE) == b"42703"
+ assert b"wat" in res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY)
+ res.clear()
+ assert res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY) is None
+
+
+@pytest.mark.parametrize("n", range(4))
+def test_ntuples(pgconn, n):
+ res = pgconn.exec_params(b"select generate_series(1, $1)", [str(n).encode("ascii")])
+ assert res.ntuples == n
+ res.clear()
+ assert res.ntuples == 0
+
+
+def test_nfields(pgconn):
+ res = pgconn.exec_(b"select wat")
+ assert res.nfields == 0
+ res = pgconn.exec_(b"select 1, 2, 3")
+ assert res.nfields == 3
+ res.clear()
+ assert res.nfields == 0
+
+
+def test_fname(pgconn):
+ res = pgconn.exec_(b'select 1 as foo, 2 as "BAR"')
+ assert res.fname(0) == b"foo"
+ assert res.fname(1) == b"BAR"
+ assert res.fname(2) is None
+ assert res.fname(-1) is None
+ res.clear()
+ assert res.fname(0) is None
+
+
+@pytest.mark.crdb("skip", reason="ftable")
+def test_ftable_and_col(pgconn):
+ res = pgconn.exec_(
+ b"""
+ drop table if exists t1, t2;
+ create table t1 as select 1 as f1;
+ create table t2 as select 2 as f2, 3 as f3;
+ """
+ )
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ res = pgconn.exec_(
+ b"select f1, f3, 't1'::regclass::oid, 't2'::regclass::oid from t1, t2"
+ )
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
+
+ assert res.ftable(0) == int(res.get_value(0, 2).decode("ascii"))
+ assert res.ftable(1) == int(res.get_value(0, 3).decode("ascii"))
+ assert res.ftablecol(0) == 1
+ assert res.ftablecol(1) == 2
+ res.clear()
+ assert res.ftable(0) == 0
+ assert res.ftablecol(0) == 0
+
+
+@pytest.mark.parametrize("fmt", (0, 1))
+def test_fformat(pgconn, fmt):
+ res = pgconn.exec_params(b"select 1", [], result_format=fmt)
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
+ assert res.fformat(0) == fmt
+ assert res.binary_tuples == fmt
+ res.clear()
+ assert res.fformat(0) == 0
+ assert res.binary_tuples == 0
+
+
+def test_ftype(pgconn):
+ res = pgconn.exec_(b"select 1::int4, 1::numeric, 1::text")
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
+ assert res.ftype(0) == 23
+ assert res.ftype(1) == 1700
+ assert res.ftype(2) == 25
+ res.clear()
+ assert res.ftype(0) == 0
+
+
+def test_fmod(pgconn):
+ res = pgconn.exec_(b"select 1::int, 1::numeric(10), 1::numeric(10,2)")
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
+ assert res.fmod(0) == -1
+ assert res.fmod(1) == 0xA0004
+ assert res.fmod(2) == 0xA0006
+ res.clear()
+ assert res.fmod(0) == 0
+
+
+def test_fsize(pgconn):
+ res = pgconn.exec_(b"select 1::int4, 1::bigint, 1::text")
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
+ assert res.fsize(0) == 4
+ assert res.fsize(1) == 8
+ assert res.fsize(2) == -1
+ res.clear()
+ assert res.fsize(0) == 0
+
+
+def test_get_value(pgconn):
+ res = pgconn.exec_(b"select 'a', '', NULL")
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
+ assert res.get_value(0, 0) == b"a"
+ assert res.get_value(0, 1) == b""
+ assert res.get_value(0, 2) is None
+ res.clear()
+ assert res.get_value(0, 0) is None
+
+
+def test_nparams_types(pgconn):
+ res = pgconn.prepare(b"", b"select $1::int4, $2::text")
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ res = pgconn.describe_prepared(b"")
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ assert res.nparams == 2
+ assert res.param_type(0) == 23
+ assert res.param_type(1) == 25
+
+ res.clear()
+ assert res.nparams == 0
+ assert res.param_type(0) == 0
+
+
+def test_command_status(pgconn):
+ res = pgconn.exec_(b"select 1")
+ assert res.command_status == b"SELECT 1"
+ res = pgconn.exec_(b"set timezone to utc")
+ assert res.command_status == b"SET"
+ res.clear()
+ assert res.command_status is None
+
+
+def test_command_tuples(pgconn):
+ res = pgconn.exec_(b"set timezone to utf8")
+ assert res.command_tuples is None
+ res = pgconn.exec_(b"select * from generate_series(1, 10)")
+ assert res.command_tuples == 10
+ res.clear()
+ assert res.command_tuples is None
+
+
+def test_oid_value(pgconn):
+ res = pgconn.exec_(b"select 1")
+ assert res.oid_value == 0
+ res.clear()
+ assert res.oid_value == 0
diff --git a/tests/pq/test_pipeline.py b/tests/pq/test_pipeline.py
new file mode 100644
index 0000000..00cd54a
--- /dev/null
+++ b/tests/pq/test_pipeline.py
@@ -0,0 +1,161 @@
+import pytest
+
+import psycopg
+from psycopg import pq
+
+
+@pytest.mark.libpq("< 14")
+def test_old_libpq(pgconn):
+ assert pgconn.pipeline_status == 0
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.enter_pipeline_mode()
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.exit_pipeline_mode()
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.pipeline_sync()
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.send_flush_request()
+
+
+@pytest.mark.libpq(">= 14")
+def test_work_in_progress(pgconn):
+ assert not pgconn.nonblocking
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"select $1", [b"1"])
+ with pytest.raises(psycopg.OperationalError, match="cannot exit pipeline mode"):
+ pgconn.exit_pipeline_mode()
+
+
+@pytest.mark.libpq(">= 14")
+def test_multi_pipelines(pgconn):
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"select $1", [b"1"], param_types=[25])
+ pgconn.pipeline_sync()
+ pgconn.send_query_params(b"select $1", [b"2"], param_types=[25])
+ pgconn.pipeline_sync()
+
+ # result from first query
+ result1 = pgconn.get_result()
+ assert result1 is not None
+ assert result1.status == pq.ExecStatus.TUPLES_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # first sync result
+ sync_result = pgconn.get_result()
+ assert sync_result is not None
+ assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # result from second query
+ result2 = pgconn.get_result()
+ assert result2 is not None
+ assert result2.status == pq.ExecStatus.TUPLES_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # second sync result
+ sync_result = pgconn.get_result()
+ assert sync_result is not None
+ assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # pipeline still ON
+ assert pgconn.pipeline_status == pq.PipelineStatus.ON
+
+ pgconn.exit_pipeline_mode()
+
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+
+ assert result1.get_value(0, 0) == b"1"
+ assert result2.get_value(0, 0) == b"2"
+
+
+@pytest.mark.libpq(">= 14")
+def test_flush_request(pgconn):
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"select $1", [b"1"], param_types=[25])
+ pgconn.send_flush_request()
+ r = pgconn.get_result()
+ assert r.status == pq.ExecStatus.TUPLES_OK
+ assert r.get_value(0, 0) == b"1"
+ pgconn.exit_pipeline_mode()
+
+
+@pytest.fixture
+def table(pgconn):
+ tablename = "pipeline"
+ pgconn.exec_(f"create table {tablename} (s text)".encode("ascii"))
+ yield tablename
+ pgconn.exec_(f"drop table if exists {tablename}".encode("ascii"))
+
+
+@pytest.mark.libpq(">= 14")
+def test_pipeline_abort(pgconn, table):
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"insert into pipeline values ($1)", [b"1"])
+ pgconn.send_query_params(b"select no_such_function($1)", [b"1"])
+ pgconn.send_query_params(b"insert into pipeline values ($1)", [b"2"])
+ pgconn.pipeline_sync()
+ pgconn.send_query_params(b"insert into pipeline values ($1)", [b"3"])
+ pgconn.pipeline_sync()
+
+ # result from first INSERT
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.COMMAND_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # error result from second query (SELECT)
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.FATAL_ERROR
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # pipeline should be aborted, due to previous error
+ assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED
+
+ # result from second INSERT, aborted due to previous error
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.PIPELINE_ABORTED
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # pipeline is still aborted
+ assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED
+
+ # sync result
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # aborted flag is clear, pipeline is on again
+ assert pgconn.pipeline_status == pq.PipelineStatus.ON
+
+ # result from the third INSERT
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.COMMAND_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # second sync result
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ pgconn.exit_pipeline_mode()
diff --git a/tests/pq/test_pq.py b/tests/pq/test_pq.py
new file mode 100644
index 0000000..076c3b6
--- /dev/null
+++ b/tests/pq/test_pq.py
@@ -0,0 +1,57 @@
+import os
+
+import pytest
+
+import psycopg
+from psycopg import pq
+
+from ..utils import check_libpq_version
+
+
+def test_version():
+ rv = pq.version()
+ assert rv > 90500
+ assert rv < 200000 # you are good for a while
+
+
+def test_build_version():
+ assert pq.__build_version__ and pq.__build_version__ >= 70400
+
+
+@pytest.mark.skipif("not os.environ.get('PSYCOPG_TEST_WANT_LIBPQ_BUILD')")
+def test_want_built_version():
+ want = os.environ["PSYCOPG_TEST_WANT_LIBPQ_BUILD"]
+ got = pq.__build_version__
+ assert not check_libpq_version(got, want)
+
+
+@pytest.mark.skipif("not os.environ.get('PSYCOPG_TEST_WANT_LIBPQ_IMPORT')")
+def test_want_import_version():
+ want = os.environ["PSYCOPG_TEST_WANT_LIBPQ_IMPORT"]
+ got = pq.version()
+ assert not check_libpq_version(got, want)
+
+
+# Note: These tests are here because test_pipeline.py tests are all skipped
+# when pipeline mode is not supported.
+
+
+@pytest.mark.libpq(">= 14")
+def test_pipeline_supported(conn):
+ assert psycopg.Pipeline.is_supported()
+ assert psycopg.AsyncPipeline.is_supported()
+
+ with conn.pipeline():
+ pass
+
+
+@pytest.mark.libpq("< 14")
+def test_pipeline_not_supported(conn):
+ assert not psycopg.Pipeline.is_supported()
+ assert not psycopg.AsyncPipeline.is_supported()
+
+ with pytest.raises(psycopg.NotSupportedError) as exc:
+ with conn.pipeline():
+ pass
+
+ assert "too old" in str(exc.value)