summaryrefslogtreecommitdiffstats
path: root/tests/crdb/test_connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/crdb/test_connection.py')
-rw-r--r--tests/crdb/test_connection.py86
1 files changed, 86 insertions, 0 deletions
diff --git a/tests/crdb/test_connection.py b/tests/crdb/test_connection.py
new file mode 100644
index 0000000..b2a69ef
--- /dev/null
+++ b/tests/crdb/test_connection.py
@@ -0,0 +1,86 @@
+import time
+import threading
+
+import psycopg.crdb
+from psycopg import errors as e
+from psycopg.crdb import CrdbConnection
+
+import pytest
+
+pytestmark = pytest.mark.crdb
+
+
+def test_is_crdb(conn):
+ assert CrdbConnection.is_crdb(conn)
+ assert CrdbConnection.is_crdb(conn.pgconn)
+
+
+def test_connect(dsn):
+ with CrdbConnection.connect(dsn) as conn:
+ assert isinstance(conn, CrdbConnection)
+
+ with psycopg.crdb.connect(dsn) as conn:
+ assert isinstance(conn, CrdbConnection)
+
+
+def test_xid(dsn):
+ with CrdbConnection.connect(dsn) as conn:
+ with pytest.raises(e.NotSupportedError):
+ conn.xid(1, "gtrid", "bqual")
+
+
+def test_tpc_begin(dsn):
+ with CrdbConnection.connect(dsn) as conn:
+ with pytest.raises(e.NotSupportedError):
+ conn.tpc_begin("foo")
+
+
+def test_tpc_recover(dsn):
+ with CrdbConnection.connect(dsn) as conn:
+ with pytest.raises(e.NotSupportedError):
+ conn.tpc_recover()
+
+
+@pytest.mark.slow
+def test_broken_connection(conn):
+ cur = conn.cursor()
+ (session_id,) = cur.execute("select session_id from [show session_id]").fetchone()
+ with pytest.raises(psycopg.DatabaseError):
+ cur.execute("cancel session %s", [session_id])
+ assert conn.closed
+
+
+@pytest.mark.slow
+def test_broken(conn):
+ (session_id,) = conn.execute("show session_id").fetchone()
+ with pytest.raises(psycopg.OperationalError):
+ conn.execute("cancel session %s", [session_id])
+
+ assert conn.closed
+ assert conn.broken
+ conn.close()
+ assert conn.closed
+ assert conn.broken
+
+
+@pytest.mark.slow
+def test_identify_closure(conn_cls, dsn):
+ with conn_cls.connect(dsn, autocommit=True) as conn:
+ with conn_cls.connect(dsn, autocommit=True) as conn2:
+ (session_id,) = conn.execute("show session_id").fetchone()
+
+ def closer():
+ time.sleep(0.2)
+ conn2.execute("cancel session %s", [session_id])
+
+ t = threading.Thread(target=closer)
+ t.start()
+ t0 = time.time()
+ try:
+ with pytest.raises(psycopg.OperationalError):
+ conn.execute("select pg_sleep(3.0)")
+ dt = time.time() - t0
+ # CRDB seems to take not less than 1s
+ assert 0.2 < dt < 2
+ finally:
+ t.join()