summaryrefslogtreecommitdiffstats
path: root/tests/test_transaction.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_transaction.py')
-rw-r--r--tests/test_transaction.py796
1 files changed, 796 insertions, 0 deletions
diff --git a/tests/test_transaction.py b/tests/test_transaction.py
new file mode 100644
index 0000000..9391e00
--- /dev/null
+++ b/tests/test_transaction.py
@@ -0,0 +1,796 @@
+import sys
+import logging
+from threading import Thread, Event
+
+import pytest
+
+import psycopg
+from psycopg import Rollback
+from psycopg import errors as e
+
+# TODOCRDB: is this the expected behaviour?
+crdb_skip_external_observer = pytest.mark.crdb(
+ "skip", reason="deadlock on observer connection"
+)
+
+
+@pytest.fixture
+def conn(conn, pipeline):
+ return conn
+
+
+@pytest.fixture(autouse=True)
+def create_test_table(svcconn):
+ """Creates a table called 'test_table' for use in tests."""
+ cur = svcconn.cursor()
+ cur.execute("drop table if exists test_table")
+ cur.execute("create table test_table (id text primary key)")
+ yield
+ cur.execute("drop table test_table")
+
+
+def insert_row(conn, value):
+ sql = "INSERT INTO test_table VALUES (%s)"
+ if isinstance(conn, psycopg.Connection):
+ conn.cursor().execute(sql, (value,))
+ else:
+
+ async def f():
+ cur = conn.cursor()
+ await cur.execute(sql, (value,))
+
+ return f()
+
+
+def inserted(conn):
+ """Return the values inserted in the test table."""
+ sql = "SELECT * FROM test_table"
+ if isinstance(conn, psycopg.Connection):
+ rows = conn.cursor().execute(sql).fetchall()
+ return set(v for (v,) in rows)
+ else:
+
+ async def f():
+ cur = conn.cursor()
+ await cur.execute(sql)
+ rows = await cur.fetchall()
+ return set(v for (v,) in rows)
+
+ return f()
+
+
+def in_transaction(conn):
+ if conn.pgconn.transaction_status == conn.TransactionStatus.IDLE:
+ return False
+ elif conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS:
+ return True
+ else:
+ assert False, conn.pgconn.transaction_status
+
+
+def get_exc_info(exc):
+ """Return the exc info for an exception or a success if exc is None"""
+ if not exc:
+ return (None,) * 3
+ try:
+ raise exc
+ except exc:
+ return sys.exc_info()
+
+
+class ExpectedException(Exception):
+ pass
+
+
+def test_basic(conn, pipeline):
+ """Basic use of transaction() to BEGIN and COMMIT a transaction."""
+ assert not in_transaction(conn)
+ with conn.transaction():
+ if pipeline:
+ pipeline.sync()
+ assert in_transaction(conn)
+ assert not in_transaction(conn)
+
+
+def test_exposes_associated_connection(conn):
+ """Transaction exposes its connection as a read-only property."""
+ with conn.transaction() as tx:
+ assert tx.connection is conn
+ with pytest.raises(AttributeError):
+ tx.connection = conn
+
+
+def test_exposes_savepoint_name(conn):
+ """Transaction exposes its savepoint name as a read-only property."""
+ with conn.transaction(savepoint_name="foo") as tx:
+ assert tx.savepoint_name == "foo"
+ with pytest.raises(AttributeError):
+ tx.savepoint_name = "bar"
+
+
+def test_cant_reenter(conn):
+ with conn.transaction() as tx:
+ pass
+
+ with pytest.raises(TypeError):
+ with tx:
+ pass
+
+
+def test_begins_on_enter(conn, pipeline):
+ """Transaction does not begin until __enter__() is called."""
+ tx = conn.transaction()
+ assert not in_transaction(conn)
+ with tx:
+ if pipeline:
+ pipeline.sync()
+ assert in_transaction(conn)
+ assert not in_transaction(conn)
+
+
+def test_commit_on_successful_exit(conn):
+ """Changes are committed on successful exit from the `with` block."""
+ with conn.transaction():
+ insert_row(conn, "foo")
+
+ assert not in_transaction(conn)
+ assert inserted(conn) == {"foo"}
+
+
+def test_rollback_on_exception_exit(conn):
+ """Changes are rolled back if an exception escapes the `with` block."""
+ with pytest.raises(ExpectedException):
+ with conn.transaction():
+ insert_row(conn, "foo")
+ raise ExpectedException("This discards the insert")
+
+ assert not in_transaction(conn)
+ assert not inserted(conn)
+
+
+@pytest.mark.crdb_skip("pg_terminate_backend")
+def test_context_inerror_rollback_no_clobber(conn_cls, conn, pipeline, dsn, caplog):
+ if pipeline:
+ # Only 'conn' is possibly in pipeline mode, but the transaction and
+ # checks are on 'conn2'.
+ pytest.skip("not applicable")
+ caplog.set_level(logging.WARNING, logger="psycopg")
+
+ with pytest.raises(ZeroDivisionError):
+ with conn_cls.connect(dsn) as conn2:
+ with conn2.transaction():
+ conn2.execute("select 1")
+ conn.execute(
+ "select pg_terminate_backend(%s::int)",
+ [conn2.pgconn.backend_pid],
+ )
+ 1 / 0
+
+ assert len(caplog.records) == 1
+ rec = caplog.records[0]
+ assert rec.levelno == logging.WARNING
+ assert "in rollback" in rec.message
+
+
+@pytest.mark.crdb_skip("copy")
+def test_context_active_rollback_no_clobber(conn_cls, dsn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+
+ conn = conn_cls.connect(dsn)
+ try:
+ with pytest.raises(ZeroDivisionError):
+ with conn.transaction():
+ conn.pgconn.exec_(b"copy (select generate_series(1, 10)) to stdout")
+ status = conn.info.transaction_status
+ assert status == conn.TransactionStatus.ACTIVE
+ 1 / 0
+
+ assert len(caplog.records) == 1
+ rec = caplog.records[0]
+ assert rec.levelno == logging.WARNING
+ assert "in rollback" in rec.message
+ finally:
+ conn.close()
+
+
+def test_interaction_dbapi_transaction(conn):
+ insert_row(conn, "foo")
+
+ with conn.transaction():
+ insert_row(conn, "bar")
+ raise Rollback
+
+ with conn.transaction():
+ insert_row(conn, "baz")
+
+ assert in_transaction(conn)
+ conn.commit()
+ assert inserted(conn) == {"foo", "baz"}
+
+
+def test_prohibits_use_of_commit_rollback_autocommit(conn):
+ """
+ Within a Transaction block, it is forbidden to touch commit, rollback,
+ or the autocommit setting on the connection, as this would interfere
+ with the transaction scope being managed by the Transaction block.
+ """
+ conn.autocommit = False
+ conn.commit()
+ conn.rollback()
+
+ with conn.transaction():
+ with pytest.raises(e.ProgrammingError):
+ conn.autocommit = False
+ with pytest.raises(e.ProgrammingError):
+ conn.commit()
+ with pytest.raises(e.ProgrammingError):
+ conn.rollback()
+
+ conn.autocommit = False
+ conn.commit()
+ conn.rollback()
+
+
+@pytest.mark.parametrize("autocommit", [False, True])
+def test_preserves_autocommit(conn, autocommit):
+ """
+ Connection.autocommit is unchanged both during and after Transaction block.
+ """
+ conn.autocommit = autocommit
+ with conn.transaction():
+ assert conn.autocommit is autocommit
+ assert conn.autocommit is autocommit
+
+
+def test_autocommit_off_but_no_tx_started_successful_exit(conn, svcconn):
+ """
+ Scenario:
+ * Connection has autocommit off but no transaction has been initiated
+ before entering the Transaction context
+ * Code exits Transaction context successfully
+
+ Outcome:
+ * Changes made within Transaction context are committed
+ """
+ conn.autocommit = False
+ assert not in_transaction(conn)
+ with conn.transaction():
+ insert_row(conn, "new")
+ assert not in_transaction(conn)
+
+ # Changes committed
+ assert inserted(conn) == {"new"}
+ assert inserted(svcconn) == {"new"}
+
+
+def test_autocommit_off_but_no_tx_started_exception_exit(conn, svcconn):
+ """
+ Scenario:
+ * Connection has autocommit off but no transaction has been initiated
+ before entering the Transaction context
+ * Code exits Transaction context with an exception
+
+ Outcome:
+ * Changes made within Transaction context are discarded
+ """
+ conn.autocommit = False
+ assert not in_transaction(conn)
+ with pytest.raises(ExpectedException):
+ with conn.transaction():
+ insert_row(conn, "new")
+ raise ExpectedException()
+ assert not in_transaction(conn)
+
+ # Changes discarded
+ assert not inserted(conn)
+ assert not inserted(svcconn)
+
+
+@crdb_skip_external_observer
+def test_autocommit_off_and_tx_in_progress_successful_exit(conn, pipeline, svcconn):
+ """
+ Scenario:
+ * Connection has autocommit off but and a transaction is already in
+ progress before entering the Transaction context
+ * Code exits Transaction context successfully
+
+ Outcome:
+ * Changes made within Transaction context are left intact
+ * Outer transaction is left running, and no changes are visible to an
+ outside observer from another connection.
+ """
+ conn.autocommit = False
+ insert_row(conn, "prior")
+ if pipeline:
+ pipeline.sync()
+ assert in_transaction(conn)
+ with conn.transaction():
+ insert_row(conn, "new")
+ assert in_transaction(conn)
+ assert inserted(conn) == {"prior", "new"}
+ # Nothing committed yet; changes not visible on another connection
+ assert not inserted(svcconn)
+
+
+@crdb_skip_external_observer
+def test_autocommit_off_and_tx_in_progress_exception_exit(conn, pipeline, svcconn):
+ """
+ Scenario:
+ * Connection has autocommit off but and a transaction is already in
+ progress before entering the Transaction context
+ * Code exits Transaction context with an exception
+
+ Outcome:
+ * Changes made before the Transaction context are left intact
+ * Changes made within Transaction context are discarded
+ * Outer transaction is left running, and no changes are visible to an
+ outside observer from another connection.
+ """
+ conn.autocommit = False
+ insert_row(conn, "prior")
+ if pipeline:
+ pipeline.sync()
+ assert in_transaction(conn)
+ with pytest.raises(ExpectedException):
+ with conn.transaction():
+ insert_row(conn, "new")
+ raise ExpectedException()
+ assert in_transaction(conn)
+ assert inserted(conn) == {"prior"}
+ # Nothing committed yet; changes not visible on another connection
+ assert not inserted(svcconn)
+
+
+def test_nested_all_changes_persisted_on_successful_exit(conn, svcconn):
+ """Changes from nested transaction contexts are all persisted on exit."""
+ with conn.transaction():
+ insert_row(conn, "outer-before")
+ with conn.transaction():
+ insert_row(conn, "inner")
+ insert_row(conn, "outer-after")
+ assert not in_transaction(conn)
+ assert inserted(conn) == {"outer-before", "inner", "outer-after"}
+ assert inserted(svcconn) == {"outer-before", "inner", "outer-after"}
+
+
+def test_nested_all_changes_discarded_on_outer_exception(conn, svcconn):
+ """
+ Changes from nested transaction contexts are discarded when an exception
+ raised in outer context escapes.
+ """
+ with pytest.raises(ExpectedException):
+ with conn.transaction():
+ insert_row(conn, "outer")
+ with conn.transaction():
+ insert_row(conn, "inner")
+ raise ExpectedException()
+ assert not in_transaction(conn)
+ assert not inserted(conn)
+ assert not inserted(svcconn)
+
+
+def test_nested_all_changes_discarded_on_inner_exception(conn, svcconn):
+ """
+ Changes from nested transaction contexts are discarded when an exception
+ raised in inner context escapes the outer context.
+ """
+ with pytest.raises(ExpectedException):
+ with conn.transaction():
+ insert_row(conn, "outer")
+ with conn.transaction():
+ insert_row(conn, "inner")
+ raise ExpectedException()
+ assert not in_transaction(conn)
+ assert not inserted(conn)
+ assert not inserted(svcconn)
+
+
+def test_nested_inner_scope_exception_handled_in_outer_scope(conn, svcconn):
+ """
+ An exception escaping the inner transaction context causes changes made
+ within that inner context to be discarded, but the error can then be
+ handled in the outer context, allowing changes made in the outer context
+ (both before, and after, the inner context) to be successfully committed.
+ """
+ with conn.transaction():
+ insert_row(conn, "outer-before")
+ with pytest.raises(ExpectedException):
+ with conn.transaction():
+ insert_row(conn, "inner")
+ raise ExpectedException()
+ insert_row(conn, "outer-after")
+ assert not in_transaction(conn)
+ assert inserted(conn) == {"outer-before", "outer-after"}
+ assert inserted(svcconn) == {"outer-before", "outer-after"}
+
+
+def test_nested_three_levels_successful_exit(conn, svcconn):
+ """Exercise management of more than one savepoint."""
+ with conn.transaction(): # BEGIN
+ insert_row(conn, "one")
+ with conn.transaction(): # SAVEPOINT s1
+ insert_row(conn, "two")
+ with conn.transaction(): # SAVEPOINT s2
+ insert_row(conn, "three")
+ assert not in_transaction(conn)
+ assert inserted(conn) == {"one", "two", "three"}
+ assert inserted(svcconn) == {"one", "two", "three"}
+
+
+def test_named_savepoint_escapes_savepoint_name(conn):
+ with conn.transaction("s-1"):
+ pass
+ with conn.transaction("s1; drop table students"):
+ pass
+
+
+def test_named_savepoints_successful_exit(conn, commands):
+ """
+ Entering a transaction context will do one of these these things:
+ 1. Begin an outer transaction (if one isn't already in progress)
+ 2. Begin an outer transaction and create a savepoint (if one is named)
+ 3. Create a savepoint (if a transaction is already in progress)
+ either using the name provided, or auto-generating a savepoint name.
+
+ ...and exiting the context successfully will "commit" the same.
+ """
+ # Case 1
+ # Using Transaction explicitly because conn.transaction() enters the contetx
+ assert not commands
+ with conn.transaction() as tx:
+ assert commands.popall() == ["BEGIN"]
+ assert not tx.savepoint_name
+ assert commands.popall() == ["COMMIT"]
+
+ # Case 1 (with a transaction already started)
+ conn.cursor().execute("select 1")
+ assert commands.popall() == ["BEGIN"]
+ with conn.transaction() as tx:
+ assert commands.popall() == ['SAVEPOINT "_pg3_1"']
+ assert tx.savepoint_name == "_pg3_1"
+ assert commands.popall() == ['RELEASE "_pg3_1"']
+ conn.rollback()
+ assert commands.popall() == ["ROLLBACK"]
+
+ # Case 2
+ with conn.transaction(savepoint_name="foo") as tx:
+ assert commands.popall() == ["BEGIN", 'SAVEPOINT "foo"']
+ assert tx.savepoint_name == "foo"
+ assert commands.popall() == ["COMMIT"]
+
+ # Case 3 (with savepoint name provided)
+ with conn.transaction():
+ assert commands.popall() == ["BEGIN"]
+ with conn.transaction(savepoint_name="bar") as tx:
+ assert commands.popall() == ['SAVEPOINT "bar"']
+ assert tx.savepoint_name == "bar"
+ assert commands.popall() == ['RELEASE "bar"']
+ assert commands.popall() == ["COMMIT"]
+
+ # Case 3 (with savepoint name auto-generated)
+ with conn.transaction():
+ assert commands.popall() == ["BEGIN"]
+ with conn.transaction() as tx:
+ assert commands.popall() == ['SAVEPOINT "_pg3_2"']
+ assert tx.savepoint_name == "_pg3_2"
+ assert commands.popall() == ['RELEASE "_pg3_2"']
+ assert commands.popall() == ["COMMIT"]
+
+
+def test_named_savepoints_exception_exit(conn, commands):
+ """
+ Same as the previous test but checks that when exiting the context with an
+ exception, whatever transaction and/or savepoint was started on enter will
+ be rolled-back as appropriate.
+ """
+ # Case 1
+ with pytest.raises(ExpectedException):
+ with conn.transaction() as tx:
+ assert commands.popall() == ["BEGIN"]
+ assert not tx.savepoint_name
+ raise ExpectedException
+ assert commands.popall() == ["ROLLBACK"]
+
+ # Case 2
+ with pytest.raises(ExpectedException):
+ with conn.transaction(savepoint_name="foo") as tx:
+ assert commands.popall() == ["BEGIN", 'SAVEPOINT "foo"']
+ assert tx.savepoint_name == "foo"
+ raise ExpectedException
+ assert commands.popall() == ["ROLLBACK"]
+
+ # Case 3 (with savepoint name provided)
+ with conn.transaction():
+ assert commands.popall() == ["BEGIN"]
+ with pytest.raises(ExpectedException):
+ with conn.transaction(savepoint_name="bar") as tx:
+ assert commands.popall() == ['SAVEPOINT "bar"']
+ assert tx.savepoint_name == "bar"
+ raise ExpectedException
+ assert commands.popall() == ['ROLLBACK TO "bar"', 'RELEASE "bar"']
+ assert commands.popall() == ["COMMIT"]
+
+ # Case 3 (with savepoint name auto-generated)
+ with conn.transaction():
+ assert commands.popall() == ["BEGIN"]
+ with pytest.raises(ExpectedException):
+ with conn.transaction() as tx:
+ assert commands.popall() == ['SAVEPOINT "_pg3_2"']
+ assert tx.savepoint_name == "_pg3_2"
+ raise ExpectedException
+ assert commands.popall() == [
+ 'ROLLBACK TO "_pg3_2"',
+ 'RELEASE "_pg3_2"',
+ ]
+ assert commands.popall() == ["COMMIT"]
+
+
+def test_named_savepoints_with_repeated_names_works(conn):
+ """
+ Using the same savepoint name repeatedly works correctly, but bypasses
+ some sanity checks.
+ """
+ # Works correctly if no inner transactions are rolled back
+ with conn.transaction(force_rollback=True):
+ with conn.transaction("sp"):
+ insert_row(conn, "tx1")
+ with conn.transaction("sp"):
+ insert_row(conn, "tx2")
+ with conn.transaction("sp"):
+ insert_row(conn, "tx3")
+ assert inserted(conn) == {"tx1", "tx2", "tx3"}
+
+ # Works correctly if one level of inner transaction is rolled back
+ with conn.transaction(force_rollback=True):
+ with conn.transaction("s1"):
+ insert_row(conn, "tx1")
+ with conn.transaction("s1", force_rollback=True):
+ insert_row(conn, "tx2")
+ with conn.transaction("s1"):
+ insert_row(conn, "tx3")
+ assert inserted(conn) == {"tx1"}
+ assert inserted(conn) == {"tx1"}
+
+ # Works correctly if multiple inner transactions are rolled back
+ # (This scenario mandates releasing savepoints after rolling back to them.)
+ with conn.transaction(force_rollback=True):
+ with conn.transaction("s1"):
+ insert_row(conn, "tx1")
+ with conn.transaction("s1") as tx2:
+ insert_row(conn, "tx2")
+ with conn.transaction("s1"):
+ insert_row(conn, "tx3")
+ raise Rollback(tx2)
+ assert inserted(conn) == {"tx1"}
+ assert inserted(conn) == {"tx1"}
+
+
+def test_force_rollback_successful_exit(conn, svcconn):
+ """
+ Transaction started with the force_rollback option enabled discards all
+ changes at the end of the context.
+ """
+ with conn.transaction(force_rollback=True):
+ insert_row(conn, "foo")
+ assert not inserted(conn)
+ assert not inserted(svcconn)
+
+
+def test_force_rollback_exception_exit(conn, svcconn):
+ """
+ Transaction started with the force_rollback option enabled discards all
+ changes at the end of the context.
+ """
+ with pytest.raises(ExpectedException):
+ with conn.transaction(force_rollback=True):
+ insert_row(conn, "foo")
+ raise ExpectedException()
+ assert not inserted(conn)
+ assert not inserted(svcconn)
+
+
+@crdb_skip_external_observer
+def test_explicit_rollback_discards_changes(conn, svcconn):
+ """
+ Raising a Rollback exception in the middle of a block exits the block and
+ discards all changes made within that block.
+
+ You can raise any of the following:
+ - Rollback (type)
+ - Rollback() (instance)
+ - Rollback(tx) (instance initialised with reference to the transaction)
+ All of these are equivalent.
+ """
+
+ def assert_no_rows():
+ assert not inserted(conn)
+ assert not inserted(svcconn)
+
+ with conn.transaction():
+ insert_row(conn, "foo")
+ raise Rollback
+ assert_no_rows()
+
+ with conn.transaction():
+ insert_row(conn, "foo")
+ raise Rollback()
+ assert_no_rows()
+
+ with conn.transaction() as tx:
+ insert_row(conn, "foo")
+ raise Rollback(tx)
+ assert_no_rows()
+
+
+@crdb_skip_external_observer
+def test_explicit_rollback_outer_tx_unaffected(conn, svcconn):
+ """
+ Raising a Rollback exception in the middle of a block does not impact an
+ enclosing transaction block.
+ """
+ with conn.transaction():
+ insert_row(conn, "before")
+ with conn.transaction():
+ insert_row(conn, "during")
+ raise Rollback
+ assert in_transaction(conn)
+ assert not inserted(svcconn)
+ insert_row(conn, "after")
+ assert inserted(conn) == {"before", "after"}
+ assert inserted(svcconn) == {"before", "after"}
+
+
+def test_explicit_rollback_of_outer_transaction(conn):
+ """
+ Raising a Rollback exception that references an outer transaction will
+ discard all changes from both inner and outer transaction blocks.
+ """
+ with conn.transaction() as outer_tx:
+ insert_row(conn, "outer")
+ with conn.transaction():
+ insert_row(conn, "inner")
+ raise Rollback(outer_tx)
+ assert False, "This line of code should be unreachable."
+ assert not inserted(conn)
+
+
+@crdb_skip_external_observer
+def test_explicit_rollback_of_enclosing_tx_outer_tx_unaffected(conn, svcconn):
+ """
+ Rolling-back an enclosing transaction does not impact an outer transaction.
+ """
+ with conn.transaction():
+ insert_row(conn, "outer-before")
+ with conn.transaction() as tx_enclosing:
+ insert_row(conn, "enclosing")
+ with conn.transaction():
+ insert_row(conn, "inner")
+ raise Rollback(tx_enclosing)
+ insert_row(conn, "outer-after")
+
+ assert inserted(conn) == {"outer-before", "outer-after"}
+ assert not inserted(svcconn) # Not yet committed
+ # Changes committed
+ assert inserted(svcconn) == {"outer-before", "outer-after"}
+
+
+def test_str(conn, pipeline):
+ with conn.transaction() as tx:
+ if pipeline:
+ assert "[INTRANS, pipeline=ON]" in str(tx)
+ else:
+ assert "[INTRANS]" in str(tx)
+ assert "(active)" in str(tx)
+ assert "'" not in str(tx)
+ with conn.transaction("wat") as tx2:
+ if pipeline:
+ assert "[INTRANS, pipeline=ON]" in str(tx2)
+ else:
+ assert "[INTRANS]" in str(tx2)
+ assert "'wat'" in str(tx2)
+
+ if pipeline:
+ assert "[IDLE, pipeline=ON]" in str(tx)
+ else:
+ assert "[IDLE]" in str(tx)
+ assert "(terminated)" in str(tx)
+
+ with pytest.raises(ZeroDivisionError):
+ with conn.transaction() as tx:
+ 1 / 0
+
+ assert "(terminated)" in str(tx)
+
+
+@pytest.mark.parametrize("exit_error", [None, ZeroDivisionError, Rollback])
+def test_out_of_order_exit(conn, exit_error):
+ conn.autocommit = True
+
+ t1 = conn.transaction()
+ t1.__enter__()
+
+ t2 = conn.transaction()
+ t2.__enter__()
+
+ with pytest.raises(e.ProgrammingError):
+ t1.__exit__(*get_exc_info(exit_error))
+
+ with pytest.raises(e.ProgrammingError):
+ t2.__exit__(*get_exc_info(exit_error))
+
+
+@pytest.mark.parametrize("exit_error", [None, ZeroDivisionError, Rollback])
+def test_out_of_order_implicit_begin(conn, exit_error):
+ conn.execute("select 1")
+
+ t1 = conn.transaction()
+ t1.__enter__()
+
+ t2 = conn.transaction()
+ t2.__enter__()
+
+ with pytest.raises(e.ProgrammingError):
+ t1.__exit__(*get_exc_info(exit_error))
+
+ with pytest.raises(e.ProgrammingError):
+ t2.__exit__(*get_exc_info(exit_error))
+
+
+@pytest.mark.parametrize("exit_error", [None, ZeroDivisionError, Rollback])
+def test_out_of_order_exit_same_name(conn, exit_error):
+ conn.autocommit = True
+
+ t1 = conn.transaction("save")
+ t1.__enter__()
+ t2 = conn.transaction("save")
+ t2.__enter__()
+
+ with pytest.raises(e.ProgrammingError):
+ t1.__exit__(*get_exc_info(exit_error))
+
+ with pytest.raises(e.ProgrammingError):
+ t2.__exit__(*get_exc_info(exit_error))
+
+
+@pytest.mark.parametrize("what", ["commit", "rollback", "error"])
+def test_concurrency(conn, what):
+ conn.autocommit = True
+
+ evs = [Event() for i in range(3)]
+
+ def worker(unlock, wait_on):
+ with pytest.raises(e.ProgrammingError) as ex:
+ with conn.transaction():
+ unlock.set()
+ wait_on.wait()
+ conn.execute("select 1")
+
+ if what == "error":
+ 1 / 0
+ elif what == "rollback":
+ raise Rollback()
+ else:
+ assert what == "commit"
+
+ if what == "error":
+ assert "transaction rollback" in str(ex.value)
+ assert isinstance(ex.value.__context__, ZeroDivisionError)
+ elif what == "rollback":
+ assert "transaction rollback" in str(ex.value)
+ assert isinstance(ex.value.__context__, Rollback)
+ else:
+ assert "transaction commit" in str(ex.value)
+
+ # Start a first transaction in a thread
+ t1 = Thread(target=worker, kwargs={"unlock": evs[0], "wait_on": evs[1]})
+ t1.start()
+ evs[0].wait()
+
+ # Start a nested transaction in a thread
+ t2 = Thread(target=worker, kwargs={"unlock": evs[1], "wait_on": evs[2]})
+ t2.start()
+
+ # Terminate the first transaction before the second does
+ t1.join()
+ evs[2].set()
+ t2.join()