summaryrefslogtreecommitdiffstats
path: root/tests/test_transaction_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_transaction_async.py')
-rw-r--r--tests/test_transaction_async.py743
1 files changed, 743 insertions, 0 deletions
diff --git a/tests/test_transaction_async.py b/tests/test_transaction_async.py
new file mode 100644
index 0000000..55e1c9c
--- /dev/null
+++ b/tests/test_transaction_async.py
@@ -0,0 +1,743 @@
+import asyncio
+import logging
+
+import pytest
+
+from psycopg import Rollback
+from psycopg import errors as e
+from psycopg._compat import create_task
+
+from .test_transaction import in_transaction, insert_row, inserted, get_exc_info
+from .test_transaction import ExpectedException, crdb_skip_external_observer
+from .test_transaction import create_test_table # noqa # autouse fixture
+
+pytestmark = pytest.mark.asyncio
+
+
+@pytest.fixture
+async def aconn(aconn, apipeline):
+ return aconn
+
+
+async def test_basic(aconn, apipeline):
+ """Basic use of transaction() to BEGIN and COMMIT a transaction."""
+ assert not in_transaction(aconn)
+ async with aconn.transaction():
+ if apipeline:
+ await apipeline.sync()
+ assert in_transaction(aconn)
+ assert not in_transaction(aconn)
+
+
+async def test_exposes_associated_connection(aconn):
+ """Transaction exposes its connection as a read-only property."""
+ async with aconn.transaction() as tx:
+ assert tx.connection is aconn
+ with pytest.raises(AttributeError):
+ tx.connection = aconn
+
+
+async def test_exposes_savepoint_name(aconn):
+ """Transaction exposes its savepoint name as a read-only property."""
+ async with aconn.transaction(savepoint_name="foo") as tx:
+ assert tx.savepoint_name == "foo"
+ with pytest.raises(AttributeError):
+ tx.savepoint_name = "bar"
+
+
+async def test_cant_reenter(aconn):
+ async with aconn.transaction() as tx:
+ pass
+
+ with pytest.raises(TypeError):
+ async with tx:
+ pass
+
+
+async def test_begins_on_enter(aconn, apipeline):
+ """Transaction does not begin until __enter__() is called."""
+ tx = aconn.transaction()
+ assert not in_transaction(aconn)
+ async with tx:
+ if apipeline:
+ await apipeline.sync()
+ assert in_transaction(aconn)
+ assert not in_transaction(aconn)
+
+
+async def test_commit_on_successful_exit(aconn):
+ """Changes are committed on successful exit from the `with` block."""
+ async with aconn.transaction():
+ await insert_row(aconn, "foo")
+
+ assert not in_transaction(aconn)
+ assert await inserted(aconn) == {"foo"}
+
+
+async def test_rollback_on_exception_exit(aconn):
+ """Changes are rolled back if an exception escapes the `with` block."""
+ with pytest.raises(ExpectedException):
+ async with aconn.transaction():
+ await insert_row(aconn, "foo")
+ raise ExpectedException("This discards the insert")
+
+ assert not in_transaction(aconn)
+ assert not await inserted(aconn)
+
+
+@pytest.mark.crdb_skip("pg_terminate_backend")
+async def test_context_inerror_rollback_no_clobber(
+ aconn_cls, aconn, apipeline, dsn, caplog
+):
+ if apipeline:
+ # Only 'aconn' is possibly in pipeline mode, but the transaction and
+ # checks are on 'conn2'.
+ pytest.skip("not applicable")
+ caplog.set_level(logging.WARNING, logger="psycopg")
+
+ with pytest.raises(ZeroDivisionError):
+ async with await aconn_cls.connect(dsn) as conn2:
+ async with conn2.transaction():
+ await conn2.execute("select 1")
+ await aconn.execute(
+ "select pg_terminate_backend(%s::int)",
+ [conn2.pgconn.backend_pid],
+ )
+ 1 / 0
+
+ assert len(caplog.records) == 1
+ rec = caplog.records[0]
+ assert rec.levelno == logging.WARNING
+ assert "in rollback" in rec.message
+
+
+@pytest.mark.crdb_skip("copy")
+async def test_context_active_rollback_no_clobber(aconn_cls, dsn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+
+ conn = await aconn_cls.connect(dsn)
+ try:
+ with pytest.raises(ZeroDivisionError):
+ async with conn.transaction():
+ conn.pgconn.exec_(b"copy (select generate_series(1, 10)) to stdout")
+ status = conn.info.transaction_status
+ assert status == conn.TransactionStatus.ACTIVE
+ 1 / 0
+
+ assert len(caplog.records) == 1
+ rec = caplog.records[0]
+ assert rec.levelno == logging.WARNING
+ assert "in rollback" in rec.message
+ finally:
+ await conn.close()
+
+
+async def test_interaction_dbapi_transaction(aconn):
+ await insert_row(aconn, "foo")
+
+ async with aconn.transaction():
+ await insert_row(aconn, "bar")
+ raise Rollback
+
+ async with aconn.transaction():
+ await insert_row(aconn, "baz")
+
+ assert in_transaction(aconn)
+ await aconn.commit()
+ assert await inserted(aconn) == {"foo", "baz"}
+
+
+async def test_prohibits_use_of_commit_rollback_autocommit(aconn):
+ """
+ Within a Transaction block, it is forbidden to touch commit, rollback,
+ or the autocommit setting on the connection, as this would interfere
+ with the transaction scope being managed by the Transaction block.
+ """
+ await aconn.set_autocommit(False)
+ await aconn.commit()
+ await aconn.rollback()
+
+ async with aconn.transaction():
+ with pytest.raises(e.ProgrammingError):
+ await aconn.set_autocommit(False)
+ with pytest.raises(e.ProgrammingError):
+ await aconn.commit()
+ with pytest.raises(e.ProgrammingError):
+ await aconn.rollback()
+
+ await aconn.set_autocommit(False)
+ await aconn.commit()
+ await aconn.rollback()
+
+
+@pytest.mark.parametrize("autocommit", [False, True])
+async def test_preserves_autocommit(aconn, autocommit):
+ """
+ Connection.autocommit is unchanged both during and after Transaction block.
+ """
+ await aconn.set_autocommit(autocommit)
+ async with aconn.transaction():
+ assert aconn.autocommit is autocommit
+ assert aconn.autocommit is autocommit
+
+
+async def test_autocommit_off_but_no_tx_started_successful_exit(aconn, svcconn):
+ """
+ Scenario:
+ * Connection has autocommit off but no transaction has been initiated
+ before entering the Transaction context
+ * Code exits Transaction context successfully
+
+ Outcome:
+ * Changes made within Transaction context are committed
+ """
+ await aconn.set_autocommit(False)
+ assert not in_transaction(aconn)
+ async with aconn.transaction():
+ await insert_row(aconn, "new")
+ assert not in_transaction(aconn)
+
+ # Changes committed
+ assert await inserted(aconn) == {"new"}
+ assert inserted(svcconn) == {"new"}
+
+
+async def test_autocommit_off_but_no_tx_started_exception_exit(aconn, svcconn):
+ """
+ Scenario:
+ * Connection has autocommit off but no transaction has been initiated
+ before entering the Transaction context
+ * Code exits Transaction context with an exception
+
+ Outcome:
+ * Changes made within Transaction context are discarded
+ """
+ await aconn.set_autocommit(False)
+ assert not in_transaction(aconn)
+ with pytest.raises(ExpectedException):
+ async with aconn.transaction():
+ await insert_row(aconn, "new")
+ raise ExpectedException()
+ assert not in_transaction(aconn)
+
+ # Changes discarded
+ assert not await inserted(aconn)
+ assert not inserted(svcconn)
+
+
+@crdb_skip_external_observer
+async def test_autocommit_off_and_tx_in_progress_successful_exit(
+ aconn, apipeline, svcconn
+):
+ """
+ Scenario:
+ * Connection has autocommit off but and a transaction is already in
+ progress before entering the Transaction context
+ * Code exits Transaction context successfully
+
+ Outcome:
+ * Changes made within Transaction context are left intact
+ * Outer transaction is left running, and no changes are visible to an
+ outside observer from another connection.
+ """
+ await aconn.set_autocommit(False)
+ await insert_row(aconn, "prior")
+ if apipeline:
+ await apipeline.sync()
+ assert in_transaction(aconn)
+ async with aconn.transaction():
+ await insert_row(aconn, "new")
+ assert in_transaction(aconn)
+ assert await inserted(aconn) == {"prior", "new"}
+ # Nothing committed yet; changes not visible on another connection
+ assert not inserted(svcconn)
+
+
+@crdb_skip_external_observer
+async def test_autocommit_off_and_tx_in_progress_exception_exit(
+ aconn, apipeline, svcconn
+):
+ """
+ Scenario:
+ * Connection has autocommit off but and a transaction is already in
+ progress before entering the Transaction context
+ * Code exits Transaction context with an exception
+
+ Outcome:
+ * Changes made before the Transaction context are left intact
+ * Changes made within Transaction context are discarded
+ * Outer transaction is left running, and no changes are visible to an
+ outside observer from another connection.
+ """
+ await aconn.set_autocommit(False)
+ await insert_row(aconn, "prior")
+ if apipeline:
+ await apipeline.sync()
+ assert in_transaction(aconn)
+ with pytest.raises(ExpectedException):
+ async with aconn.transaction():
+ await insert_row(aconn, "new")
+ raise ExpectedException()
+ assert in_transaction(aconn)
+ assert await inserted(aconn) == {"prior"}
+ # Nothing committed yet; changes not visible on another connection
+ assert not inserted(svcconn)
+
+
+async def test_nested_all_changes_persisted_on_successful_exit(aconn, svcconn):
+ """Changes from nested transaction contexts are all persisted on exit."""
+ async with aconn.transaction():
+ await insert_row(aconn, "outer-before")
+ async with aconn.transaction():
+ await insert_row(aconn, "inner")
+ await insert_row(aconn, "outer-after")
+ assert not in_transaction(aconn)
+ assert await inserted(aconn) == {"outer-before", "inner", "outer-after"}
+ assert inserted(svcconn) == {"outer-before", "inner", "outer-after"}
+
+
+async def test_nested_all_changes_discarded_on_outer_exception(aconn, svcconn):
+ """
+ Changes from nested transaction contexts are discarded when an exception
+ raised in outer context escapes.
+ """
+ with pytest.raises(ExpectedException):
+ async with aconn.transaction():
+ await insert_row(aconn, "outer")
+ async with aconn.transaction():
+ await insert_row(aconn, "inner")
+ raise ExpectedException()
+ assert not in_transaction(aconn)
+ assert not await inserted(aconn)
+ assert not inserted(svcconn)
+
+
+async def test_nested_all_changes_discarded_on_inner_exception(aconn, svcconn):
+ """
+ Changes from nested transaction contexts are discarded when an exception
+ raised in inner context escapes the outer context.
+ """
+ with pytest.raises(ExpectedException):
+ async with aconn.transaction():
+ await insert_row(aconn, "outer")
+ async with aconn.transaction():
+ await insert_row(aconn, "inner")
+ raise ExpectedException()
+ assert not in_transaction(aconn)
+ assert not await inserted(aconn)
+ assert not inserted(svcconn)
+
+
+async def test_nested_inner_scope_exception_handled_in_outer_scope(aconn, svcconn):
+ """
+ An exception escaping the inner transaction context causes changes made
+ within that inner context to be discarded, but the error can then be
+ handled in the outer context, allowing changes made in the outer context
+ (both before, and after, the inner context) to be successfully committed.
+ """
+ async with aconn.transaction():
+ await insert_row(aconn, "outer-before")
+ with pytest.raises(ExpectedException):
+ async with aconn.transaction():
+ await insert_row(aconn, "inner")
+ raise ExpectedException()
+ await insert_row(aconn, "outer-after")
+ assert not in_transaction(aconn)
+ assert await inserted(aconn) == {"outer-before", "outer-after"}
+ assert inserted(svcconn) == {"outer-before", "outer-after"}
+
+
+async def test_nested_three_levels_successful_exit(aconn, svcconn):
+ """Exercise management of more than one savepoint."""
+ async with aconn.transaction(): # BEGIN
+ await insert_row(aconn, "one")
+ async with aconn.transaction(): # SAVEPOINT s1
+ await insert_row(aconn, "two")
+ async with aconn.transaction(): # SAVEPOINT s2
+ await insert_row(aconn, "three")
+ assert not in_transaction(aconn)
+ assert await inserted(aconn) == {"one", "two", "three"}
+ assert inserted(svcconn) == {"one", "two", "three"}
+
+
+async def test_named_savepoint_escapes_savepoint_name(aconn):
+ async with aconn.transaction("s-1"):
+ pass
+ async with aconn.transaction("s1; drop table students"):
+ pass
+
+
+async def test_named_savepoints_successful_exit(aconn, acommands):
+ """
+ Entering a transaction context will do one of these these things:
+ 1. Begin an outer transaction (if one isn't already in progress)
+ 2. Begin an outer transaction and create a savepoint (if one is named)
+ 3. Create a savepoint (if a transaction is already in progress)
+ either using the name provided, or auto-generating a savepoint name.
+
+ ...and exiting the context successfully will "commit" the same.
+ """
+ commands = acommands
+
+ # Case 1
+ # Using Transaction explicitly because conn.transaction() enters the contetx
+ async with aconn.transaction() as tx:
+ assert commands.popall() == ["BEGIN"]
+ assert not tx.savepoint_name
+ assert commands.popall() == ["COMMIT"]
+
+ # Case 1 (with a transaction already started)
+ await aconn.cursor().execute("select 1")
+ assert commands.popall() == ["BEGIN"]
+ async with aconn.transaction() as tx:
+ assert commands.popall() == ['SAVEPOINT "_pg3_1"']
+ assert tx.savepoint_name == "_pg3_1"
+
+ assert commands.popall() == ['RELEASE "_pg3_1"']
+ await aconn.rollback()
+ assert commands.popall() == ["ROLLBACK"]
+
+ # Case 2
+ async with aconn.transaction(savepoint_name="foo") as tx:
+ assert commands.popall() == ["BEGIN", 'SAVEPOINT "foo"']
+ assert tx.savepoint_name == "foo"
+ assert commands.popall() == ["COMMIT"]
+
+ # Case 3 (with savepoint name provided)
+ async with aconn.transaction():
+ assert commands.popall() == ["BEGIN"]
+ async with aconn.transaction(savepoint_name="bar") as tx:
+ assert commands.popall() == ['SAVEPOINT "bar"']
+ assert tx.savepoint_name == "bar"
+ assert commands.popall() == ['RELEASE "bar"']
+ assert commands.popall() == ["COMMIT"]
+
+ # Case 3 (with savepoint name auto-generated)
+ async with aconn.transaction():
+ assert commands.popall() == ["BEGIN"]
+ async with aconn.transaction() as tx:
+ assert commands.popall() == ['SAVEPOINT "_pg3_2"']
+ assert tx.savepoint_name == "_pg3_2"
+ assert commands.popall() == ['RELEASE "_pg3_2"']
+ assert commands.popall() == ["COMMIT"]
+
+
+async def test_named_savepoints_exception_exit(aconn, acommands):
+ """
+ Same as the previous test but checks that when exiting the context with an
+ exception, whatever transaction and/or savepoint was started on enter will
+ be rolled-back as appropriate.
+ """
+ commands = acommands
+
+ # Case 1
+ with pytest.raises(ExpectedException):
+ async with aconn.transaction() as tx:
+ assert commands.popall() == ["BEGIN"]
+ assert not tx.savepoint_name
+ raise ExpectedException
+ assert commands.popall() == ["ROLLBACK"]
+
+ # Case 2
+ with pytest.raises(ExpectedException):
+ async with aconn.transaction(savepoint_name="foo") as tx:
+ assert commands.popall() == ["BEGIN", 'SAVEPOINT "foo"']
+ assert tx.savepoint_name == "foo"
+ raise ExpectedException
+ assert commands.popall() == ["ROLLBACK"]
+
+ # Case 3 (with savepoint name provided)
+ async with aconn.transaction():
+ assert commands.popall() == ["BEGIN"]
+ with pytest.raises(ExpectedException):
+ async with aconn.transaction(savepoint_name="bar") as tx:
+ assert commands.popall() == ['SAVEPOINT "bar"']
+ assert tx.savepoint_name == "bar"
+ raise ExpectedException
+ assert commands.popall() == ['ROLLBACK TO "bar"', 'RELEASE "bar"']
+ assert commands.popall() == ["COMMIT"]
+
+ # Case 3 (with savepoint name auto-generated)
+ async with aconn.transaction():
+ assert commands.popall() == ["BEGIN"]
+ with pytest.raises(ExpectedException):
+ async with aconn.transaction() as tx:
+ assert commands.popall() == ['SAVEPOINT "_pg3_2"']
+ assert tx.savepoint_name == "_pg3_2"
+ raise ExpectedException
+ assert commands.popall() == [
+ 'ROLLBACK TO "_pg3_2"',
+ 'RELEASE "_pg3_2"',
+ ]
+ assert commands.popall() == ["COMMIT"]
+
+
+async def test_named_savepoints_with_repeated_names_works(aconn):
+ """
+ Using the same savepoint name repeatedly works correctly, but bypasses
+ some sanity checks.
+ """
+ # Works correctly if no inner transactions are rolled back
+ async with aconn.transaction(force_rollback=True):
+ async with aconn.transaction("sp"):
+ await insert_row(aconn, "tx1")
+ async with aconn.transaction("sp"):
+ await insert_row(aconn, "tx2")
+ async with aconn.transaction("sp"):
+ await insert_row(aconn, "tx3")
+ assert await inserted(aconn) == {"tx1", "tx2", "tx3"}
+
+ # Works correctly if one level of inner transaction is rolled back
+ async with aconn.transaction(force_rollback=True):
+ async with aconn.transaction("s1"):
+ await insert_row(aconn, "tx1")
+ async with aconn.transaction("s1", force_rollback=True):
+ await insert_row(aconn, "tx2")
+ async with aconn.transaction("s1"):
+ await insert_row(aconn, "tx3")
+ assert await inserted(aconn) == {"tx1"}
+ assert await inserted(aconn) == {"tx1"}
+
+ # Works correctly if multiple inner transactions are rolled back
+ # (This scenario mandates releasing savepoints after rolling back to them.)
+ async with aconn.transaction(force_rollback=True):
+ async with aconn.transaction("s1"):
+ await insert_row(aconn, "tx1")
+ async with aconn.transaction("s1") as tx2:
+ await insert_row(aconn, "tx2")
+ async with aconn.transaction("s1"):
+ await insert_row(aconn, "tx3")
+ raise Rollback(tx2)
+ assert await inserted(aconn) == {"tx1"}
+ assert await inserted(aconn) == {"tx1"}
+
+
+async def test_force_rollback_successful_exit(aconn, svcconn):
+ """
+ Transaction started with the force_rollback option enabled discards all
+ changes at the end of the context.
+ """
+ async with aconn.transaction(force_rollback=True):
+ await insert_row(aconn, "foo")
+ assert not await inserted(aconn)
+ assert not inserted(svcconn)
+
+
+async def test_force_rollback_exception_exit(aconn, svcconn):
+ """
+ Transaction started with the force_rollback option enabled discards all
+ changes at the end of the context.
+ """
+ with pytest.raises(ExpectedException):
+ async with aconn.transaction(force_rollback=True):
+ await insert_row(aconn, "foo")
+ raise ExpectedException()
+ assert not await inserted(aconn)
+ assert not inserted(svcconn)
+
+
+@crdb_skip_external_observer
+async def test_explicit_rollback_discards_changes(aconn, svcconn):
+ """
+ Raising a Rollback exception in the middle of a block exits the block and
+ discards all changes made within that block.
+
+ You can raise any of the following:
+ - Rollback (type)
+ - Rollback() (instance)
+ - Rollback(tx) (instance initialised with reference to the transaction)
+ All of these are equivalent.
+ """
+
+ async def assert_no_rows():
+ assert not await inserted(aconn)
+ assert not inserted(svcconn)
+
+ async with aconn.transaction():
+ await insert_row(aconn, "foo")
+ raise Rollback
+ await assert_no_rows()
+
+ async with aconn.transaction():
+ await insert_row(aconn, "foo")
+ raise Rollback()
+ await assert_no_rows()
+
+ async with aconn.transaction() as tx:
+ await insert_row(aconn, "foo")
+ raise Rollback(tx)
+ await assert_no_rows()
+
+
+@crdb_skip_external_observer
+async def test_explicit_rollback_outer_tx_unaffected(aconn, svcconn):
+ """
+ Raising a Rollback exception in the middle of a block does not impact an
+ enclosing transaction block.
+ """
+ async with aconn.transaction():
+ await insert_row(aconn, "before")
+ async with aconn.transaction():
+ await insert_row(aconn, "during")
+ raise Rollback
+ assert in_transaction(aconn)
+ assert not inserted(svcconn)
+ await insert_row(aconn, "after")
+ assert await inserted(aconn) == {"before", "after"}
+ assert inserted(svcconn) == {"before", "after"}
+
+
+async def test_explicit_rollback_of_outer_transaction(aconn):
+ """
+ Raising a Rollback exception that references an outer transaction will
+ discard all changes from both inner and outer transaction blocks.
+ """
+ async with aconn.transaction() as outer_tx:
+ await insert_row(aconn, "outer")
+ async with aconn.transaction():
+ await insert_row(aconn, "inner")
+ raise Rollback(outer_tx)
+ assert False, "This line of code should be unreachable."
+ assert not await inserted(aconn)
+
+
+@crdb_skip_external_observer
+async def test_explicit_rollback_of_enclosing_tx_outer_tx_unaffected(aconn, svcconn):
+ """
+ Rolling-back an enclosing transaction does not impact an outer transaction.
+ """
+ async with aconn.transaction():
+ await insert_row(aconn, "outer-before")
+ async with aconn.transaction() as tx_enclosing:
+ await insert_row(aconn, "enclosing")
+ async with aconn.transaction():
+ await insert_row(aconn, "inner")
+ raise Rollback(tx_enclosing)
+ await insert_row(aconn, "outer-after")
+
+ assert await inserted(aconn) == {"outer-before", "outer-after"}
+ assert not inserted(svcconn) # Not yet committed
+ # Changes committed
+ assert inserted(svcconn) == {"outer-before", "outer-after"}
+
+
+async def test_str(aconn, apipeline):
+ async with aconn.transaction() as tx:
+ if apipeline:
+ assert "[INTRANS]" not in str(tx)
+ await apipeline.sync()
+ assert "[INTRANS, pipeline=ON]" in str(tx)
+ else:
+ assert "[INTRANS]" in str(tx)
+ assert "(active)" in str(tx)
+ assert "'" not in str(tx)
+ async with aconn.transaction("wat") as tx2:
+ if apipeline:
+ assert "[INTRANS, pipeline=ON]" in str(tx2)
+ else:
+ assert "[INTRANS]" in str(tx2)
+ assert "'wat'" in str(tx2)
+
+ if apipeline:
+ assert "[IDLE, pipeline=ON]" in str(tx)
+ else:
+ assert "[IDLE]" in str(tx)
+ assert "(terminated)" in str(tx)
+
+ with pytest.raises(ZeroDivisionError):
+ async with aconn.transaction() as tx:
+ 1 / 0
+
+ assert "(terminated)" in str(tx)
+
+
+@pytest.mark.parametrize("exit_error", [None, ZeroDivisionError, Rollback])
+async def test_out_of_order_exit(aconn, exit_error):
+ await aconn.set_autocommit(True)
+
+ t1 = aconn.transaction()
+ await t1.__aenter__()
+
+ t2 = aconn.transaction()
+ await t2.__aenter__()
+
+ with pytest.raises(e.ProgrammingError):
+ await t1.__aexit__(*get_exc_info(exit_error))
+
+ with pytest.raises(e.ProgrammingError):
+ await t2.__aexit__(*get_exc_info(exit_error))
+
+
+@pytest.mark.parametrize("exit_error", [None, ZeroDivisionError, Rollback])
+async def test_out_of_order_implicit_begin(aconn, exit_error):
+ await aconn.execute("select 1")
+
+ t1 = aconn.transaction()
+ await t1.__aenter__()
+
+ t2 = aconn.transaction()
+ await t2.__aenter__()
+
+ with pytest.raises(e.ProgrammingError):
+ await t1.__aexit__(*get_exc_info(exit_error))
+
+ with pytest.raises(e.ProgrammingError):
+ await t2.__aexit__(*get_exc_info(exit_error))
+
+
+@pytest.mark.parametrize("exit_error", [None, ZeroDivisionError, Rollback])
+async def test_out_of_order_exit_same_name(aconn, exit_error):
+ await aconn.set_autocommit(True)
+
+ t1 = aconn.transaction("save")
+ await t1.__aenter__()
+ t2 = aconn.transaction("save")
+ await t2.__aenter__()
+
+ with pytest.raises(e.ProgrammingError):
+ await t1.__aexit__(*get_exc_info(exit_error))
+
+ with pytest.raises(e.ProgrammingError):
+ await t2.__aexit__(*get_exc_info(exit_error))
+
+
+@pytest.mark.parametrize("what", ["commit", "rollback", "error"])
+async def test_concurrency(aconn, what):
+ await aconn.set_autocommit(True)
+
+ evs = [asyncio.Event() for i in range(3)]
+
+ async def worker(unlock, wait_on):
+ with pytest.raises(e.ProgrammingError) as ex:
+ async with aconn.transaction():
+ unlock.set()
+ await wait_on.wait()
+ await aconn.execute("select 1")
+
+ if what == "error":
+ 1 / 0
+ elif what == "rollback":
+ raise Rollback()
+ else:
+ assert what == "commit"
+
+ if what == "error":
+ assert "transaction rollback" in str(ex.value)
+ assert isinstance(ex.value.__context__, ZeroDivisionError)
+ elif what == "rollback":
+ assert "transaction rollback" in str(ex.value)
+ assert isinstance(ex.value.__context__, Rollback)
+ else:
+ assert "transaction commit" in str(ex.value)
+
+ # Start a first transaction in a task
+ t1 = create_task(worker(unlock=evs[0], wait_on=evs[1]))
+ await evs[0].wait()
+
+ # Start a nested transaction in a task
+ t2 = create_task(worker(unlock=evs[1], wait_on=evs[2]))
+
+ # Terminate the first transaction before the second does
+ await asyncio.gather(t1)
+ evs[2].set()
+ await asyncio.gather(t2)