summaryrefslogtreecommitdiffstats
path: root/src/prompt_toolkit/input
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/prompt_toolkit/input/__init__.py12
-rw-r--r--src/prompt_toolkit/input/ansi_escape_sequences.py343
-rw-r--r--src/prompt_toolkit/input/base.py150
-rw-r--r--src/prompt_toolkit/input/defaults.py79
-rw-r--r--src/prompt_toolkit/input/posix_pipe.py116
-rw-r--r--src/prompt_toolkit/input/posix_utils.py95
-rw-r--r--src/prompt_toolkit/input/typeahead.py76
-rw-r--r--src/prompt_toolkit/input/vt100.py320
-rw-r--r--src/prompt_toolkit/input/vt100_parser.py247
-rw-r--r--src/prompt_toolkit/input/win32.py757
-rw-r--r--src/prompt_toolkit/input/win32_pipe.py154
11 files changed, 2349 insertions, 0 deletions
diff --git a/src/prompt_toolkit/input/__init__.py b/src/prompt_toolkit/input/__init__.py
new file mode 100644
index 0000000..dc31976
--- /dev/null
+++ b/src/prompt_toolkit/input/__init__.py
@@ -0,0 +1,12 @@
+from .base import DummyInput, Input, PipeInput
+from .defaults import create_input, create_pipe_input
+
+__all__ = [
+ # Base.
+ "Input",
+ "PipeInput",
+ "DummyInput",
+ # Defaults.
+ "create_input",
+ "create_pipe_input",
+]
diff --git a/src/prompt_toolkit/input/ansi_escape_sequences.py b/src/prompt_toolkit/input/ansi_escape_sequences.py
new file mode 100644
index 0000000..2e6c5b9
--- /dev/null
+++ b/src/prompt_toolkit/input/ansi_escape_sequences.py
@@ -0,0 +1,343 @@
+"""
+Mappings from VT100 (ANSI) escape sequences to the corresponding prompt_toolkit
+keys.
+
+We are not using the terminfo/termcap databases to detect the ANSI escape
+sequences for the input. Instead, we recognize 99% of the most common
+sequences. This works well, because in practice, every modern terminal is
+mostly Xterm compatible.
+
+Some useful docs:
+- Mintty: https://github.com/mintty/mintty/blob/master/wiki/Keycodes.md
+"""
+from typing import Dict, Tuple, Union
+
+from ..keys import Keys
+
+__all__ = [
+ "ANSI_SEQUENCES",
+ "REVERSE_ANSI_SEQUENCES",
+]
+
+# Mapping of vt100 escape codes to Keys.
+ANSI_SEQUENCES: Dict[str, Union[Keys, Tuple[Keys, ...]]] = {
+ # Control keys.
+ "\x00": Keys.ControlAt, # Control-At (Also for Ctrl-Space)
+ "\x01": Keys.ControlA, # Control-A (home)
+ "\x02": Keys.ControlB, # Control-B (emacs cursor left)
+ "\x03": Keys.ControlC, # Control-C (interrupt)
+ "\x04": Keys.ControlD, # Control-D (exit)
+ "\x05": Keys.ControlE, # Control-E (end)
+ "\x06": Keys.ControlF, # Control-F (cursor forward)
+ "\x07": Keys.ControlG, # Control-G
+ "\x08": Keys.ControlH, # Control-H (8) (Identical to '\b')
+ "\x09": Keys.ControlI, # Control-I (9) (Identical to '\t')
+ "\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n')
+ "\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab)
+ "\x0c": Keys.ControlL, # Control-L (clear; form feed)
+ "\x0d": Keys.ControlM, # Control-M (13) (Identical to '\r')
+ "\x0e": Keys.ControlN, # Control-N (14) (history forward)
+ "\x0f": Keys.ControlO, # Control-O (15)
+ "\x10": Keys.ControlP, # Control-P (16) (history back)
+ "\x11": Keys.ControlQ, # Control-Q
+ "\x12": Keys.ControlR, # Control-R (18) (reverse search)
+ "\x13": Keys.ControlS, # Control-S (19) (forward search)
+ "\x14": Keys.ControlT, # Control-T
+ "\x15": Keys.ControlU, # Control-U
+ "\x16": Keys.ControlV, # Control-V
+ "\x17": Keys.ControlW, # Control-W
+ "\x18": Keys.ControlX, # Control-X
+ "\x19": Keys.ControlY, # Control-Y (25)
+ "\x1a": Keys.ControlZ, # Control-Z
+ "\x1b": Keys.Escape, # Also Control-[
+ "\x9b": Keys.ShiftEscape,
+ "\x1c": Keys.ControlBackslash, # Both Control-\ (also Ctrl-| )
+ "\x1d": Keys.ControlSquareClose, # Control-]
+ "\x1e": Keys.ControlCircumflex, # Control-^
+ "\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.)
+ # ASCII Delete (0x7f)
+ # Vt220 (and Linux terminal) send this when pressing backspace. We map this
+ # to ControlH, because that will make it easier to create key bindings that
+ # work everywhere, with the trade-off that it's no longer possible to
+ # handle backspace and control-h individually for the few terminals that
+ # support it. (Most terminals send ControlH when backspace is pressed.)
+ # See: http://www.ibb.net/~anne/keyboard.html
+ "\x7f": Keys.ControlH,
+ # --
+ # Various
+ "\x1b[1~": Keys.Home, # tmux
+ "\x1b[2~": Keys.Insert,
+ "\x1b[3~": Keys.Delete,
+ "\x1b[4~": Keys.End, # tmux
+ "\x1b[5~": Keys.PageUp,
+ "\x1b[6~": Keys.PageDown,
+ "\x1b[7~": Keys.Home, # xrvt
+ "\x1b[8~": Keys.End, # xrvt
+ "\x1b[Z": Keys.BackTab, # shift + tab
+ "\x1b\x09": Keys.BackTab, # Linux console
+ "\x1b[~": Keys.BackTab, # Windows console
+ # --
+ # Function keys.
+ "\x1bOP": Keys.F1,
+ "\x1bOQ": Keys.F2,
+ "\x1bOR": Keys.F3,
+ "\x1bOS": Keys.F4,
+ "\x1b[[A": Keys.F1, # Linux console.
+ "\x1b[[B": Keys.F2, # Linux console.
+ "\x1b[[C": Keys.F3, # Linux console.
+ "\x1b[[D": Keys.F4, # Linux console.
+ "\x1b[[E": Keys.F5, # Linux console.
+ "\x1b[11~": Keys.F1, # rxvt-unicode
+ "\x1b[12~": Keys.F2, # rxvt-unicode
+ "\x1b[13~": Keys.F3, # rxvt-unicode
+ "\x1b[14~": Keys.F4, # rxvt-unicode
+ "\x1b[15~": Keys.F5,
+ "\x1b[17~": Keys.F6,
+ "\x1b[18~": Keys.F7,
+ "\x1b[19~": Keys.F8,
+ "\x1b[20~": Keys.F9,
+ "\x1b[21~": Keys.F10,
+ "\x1b[23~": Keys.F11,
+ "\x1b[24~": Keys.F12,
+ "\x1b[25~": Keys.F13,
+ "\x1b[26~": Keys.F14,
+ "\x1b[28~": Keys.F15,
+ "\x1b[29~": Keys.F16,
+ "\x1b[31~": Keys.F17,
+ "\x1b[32~": Keys.F18,
+ "\x1b[33~": Keys.F19,
+ "\x1b[34~": Keys.F20,
+ # Xterm
+ "\x1b[1;2P": Keys.F13,
+ "\x1b[1;2Q": Keys.F14,
+ # '\x1b[1;2R': Keys.F15, # Conflicts with CPR response.
+ "\x1b[1;2S": Keys.F16,
+ "\x1b[15;2~": Keys.F17,
+ "\x1b[17;2~": Keys.F18,
+ "\x1b[18;2~": Keys.F19,
+ "\x1b[19;2~": Keys.F20,
+ "\x1b[20;2~": Keys.F21,
+ "\x1b[21;2~": Keys.F22,
+ "\x1b[23;2~": Keys.F23,
+ "\x1b[24;2~": Keys.F24,
+ # --
+ # CSI 27 disambiguated modified "other" keys (xterm)
+ # Ref: https://invisible-island.net/xterm/modified-keys.html
+ # These are currently unsupported, so just re-map some common ones to the
+ # unmodified versions
+ "\x1b[27;2;13~": Keys.ControlM, # Shift + Enter
+ "\x1b[27;5;13~": Keys.ControlM, # Ctrl + Enter
+ "\x1b[27;6;13~": Keys.ControlM, # Ctrl + Shift + Enter
+ # --
+ # Control + function keys.
+ "\x1b[1;5P": Keys.ControlF1,
+ "\x1b[1;5Q": Keys.ControlF2,
+ # "\x1b[1;5R": Keys.ControlF3, # Conflicts with CPR response.
+ "\x1b[1;5S": Keys.ControlF4,
+ "\x1b[15;5~": Keys.ControlF5,
+ "\x1b[17;5~": Keys.ControlF6,
+ "\x1b[18;5~": Keys.ControlF7,
+ "\x1b[19;5~": Keys.ControlF8,
+ "\x1b[20;5~": Keys.ControlF9,
+ "\x1b[21;5~": Keys.ControlF10,
+ "\x1b[23;5~": Keys.ControlF11,
+ "\x1b[24;5~": Keys.ControlF12,
+ "\x1b[1;6P": Keys.ControlF13,
+ "\x1b[1;6Q": Keys.ControlF14,
+ # "\x1b[1;6R": Keys.ControlF15, # Conflicts with CPR response.
+ "\x1b[1;6S": Keys.ControlF16,
+ "\x1b[15;6~": Keys.ControlF17,
+ "\x1b[17;6~": Keys.ControlF18,
+ "\x1b[18;6~": Keys.ControlF19,
+ "\x1b[19;6~": Keys.ControlF20,
+ "\x1b[20;6~": Keys.ControlF21,
+ "\x1b[21;6~": Keys.ControlF22,
+ "\x1b[23;6~": Keys.ControlF23,
+ "\x1b[24;6~": Keys.ControlF24,
+ # --
+ # Tmux (Win32 subsystem) sends the following scroll events.
+ "\x1b[62~": Keys.ScrollUp,
+ "\x1b[63~": Keys.ScrollDown,
+ "\x1b[200~": Keys.BracketedPaste, # Start of bracketed paste.
+ # --
+ # Sequences generated by numpad 5. Not sure what it means. (It doesn't
+ # appear in 'infocmp'. Just ignore.
+ "\x1b[E": Keys.Ignore, # Xterm.
+ "\x1b[G": Keys.Ignore, # Linux console.
+ # --
+ # Meta/control/escape + pageup/pagedown/insert/delete.
+ "\x1b[3;2~": Keys.ShiftDelete, # xterm, gnome-terminal.
+ "\x1b[5;2~": Keys.ShiftPageUp,
+ "\x1b[6;2~": Keys.ShiftPageDown,
+ "\x1b[2;3~": (Keys.Escape, Keys.Insert),
+ "\x1b[3;3~": (Keys.Escape, Keys.Delete),
+ "\x1b[5;3~": (Keys.Escape, Keys.PageUp),
+ "\x1b[6;3~": (Keys.Escape, Keys.PageDown),
+ "\x1b[2;4~": (Keys.Escape, Keys.ShiftInsert),
+ "\x1b[3;4~": (Keys.Escape, Keys.ShiftDelete),
+ "\x1b[5;4~": (Keys.Escape, Keys.ShiftPageUp),
+ "\x1b[6;4~": (Keys.Escape, Keys.ShiftPageDown),
+ "\x1b[3;5~": Keys.ControlDelete, # xterm, gnome-terminal.
+ "\x1b[5;5~": Keys.ControlPageUp,
+ "\x1b[6;5~": Keys.ControlPageDown,
+ "\x1b[3;6~": Keys.ControlShiftDelete,
+ "\x1b[5;6~": Keys.ControlShiftPageUp,
+ "\x1b[6;6~": Keys.ControlShiftPageDown,
+ "\x1b[2;7~": (Keys.Escape, Keys.ControlInsert),
+ "\x1b[5;7~": (Keys.Escape, Keys.ControlPageDown),
+ "\x1b[6;7~": (Keys.Escape, Keys.ControlPageDown),
+ "\x1b[2;8~": (Keys.Escape, Keys.ControlShiftInsert),
+ "\x1b[5;8~": (Keys.Escape, Keys.ControlShiftPageDown),
+ "\x1b[6;8~": (Keys.Escape, Keys.ControlShiftPageDown),
+ # --
+ # Arrows.
+ # (Normal cursor mode).
+ "\x1b[A": Keys.Up,
+ "\x1b[B": Keys.Down,
+ "\x1b[C": Keys.Right,
+ "\x1b[D": Keys.Left,
+ "\x1b[H": Keys.Home,
+ "\x1b[F": Keys.End,
+ # Tmux sends following keystrokes when control+arrow is pressed, but for
+ # Emacs ansi-term sends the same sequences for normal arrow keys. Consider
+ # it a normal arrow press, because that's more important.
+ # (Application cursor mode).
+ "\x1bOA": Keys.Up,
+ "\x1bOB": Keys.Down,
+ "\x1bOC": Keys.Right,
+ "\x1bOD": Keys.Left,
+ "\x1bOF": Keys.End,
+ "\x1bOH": Keys.Home,
+ # Shift + arrows.
+ "\x1b[1;2A": Keys.ShiftUp,
+ "\x1b[1;2B": Keys.ShiftDown,
+ "\x1b[1;2C": Keys.ShiftRight,
+ "\x1b[1;2D": Keys.ShiftLeft,
+ "\x1b[1;2F": Keys.ShiftEnd,
+ "\x1b[1;2H": Keys.ShiftHome,
+ # Meta + arrow keys. Several terminals handle this differently.
+ # The following sequences are for xterm and gnome-terminal.
+ # (Iterm sends ESC followed by the normal arrow_up/down/left/right
+ # sequences, and the OSX Terminal sends ESCb and ESCf for "alt
+ # arrow_left" and "alt arrow_right." We don't handle these
+ # explicitly, in here, because would could not distinguish between
+ # pressing ESC (to go to Vi navigation mode), followed by just the
+ # 'b' or 'f' key. These combinations are handled in
+ # the input processor.)
+ "\x1b[1;3A": (Keys.Escape, Keys.Up),
+ "\x1b[1;3B": (Keys.Escape, Keys.Down),
+ "\x1b[1;3C": (Keys.Escape, Keys.Right),
+ "\x1b[1;3D": (Keys.Escape, Keys.Left),
+ "\x1b[1;3F": (Keys.Escape, Keys.End),
+ "\x1b[1;3H": (Keys.Escape, Keys.Home),
+ # Alt+shift+number.
+ "\x1b[1;4A": (Keys.Escape, Keys.ShiftDown),
+ "\x1b[1;4B": (Keys.Escape, Keys.ShiftUp),
+ "\x1b[1;4C": (Keys.Escape, Keys.ShiftRight),
+ "\x1b[1;4D": (Keys.Escape, Keys.ShiftLeft),
+ "\x1b[1;4F": (Keys.Escape, Keys.ShiftEnd),
+ "\x1b[1;4H": (Keys.Escape, Keys.ShiftHome),
+ # Control + arrows.
+ "\x1b[1;5A": Keys.ControlUp, # Cursor Mode
+ "\x1b[1;5B": Keys.ControlDown, # Cursor Mode
+ "\x1b[1;5C": Keys.ControlRight, # Cursor Mode
+ "\x1b[1;5D": Keys.ControlLeft, # Cursor Mode
+ "\x1b[1;5F": Keys.ControlEnd,
+ "\x1b[1;5H": Keys.ControlHome,
+ # Tmux sends following keystrokes when control+arrow is pressed, but for
+ # Emacs ansi-term sends the same sequences for normal arrow keys. Consider
+ # it a normal arrow press, because that's more important.
+ "\x1b[5A": Keys.ControlUp,
+ "\x1b[5B": Keys.ControlDown,
+ "\x1b[5C": Keys.ControlRight,
+ "\x1b[5D": Keys.ControlLeft,
+ "\x1bOc": Keys.ControlRight, # rxvt
+ "\x1bOd": Keys.ControlLeft, # rxvt
+ # Control + shift + arrows.
+ "\x1b[1;6A": Keys.ControlShiftDown,
+ "\x1b[1;6B": Keys.ControlShiftUp,
+ "\x1b[1;6C": Keys.ControlShiftRight,
+ "\x1b[1;6D": Keys.ControlShiftLeft,
+ "\x1b[1;6F": Keys.ControlShiftEnd,
+ "\x1b[1;6H": Keys.ControlShiftHome,
+ # Control + Meta + arrows.
+ "\x1b[1;7A": (Keys.Escape, Keys.ControlDown),
+ "\x1b[1;7B": (Keys.Escape, Keys.ControlUp),
+ "\x1b[1;7C": (Keys.Escape, Keys.ControlRight),
+ "\x1b[1;7D": (Keys.Escape, Keys.ControlLeft),
+ "\x1b[1;7F": (Keys.Escape, Keys.ControlEnd),
+ "\x1b[1;7H": (Keys.Escape, Keys.ControlHome),
+ # Meta + Shift + arrows.
+ "\x1b[1;8A": (Keys.Escape, Keys.ControlShiftDown),
+ "\x1b[1;8B": (Keys.Escape, Keys.ControlShiftUp),
+ "\x1b[1;8C": (Keys.Escape, Keys.ControlShiftRight),
+ "\x1b[1;8D": (Keys.Escape, Keys.ControlShiftLeft),
+ "\x1b[1;8F": (Keys.Escape, Keys.ControlShiftEnd),
+ "\x1b[1;8H": (Keys.Escape, Keys.ControlShiftHome),
+ # Meta + arrow on (some?) Macs when using iTerm defaults (see issue #483).
+ "\x1b[1;9A": (Keys.Escape, Keys.Up),
+ "\x1b[1;9B": (Keys.Escape, Keys.Down),
+ "\x1b[1;9C": (Keys.Escape, Keys.Right),
+ "\x1b[1;9D": (Keys.Escape, Keys.Left),
+ # --
+ # Control/shift/meta + number in mintty.
+ # (c-2 will actually send c-@ and c-6 will send c-^.)
+ "\x1b[1;5p": Keys.Control0,
+ "\x1b[1;5q": Keys.Control1,
+ "\x1b[1;5r": Keys.Control2,
+ "\x1b[1;5s": Keys.Control3,
+ "\x1b[1;5t": Keys.Control4,
+ "\x1b[1;5u": Keys.Control5,
+ "\x1b[1;5v": Keys.Control6,
+ "\x1b[1;5w": Keys.Control7,
+ "\x1b[1;5x": Keys.Control8,
+ "\x1b[1;5y": Keys.Control9,
+ "\x1b[1;6p": Keys.ControlShift0,
+ "\x1b[1;6q": Keys.ControlShift1,
+ "\x1b[1;6r": Keys.ControlShift2,
+ "\x1b[1;6s": Keys.ControlShift3,
+ "\x1b[1;6t": Keys.ControlShift4,
+ "\x1b[1;6u": Keys.ControlShift5,
+ "\x1b[1;6v": Keys.ControlShift6,
+ "\x1b[1;6w": Keys.ControlShift7,
+ "\x1b[1;6x": Keys.ControlShift8,
+ "\x1b[1;6y": Keys.ControlShift9,
+ "\x1b[1;7p": (Keys.Escape, Keys.Control0),
+ "\x1b[1;7q": (Keys.Escape, Keys.Control1),
+ "\x1b[1;7r": (Keys.Escape, Keys.Control2),
+ "\x1b[1;7s": (Keys.Escape, Keys.Control3),
+ "\x1b[1;7t": (Keys.Escape, Keys.Control4),
+ "\x1b[1;7u": (Keys.Escape, Keys.Control5),
+ "\x1b[1;7v": (Keys.Escape, Keys.Control6),
+ "\x1b[1;7w": (Keys.Escape, Keys.Control7),
+ "\x1b[1;7x": (Keys.Escape, Keys.Control8),
+ "\x1b[1;7y": (Keys.Escape, Keys.Control9),
+ "\x1b[1;8p": (Keys.Escape, Keys.ControlShift0),
+ "\x1b[1;8q": (Keys.Escape, Keys.ControlShift1),
+ "\x1b[1;8r": (Keys.Escape, Keys.ControlShift2),
+ "\x1b[1;8s": (Keys.Escape, Keys.ControlShift3),
+ "\x1b[1;8t": (Keys.Escape, Keys.ControlShift4),
+ "\x1b[1;8u": (Keys.Escape, Keys.ControlShift5),
+ "\x1b[1;8v": (Keys.Escape, Keys.ControlShift6),
+ "\x1b[1;8w": (Keys.Escape, Keys.ControlShift7),
+ "\x1b[1;8x": (Keys.Escape, Keys.ControlShift8),
+ "\x1b[1;8y": (Keys.Escape, Keys.ControlShift9),
+}
+
+
+def _get_reverse_ansi_sequences() -> Dict[Keys, str]:
+ """
+ Create a dictionary that maps prompt_toolkit keys back to the VT100 escape
+ sequences.
+ """
+ result: Dict[Keys, str] = {}
+
+ for sequence, key in ANSI_SEQUENCES.items():
+ if not isinstance(key, tuple):
+ if key not in result:
+ result[key] = sequence
+
+ return result
+
+
+REVERSE_ANSI_SEQUENCES = _get_reverse_ansi_sequences()
diff --git a/src/prompt_toolkit/input/base.py b/src/prompt_toolkit/input/base.py
new file mode 100644
index 0000000..313622d
--- /dev/null
+++ b/src/prompt_toolkit/input/base.py
@@ -0,0 +1,150 @@
+"""
+Abstraction of CLI Input.
+"""
+from abc import ABCMeta, abstractmethod, abstractproperty
+from contextlib import contextmanager
+from typing import Callable, ContextManager, Generator, List
+
+from prompt_toolkit.key_binding import KeyPress
+
+__all__ = [
+ "Input",
+ "PipeInput",
+ "DummyInput",
+]
+
+
+class Input(metaclass=ABCMeta):
+ """
+ Abstraction for any input.
+
+ An instance of this class can be given to the constructor of a
+ :class:`~prompt_toolkit.application.Application` and will also be
+ passed to the :class:`~prompt_toolkit.eventloop.base.EventLoop`.
+ """
+
+ @abstractmethod
+ def fileno(self) -> int:
+ """
+ Fileno for putting this in an event loop.
+ """
+
+ @abstractmethod
+ def typeahead_hash(self) -> str:
+ """
+ Identifier for storing type ahead key presses.
+ """
+
+ @abstractmethod
+ def read_keys(self) -> List[KeyPress]:
+ """
+ Return a list of Key objects which are read/parsed from the input.
+ """
+
+ def flush_keys(self) -> List[KeyPress]:
+ """
+ Flush the underlying parser. and return the pending keys.
+ (Used for vt100 input.)
+ """
+ return []
+
+ def flush(self) -> None:
+ "The event loop can call this when the input has to be flushed."
+ pass
+
+ @abstractproperty
+ def closed(self) -> bool:
+ "Should be true when the input stream is closed."
+ return False
+
+ @abstractmethod
+ def raw_mode(self) -> ContextManager[None]:
+ """
+ Context manager that turns the input into raw mode.
+ """
+
+ @abstractmethod
+ def cooked_mode(self) -> ContextManager[None]:
+ """
+ Context manager that turns the input into cooked mode.
+ """
+
+ @abstractmethod
+ def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
+ """
+ Return a context manager that makes this input active in the current
+ event loop.
+ """
+
+ @abstractmethod
+ def detach(self) -> ContextManager[None]:
+ """
+ Return a context manager that makes sure that this input is not active
+ in the current event loop.
+ """
+
+ def close(self) -> None:
+ "Close input."
+ pass
+
+
+class PipeInput(Input):
+ """
+ Abstraction for pipe input.
+ """
+
+ @abstractmethod
+ def send_bytes(self, data: bytes) -> None:
+ """Feed byte string into the pipe"""
+
+ @abstractmethod
+ def send_text(self, data: str) -> None:
+ """Feed a text string into the pipe"""
+
+
+class DummyInput(Input):
+ """
+ Input for use in a `DummyApplication`
+
+ If used in an actual application, it will make the application render
+ itself once and exit immediately, due to an `EOFError`.
+ """
+
+ def fileno(self) -> int:
+ raise NotImplementedError
+
+ def typeahead_hash(self) -> str:
+ return "dummy-%s" % id(self)
+
+ def read_keys(self) -> List[KeyPress]:
+ return []
+
+ @property
+ def closed(self) -> bool:
+ # This needs to be true, so that the dummy input will trigger an
+ # `EOFError` immediately in the application.
+ return True
+
+ def raw_mode(self) -> ContextManager[None]:
+ return _dummy_context_manager()
+
+ def cooked_mode(self) -> ContextManager[None]:
+ return _dummy_context_manager()
+
+ def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
+ # Call the callback immediately once after attaching.
+ # This tells the callback to call `read_keys` and check the
+ # `input.closed` flag, after which it won't receive any keys, but knows
+ # that `EOFError` should be raised. This unblocks `read_from_input` in
+ # `application.py`.
+ input_ready_callback()
+
+ return _dummy_context_manager()
+
+ def detach(self) -> ContextManager[None]:
+ return _dummy_context_manager()
+
+
+@contextmanager
+def _dummy_context_manager() -> Generator[None, None, None]:
+ yield
diff --git a/src/prompt_toolkit/input/defaults.py b/src/prompt_toolkit/input/defaults.py
new file mode 100644
index 0000000..564f848
--- /dev/null
+++ b/src/prompt_toolkit/input/defaults.py
@@ -0,0 +1,79 @@
+import io
+import sys
+from typing import ContextManager, Optional, TextIO
+
+from .base import DummyInput, Input, PipeInput
+
+__all__ = [
+ "create_input",
+ "create_pipe_input",
+]
+
+
+def create_input(
+ stdin: Optional[TextIO] = None, always_prefer_tty: bool = False
+) -> Input:
+ """
+ Create the appropriate `Input` object for the current os/environment.
+
+ :param always_prefer_tty: When set, if `sys.stdin` is connected to a Unix
+ `pipe`, check whether `sys.stdout` or `sys.stderr` are connected to a
+ pseudo terminal. If so, open the tty for reading instead of reading for
+ `sys.stdin`. (We can open `stdout` or `stderr` for reading, this is how
+ a `$PAGER` works.)
+ """
+ if sys.platform == "win32":
+ from .win32 import Win32Input
+
+ # If `stdin` was assigned `None` (which happens with pythonw.exe), use
+ # a `DummyInput`. This triggers `EOFError` in the application code.
+ if stdin is None and sys.stdin is None:
+ return DummyInput()
+
+ return Win32Input(stdin or sys.stdin)
+ else:
+ from .vt100 import Vt100Input
+
+ # If no input TextIO is given, use stdin/stdout.
+ if stdin is None:
+ stdin = sys.stdin
+
+ if always_prefer_tty:
+ for obj in [sys.stdin, sys.stdout, sys.stderr]:
+ if obj.isatty():
+ stdin = obj
+ break
+
+ # If we can't access the file descriptor for the selected stdin, return
+ # a `DummyInput` instead. This can happen for instance in unit tests,
+ # when `sys.stdin` is patched by something that's not an actual file.
+ # (Instantiating `Vt100Input` would fail in this case.)
+ try:
+ stdin.fileno()
+ except io.UnsupportedOperation:
+ return DummyInput()
+
+ return Vt100Input(stdin)
+
+
+def create_pipe_input() -> ContextManager[PipeInput]:
+ """
+ Create an input pipe.
+ This is mostly useful for unit testing.
+
+ Usage::
+
+ with create_pipe_input() as input:
+ input.send_text('inputdata')
+
+ Breaking change: In prompt_toolkit 3.0.28 and earlier, this was returning
+ the `PipeInput` directly, rather than through a context manager.
+ """
+ if sys.platform == "win32":
+ from .win32_pipe import Win32PipeInput
+
+ return Win32PipeInput.create()
+ else:
+ from .posix_pipe import PosixPipeInput
+
+ return PosixPipeInput.create()
diff --git a/src/prompt_toolkit/input/posix_pipe.py b/src/prompt_toolkit/input/posix_pipe.py
new file mode 100644
index 0000000..1e7dec7
--- /dev/null
+++ b/src/prompt_toolkit/input/posix_pipe.py
@@ -0,0 +1,116 @@
+import sys
+
+assert sys.platform != "win32"
+
+import os
+from contextlib import contextmanager
+from typing import ContextManager, Iterator, TextIO, cast
+
+from ..utils import DummyContext
+from .base import PipeInput
+from .vt100 import Vt100Input
+
+__all__ = [
+ "PosixPipeInput",
+]
+
+
+class _Pipe:
+ "Wrapper around os.pipe, that ensures we don't double close any end."
+
+ def __init__(self) -> None:
+ self.read_fd, self.write_fd = os.pipe()
+ self._read_closed = False
+ self._write_closed = False
+
+ def close_read(self) -> None:
+ "Close read-end if not yet closed."
+ if self._read_closed:
+ return
+
+ os.close(self.read_fd)
+ self._read_closed = True
+
+ def close_write(self) -> None:
+ "Close write-end if not yet closed."
+ if self._write_closed:
+ return
+
+ os.close(self.write_fd)
+ self._write_closed = True
+
+ def close(self) -> None:
+ "Close both read and write ends."
+ self.close_read()
+ self.close_write()
+
+
+class PosixPipeInput(Vt100Input, PipeInput):
+ """
+ Input that is send through a pipe.
+ This is useful if we want to send the input programmatically into the
+ application. Mostly useful for unit testing.
+
+ Usage::
+
+ with PosixPipeInput.create() as input:
+ input.send_text('inputdata')
+ """
+
+ _id = 0
+
+ def __init__(self, _pipe: _Pipe, _text: str = "") -> None:
+ # Private constructor. Users should use the public `.create()` method.
+ self.pipe = _pipe
+
+ class Stdin:
+ encoding = "utf-8"
+
+ def isatty(stdin) -> bool:
+ return True
+
+ def fileno(stdin) -> int:
+ return self.pipe.read_fd
+
+ super().__init__(cast(TextIO, Stdin()))
+ self.send_text(_text)
+
+ # Identifier for every PipeInput for the hash.
+ self.__class__._id += 1
+ self._id = self.__class__._id
+
+ @classmethod
+ @contextmanager
+ def create(cls, text: str = "") -> Iterator["PosixPipeInput"]:
+ pipe = _Pipe()
+ try:
+ yield PosixPipeInput(_pipe=pipe, _text=text)
+ finally:
+ pipe.close()
+
+ def send_bytes(self, data: bytes) -> None:
+ os.write(self.pipe.write_fd, data)
+
+ def send_text(self, data: str) -> None:
+ "Send text to the input."
+ os.write(self.pipe.write_fd, data.encode("utf-8"))
+
+ def raw_mode(self) -> ContextManager[None]:
+ return DummyContext()
+
+ def cooked_mode(self) -> ContextManager[None]:
+ return DummyContext()
+
+ def close(self) -> None:
+ "Close pipe fds."
+ # Only close the write-end of the pipe. This will unblock the reader
+ # callback (in vt100.py > _attached_input), which eventually will raise
+ # `EOFError`. If we'd also close the read-end, then the event loop
+ # won't wake up the corresponding callback because of this.
+ self.pipe.close_write()
+
+ def typeahead_hash(self) -> str:
+ """
+ This needs to be unique for every `PipeInput`.
+ """
+ return f"pipe-input-{self._id}"
diff --git a/src/prompt_toolkit/input/posix_utils.py b/src/prompt_toolkit/input/posix_utils.py
new file mode 100644
index 0000000..7cf31ee
--- /dev/null
+++ b/src/prompt_toolkit/input/posix_utils.py
@@ -0,0 +1,95 @@
+import os
+import select
+from codecs import getincrementaldecoder
+
+__all__ = [
+ "PosixStdinReader",
+]
+
+
+class PosixStdinReader:
+ """
+ Wrapper around stdin which reads (nonblocking) the next available 1024
+ bytes and decodes it.
+
+ Note that you can't be sure that the input file is closed if the ``read``
+ function returns an empty string. When ``errors=ignore`` is passed,
+ ``read`` can return an empty string if all malformed input was replaced by
+ an empty string. (We can't block here and wait for more input.) So, because
+ of that, check the ``closed`` attribute, to be sure that the file has been
+ closed.
+
+ :param stdin_fd: File descriptor from which we read.
+ :param errors: Can be 'ignore', 'strict' or 'replace'.
+ On Python3, this can be 'surrogateescape', which is the default.
+
+ 'surrogateescape' is preferred, because this allows us to transfer
+ unrecognised bytes to the key bindings. Some terminals, like lxterminal
+ and Guake, use the 'Mxx' notation to send mouse events, where each 'x'
+ can be any possible byte.
+ """
+
+ # By default, we want to 'ignore' errors here. The input stream can be full
+ # of junk. One occurrence of this that I had was when using iTerm2 on OS X,
+ # with "Option as Meta" checked (You should choose "Option as +Esc".)
+
+ def __init__(
+ self, stdin_fd: int, errors: str = "surrogateescape", encoding: str = "utf-8"
+ ) -> None:
+ self.stdin_fd = stdin_fd
+ self.errors = errors
+
+ # Create incremental decoder for decoding stdin.
+ # We can not just do `os.read(stdin.fileno(), 1024).decode('utf-8')`, because
+ # it could be that we are in the middle of a utf-8 byte sequence.
+ self._stdin_decoder_cls = getincrementaldecoder(encoding)
+ self._stdin_decoder = self._stdin_decoder_cls(errors=errors)
+
+ #: True when there is nothing anymore to read.
+ self.closed = False
+
+ def read(self, count: int = 1024) -> str:
+ # By default we choose a rather small chunk size, because reading
+ # big amounts of input at once, causes the event loop to process
+ # all these key bindings also at once without going back to the
+ # loop. This will make the application feel unresponsive.
+ """
+ Read the input and return it as a string.
+
+ Return the text. Note that this can return an empty string, even when
+ the input stream was not yet closed. This means that something went
+ wrong during the decoding.
+ """
+ if self.closed:
+ return ""
+
+ # Check whether there is some input to read. `os.read` would block
+ # otherwise.
+ # (Actually, the event loop is responsible to make sure that this
+ # function is only called when there is something to read, but for some
+ # reason this happens in certain situations.)
+ try:
+ if not select.select([self.stdin_fd], [], [], 0)[0]:
+ return ""
+ except OSError:
+ # Happens for instance when the file descriptor was closed.
+ # (We had this in ptterm, where the FD became ready, a callback was
+ # scheduled, but in the meantime another callback closed it already.)
+ self.closed = True
+
+ # Note: the following works better than wrapping `self.stdin` like
+ # `codecs.getreader('utf-8')(stdin)` and doing `read(1)`.
+ # Somehow that causes some latency when the escape
+ # character is pressed. (Especially on combination with the `select`.)
+ try:
+ data = os.read(self.stdin_fd, count)
+
+ # Nothing more to read, stream is closed.
+ if data == b"":
+ self.closed = True
+ return ""
+ except OSError:
+ # In case of SIGWINCH
+ data = b""
+
+ return self._stdin_decoder.decode(data)
diff --git a/src/prompt_toolkit/input/typeahead.py b/src/prompt_toolkit/input/typeahead.py
new file mode 100644
index 0000000..a3d7866
--- /dev/null
+++ b/src/prompt_toolkit/input/typeahead.py
@@ -0,0 +1,76 @@
+r"""
+Store input key strokes if we did read more than was required.
+
+The input classes `Vt100Input` and `Win32Input` read the input text in chunks
+of a few kilobytes. This means that if we read input from stdin, it could be
+that we read a couple of lines (with newlines in between) at once.
+
+This creates a problem: potentially, we read too much from stdin. Sometimes
+people paste several lines at once because they paste input in a REPL and
+expect each input() call to process one line. Or they rely on type ahead
+because the application can't keep up with the processing.
+
+However, we need to read input in bigger chunks. We need this mostly to support
+pasting of larger chunks of text. We don't want everything to become
+unresponsive because we:
+ - read one character;
+ - parse one character;
+ - call the key binding, which does a string operation with one character;
+ - and render the user interface.
+Doing text operations on single characters is very inefficient in Python, so we
+prefer to work on bigger chunks of text. This is why we have to read the input
+in bigger chunks.
+
+Further, line buffering is also not an option, because it doesn't work well in
+the architecture. We use lower level Posix APIs, that work better with the
+event loop and so on. In fact, there is also nothing that defines that only \n
+can accept the input, you could create a key binding for any key to accept the
+input.
+
+To support type ahead, this module will store all the key strokes that were
+read too early, so that they can be feed into to the next `prompt()` call or to
+the next prompt_toolkit `Application`.
+"""
+from collections import defaultdict
+from typing import Dict, List
+
+from ..key_binding import KeyPress
+from .base import Input
+
+__all__ = [
+ "store_typeahead",
+ "get_typeahead",
+ "clear_typeahead",
+]
+
+_buffer: Dict[str, List[KeyPress]] = defaultdict(list)
+
+
+def store_typeahead(input_obj: Input, key_presses: List[KeyPress]) -> None:
+ """
+ Insert typeahead key presses for the given input.
+ """
+ global _buffer
+ key = input_obj.typeahead_hash()
+ _buffer[key].extend(key_presses)
+
+
+def get_typeahead(input_obj: Input) -> List[KeyPress]:
+ """
+ Retrieve typeahead and reset the buffer for this input.
+ """
+ global _buffer
+
+ key = input_obj.typeahead_hash()
+ result = _buffer[key]
+ _buffer[key] = []
+ return result
+
+
+def clear_typeahead(input_obj: Input) -> None:
+ """
+ Clear typeahead buffer.
+ """
+ global _buffer
+ key = input_obj.typeahead_hash()
+ _buffer[key] = []
diff --git a/src/prompt_toolkit/input/vt100.py b/src/prompt_toolkit/input/vt100.py
new file mode 100644
index 0000000..45ce372
--- /dev/null
+++ b/src/prompt_toolkit/input/vt100.py
@@ -0,0 +1,320 @@
+import sys
+
+assert sys.platform != "win32"
+
+import contextlib
+import io
+import termios
+import tty
+from asyncio import AbstractEventLoop
+from typing import (
+ Callable,
+ ContextManager,
+ Dict,
+ Generator,
+ List,
+ Optional,
+ Set,
+ TextIO,
+ Tuple,
+ Union,
+)
+
+from prompt_toolkit.eventloop import get_event_loop
+
+from ..key_binding import KeyPress
+from .base import Input
+from .posix_utils import PosixStdinReader
+from .vt100_parser import Vt100Parser
+
+__all__ = [
+ "Vt100Input",
+ "raw_mode",
+ "cooked_mode",
+]
+
+
+class Vt100Input(Input):
+ """
+ Vt100 input for Posix systems.
+ (This uses a posix file descriptor that can be registered in the event loop.)
+ """
+
+ # For the error messages. Only display "Input is not a terminal" once per
+ # file descriptor.
+ _fds_not_a_terminal: Set[int] = set()
+
+ def __init__(self, stdin: TextIO) -> None:
+ # Test whether the given input object has a file descriptor.
+ # (Idle reports stdin to be a TTY, but fileno() is not implemented.)
+ try:
+ # This should not raise, but can return 0.
+ stdin.fileno()
+ except io.UnsupportedOperation as e:
+ if "idlelib.run" in sys.modules:
+ raise io.UnsupportedOperation(
+ "Stdin is not a terminal. Running from Idle is not supported."
+ ) from e
+ else:
+ raise io.UnsupportedOperation("Stdin is not a terminal.") from e
+
+ # Even when we have a file descriptor, it doesn't mean it's a TTY.
+ # Normally, this requires a real TTY device, but people instantiate
+ # this class often during unit tests as well. They use for instance
+ # pexpect to pipe data into an application. For convenience, we print
+ # an error message and go on.
+ isatty = stdin.isatty()
+ fd = stdin.fileno()
+
+ if not isatty and fd not in Vt100Input._fds_not_a_terminal:
+ msg = "Warning: Input is not a terminal (fd=%r).\n"
+ sys.stderr.write(msg % fd)
+ sys.stderr.flush()
+ Vt100Input._fds_not_a_terminal.add(fd)
+
+ #
+ self.stdin = stdin
+
+ # Create a backup of the fileno(). We want this to work even if the
+ # underlying file is closed, so that `typeahead_hash()` keeps working.
+ self._fileno = stdin.fileno()
+
+ self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects.
+ self.stdin_reader = PosixStdinReader(self._fileno, encoding=stdin.encoding)
+ self.vt100_parser = Vt100Parser(
+ lambda key_press: self._buffer.append(key_press)
+ )
+
+ def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
+ """
+ Return a context manager that makes this input active in the current
+ event loop.
+ """
+ return _attached_input(self, input_ready_callback)
+
+ def detach(self) -> ContextManager[None]:
+ """
+ Return a context manager that makes sure that this input is not active
+ in the current event loop.
+ """
+ return _detached_input(self)
+
+ def read_keys(self) -> List[KeyPress]:
+ "Read list of KeyPress."
+ # Read text from stdin.
+ data = self.stdin_reader.read()
+
+ # Pass it through our vt100 parser.
+ self.vt100_parser.feed(data)
+
+ # Return result.
+ result = self._buffer
+ self._buffer = []
+ return result
+
+ def flush_keys(self) -> List[KeyPress]:
+ """
+ Flush pending keys and return them.
+ (Used for flushing the 'escape' key.)
+ """
+ # Flush all pending keys. (This is most important to flush the vt100
+ # 'Escape' key early when nothing else follows.)
+ self.vt100_parser.flush()
+
+ # Return result.
+ result = self._buffer
+ self._buffer = []
+ return result
+
+ @property
+ def closed(self) -> bool:
+ return self.stdin_reader.closed
+
+ def raw_mode(self) -> ContextManager[None]:
+ return raw_mode(self.stdin.fileno())
+
+ def cooked_mode(self) -> ContextManager[None]:
+ return cooked_mode(self.stdin.fileno())
+
+ def fileno(self) -> int:
+ return self.stdin.fileno()
+
+ def typeahead_hash(self) -> str:
+ return f"fd-{self._fileno}"
+
+
+_current_callbacks: Dict[
+ Tuple[AbstractEventLoop, int], Optional[Callable[[], None]]
+] = {} # (loop, fd) -> current callback
+
+
+@contextlib.contextmanager
+def _attached_input(
+ input: Vt100Input, callback: Callable[[], None]
+) -> Generator[None, None, None]:
+ """
+ Context manager that makes this input active in the current event loop.
+
+ :param input: :class:`~prompt_toolkit.input.Input` object.
+ :param callback: Called when the input is ready to read.
+ """
+ loop = get_event_loop()
+ fd = input.fileno()
+ previous = _current_callbacks.get((loop, fd))
+
+ def callback_wrapper() -> None:
+ """Wrapper around the callback that already removes the reader when
+ the input is closed. Otherwise, we keep continuously calling this
+ callback, until we leave the context manager (which can happen a bit
+ later). This fixes issues when piping /dev/null into a prompt_toolkit
+ application."""
+ if input.closed:
+ loop.remove_reader(fd)
+ callback()
+
+ try:
+ loop.add_reader(fd, callback_wrapper)
+ except PermissionError:
+ # For `EPollSelector`, adding /dev/null to the event loop will raise
+ # `PermisisonError` (that doesn't happen for `SelectSelector`
+ # apparently). Whenever we get a `PermissionError`, we can raise
+ # `EOFError`, because there's not more to be read anyway. `EOFError` is
+ # an exception that people expect in
+ # `prompt_toolkit.application.Application.run()`.
+ # To reproduce, do: `ptpython 0< /dev/null 1< /dev/null`
+ raise EOFError
+
+ _current_callbacks[loop, fd] = callback
+
+ try:
+ yield
+ finally:
+ loop.remove_reader(fd)
+
+ if previous:
+ loop.add_reader(fd, previous)
+ _current_callbacks[loop, fd] = previous
+ else:
+ del _current_callbacks[loop, fd]
+
+
+@contextlib.contextmanager
+def _detached_input(input: Vt100Input) -> Generator[None, None, None]:
+ loop = get_event_loop()
+ fd = input.fileno()
+ previous = _current_callbacks.get((loop, fd))
+
+ if previous:
+ loop.remove_reader(fd)
+ _current_callbacks[loop, fd] = None
+
+ try:
+ yield
+ finally:
+ if previous:
+ loop.add_reader(fd, previous)
+ _current_callbacks[loop, fd] = previous
+
+
+class raw_mode:
+ """
+ ::
+
+ with raw_mode(stdin):
+ ''' the pseudo-terminal stdin is now used in raw mode '''
+
+ We ignore errors when executing `tcgetattr` fails.
+ """
+
+ # There are several reasons for ignoring errors:
+ # 1. To avoid the "Inappropriate ioctl for device" crash if somebody would
+ # execute this code (In a Python REPL, for instance):
+ #
+ # import os; f = open(os.devnull); os.dup2(f.fileno(), 0)
+ #
+ # The result is that the eventloop will stop correctly, because it has
+ # to logic to quit when stdin is closed. However, we should not fail at
+ # this point. See:
+ # https://github.com/jonathanslenders/python-prompt-toolkit/pull/393
+ # https://github.com/jonathanslenders/python-prompt-toolkit/issues/392
+
+ # 2. Related, when stdin is an SSH pipe, and no full terminal was allocated.
+ # See: https://github.com/jonathanslenders/python-prompt-toolkit/pull/165
+ def __init__(self, fileno: int) -> None:
+ self.fileno = fileno
+ self.attrs_before: Optional[List[Union[int, List[Union[bytes, int]]]]]
+ try:
+ self.attrs_before = termios.tcgetattr(fileno)
+ except termios.error:
+ # Ignore attribute errors.
+ self.attrs_before = None
+
+ def __enter__(self) -> None:
+ # NOTE: On os X systems, using pty.setraw() fails. Therefor we are using this:
+ try:
+ newattr = termios.tcgetattr(self.fileno)
+ except termios.error:
+ pass
+ else:
+ newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG])
+ newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG])
+
+ # VMIN defines the number of characters read at a time in
+ # non-canonical mode. It seems to default to 1 on Linux, but on
+ # Solaris and derived operating systems it defaults to 4. (This is
+ # because the VMIN slot is the same as the VEOF slot, which
+ # defaults to ASCII EOT = Ctrl-D = 4.)
+ newattr[tty.CC][termios.VMIN] = 1
+
+ termios.tcsetattr(self.fileno, termios.TCSANOW, newattr)
+
+ @classmethod
+ def _patch_lflag(cls, attrs: int) -> int:
+ return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
+
+ @classmethod
+ def _patch_iflag(cls, attrs: int) -> int:
+ return attrs & ~(
+ # Disable XON/XOFF flow control on output and input.
+ # (Don't capture Ctrl-S and Ctrl-Q.)
+ # Like executing: "stty -ixon."
+ termios.IXON
+ | termios.IXOFF
+ |
+ # Don't translate carriage return into newline on input.
+ termios.ICRNL
+ | termios.INLCR
+ | termios.IGNCR
+ )
+
+ def __exit__(self, *a: object) -> None:
+ if self.attrs_before is not None:
+ try:
+ termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before)
+ except termios.error:
+ pass
+
+ # # Put the terminal in application mode.
+ # self._stdout.write('\x1b[?1h')
+
+
+class cooked_mode(raw_mode):
+ """
+ The opposite of ``raw_mode``, used when we need cooked mode inside a
+ `raw_mode` block. Used in `Application.run_in_terminal`.::
+
+ with cooked_mode(stdin):
+ ''' the pseudo-terminal stdin is now used in cooked mode. '''
+ """
+
+ @classmethod
+ def _patch_lflag(cls, attrs: int) -> int:
+ return attrs | (termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
+
+ @classmethod
+ def _patch_iflag(cls, attrs: int) -> int:
+ # Turn the ICRNL flag back on. (Without this, calling `input()` in
+ # run_in_terminal doesn't work and displays ^M instead. Ptpython
+ # evaluates commands using `run_in_terminal`, so it's important that
+ # they translate ^M back into ^J.)
+ return attrs | termios.ICRNL
diff --git a/src/prompt_toolkit/input/vt100_parser.py b/src/prompt_toolkit/input/vt100_parser.py
new file mode 100644
index 0000000..3ee1e14
--- /dev/null
+++ b/src/prompt_toolkit/input/vt100_parser.py
@@ -0,0 +1,247 @@
+"""
+Parser for VT100 input stream.
+"""
+import re
+from typing import Callable, Dict, Generator, Tuple, Union
+
+from ..key_binding.key_processor import KeyPress
+from ..keys import Keys
+from .ansi_escape_sequences import ANSI_SEQUENCES
+
+__all__ = [
+ "Vt100Parser",
+]
+
+
+# Regex matching any CPR response
+# (Note that we use '\Z' instead of '$', because '$' could include a trailing
+# newline.)
+_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z")
+
+# Mouse events:
+# Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M"
+_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")
+
+# Regex matching any valid prefix of a CPR response.
+# (Note that it doesn't contain the last character, the 'R'. The prefix has to
+# be shorter.)
+_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z")
+
+_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z")
+
+
+class _Flush:
+ """Helper object to indicate flush operation to the parser."""
+
+ pass
+
+
+class _IsPrefixOfLongerMatchCache(Dict[str, bool]):
+ """
+ Dictionary that maps input sequences to a boolean indicating whether there is
+ any key that start with this characters.
+ """
+
+ def __missing__(self, prefix: str) -> bool:
+ # (hard coded) If this could be a prefix of a CPR response, return
+ # True.
+ if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match(
+ prefix
+ ):
+ result = True
+ else:
+ # If this could be a prefix of anything else, also return True.
+ result = any(
+ v
+ for k, v in ANSI_SEQUENCES.items()
+ if k.startswith(prefix) and k != prefix
+ )
+
+ self[prefix] = result
+ return result
+
+
+_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache()
+
+
+class Vt100Parser:
+ """
+ Parser for VT100 input stream.
+ Data can be fed through the `feed` method and the given callback will be
+ called with KeyPress objects.
+
+ ::
+
+ def callback(key):
+ pass
+ i = Vt100Parser(callback)
+ i.feed('data\x01...')
+
+ :attr feed_key_callback: Function that will be called when a key is parsed.
+ """
+
+ # Lookup table of ANSI escape sequences for a VT100 terminal
+ # Hint: in order to know what sequences your terminal writes to stdin, run
+ # "od -c" and start typing.
+ def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None:
+ self.feed_key_callback = feed_key_callback
+ self.reset()
+
+ def reset(self, request: bool = False) -> None:
+ self._in_bracketed_paste = False
+ self._start_parser()
+
+ def _start_parser(self) -> None:
+ """
+ Start the parser coroutine.
+ """
+ self._input_parser = self._input_parser_generator()
+ self._input_parser.send(None) # type: ignore
+
+ def _get_match(self, prefix: str) -> Union[None, Keys, Tuple[Keys, ...]]:
+ """
+ Return the key (or keys) that maps to this prefix.
+ """
+ # (hard coded) If we match a CPR response, return Keys.CPRResponse.
+ # (This one doesn't fit in the ANSI_SEQUENCES, because it contains
+ # integer variables.)
+ if _cpr_response_re.match(prefix):
+ return Keys.CPRResponse
+
+ elif _mouse_event_re.match(prefix):
+ return Keys.Vt100MouseEvent
+
+ # Otherwise, use the mappings.
+ try:
+ return ANSI_SEQUENCES[prefix]
+ except KeyError:
+ return None
+
+ def _input_parser_generator(self) -> Generator[None, Union[str, _Flush], None]:
+ """
+ Coroutine (state machine) for the input parser.
+ """
+ prefix = ""
+ retry = False
+ flush = False
+
+ while True:
+ flush = False
+
+ if retry:
+ retry = False
+ else:
+ # Get next character.
+ c = yield
+
+ if isinstance(c, _Flush):
+ flush = True
+ else:
+ prefix += c
+
+ # If we have some data, check for matches.
+ if prefix:
+ is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix]
+ match = self._get_match(prefix)
+
+ # Exact matches found, call handlers..
+ if (flush or not is_prefix_of_longer_match) and match:
+ self._call_handler(match, prefix)
+ prefix = ""
+
+ # No exact match found.
+ elif (flush or not is_prefix_of_longer_match) and not match:
+ found = False
+ retry = True
+
+ # Loop over the input, try the longest match first and
+ # shift.
+ for i in range(len(prefix), 0, -1):
+ match = self._get_match(prefix[:i])
+ if match:
+ self._call_handler(match, prefix[:i])
+ prefix = prefix[i:]
+ found = True
+
+ if not found:
+ self._call_handler(prefix[0], prefix[0])
+ prefix = prefix[1:]
+
+ def _call_handler(
+ self, key: Union[str, Keys, Tuple[Keys, ...]], insert_text: str
+ ) -> None:
+ """
+ Callback to handler.
+ """
+ if isinstance(key, tuple):
+ # Received ANSI sequence that corresponds with multiple keys
+ # (probably alt+something). Handle keys individually, but only pass
+ # data payload to first KeyPress (so that we won't insert it
+ # multiple times).
+ for i, k in enumerate(key):
+ self._call_handler(k, insert_text if i == 0 else "")
+ else:
+ if key == Keys.BracketedPaste:
+ self._in_bracketed_paste = True
+ self._paste_buffer = ""
+ else:
+ self.feed_key_callback(KeyPress(key, insert_text))
+
+ def feed(self, data: str) -> None:
+ """
+ Feed the input stream.
+
+ :param data: Input string (unicode).
+ """
+ # Handle bracketed paste. (We bypass the parser that matches all other
+ # key presses and keep reading input until we see the end mark.)
+ # This is much faster then parsing character by character.
+ if self._in_bracketed_paste:
+ self._paste_buffer += data
+ end_mark = "\x1b[201~"
+
+ if end_mark in self._paste_buffer:
+ end_index = self._paste_buffer.index(end_mark)
+
+ # Feed content to key bindings.
+ paste_content = self._paste_buffer[:end_index]
+ self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content))
+
+ # Quit bracketed paste mode and handle remaining input.
+ self._in_bracketed_paste = False
+ remaining = self._paste_buffer[end_index + len(end_mark) :]
+ self._paste_buffer = ""
+
+ self.feed(remaining)
+
+ # Handle normal input character by character.
+ else:
+ for i, c in enumerate(data):
+ if self._in_bracketed_paste:
+ # Quit loop and process from this position when the parser
+ # entered bracketed paste.
+ self.feed(data[i:])
+ break
+ else:
+ self._input_parser.send(c)
+
+ def flush(self) -> None:
+ """
+ Flush the buffer of the input stream.
+
+ This will allow us to handle the escape key (or maybe meta) sooner.
+ The input received by the escape key is actually the same as the first
+ characters of e.g. Arrow-Up, so without knowing what follows the escape
+ sequence, we don't know whether escape has been pressed, or whether
+ it's something else. This flush function should be called after a
+ timeout, and processes everything that's still in the buffer as-is, so
+ without assuming any characters will follow.
+ """
+ self._input_parser.send(_Flush())
+
+ def feed_and_flush(self, data: str) -> None:
+ """
+ Wrapper around ``feed`` and ``flush``.
+ """
+ self.feed(data)
+ self.flush()
diff --git a/src/prompt_toolkit/input/win32.py b/src/prompt_toolkit/input/win32.py
new file mode 100644
index 0000000..db3fa2b
--- /dev/null
+++ b/src/prompt_toolkit/input/win32.py
@@ -0,0 +1,757 @@
+import os
+import sys
+from abc import abstractmethod
+from contextlib import contextmanager
+
+from prompt_toolkit.eventloop import get_event_loop
+
+from ..utils import SPHINX_AUTODOC_RUNNING
+
+assert sys.platform == "win32"
+
+# Do not import win32-specific stuff when generating documentation.
+# Otherwise RTD would be unable to generate docs for this module.
+if not SPHINX_AUTODOC_RUNNING:
+ import msvcrt
+ from ctypes import windll
+
+from ctypes import Array, pointer
+from ctypes.wintypes import DWORD, HANDLE
+from typing import (
+ Callable,
+ ContextManager,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ TextIO,
+)
+
+from prompt_toolkit.eventloop import run_in_executor_with_context
+from prompt_toolkit.eventloop.win32 import create_win32_event, wait_for_handles
+from prompt_toolkit.key_binding.key_processor import KeyPress
+from prompt_toolkit.keys import Keys
+from prompt_toolkit.mouse_events import MouseButton, MouseEventType
+from prompt_toolkit.win32_types import (
+ INPUT_RECORD,
+ KEY_EVENT_RECORD,
+ MOUSE_EVENT_RECORD,
+ STD_INPUT_HANDLE,
+ EventTypes,
+)
+
+from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES
+from .base import Input
+
+__all__ = [
+ "Win32Input",
+ "ConsoleInputReader",
+ "raw_mode",
+ "cooked_mode",
+ "attach_win32_input",
+ "detach_win32_input",
+]
+
+# Win32 Constants for MOUSE_EVENT_RECORD.
+# See: https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str
+FROM_LEFT_1ST_BUTTON_PRESSED = 0x1
+RIGHTMOST_BUTTON_PRESSED = 0x2
+MOUSE_MOVED = 0x0001
+MOUSE_WHEELED = 0x0004
+
+
+class _Win32InputBase(Input):
+ """
+ Base class for `Win32Input` and `Win32PipeInput`.
+ """
+
+ def __init__(self) -> None:
+ self.win32_handles = _Win32Handles()
+
+ @property
+ @abstractmethod
+ def handle(self) -> HANDLE:
+ pass
+
+
+class Win32Input(_Win32InputBase):
+ """
+ `Input` class that reads from the Windows console.
+ """
+
+ def __init__(self, stdin: Optional[TextIO] = None) -> None:
+ super().__init__()
+ self.console_input_reader = ConsoleInputReader()
+
+ def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
+ """
+ Return a context manager that makes this input active in the current
+ event loop.
+ """
+ return attach_win32_input(self, input_ready_callback)
+
+ def detach(self) -> ContextManager[None]:
+ """
+ Return a context manager that makes sure that this input is not active
+ in the current event loop.
+ """
+ return detach_win32_input(self)
+
+ def read_keys(self) -> List[KeyPress]:
+ return list(self.console_input_reader.read())
+
+ def flush(self) -> None:
+ pass
+
+ @property
+ def closed(self) -> bool:
+ return False
+
+ def raw_mode(self) -> ContextManager[None]:
+ return raw_mode()
+
+ def cooked_mode(self) -> ContextManager[None]:
+ return cooked_mode()
+
+ def fileno(self) -> int:
+ # The windows console doesn't depend on the file handle, so
+ # this is not used for the event loop (which uses the
+ # handle instead). But it's used in `Application.run_system_command`
+ # which opens a subprocess with a given stdin/stdout.
+ return sys.stdin.fileno()
+
+ def typeahead_hash(self) -> str:
+ return "win32-input"
+
+ def close(self) -> None:
+ self.console_input_reader.close()
+
+ @property
+ def handle(self) -> HANDLE:
+ return self.console_input_reader.handle
+
+
+class ConsoleInputReader:
+ """
+ :param recognize_paste: When True, try to discover paste actions and turn
+ the event into a BracketedPaste.
+ """
+
+ # Keys with character data.
+ mappings = {
+ b"\x1b": Keys.Escape,
+ b"\x00": Keys.ControlSpace, # Control-Space (Also for Ctrl-@)
+ b"\x01": Keys.ControlA, # Control-A (home)
+ b"\x02": Keys.ControlB, # Control-B (emacs cursor left)
+ b"\x03": Keys.ControlC, # Control-C (interrupt)
+ b"\x04": Keys.ControlD, # Control-D (exit)
+ b"\x05": Keys.ControlE, # Control-E (end)
+ b"\x06": Keys.ControlF, # Control-F (cursor forward)
+ b"\x07": Keys.ControlG, # Control-G
+ b"\x08": Keys.ControlH, # Control-H (8) (Identical to '\b')
+ b"\x09": Keys.ControlI, # Control-I (9) (Identical to '\t')
+ b"\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n')
+ b"\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab)
+ b"\x0c": Keys.ControlL, # Control-L (clear; form feed)
+ b"\x0d": Keys.ControlM, # Control-M (enter)
+ b"\x0e": Keys.ControlN, # Control-N (14) (history forward)
+ b"\x0f": Keys.ControlO, # Control-O (15)
+ b"\x10": Keys.ControlP, # Control-P (16) (history back)
+ b"\x11": Keys.ControlQ, # Control-Q
+ b"\x12": Keys.ControlR, # Control-R (18) (reverse search)
+ b"\x13": Keys.ControlS, # Control-S (19) (forward search)
+ b"\x14": Keys.ControlT, # Control-T
+ b"\x15": Keys.ControlU, # Control-U
+ b"\x16": Keys.ControlV, # Control-V
+ b"\x17": Keys.ControlW, # Control-W
+ b"\x18": Keys.ControlX, # Control-X
+ b"\x19": Keys.ControlY, # Control-Y (25)
+ b"\x1a": Keys.ControlZ, # Control-Z
+ b"\x1c": Keys.ControlBackslash, # Both Control-\ and Ctrl-|
+ b"\x1d": Keys.ControlSquareClose, # Control-]
+ b"\x1e": Keys.ControlCircumflex, # Control-^
+ b"\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.)
+ b"\x7f": Keys.Backspace, # (127) Backspace (ASCII Delete.)
+ }
+
+ # Keys that don't carry character data.
+ keycodes = {
+ # Home/End
+ 33: Keys.PageUp,
+ 34: Keys.PageDown,
+ 35: Keys.End,
+ 36: Keys.Home,
+ # Arrows
+ 37: Keys.Left,
+ 38: Keys.Up,
+ 39: Keys.Right,
+ 40: Keys.Down,
+ 45: Keys.Insert,
+ 46: Keys.Delete,
+ # F-keys.
+ 112: Keys.F1,
+ 113: Keys.F2,
+ 114: Keys.F3,
+ 115: Keys.F4,
+ 116: Keys.F5,
+ 117: Keys.F6,
+ 118: Keys.F7,
+ 119: Keys.F8,
+ 120: Keys.F9,
+ 121: Keys.F10,
+ 122: Keys.F11,
+ 123: Keys.F12,
+ }
+
+ LEFT_ALT_PRESSED = 0x0002
+ RIGHT_ALT_PRESSED = 0x0001
+ SHIFT_PRESSED = 0x0010
+ LEFT_CTRL_PRESSED = 0x0008
+ RIGHT_CTRL_PRESSED = 0x0004
+
+ def __init__(self, recognize_paste: bool = True) -> None:
+ self._fdcon = None
+ self.recognize_paste = recognize_paste
+
+ # When stdin is a tty, use that handle, otherwise, create a handle from
+ # CONIN$.
+ self.handle: HANDLE
+ if sys.stdin.isatty():
+ self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
+ else:
+ self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY)
+ self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon))
+
+ def close(self) -> None:
+ "Close fdcon."
+ if self._fdcon is not None:
+ os.close(self._fdcon)
+
+ def read(self) -> Iterable[KeyPress]:
+ """
+ Return a list of `KeyPress` instances. It won't return anything when
+ there was nothing to read. (This function doesn't block.)
+
+ http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
+ """
+ max_count = 2048 # Max events to read at the same time.
+
+ read = DWORD(0)
+ arrtype = INPUT_RECORD * max_count
+ input_records = arrtype()
+
+ # Check whether there is some input to read. `ReadConsoleInputW` would
+ # block otherwise.
+ # (Actually, the event loop is responsible to make sure that this
+ # function is only called when there is something to read, but for some
+ # reason this happened in the asyncio_win32 loop, and it's better to be
+ # safe anyway.)
+ if not wait_for_handles([self.handle], timeout=0):
+ return
+
+ # Get next batch of input event.
+ windll.kernel32.ReadConsoleInputW(
+ self.handle, pointer(input_records), max_count, pointer(read)
+ )
+
+ # First, get all the keys from the input buffer, in order to determine
+ # whether we should consider this a paste event or not.
+ all_keys = list(self._get_keys(read, input_records))
+
+ # Fill in 'data' for key presses.
+ all_keys = [self._insert_key_data(key) for key in all_keys]
+
+ # Correct non-bmp characters that are passed as separate surrogate codes
+ all_keys = list(self._merge_paired_surrogates(all_keys))
+
+ if self.recognize_paste and self._is_paste(all_keys):
+ gen = iter(all_keys)
+ k: Optional[KeyPress]
+
+ for k in gen:
+ # Pasting: if the current key consists of text or \n, turn it
+ # into a BracketedPaste.
+ data = []
+ while k and (
+ not isinstance(k.key, Keys)
+ or k.key in {Keys.ControlJ, Keys.ControlM}
+ ):
+ data.append(k.data)
+ try:
+ k = next(gen)
+ except StopIteration:
+ k = None
+
+ if data:
+ yield KeyPress(Keys.BracketedPaste, "".join(data))
+ if k is not None:
+ yield k
+ else:
+ yield from all_keys
+
+ def _insert_key_data(self, key_press: KeyPress) -> KeyPress:
+ """
+ Insert KeyPress data, for vt100 compatibility.
+ """
+ if key_press.data:
+ return key_press
+
+ if isinstance(key_press.key, Keys):
+ data = REVERSE_ANSI_SEQUENCES.get(key_press.key, "")
+ else:
+ data = ""
+
+ return KeyPress(key_press.key, data)
+
+ def _get_keys(
+ self, read: DWORD, input_records: "Array[INPUT_RECORD]"
+ ) -> Iterator[KeyPress]:
+ """
+ Generator that yields `KeyPress` objects from the input records.
+ """
+ for i in range(read.value):
+ ir = input_records[i]
+
+ # Get the right EventType from the EVENT_RECORD.
+ # (For some reason the Windows console application 'cmder'
+ # [http://gooseberrycreative.com/cmder/] can return '0' for
+ # ir.EventType. -- Just ignore that.)
+ if ir.EventType in EventTypes:
+ ev = getattr(ir.Event, EventTypes[ir.EventType])
+
+ # Process if this is a key event. (We also have mouse, menu and
+ # focus events.)
+ if type(ev) == KEY_EVENT_RECORD and ev.KeyDown:
+ yield from self._event_to_key_presses(ev)
+
+ elif type(ev) == MOUSE_EVENT_RECORD:
+ yield from self._handle_mouse(ev)
+
+ @staticmethod
+ def _merge_paired_surrogates(key_presses: List[KeyPress]) -> Iterator[KeyPress]:
+ """
+ Combines consecutive KeyPresses with high and low surrogates into
+ single characters
+ """
+ buffered_high_surrogate = None
+ for key in key_presses:
+ is_text = not isinstance(key.key, Keys)
+ is_high_surrogate = is_text and "\uD800" <= key.key <= "\uDBFF"
+ is_low_surrogate = is_text and "\uDC00" <= key.key <= "\uDFFF"
+
+ if buffered_high_surrogate:
+ if is_low_surrogate:
+ # convert high surrogate + low surrogate to single character
+ fullchar = (
+ (buffered_high_surrogate.key + key.key)
+ .encode("utf-16-le", "surrogatepass")
+ .decode("utf-16-le")
+ )
+ key = KeyPress(fullchar, fullchar)
+ else:
+ yield buffered_high_surrogate
+ buffered_high_surrogate = None
+
+ if is_high_surrogate:
+ buffered_high_surrogate = key
+ else:
+ yield key
+
+ if buffered_high_surrogate:
+ yield buffered_high_surrogate
+
+ @staticmethod
+ def _is_paste(keys: List[KeyPress]) -> bool:
+ """
+ Return `True` when we should consider this list of keys as a paste
+ event. Pasted text on windows will be turned into a
+ `Keys.BracketedPaste` event. (It's not 100% correct, but it is probably
+ the best possible way to detect pasting of text and handle that
+ correctly.)
+ """
+ # Consider paste when it contains at least one newline and at least one
+ # other character.
+ text_count = 0
+ newline_count = 0
+
+ for k in keys:
+ if not isinstance(k.key, Keys):
+ text_count += 1
+ if k.key == Keys.ControlM:
+ newline_count += 1
+
+ return newline_count >= 1 and text_count >= 1
+
+ def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) -> List[KeyPress]:
+ """
+ For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances.
+ """
+ assert type(ev) == KEY_EVENT_RECORD and ev.KeyDown
+
+ result: Optional[KeyPress] = None
+
+ control_key_state = ev.ControlKeyState
+ u_char = ev.uChar.UnicodeChar
+ # Use surrogatepass because u_char may be an unmatched surrogate
+ ascii_char = u_char.encode("utf-8", "surrogatepass")
+
+ # NOTE: We don't use `ev.uChar.AsciiChar`. That appears to be the
+ # unicode code point truncated to 1 byte. See also:
+ # https://github.com/ipython/ipython/issues/10004
+ # https://github.com/jonathanslenders/python-prompt-toolkit/issues/389
+
+ if u_char == "\x00":
+ if ev.VirtualKeyCode in self.keycodes:
+ result = KeyPress(self.keycodes[ev.VirtualKeyCode], "")
+ else:
+ if ascii_char in self.mappings:
+ if self.mappings[ascii_char] == Keys.ControlJ:
+ u_char = (
+ "\n" # Windows sends \n, turn into \r for unix compatibility.
+ )
+ result = KeyPress(self.mappings[ascii_char], u_char)
+ else:
+ result = KeyPress(u_char, u_char)
+
+ # First we handle Shift-Control-Arrow/Home/End (need to do this first)
+ if (
+ (
+ control_key_state & self.LEFT_CTRL_PRESSED
+ or control_key_state & self.RIGHT_CTRL_PRESSED
+ )
+ and control_key_state & self.SHIFT_PRESSED
+ and result
+ ):
+ mapping: Dict[str, str] = {
+ Keys.Left: Keys.ControlShiftLeft,
+ Keys.Right: Keys.ControlShiftRight,
+ Keys.Up: Keys.ControlShiftUp,
+ Keys.Down: Keys.ControlShiftDown,
+ Keys.Home: Keys.ControlShiftHome,
+ Keys.End: Keys.ControlShiftEnd,
+ Keys.Insert: Keys.ControlShiftInsert,
+ Keys.PageUp: Keys.ControlShiftPageUp,
+ Keys.PageDown: Keys.ControlShiftPageDown,
+ }
+ result.key = mapping.get(result.key, result.key)
+
+ # Correctly handle Control-Arrow/Home/End and Control-Insert/Delete keys.
+ if (
+ control_key_state & self.LEFT_CTRL_PRESSED
+ or control_key_state & self.RIGHT_CTRL_PRESSED
+ ) and result:
+ mapping = {
+ Keys.Left: Keys.ControlLeft,
+ Keys.Right: Keys.ControlRight,
+ Keys.Up: Keys.ControlUp,
+ Keys.Down: Keys.ControlDown,
+ Keys.Home: Keys.ControlHome,
+ Keys.End: Keys.ControlEnd,
+ Keys.Insert: Keys.ControlInsert,
+ Keys.Delete: Keys.ControlDelete,
+ Keys.PageUp: Keys.ControlPageUp,
+ Keys.PageDown: Keys.ControlPageDown,
+ }
+ result.key = mapping.get(result.key, result.key)
+
+ # Turn 'Tab' into 'BackTab' when shift was pressed.
+ # Also handle other shift-key combination
+ if control_key_state & self.SHIFT_PRESSED and result:
+ mapping = {
+ Keys.Tab: Keys.BackTab,
+ Keys.Left: Keys.ShiftLeft,
+ Keys.Right: Keys.ShiftRight,
+ Keys.Up: Keys.ShiftUp,
+ Keys.Down: Keys.ShiftDown,
+ Keys.Home: Keys.ShiftHome,
+ Keys.End: Keys.ShiftEnd,
+ Keys.Insert: Keys.ShiftInsert,
+ Keys.Delete: Keys.ShiftDelete,
+ Keys.PageUp: Keys.ShiftPageUp,
+ Keys.PageDown: Keys.ShiftPageDown,
+ }
+ result.key = mapping.get(result.key, result.key)
+
+ # Turn 'Space' into 'ControlSpace' when control was pressed.
+ if (
+ (
+ control_key_state & self.LEFT_CTRL_PRESSED
+ or control_key_state & self.RIGHT_CTRL_PRESSED
+ )
+ and result
+ and result.data == " "
+ ):
+ result = KeyPress(Keys.ControlSpace, " ")
+
+ # Turn Control-Enter into META-Enter. (On a vt100 terminal, we cannot
+ # detect this combination. But it's really practical on Windows.)
+ if (
+ (
+ control_key_state & self.LEFT_CTRL_PRESSED
+ or control_key_state & self.RIGHT_CTRL_PRESSED
+ )
+ and result
+ and result.key == Keys.ControlJ
+ ):
+ return [KeyPress(Keys.Escape, ""), result]
+
+ # Return result. If alt was pressed, prefix the result with an
+ # 'Escape' key, just like unix VT100 terminals do.
+
+ # NOTE: Only replace the left alt with escape. The right alt key often
+ # acts as altgr and is used in many non US keyboard layouts for
+ # typing some special characters, like a backslash. We don't want
+ # all backslashes to be prefixed with escape. (Esc-\ has a
+ # meaning in E-macs, for instance.)
+ if result:
+ meta_pressed = control_key_state & self.LEFT_ALT_PRESSED
+
+ if meta_pressed:
+ return [KeyPress(Keys.Escape, ""), result]
+ else:
+ return [result]
+
+ else:
+ return []
+
+ def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> List[KeyPress]:
+ """
+ Handle mouse events. Return a list of KeyPress instances.
+ """
+ event_flags = ev.EventFlags
+ button_state = ev.ButtonState
+
+ event_type: Optional[MouseEventType] = None
+ button: MouseButton = MouseButton.NONE
+
+ # Scroll events.
+ if event_flags & MOUSE_WHEELED:
+ if button_state > 0:
+ event_type = MouseEventType.SCROLL_UP
+ else:
+ event_type = MouseEventType.SCROLL_DOWN
+ else:
+ # Handle button state for non-scroll events.
+ if button_state == FROM_LEFT_1ST_BUTTON_PRESSED:
+ button = MouseButton.LEFT
+
+ elif button_state == RIGHTMOST_BUTTON_PRESSED:
+ button = MouseButton.RIGHT
+
+ # Move events.
+ if event_flags & MOUSE_MOVED:
+ event_type = MouseEventType.MOUSE_MOVE
+
+ # No key pressed anymore: mouse up.
+ if event_type is None:
+ if button_state > 0:
+ # Some button pressed.
+ event_type = MouseEventType.MOUSE_DOWN
+ else:
+ # No button pressed.
+ event_type = MouseEventType.MOUSE_UP
+
+ data = ";".join(
+ [
+ button.value,
+ event_type.value,
+ str(ev.MousePosition.X),
+ str(ev.MousePosition.Y),
+ ]
+ )
+ return [KeyPress(Keys.WindowsMouseEvent, data)]
+
+
+class _Win32Handles:
+ """
+ Utility to keep track of which handles are connectod to which callbacks.
+
+ `add_win32_handle` starts a tiny event loop in another thread which waits
+ for the Win32 handle to become ready. When this happens, the callback will
+ be called in the current asyncio event loop using `call_soon_threadsafe`.
+
+ `remove_win32_handle` will stop this tiny event loop.
+
+ NOTE: We use this technique, so that we don't have to use the
+ `ProactorEventLoop` on Windows and we can wait for things like stdin
+ in a `SelectorEventLoop`. This is important, because our inputhook
+ mechanism (used by IPython), only works with the `SelectorEventLoop`.
+ """
+
+ def __init__(self) -> None:
+ self._handle_callbacks: Dict[int, Callable[[], None]] = {}
+
+ # Windows Events that are triggered when we have to stop watching this
+ # handle.
+ self._remove_events: Dict[int, HANDLE] = {}
+
+ def add_win32_handle(self, handle: HANDLE, callback: Callable[[], None]) -> None:
+ """
+ Add a Win32 handle to the event loop.
+ """
+ handle_value = handle.value
+
+ if handle_value is None:
+ raise ValueError("Invalid handle.")
+
+ # Make sure to remove a previous registered handler first.
+ self.remove_win32_handle(handle)
+
+ loop = get_event_loop()
+ self._handle_callbacks[handle_value] = callback
+
+ # Create remove event.
+ remove_event = create_win32_event()
+ self._remove_events[handle_value] = remove_event
+
+ # Add reader.
+ def ready() -> None:
+ # Tell the callback that input's ready.
+ try:
+ callback()
+ finally:
+ run_in_executor_with_context(wait, loop=loop)
+
+ # Wait for the input to become ready.
+ # (Use an executor for this, the Windows asyncio event loop doesn't
+ # allow us to wait for handles like stdin.)
+ def wait() -> None:
+ # Wait until either the handle becomes ready, or the remove event
+ # has been set.
+ result = wait_for_handles([remove_event, handle])
+
+ if result is remove_event:
+ windll.kernel32.CloseHandle(remove_event)
+ return
+ else:
+ loop.call_soon_threadsafe(ready)
+
+ run_in_executor_with_context(wait, loop=loop)
+
+ def remove_win32_handle(self, handle: HANDLE) -> Optional[Callable[[], None]]:
+ """
+ Remove a Win32 handle from the event loop.
+ Return either the registered handler or `None`.
+ """
+ if handle.value is None:
+ return None # Ignore.
+
+ # Trigger remove events, so that the reader knows to stop.
+ try:
+ event = self._remove_events.pop(handle.value)
+ except KeyError:
+ pass
+ else:
+ windll.kernel32.SetEvent(event)
+
+ try:
+ return self._handle_callbacks.pop(handle.value)
+ except KeyError:
+ return None
+
+
+@contextmanager
+def attach_win32_input(
+ input: _Win32InputBase, callback: Callable[[], None]
+) -> Iterator[None]:
+ """
+ Context manager that makes this input active in the current event loop.
+
+ :param input: :class:`~prompt_toolkit.input.Input` object.
+ :param input_ready_callback: Called when the input is ready to read.
+ """
+ win32_handles = input.win32_handles
+ handle = input.handle
+
+ if handle.value is None:
+ raise ValueError("Invalid handle.")
+
+ # Add reader.
+ previous_callback = win32_handles.remove_win32_handle(handle)
+ win32_handles.add_win32_handle(handle, callback)
+
+ try:
+ yield
+ finally:
+ win32_handles.remove_win32_handle(handle)
+
+ if previous_callback:
+ win32_handles.add_win32_handle(handle, previous_callback)
+
+
+@contextmanager
+def detach_win32_input(input: _Win32InputBase) -> Iterator[None]:
+ win32_handles = input.win32_handles
+ handle = input.handle
+
+ if handle.value is None:
+ raise ValueError("Invalid handle.")
+
+ previous_callback = win32_handles.remove_win32_handle(handle)
+
+ try:
+ yield
+ finally:
+ if previous_callback:
+ win32_handles.add_win32_handle(handle, previous_callback)
+
+
+class raw_mode:
+ """
+ ::
+
+ with raw_mode(stdin):
+ ''' the windows terminal is now in 'raw' mode. '''
+
+ The ``fileno`` attribute is ignored. This is to be compatible with the
+ `raw_input` method of `.vt100_input`.
+ """
+
+ def __init__(self, fileno: Optional[int] = None) -> None:
+ self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
+
+ def __enter__(self) -> None:
+ # Remember original mode.
+ original_mode = DWORD()
+ windll.kernel32.GetConsoleMode(self.handle, pointer(original_mode))
+ self.original_mode = original_mode
+
+ self._patch()
+
+ def _patch(self) -> None:
+ # Set raw
+ ENABLE_ECHO_INPUT = 0x0004
+ ENABLE_LINE_INPUT = 0x0002
+ ENABLE_PROCESSED_INPUT = 0x0001
+
+ windll.kernel32.SetConsoleMode(
+ self.handle,
+ self.original_mode.value
+ & ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
+ )
+
+ def __exit__(self, *a: object) -> None:
+ # Restore original mode
+ windll.kernel32.SetConsoleMode(self.handle, self.original_mode)
+
+
+class cooked_mode(raw_mode):
+ """
+ ::
+
+ with cooked_mode(stdin):
+ ''' The pseudo-terminal stdin is now used in cooked mode. '''
+ """
+
+ def _patch(self) -> None:
+ # Set cooked.
+ ENABLE_ECHO_INPUT = 0x0004
+ ENABLE_LINE_INPUT = 0x0002
+ ENABLE_PROCESSED_INPUT = 0x0001
+
+ windll.kernel32.SetConsoleMode(
+ self.handle,
+ self.original_mode.value
+ | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
+ )
diff --git a/src/prompt_toolkit/input/win32_pipe.py b/src/prompt_toolkit/input/win32_pipe.py
new file mode 100644
index 0000000..ebee207
--- /dev/null
+++ b/src/prompt_toolkit/input/win32_pipe.py
@@ -0,0 +1,154 @@
+import sys
+
+assert sys.platform == "win32"
+
+from contextlib import contextmanager
+from ctypes import windll
+from ctypes.wintypes import HANDLE
+from typing import Callable, ContextManager, Iterator, List
+
+from prompt_toolkit.eventloop.win32 import create_win32_event
+
+from ..key_binding import KeyPress
+from ..utils import DummyContext
+from .base import PipeInput
+from .vt100_parser import Vt100Parser
+from .win32 import _Win32InputBase, attach_win32_input, detach_win32_input
+
+__all__ = ["Win32PipeInput"]
+
+
+class Win32PipeInput(_Win32InputBase, PipeInput):
+ """
+ This is an input pipe that works on Windows.
+ Text or bytes can be feed into the pipe, and key strokes can be read from
+ the pipe. This is useful if we want to send the input programmatically into
+ the application. Mostly useful for unit testing.
+
+ Notice that even though it's Windows, we use vt100 escape sequences over
+ the pipe.
+
+ Usage::
+
+ input = Win32PipeInput()
+ input.send_text('inputdata')
+ """
+
+ _id = 0
+
+ def __init__(self, _event: HANDLE) -> None:
+ super().__init__()
+ # Event (handle) for registering this input in the event loop.
+ # This event is set when there is data available to read from the pipe.
+ # Note: We use this approach instead of using a regular pipe, like
+ # returned from `os.pipe()`, because making such a regular pipe
+ # non-blocking is tricky and this works really well.
+ self._event = create_win32_event()
+
+ self._closed = False
+
+ # Parser for incoming keys.
+ self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects.
+ self.vt100_parser = Vt100Parser(lambda key: self._buffer.append(key))
+
+ # Identifier for every PipeInput for the hash.
+ self.__class__._id += 1
+ self._id = self.__class__._id
+
+ @classmethod
+ @contextmanager
+ def create(cls) -> Iterator["Win32PipeInput"]:
+ event = create_win32_event()
+ try:
+ yield Win32PipeInput(_event=event)
+ finally:
+ windll.kernel32.CloseHandle(event)
+
+ @property
+ def closed(self) -> bool:
+ return self._closed
+
+ def fileno(self) -> int:
+ """
+ The windows pipe doesn't depend on the file handle.
+ """
+ raise NotImplementedError
+
+ @property
+ def handle(self) -> HANDLE:
+ "The handle used for registering this pipe in the event loop."
+ return self._event
+
+ def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
+ """
+ Return a context manager that makes this input active in the current
+ event loop.
+ """
+ return attach_win32_input(self, input_ready_callback)
+
+ def detach(self) -> ContextManager[None]:
+ """
+ Return a context manager that makes sure that this input is not active
+ in the current event loop.
+ """
+ return detach_win32_input(self)
+
+ def read_keys(self) -> List[KeyPress]:
+ "Read list of KeyPress."
+
+ # Return result.
+ result = self._buffer
+ self._buffer = []
+
+ # Reset event.
+ if not self._closed:
+ # (If closed, the event should not reset.)
+ windll.kernel32.ResetEvent(self._event)
+
+ return result
+
+ def flush_keys(self) -> List[KeyPress]:
+ """
+ Flush pending keys and return them.
+ (Used for flushing the 'escape' key.)
+ """
+ # Flush all pending keys. (This is most important to flush the vt100
+ # 'Escape' key early when nothing else follows.)
+ self.vt100_parser.flush()
+
+ # Return result.
+ result = self._buffer
+ self._buffer = []
+ return result
+
+ def send_bytes(self, data: bytes) -> None:
+ "Send bytes to the input."
+ self.send_text(data.decode("utf-8", "ignore"))
+
+ def send_text(self, text: str) -> None:
+ "Send text to the input."
+ if self._closed:
+ raise ValueError("Attempt to write into a closed pipe.")
+
+ # Pass it through our vt100 parser.
+ self.vt100_parser.feed(text)
+
+ # Set event.
+ windll.kernel32.SetEvent(self._event)
+
+ def raw_mode(self) -> ContextManager[None]:
+ return DummyContext()
+
+ def cooked_mode(self) -> ContextManager[None]:
+ return DummyContext()
+
+ def close(self) -> None:
+ "Close write-end of the pipe."
+ self._closed = True
+ windll.kernel32.SetEvent(self._event)
+
+ def typeahead_hash(self) -> str:
+ """
+ This needs to be unique for every `PipeInput`.
+ """
+ return f"pipe-input-{self._id}"