diff options
Diffstat (limited to 'src/prompt_toolkit/eventloop/inputhook.py')
-rw-r--r-- | src/prompt_toolkit/eventloop/inputhook.py | 183 |
1 files changed, 183 insertions, 0 deletions
diff --git a/src/prompt_toolkit/eventloop/inputhook.py b/src/prompt_toolkit/eventloop/inputhook.py new file mode 100644 index 0000000..05d2981 --- /dev/null +++ b/src/prompt_toolkit/eventloop/inputhook.py @@ -0,0 +1,183 @@ +""" +Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in +the asyncio event loop. + +The way this works is by using a custom 'selector' that runs the other event +loop until the real selector is ready. + +It's the responsibility of this event hook to return when there is input ready. +There are two ways to detect when input is ready: + +The inputhook itself is a callable that receives an `InputHookContext`. This +callable should run the other event loop, and return when the main loop has +stuff to do. There are two ways to detect when to return: + +- Call the `input_is_ready` method periodically. Quit when this returns `True`. + +- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor + becomes readable. (But don't read from it.) + + Note that this is not the same as checking for `sys.stdin.fileno()`. The + eventloop of prompt-toolkit allows thread-based executors, for example for + asynchronous autocompletion. When the completion for instance is ready, we + also want prompt-toolkit to gain control again in order to display that. +""" +import asyncio +import os +import select +import selectors +import sys +import threading +from asyncio import AbstractEventLoop +from selectors import BaseSelector, SelectorKey +from typing import TYPE_CHECKING, Any, Callable, List, Mapping, Optional, Tuple + +from .utils import get_event_loop + +__all__ = [ + "new_eventloop_with_inputhook", + "set_eventloop_with_inputhook", + "InputHookSelector", + "InputHookContext", +] + +if TYPE_CHECKING: + from _typeshed import FileDescriptorLike + + _EventMask = int + + +def new_eventloop_with_inputhook( + inputhook: Callable[["InputHookContext"], None] +) -> AbstractEventLoop: + """ + Create a new event loop with the given inputhook. + """ + selector = InputHookSelector(selectors.DefaultSelector(), inputhook) + loop = asyncio.SelectorEventLoop(selector) + return loop + + +def set_eventloop_with_inputhook( + inputhook: Callable[["InputHookContext"], None] +) -> AbstractEventLoop: + """ + Create a new event loop with the given inputhook, and activate it. + """ + loop = new_eventloop_with_inputhook(inputhook) + asyncio.set_event_loop(loop) + return loop + + +class InputHookSelector(BaseSelector): + """ + Usage: + + selector = selectors.SelectSelector() + loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook)) + asyncio.set_event_loop(loop) + """ + + def __init__( + self, selector: BaseSelector, inputhook: Callable[["InputHookContext"], None] + ) -> None: + self.selector = selector + self.inputhook = inputhook + self._r, self._w = os.pipe() + + def register( + self, fileobj: "FileDescriptorLike", events: "_EventMask", data: Any = None + ) -> "SelectorKey": + return self.selector.register(fileobj, events, data=data) + + def unregister(self, fileobj: "FileDescriptorLike") -> "SelectorKey": + return self.selector.unregister(fileobj) + + def modify( + self, fileobj: "FileDescriptorLike", events: "_EventMask", data: Any = None + ) -> "SelectorKey": + return self.selector.modify(fileobj, events, data=None) + + def select( + self, timeout: Optional[float] = None + ) -> List[Tuple["SelectorKey", "_EventMask"]]: + # If there are tasks in the current event loop, + # don't run the input hook. + if len(getattr(get_event_loop(), "_ready", [])) > 0: + return self.selector.select(timeout=timeout) + + ready = False + result = None + + # Run selector in other thread. + def run_selector() -> None: + nonlocal ready, result + result = self.selector.select(timeout=timeout) + os.write(self._w, b"x") + ready = True + + th = threading.Thread(target=run_selector) + th.start() + + def input_is_ready() -> bool: + return ready + + # Call inputhook. + # The inputhook function is supposed to return when our selector + # becomes ready. The inputhook can do that by registering the fd in its + # own loop, or by checking the `input_is_ready` function regularly. + self.inputhook(InputHookContext(self._r, input_is_ready)) + + # Flush the read end of the pipe. + try: + # Before calling 'os.read', call select.select. This is required + # when the gevent monkey patch has been applied. 'os.read' is never + # monkey patched and won't be cooperative, so that would block all + # other select() calls otherwise. + # See: http://www.gevent.org/gevent.os.html + + # Note: On Windows, this is apparently not an issue. + # However, if we would ever want to add a select call, it + # should use `windll.kernel32.WaitForMultipleObjects`, + # because `select.select` can't wait for a pipe on Windows. + if sys.platform != "win32": + select.select([self._r], [], [], None) + + os.read(self._r, 1024) + except OSError: + # This happens when the window resizes and a SIGWINCH was received. + # We get 'Error: [Errno 4] Interrupted system call' + # Just ignore. + pass + + # Wait for the real selector to be done. + th.join() + assert result is not None + return result + + def close(self) -> None: + """ + Clean up resources. + """ + if self._r: + os.close(self._r) + os.close(self._w) + + self._r = self._w = -1 + self.selector.close() + + def get_map(self) -> Mapping["FileDescriptorLike", "SelectorKey"]: + return self.selector.get_map() + + +class InputHookContext: + """ + Given as a parameter to the inputhook. + """ + + def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None: + self._fileno = fileno + self.input_is_ready = input_is_ready + + def fileno(self) -> int: + return self._fileno |