""" Parser for VT100 input stream. """ import re from typing import Callable, Dict, Generator, Tuple, Union from ..key_binding.key_processor import KeyPress from ..keys import Keys from .ansi_escape_sequences import ANSI_SEQUENCES __all__ = [ "Vt100Parser", ] # Regex matching any CPR response # (Note that we use '\Z' instead of '$', because '$' could include a trailing # newline.) _cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z") # Mouse events: # Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M" _mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"( bool: # (hard coded) If this could be a prefix of a CPR response, return # True. if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match( prefix ): result = True else: # If this could be a prefix of anything else, also return True. result = any( v for k, v in ANSI_SEQUENCES.items() if k.startswith(prefix) and k != prefix ) self[prefix] = result return result _IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache() class Vt100Parser: """ Parser for VT100 input stream. Data can be fed through the `feed` method and the given callback will be called with KeyPress objects. :: def callback(key): pass i = Vt100Parser(callback) i.feed('data\x01...') :attr feed_key_callback: Function that will be called when a key is parsed. """ # Lookup table of ANSI escape sequences for a VT100 terminal # Hint: in order to know what sequences your terminal writes to stdin, run # "od -c" and start typing. def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None: self.feed_key_callback = feed_key_callback self.reset() def reset(self, request: bool = False) -> None: self._in_bracketed_paste = False self._start_parser() def _start_parser(self) -> None: """ Start the parser coroutine. """ self._input_parser = self._input_parser_generator() self._input_parser.send(None) # type: ignore def _get_match(self, prefix: str) -> Union[None, Keys, Tuple[Keys, ...]]: """ Return the key (or keys) that maps to this prefix. """ # (hard coded) If we match a CPR response, return Keys.CPRResponse. # (This one doesn't fit in the ANSI_SEQUENCES, because it contains # integer variables.) if _cpr_response_re.match(prefix): return Keys.CPRResponse elif _mouse_event_re.match(prefix): return Keys.Vt100MouseEvent # Otherwise, use the mappings. try: return ANSI_SEQUENCES[prefix] except KeyError: return None def _input_parser_generator(self) -> Generator[None, Union[str, _Flush], None]: """ Coroutine (state machine) for the input parser. """ prefix = "" retry = False flush = False while True: flush = False if retry: retry = False else: # Get next character. c = yield if isinstance(c, _Flush): flush = True else: prefix += c # If we have some data, check for matches. if prefix: is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix] match = self._get_match(prefix) # Exact matches found, call handlers.. if (flush or not is_prefix_of_longer_match) and match: self._call_handler(match, prefix) prefix = "" # No exact match found. elif (flush or not is_prefix_of_longer_match) and not match: found = False retry = True # Loop over the input, try the longest match first and # shift. for i in range(len(prefix), 0, -1): match = self._get_match(prefix[:i]) if match: self._call_handler(match, prefix[:i]) prefix = prefix[i:] found = True if not found: self._call_handler(prefix[0], prefix[0]) prefix = prefix[1:] def _call_handler( self, key: Union[str, Keys, Tuple[Keys, ...]], insert_text: str ) -> None: """ Callback to handler. """ if isinstance(key, tuple): # Received ANSI sequence that corresponds with multiple keys # (probably alt+something). Handle keys individually, but only pass # data payload to first KeyPress (so that we won't insert it # multiple times). for i, k in enumerate(key): self._call_handler(k, insert_text if i == 0 else "") else: if key == Keys.BracketedPaste: self._in_bracketed_paste = True self._paste_buffer = "" else: self.feed_key_callback(KeyPress(key, insert_text)) def feed(self, data: str) -> None: """ Feed the input stream. :param data: Input string (unicode). """ # Handle bracketed paste. (We bypass the parser that matches all other # key presses and keep reading input until we see the end mark.) # This is much faster then parsing character by character. if self._in_bracketed_paste: self._paste_buffer += data end_mark = "\x1b[201~" if end_mark in self._paste_buffer: end_index = self._paste_buffer.index(end_mark) # Feed content to key bindings. paste_content = self._paste_buffer[:end_index] self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content)) # Quit bracketed paste mode and handle remaining input. self._in_bracketed_paste = False remaining = self._paste_buffer[end_index + len(end_mark) :] self._paste_buffer = "" self.feed(remaining) # Handle normal input character by character. else: for i, c in enumerate(data): if self._in_bracketed_paste: # Quit loop and process from this position when the parser # entered bracketed paste. self.feed(data[i:]) break else: self._input_parser.send(c) def flush(self) -> None: """ Flush the buffer of the input stream. This will allow us to handle the escape key (or maybe meta) sooner. The input received by the escape key is actually the same as the first characters of e.g. Arrow-Up, so without knowing what follows the escape sequence, we don't know whether escape has been pressed, or whether it's something else. This flush function should be called after a timeout, and processes everything that's still in the buffer as-is, so without assuming any characters will follow. """ self._input_parser.send(_Flush()) def feed_and_flush(self, data: str) -> None: """ Wrapper around ``feed`` and ``flush``. """ self.feed(data) self.flush()