From e106bf94eff07d9a59771d9ccc4406421e18ab64 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 19:35:20 +0200 Subject: Adding upstream version 3.0.36. Signed-off-by: Daniel Baumann --- src/prompt_toolkit/layout/processors.py | 1029 +++++++++++++++++++++++++++++++ 1 file changed, 1029 insertions(+) create mode 100644 src/prompt_toolkit/layout/processors.py (limited to 'src/prompt_toolkit/layout/processors.py') diff --git a/src/prompt_toolkit/layout/processors.py b/src/prompt_toolkit/layout/processors.py new file mode 100644 index 0000000..722658a --- /dev/null +++ b/src/prompt_toolkit/layout/processors.py @@ -0,0 +1,1029 @@ +""" +Processors are little transformation blocks that transform the fragments list +from a buffer before the BufferControl will render it to the screen. + +They can insert fragments before or after, or highlight fragments by replacing the +fragment types. +""" +import re +from abc import ABCMeta, abstractmethod +from typing import ( + TYPE_CHECKING, + Callable, + Hashable, + List, + Optional, + Tuple, + Type, + Union, + cast, +) + +from prompt_toolkit.application.current import get_app +from prompt_toolkit.cache import SimpleCache +from prompt_toolkit.document import Document +from prompt_toolkit.filters import FilterOrBool, to_filter, vi_insert_multiple_mode +from prompt_toolkit.formatted_text import ( + AnyFormattedText, + StyleAndTextTuples, + to_formatted_text, +) +from prompt_toolkit.formatted_text.utils import fragment_list_len, fragment_list_to_text +from prompt_toolkit.search import SearchDirection +from prompt_toolkit.utils import to_int, to_str + +from .utils import explode_text_fragments + +if TYPE_CHECKING: + from .controls import BufferControl, UIContent + +__all__ = [ + "Processor", + "TransformationInput", + "Transformation", + "DummyProcessor", + "HighlightSearchProcessor", + "HighlightIncrementalSearchProcessor", + "HighlightSelectionProcessor", + "PasswordProcessor", + "HighlightMatchingBracketProcessor", + "DisplayMultipleCursors", + "BeforeInput", + "ShowArg", + "AfterInput", + "AppendAutoSuggestion", + "ConditionalProcessor", + "ShowLeadingWhiteSpaceProcessor", + "ShowTrailingWhiteSpaceProcessor", + "TabsProcessor", + "ReverseSearchProcessor", + "DynamicProcessor", + "merge_processors", +] + + +class Processor(metaclass=ABCMeta): + """ + Manipulate the fragments for a given line in a + :class:`~prompt_toolkit.layout.controls.BufferControl`. + """ + + @abstractmethod + def apply_transformation( + self, transformation_input: "TransformationInput" + ) -> "Transformation": + """ + Apply transformation. Returns a :class:`.Transformation` instance. + + :param transformation_input: :class:`.TransformationInput` object. + """ + return Transformation(transformation_input.fragments) + + +SourceToDisplay = Callable[[int], int] +DisplayToSource = Callable[[int], int] + + +class TransformationInput: + """ + :param buffer_control: :class:`.BufferControl` instance. + :param lineno: The number of the line to which we apply the processor. + :param source_to_display: A function that returns the position in the + `fragments` for any position in the source string. (This takes + previous processors into account.) + :param fragments: List of fragments that we can transform. (Received from the + previous processor.) + """ + + def __init__( + self, + buffer_control: "BufferControl", + document: Document, + lineno: int, + source_to_display: SourceToDisplay, + fragments: StyleAndTextTuples, + width: int, + height: int, + ) -> None: + + self.buffer_control = buffer_control + self.document = document + self.lineno = lineno + self.source_to_display = source_to_display + self.fragments = fragments + self.width = width + self.height = height + + def unpack( + self, + ) -> Tuple[ + "BufferControl", Document, int, SourceToDisplay, StyleAndTextTuples, int, int + ]: + return ( + self.buffer_control, + self.document, + self.lineno, + self.source_to_display, + self.fragments, + self.width, + self.height, + ) + + +class Transformation: + """ + Transformation result, as returned by :meth:`.Processor.apply_transformation`. + + Important: Always make sure that the length of `document.text` is equal to + the length of all the text in `fragments`! + + :param fragments: The transformed fragments. To be displayed, or to pass to + the next processor. + :param source_to_display: Cursor position transformation from original + string to transformed string. + :param display_to_source: Cursor position transformed from source string to + original string. + """ + + def __init__( + self, + fragments: StyleAndTextTuples, + source_to_display: Optional[SourceToDisplay] = None, + display_to_source: Optional[DisplayToSource] = None, + ) -> None: + + self.fragments = fragments + self.source_to_display = source_to_display or (lambda i: i) + self.display_to_source = display_to_source or (lambda i: i) + + +class DummyProcessor(Processor): + """ + A `Processor` that doesn't do anything. + """ + + def apply_transformation( + self, transformation_input: TransformationInput + ) -> Transformation: + return Transformation(transformation_input.fragments) + + +class HighlightSearchProcessor(Processor): + """ + Processor that highlights search matches in the document. + Note that this doesn't support multiline search matches yet. + + The style classes 'search' and 'search.current' will be applied to the + content. + """ + + _classname = "search" + _classname_current = "search.current" + + def _get_search_text(self, buffer_control: "BufferControl") -> str: + """ + The text we are searching for. + """ + return buffer_control.search_state.text + + def apply_transformation( + self, transformation_input: TransformationInput + ) -> Transformation: + + ( + buffer_control, + document, + lineno, + source_to_display, + fragments, + _, + _, + ) = transformation_input.unpack() + + search_text = self._get_search_text(buffer_control) + searchmatch_fragment = f" class:{self._classname} " + searchmatch_current_fragment = f" class:{self._classname_current} " + + if search_text and not get_app().is_done: + # For each search match, replace the style string. + line_text = fragment_list_to_text(fragments) + fragments = explode_text_fragments(fragments) + + if buffer_control.search_state.ignore_case(): + flags = re.IGNORECASE + else: + flags = re.RegexFlag(0) + + # Get cursor column. + cursor_column: Optional[int] + if document.cursor_position_row == lineno: + cursor_column = source_to_display(document.cursor_position_col) + else: + cursor_column = None + + for match in re.finditer(re.escape(search_text), line_text, flags=flags): + if cursor_column is not None: + on_cursor = match.start() <= cursor_column < match.end() + else: + on_cursor = False + + for i in range(match.start(), match.end()): + old_fragment, text, *_ = fragments[i] + if on_cursor: + fragments[i] = ( + old_fragment + searchmatch_current_fragment, + fragments[i][1], + ) + else: + fragments[i] = ( + old_fragment + searchmatch_fragment, + fragments[i][1], + ) + + return Transformation(fragments) + + +class HighlightIncrementalSearchProcessor(HighlightSearchProcessor): + """ + Highlight the search terms that are used for highlighting the incremental + search. The style class 'incsearch' will be applied to the content. + + Important: this requires the `preview_search=True` flag to be set for the + `BufferControl`. Otherwise, the cursor position won't be set to the search + match while searching, and nothing happens. + """ + + _classname = "incsearch" + _classname_current = "incsearch.current" + + def _get_search_text(self, buffer_control: "BufferControl") -> str: + """ + The text we are searching for. + """ + # When the search buffer has focus, take that text. + search_buffer = buffer_control.search_buffer + if search_buffer is not None and search_buffer.text: + return search_buffer.text + return "" + + +class HighlightSelectionProcessor(Processor): + """ + Processor that highlights the selection in the document. + """ + + def apply_transformation( + self, transformation_input: TransformationInput + ) -> Transformation: + ( + buffer_control, + document, + lineno, + source_to_display, + fragments, + _, + _, + ) = transformation_input.unpack() + + selected_fragment = " class:selected " + + # In case of selection, highlight all matches. + selection_at_line = document.selection_range_at_line(lineno) + + if selection_at_line: + from_, to = selection_at_line + from_ = source_to_display(from_) + to = source_to_display(to) + + fragments = explode_text_fragments(fragments) + + if from_ == 0 and to == 0 and len(fragments) == 0: + # When this is an empty line, insert a space in order to + # visualise the selection. + return Transformation([(selected_fragment, " ")]) + else: + for i in range(from_, to): + if i < len(fragments): + old_fragment, old_text, *_ = fragments[i] + fragments[i] = (old_fragment + selected_fragment, old_text) + elif i == len(fragments): + fragments.append((selected_fragment, " ")) + + return Transformation(fragments) + + +class PasswordProcessor(Processor): + """ + Processor that masks the input. (For passwords.) + + :param char: (string) Character to be used. "*" by default. + """ + + def __init__(self, char: str = "*") -> None: + self.char = char + + def apply_transformation(self, ti: TransformationInput) -> Transformation: + fragments: StyleAndTextTuples = cast( + StyleAndTextTuples, + [ + (style, self.char * len(text), *handler) + for style, text, *handler in ti.fragments + ], + ) + + return Transformation(fragments) + + +class HighlightMatchingBracketProcessor(Processor): + """ + When the cursor is on or right after a bracket, it highlights the matching + bracket. + + :param max_cursor_distance: Only highlight matching brackets when the + cursor is within this distance. (From inside a `Processor`, we can't + know which lines will be visible on the screen. But we also don't want + to scan the whole document for matching brackets on each key press, so + we limit to this value.) + """ + + _closing_braces = "])}>" + + def __init__( + self, chars: str = "[](){}<>", max_cursor_distance: int = 1000 + ) -> None: + self.chars = chars + self.max_cursor_distance = max_cursor_distance + + self._positions_cache: SimpleCache[ + Hashable, List[Tuple[int, int]] + ] = SimpleCache(maxsize=8) + + def _get_positions_to_highlight(self, document: Document) -> List[Tuple[int, int]]: + """ + Return a list of (row, col) tuples that need to be highlighted. + """ + pos: Optional[int] + + # Try for the character under the cursor. + if document.current_char and document.current_char in self.chars: + pos = document.find_matching_bracket_position( + start_pos=document.cursor_position - self.max_cursor_distance, + end_pos=document.cursor_position + self.max_cursor_distance, + ) + + # Try for the character before the cursor. + elif ( + document.char_before_cursor + and document.char_before_cursor in self._closing_braces + and document.char_before_cursor in self.chars + ): + document = Document(document.text, document.cursor_position - 1) + + pos = document.find_matching_bracket_position( + start_pos=document.cursor_position - self.max_cursor_distance, + end_pos=document.cursor_position + self.max_cursor_distance, + ) + else: + pos = None + + # Return a list of (row, col) tuples that need to be highlighted. + if pos: + pos += document.cursor_position # pos is relative. + row, col = document.translate_index_to_position(pos) + return [ + (row, col), + (document.cursor_position_row, document.cursor_position_col), + ] + else: + return [] + + def apply_transformation( + self, transformation_input: TransformationInput + ) -> Transformation: + + ( + buffer_control, + document, + lineno, + source_to_display, + fragments, + _, + _, + ) = transformation_input.unpack() + + # When the application is in the 'done' state, don't highlight. + if get_app().is_done: + return Transformation(fragments) + + # Get the highlight positions. + key = (get_app().render_counter, document.text, document.cursor_position) + positions = self._positions_cache.get( + key, lambda: self._get_positions_to_highlight(document) + ) + + # Apply if positions were found at this line. + if positions: + for row, col in positions: + if row == lineno: + col = source_to_display(col) + fragments = explode_text_fragments(fragments) + style, text, *_ = fragments[col] + + if col == document.cursor_position_col: + style += " class:matching-bracket.cursor " + else: + style += " class:matching-bracket.other " + + fragments[col] = (style, text) + + return Transformation(fragments) + + +class DisplayMultipleCursors(Processor): + """ + When we're in Vi block insert mode, display all the cursors. + """ + + def apply_transformation( + self, transformation_input: TransformationInput + ) -> Transformation: + + ( + buffer_control, + document, + lineno, + source_to_display, + fragments, + _, + _, + ) = transformation_input.unpack() + + buff = buffer_control.buffer + + if vi_insert_multiple_mode(): + cursor_positions = buff.multiple_cursor_positions + fragments = explode_text_fragments(fragments) + + # If any cursor appears on the current line, highlight that. + start_pos = document.translate_row_col_to_index(lineno, 0) + end_pos = start_pos + len(document.lines[lineno]) + + fragment_suffix = " class:multiple-cursors" + + for p in cursor_positions: + if start_pos <= p <= end_pos: + column = source_to_display(p - start_pos) + + # Replace fragment. + try: + style, text, *_ = fragments[column] + except IndexError: + # Cursor needs to be displayed after the current text. + fragments.append((fragment_suffix, " ")) + else: + style += fragment_suffix + fragments[column] = (style, text) + + return Transformation(fragments) + else: + return Transformation(fragments) + + +class BeforeInput(Processor): + """ + Insert text before the input. + + :param text: This can be either plain text or formatted text + (or a callable that returns any of those). + :param style: style to be applied to this prompt/prefix. + """ + + def __init__(self, text: AnyFormattedText, style: str = "") -> None: + self.text = text + self.style = style + + def apply_transformation(self, ti: TransformationInput) -> Transformation: + source_to_display: Optional[SourceToDisplay] + display_to_source: Optional[DisplayToSource] + + if ti.lineno == 0: + # Get fragments. + fragments_before = to_formatted_text(self.text, self.style) + fragments = fragments_before + ti.fragments + + shift_position = fragment_list_len(fragments_before) + source_to_display = lambda i: i + shift_position + display_to_source = lambda i: i - shift_position + else: + fragments = ti.fragments + source_to_display = None + display_to_source = None + + return Transformation( + fragments, + source_to_display=source_to_display, + display_to_source=display_to_source, + ) + + def __repr__(self) -> str: + return f"BeforeInput({self.text!r}, {self.style!r})" + + +class ShowArg(BeforeInput): + """ + Display the 'arg' in front of the input. + + This was used by the `PromptSession`, but now it uses the + `Window.get_line_prefix` function instead. + """ + + def __init__(self) -> None: + super().__init__(self._get_text_fragments) + + def _get_text_fragments(self) -> StyleAndTextTuples: + app = get_app() + if app.key_processor.arg is None: + return [] + else: + arg = app.key_processor.arg + + return [ + ("class:prompt.arg", "(arg: "), + ("class:prompt.arg.text", str(arg)), + ("class:prompt.arg", ") "), + ] + + def __repr__(self) -> str: + return "ShowArg()" + + +class AfterInput(Processor): + """ + Insert text after the input. + + :param text: This can be either plain text or formatted text + (or a callable that returns any of those). + :param style: style to be applied to this prompt/prefix. + """ + + def __init__(self, text: AnyFormattedText, style: str = "") -> None: + self.text = text + self.style = style + + def apply_transformation(self, ti: TransformationInput) -> Transformation: + # Insert fragments after the last line. + if ti.lineno == ti.document.line_count - 1: + # Get fragments. + fragments_after = to_formatted_text(self.text, self.style) + return Transformation(fragments=ti.fragments + fragments_after) + else: + return Transformation(fragments=ti.fragments) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self.text!r}, style={self.style!r})" + + +class AppendAutoSuggestion(Processor): + """ + Append the auto suggestion to the input. + (The user can then press the right arrow the insert the suggestion.) + """ + + def __init__(self, style: str = "class:auto-suggestion") -> None: + self.style = style + + def apply_transformation(self, ti: TransformationInput) -> Transformation: + # Insert fragments after the last line. + if ti.lineno == ti.document.line_count - 1: + buffer = ti.buffer_control.buffer + + if buffer.suggestion and ti.document.is_cursor_at_the_end: + suggestion = buffer.suggestion.text + else: + suggestion = "" + + return Transformation(fragments=ti.fragments + [(self.style, suggestion)]) + else: + return Transformation(fragments=ti.fragments) + + +class ShowLeadingWhiteSpaceProcessor(Processor): + """ + Make leading whitespace visible. + + :param get_char: Callable that returns one character. + """ + + def __init__( + self, + get_char: Optional[Callable[[], str]] = None, + style: str = "class:leading-whitespace", + ) -> None: + def default_get_char() -> str: + if "\xb7".encode(get_app().output.encoding(), "replace") == b"?": + return "." + else: + return "\xb7" + + self.style = style + self.get_char = get_char or default_get_char + + def apply_transformation(self, ti: TransformationInput) -> Transformation: + fragments = ti.fragments + + # Walk through all te fragments. + if fragments and fragment_list_to_text(fragments).startswith(" "): + t = (self.style, self.get_char()) + fragments = explode_text_fragments(fragments) + + for i in range(len(fragments)): + if fragments[i][1] == " ": + fragments[i] = t + else: + break + + return Transformation(fragments) + + +class ShowTrailingWhiteSpaceProcessor(Processor): + """ + Make trailing whitespace visible. + + :param get_char: Callable that returns one character. + """ + + def __init__( + self, + get_char: Optional[Callable[[], str]] = None, + style: str = "class:training-whitespace", + ) -> None: + def default_get_char() -> str: + if "\xb7".encode(get_app().output.encoding(), "replace") == b"?": + return "." + else: + return "\xb7" + + self.style = style + self.get_char = get_char or default_get_char + + def apply_transformation(self, ti: TransformationInput) -> Transformation: + fragments = ti.fragments + + if fragments and fragments[-1][1].endswith(" "): + t = (self.style, self.get_char()) + fragments = explode_text_fragments(fragments) + + # Walk backwards through all te fragments and replace whitespace. + for i in range(len(fragments) - 1, -1, -1): + char = fragments[i][1] + if char == " ": + fragments[i] = t + else: + break + + return Transformation(fragments) + + +class TabsProcessor(Processor): + """ + Render tabs as spaces (instead of ^I) or make them visible (for instance, + by replacing them with dots.) + + :param tabstop: Horizontal space taken by a tab. (`int` or callable that + returns an `int`). + :param char1: Character or callable that returns a character (text of + length one). This one is used for the first space taken by the tab. + :param char2: Like `char1`, but for the rest of the space. + """ + + def __init__( + self, + tabstop: Union[int, Callable[[], int]] = 4, + char1: Union[str, Callable[[], str]] = "|", + char2: Union[str, Callable[[], str]] = "\u2508", + style: str = "class:tab", + ) -> None: + + self.char1 = char1 + self.char2 = char2 + self.tabstop = tabstop + self.style = style + + def apply_transformation(self, ti: TransformationInput) -> Transformation: + tabstop = to_int(self.tabstop) + style = self.style + + # Create separator for tabs. + separator1 = to_str(self.char1) + separator2 = to_str(self.char2) + + # Transform fragments. + fragments = explode_text_fragments(ti.fragments) + + position_mappings = {} + result_fragments: StyleAndTextTuples = [] + pos = 0 + + for i, fragment_and_text in enumerate(fragments): + position_mappings[i] = pos + + if fragment_and_text[1] == "\t": + # Calculate how many characters we have to insert. + count = tabstop - (pos % tabstop) + if count == 0: + count = tabstop + + # Insert tab. + result_fragments.append((style, separator1)) + result_fragments.append((style, separator2 * (count - 1))) + pos += count + else: + result_fragments.append(fragment_and_text) + pos += 1 + + position_mappings[len(fragments)] = pos + # Add `pos+1` to mapping, because the cursor can be right after the + # line as well. + position_mappings[len(fragments) + 1] = pos + 1 + + def source_to_display(from_position: int) -> int: + "Maps original cursor position to the new one." + return position_mappings[from_position] + + def display_to_source(display_pos: int) -> int: + "Maps display cursor position to the original one." + position_mappings_reversed = {v: k for k, v in position_mappings.items()} + + while display_pos >= 0: + try: + return position_mappings_reversed[display_pos] + except KeyError: + display_pos -= 1 + return 0 + + return Transformation( + result_fragments, + source_to_display=source_to_display, + display_to_source=display_to_source, + ) + + +class ReverseSearchProcessor(Processor): + """ + Process to display the "(reverse-i-search)`...`:..." stuff around + the search buffer. + + Note: This processor is meant to be applied to the BufferControl that + contains the search buffer, it's not meant for the original input. + """ + + _excluded_input_processors: List[Type[Processor]] = [ + HighlightSearchProcessor, + HighlightSelectionProcessor, + BeforeInput, + AfterInput, + ] + + def _get_main_buffer( + self, buffer_control: "BufferControl" + ) -> Optional["BufferControl"]: + from prompt_toolkit.layout.controls import BufferControl + + prev_control = get_app().layout.search_target_buffer_control + if ( + isinstance(prev_control, BufferControl) + and prev_control.search_buffer_control == buffer_control + ): + return prev_control + return None + + def _content( + self, main_control: "BufferControl", ti: TransformationInput + ) -> "UIContent": + from prompt_toolkit.layout.controls import BufferControl + + # Emulate the BufferControl through which we are searching. + # For this we filter out some of the input processors. + excluded_processors = tuple(self._excluded_input_processors) + + def filter_processor(item: Processor) -> Optional[Processor]: + """Filter processors from the main control that we want to disable + here. This returns either an accepted processor or None.""" + # For a `_MergedProcessor`, check each individual processor, recursively. + if isinstance(item, _MergedProcessor): + accepted_processors = [filter_processor(p) for p in item.processors] + return merge_processors( + [p for p in accepted_processors if p is not None] + ) + + # For a `ConditionalProcessor`, check the body. + elif isinstance(item, ConditionalProcessor): + p = filter_processor(item.processor) + if p: + return ConditionalProcessor(p, item.filter) + + # Otherwise, check the processor itself. + else: + if not isinstance(item, excluded_processors): + return item + + return None + + filtered_processor = filter_processor( + merge_processors(main_control.input_processors or []) + ) + highlight_processor = HighlightIncrementalSearchProcessor() + + if filtered_processor: + new_processors = [filtered_processor, highlight_processor] + else: + new_processors = [highlight_processor] + + from .controls import SearchBufferControl + + assert isinstance(ti.buffer_control, SearchBufferControl) + + buffer_control = BufferControl( + buffer=main_control.buffer, + input_processors=new_processors, + include_default_input_processors=False, + lexer=main_control.lexer, + preview_search=True, + search_buffer_control=ti.buffer_control, + ) + + return buffer_control.create_content(ti.width, ti.height, preview_search=True) + + def apply_transformation(self, ti: TransformationInput) -> Transformation: + from .controls import SearchBufferControl + + assert isinstance( + ti.buffer_control, SearchBufferControl + ), "`ReverseSearchProcessor` should be applied to a `SearchBufferControl` only." + + source_to_display: Optional[SourceToDisplay] + display_to_source: Optional[DisplayToSource] + + main_control = self._get_main_buffer(ti.buffer_control) + + if ti.lineno == 0 and main_control: + content = self._content(main_control, ti) + + # Get the line from the original document for this search. + line_fragments = content.get_line(content.cursor_position.y) + + if main_control.search_state.direction == SearchDirection.FORWARD: + direction_text = "i-search" + else: + direction_text = "reverse-i-search" + + fragments_before: StyleAndTextTuples = [ + ("class:prompt.search", "("), + ("class:prompt.search", direction_text), + ("class:prompt.search", ")`"), + ] + + fragments = ( + fragments_before + + [ + ("class:prompt.search.text", fragment_list_to_text(ti.fragments)), + ("", "': "), + ] + + line_fragments + ) + + shift_position = fragment_list_len(fragments_before) + source_to_display = lambda i: i + shift_position + display_to_source = lambda i: i - shift_position + else: + source_to_display = None + display_to_source = None + fragments = ti.fragments + + return Transformation( + fragments, + source_to_display=source_to_display, + display_to_source=display_to_source, + ) + + +class ConditionalProcessor(Processor): + """ + Processor that applies another processor, according to a certain condition. + Example:: + + # Create a function that returns whether or not the processor should + # currently be applied. + def highlight_enabled(): + return true_or_false + + # Wrapped it in a `ConditionalProcessor` for usage in a `BufferControl`. + BufferControl(input_processors=[ + ConditionalProcessor(HighlightSearchProcessor(), + Condition(highlight_enabled))]) + + :param processor: :class:`.Processor` instance. + :param filter: :class:`~prompt_toolkit.filters.Filter` instance. + """ + + def __init__(self, processor: Processor, filter: FilterOrBool) -> None: + self.processor = processor + self.filter = to_filter(filter) + + def apply_transformation( + self, transformation_input: TransformationInput + ) -> Transformation: + # Run processor when enabled. + if self.filter(): + return self.processor.apply_transformation(transformation_input) + else: + return Transformation(transformation_input.fragments) + + def __repr__(self) -> str: + return "{}(processor={!r}, filter={!r})".format( + self.__class__.__name__, + self.processor, + self.filter, + ) + + +class DynamicProcessor(Processor): + """ + Processor class that dynamically returns any Processor. + + :param get_processor: Callable that returns a :class:`.Processor` instance. + """ + + def __init__(self, get_processor: Callable[[], Optional[Processor]]) -> None: + self.get_processor = get_processor + + def apply_transformation(self, ti: TransformationInput) -> Transformation: + processor = self.get_processor() or DummyProcessor() + return processor.apply_transformation(ti) + + +def merge_processors(processors: List[Processor]) -> Processor: + """ + Merge multiple `Processor` objects into one. + """ + if len(processors) == 0: + return DummyProcessor() + + if len(processors) == 1: + return processors[0] # Nothing to merge. + + return _MergedProcessor(processors) + + +class _MergedProcessor(Processor): + """ + Processor that groups multiple other `Processor` objects, but exposes an + API as if it is one `Processor`. + """ + + def __init__(self, processors: List[Processor]): + self.processors = processors + + def apply_transformation(self, ti: TransformationInput) -> Transformation: + source_to_display_functions = [ti.source_to_display] + display_to_source_functions = [] + fragments = ti.fragments + + def source_to_display(i: int) -> int: + """Translate x position from the buffer to the x position in the + processor fragments list.""" + for f in source_to_display_functions: + i = f(i) + return i + + for p in self.processors: + transformation = p.apply_transformation( + TransformationInput( + ti.buffer_control, + ti.document, + ti.lineno, + source_to_display, + fragments, + ti.width, + ti.height, + ) + ) + fragments = transformation.fragments + display_to_source_functions.append(transformation.display_to_source) + source_to_display_functions.append(transformation.source_to_display) + + def display_to_source(i: int) -> int: + for f in reversed(display_to_source_functions): + i = f(i) + return i + + # In the case of a nested _MergedProcessor, each processor wants to + # receive a 'source_to_display' function (as part of the + # TransformationInput) that has everything in the chain before + # included, because it can be called as part of the + # `apply_transformation` function. However, this first + # `source_to_display` should not be part of the output that we are + # returning. (This is the most consistent with `display_to_source`.) + del source_to_display_functions[:1] + + return Transformation(fragments, source_to_display, display_to_source) -- cgit v1.2.3