summaryrefslogtreecommitdiffstats
path: root/psycopg_pool/psycopg_pool/pool_async.py
blob: 0ea6e9a40a5cd985201cda767daa57c8d37f9d56 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
"""
psycopg asynchronous connection pool
"""

# Copyright (C) 2021 The Psycopg Team

import asyncio
import logging
from abc import ABC, abstractmethod
from time import monotonic
from types import TracebackType
from typing import Any, AsyncIterator, Awaitable, Callable
from typing import Dict, List, Optional, Sequence, Type
from weakref import ref
from contextlib import asynccontextmanager

from psycopg import errors as e
from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus

from .base import ConnectionAttempt, BasePool
from .sched import AsyncScheduler
from .errors import PoolClosed, PoolTimeout, TooManyRequests
from ._compat import Task, create_task, Deque

logger = logging.getLogger("psycopg.pool")


class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
    def __init__(
        self,
        conninfo: str = "",
        *,
        open: bool = True,
        connection_class: Type[AsyncConnection[Any]] = AsyncConnection,
        configure: Optional[Callable[[AsyncConnection[Any]], Awaitable[None]]] = None,
        reset: Optional[Callable[[AsyncConnection[Any]], Awaitable[None]]] = None,
        **kwargs: Any,
    ):
        self.connection_class = connection_class
        self._configure = configure
        self._reset = reset

        # asyncio objects, created on open to attach them to the right loop.
        self._lock: asyncio.Lock
        self._sched: AsyncScheduler
        self._tasks: "asyncio.Queue[MaintenanceTask]"

        self._waiting = Deque["AsyncClient"]()

        # to notify that the pool is full
        self._pool_full_event: Optional[asyncio.Event] = None

        self._sched_runner: Optional[Task[None]] = None
        self._workers: List[Task[None]] = []

        super().__init__(conninfo, **kwargs)

        if open:
            self._open()

    async def wait(self, timeout: float = 30.0) -> None:
        self._check_open_getconn()

        async with self._lock:
            assert not self._pool_full_event
            if len(self._pool) >= self._min_size:
                return
            self._pool_full_event = asyncio.Event()

        logger.info("waiting for pool %r initialization", self.name)
        try:
            await asyncio.wait_for(self._pool_full_event.wait(), timeout)
        except asyncio.TimeoutError:
            await self.close()  # stop all the tasks
            raise PoolTimeout(
                f"pool initialization incomplete after {timeout} sec"
            ) from None

        async with self._lock:
            assert self._pool_full_event
            self._pool_full_event = None

        logger.info("pool %r is ready to use", self.name)

    @asynccontextmanager
    async def connection(
        self, timeout: Optional[float] = None
    ) -> AsyncIterator[AsyncConnection[Any]]:
        conn = await self.getconn(timeout=timeout)
        t0 = monotonic()
        try:
            async with conn:
                yield conn
        finally:
            t1 = monotonic()
            self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0))
            await self.putconn(conn)

    async def getconn(self, timeout: Optional[float] = None) -> AsyncConnection[Any]:
        logger.info("connection requested from %r", self.name)
        self._stats[self._REQUESTS_NUM] += 1

        self._check_open_getconn()

        # Critical section: decide here if there's a connection ready
        # or if the client needs to wait.
        async with self._lock:
            conn = await self._get_ready_connection(timeout)
            if not conn:
                # No connection available: put the client in the waiting queue
                t0 = monotonic()
                pos = AsyncClient()
                self._waiting.append(pos)
                self._stats[self._REQUESTS_QUEUED] += 1

                # If there is space for the pool to grow, let's do it
                self._maybe_grow_pool()

        # If we are in the waiting queue, wait to be assigned a connection
        # (outside the critical section, so only the waiting client is locked)
        if not conn:
            if timeout is None:
                timeout = self.timeout
            try:
                conn = await pos.wait(timeout=timeout)
            except Exception:
                self._stats[self._REQUESTS_ERRORS] += 1
                raise
            finally:
                t1 = monotonic()
                self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0))

        # Tell the connection it belongs to a pool to avoid closing on __exit__
        # Note that this property shouldn't be set while the connection is in
        # the pool, to avoid to create a reference loop.
        conn._pool = self
        logger.info("connection given by %r", self.name)
        return conn

    async def _get_ready_connection(
        self, timeout: Optional[float]
    ) -> Optional[AsyncConnection[Any]]:
        conn: Optional[AsyncConnection[Any]] = None
        if self._pool:
            # Take a connection ready out of the pool
            conn = self._pool.popleft()
            if len(self._pool) < self._nconns_min:
                self._nconns_min = len(self._pool)
        elif self.max_waiting and len(self._waiting) >= self.max_waiting:
            self._stats[self._REQUESTS_ERRORS] += 1
            raise TooManyRequests(
                f"the pool {self.name!r} has already"
                f" {len(self._waiting)} requests waiting"
            )
        return conn

    def _maybe_grow_pool(self) -> None:
        # Allow only one task at time to grow the pool (or returning
        # connections might be starved).
        if self._nconns < self._max_size and not self._growing:
            self._nconns += 1
            logger.info("growing pool %r to %s", self.name, self._nconns)
            self._growing = True
            self.run_task(AddConnection(self, growing=True))

    async def putconn(self, conn: AsyncConnection[Any]) -> None:
        self._check_pool_putconn(conn)

        logger.info("returning connection to %r", self.name)
        if await self._maybe_close_connection(conn):
            return

        # Use a worker to perform eventual maintenance work in a separate task
        if self._reset:
            self.run_task(ReturnConnection(self, conn))
        else:
            await self._return_connection(conn)

    async def _maybe_close_connection(self, conn: AsyncConnection[Any]) -> bool:
        # If the pool is closed just close the connection instead of returning
        # it to the pool. For extra refcare remove the pool reference from it.
        if not self._closed:
            return False

        conn._pool = None
        await conn.close()
        return True

    async def open(self, wait: bool = False, timeout: float = 30.0) -> None:
        # Make sure the lock is created after there is an event loop
        try:
            self._lock
        except AttributeError:
            self._lock = asyncio.Lock()

        async with self._lock:
            self._open()

        if wait:
            await self.wait(timeout=timeout)

    def _open(self) -> None:
        if not self._closed:
            return

        # Throw a RuntimeError if the pool is open outside a running loop.
        asyncio.get_running_loop()

        self._check_open()

        # Create these objects now to attach them to the right loop.
        # See #219
        self._tasks = asyncio.Queue()
        self._sched = AsyncScheduler()
        # This has been most likely, but not necessarily, created in `open()`.
        try:
            self._lock
        except AttributeError:
            self._lock = asyncio.Lock()

        self._closed = False
        self._opened = True

        self._start_workers()
        self._start_initial_tasks()

    def _start_workers(self) -> None:
        self._sched_runner = create_task(
            self._sched.run(), name=f"{self.name}-scheduler"
        )
        for i in range(self.num_workers):
            t = create_task(
                self.worker(self._tasks),
                name=f"{self.name}-worker-{i}",
            )
            self._workers.append(t)

    def _start_initial_tasks(self) -> None:
        # populate the pool with initial min_size connections in background
        for i in range(self._nconns):
            self.run_task(AddConnection(self))

        # Schedule a task to shrink the pool if connections over min_size have
        # remained unused.
        self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))

    async def close(self, timeout: float = 5.0) -> None:
        if self._closed:
            return

        async with self._lock:
            self._closed = True
            logger.debug("pool %r closed", self.name)

            # Take waiting client and pool connections out of the state
            waiting = list(self._waiting)
            self._waiting.clear()
            connections = list(self._pool)
            self._pool.clear()

        # Now that the flag _closed is set, getconn will fail immediately,
        # putconn will just close the returned connection.
        await self._stop_workers(waiting, connections, timeout)

    async def _stop_workers(
        self,
        waiting_clients: Sequence["AsyncClient"] = (),
        connections: Sequence[AsyncConnection[Any]] = (),
        timeout: float = 0.0,
    ) -> None:
        # Stop the scheduler
        await self._sched.enter(0, None)

        # Stop the worker tasks
        workers, self._workers = self._workers[:], []
        for w in workers:
            self.run_task(StopWorker(self))

        # Signal to eventual clients in the queue that business is closed.
        for pos in waiting_clients:
            await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))

        # Close the connections still in the pool
        for conn in connections:
            await conn.close()

        # Wait for the worker tasks to terminate
        assert self._sched_runner is not None
        sched_runner, self._sched_runner = self._sched_runner, None
        wait = asyncio.gather(sched_runner, *workers)
        try:
            if timeout > 0:
                await asyncio.wait_for(asyncio.shield(wait), timeout=timeout)
            else:
                await wait
        except asyncio.TimeoutError:
            logger.warning(
                "couldn't stop pool %r tasks within %s seconds",
                self.name,
                timeout,
            )

    async def __aenter__(self) -> "AsyncConnectionPool":
        await self.open()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        await self.close()

    async def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
        min_size, max_size = self._check_size(min_size, max_size)

        ngrow = max(0, min_size - self._min_size)

        logger.info(
            "resizing %r to min_size=%s max_size=%s",
            self.name,
            min_size,
            max_size,
        )
        async with self._lock:
            self._min_size = min_size
            self._max_size = max_size
            self._nconns += ngrow

        for i in range(ngrow):
            self.run_task(AddConnection(self))

    async def check(self) -> None:
        async with self._lock:
            conns = list(self._pool)
            self._pool.clear()

            # Give a chance to the pool to grow if it has no connection.
            # In case there are enough connection, or the pool is already
            # growing, this is a no-op.
            self._maybe_grow_pool()

        while conns:
            conn = conns.pop()
            try:
                await conn.execute("SELECT 1")
                if conn.pgconn.transaction_status == TransactionStatus.INTRANS:
                    await conn.rollback()
            except Exception:
                self._stats[self._CONNECTIONS_LOST] += 1
                logger.warning("discarding broken connection: %s", conn)
                self.run_task(AddConnection(self))
            else:
                await self._add_to_pool(conn)

    def reconnect_failed(self) -> None:
        """
        Called when reconnection failed for longer than `reconnect_timeout`.
        """
        self._reconnect_failed(self)

    def run_task(self, task: "MaintenanceTask") -> None:
        """Run a maintenance task in a worker."""
        self._tasks.put_nowait(task)

    async def schedule_task(self, task: "MaintenanceTask", delay: float) -> None:
        """Run a maintenance task in a worker in the future."""
        await self._sched.enter(delay, task.tick)

    @classmethod
    async def worker(cls, q: "asyncio.Queue[MaintenanceTask]") -> None:
        """Runner to execute pending maintenance task.

        The function is designed to run as a task.

        Block on the queue *q*, run a task received. Finish running if a
        StopWorker is received.
        """
        while True:
            task = await q.get()

            if isinstance(task, StopWorker):
                logger.debug("terminating working task")
                return

            # Run the task. Make sure don't die in the attempt.
            try:
                await task.run()
            except Exception as ex:
                logger.warning(
                    "task run %s failed: %s: %s",
                    task,
                    ex.__class__.__name__,
                    ex,
                )

    async def _connect(self, timeout: Optional[float] = None) -> AsyncConnection[Any]:
        self._stats[self._CONNECTIONS_NUM] += 1
        kwargs = self.kwargs
        if timeout:
            kwargs = kwargs.copy()
            kwargs["connect_timeout"] = max(round(timeout), 1)
        t0 = monotonic()
        try:
            conn: AsyncConnection[Any]
            conn = await self.connection_class.connect(self.conninfo, **kwargs)
        except Exception:
            self._stats[self._CONNECTIONS_ERRORS] += 1
            raise
        else:
            t1 = monotonic()
            self._stats[self._CONNECTIONS_MS] += int(1000.0 * (t1 - t0))

        conn._pool = self

        if self._configure:
            await self._configure(conn)
            status = conn.pgconn.transaction_status
            if status != TransactionStatus.IDLE:
                sname = TransactionStatus(status).name
                raise e.ProgrammingError(
                    f"connection left in status {sname} by configure function"
                    f" {self._configure}: discarded"
                )

        # Set an expiry date, with some randomness to avoid mass reconnection
        self._set_connection_expiry_date(conn)
        return conn

    async def _add_connection(
        self, attempt: Optional[ConnectionAttempt], growing: bool = False
    ) -> None:
        """Try to connect and add the connection to the pool.

        If failed, reschedule a new attempt in the future for a few times, then
        give up, decrease the pool connections number and call
        `self.reconnect_failed()`.

        """
        now = monotonic()
        if not attempt:
            attempt = ConnectionAttempt(reconnect_timeout=self.reconnect_timeout)

        try:
            conn = await self._connect()
        except Exception as ex:
            logger.warning(f"error connecting in {self.name!r}: {ex}")
            if attempt.time_to_give_up(now):
                logger.warning(
                    "reconnection attempt in pool %r failed after %s sec",
                    self.name,
                    self.reconnect_timeout,
                )
                async with self._lock:
                    self._nconns -= 1
                    # If we have given up with a growing attempt, allow a new one.
                    if growing and self._growing:
                        self._growing = False
                self.reconnect_failed()
            else:
                attempt.update_delay(now)
                await self.schedule_task(
                    AddConnection(self, attempt, growing=growing),
                    attempt.delay,
                )
            return

        logger.info("adding new connection to the pool")
        await self._add_to_pool(conn)
        if growing:
            async with self._lock:
                # Keep on growing if the pool is not full yet, or if there are
                # clients waiting and the pool can extend.
                if self._nconns < self._min_size or (
                    self._nconns < self._max_size and self._waiting
                ):
                    self._nconns += 1
                    logger.info("growing pool %r to %s", self.name, self._nconns)
                    self.run_task(AddConnection(self, growing=True))
                else:
                    self._growing = False

    async def _return_connection(self, conn: AsyncConnection[Any]) -> None:
        """
        Return a connection to the pool after usage.
        """
        await self._reset_connection(conn)
        if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
            self._stats[self._RETURNS_BAD] += 1
            # Connection no more in working state: create a new one.
            self.run_task(AddConnection(self))
            logger.warning("discarding closed connection: %s", conn)
            return

        # Check if the connection is past its best before date
        if conn._expire_at <= monotonic():
            self.run_task(AddConnection(self))
            logger.info("discarding expired connection")
            await conn.close()
            return

        await self._add_to_pool(conn)

    async def _add_to_pool(self, conn: AsyncConnection[Any]) -> None:
        """
        Add a connection to the pool.

        The connection can be a fresh one or one already used in the pool.

        If a client is already waiting for a connection pass it on, otherwise
        put it back into the pool
        """
        # Remove the pool reference from the connection before returning it
        # to the state, to avoid to create a reference loop.
        # Also disable the warning for open connection in conn.__del__
        conn._pool = None

        # Critical section: if there is a client waiting give it the connection
        # otherwise put it back into the pool.
        async with self._lock:
            while self._waiting:
                # If there is a client waiting (which is still waiting and
                # hasn't timed out), give it the connection and notify it.
                pos = self._waiting.popleft()
                if await pos.set(conn):
                    break
            else:
                # No client waiting for a connection: put it back into the pool
                self._pool.append(conn)

                # If we have been asked to wait for pool init, notify the
                # waiter if the pool is full.
                if self._pool_full_event and len(self._pool) >= self._min_size:
                    self._pool_full_event.set()

    async def _reset_connection(self, conn: AsyncConnection[Any]) -> None:
        """
        Bring a connection to IDLE state or close it.
        """
        status = conn.pgconn.transaction_status
        if status == TransactionStatus.IDLE:
            pass

        elif status in (TransactionStatus.INTRANS, TransactionStatus.INERROR):
            # Connection returned with an active transaction
            logger.warning("rolling back returned connection: %s", conn)
            try:
                await conn.rollback()
            except Exception as ex:
                logger.warning(
                    "rollback failed: %s: %s. Discarding connection %s",
                    ex.__class__.__name__,
                    ex,
                    conn,
                )
                await conn.close()

        elif status == TransactionStatus.ACTIVE:
            # Connection returned during an operation. Bad... just close it.
            logger.warning("closing returned connection: %s", conn)
            await conn.close()

        if not conn.closed and self._reset:
            try:
                await self._reset(conn)
                status = conn.pgconn.transaction_status
                if status != TransactionStatus.IDLE:
                    sname = TransactionStatus(status).name
                    raise e.ProgrammingError(
                        f"connection left in status {sname} by reset function"
                        f" {self._reset}: discarded"
                    )
            except Exception as ex:
                logger.warning(f"error resetting connection: {ex}")
                await conn.close()

    async def _shrink_pool(self) -> None:
        to_close: Optional[AsyncConnection[Any]] = None

        async with self._lock:
            # Reset the min number of connections used
            nconns_min = self._nconns_min
            self._nconns_min = len(self._pool)

            # If the pool can shrink and connections were unused, drop one
            if self._nconns > self._min_size and nconns_min > 0:
                to_close = self._pool.popleft()
                self._nconns -= 1
                self._nconns_min -= 1

        if to_close:
            logger.info(
                "shrinking pool %r to %s because %s unused connections"
                " in the last %s sec",
                self.name,
                self._nconns,
                nconns_min,
                self.max_idle,
            )
            await to_close.close()

    def _get_measures(self) -> Dict[str, int]:
        rv = super()._get_measures()
        rv[self._REQUESTS_WAITING] = len(self._waiting)
        return rv


