summaryrefslogtreecommitdiffstats
path: root/psycopg_pool
diff options
context:
space:
mode:
Diffstat (limited to 'psycopg_pool')
-rw-r--r--psycopg_pool/.flake83
-rw-r--r--psycopg_pool/LICENSE.txt165
-rw-r--r--psycopg_pool/README.rst24
-rw-r--r--psycopg_pool/psycopg_pool/__init__.py22
-rw-r--r--psycopg_pool/psycopg_pool/_compat.py51
-rw-r--r--psycopg_pool/psycopg_pool/base.py230
-rw-r--r--psycopg_pool/psycopg_pool/errors.py25
-rw-r--r--psycopg_pool/psycopg_pool/null_pool.py159
-rw-r--r--psycopg_pool/psycopg_pool/null_pool_async.py122
-rw-r--r--psycopg_pool/psycopg_pool/pool.py839
-rw-r--r--psycopg_pool/psycopg_pool/pool_async.py784
-rw-r--r--psycopg_pool/psycopg_pool/py.typed0
-rw-r--r--psycopg_pool/psycopg_pool/sched.py177
-rw-r--r--psycopg_pool/psycopg_pool/version.py13
-rw-r--r--psycopg_pool/setup.cfg45
-rw-r--r--psycopg_pool/setup.py26
16 files changed, 2685 insertions, 0 deletions
diff --git a/psycopg_pool/.flake8 b/psycopg_pool/.flake8
new file mode 100644
index 0000000..2ae629c
--- /dev/null
+++ b/psycopg_pool/.flake8
@@ -0,0 +1,3 @@
+[flake8]
+max-line-length = 88
+ignore = W503, E203
diff --git a/psycopg_pool/LICENSE.txt b/psycopg_pool/LICENSE.txt
new file mode 100644
index 0000000..0a04128
--- /dev/null
+++ b/psycopg_pool/LICENSE.txt
@@ -0,0 +1,165 @@
+ GNU LESSER GENERAL PUBLIC LICENSE
+ Version 3, 29 June 2007
+
+ Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+
+ This version of the GNU Lesser General Public License incorporates
+the terms and conditions of version 3 of the GNU General Public
+License, supplemented by the additional permissions listed below.
+
+ 0. Additional Definitions.
+
+ As used herein, "this License" refers to version 3 of the GNU Lesser
+General Public License, and the "GNU GPL" refers to version 3 of the GNU
+General Public License.
+
+ "The Library" refers to a covered work governed by this License,
+other than an Application or a Combined Work as defined below.
+
+ An "Application" is any work that makes use of an interface provided
+by the Library, but which is not otherwise based on the Library.
+Defining a subclass of a class defined by the Library is deemed a mode
+of using an interface provided by the Library.
+
+ A "Combined Work" is a work produced by combining or linking an
+Application with the Library. The particular version of the Library
+with which the Combined Work was made is also called the "Linked
+Version".
+
+ The "Minimal Corresponding Source" for a Combined Work means the
+Corresponding Source for the Combined Work, excluding any source code
+for portions of the Combined Work that, considered in isolation, are
+based on the Application, and not on the Linked Version.
+
+ The "Corresponding Application Code" for a Combined Work means the
+object code and/or source code for the Application, including any data
+and utility programs needed for reproducing the Combined Work from the
+Application, but excluding the System Libraries of the Combined Work.
+
+ 1. Exception to Section 3 of the GNU GPL.
+
+ You may convey a covered work under sections 3 and 4 of this License
+without being bound by section 3 of the GNU GPL.
+
+ 2. Conveying Modified Versions.
+
+ If you modify a copy of the Library, and, in your modifications, a
+facility refers to a function or data to be supplied by an Application
+that uses the facility (other than as an argument passed when the
+facility is invoked), then you may convey a copy of the modified
+version:
+
+ a) under this License, provided that you make a good faith effort to
+ ensure that, in the event an Application does not supply the
+ function or data, the facility still operates, and performs
+ whatever part of its purpose remains meaningful, or
+
+ b) under the GNU GPL, with none of the additional permissions of
+ this License applicable to that copy.
+
+ 3. Object Code Incorporating Material from Library Header Files.
+
+ The object code form of an Application may incorporate material from
+a header file that is part of the Library. You may convey such object
+code under terms of your choice, provided that, if the incorporated
+material is not limited to numerical parameters, data structure
+layouts and accessors, or small macros, inline functions and templates
+(ten or fewer lines in length), you do both of the following:
+
+ a) Give prominent notice with each copy of the object code that the
+ Library is used in it and that the Library and its use are
+ covered by this License.
+
+ b) Accompany the object code with a copy of the GNU GPL and this license
+ document.
+
+ 4. Combined Works.
+
+ You may convey a Combined Work under terms of your choice that,
+taken together, effectively do not restrict modification of the
+portions of the Library contained in the Combined Work and reverse
+engineering for debugging such modifications, if you also do each of
+the following:
+
+ a) Give prominent notice with each copy of the Combined Work that
+ the Library is used in it and that the Library and its use are
+ covered by this License.
+
+ b) Accompany the Combined Work with a copy of the GNU GPL and this license
+ document.
+
+ c) For a Combined Work that displays copyright notices during
+ execution, include the copyright notice for the Library among
+ these notices, as well as a reference directing the user to the
+ copies of the GNU GPL and this license document.
+
+ d) Do one of the following:
+
+ 0) Convey the Minimal Corresponding Source under the terms of this
+ License, and the Corresponding Application Code in a form
+ suitable for, and under terms that permit, the user to
+ recombine or relink the Application with a modified version of
+ the Linked Version to produce a modified Combined Work, in the
+ manner specified by section 6 of the GNU GPL for conveying
+ Corresponding Source.
+
+ 1) Use a suitable shared library mechanism for linking with the
+ Library. A suitable mechanism is one that (a) uses at run time
+ a copy of the Library already present on the user's computer
+ system, and (b) will operate properly with a modified version
+ of the Library that is interface-compatible with the Linked
+ Version.
+
+ e) Provide Installation Information, but only if you would otherwise
+ be required to provide such information under section 6 of the
+ GNU GPL, and only to the extent that such information is
+ necessary to install and execute a modified version of the
+ Combined Work produced by recombining or relinking the
+ Application with a modified version of the Linked Version. (If
+ you use option 4d0, the Installation Information must accompany
+ the Minimal Corresponding Source and Corresponding Application
+ Code. If you use option 4d1, you must provide the Installation
+ Information in the manner specified by section 6 of the GNU GPL
+ for conveying Corresponding Source.)
+
+ 5. Combined Libraries.
+
+ You may place library facilities that are a work based on the
+Library side by side in a single library together with other library
+facilities that are not Applications and are not covered by this
+License, and convey such a combined library under terms of your
+choice, if you do both of the following:
+
+ a) Accompany the combined library with a copy of the same work based
+ on the Library, uncombined with any other library facilities,
+ conveyed under the terms of this License.
+
+ b) Give prominent notice with the combined library that part of it
+ is a work based on the Library, and explaining where to find the
+ accompanying uncombined form of the same work.
+
+ 6. Revised Versions of the GNU Lesser General Public License.
+
+ The Free Software Foundation may publish revised and/or new versions
+of the GNU Lesser General Public License from time to time. Such new
+versions will be similar in spirit to the present version, but may
+differ in detail to address new problems or concerns.
+
+ Each version is given a distinguishing version number. If the
+Library as you received it specifies that a certain numbered version
+of the GNU Lesser General Public License "or any later version"
+applies to it, you have the option of following the terms and
+conditions either of that published version or of any later version
+published by the Free Software Foundation. If the Library as you
+received it does not specify a version number of the GNU Lesser
+General Public License, you may choose any version of the GNU Lesser
+General Public License ever published by the Free Software Foundation.
+
+ If the Library as you received it specifies that a proxy can decide
+whether future versions of the GNU Lesser General Public License shall
+apply, that proxy's public statement of acceptance of any version is
+permanent authorization for you to choose that version for the
+Library.
diff --git a/psycopg_pool/README.rst b/psycopg_pool/README.rst
new file mode 100644
index 0000000..6e6b32c
--- /dev/null
+++ b/psycopg_pool/README.rst
@@ -0,0 +1,24 @@
+Psycopg 3: PostgreSQL database adapter for Python - Connection Pool
+===================================================================
+
+This distribution contains the optional connection pool package
+`psycopg_pool`__.
+
+.. __: https://www.psycopg.org/psycopg3/docs/advanced/pool.html
+
+This package is kept separate from the main ``psycopg`` package because it is
+likely that it will follow a different release cycle.
+
+You can also install this package using::
+
+ pip install "psycopg[pool]"
+
+Please read `the project readme`__ and `the installation documentation`__ for
+more details.
+
+.. __: https://github.com/psycopg/psycopg#readme
+.. __: https://www.psycopg.org/psycopg3/docs/basic/install.html
+ #installing-the-connection-pool
+
+
+Copyright (C) 2020 The Psycopg Team
diff --git a/psycopg_pool/psycopg_pool/__init__.py b/psycopg_pool/psycopg_pool/__init__.py
new file mode 100644
index 0000000..e4d975f
--- /dev/null
+++ b/psycopg_pool/psycopg_pool/__init__.py
@@ -0,0 +1,22 @@
+"""
+psycopg connection pool package
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+from .pool import ConnectionPool
+from .pool_async import AsyncConnectionPool
+from .null_pool import NullConnectionPool
+from .null_pool_async import AsyncNullConnectionPool
+from .errors import PoolClosed, PoolTimeout, TooManyRequests
+from .version import __version__ as __version__ # noqa: F401
+
+__all__ = [
+ "AsyncConnectionPool",
+ "AsyncNullConnectionPool",
+ "ConnectionPool",
+ "NullConnectionPool",
+ "PoolClosed",
+ "PoolTimeout",
+ "TooManyRequests",
+]
diff --git a/psycopg_pool/psycopg_pool/_compat.py b/psycopg_pool/psycopg_pool/_compat.py
new file mode 100644
index 0000000..9fb2b9b
--- /dev/null
+++ b/psycopg_pool/psycopg_pool/_compat.py
@@ -0,0 +1,51 @@
+"""
+compatibility functions for different Python versions
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+import sys
+import asyncio
+from typing import Any, Awaitable, Generator, Optional, Union, Type, TypeVar
+from typing_extensions import TypeAlias
+
+import psycopg.errors as e
+
+T = TypeVar("T")
+FutureT: TypeAlias = Union["asyncio.Future[T]", Generator[Any, None, T], Awaitable[T]]
+
+if sys.version_info >= (3, 8):
+ create_task = asyncio.create_task
+ Task = asyncio.Task
+
+else:
+
+ def create_task(
+ coro: FutureT[T], name: Optional[str] = None
+ ) -> "asyncio.Future[T]":
+ return asyncio.create_task(coro)
+
+ Task = asyncio.Future
+
+if sys.version_info >= (3, 9):
+ from collections import Counter, deque as Deque
+else:
+ from typing import Counter, Deque
+
+__all__ = [
+ "Counter",
+ "Deque",
+ "Task",
+ "create_task",
+]
+
+# Workaround for psycopg < 3.0.8.
+# Timeout on NullPool connection mignt not work correctly.
+try:
+ ConnectionTimeout: Type[e.OperationalError] = e.ConnectionTimeout
+except AttributeError:
+
+ class DummyConnectionTimeout(e.OperationalError):
+ pass
+
+ ConnectionTimeout = DummyConnectionTimeout
diff --git a/psycopg_pool/psycopg_pool/base.py b/psycopg_pool/psycopg_pool/base.py
new file mode 100644
index 0000000..298ea68
--- /dev/null
+++ b/psycopg_pool/psycopg_pool/base.py
@@ -0,0 +1,230 @@
+"""
+psycopg connection pool base class and functionalities.
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+from time import monotonic
+from random import random
+from typing import Any, Callable, Dict, Generic, Optional, Tuple
+
+from psycopg import errors as e
+from psycopg.abc import ConnectionType
+
+from .errors import PoolClosed
+from ._compat import Counter, Deque
+
+
+class BasePool(Generic[ConnectionType]):
+
+ # Used to generate pool names
+ _num_pool = 0
+
+ # Stats keys
+ _POOL_MIN = "pool_min"
+ _POOL_MAX = "pool_max"
+ _POOL_SIZE = "pool_size"
+ _POOL_AVAILABLE = "pool_available"
+ _REQUESTS_WAITING = "requests_waiting"
+ _REQUESTS_NUM = "requests_num"
+ _REQUESTS_QUEUED = "requests_queued"
+ _REQUESTS_WAIT_MS = "requests_wait_ms"
+ _REQUESTS_ERRORS = "requests_errors"
+ _USAGE_MS = "usage_ms"
+ _RETURNS_BAD = "returns_bad"
+ _CONNECTIONS_NUM = "connections_num"
+ _CONNECTIONS_MS = "connections_ms"
+ _CONNECTIONS_ERRORS = "connections_errors"
+ _CONNECTIONS_LOST = "connections_lost"
+
+ def __init__(
+ self,
+ conninfo: str = "",
+ *,
+ kwargs: Optional[Dict[str, Any]] = None,
+ min_size: int = 4,
+ max_size: Optional[int] = None,
+ open: bool = True,
+ name: Optional[str] = None,
+ timeout: float = 30.0,
+ max_waiting: int = 0,
+ max_lifetime: float = 60 * 60.0,
+ max_idle: float = 10 * 60.0,
+ reconnect_timeout: float = 5 * 60.0,
+ reconnect_failed: Optional[Callable[["BasePool[ConnectionType]"], None]] = None,
+ num_workers: int = 3,
+ ):
+ min_size, max_size = self._check_size(min_size, max_size)
+
+ if not name:
+ num = BasePool._num_pool = BasePool._num_pool + 1
+ name = f"pool-{num}"
+
+ if num_workers < 1:
+ raise ValueError("num_workers must be at least 1")
+
+ self.conninfo = conninfo
+ self.kwargs: Dict[str, Any] = kwargs or {}
+ self._reconnect_failed: Callable[["BasePool[ConnectionType]"], None]
+ self._reconnect_failed = reconnect_failed or (lambda pool: None)
+ self.name = name
+ self._min_size = min_size
+ self._max_size = max_size
+ self.timeout = timeout
+ self.max_waiting = max_waiting
+ self.reconnect_timeout = reconnect_timeout
+ self.max_lifetime = max_lifetime
+ self.max_idle = max_idle
+ self.num_workers = num_workers
+
+ self._nconns = min_size # currently in the pool, out, being prepared
+ self._pool = Deque[ConnectionType]()
+ self._stats = Counter[str]()
+
+ # Min number of connections in the pool in a max_idle unit of time.
+ # It is reset periodically by the ShrinkPool scheduled task.
+ # It is used to shrink back the pool if maxcon > min_size and extra
+ # connections have been acquired, if we notice that in the last
+ # max_idle interval they weren't all used.
+ self._nconns_min = min_size
+
+ # Flag to allow the pool to grow only one connection at time. In case
+ # of spike, if threads are allowed to grow in parallel and connection
+ # time is slow, there won't be any thread available to return the
+ # connections to the pool.
+ self._growing = False
+
+ self._opened = False
+ self._closed = True
+
+ def __repr__(self) -> str:
+ return (
+ f"<{self.__class__.__module__}.{self.__class__.__name__}"
+ f" {self.name!r} at 0x{id(self):x}>"
+ )
+
+ @property
+ def min_size(self) -> int:
+ return self._min_size
+
+ @property
+ def max_size(self) -> int:
+ return self._max_size
+
+ @property
+ def closed(self) -> bool:
+ """`!True` if the pool is closed."""
+ return self._closed
+
+ def _check_size(self, min_size: int, max_size: Optional[int]) -> Tuple[int, int]:
+ if max_size is None:
+ max_size = min_size
+
+ if min_size < 0:
+ raise ValueError("min_size cannot be negative")
+ if max_size < min_size:
+ raise ValueError("max_size must be greater or equal than min_size")
+ if min_size == max_size == 0:
+ raise ValueError("if min_size is 0 max_size must be greater or than 0")
+
+ return min_size, max_size
+
+ def _check_open(self) -> None:
+ if self._closed and self._opened:
+ raise e.OperationalError(
+ "pool has already been opened/closed and cannot be reused"
+ )
+
+ def _check_open_getconn(self) -> None:
+ if self._closed:
+ if self._opened:
+ raise PoolClosed(f"the pool {self.name!r} is already closed")
+ else:
+ raise PoolClosed(f"the pool {self.name!r} is not open yet")
+
+ def _check_pool_putconn(self, conn: ConnectionType) -> None:
+ pool = getattr(conn, "_pool", None)
+ if pool is self:
+ return
+
+ if pool:
+ msg = f"it comes from pool {pool.name!r}"
+ else:
+ msg = "it doesn't come from any pool"
+ raise ValueError(
+ f"can't return connection to pool {self.name!r}, {msg}: {conn}"
+ )
+
+ def get_stats(self) -> Dict[str, int]:
+ """
+ Return current stats about the pool usage.
+ """
+ rv = dict(self._stats)
+ rv.update(self._get_measures())
+ return rv
+
+ def pop_stats(self) -> Dict[str, int]:
+ """
+ Return current stats about the pool usage.
+
+ After the call, all the counters are reset to zero.
+ """
+ stats, self._stats = self._stats, Counter()
+ rv = dict(stats)
+ rv.update(self._get_measures())
+ return rv
+
+ def _get_measures(self) -> Dict[str, int]:
+ """
+ Return immediate measures of the pool (not counters).
+ """
+ return {
+ self._POOL_MIN: self._min_size,
+ self._POOL_MAX: self._max_size,
+ self._POOL_SIZE: self._nconns,
+ self._POOL_AVAILABLE: len(self._pool),
+ }
+
+ @classmethod
+ def _jitter(cls, value: float, min_pc: float, max_pc: float) -> float:
+ """
+ Add a random value to *value* between *min_pc* and *max_pc* percent.
+ """
+ return value * (1.0 + ((max_pc - min_pc) * random()) + min_pc)
+
+ def _set_connection_expiry_date(self, conn: ConnectionType) -> None:
+ """Set an expiry date on a connection.
+
+ Add some randomness to avoid mass reconnection.
+ """
+ conn._expire_at = monotonic() + self._jitter(self.max_lifetime, -0.05, 0.0)
+
+
+class ConnectionAttempt:
+ """Keep the state of a connection attempt."""
+
+ INITIAL_DELAY = 1.0
+ DELAY_JITTER = 0.1
+ DELAY_BACKOFF = 2.0
+
+ def __init__(self, *, reconnect_timeout: float):
+ self.reconnect_timeout = reconnect_timeout
+ self.delay = 0.0
+ self.give_up_at = 0.0
+
+ def update_delay(self, now: float) -> None:
+ """Calculate how long to wait for a new connection attempt"""
+ if self.delay == 0.0:
+ self.give_up_at = now + self.reconnect_timeout
+ self.delay = BasePool._jitter(
+ self.INITIAL_DELAY, -self.DELAY_JITTER, self.DELAY_JITTER
+ )
+ else:
+ self.delay *= self.DELAY_BACKOFF
+
+ if self.delay + now > self.give_up_at:
+ self.delay = max(0.0, self.give_up_at - now)
+
+ def time_to_give_up(self, now: float) -> bool:
+ """Return True if we are tired of trying to connect. Meh."""
+ return self.give_up_at > 0.0 and now >= self.give_up_at
diff --git a/psycopg_pool/psycopg_pool/errors.py b/psycopg_pool/psycopg_pool/errors.py
new file mode 100644
index 0000000..9e672ad
--- /dev/null
+++ b/psycopg_pool/psycopg_pool/errors.py
@@ -0,0 +1,25 @@
+"""
+Connection pool errors.
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+from psycopg import errors as e
+
+
+class PoolClosed(e.OperationalError):
+ """Attempt to get a connection from a closed pool."""
+
+ __module__ = "psycopg_pool"
+
+
+class PoolTimeout(e.OperationalError):
+ """The pool couldn't provide a connection in acceptable time."""
+
+ __module__ = "psycopg_pool"
+
+
+class TooManyRequests(e.OperationalError):
+ """Too many requests in the queue waiting for a connection from the pool."""
+
+ __module__ = "psycopg_pool"
diff --git a/psycopg_pool/psycopg_pool/null_pool.py b/psycopg_pool/psycopg_pool/null_pool.py
new file mode 100644
index 0000000..c0a77c2
--- /dev/null
+++ b/psycopg_pool/psycopg_pool/null_pool.py
@@ -0,0 +1,159 @@
+"""
+Psycopg null connection pools
+"""
+
+# Copyright (C) 2022 The Psycopg Team
+
+import logging
+import threading
+from typing import Any, Optional, Tuple
+
+from psycopg import Connection
+from psycopg.pq import TransactionStatus
+
+from .pool import ConnectionPool, AddConnection
+from .errors import PoolTimeout, TooManyRequests
+from ._compat import ConnectionTimeout
+
+logger = logging.getLogger("psycopg.pool")
+
+
+class _BaseNullConnectionPool:
+ def __init__(
+ self, conninfo: str = "", min_size: int = 0, *args: Any, **kwargs: Any
+ ):
+ super().__init__( # type: ignore[call-arg]
+ conninfo, *args, min_size=min_size, **kwargs
+ )
+
+ def _check_size(self, min_size: int, max_size: Optional[int]) -> Tuple[int, int]:
+ if max_size is None:
+ max_size = min_size
+
+ if min_size != 0:
+ raise ValueError("null pools must have min_size = 0")
+ if max_size < min_size:
+ raise ValueError("max_size must be greater or equal than min_size")
+
+ return min_size, max_size
+
+ def _start_initial_tasks(self) -> None:
+ # Null pools don't have background tasks to fill connections
+ # or to grow/shrink.
+ return
+
+ def _maybe_grow_pool(self) -> None:
+ # null pools don't grow
+ pass
+
+
+class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool):
+ def wait(self, timeout: float = 30.0) -> None:
+ """
+ Create a connection for test.
+
+ Calling this function will verify that the connectivity with the
+ database works as expected. However the connection will not be stored
+ in the pool.
+
+ Close the pool, and raise `PoolTimeout`, if not ready within *timeout*
+ sec.
+ """
+ self._check_open_getconn()
+
+ with self._lock:
+ assert not self._pool_full_event
+ self._pool_full_event = threading.Event()
+
+ logger.info("waiting for pool %r initialization", self.name)
+ self.run_task(AddConnection(self))
+ if not self._pool_full_event.wait(timeout):
+ self.close() # stop all the threads
+ raise PoolTimeout(f"pool initialization incomplete after {timeout} sec")
+
+ with self._lock:
+ assert self._pool_full_event
+ self._pool_full_event = None
+
+ logger.info("pool %r is ready to use", self.name)
+
+ def _get_ready_connection(
+ self, timeout: Optional[float]
+ ) -> Optional[Connection[Any]]:
+ conn: Optional[Connection[Any]] = None
+ if self.max_size == 0 or self._nconns < self.max_size:
+ # Create a new connection for the client
+ try:
+ conn = self._connect(timeout=timeout)
+ except ConnectionTimeout as ex:
+ raise PoolTimeout(str(ex)) from None
+ self._nconns += 1
+
+ elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+ self._stats[self._REQUESTS_ERRORS] += 1
+ raise TooManyRequests(
+ f"the pool {self.name!r} has already"
+ f" {len(self._waiting)} requests waiting"
+ )
+ return conn
+
+ def _maybe_close_connection(self, conn: Connection[Any]) -> bool:
+ with self._lock:
+ if not self._closed and self._waiting:
+ return False
+
+ conn._pool = None
+ if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._RETURNS_BAD] += 1
+ conn.close()
+ self._nconns -= 1
+ return True
+
+ def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
+ """Change the size of the pool during runtime.
+
+ Only *max_size* can be changed; *min_size* must remain 0.
+ """
+ min_size, max_size = self._check_size(min_size, max_size)
+
+ logger.info(
+ "resizing %r to min_size=%s max_size=%s",
+ self.name,
+ min_size,
+ max_size,
+ )
+ with self._lock:
+ self._min_size = min_size
+ self._max_size = max_size
+
+ def check(self) -> None:
+ """No-op, as the pool doesn't have connections in its state."""
+ pass
+
+ def _add_to_pool(self, conn: Connection[Any]) -> None:
+ # Remove the pool reference from the connection before returning it
+ # to the state, to avoid to create a reference loop.
+ # Also disable the warning for open connection in conn.__del__
+ conn._pool = None
+
+ # Critical section: if there is a client waiting give it the connection
+ # otherwise put it back into the pool.
+ with self._lock:
+ while self._waiting:
+ # If there is a client waiting (which is still waiting and
+ # hasn't timed out), give it the connection and notify it.
+ pos = self._waiting.popleft()
+ if pos.set(conn):
+ break
+ else:
+ # No client waiting for a connection: close the connection
+ conn.close()
+
+ # If we have been asked to wait for pool init, notify the
+ # waiter if the pool is ready.
+ if self._pool_full_event:
+ self._pool_full_event.set()
+ else:
+ # The connection created by wait shouldn't decrease the
+ # count of the number of connection used.
+ self._nconns -= 1
diff --git a/psycopg_pool/psycopg_pool/null_pool_async.py b/psycopg_pool/psycopg_pool/null_pool_async.py
new file mode 100644
index 0000000..ae9d207
--- /dev/null
+++ b/psycopg_pool/psycopg_pool/null_pool_async.py
@@ -0,0 +1,122 @@
+"""
+psycopg asynchronous null connection pool
+"""
+
+# Copyright (C) 2022 The Psycopg Team
+
+import asyncio
+import logging
+from typing import Any, Optional
+
+from psycopg import AsyncConnection
+from psycopg.pq import TransactionStatus
+
+from .errors import PoolTimeout, TooManyRequests
+from ._compat import ConnectionTimeout
+from .null_pool import _BaseNullConnectionPool
+from .pool_async import AsyncConnectionPool, AddConnection
+
+logger = logging.getLogger("psycopg.pool")
+
+
+class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool):
+ async def wait(self, timeout: float = 30.0) -> None:
+ self._check_open_getconn()
+
+ async with self._lock:
+ assert not self._pool_full_event
+ self._pool_full_event = asyncio.Event()
+
+ logger.info("waiting for pool %r initialization", self.name)
+ self.run_task(AddConnection(self))
+ try:
+ await asyncio.wait_for(self._pool_full_event.wait(), timeout)
+ except asyncio.TimeoutError:
+ await self.close() # stop all the tasks
+ raise PoolTimeout(
+ f"pool initialization incomplete after {timeout} sec"
+ ) from None
+
+ async with self._lock:
+ assert self._pool_full_event
+ self._pool_full_event = None
+
+ logger.info("pool %r is ready to use", self.name)
+
+ async def _get_ready_connection(
+ self, timeout: Optional[float]
+ ) -> Optional[AsyncConnection[Any]]:
+ conn: Optional[AsyncConnection[Any]] = None
+ if self.max_size == 0 or self._nconns < self.max_size:
+ # Create a new connection for the client
+ try:
+ conn = await self._connect(timeout=timeout)
+ except ConnectionTimeout as ex:
+ raise PoolTimeout(str(ex)) from None
+ self._nconns += 1
+ elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+ self._stats[self._REQUESTS_ERRORS] += 1
+ raise TooManyRequests(
+ f"the pool {self.name!r} has already"
+ f" {len(self._waiting)} requests waiting"
+ )
+ return conn
+
+ async def _maybe_close_connection(self, conn: AsyncConnection[Any]) -> bool:
+ # Close the connection if no client is waiting for it, or if the pool
+ # is closed. For extra refcare remove the pool reference from it.
+ # Maintain the stats.
+ async with self._lock:
+ if not self._closed and self._waiting:
+ return False
+
+ conn._pool = None
+ if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._RETURNS_BAD] += 1
+ await conn.close()
+ self._nconns -= 1
+ return True
+
+ async def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
+ min_size, max_size = self._check_size(min_size, max_size)
+
+ logger.info(
+ "resizing %r to min_size=%s max_size=%s",
+ self.name,
+ min_size,
+ max_size,
+ )
+ async with self._lock:
+ self._min_size = min_size
+ self._max_size = max_size
+
+ async def check(self) -> None:
+ pass
+
+ async def _add_to_pool(self, conn: AsyncConnection[Any]) -> None:
+ # Remove the pool reference from the connection before returning it
+ # to the state, to avoid to create a reference loop.
+ # Also disable the warning for open connection in conn.__del__
+ conn._pool = None
+
+ # Critical section: if there is a client waiting give it the connection
+ # otherwise put it back into the pool.
+ async with self._lock:
+ while self._waiting:
+ # If there is a client waiting (which is still waiting and
+ # hasn't timed out), give it the connection and notify it.
+ pos = self._waiting.popleft()
+ if await pos.set(conn):
+ break
+ else:
+ # No client waiting for a connection: close the connection
+ await conn.close()
+
+ # If we have been asked to wait for pool init, notify the
+ # waiter if the pool is ready.
+ if self._pool_full_event:
+ self._pool_full_event.set()
+ else:
+ # The connection created by wait shouldn't decrease the
+ # count of the number of connection used.
+ self._nconns -= 1
diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py
new file mode 100644
index 0000000..609d95d
--- /dev/null
+++ b/psycopg_pool/psycopg_pool/pool.py
@@ -0,0 +1,839 @@
+"""
+psycopg synchronous connection pool
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+import logging
+import threading
+from abc import ABC, abstractmethod
+from time import monotonic
+from queue import Queue, Empty
+from types import TracebackType
+from typing import Any, Callable, Dict, Iterator, List
+from typing import Optional, Sequence, Type
+from weakref import ref
+from contextlib import contextmanager
+
+from psycopg import errors as e
+from psycopg import Connection
+from psycopg.pq import TransactionStatus
+
+from .base import ConnectionAttempt, BasePool
+from .sched import Scheduler
+from .errors import PoolClosed, PoolTimeout, TooManyRequests
+from ._compat import Deque
+
+logger = logging.getLogger("psycopg.pool")
+
+
+class ConnectionPool(BasePool[Connection[Any]]):
+ def __init__(
+ self,
+ conninfo: str = "",
+ *,
+ open: bool = True,
+ connection_class: Type[Connection[Any]] = Connection,
+ configure: Optional[Callable[[Connection[Any]], None]] = None,
+ reset: Optional[Callable[[Connection[Any]], None]] = None,
+ **kwargs: Any,
+ ):
+ self.connection_class = connection_class
+ self._configure = configure
+ self._reset = reset
+
+ self._lock = threading.RLock()
+ self._waiting = Deque["WaitingClient"]()
+
+ # to notify that the pool is full
+ self._pool_full_event: Optional[threading.Event] = None
+
+ self._sched = Scheduler()
+ self._sched_runner: Optional[threading.Thread] = None
+ self._tasks: "Queue[MaintenanceTask]" = Queue()
+ self._workers: List[threading.Thread] = []
+
+ super().__init__(conninfo, **kwargs)
+
+ if open:
+ self.open()
+
+ def __del__(self) -> None:
+ # If the '_closed' property is not set we probably failed in __init__.
+ # Don't try anything complicated as probably it won't work.
+ if getattr(self, "_closed", True):
+ return
+
+ self._stop_workers()
+
+ def wait(self, timeout: float = 30.0) -> None:
+ """
+ Wait for the pool to be full (with `min_size` connections) after creation.
+
+ Close the pool, and raise `PoolTimeout`, if not ready within *timeout*
+ sec.
+
+ Calling this method is not mandatory: you can try and use the pool
+ immediately after its creation. The first client will be served as soon
+ as a connection is ready. You can use this method if you prefer your
+ program to terminate in case the environment is not configured
+ properly, rather than trying to stay up the hardest it can.
+ """
+ self._check_open_getconn()
+
+ with self._lock:
+ assert not self._pool_full_event
+ if len(self._pool) >= self._min_size:
+ return
+ self._pool_full_event = threading.Event()
+
+ logger.info("waiting for pool %r initialization", self.name)
+ if not self._pool_full_event.wait(timeout):
+ self.close() # stop all the threads
+ raise PoolTimeout(f"pool initialization incomplete after {timeout} sec")
+
+ with self._lock:
+ assert self._pool_full_event
+ self._pool_full_event = None
+
+ logger.info("pool %r is ready to use", self.name)
+
+ @contextmanager
+ def connection(self, timeout: Optional[float] = None) -> Iterator[Connection[Any]]:
+ """Context manager to obtain a connection from the pool.
+
+ Return the connection immediately if available, otherwise wait up to
+ *timeout* or `self.timeout` seconds and throw `PoolTimeout` if a
+ connection is not available in time.
+
+ Upon context exit, return the connection to the pool. Apply the normal
+ :ref:`connection context behaviour <with-connection>` (commit/rollback
+ the transaction in case of success/error). If the connection is no more
+ in working state, replace it with a new one.
+ """
+ conn = self.getconn(timeout=timeout)
+ t0 = monotonic()
+ try:
+ with conn:
+ yield conn
+ finally:
+ t1 = monotonic()
+ self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0))
+ self.putconn(conn)
+
+ def getconn(self, timeout: Optional[float] = None) -> Connection[Any]:
+ """Obtain a connection from the pool.
+
+ You should preferably use `connection()`. Use this function only if
+ it is not possible to use the connection as context manager.
+
+ After using this function you *must* call a corresponding `putconn()`:
+ failing to do so will deplete the pool. A depleted pool is a sad pool:
+ you don't want a depleted pool.
+ """
+ logger.info("connection requested from %r", self.name)
+ self._stats[self._REQUESTS_NUM] += 1
+
+ # Critical section: decide here if there's a connection ready
+ # or if the client needs to wait.
+ with self._lock:
+ self._check_open_getconn()
+ conn = self._get_ready_connection(timeout)
+ if not conn:
+ # No connection available: put the client in the waiting queue
+ t0 = monotonic()
+ pos = WaitingClient()
+ self._waiting.append(pos)
+ self._stats[self._REQUESTS_QUEUED] += 1
+
+ # If there is space for the pool to grow, let's do it
+ self._maybe_grow_pool()
+
+ # If we are in the waiting queue, wait to be assigned a connection
+ # (outside the critical section, so only the waiting client is locked)
+ if not conn:
+ if timeout is None:
+ timeout = self.timeout
+ try:
+ conn = pos.wait(timeout=timeout)
+ except Exception:
+ self._stats[self._REQUESTS_ERRORS] += 1
+ raise
+ finally:
+ t1 = monotonic()
+ self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0))
+
+ # Tell the connection it belongs to a pool to avoid closing on __exit__
+ # Note that this property shouldn't be set while the connection is in
+ # the pool, to avoid to create a reference loop.
+ conn._pool = self
+ logger.info("connection given by %r", self.name)
+ return conn
+
+ def _get_ready_connection(
+ self, timeout: Optional[float]
+ ) -> Optional[Connection[Any]]:
+ """Return a connection, if the client deserves one."""
+ conn: Optional[Connection[Any]] = None
+ if self._pool:
+ # Take a connection ready out of the pool
+ conn = self._pool.popleft()
+ if len(self._pool) < self._nconns_min:
+ self._nconns_min = len(self._pool)
+ elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+ self._stats[self._REQUESTS_ERRORS] += 1
+ raise TooManyRequests(
+ f"the pool {self.name!r} has already"
+ f" {len(self._waiting)} requests waiting"
+ )
+ return conn
+
+ def _maybe_grow_pool(self) -> None:
+ # Allow only one thread at time to grow the pool (or returning
+ # connections might be starved).
+ if self._nconns >= self._max_size or self._growing:
+ return
+ self._nconns += 1
+ logger.info("growing pool %r to %s", self.name, self._nconns)
+ self._growing = True
+ self.run_task(AddConnection(self, growing=True))
+
+ def putconn(self, conn: Connection[Any]) -> None:
+ """Return a connection to the loving hands of its pool.
+
+ Use this function only paired with a `getconn()`. You don't need to use
+ it if you use the much more comfortable `connection()` context manager.
+ """
+ # Quick check to discard the wrong connection
+ self._check_pool_putconn(conn)
+
+ logger.info("returning connection to %r", self.name)
+
+ if self._maybe_close_connection(conn):
+ return
+
+ # Use a worker to perform eventual maintenance work in a separate thread
+ if self._reset:
+ self.run_task(ReturnConnection(self, conn))
+ else:
+ self._return_connection(conn)
+
+ def _maybe_close_connection(self, conn: Connection[Any]) -> bool:
+ """Close a returned connection if necessary.
+
+ Return `!True if the connection was closed.
+ """
+ # If the pool is closed just close the connection instead of returning
+ # it to the pool. For extra refcare remove the pool reference from it.
+ if not self._closed:
+ return False
+
+ conn._pool = None
+ conn.close()
+ return True
+
+ def open(self, wait: bool = False, timeout: float = 30.0) -> None:
+ """Open the pool by starting connecting and and accepting clients.
+
+ If *wait* is `!False`, return immediately and let the background worker
+ fill the pool if `min_size` > 0. Otherwise wait up to *timeout* seconds
+ for the requested number of connections to be ready (see `wait()` for
+ details).
+
+ It is safe to call `!open()` again on a pool already open (because the
+ method was already called, or because the pool context was entered, or
+ because the pool was initialized with *open* = `!True`) but you cannot
+ currently re-open a closed pool.
+ """
+ with self._lock:
+ self._open()
+
+ if wait:
+ self.wait(timeout=timeout)
+
+ def _open(self) -> None:
+ if not self._closed:
+ return
+
+ self._check_open()
+
+ self._closed = False
+ self._opened = True
+
+ self._start_workers()
+ self._start_initial_tasks()
+
+ def _start_workers(self) -> None:
+ self._sched_runner = threading.Thread(
+ target=self._sched.run,
+ name=f"{self.name}-scheduler",
+ daemon=True,
+ )
+ assert not self._workers
+ for i in range(self.num_workers):
+ t = threading.Thread(
+ target=self.worker,
+ args=(self._tasks,),
+ name=f"{self.name}-worker-{i}",
+ daemon=True,
+ )
+ self._workers.append(t)
+
+ # The object state is complete. Start the worker threads
+ self._sched_runner.start()
+ for t in self._workers:
+ t.start()
+
+ def _start_initial_tasks(self) -> None:
+ # populate the pool with initial min_size connections in background
+ for i in range(self._nconns):
+ self.run_task(AddConnection(self))
+
+ # Schedule a task to shrink the pool if connections over min_size have
+ # remained unused.
+ self.schedule_task(ShrinkPool(self), self.max_idle)
+
+ def close(self, timeout: float = 5.0) -> None:
+ """Close the pool and make it unavailable to new clients.
+
+ All the waiting and future clients will fail to acquire a connection
+ with a `PoolClosed` exception. Currently used connections will not be
+ closed until returned to the pool.
+
+ Wait *timeout* seconds for threads to terminate their job, if positive.
+ If the timeout expires the pool is closed anyway, although it may raise
+ some warnings on exit.
+ """
+ if self._closed:
+ return
+
+ with self._lock:
+ self._closed = True
+ logger.debug("pool %r closed", self.name)
+
+ # Take waiting client and pool connections out of the state
+ waiting = list(self._waiting)
+ self._waiting.clear()
+ connections = list(self._pool)
+ self._pool.clear()
+
+ # Now that the flag _closed is set, getconn will fail immediately,
+ # putconn will just close the returned connection.
+ self._stop_workers(waiting, connections, timeout)
+
+ def _stop_workers(
+ self,
+ waiting_clients: Sequence["WaitingClient"] = (),
+ connections: Sequence[Connection[Any]] = (),
+ timeout: float = 0.0,
+ ) -> None:
+
+ # Stop the scheduler
+ self._sched.enter(0, None)
+
+ # Stop the worker threads
+ workers, self._workers = self._workers[:], []
+ for i in range(len(workers)):
+ self.run_task(StopWorker(self))
+
+ # Signal to eventual clients in the queue that business is closed.
+ for pos in waiting_clients:
+ pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
+
+ # Close the connections still in the pool
+ for conn in connections:
+ conn.close()
+
+ # Wait for the worker threads to terminate
+ assert self._sched_runner is not None
+ sched_runner, self._sched_runner = self._sched_runner, None
+ if timeout > 0:
+ for t in [sched_runner] + workers:
+ if not t.is_alive():
+ continue
+ t.join(timeout)
+ if t.is_alive():
+ logger.warning(
+ "couldn't stop thread %s in pool %r within %s seconds",
+ t,
+ self.name,
+ timeout,
+ )
+
+ def __enter__(self) -> "ConnectionPool":
+ self.open()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self.close()
+
+ def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
+ """Change the size of the pool during runtime."""
+ min_size, max_size = self._check_size(min_size, max_size)
+
+ ngrow = max(0, min_size - self._min_size)
+
+ logger.info(
+ "resizing %r to min_size=%s max_size=%s",
+ self.name,
+ min_size,
+ max_size,
+ )
+ with self._lock:
+ self._min_size = min_size
+ self._max_size = max_size
+ self._nconns += ngrow
+
+ for i in range(ngrow):
+ self.run_task(AddConnection(self))
+
+ def check(self) -> None:
+ """Verify the state of the connections currently in the pool.
+
+ Test each connection: if it works return it to the pool, otherwise
+ dispose of it and create a new one.
+ """
+ with self._lock:
+ conns = list(self._pool)
+ self._pool.clear()
+
+ # Give a chance to the pool to grow if it has no connection.
+ # In case there are enough connection, or the pool is already
+ # growing, this is a no-op.
+ self._maybe_grow_pool()
+
+ while conns:
+ conn = conns.pop()
+ try:
+ conn.execute("SELECT 1")
+ if conn.pgconn.transaction_status == TransactionStatus.INTRANS:
+ conn.rollback()
+ except Exception:
+ self._stats[self._CONNECTIONS_LOST] += 1
+ logger.warning("discarding broken connection: %s", conn)
+ self.run_task(AddConnection(self))
+ else:
+ self._add_to_pool(conn)
+
+ def reconnect_failed(self) -> None:
+ """
+ Called when reconnection failed for longer than `reconnect_timeout`.
+ """
+ self._reconnect_failed(self)
+
+ def run_task(self, task: "MaintenanceTask") -> None:
+ """Run a maintenance task in a worker thread."""
+ self._tasks.put_nowait(task)
+
+ def schedule_task(self, task: "MaintenanceTask", delay: float) -> None:
+ """Run a maintenance task in a worker thread in the future."""
+ self._sched.enter(delay, task.tick)
+
+ _WORKER_TIMEOUT = 60.0
+
+ @classmethod
+ def worker(cls, q: "Queue[MaintenanceTask]") -> None:
+ """Runner to execute pending maintenance task.
+
+ The function is designed to run as a separate thread.
+
+ Block on the queue *q*, run a task received. Finish running if a
+ StopWorker is received.
+ """
+ # Don't make all the workers time out at the same moment
+ timeout = cls._jitter(cls._WORKER_TIMEOUT, -0.1, 0.1)
+ while True:
+ # Use a timeout to make the wait interruptible
+ try:
+ task = q.get(timeout=timeout)
+ except Empty:
+ continue
+
+ if isinstance(task, StopWorker):
+ logger.debug(
+ "terminating working thread %s",
+ threading.current_thread().name,
+ )
+ return
+
+ # Run the task. Make sure don't die in the attempt.
+ try:
+ task.run()
+ except Exception as ex:
+ logger.warning(
+ "task run %s failed: %s: %s",
+ task,
+ ex.__class__.__name__,
+ ex,
+ )
+
+ def _connect(self, timeout: Optional[float] = None) -> Connection[Any]:
+ """Return a new connection configured for the pool."""
+ self._stats[self._CONNECTIONS_NUM] += 1
+ kwargs = self.kwargs
+ if timeout:
+ kwargs = kwargs.copy()
+ kwargs["connect_timeout"] = max(round(timeout), 1)
+ t0 = monotonic()
+ try:
+ conn: Connection[Any]
+ conn = self.connection_class.connect(self.conninfo, **kwargs)
+ except Exception:
+ self._stats[self._CONNECTIONS_ERRORS] += 1
+ raise
+ else:
+ t1 = monotonic()
+ self._stats[self._CONNECTIONS_MS] += int(1000.0 * (t1 - t0))
+
+ conn._pool = self
+
+ if self._configure:
+ self._configure(conn)
+ status = conn.pgconn.transaction_status
+ if status != TransactionStatus.IDLE:
+ sname = TransactionStatus(status).name
+ raise e.ProgrammingError(
+ f"connection left in status {sname} by configure function"
+ f" {self._configure}: discarded"
+ )
+
+ # Set an expiry date, with some randomness to avoid mass reconnection
+ self._set_connection_expiry_date(conn)
+ return conn
+
+ def _add_connection(
+ self, attempt: Optional[ConnectionAttempt], growing: bool = False
+ ) -> None:
+ """Try to connect and add the connection to the pool.
+
+ If failed, reschedule a new attempt in the future for a few times, then
+ give up, decrease the pool connections number and call
+ `self.reconnect_failed()`.
+
+ """
+ now = monotonic()
+ if not attempt:
+ attempt = ConnectionAttempt(reconnect_timeout=self.reconnect_timeout)
+
+ try:
+ conn = self._connect()
+ except Exception as ex:
+ logger.warning(f"error connecting in {self.name!r}: {ex}")
+ if attempt.time_to_give_up(now):
+ logger.warning(
+ "reconnection attempt in pool %r failed after %s sec",
+ self.name,
+ self.reconnect_timeout,
+ )
+ with self._lock:
+ self._nconns -= 1
+ # If we have given up with a growing attempt, allow a new one.
+ if growing and self._growing:
+ self._growing = False
+ self.reconnect_failed()
+ else:
+ attempt.update_delay(now)
+ self.schedule_task(
+ AddConnection(self, attempt, growing=growing),
+ attempt.delay,
+ )
+ return
+
+ logger.info("adding new connection to the pool")
+ self._add_to_pool(conn)
+ if growing:
+ with self._lock:
+ # Keep on growing if the pool is not full yet, or if there are
+ # clients waiting and the pool can extend.
+ if self._nconns < self._min_size or (
+ self._nconns < self._max_size and self._waiting
+ ):
+ self._nconns += 1
+ logger.info("growing pool %r to %s", self.name, self._nconns)
+ self.run_task(AddConnection(self, growing=True))
+ else:
+ self._growing = False
+
+ def _return_connection(self, conn: Connection[Any]) -> None:
+ """
+ Return a connection to the pool after usage.
+ """
+ self._reset_connection(conn)
+ if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._RETURNS_BAD] += 1
+ # Connection no more in working state: create a new one.
+ self.run_task(AddConnection(self))
+ logger.warning("discarding closed connection: %s", conn)
+ return
+
+ # Check if the connection is past its best before date
+ if conn._expire_at <= monotonic():
+ self.run_task(AddConnection(self))
+ logger.info("discarding expired connection")
+ conn.close()
+ return
+
+ self._add_to_pool(conn)
+
+ def _add_to_pool(self, conn: Connection[Any]) -> None:
+ """
+ Add a connection to the pool.
+
+ The connection can be a fresh one or one already used in the pool.
+
+ If a client is already waiting for a connection pass it on, otherwise
+ put it back into the pool
+ """
+ # Remove the pool reference from the connection before returning it
+ # to the state, to avoid to create a reference loop.
+ # Also disable the warning for open connection in conn.__del__
+ conn._pool = None
+
+ # Critical section: if there is a client waiting give it the connection
+ # otherwise put it back into the pool.
+ with self._lock:
+ while self._waiting:
+ # If there is a client waiting (which is still waiting and
+ # hasn't timed out), give it the connection and notify it.
+ pos = self._waiting.popleft()
+ if pos.set(conn):
+ break
+ else:
+ # No client waiting for a connection: put it back into the pool
+ self._pool.append(conn)
+
+ # If we have been asked to wait for pool init, notify the
+ # waiter if the pool is full.
+ if self._pool_full_event and len(self._pool) >= self._min_size:
+ self._pool_full_event.set()
+
+ def _reset_connection(self, conn: Connection[Any]) -> None:
+ """
+ Bring a connection to IDLE state or close it.
+ """
+ status = conn.pgconn.transaction_status
+ if status == TransactionStatus.IDLE:
+ pass
+
+ elif status in (TransactionStatus.INTRANS, TransactionStatus.INERROR):
+ # Connection returned with an active transaction
+ logger.warning("rolling back returned connection: %s", conn)
+ try:
+ conn.rollback()
+ except Exception as ex:
+ logger.warning(
+ "rollback failed: %s: %s. Discarding connection %s",
+ ex.__class__.__name__,
+ ex,
+ conn,
+ )
+ conn.close()
+
+ elif status == TransactionStatus.ACTIVE:
+ # Connection returned during an operation. Bad... just close it.
+ logger.warning("closing returned connection: %s", conn)
+ conn.close()
+
+ if not conn.closed and self._reset:
+ try:
+ self._reset(conn)
+ status = conn.pgconn.transaction_status
+ if status != TransactionStatus.IDLE:
+ sname = TransactionStatus(status).name
+ raise e.ProgrammingError(
+ f"connection left in status {sname} by reset function"
+ f" {self._reset}: discarded"
+ )
+ except Exception as ex:
+ logger.warning(f"error resetting connection: {ex}")
+ conn.close()
+
+ def _shrink_pool(self) -> None:
+ to_close: Optional[Connection[Any]] = None
+
+ with self._lock:
+ # Reset the min number of connections used
+ nconns_min = self._nconns_min
+ self._nconns_min = len(self._pool)
+
+ # If the pool can shrink and connections were unused, drop one
+ if self._nconns > self._min_size and nconns_min > 0:
+ to_close = self._pool.popleft()
+ self._nconns -= 1
+ self._nconns_min -= 1
+
+ if to_close:
+ logger.info(
+ "shrinking pool %r to %s because %s unused connections"
+ " in the last %s sec",
+ self.name,
+ self._nconns,
+ nconns_min,
+ self.max_idle,
+ )
+ to_close.close()
+
+ def _get_measures(self) -> Dict[str, int]:
+ rv = super()._get_measures()
+ rv[self._REQUESTS_WAITING] = len(self._waiting)
+ return rv
+
+
+class WaitingClient:
+ """A position in a queue for a client waiting for a connection."""
+
+ __slots__ = ("conn", "error", "_cond")
+
+ def __init__(self) -> None:
+ self.conn: Optional[Connection[Any]] = None
+ self.error: Optional[Exception] = None
+
+ # The WaitingClient behaves in a way similar to an Event, but we need
+ # to notify reliably the flagger that the waiter has "accepted" the
+ # message and it hasn't timed out yet, otherwise the pool may give a
+ # connection to a client that has already timed out getconn(), which
+ # will be lost.
+ self._cond = threading.Condition()
+
+ def wait(self, timeout: float) -> Connection[Any]:
+ """Wait for a connection to be set and return it.
+
+ Raise an exception if the wait times out or if fail() is called.
+ """
+ with self._cond:
+ if not (self.conn or self.error):
+ if not self._cond.wait(timeout):
+ self.error = PoolTimeout(
+ f"couldn't get a connection after {timeout} sec"
+ )
+
+ if self.conn:
+ return self.conn
+ else:
+ assert self.error
+ raise self.error
+
+ def set(self, conn: Connection[Any]) -> bool:
+ """Signal the client waiting that a connection is ready.
+
+ Return True if the client has "accepted" the connection, False
+ otherwise (typically because wait() has timed out).
+ """
+ with self._cond:
+ if self.conn or self.error:
+ return False
+
+ self.conn = conn
+ self._cond.notify_all()
+ return True
+
+ def fail(self, error: Exception) -> bool:
+ """Signal the client that, alas, they won't have a connection today.
+
+ Return True if the client has "accepted" the error, False otherwise
+ (typically because wait() has timed out).
+ """
+ with self._cond:
+ if self.conn or self.error:
+ return False
+
+ self.error = error
+ self._cond.notify_all()
+ return True
+
+
+class MaintenanceTask(ABC):
+ """A task to run asynchronously to maintain the pool state."""
+
+ def __init__(self, pool: "ConnectionPool"):
+ self.pool = ref(pool)
+
+ def __repr__(self) -> str:
+ pool = self.pool()
+ name = repr(pool.name) if pool else "<pool is gone>"
+ return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>"
+
+ def run(self) -> None:
+ """Run the task.
+
+ This usually happens in a worker thread. Call the concrete _run()
+ implementation, if the pool is still alive.
+ """
+ pool = self.pool()
+ if not pool or pool.closed:
+ # Pool is no more working. Quietly discard the operation.
+ logger.debug("task run discarded: %s", self)
+ return
+
+ logger.debug("task running in %s: %s", threading.current_thread().name, self)
+ self._run(pool)
+
+ def tick(self) -> None:
+ """Run the scheduled task
+
+ This function is called by the scheduler thread. Use a worker to
+ run the task for real in order to free the scheduler immediately.
+ """
+ pool = self.pool()
+ if not pool or pool.closed:
+ # Pool is no more working. Quietly discard the operation.
+ logger.debug("task tick discarded: %s", self)
+ return
+
+ pool.run_task(self)
+
+ @abstractmethod
+ def _run(self, pool: "ConnectionPool") -> None:
+ ...
+
+
+class StopWorker(MaintenanceTask):
+ """Signal the maintenance thread to terminate."""
+
+ def _run(self, pool: "ConnectionPool") -> None:
+ pass
+
+
+class AddConnection(MaintenanceTask):
+ def __init__(
+ self,
+ pool: "ConnectionPool",
+ attempt: Optional["ConnectionAttempt"] = None,
+ growing: bool = False,
+ ):
+ super().__init__(pool)
+ self.attempt = attempt
+ self.growing = growing
+
+ def _run(self, pool: "ConnectionPool") -> None:
+ pool._add_connection(self.attempt, growing=self.growing)
+
+
+class ReturnConnection(MaintenanceTask):
+ """Clean up and return a connection to the pool."""
+
+ def __init__(self, pool: "ConnectionPool", conn: "Connection[Any]"):
+ super().__init__(pool)
+ self.conn = conn
+
+ def _run(self, pool: "ConnectionPool") -> None:
+ pool._return_connection(self.conn)
+
+
+class ShrinkPool(MaintenanceTask):
+ """If the pool can shrink, remove one connection.
+
+ Re-schedule periodically and also reset the minimum number of connections
+ in the pool.
+ """
+
+ def _run(self, pool: "ConnectionPool") -> None:
+ # Reschedule the task now so that in case of any error we don't lose
+ # the periodic run.
+ pool.schedule_task(self, pool.max_idle)
+ pool._shrink_pool()
diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py
new file mode 100644
index 0000000..0ea6e9a
--- /dev/null
+++ b/psycopg_pool/psycopg_pool/pool_async.py
@@ -0,0 +1,784 @@
+"""
+psycopg asynchronous connection pool
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+import asyncio
+import logging
+from abc import ABC, abstractmethod
+from time import monotonic
+from types import TracebackType
+from typing import Any, AsyncIterator, Awaitable, Callable
+from typing import Dict, List, Optional, Sequence, Type
+from weakref import ref
+from contextlib import asynccontextmanager
+
+from psycopg import errors as e
+from psycopg import AsyncConnection
+from psycopg.pq import TransactionStatus
+
+from .base import ConnectionAttempt, BasePool
+from .sched import AsyncScheduler
+from .errors import PoolClosed, PoolTimeout, TooManyRequests
+from ._compat import Task, create_task, Deque
+
+logger = logging.getLogger("psycopg.pool")
+
+
+class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
+ def __init__(
+ self,
+ conninfo: str = "",
+ *,
+ open: bool = True,
+ connection_class: Type[AsyncConnection[Any]] = AsyncConnection,
+ configure: Optional[Callable[[AsyncConnection[Any]], Awaitable[None]]] = None,
+ reset: Optional[Callable[[AsyncConnection[Any]], Awaitable[None]]] = None,
+ **kwargs: Any,
+ ):
+ self.connection_class = connection_class
+ self._configure = configure
+ self._reset = reset
+
+ # asyncio objects, created on open to attach them to the right loop.
+ self._lock: asyncio.Lock
+ self._sched: AsyncScheduler
+ self._tasks: "asyncio.Queue[MaintenanceTask]"
+
+ self._waiting = Deque["AsyncClient"]()
+
+ # to notify that the pool is full
+ self._pool_full_event: Optional[asyncio.Event] = None
+
+ self._sched_runner: Optional[Task[None]] = None
+ self._workers: List[Task[None]] = []
+
+ super().__init__(conninfo, **kwargs)
+
+ if open:
+ self._open()
+
+ async def wait(self, timeout: float = 30.0) -> None:
+ self._check_open_getconn()
+
+ async with self._lock:
+ assert not self._pool_full_event
+ if len(self._pool) >= self._min_size:
+ return
+ self._pool_full_event = asyncio.Event()
+
+ logger.info("waiting for pool %r initialization", self.name)
+ try:
+ await asyncio.wait_for(self._pool_full_event.wait(), timeout)
+ except asyncio.TimeoutError:
+ await self.close() # stop all the tasks
+ raise PoolTimeout(
+ f"pool initialization incomplete after {timeout} sec"
+ ) from None
+
+ async with self._lock:
+ assert self._pool_full_event
+ self._pool_full_event = None
+
+ logger.info("pool %r is ready to use", self.name)
+
+ @asynccontextmanager
+ async def connection(
+ self, timeout: Optional[float] = None
+ ) -> AsyncIterator[AsyncConnection[Any]]:
+ conn = await self.getconn(timeout=timeout)
+ t0 = monotonic()
+ try:
+ async with conn:
+ yield conn
+ finally:
+ t1 = monotonic()
+ self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0))
+ await self.putconn(conn)
+
+ async def getconn(self, timeout: Optional[float] = None) -> AsyncConnection[Any]:
+ logger.info("connection requested from %r", self.name)
+ self._stats[self._REQUESTS_NUM] += 1
+
+ self._check_open_getconn()
+
+ # Critical section: decide here if there's a connection ready
+ # or if the client needs to wait.
+ async with self._lock:
+ conn = await self._get_ready_connection(timeout)
+ if not conn:
+ # No connection available: put the client in the waiting queue
+ t0 = monotonic()
+ pos = AsyncClient()
+ self._waiting.append(pos)
+ self._stats[self._REQUESTS_QUEUED] += 1
+
+ # If there is space for the pool to grow, let's do it
+ self._maybe_grow_pool()
+
+ # If we are in the waiting queue, wait to be assigned a connection
+ # (outside the critical section, so only the waiting client is locked)
+ if not conn:
+ if timeout is None:
+ timeout = self.timeout
+ try:
+ conn = await pos.wait(timeout=timeout)
+ except Exception:
+ self._stats[self._REQUESTS_ERRORS] += 1
+ raise
+ finally:
+ t1 = monotonic()
+ self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0))
+
+ # Tell the connection it belongs to a pool to avoid closing on __exit__
+ # Note that this property shouldn't be set while the connection is in
+ # the pool, to avoid to create a reference loop.
+ conn._pool = self
+ logger.info("connection given by %r", self.name)
+ return conn
+
+ async def _get_ready_connection(
+ self, timeout: Optional[float]
+ ) -> Optional[AsyncConnection[Any]]:
+ conn: Optional[AsyncConnection[Any]] = None
+ if self._pool:
+ # Take a connection ready out of the pool
+ conn = self._pool.popleft()
+ if len(self._pool) < self._nconns_min:
+ self._nconns_min = len(self._pool)
+ elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+ self._stats[self._REQUESTS_ERRORS] += 1
+ raise TooManyRequests(
+ f"the pool {self.name!r} has already"
+ f" {len(self._waiting)} requests waiting"
+ )
+ return conn
+
+ def _maybe_grow_pool(self) -> None:
+ # Allow only one task at time to grow the pool (or returning
+ # connections might be starved).
+ if self._nconns < self._max_size and not self._growing:
+ self._nconns += 1
+ logger.info("growing pool %r to %s", self.name, self._nconns)
+ self._growing = True
+ self.run_task(AddConnection(self, growing=True))
+
+ async def putconn(self, conn: AsyncConnection[Any]) -> None:
+ self._check_pool_putconn(conn)
+
+ logger.info("returning connection to %r", self.name)
+ if await self._maybe_close_connection(conn):
+ return
+
+ # Use a worker to perform eventual maintenance work in a separate task
+ if self._reset:
+ self.run_task(ReturnConnection(self, conn))
+ else:
+ await self._return_connection(conn)
+
+ async def _maybe_close_connection(self, conn: AsyncConnection[Any]) -> bool:
+ # If the pool is closed just close the connection instead of returning
+ # it to the pool. For extra refcare remove the pool reference from it.
+ if not self._closed:
+ return False
+
+ conn._pool = None
+ await conn.close()
+ return True
+
+ async def open(self, wait: bool = False, timeout: float = 30.0) -> None:
+ # Make sure the lock is created after there is an event loop
+ try:
+ self._lock
+ except AttributeError:
+ self._lock = asyncio.Lock()
+
+ async with self._lock:
+ self._open()
+
+ if wait:
+ await self.wait(timeout=timeout)
+
+ def _open(self) -> None:
+ if not self._closed:
+ return
+
+ # Throw a RuntimeError if the pool is open outside a running loop.
+ asyncio.get_running_loop()
+
+ self._check_open()
+
+ # Create these objects now to attach them to the right loop.
+ # See #219
+ self._tasks = asyncio.Queue()
+ self._sched = AsyncScheduler()
+ # This has been most likely, but not necessarily, created in `open()`.
+ try:
+ self._lock
+ except AttributeError:
+ self._lock = asyncio.Lock()
+
+ self._closed = False
+ self._opened = True
+
+ self._start_workers()
+ self._start_initial_tasks()
+
+ def _start_workers(self) -> None:
+ self._sched_runner = create_task(
+ self._sched.run(), name=f"{self.name}-scheduler"
+ )
+ for i in range(self.num_workers):
+ t = create_task(
+ self.worker(self._tasks),
+ name=f"{self.name}-worker-{i}",
+ )
+ self._workers.append(t)
+
+ def _start_initial_tasks(self) -> None:
+ # populate the pool with initial min_size connections in background
+ for i in range(self._nconns):
+ self.run_task(AddConnection(self))
+
+ # Schedule a task to shrink the pool if connections over min_size have
+ # remained unused.
+ self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
+
+ async def close(self, timeout: float = 5.0) -> None:
+ if self._closed:
+ return
+
+ async with self._lock:
+ self._closed = True
+ logger.debug("pool %r closed", self.name)
+
+ # Take waiting client and pool connections out of the state
+ waiting = list(self._waiting)
+ self._waiting.clear()
+ connections = list(self._pool)
+ self._pool.clear()
+
+ # Now that the flag _closed is set, getconn will fail immediately,
+ # putconn will just close the returned connection.
+ await self._stop_workers(waiting, connections, timeout)
+
+ async def _stop_workers(
+ self,
+ waiting_clients: Sequence["AsyncClient"] = (),
+ connections: Sequence[AsyncConnection[Any]] = (),
+ timeout: float = 0.0,
+ ) -> None:
+ # Stop the scheduler
+ await self._sched.enter(0, None)
+
+ # Stop the worker tasks
+ workers, self._workers = self._workers[:], []
+ for w in workers:
+ self.run_task(StopWorker(self))
+
+ # Signal to eventual clients in the queue that business is closed.
+ for pos in waiting_clients:
+ await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
+
+ # Close the connections still in the pool
+ for conn in connections:
+ await conn.close()
+
+ # Wait for the worker tasks to terminate
+ assert self._sched_runner is not None
+ sched_runner, self._sched_runner = self._sched_runner, None
+ wait = asyncio.gather(sched_runner, *workers)
+ try:
+ if timeout > 0:
+ await asyncio.wait_for(asyncio.shield(wait), timeout=timeout)
+ else:
+ await wait
+ except asyncio.TimeoutError:
+ logger.warning(
+ "couldn't stop pool %r tasks within %s seconds",
+ self.name,
+ timeout,
+ )
+
+ async def __aenter__(self) -> "AsyncConnectionPool":
+ await self.open()
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ await self.close()
+
+ async def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
+ min_size, max_size = self._check_size(min_size, max_size)
+
+ ngrow = max(0, min_size - self._min_size)
+
+ logger.info(
+ "resizing %r to min_size=%s max_size=%s",
+ self.name,
+ min_size,
+ max_size,
+ )
+ async with self._lock:
+ self._min_size = min_size
+ self._max_size = max_size
+ self._nconns += ngrow
+
+ for i in range(ngrow):
+ self.run_task(AddConnection(self))
+
+ async def check(self) -> None:
+ async with self._lock:
+ conns = list(self._pool)
+ self._pool.clear()
+
+ # Give a chance to the pool to grow if it has no connection.
+ # In case there are enough connection, or the pool is already
+ # growing, this is a no-op.
+ self._maybe_grow_pool()
+
+ while conns:
+ conn = conns.pop()
+ try:
+ await conn.execute("SELECT 1")
+ if conn.pgconn.transaction_status == TransactionStatus.INTRANS:
+ await conn.rollback()
+ except Exception:
+ self._stats[self._CONNECTIONS_LOST] += 1
+ logger.warning("discarding broken connection: %s", conn)
+ self.run_task(AddConnection(self))
+ else:
+ await self._add_to_pool(conn)
+
+ def reconnect_failed(self) -> None:
+ """
+ Called when reconnection failed for longer than `reconnect_timeout`.
+ """
+ self._reconnect_failed(self)
+
+ def run_task(self, task: "MaintenanceTask") -> None:
+ """Run a maintenance task in a worker."""
+ self._tasks.put_nowait(task)
+
+ async def schedule_task(self, task: "MaintenanceTask", delay: float) -> None:
+ """Run a maintenance task in a worker in the future."""
+ await self._sched.enter(delay, task.tick)
+
+ @classmethod
+ async def worker(cls, q: "asyncio.Queue[MaintenanceTask]") -> None:
+ """Runner to execute pending maintenance task.
+
+ The function is designed to run as a task.
+
+ Block on the queue *q*, run a task received. Finish running if a
+ StopWorker is received.
+ """
+ while True:
+ task = await q.get()
+
+ if isinstance(task, StopWorker):
+ logger.debug("terminating working task")
+ return
+
+ # Run the task. Make sure don't die in the attempt.
+ try:
+ await task.run()
+ except Exception as ex:
+ logger.warning(
+ "task run %s failed: %s: %s",
+ task,
+ ex.__class__.__name__,
+ ex,
+ )
+
+ async def _connect(self, timeout: Optional[float] = None) -> AsyncConnection[Any]:
+ self._stats[self._CONNECTIONS_NUM] += 1
+ kwargs = self.kwargs
+ if timeout:
+ kwargs = kwargs.copy()
+ kwargs["connect_timeout"] = max(round(timeout), 1)
+ t0 = monotonic()
+ try:
+ conn: AsyncConnection[Any]
+ conn = await self.connection_class.connect(self.conninfo, **kwargs)
+ except Exception:
+ self._stats[self._CONNECTIONS_ERRORS] += 1
+ raise
+ else:
+ t1 = monotonic()
+ self._stats[self._CONNECTIONS_MS] += int(1000.0 * (t1 - t0))
+
+ conn._pool = self
+
+ if self._configure:
+ await self._configure(conn)
+ status = conn.pgconn.transaction_status
+ if status != TransactionStatus.IDLE:
+ sname = TransactionStatus(status).name
+ raise e.ProgrammingError(
+ f"connection left in status {sname} by configure function"
+ f" {self._configure}: discarded"
+ )
+
+ # Set an expiry date, with some randomness to avoid mass reconnection
+ self._set_connection_expiry_date(conn)
+ return conn
+
+ async def _add_connection(
+ self, attempt: Optional[ConnectionAttempt], growing: bool = False
+ ) -> None:
+ """Try to connect and add the connection to the pool.
+
+ If failed, reschedule a new attempt in the future for a few times, then
+ give up, decrease the pool connections number and call
+ `self.reconnect_failed()`.
+
+ """
+ now = monotonic()
+ if not attempt:
+ attempt = ConnectionAttempt(reconnect_timeout=self.reconnect_timeout)
+
+ try:
+ conn = await self._connect()
+ except Exception as ex:
+ logger.warning(f"error connecting in {self.name!r}: {ex}")
+ if attempt.time_to_give_up(now):
+ logger.warning(
+ "reconnection attempt in pool %r failed after %s sec",
+ self.name,
+ self.reconnect_timeout,
+ )
+ async with self._lock:
+ self._nconns -= 1
+ # If we have given up with a growing attempt, allow a new one.
+ if growing and self._growing:
+ self._growing = False
+ self.reconnect_failed()
+ else:
+ attempt.update_delay(now)
+ await self.schedule_task(
+ AddConnection(self, attempt, growing=growing),
+ attempt.delay,
+ )
+ return
+
+ logger.info("adding new connection to the pool")
+ await self._add_to_pool(conn)
+ if growing:
+ async with self._lock:
+ # Keep on growing if the pool is not full yet, or if there are
+ # clients waiting and the pool can extend.
+ if self._nconns < self._min_size or (
+ self._nconns < self._max_size and self._waiting
+ ):
+ self._nconns += 1
+ logger.info("growing pool %r to %s", self.name, self._nconns)
+ self.run_task(AddConnection(self, growing=True))
+ else:
+ self._growing = False
+
+ async def _return_connection(self, conn: AsyncConnection[Any]) -> None:
+ """
+ Return a connection to the pool after usage.
+ """
+ await self._reset_connection(conn)
+ if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._RETURNS_BAD] += 1
+ # Connection no more in working state: create a new one.
+ self.run_task(AddConnection(self))
+ logger.warning("discarding closed connection: %s", conn)
+ return
+
+ # Check if the connection is past its best before date
+ if conn._expire_at <= monotonic():
+ self.run_task(AddConnection(self))
+ logger.info("discarding expired connection")
+ await conn.close()
+ return
+
+ await self._add_to_pool(conn)
+
+ async def _add_to_pool(self, conn: AsyncConnection[Any]) -> None:
+ """
+ Add a connection to the pool.
+
+ The connection can be a fresh one or one already used in the pool.
+
+ If a client is already waiting for a connection pass it on, otherwise
+ put it back into the pool
+ """
+ # Remove the pool reference from the connection before returning it
+ # to the state, to avoid to create a reference loop.
+ # Also disable the warning for open connection in conn.__del__
+ conn._pool = None
+
+ # Critical section: if there is a client waiting give it the connection
+ # otherwise put it back into the pool.
+ async with self._lock:
+ while self._waiting:
+ # If there is a client waiting (which is still waiting and
+ # hasn't timed out), give it the connection and notify it.
+ pos = self._waiting.popleft()
+ if await pos.set(conn):
+ break
+ else:
+ # No client waiting for a connection: put it back into the pool
+ self._pool.append(conn)
+
+ # If we have been asked to wait for pool init, notify the
+ # waiter if the pool is full.
+ if self._pool_full_event and len(self._pool) >= self._min_size:
+ self._pool_full_event.set()
+
+ async def _reset_connection(self, conn: AsyncConnection[Any]) -> None:
+ """
+ Bring a connection to IDLE state or close it.
+ """
+ status = conn.pgconn.transaction_status
+ if status == TransactionStatus.IDLE:
+ pass
+
+ elif status in (TransactionStatus.INTRANS, TransactionStatus.INERROR):
+ # Connection returned with an active transaction
+ logger.warning("rolling back returned connection: %s", conn)
+ try:
+ await conn.rollback()
+ except Exception as ex:
+ logger.warning(
+ "rollback failed: %s: %s. Discarding connection %s",
+ ex.__class__.__name__,
+ ex,
+ conn,
+ )
+ await conn.close()
+
+ elif status == TransactionStatus.ACTIVE:
+ # Connection returned during an operation. Bad... just close it.
+ logger.warning("closing returned connection: %s", conn)
+ await conn.close()
+
+ if not conn.closed and self._reset:
+ try:
+ await self._reset(conn)
+ status = conn.pgconn.transaction_status
+ if status != TransactionStatus.IDLE:
+ sname = TransactionStatus(status).name
+ raise e.ProgrammingError(
+ f"connection left in status {sname} by reset function"
+ f" {self._reset}: discarded"
+ )
+ except Exception as ex:
+ logger.warning(f"error resetting connection: {ex}")
+ await conn.close()
+
+ async def _shrink_pool(self) -> None:
+ to_close: Optional[AsyncConnection[Any]] = None
+
+ async with self._lock:
+ # Reset the min number of connections used
+ nconns_min = self._nconns_min
+ self._nconns_min = len(self._pool)
+
+ # If the pool can shrink and connections were unused, drop one
+ if self._nconns > self._min_size and nconns_min > 0:
+ to_close = self._pool.popleft()
+ self._nconns -= 1
+ self._nconns_min -= 1
+
+ if to_close:
+ logger.info(
+ "shrinking pool %r to %s because %s unused connections"
+ " in the last %s sec",
+ self.name,
+ self._nconns,
+ nconns_min,
+ self.max_idle,
+ )
+ await to_close.close()
+
+ def _get_measures(self) -> Dict[str, int]:
+ rv = super()._get_measures()
+ rv[self._REQUESTS_WAITING] = len(self._waiting)
+ return rv
+
+
+class AsyncClient:
+ """A position in a queue for a client waiting for a connection."""
+
+ __slots__ = ("conn", "error", "_cond")
+
+ def __init__(self) -> None:
+ self.conn: Optional[AsyncConnection[Any]] = None
+ self.error: Optional[Exception] = None
+
+ # The AsyncClient behaves in a way similar to an Event, but we need
+ # to notify reliably the flagger that the waiter has "accepted" the
+ # message and it hasn't timed out yet, otherwise the pool may give a
+ # connection to a client that has already timed out getconn(), which
+ # will be lost.
+ self._cond = asyncio.Condition()
+
+ async def wait(self, timeout: float) -> AsyncConnection[Any]:
+ """Wait for a connection to be set and return it.
+
+ Raise an exception if the wait times out or if fail() is called.
+ """
+ async with self._cond:
+ if not (self.conn or self.error):
+ try:
+ await asyncio.wait_for(self._cond.wait(), timeout)
+ except asyncio.TimeoutError:
+ self.error = PoolTimeout(
+ f"couldn't get a connection after {timeout} sec"
+ )
+
+ if self.conn:
+ return self.conn
+ else:
+ assert self.error
+ raise self.error
+
+ async def set(self, conn: AsyncConnection[Any]) -> bool:
+ """Signal the client waiting that a connection is ready.
+
+ Return True if the client has "accepted" the connection, False
+ otherwise (typically because wait() has timed out).
+ """
+ async with self._cond:
+ if self.conn or self.error:
+ return False
+
+ self.conn = conn
+ self._cond.notify_all()
+ return True
+
+ async def fail(self, error: Exception) -> bool:
+ """Signal the client that, alas, they won't have a connection today.
+
+ Return True if the client has "accepted" the error, False otherwise
+ (typically because wait() has timed out).
+ """
+ async with self._cond:
+ if self.conn or self.error:
+ return False
+
+ self.error = error
+ self._cond.notify_all()
+ return True
+
+
+class MaintenanceTask(ABC):
+ """A task to run asynchronously to maintain the pool state."""
+
+ def __init__(self, pool: "AsyncConnectionPool"):
+ self.pool = ref(pool)
+
+ def __repr__(self) -> str:
+ pool = self.pool()
+ name = repr(pool.name) if pool else "<pool is gone>"
+ return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>"
+
+ async def run(self) -> None:
+ """Run the task.
+
+ This usually happens in a worker. Call the concrete _run()
+ implementation, if the pool is still alive.
+ """
+ pool = self.pool()
+ if not pool or pool.closed:
+ # Pool is no more working. Quietly discard the operation.
+ logger.debug("task run discarded: %s", self)
+ return
+
+ await self._run(pool)
+
+ async def tick(self) -> None:
+ """Run the scheduled task
+
+ This function is called by the scheduler task. Use a worker to
+ run the task for real in order to free the scheduler immediately.
+ """
+ pool = self.pool()
+ if not pool or pool.closed:
+ # Pool is no more working. Quietly discard the operation.
+ logger.debug("task tick discarded: %s", self)
+ return
+
+ pool.run_task(self)
+
+ @abstractmethod
+ async def _run(self, pool: "AsyncConnectionPool") -> None:
+ ...
+
+
+class StopWorker(MaintenanceTask):
+ """Signal the maintenance worker to terminate."""
+
+ async def _run(self, pool: "AsyncConnectionPool") -> None:
+ pass
+
+
+class AddConnection(MaintenanceTask):
+ def __init__(
+ self,
+ pool: "AsyncConnectionPool",
+ attempt: Optional["ConnectionAttempt"] = None,
+ growing: bool = False,
+ ):
+ super().__init__(pool)
+ self.attempt = attempt
+ self.growing = growing
+
+ async def _run(self, pool: "AsyncConnectionPool") -> None:
+ await pool._add_connection(self.attempt, growing=self.growing)
+
+
+class ReturnConnection(MaintenanceTask):
+ """Clean up and return a connection to the pool."""
+
+ def __init__(self, pool: "AsyncConnectionPool", conn: "AsyncConnection[Any]"):
+ super().__init__(pool)
+ self.conn = conn
+
+ async def _run(self, pool: "AsyncConnectionPool") -> None:
+ await pool._return_connection(self.conn)
+
+
+class ShrinkPool(MaintenanceTask):
+ """If the pool can shrink, remove one connection.
+
+ Re-schedule periodically and also reset the minimum number of connections
+ in the pool.
+ """
+
+ async def _run(self, pool: "AsyncConnectionPool") -> None:
+ # Reschedule the task now so that in case of any error we don't lose
+ # the periodic run.
+ await pool.schedule_task(self, pool.max_idle)
+ await pool._shrink_pool()
+
+
+class Schedule(MaintenanceTask):
+ """Schedule a task in the pool scheduler.
+
+ This task is a trampoline to allow to use a sync call (pool.run_task)
+ to execute an async one (pool.schedule_task).
+ """
+
+ def __init__(
+ self,
+ pool: "AsyncConnectionPool",
+ task: MaintenanceTask,
+ delay: float,
+ ):
+ super().__init__(pool)
+ self.task = task
+ self.delay = delay
+
+ async def _run(self, pool: "AsyncConnectionPool") -> None:
+ await pool.schedule_task(self.task, self.delay)
diff --git a/psycopg_pool/psycopg_pool/py.typed b/psycopg_pool/psycopg_pool/py.typed
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/psycopg_pool/psycopg_pool/py.typed
diff --git a/psycopg_pool/psycopg_pool/sched.py b/psycopg_pool/psycopg_pool/sched.py
new file mode 100644
index 0000000..ca26007
--- /dev/null
+++ b/psycopg_pool/psycopg_pool/sched.py
@@ -0,0 +1,177 @@
+"""
+A minimal scheduler to schedule tasks run in the future.
+
+Inspired to the standard library `sched.scheduler`, but designed for
+multi-thread usage ground up, not as an afterthought. Tasks can be scheduled in
+front of the one currently running and `Scheduler.run()` can be left running
+without any task scheduled.
+
+Tasks are called "Task", not "Event", here, because we actually make use of
+`threading.Event` and the two would be confusing.
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+import asyncio
+import logging
+import threading
+from time import monotonic
+from heapq import heappush, heappop
+from typing import Any, Callable, List, Optional, NamedTuple
+
+logger = logging.getLogger(__name__)
+
+
+class Task(NamedTuple):
+ time: float
+ action: Optional[Callable[[], Any]]
+
+ def __eq__(self, other: "Task") -> Any: # type: ignore[override]
+ return self.time == other.time
+
+ def __lt__(self, other: "Task") -> Any: # type: ignore[override]
+ return self.time < other.time
+
+ def __le__(self, other: "Task") -> Any: # type: ignore[override]
+ return self.time <= other.time
+
+ def __gt__(self, other: "Task") -> Any: # type: ignore[override]
+ return self.time > other.time
+
+ def __ge__(self, other: "Task") -> Any: # type: ignore[override]
+ return self.time >= other.time
+
+
+class Scheduler:
+ def __init__(self) -> None:
+ """Initialize a new instance, passing the time and delay functions."""
+ self._queue: List[Task] = []
+ self._lock = threading.RLock()
+ self._event = threading.Event()
+
+ EMPTY_QUEUE_TIMEOUT = 600.0
+
+ def enter(self, delay: float, action: Optional[Callable[[], Any]]) -> Task:
+ """Enter a new task in the queue delayed in the future.
+
+ Schedule a `!None` to stop the execution.
+ """
+ time = monotonic() + delay
+ return self.enterabs(time, action)
+
+ def enterabs(self, time: float, action: Optional[Callable[[], Any]]) -> Task:
+ """Enter a new task in the queue at an absolute time.
+
+ Schedule a `!None` to stop the execution.
+ """
+ task = Task(time, action)
+ with self._lock:
+ heappush(self._queue, task)
+ first = self._queue[0] is task
+
+ if first:
+ self._event.set()
+
+ return task
+
+ def run(self) -> None:
+ """Execute the events scheduled."""
+ q = self._queue
+ while True:
+ with self._lock:
+ now = monotonic()
+ task = q[0] if q else None
+ if task:
+ if task.time <= now:
+ heappop(q)
+ else:
+ delay = task.time - now
+ task = None
+ else:
+ delay = self.EMPTY_QUEUE_TIMEOUT
+ self._event.clear()
+
+ if task:
+ if not task.action:
+ break
+ try:
+ task.action()
+ except Exception as e:
+ logger.warning(
+ "scheduled task run %s failed: %s: %s",
+ task.action,
+ e.__class__.__name__,
+ e,
+ )
+ else:
+ # Block for the expected timeout or until a new task scheduled
+ self._event.wait(timeout=delay)
+
+
+class AsyncScheduler:
+ def __init__(self) -> None:
+ """Initialize a new instance, passing the time and delay functions."""
+ self._queue: List[Task] = []
+ self._lock = asyncio.Lock()
+ self._event = asyncio.Event()
+
+ EMPTY_QUEUE_TIMEOUT = 600.0
+
+ async def enter(self, delay: float, action: Optional[Callable[[], Any]]) -> Task:
+ """Enter a new task in the queue delayed in the future.
+
+ Schedule a `!None` to stop the execution.
+ """
+ time = monotonic() + delay
+ return await self.enterabs(time, action)
+
+ async def enterabs(self, time: float, action: Optional[Callable[[], Any]]) -> Task:
+ """Enter a new task in the queue at an absolute time.
+
+ Schedule a `!None` to stop the execution.
+ """
+ task = Task(time, action)
+ async with self._lock:
+ heappush(self._queue, task)
+ first = self._queue[0] is task
+
+ if first:
+ self._event.set()
+
+ return task
+
+ async def run(self) -> None:
+ """Execute the events scheduled."""
+ q = self._queue
+ while True:
+ async with self._lock:
+ now = monotonic()
+ task = q[0] if q else None
+ if task:
+ if task.time <= now:
+ heappop(q)
+ else:
+ delay = task.time - now
+ task = None
+ else:
+ delay = self.EMPTY_QUEUE_TIMEOUT
+ self._event.clear()
+
+ if task:
+ if not task.action:
+ break
+ try:
+ await task.action()
+ except Exception as e:
+ logger.warning(
+ "scheduled task run %s failed: %s: %s",
+ task.action,
+ e.__class__.__name__,
+ e,
+ )
+ else:
+ # Block for the expected timeout or until a new task scheduled
+ try:
+ await asyncio.wait_for(self._event.wait(), delay)
+ except asyncio.TimeoutError:
+ pass
diff --git a/psycopg_pool/psycopg_pool/version.py b/psycopg_pool/psycopg_pool/version.py
new file mode 100644
index 0000000..fc99bbd
--- /dev/null
+++ b/psycopg_pool/psycopg_pool/version.py
@@ -0,0 +1,13 @@
+"""
+psycopg pool version file.
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+# Use a versioning scheme as defined in
+# https://www.python.org/dev/peps/pep-0440/
+
+# STOP AND READ! if you change:
+__version__ = "3.1.5"
+# also change:
+# - `docs/news_pool.rst` to declare this version current or unreleased
diff --git a/psycopg_pool/setup.cfg b/psycopg_pool/setup.cfg
new file mode 100644
index 0000000..1a3274e
--- /dev/null
+++ b/psycopg_pool/setup.cfg
@@ -0,0 +1,45 @@
+[metadata]
+name = psycopg-pool
+description = Connection Pool for Psycopg
+url = https://psycopg.org/psycopg3/
+author = Daniele Varrazzo
+author_email = daniele.varrazzo@gmail.com
+license = GNU Lesser General Public License v3 (LGPLv3)
+
+project_urls =
+ Homepage = https://psycopg.org/
+ Code = https://github.com/psycopg/psycopg
+ Issue Tracker = https://github.com/psycopg/psycopg/issues
+ Download = https://pypi.org/project/psycopg-pool/
+
+classifiers =
+ Development Status :: 5 - Production/Stable
+ Intended Audience :: Developers
+ License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)
+ Operating System :: MacOS :: MacOS X
+ Operating System :: Microsoft :: Windows
+ Operating System :: POSIX
+ Programming Language :: Python :: 3
+ Programming Language :: Python :: 3.7
+ Programming Language :: Python :: 3.8
+ Programming Language :: Python :: 3.9
+ Programming Language :: Python :: 3.10
+ Programming Language :: Python :: 3.11
+ Topic :: Database
+ Topic :: Database :: Front-Ends
+ Topic :: Software Development
+ Topic :: Software Development :: Libraries :: Python Modules
+
+long_description = file: README.rst
+long_description_content_type = text/x-rst
+license_files = LICENSE.txt
+
+[options]
+python_requires = >= 3.7
+packages = find:
+zip_safe = False
+install_requires =
+ typing-extensions >= 3.10
+
+[options.package_data]
+psycopg_pool = py.typed
diff --git a/psycopg_pool/setup.py b/psycopg_pool/setup.py
new file mode 100644
index 0000000..771847d
--- /dev/null
+++ b/psycopg_pool/setup.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python3
+"""
+PostgreSQL database adapter for Python - Connection Pool
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+import os
+import re
+from setuptools import setup
+
+# Move to the directory of setup.py: executing this file from another location
+# (e.g. from the project root) will fail
+here = os.path.abspath(os.path.dirname(__file__))
+if os.path.abspath(os.getcwd()) != here:
+ os.chdir(here)
+
+with open("psycopg_pool/version.py") as f:
+ data = f.read()
+ m = re.search(r"""(?m)^__version__\s*=\s*['"]([^'"]+)['"]""", data)
+ if not m:
+ raise Exception(f"cannot find version in {f.name}")
+ version = m.group(1)
+
+
+setup(version=version)