summaryrefslogtreecommitdiffstats
path: root/src/prompt_toolkit/eventloop/async_generator.py
blob: 5aee50a4093ad16d3141f2071350b8bfbc5b252a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""
Implementation for async generators.
"""
from __future__ import annotations

from asyncio import get_running_loop
from contextlib import asynccontextmanager
from queue import Empty, Full, Queue
from typing import Any, AsyncGenerator, Callable, Iterable, TypeVar

from .utils import run_in_executor_with_context

__all__ = [
    "aclosing",
    "generator_to_async_generator",
]

_T_Generator = TypeVar("_T_Generator", bound=AsyncGenerator[Any, None])


@asynccontextmanager
async def aclosing(
    thing: _T_Generator,
) -> AsyncGenerator[_T_Generator, None]:
    "Similar to `contextlib.aclosing`, in Python 3.10."
    try:
        yield thing
    finally:
        await thing.aclose()


# By default, choose a buffer size that's a good balance between having enough
# throughput, but not consuming too much memory. We use this to consume a sync
# generator of completions as an async generator. If the queue size is very
# small (like 1), consuming the completions goes really slow (when there are a
# lot of items). If the queue size would be unlimited or too big, this can
# cause overconsumption of memory, and cause CPU time spent producing items
# that are no longer needed (if the consumption of the async generator stops at
# some point). We need a fixed size in order to get some back pressure from the
# async consumer to the sync producer. We choose 1000 by default here. If we
# have around 50k completions, measurements show that 1000 is still
# significantly faster than a buffer of 100.
DEFAULT_BUFFER_SIZE: int = 1000

_T = TypeVar("_T")


class _Done:
    pass


async def generator_to_async_generator(
    get_iterable: Callable[[], Iterable[_T]],
    buffer_size: int = DEFAULT_BUFFER_SIZE,
) -> AsyncGenerator[_T, None]:
    """
    Turn a generator or iterable into an async generator.

    This works by running the generator in a background thread.

    :param get_iterable: Function that returns a generator or iterable when
        called.
    :param buffer_size: Size of the queue between the async consumer and the
        synchronous generator that produces items.
    """
    quitting = False
    # NOTE: We are limiting the queue size in order to have back-pressure.
    q: Queue[_T | _Done] = Queue(maxsize=buffer_size)
    loop = get_running_loop()

    def runner() -> None:
        """
        Consume the generator in background thread.
        When items are received, they'll be pushed to the queue.
        """
        try:
            for item in get_iterable():
                # When this async generator was cancelled (closed), stop this
                # thread.
                if quitting:
                    return

                while True:
                    try:
                        q.put(item, timeout=1)
                    except Full:
                        if quitting:
                            return
                        continue
                    else:
                        break

        finally:
            while True:
                try:
                    q.put(_Done(), timeout=1)
                except Full:
                    if quitting:
                        return
                    continue
                else:
                    break

    # Start background thread.
    runner_f = run_in_executor_with_context(runner)

    try:
        while True:
            try:
                item = q.get_nowait()
            except Empty:
                item = await loop.run_in_executor(None, q.get)
            if isinstance(item, _Done):
                break
            else:
                yield item
    finally:
        # When this async generator is closed (GeneratorExit exception, stop
        # the background thread as well. - we don't need that anymore.)
        quitting = True

        # Wait for the background thread to finish. (should happen right after
        # the last item is yielded).
        await runner_f