summaryrefslogtreecommitdiffstats
path: root/src/prompt_toolkit/eventloop/utils.py
blob: 2e5a05e8383b887902a6330599531eb2b78d2f12 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import asyncio
import sys
import time
from types import TracebackType
from typing import Any, Awaitable, Callable, Dict, Optional, TypeVar, cast

try:
    import contextvars
except ImportError:
    from . import dummy_contextvars as contextvars  # type: ignore

__all__ = [
    "run_in_executor_with_context",
    "call_soon_threadsafe",
    "get_traceback_from_context",
    "get_event_loop",
]

_T = TypeVar("_T")


def run_in_executor_with_context(
    func: Callable[..., _T],
    *args: Any,
    loop: Optional[asyncio.AbstractEventLoop] = None,
) -> Awaitable[_T]:
    """
    Run a function in an executor, but make sure it uses the same contextvars.
    This is required so that the function will see the right application.

    See also: https://bugs.python.org/issue34014
    """
    loop = loop or get_event_loop()
    ctx: contextvars.Context = contextvars.copy_context()

    return loop.run_in_executor(None, ctx.run, func, *args)


def call_soon_threadsafe(
    func: Callable[[], None],
    max_postpone_time: Optional[float] = None,
    loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
    """
    Wrapper around asyncio's `call_soon_threadsafe`.

    This takes a `max_postpone_time` which can be used to tune the urgency of
    the method.

    Asyncio runs tasks in first-in-first-out. However, this is not what we
    want for the render function of the prompt_toolkit UI. Rendering is
    expensive, but since the UI is invalidated very often, in some situations
    we render the UI too often, so much that the rendering CPU usage slows down
    the rest of the processing of the application.  (Pymux is an example where
    we have to balance the CPU time spend on rendering the UI, and parsing
    process output.)
    However, we want to set a deadline value, for when the rendering should
    happen. (The UI should stay responsive).
    """
    loop2 = loop or get_event_loop()

    # If no `max_postpone_time` has been given, schedule right now.
    if max_postpone_time is None:
        loop2.call_soon_threadsafe(func)
        return

    max_postpone_until = time.time() + max_postpone_time

    def schedule() -> None:
        # When there are no other tasks scheduled in the event loop. Run it
        # now.
        # Notice: uvloop doesn't have this _ready attribute. In that case,
        #         always call immediately.
        if not getattr(loop2, "_ready", []):
            func()
            return

        # If the timeout expired, run this now.
        if time.time() > max_postpone_until:
            func()
            return

        # Schedule again for later.
        loop2.call_soon_threadsafe(schedule)

    loop2.call_soon_threadsafe(schedule)


def get_traceback_from_context(context: Dict[str, Any]) -> Optional[TracebackType]:
    """
    Get the traceback object from the context.
    """
    exception = context.get("exception")
    if exception:
        if hasattr(exception, "__traceback__"):
            return cast(TracebackType, exception.__traceback__)
        else:
            # call_exception_handler() is usually called indirectly
            # from an except block. If it's not the case, the traceback
            # is undefined...
            return sys.exc_info()[2]

    return None


def get_event_loop() -> asyncio.AbstractEventLoop:
    """Backward compatible way to get the event loop"""
    # Python 3.6 doesn't have get_running_loop
    # Python 3.10 deprecated get_event_loop
    if sys.version_info >= (3, 7):
        getloop = asyncio.get_running_loop
    else:
        getloop = asyncio.get_event_loop

    try:
        return getloop()
    except RuntimeError:
        return asyncio.get_event_loop_policy().get_event_loop()