diff options
Diffstat (limited to 'tests/pq/test_pipeline.py')
-rw-r--r-- | tests/pq/test_pipeline.py | 161 |
1 files changed, 161 insertions, 0 deletions
diff --git a/tests/pq/test_pipeline.py b/tests/pq/test_pipeline.py new file mode 100644 index 0000000..00cd54a --- /dev/null +++ b/tests/pq/test_pipeline.py @@ -0,0 +1,161 @@ +import pytest + +import psycopg +from psycopg import pq + + +@pytest.mark.libpq("< 14") +def test_old_libpq(pgconn): + assert pgconn.pipeline_status == 0 + with pytest.raises(psycopg.NotSupportedError): + pgconn.enter_pipeline_mode() + with pytest.raises(psycopg.NotSupportedError): + pgconn.exit_pipeline_mode() + with pytest.raises(psycopg.NotSupportedError): + pgconn.pipeline_sync() + with pytest.raises(psycopg.NotSupportedError): + pgconn.send_flush_request() + + +@pytest.mark.libpq(">= 14") +def test_work_in_progress(pgconn): + assert not pgconn.nonblocking + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"select $1", [b"1"]) + with pytest.raises(psycopg.OperationalError, match="cannot exit pipeline mode"): + pgconn.exit_pipeline_mode() + + +@pytest.mark.libpq(">= 14") +def test_multi_pipelines(pgconn): + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"select $1", [b"1"], param_types=[25]) + pgconn.pipeline_sync() + pgconn.send_query_params(b"select $1", [b"2"], param_types=[25]) + pgconn.pipeline_sync() + + # result from first query + result1 = pgconn.get_result() + assert result1 is not None + assert result1.status == pq.ExecStatus.TUPLES_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # first sync result + sync_result = pgconn.get_result() + assert sync_result is not None + assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC + + # result from second query + result2 = pgconn.get_result() + assert result2 is not None + assert result2.status == pq.ExecStatus.TUPLES_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # second sync result + sync_result = pgconn.get_result() + assert sync_result is not None + assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC + + # pipeline still ON + assert pgconn.pipeline_status == pq.PipelineStatus.ON + + pgconn.exit_pipeline_mode() + + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + + assert result1.get_value(0, 0) == b"1" + assert result2.get_value(0, 0) == b"2" + + +@pytest.mark.libpq(">= 14") +def test_flush_request(pgconn): + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"select $1", [b"1"], param_types=[25]) + pgconn.send_flush_request() + r = pgconn.get_result() + assert r.status == pq.ExecStatus.TUPLES_OK + assert r.get_value(0, 0) == b"1" + pgconn.exit_pipeline_mode() + + +@pytest.fixture +def table(pgconn): + tablename = "pipeline" + pgconn.exec_(f"create table {tablename} (s text)".encode("ascii")) + yield tablename + pgconn.exec_(f"drop table if exists {tablename}".encode("ascii")) + + +@pytest.mark.libpq(">= 14") +def test_pipeline_abort(pgconn, table): + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"insert into pipeline values ($1)", [b"1"]) + pgconn.send_query_params(b"select no_such_function($1)", [b"1"]) + pgconn.send_query_params(b"insert into pipeline values ($1)", [b"2"]) + pgconn.pipeline_sync() + pgconn.send_query_params(b"insert into pipeline values ($1)", [b"3"]) + pgconn.pipeline_sync() + + # result from first INSERT + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.COMMAND_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # error result from second query (SELECT) + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.FATAL_ERROR + + # NULL signals end of result + assert pgconn.get_result() is None + + # pipeline should be aborted, due to previous error + assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED + + # result from second INSERT, aborted due to previous error + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.PIPELINE_ABORTED + + # NULL signals end of result + assert pgconn.get_result() is None + + # pipeline is still aborted + assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED + + # sync result + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.PIPELINE_SYNC + + # aborted flag is clear, pipeline is on again + assert pgconn.pipeline_status == pq.PipelineStatus.ON + + # result from the third INSERT + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.COMMAND_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # second sync result + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.PIPELINE_SYNC + + # NULL signals end of result + assert pgconn.get_result() is None + + pgconn.exit_pipeline_mode() |