summaryrefslogtreecommitdiffstats
path: root/tests/pq/test_pipeline.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/pq/test_pipeline.py')
-rw-r--r--tests/pq/test_pipeline.py161
1 files changed, 161 insertions, 0 deletions
diff --git a/tests/pq/test_pipeline.py b/tests/pq/test_pipeline.py
new file mode 100644
index 0000000..00cd54a
--- /dev/null
+++ b/tests/pq/test_pipeline.py
@@ -0,0 +1,161 @@
+import pytest
+
+import psycopg
+from psycopg import pq
+
+
+@pytest.mark.libpq("< 14")
+def test_old_libpq(pgconn):
+ assert pgconn.pipeline_status == 0
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.enter_pipeline_mode()
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.exit_pipeline_mode()
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.pipeline_sync()
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.send_flush_request()
+
+
+@pytest.mark.libpq(">= 14")
+def test_work_in_progress(pgconn):
+ assert not pgconn.nonblocking
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"select $1", [b"1"])
+ with pytest.raises(psycopg.OperationalError, match="cannot exit pipeline mode"):
+ pgconn.exit_pipeline_mode()
+
+
+@pytest.mark.libpq(">= 14")
+def test_multi_pipelines(pgconn):
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"select $1", [b"1"], param_types=[25])
+ pgconn.pipeline_sync()
+ pgconn.send_query_params(b"select $1", [b"2"], param_types=[25])
+ pgconn.pipeline_sync()
+
+ # result from first query
+ result1 = pgconn.get_result()
+ assert result1 is not None
+ assert result1.status == pq.ExecStatus.TUPLES_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # first sync result
+ sync_result = pgconn.get_result()
+ assert sync_result is not None
+ assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # result from second query
+ result2 = pgconn.get_result()
+ assert result2 is not None
+ assert result2.status == pq.ExecStatus.TUPLES_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # second sync result
+ sync_result = pgconn.get_result()
+ assert sync_result is not None
+ assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # pipeline still ON
+ assert pgconn.pipeline_status == pq.PipelineStatus.ON
+
+ pgconn.exit_pipeline_mode()
+
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+
+ assert result1.get_value(0, 0) == b"1"
+ assert result2.get_value(0, 0) == b"2"
+
+
+@pytest.mark.libpq(">= 14")
+def test_flush_request(pgconn):
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"select $1", [b"1"], param_types=[25])
+ pgconn.send_flush_request()
+ r = pgconn.get_result()
+ assert r.status == pq.ExecStatus.TUPLES_OK
+ assert r.get_value(0, 0) == b"1"
+ pgconn.exit_pipeline_mode()
+
+
+@pytest.fixture
+def table(pgconn):
+ tablename = "pipeline"
+ pgconn.exec_(f"create table {tablename} (s text)".encode("ascii"))
+ yield tablename
+ pgconn.exec_(f"drop table if exists {tablename}".encode("ascii"))
+
+
+@pytest.mark.libpq(">= 14")
+def test_pipeline_abort(pgconn, table):
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"insert into pipeline values ($1)", [b"1"])
+ pgconn.send_query_params(b"select no_such_function($1)", [b"1"])
+ pgconn.send_query_params(b"insert into pipeline values ($1)", [b"2"])
+ pgconn.pipeline_sync()
+ pgconn.send_query_params(b"insert into pipeline values ($1)", [b"3"])
+ pgconn.pipeline_sync()
+
+ # result from first INSERT
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.COMMAND_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # error result from second query (SELECT)
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.FATAL_ERROR
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # pipeline should be aborted, due to previous error
+ assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED
+
+ # result from second INSERT, aborted due to previous error
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.PIPELINE_ABORTED
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # pipeline is still aborted
+ assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED
+
+ # sync result
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # aborted flag is clear, pipeline is on again
+ assert pgconn.pipeline_status == pq.PipelineStatus.ON
+
+ # result from the third INSERT
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.COMMAND_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # second sync result
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ pgconn.exit_pipeline_mode()