diff options
Diffstat (limited to '')
-rw-r--r-- | psycopg_pool/.flake8 | 3 | ||||
-rw-r--r-- | psycopg_pool/LICENSE.txt | 165 | ||||
-rw-r--r-- | psycopg_pool/README.rst | 24 | ||||
-rw-r--r-- | psycopg_pool/psycopg_pool/__init__.py | 22 | ||||
-rw-r--r-- | psycopg_pool/psycopg_pool/_compat.py | 51 | ||||
-rw-r--r-- | psycopg_pool/psycopg_pool/base.py | 230 | ||||
-rw-r--r-- | psycopg_pool/psycopg_pool/errors.py | 25 | ||||
-rw-r--r-- | psycopg_pool/psycopg_pool/null_pool.py | 159 | ||||
-rw-r--r-- | psycopg_pool/psycopg_pool/null_pool_async.py | 122 | ||||
-rw-r--r-- | psycopg_pool/psycopg_pool/pool.py | 839 | ||||
-rw-r--r-- | psycopg_pool/psycopg_pool/pool_async.py | 784 | ||||
-rw-r--r-- | psycopg_pool/psycopg_pool/py.typed | 0 | ||||
-rw-r--r-- | psycopg_pool/psycopg_pool/sched.py | 177 | ||||
-rw-r--r-- | psycopg_pool/psycopg_pool/version.py | 13 | ||||
-rw-r--r-- | psycopg_pool/setup.cfg | 45 | ||||
-rw-r--r-- | psycopg_pool/setup.py | 26 |
16 files changed, 2685 insertions, 0 deletions
diff --git a/psycopg_pool/.flake8 b/psycopg_pool/.flake8 new file mode 100644 index 0000000..2ae629c --- /dev/null +++ b/psycopg_pool/.flake8 @@ -0,0 +1,3 @@ +[flake8] +max-line-length = 88 +ignore = W503, E203 diff --git a/psycopg_pool/LICENSE.txt b/psycopg_pool/LICENSE.txt new file mode 100644 index 0000000..0a04128 --- /dev/null +++ b/psycopg_pool/LICENSE.txt @@ -0,0 +1,165 @@ + GNU LESSER GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/> + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + + This version of the GNU Lesser General Public License incorporates +the terms and conditions of version 3 of the GNU General Public +License, supplemented by the additional permissions listed below. + + 0. Additional Definitions. + + As used herein, "this License" refers to version 3 of the GNU Lesser +General Public License, and the "GNU GPL" refers to version 3 of the GNU +General Public License. + + "The Library" refers to a covered work governed by this License, +other than an Application or a Combined Work as defined below. + + An "Application" is any work that makes use of an interface provided +by the Library, but which is not otherwise based on the Library. +Defining a subclass of a class defined by the Library is deemed a mode +of using an interface provided by the Library. + + A "Combined Work" is a work produced by combining or linking an +Application with the Library. The particular version of the Library +with which the Combined Work was made is also called the "Linked +Version". + + The "Minimal Corresponding Source" for a Combined Work means the +Corresponding Source for the Combined Work, excluding any source code +for portions of the Combined Work that, considered in isolation, are +based on the Application, and not on the Linked Version. + + The "Corresponding Application Code" for a Combined Work means the +object code and/or source code for the Application, including any data +and utility programs needed for reproducing the Combined Work from the +Application, but excluding the System Libraries of the Combined Work. + + 1. Exception to Section 3 of the GNU GPL. + + You may convey a covered work under sections 3 and 4 of this License +without being bound by section 3 of the GNU GPL. + + 2. Conveying Modified Versions. + + If you modify a copy of the Library, and, in your modifications, a +facility refers to a function or data to be supplied by an Application +that uses the facility (other than as an argument passed when the +facility is invoked), then you may convey a copy of the modified +version: + + a) under this License, provided that you make a good faith effort to + ensure that, in the event an Application does not supply the + function or data, the facility still operates, and performs + whatever part of its purpose remains meaningful, or + + b) under the GNU GPL, with none of the additional permissions of + this License applicable to that copy. + + 3. Object Code Incorporating Material from Library Header Files. + + The object code form of an Application may incorporate material from +a header file that is part of the Library. You may convey such object +code under terms of your choice, provided that, if the incorporated +material is not limited to numerical parameters, data structure +layouts and accessors, or small macros, inline functions and templates +(ten or fewer lines in length), you do both of the following: + + a) Give prominent notice with each copy of the object code that the + Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the object code with a copy of the GNU GPL and this license + document. + + 4. Combined Works. + + You may convey a Combined Work under terms of your choice that, +taken together, effectively do not restrict modification of the +portions of the Library contained in the Combined Work and reverse +engineering for debugging such modifications, if you also do each of +the following: + + a) Give prominent notice with each copy of the Combined Work that + the Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the Combined Work with a copy of the GNU GPL and this license + document. + + c) For a Combined Work that displays copyright notices during + execution, include the copyright notice for the Library among + these notices, as well as a reference directing the user to the + copies of the GNU GPL and this license document. + + d) Do one of the following: + + 0) Convey the Minimal Corresponding Source under the terms of this + License, and the Corresponding Application Code in a form + suitable for, and under terms that permit, the user to + recombine or relink the Application with a modified version of + the Linked Version to produce a modified Combined Work, in the + manner specified by section 6 of the GNU GPL for conveying + Corresponding Source. + + 1) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (a) uses at run time + a copy of the Library already present on the user's computer + system, and (b) will operate properly with a modified version + of the Library that is interface-compatible with the Linked + Version. + + e) Provide Installation Information, but only if you would otherwise + be required to provide such information under section 6 of the + GNU GPL, and only to the extent that such information is + necessary to install and execute a modified version of the + Combined Work produced by recombining or relinking the + Application with a modified version of the Linked Version. (If + you use option 4d0, the Installation Information must accompany + the Minimal Corresponding Source and Corresponding Application + Code. If you use option 4d1, you must provide the Installation + Information in the manner specified by section 6 of the GNU GPL + for conveying Corresponding Source.) + + 5. Combined Libraries. + + You may place library facilities that are a work based on the +Library side by side in a single library together with other library +facilities that are not Applications and are not covered by this +License, and convey such a combined library under terms of your +choice, if you do both of the following: + + a) Accompany the combined library with a copy of the same work based + on the Library, uncombined with any other library facilities, + conveyed under the terms of this License. + + b) Give prominent notice with the combined library that part of it + is a work based on the Library, and explaining where to find the + accompanying uncombined form of the same work. + + 6. Revised Versions of the GNU Lesser General Public License. + + The Free Software Foundation may publish revised and/or new versions +of the GNU Lesser General Public License from time to time. Such new +versions will be similar in spirit to the present version, but may +differ in detail to address new problems or concerns. + + Each version is given a distinguishing version number. If the +Library as you received it specifies that a certain numbered version +of the GNU Lesser General Public License "or any later version" +applies to it, you have the option of following the terms and +conditions either of that published version or of any later version +published by the Free Software Foundation. If the Library as you +received it does not specify a version number of the GNU Lesser +General Public License, you may choose any version of the GNU Lesser +General Public License ever published by the Free Software Foundation. + + If the Library as you received it specifies that a proxy can decide +whether future versions of the GNU Lesser General Public License shall +apply, that proxy's public statement of acceptance of any version is +permanent authorization for you to choose that version for the +Library. diff --git a/psycopg_pool/README.rst b/psycopg_pool/README.rst new file mode 100644 index 0000000..6e6b32c --- /dev/null +++ b/psycopg_pool/README.rst @@ -0,0 +1,24 @@ +Psycopg 3: PostgreSQL database adapter for Python - Connection Pool +=================================================================== + +This distribution contains the optional connection pool package +`psycopg_pool`__. + +.. __: https://www.psycopg.org/psycopg3/docs/advanced/pool.html + +This package is kept separate from the main ``psycopg`` package because it is +likely that it will follow a different release cycle. + +You can also install this package using:: + + pip install "psycopg[pool]" + +Please read `the project readme`__ and `the installation documentation`__ for +more details. + +.. __: https://github.com/psycopg/psycopg#readme +.. __: https://www.psycopg.org/psycopg3/docs/basic/install.html + #installing-the-connection-pool + + +Copyright (C) 2020 The Psycopg Team diff --git a/psycopg_pool/psycopg_pool/__init__.py b/psycopg_pool/psycopg_pool/__init__.py new file mode 100644 index 0000000..e4d975f --- /dev/null +++ b/psycopg_pool/psycopg_pool/__init__.py @@ -0,0 +1,22 @@ +""" +psycopg connection pool package +""" + +# Copyright (C) 2021 The Psycopg Team + +from .pool import ConnectionPool +from .pool_async import AsyncConnectionPool +from .null_pool import NullConnectionPool +from .null_pool_async import AsyncNullConnectionPool +from .errors import PoolClosed, PoolTimeout, TooManyRequests +from .version import __version__ as __version__ # noqa: F401 + +__all__ = [ + "AsyncConnectionPool", + "AsyncNullConnectionPool", + "ConnectionPool", + "NullConnectionPool", + "PoolClosed", + "PoolTimeout", + "TooManyRequests", +] diff --git a/psycopg_pool/psycopg_pool/_compat.py b/psycopg_pool/psycopg_pool/_compat.py new file mode 100644 index 0000000..9fb2b9b --- /dev/null +++ b/psycopg_pool/psycopg_pool/_compat.py @@ -0,0 +1,51 @@ +""" +compatibility functions for different Python versions +""" + +# Copyright (C) 2021 The Psycopg Team + +import sys +import asyncio +from typing import Any, Awaitable, Generator, Optional, Union, Type, TypeVar +from typing_extensions import TypeAlias + +import psycopg.errors as e + +T = TypeVar("T") +FutureT: TypeAlias = Union["asyncio.Future[T]", Generator[Any, None, T], Awaitable[T]] + +if sys.version_info >= (3, 8): + create_task = asyncio.create_task + Task = asyncio.Task + +else: + + def create_task( + coro: FutureT[T], name: Optional[str] = None + ) -> "asyncio.Future[T]": + return asyncio.create_task(coro) + + Task = asyncio.Future + +if sys.version_info >= (3, 9): + from collections import Counter, deque as Deque +else: + from typing import Counter, Deque + +__all__ = [ + "Counter", + "Deque", + "Task", + "create_task", +] + +# Workaround for psycopg < 3.0.8. +# Timeout on NullPool connection mignt not work correctly. +try: + ConnectionTimeout: Type[e.OperationalError] = e.ConnectionTimeout +except AttributeError: + + class DummyConnectionTimeout(e.OperationalError): + pass + + ConnectionTimeout = DummyConnectionTimeout diff --git a/psycopg_pool/psycopg_pool/base.py b/psycopg_pool/psycopg_pool/base.py new file mode 100644 index 0000000..298ea68 --- /dev/null +++ b/psycopg_pool/psycopg_pool/base.py @@ -0,0 +1,230 @@ +""" +psycopg connection pool base class and functionalities. +""" + +# Copyright (C) 2021 The Psycopg Team + +from time import monotonic +from random import random +from typing import Any, Callable, Dict, Generic, Optional, Tuple + +from psycopg import errors as e +from psycopg.abc import ConnectionType + +from .errors import PoolClosed +from ._compat import Counter, Deque + + +class BasePool(Generic[ConnectionType]): + + # Used to generate pool names + _num_pool = 0 + + # Stats keys + _POOL_MIN = "pool_min" + _POOL_MAX = "pool_max" + _POOL_SIZE = "pool_size" + _POOL_AVAILABLE = "pool_available" + _REQUESTS_WAITING = "requests_waiting" + _REQUESTS_NUM = "requests_num" + _REQUESTS_QUEUED = "requests_queued" + _REQUESTS_WAIT_MS = "requests_wait_ms" + _REQUESTS_ERRORS = "requests_errors" + _USAGE_MS = "usage_ms" + _RETURNS_BAD = "returns_bad" + _CONNECTIONS_NUM = "connections_num" + _CONNECTIONS_MS = "connections_ms" + _CONNECTIONS_ERRORS = "connections_errors" + _CONNECTIONS_LOST = "connections_lost" + + def __init__( + self, + conninfo: str = "", + *, + kwargs: Optional[Dict[str, Any]] = None, + min_size: int = 4, + max_size: Optional[int] = None, + open: bool = True, + name: Optional[str] = None, + timeout: float = 30.0, + max_waiting: int = 0, + max_lifetime: float = 60 * 60.0, + max_idle: float = 10 * 60.0, + reconnect_timeout: float = 5 * 60.0, + reconnect_failed: Optional[Callable[["BasePool[ConnectionType]"], None]] = None, + num_workers: int = 3, + ): + min_size, max_size = self._check_size(min_size, max_size) + + if not name: + num = BasePool._num_pool = BasePool._num_pool + 1 + name = f"pool-{num}" + + if num_workers < 1: + raise ValueError("num_workers must be at least 1") + + self.conninfo = conninfo + self.kwargs: Dict[str, Any] = kwargs or {} + self._reconnect_failed: Callable[["BasePool[ConnectionType]"], None] + self._reconnect_failed = reconnect_failed or (lambda pool: None) + self.name = name + self._min_size = min_size + self._max_size = max_size + self.timeout = timeout + self.max_waiting = max_waiting + self.reconnect_timeout = reconnect_timeout + self.max_lifetime = max_lifetime + self.max_idle = max_idle + self.num_workers = num_workers + + self._nconns = min_size # currently in the pool, out, being prepared + self._pool = Deque[ConnectionType]() + self._stats = Counter[str]() + + # Min number of connections in the pool in a max_idle unit of time. + # It is reset periodically by the ShrinkPool scheduled task. + # It is used to shrink back the pool if maxcon > min_size and extra + # connections have been acquired, if we notice that in the last + # max_idle interval they weren't all used. + self._nconns_min = min_size + + # Flag to allow the pool to grow only one connection at time. In case + # of spike, if threads are allowed to grow in parallel and connection + # time is slow, there won't be any thread available to return the + # connections to the pool. + self._growing = False + + self._opened = False + self._closed = True + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__module__}.{self.__class__.__name__}" + f" {self.name!r} at 0x{id(self):x}>" + ) + + @property + def min_size(self) -> int: + return self._min_size + + @property + def max_size(self) -> int: + return self._max_size + + @property + def closed(self) -> bool: + """`!True` if the pool is closed.""" + return self._closed + + def _check_size(self, min_size: int, max_size: Optional[int]) -> Tuple[int, int]: + if max_size is None: + max_size = min_size + + if min_size < 0: + raise ValueError("min_size cannot be negative") + if max_size < min_size: + raise ValueError("max_size must be greater or equal than min_size") + if min_size == max_size == 0: + raise ValueError("if min_size is 0 max_size must be greater or than 0") + + return min_size, max_size + + def _check_open(self) -> None: + if self._closed and self._opened: + raise e.OperationalError( + "pool has already been opened/closed and cannot be reused" + ) + + def _check_open_getconn(self) -> None: + if self._closed: + if self._opened: + raise PoolClosed(f"the pool {self.name!r} is already closed") + else: + raise PoolClosed(f"the pool {self.name!r} is not open yet") + + def _check_pool_putconn(self, conn: ConnectionType) -> None: + pool = getattr(conn, "_pool", None) + if pool is self: + return + + if pool: + msg = f"it comes from pool {pool.name!r}" + else: + msg = "it doesn't come from any pool" + raise ValueError( + f"can't return connection to pool {self.name!r}, {msg}: {conn}" + ) + + def get_stats(self) -> Dict[str, int]: + """ + Return current stats about the pool usage. + """ + rv = dict(self._stats) + rv.update(self._get_measures()) + return rv + + def pop_stats(self) -> Dict[str, int]: + """ + Return current stats about the pool usage. + + After the call, all the counters are reset to zero. + """ + stats, self._stats = self._stats, Counter() + rv = dict(stats) + rv.update(self._get_measures()) + return rv + + def _get_measures(self) -> Dict[str, int]: + """ + Return immediate measures of the pool (not counters). + """ + return { + self._POOL_MIN: self._min_size, + self._POOL_MAX: self._max_size, + self._POOL_SIZE: self._nconns, + self._POOL_AVAILABLE: len(self._pool), + } + + @classmethod + def _jitter(cls, value: float, min_pc: float, max_pc: float) -> float: + """ + Add a random value to *value* between *min_pc* and *max_pc* percent. + """ + return value * (1.0 + ((max_pc - min_pc) * random()) + min_pc) + + def _set_connection_expiry_date(self, conn: ConnectionType) -> None: + """Set an expiry date on a connection. + + Add some randomness to avoid mass reconnection. + """ + conn._expire_at = monotonic() + self._jitter(self.max_lifetime, -0.05, 0.0) + + +class ConnectionAttempt: + """Keep the state of a connection attempt.""" + + INITIAL_DELAY = 1.0 + DELAY_JITTER = 0.1 + DELAY_BACKOFF = 2.0 + + def __init__(self, *, reconnect_timeout: float): + self.reconnect_timeout = reconnect_timeout + self.delay = 0.0 + self.give_up_at = 0.0 + + def update_delay(self, now: float) -> None: + """Calculate how long to wait for a new connection attempt""" + if self.delay == 0.0: + self.give_up_at = now + self.reconnect_timeout + self.delay = BasePool._jitter( + self.INITIAL_DELAY, -self.DELAY_JITTER, self.DELAY_JITTER + ) + else: + self.delay *= self.DELAY_BACKOFF + + if self.delay + now > self.give_up_at: + self.delay = max(0.0, self.give_up_at - now) + + def time_to_give_up(self, now: float) -> bool: + """Return True if we are tired of trying to connect. Meh.""" + return self.give_up_at > 0.0 and now >= self.give_up_at diff --git a/psycopg_pool/psycopg_pool/errors.py b/psycopg_pool/psycopg_pool/errors.py new file mode 100644 index 0000000..9e672ad --- /dev/null +++ b/psycopg_pool/psycopg_pool/errors.py @@ -0,0 +1,25 @@ +""" +Connection pool errors. +""" + +# Copyright (C) 2021 The Psycopg Team + +from psycopg import errors as e + + +class PoolClosed(e.OperationalError): + """Attempt to get a connection from a closed pool.""" + + __module__ = "psycopg_pool" + + +class PoolTimeout(e.OperationalError): + """The pool couldn't provide a connection in acceptable time.""" + + __module__ = "psycopg_pool" + + +class TooManyRequests(e.OperationalError): + """Too many requests in the queue waiting for a connection from the pool.""" + + __module__ = "psycopg_pool" diff --git a/psycopg_pool/psycopg_pool/null_pool.py b/psycopg_pool/psycopg_pool/null_pool.py new file mode 100644 index 0000000..c0a77c2 --- /dev/null +++ b/psycopg_pool/psycopg_pool/null_pool.py @@ -0,0 +1,159 @@ +""" +Psycopg null connection pools +""" + +# Copyright (C) 2022 The Psycopg Team + +import logging +import threading +from typing import Any, Optional, Tuple + +from psycopg import Connection +from psycopg.pq import TransactionStatus + +from .pool import ConnectionPool, AddConnection +from .errors import PoolTimeout, TooManyRequests +from ._compat import ConnectionTimeout + +logger = logging.getLogger("psycopg.pool") + + +class _BaseNullConnectionPool: + def __init__( + self, conninfo: str = "", min_size: int = 0, *args: Any, **kwargs: Any + ): + super().__init__( # type: ignore[call-arg] + conninfo, *args, min_size=min_size, **kwargs + ) + + def _check_size(self, min_size: int, max_size: Optional[int]) -> Tuple[int, int]: + if max_size is None: + max_size = min_size + + if min_size != 0: + raise ValueError("null pools must have min_size = 0") + if max_size < min_size: + raise ValueError("max_size must be greater or equal than min_size") + + return min_size, max_size + + def _start_initial_tasks(self) -> None: + # Null pools don't have background tasks to fill connections + # or to grow/shrink. + return + + def _maybe_grow_pool(self) -> None: + # null pools don't grow + pass + + +class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool): + def wait(self, timeout: float = 30.0) -> None: + """ + Create a connection for test. + + Calling this function will verify that the connectivity with the + database works as expected. However the connection will not be stored + in the pool. + + Close the pool, and raise `PoolTimeout`, if not ready within *timeout* + sec. + """ + self._check_open_getconn() + + with self._lock: + assert not self._pool_full_event + self._pool_full_event = threading.Event() + + logger.info("waiting for pool %r initialization", self.name) + self.run_task(AddConnection(self)) + if not self._pool_full_event.wait(timeout): + self.close() # stop all the threads + raise PoolTimeout(f"pool initialization incomplete after {timeout} sec") + + with self._lock: + assert self._pool_full_event + self._pool_full_event = None + + logger.info("pool %r is ready to use", self.name) + + def _get_ready_connection( + self, timeout: Optional[float] + ) -> Optional[Connection[Any]]: + conn: Optional[Connection[Any]] = None + if self.max_size == 0 or self._nconns < self.max_size: + # Create a new connection for the client + try: + conn = self._connect(timeout=timeout) + except ConnectionTimeout as ex: + raise PoolTimeout(str(ex)) from None + self._nconns += 1 + + elif self.max_waiting and len(self._waiting) >= self.max_waiting: + self._stats[self._REQUESTS_ERRORS] += 1 + raise TooManyRequests( + f"the pool {self.name!r} has already" + f" {len(self._waiting)} requests waiting" + ) + return conn + + def _maybe_close_connection(self, conn: Connection[Any]) -> bool: + with self._lock: + if not self._closed and self._waiting: + return False + + conn._pool = None + if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._RETURNS_BAD] += 1 + conn.close() + self._nconns -= 1 + return True + + def resize(self, min_size: int, max_size: Optional[int] = None) -> None: + """Change the size of the pool during runtime. + + Only *max_size* can be changed; *min_size* must remain 0. + """ + min_size, max_size = self._check_size(min_size, max_size) + + logger.info( + "resizing %r to min_size=%s max_size=%s", + self.name, + min_size, + max_size, + ) + with self._lock: + self._min_size = min_size + self._max_size = max_size + + def check(self) -> None: + """No-op, as the pool doesn't have connections in its state.""" + pass + + def _add_to_pool(self, conn: Connection[Any]) -> None: + # Remove the pool reference from the connection before returning it + # to the state, to avoid to create a reference loop. + # Also disable the warning for open connection in conn.__del__ + conn._pool = None + + # Critical section: if there is a client waiting give it the connection + # otherwise put it back into the pool. + with self._lock: + while self._waiting: + # If there is a client waiting (which is still waiting and + # hasn't timed out), give it the connection and notify it. + pos = self._waiting.popleft() + if pos.set(conn): + break + else: + # No client waiting for a connection: close the connection + conn.close() + + # If we have been asked to wait for pool init, notify the + # waiter if the pool is ready. + if self._pool_full_event: + self._pool_full_event.set() + else: + # The connection created by wait shouldn't decrease the + # count of the number of connection used. + self._nconns -= 1 diff --git a/psycopg_pool/psycopg_pool/null_pool_async.py b/psycopg_pool/psycopg_pool/null_pool_async.py new file mode 100644 index 0000000..ae9d207 --- /dev/null +++ b/psycopg_pool/psycopg_pool/null_pool_async.py @@ -0,0 +1,122 @@ +""" +psycopg asynchronous null connection pool +""" + +# Copyright (C) 2022 The Psycopg Team + +import asyncio +import logging +from typing import Any, Optional + +from psycopg import AsyncConnection +from psycopg.pq import TransactionStatus + +from .errors import PoolTimeout, TooManyRequests +from ._compat import ConnectionTimeout +from .null_pool import _BaseNullConnectionPool +from .pool_async import AsyncConnectionPool, AddConnection + +logger = logging.getLogger("psycopg.pool") + + +class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool): + async def wait(self, timeout: float = 30.0) -> None: + self._check_open_getconn() + + async with self._lock: + assert not self._pool_full_event + self._pool_full_event = asyncio.Event() + + logger.info("waiting for pool %r initialization", self.name) + self.run_task(AddConnection(self)) + try: + await asyncio.wait_for(self._pool_full_event.wait(), timeout) + except asyncio.TimeoutError: + await self.close() # stop all the tasks + raise PoolTimeout( + f"pool initialization incomplete after {timeout} sec" + ) from None + + async with self._lock: + assert self._pool_full_event + self._pool_full_event = None + + logger.info("pool %r is ready to use", self.name) + + async def _get_ready_connection( + self, timeout: Optional[float] + ) -> Optional[AsyncConnection[Any]]: + conn: Optional[AsyncConnection[Any]] = None + if self.max_size == 0 or self._nconns < self.max_size: + # Create a new connection for the client + try: + conn = await self._connect(timeout=timeout) + except ConnectionTimeout as ex: + raise PoolTimeout(str(ex)) from None + self._nconns += 1 + elif self.max_waiting and len(self._waiting) >= self.max_waiting: + self._stats[self._REQUESTS_ERRORS] += 1 + raise TooManyRequests( + f"the pool {self.name!r} has already" + f" {len(self._waiting)} requests waiting" + ) + return conn + + async def _maybe_close_connection(self, conn: AsyncConnection[Any]) -> bool: + # Close the connection if no client is waiting for it, or if the pool + # is closed. For extra refcare remove the pool reference from it. + # Maintain the stats. + async with self._lock: + if not self._closed and self._waiting: + return False + + conn._pool = None + if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._RETURNS_BAD] += 1 + await conn.close() + self._nconns -= 1 + return True + + async def resize(self, min_size: int, max_size: Optional[int] = None) -> None: + min_size, max_size = self._check_size(min_size, max_size) + + logger.info( + "resizing %r to min_size=%s max_size=%s", + self.name, + min_size, + max_size, + ) + async with self._lock: + self._min_size = min_size + self._max_size = max_size + + async def check(self) -> None: + pass + + async def _add_to_pool(self, conn: AsyncConnection[Any]) -> None: + # Remove the pool reference from the connection before returning it + # to the state, to avoid to create a reference loop. + # Also disable the warning for open connection in conn.__del__ + conn._pool = None + + # Critical section: if there is a client waiting give it the connection + # otherwise put it back into the pool. + async with self._lock: + while self._waiting: + # If there is a client waiting (which is still waiting and + # hasn't timed out), give it the connection and notify it. + pos = self._waiting.popleft() + if await pos.set(conn): + break + else: + # No client waiting for a connection: close the connection + await conn.close() + + # If we have been asked to wait for pool init, notify the + # waiter if the pool is ready. + if self._pool_full_event: + self._pool_full_event.set() + else: + # The connection created by wait shouldn't decrease the + # count of the number of connection used. + self._nconns -= 1 diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py new file mode 100644 index 0000000..609d95d --- /dev/null +++ b/psycopg_pool/psycopg_pool/pool.py @@ -0,0 +1,839 @@ +""" +psycopg synchronous connection pool +""" + +# Copyright (C) 2021 The Psycopg Team + +import logging +import threading +from abc import ABC, abstractmethod +from time import monotonic +from queue import Queue, Empty +from types import TracebackType +from typing import Any, Callable, Dict, Iterator, List +from typing import Optional, Sequence, Type +from weakref import ref +from contextlib import contextmanager + +from psycopg import errors as e +from psycopg import Connection +from psycopg.pq import TransactionStatus + +from .base import ConnectionAttempt, BasePool +from .sched import Scheduler +from .errors import PoolClosed, PoolTimeout, TooManyRequests +from ._compat import Deque + +logger = logging.getLogger("psycopg.pool") + + +class ConnectionPool(BasePool[Connection[Any]]): + def __init__( + self, + conninfo: str = "", + *, + open: bool = True, + connection_class: Type[Connection[Any]] = Connection, + configure: Optional[Callable[[Connection[Any]], None]] = None, + reset: Optional[Callable[[Connection[Any]], None]] = None, + **kwargs: Any, + ): + self.connection_class = connection_class + self._configure = configure + self._reset = reset + + self._lock = threading.RLock() + self._waiting = Deque["WaitingClient"]() + + # to notify that the pool is full + self._pool_full_event: Optional[threading.Event] = None + + self._sched = Scheduler() + self._sched_runner: Optional[threading.Thread] = None + self._tasks: "Queue[MaintenanceTask]" = Queue() + self._workers: List[threading.Thread] = [] + + super().__init__(conninfo, **kwargs) + + if open: + self.open() + + def __del__(self) -> None: + # If the '_closed' property is not set we probably failed in __init__. + # Don't try anything complicated as probably it won't work. + if getattr(self, "_closed", True): + return + + self._stop_workers() + + def wait(self, timeout: float = 30.0) -> None: + """ + Wait for the pool to be full (with `min_size` connections) after creation. + + Close the pool, and raise `PoolTimeout`, if not ready within *timeout* + sec. + + Calling this method is not mandatory: you can try and use the pool + immediately after its creation. The first client will be served as soon + as a connection is ready. You can use this method if you prefer your + program to terminate in case the environment is not configured + properly, rather than trying to stay up the hardest it can. + """ + self._check_open_getconn() + + with self._lock: + assert not self._pool_full_event + if len(self._pool) >= self._min_size: + return + self._pool_full_event = threading.Event() + + logger.info("waiting for pool %r initialization", self.name) + if not self._pool_full_event.wait(timeout): + self.close() # stop all the threads + raise PoolTimeout(f"pool initialization incomplete after {timeout} sec") + + with self._lock: + assert self._pool_full_event + self._pool_full_event = None + + logger.info("pool %r is ready to use", self.name) + + @contextmanager + def connection(self, timeout: Optional[float] = None) -> Iterator[Connection[Any]]: + """Context manager to obtain a connection from the pool. + + Return the connection immediately if available, otherwise wait up to + *timeout* or `self.timeout` seconds and throw `PoolTimeout` if a + connection is not available in time. + + Upon context exit, return the connection to the pool. Apply the normal + :ref:`connection context behaviour <with-connection>` (commit/rollback + the transaction in case of success/error). If the connection is no more + in working state, replace it with a new one. + """ + conn = self.getconn(timeout=timeout) + t0 = monotonic() + try: + with conn: + yield conn + finally: + t1 = monotonic() + self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0)) + self.putconn(conn) + + def getconn(self, timeout: Optional[float] = None) -> Connection[Any]: + """Obtain a connection from the pool. + + You should preferably use `connection()`. Use this function only if + it is not possible to use the connection as context manager. + + After using this function you *must* call a corresponding `putconn()`: + failing to do so will deplete the pool. A depleted pool is a sad pool: + you don't want a depleted pool. + """ + logger.info("connection requested from %r", self.name) + self._stats[self._REQUESTS_NUM] += 1 + + # Critical section: decide here if there's a connection ready + # or if the client needs to wait. + with self._lock: + self._check_open_getconn() + conn = self._get_ready_connection(timeout) + if not conn: + # No connection available: put the client in the waiting queue + t0 = monotonic() + pos = WaitingClient() + self._waiting.append(pos) + self._stats[self._REQUESTS_QUEUED] += 1 + + # If there is space for the pool to grow, let's do it + self._maybe_grow_pool() + + # If we are in the waiting queue, wait to be assigned a connection + # (outside the critical section, so only the waiting client is locked) + if not conn: + if timeout is None: + timeout = self.timeout + try: + conn = pos.wait(timeout=timeout) + except Exception: + self._stats[self._REQUESTS_ERRORS] += 1 + raise + finally: + t1 = monotonic() + self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0)) + + # Tell the connection it belongs to a pool to avoid closing on __exit__ + # Note that this property shouldn't be set while the connection is in + # the pool, to avoid to create a reference loop. + conn._pool = self + logger.info("connection given by %r", self.name) + return conn + + def _get_ready_connection( + self, timeout: Optional[float] + ) -> Optional[Connection[Any]]: + """Return a connection, if the client deserves one.""" + conn: Optional[Connection[Any]] = None + if self._pool: + # Take a connection ready out of the pool + conn = self._pool.popleft() + if len(self._pool) < self._nconns_min: + self._nconns_min = len(self._pool) + elif self.max_waiting and len(self._waiting) >= self.max_waiting: + self._stats[self._REQUESTS_ERRORS] += 1 + raise TooManyRequests( + f"the pool {self.name!r} has already" + f" {len(self._waiting)} requests waiting" + ) + return conn + + def _maybe_grow_pool(self) -> None: + # Allow only one thread at time to grow the pool (or returning + # connections might be starved). + if self._nconns >= self._max_size or self._growing: + return + self._nconns += 1 + logger.info("growing pool %r to %s", self.name, self._nconns) + self._growing = True + self.run_task(AddConnection(self, growing=True)) + + def putconn(self, conn: Connection[Any]) -> None: + """Return a connection to the loving hands of its pool. + + Use this function only paired with a `getconn()`. You don't need to use + it if you use the much more comfortable `connection()` context manager. + """ + # Quick check to discard the wrong connection + self._check_pool_putconn(conn) + + logger.info("returning connection to %r", self.name) + + if self._maybe_close_connection(conn): + return + + # Use a worker to perform eventual maintenance work in a separate thread + if self._reset: + self.run_task(ReturnConnection(self, conn)) + else: + self._return_connection(conn) + + def _maybe_close_connection(self, conn: Connection[Any]) -> bool: + """Close a returned connection if necessary. + + Return `!True if the connection was closed. + """ + # If the pool is closed just close the connection instead of returning + # it to the pool. For extra refcare remove the pool reference from it. + if not self._closed: + return False + + conn._pool = None + conn.close() + return True + + def open(self, wait: bool = False, timeout: float = 30.0) -> None: + """Open the pool by starting connecting and and accepting clients. + + If *wait* is `!False`, return immediately and let the background worker + fill the pool if `min_size` > 0. Otherwise wait up to *timeout* seconds + for the requested number of connections to be ready (see `wait()` for + details). + + It is safe to call `!open()` again on a pool already open (because the + method was already called, or because the pool context was entered, or + because the pool was initialized with *open* = `!True`) but you cannot + currently re-open a closed pool. + """ + with self._lock: + self._open() + + if wait: + self.wait(timeout=timeout) + + def _open(self) -> None: + if not self._closed: + return + + self._check_open() + + self._closed = False + self._opened = True + + self._start_workers() + self._start_initial_tasks() + + def _start_workers(self) -> None: + self._sched_runner = threading.Thread( + target=self._sched.run, + name=f"{self.name}-scheduler", + daemon=True, + ) + assert not self._workers + for i in range(self.num_workers): + t = threading.Thread( + target=self.worker, + args=(self._tasks,), + name=f"{self.name}-worker-{i}", + daemon=True, + ) + self._workers.append(t) + + # The object state is complete. Start the worker threads + self._sched_runner.start() + for t in self._workers: + t.start() + + def _start_initial_tasks(self) -> None: + # populate the pool with initial min_size connections in background + for i in range(self._nconns): + self.run_task(AddConnection(self)) + + # Schedule a task to shrink the pool if connections over min_size have + # remained unused. + self.schedule_task(ShrinkPool(self), self.max_idle) + + def close(self, timeout: float = 5.0) -> None: + """Close the pool and make it unavailable to new clients. + + All the waiting and future clients will fail to acquire a connection + with a `PoolClosed` exception. Currently used connections will not be + closed until returned to the pool. + + Wait *timeout* seconds for threads to terminate their job, if positive. + If the timeout expires the pool is closed anyway, although it may raise + some warnings on exit. + """ + if self._closed: + return + + with self._lock: + self._closed = True + logger.debug("pool %r closed", self.name) + + # Take waiting client and pool connections out of the state + waiting = list(self._waiting) + self._waiting.clear() + connections = list(self._pool) + self._pool.clear() + + # Now that the flag _closed is set, getconn will fail immediately, + # putconn will just close the returned connection. + self._stop_workers(waiting, connections, timeout) + + def _stop_workers( + self, + waiting_clients: Sequence["WaitingClient"] = (), + connections: Sequence[Connection[Any]] = (), + timeout: float = 0.0, + ) -> None: + + # Stop the scheduler + self._sched.enter(0, None) + + # Stop the worker threads + workers, self._workers = self._workers[:], [] + for i in range(len(workers)): + self.run_task(StopWorker(self)) + + # Signal to eventual clients in the queue that business is closed. + for pos in waiting_clients: + pos.fail(PoolClosed(f"the pool {self.name!r} is closed")) + + # Close the connections still in the pool + for conn in connections: + conn.close() + + # Wait for the worker threads to terminate + assert self._sched_runner is not None + sched_runner, self._sched_runner = self._sched_runner, None + if timeout > 0: + for t in [sched_runner] + workers: + if not t.is_alive(): + continue + t.join(timeout) + if t.is_alive(): + logger.warning( + "couldn't stop thread %s in pool %r within %s seconds", + t, + self.name, + timeout, + ) + + def __enter__(self) -> "ConnectionPool": + self.open() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.close() + + def resize(self, min_size: int, max_size: Optional[int] = None) -> None: + """Change the size of the pool during runtime.""" + min_size, max_size = self._check_size(min_size, max_size) + + ngrow = max(0, min_size - self._min_size) + + logger.info( + "resizing %r to min_size=%s max_size=%s", + self.name, + min_size, + max_size, + ) + with self._lock: + self._min_size = min_size + self._max_size = max_size + self._nconns += ngrow + + for i in range(ngrow): + self.run_task(AddConnection(self)) + + def check(self) -> None: + """Verify the state of the connections currently in the pool. + + Test each connection: if it works return it to the pool, otherwise + dispose of it and create a new one. + """ + with self._lock: + conns = list(self._pool) + self._pool.clear() + + # Give a chance to the pool to grow if it has no connection. + # In case there are enough connection, or the pool is already + # growing, this is a no-op. + self._maybe_grow_pool() + + while conns: + conn = conns.pop() + try: + conn.execute("SELECT 1") + if conn.pgconn.transaction_status == TransactionStatus.INTRANS: + conn.rollback() + except Exception: + self._stats[self._CONNECTIONS_LOST] += 1 + logger.warning("discarding broken connection: %s", conn) + self.run_task(AddConnection(self)) + else: + self._add_to_pool(conn) + + def reconnect_failed(self) -> None: + """ + Called when reconnection failed for longer than `reconnect_timeout`. + """ + self._reconnect_failed(self) + + def run_task(self, task: "MaintenanceTask") -> None: + """Run a maintenance task in a worker thread.""" + self._tasks.put_nowait(task) + + def schedule_task(self, task: "MaintenanceTask", delay: float) -> None: + """Run a maintenance task in a worker thread in the future.""" + self._sched.enter(delay, task.tick) + + _WORKER_TIMEOUT = 60.0 + + @classmethod + def worker(cls, q: "Queue[MaintenanceTask]") -> None: + """Runner to execute pending maintenance task. + + The function is designed to run as a separate thread. + + Block on the queue *q*, run a task received. Finish running if a + StopWorker is received. + """ + # Don't make all the workers time out at the same moment + timeout = cls._jitter(cls._WORKER_TIMEOUT, -0.1, 0.1) + while True: + # Use a timeout to make the wait interruptible + try: + task = q.get(timeout=timeout) + except Empty: + continue + + if isinstance(task, StopWorker): + logger.debug( + "terminating working thread %s", + threading.current_thread().name, + ) + return + + # Run the task. Make sure don't die in the attempt. + try: + task.run() + except Exception as ex: + logger.warning( + "task run %s failed: %s: %s", + task, + ex.__class__.__name__, + ex, + ) + + def _connect(self, timeout: Optional[float] = None) -> Connection[Any]: + """Return a new connection configured for the pool.""" + self._stats[self._CONNECTIONS_NUM] += 1 + kwargs = self.kwargs + if timeout: + kwargs = kwargs.copy() + kwargs["connect_timeout"] = max(round(timeout), 1) + t0 = monotonic() + try: + conn: Connection[Any] + conn = self.connection_class.connect(self.conninfo, **kwargs) + except Exception: + self._stats[self._CONNECTIONS_ERRORS] += 1 + raise + else: + t1 = monotonic() + self._stats[self._CONNECTIONS_MS] += int(1000.0 * (t1 - t0)) + + conn._pool = self + + if self._configure: + self._configure(conn) + status = conn.pgconn.transaction_status + if status != TransactionStatus.IDLE: + sname = TransactionStatus(status).name + raise e.ProgrammingError( + f"connection left in status {sname} by configure function" + f" {self._configure}: discarded" + ) + + # Set an expiry date, with some randomness to avoid mass reconnection + self._set_connection_expiry_date(conn) + return conn + + def _add_connection( + self, attempt: Optional[ConnectionAttempt], growing: bool = False + ) -> None: + """Try to connect and add the connection to the pool. + + If failed, reschedule a new attempt in the future for a few times, then + give up, decrease the pool connections number and call + `self.reconnect_failed()`. + + """ + now = monotonic() + if not attempt: + attempt = ConnectionAttempt(reconnect_timeout=self.reconnect_timeout) + + try: + conn = self._connect() + except Exception as ex: + logger.warning(f"error connecting in {self.name!r}: {ex}") + if attempt.time_to_give_up(now): + logger.warning( + "reconnection attempt in pool %r failed after %s sec", + self.name, + self.reconnect_timeout, + ) + with self._lock: + self._nconns -= 1 + # If we have given up with a growing attempt, allow a new one. + if growing and self._growing: + self._growing = False + self.reconnect_failed() + else: + attempt.update_delay(now) + self.schedule_task( + AddConnection(self, attempt, growing=growing), + attempt.delay, + ) + return + + logger.info("adding new connection to the pool") + self._add_to_pool(conn) + if growing: + with self._lock: + # Keep on growing if the pool is not full yet, or if there are + # clients waiting and the pool can extend. + if self._nconns < self._min_size or ( + self._nconns < self._max_size and self._waiting + ): + self._nconns += 1 + logger.info("growing pool %r to %s", self.name, self._nconns) + self.run_task(AddConnection(self, growing=True)) + else: + self._growing = False + + def _return_connection(self, conn: Connection[Any]) -> None: + """ + Return a connection to the pool after usage. + """ + self._reset_connection(conn) + if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._RETURNS_BAD] += 1 + # Connection no more in working state: create a new one. + self.run_task(AddConnection(self)) + logger.warning("discarding closed connection: %s", conn) + return + + # Check if the connection is past its best before date + if conn._expire_at <= monotonic(): + self.run_task(AddConnection(self)) + logger.info("discarding expired connection") + conn.close() + return + + self._add_to_pool(conn) + + def _add_to_pool(self, conn: Connection[Any]) -> None: + """ + Add a connection to the pool. + + The connection can be a fresh one or one already used in the pool. + + If a client is already waiting for a connection pass it on, otherwise + put it back into the pool + """ + # Remove the pool reference from the connection before returning it + # to the state, to avoid to create a reference loop. + # Also disable the warning for open connection in conn.__del__ + conn._pool = None + + # Critical section: if there is a client waiting give it the connection + # otherwise put it back into the pool. + with self._lock: + while self._waiting: + # If there is a client waiting (which is still waiting and + # hasn't timed out), give it the connection and notify it. + pos = self._waiting.popleft() + if pos.set(conn): + break + else: + # No client waiting for a connection: put it back into the pool + self._pool.append(conn) + + # If we have been asked to wait for pool init, notify the + # waiter if the pool is full. + if self._pool_full_event and len(self._pool) >= self._min_size: + self._pool_full_event.set() + + def _reset_connection(self, conn: Connection[Any]) -> None: + """ + Bring a connection to IDLE state or close it. + """ + status = conn.pgconn.transaction_status + if status == TransactionStatus.IDLE: + pass + + elif status in (TransactionStatus.INTRANS, TransactionStatus.INERROR): + # Connection returned with an active transaction + logger.warning("rolling back returned connection: %s", conn) + try: + conn.rollback() + except Exception as ex: + logger.warning( + "rollback failed: %s: %s. Discarding connection %s", + ex.__class__.__name__, + ex, + conn, + ) + conn.close() + + elif status == TransactionStatus.ACTIVE: + # Connection returned during an operation. Bad... just close it. + logger.warning("closing returned connection: %s", conn) + conn.close() + + if not conn.closed and self._reset: + try: + self._reset(conn) + status = conn.pgconn.transaction_status + if status != TransactionStatus.IDLE: + sname = TransactionStatus(status).name + raise e.ProgrammingError( + f"connection left in status {sname} by reset function" + f" {self._reset}: discarded" + ) + except Exception as ex: + logger.warning(f"error resetting connection: {ex}") + conn.close() + + def _shrink_pool(self) -> None: + to_close: Optional[Connection[Any]] = None + + with self._lock: + # Reset the min number of connections used + nconns_min = self._nconns_min + self._nconns_min = len(self._pool) + + # If the pool can shrink and connections were unused, drop one + if self._nconns > self._min_size and nconns_min > 0: + to_close = self._pool.popleft() + self._nconns -= 1 + self._nconns_min -= 1 + + if to_close: + logger.info( + "shrinking pool %r to %s because %s unused connections" + " in the last %s sec", + self.name, + self._nconns, + nconns_min, + self.max_idle, + ) + to_close.close() + + def _get_measures(self) -> Dict[str, int]: + rv = super()._get_measures() + rv[self._REQUESTS_WAITING] = len(self._waiting) + return rv + + +class WaitingClient: + """A position in a queue for a client waiting for a connection.""" + + __slots__ = ("conn", "error", "_cond") + + def __init__(self) -> None: + self.conn: Optional[Connection[Any]] = None + self.error: Optional[Exception] = None + + # The WaitingClient behaves in a way similar to an Event, but we need + # to notify reliably the flagger that the waiter has "accepted" the + # message and it hasn't timed out yet, otherwise the pool may give a + # connection to a client that has already timed out getconn(), which + # will be lost. + self._cond = threading.Condition() + + def wait(self, timeout: float) -> Connection[Any]: + """Wait for a connection to be set and return it. + + Raise an exception if the wait times out or if fail() is called. + """ + with self._cond: + if not (self.conn or self.error): + if not self._cond.wait(timeout): + self.error = PoolTimeout( + f"couldn't get a connection after {timeout} sec" + ) + + if self.conn: + return self.conn + else: + assert self.error + raise self.error + + def set(self, conn: Connection[Any]) -> bool: + """Signal the client waiting that a connection is ready. + + Return True if the client has "accepted" the connection, False + otherwise (typically because wait() has timed out). + """ + with self._cond: + if self.conn or self.error: + return False + + self.conn = conn + self._cond.notify_all() + return True + + def fail(self, error: Exception) -> bool: + """Signal the client that, alas, they won't have a connection today. + + Return True if the client has "accepted" the error, False otherwise + (typically because wait() has timed out). + """ + with self._cond: + if self.conn or self.error: + return False + + self.error = error + self._cond.notify_all() + return True + + +class MaintenanceTask(ABC): + """A task to run asynchronously to maintain the pool state.""" + + def __init__(self, pool: "ConnectionPool"): + self.pool = ref(pool) + + def __repr__(self) -> str: + pool = self.pool() + name = repr(pool.name) if pool else "<pool is gone>" + return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>" + + def run(self) -> None: + """Run the task. + + This usually happens in a worker thread. Call the concrete _run() + implementation, if the pool is still alive. + """ + pool = self.pool() + if not pool or pool.closed: + # Pool is no more working. Quietly discard the operation. + logger.debug("task run discarded: %s", self) + return + + logger.debug("task running in %s: %s", threading.current_thread().name, self) + self._run(pool) + + def tick(self) -> None: + """Run the scheduled task + + This function is called by the scheduler thread. Use a worker to + run the task for real in order to free the scheduler immediately. + """ + pool = self.pool() + if not pool or pool.closed: + # Pool is no more working. Quietly discard the operation. + logger.debug("task tick discarded: %s", self) + return + + pool.run_task(self) + + @abstractmethod + def _run(self, pool: "ConnectionPool") -> None: + ... + + +class StopWorker(MaintenanceTask): + """Signal the maintenance thread to terminate.""" + + def _run(self, pool: "ConnectionPool") -> None: + pass + + +class AddConnection(MaintenanceTask): + def __init__( + self, + pool: "ConnectionPool", + attempt: Optional["ConnectionAttempt"] = None, + growing: bool = False, + ): + super().__init__(pool) + self.attempt = attempt + self.growing = growing + + def _run(self, pool: "ConnectionPool") -> None: + pool._add_connection(self.attempt, growing=self.growing) + + +class ReturnConnection(MaintenanceTask): + """Clean up and return a connection to the pool.""" + + def __init__(self, pool: "ConnectionPool", conn: "Connection[Any]"): + super().__init__(pool) + self.conn = conn + + def _run(self, pool: "ConnectionPool") -> None: + pool._return_connection(self.conn) + + +class ShrinkPool(MaintenanceTask): + """If the pool can shrink, remove one connection. + + Re-schedule periodically and also reset the minimum number of connections + in the pool. + """ + + def _run(self, pool: "ConnectionPool") -> None: + # Reschedule the task now so that in case of any error we don't lose + # the periodic run. + pool.schedule_task(self, pool.max_idle) + pool._shrink_pool() diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py new file mode 100644 index 0000000..0ea6e9a --- /dev/null +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -0,0 +1,784 @@ +""" +psycopg asynchronous connection pool +""" + +# Copyright (C) 2021 The Psycopg Team + +import asyncio +import logging +from abc import ABC, abstractmethod +from time import monotonic +from types import TracebackType +from typing import Any, AsyncIterator, Awaitable, Callable +from typing import Dict, List, Optional, Sequence, Type +from weakref import ref +from contextlib import asynccontextmanager + +from psycopg import errors as e +from psycopg import AsyncConnection +from psycopg.pq import TransactionStatus + +from .base import ConnectionAttempt, BasePool +from .sched import AsyncScheduler +from .errors import PoolClosed, PoolTimeout, TooManyRequests +from ._compat import Task, create_task, Deque + +logger = logging.getLogger("psycopg.pool") + + +class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): + def __init__( + self, + conninfo: str = "", + *, + open: bool = True, + connection_class: Type[AsyncConnection[Any]] = AsyncConnection, + configure: Optional[Callable[[AsyncConnection[Any]], Awaitable[None]]] = None, + reset: Optional[Callable[[AsyncConnection[Any]], Awaitable[None]]] = None, + **kwargs: Any, + ): + self.connection_class = connection_class + self._configure = configure + self._reset = reset + + # asyncio objects, created on open to attach them to the right loop. + self._lock: asyncio.Lock + self._sched: AsyncScheduler + self._tasks: "asyncio.Queue[MaintenanceTask]" + + self._waiting = Deque["AsyncClient"]() + + # to notify that the pool is full + self._pool_full_event: Optional[asyncio.Event] = None + + self._sched_runner: Optional[Task[None]] = None + self._workers: List[Task[None]] = [] + + super().__init__(conninfo, **kwargs) + + if open: + self._open() + + async def wait(self, timeout: float = 30.0) -> None: + self._check_open_getconn() + + async with self._lock: + assert not self._pool_full_event + if len(self._pool) >= self._min_size: + return + self._pool_full_event = asyncio.Event() + + logger.info("waiting for pool %r initialization", self.name) + try: + await asyncio.wait_for(self._pool_full_event.wait(), timeout) + except asyncio.TimeoutError: + await self.close() # stop all the tasks + raise PoolTimeout( + f"pool initialization incomplete after {timeout} sec" + ) from None + + async with self._lock: + assert self._pool_full_event + self._pool_full_event = None + + logger.info("pool %r is ready to use", self.name) + + @asynccontextmanager + async def connection( + self, timeout: Optional[float] = None + ) -> AsyncIterator[AsyncConnection[Any]]: + conn = await self.getconn(timeout=timeout) + t0 = monotonic() + try: + async with conn: + yield conn + finally: + t1 = monotonic() + self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0)) + await self.putconn(conn) + + async def getconn(self, timeout: Optional[float] = None) -> AsyncConnection[Any]: + logger.info("connection requested from %r", self.name) + self._stats[self._REQUESTS_NUM] += 1 + + self._check_open_getconn() + + # Critical section: decide here if there's a connection ready + # or if the client needs to wait. + async with self._lock: + conn = await self._get_ready_connection(timeout) + if not conn: + # No connection available: put the client in the waiting queue + t0 = monotonic() + pos = AsyncClient() + self._waiting.append(pos) + self._stats[self._REQUESTS_QUEUED] += 1 + + # If there is space for the pool to grow, let's do it + self._maybe_grow_pool() + + # If we are in the waiting queue, wait to be assigned a connection + # (outside the critical section, so only the waiting client is locked) + if not conn: + if timeout is None: + timeout = self.timeout + try: + conn = await pos.wait(timeout=timeout) + except Exception: + self._stats[self._REQUESTS_ERRORS] += 1 + raise + finally: + t1 = monotonic() + self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0)) + + # Tell the connection it belongs to a pool to avoid closing on __exit__ + # Note that this property shouldn't be set while the connection is in + # the pool, to avoid to create a reference loop. + conn._pool = self + logger.info("connection given by %r", self.name) + return conn + + async def _get_ready_connection( + self, timeout: Optional[float] + ) -> Optional[AsyncConnection[Any]]: + conn: Optional[AsyncConnection[Any]] = None + if self._pool: + # Take a connection ready out of the pool + conn = self._pool.popleft() + if len(self._pool) < self._nconns_min: + self._nconns_min = len(self._pool) + elif self.max_waiting and len(self._waiting) >= self.max_waiting: + self._stats[self._REQUESTS_ERRORS] += 1 + raise TooManyRequests( + f"the pool {self.name!r} has already" + f" {len(self._waiting)} requests waiting" + ) + return conn + + def _maybe_grow_pool(self) -> None: + # Allow only one task at time to grow the pool (or returning + # connections might be starved). + if self._nconns < self._max_size and not self._growing: + self._nconns += 1 + logger.info("growing pool %r to %s", self.name, self._nconns) + self._growing = True + self.run_task(AddConnection(self, growing=True)) + + async def putconn(self, conn: AsyncConnection[Any]) -> None: + self._check_pool_putconn(conn) + + logger.info("returning connection to %r", self.name) + if await self._maybe_close_connection(conn): + return + + # Use a worker to perform eventual maintenance work in a separate task + if self._reset: + self.run_task(ReturnConnection(self, conn)) + else: + await self._return_connection(conn) + + async def _maybe_close_connection(self, conn: AsyncConnection[Any]) -> bool: + # If the pool is closed just close the connection instead of returning + # it to the pool. For extra refcare remove the pool reference from it. + if not self._closed: + return False + + conn._pool = None + await conn.close() + return True + + async def open(self, wait: bool = False, timeout: float = 30.0) -> None: + # Make sure the lock is created after there is an event loop + try: + self._lock + except AttributeError: + self._lock = asyncio.Lock() + + async with self._lock: + self._open() + + if wait: + await self.wait(timeout=timeout) + + def _open(self) -> None: + if not self._closed: + return + + # Throw a RuntimeError if the pool is open outside a running loop. + asyncio.get_running_loop() + + self._check_open() + + # Create these objects now to attach them to the right loop. + # See #219 + self._tasks = asyncio.Queue() + self._sched = AsyncScheduler() + # This has been most likely, but not necessarily, created in `open()`. + try: + self._lock + except AttributeError: + self._lock = asyncio.Lock() + + self._closed = False + self._opened = True + + self._start_workers() + self._start_initial_tasks() + + def _start_workers(self) -> None: + self._sched_runner = create_task( + self._sched.run(), name=f"{self.name}-scheduler" + ) + for i in range(self.num_workers): + t = create_task( + self.worker(self._tasks), + name=f"{self.name}-worker-{i}", + ) + self._workers.append(t) + + def _start_initial_tasks(self) -> None: + # populate the pool with initial min_size connections in background + for i in range(self._nconns): + self.run_task(AddConnection(self)) + + # Schedule a task to shrink the pool if connections over min_size have + # remained unused. + self.run_task(Schedule(self, ShrinkPool(self), self.max_idle)) + + async def close(self, timeout: float = 5.0) -> None: + if self._closed: + return + + async with self._lock: + self._closed = True + logger.debug("pool %r closed", self.name) + + # Take waiting client and pool connections out of the state + waiting = list(self._waiting) + self._waiting.clear() + connections = list(self._pool) + self._pool.clear() + + # Now that the flag _closed is set, getconn will fail immediately, + # putconn will just close the returned connection. + await self._stop_workers(waiting, connections, timeout) + + async def _stop_workers( + self, + waiting_clients: Sequence["AsyncClient"] = (), + connections: Sequence[AsyncConnection[Any]] = (), + timeout: float = 0.0, + ) -> None: + # Stop the scheduler + await self._sched.enter(0, None) + + # Stop the worker tasks + workers, self._workers = self._workers[:], [] + for w in workers: + self.run_task(StopWorker(self)) + + # Signal to eventual clients in the queue that business is closed. + for pos in waiting_clients: + await pos.fail(PoolClosed(f"the pool {self.name!r} is closed")) + + # Close the connections still in the pool + for conn in connections: + await conn.close() + + # Wait for the worker tasks to terminate + assert self._sched_runner is not None + sched_runner, self._sched_runner = self._sched_runner, None + wait = asyncio.gather(sched_runner, *workers) + try: + if timeout > 0: + await asyncio.wait_for(asyncio.shield(wait), timeout=timeout) + else: + await wait + except asyncio.TimeoutError: + logger.warning( + "couldn't stop pool %r tasks within %s seconds", + self.name, + timeout, + ) + + async def __aenter__(self) -> "AsyncConnectionPool": + await self.open() + return self + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + await self.close() + + async def resize(self, min_size: int, max_size: Optional[int] = None) -> None: + min_size, max_size = self._check_size(min_size, max_size) + + ngrow = max(0, min_size - self._min_size) + + logger.info( + "resizing %r to min_size=%s max_size=%s", + self.name, + min_size, + max_size, + ) + async with self._lock: + self._min_size = min_size + self._max_size = max_size + self._nconns += ngrow + + for i in range(ngrow): + self.run_task(AddConnection(self)) + + async def check(self) -> None: + async with self._lock: + conns = list(self._pool) + self._pool.clear() + + # Give a chance to the pool to grow if it has no connection. + # In case there are enough connection, or the pool is already + # growing, this is a no-op. + self._maybe_grow_pool() + + while conns: + conn = conns.pop() + try: + await conn.execute("SELECT 1") + if conn.pgconn.transaction_status == TransactionStatus.INTRANS: + await conn.rollback() + except Exception: + self._stats[self._CONNECTIONS_LOST] += 1 + logger.warning("discarding broken connection: %s", conn) + self.run_task(AddConnection(self)) + else: + await self._add_to_pool(conn) + + def reconnect_failed(self) -> None: + """ + Called when reconnection failed for longer than `reconnect_timeout`. + """ + self._reconnect_failed(self) + + def run_task(self, task: "MaintenanceTask") -> None: + """Run a maintenance task in a worker.""" + self._tasks.put_nowait(task) + + async def schedule_task(self, task: "MaintenanceTask", delay: float) -> None: + """Run a maintenance task in a worker in the future.""" + await self._sched.enter(delay, task.tick) + + @classmethod + async def worker(cls, q: "asyncio.Queue[MaintenanceTask]") -> None: + """Runner to execute pending maintenance task. + + The function is designed to run as a task. + + Block on the queue *q*, run a task received. Finish running if a + StopWorker is received. + """ + while True: + task = await q.get() + + if isinstance(task, StopWorker): + logger.debug("terminating working task") + return + + # Run the task. Make sure don't die in the attempt. + try: + await task.run() + except Exception as ex: + logger.warning( + "task run %s failed: %s: %s", + task, + ex.__class__.__name__, + ex, + ) + + async def _connect(self, timeout: Optional[float] = None) -> AsyncConnection[Any]: + self._stats[self._CONNECTIONS_NUM] += 1 + kwargs = self.kwargs + if timeout: + kwargs = kwargs.copy() + kwargs["connect_timeout"] = max(round(timeout), 1) + t0 = monotonic() + try: + conn: AsyncConnection[Any] + conn = await self.connection_class.connect(self.conninfo, **kwargs) + except Exception: + self._stats[self._CONNECTIONS_ERRORS] += 1 + raise + else: + t1 = monotonic() + self._stats[self._CONNECTIONS_MS] += int(1000.0 * (t1 - t0)) + + conn._pool = self + + if self._configure: + await self._configure(conn) + status = conn.pgconn.transaction_status + if status != TransactionStatus.IDLE: + sname = TransactionStatus(status).name + raise e.ProgrammingError( + f"connection left in status {sname} by configure function" + f" {self._configure}: discarded" + ) + + # Set an expiry date, with some randomness to avoid mass reconnection + self._set_connection_expiry_date(conn) + return conn + + async def _add_connection( + self, attempt: Optional[ConnectionAttempt], growing: bool = False + ) -> None: + """Try to connect and add the connection to the pool. + + If failed, reschedule a new attempt in the future for a few times, then + give up, decrease the pool connections number and call + `self.reconnect_failed()`. + + """ + now = monotonic() + if not attempt: + attempt = ConnectionAttempt(reconnect_timeout=self.reconnect_timeout) + + try: + conn = await self._connect() + except Exception as ex: + logger.warning(f"error connecting in {self.name!r}: {ex}") + if attempt.time_to_give_up(now): + logger.warning( + "reconnection attempt in pool %r failed after %s sec", + self.name, + self.reconnect_timeout, + ) + async with self._lock: + self._nconns -= 1 + # If we have given up with a growing attempt, allow a new one. + if growing and self._growing: + self._growing = False + self.reconnect_failed() + else: + attempt.update_delay(now) + await self.schedule_task( + AddConnection(self, attempt, growing=growing), + attempt.delay, + ) + return + + logger.info("adding new connection to the pool") + await self._add_to_pool(conn) + if growing: + async with self._lock: + # Keep on growing if the pool is not full yet, or if there are + # clients waiting and the pool can extend. + if self._nconns < self._min_size or ( + self._nconns < self._max_size and self._waiting + ): + self._nconns += 1 + logger.info("growing pool %r to %s", self.name, self._nconns) + self.run_task(AddConnection(self, growing=True)) + else: + self._growing = False + + async def _return_connection(self, conn: AsyncConnection[Any]) -> None: + """ + Return a connection to the pool after usage. + """ + await self._reset_connection(conn) + if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._RETURNS_BAD] += 1 + # Connection no more in working state: create a new one. + self.run_task(AddConnection(self)) + logger.warning("discarding closed connection: %s", conn) + return + + # Check if the connection is past its best before date + if conn._expire_at <= monotonic(): + self.run_task(AddConnection(self)) + logger.info("discarding expired connection") + await conn.close() + return + + await self._add_to_pool(conn) + + async def _add_to_pool(self, conn: AsyncConnection[Any]) -> None: + """ + Add a connection to the pool. + + The connection can be a fresh one or one already used in the pool. + + If a client is already waiting for a connection pass it on, otherwise + put it back into the pool + """ + # Remove the pool reference from the connection before returning it + # to the state, to avoid to create a reference loop. + # Also disable the warning for open connection in conn.__del__ + conn._pool = None + + # Critical section: if there is a client waiting give it the connection + # otherwise put it back into the pool. + async with self._lock: + while self._waiting: + # If there is a client waiting (which is still waiting and + # hasn't timed out), give it the connection and notify it. + pos = self._waiting.popleft() + if await pos.set(conn): + break + else: + # No client waiting for a connection: put it back into the pool + self._pool.append(conn) + + # If we have been asked to wait for pool init, notify the + # waiter if the pool is full. + if self._pool_full_event and len(self._pool) >= self._min_size: + self._pool_full_event.set() + + async def _reset_connection(self, conn: AsyncConnection[Any]) -> None: + """ + Bring a connection to IDLE state or close it. + """ + status = conn.pgconn.transaction_status + if status == TransactionStatus.IDLE: + pass + + elif status in (TransactionStatus.INTRANS, TransactionStatus.INERROR): + # Connection returned with an active transaction + logger.warning("rolling back returned connection: %s", conn) + try: + await conn.rollback() + except Exception as ex: + logger.warning( + "rollback failed: %s: %s. Discarding connection %s", + ex.__class__.__name__, + ex, + conn, + ) + await conn.close() + + elif status == TransactionStatus.ACTIVE: + # Connection returned during an operation. Bad... just close it. + logger.warning("closing returned connection: %s", conn) + await conn.close() + + if not conn.closed and self._reset: + try: + await self._reset(conn) + status = conn.pgconn.transaction_status + if status != TransactionStatus.IDLE: + sname = TransactionStatus(status).name + raise e.ProgrammingError( + f"connection left in status {sname} by reset function" + f" {self._reset}: discarded" + ) + except Exception as ex: + logger.warning(f"error resetting connection: {ex}") + await conn.close() + + async def _shrink_pool(self) -> None: + to_close: Optional[AsyncConnection[Any]] = None + + async with self._lock: + # Reset the min number of connections used + nconns_min = self._nconns_min + self._nconns_min = len(self._pool) + + # If the pool can shrink and connections were unused, drop one + if self._nconns > self._min_size and nconns_min > 0: + to_close = self._pool.popleft() + self._nconns -= 1 + self._nconns_min -= 1 + + if to_close: + logger.info( + "shrinking pool %r to %s because %s unused connections" + " in the last %s sec", + self.name, + self._nconns, + nconns_min, + self.max_idle, + ) + await to_close.close() + + def _get_measures(self) -> Dict[str, int]: + rv = super()._get_measures() + rv[self._REQUESTS_WAITING] = len(self._waiting) + return rv + + +class AsyncClient: + """A position in a queue for a client waiting for a connection.""" + + __slots__ = ("conn", "error", "_cond") + + def __init__(self) -> None: + self.conn: Optional[AsyncConnection[Any]] = None + self.error: Optional[Exception] = None + + # The AsyncClient behaves in a way similar to an Event, but we need + # to notify reliably the flagger that the waiter has "accepted" the + # message and it hasn't timed out yet, otherwise the pool may give a + # connection to a client that has already timed out getconn(), which + # will be lost. + self._cond = asyncio.Condition() + + async def wait(self, timeout: float) -> AsyncConnection[Any]: + """Wait for a connection to be set and return it. + + Raise an exception if the wait times out or if fail() is called. + """ + async with self._cond: + if not (self.conn or self.error): + try: + await asyncio.wait_for(self._cond.wait(), timeout) + except asyncio.TimeoutError: + self.error = PoolTimeout( + f"couldn't get a connection after {timeout} sec" + ) + + if self.conn: + return self.conn + else: + assert self.error + raise self.error + + async def set(self, conn: AsyncConnection[Any]) -> bool: + """Signal the client waiting that a connection is ready. + + Return True if the client has "accepted" the connection, False + otherwise (typically because wait() has timed out). + """ + async with self._cond: + if self.conn or self.error: + return False + + self.conn = conn + self._cond.notify_all() + return True + + async def fail(self, error: Exception) -> bool: + """Signal the client that, alas, they won't have a connection today. + + Return True if the client has "accepted" the error, False otherwise + (typically because wait() has timed out). + """ + async with self._cond: + if self.conn or self.error: + return False + + self.error = error + self._cond.notify_all() + return True + + +class MaintenanceTask(ABC): + """A task to run asynchronously to maintain the pool state.""" + + def __init__(self, pool: "AsyncConnectionPool"): + self.pool = ref(pool) + + def __repr__(self) -> str: + pool = self.pool() + name = repr(pool.name) if pool else "<pool is gone>" + return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>" + + async def run(self) -> None: + """Run the task. + + This usually happens in a worker. Call the concrete _run() + implementation, if the pool is still alive. + """ + pool = self.pool() + if not pool or pool.closed: + # Pool is no more working. Quietly discard the operation. + logger.debug("task run discarded: %s", self) + return + + await self._run(pool) + + async def tick(self) -> None: + """Run the scheduled task + + This function is called by the scheduler task. Use a worker to + run the task for real in order to free the scheduler immediately. + """ + pool = self.pool() + if not pool or pool.closed: + # Pool is no more working. Quietly discard the operation. + logger.debug("task tick discarded: %s", self) + return + + pool.run_task(self) + + @abstractmethod + async def _run(self, pool: "AsyncConnectionPool") -> None: + ... + + +class StopWorker(MaintenanceTask): + """Signal the maintenance worker to terminate.""" + + async def _run(self, pool: "AsyncConnectionPool") -> None: + pass + + +class AddConnection(MaintenanceTask): + def __init__( + self, + pool: "AsyncConnectionPool", + attempt: Optional["ConnectionAttempt"] = None, + growing: bool = False, + ): + super().__init__(pool) + self.attempt = attempt + self.growing = growing + + async def _run(self, pool: "AsyncConnectionPool") -> None: + await pool._add_connection(self.attempt, growing=self.growing) + + +class ReturnConnection(MaintenanceTask): + """Clean up and return a connection to the pool.""" + + def __init__(self, pool: "AsyncConnectionPool", conn: "AsyncConnection[Any]"): + super().__init__(pool) + self.conn = conn + + async def _run(self, pool: "AsyncConnectionPool") -> None: + await pool._return_connection(self.conn) + + +class ShrinkPool(MaintenanceTask): + """If the pool can shrink, remove one connection. + + Re-schedule periodically and also reset the minimum number of connections + in the pool. + """ + + async def _run(self, pool: "AsyncConnectionPool") -> None: + # Reschedule the task now so that in case of any error we don't lose + # the periodic run. + await pool.schedule_task(self, pool.max_idle) + await pool._shrink_pool() + + +class Schedule(MaintenanceTask): + """Schedule a task in the pool scheduler. + + This task is a trampoline to allow to use a sync call (pool.run_task) + to execute an async one (pool.schedule_task). + """ + + def __init__( + self, + pool: "AsyncConnectionPool", + task: MaintenanceTask, + delay: float, + ): + super().__init__(pool) + self.task = task + self.delay = delay + + async def _run(self, pool: "AsyncConnectionPool") -> None: + await pool.schedule_task(self.task, self.delay) diff --git a/psycopg_pool/psycopg_pool/py.typed b/psycopg_pool/psycopg_pool/py.typed new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/psycopg_pool/psycopg_pool/py.typed diff --git a/psycopg_pool/psycopg_pool/sched.py b/psycopg_pool/psycopg_pool/sched.py new file mode 100644 index 0000000..ca26007 --- /dev/null +++ b/psycopg_pool/psycopg_pool/sched.py @@ -0,0 +1,177 @@ +""" +A minimal scheduler to schedule tasks run in the future. + +Inspired to the standard library `sched.scheduler`, but designed for +multi-thread usage ground up, not as an afterthought. Tasks can be scheduled in +front of the one currently running and `Scheduler.run()` can be left running +without any task scheduled. + +Tasks are called "Task", not "Event", here, because we actually make use of +`threading.Event` and the two would be confusing. +""" + +# Copyright (C) 2021 The Psycopg Team + +import asyncio +import logging +import threading +from time import monotonic +from heapq import heappush, heappop +from typing import Any, Callable, List, Optional, NamedTuple + +logger = logging.getLogger(__name__) + + +class Task(NamedTuple): + time: float + action: Optional[Callable[[], Any]] + + def __eq__(self, other: "Task") -> Any: # type: ignore[override] + return self.time == other.time + + def __lt__(self, other: "Task") -> Any: # type: ignore[override] + return self.time < other.time + + def __le__(self, other: "Task") -> Any: # type: ignore[override] + return self.time <= other.time + + def __gt__(self, other: "Task") -> Any: # type: ignore[override] + return self.time > other.time + + def __ge__(self, other: "Task") -> Any: # type: ignore[override] + return self.time >= other.time + + +class Scheduler: + def __init__(self) -> None: + """Initialize a new instance, passing the time and delay functions.""" + self._queue: List[Task] = [] + self._lock = threading.RLock() + self._event = threading.Event() + + EMPTY_QUEUE_TIMEOUT = 600.0 + + def enter(self, delay: float, action: Optional[Callable[[], Any]]) -> Task: + """Enter a new task in the queue delayed in the future. + + Schedule a `!None` to stop the execution. + """ + time = monotonic() + delay + return self.enterabs(time, action) + + def enterabs(self, time: float, action: Optional[Callable[[], Any]]) -> Task: + """Enter a new task in the queue at an absolute time. + + Schedule a `!None` to stop the execution. + """ + task = Task(time, action) + with self._lock: + heappush(self._queue, task) + first = self._queue[0] is task + + if first: + self._event.set() + + return task + + def run(self) -> None: + """Execute the events scheduled.""" + q = self._queue + while True: + with self._lock: + now = monotonic() + task = q[0] if q else None + if task: + if task.time <= now: + heappop(q) + else: + delay = task.time - now + task = None + else: + delay = self.EMPTY_QUEUE_TIMEOUT + self._event.clear() + + if task: + if not task.action: + break + try: + task.action() + except Exception as e: + logger.warning( + "scheduled task run %s failed: %s: %s", + task.action, + e.__class__.__name__, + e, + ) + else: + # Block for the expected timeout or until a new task scheduled + self._event.wait(timeout=delay) + + +class AsyncScheduler: + def __init__(self) -> None: + """Initialize a new instance, passing the time and delay functions.""" + self._queue: List[Task] = [] + self._lock = asyncio.Lock() + self._event = asyncio.Event() + + EMPTY_QUEUE_TIMEOUT = 600.0 + + async def enter(self, delay: float, action: Optional[Callable[[], Any]]) -> Task: + """Enter a new task in the queue delayed in the future. + + Schedule a `!None` to stop the execution. + """ + time = monotonic() + delay + return await self.enterabs(time, action) + + async def enterabs(self, time: float, action: Optional[Callable[[], Any]]) -> Task: + """Enter a new task in the queue at an absolute time. + + Schedule a `!None` to stop the execution. + """ + task = Task(time, action) + async with self._lock: + heappush(self._queue, task) + first = self._queue[0] is task + + if first: + self._event.set() + + return task + + async def run(self) -> None: + """Execute the events scheduled.""" + q = self._queue + while True: + async with self._lock: + now = monotonic() + task = q[0] if q else None + if task: + if task.time <= now: + heappop(q) + else: + delay = task.time - now + task = None + else: + delay = self.EMPTY_QUEUE_TIMEOUT + self._event.clear() + + if task: + if not task.action: + break + try: + await task.action() + except Exception as e: + logger.warning( + "scheduled task run %s failed: %s: %s", + task.action, + e.__class__.__name__, + e, + ) + else: + # Block for the expected timeout or until a new task scheduled + try: + await asyncio.wait_for(self._event.wait(), delay) + except asyncio.TimeoutError: + pass diff --git a/psycopg_pool/psycopg_pool/version.py b/psycopg_pool/psycopg_pool/version.py new file mode 100644 index 0000000..fc99bbd --- /dev/null +++ b/psycopg_pool/psycopg_pool/version.py @@ -0,0 +1,13 @@ +""" +psycopg pool version file. +""" + +# Copyright (C) 2021 The Psycopg Team + +# Use a versioning scheme as defined in +# https://www.python.org/dev/peps/pep-0440/ + +# STOP AND READ! if you change: +__version__ = "3.1.5" +# also change: +# - `docs/news_pool.rst` to declare this version current or unreleased diff --git a/psycopg_pool/setup.cfg b/psycopg_pool/setup.cfg new file mode 100644 index 0000000..1a3274e --- /dev/null +++ b/psycopg_pool/setup.cfg @@ -0,0 +1,45 @@ +[metadata] +name = psycopg-pool +description = Connection Pool for Psycopg +url = https://psycopg.org/psycopg3/ +author = Daniele Varrazzo +author_email = daniele.varrazzo@gmail.com +license = GNU Lesser General Public License v3 (LGPLv3) + +project_urls = + Homepage = https://psycopg.org/ + Code = https://github.com/psycopg/psycopg + Issue Tracker = https://github.com/psycopg/psycopg/issues + Download = https://pypi.org/project/psycopg-pool/ + +classifiers = + Development Status :: 5 - Production/Stable + Intended Audience :: Developers + License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3) + Operating System :: MacOS :: MacOS X + Operating System :: Microsoft :: Windows + Operating System :: POSIX + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + Programming Language :: Python :: 3.11 + Topic :: Database + Topic :: Database :: Front-Ends + Topic :: Software Development + Topic :: Software Development :: Libraries :: Python Modules + +long_description = file: README.rst +long_description_content_type = text/x-rst +license_files = LICENSE.txt + +[options] +python_requires = >= 3.7 +packages = find: +zip_safe = False +install_requires = + typing-extensions >= 3.10 + +[options.package_data] +psycopg_pool = py.typed diff --git a/psycopg_pool/setup.py b/psycopg_pool/setup.py new file mode 100644 index 0000000..771847d --- /dev/null +++ b/psycopg_pool/setup.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +""" +PostgreSQL database adapter for Python - Connection Pool +""" + +# Copyright (C) 2020 The Psycopg Team + +import os +import re +from setuptools import setup + +# Move to the directory of setup.py: executing this file from another location +# (e.g. from the project root) will fail +here = os.path.abspath(os.path.dirname(__file__)) +if os.path.abspath(os.getcwd()) != here: + os.chdir(here) + +with open("psycopg_pool/version.py") as f: + data = f.read() + m = re.search(r"""(?m)^__version__\s*=\s*['"]([^'"]+)['"]""", data) + if not m: + raise Exception(f"cannot find version in {f.name}") + version = m.group(1) + + +setup(version=version) |