diff options
Diffstat (limited to '')
-rw-r--r-- | src/prompt_toolkit/input/__init__.py | 12 | ||||
-rw-r--r-- | src/prompt_toolkit/input/ansi_escape_sequences.py | 343 | ||||
-rw-r--r-- | src/prompt_toolkit/input/base.py | 150 | ||||
-rw-r--r-- | src/prompt_toolkit/input/defaults.py | 79 | ||||
-rw-r--r-- | src/prompt_toolkit/input/posix_pipe.py | 116 | ||||
-rw-r--r-- | src/prompt_toolkit/input/posix_utils.py | 95 | ||||
-rw-r--r-- | src/prompt_toolkit/input/typeahead.py | 76 | ||||
-rw-r--r-- | src/prompt_toolkit/input/vt100.py | 320 | ||||
-rw-r--r-- | src/prompt_toolkit/input/vt100_parser.py | 247 | ||||
-rw-r--r-- | src/prompt_toolkit/input/win32.py | 757 | ||||
-rw-r--r-- | src/prompt_toolkit/input/win32_pipe.py | 154 |
11 files changed, 2349 insertions, 0 deletions
diff --git a/src/prompt_toolkit/input/__init__.py b/src/prompt_toolkit/input/__init__.py new file mode 100644 index 0000000..dc31976 --- /dev/null +++ b/src/prompt_toolkit/input/__init__.py @@ -0,0 +1,12 @@ +from .base import DummyInput, Input, PipeInput +from .defaults import create_input, create_pipe_input + +__all__ = [ + # Base. + "Input", + "PipeInput", + "DummyInput", + # Defaults. + "create_input", + "create_pipe_input", +] diff --git a/src/prompt_toolkit/input/ansi_escape_sequences.py b/src/prompt_toolkit/input/ansi_escape_sequences.py new file mode 100644 index 0000000..2e6c5b9 --- /dev/null +++ b/src/prompt_toolkit/input/ansi_escape_sequences.py @@ -0,0 +1,343 @@ +""" +Mappings from VT100 (ANSI) escape sequences to the corresponding prompt_toolkit +keys. + +We are not using the terminfo/termcap databases to detect the ANSI escape +sequences for the input. Instead, we recognize 99% of the most common +sequences. This works well, because in practice, every modern terminal is +mostly Xterm compatible. + +Some useful docs: +- Mintty: https://github.com/mintty/mintty/blob/master/wiki/Keycodes.md +""" +from typing import Dict, Tuple, Union + +from ..keys import Keys + +__all__ = [ + "ANSI_SEQUENCES", + "REVERSE_ANSI_SEQUENCES", +] + +# Mapping of vt100 escape codes to Keys. +ANSI_SEQUENCES: Dict[str, Union[Keys, Tuple[Keys, ...]]] = { + # Control keys. + "\x00": Keys.ControlAt, # Control-At (Also for Ctrl-Space) + "\x01": Keys.ControlA, # Control-A (home) + "\x02": Keys.ControlB, # Control-B (emacs cursor left) + "\x03": Keys.ControlC, # Control-C (interrupt) + "\x04": Keys.ControlD, # Control-D (exit) + "\x05": Keys.ControlE, # Control-E (end) + "\x06": Keys.ControlF, # Control-F (cursor forward) + "\x07": Keys.ControlG, # Control-G + "\x08": Keys.ControlH, # Control-H (8) (Identical to '\b') + "\x09": Keys.ControlI, # Control-I (9) (Identical to '\t') + "\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n') + "\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab) + "\x0c": Keys.ControlL, # Control-L (clear; form feed) + "\x0d": Keys.ControlM, # Control-M (13) (Identical to '\r') + "\x0e": Keys.ControlN, # Control-N (14) (history forward) + "\x0f": Keys.ControlO, # Control-O (15) + "\x10": Keys.ControlP, # Control-P (16) (history back) + "\x11": Keys.ControlQ, # Control-Q + "\x12": Keys.ControlR, # Control-R (18) (reverse search) + "\x13": Keys.ControlS, # Control-S (19) (forward search) + "\x14": Keys.ControlT, # Control-T + "\x15": Keys.ControlU, # Control-U + "\x16": Keys.ControlV, # Control-V + "\x17": Keys.ControlW, # Control-W + "\x18": Keys.ControlX, # Control-X + "\x19": Keys.ControlY, # Control-Y (25) + "\x1a": Keys.ControlZ, # Control-Z + "\x1b": Keys.Escape, # Also Control-[ + "\x9b": Keys.ShiftEscape, + "\x1c": Keys.ControlBackslash, # Both Control-\ (also Ctrl-| ) + "\x1d": Keys.ControlSquareClose, # Control-] + "\x1e": Keys.ControlCircumflex, # Control-^ + "\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.) + # ASCII Delete (0x7f) + # Vt220 (and Linux terminal) send this when pressing backspace. We map this + # to ControlH, because that will make it easier to create key bindings that + # work everywhere, with the trade-off that it's no longer possible to + # handle backspace and control-h individually for the few terminals that + # support it. (Most terminals send ControlH when backspace is pressed.) + # See: http://www.ibb.net/~anne/keyboard.html + "\x7f": Keys.ControlH, + # -- + # Various + "\x1b[1~": Keys.Home, # tmux + "\x1b[2~": Keys.Insert, + "\x1b[3~": Keys.Delete, + "\x1b[4~": Keys.End, # tmux + "\x1b[5~": Keys.PageUp, + "\x1b[6~": Keys.PageDown, + "\x1b[7~": Keys.Home, # xrvt + "\x1b[8~": Keys.End, # xrvt + "\x1b[Z": Keys.BackTab, # shift + tab + "\x1b\x09": Keys.BackTab, # Linux console + "\x1b[~": Keys.BackTab, # Windows console + # -- + # Function keys. + "\x1bOP": Keys.F1, + "\x1bOQ": Keys.F2, + "\x1bOR": Keys.F3, + "\x1bOS": Keys.F4, + "\x1b[[A": Keys.F1, # Linux console. + "\x1b[[B": Keys.F2, # Linux console. + "\x1b[[C": Keys.F3, # Linux console. + "\x1b[[D": Keys.F4, # Linux console. + "\x1b[[E": Keys.F5, # Linux console. + "\x1b[11~": Keys.F1, # rxvt-unicode + "\x1b[12~": Keys.F2, # rxvt-unicode + "\x1b[13~": Keys.F3, # rxvt-unicode + "\x1b[14~": Keys.F4, # rxvt-unicode + "\x1b[15~": Keys.F5, + "\x1b[17~": Keys.F6, + "\x1b[18~": Keys.F7, + "\x1b[19~": Keys.F8, + "\x1b[20~": Keys.F9, + "\x1b[21~": Keys.F10, + "\x1b[23~": Keys.F11, + "\x1b[24~": Keys.F12, + "\x1b[25~": Keys.F13, + "\x1b[26~": Keys.F14, + "\x1b[28~": Keys.F15, + "\x1b[29~": Keys.F16, + "\x1b[31~": Keys.F17, + "\x1b[32~": Keys.F18, + "\x1b[33~": Keys.F19, + "\x1b[34~": Keys.F20, + # Xterm + "\x1b[1;2P": Keys.F13, + "\x1b[1;2Q": Keys.F14, + # '\x1b[1;2R': Keys.F15, # Conflicts with CPR response. + "\x1b[1;2S": Keys.F16, + "\x1b[15;2~": Keys.F17, + "\x1b[17;2~": Keys.F18, + "\x1b[18;2~": Keys.F19, + "\x1b[19;2~": Keys.F20, + "\x1b[20;2~": Keys.F21, + "\x1b[21;2~": Keys.F22, + "\x1b[23;2~": Keys.F23, + "\x1b[24;2~": Keys.F24, + # -- + # CSI 27 disambiguated modified "other" keys (xterm) + # Ref: https://invisible-island.net/xterm/modified-keys.html + # These are currently unsupported, so just re-map some common ones to the + # unmodified versions + "\x1b[27;2;13~": Keys.ControlM, # Shift + Enter + "\x1b[27;5;13~": Keys.ControlM, # Ctrl + Enter + "\x1b[27;6;13~": Keys.ControlM, # Ctrl + Shift + Enter + # -- + # Control + function keys. + "\x1b[1;5P": Keys.ControlF1, + "\x1b[1;5Q": Keys.ControlF2, + # "\x1b[1;5R": Keys.ControlF3, # Conflicts with CPR response. + "\x1b[1;5S": Keys.ControlF4, + "\x1b[15;5~": Keys.ControlF5, + "\x1b[17;5~": Keys.ControlF6, + "\x1b[18;5~": Keys.ControlF7, + "\x1b[19;5~": Keys.ControlF8, + "\x1b[20;5~": Keys.ControlF9, + "\x1b[21;5~": Keys.ControlF10, + "\x1b[23;5~": Keys.ControlF11, + "\x1b[24;5~": Keys.ControlF12, + "\x1b[1;6P": Keys.ControlF13, + "\x1b[1;6Q": Keys.ControlF14, + # "\x1b[1;6R": Keys.ControlF15, # Conflicts with CPR response. + "\x1b[1;6S": Keys.ControlF16, + "\x1b[15;6~": Keys.ControlF17, + "\x1b[17;6~": Keys.ControlF18, + "\x1b[18;6~": Keys.ControlF19, + "\x1b[19;6~": Keys.ControlF20, + "\x1b[20;6~": Keys.ControlF21, + "\x1b[21;6~": Keys.ControlF22, + "\x1b[23;6~": Keys.ControlF23, + "\x1b[24;6~": Keys.ControlF24, + # -- + # Tmux (Win32 subsystem) sends the following scroll events. + "\x1b[62~": Keys.ScrollUp, + "\x1b[63~": Keys.ScrollDown, + "\x1b[200~": Keys.BracketedPaste, # Start of bracketed paste. + # -- + # Sequences generated by numpad 5. Not sure what it means. (It doesn't + # appear in 'infocmp'. Just ignore. + "\x1b[E": Keys.Ignore, # Xterm. + "\x1b[G": Keys.Ignore, # Linux console. + # -- + # Meta/control/escape + pageup/pagedown/insert/delete. + "\x1b[3;2~": Keys.ShiftDelete, # xterm, gnome-terminal. + "\x1b[5;2~": Keys.ShiftPageUp, + "\x1b[6;2~": Keys.ShiftPageDown, + "\x1b[2;3~": (Keys.Escape, Keys.Insert), + "\x1b[3;3~": (Keys.Escape, Keys.Delete), + "\x1b[5;3~": (Keys.Escape, Keys.PageUp), + "\x1b[6;3~": (Keys.Escape, Keys.PageDown), + "\x1b[2;4~": (Keys.Escape, Keys.ShiftInsert), + "\x1b[3;4~": (Keys.Escape, Keys.ShiftDelete), + "\x1b[5;4~": (Keys.Escape, Keys.ShiftPageUp), + "\x1b[6;4~": (Keys.Escape, Keys.ShiftPageDown), + "\x1b[3;5~": Keys.ControlDelete, # xterm, gnome-terminal. + "\x1b[5;5~": Keys.ControlPageUp, + "\x1b[6;5~": Keys.ControlPageDown, + "\x1b[3;6~": Keys.ControlShiftDelete, + "\x1b[5;6~": Keys.ControlShiftPageUp, + "\x1b[6;6~": Keys.ControlShiftPageDown, + "\x1b[2;7~": (Keys.Escape, Keys.ControlInsert), + "\x1b[5;7~": (Keys.Escape, Keys.ControlPageDown), + "\x1b[6;7~": (Keys.Escape, Keys.ControlPageDown), + "\x1b[2;8~": (Keys.Escape, Keys.ControlShiftInsert), + "\x1b[5;8~": (Keys.Escape, Keys.ControlShiftPageDown), + "\x1b[6;8~": (Keys.Escape, Keys.ControlShiftPageDown), + # -- + # Arrows. + # (Normal cursor mode). + "\x1b[A": Keys.Up, + "\x1b[B": Keys.Down, + "\x1b[C": Keys.Right, + "\x1b[D": Keys.Left, + "\x1b[H": Keys.Home, + "\x1b[F": Keys.End, + # Tmux sends following keystrokes when control+arrow is pressed, but for + # Emacs ansi-term sends the same sequences for normal arrow keys. Consider + # it a normal arrow press, because that's more important. + # (Application cursor mode). + "\x1bOA": Keys.Up, + "\x1bOB": Keys.Down, + "\x1bOC": Keys.Right, + "\x1bOD": Keys.Left, + "\x1bOF": Keys.End, + "\x1bOH": Keys.Home, + # Shift + arrows. + "\x1b[1;2A": Keys.ShiftUp, + "\x1b[1;2B": Keys.ShiftDown, + "\x1b[1;2C": Keys.ShiftRight, + "\x1b[1;2D": Keys.ShiftLeft, + "\x1b[1;2F": Keys.ShiftEnd, + "\x1b[1;2H": Keys.ShiftHome, + # Meta + arrow keys. Several terminals handle this differently. + # The following sequences are for xterm and gnome-terminal. + # (Iterm sends ESC followed by the normal arrow_up/down/left/right + # sequences, and the OSX Terminal sends ESCb and ESCf for "alt + # arrow_left" and "alt arrow_right." We don't handle these + # explicitly, in here, because would could not distinguish between + # pressing ESC (to go to Vi navigation mode), followed by just the + # 'b' or 'f' key. These combinations are handled in + # the input processor.) + "\x1b[1;3A": (Keys.Escape, Keys.Up), + "\x1b[1;3B": (Keys.Escape, Keys.Down), + "\x1b[1;3C": (Keys.Escape, Keys.Right), + "\x1b[1;3D": (Keys.Escape, Keys.Left), + "\x1b[1;3F": (Keys.Escape, Keys.End), + "\x1b[1;3H": (Keys.Escape, Keys.Home), + # Alt+shift+number. + "\x1b[1;4A": (Keys.Escape, Keys.ShiftDown), + "\x1b[1;4B": (Keys.Escape, Keys.ShiftUp), + "\x1b[1;4C": (Keys.Escape, Keys.ShiftRight), + "\x1b[1;4D": (Keys.Escape, Keys.ShiftLeft), + "\x1b[1;4F": (Keys.Escape, Keys.ShiftEnd), + "\x1b[1;4H": (Keys.Escape, Keys.ShiftHome), + # Control + arrows. + "\x1b[1;5A": Keys.ControlUp, # Cursor Mode + "\x1b[1;5B": Keys.ControlDown, # Cursor Mode + "\x1b[1;5C": Keys.ControlRight, # Cursor Mode + "\x1b[1;5D": Keys.ControlLeft, # Cursor Mode + "\x1b[1;5F": Keys.ControlEnd, + "\x1b[1;5H": Keys.ControlHome, + # Tmux sends following keystrokes when control+arrow is pressed, but for + # Emacs ansi-term sends the same sequences for normal arrow keys. Consider + # it a normal arrow press, because that's more important. + "\x1b[5A": Keys.ControlUp, + "\x1b[5B": Keys.ControlDown, + "\x1b[5C": Keys.ControlRight, + "\x1b[5D": Keys.ControlLeft, + "\x1bOc": Keys.ControlRight, # rxvt + "\x1bOd": Keys.ControlLeft, # rxvt + # Control + shift + arrows. + "\x1b[1;6A": Keys.ControlShiftDown, + "\x1b[1;6B": Keys.ControlShiftUp, + "\x1b[1;6C": Keys.ControlShiftRight, + "\x1b[1;6D": Keys.ControlShiftLeft, + "\x1b[1;6F": Keys.ControlShiftEnd, + "\x1b[1;6H": Keys.ControlShiftHome, + # Control + Meta + arrows. + "\x1b[1;7A": (Keys.Escape, Keys.ControlDown), + "\x1b[1;7B": (Keys.Escape, Keys.ControlUp), + "\x1b[1;7C": (Keys.Escape, Keys.ControlRight), + "\x1b[1;7D": (Keys.Escape, Keys.ControlLeft), + "\x1b[1;7F": (Keys.Escape, Keys.ControlEnd), + "\x1b[1;7H": (Keys.Escape, Keys.ControlHome), + # Meta + Shift + arrows. + "\x1b[1;8A": (Keys.Escape, Keys.ControlShiftDown), + "\x1b[1;8B": (Keys.Escape, Keys.ControlShiftUp), + "\x1b[1;8C": (Keys.Escape, Keys.ControlShiftRight), + "\x1b[1;8D": (Keys.Escape, Keys.ControlShiftLeft), + "\x1b[1;8F": (Keys.Escape, Keys.ControlShiftEnd), + "\x1b[1;8H": (Keys.Escape, Keys.ControlShiftHome), + # Meta + arrow on (some?) Macs when using iTerm defaults (see issue #483). + "\x1b[1;9A": (Keys.Escape, Keys.Up), + "\x1b[1;9B": (Keys.Escape, Keys.Down), + "\x1b[1;9C": (Keys.Escape, Keys.Right), + "\x1b[1;9D": (Keys.Escape, Keys.Left), + # -- + # Control/shift/meta + number in mintty. + # (c-2 will actually send c-@ and c-6 will send c-^.) + "\x1b[1;5p": Keys.Control0, + "\x1b[1;5q": Keys.Control1, + "\x1b[1;5r": Keys.Control2, + "\x1b[1;5s": Keys.Control3, + "\x1b[1;5t": Keys.Control4, + "\x1b[1;5u": Keys.Control5, + "\x1b[1;5v": Keys.Control6, + "\x1b[1;5w": Keys.Control7, + "\x1b[1;5x": Keys.Control8, + "\x1b[1;5y": Keys.Control9, + "\x1b[1;6p": Keys.ControlShift0, + "\x1b[1;6q": Keys.ControlShift1, + "\x1b[1;6r": Keys.ControlShift2, + "\x1b[1;6s": Keys.ControlShift3, + "\x1b[1;6t": Keys.ControlShift4, + "\x1b[1;6u": Keys.ControlShift5, + "\x1b[1;6v": Keys.ControlShift6, + "\x1b[1;6w": Keys.ControlShift7, + "\x1b[1;6x": Keys.ControlShift8, + "\x1b[1;6y": Keys.ControlShift9, + "\x1b[1;7p": (Keys.Escape, Keys.Control0), + "\x1b[1;7q": (Keys.Escape, Keys.Control1), + "\x1b[1;7r": (Keys.Escape, Keys.Control2), + "\x1b[1;7s": (Keys.Escape, Keys.Control3), + "\x1b[1;7t": (Keys.Escape, Keys.Control4), + "\x1b[1;7u": (Keys.Escape, Keys.Control5), + "\x1b[1;7v": (Keys.Escape, Keys.Control6), + "\x1b[1;7w": (Keys.Escape, Keys.Control7), + "\x1b[1;7x": (Keys.Escape, Keys.Control8), + "\x1b[1;7y": (Keys.Escape, Keys.Control9), + "\x1b[1;8p": (Keys.Escape, Keys.ControlShift0), + "\x1b[1;8q": (Keys.Escape, Keys.ControlShift1), + "\x1b[1;8r": (Keys.Escape, Keys.ControlShift2), + "\x1b[1;8s": (Keys.Escape, Keys.ControlShift3), + "\x1b[1;8t": (Keys.Escape, Keys.ControlShift4), + "\x1b[1;8u": (Keys.Escape, Keys.ControlShift5), + "\x1b[1;8v": (Keys.Escape, Keys.ControlShift6), + "\x1b[1;8w": (Keys.Escape, Keys.ControlShift7), + "\x1b[1;8x": (Keys.Escape, Keys.ControlShift8), + "\x1b[1;8y": (Keys.Escape, Keys.ControlShift9), +} + + +def _get_reverse_ansi_sequences() -> Dict[Keys, str]: + """ + Create a dictionary that maps prompt_toolkit keys back to the VT100 escape + sequences. + """ + result: Dict[Keys, str] = {} + + for sequence, key in ANSI_SEQUENCES.items(): + if not isinstance(key, tuple): + if key not in result: + result[key] = sequence + + return result + + +REVERSE_ANSI_SEQUENCES = _get_reverse_ansi_sequences() diff --git a/src/prompt_toolkit/input/base.py b/src/prompt_toolkit/input/base.py new file mode 100644 index 0000000..313622d --- /dev/null +++ b/src/prompt_toolkit/input/base.py @@ -0,0 +1,150 @@ +""" +Abstraction of CLI Input. +""" +from abc import ABCMeta, abstractmethod, abstractproperty +from contextlib import contextmanager +from typing import Callable, ContextManager, Generator, List + +from prompt_toolkit.key_binding import KeyPress + +__all__ = [ + "Input", + "PipeInput", + "DummyInput", +] + + +class Input(metaclass=ABCMeta): + """ + Abstraction for any input. + + An instance of this class can be given to the constructor of a + :class:`~prompt_toolkit.application.Application` and will also be + passed to the :class:`~prompt_toolkit.eventloop.base.EventLoop`. + """ + + @abstractmethod + def fileno(self) -> int: + """ + Fileno for putting this in an event loop. + """ + + @abstractmethod + def typeahead_hash(self) -> str: + """ + Identifier for storing type ahead key presses. + """ + + @abstractmethod + def read_keys(self) -> List[KeyPress]: + """ + Return a list of Key objects which are read/parsed from the input. + """ + + def flush_keys(self) -> List[KeyPress]: + """ + Flush the underlying parser. and return the pending keys. + (Used for vt100 input.) + """ + return [] + + def flush(self) -> None: + "The event loop can call this when the input has to be flushed." + pass + + @abstractproperty + def closed(self) -> bool: + "Should be true when the input stream is closed." + return False + + @abstractmethod + def raw_mode(self) -> ContextManager[None]: + """ + Context manager that turns the input into raw mode. + """ + + @abstractmethod + def cooked_mode(self) -> ContextManager[None]: + """ + Context manager that turns the input into cooked mode. + """ + + @abstractmethod + def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: + """ + Return a context manager that makes this input active in the current + event loop. + """ + + @abstractmethod + def detach(self) -> ContextManager[None]: + """ + Return a context manager that makes sure that this input is not active + in the current event loop. + """ + + def close(self) -> None: + "Close input." + pass + + +class PipeInput(Input): + """ + Abstraction for pipe input. + """ + + @abstractmethod + def send_bytes(self, data: bytes) -> None: + """Feed byte string into the pipe""" + + @abstractmethod + def send_text(self, data: str) -> None: + """Feed a text string into the pipe""" + + +class DummyInput(Input): + """ + Input for use in a `DummyApplication` + + If used in an actual application, it will make the application render + itself once and exit immediately, due to an `EOFError`. + """ + + def fileno(self) -> int: + raise NotImplementedError + + def typeahead_hash(self) -> str: + return "dummy-%s" % id(self) + + def read_keys(self) -> List[KeyPress]: + return [] + + @property + def closed(self) -> bool: + # This needs to be true, so that the dummy input will trigger an + # `EOFError` immediately in the application. + return True + + def raw_mode(self) -> ContextManager[None]: + return _dummy_context_manager() + + def cooked_mode(self) -> ContextManager[None]: + return _dummy_context_manager() + + def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: + # Call the callback immediately once after attaching. + # This tells the callback to call `read_keys` and check the + # `input.closed` flag, after which it won't receive any keys, but knows + # that `EOFError` should be raised. This unblocks `read_from_input` in + # `application.py`. + input_ready_callback() + + return _dummy_context_manager() + + def detach(self) -> ContextManager[None]: + return _dummy_context_manager() + + +@contextmanager +def _dummy_context_manager() -> Generator[None, None, None]: + yield diff --git a/src/prompt_toolkit/input/defaults.py b/src/prompt_toolkit/input/defaults.py new file mode 100644 index 0000000..564f848 --- /dev/null +++ b/src/prompt_toolkit/input/defaults.py @@ -0,0 +1,79 @@ +import io +import sys +from typing import ContextManager, Optional, TextIO + +from .base import DummyInput, Input, PipeInput + +__all__ = [ + "create_input", + "create_pipe_input", +] + + +def create_input( + stdin: Optional[TextIO] = None, always_prefer_tty: bool = False +) -> Input: + """ + Create the appropriate `Input` object for the current os/environment. + + :param always_prefer_tty: When set, if `sys.stdin` is connected to a Unix + `pipe`, check whether `sys.stdout` or `sys.stderr` are connected to a + pseudo terminal. If so, open the tty for reading instead of reading for + `sys.stdin`. (We can open `stdout` or `stderr` for reading, this is how + a `$PAGER` works.) + """ + if sys.platform == "win32": + from .win32 import Win32Input + + # If `stdin` was assigned `None` (which happens with pythonw.exe), use + # a `DummyInput`. This triggers `EOFError` in the application code. + if stdin is None and sys.stdin is None: + return DummyInput() + + return Win32Input(stdin or sys.stdin) + else: + from .vt100 import Vt100Input + + # If no input TextIO is given, use stdin/stdout. + if stdin is None: + stdin = sys.stdin + + if always_prefer_tty: + for obj in [sys.stdin, sys.stdout, sys.stderr]: + if obj.isatty(): + stdin = obj + break + + # If we can't access the file descriptor for the selected stdin, return + # a `DummyInput` instead. This can happen for instance in unit tests, + # when `sys.stdin` is patched by something that's not an actual file. + # (Instantiating `Vt100Input` would fail in this case.) + try: + stdin.fileno() + except io.UnsupportedOperation: + return DummyInput() + + return Vt100Input(stdin) + + +def create_pipe_input() -> ContextManager[PipeInput]: + """ + Create an input pipe. + This is mostly useful for unit testing. + + Usage:: + + with create_pipe_input() as input: + input.send_text('inputdata') + + Breaking change: In prompt_toolkit 3.0.28 and earlier, this was returning + the `PipeInput` directly, rather than through a context manager. + """ + if sys.platform == "win32": + from .win32_pipe import Win32PipeInput + + return Win32PipeInput.create() + else: + from .posix_pipe import PosixPipeInput + + return PosixPipeInput.create() diff --git a/src/prompt_toolkit/input/posix_pipe.py b/src/prompt_toolkit/input/posix_pipe.py new file mode 100644 index 0000000..1e7dec7 --- /dev/null +++ b/src/prompt_toolkit/input/posix_pipe.py @@ -0,0 +1,116 @@ +import sys + +assert sys.platform != "win32" + +import os +from contextlib import contextmanager +from typing import ContextManager, Iterator, TextIO, cast + +from ..utils import DummyContext +from .base import PipeInput +from .vt100 import Vt100Input + +__all__ = [ + "PosixPipeInput", +] + + +class _Pipe: + "Wrapper around os.pipe, that ensures we don't double close any end." + + def __init__(self) -> None: + self.read_fd, self.write_fd = os.pipe() + self._read_closed = False + self._write_closed = False + + def close_read(self) -> None: + "Close read-end if not yet closed." + if self._read_closed: + return + + os.close(self.read_fd) + self._read_closed = True + + def close_write(self) -> None: + "Close write-end if not yet closed." + if self._write_closed: + return + + os.close(self.write_fd) + self._write_closed = True + + def close(self) -> None: + "Close both read and write ends." + self.close_read() + self.close_write() + + +class PosixPipeInput(Vt100Input, PipeInput): + """ + Input that is send through a pipe. + This is useful if we want to send the input programmatically into the + application. Mostly useful for unit testing. + + Usage:: + + with PosixPipeInput.create() as input: + input.send_text('inputdata') + """ + + _id = 0 + + def __init__(self, _pipe: _Pipe, _text: str = "") -> None: + # Private constructor. Users should use the public `.create()` method. + self.pipe = _pipe + + class Stdin: + encoding = "utf-8" + + def isatty(stdin) -> bool: + return True + + def fileno(stdin) -> int: + return self.pipe.read_fd + + super().__init__(cast(TextIO, Stdin())) + self.send_text(_text) + + # Identifier for every PipeInput for the hash. + self.__class__._id += 1 + self._id = self.__class__._id + + @classmethod + @contextmanager + def create(cls, text: str = "") -> Iterator["PosixPipeInput"]: + pipe = _Pipe() + try: + yield PosixPipeInput(_pipe=pipe, _text=text) + finally: + pipe.close() + + def send_bytes(self, data: bytes) -> None: + os.write(self.pipe.write_fd, data) + + def send_text(self, data: str) -> None: + "Send text to the input." + os.write(self.pipe.write_fd, data.encode("utf-8")) + + def raw_mode(self) -> ContextManager[None]: + return DummyContext() + + def cooked_mode(self) -> ContextManager[None]: + return DummyContext() + + def close(self) -> None: + "Close pipe fds." + # Only close the write-end of the pipe. This will unblock the reader + # callback (in vt100.py > _attached_input), which eventually will raise + # `EOFError`. If we'd also close the read-end, then the event loop + # won't wake up the corresponding callback because of this. + self.pipe.close_write() + + def typeahead_hash(self) -> str: + """ + This needs to be unique for every `PipeInput`. + """ + return f"pipe-input-{self._id}" diff --git a/src/prompt_toolkit/input/posix_utils.py b/src/prompt_toolkit/input/posix_utils.py new file mode 100644 index 0000000..7cf31ee --- /dev/null +++ b/src/prompt_toolkit/input/posix_utils.py @@ -0,0 +1,95 @@ +import os +import select +from codecs import getincrementaldecoder + +__all__ = [ + "PosixStdinReader", +] + + +class PosixStdinReader: + """ + Wrapper around stdin which reads (nonblocking) the next available 1024 + bytes and decodes it. + + Note that you can't be sure that the input file is closed if the ``read`` + function returns an empty string. When ``errors=ignore`` is passed, + ``read`` can return an empty string if all malformed input was replaced by + an empty string. (We can't block here and wait for more input.) So, because + of that, check the ``closed`` attribute, to be sure that the file has been + closed. + + :param stdin_fd: File descriptor from which we read. + :param errors: Can be 'ignore', 'strict' or 'replace'. + On Python3, this can be 'surrogateescape', which is the default. + + 'surrogateescape' is preferred, because this allows us to transfer + unrecognised bytes to the key bindings. Some terminals, like lxterminal + and Guake, use the 'Mxx' notation to send mouse events, where each 'x' + can be any possible byte. + """ + + # By default, we want to 'ignore' errors here. The input stream can be full + # of junk. One occurrence of this that I had was when using iTerm2 on OS X, + # with "Option as Meta" checked (You should choose "Option as +Esc".) + + def __init__( + self, stdin_fd: int, errors: str = "surrogateescape", encoding: str = "utf-8" + ) -> None: + self.stdin_fd = stdin_fd + self.errors = errors + + # Create incremental decoder for decoding stdin. + # We can not just do `os.read(stdin.fileno(), 1024).decode('utf-8')`, because + # it could be that we are in the middle of a utf-8 byte sequence. + self._stdin_decoder_cls = getincrementaldecoder(encoding) + self._stdin_decoder = self._stdin_decoder_cls(errors=errors) + + #: True when there is nothing anymore to read. + self.closed = False + + def read(self, count: int = 1024) -> str: + # By default we choose a rather small chunk size, because reading + # big amounts of input at once, causes the event loop to process + # all these key bindings also at once without going back to the + # loop. This will make the application feel unresponsive. + """ + Read the input and return it as a string. + + Return the text. Note that this can return an empty string, even when + the input stream was not yet closed. This means that something went + wrong during the decoding. + """ + if self.closed: + return "" + + # Check whether there is some input to read. `os.read` would block + # otherwise. + # (Actually, the event loop is responsible to make sure that this + # function is only called when there is something to read, but for some + # reason this happens in certain situations.) + try: + if not select.select([self.stdin_fd], [], [], 0)[0]: + return "" + except OSError: + # Happens for instance when the file descriptor was closed. + # (We had this in ptterm, where the FD became ready, a callback was + # scheduled, but in the meantime another callback closed it already.) + self.closed = True + + # Note: the following works better than wrapping `self.stdin` like + # `codecs.getreader('utf-8')(stdin)` and doing `read(1)`. + # Somehow that causes some latency when the escape + # character is pressed. (Especially on combination with the `select`.) + try: + data = os.read(self.stdin_fd, count) + + # Nothing more to read, stream is closed. + if data == b"": + self.closed = True + return "" + except OSError: + # In case of SIGWINCH + data = b"" + + return self._stdin_decoder.decode(data) diff --git a/src/prompt_toolkit/input/typeahead.py b/src/prompt_toolkit/input/typeahead.py new file mode 100644 index 0000000..a3d7866 --- /dev/null +++ b/src/prompt_toolkit/input/typeahead.py @@ -0,0 +1,76 @@ +r""" +Store input key strokes if we did read more than was required. + +The input classes `Vt100Input` and `Win32Input` read the input text in chunks +of a few kilobytes. This means that if we read input from stdin, it could be +that we read a couple of lines (with newlines in between) at once. + +This creates a problem: potentially, we read too much from stdin. Sometimes +people paste several lines at once because they paste input in a REPL and +expect each input() call to process one line. Or they rely on type ahead +because the application can't keep up with the processing. + +However, we need to read input in bigger chunks. We need this mostly to support +pasting of larger chunks of text. We don't want everything to become +unresponsive because we: + - read one character; + - parse one character; + - call the key binding, which does a string operation with one character; + - and render the user interface. +Doing text operations on single characters is very inefficient in Python, so we +prefer to work on bigger chunks of text. This is why we have to read the input +in bigger chunks. + +Further, line buffering is also not an option, because it doesn't work well in +the architecture. We use lower level Posix APIs, that work better with the +event loop and so on. In fact, there is also nothing that defines that only \n +can accept the input, you could create a key binding for any key to accept the +input. + +To support type ahead, this module will store all the key strokes that were +read too early, so that they can be feed into to the next `prompt()` call or to +the next prompt_toolkit `Application`. +""" +from collections import defaultdict +from typing import Dict, List + +from ..key_binding import KeyPress +from .base import Input + +__all__ = [ + "store_typeahead", + "get_typeahead", + "clear_typeahead", +] + +_buffer: Dict[str, List[KeyPress]] = defaultdict(list) + + +def store_typeahead(input_obj: Input, key_presses: List[KeyPress]) -> None: + """ + Insert typeahead key presses for the given input. + """ + global _buffer + key = input_obj.typeahead_hash() + _buffer[key].extend(key_presses) + + +def get_typeahead(input_obj: Input) -> List[KeyPress]: + """ + Retrieve typeahead and reset the buffer for this input. + """ + global _buffer + + key = input_obj.typeahead_hash() + result = _buffer[key] + _buffer[key] = [] + return result + + +def clear_typeahead(input_obj: Input) -> None: + """ + Clear typeahead buffer. + """ + global _buffer + key = input_obj.typeahead_hash() + _buffer[key] = [] diff --git a/src/prompt_toolkit/input/vt100.py b/src/prompt_toolkit/input/vt100.py new file mode 100644 index 0000000..45ce372 --- /dev/null +++ b/src/prompt_toolkit/input/vt100.py @@ -0,0 +1,320 @@ +import sys + +assert sys.platform != "win32" + +import contextlib +import io +import termios +import tty +from asyncio import AbstractEventLoop +from typing import ( + Callable, + ContextManager, + Dict, + Generator, + List, + Optional, + Set, + TextIO, + Tuple, + Union, +) + +from prompt_toolkit.eventloop import get_event_loop + +from ..key_binding import KeyPress +from .base import Input +from .posix_utils import PosixStdinReader +from .vt100_parser import Vt100Parser + +__all__ = [ + "Vt100Input", + "raw_mode", + "cooked_mode", +] + + +class Vt100Input(Input): + """ + Vt100 input for Posix systems. + (This uses a posix file descriptor that can be registered in the event loop.) + """ + + # For the error messages. Only display "Input is not a terminal" once per + # file descriptor. + _fds_not_a_terminal: Set[int] = set() + + def __init__(self, stdin: TextIO) -> None: + # Test whether the given input object has a file descriptor. + # (Idle reports stdin to be a TTY, but fileno() is not implemented.) + try: + # This should not raise, but can return 0. + stdin.fileno() + except io.UnsupportedOperation as e: + if "idlelib.run" in sys.modules: + raise io.UnsupportedOperation( + "Stdin is not a terminal. Running from Idle is not supported." + ) from e + else: + raise io.UnsupportedOperation("Stdin is not a terminal.") from e + + # Even when we have a file descriptor, it doesn't mean it's a TTY. + # Normally, this requires a real TTY device, but people instantiate + # this class often during unit tests as well. They use for instance + # pexpect to pipe data into an application. For convenience, we print + # an error message and go on. + isatty = stdin.isatty() + fd = stdin.fileno() + + if not isatty and fd not in Vt100Input._fds_not_a_terminal: + msg = "Warning: Input is not a terminal (fd=%r).\n" + sys.stderr.write(msg % fd) + sys.stderr.flush() + Vt100Input._fds_not_a_terminal.add(fd) + + # + self.stdin = stdin + + # Create a backup of the fileno(). We want this to work even if the + # underlying file is closed, so that `typeahead_hash()` keeps working. + self._fileno = stdin.fileno() + + self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects. + self.stdin_reader = PosixStdinReader(self._fileno, encoding=stdin.encoding) + self.vt100_parser = Vt100Parser( + lambda key_press: self._buffer.append(key_press) + ) + + def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: + """ + Return a context manager that makes this input active in the current + event loop. + """ + return _attached_input(self, input_ready_callback) + + def detach(self) -> ContextManager[None]: + """ + Return a context manager that makes sure that this input is not active + in the current event loop. + """ + return _detached_input(self) + + def read_keys(self) -> List[KeyPress]: + "Read list of KeyPress." + # Read text from stdin. + data = self.stdin_reader.read() + + # Pass it through our vt100 parser. + self.vt100_parser.feed(data) + + # Return result. + result = self._buffer + self._buffer = [] + return result + + def flush_keys(self) -> List[KeyPress]: + """ + Flush pending keys and return them. + (Used for flushing the 'escape' key.) + """ + # Flush all pending keys. (This is most important to flush the vt100 + # 'Escape' key early when nothing else follows.) + self.vt100_parser.flush() + + # Return result. + result = self._buffer + self._buffer = [] + return result + + @property + def closed(self) -> bool: + return self.stdin_reader.closed + + def raw_mode(self) -> ContextManager[None]: + return raw_mode(self.stdin.fileno()) + + def cooked_mode(self) -> ContextManager[None]: + return cooked_mode(self.stdin.fileno()) + + def fileno(self) -> int: + return self.stdin.fileno() + + def typeahead_hash(self) -> str: + return f"fd-{self._fileno}" + + +_current_callbacks: Dict[ + Tuple[AbstractEventLoop, int], Optional[Callable[[], None]] +] = {} # (loop, fd) -> current callback + + +@contextlib.contextmanager +def _attached_input( + input: Vt100Input, callback: Callable[[], None] +) -> Generator[None, None, None]: + """ + Context manager that makes this input active in the current event loop. + + :param input: :class:`~prompt_toolkit.input.Input` object. + :param callback: Called when the input is ready to read. + """ + loop = get_event_loop() + fd = input.fileno() + previous = _current_callbacks.get((loop, fd)) + + def callback_wrapper() -> None: + """Wrapper around the callback that already removes the reader when + the input is closed. Otherwise, we keep continuously calling this + callback, until we leave the context manager (which can happen a bit + later). This fixes issues when piping /dev/null into a prompt_toolkit + application.""" + if input.closed: + loop.remove_reader(fd) + callback() + + try: + loop.add_reader(fd, callback_wrapper) + except PermissionError: + # For `EPollSelector`, adding /dev/null to the event loop will raise + # `PermisisonError` (that doesn't happen for `SelectSelector` + # apparently). Whenever we get a `PermissionError`, we can raise + # `EOFError`, because there's not more to be read anyway. `EOFError` is + # an exception that people expect in + # `prompt_toolkit.application.Application.run()`. + # To reproduce, do: `ptpython 0< /dev/null 1< /dev/null` + raise EOFError + + _current_callbacks[loop, fd] = callback + + try: + yield + finally: + loop.remove_reader(fd) + + if previous: + loop.add_reader(fd, previous) + _current_callbacks[loop, fd] = previous + else: + del _current_callbacks[loop, fd] + + +@contextlib.contextmanager +def _detached_input(input: Vt100Input) -> Generator[None, None, None]: + loop = get_event_loop() + fd = input.fileno() + previous = _current_callbacks.get((loop, fd)) + + if previous: + loop.remove_reader(fd) + _current_callbacks[loop, fd] = None + + try: + yield + finally: + if previous: + loop.add_reader(fd, previous) + _current_callbacks[loop, fd] = previous + + +class raw_mode: + """ + :: + + with raw_mode(stdin): + ''' the pseudo-terminal stdin is now used in raw mode ''' + + We ignore errors when executing `tcgetattr` fails. + """ + + # There are several reasons for ignoring errors: + # 1. To avoid the "Inappropriate ioctl for device" crash if somebody would + # execute this code (In a Python REPL, for instance): + # + # import os; f = open(os.devnull); os.dup2(f.fileno(), 0) + # + # The result is that the eventloop will stop correctly, because it has + # to logic to quit when stdin is closed. However, we should not fail at + # this point. See: + # https://github.com/jonathanslenders/python-prompt-toolkit/pull/393 + # https://github.com/jonathanslenders/python-prompt-toolkit/issues/392 + + # 2. Related, when stdin is an SSH pipe, and no full terminal was allocated. + # See: https://github.com/jonathanslenders/python-prompt-toolkit/pull/165 + def __init__(self, fileno: int) -> None: + self.fileno = fileno + self.attrs_before: Optional[List[Union[int, List[Union[bytes, int]]]]] + try: + self.attrs_before = termios.tcgetattr(fileno) + except termios.error: + # Ignore attribute errors. + self.attrs_before = None + + def __enter__(self) -> None: + # NOTE: On os X systems, using pty.setraw() fails. Therefor we are using this: + try: + newattr = termios.tcgetattr(self.fileno) + except termios.error: + pass + else: + newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG]) + newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG]) + + # VMIN defines the number of characters read at a time in + # non-canonical mode. It seems to default to 1 on Linux, but on + # Solaris and derived operating systems it defaults to 4. (This is + # because the VMIN slot is the same as the VEOF slot, which + # defaults to ASCII EOT = Ctrl-D = 4.) + newattr[tty.CC][termios.VMIN] = 1 + + termios.tcsetattr(self.fileno, termios.TCSANOW, newattr) + + @classmethod + def _patch_lflag(cls, attrs: int) -> int: + return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) + + @classmethod + def _patch_iflag(cls, attrs: int) -> int: + return attrs & ~( + # Disable XON/XOFF flow control on output and input. + # (Don't capture Ctrl-S and Ctrl-Q.) + # Like executing: "stty -ixon." + termios.IXON + | termios.IXOFF + | + # Don't translate carriage return into newline on input. + termios.ICRNL + | termios.INLCR + | termios.IGNCR + ) + + def __exit__(self, *a: object) -> None: + if self.attrs_before is not None: + try: + termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before) + except termios.error: + pass + + # # Put the terminal in application mode. + # self._stdout.write('\x1b[?1h') + + +class cooked_mode(raw_mode): + """ + The opposite of ``raw_mode``, used when we need cooked mode inside a + `raw_mode` block. Used in `Application.run_in_terminal`.:: + + with cooked_mode(stdin): + ''' the pseudo-terminal stdin is now used in cooked mode. ''' + """ + + @classmethod + def _patch_lflag(cls, attrs: int) -> int: + return attrs | (termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) + + @classmethod + def _patch_iflag(cls, attrs: int) -> int: + # Turn the ICRNL flag back on. (Without this, calling `input()` in + # run_in_terminal doesn't work and displays ^M instead. Ptpython + # evaluates commands using `run_in_terminal`, so it's important that + # they translate ^M back into ^J.) + return attrs | termios.ICRNL diff --git a/src/prompt_toolkit/input/vt100_parser.py b/src/prompt_toolkit/input/vt100_parser.py new file mode 100644 index 0000000..3ee1e14 --- /dev/null +++ b/src/prompt_toolkit/input/vt100_parser.py @@ -0,0 +1,247 @@ +""" +Parser for VT100 input stream. +""" +import re +from typing import Callable, Dict, Generator, Tuple, Union + +from ..key_binding.key_processor import KeyPress +from ..keys import Keys +from .ansi_escape_sequences import ANSI_SEQUENCES + +__all__ = [ + "Vt100Parser", +] + + +# Regex matching any CPR response +# (Note that we use '\Z' instead of '$', because '$' could include a trailing +# newline.) +_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z") + +# Mouse events: +# Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M" +_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z") + +# Regex matching any valid prefix of a CPR response. +# (Note that it doesn't contain the last character, the 'R'. The prefix has to +# be shorter.) +_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z") + +_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z") + + +class _Flush: + """Helper object to indicate flush operation to the parser.""" + + pass + + +class _IsPrefixOfLongerMatchCache(Dict[str, bool]): + """ + Dictionary that maps input sequences to a boolean indicating whether there is + any key that start with this characters. + """ + + def __missing__(self, prefix: str) -> bool: + # (hard coded) If this could be a prefix of a CPR response, return + # True. + if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match( + prefix + ): + result = True + else: + # If this could be a prefix of anything else, also return True. + result = any( + v + for k, v in ANSI_SEQUENCES.items() + if k.startswith(prefix) and k != prefix + ) + + self[prefix] = result + return result + + +_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache() + + +class Vt100Parser: + """ + Parser for VT100 input stream. + Data can be fed through the `feed` method and the given callback will be + called with KeyPress objects. + + :: + + def callback(key): + pass + i = Vt100Parser(callback) + i.feed('data\x01...') + + :attr feed_key_callback: Function that will be called when a key is parsed. + """ + + # Lookup table of ANSI escape sequences for a VT100 terminal + # Hint: in order to know what sequences your terminal writes to stdin, run + # "od -c" and start typing. + def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None: + self.feed_key_callback = feed_key_callback + self.reset() + + def reset(self, request: bool = False) -> None: + self._in_bracketed_paste = False + self._start_parser() + + def _start_parser(self) -> None: + """ + Start the parser coroutine. + """ + self._input_parser = self._input_parser_generator() + self._input_parser.send(None) # type: ignore + + def _get_match(self, prefix: str) -> Union[None, Keys, Tuple[Keys, ...]]: + """ + Return the key (or keys) that maps to this prefix. + """ + # (hard coded) If we match a CPR response, return Keys.CPRResponse. + # (This one doesn't fit in the ANSI_SEQUENCES, because it contains + # integer variables.) + if _cpr_response_re.match(prefix): + return Keys.CPRResponse + + elif _mouse_event_re.match(prefix): + return Keys.Vt100MouseEvent + + # Otherwise, use the mappings. + try: + return ANSI_SEQUENCES[prefix] + except KeyError: + return None + + def _input_parser_generator(self) -> Generator[None, Union[str, _Flush], None]: + """ + Coroutine (state machine) for the input parser. + """ + prefix = "" + retry = False + flush = False + + while True: + flush = False + + if retry: + retry = False + else: + # Get next character. + c = yield + + if isinstance(c, _Flush): + flush = True + else: + prefix += c + + # If we have some data, check for matches. + if prefix: + is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix] + match = self._get_match(prefix) + + # Exact matches found, call handlers.. + if (flush or not is_prefix_of_longer_match) and match: + self._call_handler(match, prefix) + prefix = "" + + # No exact match found. + elif (flush or not is_prefix_of_longer_match) and not match: + found = False + retry = True + + # Loop over the input, try the longest match first and + # shift. + for i in range(len(prefix), 0, -1): + match = self._get_match(prefix[:i]) + if match: + self._call_handler(match, prefix[:i]) + prefix = prefix[i:] + found = True + + if not found: + self._call_handler(prefix[0], prefix[0]) + prefix = prefix[1:] + + def _call_handler( + self, key: Union[str, Keys, Tuple[Keys, ...]], insert_text: str + ) -> None: + """ + Callback to handler. + """ + if isinstance(key, tuple): + # Received ANSI sequence that corresponds with multiple keys + # (probably alt+something). Handle keys individually, but only pass + # data payload to first KeyPress (so that we won't insert it + # multiple times). + for i, k in enumerate(key): + self._call_handler(k, insert_text if i == 0 else "") + else: + if key == Keys.BracketedPaste: + self._in_bracketed_paste = True + self._paste_buffer = "" + else: + self.feed_key_callback(KeyPress(key, insert_text)) + + def feed(self, data: str) -> None: + """ + Feed the input stream. + + :param data: Input string (unicode). + """ + # Handle bracketed paste. (We bypass the parser that matches all other + # key presses and keep reading input until we see the end mark.) + # This is much faster then parsing character by character. + if self._in_bracketed_paste: + self._paste_buffer += data + end_mark = "\x1b[201~" + + if end_mark in self._paste_buffer: + end_index = self._paste_buffer.index(end_mark) + + # Feed content to key bindings. + paste_content = self._paste_buffer[:end_index] + self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content)) + + # Quit bracketed paste mode and handle remaining input. + self._in_bracketed_paste = False + remaining = self._paste_buffer[end_index + len(end_mark) :] + self._paste_buffer = "" + + self.feed(remaining) + + # Handle normal input character by character. + else: + for i, c in enumerate(data): + if self._in_bracketed_paste: + # Quit loop and process from this position when the parser + # entered bracketed paste. + self.feed(data[i:]) + break + else: + self._input_parser.send(c) + + def flush(self) -> None: + """ + Flush the buffer of the input stream. + + This will allow us to handle the escape key (or maybe meta) sooner. + The input received by the escape key is actually the same as the first + characters of e.g. Arrow-Up, so without knowing what follows the escape + sequence, we don't know whether escape has been pressed, or whether + it's something else. This flush function should be called after a + timeout, and processes everything that's still in the buffer as-is, so + without assuming any characters will follow. + """ + self._input_parser.send(_Flush()) + + def feed_and_flush(self, data: str) -> None: + """ + Wrapper around ``feed`` and ``flush``. + """ + self.feed(data) + self.flush() diff --git a/src/prompt_toolkit/input/win32.py b/src/prompt_toolkit/input/win32.py new file mode 100644 index 0000000..db3fa2b --- /dev/null +++ b/src/prompt_toolkit/input/win32.py @@ -0,0 +1,757 @@ +import os +import sys +from abc import abstractmethod +from contextlib import contextmanager + +from prompt_toolkit.eventloop import get_event_loop + +from ..utils import SPHINX_AUTODOC_RUNNING + +assert sys.platform == "win32" + +# Do not import win32-specific stuff when generating documentation. +# Otherwise RTD would be unable to generate docs for this module. +if not SPHINX_AUTODOC_RUNNING: + import msvcrt + from ctypes import windll + +from ctypes import Array, pointer +from ctypes.wintypes import DWORD, HANDLE +from typing import ( + Callable, + ContextManager, + Dict, + Iterable, + Iterator, + List, + Optional, + TextIO, +) + +from prompt_toolkit.eventloop import run_in_executor_with_context +from prompt_toolkit.eventloop.win32 import create_win32_event, wait_for_handles +from prompt_toolkit.key_binding.key_processor import KeyPress +from prompt_toolkit.keys import Keys +from prompt_toolkit.mouse_events import MouseButton, MouseEventType +from prompt_toolkit.win32_types import ( + INPUT_RECORD, + KEY_EVENT_RECORD, + MOUSE_EVENT_RECORD, + STD_INPUT_HANDLE, + EventTypes, +) + +from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES +from .base import Input + +__all__ = [ + "Win32Input", + "ConsoleInputReader", + "raw_mode", + "cooked_mode", + "attach_win32_input", + "detach_win32_input", +] + +# Win32 Constants for MOUSE_EVENT_RECORD. +# See: https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str +FROM_LEFT_1ST_BUTTON_PRESSED = 0x1 +RIGHTMOST_BUTTON_PRESSED = 0x2 +MOUSE_MOVED = 0x0001 +MOUSE_WHEELED = 0x0004 + + +class _Win32InputBase(Input): + """ + Base class for `Win32Input` and `Win32PipeInput`. + """ + + def __init__(self) -> None: + self.win32_handles = _Win32Handles() + + @property + @abstractmethod + def handle(self) -> HANDLE: + pass + + +class Win32Input(_Win32InputBase): + """ + `Input` class that reads from the Windows console. + """ + + def __init__(self, stdin: Optional[TextIO] = None) -> None: + super().__init__() + self.console_input_reader = ConsoleInputReader() + + def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: + """ + Return a context manager that makes this input active in the current + event loop. + """ + return attach_win32_input(self, input_ready_callback) + + def detach(self) -> ContextManager[None]: + """ + Return a context manager that makes sure that this input is not active + in the current event loop. + """ + return detach_win32_input(self) + + def read_keys(self) -> List[KeyPress]: + return list(self.console_input_reader.read()) + + def flush(self) -> None: + pass + + @property + def closed(self) -> bool: + return False + + def raw_mode(self) -> ContextManager[None]: + return raw_mode() + + def cooked_mode(self) -> ContextManager[None]: + return cooked_mode() + + def fileno(self) -> int: + # The windows console doesn't depend on the file handle, so + # this is not used for the event loop (which uses the + # handle instead). But it's used in `Application.run_system_command` + # which opens a subprocess with a given stdin/stdout. + return sys.stdin.fileno() + + def typeahead_hash(self) -> str: + return "win32-input" + + def close(self) -> None: + self.console_input_reader.close() + + @property + def handle(self) -> HANDLE: + return self.console_input_reader.handle + + +class ConsoleInputReader: + """ + :param recognize_paste: When True, try to discover paste actions and turn + the event into a BracketedPaste. + """ + + # Keys with character data. + mappings = { + b"\x1b": Keys.Escape, + b"\x00": Keys.ControlSpace, # Control-Space (Also for Ctrl-@) + b"\x01": Keys.ControlA, # Control-A (home) + b"\x02": Keys.ControlB, # Control-B (emacs cursor left) + b"\x03": Keys.ControlC, # Control-C (interrupt) + b"\x04": Keys.ControlD, # Control-D (exit) + b"\x05": Keys.ControlE, # Control-E (end) + b"\x06": Keys.ControlF, # Control-F (cursor forward) + b"\x07": Keys.ControlG, # Control-G + b"\x08": Keys.ControlH, # Control-H (8) (Identical to '\b') + b"\x09": Keys.ControlI, # Control-I (9) (Identical to '\t') + b"\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n') + b"\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab) + b"\x0c": Keys.ControlL, # Control-L (clear; form feed) + b"\x0d": Keys.ControlM, # Control-M (enter) + b"\x0e": Keys.ControlN, # Control-N (14) (history forward) + b"\x0f": Keys.ControlO, # Control-O (15) + b"\x10": Keys.ControlP, # Control-P (16) (history back) + b"\x11": Keys.ControlQ, # Control-Q + b"\x12": Keys.ControlR, # Control-R (18) (reverse search) + b"\x13": Keys.ControlS, # Control-S (19) (forward search) + b"\x14": Keys.ControlT, # Control-T + b"\x15": Keys.ControlU, # Control-U + b"\x16": Keys.ControlV, # Control-V + b"\x17": Keys.ControlW, # Control-W + b"\x18": Keys.ControlX, # Control-X + b"\x19": Keys.ControlY, # Control-Y (25) + b"\x1a": Keys.ControlZ, # Control-Z + b"\x1c": Keys.ControlBackslash, # Both Control-\ and Ctrl-| + b"\x1d": Keys.ControlSquareClose, # Control-] + b"\x1e": Keys.ControlCircumflex, # Control-^ + b"\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.) + b"\x7f": Keys.Backspace, # (127) Backspace (ASCII Delete.) + } + + # Keys that don't carry character data. + keycodes = { + # Home/End + 33: Keys.PageUp, + 34: Keys.PageDown, + 35: Keys.End, + 36: Keys.Home, + # Arrows + 37: Keys.Left, + 38: Keys.Up, + 39: Keys.Right, + 40: Keys.Down, + 45: Keys.Insert, + 46: Keys.Delete, + # F-keys. + 112: Keys.F1, + 113: Keys.F2, + 114: Keys.F3, + 115: Keys.F4, + 116: Keys.F5, + 117: Keys.F6, + 118: Keys.F7, + 119: Keys.F8, + 120: Keys.F9, + 121: Keys.F10, + 122: Keys.F11, + 123: Keys.F12, + } + + LEFT_ALT_PRESSED = 0x0002 + RIGHT_ALT_PRESSED = 0x0001 + SHIFT_PRESSED = 0x0010 + LEFT_CTRL_PRESSED = 0x0008 + RIGHT_CTRL_PRESSED = 0x0004 + + def __init__(self, recognize_paste: bool = True) -> None: + self._fdcon = None + self.recognize_paste = recognize_paste + + # When stdin is a tty, use that handle, otherwise, create a handle from + # CONIN$. + self.handle: HANDLE + if sys.stdin.isatty(): + self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) + else: + self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY) + self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon)) + + def close(self) -> None: + "Close fdcon." + if self._fdcon is not None: + os.close(self._fdcon) + + def read(self) -> Iterable[KeyPress]: + """ + Return a list of `KeyPress` instances. It won't return anything when + there was nothing to read. (This function doesn't block.) + + http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx + """ + max_count = 2048 # Max events to read at the same time. + + read = DWORD(0) + arrtype = INPUT_RECORD * max_count + input_records = arrtype() + + # Check whether there is some input to read. `ReadConsoleInputW` would + # block otherwise. + # (Actually, the event loop is responsible to make sure that this + # function is only called when there is something to read, but for some + # reason this happened in the asyncio_win32 loop, and it's better to be + # safe anyway.) + if not wait_for_handles([self.handle], timeout=0): + return + + # Get next batch of input event. + windll.kernel32.ReadConsoleInputW( + self.handle, pointer(input_records), max_count, pointer(read) + ) + + # First, get all the keys from the input buffer, in order to determine + # whether we should consider this a paste event or not. + all_keys = list(self._get_keys(read, input_records)) + + # Fill in 'data' for key presses. + all_keys = [self._insert_key_data(key) for key in all_keys] + + # Correct non-bmp characters that are passed as separate surrogate codes + all_keys = list(self._merge_paired_surrogates(all_keys)) + + if self.recognize_paste and self._is_paste(all_keys): + gen = iter(all_keys) + k: Optional[KeyPress] + + for k in gen: + # Pasting: if the current key consists of text or \n, turn it + # into a BracketedPaste. + data = [] + while k and ( + not isinstance(k.key, Keys) + or k.key in {Keys.ControlJ, Keys.ControlM} + ): + data.append(k.data) + try: + k = next(gen) + except StopIteration: + k = None + + if data: + yield KeyPress(Keys.BracketedPaste, "".join(data)) + if k is not None: + yield k + else: + yield from all_keys + + def _insert_key_data(self, key_press: KeyPress) -> KeyPress: + """ + Insert KeyPress data, for vt100 compatibility. + """ + if key_press.data: + return key_press + + if isinstance(key_press.key, Keys): + data = REVERSE_ANSI_SEQUENCES.get(key_press.key, "") + else: + data = "" + + return KeyPress(key_press.key, data) + + def _get_keys( + self, read: DWORD, input_records: "Array[INPUT_RECORD]" + ) -> Iterator[KeyPress]: + """ + Generator that yields `KeyPress` objects from the input records. + """ + for i in range(read.value): + ir = input_records[i] + + # Get the right EventType from the EVENT_RECORD. + # (For some reason the Windows console application 'cmder' + # [http://gooseberrycreative.com/cmder/] can return '0' for + # ir.EventType. -- Just ignore that.) + if ir.EventType in EventTypes: + ev = getattr(ir.Event, EventTypes[ir.EventType]) + + # Process if this is a key event. (We also have mouse, menu and + # focus events.) + if type(ev) == KEY_EVENT_RECORD and ev.KeyDown: + yield from self._event_to_key_presses(ev) + + elif type(ev) == MOUSE_EVENT_RECORD: + yield from self._handle_mouse(ev) + + @staticmethod + def _merge_paired_surrogates(key_presses: List[KeyPress]) -> Iterator[KeyPress]: + """ + Combines consecutive KeyPresses with high and low surrogates into + single characters + """ + buffered_high_surrogate = None + for key in key_presses: + is_text = not isinstance(key.key, Keys) + is_high_surrogate = is_text and "\uD800" <= key.key <= "\uDBFF" + is_low_surrogate = is_text and "\uDC00" <= key.key <= "\uDFFF" + + if buffered_high_surrogate: + if is_low_surrogate: + # convert high surrogate + low surrogate to single character + fullchar = ( + (buffered_high_surrogate.key + key.key) + .encode("utf-16-le", "surrogatepass") + .decode("utf-16-le") + ) + key = KeyPress(fullchar, fullchar) + else: + yield buffered_high_surrogate + buffered_high_surrogate = None + + if is_high_surrogate: + buffered_high_surrogate = key + else: + yield key + + if buffered_high_surrogate: + yield buffered_high_surrogate + + @staticmethod + def _is_paste(keys: List[KeyPress]) -> bool: + """ + Return `True` when we should consider this list of keys as a paste + event. Pasted text on windows will be turned into a + `Keys.BracketedPaste` event. (It's not 100% correct, but it is probably + the best possible way to detect pasting of text and handle that + correctly.) + """ + # Consider paste when it contains at least one newline and at least one + # other character. + text_count = 0 + newline_count = 0 + + for k in keys: + if not isinstance(k.key, Keys): + text_count += 1 + if k.key == Keys.ControlM: + newline_count += 1 + + return newline_count >= 1 and text_count >= 1 + + def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) -> List[KeyPress]: + """ + For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances. + """ + assert type(ev) == KEY_EVENT_RECORD and ev.KeyDown + + result: Optional[KeyPress] = None + + control_key_state = ev.ControlKeyState + u_char = ev.uChar.UnicodeChar + # Use surrogatepass because u_char may be an unmatched surrogate + ascii_char = u_char.encode("utf-8", "surrogatepass") + + # NOTE: We don't use `ev.uChar.AsciiChar`. That appears to be the + # unicode code point truncated to 1 byte. See also: + # https://github.com/ipython/ipython/issues/10004 + # https://github.com/jonathanslenders/python-prompt-toolkit/issues/389 + + if u_char == "\x00": + if ev.VirtualKeyCode in self.keycodes: + result = KeyPress(self.keycodes[ev.VirtualKeyCode], "") + else: + if ascii_char in self.mappings: + if self.mappings[ascii_char] == Keys.ControlJ: + u_char = ( + "\n" # Windows sends \n, turn into \r for unix compatibility. + ) + result = KeyPress(self.mappings[ascii_char], u_char) + else: + result = KeyPress(u_char, u_char) + + # First we handle Shift-Control-Arrow/Home/End (need to do this first) + if ( + ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) + and control_key_state & self.SHIFT_PRESSED + and result + ): + mapping: Dict[str, str] = { + Keys.Left: Keys.ControlShiftLeft, + Keys.Right: Keys.ControlShiftRight, + Keys.Up: Keys.ControlShiftUp, + Keys.Down: Keys.ControlShiftDown, + Keys.Home: Keys.ControlShiftHome, + Keys.End: Keys.ControlShiftEnd, + Keys.Insert: Keys.ControlShiftInsert, + Keys.PageUp: Keys.ControlShiftPageUp, + Keys.PageDown: Keys.ControlShiftPageDown, + } + result.key = mapping.get(result.key, result.key) + + # Correctly handle Control-Arrow/Home/End and Control-Insert/Delete keys. + if ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) and result: + mapping = { + Keys.Left: Keys.ControlLeft, + Keys.Right: Keys.ControlRight, + Keys.Up: Keys.ControlUp, + Keys.Down: Keys.ControlDown, + Keys.Home: Keys.ControlHome, + Keys.End: Keys.ControlEnd, + Keys.Insert: Keys.ControlInsert, + Keys.Delete: Keys.ControlDelete, + Keys.PageUp: Keys.ControlPageUp, + Keys.PageDown: Keys.ControlPageDown, + } + result.key = mapping.get(result.key, result.key) + + # Turn 'Tab' into 'BackTab' when shift was pressed. + # Also handle other shift-key combination + if control_key_state & self.SHIFT_PRESSED and result: + mapping = { + Keys.Tab: Keys.BackTab, + Keys.Left: Keys.ShiftLeft, + Keys.Right: Keys.ShiftRight, + Keys.Up: Keys.ShiftUp, + Keys.Down: Keys.ShiftDown, + Keys.Home: Keys.ShiftHome, + Keys.End: Keys.ShiftEnd, + Keys.Insert: Keys.ShiftInsert, + Keys.Delete: Keys.ShiftDelete, + Keys.PageUp: Keys.ShiftPageUp, + Keys.PageDown: Keys.ShiftPageDown, + } + result.key = mapping.get(result.key, result.key) + + # Turn 'Space' into 'ControlSpace' when control was pressed. + if ( + ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) + and result + and result.data == " " + ): + result = KeyPress(Keys.ControlSpace, " ") + + # Turn Control-Enter into META-Enter. (On a vt100 terminal, we cannot + # detect this combination. But it's really practical on Windows.) + if ( + ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) + and result + and result.key == Keys.ControlJ + ): + return [KeyPress(Keys.Escape, ""), result] + + # Return result. If alt was pressed, prefix the result with an + # 'Escape' key, just like unix VT100 terminals do. + + # NOTE: Only replace the left alt with escape. The right alt key often + # acts as altgr and is used in many non US keyboard layouts for + # typing some special characters, like a backslash. We don't want + # all backslashes to be prefixed with escape. (Esc-\ has a + # meaning in E-macs, for instance.) + if result: + meta_pressed = control_key_state & self.LEFT_ALT_PRESSED + + if meta_pressed: + return [KeyPress(Keys.Escape, ""), result] + else: + return [result] + + else: + return [] + + def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> List[KeyPress]: + """ + Handle mouse events. Return a list of KeyPress instances. + """ + event_flags = ev.EventFlags + button_state = ev.ButtonState + + event_type: Optional[MouseEventType] = None + button: MouseButton = MouseButton.NONE + + # Scroll events. + if event_flags & MOUSE_WHEELED: + if button_state > 0: + event_type = MouseEventType.SCROLL_UP + else: + event_type = MouseEventType.SCROLL_DOWN + else: + # Handle button state for non-scroll events. + if button_state == FROM_LEFT_1ST_BUTTON_PRESSED: + button = MouseButton.LEFT + + elif button_state == RIGHTMOST_BUTTON_PRESSED: + button = MouseButton.RIGHT + + # Move events. + if event_flags & MOUSE_MOVED: + event_type = MouseEventType.MOUSE_MOVE + + # No key pressed anymore: mouse up. + if event_type is None: + if button_state > 0: + # Some button pressed. + event_type = MouseEventType.MOUSE_DOWN + else: + # No button pressed. + event_type = MouseEventType.MOUSE_UP + + data = ";".join( + [ + button.value, + event_type.value, + str(ev.MousePosition.X), + str(ev.MousePosition.Y), + ] + ) + return [KeyPress(Keys.WindowsMouseEvent, data)] + + +class _Win32Handles: + """ + Utility to keep track of which handles are connectod to which callbacks. + + `add_win32_handle` starts a tiny event loop in another thread which waits + for the Win32 handle to become ready. When this happens, the callback will + be called in the current asyncio event loop using `call_soon_threadsafe`. + + `remove_win32_handle` will stop this tiny event loop. + + NOTE: We use this technique, so that we don't have to use the + `ProactorEventLoop` on Windows and we can wait for things like stdin + in a `SelectorEventLoop`. This is important, because our inputhook + mechanism (used by IPython), only works with the `SelectorEventLoop`. + """ + + def __init__(self) -> None: + self._handle_callbacks: Dict[int, Callable[[], None]] = {} + + # Windows Events that are triggered when we have to stop watching this + # handle. + self._remove_events: Dict[int, HANDLE] = {} + + def add_win32_handle(self, handle: HANDLE, callback: Callable[[], None]) -> None: + """ + Add a Win32 handle to the event loop. + """ + handle_value = handle.value + + if handle_value is None: + raise ValueError("Invalid handle.") + + # Make sure to remove a previous registered handler first. + self.remove_win32_handle(handle) + + loop = get_event_loop() + self._handle_callbacks[handle_value] = callback + + # Create remove event. + remove_event = create_win32_event() + self._remove_events[handle_value] = remove_event + + # Add reader. + def ready() -> None: + # Tell the callback that input's ready. + try: + callback() + finally: + run_in_executor_with_context(wait, loop=loop) + + # Wait for the input to become ready. + # (Use an executor for this, the Windows asyncio event loop doesn't + # allow us to wait for handles like stdin.) + def wait() -> None: + # Wait until either the handle becomes ready, or the remove event + # has been set. + result = wait_for_handles([remove_event, handle]) + + if result is remove_event: + windll.kernel32.CloseHandle(remove_event) + return + else: + loop.call_soon_threadsafe(ready) + + run_in_executor_with_context(wait, loop=loop) + + def remove_win32_handle(self, handle: HANDLE) -> Optional[Callable[[], None]]: + """ + Remove a Win32 handle from the event loop. + Return either the registered handler or `None`. + """ + if handle.value is None: + return None # Ignore. + + # Trigger remove events, so that the reader knows to stop. + try: + event = self._remove_events.pop(handle.value) + except KeyError: + pass + else: + windll.kernel32.SetEvent(event) + + try: + return self._handle_callbacks.pop(handle.value) + except KeyError: + return None + + +@contextmanager +def attach_win32_input( + input: _Win32InputBase, callback: Callable[[], None] +) -> Iterator[None]: + """ + Context manager that makes this input active in the current event loop. + + :param input: :class:`~prompt_toolkit.input.Input` object. + :param input_ready_callback: Called when the input is ready to read. + """ + win32_handles = input.win32_handles + handle = input.handle + + if handle.value is None: + raise ValueError("Invalid handle.") + + # Add reader. + previous_callback = win32_handles.remove_win32_handle(handle) + win32_handles.add_win32_handle(handle, callback) + + try: + yield + finally: + win32_handles.remove_win32_handle(handle) + + if previous_callback: + win32_handles.add_win32_handle(handle, previous_callback) + + +@contextmanager +def detach_win32_input(input: _Win32InputBase) -> Iterator[None]: + win32_handles = input.win32_handles + handle = input.handle + + if handle.value is None: + raise ValueError("Invalid handle.") + + previous_callback = win32_handles.remove_win32_handle(handle) + + try: + yield + finally: + if previous_callback: + win32_handles.add_win32_handle(handle, previous_callback) + + +class raw_mode: + """ + :: + + with raw_mode(stdin): + ''' the windows terminal is now in 'raw' mode. ''' + + The ``fileno`` attribute is ignored. This is to be compatible with the + `raw_input` method of `.vt100_input`. + """ + + def __init__(self, fileno: Optional[int] = None) -> None: + self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) + + def __enter__(self) -> None: + # Remember original mode. + original_mode = DWORD() + windll.kernel32.GetConsoleMode(self.handle, pointer(original_mode)) + self.original_mode = original_mode + + self._patch() + + def _patch(self) -> None: + # Set raw + ENABLE_ECHO_INPUT = 0x0004 + ENABLE_LINE_INPUT = 0x0002 + ENABLE_PROCESSED_INPUT = 0x0001 + + windll.kernel32.SetConsoleMode( + self.handle, + self.original_mode.value + & ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), + ) + + def __exit__(self, *a: object) -> None: + # Restore original mode + windll.kernel32.SetConsoleMode(self.handle, self.original_mode) + + +class cooked_mode(raw_mode): + """ + :: + + with cooked_mode(stdin): + ''' The pseudo-terminal stdin is now used in cooked mode. ''' + """ + + def _patch(self) -> None: + # Set cooked. + ENABLE_ECHO_INPUT = 0x0004 + ENABLE_LINE_INPUT = 0x0002 + ENABLE_PROCESSED_INPUT = 0x0001 + + windll.kernel32.SetConsoleMode( + self.handle, + self.original_mode.value + | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), + ) diff --git a/src/prompt_toolkit/input/win32_pipe.py b/src/prompt_toolkit/input/win32_pipe.py new file mode 100644 index 0000000..ebee207 --- /dev/null +++ b/src/prompt_toolkit/input/win32_pipe.py @@ -0,0 +1,154 @@ +import sys + +assert sys.platform == "win32" + +from contextlib import contextmanager +from ctypes import windll +from ctypes.wintypes import HANDLE +from typing import Callable, ContextManager, Iterator, List + +from prompt_toolkit.eventloop.win32 import create_win32_event + +from ..key_binding import KeyPress +from ..utils import DummyContext +from .base import PipeInput +from .vt100_parser import Vt100Parser +from .win32 import _Win32InputBase, attach_win32_input, detach_win32_input + +__all__ = ["Win32PipeInput"] + + +class Win32PipeInput(_Win32InputBase, PipeInput): + """ + This is an input pipe that works on Windows. + Text or bytes can be feed into the pipe, and key strokes can be read from + the pipe. This is useful if we want to send the input programmatically into + the application. Mostly useful for unit testing. + + Notice that even though it's Windows, we use vt100 escape sequences over + the pipe. + + Usage:: + + input = Win32PipeInput() + input.send_text('inputdata') + """ + + _id = 0 + + def __init__(self, _event: HANDLE) -> None: + super().__init__() + # Event (handle) for registering this input in the event loop. + # This event is set when there is data available to read from the pipe. + # Note: We use this approach instead of using a regular pipe, like + # returned from `os.pipe()`, because making such a regular pipe + # non-blocking is tricky and this works really well. + self._event = create_win32_event() + + self._closed = False + + # Parser for incoming keys. + self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects. + self.vt100_parser = Vt100Parser(lambda key: self._buffer.append(key)) + + # Identifier for every PipeInput for the hash. + self.__class__._id += 1 + self._id = self.__class__._id + + @classmethod + @contextmanager + def create(cls) -> Iterator["Win32PipeInput"]: + event = create_win32_event() + try: + yield Win32PipeInput(_event=event) + finally: + windll.kernel32.CloseHandle(event) + + @property + def closed(self) -> bool: + return self._closed + + def fileno(self) -> int: + """ + The windows pipe doesn't depend on the file handle. + """ + raise NotImplementedError + + @property + def handle(self) -> HANDLE: + "The handle used for registering this pipe in the event loop." + return self._event + + def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: + """ + Return a context manager that makes this input active in the current + event loop. + """ + return attach_win32_input(self, input_ready_callback) + + def detach(self) -> ContextManager[None]: + """ + Return a context manager that makes sure that this input is not active + in the current event loop. + """ + return detach_win32_input(self) + + def read_keys(self) -> List[KeyPress]: + "Read list of KeyPress." + + # Return result. + result = self._buffer + self._buffer = [] + + # Reset event. + if not self._closed: + # (If closed, the event should not reset.) + windll.kernel32.ResetEvent(self._event) + + return result + + def flush_keys(self) -> List[KeyPress]: + """ + Flush pending keys and return them. + (Used for flushing the 'escape' key.) + """ + # Flush all pending keys. (This is most important to flush the vt100 + # 'Escape' key early when nothing else follows.) + self.vt100_parser.flush() + + # Return result. + result = self._buffer + self._buffer = [] + return result + + def send_bytes(self, data: bytes) -> None: + "Send bytes to the input." + self.send_text(data.decode("utf-8", "ignore")) + + def send_text(self, text: str) -> None: + "Send text to the input." + if self._closed: + raise ValueError("Attempt to write into a closed pipe.") + + # Pass it through our vt100 parser. + self.vt100_parser.feed(text) + + # Set event. + windll.kernel32.SetEvent(self._event) + + def raw_mode(self) -> ContextManager[None]: + return DummyContext() + + def cooked_mode(self) -> ContextManager[None]: + return DummyContext() + + def close(self) -> None: + "Close write-end of the pipe." + self._closed = True + windll.kernel32.SetEvent(self._event) + + def typeahead_hash(self) -> str: + """ + This needs to be unique for every `PipeInput`. + """ + return f"pipe-input-{self._id}" |