1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
import time
import threading
import psycopg.crdb
from psycopg import errors as e
from psycopg.crdb import CrdbConnection
import pytest
pytestmark = pytest.mark.crdb
def test_is_crdb(conn):
assert CrdbConnection.is_crdb(conn)
assert CrdbConnection.is_crdb(conn.pgconn)
def test_connect(dsn):
with CrdbConnection.connect(dsn) as conn:
assert isinstance(conn, CrdbConnection)
with psycopg.crdb.connect(dsn) as conn:
assert isinstance(conn, CrdbConnection)
def test_xid(dsn):
with CrdbConnection.connect(dsn) as conn:
with pytest.raises(e.NotSupportedError):
conn.xid(1, "gtrid", "bqual")
def test_tpc_begin(dsn):
with CrdbConnection.connect(dsn) as conn:
with pytest.raises(e.NotSupportedError):
conn.tpc_begin("foo")
def test_tpc_recover(dsn):
with CrdbConnection.connect(dsn) as conn:
with pytest.raises(e.NotSupportedError):
conn.tpc_recover()
@pytest.mark.slow
def test_broken_connection(conn):
cur = conn.cursor()
(session_id,) = cur.execute("select session_id from [show session_id]").fetchone()
with pytest.raises(psycopg.DatabaseError):
cur.execute("cancel session %s", [session_id])
assert conn.closed
@pytest.mark.slow
def test_broken(conn):
(session_id,) = conn.execute("show session_id").fetchone()
with pytest.raises(psycopg.OperationalError):
conn.execute("cancel session %s", [session_id])
assert conn.closed
assert conn.broken
conn.close()
assert conn.closed
assert conn.broken
@pytest.mark.slow
def test_identify_closure(conn_cls, dsn):
with conn_cls.connect(dsn, autocommit=True) as conn:
with conn_cls.connect(dsn, autocommit=True) as conn2:
(session_id,) = conn.execute("show session_id").fetchone()
def closer():
time.sleep(0.2)
conn2.execute("cancel session %s", [session_id])
t = threading.Thread(target=closer)
t.start()
t0 = time.time()
try:
with pytest.raises(psycopg.OperationalError):
conn.execute("select pg_sleep(3.0)")
dt = time.time() - t0
# CRDB seems to take not less than 1s
assert 0.2 < dt < 2
finally:
t.join()
|