diff options
Diffstat (limited to 'src/prompt_toolkit/eventloop')
-rw-r--r-- | src/prompt_toolkit/eventloop/__init__.py | 29 | ||||
-rw-r--r-- | src/prompt_toolkit/eventloop/async_context_manager.py | 132 | ||||
-rw-r--r-- | src/prompt_toolkit/eventloop/async_generator.py | 139 | ||||
-rw-r--r-- | src/prompt_toolkit/eventloop/dummy_contextvars.py | 56 | ||||
-rw-r--r-- | src/prompt_toolkit/eventloop/inputhook.py | 183 | ||||
-rw-r--r-- | src/prompt_toolkit/eventloop/utils.py | 118 | ||||
-rw-r--r-- | src/prompt_toolkit/eventloop/win32.py | 73 |
7 files changed, 730 insertions, 0 deletions
diff --git a/src/prompt_toolkit/eventloop/__init__.py b/src/prompt_toolkit/eventloop/__init__.py new file mode 100644 index 0000000..e92e7a2 --- /dev/null +++ b/src/prompt_toolkit/eventloop/__init__.py @@ -0,0 +1,29 @@ +from .async_generator import aclosing, generator_to_async_generator +from .inputhook import ( + InputHookContext, + InputHookSelector, + new_eventloop_with_inputhook, + set_eventloop_with_inputhook, +) +from .utils import ( + call_soon_threadsafe, + get_event_loop, + get_traceback_from_context, + run_in_executor_with_context, +) + +__all__ = [ + # Async generator + "generator_to_async_generator", + "aclosing", + # Utils. + "run_in_executor_with_context", + "call_soon_threadsafe", + "get_traceback_from_context", + "get_event_loop", + # Inputhooks. + "new_eventloop_with_inputhook", + "set_eventloop_with_inputhook", + "InputHookSelector", + "InputHookContext", +] diff --git a/src/prompt_toolkit/eventloop/async_context_manager.py b/src/prompt_toolkit/eventloop/async_context_manager.py new file mode 100644 index 0000000..3914616 --- /dev/null +++ b/src/prompt_toolkit/eventloop/async_context_manager.py @@ -0,0 +1,132 @@ +""" +@asynccontextmanager code, copied from Python 3.7's contextlib. +For usage in Python 3.6. +Types have been added to this file, just enough to make Mypy happy. +""" +# mypy: allow-untyped-defs +import abc +from functools import wraps +from typing import AsyncContextManager, AsyncIterator, Callable, TypeVar + +import _collections_abc + +__all__ = ["asynccontextmanager"] + + +class AbstractAsyncContextManager(abc.ABC): + + """An abstract base class for asynchronous context managers.""" + + async def __aenter__(self): + """Return `self` upon entering the runtime context.""" + return self + + @abc.abstractmethod + async def __aexit__(self, exc_type, exc_value, traceback): + """Raise any exception triggered within the runtime context.""" + return None + + @classmethod + def __subclasshook__(cls, C): + if cls is AbstractAsyncContextManager: + return _collections_abc._check_methods(C, "__aenter__", "__aexit__") # type: ignore + return NotImplemented + + +class _GeneratorContextManagerBase: + """Shared functionality for @contextmanager and @asynccontextmanager.""" + + def __init__(self, func, args, kwds): + self.gen = func(*args, **kwds) + self.func, self.args, self.kwds = func, args, kwds + # Issue 19330: ensure context manager instances have good docstrings + doc = getattr(func, "__doc__", None) + if doc is None: + doc = type(self).__doc__ + self.__doc__ = doc + # Unfortunately, this still doesn't provide good help output when + # inspecting the created context manager instances, since pydoc + # currently bypasses the instance docstring and shows the docstring + # for the class instead. + # See http://bugs.python.org/issue19404 for more details. + + +class _AsyncGeneratorContextManager( + _GeneratorContextManagerBase, AbstractAsyncContextManager +): + """Helper for @asynccontextmanager.""" + + async def __aenter__(self): + try: + return await self.gen.__anext__() + except StopAsyncIteration: + raise RuntimeError("generator didn't yield") from None + + async def __aexit__(self, typ, value, traceback): + if typ is None: + try: + await self.gen.__anext__() + except StopAsyncIteration: + return + else: + raise RuntimeError("generator didn't stop") + else: + if value is None: + value = typ() + # See _GeneratorContextManager.__exit__ for comments on subtleties + # in this implementation + try: + await self.gen.athrow(typ, value, traceback) + raise RuntimeError("generator didn't stop after athrow()") + except StopAsyncIteration as exc: + return exc is not value + except RuntimeError as exc: + if exc is value: + return False + # Avoid suppressing if a StopIteration exception + # was passed to throw() and later wrapped into a RuntimeError + # (see PEP 479 for sync generators; async generators also + # have this behavior). But do this only if the exception wrapped + # by the RuntimeError is actully Stop(Async)Iteration (see + # issue29692). + if isinstance(value, (StopIteration, StopAsyncIteration)): + if exc.__cause__ is value: + return False + raise + except BaseException as exc: + if exc is not value: + raise + + +_T = TypeVar("_T") + + +def asynccontextmanager( + func: Callable[..., AsyncIterator[_T]] +) -> Callable[..., AsyncContextManager[_T]]: + """@asynccontextmanager decorator. + Typical usage: + @asynccontextmanager + async def some_async_generator(<arguments>): + <setup> + try: + yield <value> + finally: + <cleanup> + This makes this: + async with some_async_generator(<arguments>) as <variable>: + <body> + equivalent to this: + <setup> + try: + <variable> = <value> + <body> + finally: + <cleanup> + """ + + @wraps(func) + def helper(*args, **kwds): + return _AsyncGeneratorContextManager(func, args, kwds) # type: ignore + + return helper diff --git a/src/prompt_toolkit/eventloop/async_generator.py b/src/prompt_toolkit/eventloop/async_generator.py new file mode 100644 index 0000000..a4997b4 --- /dev/null +++ b/src/prompt_toolkit/eventloop/async_generator.py @@ -0,0 +1,139 @@ +""" +Implementation for async generators. +""" +from queue import Empty, Full, Queue +from threading import Event +from typing import ( + TYPE_CHECKING, + AsyncGenerator, + Awaitable, + Callable, + Iterable, + TypeVar, + Union, +) + +from .async_context_manager import asynccontextmanager +from .utils import get_event_loop, run_in_executor_with_context + +__all__ = [ + "aclosing", + "generator_to_async_generator", +] + + +if TYPE_CHECKING: + # Thanks: https://github.com/python/typeshed/blob/main/stdlib/contextlib.pyi + from typing_extensions import Protocol + + class _SupportsAclose(Protocol): + def aclose(self) -> Awaitable[object]: + ... + + _SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose) + + +@asynccontextmanager +async def aclosing( + thing: "_SupportsAcloseT", +) -> AsyncGenerator["_SupportsAcloseT", None]: + "Similar to `contextlib.aclosing`, in Python 3.10." + try: + yield thing + finally: + await thing.aclose() + + +# By default, choose a buffer size that's a good balance between having enough +# throughput, but not consuming too much memory. We use this to consume a sync +# generator of completions as an async generator. If the queue size is very +# small (like 1), consuming the completions goes really slow (when there are a +# lot of items). If the queue size would be unlimited or too big, this can +# cause overconsumption of memory, and cause CPU time spent producing items +# that are no longer needed (if the consumption of the async generator stops at +# some point). We need a fixed size in order to get some back pressure from the +# async consumer to the sync producer. We choose 1000 by default here. If we +# have around 50k completions, measurements show that 1000 is still +# significantly faster than a buffer of 100. +DEFAULT_BUFFER_SIZE: int = 1000 + +_T = TypeVar("_T") + + +class _Done: + pass + + +async def generator_to_async_generator( + get_iterable: Callable[[], Iterable[_T]], + buffer_size: int = DEFAULT_BUFFER_SIZE, +) -> AsyncGenerator[_T, None]: + """ + Turn a generator or iterable into an async generator. + + This works by running the generator in a background thread. + + :param get_iterable: Function that returns a generator or iterable when + called. + :param buffer_size: Size of the queue between the async consumer and the + synchronous generator that produces items. + """ + quitting = False + # NOTE: We are limiting the queue size in order to have back-pressure. + q: Queue[Union[_T, _Done]] = Queue(maxsize=buffer_size) + loop = get_event_loop() + + def runner() -> None: + """ + Consume the generator in background thread. + When items are received, they'll be pushed to the queue. + """ + try: + for item in get_iterable(): + # When this async generator was cancelled (closed), stop this + # thread. + if quitting: + return + + while True: + try: + q.put(item, timeout=1) + except Full: + if quitting: + return + continue + else: + break + + finally: + while True: + try: + q.put(_Done(), timeout=1) + except Full: + if quitting: + return + continue + else: + break + + # Start background thread. + runner_f = run_in_executor_with_context(runner) + + try: + while True: + try: + item = q.get_nowait() + except Empty: + item = await loop.run_in_executor(None, q.get) + if isinstance(item, _Done): + break + else: + yield item + finally: + # When this async generator is closed (GeneratorExit exception, stop + # the background thread as well. - we don't need that anymore.) + quitting = True + + # Wait for the background thread to finish. (should happen right after + # the last item is yielded). + await runner_f diff --git a/src/prompt_toolkit/eventloop/dummy_contextvars.py b/src/prompt_toolkit/eventloop/dummy_contextvars.py new file mode 100644 index 0000000..3fcd260 --- /dev/null +++ b/src/prompt_toolkit/eventloop/dummy_contextvars.py @@ -0,0 +1,56 @@ +""" +Dummy contextvars implementation, to make prompt_toolkit work on Python 3.6. + +As long as there is only one application running at a time, we don't need the +real contextvars. So, stuff like the telnet-server and so on requires 3.7. +""" +from typing import TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar + +if TYPE_CHECKING: + from typing_extensions import ParamSpec + + +def copy_context() -> "Context": + return Context() + + +if TYPE_CHECKING: + _P = ParamSpec("_P") +_T = TypeVar("_T") + + +class Context: + def run( + self, callable: "Callable[_P, _T]", *args: "_P.args", **kwargs: "_P.kwargs" + ) -> _T: + return callable(*args, **kwargs) + + def copy(self) -> "Context": + return self + + +class Token(Generic[_T]): + pass + + +class ContextVar(Generic[_T]): + def __init__(self, name: str, *, default: Optional[_T] = None) -> None: + self._name = name + self._value = default + + @property + def name(self) -> str: + return self._name + + def get(self, default: Optional[_T] = None) -> _T: + result = self._value or default + if result is None: + raise LookupError + return result + + def set(self, value: _T) -> Token[_T]: + self._value = value + return Token() + + def reset(self, token: Token[_T]) -> None: + pass diff --git a/src/prompt_toolkit/eventloop/inputhook.py b/src/prompt_toolkit/eventloop/inputhook.py new file mode 100644 index 0000000..05d2981 --- /dev/null +++ b/src/prompt_toolkit/eventloop/inputhook.py @@ -0,0 +1,183 @@ +""" +Similar to `PyOS_InputHook` of the Python API, we can plug in an input hook in +the asyncio event loop. + +The way this works is by using a custom 'selector' that runs the other event +loop until the real selector is ready. + +It's the responsibility of this event hook to return when there is input ready. +There are two ways to detect when input is ready: + +The inputhook itself is a callable that receives an `InputHookContext`. This +callable should run the other event loop, and return when the main loop has +stuff to do. There are two ways to detect when to return: + +- Call the `input_is_ready` method periodically. Quit when this returns `True`. + +- Add the `fileno` as a watch to the external eventloop. Quit when file descriptor + becomes readable. (But don't read from it.) + + Note that this is not the same as checking for `sys.stdin.fileno()`. The + eventloop of prompt-toolkit allows thread-based executors, for example for + asynchronous autocompletion. When the completion for instance is ready, we + also want prompt-toolkit to gain control again in order to display that. +""" +import asyncio +import os +import select +import selectors +import sys +import threading +from asyncio import AbstractEventLoop +from selectors import BaseSelector, SelectorKey +from typing import TYPE_CHECKING, Any, Callable, List, Mapping, Optional, Tuple + +from .utils import get_event_loop + +__all__ = [ + "new_eventloop_with_inputhook", + "set_eventloop_with_inputhook", + "InputHookSelector", + "InputHookContext", +] + +if TYPE_CHECKING: + from _typeshed import FileDescriptorLike + + _EventMask = int + + +def new_eventloop_with_inputhook( + inputhook: Callable[["InputHookContext"], None] +) -> AbstractEventLoop: + """ + Create a new event loop with the given inputhook. + """ + selector = InputHookSelector(selectors.DefaultSelector(), inputhook) + loop = asyncio.SelectorEventLoop(selector) + return loop + + +def set_eventloop_with_inputhook( + inputhook: Callable[["InputHookContext"], None] +) -> AbstractEventLoop: + """ + Create a new event loop with the given inputhook, and activate it. + """ + loop = new_eventloop_with_inputhook(inputhook) + asyncio.set_event_loop(loop) + return loop + + +class InputHookSelector(BaseSelector): + """ + Usage: + + selector = selectors.SelectSelector() + loop = asyncio.SelectorEventLoop(InputHookSelector(selector, inputhook)) + asyncio.set_event_loop(loop) + """ + + def __init__( + self, selector: BaseSelector, inputhook: Callable[["InputHookContext"], None] + ) -> None: + self.selector = selector + self.inputhook = inputhook + self._r, self._w = os.pipe() + + def register( + self, fileobj: "FileDescriptorLike", events: "_EventMask", data: Any = None + ) -> "SelectorKey": + return self.selector.register(fileobj, events, data=data) + + def unregister(self, fileobj: "FileDescriptorLike") -> "SelectorKey": + return self.selector.unregister(fileobj) + + def modify( + self, fileobj: "FileDescriptorLike", events: "_EventMask", data: Any = None + ) -> "SelectorKey": + return self.selector.modify(fileobj, events, data=None) + + def select( + self, timeout: Optional[float] = None + ) -> List[Tuple["SelectorKey", "_EventMask"]]: + # If there are tasks in the current event loop, + # don't run the input hook. + if len(getattr(get_event_loop(), "_ready", [])) > 0: + return self.selector.select(timeout=timeout) + + ready = False + result = None + + # Run selector in other thread. + def run_selector() -> None: + nonlocal ready, result + result = self.selector.select(timeout=timeout) + os.write(self._w, b"x") + ready = True + + th = threading.Thread(target=run_selector) + th.start() + + def input_is_ready() -> bool: + return ready + + # Call inputhook. + # The inputhook function is supposed to return when our selector + # becomes ready. The inputhook can do that by registering the fd in its + # own loop, or by checking the `input_is_ready` function regularly. + self.inputhook(InputHookContext(self._r, input_is_ready)) + + # Flush the read end of the pipe. + try: + # Before calling 'os.read', call select.select. This is required + # when the gevent monkey patch has been applied. 'os.read' is never + # monkey patched and won't be cooperative, so that would block all + # other select() calls otherwise. + # See: http://www.gevent.org/gevent.os.html + + # Note: On Windows, this is apparently not an issue. + # However, if we would ever want to add a select call, it + # should use `windll.kernel32.WaitForMultipleObjects`, + # because `select.select` can't wait for a pipe on Windows. + if sys.platform != "win32": + select.select([self._r], [], [], None) + + os.read(self._r, 1024) + except OSError: + # This happens when the window resizes and a SIGWINCH was received. + # We get 'Error: [Errno 4] Interrupted system call' + # Just ignore. + pass + + # Wait for the real selector to be done. + th.join() + assert result is not None + return result + + def close(self) -> None: + """ + Clean up resources. + """ + if self._r: + os.close(self._r) + os.close(self._w) + + self._r = self._w = -1 + self.selector.close() + + def get_map(self) -> Mapping["FileDescriptorLike", "SelectorKey"]: + return self.selector.get_map() + + +class InputHookContext: + """ + Given as a parameter to the inputhook. + """ + + def __init__(self, fileno: int, input_is_ready: Callable[[], bool]) -> None: + self._fileno = fileno + self.input_is_ready = input_is_ready + + def fileno(self) -> int: + return self._fileno diff --git a/src/prompt_toolkit/eventloop/utils.py b/src/prompt_toolkit/eventloop/utils.py new file mode 100644 index 0000000..2e5a05e --- /dev/null +++ b/src/prompt_toolkit/eventloop/utils.py @@ -0,0 +1,118 @@ +import asyncio +import sys +import time +from types import TracebackType +from typing import Any, Awaitable, Callable, Dict, Optional, TypeVar, cast + +try: + import contextvars +except ImportError: + from . import dummy_contextvars as contextvars # type: ignore + +__all__ = [ + "run_in_executor_with_context", + "call_soon_threadsafe", + "get_traceback_from_context", + "get_event_loop", +] + +_T = TypeVar("_T") + + +def run_in_executor_with_context( + func: Callable[..., _T], + *args: Any, + loop: Optional[asyncio.AbstractEventLoop] = None, +) -> Awaitable[_T]: + """ + Run a function in an executor, but make sure it uses the same contextvars. + This is required so that the function will see the right application. + + See also: https://bugs.python.org/issue34014 + """ + loop = loop or get_event_loop() + ctx: contextvars.Context = contextvars.copy_context() + + return loop.run_in_executor(None, ctx.run, func, *args) + + +def call_soon_threadsafe( + func: Callable[[], None], + max_postpone_time: Optional[float] = None, + loop: Optional[asyncio.AbstractEventLoop] = None, +) -> None: + """ + Wrapper around asyncio's `call_soon_threadsafe`. + + This takes a `max_postpone_time` which can be used to tune the urgency of + the method. + + Asyncio runs tasks in first-in-first-out. However, this is not what we + want for the render function of the prompt_toolkit UI. Rendering is + expensive, but since the UI is invalidated very often, in some situations + we render the UI too often, so much that the rendering CPU usage slows down + the rest of the processing of the application. (Pymux is an example where + we have to balance the CPU time spend on rendering the UI, and parsing + process output.) + However, we want to set a deadline value, for when the rendering should + happen. (The UI should stay responsive). + """ + loop2 = loop or get_event_loop() + + # If no `max_postpone_time` has been given, schedule right now. + if max_postpone_time is None: + loop2.call_soon_threadsafe(func) + return + + max_postpone_until = time.time() + max_postpone_time + + def schedule() -> None: + # When there are no other tasks scheduled in the event loop. Run it + # now. + # Notice: uvloop doesn't have this _ready attribute. In that case, + # always call immediately. + if not getattr(loop2, "_ready", []): + func() + return + + # If the timeout expired, run this now. + if time.time() > max_postpone_until: + func() + return + + # Schedule again for later. + loop2.call_soon_threadsafe(schedule) + + loop2.call_soon_threadsafe(schedule) + + +def get_traceback_from_context(context: Dict[str, Any]) -> Optional[TracebackType]: + """ + Get the traceback object from the context. + """ + exception = context.get("exception") + if exception: + if hasattr(exception, "__traceback__"): + return cast(TracebackType, exception.__traceback__) + else: + # call_exception_handler() is usually called indirectly + # from an except block. If it's not the case, the traceback + # is undefined... + return sys.exc_info()[2] + + return None + + +def get_event_loop() -> asyncio.AbstractEventLoop: + """Backward compatible way to get the event loop""" + # Python 3.6 doesn't have get_running_loop + # Python 3.10 deprecated get_event_loop + if sys.version_info >= (3, 7): + getloop = asyncio.get_running_loop + else: + getloop = asyncio.get_event_loop + + try: + return getloop() + except RuntimeError: + return asyncio.get_event_loop_policy().get_event_loop() diff --git a/src/prompt_toolkit/eventloop/win32.py b/src/prompt_toolkit/eventloop/win32.py new file mode 100644 index 0000000..fbc02d4 --- /dev/null +++ b/src/prompt_toolkit/eventloop/win32.py @@ -0,0 +1,73 @@ +import sys + +assert sys.platform == "win32" + +from ctypes import pointer + +from ..utils import SPHINX_AUTODOC_RUNNING + +# Do not import win32-specific stuff when generating documentation. +# Otherwise RTD would be unable to generate docs for this module. +if not SPHINX_AUTODOC_RUNNING: + from ctypes import windll + +from ctypes.wintypes import BOOL, DWORD, HANDLE +from typing import List, Optional + +from prompt_toolkit.win32_types import SECURITY_ATTRIBUTES + +__all__ = ["wait_for_handles", "create_win32_event"] + + +WAIT_TIMEOUT = 0x00000102 +INFINITE = -1 + + +def wait_for_handles( + handles: List[HANDLE], timeout: int = INFINITE +) -> Optional[HANDLE]: + """ + Waits for multiple handles. (Similar to 'select') Returns the handle which is ready. + Returns `None` on timeout. + http://msdn.microsoft.com/en-us/library/windows/desktop/ms687025(v=vs.85).aspx + + Note that handles should be a list of `HANDLE` objects, not integers. See + this comment in the patch by @quark-zju for the reason why: + + ''' Make sure HANDLE on Windows has a correct size + + Previously, the type of various HANDLEs are native Python integer + types. The ctypes library will treat them as 4-byte integer when used + in function arguments. On 64-bit Windows, HANDLE is 8-byte and usually + a small integer. Depending on whether the extra 4 bytes are zero-ed out + or not, things can happen to work, or break. ''' + + This function returns either `None` or one of the given `HANDLE` objects. + (The return value can be tested with the `is` operator.) + """ + arrtype = HANDLE * len(handles) + handle_array = arrtype(*handles) + + ret: int = windll.kernel32.WaitForMultipleObjects( + len(handle_array), handle_array, BOOL(False), DWORD(timeout) + ) + + if ret == WAIT_TIMEOUT: + return None + else: + return handles[ret] + + +def create_win32_event() -> HANDLE: + """ + Creates a Win32 unnamed Event . + http://msdn.microsoft.com/en-us/library/windows/desktop/ms682396(v=vs.85).aspx + """ + return HANDLE( + windll.kernel32.CreateEventA( + pointer(SECURITY_ATTRIBUTES()), + BOOL(True), # Manual reset event. + BOOL(False), # Initial state. + None, # Unnamed event object. + ) + ) |