diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:35:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 16:35:31 +0000 |
commit | 4f1a3b5f9ad05aa7b08715d48909a2b06ee2fcb1 (patch) | |
tree | e5dee7be2f0d963da4faad6517278d03783e3adc /src/prompt_toolkit/eventloop | |
parent | Initial commit. (diff) | |
download | prompt-toolkit-4f1a3b5f9ad05aa7b08715d48909a2b06ee2fcb1.tar.xz prompt-toolkit-4f1a3b5f9ad05aa7b08715d48909a2b06ee2fcb1.zip |
Adding upstream version 3.0.43.upstream/3.0.43
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/prompt_toolkit/eventloop')
-rw-r--r-- | src/prompt_toolkit/eventloop/__init__.py | 31 | ||||
-rw-r--r-- | src/prompt_toolkit/eventloop/async_generator.py | 124 | ||||
-rw-r--r-- | src/prompt_toolkit/eventloop/inputhook.py | 190 | ||||
-rw-r--r-- | src/prompt_toolkit/eventloop/utils.py | 101 | ||||
-rw-r--r-- | src/prompt_toolkit/eventloop/win32.py | 72 |
5 files changed, 518 insertions, 0 deletions
diff --git a/src/prompt_toolkit/eventloop/__init__.py b/src/prompt_toolkit/eventloop/__init__.py new file mode 100644 index 0000000..5df623b --- /dev/null +++ b/src/prompt_toolkit/eventloop/__init__.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from .async_generator import aclosing, generator_to_async_generator +from .inputhook import ( + InputHook, + InputHookContext, + InputHookSelector, + new_eventloop_with_inputhook, + set_eventloop_with_inputhook, +) +from .utils import ( + call_soon_threadsafe, + get_traceback_from_context, + run_in_executor_with_context, +) + +__all__ = [ + # Async generator + "generator_to_async_generator", + "aclosing", + # Utils. + "run_in_executor_with_context", + "call_soon_threadsafe", + "get_traceback_from_context", + # Inputhooks. + "InputHook", + "new_eventloop_with_inputhook", + "set_eventloop_with_inputhook", + "InputHookSelector", + "InputHookContext", +] diff --git a/src/prompt_toolkit/eventloop/async_generator.py b/src/prompt_toolkit/eventloop/async_generator.py new file mode 100644 index 0000000..5aee50a --- /dev/null +++ b/src/prompt_toolkit/eventloop/async_generator.py @@ -0,0 +1,124 @@ +""" +Implementation for async generators. +""" +from __future__ import annotations + +from asyncio import get_running_loop +from contextlib import asynccontextmanager +from queue import Empty, Full, Queue +from typing import Any, AsyncGenerator, Callable, Iterable, TypeVar + +from .utils import run_in_executor_with_context + +__all__ = [ + "aclosing", + "generator_to_async_generator", +] + +_T_Generator = TypeVar("_T_Generator", bound=AsyncGenerator[Any, None]) + + +@asynccontextmanager +async def aclosing( + thing: _T_Generator, +) -> AsyncGenerator[_T_Generator, None]: + "Similar to `contextlib.aclosing`, in Python 3.10." + try: + yield thing + finally: + await thing.aclose() + + +# By default, choose a buffer size that's a good balance between having enough +# throughput, but not consuming too much memory. We use this to consume a sync +# generator of completions as an async generator. If the queue size is very +# small (like 1), consuming the completions goes really slow (when there are a +# lot of items). If the queue size would be unlimited or too big, this can +# cause overconsumption of memory, and cause CPU time spent producing items +# that are no longer needed (if the consumption of the async generator stops at +# some point). We need a fixed size in order to get some back pressure from the +# async consumer to the sync producer. We choose 1000 by default here. If we +# have around 50k completions, measurements show that 1000 is still +# significantly faster than a buffer of 100. +DEFAULT_BUFFER_SIZE: int = 1000 + +_T = TypeVar("_T") + + +class _Done: + pass + + +async def generator_to_async_generator( + get_iterable: Callable[[], Iterable[_T]], + buffer_size: int = DEFAULT_BUFFER_SIZE, +) -> AsyncGenerator[_T, None]: + """ + Turn a generator or iterable into an async generator. + + This works by running the generator in a background thread. + + :param get_iterable: Function that returns a generator or iterable when + called. + :param buffer_size: Size of the queue between the async consumer and the + synchronous generator that produces items. + """ + quitting = False + # NOTE: We are limiting the queue size in order to have back-pressure. + q: Queue[_T | _Done] = Queue(maxsize=buffer_size) + loop = get_running_loop() + + def runner() -> None: + """ + Consume the generator in background thread. + When items are received, they'll be pushed to the queue. + """ + try: + for item in get_iterable(): + # When this async generator was cancelled (closed), stop this + # thread. + if quitting: + return + + while True: + try: + q.put(item, timeout=1) + except Full: + if quitting: + return + continue + else: + break + + finally: + while True: + try: + q.put(_Done(), timeout=1) + except Full: + if quitting: + return + continue + else: + break + + # Start background thread. + runner_f = run_in_executor_with_context(runner) + + try: + while True: + try: + item = q.get_nowait() + except Empty: + item = await loop.run_in_executor(None, q.get) + if isinstance(item, _Done): + break + else: + yield item + finally: + # When this async generator is closed (GeneratorExit exception, stop + # the background thread as well. - we don't need that anymore.) + quitting = True + + # Wait for the background thread to finish. (should happen right after + # the last item is yielded). + await runner_f diff --git a/src/prompt_toolkit/eventloop/inputhook.py b/src/prompt_toolkit/eventloop/inputhook.py new file mode 100644 index 0000000..a4c0eee --- /dev/null +++ b/src/prompt_toolkit/eventloop/inputhook.py @@ -0,0 +1,190 @@ +""" +Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in +the asyncio event loop. + +The way this works is by using a custom 'selector' that runs the other event +loop until the real selector is ready. + +It's the responsibility of this event hook to return when there is input ready. +There are two ways to detect when input is ready: + +The inputhook itself is a callable that receives an `InputHookContext`. This +callable should run the other event loop, and return when the main loop has +stuff to do. There are two ways to detect when to return: + +- Call the `input_is_ready` method periodically. Quit when this returns `True`. + +- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor + becomes readable. (But don't read from it.) + + Note that this is not the same as checking for `sys.stdin.fileno()`. The + eventloop of prompt-toolkit allows thread-based executors, for example for + asynchronous autocompletion. When the completion for instance is ready, we + also want prompt-toolkit to gain control again in order to display that. +""" +from __future__ import annotations + +import asyncio +import os +import select +import selectors +import sys +import threading +from asyncio import AbstractEventLoop, get_running_loop +from selectors import BaseSelector, SelectorKey +from typing import TYPE_CHECKING, Any, Callable, Mapping + +__all__ = [ + "new_eventloop_with_inputhook", + "set_eventloop_with_inputhook", + "InputHookSelector", + "InputHookContext", + "InputHook", +] + +if TYPE_CHECKING: + from _typeshed import FileDescriptorLike + from typing_extensions import TypeAlias + + _EventMask = int + + +class InputHookContext: + """ + Given as a parameter to the inputhook. + """ + + def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None: + self._fileno = fileno + self.input_is_ready = input_is_ready + + def fileno(self) -> int: + return self._fileno + + +InputHook: TypeAlias = Callable[[InputHookContext], None] + + +def new_eventloop_with_inputhook( + inputhook: Callable[[InputHookContext], None], +) -> AbstractEventLoop: + """ + Create a new event loop with the given inputhook. + """ + selector = InputHookSelector(selectors.DefaultSelector(), inputhook) + loop = asyncio.SelectorEventLoop(selector) + return loop + + +def set_eventloop_with_inputhook( + inputhook: Callable[[InputHookContext], None], +) -> AbstractEventLoop: + """ + Create a new event loop with the given inputhook, and activate it. + """ + # Deprecated! + + loop = new_eventloop_with_inputhook(inputhook) + asyncio.set_event_loop(loop) + return loop + + +class InputHookSelector(BaseSelector): + """ + Usage: + + selector = selectors.SelectSelector() + loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook)) + asyncio.set_event_loop(loop) + """ + + def __init__( + self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None] + ) -> None: + self.selector = selector + self.inputhook = inputhook + self._r, self._w = os.pipe() + + def register( + self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None + ) -> SelectorKey: + return self.selector.register(fileobj, events, data=data) + + def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey: + return self.selector.unregister(fileobj) + + def modify( + self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None + ) -> SelectorKey: + return self.selector.modify(fileobj, events, data=None) + + def select( + self, timeout: float | None = None + ) -> list[tuple[SelectorKey, _EventMask]]: + # If there are tasks in the current event loop, + # don't run the input hook. + if len(getattr(get_running_loop(), "_ready", [])) > 0: + return self.selector.select(timeout=timeout) + + ready = False + result = None + + # Run selector in other thread. + def run_selector() -> None: + nonlocal ready, result + result = self.selector.select(timeout=timeout) + os.write(self._w, b"x") + ready = True + + th = threading.Thread(target=run_selector) + th.start() + + def input_is_ready() -> bool: + return ready + + # Call inputhook. + # The inputhook function is supposed to return when our selector + # becomes ready. The inputhook can do that by registering the fd in its + # own loop, or by checking the `input_is_ready` function regularly. + self.inputhook(InputHookContext(self._r, input_is_ready)) + + # Flush the read end of the pipe. + try: + # Before calling 'os.read', call select.select. This is required + # when the gevent monkey patch has been applied. 'os.read' is never + # monkey patched and won't be cooperative, so that would block all + # other select() calls otherwise. + # See: http://www.gevent.org/gevent.os.html + + # Note: On Windows, this is apparently not an issue. + # However, if we would ever want to add a select call, it + # should use `windll.kernel32.WaitForMultipleObjects`, + # because `select.select` can't wait for a pipe on Windows. + if sys.platform != "win32": + select.select([self._r], [], [], None) + + os.read(self._r, 1024) + except OSError: + # This happens when the window resizes and a SIGWINCH was received. + # We get 'Error: [Errno 4] Interrupted system call' + # Just ignore. + pass + + # Wait for the real selector to be done. + th.join() + assert result is not None + return result + + def close(self) -> None: + """ + Clean up resources. + """ + if self._r: + os.close(self._r) + os.close(self._w) + + self._r = self._w = -1 + self.selector.close() + + def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]: + return self.selector.get_map() diff --git a/src/prompt_toolkit/eventloop/utils.py b/src/prompt_toolkit/eventloop/utils.py new file mode 100644 index 0000000..3138361 --- /dev/null +++ b/src/prompt_toolkit/eventloop/utils.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import asyncio +import contextvars +import sys +import time +from asyncio import get_running_loop +from types import TracebackType +from typing import Any, Awaitable, Callable, TypeVar, cast + +__all__ = [ + "run_in_executor_with_context", + "call_soon_threadsafe", + "get_traceback_from_context", +] + +_T = TypeVar("_T") + + +def run_in_executor_with_context( + func: Callable[..., _T], + *args: Any, + loop: asyncio.AbstractEventLoop | None = None, +) -> Awaitable[_T]: + """ + Run a function in an executor, but make sure it uses the same contextvars. + This is required so that the function will see the right application. + + See also: https://bugs.python.org/issue34014 + """ + loop = loop or get_running_loop() + ctx: contextvars.Context = contextvars.copy_context() + + return loop.run_in_executor(None, ctx.run, func, *args) + + +def call_soon_threadsafe( + func: Callable[[], None], + max_postpone_time: float | None = None, + loop: asyncio.AbstractEventLoop | None = None, +) -> None: + """ + Wrapper around asyncio's `call_soon_threadsafe`. + + This takes a `max_postpone_time` which can be used to tune the urgency of + the method. + + Asyncio runs tasks in first-in-first-out. However, this is not what we + want for the render function of the prompt_toolkit UI. Rendering is + expensive, but since the UI is invalidated very often, in some situations + we render the UI too often, so much that the rendering CPU usage slows down + the rest of the processing of the application. (Pymux is an example where + we have to balance the CPU time spend on rendering the UI, and parsing + process output.) + However, we want to set a deadline value, for when the rendering should + happen. (The UI should stay responsive). + """ + loop2 = loop or get_running_loop() + + # If no `max_postpone_time` has been given, schedule right now. + if max_postpone_time is None: + loop2.call_soon_threadsafe(func) + return + + max_postpone_until = time.time() + max_postpone_time + + def schedule() -> None: + # When there are no other tasks scheduled in the event loop. Run it + # now. + # Notice: uvloop doesn't have this _ready attribute. In that case, + # always call immediately. + if not getattr(loop2, "_ready", []): + func() + return + + # If the timeout expired, run this now. + if time.time() > max_postpone_until: + func() + return + + # Schedule again for later. + loop2.call_soon_threadsafe(schedule) + + loop2.call_soon_threadsafe(schedule) + + +def get_traceback_from_context(context: dict[str, Any]) -> TracebackType | None: + """ + Get the traceback object from the context. + """ + exception = context.get("exception") + if exception: + if hasattr(exception, "__traceback__"): + return cast(TracebackType, exception.__traceback__) + else: + # call_exception_handler() is usually called indirectly + # from an except block. If it's not the case, the traceback + # is undefined... + return sys.exc_info()[2] + + return None diff --git a/src/prompt_toolkit/eventloop/win32.py b/src/prompt_toolkit/eventloop/win32.py new file mode 100644 index 0000000..56a0c7d --- /dev/null +++ b/src/prompt_toolkit/eventloop/win32.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import sys + +assert sys.platform == "win32" + +from ctypes import pointer + +from ..utils import SPHINX_AUTODOC_RUNNING + +# Do not import win32-specific stuff when generating documentation. +# Otherwise RTD would be unable to generate docs for this module. +if not SPHINX_AUTODOC_RUNNING: + from ctypes import windll + +from ctypes.wintypes import BOOL, DWORD, HANDLE + +from prompt_toolkit.win32_types import SECURITY_ATTRIBUTES + +__all__ = ["wait_for_handles", "create_win32_event"] + + +WAIT_TIMEOUT = 0x00000102 +INFINITE = -1 + + +def wait_for_handles(handles: list[HANDLE], timeout: int = INFINITE) -> HANDLE | None: + """ + Waits for multiple handles. (Similar to 'select') Returns the handle which is ready. + Returns `None` on timeout. + http://msdn.microsoft.com/en-us/library/windows/desktop/ms687025(v=vs.85).aspx + + Note that handles should be a list of `HANDLE` objects, not integers. See + this comment in the patch by @quark-zju for the reason why: + + ''' Make sure HANDLE on Windows has a correct size + + Previously, the type of various HANDLEs are native Python integer + types. The ctypes library will treat them as 4-byte integer when used + in function arguments. On 64-bit Windows, HANDLE is 8-byte and usually + a small integer. Depending on whether the extra 4 bytes are zero-ed out + or not, things can happen to work, or break. ''' + + This function returns either `None` or one of the given `HANDLE` objects. + (The return value can be tested with the `is` operator.) + """ + arrtype = HANDLE * len(handles) + handle_array = arrtype(*handles) + + ret: int = windll.kernel32.WaitForMultipleObjects( + len(handle_array), handle_array, BOOL(False), DWORD(timeout) + ) + + if ret == WAIT_TIMEOUT: + return None + else: + return handles[ret] + + +def create_win32_event() -> HANDLE: + """ + Creates a Win32 unnamed Event . + http://msdn.microsoft.com/en-us/library/windows/desktop/ms682396(v=vs.85).aspx + """ + return HANDLE( + windll.kernel32.CreateEventA( + pointer(SECURITY_ATTRIBUTES()), + BOOL(True), # Manual reset event. + BOOL(False), # Initial state. + None, # Unnamed event object. + ) + ) |