summaryrefslogtreecommitdiffstats
path: root/src/prompt_toolkit/key_binding/key_processor.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/prompt_toolkit/key_binding/key_processor.py')
-rw-r--r--src/prompt_toolkit/key_binding/key_processor.py529
1 files changed, 529 insertions, 0 deletions
diff --git a/src/prompt_toolkit/key_binding/key_processor.py b/src/prompt_toolkit/key_binding/key_processor.py
new file mode 100644
index 0000000..4c4f0d1
--- /dev/null
+++ b/src/prompt_toolkit/key_binding/key_processor.py
@@ -0,0 +1,529 @@
+"""
+An :class:`~.KeyProcessor` receives callbacks for the keystrokes parsed from
+the input in the :class:`~prompt_toolkit.inputstream.InputStream` instance.
+
+The `KeyProcessor` will according to the implemented keybindings call the
+correct callbacks when new key presses are feed through `feed`.
+"""
+from __future__ import annotations
+
+import weakref
+from asyncio import Task, sleep
+from collections import deque
+from typing import TYPE_CHECKING, Any, Generator
+
+from prompt_toolkit.application.current import get_app
+from prompt_toolkit.enums import EditingMode
+from prompt_toolkit.filters.app import vi_navigation_mode
+from prompt_toolkit.keys import Keys
+from prompt_toolkit.utils import Event
+
+from .key_bindings import Binding, KeyBindingsBase
+
+if TYPE_CHECKING:
+ from prompt_toolkit.application import Application
+ from prompt_toolkit.buffer import Buffer
+
+
+__all__ = [
+ "KeyProcessor",
+ "KeyPress",
+ "KeyPressEvent",
+]
+
+
+class KeyPress:
+ """
+ :param key: A `Keys` instance or text (one character).
+ :param data: The received string on stdin. (Often vt100 escape codes.)
+ """
+
+ def __init__(self, key: Keys | str, data: str | None = None) -> None:
+ assert isinstance(key, Keys) or len(key) == 1
+
+ if data is None:
+ if isinstance(key, Keys):
+ data = key.value
+ else:
+ data = key # 'key' is a one character string.
+
+ self.key = key
+ self.data = data
+
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}(key={self.key!r}, data={self.data!r})"
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, KeyPress):
+ return False
+ return self.key == other.key and self.data == other.data
+
+
+"""
+Helper object to indicate flush operation in the KeyProcessor.
+NOTE: the implementation is very similar to the VT100 parser.
+"""
+_Flush = KeyPress("?", data="_Flush")
+
+
+class KeyProcessor:
+ """
+ Statemachine that receives :class:`KeyPress` instances and according to the
+ key bindings in the given :class:`KeyBindings`, calls the matching handlers.
+
+ ::
+
+ p = KeyProcessor(key_bindings)
+
+ # Send keys into the processor.
+ p.feed(KeyPress(Keys.ControlX, '\x18'))
+ p.feed(KeyPress(Keys.ControlC, '\x03')
+
+ # Process all the keys in the queue.
+ p.process_keys()
+
+ # Now the ControlX-ControlC callback will be called if this sequence is
+ # registered in the key bindings.
+
+ :param key_bindings: `KeyBindingsBase` instance.
+ """
+
+ def __init__(self, key_bindings: KeyBindingsBase) -> None:
+ self._bindings = key_bindings
+
+ self.before_key_press = Event(self)
+ self.after_key_press = Event(self)
+
+ self._flush_wait_task: Task[None] | None = None
+
+ self.reset()
+
+ def reset(self) -> None:
+ self._previous_key_sequence: list[KeyPress] = []
+ self._previous_handler: Binding | None = None
+
+ # The queue of keys not yet send to our _process generator/state machine.
+ self.input_queue: deque[KeyPress] = deque()
+
+ # The key buffer that is matched in the generator state machine.
+ # (This is at at most the amount of keys that make up for one key binding.)
+ self.key_buffer: list[KeyPress] = []
+
+ #: Readline argument (for repetition of commands.)
+ #: https://www.gnu.org/software/bash/manual/html_node/Readline-Arguments.html
+ self.arg: str | None = None
+
+ # Start the processor coroutine.
+ self._process_coroutine = self._process()
+ self._process_coroutine.send(None) # type: ignore
+
+ def _get_matches(self, key_presses: list[KeyPress]) -> list[Binding]:
+ """
+ For a list of :class:`KeyPress` instances. Give the matching handlers
+ that would handle this.
+ """
+ keys = tuple(k.key for k in key_presses)
+
+ # Try match, with mode flag
+ return [b for b in self._bindings.get_bindings_for_keys(keys) if b.filter()]
+
+ def _is_prefix_of_longer_match(self, key_presses: list[KeyPress]) -> bool:
+ """
+ For a list of :class:`KeyPress` instances. Return True if there is any
+ handler that is bound to a suffix of this keys.
+ """
+ keys = tuple(k.key for k in key_presses)
+
+ # Get the filters for all the key bindings that have a longer match.
+ # Note that we transform it into a `set`, because we don't care about
+ # the actual bindings and executing it more than once doesn't make
+ # sense. (Many key bindings share the same filter.)
+ filters = {
+ b.filter for b in self._bindings.get_bindings_starting_with_keys(keys)
+ }
+
+ # When any key binding is active, return True.
+ return any(f() for f in filters)
+
+ def _process(self) -> Generator[None, KeyPress, None]:
+ """
+ Coroutine implementing the key match algorithm. Key strokes are sent
+ into this generator, and it calls the appropriate handlers.
+ """
+ buffer = self.key_buffer
+ retry = False
+
+ while True:
+ flush = False
+
+ if retry:
+ retry = False
+ else:
+ key = yield
+ if key is _Flush:
+ flush = True
+ else:
+ buffer.append(key)
+
+ # If we have some key presses, check for matches.
+ if buffer:
+ matches = self._get_matches(buffer)
+
+ if flush:
+ is_prefix_of_longer_match = False
+ else:
+ is_prefix_of_longer_match = self._is_prefix_of_longer_match(buffer)
+
+ # When eager matches were found, give priority to them and also
+ # ignore all the longer matches.
+ eager_matches = [m for m in matches if m.eager()]
+
+ if eager_matches:
+ matches = eager_matches
+ is_prefix_of_longer_match = False
+
+ # Exact matches found, call handler.
+ if not is_prefix_of_longer_match and matches:
+ self._call_handler(matches[-1], key_sequence=buffer[:])
+ del buffer[:] # Keep reference.
+
+ # No match found.
+ elif not is_prefix_of_longer_match and not matches:
+ retry = True
+ found = False
+
+ # Loop over the input, try longest match first and shift.
+ for i in range(len(buffer), 0, -1):
+ matches = self._get_matches(buffer[:i])
+ if matches:
+ self._call_handler(matches[-1], key_sequence=buffer[:i])
+ del buffer[:i]
+ found = True
+ break
+
+ if not found:
+ del buffer[:1]
+
+ def feed(self, key_press: KeyPress, first: bool = False) -> None:
+ """
+ Add a new :class:`KeyPress` to the input queue.
+ (Don't forget to call `process_keys` in order to process the queue.)
+
+ :param first: If true, insert before everything else.
+ """
+ if first:
+ self.input_queue.appendleft(key_press)
+ else:
+ self.input_queue.append(key_press)
+
+ def feed_multiple(self, key_presses: list[KeyPress], first: bool = False) -> None:
+ """
+ :param first: If true, insert before everything else.
+ """
+ if first:
+ self.input_queue.extendleft(reversed(key_presses))
+ else:
+ self.input_queue.extend(key_presses)
+
+ def process_keys(self) -> None:
+ """
+ Process all the keys in the `input_queue`.
+ (To be called after `feed`.)
+
+ Note: because of the `feed`/`process_keys` separation, it is
+ possible to call `feed` from inside a key binding.
+ This function keeps looping until the queue is empty.
+ """
+ app = get_app()
+
+ def not_empty() -> bool:
+ # When the application result is set, stop processing keys. (E.g.
+ # if ENTER was received, followed by a few additional key strokes,
+ # leave the other keys in the queue.)
+ if app.is_done:
+ # But if there are still CPRResponse keys in the queue, these
+ # need to be processed.
+ return any(k for k in self.input_queue if k.key == Keys.CPRResponse)
+ else:
+ return bool(self.input_queue)
+
+ def get_next() -> KeyPress:
+ if app.is_done:
+ # Only process CPR responses. Everything else is typeahead.
+ cpr = [k for k in self.input_queue if k.key == Keys.CPRResponse][0]
+ self.input_queue.remove(cpr)
+ return cpr
+ else:
+ return self.input_queue.popleft()
+
+ is_flush = False
+
+ while not_empty():
+ # Process next key.
+ key_press = get_next()
+
+ is_flush = key_press is _Flush
+ is_cpr = key_press.key == Keys.CPRResponse
+
+ if not is_flush and not is_cpr:
+ self.before_key_press.fire()
+
+ try:
+ self._process_coroutine.send(key_press)
+ except Exception:
+ # If for some reason something goes wrong in the parser, (maybe
+ # an exception was raised) restart the processor for next time.
+ self.reset()
+ self.empty_queue()
+ raise
+
+ if not is_flush and not is_cpr:
+ self.after_key_press.fire()
+
+ # Skip timeout if the last key was flush.
+ if not is_flush:
+ self._start_timeout()
+
+ def empty_queue(self) -> list[KeyPress]:
+ """
+ Empty the input queue. Return the unprocessed input.
+ """
+ key_presses = list(self.input_queue)
+ self.input_queue.clear()
+
+ # Filter out CPRs. We don't want to return these.
+ key_presses = [k for k in key_presses if k.key != Keys.CPRResponse]
+ return key_presses
+
+ def _call_handler(self, handler: Binding, key_sequence: list[KeyPress]) -> None:
+ app = get_app()
+ was_recording_emacs = app.emacs_state.is_recording
+ was_recording_vi = bool(app.vi_state.recording_register)
+ was_temporary_navigation_mode = app.vi_state.temporary_navigation_mode
+ arg = self.arg
+ self.arg = None
+
+ event = KeyPressEvent(
+ weakref.ref(self),
+ arg=arg,
+ key_sequence=key_sequence,
+ previous_key_sequence=self._previous_key_sequence,
+ is_repeat=(handler == self._previous_handler),
+ )
+
+ # Save the state of the current buffer.
+ if handler.save_before(event):
+ event.app.current_buffer.save_to_undo_stack()
+
+ # Call handler.
+ from prompt_toolkit.buffer import EditReadOnlyBuffer
+
+ try:
+ handler.call(event)
+ self._fix_vi_cursor_position(event)
+
+ except EditReadOnlyBuffer:
+ # When a key binding does an attempt to change a buffer which is
+ # read-only, we can ignore that. We sound a bell and go on.
+ app.output.bell()
+
+ if was_temporary_navigation_mode:
+ self._leave_vi_temp_navigation_mode(event)
+
+ self._previous_key_sequence = key_sequence
+ self._previous_handler = handler
+
+ # Record the key sequence in our macro. (Only if we're in macro mode
+ # before and after executing the key.)
+ if handler.record_in_macro():
+ if app.emacs_state.is_recording and was_recording_emacs:
+ recording = app.emacs_state.current_recording
+ if recording is not None: # Should always be true, given that
+ # `was_recording_emacs` is set.
+ recording.extend(key_sequence)
+
+ if app.vi_state.recording_register and was_recording_vi:
+ for k in key_sequence:
+ app.vi_state.current_recording += k.data
+
+ def _fix_vi_cursor_position(self, event: KeyPressEvent) -> None:
+ """
+ After every command, make sure that if we are in Vi navigation mode, we
+ never put the cursor after the last character of a line. (Unless it's
+ an empty line.)
+ """
+ app = event.app
+ buff = app.current_buffer
+ preferred_column = buff.preferred_column
+
+ if (
+ vi_navigation_mode()
+ and buff.document.is_cursor_at_the_end_of_line
+ and len(buff.document.current_line) > 0
+ ):
+ buff.cursor_position -= 1
+
+ # Set the preferred_column for arrow up/down again.
+ # (This was cleared after changing the cursor position.)
+ buff.preferred_column = preferred_column
+
+ def _leave_vi_temp_navigation_mode(self, event: KeyPressEvent) -> None:
+ """
+ If we're in Vi temporary navigation (normal) mode, return to
+ insert/replace mode after executing one action.
+ """
+ app = event.app
+
+ if app.editing_mode == EditingMode.VI:
+ # Not waiting for a text object and no argument has been given.
+ if app.vi_state.operator_func is None and self.arg is None:
+ app.vi_state.temporary_navigation_mode = False
+
+ def _start_timeout(self) -> None:
+ """
+ Start auto flush timeout. Similar to Vim's `timeoutlen` option.
+
+ Start a background coroutine with a timer. When this timeout expires
+ and no key was pressed in the meantime, we flush all data in the queue
+ and call the appropriate key binding handlers.
+ """
+ app = get_app()
+ timeout = app.timeoutlen
+
+ if timeout is None:
+ return
+
+ async def wait() -> None:
+ "Wait for timeout."
+ # This sleep can be cancelled. In that case we don't flush.
+ await sleep(timeout)
+
+ if len(self.key_buffer) > 0:
+ # (No keys pressed in the meantime.)
+ flush_keys()
+
+ def flush_keys() -> None:
+ "Flush keys."
+ self.feed(_Flush)
+ self.process_keys()
+
+ # Automatically flush keys.
+ if self._flush_wait_task:
+ self._flush_wait_task.cancel()
+ self._flush_wait_task = app.create_background_task(wait())
+
+ def send_sigint(self) -> None:
+ """
+ Send SIGINT. Immediately call the SIGINT key handler.
+ """
+ self.feed(KeyPress(key=Keys.SIGINT), first=True)
+ self.process_keys()
+
+
+class KeyPressEvent:
+ """
+ Key press event, delivered to key bindings.
+
+ :param key_processor_ref: Weak reference to the `KeyProcessor`.
+ :param arg: Repetition argument.
+ :param key_sequence: List of `KeyPress` instances.
+ :param previouskey_sequence: Previous list of `KeyPress` instances.
+ :param is_repeat: True when the previous event was delivered to the same handler.
+ """
+
+ def __init__(
+ self,
+ key_processor_ref: weakref.ReferenceType[KeyProcessor],
+ arg: str | None,
+ key_sequence: list[KeyPress],
+ previous_key_sequence: list[KeyPress],
+ is_repeat: bool,
+ ) -> None:
+ self._key_processor_ref = key_processor_ref
+ self.key_sequence = key_sequence
+ self.previous_key_sequence = previous_key_sequence
+
+ #: True when the previous key sequence was handled by the same handler.
+ self.is_repeat = is_repeat
+
+ self._arg = arg
+ self._app = get_app()
+
+ def __repr__(self) -> str:
+ return "KeyPressEvent(arg={!r}, key_sequence={!r}, is_repeat={!r})".format(
+ self.arg,
+ self.key_sequence,
+ self.is_repeat,
+ )
+
+ @property
+ def data(self) -> str:
+ return self.key_sequence[-1].data
+
+ @property
+ def key_processor(self) -> KeyProcessor:
+ processor = self._key_processor_ref()
+ if processor is None:
+ raise Exception("KeyProcessor was lost. This should not happen.")
+ return processor
+
+ @property
+ def app(self) -> Application[Any]:
+ """
+ The current `Application` object.
+ """
+ return self._app
+
+ @property
+ def current_buffer(self) -> Buffer:
+ """
+ The current buffer.
+ """
+ return self.app.current_buffer
+
+ @property
+ def arg(self) -> int:
+ """
+ Repetition argument.
+ """
+ if self._arg == "-":
+ return -1
+
+ result = int(self._arg or 1)
+
+ # Don't exceed a million.
+ if int(result) >= 1000000:
+ result = 1
+
+ return result
+
+ @property
+ def arg_present(self) -> bool:
+ """
+ True if repetition argument was explicitly provided.
+ """
+ return self._arg is not None
+
+ def append_to_arg_count(self, data: str) -> None:
+ """
+ Add digit to the input argument.
+
+ :param data: the typed digit as string
+ """
+ assert data in "-0123456789"
+ current = self._arg
+
+ if data == "-":
+ assert current is None or current == "-"
+ result = data
+ elif current is None:
+ result = data
+ else:
+ result = f"{current}{data}"
+
+ self.key_processor.arg = result
+
+ @property
+ def cli(self) -> Application[Any]:
+ "For backward-compatibility."
+ return self.app