class AsyncClient:
    """A position in a queue for a client waiting for a connection."""

    __slots__ = ("conn", "error", "_cond")

    def __init__(self) -> None:
        self.conn: Optional[AsyncConnection[Any]] = None
        self.error: Optional[Exception] = None

        # The AsyncClient behaves in a way similar to an Event, but we need
        # to notify reliably the flagger that the waiter has "accepted" the
        # message and it hasn't timed out yet, otherwise the pool may give a
        # connection to a client that has already timed out getconn(), which
        # will be lost.
        self._cond = asyncio.Condition()

    async def wait(self, timeout: float) -> AsyncConnection[Any]:
        """Wait for a connection to be set and return it.

        Raise an exception if the wait times out or if fail() is called.
        """
        async with self._cond:
            if not (self.conn or self.error):
                try:
                    await asyncio.wait_for(self._cond.wait(), timeout)
                except asyncio.TimeoutError:
                    self.error = PoolTimeout(
                        f"couldn't get a connection after {timeout} sec"
                    )

        if self.conn:
            return self.conn
        else:
            assert self.error
            raise self.error

    async def set(self, conn: AsyncConnection[Any]) -> bool:
        """Signal the client waiting that a connection is ready.

        Return True if the client has "accepted" the connection, False
        otherwise (typically because wait() has timed out).
        """
        async with self._cond:
            if self.conn or self.error:
                return False

            self.conn = conn
            self._cond.notify_all()
            return True

    async def fail(self, error: Exception) -> bool:
        """Signal the client that, alas, they won't have a connection today.

        Return True if the client has "accepted" the error, False otherwise
        (typically because wait() has timed out).
        """
        async with self._cond:
            if self.conn or self.error:
                return False

            self.error = error
            self._cond.notify_all()
            return True


