diff options
Diffstat (limited to 'tests/crdb/test_connection.py')
-rw-r--r-- | tests/crdb/test_connection.py | 86 |
1 files changed, 86 insertions, 0 deletions
diff --git a/tests/crdb/test_connection.py b/tests/crdb/test_connection.py new file mode 100644 index 0000000..b2a69ef --- /dev/null +++ b/tests/crdb/test_connection.py @@ -0,0 +1,86 @@ +import time +import threading + +import psycopg.crdb +from psycopg import errors as e +from psycopg.crdb import CrdbConnection + +import pytest + +pytestmark = pytest.mark.crdb + + +def test_is_crdb(conn): + assert CrdbConnection.is_crdb(conn) + assert CrdbConnection.is_crdb(conn.pgconn) + + +def test_connect(dsn): + with CrdbConnection.connect(dsn) as conn: + assert isinstance(conn, CrdbConnection) + + with psycopg.crdb.connect(dsn) as conn: + assert isinstance(conn, CrdbConnection) + + +def test_xid(dsn): + with CrdbConnection.connect(dsn) as conn: + with pytest.raises(e.NotSupportedError): + conn.xid(1, "gtrid", "bqual") + + +def test_tpc_begin(dsn): + with CrdbConnection.connect(dsn) as conn: + with pytest.raises(e.NotSupportedError): + conn.tpc_begin("foo") + + +def test_tpc_recover(dsn): + with CrdbConnection.connect(dsn) as conn: + with pytest.raises(e.NotSupportedError): + conn.tpc_recover() + + +@pytest.mark.slow +def test_broken_connection(conn): + cur = conn.cursor() + (session_id,) = cur.execute("select session_id from [show session_id]").fetchone() + with pytest.raises(psycopg.DatabaseError): + cur.execute("cancel session %s", [session_id]) + assert conn.closed + + +@pytest.mark.slow +def test_broken(conn): + (session_id,) = conn.execute("show session_id").fetchone() + with pytest.raises(psycopg.OperationalError): + conn.execute("cancel session %s", [session_id]) + + assert conn.closed + assert conn.broken + conn.close() + assert conn.closed + assert conn.broken + + +@pytest.mark.slow +def test_identify_closure(conn_cls, dsn): + with conn_cls.connect(dsn, autocommit=True) as conn: + with conn_cls.connect(dsn, autocommit=True) as conn2: + (session_id,) = conn.execute("show session_id").fetchone() + + def closer(): + time.sleep(0.2) + conn2.execute("cancel session %s", [session_id]) + + t = threading.Thread(target=closer) + t.start() + t0 = time.time() + try: + with pytest.raises(psycopg.OperationalError): + conn.execute("select pg_sleep(3.0)") + dt = time.time() - t0 + # CRDB seems to take not less than 1s + assert 0.2 < dt < 2 + finally: + t.join() |