summaryrefslogtreecommitdiffstats
path: root/tests/test_pipeline.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_pipeline.py')
-rw-r--r--tests/test_pipeline.py577
1 files changed, 577 insertions, 0 deletions
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
new file mode 100644
index 0000000..56fe598
--- /dev/null
+++ b/tests/test_pipeline.py
@@ -0,0 +1,577 @@
+import logging
+import concurrent.futures
+from typing import Any
+from operator import attrgetter
+from itertools import groupby
+
+import pytest
+
+import psycopg
+from psycopg import pq
+from psycopg import errors as e
+
+pytestmark = [
+ pytest.mark.pipeline,
+ pytest.mark.skipif("not psycopg.Pipeline.is_supported()"),
+]
+
+pipeline_aborted = pytest.mark.flakey("the server might get in pipeline aborted")
+
+
+def test_repr(conn):
+ with conn.pipeline() as p:
+ assert "psycopg.Pipeline" in repr(p)
+ assert "[IDLE, pipeline=ON]" in repr(p)
+
+ conn.close()
+ assert "[BAD]" in repr(p)
+
+
+def test_connection_closed(conn):
+ conn.close()
+ with pytest.raises(e.OperationalError):
+ with conn.pipeline():
+ pass
+
+
+def test_pipeline_status(conn: psycopg.Connection[Any]) -> None:
+ assert conn._pipeline is None
+ with conn.pipeline() as p:
+ assert conn._pipeline is p
+ assert p.status == pq.PipelineStatus.ON
+ assert p.status == pq.PipelineStatus.OFF
+ assert not conn._pipeline
+
+
+def test_pipeline_reenter(conn: psycopg.Connection[Any]) -> None:
+ with conn.pipeline() as p1:
+ with conn.pipeline() as p2:
+ assert p2 is p1
+ assert p1.status == pq.PipelineStatus.ON
+ assert p2 is p1
+ assert p2.status == pq.PipelineStatus.ON
+ assert conn._pipeline is None
+ assert p1.status == pq.PipelineStatus.OFF
+
+
+def test_pipeline_broken_conn_exit(conn: psycopg.Connection[Any]) -> None:
+ with pytest.raises(e.OperationalError):
+ with conn.pipeline():
+ conn.execute("select 1")
+ conn.close()
+ closed = True
+
+ assert closed
+
+
+def test_pipeline_exit_error_noclobber(conn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+ with pytest.raises(ZeroDivisionError):
+ with conn.pipeline():
+ conn.close()
+ 1 / 0
+
+ assert len(caplog.records) == 1
+
+
+def test_pipeline_exit_error_noclobber_nested(conn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+ with pytest.raises(ZeroDivisionError):
+ with conn.pipeline():
+ with conn.pipeline():
+ conn.close()
+ 1 / 0
+
+ assert len(caplog.records) == 2
+
+
+def test_pipeline_exit_sync_trace(conn, trace):
+ t = trace.trace(conn)
+ with conn.pipeline():
+ pass
+ conn.close()
+ assert len([i for i in t if i.type == "Sync"]) == 1
+
+
+def test_pipeline_nested_sync_trace(conn, trace):
+ t = trace.trace(conn)
+ with conn.pipeline():
+ with conn.pipeline():
+ pass
+ conn.close()
+ assert len([i for i in t if i.type == "Sync"]) == 2
+
+
+def test_cursor_stream(conn):
+ with conn.pipeline(), conn.cursor() as cur:
+ with pytest.raises(psycopg.ProgrammingError):
+ cur.stream("select 1").__next__()
+
+
+def test_server_cursor(conn):
+ with conn.cursor(name="pipeline") as cur, conn.pipeline():
+ with pytest.raises(psycopg.NotSupportedError):
+ cur.execute("select 1")
+
+
+def test_cannot_insert_multiple_commands(conn):
+ with pytest.raises((e.SyntaxError, e.InvalidPreparedStatementDefinition)):
+ with conn.pipeline():
+ conn.execute("select 1; select 2")
+
+
+def test_copy(conn):
+ with conn.pipeline():
+ cur = conn.cursor()
+ with pytest.raises(e.NotSupportedError):
+ with cur.copy("copy (select 1) to stdout"):
+ pass
+
+
+def test_pipeline_processed_at_exit(conn):
+ with conn.cursor() as cur:
+ with conn.pipeline() as p:
+ cur.execute("select 1")
+
+ assert len(p.result_queue) == 1
+
+ assert cur.fetchone() == (1,)
+
+
+def test_pipeline_errors_processed_at_exit(conn):
+ conn.autocommit = True
+ with pytest.raises(e.UndefinedTable):
+ with conn.pipeline():
+ conn.execute("select * from nosuchtable")
+ conn.execute("create table voila ()")
+ cur = conn.execute(
+ "select count(*) from pg_tables where tablename = %s", ("voila",)
+ )
+ (count,) = cur.fetchone()
+ assert count == 0
+
+
+def test_pipeline(conn):
+ with conn.pipeline() as p:
+ c1 = conn.cursor()
+ c2 = conn.cursor()
+ c1.execute("select 1")
+ c2.execute("select 2")
+
+ assert len(p.result_queue) == 2
+
+ (r1,) = c1.fetchone()
+ assert r1 == 1
+
+ (r2,) = c2.fetchone()
+ assert r2 == 2
+
+
+def test_autocommit(conn):
+ conn.autocommit = True
+ with conn.pipeline(), conn.cursor() as c:
+ c.execute("select 1")
+
+ (r,) = c.fetchone()
+ assert r == 1
+
+
+def test_pipeline_aborted(conn):
+ conn.autocommit = True
+ with conn.pipeline() as p:
+ c1 = conn.execute("select 1")
+ with pytest.raises(e.UndefinedTable):
+ conn.execute("select * from doesnotexist").fetchone()
+ with pytest.raises(e.PipelineAborted):
+ conn.execute("select 'aborted'").fetchone()
+ # Sync restore the connection in usable state.
+ p.sync()
+ c2 = conn.execute("select 2")
+
+ (r,) = c1.fetchone()
+ assert r == 1
+
+ (r,) = c2.fetchone()
+ assert r == 2
+
+
+def test_pipeline_commit_aborted(conn):
+ with pytest.raises((e.UndefinedColumn, e.OperationalError)):
+ with conn.pipeline():
+ conn.execute("select error")
+ conn.execute("create table voila ()")
+ conn.commit()
+
+
+def test_sync_syncs_results(conn):
+ with conn.pipeline() as p:
+ cur = conn.execute("select 1")
+ assert cur.statusmessage is None
+ p.sync()
+ assert cur.statusmessage == "SELECT 1"
+
+
+def test_sync_syncs_errors(conn):
+ conn.autocommit = True
+ with conn.pipeline() as p:
+ conn.execute("select 1 from nosuchtable")
+ with pytest.raises(e.UndefinedTable):
+ p.sync()
+
+
+@pipeline_aborted
+def test_errors_raised_on_commit(conn):
+ with conn.pipeline():
+ conn.execute("select 1 from nosuchtable")
+ with pytest.raises(e.UndefinedTable):
+ conn.commit()
+ conn.rollback()
+ cur1 = conn.execute("select 1")
+ cur2 = conn.execute("select 2")
+
+ assert cur1.fetchone() == (1,)
+ assert cur2.fetchone() == (2,)
+
+
+@pytest.mark.flakey("assert fails randomly in CI blocking release")
+def test_errors_raised_on_transaction_exit(conn):
+ here = False
+ with conn.pipeline():
+ with pytest.raises(e.UndefinedTable):
+ with conn.transaction():
+ conn.execute("select 1 from nosuchtable")
+ here = True
+ cur1 = conn.execute("select 1")
+ assert here
+ cur2 = conn.execute("select 2")
+
+ assert cur1.fetchone() == (1,)
+ assert cur2.fetchone() == (2,)
+
+
+@pytest.mark.flakey("assert fails randomly in CI blocking release")
+def test_errors_raised_on_nested_transaction_exit(conn):
+ here = False
+ with conn.pipeline():
+ with conn.transaction():
+ with pytest.raises(e.UndefinedTable):
+ with conn.transaction():
+ conn.execute("select 1 from nosuchtable")
+ here = True
+ cur1 = conn.execute("select 1")
+ assert here
+ cur2 = conn.execute("select 2")
+
+ assert cur1.fetchone() == (1,)
+ assert cur2.fetchone() == (2,)
+
+
+def test_implicit_transaction(conn):
+ conn.autocommit = True
+ with conn.pipeline():
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
+ conn.execute("select 'before'")
+ # Transaction is ACTIVE because previous command is not completed
+ # since we have not fetched its results.
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.ACTIVE
+ # Upon entering the nested pipeline through "with transaction():", a
+ # sync() is emitted to restore the transaction state to IDLE, as
+ # expected to emit a BEGIN.
+ with conn.transaction():
+ conn.execute("select 'tx'")
+ cur = conn.execute("select 'after'")
+ assert cur.fetchone() == ("after",)
+
+
+@pytest.mark.crdb_skip("deferrable")
+def test_error_on_commit(conn):
+ conn.execute(
+ """
+ drop table if exists selfref;
+ create table selfref (
+ x serial primary key,
+ y int references selfref (x) deferrable initially deferred)
+ """
+ )
+ conn.commit()
+
+ with conn.pipeline():
+ conn.execute("insert into selfref (y) values (-1)")
+ with pytest.raises(e.ForeignKeyViolation):
+ conn.commit()
+ cur1 = conn.execute("select 1")
+ cur2 = conn.execute("select 2")
+
+ assert cur1.fetchone() == (1,)
+ assert cur2.fetchone() == (2,)
+
+
+def test_fetch_no_result(conn):
+ with conn.pipeline():
+ cur = conn.cursor()
+ with pytest.raises(e.ProgrammingError):
+ cur.fetchone()
+
+
+def test_executemany(conn):
+ conn.autocommit = True
+ conn.execute("drop table if exists execmanypipeline")
+ conn.execute(
+ "create unlogged table execmanypipeline ("
+ " id serial primary key, num integer)"
+ )
+ with conn.pipeline(), conn.cursor() as cur:
+ cur.executemany(
+ "insert into execmanypipeline(num) values (%s) returning num",
+ [(10,), (20,)],
+ returning=True,
+ )
+ assert cur.rowcount == 2
+ assert cur.fetchone() == (10,)
+ assert cur.nextset()
+ assert cur.fetchone() == (20,)
+ assert cur.nextset() is None
+
+
+def test_executemany_no_returning(conn):
+ conn.autocommit = True
+ conn.execute("drop table if exists execmanypipelinenoreturning")
+ conn.execute(
+ "create unlogged table execmanypipelinenoreturning ("
+ " id serial primary key, num integer)"
+ )
+ with conn.pipeline(), conn.cursor() as cur:
+ cur.executemany(
+ "insert into execmanypipelinenoreturning(num) values (%s)",
+ [(10,), (20,)],
+ returning=False,
+ )
+ with pytest.raises(e.ProgrammingError, match="no result available"):
+ cur.fetchone()
+ assert cur.nextset() is None
+ with pytest.raises(e.ProgrammingError, match="no result available"):
+ cur.fetchone()
+ assert cur.nextset() is None
+
+
+@pytest.mark.crdb("skip", reason="temp tables")
+def test_executemany_trace(conn, trace):
+ conn.autocommit = True
+ cur = conn.cursor()
+ cur.execute("create temp table trace (id int)")
+ t = trace.trace(conn)
+ with conn.pipeline():
+ cur.executemany("insert into trace (id) values (%s)", [(10,), (20,)])
+ cur.executemany("insert into trace (id) values (%s)", [(10,), (20,)])
+ conn.close()
+ items = list(t)
+ assert items[-1].type == "Terminate"
+ del items[-1]
+ roundtrips = [k for k, g in groupby(items, key=attrgetter("direction"))]
+ assert roundtrips == ["F", "B"]
+ assert len([i for i in items if i.type == "Sync"]) == 1
+
+
+@pytest.mark.crdb("skip", reason="temp tables")
+def test_executemany_trace_returning(conn, trace):
+ conn.autocommit = True
+ cur = conn.cursor()
+ cur.execute("create temp table trace (id int)")
+ t = trace.trace(conn)
+ with conn.pipeline():
+ cur.executemany(
+ "insert into trace (id) values (%s)", [(10,), (20,)], returning=True
+ )
+ cur.executemany(
+ "insert into trace (id) values (%s)", [(10,), (20,)], returning=True
+ )
+ conn.close()
+ items = list(t)
+ assert items[-1].type == "Terminate"
+ del items[-1]
+ roundtrips = [k for k, g in groupby(items, key=attrgetter("direction"))]
+ assert roundtrips == ["F", "B"] * 3
+ assert items[-2].direction == "F" # last 2 items are F B
+ assert len([i for i in items if i.type == "Sync"]) == 1
+
+
+def test_prepared(conn):
+ conn.autocommit = True
+ with conn.pipeline():
+ c1 = conn.execute("select %s::int", [10], prepare=True)
+ c2 = conn.execute(
+ "select count(*) from pg_prepared_statements where name != ''"
+ )
+
+ (r,) = c1.fetchone()
+ assert r == 10
+
+ (r,) = c2.fetchone()
+ assert r == 1
+
+
+def test_auto_prepare(conn):
+ conn.autocommit = True
+ conn.prepared_threshold = 5
+ with conn.pipeline():
+ cursors = [
+ conn.execute("select count(*) from pg_prepared_statements where name != ''")
+ for i in range(10)
+ ]
+
+ assert len(conn._prepared._names) == 1
+
+ res = [c.fetchone()[0] for c in cursors]
+ assert res == [0] * 5 + [1] * 5
+
+
+def test_transaction(conn):
+ notices = []
+ conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+
+ with conn.pipeline():
+ with conn.transaction():
+ cur = conn.execute("select 'tx'")
+
+ (r,) = cur.fetchone()
+ assert r == "tx"
+
+ with conn.transaction():
+ cur = conn.execute("select 'rb'")
+ raise psycopg.Rollback()
+
+ (r,) = cur.fetchone()
+ assert r == "rb"
+
+ assert not notices
+
+
+def test_transaction_nested(conn):
+ with conn.pipeline():
+ with conn.transaction():
+ outer = conn.execute("select 'outer'")
+ with pytest.raises(ZeroDivisionError):
+ with conn.transaction():
+ inner = conn.execute("select 'inner'")
+ 1 / 0
+
+ (r,) = outer.fetchone()
+ assert r == "outer"
+ (r,) = inner.fetchone()
+ assert r == "inner"
+
+
+def test_transaction_nested_no_statement(conn):
+ with conn.pipeline():
+ with conn.transaction():
+ with conn.transaction():
+ cur = conn.execute("select 1")
+
+ (r,) = cur.fetchone()
+ assert r == 1
+
+
+def test_outer_transaction(conn):
+ with conn.transaction():
+ conn.execute("drop table if exists outertx")
+ with conn.transaction():
+ with conn.pipeline():
+ conn.execute("create table outertx as (select 1)")
+ cur = conn.execute("select * from outertx")
+ (r,) = cur.fetchone()
+ assert r == 1
+ cur = conn.execute("select count(*) from pg_tables where tablename = 'outertx'")
+ assert cur.fetchone()[0] == 1
+
+
+def test_outer_transaction_error(conn):
+ with conn.transaction():
+ with pytest.raises((e.UndefinedColumn, e.OperationalError)):
+ with conn.pipeline():
+ conn.execute("select error")
+ conn.execute("create table voila ()")
+
+
+def test_rollback_explicit(conn):
+ conn.autocommit = True
+ with conn.pipeline():
+ with pytest.raises(e.DivisionByZero):
+ cur = conn.execute("select 1 / %s", [0])
+ cur.fetchone()
+ conn.rollback()
+ conn.execute("select 1")
+
+
+def test_rollback_transaction(conn):
+ conn.autocommit = True
+ with pytest.raises(e.DivisionByZero):
+ with conn.pipeline():
+ with conn.transaction():
+ cur = conn.execute("select 1 / %s", [0])
+ cur.fetchone()
+ conn.execute("select 1")
+
+
+def test_message_0x33(conn):
+ # https://github.com/psycopg/psycopg/issues/314
+ notices = []
+ conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+
+ conn.autocommit = True
+ with conn.pipeline():
+ cur = conn.execute("select 'test'")
+ assert cur.fetchone() == ("test",)
+
+ assert not notices
+
+
+def test_transaction_state_implicit_begin(conn, trace):
+ # Regression test to ensure that the transaction state is correct after
+ # the implicit BEGIN statement (in non-autocommit mode).
+ notices = []
+ conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+ t = trace.trace(conn)
+ with conn.pipeline():
+ conn.execute("select 'x'").fetchone()
+ conn.execute("select 'y'")
+ assert not notices
+ assert [
+ e.content[0] for e in t if e.type == "Parse" and b"BEGIN" in e.content[0]
+ ] == [b' "" "BEGIN" 0']
+
+
+def test_concurrency(conn):
+ with conn.transaction():
+ conn.execute("drop table if exists pipeline_concurrency")
+ conn.execute("drop table if exists accessed")
+ with conn.transaction():
+ conn.execute(
+ "create unlogged table pipeline_concurrency ("
+ " id serial primary key,"
+ " value integer"
+ ")"
+ )
+ conn.execute("create unlogged table accessed as (select now() as value)")
+
+ def update(value):
+ cur = conn.execute(
+ "insert into pipeline_concurrency(value) values (%s) returning value",
+ (value,),
+ )
+ conn.execute("update accessed set value = now()")
+ return cur
+
+ conn.autocommit = True
+
+ (before,) = conn.execute("select value from accessed").fetchone()
+
+ values = range(1, 10)
+ with conn.pipeline():
+ with concurrent.futures.ThreadPoolExecutor() as e:
+ cursors = e.map(update, values, timeout=len(values))
+ assert sum(cur.fetchone()[0] for cur in cursors) == sum(values)
+
+ (s,) = conn.execute("select sum(value) from pipeline_concurrency").fetchone()
+ assert s == sum(values)
+ (after,) = conn.execute("select value from accessed").fetchone()
+ assert after > before