blob: df43f3bd1fa3b2e7c5728bb094ff5fed9c36cd72 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
from psycopg.pq import TransactionStatus
from psycopg.crdb import CrdbConnection
import pytest
# Module-level pytest marker: applied to every test collected from this file.
# NOTE(review): presumably crdb("skip") makes the suite skip these tests when
# the target server is CockroachDB -- confirm against the project's conftest.
pytestmark = pytest.mark.crdb("skip")
def test_is_crdb(conn):
    """Check that ``CrdbConnection.is_crdb()`` is falsy for this connection.

    The check is made twice: once passing the connection object itself and
    once passing its ``pgconn`` attribute.
    """
    detected_via_connection = CrdbConnection.is_crdb(conn)
    assert not detected_via_connection

    detected_via_pgconn = CrdbConnection.is_crdb(conn.pgconn)
    assert not detected_via_pgconn
def test_tpc_on_pg_connection(conn, tpc):
    """Drive one two-phase commit (begin, prepare, commit) and check each step.

    After each step the test checks some of: the connection's transaction
    status, ``tpc.count_xacts()`` and ``tpc.count_test_records()``.
    """

    def current_status():
        # Re-read the status from the connection on every call; never cache it.
        return conn.info.transaction_status

    idle = TransactionStatus.IDLE
    in_transaction = TransactionStatus.INTRANS
    insert_stmt = "insert into test_tpc values ('test_tpc_commit')"

    # Building the xid alone leaves the connection idle.
    transaction_id = conn.xid(1, "gtrid", "bqual")
    assert current_status() == idle

    # tpc_begin() is expected to put the connection inside a transaction.
    conn.tpc_begin(transaction_id)
    assert current_status() == in_transaction

    # Insert one row: both counters are expected to still read zero.
    cursor = conn.cursor()
    cursor.execute(insert_stmt)
    assert tpc.count_xacts() == 0
    assert tpc.count_test_records() == 0

    # After tpc_prepare(): idle again, one xact counted, no test record yet.
    conn.tpc_prepare()
    assert current_status() == idle
    assert tpc.count_xacts() == 1
    assert tpc.count_test_records() == 0

    # After tpc_commit(): still idle, xact count back to zero, one test record.
    conn.tpc_commit()
    assert current_status() == idle
    assert tpc.count_xacts() == 0
    assert tpc.count_test_records() == 1
|