From 4f1a3b5f9ad05aa7b08715d48909a2b06ee2fcb1 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 15 Apr 2024 18:35:31 +0200 Subject: Adding upstream version 3.0.43. Signed-off-by: Daniel Baumann --- src/prompt_toolkit/eventloop/async_generator.py | 124 ++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 src/prompt_toolkit/eventloop/async_generator.py (limited to 'src/prompt_toolkit/eventloop/async_generator.py') diff --git a/src/prompt_toolkit/eventloop/async_generator.py b/src/prompt_toolkit/eventloop/async_generator.py new file mode 100644 index 0000000..5aee50a --- /dev/null +++ b/src/prompt_toolkit/eventloop/async_generator.py @@ -0,0 +1,124 @@ +""" +Implementation for async generators. +""" +from __future__ import annotations + +from asyncio import get_running_loop +from contextlib import asynccontextmanager +from queue import Empty, Full, Queue +from typing import Any, AsyncGenerator, Callable, Iterable, TypeVar + +from .utils import run_in_executor_with_context + +__all__ = [ + "aclosing", + "generator_to_async_generator", +] + +_T_Generator = TypeVar("_T_Generator", bound=AsyncGenerator[Any, None]) + + +@asynccontextmanager +async def aclosing( + thing: _T_Generator, +) -> AsyncGenerator[_T_Generator, None]: + "Similar to `contextlib.aclosing`, in Python 3.10." + try: + yield thing + finally: + await thing.aclose() + + +# By default, choose a buffer size that's a good balance between having enough +# throughput, but not consuming too much memory. We use this to consume a sync +# generator of completions as an async generator. If the queue size is very +# small (like 1), consuming the completions goes really slow (when there are a +# lot of items). If the queue size would be unlimited or too big, this can +# cause overconsumption of memory, and cause CPU time spent producing items +# that are no longer needed (if the consumption of the async generator stops at +# some point). We need a fixed size in order to get some back pressure from the +# async consumer to the sync producer. We choose 1000 by default here. If we +# have around 50k completions, measurements show that 1000 is still +# significantly faster than a buffer of 100. +DEFAULT_BUFFER_SIZE: int = 1000 + +_T = TypeVar("_T") + + +class _Done: + pass + + +async def generator_to_async_generator( + get_iterable: Callable[[], Iterable[_T]], + buffer_size: int = DEFAULT_BUFFER_SIZE, +) -> AsyncGenerator[_T, None]: + """ + Turn a generator or iterable into an async generator. + + This works by running the generator in a background thread. + + :param get_iterable: Function that returns a generator or iterable when + called. + :param buffer_size: Size of the queue between the async consumer and the + synchronous generator that produces items. + """ + quitting = False + # NOTE: We are limiting the queue size in order to have back-pressure. + q: Queue[_T | _Done] = Queue(maxsize=buffer_size) + loop = get_running_loop() + + def runner() -> None: + """ + Consume the generator in background thread. + When items are received, they'll be pushed to the queue. + """ + try: + for item in get_iterable(): + # When this async generator was cancelled (closed), stop this + # thread. + if quitting: + return + + while True: + try: + q.put(item, timeout=1) + except Full: + if quitting: + return + continue + else: + break + + finally: + while True: + try: + q.put(_Done(), timeout=1) + except Full: + if quitting: + return + continue + else: + break + + # Start background thread. + runner_f = run_in_executor_with_context(runner) + + try: + while True: + try: + item = q.get_nowait() + except Empty: + item = await loop.run_in_executor(None, q.get) + if isinstance(item, _Done): + break + else: + yield item + finally: + # When this async generator is closed (GeneratorExit exception, stop + # the background thread as well. - we don't need that anymore.) + quitting = True + + # Wait for the background thread to finish. (should happen right after + # the last item is yielded). + await runner_f -- cgit v1.2.3