diff options
Diffstat (limited to 'tests/test_transaction_async.py')
-rw-r--r-- | tests/test_transaction_async.py | 743 |
1 files changed, 743 insertions, 0 deletions
diff --git a/tests/test_transaction_async.py b/tests/test_transaction_async.py new file mode 100644 index 0000000..55e1c9c --- /dev/null +++ b/tests/test_transaction_async.py @@ -0,0 +1,743 @@ +import asyncio +import logging + +import pytest + +from psycopg import Rollback +from psycopg import errors as e +from psycopg._compat import create_task + +from .test_transaction import in_transaction, insert_row, inserted, get_exc_info +from .test_transaction import ExpectedException, crdb_skip_external_observer +from .test_transaction import create_test_table # noqa # autouse fixture + +pytestmark = pytest.mark.asyncio + + +@pytest.fixture +async def aconn(aconn, apipeline): + return aconn + + +async def test_basic(aconn, apipeline): + """Basic use of transaction() to BEGIN and COMMIT a transaction.""" + assert not in_transaction(aconn) + async with aconn.transaction(): + if apipeline: + await apipeline.sync() + assert in_transaction(aconn) + assert not in_transaction(aconn) + + +async def test_exposes_associated_connection(aconn): + """Transaction exposes its connection as a read-only property.""" + async with aconn.transaction() as tx: + assert tx.connection is aconn + with pytest.raises(AttributeError): + tx.connection = aconn + + +async def test_exposes_savepoint_name(aconn): + """Transaction exposes its savepoint name as a read-only property.""" + async with aconn.transaction(savepoint_name="foo") as tx: + assert tx.savepoint_name == "foo" + with pytest.raises(AttributeError): + tx.savepoint_name = "bar" + + +async def test_cant_reenter(aconn): + async with aconn.transaction() as tx: + pass + + with pytest.raises(TypeError): + async with tx: + pass + + +async def test_begins_on_enter(aconn, apipeline): + """Transaction does not begin until __enter__() is called.""" + tx = aconn.transaction() + assert not in_transaction(aconn) + async with tx: + if apipeline: + await apipeline.sync() + assert in_transaction(aconn) + assert not in_transaction(aconn) + + +async def test_commit_on_successful_exit(aconn): + """Changes are committed on successful exit from the `with` block.""" + async with aconn.transaction(): + await insert_row(aconn, "foo") + + assert not in_transaction(aconn) + assert await inserted(aconn) == {"foo"} + + +async def test_rollback_on_exception_exit(aconn): + """Changes are rolled back if an exception escapes the `with` block.""" + with pytest.raises(ExpectedException): + async with aconn.transaction(): + await insert_row(aconn, "foo") + raise ExpectedException("This discards the insert") + + assert not in_transaction(aconn) + assert not await inserted(aconn) + + +@pytest.mark.crdb_skip("pg_terminate_backend") +async def test_context_inerror_rollback_no_clobber( + aconn_cls, aconn, apipeline, dsn, caplog +): + if apipeline: + # Only 'aconn' is possibly in pipeline mode, but the transaction and + # checks are on 'conn2'. + pytest.skip("not applicable") + caplog.set_level(logging.WARNING, logger="psycopg") + + with pytest.raises(ZeroDivisionError): + async with await aconn_cls.connect(dsn) as conn2: + async with conn2.transaction(): + await conn2.execute("select 1") + await aconn.execute( + "select pg_terminate_backend(%s::int)", + [conn2.pgconn.backend_pid], + ) + 1 / 0 + + assert len(caplog.records) == 1 + rec = caplog.records[0] + assert rec.levelno == logging.WARNING + assert "in rollback" in rec.message + + +@pytest.mark.crdb_skip("copy") +async def test_context_active_rollback_no_clobber(aconn_cls, dsn, caplog): + caplog.set_level(logging.WARNING, logger="psycopg") + + conn = await aconn_cls.connect(dsn) + try: + with pytest.raises(ZeroDivisionError): + async with conn.transaction(): + conn.pgconn.exec_(b"copy (select generate_series(1, 10)) to stdout") + status = conn.info.transaction_status + assert status == conn.TransactionStatus.ACTIVE + 1 / 0 + + assert len(caplog.records) == 1 + rec = caplog.records[0] + assert rec.levelno == logging.WARNING + assert "in rollback" in rec.message + finally: + await conn.close() + + +async def test_interaction_dbapi_transaction(aconn): + await insert_row(aconn, "foo") + + async with aconn.transaction(): + await insert_row(aconn, "bar") + raise Rollback + + async with aconn.transaction(): + await insert_row(aconn, "baz") + + assert in_transaction(aconn) + await aconn.commit() + assert await inserted(aconn) == {"foo", "baz"} + + +async def test_prohibits_use_of_commit_rollback_autocommit(aconn): + """ + Within a Transaction block, it is forbidden to touch commit, rollback, + or the autocommit setting on the connection, as this would interfere + with the transaction scope being managed by the Transaction block. + """ + await aconn.set_autocommit(False) + await aconn.commit() + await aconn.rollback() + + async with aconn.transaction(): + with pytest.raises(e.ProgrammingError): + await aconn.set_autocommit(False) + with pytest.raises(e.ProgrammingError): + await aconn.commit() + with pytest.raises(e.ProgrammingError): + await aconn.rollback() + + await aconn.set_autocommit(False) + await aconn.commit() + await aconn.rollback() + + +@pytest.mark.parametrize("autocommit", [False, True]) +async def test_preserves_autocommit(aconn, autocommit): + """ + Connection.autocommit is unchanged both during and after Transaction block. + """ + await aconn.set_autocommit(autocommit) + async with aconn.transaction(): + assert aconn.autocommit is autocommit + assert aconn.autocommit is autocommit + + +async def test_autocommit_off_but_no_tx_started_successful_exit(aconn, svcconn): + """ + Scenario: + * Connection has autocommit off but no transaction has been initiated + before entering the Transaction context + * Code exits Transaction context successfully + + Outcome: + * Changes made within Transaction context are committed + """ + await aconn.set_autocommit(False) + assert not in_transaction(aconn) + async with aconn.transaction(): + await insert_row(aconn, "new") + assert not in_transaction(aconn) + + # Changes committed + assert await inserted(aconn) == {"new"} + assert inserted(svcconn) == {"new"} + + +async def test_autocommit_off_but_no_tx_started_exception_exit(aconn, svcconn): + """ + Scenario: + * Connection has autocommit off but no transaction has been initiated + before entering the Transaction context + * Code exits Transaction context with an exception + + Outcome: + * Changes made within Transaction context are discarded + """ + await aconn.set_autocommit(False) + assert not in_transaction(aconn) + with pytest.raises(ExpectedException): + async with aconn.transaction(): + await insert_row(aconn, "new") + raise ExpectedException() + assert not in_transaction(aconn) + + # Changes discarded + assert not await inserted(aconn) + assert not inserted(svcconn) + + +@crdb_skip_external_observer +async def test_autocommit_off_and_tx_in_progress_successful_exit( + aconn, apipeline, svcconn +): + """ + Scenario: + * Connection has autocommit off but and a transaction is already in + progress before entering the Transaction context + * Code exits Transaction context successfully + + Outcome: + * Changes made within Transaction context are left intact + * Outer transaction is left running, and no changes are visible to an + outside observer from another connection. + """ + await aconn.set_autocommit(False) + await insert_row(aconn, "prior") + if apipeline: + await apipeline.sync() + assert in_transaction(aconn) + async with aconn.transaction(): + await insert_row(aconn, "new") + assert in_transaction(aconn) + assert await inserted(aconn) == {"prior", "new"} + # Nothing committed yet; changes not visible on another connection + assert not inserted(svcconn) + + +@crdb_skip_external_observer +async def test_autocommit_off_and_tx_in_progress_exception_exit( + aconn, apipeline, svcconn +): + """ + Scenario: + * Connection has autocommit off but and a transaction is already in + progress before entering the Transaction context + * Code exits Transaction context with an exception + + Outcome: + * Changes made before the Transaction context are left intact + * Changes made within Transaction context are discarded + * Outer transaction is left running, and no changes are visible to an + outside observer from another connection. + """ + await aconn.set_autocommit(False) + await insert_row(aconn, "prior") + if apipeline: + await apipeline.sync() + assert in_transaction(aconn) + with pytest.raises(ExpectedException): + async with aconn.transaction(): + await insert_row(aconn, "new") + raise ExpectedException() + assert in_transaction(aconn) + assert await inserted(aconn) == {"prior"} + # Nothing committed yet; changes not visible on another connection + assert not inserted(svcconn) + + +async def test_nested_all_changes_persisted_on_successful_exit(aconn, svcconn): + """Changes from nested transaction contexts are all persisted on exit.""" + async with aconn.transaction(): + await insert_row(aconn, "outer-before") + async with aconn.transaction(): + await insert_row(aconn, "inner") + await insert_row(aconn, "outer-after") + assert not in_transaction(aconn) + assert await inserted(aconn) == {"outer-before", "inner", "outer-after"} + assert inserted(svcconn) == {"outer-before", "inner", "outer-after"} + + +async def test_nested_all_changes_discarded_on_outer_exception(aconn, svcconn): + """ + Changes from nested transaction contexts are discarded when an exception + raised in outer context escapes. + """ + with pytest.raises(ExpectedException): + async with aconn.transaction(): + await insert_row(aconn, "outer") + async with aconn.transaction(): + await insert_row(aconn, "inner") + raise ExpectedException() + assert not in_transaction(aconn) + assert not await inserted(aconn) + assert not inserted(svcconn) + + +async def test_nested_all_changes_discarded_on_inner_exception(aconn, svcconn): + """ + Changes from nested transaction contexts are discarded when an exception + raised in inner context escapes the outer context. + """ + with pytest.raises(ExpectedException): + async with aconn.transaction(): + await insert_row(aconn, "outer") + async with aconn.transaction(): + await insert_row(aconn, "inner") + raise ExpectedException() + assert not in_transaction(aconn) + assert not await inserted(aconn) + assert not inserted(svcconn) + + +async def test_nested_inner_scope_exception_handled_in_outer_scope(aconn, svcconn): + """ + An exception escaping the inner transaction context causes changes made + within that inner context to be discarded, but the error can then be + handled in the outer context, allowing changes made in the outer context + (both before, and after, the inner context) to be successfully committed. + """ + async with aconn.transaction(): + await insert_row(aconn, "outer-before") + with pytest.raises(ExpectedException): + async with aconn.transaction(): + await insert_row(aconn, "inner") + raise ExpectedException() + await insert_row(aconn, "outer-after") + assert not in_transaction(aconn) + assert await inserted(aconn) == {"outer-before", "outer-after"} + assert inserted(svcconn) == {"outer-before", "outer-after"} + + +async def test_nested_three_levels_successful_exit(aconn, svcconn): + """Exercise management of more than one savepoint.""" + async with aconn.transaction(): # BEGIN + await insert_row(aconn, "one") + async with aconn.transaction(): # SAVEPOINT s1 + await insert_row(aconn, "two") + async with aconn.transaction(): # SAVEPOINT s2 + await insert_row(aconn, "three") + assert not in_transaction(aconn) + assert await inserted(aconn) == {"one", "two", "three"} + assert inserted(svcconn) == {"one", "two", "three"} + + +async def test_named_savepoint_escapes_savepoint_name(aconn): + async with aconn.transaction("s-1"): + pass + async with aconn.transaction("s1; drop table students"): + pass + + +async def test_named_savepoints_successful_exit(aconn, acommands): + """ + Entering a transaction context will do one of these these things: + 1. Begin an outer transaction (if one isn't already in progress) + 2. Begin an outer transaction and create a savepoint (if one is named) + 3. Create a savepoint (if a transaction is already in progress) + either using the name provided, or auto-generating a savepoint name. + + ...and exiting the context successfully will "commit" the same. + """ + commands = acommands + + # Case 1 + # Using Transaction explicitly because conn.transaction() enters the contetx + async with aconn.transaction() as tx: + assert commands.popall() == ["BEGIN"] + assert not tx.savepoint_name + assert commands.popall() == ["COMMIT"] + + # Case 1 (with a transaction already started) + await aconn.cursor().execute("select 1") + assert commands.popall() == ["BEGIN"] + async with aconn.transaction() as tx: + assert commands.popall() == ['SAVEPOINT "_pg3_1"'] + assert tx.savepoint_name == "_pg3_1" + + assert commands.popall() == ['RELEASE "_pg3_1"'] + await aconn.rollback() + assert commands.popall() == ["ROLLBACK"] + + # Case 2 + async with aconn.transaction(savepoint_name="foo") as tx: + assert commands.popall() == ["BEGIN", 'SAVEPOINT "foo"'] + assert tx.savepoint_name == "foo" + assert commands.popall() == ["COMMIT"] + + # Case 3 (with savepoint name provided) + async with aconn.transaction(): + assert commands.popall() == ["BEGIN"] + async with aconn.transaction(savepoint_name="bar") as tx: + assert commands.popall() == ['SAVEPOINT "bar"'] + assert tx.savepoint_name == "bar" + assert commands.popall() == ['RELEASE "bar"'] + assert commands.popall() == ["COMMIT"] + + # Case 3 (with savepoint name auto-generated) + async with aconn.transaction(): + assert commands.popall() == ["BEGIN"] + async with aconn.transaction() as tx: + assert commands.popall() == ['SAVEPOINT "_pg3_2"'] + assert tx.savepoint_name == "_pg3_2" + assert commands.popall() == ['RELEASE "_pg3_2"'] + assert commands.popall() == ["COMMIT"] + + +async def test_named_savepoints_exception_exit(aconn, acommands): + """ + Same as the previous test but checks that when exiting the context with an + exception, whatever transaction and/or savepoint was started on enter will + be rolled-back as appropriate. + """ + commands = acommands + + # Case 1 + with pytest.raises(ExpectedException): + async with aconn.transaction() as tx: + assert commands.popall() == ["BEGIN"] + assert not tx.savepoint_name + raise ExpectedException + assert commands.popall() == ["ROLLBACK"] + + # Case 2 + with pytest.raises(ExpectedException): + async with aconn.transaction(savepoint_name="foo") as tx: + assert commands.popall() == ["BEGIN", 'SAVEPOINT "foo"'] + assert tx.savepoint_name == "foo" + raise ExpectedException + assert commands.popall() == ["ROLLBACK"] + + # Case 3 (with savepoint name provided) + async with aconn.transaction(): + assert commands.popall() == ["BEGIN"] + with pytest.raises(ExpectedException): + async with aconn.transaction(savepoint_name="bar") as tx: + assert commands.popall() == ['SAVEPOINT "bar"'] + assert tx.savepoint_name == "bar" + raise ExpectedException + assert commands.popall() == ['ROLLBACK TO "bar"', 'RELEASE "bar"'] + assert commands.popall() == ["COMMIT"] + + # Case 3 (with savepoint name auto-generated) + async with aconn.transaction(): + assert commands.popall() == ["BEGIN"] + with pytest.raises(ExpectedException): + async with aconn.transaction() as tx: + assert commands.popall() == ['SAVEPOINT "_pg3_2"'] + assert tx.savepoint_name == "_pg3_2" + raise ExpectedException + assert commands.popall() == [ + 'ROLLBACK TO "_pg3_2"', + 'RELEASE "_pg3_2"', + ] + assert commands.popall() == ["COMMIT"] + + +async def test_named_savepoints_with_repeated_names_works(aconn): + """ + Using the same savepoint name repeatedly works correctly, but bypasses + some sanity checks. + """ + # Works correctly if no inner transactions are rolled back + async with aconn.transaction(force_rollback=True): + async with aconn.transaction("sp"): + await insert_row(aconn, "tx1") + async with aconn.transaction("sp"): + await insert_row(aconn, "tx2") + async with aconn.transaction("sp"): + await insert_row(aconn, "tx3") + assert await inserted(aconn) == {"tx1", "tx2", "tx3"} + + # Works correctly if one level of inner transaction is rolled back + async with aconn.transaction(force_rollback=True): + async with aconn.transaction("s1"): + await insert_row(aconn, "tx1") + async with aconn.transaction("s1", force_rollback=True): + await insert_row(aconn, "tx2") + async with aconn.transaction("s1"): + await insert_row(aconn, "tx3") + assert await inserted(aconn) == {"tx1"} + assert await inserted(aconn) == {"tx1"} + + # Works correctly if multiple inner transactions are rolled back + # (This scenario mandates releasing savepoints after rolling back to them.) + async with aconn.transaction(force_rollback=True): + async with aconn.transaction("s1"): + await insert_row(aconn, "tx1") + async with aconn.transaction("s1") as tx2: + await insert_row(aconn, "tx2") + async with aconn.transaction("s1"): + await insert_row(aconn, "tx3") + raise Rollback(tx2) + assert await inserted(aconn) == {"tx1"} + assert await inserted(aconn) == {"tx1"} + + +async def test_force_rollback_successful_exit(aconn, svcconn): + """ + Transaction started with the force_rollback option enabled discards all + changes at the end of the context. + """ + async with aconn.transaction(force_rollback=True): + await insert_row(aconn, "foo") + assert not await inserted(aconn) + assert not inserted(svcconn) + + +async def test_force_rollback_exception_exit(aconn, svcconn): + """ + Transaction started with the force_rollback option enabled discards all + changes at the end of the context. + """ + with pytest.raises(ExpectedException): + async with aconn.transaction(force_rollback=True): + await insert_row(aconn, "foo") + raise ExpectedException() + assert not await inserted(aconn) + assert not inserted(svcconn) + + +@crdb_skip_external_observer +async def test_explicit_rollback_discards_changes(aconn, svcconn): + """ + Raising a Rollback exception in the middle of a block exits the block and + discards all changes made within that block. + + You can raise any of the following: + - Rollback (type) + - Rollback() (instance) + - Rollback(tx) (instance initialised with reference to the transaction) + All of these are equivalent. + """ + + async def assert_no_rows(): + assert not await inserted(aconn) + assert not inserted(svcconn) + + async with aconn.transaction(): + await insert_row(aconn, "foo") + raise Rollback + await assert_no_rows() + + async with aconn.transaction(): + await insert_row(aconn, "foo") + raise Rollback() + await assert_no_rows() + + async with aconn.transaction() as tx: + await insert_row(aconn, "foo") + raise Rollback(tx) + await assert_no_rows() + + +@crdb_skip_external_observer +async def test_explicit_rollback_outer_tx_unaffected(aconn, svcconn): + """ + Raising a Rollback exception in the middle of a block does not impact an + enclosing transaction block. + """ + async with aconn.transaction(): + await insert_row(aconn, "before") + async with aconn.transaction(): + await insert_row(aconn, "during") + raise Rollback + assert in_transaction(aconn) + assert not inserted(svcconn) + await insert_row(aconn, "after") + assert await inserted(aconn) == {"before", "after"} + assert inserted(svcconn) == {"before", "after"} + + +async def test_explicit_rollback_of_outer_transaction(aconn): + """ + Raising a Rollback exception that references an outer transaction will + discard all changes from both inner and outer transaction blocks. + """ + async with aconn.transaction() as outer_tx: + await insert_row(aconn, "outer") + async with aconn.transaction(): + await insert_row(aconn, "inner") + raise Rollback(outer_tx) + assert False, "This line of code should be unreachable." + assert not await inserted(aconn) + + +@crdb_skip_external_observer +async def test_explicit_rollback_of_enclosing_tx_outer_tx_unaffected(aconn, svcconn): + """ + Rolling-back an enclosing transaction does not impact an outer transaction. + """ + async with aconn.transaction(): + await insert_row(aconn, "outer-before") + async with aconn.transaction() as tx_enclosing: + await insert_row(aconn, "enclosing") + async with aconn.transaction(): + await insert_row(aconn, "inner") + raise Rollback(tx_enclosing) + await insert_row(aconn, "outer-after") + + assert await inserted(aconn) == {"outer-before", "outer-after"} + assert not inserted(svcconn) # Not yet committed + # Changes committed + assert inserted(svcconn) == {"outer-before", "outer-after"} + + +async def test_str(aconn, apipeline): + async with aconn.transaction() as tx: + if apipeline: + assert "[INTRANS]" not in str(tx) + await apipeline.sync() + assert "[INTRANS, pipeline=ON]" in str(tx) + else: + assert "[INTRANS]" in str(tx) + assert "(active)" in str(tx) + assert "'" not in str(tx) + async with aconn.transaction("wat") as tx2: + if apipeline: + assert "[INTRANS, pipeline=ON]" in str(tx2) + else: + assert "[INTRANS]" in str(tx2) + assert "'wat'" in str(tx2) + + if apipeline: + assert "[IDLE, pipeline=ON]" in str(tx) + else: + assert "[IDLE]" in str(tx) + assert "(terminated)" in str(tx) + + with pytest.raises(ZeroDivisionError): + async with aconn.transaction() as tx: + 1 / 0 + + assert "(terminated)" in str(tx) + + +@pytest.mark.parametrize("exit_error", [None, ZeroDivisionError, Rollback]) +async def test_out_of_order_exit(aconn, exit_error): + await aconn.set_autocommit(True) + + t1 = aconn.transaction() + await t1.__aenter__() + + t2 = aconn.transaction() + await t2.__aenter__() + + with pytest.raises(e.ProgrammingError): + await t1.__aexit__(*get_exc_info(exit_error)) + + with pytest.raises(e.ProgrammingError): + await t2.__aexit__(*get_exc_info(exit_error)) + + +@pytest.mark.parametrize("exit_error", [None, ZeroDivisionError, Rollback]) +async def test_out_of_order_implicit_begin(aconn, exit_error): + await aconn.execute("select 1") + + t1 = aconn.transaction() + await t1.__aenter__() + + t2 = aconn.transaction() + await t2.__aenter__() + + with pytest.raises(e.ProgrammingError): + await t1.__aexit__(*get_exc_info(exit_error)) + + with pytest.raises(e.ProgrammingError): + await t2.__aexit__(*get_exc_info(exit_error)) + + +@pytest.mark.parametrize("exit_error", [None, ZeroDivisionError, Rollback]) +async def test_out_of_order_exit_same_name(aconn, exit_error): + await aconn.set_autocommit(True) + + t1 = aconn.transaction("save") + await t1.__aenter__() + t2 = aconn.transaction("save") + await t2.__aenter__() + + with pytest.raises(e.ProgrammingError): + await t1.__aexit__(*get_exc_info(exit_error)) + + with pytest.raises(e.ProgrammingError): + await t2.__aexit__(*get_exc_info(exit_error)) + + +@pytest.mark.parametrize("what", ["commit", "rollback", "error"]) +async def test_concurrency(aconn, what): + await aconn.set_autocommit(True) + + evs = [asyncio.Event() for i in range(3)] + + async def worker(unlock, wait_on): + with pytest.raises(e.ProgrammingError) as ex: + async with aconn.transaction(): + unlock.set() + await wait_on.wait() + await aconn.execute("select 1") + + if what == "error": + 1 / 0 + elif what == "rollback": + raise Rollback() + else: + assert what == "commit" + + if what == "error": + assert "transaction rollback" in str(ex.value) + assert isinstance(ex.value.__context__, ZeroDivisionError) + elif what == "rollback": + assert "transaction rollback" in str(ex.value) + assert isinstance(ex.value.__context__, Rollback) + else: + assert "transaction commit" in str(ex.value) + + # Start a first transaction in a task + t1 = create_task(worker(unlock=evs[0], wait_on=evs[1])) + await evs[0].wait() + + # Start a nested transaction in a task + t2 = create_task(worker(unlock=evs[1], wait_on=evs[2])) + + # Terminate the first transaction before the second does + await asyncio.gather(t1) + evs[2].set() + await asyncio.gather(t2) |