1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# These tests relate to AsyncConnectionPool, but are not marked asyncio
# because they rely on the pool initialization outside the asyncio loop.
import asyncio
import pytest
from ..utils import gc_collect
try:
import psycopg_pool as pool
except ImportError:
# Tests should have been skipped if the package is not available
pass
@pytest.mark.slow
def test_reconnect_after_max_lifetime(dsn, asyncio_run):
# See issue #219, pool created before the loop.
p = pool.AsyncConnectionPool(dsn, min_size=1, max_lifetime=0.2, open=False)
async def test():
try:
await p.open()
ns = []
for i in range(5):
async with p.connection() as conn:
cur = await conn.execute("select 1")
ns.append(await cur.fetchone())
await asyncio.sleep(0.2)
assert len(ns) == 5
finally:
await p.close()
asyncio_run(asyncio.wait_for(test(), timeout=2.0))
@pytest.mark.slow
def test_working_created_before_loop(dsn, asyncio_run):
p = pool.AsyncNullConnectionPool(dsn, open=False)
async def test():
try:
await p.open()
ns = []
for i in range(5):
async with p.connection() as conn:
cur = await conn.execute("select 1")
ns.append(await cur.fetchone())
await asyncio.sleep(0.2)
assert len(ns) == 5
finally:
await p.close()
asyncio_run(asyncio.wait_for(test(), timeout=2.0))
def test_cant_create_open_outside_loop(dsn):
with pytest.raises(RuntimeError):
pool.AsyncConnectionPool(dsn, open=True)
@pytest.fixture
def asyncio_run(recwarn):
"""Fixture reuturning asyncio.run, but managing resources at exit.
In certain runs, fd objects are leaked and the error will only be caught
downstream, by some innocent test calling gc_collect().
"""
recwarn.clear()
try:
yield asyncio.run
finally:
gc_collect()
if recwarn:
warn = recwarn.pop(ResourceWarning)
assert "unclosed event loop" in str(warn.message)
assert not recwarn
|