class MaintenanceTask(ABC):
    """A task to run asynchronously to maintain the pool state."""

    def __init__(self, pool: "AsyncConnectionPool"):
        self.pool = ref(pool)

    def __repr__(self) -> str:
        pool = self.pool()
        name = repr(pool.name) if pool else "<pool is gone>"
        return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>"

    async def run(self) -> None:
        """Run the task.

        This usually happens in a worker. Call the concrete _run()
        implementation, if the pool is still alive.
        """
        pool = self.pool()
        if not pool or pool.closed:
            # Pool is no more working. Quietly discard the operation.
            logger.debug("task run discarded: %s", self)
            return

        await self._run(pool)

    async def tick(self) -> None:
        """Run the scheduled task

        This function is called by the scheduler task. Use a worker to
        run the task for real in order to free the scheduler immediately.
        """
        pool = self.pool()
        if not pool or pool.closed:
            # Pool is no more working. Quietly discard the operation.
            logger.debug("task tick discarded: %s", self)
            return

        pool.run_task(self)

    @abstractmethod
    async def _run(self, pool: "AsyncConnectionPool") -> None:
        ...


class StopWorker(MaintenanceTask):
    """Signal the maintenance worker to terminate."""

    async def _run(self, pool: "AsyncConnectionPool") -> None:
        pass


