From e5f7a41d988de298069eda636e823f374b973bd4 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 15 Apr 2024 18:33:15 +0200 Subject: Adding upstream version 3.0.26. Signed-off-by: Daniel Baumann --- ptpython/__init__.py | 5 + ptpython/__main__.py | 8 + ptpython/completer.py | 671 +++++++++++++++++++ ptpython/contrib/__init__.py | 0 ptpython/contrib/asyncssh_repl.py | 125 ++++ ptpython/entry_points/__init__.py | 0 ptpython/entry_points/run_ptipython.py | 82 +++ ptpython/entry_points/run_ptpython.py | 234 +++++++ ptpython/eventloop.py | 75 +++ ptpython/filters.py | 39 ++ ptpython/history_browser.py | 687 +++++++++++++++++++ ptpython/ipython.py | 339 ++++++++++ ptpython/key_bindings.py | 337 ++++++++++ ptpython/layout.py | 773 ++++++++++++++++++++++ ptpython/lexer.py | 30 + ptpython/printer.py | 435 +++++++++++++ ptpython/prompt_style.py | 79 +++ ptpython/py.typed | 0 ptpython/python_input.py | 1121 ++++++++++++++++++++++++++++++++ ptpython/repl.py | 531 +++++++++++++++ ptpython/signatures.py | 270 ++++++++ ptpython/style.py | 175 +++++ ptpython/utils.py | 212 ++++++ ptpython/validator.py | 62 ++ 24 files changed, 6290 insertions(+) create mode 100644 ptpython/__init__.py create mode 100644 ptpython/__main__.py create mode 100644 ptpython/completer.py create mode 100644 ptpython/contrib/__init__.py create mode 100644 ptpython/contrib/asyncssh_repl.py create mode 100644 ptpython/entry_points/__init__.py create mode 100644 ptpython/entry_points/run_ptipython.py create mode 100644 ptpython/entry_points/run_ptpython.py create mode 100644 ptpython/eventloop.py create mode 100644 ptpython/filters.py create mode 100644 ptpython/history_browser.py create mode 100644 ptpython/ipython.py create mode 100644 ptpython/key_bindings.py create mode 100644 ptpython/layout.py create mode 100644 ptpython/lexer.py create mode 100644 ptpython/printer.py create mode 100644 ptpython/prompt_style.py create mode 100644 ptpython/py.typed create mode 100644 ptpython/python_input.py create mode 100644 ptpython/repl.py create mode 100644 ptpython/signatures.py create mode 100644 ptpython/style.py create mode 100644 ptpython/utils.py create mode 100644 ptpython/validator.py (limited to 'ptpython') diff --git a/ptpython/__init__.py b/ptpython/__init__.py new file mode 100644 index 0000000..63c6233 --- /dev/null +++ b/ptpython/__init__.py @@ -0,0 +1,5 @@ +from __future__ import annotations + +from .repl import embed + +__all__ = ["embed"] diff --git a/ptpython/__main__.py b/ptpython/__main__.py new file mode 100644 index 0000000..c006261 --- /dev/null +++ b/ptpython/__main__.py @@ -0,0 +1,8 @@ +""" +Make `python -m ptpython` an alias for running `./ptpython`. +""" +from __future__ import annotations + +from .entry_points.run_ptpython import run + +run() diff --git a/ptpython/completer.py b/ptpython/completer.py new file mode 100644 index 0000000..91d6647 --- /dev/null +++ b/ptpython/completer.py @@ -0,0 +1,671 @@ +from __future__ import annotations + +import ast +import collections.abc as collections_abc +import inspect +import keyword +import re +from enum import Enum +from typing import TYPE_CHECKING, Any, Callable, Iterable + +from prompt_toolkit.completion import ( + CompleteEvent, + Completer, + Completion, + PathCompleter, +) +from prompt_toolkit.contrib.completers.system import SystemCompleter +from prompt_toolkit.contrib.regular_languages.compiler import compile as compile_grammar +from prompt_toolkit.contrib.regular_languages.completion import GrammarCompleter +from prompt_toolkit.document import Document +from prompt_toolkit.formatted_text import fragment_list_to_text, to_formatted_text + +from ptpython.utils import get_jedi_script_from_document + +if TYPE_CHECKING: + import jedi.api.classes + from prompt_toolkit.contrib.regular_languages.compiler import _CompiledGrammar + +__all__ = ["PythonCompleter", "CompletePrivateAttributes", "HidePrivateCompleter"] + + +class CompletePrivateAttributes(Enum): + """ + Should we display private attributes in the completion pop-up? + """ + + NEVER = "NEVER" + IF_NO_PUBLIC = "IF_NO_PUBLIC" + ALWAYS = "ALWAYS" + + +class PythonCompleter(Completer): + """ + Completer for Python code. + """ + + def __init__( + self, + get_globals: Callable[[], dict[str, Any]], + get_locals: Callable[[], dict[str, Any]], + enable_dictionary_completion: Callable[[], bool], + ) -> None: + super().__init__() + + self.get_globals = get_globals + self.get_locals = get_locals + self.enable_dictionary_completion = enable_dictionary_completion + + self._system_completer = SystemCompleter() + self._jedi_completer = JediCompleter(get_globals, get_locals) + self._dictionary_completer = DictionaryCompleter(get_globals, get_locals) + + self._path_completer_cache: GrammarCompleter | None = None + self._path_completer_grammar_cache: _CompiledGrammar | None = None + + @property + def _path_completer(self) -> GrammarCompleter: + if self._path_completer_cache is None: + self._path_completer_cache = GrammarCompleter( + self._path_completer_grammar, + { + "var1": PathCompleter(expanduser=True), + "var2": PathCompleter(expanduser=True), + }, + ) + return self._path_completer_cache + + @property + def _path_completer_grammar(self) -> _CompiledGrammar: + """ + Return the grammar for matching paths inside strings inside Python + code. + """ + # We make this lazy, because it delays startup time a little bit. + # This way, the grammar is build during the first completion. + if self._path_completer_grammar_cache is None: + self._path_completer_grammar_cache = self._create_path_completer_grammar() + return self._path_completer_grammar_cache + + def _create_path_completer_grammar(self) -> _CompiledGrammar: + def unwrapper(text: str) -> str: + return re.sub(r"\\(.)", r"\1", text) + + def single_quoted_wrapper(text: str) -> str: + return text.replace("\\", "\\\\").replace("'", "\\'") + + def double_quoted_wrapper(text: str) -> str: + return text.replace("\\", "\\\\").replace('"', '\\"') + + grammar = r""" + # Text before the current string. + ( + [^'"#] | # Not quoted characters. + ''' ([^'\\]|'(?!')|''(?!')|\\.])* ''' | # Inside single quoted triple strings + "" " ([^"\\]|"(?!")|""(?!^)|\\.])* "" " | # Inside double quoted triple strings + + \#[^\n]*(\n|$) | # Comment. + "(?!"") ([^"\\]|\\.)*" | # Inside double quoted strings. + '(?!'') ([^'\\]|\\.)*' # Inside single quoted strings. + + # Warning: The negative lookahead in the above two + # statements is important. If we drop that, + # then the regex will try to interpret every + # triple quoted string also as a single quoted + # string, making this exponentially expensive to + # execute! + )* + # The current string that we're completing. + ( + ' (?P([^\n'\\]|\\.)*) | # Inside a single quoted string. + " (?P([^\n"\\]|\\.)*) # Inside a double quoted string. + ) + """ + + return compile_grammar( + grammar, + escape_funcs={"var1": single_quoted_wrapper, "var2": double_quoted_wrapper}, + unescape_funcs={"var1": unwrapper, "var2": unwrapper}, + ) + + def _complete_path_while_typing(self, document: Document) -> bool: + char_before_cursor = document.char_before_cursor + return bool( + document.text + and (char_before_cursor.isalnum() or char_before_cursor in "/.~") + ) + + def _complete_python_while_typing(self, document: Document) -> bool: + """ + When `complete_while_typing` is set, only return completions when this + returns `True`. + """ + text = document.text_before_cursor # .rstrip() + char_before_cursor = text[-1:] + return bool( + text and (char_before_cursor.isalnum() or char_before_cursor in "_.([,") + ) + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + """ + Get Python completions. + """ + # If the input starts with an exclamation mark. Use the system completer. + if document.text.lstrip().startswith("!"): + yield from self._system_completer.get_completions( + Document( + text=document.text[1:], cursor_position=document.cursor_position - 1 + ), + complete_event, + ) + return + + # Do dictionary key completions. + if complete_event.completion_requested or self._complete_python_while_typing( + document + ): + if self.enable_dictionary_completion(): + has_dict_completions = False + for c in self._dictionary_completer.get_completions( + document, complete_event + ): + if c.text not in "[.": + # If we get the [ or . completion, still include the other + # completions. + has_dict_completions = True + yield c + if has_dict_completions: + return + + # Do Path completions (if there were no dictionary completions). + if complete_event.completion_requested or self._complete_path_while_typing( + document + ): + yield from self._path_completer.get_completions(document, complete_event) + + # Do Jedi completions. + if complete_event.completion_requested or self._complete_python_while_typing( + document + ): + # If we are inside a string, Don't do Jedi completion. + if not self._path_completer_grammar.match(document.text_before_cursor): + # Do Jedi Python completions. + yield from self._jedi_completer.get_completions( + document, complete_event + ) + + +class JediCompleter(Completer): + """ + Autocompleter that uses the Jedi library. + """ + + def __init__( + self, + get_globals: Callable[[], dict[str, Any]], + get_locals: Callable[[], dict[str, Any]], + ) -> None: + super().__init__() + + self.get_globals = get_globals + self.get_locals = get_locals + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + script = get_jedi_script_from_document( + document, self.get_locals(), self.get_globals() + ) + + if script: + try: + jedi_completions = script.complete( + column=document.cursor_position_col, + line=document.cursor_position_row + 1, + ) + except TypeError: + # Issue #9: bad syntax causes completions() to fail in jedi. + # https://github.com/jonathanslenders/python-prompt-toolkit/issues/9 + pass + except UnicodeDecodeError: + # Issue #43: UnicodeDecodeError on OpenBSD + # https://github.com/jonathanslenders/python-prompt-toolkit/issues/43 + pass + except AttributeError: + # Jedi issue #513: https://github.com/davidhalter/jedi/issues/513 + pass + except ValueError: + # Jedi issue: "ValueError: invalid \x escape" + pass + except KeyError: + # Jedi issue: "KeyError: u'a_lambda'." + # https://github.com/jonathanslenders/ptpython/issues/89 + pass + except OSError: + # Jedi issue: "IOError: No such file or directory." + # https://github.com/jonathanslenders/ptpython/issues/71 + pass + except AssertionError: + # In jedi.parser.__init__.py: 227, in remove_last_newline, + # the assertion "newline.value.endswith('\n')" can fail. + pass + except SystemError: + # In jedi.api.helpers.py: 144, in get_stack_at_position + # raise SystemError("This really shouldn't happen. There's a bug in Jedi.") + pass + except NotImplementedError: + # See: https://github.com/jonathanslenders/ptpython/issues/223 + pass + except Exception: + # Suppress all other Jedi exceptions. + pass + else: + # Move function parameters to the top. + jedi_completions = sorted( + jedi_completions, + key=lambda jc: ( + # Params first. + jc.type != "param", + # Private at the end. + jc.name.startswith("_"), + # Then sort by name. + jc.name_with_symbols.lower(), + ), + ) + + for jc in jedi_completions: + if jc.type == "function": + suffix = "()" + else: + suffix = "" + + if jc.type == "param": + suffix = "..." + + yield Completion( + jc.name_with_symbols, + len(jc.complete) - len(jc.name_with_symbols), + display=jc.name_with_symbols + suffix, + display_meta=jc.type, + style=_get_style_for_jedi_completion(jc), + ) + + +class DictionaryCompleter(Completer): + """ + Experimental completer for Python dictionary keys. + + Warning: This does an `eval` and `repr` on some Python expressions before + the cursor, which is potentially dangerous. It doesn't match on + function calls, so it only triggers attribute access. + """ + + def __init__( + self, + get_globals: Callable[[], dict[str, Any]], + get_locals: Callable[[], dict[str, Any]], + ) -> None: + super().__init__() + + self.get_globals = get_globals + self.get_locals = get_locals + + # Pattern for expressions that are "safe" to eval for auto-completion. + # These are expressions that contain only attribute and index lookups. + varname = r"[a-zA-Z_][a-zA-Z0-9_]*" + + expression = rf""" + # Any expression safe enough to eval while typing. + # No operators, except dot, and only other dict lookups. + # Technically, this can be unsafe of course, if bad code runs + # in `__getattr__` or ``__getitem__``. + ( + # Variable name + {varname} + + \s* + + (?: + # Attribute access. + \s* \. \s* {varname} \s* + + | + + # Item lookup. + # (We match the square brackets. The key can be anything. + # We don't care about matching quotes here in the regex. + # Nested square brackets are not supported.) + \s* \[ [^\[\]]+ \] \s* + )* + ) + """ + + # Pattern for recognizing for-loops, so that we can provide + # autocompletion on the iterator of the for-loop. (According to the + # first item of the collection we're iterating over.) + self.for_loop_pattern = re.compile( + rf""" + for \s+ ([a-zA-Z0-9_]+) \s+ in \s+ {expression} \s* : + """, + re.VERBOSE, + ) + + # Pattern for matching a simple expression (for completing [ or . + # operators). + self.expression_pattern = re.compile( + rf""" + {expression} + $ + """, + re.VERBOSE, + ) + + # Pattern for matching item lookups. + self.item_lookup_pattern = re.compile( + rf""" + {expression} + + # Dict lookup to complete (square bracket open + start of + # string). + \[ + \s* ([^\[\]]*)$ + """, + re.VERBOSE, + ) + + # Pattern for matching attribute lookups. + self.attribute_lookup_pattern = re.compile( + rf""" + {expression} + + # Attribute lookup to complete (dot + varname). + \. + \s* ([a-zA-Z0-9_]*)$ + """, + re.VERBOSE, + ) + + def _lookup(self, expression: str, temp_locals: dict[str, Any]) -> object: + """ + Do lookup of `object_var` in the context. + `temp_locals` is a dictionary, used for the locals. + """ + try: + return eval(expression.strip(), self.get_globals(), temp_locals) + except BaseException: + return None # Many exception, like NameError can be thrown here. + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + # First, find all for-loops, and assign the first item of the + # collections they're iterating to the iterator variable, so that we + # can provide code completion on the iterators. + temp_locals = self.get_locals().copy() + + for match in self.for_loop_pattern.finditer(document.text_before_cursor): + varname, expression = match.groups() + expression_val = self._lookup(expression, temp_locals) + + # We do this only for lists and tuples. Calling `next()` on any + # collection would create undesired side effects. + if isinstance(expression_val, (list, tuple)) and expression_val: + temp_locals[varname] = expression_val[0] + + # Get all completions. + yield from self._get_expression_completions( + document, complete_event, temp_locals + ) + yield from self._get_item_lookup_completions( + document, complete_event, temp_locals + ) + yield from self._get_attribute_completions( + document, complete_event, temp_locals + ) + + def _do_repr(self, obj: object) -> str: + try: + return str(repr(obj)) + except BaseException: + raise ReprFailedError + + def eval_expression(self, document: Document, locals: dict[str, Any]) -> object: + """ + Evaluate + """ + match = self.expression_pattern.search(document.text_before_cursor) + if match is not None: + object_var = match.groups()[0] + return self._lookup(object_var, locals) + + return None + + def _get_expression_completions( + self, + document: Document, + complete_event: CompleteEvent, + temp_locals: dict[str, Any], + ) -> Iterable[Completion]: + """ + Complete the [ or . operator after an object. + """ + result = self.eval_expression(document, temp_locals) + + if result is not None: + if isinstance( + result, + (list, tuple, dict, collections_abc.Mapping, collections_abc.Sequence), + ): + yield Completion("[", 0) + + else: + # Note: Don't call `if result` here. That can fail for types + # that have custom truthness checks. + yield Completion(".", 0) + + def _get_item_lookup_completions( + self, + document: Document, + complete_event: CompleteEvent, + temp_locals: dict[str, Any], + ) -> Iterable[Completion]: + """ + Complete dictionary keys. + """ + + def meta_repr(value: object) -> Callable[[], str]: + "Abbreviate meta text, make sure it fits on one line." + + # We return a function, so that it gets computed when it's needed. + # When there are many completions, that improves the performance + # quite a bit (for the multi-column completion menu, we only need + # to display one meta text). + def get_value_repr() -> str: + text = self._do_repr(value) + + # Take first line, if multiple lines. + if "\n" in text: + text = text.split("\n", 1)[0] + "..." + + return text + + return get_value_repr + + match = self.item_lookup_pattern.search(document.text_before_cursor) + if match is not None: + object_var, key = match.groups() + + # Do lookup of `object_var` in the context. + result = self._lookup(object_var, temp_locals) + + # If this object is a dictionary, complete the keys. + if isinstance(result, (dict, collections_abc.Mapping)): + # Try to evaluate the key. + key_obj = key + for k in [key, key + '"', key + "'"]: + try: + key_obj = ast.literal_eval(k) + except (SyntaxError, ValueError): + continue + else: + break + + for k, v in result.items(): + if str(k).startswith(str(key_obj)): + try: + k_repr = self._do_repr(k) + yield Completion( + k_repr + "]", + -len(key), + display=f"[{k_repr}]", + display_meta=meta_repr(v), + ) + except ReprFailedError: + pass + + # Complete list/tuple index keys. + elif isinstance(result, (list, tuple, collections_abc.Sequence)): + if not key or key.isdigit(): + for k in range(min(len(result), 1000)): + if str(k).startswith(key): + try: + k_repr = self._do_repr(k) + yield Completion( + k_repr + "]", + -len(key), + display=f"[{k_repr}]", + display_meta=meta_repr(result[k]), + ) + except KeyError: + # `result[k]` lookup failed. Trying to complete + # broken object. + pass + except ReprFailedError: + pass + + def _get_attribute_completions( + self, + document: Document, + complete_event: CompleteEvent, + temp_locals: dict[str, Any], + ) -> Iterable[Completion]: + """ + Complete attribute names. + """ + match = self.attribute_lookup_pattern.search(document.text_before_cursor) + if match is not None: + object_var, attr_name = match.groups() + + # Do lookup of `object_var` in the context. + result = self._lookup(object_var, temp_locals) + + names = self._sort_attribute_names(dir(result)) + + def get_suffix(name: str) -> str: + try: + obj = getattr(result, name, None) + if inspect.isfunction(obj) or inspect.ismethod(obj): + return "()" + if isinstance(obj, collections_abc.Mapping): + return "{}" + if isinstance(obj, collections_abc.Sequence): + return "[]" + except: + pass + return "" + + for name in names: + if name.startswith(attr_name): + suffix = get_suffix(name) + yield Completion(name, -len(attr_name), display=name + suffix) + + def _sort_attribute_names(self, names: list[str]) -> list[str]: + """ + Sort attribute names alphabetically, but move the double underscore and + underscore names to the end. + """ + + def sort_key(name: str) -> tuple[int, str]: + if name.startswith("__"): + return (2, name) # Double underscore comes latest. + if name.startswith("_"): + return (1, name) # Single underscore before that. + return (0, name) # Other names first. + + return sorted(names, key=sort_key) + + +class HidePrivateCompleter(Completer): + """ + Wrapper around completer that hides private fields, depending on whether or + not public fields are shown. + + (The reason this is implemented as a `Completer` wrapper is because this + way it works also with `FuzzyCompleter`.) + """ + + def __init__( + self, + completer: Completer, + complete_private_attributes: Callable[[], CompletePrivateAttributes], + ) -> None: + self.completer = completer + self.complete_private_attributes = complete_private_attributes + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + completions = list(self.completer.get_completions(document, complete_event)) + complete_private_attributes = self.complete_private_attributes() + hide_private = False + + def is_private(completion: Completion) -> bool: + text = fragment_list_to_text(to_formatted_text(completion.display)) + return text.startswith("_") + + if complete_private_attributes == CompletePrivateAttributes.NEVER: + hide_private = True + + elif complete_private_attributes == CompletePrivateAttributes.IF_NO_PUBLIC: + hide_private = any(not is_private(completion) for completion in completions) + + if hide_private: + completions = [ + completion for completion in completions if not is_private(completion) + ] + + return completions + + +class ReprFailedError(Exception): + "Raised when the repr() call in `DictionaryCompleter` fails." + + +try: + import builtins + + _builtin_names = dir(builtins) +except ImportError: # Python 2. + _builtin_names = [] + + +def _get_style_for_jedi_completion( + jedi_completion: jedi.api.classes.Completion, +) -> str: + """ + Return completion style to use for this name. + """ + name = jedi_completion.name_with_symbols + + if jedi_completion.type == "param": + return "class:completion.param" + + if name in _builtin_names: + return "class:completion.builtin" + + if keyword.iskeyword(name): + return "class:completion.keyword" + + return "" diff --git a/ptpython/contrib/__init__.py b/ptpython/contrib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ptpython/contrib/asyncssh_repl.py b/ptpython/contrib/asyncssh_repl.py new file mode 100644 index 0000000..2f74eb2 --- /dev/null +++ b/ptpython/contrib/asyncssh_repl.py @@ -0,0 +1,125 @@ +""" +Tool for embedding a REPL inside a Python 3 asyncio process. +See ./examples/asyncio-ssh-python-embed.py for a demo. + +Note that the code in this file is Python 3 only. However, we +should make sure not to use Python 3-only syntax, because this +package should be installable in Python 2 as well! +""" +from __future__ import annotations + +import asyncio +from typing import Any, AnyStr, TextIO, cast + +import asyncssh +from prompt_toolkit.data_structures import Size +from prompt_toolkit.input import create_pipe_input +from prompt_toolkit.output.vt100 import Vt100_Output + +from ptpython.python_input import _GetNamespace, _Namespace +from ptpython.repl import PythonRepl + +__all__ = ["ReplSSHServerSession"] + + +class ReplSSHServerSession(asyncssh.SSHServerSession[str]): + """ + SSH server session that runs a Python REPL. + + :param get_globals: callable that returns the current globals. + :param get_locals: (optional) callable that returns the current locals. + """ + + def __init__( + self, get_globals: _GetNamespace, get_locals: _GetNamespace | None = None + ) -> None: + self._chan: Any = None + + def _globals() -> _Namespace: + data = get_globals() + data.setdefault("print", self._print) + return data + + # PipInput object, for sending input in the CLI. + # (This is something that we can use in the prompt_toolkit event loop, + # but still write date in manually.) + self._input_pipe = create_pipe_input() + + # Output object. Don't render to the real stdout, but write everything + # in the SSH channel. + class Stdout: + def write(s, data: str) -> None: + if self._chan is not None: + data = data.replace("\n", "\r\n") + self._chan.write(data) + + def flush(s) -> None: + pass + + self.repl = PythonRepl( + get_globals=_globals, + get_locals=get_locals or _globals, + input=self._input_pipe, + output=Vt100_Output(cast(TextIO, Stdout()), self._get_size), + ) + + # Disable open-in-editor and system prompt. Because it would run and + # display these commands on the server side, rather than in the SSH + # client. + self.repl.enable_open_in_editor = False + self.repl.enable_system_bindings = False + + def _get_size(self) -> Size: + """ + Callable that returns the current `Size`, required by Vt100_Output. + """ + if self._chan is None: + return Size(rows=20, columns=79) + else: + width, height, pixwidth, pixheight = self._chan.get_terminal_size() + return Size(rows=height, columns=width) + + def connection_made(self, chan: Any) -> None: + """ + Client connected, run repl in coroutine. + """ + self._chan = chan + + # Run REPL interface. + f = asyncio.ensure_future(self.repl.run_async()) + + # Close channel when done. + def done(_: object) -> None: + chan.close() + self._chan = None + + f.add_done_callback(done) + + def shell_requested(self) -> bool: + return True + + def terminal_size_changed( + self, width: int, height: int, pixwidth: int, pixheight: int + ) -> None: + """ + When the terminal size changes, report back to CLI. + """ + self.repl.app._on_resize() + + def data_received(self, data: AnyStr, datatype: int | None) -> None: + """ + When data is received, send to inputstream of the CLI and repaint. + """ + self._input_pipe.send(data) # type: ignore + + def _print( + self, *data: object, sep: str = " ", end: str = "\n", file: Any = None + ) -> None: + """ + Alternative 'print' function that prints back into the SSH channel. + """ + # Pop keyword-only arguments. (We cannot use the syntax from the + # signature. Otherwise, Python2 will give a syntax error message when + # installing.) + data_as_str = sep.join(map(str, data)) + self._chan.write(data_as_str + end) diff --git a/ptpython/entry_points/__init__.py b/ptpython/entry_points/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ptpython/entry_points/run_ptipython.py b/ptpython/entry_points/run_ptipython.py new file mode 100644 index 0000000..b660a0a --- /dev/null +++ b/ptpython/entry_points/run_ptipython.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +from __future__ import annotations + +import os +import sys + +from .run_ptpython import create_parser, get_config_and_history_file + + +def run(user_ns=None): + a = create_parser().parse_args() + + config_file, history_file = get_config_and_history_file(a) + + # If IPython is not available, show message and exit here with error status + # code. + try: + import IPython + except ImportError: + print("IPython not found. Please install IPython (pip install ipython).") + sys.exit(1) + else: + from ptpython.ipython import embed + from ptpython.repl import enable_deprecation_warnings, run_config + + # Add the current directory to `sys.path`. + if sys.path[0] != "": + sys.path.insert(0, "") + + # When a file has been given, run that, otherwise start the shell. + if a.args and not a.interactive: + sys.argv = a.args + path = a.args[0] + with open(path, "rb") as f: + code = compile(f.read(), path, "exec") + exec(code, {"__name__": "__main__", "__file__": path}) + else: + enable_deprecation_warnings() + + # Create an empty namespace for this interactive shell. (If we don't do + # that, all the variables from this function will become available in + # the IPython shell.) + if user_ns is None: + user_ns = {} + + # Startup path + startup_paths = [] + if "PYTHONSTARTUP" in os.environ: + startup_paths.append(os.environ["PYTHONSTARTUP"]) + + # --interactive + if a.interactive: + startup_paths.append(a.args[0]) + sys.argv = a.args + + # exec scripts from startup paths + for path in startup_paths: + if os.path.exists(path): + with open(path, "rb") as f: + code = compile(f.read(), path, "exec") + exec(code, user_ns, user_ns) + else: + print(f"File not found: {path}\n\n") + sys.exit(1) + + # Apply config file + def configure(repl): + if os.path.exists(config_file): + run_config(repl, config_file) + + # Run interactive shell. + embed( + vi_mode=a.vi, + history_filename=history_file, + configure=configure, + user_ns=user_ns, + title="IPython REPL (ptipython)", + ) + + +if __name__ == "__main__": + run() diff --git a/ptpython/entry_points/run_ptpython.py b/ptpython/entry_points/run_ptpython.py new file mode 100644 index 0000000..1d4a532 --- /dev/null +++ b/ptpython/entry_points/run_ptpython.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python +""" +ptpython: Interactive Python shell. + +positional arguments: + args Script and arguments + +optional arguments: + -h, --help show this help message and exit + --vi Enable Vi key bindings + -i, --interactive Start interactive shell after executing this file. + --asyncio Run an asyncio event loop to support top-level "await". + --light-bg Run on a light background (use dark colors for text). + --dark-bg Run on a dark background (use light colors for text). + --config-file CONFIG_FILE + Location of configuration file. + --history-file HISTORY_FILE + Location of history file. + -V, --version show program's version number and exit + +environment variables: + PTPYTHON_CONFIG_HOME: a configuration directory to use + PYTHONSTARTUP: file executed on interactive startup (no default) +""" +from __future__ import annotations + +import argparse +import asyncio +import os +import pathlib +import sys +from textwrap import dedent +from typing import IO + +import appdirs +from prompt_toolkit.formatted_text import HTML +from prompt_toolkit.shortcuts import print_formatted_text + +from ptpython.repl import PythonRepl, embed, enable_deprecation_warnings, run_config + +try: + from importlib import metadata # type: ignore +except ImportError: + import importlib_metadata as metadata # type: ignore + + +__all__ = ["create_parser", "get_config_and_history_file", "run"] + + +class _Parser(argparse.ArgumentParser): + def print_help(self, file: IO[str] | None = None) -> None: + super().print_help() + print( + dedent( + """ + environment variables: + PTPYTHON_CONFIG_HOME: a configuration directory to use + PYTHONSTARTUP: file executed on interactive startup (no default) + """, + ).rstrip(), + ) + + +def create_parser() -> _Parser: + parser = _Parser(description="ptpython: Interactive Python shell.") + parser.add_argument("--vi", action="store_true", help="Enable Vi key bindings") + parser.add_argument( + "-i", + "--interactive", + action="store_true", + help="Start interactive shell after executing this file.", + ) + parser.add_argument( + "--asyncio", + action="store_true", + help='Run an asyncio event loop to support top-level "await".', + ) + parser.add_argument( + "--light-bg", + action="store_true", + help="Run on a light background (use dark colors for text).", + ) + parser.add_argument( + "--dark-bg", + action="store_true", + help="Run on a dark background (use light colors for text).", + ) + parser.add_argument( + "--config-file", type=str, help="Location of configuration file." + ) + parser.add_argument("--history-file", type=str, help="Location of history file.") + parser.add_argument( + "-V", + "--version", + action="version", + version=metadata.version("ptpython"), + ) + parser.add_argument("args", nargs="*", help="Script and arguments") + return parser + + +def get_config_and_history_file(namespace: argparse.Namespace) -> tuple[str, str]: + """ + Check which config/history files to use, ensure that the directories for + these files exist, and return the config and history path. + """ + config_dir = os.environ.get( + "PTPYTHON_CONFIG_HOME", + appdirs.user_config_dir("ptpython", "prompt_toolkit"), + ) + data_dir = appdirs.user_data_dir("ptpython", "prompt_toolkit") + + # Create directories. + for d in (config_dir, data_dir): + pathlib.Path(d).mkdir(parents=True, exist_ok=True) + + # Determine config file to be used. + config_file = os.path.join(config_dir, "config.py") + legacy_config_file = os.path.join(os.path.expanduser("~/.ptpython"), "config.py") + + warnings = [] + + # Config file + if namespace.config_file: + # Override config_file. + config_file = os.path.expanduser(namespace.config_file) + + elif os.path.isfile(legacy_config_file): + # Warn about the legacy configuration file. + warnings.append( + HTML( + " ~/.ptpython/config.py is deprecated, move your configuration to %s\n" + ) + % config_file + ) + config_file = legacy_config_file + + # Determine history file to be used. + history_file = os.path.join(data_dir, "history") + legacy_history_file = os.path.join(os.path.expanduser("~/.ptpython"), "history") + + if namespace.history_file: + # Override history_file. + history_file = os.path.expanduser(namespace.history_file) + + elif os.path.isfile(legacy_history_file): + # Warn about the legacy history file. + warnings.append( + HTML( + " ~/.ptpython/history is deprecated, move your history to %s\n" + ) + % history_file + ) + history_file = legacy_history_file + + # Print warnings. + if warnings: + print_formatted_text(HTML("Warning:")) + for w in warnings: + print_formatted_text(w) + + return config_file, history_file + + +def run() -> None: + a = create_parser().parse_args() + + config_file, history_file = get_config_and_history_file(a) + + # Startup path + startup_paths = [] + if "PYTHONSTARTUP" in os.environ: + startup_paths.append(os.environ["PYTHONSTARTUP"]) + + # --interactive + if a.interactive and a.args: + # Note that we shouldn't run PYTHONSTARTUP when -i is given. + startup_paths = [a.args[0]] + sys.argv = a.args + + # Add the current directory to `sys.path`. + if sys.path[0] != "": + sys.path.insert(0, "") + + # When a file has been given, run that, otherwise start the shell. + if a.args and not a.interactive: + sys.argv = a.args + path = a.args[0] + with open(path, "rb") as f: + code = compile(f.read(), path, "exec") + # NOTE: We have to pass a dict as namespace. Omitting this argument + # causes imports to not be found. See issue #326. + # However, an empty dict sets __name__ to 'builtins', which + # breaks `if __name__ == '__main__'` checks. See issue #444. + exec(code, {"__name__": "__main__", "__file__": path}) + + # Run interactive shell. + else: + enable_deprecation_warnings() + + # Apply config file + def configure(repl: PythonRepl) -> None: + if os.path.exists(config_file): + run_config(repl, config_file) + + # Adjust colors if dark/light background flag has been given. + if a.light_bg: + repl.min_brightness = 0.0 + repl.max_brightness = 0.60 + elif a.dark_bg: + repl.min_brightness = 0.60 + repl.max_brightness = 1.0 + + import __main__ + + embed_result = embed( # type: ignore + vi_mode=a.vi, + history_filename=history_file, + configure=configure, + locals=__main__.__dict__, + globals=__main__.__dict__, + startup_paths=startup_paths, + title="Python REPL (ptpython)", + return_asyncio_coroutine=a.asyncio, + ) + + if a.asyncio: + print("Starting ptpython asyncio REPL") + print('Use "await" directly instead of "asyncio.run()".') + asyncio.run(embed_result) + + +if __name__ == "__main__": + run() diff --git a/ptpython/eventloop.py b/ptpython/eventloop.py new file mode 100644 index 0000000..14ab64b --- /dev/null +++ b/ptpython/eventloop.py @@ -0,0 +1,75 @@ +""" +Wrapper around the eventloop that gives some time to the Tkinter GUI to process +events when it's loaded and while we are waiting for input at the REPL. This +way we don't block the UI of for instance ``turtle`` and other Tk libraries. + +(Normally Tkinter registers it's callbacks in ``PyOS_InputHook`` to integrate +in readline. ``prompt-toolkit`` doesn't understand that input hook, but this +will fix it for Tk.) +""" +from __future__ import annotations + +import sys +import time + +from prompt_toolkit.eventloop import InputHookContext + +__all__ = ["inputhook"] + + +def _inputhook_tk(inputhook_context: InputHookContext) -> None: + """ + Inputhook for Tk. + Run the Tk eventloop until prompt-toolkit needs to process the next input. + """ + # Get the current TK application. + import tkinter + + import _tkinter # Keep this imports inline! + + root = tkinter._default_root # type: ignore + + def wait_using_filehandler() -> None: + """ + Run the TK eventloop until the file handler that we got from the + inputhook becomes readable. + """ + # Add a handler that sets the stop flag when `prompt-toolkit` has input + # to process. + stop = [False] + + def done(*a: object) -> None: + stop[0] = True + + root.createfilehandler(inputhook_context.fileno(), _tkinter.READABLE, done) + + # Run the TK event loop as long as we don't receive input. + while root.dooneevent(_tkinter.ALL_EVENTS): + if stop[0]: + break + + root.deletefilehandler(inputhook_context.fileno()) + + def wait_using_polling() -> None: + """ + Windows TK doesn't support 'createfilehandler'. + So, run the TK eventloop and poll until input is ready. + """ + while not inputhook_context.input_is_ready(): + while root.dooneevent(_tkinter.ALL_EVENTS | _tkinter.DONT_WAIT): + pass + # Sleep to make the CPU idle, but not too long, so that the UI + # stays responsive. + time.sleep(0.01) + + if root is not None: + if hasattr(root, "createfilehandler"): + wait_using_filehandler() + else: + wait_using_polling() + + +def inputhook(inputhook_context: InputHookContext) -> None: + # Only call the real input hook when the 'Tkinter' library was loaded. + if "Tkinter" in sys.modules or "tkinter" in sys.modules: + _inputhook_tk(inputhook_context) diff --git a/ptpython/filters.py b/ptpython/filters.py new file mode 100644 index 0000000..a2079fd --- /dev/null +++ b/ptpython/filters.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from prompt_toolkit.filters import Filter + +if TYPE_CHECKING: + from .python_input import PythonInput + +__all__ = ["HasSignature", "ShowSidebar", "ShowSignature", "ShowDocstring"] + + +class PythonInputFilter(Filter): + def __init__(self, python_input: PythonInput) -> None: + super().__init__() + self.python_input = python_input + + def __call__(self) -> bool: + raise NotImplementedError + + +class HasSignature(PythonInputFilter): + def __call__(self) -> bool: + return bool(self.python_input.signatures) + + +class ShowSidebar(PythonInputFilter): + def __call__(self) -> bool: + return self.python_input.show_sidebar + + +class ShowSignature(PythonInputFilter): + def __call__(self) -> bool: + return self.python_input.show_signature + + +class ShowDocstring(PythonInputFilter): + def __call__(self) -> bool: + return self.python_input.show_docstring diff --git a/ptpython/history_browser.py b/ptpython/history_browser.py new file mode 100644 index 0000000..b667be1 --- /dev/null +++ b/ptpython/history_browser.py @@ -0,0 +1,687 @@ +""" +Utility to easily select lines from the history and execute them again. + +`create_history_application` creates an `Application` instance that runs will +run as a sub application of the Repl/PythonInput. +""" +from __future__ import annotations + +from functools import partial +from typing import TYPE_CHECKING, Callable + +from prompt_toolkit.application import Application +from prompt_toolkit.application.current import get_app +from prompt_toolkit.buffer import Buffer +from prompt_toolkit.document import Document +from prompt_toolkit.enums import DEFAULT_BUFFER +from prompt_toolkit.filters import Condition, has_focus +from prompt_toolkit.formatted_text.base import StyleAndTextTuples +from prompt_toolkit.formatted_text.utils import fragment_list_to_text +from prompt_toolkit.history import History +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.key_binding.key_processor import KeyPressEvent +from prompt_toolkit.layout.containers import ( + ConditionalContainer, + Container, + Float, + FloatContainer, + HSplit, + ScrollOffsets, + VSplit, + Window, + WindowAlign, + WindowRenderInfo, +) +from prompt_toolkit.layout.controls import ( + BufferControl, + FormattedTextControl, + UIContent, +) +from prompt_toolkit.layout.dimension import Dimension as D +from prompt_toolkit.layout.layout import Layout +from prompt_toolkit.layout.margins import Margin, ScrollbarMargin +from prompt_toolkit.layout.processors import ( + Processor, + Transformation, + TransformationInput, +) +from prompt_toolkit.lexers import PygmentsLexer +from prompt_toolkit.mouse_events import MouseEvent +from prompt_toolkit.widgets import Frame +from prompt_toolkit.widgets.toolbars import ArgToolbar, SearchToolbar +from pygments.lexers import Python3Lexer as PythonLexer +from pygments.lexers import RstLexer + +from ptpython.layout import get_inputmode_fragments + +from .utils import if_mousedown + +if TYPE_CHECKING: + from .python_input import PythonInput + +HISTORY_COUNT = 2000 + +__all__ = ["HistoryLayout", "PythonHistory"] + +E = KeyPressEvent + +HELP_TEXT = """ +This interface is meant to select multiple lines from the +history and execute them together. + +Typical usage +------------- + +1. Move the ``cursor up`` in the history pane, until the + cursor is on the first desired line. +2. Hold down the ``space bar``, or press it multiple + times. Each time it will select one line and move to + the next one. Each selected line will appear on the + right side. +3. When all the required lines are displayed on the right + side, press ``Enter``. This will go back to the Python + REPL and show these lines as the current input. They + can still be edited from there. + +Key bindings +------------ + +Many Emacs and Vi navigation key bindings should work. +Press ``F4`` to switch between Emacs and Vi mode. + +Additional bindings: + +- ``Space``: Select or delect a line. +- ``Tab``: Move the focus between the history and input + pane. (Alternative: ``Ctrl-W``) +- ``Ctrl-C``: Cancel. Ignore the result and go back to + the REPL. (Alternatives: ``q`` and ``Control-G``.) +- ``Enter``: Accept the result and go back to the REPL. +- ``F1``: Show/hide help. Press ``Enter`` to quit this + help message. + +Further, remember that searching works like in Emacs +(using ``Ctrl-R``) or Vi (using ``/``). +""" + + +class BORDER: + "Box drawing characters." + + HORIZONTAL = "\u2501" + VERTICAL = "\u2503" + TOP_LEFT = "\u250f" + TOP_RIGHT = "\u2513" + BOTTOM_LEFT = "\u2517" + BOTTOM_RIGHT = "\u251b" + LIGHT_VERTICAL = "\u2502" + + +def _create_popup_window(title: str, body: Container) -> Frame: + """ + Return the layout for a pop-up window. It consists of a title bar showing + the `title` text, and a body layout. The window is surrounded by borders. + """ + return Frame(body=body, title=title) + + +class HistoryLayout: + """ + Create and return a `Container` instance for the history + application. + """ + + def __init__(self, history: PythonHistory) -> None: + search_toolbar = SearchToolbar() + + self.help_buffer_control = BufferControl( + buffer=history.help_buffer, lexer=PygmentsLexer(RstLexer) + ) + + help_window = _create_popup_window( + title="History Help", + body=Window( + content=self.help_buffer_control, + right_margins=[ScrollbarMargin(display_arrows=True)], + scroll_offsets=ScrollOffsets(top=2, bottom=2), + ), + ) + + self.default_buffer_control = BufferControl( + buffer=history.default_buffer, + input_processors=[GrayExistingText(history.history_mapping)], + lexer=PygmentsLexer(PythonLexer), + ) + + self.history_buffer_control = BufferControl( + buffer=history.history_buffer, + lexer=PygmentsLexer(PythonLexer), + search_buffer_control=search_toolbar.control, + preview_search=True, + ) + + history_window = Window( + content=self.history_buffer_control, + wrap_lines=False, + left_margins=[HistoryMargin(history)], + scroll_offsets=ScrollOffsets(top=2, bottom=2), + ) + + self.root_container = HSplit( + [ + # Top title bar. + Window( + content=FormattedTextControl(_get_top_toolbar_fragments), + align=WindowAlign.CENTER, + style="class:status-toolbar", + ), + FloatContainer( + content=VSplit( + [ + # Left side: history. + history_window, + # Separator. + Window( + width=D.exact(1), + char=BORDER.LIGHT_VERTICAL, + style="class:separator", + ), + # Right side: result. + Window( + content=self.default_buffer_control, + wrap_lines=False, + left_margins=[ResultMargin(history)], + scroll_offsets=ScrollOffsets(top=2, bottom=2), + ), + ] + ), + floats=[ + # Help text as a float. + Float( + width=60, + top=3, + bottom=2, + content=ConditionalContainer( + content=help_window, + filter=has_focus(history.help_buffer), + ), + ) + ], + ), + # Bottom toolbars. + ArgToolbar(), + search_toolbar, + Window( + content=FormattedTextControl( + partial(_get_bottom_toolbar_fragments, history=history) + ), + style="class:status-toolbar", + ), + ] + ) + + self.layout = Layout(self.root_container, history_window) + + +def _get_top_toolbar_fragments() -> StyleAndTextTuples: + return [("class:status-bar.title", "History browser - Insert from history")] + + +def _get_bottom_toolbar_fragments(history: PythonHistory) -> StyleAndTextTuples: + python_input = history.python_input + + @if_mousedown + def f1(mouse_event: MouseEvent) -> None: + _toggle_help(history) + + @if_mousedown + def tab(mouse_event: MouseEvent) -> None: + _select_other_window(history) + + return ( + [("class:status-toolbar", " ")] + + get_inputmode_fragments(python_input) + + [ + ("class:status-toolbar", " "), + ("class:status-toolbar.key", "[Space]"), + ("class:status-toolbar", " Toggle "), + ("class:status-toolbar.key", "[Tab]", tab), + ("class:status-toolbar", " Focus ", tab), + ("class:status-toolbar.key", "[Enter]"), + ("class:status-toolbar", " Accept "), + ("class:status-toolbar.key", "[F1]", f1), + ("class:status-toolbar", " Help ", f1), + ] + ) + + +class HistoryMargin(Margin): + """ + Margin for the history buffer. + This displays a green bar for the selected entries. + """ + + def __init__(self, history: PythonHistory) -> None: + self.history_buffer = history.history_buffer + self.history_mapping = history.history_mapping + + def get_width(self, get_ui_content: Callable[[], UIContent]) -> int: + return 2 + + def create_margin( + self, window_render_info: WindowRenderInfo, width: int, height: int + ) -> StyleAndTextTuples: + document = self.history_buffer.document + + lines_starting_new_entries = self.history_mapping.lines_starting_new_entries + selected_lines = self.history_mapping.selected_lines + + current_lineno = document.cursor_position_row + + visible_line_to_input_line = window_render_info.visible_line_to_input_line + result: StyleAndTextTuples = [] + + for y in range(height): + line_number = visible_line_to_input_line.get(y) + + # Show stars at the start of each entry. + # (Visualises multiline entries.) + if line_number in lines_starting_new_entries: + char = "*" + else: + char = " " + + if line_number in selected_lines: + t = "class:history-line,selected" + else: + t = "class:history-line" + + if line_number == current_lineno: + t = t + ",current" + + result.append((t, char)) + result.append(("", "\n")) + + return result + + +class ResultMargin(Margin): + """ + The margin to be shown in the result pane. + """ + + def __init__(self, history: PythonHistory) -> None: + self.history_mapping = history.history_mapping + self.history_buffer = history.history_buffer + + def get_width(self, get_ui_content: Callable[[], UIContent]) -> int: + return 2 + + def create_margin( + self, window_render_info: WindowRenderInfo, width: int, height: int + ) -> StyleAndTextTuples: + document = self.history_buffer.document + + current_lineno = document.cursor_position_row + offset = ( + self.history_mapping.result_line_offset + ) # original_document.cursor_position_row + + visible_line_to_input_line = window_render_info.visible_line_to_input_line + + result: StyleAndTextTuples = [] + + for y in range(height): + line_number = visible_line_to_input_line.get(y) + + if ( + line_number is None + or line_number < offset + or line_number >= offset + len(self.history_mapping.selected_lines) + ): + t = "" + elif line_number == current_lineno: + t = "class:history-line,selected,current" + else: + t = "class:history-line,selected" + + result.append((t, " ")) + result.append(("", "\n")) + + return result + + def invalidation_hash(self, document: Document) -> int: + return document.cursor_position_row + + +class GrayExistingText(Processor): + """ + Turn the existing input, before and after the inserted code gray. + """ + + def __init__(self, history_mapping: HistoryMapping) -> None: + self.history_mapping = history_mapping + self._lines_before = len( + history_mapping.original_document.text_before_cursor.splitlines() + ) + + def apply_transformation( + self, transformation_input: TransformationInput + ) -> Transformation: + lineno = transformation_input.lineno + fragments = transformation_input.fragments + + if lineno < self._lines_before or lineno >= self._lines_before + len( + self.history_mapping.selected_lines + ): + text = fragment_list_to_text(fragments) + return Transformation(fragments=[("class:history.existing-input", text)]) + else: + return Transformation(fragments=fragments) + + +class HistoryMapping: + """ + Keep a list of all the lines from the history and the selected lines. + """ + + def __init__( + self, + history: PythonHistory, + python_history: History, + original_document: Document, + ) -> None: + self.history = history + self.python_history = python_history + self.original_document = original_document + + self.lines_starting_new_entries = set() + self.selected_lines: set[int] = set() + + # Process history. + history_strings = python_history.get_strings() + history_lines: list[str] = [] + + for entry_nr, entry in list(enumerate(history_strings))[-HISTORY_COUNT:]: + self.lines_starting_new_entries.add(len(history_lines)) + + for line in entry.splitlines(): + history_lines.append(line) + + if len(history_strings) > HISTORY_COUNT: + history_lines[0] = ( + "# *** History has been truncated to %s lines ***" % HISTORY_COUNT + ) + + self.history_lines = history_lines + self.concatenated_history = "\n".join(history_lines) + + # Line offset. + if self.original_document.text_before_cursor: + self.result_line_offset = self.original_document.cursor_position_row + 1 + else: + self.result_line_offset = 0 + + def get_new_document(self, cursor_pos: int | None = None) -> Document: + """ + Create a `Document` instance that contains the resulting text. + """ + lines = [] + + # Original text, before cursor. + if self.original_document.text_before_cursor: + lines.append(self.original_document.text_before_cursor) + + # Selected entries from the history. + for line_no in sorted(self.selected_lines): + lines.append(self.history_lines[line_no]) + + # Original text, after cursor. + if self.original_document.text_after_cursor: + lines.append(self.original_document.text_after_cursor) + + # Create `Document` with cursor at the right position. + text = "\n".join(lines) + if cursor_pos is not None and cursor_pos > len(text): + cursor_pos = len(text) + return Document(text, cursor_pos) + + def update_default_buffer(self) -> None: + b = self.history.default_buffer + + b.set_document(self.get_new_document(b.cursor_position), bypass_readonly=True) + + +def _toggle_help(history: PythonHistory) -> None: + "Display/hide help." + help_buffer_control = history.history_layout.help_buffer_control + + if history.app.layout.current_control == help_buffer_control: + history.app.layout.focus_previous() + else: + history.app.layout.current_control = help_buffer_control + + +def _select_other_window(history: PythonHistory) -> None: + "Toggle focus between left/right window." + current_buffer = history.app.current_buffer + layout = history.history_layout.layout + + if current_buffer == history.history_buffer: + layout.current_control = history.history_layout.default_buffer_control + + elif current_buffer == history.default_buffer: + layout.current_control = history.history_layout.history_buffer_control + + +def create_key_bindings( + history: PythonHistory, + python_input: PythonInput, + history_mapping: HistoryMapping, +) -> KeyBindings: + """ + Key bindings. + """ + bindings = KeyBindings() + handle = bindings.add + + @handle(" ", filter=has_focus(history.history_buffer)) + def _(event: E) -> None: + """ + Space: select/deselect line from history pane. + """ + b = event.current_buffer + line_no = b.document.cursor_position_row + + if not history_mapping.history_lines: + # If we've no history, then nothing to do + return + + if line_no in history_mapping.selected_lines: + # Remove line. + history_mapping.selected_lines.remove(line_no) + history_mapping.update_default_buffer() + else: + # Add line. + history_mapping.selected_lines.add(line_no) + history_mapping.update_default_buffer() + + # Update cursor position + default_buffer = history.default_buffer + default_lineno = ( + sorted(history_mapping.selected_lines).index(line_no) + + history_mapping.result_line_offset + ) + default_buffer.cursor_position = ( + default_buffer.document.translate_row_col_to_index(default_lineno, 0) + ) + + # Also move the cursor to the next line. (This way they can hold + # space to select a region.) + b.cursor_position = b.document.translate_row_col_to_index(line_no + 1, 0) + + @handle(" ", filter=has_focus(DEFAULT_BUFFER)) + @handle("delete", filter=has_focus(DEFAULT_BUFFER)) + @handle("c-h", filter=has_focus(DEFAULT_BUFFER)) + def _(event: E) -> None: + """ + Space: remove line from default pane. + """ + b = event.current_buffer + line_no = b.document.cursor_position_row - history_mapping.result_line_offset + + if line_no >= 0: + try: + history_lineno = sorted(history_mapping.selected_lines)[line_no] + except IndexError: + pass # When `selected_lines` is an empty set. + else: + history_mapping.selected_lines.remove(history_lineno) + + history_mapping.update_default_buffer() + + help_focussed = has_focus(history.help_buffer) + main_buffer_focussed = has_focus(history.history_buffer) | has_focus( + history.default_buffer + ) + + @handle("tab", filter=main_buffer_focussed) + @handle("c-x", filter=main_buffer_focussed, eager=True) + # Eager: ignore the Emacs [Ctrl-X Ctrl-X] binding. + @handle("c-w", filter=main_buffer_focussed) + def _(event: E) -> None: + "Select other window." + _select_other_window(history) + + @handle("f4") + def _(event: E) -> None: + "Switch between Emacs/Vi mode." + python_input.vi_mode = not python_input.vi_mode + + @handle("f1") + def _(event: E) -> None: + "Display/hide help." + _toggle_help(history) + + @handle("enter", filter=help_focussed) + @handle("c-c", filter=help_focussed) + @handle("c-g", filter=help_focussed) + @handle("escape", filter=help_focussed) + def _(event: E) -> None: + "Leave help." + event.app.layout.focus_previous() + + @handle("q", filter=main_buffer_focussed) + @handle("f3", filter=main_buffer_focussed) + @handle("c-c", filter=main_buffer_focussed) + @handle("c-g", filter=main_buffer_focussed) + def _(event: E) -> None: + "Cancel and go back." + event.app.exit(result=None) + + @handle("enter", filter=main_buffer_focussed) + def _(event: E) -> None: + "Accept input." + event.app.exit(result=history.default_buffer.text) + + enable_system_bindings = Condition(lambda: python_input.enable_system_bindings) + + @handle("c-z", filter=enable_system_bindings) + def _(event: E) -> None: + "Suspend to background." + event.app.suspend_to_background() + + return bindings + + +class PythonHistory: + def __init__(self, python_input: PythonInput, original_document: Document) -> None: + """ + Create an `Application` for the history screen. + This has to be run as a sub application of `python_input`. + + When this application runs and returns, it returns the selected lines. + """ + self.python_input = python_input + + history_mapping = HistoryMapping(self, python_input.history, original_document) + self.history_mapping = history_mapping + + document = Document(history_mapping.concatenated_history) + document = Document( + document.text, + cursor_position=document.cursor_position + + document.get_start_of_line_position(), + ) + + def accept_handler(buffer: Buffer) -> bool: + get_app().exit(result=self.default_buffer.text) + return False + + self.history_buffer = Buffer( + document=document, + on_cursor_position_changed=self._history_buffer_pos_changed, + accept_handler=accept_handler, + read_only=True, + ) + + self.default_buffer = Buffer( + name=DEFAULT_BUFFER, + document=history_mapping.get_new_document(), + on_cursor_position_changed=self._default_buffer_pos_changed, + read_only=True, + ) + + self.help_buffer = Buffer(document=Document(HELP_TEXT, 0), read_only=True) + + self.history_layout = HistoryLayout(self) + + self.app: Application[str] = Application( + layout=self.history_layout.layout, + full_screen=True, + style=python_input._current_style, + mouse_support=Condition(lambda: python_input.enable_mouse_support), + key_bindings=create_key_bindings(self, python_input, history_mapping), + ) + + def _default_buffer_pos_changed(self, _: Buffer) -> None: + """When the cursor changes in the default buffer. Synchronize with + history buffer.""" + # Only when this buffer has the focus. + if self.app.current_buffer == self.default_buffer: + try: + line_no = ( + self.default_buffer.document.cursor_position_row + - self.history_mapping.result_line_offset + ) + + if line_no < 0: # When the cursor is above the inserted region. + raise IndexError + + history_lineno = sorted(self.history_mapping.selected_lines)[line_no] + except IndexError: + pass + else: + self.history_buffer.cursor_position = ( + self.history_buffer.document.translate_row_col_to_index( + history_lineno, 0 + ) + ) + + def _history_buffer_pos_changed(self, _: Buffer) -> None: + """When the cursor changes in the history buffer. Synchronize.""" + # Only when this buffer has the focus. + if self.app.current_buffer == self.history_buffer: + line_no = self.history_buffer.document.cursor_position_row + + if line_no in self.history_mapping.selected_lines: + default_lineno = ( + sorted(self.history_mapping.selected_lines).index(line_no) + + self.history_mapping.result_line_offset + ) + + self.default_buffer.cursor_position = ( + self.default_buffer.document.translate_row_col_to_index( + default_lineno, 0 + ) + ) diff --git a/ptpython/ipython.py b/ptpython/ipython.py new file mode 100644 index 0000000..ad0516a --- /dev/null +++ b/ptpython/ipython.py @@ -0,0 +1,339 @@ +""" + +Adaptor for using the input system of `prompt_toolkit` with the IPython +backend. + +This gives a powerful interactive shell that has a nice user interface, but +also the power of for instance all the %-magic functions that IPython has to +offer. + +""" +from __future__ import annotations + +from typing import Iterable +from warnings import warn + +from IPython import utils as ipy_utils +from IPython.core.inputtransformer2 import TransformerManager +from IPython.terminal.embed import InteractiveShellEmbed as _InteractiveShellEmbed +from IPython.terminal.ipapp import load_default_config +from prompt_toolkit.completion import ( + CompleteEvent, + Completer, + Completion, + PathCompleter, + WordCompleter, +) +from prompt_toolkit.contrib.completers import SystemCompleter +from prompt_toolkit.contrib.regular_languages.compiler import compile +from prompt_toolkit.contrib.regular_languages.completion import GrammarCompleter +from prompt_toolkit.contrib.regular_languages.lexer import GrammarLexer +from prompt_toolkit.document import Document +from prompt_toolkit.formatted_text import AnyFormattedText, PygmentsTokens +from prompt_toolkit.lexers import PygmentsLexer, SimpleLexer +from prompt_toolkit.styles import Style +from pygments.lexers import BashLexer, PythonLexer + +from ptpython.prompt_style import PromptStyle + +from .completer import PythonCompleter +from .python_input import PythonInput +from .repl import PyCF_ALLOW_TOP_LEVEL_AWAIT +from .style import default_ui_style +from .validator import PythonValidator + +__all__ = ["embed"] + + +class IPythonPrompt(PromptStyle): + """ + Style for IPython >5.0, use the prompt_toolkit tokens directly. + """ + + def __init__(self, prompts): + self.prompts = prompts + + def in_prompt(self) -> AnyFormattedText: + return PygmentsTokens(self.prompts.in_prompt_tokens()) + + def in2_prompt(self, width: int) -> AnyFormattedText: + return PygmentsTokens(self.prompts.continuation_prompt_tokens()) + + def out_prompt(self) -> AnyFormattedText: + return [] + + +class IPythonValidator(PythonValidator): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.isp = TransformerManager() + + def validate(self, document: Document) -> None: + document = Document(text=self.isp.transform_cell(document.text)) + super().validate(document) + + +def create_ipython_grammar(): + """ + Return compiled IPython grammar. + """ + return compile( + r""" + \s* + ( + (?P%)( + (?Ppycat|run|loadpy|load) \s+ (?P[^\s]+) | + (?Pcat) \s+ (?P[^\s]+) | + (?Ppushd|cd|ls) \s+ (?P[^\s]+) | + (?Ppdb) \s+ (?P[^\s]+) | + (?Pautocall) \s+ (?P[^\s]+) | + (?Ptime|timeit|prun) \s+ (?P.+) | + (?Ppsource|pfile|pinfo|pinfo2) \s+ (?P.+) | + (?Psystem) \s+ (?P.+) | + (?Punalias) \s+ (?P.+) | + (?P[^\s]+) .* | + ) .* | + !(?P.+) | + (?![%!]) (?P.+) + ) + \s* + """ + ) + + +def create_completer( + get_globals, + get_locals, + magics_manager, + alias_manager, + get_enable_dictionary_completion, +): + g = create_ipython_grammar() + + return GrammarCompleter( + g, + { + "python": PythonCompleter( + get_globals, get_locals, get_enable_dictionary_completion + ), + "magic": MagicsCompleter(magics_manager), + "alias_name": AliasCompleter(alias_manager), + "pdb_arg": WordCompleter(["on", "off"], ignore_case=True), + "autocall_arg": WordCompleter(["0", "1", "2"], ignore_case=True), + "py_filename": PathCompleter( + only_directories=False, file_filter=lambda name: name.endswith(".py") + ), + "filename": PathCompleter(only_directories=False), + "directory": PathCompleter(only_directories=True), + "system": SystemCompleter(), + }, + ) + + +def create_lexer(): + g = create_ipython_grammar() + + return GrammarLexer( + g, + lexers={ + "percent": SimpleLexer("class:pygments.operator"), + "magic": SimpleLexer("class:pygments.keyword"), + "filename": SimpleLexer("class:pygments.name"), + "python": PygmentsLexer(PythonLexer), + "system": PygmentsLexer(BashLexer), + }, + ) + + +class MagicsCompleter(Completer): + def __init__(self, magics_manager): + self.magics_manager = magics_manager + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + text = document.text_before_cursor.lstrip() + + for m in sorted(self.magics_manager.magics["line"]): + if m.startswith(text): + yield Completion("%s" % m, -len(text)) + + +class AliasCompleter(Completer): + def __init__(self, alias_manager): + self.alias_manager = alias_manager + + def get_completions( + self, document: Document, complete_event: CompleteEvent + ) -> Iterable[Completion]: + text = document.text_before_cursor.lstrip() + # aliases = [a for a, _ in self.alias_manager.aliases] + aliases = self.alias_manager.aliases + + for a, cmd in sorted(aliases, key=lambda a: a[0]): + if a.startswith(text): + yield Completion("%s" % a, -len(text), display_meta=cmd) + + +class IPythonInput(PythonInput): + """ + Override our `PythonCommandLineInterface` to add IPython specific stuff. + """ + + def __init__(self, ipython_shell, *a, **kw): + kw["_completer"] = create_completer( + kw["get_globals"], + kw["get_globals"], + ipython_shell.magics_manager, + ipython_shell.alias_manager, + lambda: self.enable_dictionary_completion, + ) + kw["_lexer"] = create_lexer() + kw["_validator"] = IPythonValidator(get_compiler_flags=self.get_compiler_flags) + + super().__init__(*a, **kw) + self.ipython_shell = ipython_shell + + self.all_prompt_styles["ipython"] = IPythonPrompt(ipython_shell.prompts) + self.prompt_style = "ipython" + + # UI style for IPython. Add tokens that are used by IPython>5.0 + style_dict = {} + style_dict.update(default_ui_style) + style_dict.update( + { + "pygments.prompt": "#009900", + "pygments.prompt-num": "#00ff00 bold", + "pygments.out-prompt": "#990000", + "pygments.out-prompt-num": "#ff0000 bold", + } + ) + + self.ui_styles = {"default": Style.from_dict(style_dict)} + self.use_ui_colorscheme("default") + + def get_compiler_flags(self): + flags = super().get_compiler_flags() + if self.ipython_shell.autoawait: + flags |= PyCF_ALLOW_TOP_LEVEL_AWAIT + return flags + + +class InteractiveShellEmbed(_InteractiveShellEmbed): + """ + Override the `InteractiveShellEmbed` from IPython, to replace the front-end + with our input shell. + + :param configure: Callable for configuring the repl. + """ + + def __init__(self, *a, **kw): + vi_mode = kw.pop("vi_mode", False) + history_filename = kw.pop("history_filename", None) + configure = kw.pop("configure", None) + title = kw.pop("title", None) + + # Don't ask IPython to confirm for exit. We have our own exit prompt. + self.confirm_exit = False + + super().__init__(*a, **kw) + + def get_globals(): + return self.user_ns + + python_input = IPythonInput( + self, + get_globals=get_globals, + vi_mode=vi_mode, + history_filename=history_filename, + ) + + if title: + python_input.terminal_title = title + + if configure: + configure(python_input) + python_input.prompt_style = "ipython" # Don't take from config. + + self.python_input = python_input + + def prompt_for_code(self) -> str: + try: + return self.python_input.app.run() + except KeyboardInterrupt: + self.python_input.default_buffer.document = Document() + return "" + + +def initialize_extensions(shell, extensions): + """ + Partial copy of `InteractiveShellApp.init_extensions` from IPython. + """ + try: + iter(extensions) + except TypeError: + pass # no extensions found + else: + for ext in extensions: + try: + shell.extension_manager.load_extension(ext) + except: + warn( + "Error in loading extension: %s" % ext + + "\nCheck your config files in %s" + % ipy_utils.path.get_ipython_dir() + ) + shell.showtraceback() + + +def run_exec_lines(shell, exec_lines): + """ + Partial copy of run_exec_lines code from IPython.core.shellapp . + """ + try: + iter(exec_lines) + except TypeError: + pass + else: + try: + for line in exec_lines: + try: + shell.run_cell(line, store_history=False) + except: + shell.showtraceback() + except: + shell.showtraceback() + + +def embed(**kwargs): + """ + Copied from `IPython/terminal/embed.py`, but using our `InteractiveShellEmbed` instead. + """ + config = kwargs.get("config") + header = kwargs.pop("header", "") + compile_flags = kwargs.pop("compile_flags", None) + if config is None: + config = load_default_config() + config.InteractiveShellEmbed = config.TerminalInteractiveShell + kwargs["config"] = config + shell = InteractiveShellEmbed.instance(**kwargs) + initialize_extensions(shell, config["InteractiveShellApp"]["extensions"]) + run_exec_lines(shell, config["InteractiveShellApp"]["exec_lines"]) + run_startup_scripts(shell) + shell(header=header, stack_depth=2, compile_flags=compile_flags) + + +def run_startup_scripts(shell): + """ + Contributed by linyuxu: + https://github.com/prompt-toolkit/ptpython/issues/126#issue-161242480 + """ + import glob + import os + + startup_dir = shell.profile_dir.startup_dir + startup_files = [] + startup_files += glob.glob(os.path.join(startup_dir, "*.py")) + startup_files += glob.glob(os.path.join(startup_dir, "*.ipy")) + for file in startup_files: + shell.run_cell(open(file).read()) diff --git a/ptpython/key_bindings.py b/ptpython/key_bindings.py new file mode 100644 index 0000000..d7bb575 --- /dev/null +++ b/ptpython/key_bindings.py @@ -0,0 +1,337 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from prompt_toolkit.application import get_app +from prompt_toolkit.buffer import Buffer +from prompt_toolkit.document import Document +from prompt_toolkit.enums import DEFAULT_BUFFER +from prompt_toolkit.filters import ( + Condition, + emacs_insert_mode, + emacs_mode, + has_focus, + has_selection, + vi_insert_mode, +) +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.key_binding.bindings.named_commands import get_by_name +from prompt_toolkit.key_binding.key_processor import KeyPressEvent +from prompt_toolkit.keys import Keys + +from .utils import document_is_multiline_python + +if TYPE_CHECKING: + from .python_input import PythonInput + +__all__ = [ + "load_python_bindings", + "load_sidebar_bindings", + "load_confirm_exit_bindings", +] + +E = KeyPressEvent + + +@Condition +def tab_should_insert_whitespace() -> bool: + """ + When the 'tab' key is pressed with only whitespace character before the + cursor, do autocompletion. Otherwise, insert indentation. + + Except for the first character at the first line. Then always do a + completion. It doesn't make sense to start the first line with + indentation. + """ + b = get_app().current_buffer + before_cursor = b.document.current_line_before_cursor + + return bool(b.text and (not before_cursor or before_cursor.isspace())) + + +def load_python_bindings(python_input: PythonInput) -> KeyBindings: + """ + Custom key bindings. + """ + bindings = KeyBindings() + + sidebar_visible = Condition(lambda: python_input.show_sidebar) + handle = bindings.add + + @handle("c-l") + def _(event: E) -> None: + """ + Clear whole screen and render again -- also when the sidebar is visible. + """ + event.app.renderer.clear() + + @handle("c-z") + def _(event: E) -> None: + """ + Suspend. + """ + if python_input.enable_system_bindings: + event.app.suspend_to_background() + + # Delete word before cursor, but use all Python symbols as separators + # (WORD=False). + handle("c-w")(get_by_name("backward-kill-word")) + + @handle("f2") + def _(event: E) -> None: + """ + Show/hide sidebar. + """ + python_input.show_sidebar = not python_input.show_sidebar + if python_input.show_sidebar: + event.app.layout.focus(python_input.ptpython_layout.sidebar) + else: + event.app.layout.focus_last() + + @handle("f3") + def _(event: E) -> None: + """ + Select from the history. + """ + python_input.enter_history() + + @handle("f4") + def _(event: E) -> None: + """ + Toggle between Vi and Emacs mode. + """ + python_input.vi_mode = not python_input.vi_mode + + @handle("f6") + def _(event: E) -> None: + """ + Enable/Disable paste mode. + """ + python_input.paste_mode = not python_input.paste_mode + + @handle( + "tab", filter=~sidebar_visible & ~has_selection & tab_should_insert_whitespace + ) + def _(event: E) -> None: + """ + When tab should insert whitespace, do that instead of completion. + """ + event.app.current_buffer.insert_text(" ") + + @Condition + def is_multiline() -> bool: + return document_is_multiline_python(python_input.default_buffer.document) + + @handle( + "enter", + filter=~sidebar_visible + & ~has_selection + & (vi_insert_mode | emacs_insert_mode) + & has_focus(DEFAULT_BUFFER) + & ~is_multiline, + ) + @handle(Keys.Escape, Keys.Enter, filter=~sidebar_visible & emacs_mode) + def _(event: E) -> None: + """ + Accept input (for single line input). + """ + b = event.current_buffer + + if b.validate(): + # When the cursor is at the end, and we have an empty line: + # drop the empty lines, but return the value. + b.document = Document( + text=b.text.rstrip(), cursor_position=len(b.text.rstrip()) + ) + + b.validate_and_handle() + + @handle( + "enter", + filter=~sidebar_visible + & ~has_selection + & (vi_insert_mode | emacs_insert_mode) + & has_focus(DEFAULT_BUFFER) + & is_multiline, + ) + def _(event: E) -> None: + """ + Behaviour of the Enter key. + + Auto indent after newline/Enter. + (When not in Vi navigation mode, and when multiline is enabled.) + """ + b = event.current_buffer + empty_lines_required = python_input.accept_input_on_enter or 10000 + + def at_the_end(b: Buffer) -> bool: + """we consider the cursor at the end when there is no text after + the cursor, or only whitespace.""" + text = b.document.text_after_cursor + return text == "" or (text.isspace() and "\n" not in text) + + if python_input.paste_mode: + # In paste mode, always insert text. + b.insert_text("\n") + + elif at_the_end(b) and b.document.text.replace(" ", "").endswith( + "\n" * (empty_lines_required - 1) + ): + # When the cursor is at the end, and we have an empty line: + # drop the empty lines, but return the value. + if b.validate(): + b.document = Document( + text=b.text.rstrip(), cursor_position=len(b.text.rstrip()) + ) + + b.validate_and_handle() + else: + auto_newline(b) + + @handle( + "c-d", + filter=~sidebar_visible + & has_focus(python_input.default_buffer) + & Condition( + lambda: + # The current buffer is empty. + not get_app().current_buffer.text + ), + ) + def _(event: E) -> None: + """ + Override Control-D exit, to ask for confirmation. + """ + if python_input.confirm_exit: + # Show exit confirmation and focus it (focusing is important for + # making sure the default buffer key bindings are not active). + python_input.show_exit_confirmation = True + python_input.app.layout.focus( + python_input.ptpython_layout.exit_confirmation + ) + else: + event.app.exit(exception=EOFError) + + @handle("c-c", filter=has_focus(python_input.default_buffer)) + def _(event: E) -> None: + "Abort when Control-C has been pressed." + event.app.exit(exception=KeyboardInterrupt, style="class:aborting") + + return bindings + + +def load_sidebar_bindings(python_input: PythonInput) -> KeyBindings: + """ + Load bindings for the navigation in the sidebar. + """ + bindings = KeyBindings() + + handle = bindings.add + sidebar_visible = Condition(lambda: python_input.show_sidebar) + + @handle("up", filter=sidebar_visible) + @handle("c-p", filter=sidebar_visible) + @handle("k", filter=sidebar_visible) + def _(event: E) -> None: + "Go to previous option." + python_input.selected_option_index = ( + python_input.selected_option_index - 1 + ) % python_input.option_count + + @handle("down", filter=sidebar_visible) + @handle("c-n", filter=sidebar_visible) + @handle("j", filter=sidebar_visible) + def _(event: E) -> None: + "Go to next option." + python_input.selected_option_index = ( + python_input.selected_option_index + 1 + ) % python_input.option_count + + @handle("right", filter=sidebar_visible) + @handle("l", filter=sidebar_visible) + @handle(" ", filter=sidebar_visible) + def _(event: E) -> None: + "Select next value for current option." + option = python_input.selected_option + option.activate_next() + + @handle("left", filter=sidebar_visible) + @handle("h", filter=sidebar_visible) + def _(event: E) -> None: + "Select previous value for current option." + option = python_input.selected_option + option.activate_previous() + + @handle("c-c", filter=sidebar_visible) + @handle("c-d", filter=sidebar_visible) + @handle("c-d", filter=sidebar_visible) + @handle("enter", filter=sidebar_visible) + @handle("escape", filter=sidebar_visible) + def _(event: E) -> None: + "Hide sidebar." + python_input.show_sidebar = False + event.app.layout.focus_last() + + return bindings + + +def load_confirm_exit_bindings(python_input: PythonInput) -> KeyBindings: + """ + Handle yes/no key presses when the exit confirmation is shown. + """ + bindings = KeyBindings() + + handle = bindings.add + confirmation_visible = Condition(lambda: python_input.show_exit_confirmation) + + @handle("y", filter=confirmation_visible) + @handle("Y", filter=confirmation_visible) + @handle("enter", filter=confirmation_visible) + @handle("c-d", filter=confirmation_visible) + def _(event: E) -> None: + """ + Really quit. + """ + event.app.exit(exception=EOFError, style="class:exiting") + + @handle(Keys.Any, filter=confirmation_visible) + def _(event: E) -> None: + """ + Cancel exit. + """ + python_input.show_exit_confirmation = False + python_input.app.layout.focus_previous() + + return bindings + + +def auto_newline(buffer: Buffer) -> None: + r""" + Insert \n at the cursor position. Also add necessary padding. + """ + insert_text = buffer.insert_text + + if buffer.document.current_line_after_cursor: + # When we are in the middle of a line. Always insert a newline. + insert_text("\n") + else: + # Go to new line, but also add indentation. + current_line = buffer.document.current_line_before_cursor.rstrip() + insert_text("\n") + + # Unident if the last line ends with 'pass', remove four spaces. + unindent = current_line.rstrip().endswith(" pass") + + # Copy whitespace from current line + current_line2 = current_line[4:] if unindent else current_line + + for c in current_line2: + if c.isspace(): + insert_text(c) + else: + break + + # If the last line ends with a colon, add four extra spaces. + if current_line[-1:] == ":": + for x in range(4): + insert_text(" ") diff --git a/ptpython/layout.py b/ptpython/layout.py new file mode 100644 index 0000000..2c1ec15 --- /dev/null +++ b/ptpython/layout.py @@ -0,0 +1,773 @@ +""" +Creation of the `Layout` instance for the Python input/REPL. +""" +from __future__ import annotations + +import platform +import sys +from enum import Enum +from inspect import _ParameterKind as ParameterKind +from typing import TYPE_CHECKING, Any + +from prompt_toolkit.application import get_app +from prompt_toolkit.enums import DEFAULT_BUFFER, SEARCH_BUFFER +from prompt_toolkit.filters import ( + Condition, + has_focus, + is_done, + renderer_height_is_known, +) +from prompt_toolkit.formatted_text import fragment_list_width, to_formatted_text +from prompt_toolkit.formatted_text.base import StyleAndTextTuples +from prompt_toolkit.key_binding.vi_state import InputMode +from prompt_toolkit.layout.containers import ( + AnyContainer, + ConditionalContainer, + Container, + Float, + FloatContainer, + HSplit, + ScrollOffsets, + VSplit, + Window, +) +from prompt_toolkit.layout.controls import BufferControl, FormattedTextControl +from prompt_toolkit.layout.dimension import AnyDimension, Dimension +from prompt_toolkit.layout.layout import Layout +from prompt_toolkit.layout.margins import PromptMargin +from prompt_toolkit.layout.menus import CompletionsMenu, MultiColumnCompletionsMenu +from prompt_toolkit.layout.processors import ( + AppendAutoSuggestion, + ConditionalProcessor, + DisplayMultipleCursors, + HighlightIncrementalSearchProcessor, + HighlightMatchingBracketProcessor, + HighlightSelectionProcessor, + Processor, + TabsProcessor, +) +from prompt_toolkit.lexers import Lexer, SimpleLexer +from prompt_toolkit.mouse_events import MouseEvent +from prompt_toolkit.selection import SelectionType +from prompt_toolkit.widgets.toolbars import ( + ArgToolbar, + CompletionsToolbar, + SearchToolbar, + SystemToolbar, + ValidationToolbar, +) + +from .filters import HasSignature, ShowDocstring, ShowSidebar, ShowSignature +from .prompt_style import PromptStyle +from .utils import if_mousedown + +if TYPE_CHECKING: + from .python_input import OptionCategory, PythonInput + +__all__ = ["PtPythonLayout", "CompletionVisualisation"] + + +class CompletionVisualisation(Enum): + "Visualisation method for the completions." + + NONE = "none" + POP_UP = "pop-up" + MULTI_COLUMN = "multi-column" + TOOLBAR = "toolbar" + + +def show_completions_toolbar(python_input: PythonInput) -> Condition: + return Condition( + lambda: python_input.completion_visualisation == CompletionVisualisation.TOOLBAR + ) + + +def show_completions_menu(python_input: PythonInput) -> Condition: + return Condition( + lambda: python_input.completion_visualisation == CompletionVisualisation.POP_UP + ) + + +def show_multi_column_completions_menu(python_input: PythonInput) -> Condition: + return Condition( + lambda: python_input.completion_visualisation + == CompletionVisualisation.MULTI_COLUMN + ) + + +def python_sidebar(python_input: PythonInput) -> Window: + """ + Create the `Layout` for the sidebar with the configurable options. + """ + + def get_text_fragments() -> StyleAndTextTuples: + tokens: StyleAndTextTuples = [] + + def append_category(category: OptionCategory[Any]) -> None: + tokens.extend( + [ + ("class:sidebar", " "), + ("class:sidebar.title", " %-36s" % category.title), + ("class:sidebar", "\n"), + ] + ) + + def append(index: int, label: str, status: str) -> None: + selected = index == python_input.selected_option_index + + @if_mousedown + def select_item(mouse_event: MouseEvent) -> None: + python_input.selected_option_index = index + + @if_mousedown + def goto_next(mouse_event: MouseEvent) -> None: + "Select item and go to next value." + python_input.selected_option_index = index + option = python_input.selected_option + option.activate_next() + + sel = ",selected" if selected else "" + + tokens.append(("class:sidebar" + sel, " >" if selected else " ")) + tokens.append(("class:sidebar.label" + sel, "%-24s" % label, select_item)) + tokens.append(("class:sidebar.status" + sel, " ", select_item)) + tokens.append(("class:sidebar.status" + sel, "%s" % status, goto_next)) + + if selected: + tokens.append(("[SetCursorPosition]", "")) + + tokens.append( + ("class:sidebar.status" + sel, " " * (13 - len(status)), goto_next) + ) + tokens.append(("class:sidebar", "<" if selected else "")) + tokens.append(("class:sidebar", "\n")) + + i = 0 + for category in python_input.options: + append_category(category) + + for option in category.options: + append(i, option.title, str(option.get_current_value())) + i += 1 + + tokens.pop() # Remove last newline. + + return tokens + + class Control(FormattedTextControl): + def move_cursor_down(self) -> None: + python_input.selected_option_index += 1 + + def move_cursor_up(self) -> None: + python_input.selected_option_index -= 1 + + return Window( + Control(get_text_fragments), + style="class:sidebar", + width=Dimension.exact(43), + height=Dimension(min=3), + scroll_offsets=ScrollOffsets(top=1, bottom=1), + ) + + +def python_sidebar_navigation(python_input: PythonInput) -> Window: + """ + Create the `Layout` showing the navigation information for the sidebar. + """ + + def get_text_fragments() -> StyleAndTextTuples: + # Show navigation info. + return [ + ("class:sidebar", " "), + ("class:sidebar.key", "[Arrows]"), + ("class:sidebar", " "), + ("class:sidebar.description", "Navigate"), + ("class:sidebar", " "), + ("class:sidebar.key", "[Enter]"), + ("class:sidebar", " "), + ("class:sidebar.description", "Hide menu"), + ] + + return Window( + FormattedTextControl(get_text_fragments), + style="class:sidebar", + width=Dimension.exact(43), + height=Dimension.exact(1), + ) + + +def python_sidebar_help(python_input: PythonInput) -> Container: + """ + Create the `Layout` for the help text for the current item in the sidebar. + """ + token = "class:sidebar.helptext" + + def get_current_description() -> str: + """ + Return the description of the selected option. + """ + i = 0 + for category in python_input.options: + for option in category.options: + if i == python_input.selected_option_index: + return option.description + i += 1 + return "" + + def get_help_text() -> StyleAndTextTuples: + return [(token, get_current_description())] + + return ConditionalContainer( + content=Window( + FormattedTextControl(get_help_text), + style=token, + height=Dimension(min=3), + wrap_lines=True, + ), + filter=ShowSidebar(python_input) + & Condition(lambda: python_input.show_sidebar_help) + & ~is_done, + ) + + +def signature_toolbar(python_input: PythonInput) -> Container: + """ + Return the `Layout` for the signature. + """ + + def get_text_fragments() -> StyleAndTextTuples: + result: StyleAndTextTuples = [] + append = result.append + Signature = "class:signature-toolbar" + + if python_input.signatures: + sig = python_input.signatures[0] # Always take the first one. + + append((Signature, " ")) + try: + append((Signature, sig.name)) + except IndexError: + # Workaround for #37: https://github.com/jonathanslenders/python-prompt-toolkit/issues/37 + # See also: https://github.com/davidhalter/jedi/issues/490 + return [] + + append((Signature + ",operator", "(")) + + got_positional_only = False + got_keyword_only = False + + for i, p in enumerate(sig.parameters): + # Detect transition between positional-only and not positional-only. + if p.kind == ParameterKind.POSITIONAL_ONLY: + got_positional_only = True + if got_positional_only and p.kind != ParameterKind.POSITIONAL_ONLY: + got_positional_only = False + append((Signature, "/")) + append((Signature + ",operator", ", ")) + + if not got_keyword_only and p.kind == ParameterKind.KEYWORD_ONLY: + got_keyword_only = True + append((Signature, "*")) + append((Signature + ",operator", ", ")) + + sig_index = getattr(sig, "index", 0) + + if i == sig_index: + # Note: we use `_Param.description` instead of + # `_Param.name`, that way we also get the '*' before args. + append((Signature + ",current-name", p.description)) + else: + append((Signature, p.description)) + + if p.default: + # NOTE: For the jedi-based completion, the default is + # currently still part of the name. + append((Signature, f"={p.default}")) + + append((Signature + ",operator", ", ")) + + if sig.parameters: + # Pop last comma + result.pop() + + append((Signature + ",operator", ")")) + append((Signature, " ")) + return result + + return ConditionalContainer( + content=Window( + FormattedTextControl(get_text_fragments), height=Dimension.exact(1) + ), + # Show only when there is a signature + filter=HasSignature(python_input) + & + # Signature needs to be shown. + ShowSignature(python_input) + & + # And no sidebar is visible. + ~ShowSidebar(python_input) + & + # Not done yet. + ~is_done, + ) + + +class PythonPromptMargin(PromptMargin): + """ + Create margin that displays the prompt. + It shows something like "In [1]:". + """ + + def __init__(self, python_input: PythonInput) -> None: + self.python_input = python_input + + def get_prompt_style() -> PromptStyle: + return python_input.all_prompt_styles[python_input.prompt_style] + + def get_prompt() -> StyleAndTextTuples: + return to_formatted_text(get_prompt_style().in_prompt()) + + def get_continuation( + width: int, line_number: int, is_soft_wrap: bool + ) -> StyleAndTextTuples: + if python_input.show_line_numbers and not is_soft_wrap: + text = ("%i " % (line_number + 1)).rjust(width) + return [("class:line-number", text)] + else: + return to_formatted_text(get_prompt_style().in2_prompt(width)) + + super().__init__(get_prompt, get_continuation) + + +def status_bar(python_input: PythonInput) -> Container: + """ + Create the `Layout` for the status bar. + """ + TB = "class:status-toolbar" + + @if_mousedown + def toggle_paste_mode(mouse_event: MouseEvent) -> None: + python_input.paste_mode = not python_input.paste_mode + + @if_mousedown + def enter_history(mouse_event: MouseEvent) -> None: + python_input.enter_history() + + def get_text_fragments() -> StyleAndTextTuples: + python_buffer = python_input.default_buffer + + result: StyleAndTextTuples = [] + append = result.append + + append((TB, " ")) + result.extend(get_inputmode_fragments(python_input)) + append((TB, " ")) + + # Position in history. + append( + ( + TB, + "%i/%i " + % (python_buffer.working_index + 1, len(python_buffer._working_lines)), + ) + ) + + # Shortcuts. + app = get_app() + if ( + not python_input.vi_mode + and app.current_buffer == python_input.search_buffer + ): + append((TB, "[Ctrl-G] Cancel search [Enter] Go to this position.")) + elif bool(app.current_buffer.selection_state) and not python_input.vi_mode: + # Emacs cut/copy keys. + append((TB, "[Ctrl-W] Cut [Meta-W] Copy [Ctrl-Y] Paste [Ctrl-G] Cancel")) + else: + result.extend( + [ + (TB + " class:status-toolbar.key", "[F3]", enter_history), + (TB, " History ", enter_history), + (TB + " class:status-toolbar.key", "[F6]", toggle_paste_mode), + (TB, " ", toggle_paste_mode), + ] + ) + + if python_input.paste_mode: + append( + (TB + " class:paste-mode-on", "Paste mode (on)", toggle_paste_mode) + ) + else: + append((TB, "Paste mode", toggle_paste_mode)) + + return result + + return ConditionalContainer( + content=Window(content=FormattedTextControl(get_text_fragments), style=TB), + filter=~is_done + & renderer_height_is_known + & Condition( + lambda: python_input.show_status_bar + and not python_input.show_exit_confirmation + ), + ) + + +def get_inputmode_fragments(python_input: PythonInput) -> StyleAndTextTuples: + """ + Return current input mode as a list of (token, text) tuples for use in a + toolbar. + """ + app = get_app() + + @if_mousedown + def toggle_vi_mode(mouse_event: MouseEvent) -> None: + python_input.vi_mode = not python_input.vi_mode + + token = "class:status-toolbar" + input_mode_t = "class:status-toolbar.input-mode" + + mode = app.vi_state.input_mode + result: StyleAndTextTuples = [] + append = result.append + + if python_input.title: + result.extend(to_formatted_text(python_input.title)) + + append((input_mode_t, "[F4] ", toggle_vi_mode)) + + # InputMode + if python_input.vi_mode: + recording_register = app.vi_state.recording_register + if recording_register: + append((token, " ")) + append((token + " class:record", f"RECORD({recording_register})")) + append((token, " - ")) + + if app.current_buffer.selection_state is not None: + if app.current_buffer.selection_state.type == SelectionType.LINES: + append((input_mode_t, "Vi (VISUAL LINE)", toggle_vi_mode)) + elif app.current_buffer.selection_state.type == SelectionType.CHARACTERS: + append((input_mode_t, "Vi (VISUAL)", toggle_vi_mode)) + append((token, " ")) + elif app.current_buffer.selection_state.type == SelectionType.BLOCK: + append((input_mode_t, "Vi (VISUAL BLOCK)", toggle_vi_mode)) + append((token, " ")) + elif mode in (InputMode.INSERT, "vi-insert-multiple"): + append((input_mode_t, "Vi (INSERT)", toggle_vi_mode)) + append((token, " ")) + elif mode == InputMode.NAVIGATION: + append((input_mode_t, "Vi (NAV)", toggle_vi_mode)) + append((token, " ")) + elif mode == InputMode.REPLACE: + append((input_mode_t, "Vi (REPLACE)", toggle_vi_mode)) + append((token, " ")) + else: + if app.emacs_state.is_recording: + append((token, " ")) + append((token + " class:record", "RECORD")) + append((token, " - ")) + + append((input_mode_t, "Emacs", toggle_vi_mode)) + append((token, " ")) + + return result + + +def show_sidebar_button_info(python_input: PythonInput) -> Container: + """ + Create `Layout` for the information in the right-bottom corner. + (The right part of the status bar.) + """ + + @if_mousedown + def toggle_sidebar(mouse_event: MouseEvent) -> None: + "Click handler for the menu." + python_input.show_sidebar = not python_input.show_sidebar + + version = sys.version_info + tokens: StyleAndTextTuples = [ + ("class:status-toolbar.key", "[F2]", toggle_sidebar), + ("class:status-toolbar", " Menu", toggle_sidebar), + ("class:status-toolbar", " - "), + ( + "class:status-toolbar.python-version", + "%s %i.%i.%i" + % (platform.python_implementation(), version[0], version[1], version[2]), + ), + ("class:status-toolbar", " "), + ] + width = fragment_list_width(tokens) + + def get_text_fragments() -> StyleAndTextTuples: + # Python version + return tokens + + return ConditionalContainer( + content=Window( + FormattedTextControl(get_text_fragments), + style="class:status-toolbar", + height=Dimension.exact(1), + width=Dimension.exact(width), + ), + filter=~is_done + & renderer_height_is_known + & Condition( + lambda: python_input.show_status_bar + and not python_input.show_exit_confirmation + ), + ) + + +def create_exit_confirmation( + python_input: PythonInput, style: str = "class:exit-confirmation" +) -> Container: + """ + Create `Layout` for the exit message. + """ + + def get_text_fragments() -> StyleAndTextTuples: + # Show "Do you really want to exit?" + return [ + (style, "\n %s ([y]/n) " % python_input.exit_message), + ("[SetCursorPosition]", ""), + (style, " \n"), + ] + + visible = ~is_done & Condition(lambda: python_input.show_exit_confirmation) + + return ConditionalContainer( + content=Window( + FormattedTextControl(get_text_fragments, focusable=True), style=style + ), + filter=visible, + ) + + +def meta_enter_message(python_input: PythonInput) -> Container: + """ + Create the `Layout` for the 'Meta+Enter` message. + """ + + def get_text_fragments() -> StyleAndTextTuples: + return [("class:accept-message", " [Meta+Enter] Execute ")] + + @Condition + def extra_condition() -> bool: + "Only show when..." + b = python_input.default_buffer + + return ( + python_input.show_meta_enter_message + and ( + not b.document.is_cursor_at_the_end + or python_input.accept_input_on_enter is None + ) + and "\n" in b.text + ) + + visible = ~is_done & has_focus(DEFAULT_BUFFER) & extra_condition + + return ConditionalContainer( + content=Window(FormattedTextControl(get_text_fragments)), filter=visible + ) + + +class PtPythonLayout: + def __init__( + self, + python_input: PythonInput, + lexer: Lexer, + extra_body: AnyContainer | None = None, + extra_toolbars: list[AnyContainer] | None = None, + extra_buffer_processors: list[Processor] | None = None, + input_buffer_height: AnyDimension | None = None, + ) -> None: + D = Dimension + extra_body_list: list[AnyContainer] = [extra_body] if extra_body else [] + extra_toolbars = extra_toolbars or [] + + input_buffer_height = input_buffer_height or D(min=6) + + search_toolbar = SearchToolbar(python_input.search_buffer) + + def create_python_input_window() -> Window: + def menu_position() -> int | None: + """ + When there is no autocompletion menu to be shown, and we have a + signature, set the pop-up position at `bracket_start`. + """ + b = python_input.default_buffer + + if python_input.signatures: + row, col = python_input.signatures[0].bracket_start + index = b.document.translate_row_col_to_index(row - 1, col) + return index + return None + + return Window( + BufferControl( + buffer=python_input.default_buffer, + search_buffer_control=search_toolbar.control, + lexer=lexer, + include_default_input_processors=False, + input_processors=[ + ConditionalProcessor( + processor=HighlightIncrementalSearchProcessor(), + filter=has_focus(SEARCH_BUFFER) + | has_focus(search_toolbar.control), + ), + HighlightSelectionProcessor(), + DisplayMultipleCursors(), + TabsProcessor(), + # Show matching parentheses, but only while editing. + ConditionalProcessor( + processor=HighlightMatchingBracketProcessor(chars="[](){}"), + filter=has_focus(DEFAULT_BUFFER) + & ~is_done + & Condition( + lambda: python_input.highlight_matching_parenthesis + ), + ), + ConditionalProcessor( + processor=AppendAutoSuggestion(), filter=~is_done + ), + ] + + (extra_buffer_processors or []), + menu_position=menu_position, + # Make sure that we always see the result of an reverse-i-search: + preview_search=True, + ), + left_margins=[PythonPromptMargin(python_input)], + # Scroll offsets. The 1 at the bottom is important to make sure + # the cursor is never below the "Press [Meta+Enter]" message + # which is a float. + scroll_offsets=ScrollOffsets(bottom=1, left=4, right=4), + # As long as we're editing, prefer a minimal height of 6. + height=( + lambda: ( + None + if get_app().is_done or python_input.show_exit_confirmation + else input_buffer_height + ) + ), + wrap_lines=Condition(lambda: python_input.wrap_lines), + ) + + sidebar = python_sidebar(python_input) + self.exit_confirmation = create_exit_confirmation(python_input) + + self.root_container = HSplit( + [ + VSplit( + [ + HSplit( + [ + FloatContainer( + content=HSplit( + [create_python_input_window()] + extra_body_list + ), + floats=[ + Float( + xcursor=True, + ycursor=True, + content=HSplit( + [ + signature_toolbar(python_input), + ConditionalContainer( + content=CompletionsMenu( + scroll_offset=( + lambda: python_input.completion_menu_scroll_offset + ), + max_height=12, + ), + filter=show_completions_menu( + python_input + ), + ), + ConditionalContainer( + content=MultiColumnCompletionsMenu(), + filter=show_multi_column_completions_menu( + python_input + ), + ), + ] + ), + ), + Float( + left=2, + bottom=1, + content=self.exit_confirmation, + ), + Float( + bottom=0, + right=0, + height=1, + content=meta_enter_message(python_input), + hide_when_covering_content=True, + ), + Float( + bottom=1, + left=1, + right=0, + content=python_sidebar_help(python_input), + ), + ], + ), + ArgToolbar(), + search_toolbar, + SystemToolbar(), + ValidationToolbar(), + ConditionalContainer( + content=CompletionsToolbar(), + filter=show_completions_toolbar(python_input) + & ~is_done, + ), + # Docstring region. + ConditionalContainer( + content=Window( + height=D.exact(1), + char="\u2500", + style="class:separator", + ), + filter=HasSignature(python_input) + & ShowDocstring(python_input) + & ~is_done, + ), + ConditionalContainer( + content=Window( + BufferControl( + buffer=python_input.docstring_buffer, + lexer=SimpleLexer(style="class:docstring"), + # lexer=PythonLexer, + ), + height=D(max=12), + ), + filter=HasSignature(python_input) + & ShowDocstring(python_input) + & ~is_done, + ), + ] + ), + ConditionalContainer( + content=HSplit( + [ + sidebar, + Window(style="class:sidebar,separator", height=1), + python_sidebar_navigation(python_input), + ] + ), + filter=ShowSidebar(python_input) & ~is_done, + ), + ] + ) + ] + + extra_toolbars + + [ + VSplit( + [status_bar(python_input), show_sidebar_button_info(python_input)] + ) + ] + ) + + self.layout = Layout(self.root_container) + self.sidebar = sidebar diff --git a/ptpython/lexer.py b/ptpython/lexer.py new file mode 100644 index 0000000..d925e95 --- /dev/null +++ b/ptpython/lexer.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from typing import Callable + +from prompt_toolkit.document import Document +from prompt_toolkit.formatted_text import StyleAndTextTuples +from prompt_toolkit.lexers import Lexer, PygmentsLexer +from pygments.lexers import BashLexer +from pygments.lexers import Python3Lexer as PythonLexer + +__all__ = ["PtpythonLexer"] + + +class PtpythonLexer(Lexer): + """ + Lexer for ptpython input. + + If the input starts with an exclamation mark, use a Bash lexer, otherwise, + use a Python 3 lexer. + """ + + def __init__(self, python_lexer: Lexer | None = None) -> None: + self.python_lexer = python_lexer or PygmentsLexer(PythonLexer) + self.system_lexer = PygmentsLexer(BashLexer) + + def lex_document(self, document: Document) -> Callable[[int], StyleAndTextTuples]: + if document.text.startswith("!"): + return self.system_lexer.lex_document(document) + + return self.python_lexer.lex_document(document) diff --git a/ptpython/printer.py b/ptpython/printer.py new file mode 100644 index 0000000..85bd9c8 --- /dev/null +++ b/ptpython/printer.py @@ -0,0 +1,435 @@ +from __future__ import annotations + +import sys +import traceback +from dataclasses import dataclass +from enum import Enum +from typing import Generator, Iterable + +from prompt_toolkit.formatted_text import ( + HTML, + AnyFormattedText, + FormattedText, + OneStyleAndTextTuple, + StyleAndTextTuples, + fragment_list_width, + merge_formatted_text, + to_formatted_text, +) +from prompt_toolkit.formatted_text.utils import split_lines +from prompt_toolkit.input import Input +from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent +from prompt_toolkit.output import Output +from prompt_toolkit.shortcuts import PromptSession, print_formatted_text +from prompt_toolkit.styles import BaseStyle, StyleTransformation +from prompt_toolkit.styles.pygments import pygments_token_to_classname +from prompt_toolkit.utils import get_cwidth +from pygments.lexers import PythonLexer, PythonTracebackLexer + +__all__ = ["OutputPrinter"] + +# Never reformat results larger than this: +MAX_REFORMAT_SIZE = 1_000_000 + + +@dataclass +class OutputPrinter: + """ + Result printer. + + Usage:: + + printer = OutputPrinter(...) + printer.display_result(...) + printer.display_exception(...) + """ + + output: Output + input: Input + style: BaseStyle + title: AnyFormattedText + style_transformation: StyleTransformation + + def display_result( + self, + result: object, + *, + out_prompt: AnyFormattedText, + reformat: bool, + highlight: bool, + paginate: bool, + ) -> None: + """ + Show __repr__ (or `__pt_repr__`) for an `eval` result and print to output. + + :param reformat: Reformat result using 'black' before printing if the + result is parsable as Python code. + :param highlight: Syntax highlight the result. + :param paginate: Show paginator when the result does not fit on the + screen. + """ + out_prompt = to_formatted_text(out_prompt) + out_prompt_width = fragment_list_width(out_prompt) + + result = self._insert_out_prompt_and_split_lines( + self._format_result_output( + result, + reformat=reformat, + highlight=highlight, + line_length=self.output.get_size().columns - out_prompt_width, + paginate=paginate, + ), + out_prompt=out_prompt, + ) + self._display_result(result, paginate=paginate) + + def display_exception( + self, e: BaseException, *, highlight: bool, paginate: bool + ) -> None: + """ + Render an exception. + """ + result = self._insert_out_prompt_and_split_lines( + self._format_exception_output(e, highlight=highlight), + out_prompt="", + ) + self._display_result(result, paginate=paginate) + + def display_style_and_text_tuples( + self, + result: Iterable[OneStyleAndTextTuple], + *, + paginate: bool, + ) -> None: + self._display_result( + self._insert_out_prompt_and_split_lines(result, out_prompt=""), + paginate=paginate, + ) + + def _display_result( + self, + lines: Iterable[StyleAndTextTuples], + *, + paginate: bool, + ) -> None: + if paginate: + self._print_paginated_formatted_text(lines) + else: + for line in lines: + self._print_formatted_text(line) + + self.output.flush() + + def _print_formatted_text(self, line: StyleAndTextTuples, end: str = "\n") -> None: + print_formatted_text( + FormattedText(line), + style=self.style, + style_transformation=self.style_transformation, + include_default_pygments_style=False, + output=self.output, + end=end, + ) + + def _format_result_output( + self, + result: object, + *, + reformat: bool, + highlight: bool, + line_length: int, + paginate: bool, + ) -> Generator[OneStyleAndTextTuple, None, None]: + """ + Format __repr__ for an `eval` result. + + Note: this can raise `KeyboardInterrupt` if either calling `__repr__`, + `__pt_repr__` or formatting the output with "Black" takes to long + and the user presses Control-C. + """ + # If __pt_repr__ is present, take this. This can return prompt_toolkit + # formatted text. + try: + if hasattr(result, "__pt_repr__"): + formatted_result_repr = to_formatted_text( + getattr(result, "__pt_repr__")() + ) + yield from formatted_result_repr + return + except (GeneratorExit, KeyboardInterrupt): + raise # Don't catch here. + except: + # For bad code, `__getattr__` can raise something that's not an + # `AttributeError`. This happens already when calling `hasattr()`. + pass + + # Call `__repr__` of given object first, to turn it in a string. + try: + result_repr = repr(result) + except KeyboardInterrupt: + raise # Don't catch here. + except BaseException as e: + # Calling repr failed. + self.display_exception(e, highlight=highlight, paginate=paginate) + return + + # Determine whether it's valid Python code. If not, + # reformatting/highlighting won't be applied. + if len(result_repr) < MAX_REFORMAT_SIZE: + try: + compile(result_repr, "", "eval") + except SyntaxError: + valid_python = False + else: + valid_python = True + else: + valid_python = False + + if valid_python and reformat: + # Inline import. Slightly speed up start-up time if black is + # not used. + try: + import black + + if not hasattr(black, "Mode"): + raise ImportError + except ImportError: + pass # no Black package in your installation + else: + result_repr = black.format_str( + result_repr, + mode=black.Mode(line_length=line_length), + ) + + if valid_python and highlight: + yield from _lex_python_result(result_repr) + else: + yield ("", result_repr) + + def _insert_out_prompt_and_split_lines( + self, result: Iterable[OneStyleAndTextTuple], out_prompt: AnyFormattedText + ) -> Iterable[StyleAndTextTuples]: + r""" + Split styled result in lines (based on the \n characters in the result) + an insert output prompt on whitespace in front of each line. (This does + not yet do the soft wrapping.) + + Yield lines as a result. + """ + out_prompt = to_formatted_text(out_prompt) + out_prompt_width = fragment_list_width(out_prompt) + prefix = ("", " " * out_prompt_width) + + for i, line in enumerate(split_lines(result)): + if i == 0: + line = [*out_prompt, *line] + else: + line = [prefix, *line] + yield line + + def _apply_soft_wrapping( + self, lines: Iterable[StyleAndTextTuples] + ) -> Iterable[StyleAndTextTuples]: + """ + Apply soft wrapping to the given lines. Wrap according to the terminal + width. Insert whitespace in front of each wrapped line to align it with + the output prompt. + """ + line_length = self.output.get_size().columns + + # Iterate over hard wrapped lines. + for lineno, line in enumerate(lines): + columns_in_buffer = 0 + current_line: list[OneStyleAndTextTuple] = [] + + for style, text, *_ in line: + for c in text: + width = get_cwidth(c) + + # (Soft) wrap line if it doesn't fit. + if columns_in_buffer + width > line_length: + yield current_line + columns_in_buffer = 0 + current_line = [] + + columns_in_buffer += width + current_line.append((style, c)) + + if len(current_line) > 0: + yield current_line + + def _print_paginated_formatted_text( + self, lines: Iterable[StyleAndTextTuples] + ) -> None: + """ + Print formatted text, using --MORE-- style pagination. + (Avoid filling up the terminal's scrollback buffer.) + """ + lines = self._apply_soft_wrapping(lines) + pager_prompt = create_pager_prompt( + self.style, self.title, output=self.output, input=self.input + ) + + abort = False + print_all = False + + # Max number of lines allowed in the buffer before painting. + size = self.output.get_size() + max_rows = size.rows - 1 + + # Page buffer. + page: StyleAndTextTuples = [] + + def show_pager() -> None: + nonlocal abort, max_rows, print_all + + # Run pager prompt in another thread. + # Same as for the input. This prevents issues with nested event + # loops. + pager_result = pager_prompt.prompt(in_thread=True) + + if pager_result == PagerResult.ABORT: + print("...") + abort = True + + elif pager_result == PagerResult.NEXT_LINE: + max_rows = 1 + + elif pager_result == PagerResult.NEXT_PAGE: + max_rows = size.rows - 1 + + elif pager_result == PagerResult.PRINT_ALL: + print_all = True + + # Loop over lines. Show --MORE-- prompt when page is filled. + rows = 0 + + for lineno, line in enumerate(lines): + page.extend(line) + page.append(("", "\n")) + rows += 1 + + if rows >= max_rows: + self._print_formatted_text(page, end="") + page = [] + rows = 0 + + if not print_all: + show_pager() + if abort: + return + + self._print_formatted_text(page) + + def _format_exception_output( + self, e: BaseException, highlight: bool + ) -> Generator[OneStyleAndTextTuple, None, None]: + # Instead of just calling ``traceback.format_exc``, we take the + # traceback and skip the bottom calls of this framework. + t, v, tb = sys.exc_info() + + # Required for pdb.post_mortem() to work. + sys.last_type, sys.last_value, sys.last_traceback = t, v, tb + + tblist = list(traceback.extract_tb(tb)) + + for line_nr, tb_tuple in enumerate(tblist): + if tb_tuple[0] == "": + tblist = tblist[line_nr:] + break + + tb_list = traceback.format_list(tblist) + if tb_list: + tb_list.insert(0, "Traceback (most recent call last):\n") + tb_list.extend(traceback.format_exception_only(t, v)) + + tb_str = "".join(tb_list) + + # Format exception and write to output. + # (We use the default style. Most other styles result + # in unreadable colors for the traceback.) + if highlight: + for index, tokentype, text in PythonTracebackLexer().get_tokens_unprocessed( + tb_str + ): + yield ("class:" + pygments_token_to_classname(tokentype), text) + else: + yield ("", tb_str) + + +class PagerResult(Enum): + ABORT = "ABORT" + NEXT_LINE = "NEXT_LINE" + NEXT_PAGE = "NEXT_PAGE" + PRINT_ALL = "PRINT_ALL" + + +def create_pager_prompt( + style: BaseStyle, + title: AnyFormattedText = "", + input: Input | None = None, + output: Output | None = None, +) -> PromptSession[PagerResult]: + """ + Create a "--MORE--" prompt for paginated output. + """ + bindings = KeyBindings() + + @bindings.add("enter") + @bindings.add("down") + def next_line(event: KeyPressEvent) -> None: + event.app.exit(result=PagerResult.NEXT_LINE) + + @bindings.add("space") + def next_page(event: KeyPressEvent) -> None: + event.app.exit(result=PagerResult.NEXT_PAGE) + + @bindings.add("a") + def print_all(event: KeyPressEvent) -> None: + event.app.exit(result=PagerResult.PRINT_ALL) + + @bindings.add("q") + @bindings.add("c-c") + @bindings.add("c-d") + @bindings.add("escape", eager=True) + def no(event: KeyPressEvent) -> None: + event.app.exit(result=PagerResult.ABORT) + + @bindings.add("") + def _(event: KeyPressEvent) -> None: + "Disallow inserting other text." + pass + + session: PromptSession[PagerResult] = PromptSession( + merge_formatted_text( + [ + title, + HTML( + "" + " -- MORE -- " + "[Enter] Scroll " + "[Space] Next page " + "[a] Print all " + "[q] Quit " + ": " + ), + ] + ), + key_bindings=bindings, + erase_when_done=True, + style=style, + input=input, + output=output, + ) + return session + + +def _lex_python_result(result: str) -> Generator[tuple[str, str], None, None]: + "Return token list for Python string." + lexer = PythonLexer() + # Use `get_tokens_unprocessed`, so that we get exactly the same string, + # without line endings appended. `print_formatted_text` already appends a + # line ending, and otherwise we'll have two line endings. + tokens = lexer.get_tokens_unprocessed(result) + + for index, tokentype, text in tokens: + yield ("class:" + pygments_token_to_classname(tokentype), text) diff --git a/ptpython/prompt_style.py b/ptpython/prompt_style.py new file mode 100644 index 0000000..96b738f --- /dev/null +++ b/ptpython/prompt_style.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from abc import ABCMeta, abstractmethod +from typing import TYPE_CHECKING + +from prompt_toolkit.formatted_text import AnyFormattedText + +if TYPE_CHECKING: + from .python_input import PythonInput + +__all__ = ["PromptStyle", "IPythonPrompt", "ClassicPrompt"] + + +class PromptStyle(metaclass=ABCMeta): + """ + Base class for all prompts. + """ + + @abstractmethod + def in_prompt(self) -> AnyFormattedText: + "Return the input tokens." + return [] + + @abstractmethod + def in2_prompt(self, width: int) -> AnyFormattedText: + """ + Tokens for every following input line. + + :param width: The available width. This is coming from the width taken + by `in_prompt`. + """ + return [] + + @abstractmethod + def out_prompt(self) -> AnyFormattedText: + "Return the output tokens." + return [] + + +class IPythonPrompt(PromptStyle): + """ + A prompt resembling the IPython prompt. + """ + + def __init__(self, python_input: PythonInput) -> None: + self.python_input = python_input + + def in_prompt(self) -> AnyFormattedText: + return [ + ("class:in", "In ["), + ("class:in.number", "%s" % self.python_input.current_statement_index), + ("class:in", "]: "), + ] + + def in2_prompt(self, width: int) -> AnyFormattedText: + return [("class:in", "...: ".rjust(width))] + + def out_prompt(self) -> AnyFormattedText: + return [ + ("class:out", "Out["), + ("class:out.number", "%s" % self.python_input.current_statement_index), + ("class:out", "]:"), + ("", " "), + ] + + +class ClassicPrompt(PromptStyle): + """ + The classic Python prompt. + """ + + def in_prompt(self) -> AnyFormattedText: + return [("class:prompt", ">>> ")] + + def in2_prompt(self, width: int) -> AnyFormattedText: + return [("class:prompt.dots", "...")] + + def out_prompt(self) -> AnyFormattedText: + return [] diff --git a/ptpython/py.typed b/ptpython/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/ptpython/python_input.py b/ptpython/python_input.py new file mode 100644 index 0000000..54ddbef --- /dev/null +++ b/ptpython/python_input.py @@ -0,0 +1,1121 @@ +""" +Application for reading Python input. +This can be used for creation of Python REPLs. +""" +from __future__ import annotations + +from asyncio import get_running_loop +from functools import partial +from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Mapping, TypeVar, Union + +from prompt_toolkit.application import Application, get_app +from prompt_toolkit.auto_suggest import ( + AutoSuggestFromHistory, + ConditionalAutoSuggest, + ThreadedAutoSuggest, +) +from prompt_toolkit.buffer import Buffer +from prompt_toolkit.completion import ( + Completer, + ConditionalCompleter, + DynamicCompleter, + FuzzyCompleter, + ThreadedCompleter, + merge_completers, +) +from prompt_toolkit.cursor_shapes import ( + AnyCursorShapeConfig, + CursorShape, + DynamicCursorShapeConfig, + ModalCursorShapeConfig, +) +from prompt_toolkit.document import Document +from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode +from prompt_toolkit.filters import Condition, FilterOrBool +from prompt_toolkit.formatted_text import AnyFormattedText +from prompt_toolkit.history import ( + FileHistory, + History, + InMemoryHistory, + ThreadedHistory, +) +from prompt_toolkit.input import Input +from prompt_toolkit.key_binding import ( + ConditionalKeyBindings, + KeyBindings, + merge_key_bindings, +) +from prompt_toolkit.key_binding.bindings.auto_suggest import load_auto_suggest_bindings +from prompt_toolkit.key_binding.bindings.open_in_editor import ( + load_open_in_editor_bindings, +) +from prompt_toolkit.key_binding.key_bindings import Binding, KeyHandlerCallable +from prompt_toolkit.key_binding.key_processor import KeyPressEvent +from prompt_toolkit.key_binding.vi_state import InputMode +from prompt_toolkit.keys import Keys +from prompt_toolkit.layout.containers import AnyContainer +from prompt_toolkit.layout.dimension import AnyDimension +from prompt_toolkit.layout.processors import Processor +from prompt_toolkit.lexers import DynamicLexer, Lexer, SimpleLexer +from prompt_toolkit.output import ColorDepth, Output +from prompt_toolkit.styles import ( + AdjustBrightnessStyleTransformation, + BaseStyle, + ConditionalStyleTransformation, + DynamicStyle, + SwapLightAndDarkStyleTransformation, + merge_style_transformations, +) +from prompt_toolkit.utils import is_windows +from prompt_toolkit.validation import ConditionalValidator, Validator + +from .completer import CompletePrivateAttributes, HidePrivateCompleter, PythonCompleter +from .history_browser import PythonHistory +from .key_bindings import ( + load_confirm_exit_bindings, + load_python_bindings, + load_sidebar_bindings, +) +from .layout import CompletionVisualisation, PtPythonLayout +from .lexer import PtpythonLexer +from .prompt_style import ClassicPrompt, IPythonPrompt, PromptStyle +from .signatures import Signature, get_signatures_using_eval, get_signatures_using_jedi +from .style import generate_style, get_all_code_styles, get_all_ui_styles +from .utils import unindent_code +from .validator import PythonValidator + +# Isort introduces a SyntaxError, if we'd write `import __future__`. +# https://github.com/PyCQA/isort/issues/2100 +__future__ = __import__("__future__") + + +__all__ = ["PythonInput"] + + +if TYPE_CHECKING: + from typing_extensions import Protocol + + class _SupportsLessThan(Protocol): + # Taken from typeshed. _T_lt is used by "sorted", which needs anything + # sortable. + def __lt__(self, __other: Any) -> bool: + ... + + +_T_lt = TypeVar("_T_lt", bound="_SupportsLessThan") +_T_kh = TypeVar("_T_kh", bound=Union[KeyHandlerCallable, Binding]) + + +class OptionCategory(Generic[_T_lt]): + def __init__(self, title: str, options: list[Option[_T_lt]]) -> None: + self.title = title + self.options = options + + +class Option(Generic[_T_lt]): + """ + Ptpython configuration option that can be shown and modified from the + sidebar. + + :param title: Text. + :param description: Text. + :param get_values: Callable that returns a dictionary mapping the + possible values to callbacks that activate these value. + :param get_current_value: Callable that returns the current, active value. + """ + + def __init__( + self, + title: str, + description: str, + get_current_value: Callable[[], _T_lt], + # We accept `object` as return type for the select functions, because + # often they return an unused boolean. Maybe this can be improved. + get_values: Callable[[], Mapping[_T_lt, Callable[[], object]]], + ) -> None: + self.title = title + self.description = description + self.get_current_value = get_current_value + self.get_values = get_values + + @property + def values(self) -> Mapping[_T_lt, Callable[[], object]]: + return self.get_values() + + def activate_next(self, _previous: bool = False) -> None: + """ + Activate next value. + """ + current = self.get_current_value() + options = sorted(self.values.keys()) + + # Get current index. + try: + index = options.index(current) + except ValueError: + index = 0 + + # Go to previous/next index. + if _previous: + index -= 1 + else: + index += 1 + + # Call handler for this option. + next_option = options[index % len(options)] + self.values[next_option]() + + def activate_previous(self) -> None: + """ + Activate previous value. + """ + self.activate_next(_previous=True) + + +COLOR_DEPTHS = { + ColorDepth.DEPTH_1_BIT: "Monochrome", + ColorDepth.DEPTH_4_BIT: "ANSI Colors", + ColorDepth.DEPTH_8_BIT: "256 colors", + ColorDepth.DEPTH_24_BIT: "True color", +} + +_Namespace = Dict[str, Any] +_GetNamespace = Callable[[], _Namespace] + + +class PythonInput: + """ + Prompt for reading Python input. + + :: + + python_input = PythonInput(...) + python_code = python_input.app.run() + + :param create_app: When `False`, don't create and manage a prompt_toolkit + application. The default is `True` and should only be set + to false if PythonInput is being embedded in a separate + prompt_toolkit application. + """ + + def __init__( + self, + get_globals: _GetNamespace | None = None, + get_locals: _GetNamespace | None = None, + history_filename: str | None = None, + vi_mode: bool = False, + color_depth: ColorDepth | None = None, + # Input/output. + input: Input | None = None, + output: Output | None = None, + # For internal use. + extra_key_bindings: KeyBindings | None = None, + create_app: bool = True, + _completer: Completer | None = None, + _validator: Validator | None = None, + _lexer: Lexer | None = None, + _extra_buffer_processors: list[Processor] | None = None, + _extra_layout_body: AnyContainer | None = None, + _extra_toolbars: list[AnyContainer] | None = None, + _input_buffer_height: AnyDimension | None = None, + ) -> None: + self.get_globals: _GetNamespace = get_globals or (lambda: {}) + self.get_locals: _GetNamespace = get_locals or self.get_globals + + self.completer = _completer or PythonCompleter( + self.get_globals, + self.get_locals, + lambda: self.enable_dictionary_completion, + ) + + self._completer = HidePrivateCompleter( + # If fuzzy is enabled, first do fuzzy completion, but always add + # the non-fuzzy completions, if somehow the fuzzy completer didn't + # find them. (Due to the way the cursor position is moved in the + # fuzzy completer, some completions will not always be found by the + # fuzzy completer, but will be found with the normal completer.) + merge_completers( + [ + ConditionalCompleter( + FuzzyCompleter(DynamicCompleter(lambda: self.completer)), + Condition(lambda: self.enable_fuzzy_completion), + ), + DynamicCompleter(lambda: self.completer), + ], + deduplicate=True, + ), + lambda: self.complete_private_attributes, + ) + self._validator = _validator or PythonValidator(self.get_compiler_flags) + self._lexer = PtpythonLexer(_lexer) + + self.history: History + if history_filename: + self.history = ThreadedHistory(FileHistory(history_filename)) + else: + self.history = InMemoryHistory() + + self._input_buffer_height = _input_buffer_height + self._extra_layout_body = _extra_layout_body + self._extra_toolbars = _extra_toolbars or [] + self._extra_buffer_processors = _extra_buffer_processors or [] + + self.extra_key_bindings = extra_key_bindings or KeyBindings() + + # Settings. + self.title: AnyFormattedText = "" + self.show_signature: bool = False + self.show_docstring: bool = False + self.show_meta_enter_message: bool = True + self.completion_visualisation: CompletionVisualisation = ( + CompletionVisualisation.MULTI_COLUMN + ) + self.completion_menu_scroll_offset: int = 1 + + self.show_line_numbers: bool = False + self.show_status_bar: bool = True + self.wrap_lines: bool = True + self.complete_while_typing: bool = True + self.paste_mode: bool = ( + False # When True, don't insert whitespace after newline. + ) + self.confirm_exit: bool = ( + True # Ask for confirmation when Control-D is pressed. + ) + self.accept_input_on_enter: int = 2 # Accept when pressing Enter 'n' times. + # 'None' means that meta-enter is always required. + self.enable_open_in_editor: bool = True + self.enable_system_bindings: bool = True + self.enable_input_validation: bool = True + self.enable_auto_suggest: bool = False + self.enable_mouse_support: bool = False + self.enable_history_search: bool = False # When True, like readline, going + # back in history will filter the + # history on the records starting + # with the current input. + + self.enable_syntax_highlighting: bool = True + self.enable_fuzzy_completion: bool = False + self.enable_dictionary_completion: bool = False # Also eval-based completion. + self.complete_private_attributes: CompletePrivateAttributes = ( + CompletePrivateAttributes.ALWAYS + ) + self.swap_light_and_dark: bool = False + self.highlight_matching_parenthesis: bool = False + self.show_sidebar: bool = False # Currently show the sidebar. + + # Pager. + self.enable_output_formatting: bool = False + self.enable_pager: bool = False + + # When the sidebar is visible, also show the help text. + self.show_sidebar_help: bool = True + + # Currently show 'Do you really want to exit?' + self.show_exit_confirmation: bool = False + + # The title to be displayed in the terminal. (None or string.) + self.terminal_title: str | None = None + + self.exit_message: str = "Do you really want to exit?" + self.insert_blank_line_after_output: bool = True # (For the REPL.) + self.insert_blank_line_after_input: bool = False # (For the REPL.) + + # The buffers. + self.default_buffer = self._create_buffer() + self.search_buffer: Buffer = Buffer() + self.docstring_buffer: Buffer = Buffer(read_only=True) + + # Cursor shapes. + self.cursor_shape_config = "Block" + self.all_cursor_shape_configs: dict[str, AnyCursorShapeConfig] = { + "Block": CursorShape.BLOCK, + "Underline": CursorShape.UNDERLINE, + "Beam": CursorShape.BEAM, + "Modal (vi)": ModalCursorShapeConfig(), + "Blink block": CursorShape.BLINKING_BLOCK, + "Blink under": CursorShape.BLINKING_UNDERLINE, + "Blink beam": CursorShape.BLINKING_BEAM, + } + + # Tokens to be shown at the prompt. + self.prompt_style: str = "classic" # The currently active style. + + # Styles selectable from the menu. + self.all_prompt_styles: dict[str, PromptStyle] = { + "ipython": IPythonPrompt(self), + "classic": ClassicPrompt(), + } + + self.get_input_prompt = lambda: self.all_prompt_styles[ + self.prompt_style + ].in_prompt() + + self.get_output_prompt = lambda: self.all_prompt_styles[ + self.prompt_style + ].out_prompt() + + #: Load styles. + self.code_styles: dict[str, BaseStyle] = get_all_code_styles() + self.ui_styles = get_all_ui_styles() + self._current_code_style_name: str = "default" + self._current_ui_style_name: str = "default" + + if is_windows(): + self._current_code_style_name = "win32" + + self._current_style = self._generate_style() + self.color_depth: ColorDepth = color_depth or ColorDepth.default() + + self.max_brightness: float = 1.0 + self.min_brightness: float = 0.0 + + # Options to be configurable from the sidebar. + self.options = self._create_options() + self.selected_option_index: int = 0 + + #: Incrementing integer counting the current statement. + self.current_statement_index: int = 1 + + # Code signatures. (This is set asynchronously after a timeout.) + self.signatures: list[Signature] = [] + + # Boolean indicating whether we have a signatures thread running. + # (Never run more than one at the same time.) + self._get_signatures_thread_running: bool = False + + # Get into Vi navigation mode at startup + self.vi_start_in_navigation_mode: bool = False + + # Preserve last used Vi input mode between main loop iterations + self.vi_keep_last_used_mode: bool = False + + self.style_transformation = merge_style_transformations( + [ + ConditionalStyleTransformation( + SwapLightAndDarkStyleTransformation(), + filter=Condition(lambda: self.swap_light_and_dark), + ), + AdjustBrightnessStyleTransformation( + lambda: self.min_brightness, lambda: self.max_brightness + ), + ] + ) + self.ptpython_layout = PtPythonLayout( + self, + lexer=DynamicLexer( + lambda: self._lexer + if self.enable_syntax_highlighting + else SimpleLexer() + ), + input_buffer_height=self._input_buffer_height, + extra_buffer_processors=self._extra_buffer_processors, + extra_body=self._extra_layout_body, + extra_toolbars=self._extra_toolbars, + ) + + # Create an app if requested. If not, the global get_app() is returned + # for self.app via property getter. + if create_app: + self._app: Application[str] | None = self._create_application(input, output) + # Setting vi_mode will not work unless the prompt_toolkit + # application has been created. + if vi_mode: + self.app.editing_mode = EditingMode.VI + else: + self._app = None + + def _accept_handler(self, buff: Buffer) -> bool: + app = get_app() + app.exit(result=buff.text) + app.pre_run_callables.append(buff.reset) + return True # Keep text, we call 'reset' later on. + + @property + def option_count(self) -> int: + "Return the total amount of options. (In all categories together.)" + return sum(len(category.options) for category in self.options) + + @property + def selected_option(self) -> Option[Any]: + "Return the currently selected option." + i = 0 + for category in self.options: + for o in category.options: + if i == self.selected_option_index: + return o + else: + i += 1 + + raise ValueError("Nothing selected") + + def get_compiler_flags(self) -> int: + """ + Give the current compiler flags by looking for _Feature instances + in the globals. + """ + flags = 0 + + for value in self.get_globals().values(): + try: + if isinstance(value, __future__._Feature): + f = value.compiler_flag + flags |= f + except BaseException: + # get_compiler_flags should never raise to not run into an + # `Unhandled exception in event loop` + + # See: https://github.com/prompt-toolkit/ptpython/issues/351 + # An exception can be raised when some objects in the globals + # raise an exception in a custom `__getattribute__`. + pass + + return flags + + def add_key_binding( + self, + *keys: Keys | str, + filter: FilterOrBool = True, + eager: FilterOrBool = False, + is_global: FilterOrBool = False, + save_before: Callable[[KeyPressEvent], bool] = (lambda e: True), + record_in_macro: FilterOrBool = True, + ) -> Callable[[_T_kh], _T_kh]: + """ + Shortcut for adding new key bindings. + (Mostly useful for a config.py file, that receives + a PythonInput/Repl instance as input.) + + All arguments are identical to prompt_toolkit's `KeyBindings.add`. + + :: + + @python_input.add_key_binding(Keys.ControlX, filter=...) + def handler(event): + ... + """ + return self.extra_key_bindings.add( + *keys, + filter=filter, + eager=eager, + is_global=is_global, + save_before=save_before, + record_in_macro=record_in_macro, + ) + + def install_code_colorscheme(self, name: str, style: BaseStyle) -> None: + """ + Install a new code color scheme. + """ + self.code_styles[name] = style + + def use_code_colorscheme(self, name: str) -> None: + """ + Apply new colorscheme. (By name.) + """ + assert name in self.code_styles + + self._current_code_style_name = name + self._current_style = self._generate_style() + + def install_ui_colorscheme(self, name: str, style: BaseStyle) -> None: + """ + Install a new UI color scheme. + """ + self.ui_styles[name] = style + + def use_ui_colorscheme(self, name: str) -> None: + """ + Apply new colorscheme. (By name.) + """ + assert name in self.ui_styles + + self._current_ui_style_name = name + self._current_style = self._generate_style() + + def _use_color_depth(self, depth: ColorDepth) -> None: + self.color_depth = depth + + def _set_min_brightness(self, value: float) -> None: + self.min_brightness = value + self.max_brightness = max(self.max_brightness, value) + + def _set_max_brightness(self, value: float) -> None: + self.max_brightness = value + self.min_brightness = min(self.min_brightness, value) + + def _generate_style(self) -> BaseStyle: + """ + Create new Style instance. + (We don't want to do this on every key press, because each time the + renderer receives a new style class, he will redraw everything.) + """ + return generate_style( + self.code_styles[self._current_code_style_name], + self.ui_styles[self._current_ui_style_name], + ) + + def _create_options(self) -> list[OptionCategory[Any]]: + """ + Create a list of `Option` instances for the options sidebar. + """ + + def enable(attribute: str, value: Any = True) -> bool: + setattr(self, attribute, value) + + # Return `True`, to be able to chain this in the lambdas below. + return True + + def disable(attribute: str) -> bool: + setattr(self, attribute, False) + return True + + def simple_option( + title: str, + description: str, + field_name: str, + values: tuple[str, str] = ("off", "on"), + ) -> Option[str]: + "Create Simple on/of option." + + def get_current_value() -> str: + return values[bool(getattr(self, field_name))] + + def get_values() -> dict[str, Callable[[], bool]]: + return { + values[1]: lambda: enable(field_name), + values[0]: lambda: disable(field_name), + } + + return Option( + title=title, + description=description, + get_values=get_values, + get_current_value=get_current_value, + ) + + brightness_values = [1.0 / 20 * value for value in range(0, 21)] + + return [ + OptionCategory( + "Input", + [ + Option( + title="Editing mode", + description="Vi or emacs key bindings.", + get_current_value=lambda: ["Emacs", "Vi"][self.vi_mode], + get_values=lambda: { + "Emacs": lambda: disable("vi_mode"), + "Vi": lambda: enable("vi_mode"), + }, + ), + Option( + title="Cursor shape", + description="Change the cursor style, possibly according " + "to the Vi input mode.", + get_current_value=lambda: self.cursor_shape_config, + get_values=lambda: { + s: partial(enable, "cursor_shape_config", s) + for s in self.all_cursor_shape_configs + }, + ), + simple_option( + title="Paste mode", + description="When enabled, don't indent automatically.", + field_name="paste_mode", + ), + Option( + title="Complete while typing", + description="Generate autocompletions automatically while typing. " + 'Don\'t require pressing TAB. (Not compatible with "History search".)', + get_current_value=lambda: ["off", "on"][ + self.complete_while_typing + ], + get_values=lambda: { + "on": lambda: enable("complete_while_typing") + and disable("enable_history_search"), + "off": lambda: disable("complete_while_typing"), + }, + ), + Option( + title="Complete private attrs", + description="Show or hide private attributes in the completions. " + "'If no public' means: show private attributes only if no public " + "matches are found or if an underscore was typed.", + get_current_value=lambda: { + CompletePrivateAttributes.NEVER: "Never", + CompletePrivateAttributes.ALWAYS: "Always", + CompletePrivateAttributes.IF_NO_PUBLIC: "If no public", + }[self.complete_private_attributes], + get_values=lambda: { + "Never": lambda: enable( + "complete_private_attributes", + CompletePrivateAttributes.NEVER, + ), + "Always": lambda: enable( + "complete_private_attributes", + CompletePrivateAttributes.ALWAYS, + ), + "If no public": lambda: enable( + "complete_private_attributes", + CompletePrivateAttributes.IF_NO_PUBLIC, + ), + }, + ), + Option( + title="Enable fuzzy completion", + description="Enable fuzzy completion.", + get_current_value=lambda: ["off", "on"][ + self.enable_fuzzy_completion + ], + get_values=lambda: { + "on": lambda: enable("enable_fuzzy_completion"), + "off": lambda: disable("enable_fuzzy_completion"), + }, + ), + Option( + title="Dictionary completion", + description="Enable experimental dictionary/list completion.\n" + 'WARNING: this does "eval" on fragments of\n' + " your Python input and is\n" + " potentially unsafe.", + get_current_value=lambda: ["off", "on"][ + self.enable_dictionary_completion + ], + get_values=lambda: { + "on": lambda: enable("enable_dictionary_completion"), + "off": lambda: disable("enable_dictionary_completion"), + }, + ), + Option( + title="History search", + description="When pressing the up-arrow, filter the history on input starting " + 'with the current text. (Not compatible with "Complete while typing".)', + get_current_value=lambda: ["off", "on"][ + self.enable_history_search + ], + get_values=lambda: { + "on": lambda: enable("enable_history_search") + and disable("complete_while_typing"), + "off": lambda: disable("enable_history_search"), + }, + ), + simple_option( + title="Mouse support", + description="Respond to mouse clicks and scrolling for positioning the cursor, " + "selecting text and scrolling through windows.", + field_name="enable_mouse_support", + ), + simple_option( + title="Confirm on exit", + description="Require confirmation when exiting.", + field_name="confirm_exit", + ), + simple_option( + title="Input validation", + description="In case of syntax errors, move the cursor to the error " + "instead of showing a traceback of a SyntaxError.", + field_name="enable_input_validation", + ), + simple_option( + title="Auto suggestion", + description="Auto suggest inputs by looking at the history. " + "Pressing right arrow or Ctrl-E will complete the entry.", + field_name="enable_auto_suggest", + ), + Option( + title="Accept input on enter", + description="Amount of ENTER presses required to execute input when the cursor " + "is at the end of the input. (Note that META+ENTER will always execute.)", + get_current_value=lambda: str( + self.accept_input_on_enter or "meta-enter" + ), + get_values=lambda: { + "2": lambda: enable("accept_input_on_enter", 2), + "3": lambda: enable("accept_input_on_enter", 3), + "4": lambda: enable("accept_input_on_enter", 4), + "meta-enter": lambda: enable("accept_input_on_enter", None), + }, + ), + ], + ), + OptionCategory( + "Display", + [ + Option( + title="Completions", + description="Visualisation to use for displaying the completions. (Multiple columns, one column, a toolbar or nothing.)", + get_current_value=lambda: self.completion_visualisation.value, + get_values=lambda: { + CompletionVisualisation.NONE.value: lambda: enable( + "completion_visualisation", CompletionVisualisation.NONE + ), + CompletionVisualisation.POP_UP.value: lambda: enable( + "completion_visualisation", + CompletionVisualisation.POP_UP, + ), + CompletionVisualisation.MULTI_COLUMN.value: lambda: enable( + "completion_visualisation", + CompletionVisualisation.MULTI_COLUMN, + ), + CompletionVisualisation.TOOLBAR.value: lambda: enable( + "completion_visualisation", + CompletionVisualisation.TOOLBAR, + ), + }, + ), + Option( + title="Prompt", + description="Visualisation of the prompt. ('>>>' or 'In [1]:')", + get_current_value=lambda: self.prompt_style, + get_values=lambda: { + s: partial(enable, "prompt_style", s) + for s in self.all_prompt_styles + }, + ), + simple_option( + title="Blank line after input", + description="Insert a blank line after the input.", + field_name="insert_blank_line_after_input", + ), + simple_option( + title="Blank line after output", + description="Insert a blank line after the output.", + field_name="insert_blank_line_after_output", + ), + simple_option( + title="Show signature", + description="Display function signatures.", + field_name="show_signature", + ), + simple_option( + title="Show docstring", + description="Display function docstrings.", + field_name="show_docstring", + ), + simple_option( + title="Show line numbers", + description="Show line numbers when the input consists of multiple lines.", + field_name="show_line_numbers", + ), + simple_option( + title="Show Meta+Enter message", + description="Show the [Meta+Enter] message when this key combination is required to execute commands. " + + "(This is the case when a simple [Enter] key press will insert a newline.", + field_name="show_meta_enter_message", + ), + simple_option( + title="Wrap lines", + description="Wrap lines instead of scrolling horizontally.", + field_name="wrap_lines", + ), + simple_option( + title="Show status bar", + description="Show the status bar at the bottom of the terminal.", + field_name="show_status_bar", + ), + simple_option( + title="Show sidebar help", + description="When the sidebar is visible, also show this help text.", + field_name="show_sidebar_help", + ), + simple_option( + title="Highlight parenthesis", + description="Highlight matching parenthesis, when the cursor is on or right after one.", + field_name="highlight_matching_parenthesis", + ), + simple_option( + title="Reformat output (black)", + description="Reformat outputs using Black, if possible (experimental).", + field_name="enable_output_formatting", + ), + simple_option( + title="Enable pager for output", + description="Use a pager for displaying outputs that don't " + "fit on the screen.", + field_name="enable_pager", + ), + ], + ), + OptionCategory( + "Colors", + [ + simple_option( + title="Syntax highlighting", + description="Use colors for syntax highlighting", + field_name="enable_syntax_highlighting", + ), + simple_option( + title="Swap light/dark colors", + description="Swap light and dark colors.", + field_name="swap_light_and_dark", + ), + Option( + title="Code", + description="Color scheme to use for the Python code.", + get_current_value=lambda: self._current_code_style_name, + get_values=lambda: { + name: partial(self.use_code_colorscheme, name) + for name in self.code_styles + }, + ), + Option( + title="User interface", + description="Color scheme to use for the user interface.", + get_current_value=lambda: self._current_ui_style_name, + get_values=lambda: { + name: partial(self.use_ui_colorscheme, name) + for name in self.ui_styles + }, + ), + Option( + title="Color depth", + description="Monochrome (1 bit), 16 ANSI colors (4 bit),\n256 colors (8 bit), or 24 bit.", + get_current_value=lambda: COLOR_DEPTHS[self.color_depth], + get_values=lambda: { + name: partial(self._use_color_depth, depth) + for depth, name in COLOR_DEPTHS.items() + }, + ), + Option( + title="Min brightness", + description="Minimum brightness for the color scheme (default=0.0).", + get_current_value=lambda: "%.2f" % self.min_brightness, + get_values=lambda: { + "%.2f" % value: partial(self._set_min_brightness, value) + for value in brightness_values + }, + ), + Option( + title="Max brightness", + description="Maximum brightness for the color scheme (default=1.0).", + get_current_value=lambda: "%.2f" % self.max_brightness, + get_values=lambda: { + "%.2f" % value: partial(self._set_max_brightness, value) + for value in brightness_values + }, + ), + ], + ), + ] + + def _create_application( + self, input: Input | None, output: Output | None + ) -> Application[str]: + """ + Create an `Application` instance. + """ + return Application( + layout=self.ptpython_layout.layout, + key_bindings=merge_key_bindings( + [ + load_python_bindings(self), + load_auto_suggest_bindings(), + load_sidebar_bindings(self), + load_confirm_exit_bindings(self), + ConditionalKeyBindings( + load_open_in_editor_bindings(), + Condition(lambda: self.enable_open_in_editor), + ), + # Extra key bindings should not be active when the sidebar is visible. + ConditionalKeyBindings( + self.extra_key_bindings, + Condition(lambda: not self.show_sidebar), + ), + ] + ), + color_depth=lambda: self.color_depth, + paste_mode=Condition(lambda: self.paste_mode), + mouse_support=Condition(lambda: self.enable_mouse_support), + style=DynamicStyle(lambda: self._current_style), + style_transformation=self.style_transformation, + include_default_pygments_style=False, + reverse_vi_search_direction=True, + cursor=DynamicCursorShapeConfig( + lambda: self.all_cursor_shape_configs[self.cursor_shape_config] + ), + input=input, + output=output, + ) + + def _create_buffer(self) -> Buffer: + """ + Create the `Buffer` for the Python input. + """ + python_buffer = Buffer( + name=DEFAULT_BUFFER, + complete_while_typing=Condition(lambda: self.complete_while_typing), + enable_history_search=Condition(lambda: self.enable_history_search), + tempfile_suffix=".py", + history=self.history, + completer=ThreadedCompleter(self._completer), + validator=ConditionalValidator( + self._validator, Condition(lambda: self.enable_input_validation) + ), + auto_suggest=ConditionalAutoSuggest( + ThreadedAutoSuggest(AutoSuggestFromHistory()), + Condition(lambda: self.enable_auto_suggest), + ), + accept_handler=self._accept_handler, + on_text_changed=self._on_input_timeout, + ) + + return python_buffer + + @property + def editing_mode(self) -> EditingMode: + return self.app.editing_mode + + @editing_mode.setter + def editing_mode(self, value: EditingMode) -> None: + self.app.editing_mode = value + + @property + def vi_mode(self) -> bool: + return self.editing_mode == EditingMode.VI + + @vi_mode.setter + def vi_mode(self, value: bool) -> None: + if value: + self.editing_mode = EditingMode.VI + else: + self.editing_mode = EditingMode.EMACS + + @property + def app(self) -> Application[str]: + if self._app is None: + return get_app() + return self._app + + def _on_input_timeout(self, buff: Buffer) -> None: + """ + When there is no input activity, + in another thread, get the signature of the current code. + """ + + def get_signatures_in_executor(document: Document) -> list[Signature]: + # First, get signatures from Jedi. If we didn't found any and if + # "dictionary completion" (eval-based completion) is enabled, then + # get signatures using eval. + signatures = get_signatures_using_jedi( + document, self.get_locals(), self.get_globals() + ) + if not signatures and self.enable_dictionary_completion: + signatures = get_signatures_using_eval( + document, self.get_locals(), self.get_globals() + ) + + return signatures + + app = self.app + + async def on_timeout_task() -> None: + loop = get_running_loop() + + # Never run multiple get-signature threads. + if self._get_signatures_thread_running: + return + self._get_signatures_thread_running = True + + try: + while True: + document = buff.document + signatures = await loop.run_in_executor( + None, get_signatures_in_executor, document + ) + + # If the text didn't change in the meantime, take these + # signatures. Otherwise, try again. + if buff.text == document.text: + break + finally: + self._get_signatures_thread_running = False + + # Set signatures and redraw. + self.signatures = signatures + + # Set docstring in docstring buffer. + if signatures: + self.docstring_buffer.reset( + document=Document(signatures[0].docstring, cursor_position=0) + ) + else: + self.docstring_buffer.reset() + + app.invalidate() + + if app.is_running: + app.create_background_task(on_timeout_task()) + + def on_reset(self) -> None: + self.signatures = [] + + def enter_history(self) -> None: + """ + Display the history. + """ + app = self.app + app.vi_state.input_mode = InputMode.NAVIGATION + + history = PythonHistory(self, self.default_buffer.document) + + import asyncio + + from prompt_toolkit.application import in_terminal + + async def do_in_terminal() -> None: + async with in_terminal(): + result = await history.app.run_async() + if result is not None: + self.default_buffer.text = result + + app.vi_state.input_mode = InputMode.INSERT + + asyncio.ensure_future(do_in_terminal()) + + def read(self) -> str: + """ + Read the input. + + This will run the Python input user interface in another thread, wait + for input to be accepted and return that. By running the UI in another + thread, we avoid issues regarding possibly nested event loops. + + This can raise EOFError, when Control-D is pressed. + """ + + # Capture the current input_mode in order to restore it after reset, + # for ViState.reset() sets it to InputMode.INSERT unconditionally and + # doesn't accept any arguments. + def pre_run( + last_input_mode: InputMode = self.app.vi_state.input_mode, + ) -> None: + if self.vi_keep_last_used_mode: + self.app.vi_state.input_mode = last_input_mode + + if not self.vi_keep_last_used_mode and self.vi_start_in_navigation_mode: + self.app.vi_state.input_mode = InputMode.NAVIGATION + + # Run the UI. + while True: + try: + result = self.app.run(pre_run=pre_run, in_thread=True) + + if result.lstrip().startswith("\x1a"): + # When the input starts with Ctrl-Z, quit the REPL. + # (Important for Windows users.) + raise EOFError + + # Remove leading whitespace. + # (Users can add extra indentation, which happens for + # instance because of copy/pasting code.) + result = unindent_code(result) + + if result and not result.isspace(): + if self.insert_blank_line_after_input: + self.app.output.write("\n") + + return result + except KeyboardInterrupt: + # Abort - try again. + self.default_buffer.document = Document() diff --git a/ptpython/repl.py b/ptpython/repl.py new file mode 100644 index 0000000..fc9b9da --- /dev/null +++ b/ptpython/repl.py @@ -0,0 +1,531 @@ +""" +Utility for creating a Python repl. + +:: + + from ptpython.repl import embed + embed(globals(), locals(), vi_mode=False) + +""" +from __future__ import annotations + +import asyncio +import builtins +import os +import signal +import sys +import traceback +import types +import warnings +from dis import COMPILER_FLAG_NAMES +from typing import Any, Callable, ContextManager, Iterable + +from prompt_toolkit.formatted_text import OneStyleAndTextTuple +from prompt_toolkit.patch_stdout import patch_stdout as patch_stdout_context +from prompt_toolkit.shortcuts import ( + clear_title, + set_title, +) +from prompt_toolkit.utils import DummyContext +from pygments.lexers import PythonTracebackLexer # noqa: F401 + +from .printer import OutputPrinter +from .python_input import PythonInput + +PyCF_ALLOW_TOP_LEVEL_AWAIT: int +try: + from ast import PyCF_ALLOW_TOP_LEVEL_AWAIT # type: ignore +except ImportError: + PyCF_ALLOW_TOP_LEVEL_AWAIT = 0 + +__all__ = ["PythonRepl", "enable_deprecation_warnings", "run_config", "embed"] + + +def _get_coroutine_flag() -> int | None: + for k, v in COMPILER_FLAG_NAMES.items(): + if v == "COROUTINE": + return k + + # Flag not found. + return None + + +COROUTINE_FLAG: int | None = _get_coroutine_flag() + + +def _has_coroutine_flag(code: types.CodeType) -> bool: + if COROUTINE_FLAG is None: + # Not supported on this Python version. + return False + + return bool(code.co_flags & COROUTINE_FLAG) + + +class PythonRepl(PythonInput): + def __init__(self, *a, **kw) -> None: + self._startup_paths = kw.pop("startup_paths", None) + super().__init__(*a, **kw) + self._load_start_paths() + + def _load_start_paths(self) -> None: + "Start the Read-Eval-Print Loop." + if self._startup_paths: + for path in self._startup_paths: + if os.path.exists(path): + with open(path, "rb") as f: + code = compile(f.read(), path, "exec") + exec(code, self.get_globals(), self.get_locals()) + else: + output = self.app.output + output.write(f"WARNING | File not found: {path}\n\n") + + def run_and_show_expression(self, expression: str) -> None: + try: + # Eval. + try: + result = self.eval(expression) + except KeyboardInterrupt: + # KeyboardInterrupt doesn't inherit from Exception. + raise + except SystemExit: + raise + except BaseException as e: + self._handle_exception(e) + else: + # Print. + if result is not None: + self._show_result(result) + if self.insert_blank_line_after_output: + self.app.output.write("\n") + + # Loop. + self.current_statement_index += 1 + self.signatures = [] + + except KeyboardInterrupt as e: + # Handle all possible `KeyboardInterrupt` errors. This can + # happen during the `eval`, but also during the + # `show_result` if something takes too long. + # (Try/catch is around the whole block, because we want to + # prevent that a Control-C keypress terminates the REPL in + # any case.) + self._handle_keyboard_interrupt(e) + + def _get_output_printer(self) -> OutputPrinter: + return OutputPrinter( + output=self.app.output, + input=self.app.input, + style=self._current_style, + style_transformation=self.style_transformation, + title=self.title, + ) + + def _show_result(self, result: object) -> None: + self._get_output_printer().display_result( + result=result, + out_prompt=self.get_output_prompt(), + reformat=self.enable_output_formatting, + highlight=self.enable_syntax_highlighting, + paginate=self.enable_pager, + ) + + def run(self) -> None: + """ + Run the REPL loop. + """ + if self.terminal_title: + set_title(self.terminal_title) + + self._add_to_namespace() + + try: + while True: + # Pull text from the user. + try: + text = self.read() + except EOFError: + return + except BaseException: + # Something went wrong while reading input. + # (E.g., a bug in the completer that propagates. Don't + # crash the REPL.) + traceback.print_exc() + continue + + # Run it; display the result (or errors if applicable). + self.run_and_show_expression(text) + finally: + if self.terminal_title: + clear_title() + self._remove_from_namespace() + + async def run_and_show_expression_async(self, text: str) -> Any: + loop = asyncio.get_running_loop() + system_exit: SystemExit | None = None + + try: + try: + # Create `eval` task. Ensure that control-c will cancel this + # task. + async def eval() -> Any: + nonlocal system_exit + try: + return await self.eval_async(text) + except SystemExit as e: + # Don't propagate SystemExit in `create_task()`. That + # will kill the event loop. We want to handle it + # gracefully. + system_exit = e + + task = asyncio.create_task(eval()) + loop.add_signal_handler(signal.SIGINT, lambda *_: task.cancel()) + result = await task + + if system_exit is not None: + raise system_exit + except KeyboardInterrupt: + # KeyboardInterrupt doesn't inherit from Exception. + raise + except SystemExit: + raise + except BaseException as e: + self._handle_exception(e) + else: + # Print. + if result is not None: + await loop.run_in_executor(None, lambda: self._show_result(result)) + + # Loop. + self.current_statement_index += 1 + self.signatures = [] + # Return the result for future consumers. + return result + finally: + loop.remove_signal_handler(signal.SIGINT) + + except KeyboardInterrupt as e: + # Handle all possible `KeyboardInterrupt` errors. This can + # happen during the `eval`, but also during the + # `show_result` if something takes too long. + # (Try/catch is around the whole block, because we want to + # prevent that a Control-C keypress terminates the REPL in + # any case.) + self._handle_keyboard_interrupt(e) + + async def run_async(self) -> None: + """ + Run the REPL loop, but run the blocking parts in an executor, so that + we don't block the event loop. Both the input and output (which can + display a pager) will run in a separate thread with their own event + loop, this way ptpython's own event loop won't interfere with the + asyncio event loop from where this is called. + + The "eval" however happens in the current thread, which is important. + (Both for control-C to work, as well as for the code to see the right + thread in which it was embedded). + """ + loop = asyncio.get_running_loop() + + if self.terminal_title: + set_title(self.terminal_title) + + self._add_to_namespace() + + try: + while True: + try: + # Read. + try: + text = await loop.run_in_executor(None, self.read) + except EOFError: + return + except BaseException: + # Something went wrong while reading input. + # (E.g., a bug in the completer that propagates. Don't + # crash the REPL.) + traceback.print_exc() + continue + + # Eval. + await self.run_and_show_expression_async(text) + + except KeyboardInterrupt as e: + # XXX: This does not yet work properly. In some situations, + # `KeyboardInterrupt` exceptions can end up in the event + # loop selector. + self._handle_keyboard_interrupt(e) + except SystemExit: + return + finally: + if self.terminal_title: + clear_title() + self._remove_from_namespace() + + def eval(self, line: str) -> object: + """ + Evaluate the line and print the result. + """ + # WORKAROUND: Due to a bug in Jedi, the current directory is removed + # from sys.path. See: https://github.com/davidhalter/jedi/issues/1148 + if "" not in sys.path: + sys.path.insert(0, "") + + if line.lstrip().startswith("!"): + # Run as shell command + os.system(line[1:]) + else: + # Try eval first + try: + code = self._compile_with_flags(line, "eval") + except SyntaxError: + pass + else: + # No syntax errors for eval. Do eval. + result = eval(code, self.get_globals(), self.get_locals()) + + if _has_coroutine_flag(code): + result = asyncio.get_running_loop().run_until_complete(result) + + self._store_eval_result(result) + return result + + # If not a valid `eval` expression, run using `exec` instead. + # Note that we shouldn't run this in the `except SyntaxError` block + # above, then `sys.exc_info()` would not report the right error. + # See issue: https://github.com/prompt-toolkit/ptpython/issues/435 + code = self._compile_with_flags(line, "exec") + result = eval(code, self.get_globals(), self.get_locals()) + + if _has_coroutine_flag(code): + result = asyncio.get_running_loop().run_until_complete(result) + + return None + + async def eval_async(self, line: str) -> object: + """ + Evaluate the line and print the result. + """ + # WORKAROUND: Due to a bug in Jedi, the current directory is removed + # from sys.path. See: https://github.com/davidhalter/jedi/issues/1148 + if "" not in sys.path: + sys.path.insert(0, "") + + if line.lstrip().startswith("!"): + # Run as shell command + os.system(line[1:]) + else: + # Try eval first + try: + code = self._compile_with_flags(line, "eval") + except SyntaxError: + pass + else: + # No syntax errors for eval. Do eval. + result = eval(code, self.get_globals(), self.get_locals()) + + if _has_coroutine_flag(code): + result = await result + + self._store_eval_result(result) + return result + + # If not a valid `eval` expression, compile as `exec` expression + # but still run with eval to get an awaitable in case of a + # awaitable expression. + code = self._compile_with_flags(line, "exec") + result = eval(code, self.get_globals(), self.get_locals()) + + if _has_coroutine_flag(code): + result = await result + + return None + + def _store_eval_result(self, result: object) -> None: + locals: dict[str, Any] = self.get_locals() + locals["_"] = locals["_%i" % self.current_statement_index] = result + + def get_compiler_flags(self) -> int: + return super().get_compiler_flags() | PyCF_ALLOW_TOP_LEVEL_AWAIT + + def _compile_with_flags(self, code: str, mode: str): + "Compile code with the right compiler flags." + return compile( + code, + "", + mode, + flags=self.get_compiler_flags(), + dont_inherit=True, + ) + + def _handle_exception(self, e: BaseException) -> None: + self._get_output_printer().display_exception( + e, + highlight=self.enable_syntax_highlighting, + paginate=self.enable_pager, + ) + + def _handle_keyboard_interrupt(self, e: KeyboardInterrupt) -> None: + output = self.app.output + + output.write("\rKeyboardInterrupt\n\n") + output.flush() + + def _add_to_namespace(self) -> None: + """ + Add ptpython built-ins to global namespace. + """ + globals = self.get_globals() + + # Add a 'get_ptpython', similar to 'get_ipython' + def get_ptpython() -> PythonInput: + return self + + globals["get_ptpython"] = get_ptpython + + def _remove_from_namespace(self) -> None: + """ + Remove added symbols from the globals. + """ + globals = self.get_globals() + del globals["get_ptpython"] + + def print_paginated_formatted_text( + self, + formatted_text: Iterable[OneStyleAndTextTuple], + end: str = "\n", + ) -> None: + # Warning: This is mainly here backwards-compatibility. Some projects + # call `print_paginated_formatted_text` on the Repl object. + self._get_output_printer().display_style_and_text_tuples( + formatted_text, paginate=True + ) + + +def enable_deprecation_warnings() -> None: + """ + Show deprecation warnings, when they are triggered directly by actions in + the REPL. This is recommended to call, before calling `embed`. + + e.g. This will show an error message when the user imports the 'sha' + library on Python 2.7. + """ + warnings.filterwarnings("default", category=DeprecationWarning, module="__main__") + + +DEFAULT_CONFIG_FILE = "~/.config/ptpython/config.py" + + +def run_config(repl: PythonInput, config_file: str | None = None) -> None: + """ + Execute REPL config file. + + :param repl: `PythonInput` instance. + :param config_file: Path of the configuration file. + """ + explicit_config_file = config_file is not None + + # Expand tildes. + config_file = os.path.expanduser( + config_file if config_file is not None else DEFAULT_CONFIG_FILE + ) + + def enter_to_continue() -> None: + input("\nPress ENTER to continue...") + + # Check whether this file exists. + if not os.path.exists(config_file): + if explicit_config_file: + print(f"Impossible to read {config_file}") + enter_to_continue() + return + + # Run the config file in an empty namespace. + try: + namespace: dict[str, Any] = {} + + with open(config_file, "rb") as f: + code = compile(f.read(), config_file, "exec") + exec(code, namespace, namespace) + + # Now we should have a 'configure' method in this namespace. We call this + # method with the repl as an argument. + if "configure" in namespace: + namespace["configure"](repl) + + except Exception: + traceback.print_exc() + enter_to_continue() + + +def embed( + globals=None, + locals=None, + configure: Callable[[PythonRepl], None] | None = None, + vi_mode: bool = False, + history_filename: str | None = None, + title: str | None = None, + startup_paths=None, + patch_stdout: bool = False, + return_asyncio_coroutine: bool = False, +) -> None: + """ + Call this to embed Python shell at the current point in your program. + It's similar to `IPython.embed` and `bpython.embed`. :: + + from prompt_toolkit.contrib.repl import embed + embed(globals(), locals()) + + :param vi_mode: Boolean. Use Vi instead of Emacs key bindings. + :param configure: Callable that will be called with the `PythonRepl` as a first + argument, to trigger configuration. + :param title: Title to be displayed in the terminal titlebar. (None or string.) + :param patch_stdout: When true, patch `sys.stdout` so that background + threads that are printing will print nicely above the prompt. + """ + # Default globals/locals + if globals is None: + globals = { + "__name__": "__main__", + "__package__": None, + "__doc__": None, + "__builtins__": builtins, + } + + locals = locals or globals + + def get_globals(): + return globals + + def get_locals(): + return locals + + # Create REPL. + repl = PythonRepl( + get_globals=get_globals, + get_locals=get_locals, + vi_mode=vi_mode, + history_filename=history_filename, + startup_paths=startup_paths, + ) + + if title: + repl.terminal_title = title + + if configure: + configure(repl) + + # Start repl. + patch_context: ContextManager[None] = ( + patch_stdout_context() if patch_stdout else DummyContext() + ) + + if return_asyncio_coroutine: + + async def coroutine() -> None: + with patch_context: + await repl.run_async() + + return coroutine() # type: ignore + else: + with patch_context: + repl.run() diff --git a/ptpython/signatures.py b/ptpython/signatures.py new file mode 100644 index 0000000..d4cb98c --- /dev/null +++ b/ptpython/signatures.py @@ -0,0 +1,270 @@ +""" +Helpers for retrieving the function signature of the function call that we are +editing. + +Either with the Jedi library, or using `inspect.signature` if Jedi fails and we +can use `eval()` to evaluate the function object. +""" +from __future__ import annotations + +import inspect +from inspect import Signature as InspectSignature +from inspect import _ParameterKind as ParameterKind +from typing import TYPE_CHECKING, Any, Sequence + +from prompt_toolkit.document import Document + +from .completer import DictionaryCompleter +from .utils import get_jedi_script_from_document + +if TYPE_CHECKING: + import jedi.api.classes + +__all__ = ["Signature", "get_signatures_using_jedi", "get_signatures_using_eval"] + + +class Parameter: + def __init__( + self, + name: str, + annotation: str | None, + default: str | None, + kind: ParameterKind, + ) -> None: + self.name = name + self.kind = kind + + self.annotation = annotation + self.default = default + + def __repr__(self) -> str: + return f"Parameter(name={self.name!r})" + + @property + def description(self) -> str: + """ + Name + annotation. + """ + description = self.name + + if self.annotation is not None: + description += f": {self.annotation}" + + return description + + +class Signature: + """ + Signature definition used wrap around both Jedi signatures and + python-inspect signatures. + + :param index: Parameter index of the current cursor position. + :param bracket_start: (line, column) tuple for the open bracket that starts + the function call. + """ + + def __init__( + self, + name: str, + docstring: str, + parameters: Sequence[Parameter], + index: int | None = None, + returns: str = "", + bracket_start: tuple[int, int] = (0, 0), + ) -> None: + self.name = name + self.docstring = docstring + self.parameters = parameters + self.index = index + self.returns = returns + self.bracket_start = bracket_start + + @classmethod + def from_inspect_signature( + cls, + name: str, + docstring: str, + signature: InspectSignature, + index: int, + ) -> Signature: + parameters = [] + + def get_annotation_name(annotation: object) -> str: + """ + Get annotation as string from inspect signature. + """ + try: + # In case the annotation is a class like "int", "float", ... + return str(annotation.__name__) # type: ignore + except AttributeError: + pass # No attribute `__name__`, e.g., in case of `List[int]`. + + annotation = str(annotation) + if annotation.startswith("typing."): + annotation = annotation[len("typing:") :] + return annotation + + for p in signature.parameters.values(): + parameters.append( + Parameter( + name=p.name, + annotation=get_annotation_name(p.annotation), + default=repr(p.default) + if p.default is not inspect.Parameter.empty + else None, + kind=p.kind, + ) + ) + + return cls( + name=name, + docstring=docstring, + parameters=parameters, + index=index, + returns="", + ) + + @classmethod + def from_jedi_signature(cls, signature: jedi.api.classes.Signature) -> Signature: + parameters = [] + + for p in signature.params: + if p is None: + # We just hit the "*". + continue + + parameters.append( + Parameter( + name=p.to_string(), # p.name, (`to_string()` already includes the annotation). + annotation=None, # p.infer_annotation() + default=None, # p.infer_default() + kind=p.kind, + ) + ) + + docstring = signature.docstring() + if not isinstance(docstring, str): + docstring = docstring.decode("utf-8") + + return cls( + name=signature.name, + docstring=docstring, + parameters=parameters, + index=signature.index, + returns="", + bracket_start=signature.bracket_start, + ) + + def __repr__(self) -> str: + return f"Signature({self.name!r}, parameters={self.parameters!r})" + + +def get_signatures_using_jedi( + document: Document, locals: dict[str, Any], globals: dict[str, Any] +) -> list[Signature]: + script = get_jedi_script_from_document(document, locals, globals) + + # Show signatures in help text. + if not script: + return [] + + try: + signatures = script.get_signatures() + except ValueError: + # e.g. in case of an invalid \\x escape. + signatures = [] + except Exception: + # Sometimes we still get an exception (TypeError), because + # of probably bugs in jedi. We can silence them. + # See: https://github.com/davidhalter/jedi/issues/492 + signatures = [] + else: + # Try to access the params attribute just once. For Jedi + # signatures containing the keyword-only argument star, + # this will crash when retrieving it the first time with + # AttributeError. Every following time it works. + # See: https://github.com/jonathanslenders/ptpython/issues/47 + # https://github.com/davidhalter/jedi/issues/598 + try: + if signatures: + signatures[0].params + except AttributeError: + pass + + return [Signature.from_jedi_signature(sig) for sig in signatures] + + +def get_signatures_using_eval( + document: Document, locals: dict[str, Any], globals: dict[str, Any] +) -> list[Signature]: + """ + Look for the signature of the function before the cursor position without + use of Jedi. This uses a similar approach as the `DictionaryCompleter` of + running `eval()` over the detected function name. + """ + # Look for open parenthesis, before cursor position. + pos = document.cursor_position - 1 + + paren_mapping = {")": "(", "}": "{", "]": "["} + paren_stack = [ + ")" + ] # Start stack with closing ')'. We are going to look for the matching open ')'. + comma_count = 0 # Number of comma's between start of function call and cursor pos. + found_start = False # Found something. + + while pos >= 0: + char = document.text[pos] + if char in ")]}": + paren_stack.append(char) + elif char in "([{": + if not paren_stack: + # Open paren, while no closing paren was found. Mouse cursor is + # positioned in nested parentheses. Not at the "top-level" of a + # function call. + break + if paren_mapping[paren_stack[-1]] != char: + # Unmatching parentheses: syntax error? + break + + paren_stack.pop() + + if len(paren_stack) == 0: + found_start = True + break + + elif char == "," and len(paren_stack) == 1: + comma_count += 1 + + pos -= 1 + + if not found_start: + return [] + + # We found the start of the function call. Now look for the object before + # this position on which we can do an 'eval' to retrieve the function + # object. + obj = DictionaryCompleter(lambda: globals, lambda: locals).eval_expression( + Document(document.text, cursor_position=pos), locals + ) + if obj is None: + return [] + + try: + name = obj.__name__ # type:ignore + except Exception: + name = obj.__class__.__name__ + + try: + signature = inspect.signature(obj) # type: ignore + except TypeError: + return [] # Not a callable object. + except ValueError: + return [] # No signature found, like for build-ins like "print". + + try: + doc = obj.__doc__ or "" + except: + doc = "" + + # TODO: `index` is not yet correct when dealing with keyword-only arguments. + return [Signature.from_inspect_signature(name, doc, signature, index=comma_count)] diff --git a/ptpython/style.py b/ptpython/style.py new file mode 100644 index 0000000..c5a04e5 --- /dev/null +++ b/ptpython/style.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +from prompt_toolkit.styles import BaseStyle, Style, merge_styles +from prompt_toolkit.styles.pygments import style_from_pygments_cls +from prompt_toolkit.utils import is_conemu_ansi, is_windows, is_windows_vt100_supported +from pygments.styles import get_all_styles, get_style_by_name + +__all__ = ["get_all_code_styles", "get_all_ui_styles", "generate_style"] + + +def get_all_code_styles() -> dict[str, BaseStyle]: + """ + Return a mapping from style names to their classes. + """ + result: dict[str, BaseStyle] = { + name: style_from_pygments_cls(get_style_by_name(name)) + for name in get_all_styles() + } + result["win32"] = Style.from_dict(win32_code_style) + return result + + +def get_all_ui_styles() -> dict[str, BaseStyle]: + """ + Return a dict mapping {ui_style_name -> style_dict}. + """ + return { + "default": Style.from_dict(default_ui_style), + "blue": Style.from_dict(blue_ui_style), + } + + +def generate_style(python_style: BaseStyle, ui_style: BaseStyle) -> BaseStyle: + """ + Generate Pygments Style class from two dictionaries + containing style rules. + """ + return merge_styles([python_style, ui_style]) + + +# Code style for Windows consoles. They support only 16 colors, +# so we choose a combination that displays nicely. +win32_code_style = { + "pygments.comment": "#00ff00", + "pygments.keyword": "#44ff44", + "pygments.number": "", + "pygments.operator": "", + "pygments.string": "#ff44ff", + "pygments.name": "", + "pygments.name.decorator": "#ff4444", + "pygments.name.class": "#ff4444", + "pygments.name.function": "#ff4444", + "pygments.name.builtin": "#ff4444", + "pygments.name.attribute": "", + "pygments.name.constant": "", + "pygments.name.entity": "", + "pygments.name.exception": "", + "pygments.name.label": "", + "pygments.name.namespace": "", + "pygments.name.tag": "", + "pygments.name.variable": "", +} + + +default_ui_style = { + "control-character": "ansiblue", + # Classic prompt. + "prompt": "bold", + "prompt.dots": "noinherit", + # (IPython <5.0) Prompt: "In [1]:" + "in": "bold #008800", + "in.number": "", + # Return value. + "out": "#ff0000", + "out.number": "#ff0000", + # Completions. + "completion.builtin": "", + "completion.param": "#006666 italic", + "completion.keyword": "fg:#008800", + "completion.keyword fuzzymatch.inside": "fg:#008800", + "completion.keyword fuzzymatch.outside": "fg:#44aa44", + # Separator between windows. (Used above docstring.) + "separator": "#bbbbbb", + # System toolbar + "system-toolbar": "#22aaaa noinherit", + # "arg" toolbar. + "arg-toolbar": "#22aaaa noinherit", + "arg-toolbar.text": "noinherit", + # Signature toolbar. + "signature-toolbar": "bg:#44bbbb #000000", + "signature-toolbar current-name": "bg:#008888 #ffffff bold", + "signature-toolbar operator": "#000000 bold", + "docstring": "#888888", + # Validation toolbar. + "validation-toolbar": "bg:#440000 #aaaaaa", + # Status toolbar. + "status-toolbar": "bg:#222222 #aaaaaa", + "status-toolbar.title": "underline", + "status-toolbar.inputmode": "bg:#222222 #ffffaa", + "status-toolbar.key": "bg:#000000 #888888", + "status-toolbar key": "bg:#000000 #888888", + "status-toolbar.pastemodeon": "bg:#aa4444 #ffffff", + "status-toolbar.pythonversion": "bg:#222222 #ffffff bold", + "status-toolbar paste-mode-on": "bg:#aa4444 #ffffff", + "record": "bg:#884444 white", + "status-toolbar more": "#ffff44", + "status-toolbar.input-mode": "#ffff44", + # The options sidebar. + "sidebar": "bg:#bbbbbb #000000", + "sidebar.title": "bg:#668866 #ffffff", + "sidebar.label": "bg:#bbbbbb #222222", + "sidebar.status": "bg:#dddddd #000011", + "sidebar.label selected": "bg:#222222 #eeeeee", + "sidebar.status selected": "bg:#444444 #ffffff bold", + "sidebar.separator": "underline", + "sidebar.key": "bg:#bbddbb #000000 bold", + "sidebar.key.description": "bg:#bbbbbb #000000", + "sidebar.helptext": "bg:#fdf6e3 #000011", + # # Styling for the history layout. + # history.line: '', + # history.line.selected: 'bg:#008800 #000000', + # history.line.current: 'bg:#ffffff #000000', + # history.line.selected.current: 'bg:#88ff88 #000000', + # history.existinginput: '#888888', + # Help Window. + "window-border": "#aaaaaa", + "window-title": "bg:#bbbbbb #000000", + # Meta-enter message. + "accept-message": "bg:#ffff88 #444444", + # Exit confirmation. + "exit-confirmation": "bg:#884444 #ffffff", +} + + +# Some changes to get a bit more contrast on Windows consoles. +# (They only support 16 colors.) +if is_windows() and not is_conemu_ansi() and not is_windows_vt100_supported(): + default_ui_style.update( + { + "sidebar.title": "bg:#00ff00 #ffffff", + "exitconfirmation": "bg:#ff4444 #ffffff", + "toolbar.validation": "bg:#ff4444 #ffffff", + "menu.completions.completion": "bg:#ffffff #000000", + "menu.completions.completion.current": "bg:#aaaaaa #000000", + } + ) + + +blue_ui_style = {} +blue_ui_style.update(default_ui_style) +# blue_ui_style.update({ +# # Line numbers. +# Token.LineNumber: '#aa6666', +# +# # Highlighting of search matches in document. +# Token.SearchMatch: '#ffffff bg:#4444aa', +# Token.SearchMatch.Current: '#ffffff bg:#44aa44', +# +# # Highlighting of select text in document. +# Token.SelectedText: '#ffffff bg:#6666aa', +# +# # Completer toolbar. +# Token.Toolbar.Completions: 'bg:#44bbbb #000000', +# Token.Toolbar.Completions.Arrow: 'bg:#44bbbb #000000 bold', +# Token.Toolbar.Completions.Completion: 'bg:#44bbbb #000000', +# Token.Toolbar.Completions.Completion.Current: 'bg:#008888 #ffffff', +# +# # Completer menu. +# Token.Menu.Completions.Completion: 'bg:#44bbbb #000000', +# Token.Menu.Completions.Completion.Current: 'bg:#008888 #ffffff', +# Token.Menu.Completions.Meta: 'bg:#449999 #000000', +# Token.Menu.Completions.Meta.Current: 'bg:#00aaaa #000000', +# Token.Menu.Completions.ProgressBar: 'bg:#aaaaaa', +# Token.Menu.Completions.ProgressButton: 'bg:#000000', +# }) diff --git a/ptpython/utils.py b/ptpython/utils.py new file mode 100644 index 0000000..28887d2 --- /dev/null +++ b/ptpython/utils.py @@ -0,0 +1,212 @@ +""" +For internal use only. +""" +from __future__ import annotations + +import re +from typing import TYPE_CHECKING, Any, Callable, Iterable, TypeVar, cast + +from prompt_toolkit.document import Document +from prompt_toolkit.formatted_text import to_formatted_text +from prompt_toolkit.formatted_text.utils import fragment_list_to_text +from prompt_toolkit.mouse_events import MouseEvent, MouseEventType + +if TYPE_CHECKING: + from jedi import Interpreter + + # See: prompt_toolkit/key_binding/key_bindings.py + # Annotating these return types as `object` is what works best, because + # `NotImplemented` is typed `Any`. + NotImplementedOrNone = object + +__all__ = [ + "has_unclosed_brackets", + "get_jedi_script_from_document", + "document_is_multiline_python", + "unindent_code", +] + + +def has_unclosed_brackets(text: str) -> bool: + """ + Starting at the end of the string. If we find an opening bracket + for which we didn't had a closing one yet, return True. + """ + stack = [] + + # Ignore braces inside strings + text = re.sub(r"""('[^']*'|"[^"]*")""", "", text) # XXX: handle escaped quotes.! + + for c in reversed(text): + if c in "])}": + stack.append(c) + + elif c in "[({": + if stack: + if ( + (c == "[" and stack[-1] == "]") + or (c == "{" and stack[-1] == "}") + or (c == "(" and stack[-1] == ")") + ): + stack.pop() + else: + # Opening bracket for which we didn't had a closing one. + return True + + return False + + +def get_jedi_script_from_document( + document: Document, locals: dict[str, Any], globals: dict[str, Any] +) -> Interpreter: + import jedi # We keep this import in-line, to improve start-up time. + + # Importing Jedi is 'slow'. + + try: + return jedi.Interpreter( + document.text, + path="input-text", + namespaces=[locals, globals], + ) + except ValueError: + # Invalid cursor position. + # ValueError('`column` parameter is not in a valid range.') + return None + except AttributeError: + # Workaround for #65: https://github.com/jonathanslenders/python-prompt-toolkit/issues/65 + # See also: https://github.com/davidhalter/jedi/issues/508 + return None + except IndexError: + # Workaround Jedi issue #514: for https://github.com/davidhalter/jedi/issues/514 + return None + except KeyError: + # Workaround for a crash when the input is "u'", the start of a unicode string. + return None + except Exception: + # Workaround for: https://github.com/jonathanslenders/ptpython/issues/91 + return None + + +_multiline_string_delims = re.compile("""[']{3}|["]{3}""") + + +def document_is_multiline_python(document: Document) -> bool: + """ + Determine whether this is a multiline Python document. + """ + + def ends_in_multiline_string() -> bool: + """ + ``True`` if we're inside a multiline string at the end of the text. + """ + delims = _multiline_string_delims.findall(document.text) + opening = None + for delim in delims: + if opening is None: + opening = delim + elif delim == opening: + opening = None + return bool(opening) + + if "\n" in document.text or ends_in_multiline_string(): + return True + + def line_ends_with_colon() -> bool: + return document.current_line.rstrip()[-1:] == ":" + + # If we just typed a colon, or still have open brackets, always insert a real newline. + if ( + line_ends_with_colon() + or ( + document.is_cursor_at_the_end + and has_unclosed_brackets(document.text_before_cursor) + ) + or document.text.startswith("@") + ): + return True + + # If the character before the cursor is a backslash (line continuation + # char), insert a new line. + elif document.text_before_cursor[-1:] == "\\": + return True + + return False + + +_T = TypeVar("_T", bound=Callable[[MouseEvent], None]) + + +def if_mousedown(handler: _T) -> _T: + """ + Decorator for mouse handlers. + Only handle event when the user pressed mouse down. + + (When applied to a token list. Scroll events will bubble up and are handled + by the Window.) + """ + + def handle_if_mouse_down(mouse_event: MouseEvent) -> NotImplementedOrNone: + if mouse_event.event_type == MouseEventType.MOUSE_DOWN: + return handler(mouse_event) + else: + return NotImplemented + + return cast(_T, handle_if_mouse_down) + + +_T_type = TypeVar("_T_type", bound=type) + + +def ptrepr_to_repr(cls: _T_type) -> _T_type: + """ + Generate a normal `__repr__` method for classes that have a `__pt_repr__`. + """ + if not hasattr(cls, "__pt_repr__"): + raise TypeError( + "@ptrepr_to_repr can only be applied to classes that have a `__pt_repr__` method." + ) + + def __repr__(self: object) -> str: + assert hasattr(cls, "__pt_repr__") + return fragment_list_to_text(to_formatted_text(cls.__pt_repr__(self))) + + cls.__repr__ = __repr__ # type:ignore + return cls + + +def unindent_code(text: str) -> str: + """ + Remove common leading whitespace when all lines are indented. + """ + lines = text.splitlines(keepends=True) + + # Look for common prefix. + common_prefix = _common_whitespace_prefix(lines) + + # Remove indentation. + lines = [line[len(common_prefix) :] for line in lines] + + return "".join(lines) + + +def _common_whitespace_prefix(strings: Iterable[str]) -> str: + """ + Return common prefix for a list of lines. + This will ignore lines that contain whitespace only. + """ + # Ignore empty lines and lines that have whitespace only. + strings = [s for s in strings if not s.isspace() and not len(s) == 0] + + if not strings: + return "" + + else: + s1 = min(strings) + s2 = max(strings) + + for i, c in enumerate(s1): + if c != s2[i] or c not in " \t": + return s1[:i] + + return s1 diff --git a/ptpython/validator.py b/ptpython/validator.py new file mode 100644 index 0000000..91b9c28 --- /dev/null +++ b/ptpython/validator.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from typing import Callable + +from prompt_toolkit.document import Document +from prompt_toolkit.validation import ValidationError, Validator + +from .utils import unindent_code + +__all__ = ["PythonValidator"] + + +class PythonValidator(Validator): + """ + Validation of Python input. + + :param get_compiler_flags: Callable that returns the currently + active compiler flags. + """ + + def __init__(self, get_compiler_flags: Callable[[], int] | None = None) -> None: + self.get_compiler_flags = get_compiler_flags + + def validate(self, document: Document) -> None: + """ + Check input for Python syntax errors. + """ + text = unindent_code(document.text) + + # When the input starts with Ctrl-Z, always accept. This means EOF in a + # Python REPL. + if text.startswith("\x1a"): + return + + # When the input starts with an exclamation mark. Accept as shell + # command. + if text.lstrip().startswith("!"): + return + + try: + if self.get_compiler_flags: + flags = self.get_compiler_flags() + else: + flags = 0 + + compile(text, "", "exec", flags=flags, dont_inherit=True) + except SyntaxError as e: + # Note, the 'or 1' for offset is required because Python 2.7 + # gives `None` as offset in case of '4=4' as input. (Looks like + # fixed in Python 3.) + # TODO: This is not correct if indentation was removed. + index = document.translate_row_col_to_index( + (e.lineno or 1) - 1, (e.offset or 1) - 1 + ) + raise ValidationError(index, f"Syntax Error: {e}") + except TypeError as e: + # e.g. "compile() expected string without null bytes" + raise ValidationError(0, str(e)) + except ValueError as e: + # In Python 2, compiling "\x9" (an invalid escape sequence) raises + # ValueError instead of SyntaxError. + raise ValidationError(0, "Syntax Error: %s" % e) -- cgit v1.2.3