From 4f1a3b5f9ad05aa7b08715d48909a2b06ee2fcb1 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 15 Apr 2024 18:35:31 +0200 Subject: Adding upstream version 3.0.43. Signed-off-by: Daniel Baumann --- src/prompt_toolkit/eventloop/inputhook.py | 190 ++++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 src/prompt_toolkit/eventloop/inputhook.py (limited to 'src/prompt_toolkit/eventloop/inputhook.py') diff --git a/src/prompt_toolkit/eventloop/inputhook.py b/src/prompt_toolkit/eventloop/inputhook.py new file mode 100644 index 0000000..a4c0eee --- /dev/null +++ b/src/prompt_toolkit/eventloop/inputhook.py @@ -0,0 +1,190 @@ +""" +Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in +the asyncio event loop. + +The way this works is by using a custom 'selector' that runs the other event +loop until the real selector is ready. + +It's the responsibility of this event hook to return when there is input ready. +There are two ways to detect when input is ready: + +The inputhook itself is a callable that receives an `InputHookContext`. This +callable should run the other event loop, and return when the main loop has +stuff to do. There are two ways to detect when to return: + +- Call the `input_is_ready` method periodically. Quit when this returns `True`. + +- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor + becomes readable. (But don't read from it.) + + Note that this is not the same as checking for `sys.stdin.fileno()`. The + eventloop of prompt-toolkit allows thread-based executors, for example for + asynchronous autocompletion. When the completion for instance is ready, we + also want prompt-toolkit to gain control again in order to display that. +""" +from __future__ import annotations + +import asyncio +import os +import select +import selectors +import sys +import threading +from asyncio import AbstractEventLoop, get_running_loop +from selectors import BaseSelector, SelectorKey +from typing import TYPE_CHECKING, Any, Callable, Mapping + +__all__ = [ + "new_eventloop_with_inputhook", + "set_eventloop_with_inputhook", + "InputHookSelector", + "InputHookContext", + "InputHook", +] + +if TYPE_CHECKING: + from _typeshed import FileDescriptorLike + from typing_extensions import TypeAlias + + _EventMask = int + + +class InputHookContext: + """ + Given as a parameter to the inputhook. + """ + + def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None: + self._fileno = fileno + self.input_is_ready = input_is_ready + + def fileno(self) -> int: + return self._fileno + + +InputHook: TypeAlias = Callable[[InputHookContext], None] + + +def new_eventloop_with_inputhook( + inputhook: Callable[[InputHookContext], None], +) -> AbstractEventLoop: + """ + Create a new event loop with the given inputhook. + """ + selector = InputHookSelector(selectors.DefaultSelector(), inputhook) + loop = asyncio.SelectorEventLoop(selector) + return loop + + +def set_eventloop_with_inputhook( + inputhook: Callable[[InputHookContext], None], +) -> AbstractEventLoop: + """ + Create a new event loop with the given inputhook, and activate it. + """ + # Deprecated! + + loop = new_eventloop_with_inputhook(inputhook) + asyncio.set_event_loop(loop) + return loop + + +class InputHookSelector(BaseSelector): + """ + Usage: + + selector = selectors.SelectSelector() + loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook)) + asyncio.set_event_loop(loop) + """ + + def __init__( + self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None] + ) -> None: + self.selector = selector + self.inputhook = inputhook + self._r, self._w = os.pipe() + + def register( + self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None + ) -> SelectorKey: + return self.selector.register(fileobj, events, data=data) + + def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey: + return self.selector.unregister(fileobj) + + def modify( + self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None + ) -> SelectorKey: + return self.selector.modify(fileobj, events, data=None) + + def select( + self, timeout: float | None = None + ) -> list[tuple[SelectorKey, _EventMask]]: + # If there are tasks in the current event loop, + # don't run the input hook. + if len(getattr(get_running_loop(), "_ready", [])) > 0: + return self.selector.select(timeout=timeout) + + ready = False + result = None + + # Run selector in other thread. + def run_selector() -> None: + nonlocal ready, result + result = self.selector.select(timeout=timeout) + os.write(self._w, b"x") + ready = True + + th = threading.Thread(target=run_selector) + th.start() + + def input_is_ready() -> bool: + return ready + + # Call inputhook. + # The inputhook function is supposed to return when our selector + # becomes ready. The inputhook can do that by registering the fd in its + # own loop, or by checking the `input_is_ready` function regularly. + self.inputhook(InputHookContext(self._r, input_is_ready)) + + # Flush the read end of the pipe. + try: + # Before calling 'os.read', call select.select. This is required + # when the gevent monkey patch has been applied. 'os.read' is never + # monkey patched and won't be cooperative, so that would block all + # other select() calls otherwise. + # See: http://www.gevent.org/gevent.os.html + + # Note: On Windows, this is apparently not an issue. + # However, if we would ever want to add a select call, it + # should use `windll.kernel32.WaitForMultipleObjects`, + # because `select.select` can't wait for a pipe on Windows. + if sys.platform != "win32": + select.select([self._r], [], [], None) + + os.read(self._r, 1024) + except OSError: + # This happens when the window resizes and a SIGWINCH was received. + # We get 'Error: [Errno 4] Interrupted system call' + # Just ignore. + pass + + # Wait for the real selector to be done. + th.join() + assert result is not None + return result + + def close(self) -> None: + """ + Clean up resources. + """ + if self._r: + os.close(self._r) + os.close(self._w) + + self._r = self._w = -1 + self.selector.close() + + def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]: + return self.selector.get_map() -- cgit v1.2.3