class AddConnection(MaintenanceTask):
    def __init__(
        self,
        pool: "AsyncConnectionPool",
        attempt: Optional["ConnectionAttempt"] = None,
        growing: bool = False,
    ):
        super().__init__(pool)
        self.attempt = attempt
        self.growing = growing

    async def _run(self, pool: "AsyncConnectionPool") -> None:
        await pool._add_connection(self.attempt, growing=self.growing)


class ReturnConnection(MaintenanceTask):
    """Clean up and return a connection to the pool."""

    def __init__(self, pool: "AsyncConnectionPool", conn: "AsyncConnection[Any]"):
        super().__init__(pool)
        self.conn = conn

    async def _run(self, pool: "AsyncConnectionPool") -> None:
        await pool._return_connection(self.conn)


class ShrinkPool(MaintenanceTask):
    """If the pool can shrink, remove one connection.

    Re-schedule periodically and also reset the minimum number of connections
    in the pool.
    """

    async def _run(self, pool: "AsyncConnectionPool") -> None:
        # Reschedule the task now so that in case of any error we don't lose
        # the periodic run.
        await pool.schedule_task(self, pool.max_idle)
        await pool._shrink_pool()


class Schedule(MaintenanceTask):
    """Schedule a task in the pool scheduler.

    This task is a trampoline to allow to use a sync call (pool.run_task)
    to execute an async one (pool.schedule_task).
    """

    def __init__(
        self,
        pool: "AsyncConnectionPool",
        task: MaintenanceTask,
        delay: float,
    ):
        super().__init__(pool)
        self.task = task
        self.delay = delay

    async def _run(self, pool: "AsyncConnectionPool") -> None:
        await pool.schedule_task(self.task, self.delay)