summaryrefslogtreecommitdiffstats
path: root/src/prompt_toolkit/eventloop/inputhook.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/prompt_toolkit/eventloop/inputhook.py')
-rw-r--r--src/prompt_toolkit/eventloop/inputhook.py190
1 files changed, 190 insertions, 0 deletions
diff --git a/src/prompt_toolkit/eventloop/inputhook.py b/src/prompt_toolkit/eventloop/inputhook.py
new file mode 100644
index 0000000..a4c0eee
--- /dev/null
+++ b/src/prompt_toolkit/eventloop/inputhook.py
@@ -0,0 +1,190 @@
+"""
+Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in
+the asyncio event loop.
+
+The way this works is by using a custom 'selector' that runs the other event
+loop until the real selector is ready.
+
+It's the responsibility of this event hook to return when there is input ready.
+There are two ways to detect when input is ready:
+
+The inputhook itself is a callable that receives an `InputHookContext`. This
+callable should run the other event loop, and return when the main loop has
+stuff to do. There are two ways to detect when to return:
+
+- Call the `input_is_ready` method periodically. Quit when this returns `True`.
+
+- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor
+ becomes readable. (But don't read from it.)
+
+ Note that this is not the same as checking for `sys.stdin.fileno()`. The
+ eventloop of prompt-toolkit allows thread-based executors, for example for
+ asynchronous autocompletion. When the completion for instance is ready, we
+ also want prompt-toolkit to gain control again in order to display that.
+"""
+from __future__ import annotations
+
+import asyncio
+import os
+import select
+import selectors
+import sys
+import threading
+from asyncio import AbstractEventLoop, get_running_loop
+from selectors import BaseSelector, SelectorKey
+from typing import TYPE_CHECKING, Any, Callable, Mapping
+
+__all__ = [
+ "new_eventloop_with_inputhook",
+ "set_eventloop_with_inputhook",
+ "InputHookSelector",
+ "InputHookContext",
+ "InputHook",
+]
+
+if TYPE_CHECKING:
+ from _typeshed import FileDescriptorLike
+ from typing_extensions import TypeAlias
+
+ _EventMask = int
+
+
+class InputHookContext:
+ """
+ Given as a parameter to the inputhook.
+ """
+
+ def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None:
+ self._fileno = fileno
+ self.input_is_ready = input_is_ready
+
+ def fileno(self) -> int:
+ return self._fileno
+
+
+InputHook: TypeAlias = Callable[[InputHookContext], None]
+
+
+def new_eventloop_with_inputhook(
+ inputhook: Callable[[InputHookContext], None],
+) -> AbstractEventLoop:
+ """
+ Create a new event loop with the given inputhook.
+ """
+ selector = InputHookSelector(selectors.DefaultSelector(), inputhook)
+ loop = asyncio.SelectorEventLoop(selector)
+ return loop
+
+
+def set_eventloop_with_inputhook(
+ inputhook: Callable[[InputHookContext], None],
+) -> AbstractEventLoop:
+ """
+ Create a new event loop with the given inputhook, and activate it.
+ """
+ # Deprecated!
+
+ loop = new_eventloop_with_inputhook(inputhook)
+ asyncio.set_event_loop(loop)
+ return loop
+
+
+class InputHookSelector(BaseSelector):
+ """
+ Usage:
+
+ selector = selectors.SelectSelector()
+ loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook))
+ asyncio.set_event_loop(loop)
+ """
+
+ def __init__(
+ self, selector: BaseSelector, inputhook: Callable[[InputHookContext], None]
+ ) -> None:
+ self.selector = selector
+ self.inputhook = inputhook
+ self._r, self._w = os.pipe()
+
+ def register(
+ self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
+ ) -> SelectorKey:
+ return self.selector.register(fileobj, events, data=data)
+
+ def unregister(self, fileobj: FileDescriptorLike) -> SelectorKey:
+ return self.selector.unregister(fileobj)
+
+ def modify(
+ self, fileobj: FileDescriptorLike, events: _EventMask, data: Any = None
+ ) -> SelectorKey:
+ return self.selector.modify(fileobj, events, data=None)
+
+ def select(
+ self, timeout: float | None = None
+ ) -> list[tuple[SelectorKey, _EventMask]]:
+ # If there are tasks in the current event loop,
+ # don't run the input hook.
+ if len(getattr(get_running_loop(), "_ready", [])) > 0:
+ return self.selector.select(timeout=timeout)
+
+ ready = False
+ result = None
+
+ # Run selector in other thread.
+ def run_selector() -> None:
+ nonlocal ready, result
+ result = self.selector.select(timeout=timeout)
+ os.write(self._w, b"x")
+ ready = True
+
+ th = threading.Thread(target=run_selector)
+ th.start()
+
+ def input_is_ready() -> bool:
+ return ready
+
+ # Call inputhook.
+ # The inputhook function is supposed to return when our selector
+ # becomes ready. The inputhook can do that by registering the fd in its
+ # own loop, or by checking the `input_is_ready` function regularly.
+ self.inputhook(InputHookContext(self._r, input_is_ready))
+
+ # Flush the read end of the pipe.
+ try:
+ # Before calling 'os.read', call select.select. This is required
+ # when the gevent monkey patch has been applied. 'os.read' is never
+ # monkey patched and won't be cooperative, so that would block all
+ # other select() calls otherwise.
+ # See: http://www.gevent.org/gevent.os.html
+
+ # Note: On Windows, this is apparently not an issue.
+ # However, if we would ever want to add a select call, it
+ # should use `windll.kernel32.WaitForMultipleObjects`,
+ # because `select.select` can't wait for a pipe on Windows.
+ if sys.platform != "win32":
+ select.select([self._r], [], [], None)
+
+ os.read(self._r, 1024)
+ except OSError:
+ # This happens when the window resizes and a SIGWINCH was received.
+ # We get 'Error: [Errno 4] Interrupted system call'
+ # Just ignore.
+ pass
+
+ # Wait for the real selector to be done.
+ th.join()
+ assert result is not None
+ return result
+
+ def close(self) -> None:
+ """
+ Clean up resources.
+ """
+ if self._r:
+ os.close(self._r)
+ os.close(self._w)
+
+ self._r = self._w = -1
+ self.selector.close()
+
+ def get_map(self) -> Mapping[FileDescriptorLike, SelectorKey]:
+ return self.selector.get_map()