summaryrefslogtreecommitdiffstats
path: root/tests/pq/test_async.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 17:41:08 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 17:41:08 +0000
commit506ed8899b3a97e512be3fd6d44d5b11463bf9bf (patch)
tree808913770c5e6935d3714058c2a066c57b4632ec /tests/pq/test_async.py
parentInitial commit. (diff)
downloadpsycopg3-upstream.tar.xz
psycopg3-upstream.zip
Adding upstream version 3.1.7.upstream/3.1.7upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/pq/test_async.py')
-rw-r--r--tests/pq/test_async.py210
1 files changed, 210 insertions, 0 deletions
diff --git a/tests/pq/test_async.py b/tests/pq/test_async.py
new file mode 100644
index 0000000..2c3de98
--- /dev/null
+++ b/tests/pq/test_async.py
@@ -0,0 +1,210 @@
+from select import select
+
+import pytest
+
+import psycopg
+from psycopg import pq
+from psycopg.generators import execute
+
+
+def execute_wait(pgconn):
+ return psycopg.waiting.wait(execute(pgconn), pgconn.socket)
+
+
+def test_send_query(pgconn):
+ # This test shows how to process an async query in all its glory
+ pgconn.nonblocking = 1
+
+ # Long query to make sure we have to wait on send
+ pgconn.send_query(
+ b"/* %s */ select 'x' as f from pg_sleep(0.01); select 1 as foo;"
+ % (b"x" * 1_000_000)
+ )
+
+ # send loop
+ waited_on_send = 0
+ while True:
+ f = pgconn.flush()
+ if f == 0:
+ break
+
+ waited_on_send += 1
+
+ rl, wl, xl = select([pgconn.socket], [pgconn.socket], [])
+ assert not (rl and wl)
+ if wl:
+ continue # call flush again()
+ if rl:
+ pgconn.consume_input()
+ continue
+
+ # TODO: this check is not reliable, it fails on travis sometimes
+ # assert waited_on_send
+
+ # read loop
+ results = []
+ while True:
+ pgconn.consume_input()
+ if pgconn.is_busy():
+ select([pgconn.socket], [], [])
+ continue
+ res = pgconn.get_result()
+ if res is None:
+ break
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ results.append(res)
+
+ assert len(results) == 2
+ assert results[0].nfields == 1
+ assert results[0].fname(0) == b"f"
+ assert results[0].get_value(0, 0) == b"x"
+ assert results[1].nfields == 1
+ assert results[1].fname(0) == b"foo"
+ assert results[1].get_value(0, 0) == b"1"
+
+
+def test_send_query_compact_test(pgconn):
+ # Like the above test but use psycopg facilities for compactness
+ pgconn.send_query(
+ b"/* %s */ select 'x' as f from pg_sleep(0.01); select 1 as foo;"
+ % (b"x" * 1_000_000)
+ )
+ results = execute_wait(pgconn)
+
+ assert len(results) == 2
+ assert results[0].nfields == 1
+ assert results[0].fname(0) == b"f"
+ assert results[0].get_value(0, 0) == b"x"
+ assert results[1].nfields == 1
+ assert results[1].fname(0) == b"foo"
+ assert results[1].get_value(0, 0) == b"1"
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_query(b"select 1")
+
+
+def test_single_row_mode(pgconn):
+ pgconn.send_query(b"select generate_series(1,2)")
+ pgconn.set_single_row_mode()
+
+ results = execute_wait(pgconn)
+ assert len(results) == 3
+
+ res = results[0]
+ assert res.status == pq.ExecStatus.SINGLE_TUPLE
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"1"
+
+ res = results[1]
+ assert res.status == pq.ExecStatus.SINGLE_TUPLE
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"2"
+
+ res = results[2]
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.ntuples == 0
+
+
+def test_send_query_params(pgconn):
+ pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"8"
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_query_params(b"select $1", [b"1"])
+
+
+def test_send_prepare(pgconn):
+ pgconn.send_prepare(b"prep", b"select $1::int + $2::int")
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(b"prep", [b"3", b"5"])
+ (res,) = execute_wait(pgconn)
+ assert res.get_value(0, 0) == b"8"
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_prepare(b"prep", b"select $1::int + $2::int")
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_query_prepared(b"prep", [b"3", b"5"])
+
+
+def test_send_prepare_types(pgconn):
+ pgconn.send_prepare(b"prep", b"select $1 + $2", [23, 23])
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(b"prep", [b"3", b"5"])
+ (res,) = execute_wait(pgconn)
+ assert res.get_value(0, 0) == b"8"
+
+
+def test_send_prepared_binary_in(pgconn):
+ val = b"foo\00bar"
+ pgconn.send_prepare(b"", b"select length($1::bytea), length($2::bytea)")
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(b"", [val, val], param_formats=[0, 1])
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"3"
+ assert res.get_value(0, 1) == b"7"
+
+ with pytest.raises(ValueError):
+ pgconn.exec_params(b"select $1::bytea", [val], param_formats=[1, 1])
+
+
+@pytest.mark.parametrize("fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")])
+def test_send_prepared_binary_out(pgconn, fmt, out):
+ val = b"foo\00bar"
+ pgconn.send_prepare(b"", b"select $1::bytea")
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(b"", [val], param_formats=[1], result_format=fmt)
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == out
+
+
+def test_send_describe_prepared(pgconn):
+ pgconn.send_prepare(b"prep", b"select $1::int8 + $2::int8 as fld")
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_describe_prepared(b"prep")
+ (res,) = execute_wait(pgconn)
+ assert res.nfields == 1
+ assert res.ntuples == 0
+ assert res.fname(0) == b"fld"
+ assert res.ftype(0) == 20
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_describe_prepared(b"prep")
+
+
+@pytest.mark.crdb_skip("server-side cursor")
+def test_send_describe_portal(pgconn):
+ res = pgconn.exec_(
+ b"""
+ begin;
+ declare cur cursor for select * from generate_series(1,10) foo;
+ """
+ )
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_describe_portal(b"cur")
+ (res,) = execute_wait(pgconn)
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+ assert res.nfields == 1
+ assert res.fname(0) == b"foo"
+
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.send_describe_portal(b"cur")