diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 17:35:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 17:35:20 +0000 |
commit | e106bf94eff07d9a59771d9ccc4406421e18ab64 (patch) | |
tree | edb6545500e39df9c67aa918a6125bffc8ec1aee /src/prompt_toolkit/completion/base.py | |
parent | Initial commit. (diff) | |
download | prompt-toolkit-0a3e0a12a3f6453f8e5b1cffb2f7e1e619420f44.tar.xz prompt-toolkit-0a3e0a12a3f6453f8e5b1cffb2f7e1e619420f44.zip |
Adding upstream version 3.0.36.upstream/3.0.36upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/prompt_toolkit/completion/base.py')
-rw-r--r-- | src/prompt_toolkit/completion/base.py | 455 |
1 files changed, 455 insertions, 0 deletions
diff --git a/src/prompt_toolkit/completion/base.py b/src/prompt_toolkit/completion/base.py new file mode 100644 index 0000000..bd575ec --- /dev/null +++ b/src/prompt_toolkit/completion/base.py @@ -0,0 +1,455 @@ +""" +""" +from abc import ABCMeta, abstractmethod +from typing import AsyncGenerator, Callable, Iterable, List, Optional, Sequence + +from prompt_toolkit.document import Document +from prompt_toolkit.eventloop import ( + aclosing, + generator_to_async_generator, + get_event_loop, +) +from prompt_toolkit.filters import FilterOrBool, to_filter +from prompt_toolkit.formatted_text import AnyFormattedText, StyleAndTextTuples + +__all__ = [ + "Completion", + "Completer", + "ThreadedCompleter", + "DummyCompleter", + "DynamicCompleter", + "CompleteEvent", + "ConditionalCompleter", + "merge_completers", + "get_common_complete_suffix", +] + + +class Completion: + """ + :param text: The new string that will be inserted into the document. + :param start_position: Position relative to the cursor_position where the + new text will start. The text will be inserted between the + start_position and the original cursor position. + :param display: (optional string or formatted text) If the completion has + to be displayed differently in the completion menu. + :param display_meta: (Optional string or formatted text) Meta information + about the completion, e.g. the path or source where it's coming from. + This can also be a callable that returns a string. + :param style: Style string. + :param selected_style: Style string, used for a selected completion. + This can override the `style` parameter. + """ + + def __init__( + self, + text: str, + start_position: int = 0, + display: Optional[AnyFormattedText] = None, + display_meta: Optional[AnyFormattedText] = None, + style: str = "", + selected_style: str = "", + ) -> None: + + from prompt_toolkit.formatted_text import to_formatted_text + + self.text = text + self.start_position = start_position + self._display_meta = display_meta + + if display is None: + display = text + + self.display = to_formatted_text(display) + + self.style = style + self.selected_style = selected_style + + assert self.start_position <= 0 + + def __repr__(self) -> str: + if isinstance(self.display, str) and self.display == self.text: + return "{}(text={!r}, start_position={!r})".format( + self.__class__.__name__, + self.text, + self.start_position, + ) + else: + return "{}(text={!r}, start_position={!r}, display={!r})".format( + self.__class__.__name__, + self.text, + self.start_position, + self.display, + ) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Completion): + return False + return ( + self.text == other.text + and self.start_position == other.start_position + and self.display == other.display + and self._display_meta == other._display_meta + ) + + def __hash__(self) -> int: + return hash((self.text, self.start_position, self.display, self._display_meta)) + + @property + def display_text(self) -> str: + "The 'display' field as plain text." + from prompt_toolkit.formatted_text import fragment_list_to_text + + return fragment_list_to_text(self.display) + + @property + def display_meta(self) -> StyleAndTextTuples: + "Return meta-text. (This is lazy when using a callable)." + from prompt_toolkit.formatted_text import to_formatted_text + + return to_formatted_text(self._display_meta or "") + + @property + def display_meta_text(self) -> str: + "The 'meta' field as plain text." + from prompt_toolkit.formatted_text import fragment_list_to_text + + return fragment_list_to_text(self.display_meta) + + def new_completion_from_position(self, position: int) -> "Completion": + """ + (Only for internal use!) + Get a new completion by splitting this one. Used by `Application` when + it needs to have a list of new completions after inserting the common + prefix. + """ + assert position - self.start_position >= 0 + + return Completion( + text=self.text[position - self.start_position :], + display=self.display, + display_meta=self._display_meta, + ) + + +class CompleteEvent: + """ + Event that called the completer. + + :param text_inserted: When True, it means that completions are requested + because of a text insert. (`Buffer.complete_while_typing`.) + :param completion_requested: When True, it means that the user explicitly + pressed the `Tab` key in order to view the completions. + + These two flags can be used for instance to implement a completer that + shows some completions when ``Tab`` has been pressed, but not + automatically when the user presses a space. (Because of + `complete_while_typing`.) + """ + + def __init__( + self, text_inserted: bool = False, completion_requested: bool = False + ) -> None: + assert not (text_inserted and completion_requested) + + #: Automatic completion while typing. + self.text_inserted = text_inserted + + #: Used explicitly requested completion by pressing 'tab'. + self.completion_requested = completion_requested + + def __repr__(self) -> str: + return "{}(text_inserted={!r}, completion_requested={!r})".format( + self.__class__.__name__, + self.text_inserted, + self.completion_requested, + ) + + +class Completer(metaclass=ABCMeta): + """ + Base class for completer implementations. + """ + + @abstractmethod + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + """ + This should be a generator that yields :class:`.Completion` instances. + + If the generation of completions is something expensive (that takes a + lot of time), consider wrapping this `Completer` class in a + `ThreadedCompleter`. In that case, the completer algorithm runs in a + background thread and completions will be displayed as soon as they + arrive. + + :param document: :class:`~prompt_toolkit.document.Document` instance. + :param complete_event: :class:`.CompleteEvent` instance. + """ + while False: + yield + + async def get_completions_async( + self, document: Document, complete_event: CompleteEvent + ) -> AsyncGenerator[Completion, None]: + """ + Asynchronous generator for completions. (Probably, you won't have to + override this.) + + Asynchronous generator of :class:`.Completion` objects. + """ + for item in self.get_completions(document, complete_event): + yield item + + +class ThreadedCompleter(Completer): + """ + Wrapper that runs the `get_completions` generator in a thread. + + (Use this to prevent the user interface from becoming unresponsive if the + generation of completions takes too much time.) + + The completions will be displayed as soon as they are produced. The user + can already select a completion, even if not all completions are displayed. + """ + + def __init__(self, completer: Completer) -> None: + self.completer = completer + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + return self.completer.get_completions(document, complete_event) + + async def get_completions_async( + self, document: Document, complete_event: CompleteEvent + ) -> AsyncGenerator[Completion, None]: + """ + Asynchronous generator of completions. + """ + # NOTE: Right now, we are consuming the `get_completions` generator in + # a synchronous background thread, then passing the results one + # at a time over a queue, and consuming this queue in the main + # thread (that's what `generator_to_async_generator` does). That + # means that if the completer is *very* slow, we'll be showing + # completions in the UI once they are computed. + + # It's very tempting to replace this implementation with the + # commented code below for several reasons: + + # - `generator_to_async_generator` is not perfect and hard to get + # right. It's a lot of complexity for little gain. The + # implementation needs a huge buffer for it to be efficient + # when there are many completions (like 50k+). + # - Normally, a completer is supposed to be fast, users can have + # "complete while typing" enabled, and want to see the + # completions within a second. Handling one completion at a + # time, and rendering once we get it here doesn't make any + # sense if this is quick anyway. + # - Completers like `FuzzyCompleter` prepare all completions + # anyway so that they can be sorted by accuracy before they are + # yielded. At the point that we start yielding completions + # here, we already have all completions. + # - The `Buffer` class has complex logic to invalidate the UI + # while it is consuming the completions. We don't want to + # invalidate the UI for every completion (if there are many), + # but we want to do it often enough so that completions are + # being displayed while they are produced. + + # We keep the current behavior mainly for backward-compatibility. + # Similarly, it would be better for this function to not return + # an async generator, but simply be a coroutine that returns a + # list of `Completion` objects, containing all completions at + # once. + + # Note that this argument doesn't mean we shouldn't use + # `ThreadedCompleter`. It still makes sense to produce + # completions in a background thread, because we don't want to + # freeze the UI while the user is typing. But sending the + # completions one at a time to the UI maybe isn't worth it. + + # def get_all_in_thread() -> List[Completion]: + # return list(self.get_completions(document, complete_event)) + + # completions = await get_event_loop().run_in_executor(None, get_all_in_thread) + # for completion in completions: + # yield completion + + async with aclosing( + generator_to_async_generator( + lambda: self.completer.get_completions(document, complete_event) + ) + ) as async_generator: + async for completion in async_generator: + yield completion + + def __repr__(self) -> str: + return f"ThreadedCompleter({self.completer!r})" + + +class DummyCompleter(Completer): + """ + A completer that doesn't return any completion. + """ + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + return [] + + def __repr__(self) -> str: + return "DummyCompleter()" + + +class DynamicCompleter(Completer): + """ + Completer class that can dynamically returns any Completer. + + :param get_completer: Callable that returns a :class:`.Completer` instance. + """ + + def __init__(self, get_completer: Callable[[], Optional[Completer]]) -> None: + self.get_completer = get_completer + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + completer = self.get_completer() or DummyCompleter() + return completer.get_completions(document, complete_event) + + async def get_completions_async( + self, document: Document, complete_event: CompleteEvent + ) -> AsyncGenerator[Completion, None]: + completer = self.get_completer() or DummyCompleter() + + async for completion in completer.get_completions_async( + document, complete_event + ): + yield completion + + def __repr__(self) -> str: + return f"DynamicCompleter({self.get_completer!r} -> {self.get_completer()!r})" + + +class ConditionalCompleter(Completer): + """ + Wrapper around any other completer that will enable/disable the completions + depending on whether the received condition is satisfied. + + :param completer: :class:`.Completer` instance. + :param filter: :class:`.Filter` instance. + """ + + def __init__(self, completer: Completer, filter: FilterOrBool) -> None: + self.completer = completer + self.filter = to_filter(filter) + + def __repr__(self) -> str: + return f"ConditionalCompleter({self.completer!r}, filter={self.filter!r})" + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + # Get all completions in a blocking way. + if self.filter(): + yield from self.completer.get_completions(document, complete_event) + + async def get_completions_async( + self, document: Document, complete_event: CompleteEvent + ) -> AsyncGenerator[Completion, None]: + + # Get all completions in a non-blocking way. + if self.filter(): + async with aclosing( + self.completer.get_completions_async(document, complete_event) + ) as async_generator: + async for item in async_generator: + yield item + + +class _MergedCompleter(Completer): + """ + Combine several completers into one. + """ + + def __init__(self, completers: Sequence[Completer]) -> None: + self.completers = completers + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + # Get all completions from the other completers in a blocking way. + for completer in self.completers: + yield from completer.get_completions(document, complete_event) + + async def get_completions_async( + self, document: Document, complete_event: CompleteEvent + ) -> AsyncGenerator[Completion, None]: + + # Get all completions from the other completers in a non-blocking way. + for completer in self.completers: + async with aclosing( + completer.get_completions_async(document, complete_event) + ) as async_generator: + async for item in async_generator: + yield item + + +def merge_completers( + completers: Sequence[Completer], deduplicate: bool = False +) -> Completer: + """ + Combine several completers into one. + + :param deduplicate: If `True`, wrap the result in a `DeduplicateCompleter` + so that completions that would result in the same text will be + deduplicated. + """ + if deduplicate: + from .deduplicate import DeduplicateCompleter + + return DeduplicateCompleter(_MergedCompleter(completers)) + + return _MergedCompleter(completers) + + +def get_common_complete_suffix( + document: Document, completions: Sequence[Completion] +) -> str: + """ + Return the common prefix for all completions. + """ + # Take only completions that don't change the text before the cursor. + def doesnt_change_before_cursor(completion: Completion) -> bool: + end = completion.text[: -completion.start_position] + return document.text_before_cursor.endswith(end) + + completions2 = [c for c in completions if doesnt_change_before_cursor(c)] + + # When there is at least one completion that changes the text before the + # cursor, don't return any common part. + if len(completions2) != len(completions): + return "" + + # Return the common prefix. + def get_suffix(completion: Completion) -> str: + return completion.text[-completion.start_position :] + + return _commonprefix([get_suffix(c) for c in completions2]) + + +def _commonprefix(strings: Iterable[str]) -> str: + # Similar to os.path.commonprefix + if not strings: + return "" + + else: + s1 = min(strings) + s2 = max(strings) + + for i, c in enumerate(s1): + if c != s2[i]: + return s1[:i] + + return s1 |