diff options
-rw-r--r-- | .github/workflows/test.yaml | 6 | ||||
-rw-r--r-- | CHANGELOG | 29 | ||||
-rw-r--r-- | LICENSE | 2 | ||||
-rw-r--r-- | README.rst | 15 | ||||
-rwxr-xr-x | examples/asyncio-python-embed.py | 15 | ||||
-rwxr-xr-x | examples/asyncio-ssh-python-embed.py | 18 | ||||
-rw-r--r-- | examples/ptpython_config/config.py | 3 | ||||
-rwxr-xr-x | examples/python-embed-with-custom-prompt.py | 12 | ||||
-rwxr-xr-x | examples/python-embed.py | 2 | ||||
-rwxr-xr-x | examples/ssh-and-telnet-embed.py | 11 | ||||
-rw-r--r-- | ptpython/completer.py | 8 | ||||
-rw-r--r-- | ptpython/contrib/asyncssh_repl.py | 28 | ||||
-rw-r--r-- | ptpython/entry_points/run_ptpython.py | 21 | ||||
-rw-r--r-- | ptpython/history_browser.py | 3 | ||||
-rw-r--r-- | ptpython/ipython.py | 11 | ||||
-rw-r--r-- | ptpython/layout.py | 22 | ||||
-rw-r--r-- | ptpython/lexer.py | 2 | ||||
-rw-r--r-- | ptpython/printer.py | 435 | ||||
-rw-r--r-- | ptpython/python_input.py | 85 | ||||
-rw-r--r-- | ptpython/repl.py | 492 | ||||
-rw-r--r-- | ptpython/signatures.py | 3 | ||||
-rw-r--r-- | ptpython/style.py | 2 | ||||
-rw-r--r-- | ptpython/utils.py | 14 | ||||
-rw-r--r-- | ptpython/validator.py | 2 | ||||
-rw-r--r-- | pyproject.toml | 48 | ||||
-rw-r--r-- | setup.py | 17 |
26 files changed, 773 insertions, 533 deletions
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 31837db..9a50f3b 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -22,13 +22,13 @@ jobs: run: | sudo apt remove python3-pip python -m pip install --upgrade pip - python -m pip install . black isort mypy pytest readme_renderer + python -m pip install . ruff mypy pytest readme_renderer pip list - name: Type Checker run: | mypy ptpython - isort -c --profile black ptpython examples setup.py - black --check ptpython examples setup.py + ruff . + ruff format --check . - name: Run Tests run: | ./tests/run_tests.py @@ -1,6 +1,33 @@ CHANGELOG ========= +3.0.25: 2023-12-14 +------------------ + +Fixes: +- Fix handling of 'config file does not exist' when embedding ptpython. + + +3.0.24: 2023-12-13 +------------------ + +Fixes: +- Don't show "Impossible to read config file" warnings when no config file was + passed to `run_config()`. +- IPython integration fixes: + * Fix top-level await in IPython. + * Fix IPython `DeprecationWarning`. +- Output printing fixes: + * Paginate exceptions if pagination is enabled. + * Handle big outputs without running out of memory. +- Asyncio REPL improvements: + * From now on, passing `--asyncio` is required to activate the asyncio-REPL. + This will ensure that an event loop is created at the start in which we can + run top-level await statements. + * Use `get_running_loop()` instead of `get_event_loop()`. + * Better handling of `SystemExit` and control-c in the async REPL. + + 3.0.23: 2023-02-22 ------------------ @@ -191,7 +218,7 @@ New features: - Optional pager for displaying outputs that don't fit on the screen. - Added --light-bg and --dark-bg flags to automatically optimize the brightness of the colors according to the terminal background. -- Addd `PTPYTHON_CONFIG_HOME` for explicitely setting the config directory. +- Add `PTPYTHON_CONFIG_HOME` for explicitly setting the config directory. - Show completion suffixes (like '(' for functions). Fixes: @@ -1,4 +1,4 @@ -Copyright (c) 2015, Jonathan Slenders +Copyright (c) 2015-2023, Jonathan Slenders All rights reserved. Redistribution and use in source and binary forms, with or without modification, @@ -71,6 +71,7 @@ The help menu shows basic command-line options. -h, --help show this help message and exit --vi Enable Vi key bindings -i, --interactive Start interactive shell after executing this file. + --asyncio Run an asyncio event loop to support top-level "await". --light-bg Run on a light background (use dark colors for text). --dark-bg Run on a dark background (use light colors for text). --config-file CONFIG_FILE @@ -171,6 +172,20 @@ error. .. image :: https://github.com/jonathanslenders/ptpython/raw/master/docs/images/validation.png +Asyncio REPL and top level await +******************************** + +In order to get top-level ``await`` support, start ptpython as follows: + +.. code:: + + ptpython --asyncio + +This will spawn an asyncio event loop and embed the async REPL in the event +loop. After this, top-level await will work and statements like ``await +asyncio.sleep(10)`` will execute. + + Additional features ******************* diff --git a/examples/asyncio-python-embed.py b/examples/asyncio-python-embed.py index 05f52f1..a8fbba5 100755 --- a/examples/asyncio-python-embed.py +++ b/examples/asyncio-python-embed.py @@ -19,7 +19,7 @@ loop = asyncio.get_event_loop() counter = [0] -async def print_counter(): +async def print_counter() -> None: """ Coroutine that prints counters and saves it in a global variable. """ @@ -29,7 +29,7 @@ async def print_counter(): await asyncio.sleep(3) -async def interactive_shell(): +async def interactive_shell() -> None: """ Coroutine that starts a Python REPL from which we can access the global counter variable. @@ -44,13 +44,10 @@ async def interactive_shell(): loop.stop() -def main(): - asyncio.ensure_future(print_counter()) - asyncio.ensure_future(interactive_shell()) - - loop.run_forever() - loop.close() +async def main() -> None: + asyncio.create_task(print_counter()) + await interactive_shell() if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/examples/asyncio-ssh-python-embed.py b/examples/asyncio-ssh-python-embed.py index 86b5607..be0689e 100755 --- a/examples/asyncio-ssh-python-embed.py +++ b/examples/asyncio-ssh-python-embed.py @@ -32,31 +32,25 @@ class MySSHServer(asyncssh.SSHServer): return ReplSSHServerSession(self.get_namespace) -def main(port=8222): +async def main(port: int = 8222) -> None: """ Example that starts the REPL through an SSH server. """ - loop = asyncio.get_event_loop() - # Namespace exposed in the REPL. environ = {"hello": "world"} # Start SSH server. - def create_server(): + def create_server() -> MySSHServer: return MySSHServer(lambda: environ) print("Listening on :%i" % port) print('To connect, do "ssh localhost -p %i"' % port) - loop.run_until_complete( - asyncssh.create_server( - create_server, "", port, server_host_keys=["/etc/ssh/ssh_host_dsa_key"] - ) + await asyncssh.create_server( + create_server, "", port, server_host_keys=["/etc/ssh/ssh_host_dsa_key"] ) - - # Run eventloop. - loop.run_forever() + await asyncio.Future() # Wait forever. if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/examples/ptpython_config/config.py b/examples/ptpython_config/config.py index 2f3f49d..b25850a 100644 --- a/examples/ptpython_config/config.py +++ b/examples/ptpython_config/config.py @@ -70,6 +70,9 @@ def configure(repl): # Vi mode. repl.vi_mode = False + # Enable the modal cursor (when using Vi mode). Other options are 'Block', 'Underline', 'Beam', 'Blink under', 'Blink block', and 'Blink beam' + repl.cursor_shape_config = "Modal (vi)" + # Paste mode. (When True, don't insert whitespace after new line.) repl.paste_mode = False diff --git a/examples/python-embed-with-custom-prompt.py b/examples/python-embed-with-custom-prompt.py index 968aedc..d54da1d 100755 --- a/examples/python-embed-with-custom-prompt.py +++ b/examples/python-embed-with-custom-prompt.py @@ -2,26 +2,26 @@ """ Example of embedding a Python REPL, and setting a custom prompt. """ -from prompt_toolkit.formatted_text import HTML +from prompt_toolkit.formatted_text import HTML, AnyFormattedText from ptpython.prompt_style import PromptStyle from ptpython.repl import embed -def configure(repl): +def configure(repl) -> None: # Probably, the best is to add a new PromptStyle to `all_prompt_styles` and # activate it. This way, the other styles are still selectable from the # menu. class CustomPrompt(PromptStyle): - def in_prompt(self): + def in_prompt(self) -> AnyFormattedText: return HTML("<ansigreen>Input[%s]</ansigreen>: ") % ( repl.current_statement_index, ) - def in2_prompt(self, width): + def in2_prompt(self, width: int) -> AnyFormattedText: return "...: ".rjust(width) - def out_prompt(self): + def out_prompt(self) -> AnyFormattedText: return HTML("<ansired>Result[%s]</ansired>: ") % ( repl.current_statement_index, ) @@ -30,7 +30,7 @@ def configure(repl): repl.prompt_style = "custom" -def main(): +def main() -> None: embed(globals(), locals(), configure=configure) diff --git a/examples/python-embed.py b/examples/python-embed.py index ac2cd06..49224ac 100755 --- a/examples/python-embed.py +++ b/examples/python-embed.py @@ -4,7 +4,7 @@ from ptpython.repl import embed -def main(): +def main() -> None: embed(globals(), locals(), vi_mode=False) diff --git a/examples/ssh-and-telnet-embed.py b/examples/ssh-and-telnet-embed.py index 378784c..62fa76d 100755 --- a/examples/ssh-and-telnet-embed.py +++ b/examples/ssh-and-telnet-embed.py @@ -11,13 +11,16 @@ import pathlib import asyncssh from prompt_toolkit import print_formatted_text -from prompt_toolkit.contrib.ssh.server import PromptToolkitSSHServer +from prompt_toolkit.contrib.ssh.server import ( + PromptToolkitSSHServer, + PromptToolkitSSHSession, +) from prompt_toolkit.contrib.telnet.server import TelnetServer from ptpython.repl import embed -def ensure_key(filename="ssh_host_key"): +def ensure_key(filename: str = "ssh_host_key") -> str: path = pathlib.Path(filename) if not path.exists(): rsa_key = asyncssh.generate_private_key("ssh-rsa") @@ -25,12 +28,12 @@ def ensure_key(filename="ssh_host_key"): return str(path) -async def interact(connection=None): +async def interact(connection: PromptToolkitSSHSession) -> None: global_dict = {**globals(), "print": print_formatted_text} await embed(return_asyncio_coroutine=True, globals=global_dict) -async def main(ssh_port=8022, telnet_port=8023): +async def main(ssh_port: int = 8022, telnet_port: int = 8023) -> None: ssh_server = PromptToolkitSSHServer(interact=interact) await asyncssh.create_server( lambda: ssh_server, "", ssh_port, server_host_keys=[ensure_key()] diff --git a/ptpython/completer.py b/ptpython/completer.py index f28d2b1..91d6647 100644 --- a/ptpython/completer.py +++ b/ptpython/completer.py @@ -6,7 +6,7 @@ import inspect import keyword import re from enum import Enum -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Callable, Iterable from prompt_toolkit.completion import ( CompleteEvent, @@ -259,7 +259,7 @@ class JediCompleter(Completer): # See: https://github.com/jonathanslenders/ptpython/issues/223 pass except Exception: - # Supress all other Jedi exceptions. + # Suppress all other Jedi exceptions. pass else: # Move function parameters to the top. @@ -367,7 +367,7 @@ class DictionaryCompleter(Completer): rf""" {expression} - # Dict loopup to complete (square bracket open + start of + # Dict lookup to complete (square bracket open + start of # string). \[ \s* ([^\[\]]*)$ @@ -380,7 +380,7 @@ class DictionaryCompleter(Completer): rf""" {expression} - # Attribute loopup to complete (dot + varname). + # Attribute lookup to complete (dot + varname). \. \s* ([a-zA-Z0-9_]*)$ """, diff --git a/ptpython/contrib/asyncssh_repl.py b/ptpython/contrib/asyncssh_repl.py index 0347ade..2f74eb2 100644 --- a/ptpython/contrib/asyncssh_repl.py +++ b/ptpython/contrib/asyncssh_repl.py @@ -9,20 +9,20 @@ package should be installable in Python 2 as well! from __future__ import annotations import asyncio -from typing import Any, Optional, TextIO, cast +from typing import Any, AnyStr, TextIO, cast import asyncssh from prompt_toolkit.data_structures import Size from prompt_toolkit.input import create_pipe_input from prompt_toolkit.output.vt100 import Vt100_Output -from ptpython.python_input import _GetNamespace +from ptpython.python_input import _GetNamespace, _Namespace from ptpython.repl import PythonRepl __all__ = ["ReplSSHServerSession"] -class ReplSSHServerSession(asyncssh.SSHServerSession): +class ReplSSHServerSession(asyncssh.SSHServerSession[str]): """ SSH server session that runs a Python REPL. @@ -35,7 +35,7 @@ class ReplSSHServerSession(asyncssh.SSHServerSession): ) -> None: self._chan: Any = None - def _globals() -> dict: + def _globals() -> _Namespace: data = get_globals() data.setdefault("print", self._print) return data @@ -79,7 +79,7 @@ class ReplSSHServerSession(asyncssh.SSHServerSession): width, height, pixwidth, pixheight = self._chan.get_terminal_size() return Size(rows=height, columns=width) - def connection_made(self, chan): + def connection_made(self, chan: Any) -> None: """ Client connected, run repl in coroutine. """ @@ -89,7 +89,7 @@ class ReplSSHServerSession(asyncssh.SSHServerSession): f = asyncio.ensure_future(self.repl.run_async()) # Close channel when done. - def done(_) -> None: + def done(_: object) -> None: chan.close() self._chan = None @@ -98,24 +98,28 @@ class ReplSSHServerSession(asyncssh.SSHServerSession): def shell_requested(self) -> bool: return True - def terminal_size_changed(self, width, height, pixwidth, pixheight): + def terminal_size_changed( + self, width: int, height: int, pixwidth: int, pixheight: int + ) -> None: """ When the terminal size changes, report back to CLI. """ self.repl.app._on_resize() - def data_received(self, data, datatype): + def data_received(self, data: AnyStr, datatype: int | None) -> None: """ When data is received, send to inputstream of the CLI and repaint. """ - self._input_pipe.send(data) + self._input_pipe.send(data) # type: ignore - def _print(self, *data, sep=" ", end="\n", file=None) -> None: + def _print( + self, *data: object, sep: str = " ", end: str = "\n", file: Any = None + ) -> None: """ Alternative 'print' function that prints back into the SSH channel. """ # Pop keyword-only arguments. (We cannot use the syntax from the # signature. Otherwise, Python2 will give a syntax error message when # installing.) - data = sep.join(map(str, data)) - self._chan.write(data + end) + data_as_str = sep.join(map(str, data)) + self._chan.write(data_as_str + end) diff --git a/ptpython/entry_points/run_ptpython.py b/ptpython/entry_points/run_ptpython.py index 1b4074d..1d4a532 100644 --- a/ptpython/entry_points/run_ptpython.py +++ b/ptpython/entry_points/run_ptpython.py @@ -9,6 +9,7 @@ optional arguments: -h, --help show this help message and exit --vi Enable Vi key bindings -i, --interactive Start interactive shell after executing this file. + --asyncio Run an asyncio event loop to support top-level "await". --light-bg Run on a light background (use dark colors for text). --dark-bg Run on a dark background (use light colors for text). --config-file CONFIG_FILE @@ -24,11 +25,12 @@ environment variables: from __future__ import annotations import argparse +import asyncio import os import pathlib import sys from textwrap import dedent -from typing import IO, Optional, Tuple +from typing import IO import appdirs from prompt_toolkit.formatted_text import HTML @@ -69,15 +71,20 @@ def create_parser() -> _Parser: help="Start interactive shell after executing this file.", ) parser.add_argument( + "--asyncio", + action="store_true", + help='Run an asyncio event loop to support top-level "await".', + ) + parser.add_argument( "--light-bg", action="store_true", help="Run on a light background (use dark colors for text).", - ), + ) parser.add_argument( "--dark-bg", action="store_true", help="Run on a dark background (use light colors for text).", - ), + ) parser.add_argument( "--config-file", type=str, help="Location of configuration file." ) @@ -206,7 +213,7 @@ def run() -> None: import __main__ - embed( + embed_result = embed( # type: ignore vi_mode=a.vi, history_filename=history_file, configure=configure, @@ -214,8 +221,14 @@ def run() -> None: globals=__main__.__dict__, startup_paths=startup_paths, title="Python REPL (ptpython)", + return_asyncio_coroutine=a.asyncio, ) + if a.asyncio: + print("Starting ptpython asyncio REPL") + print('Use "await" directly instead of "asyncio.run()".') + asyncio.run(embed_result) + if __name__ == "__main__": run() diff --git a/ptpython/history_browser.py b/ptpython/history_browser.py index eea81c2..b667be1 100644 --- a/ptpython/history_browser.py +++ b/ptpython/history_browser.py @@ -7,7 +7,7 @@ run as a sub application of the Repl/PythonInput. from __future__ import annotations from functools import partial -from typing import TYPE_CHECKING, Callable, List, Optional, Set +from typing import TYPE_CHECKING, Callable from prompt_toolkit.application import Application from prompt_toolkit.application.current import get_app @@ -107,6 +107,7 @@ Further, remember that searching works like in Emacs class BORDER: "Box drawing characters." + HORIZONTAL = "\u2501" VERTICAL = "\u2503" TOP_LEFT = "\u250f" diff --git a/ptpython/ipython.py b/ptpython/ipython.py index fb4b5ed..ad0516a 100644 --- a/ptpython/ipython.py +++ b/ptpython/ipython.py @@ -14,7 +14,7 @@ from typing import Iterable from warnings import warn from IPython import utils as ipy_utils -from IPython.core.inputsplitter import IPythonInputSplitter +from IPython.core.inputtransformer2 import TransformerManager from IPython.terminal.embed import InteractiveShellEmbed as _InteractiveShellEmbed from IPython.terminal.ipapp import load_default_config from prompt_toolkit.completion import ( @@ -38,6 +38,7 @@ from ptpython.prompt_style import PromptStyle from .completer import PythonCompleter from .python_input import PythonInput +from .repl import PyCF_ALLOW_TOP_LEVEL_AWAIT from .style import default_ui_style from .validator import PythonValidator @@ -65,7 +66,7 @@ class IPythonPrompt(PromptStyle): class IPythonValidator(PythonValidator): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.isp = IPythonInputSplitter() + self.isp = TransformerManager() def validate(self, document: Document) -> None: document = Document(text=self.isp.transform_cell(document.text)) @@ -211,6 +212,12 @@ class IPythonInput(PythonInput): self.ui_styles = {"default": Style.from_dict(style_dict)} self.use_ui_colorscheme("default") + def get_compiler_flags(self): + flags = super().get_compiler_flags() + if self.ipython_shell.autoawait: + flags |= PyCF_ALLOW_TOP_LEVEL_AWAIT + return flags + class InteractiveShellEmbed(_InteractiveShellEmbed): """ diff --git a/ptpython/layout.py b/ptpython/layout.py index d15e52e..2c1ec15 100644 --- a/ptpython/layout.py +++ b/ptpython/layout.py @@ -7,7 +7,7 @@ import platform import sys from enum import Enum from inspect import _ParameterKind as ParameterKind -from typing import TYPE_CHECKING, Any, List, Optional, Type +from typing import TYPE_CHECKING, Any from prompt_toolkit.application import get_app from prompt_toolkit.enums import DEFAULT_BUFFER, SEARCH_BUFFER @@ -17,11 +17,7 @@ from prompt_toolkit.filters import ( is_done, renderer_height_is_known, ) -from prompt_toolkit.formatted_text import ( - AnyFormattedText, - fragment_list_width, - to_formatted_text, -) +from prompt_toolkit.formatted_text import fragment_list_width, to_formatted_text from prompt_toolkit.formatted_text.base import StyleAndTextTuples from prompt_toolkit.key_binding.vi_state import InputMode from prompt_toolkit.layout.containers import ( @@ -60,7 +56,6 @@ from prompt_toolkit.widgets.toolbars import ( SystemToolbar, ValidationToolbar, ) -from pygments.lexers import PythonLexer from .filters import HasSignature, ShowDocstring, ShowSidebar, ShowSignature from .prompt_style import PromptStyle @@ -74,6 +69,7 @@ __all__ = ["PtPythonLayout", "CompletionVisualisation"] class CompletionVisualisation(Enum): "Visualisation method for the completions." + NONE = "none" POP_UP = "pop-up" MULTI_COLUMN = "multi-column" @@ -151,7 +147,7 @@ def python_sidebar(python_input: PythonInput) -> Window: append_category(category) for option in category.options: - append(i, option.title, "%s" % (option.get_current_value(),)) + append(i, option.title, str(option.get_current_value())) i += 1 tokens.pop() # Remove last newline. @@ -302,13 +298,15 @@ def signature_toolbar(python_input: PythonInput) -> Container: content=Window( FormattedTextControl(get_text_fragments), height=Dimension.exact(1) ), - filter= # Show only when there is a signature - HasSignature(python_input) & + filter=HasSignature(python_input) + & # Signature needs to be shown. - ShowSignature(python_input) & + ShowSignature(python_input) + & # And no sidebar is visible. - ~ShowSidebar(python_input) & + ~ShowSidebar(python_input) + & # Not done yet. ~is_done, ) diff --git a/ptpython/lexer.py b/ptpython/lexer.py index 81924c9..d925e95 100644 --- a/ptpython/lexer.py +++ b/ptpython/lexer.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Callable, Optional +from typing import Callable from prompt_toolkit.document import Document from prompt_toolkit.formatted_text import StyleAndTextTuples diff --git a/ptpython/printer.py b/ptpython/printer.py new file mode 100644 index 0000000..3618934 --- /dev/null +++ b/ptpython/printer.py @@ -0,0 +1,435 @@ +from __future__ import annotations + +import sys +import traceback +from dataclasses import dataclass +from enum import Enum +from typing import Generator, Iterable + +from prompt_toolkit.formatted_text import ( + HTML, + AnyFormattedText, + FormattedText, + OneStyleAndTextTuple, + StyleAndTextTuples, + fragment_list_width, + merge_formatted_text, + to_formatted_text, +) +from prompt_toolkit.formatted_text.utils import split_lines +from prompt_toolkit.input import Input +from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent +from prompt_toolkit.output import Output +from prompt_toolkit.shortcuts import PromptSession, print_formatted_text +from prompt_toolkit.styles import BaseStyle, StyleTransformation +from prompt_toolkit.styles.pygments import pygments_token_to_classname +from prompt_toolkit.utils import get_cwidth +from pygments.lexers import PythonLexer, PythonTracebackLexer + +__all__ = ["OutputPrinter"] + +# Never reformat results larger than this: +MAX_REFORMAT_SIZE = 1_000_000 + + +@dataclass +class OutputPrinter: + """ + Result printer. + + Usage:: + + printer = OutputPrinter(...) + printer.display_result(...) + printer.display_exception(...) + """ + + output: Output + input: Input + style: BaseStyle + title: AnyFormattedText + style_transformation: StyleTransformation + + def display_result( + self, + result: object, + *, + out_prompt: AnyFormattedText, + reformat: bool, + highlight: bool, + paginate: bool, + ) -> None: + """ + Show __repr__ (or `__pt_repr__`) for an `eval` result and print to output. + + :param reformat: Reformat result using 'black' before printing if the + result is parsable as Python code. + :param highlight: Syntax highlight the result. + :param paginate: Show paginator when the result does not fit on the + screen. + """ + out_prompt = to_formatted_text(out_prompt) + out_prompt_width = fragment_list_width(out_prompt) + + result = self._insert_out_prompt_and_split_lines( + self._format_result_output( + result, + reformat=reformat, + highlight=highlight, + line_length=self.output.get_size().columns - out_prompt_width, + paginate=paginate, + ), + out_prompt=out_prompt, + ) + self._display_result(result, paginate=paginate) + + def display_exception( + self, e: BaseException, *, highlight: bool, paginate: bool + ) -> None: + """ + Render an exception. + """ + result = self._insert_out_prompt_and_split_lines( + self._format_exception_output(e, highlight=highlight), + out_prompt="", + ) + self._display_result(result, paginate=paginate) + + def display_style_and_text_tuples( + self, + result: Iterable[OneStyleAndTextTuple], + *, + paginate: bool, + ) -> None: + self._display_result( + self._insert_out_prompt_and_split_lines(result, out_prompt=""), + paginate=paginate, + ) + + def _display_result( + self, + lines: Iterable[StyleAndTextTuples], + *, + paginate: bool, + ) -> None: + if paginate: + self._print_paginated_formatted_text(lines) + else: + for line in lines: + self._print_formatted_text(line) + + self.output.flush() + + def _print_formatted_text(self, line: StyleAndTextTuples, end: str = "\n") -> None: + print_formatted_text( + FormattedText(line), + style=self.style, + style_transformation=self.style_transformation, + include_default_pygments_style=False, + output=self.output, + end=end, + ) + + def _format_result_output( + self, + result: object, + *, + reformat: bool, + highlight: bool, + line_length: int, + paginate: bool, + ) -> Generator[OneStyleAndTextTuple, None, None]: + """ + Format __repr__ for an `eval` result. + + Note: this can raise `KeyboardInterrupt` if either calling `__repr__`, + `__pt_repr__` or formatting the output with "Black" takes to long + and the user presses Control-C. + """ + # If __pt_repr__ is present, take this. This can return prompt_toolkit + # formatted text. + try: + if hasattr(result, "__pt_repr__"): + formatted_result_repr = to_formatted_text( + getattr(result, "__pt_repr__")() + ) + yield from formatted_result_repr + return + except KeyboardInterrupt: + raise # Don't catch here. + except: + # For bad code, `__getattr__` can raise something that's not an + # `AttributeError`. This happens already when calling `hasattr()`. + pass + + # Call `__repr__` of given object first, to turn it in a string. + try: + result_repr = repr(result) + except KeyboardInterrupt: + raise # Don't catch here. + except BaseException as e: + # Calling repr failed. + self.display_exception(e, highlight=highlight, paginate=paginate) + return + + # Determine whether it's valid Python code. If not, + # reformatting/highlighting won't be applied. + if len(result_repr) < MAX_REFORMAT_SIZE: + try: + compile(result_repr, "", "eval") + except SyntaxError: + valid_python = False + else: + valid_python = True + else: + valid_python = False + + if valid_python and reformat: + # Inline import. Slightly speed up start-up time if black is + # not used. + try: + import black + + if not hasattr(black, "Mode"): + raise ImportError + except ImportError: + pass # no Black package in your installation + else: + result_repr = black.format_str( + result_repr, + mode=black.Mode(line_length=line_length), + ) + + if valid_python and highlight: + yield from _lex_python_result(result_repr) + else: + yield ("", result_repr) + + def _insert_out_prompt_and_split_lines( + self, result: Iterable[OneStyleAndTextTuple], out_prompt: AnyFormattedText + ) -> Iterable[StyleAndTextTuples]: + r""" + Split styled result in lines (based on the \n characters in the result) + an insert output prompt on whitespace in front of each line. (This does + not yet do the soft wrapping.) + + Yield lines as a result. + """ + out_prompt = to_formatted_text(out_prompt) + out_prompt_width = fragment_list_width(out_prompt) + prefix = ("", " " * out_prompt_width) + + for i, line in enumerate(split_lines(result)): + if i == 0: + line = [*out_prompt, *line] + else: + line = [prefix, *line] + yield line + + def _apply_soft_wrapping( + self, lines: Iterable[StyleAndTextTuples] + ) -> Iterable[StyleAndTextTuples]: + """ + Apply soft wrapping to the given lines. Wrap according to the terminal + width. Insert whitespace in front of each wrapped line to align it with + the output prompt. + """ + line_length = self.output.get_size().columns + + # Iterate over hard wrapped lines. + for lineno, line in enumerate(lines): + columns_in_buffer = 0 + current_line: list[OneStyleAndTextTuple] = [] + + for style, text, *_ in line: + for c in text: + width = get_cwidth(c) + + # (Soft) wrap line if it doesn't fit. + if columns_in_buffer + width > line_length: + yield current_line + columns_in_buffer = 0 + current_line = [] + + columns_in_buffer += width + current_line.append((style, c)) + + if len(current_line) > 0: + yield current_line + + def _print_paginated_formatted_text( + self, lines: Iterable[StyleAndTextTuples] + ) -> None: + """ + Print formatted text, using --MORE-- style pagination. + (Avoid filling up the terminal's scrollback buffer.) + """ + lines = self._apply_soft_wrapping(lines) + pager_prompt = create_pager_prompt( + self.style, self.title, output=self.output, input=self.input + ) + + abort = False + print_all = False + + # Max number of lines allowed in the buffer before painting. + size = self.output.get_size() + max_rows = size.rows - 1 + + # Page buffer. + page: StyleAndTextTuples = [] + + def show_pager() -> None: + nonlocal abort, max_rows, print_all + + # Run pager prompt in another thread. + # Same as for the input. This prevents issues with nested event + # loops. + pager_result = pager_prompt.prompt(in_thread=True) + + if pager_result == PagerResult.ABORT: + print("...") + abort = True + + elif pager_result == PagerResult.NEXT_LINE: + max_rows = 1 + + elif pager_result == PagerResult.NEXT_PAGE: + max_rows = size.rows - 1 + + elif pager_result == PagerResult.PRINT_ALL: + print_all = True + + # Loop over lines. Show --MORE-- prompt when page is filled. + rows = 0 + + for lineno, line in enumerate(lines): + page.extend(line) + page.append(("", "\n")) + rows += 1 + + if rows >= max_rows: + self._print_formatted_text(page, end="") + page = [] + rows = 0 + + if not print_all: + show_pager() + if abort: + return + + self._print_formatted_text(page) + + def _format_exception_output( + self, e: BaseException, highlight: bool + ) -> Generator[OneStyleAndTextTuple, None, None]: + # Instead of just calling ``traceback.format_exc``, we take the + # traceback and skip the bottom calls of this framework. + t, v, tb = sys.exc_info() + + # Required for pdb.post_mortem() to work. + sys.last_type, sys.last_value, sys.last_traceback = t, v, tb + + tblist = list(traceback.extract_tb(tb)) + + for line_nr, tb_tuple in enumerate(tblist): + if tb_tuple[0] == "<stdin>": + tblist = tblist[line_nr:] + break + + tb_list = traceback.format_list(tblist) + if tb_list: + tb_list.insert(0, "Traceback (most recent call last):\n") + tb_list.extend(traceback.format_exception_only(t, v)) + + tb_str = "".join(tb_list) + + # Format exception and write to output. + # (We use the default style. Most other styles result + # in unreadable colors for the traceback.) + if highlight: + for index, tokentype, text in PythonTracebackLexer().get_tokens_unprocessed( + tb_str + ): + yield ("class:" + pygments_token_to_classname(tokentype), text) + else: + yield ("", tb_str) + + +class PagerResult(Enum): + ABORT = "ABORT" + NEXT_LINE = "NEXT_LINE" + NEXT_PAGE = "NEXT_PAGE" + PRINT_ALL = "PRINT_ALL" + + +def create_pager_prompt( + style: BaseStyle, + title: AnyFormattedText = "", + input: Input | None = None, + output: Output | None = None, +) -> PromptSession[PagerResult]: + """ + Create a "--MORE--" prompt for paginated output. + """ + bindings = KeyBindings() + + @bindings.add("enter") + @bindings.add("down") + def next_line(event: KeyPressEvent) -> None: + event.app.exit(result=PagerResult.NEXT_LINE) + + @bindings.add("space") + def next_page(event: KeyPressEvent) -> None: + event.app.exit(result=PagerResult.NEXT_PAGE) + + @bindings.add("a") + def print_all(event: KeyPressEvent) -> None: + event.app.exit(result=PagerResult.PRINT_ALL) + + @bindings.add("q") + @bindings.add("c-c") + @bindings.add("c-d") + @bindings.add("escape", eager=True) + def no(event: KeyPressEvent) -> None: + event.app.exit(result=PagerResult.ABORT) + + @bindings.add("<any>") + def _(event: KeyPressEvent) -> None: + "Disallow inserting other text." + pass + + session: PromptSession[PagerResult] = PromptSession( + merge_formatted_text( + [ + title, + HTML( + "<status-toolbar>" + "<more> -- MORE -- </more> " + "<key>[Enter]</key> Scroll " + "<key>[Space]</key> Next page " + "<key>[a]</key> Print all " + "<key>[q]</key> Quit " + "</status-toolbar>: " + ), + ] + ), + key_bindings=bindings, + erase_when_done=True, + style=style, + input=input, + output=output, + ) + return session + + +def _lex_python_result(result: str) -> Generator[tuple[str, str], None, None]: + "Return token list for Python string." + lexer = PythonLexer() + # Use `get_tokens_unprocessed`, so that we get exactly the same string, + # without line endings appended. `print_formatted_text` already appends a + # line ending, and otherwise we'll have two line endings. + tokens = lexer.get_tokens_unprocessed(result) + + for index, tokentype, text in tokens: + yield ("class:" + pygments_token_to_classname(tokentype), text) diff --git a/ptpython/python_input.py b/ptpython/python_input.py index da19076..54ddbef 100644 --- a/ptpython/python_input.py +++ b/ptpython/python_input.py @@ -4,20 +4,9 @@ This can be used for creation of Python REPLs. """ from __future__ import annotations -from asyncio import get_event_loop +from asyncio import get_running_loop from functools import partial -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Generic, - List, - Mapping, - Optional, - Tuple, - TypeVar, -) +from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Mapping, TypeVar, Union from prompt_toolkit.application import Application, get_app from prompt_toolkit.auto_suggest import ( @@ -42,7 +31,7 @@ from prompt_toolkit.cursor_shapes import ( ) from prompt_toolkit.document import Document from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode -from prompt_toolkit.filters import Condition +from prompt_toolkit.filters import Condition, FilterOrBool from prompt_toolkit.formatted_text import AnyFormattedText from prompt_toolkit.history import ( FileHistory, @@ -60,8 +49,13 @@ from prompt_toolkit.key_binding.bindings.auto_suggest import load_auto_suggest_b from prompt_toolkit.key_binding.bindings.open_in_editor import ( load_open_in_editor_bindings, ) +from prompt_toolkit.key_binding.key_bindings import Binding, KeyHandlerCallable +from prompt_toolkit.key_binding.key_processor import KeyPressEvent from prompt_toolkit.key_binding.vi_state import InputMode +from prompt_toolkit.keys import Keys from prompt_toolkit.layout.containers import AnyContainer +from prompt_toolkit.layout.dimension import AnyDimension +from prompt_toolkit.layout.processors import Processor from prompt_toolkit.lexers import DynamicLexer, Lexer, SimpleLexer from prompt_toolkit.output import ColorDepth, Output from prompt_toolkit.styles import ( @@ -102,22 +96,23 @@ if TYPE_CHECKING: from typing_extensions import Protocol class _SupportsLessThan(Protocol): - # Taken from typeshed. _T is used by "sorted", which needs anything + # Taken from typeshed. _T_lt is used by "sorted", which needs anything # sortable. def __lt__(self, __other: Any) -> bool: ... -_T = TypeVar("_T", bound="_SupportsLessThan") +_T_lt = TypeVar("_T_lt", bound="_SupportsLessThan") +_T_kh = TypeVar("_T_kh", bound=Union[KeyHandlerCallable, Binding]) -class OptionCategory(Generic[_T]): - def __init__(self, title: str, options: list[Option[_T]]) -> None: +class OptionCategory(Generic[_T_lt]): + def __init__(self, title: str, options: list[Option[_T_lt]]) -> None: self.title = title self.options = options -class Option(Generic[_T]): +class Option(Generic[_T_lt]): """ Ptpython configuration option that can be shown and modified from the sidebar. @@ -133,10 +128,10 @@ class Option(Generic[_T]): self, title: str, description: str, - get_current_value: Callable[[], _T], + get_current_value: Callable[[], _T_lt], # We accept `object` as return type for the select functions, because # often they return an unused boolean. Maybe this can be improved. - get_values: Callable[[], Mapping[_T, Callable[[], object]]], + get_values: Callable[[], Mapping[_T_lt, Callable[[], object]]], ) -> None: self.title = title self.description = description @@ -144,7 +139,7 @@ class Option(Generic[_T]): self.get_values = get_values @property - def values(self) -> Mapping[_T, Callable[[], object]]: + def values(self) -> Mapping[_T_lt, Callable[[], object]]: return self.get_values() def activate_next(self, _previous: bool = False) -> None: @@ -219,10 +214,10 @@ class PythonInput: _completer: Completer | None = None, _validator: Validator | None = None, _lexer: Lexer | None = None, - _extra_buffer_processors=None, + _extra_buffer_processors: list[Processor] | None = None, _extra_layout_body: AnyContainer | None = None, - _extra_toolbars=None, - _input_buffer_height=None, + _extra_toolbars: list[AnyContainer] | None = None, + _input_buffer_height: AnyDimension | None = None, ) -> None: self.get_globals: _GetNamespace = get_globals or (lambda: {}) self.get_locals: _GetNamespace = get_locals or self.get_globals @@ -333,7 +328,7 @@ class PythonInput: # Cursor shapes. self.cursor_shape_config = "Block" - self.all_cursor_shape_configs: Dict[str, AnyCursorShapeConfig] = { + self.all_cursor_shape_configs: dict[str, AnyCursorShapeConfig] = { "Block": CursorShape.BLOCK, "Underline": CursorShape.UNDERLINE, "Beam": CursorShape.BEAM, @@ -379,7 +374,7 @@ class PythonInput: self.options = self._create_options() self.selected_option_index: int = 0 - #: Incremeting integer counting the current statement. + #: Incrementing integer counting the current statement. self.current_statement_index: int = 1 # Code signatures. (This is set asynchronously after a timeout.) @@ -477,24 +472,36 @@ class PythonInput: return flags - @property - def add_key_binding(self) -> Callable[[_T], _T]: + def add_key_binding( + self, + *keys: Keys | str, + filter: FilterOrBool = True, + eager: FilterOrBool = False, + is_global: FilterOrBool = False, + save_before: Callable[[KeyPressEvent], bool] = (lambda e: True), + record_in_macro: FilterOrBool = True, + ) -> Callable[[_T_kh], _T_kh]: """ Shortcut for adding new key bindings. (Mostly useful for a config.py file, that receives a PythonInput/Repl instance as input.) + All arguments are identical to prompt_toolkit's `KeyBindings.add`. + :: @python_input.add_key_binding(Keys.ControlX, filter=...) def handler(event): ... """ - - def add_binding_decorator(*k, **kw): - return self.extra_key_bindings.add(*k, **kw) - - return add_binding_decorator + return self.extra_key_bindings.add( + *keys, + filter=filter, + eager=eager, + is_global=is_global, + save_before=save_before, + record_in_macro=record_in_macro, + ) def install_code_colorscheme(self, name: str, style: BaseStyle) -> None: """ @@ -607,10 +614,10 @@ class PythonInput: description="Change the cursor style, possibly according " "to the Vi input mode.", get_current_value=lambda: self.cursor_shape_config, - get_values=lambda: dict( - (s, partial(enable, "cursor_shape_config", s)) + get_values=lambda: { + s: partial(enable, "cursor_shape_config", s) for s in self.all_cursor_shape_configs - ), + }, ), simple_option( title="Paste mode", @@ -835,7 +842,7 @@ class PythonInput: [ simple_option( title="Syntax highlighting", - description="Use colors for syntax highligthing", + description="Use colors for syntax highlighting", field_name="enable_syntax_highlighting", ), simple_option( @@ -1003,7 +1010,7 @@ class PythonInput: app = self.app async def on_timeout_task() -> None: - loop = get_event_loop() + loop = get_running_loop() # Never run multiple get-signature threads. if self._get_signatures_thread_running: diff --git a/ptpython/repl.py b/ptpython/repl.py index 02a5075..fc9b9da 100644 --- a/ptpython/repl.py +++ b/ptpython/repl.py @@ -12,38 +12,24 @@ from __future__ import annotations import asyncio import builtins import os +import signal import sys import traceback import types import warnings from dis import COMPILER_FLAG_NAMES -from enum import Enum -from typing import Any, Callable, ContextManager, Dict, Optional - -from prompt_toolkit.formatted_text import ( - HTML, - AnyFormattedText, - FormattedText, - PygmentsTokens, - StyleAndTextTuples, - fragment_list_width, - merge_formatted_text, - to_formatted_text, -) -from prompt_toolkit.formatted_text.utils import fragment_list_to_text, split_lines -from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent +from typing import Any, Callable, ContextManager, Iterable + +from prompt_toolkit.formatted_text import OneStyleAndTextTuple from prompt_toolkit.patch_stdout import patch_stdout as patch_stdout_context from prompt_toolkit.shortcuts import ( - PromptSession, clear_title, - print_formatted_text, set_title, ) -from prompt_toolkit.styles import BaseStyle -from prompt_toolkit.utils import DummyContext, get_cwidth -from pygments.lexers import PythonLexer, PythonTracebackLexer -from pygments.token import Token +from prompt_toolkit.utils import DummyContext +from pygments.lexers import PythonTracebackLexer # noqa: F401 +from .printer import OutputPrinter from .python_input import PythonInput PyCF_ALLOW_TOP_LEVEL_AWAIT: int @@ -108,7 +94,9 @@ class PythonRepl(PythonInput): else: # Print. if result is not None: - self.show_result(result) + self._show_result(result) + if self.insert_blank_line_after_output: + self.app.output.write("\n") # Loop. self.current_statement_index += 1 @@ -123,6 +111,24 @@ class PythonRepl(PythonInput): # any case.) self._handle_keyboard_interrupt(e) + def _get_output_printer(self) -> OutputPrinter: + return OutputPrinter( + output=self.app.output, + input=self.app.input, + style=self._current_style, + style_transformation=self.style_transformation, + title=self.title, + ) + + def _show_result(self, result: object) -> None: + self._get_output_printer().display_result( + result=result, + out_prompt=self.get_output_prompt(), + reformat=self.enable_output_formatting, + highlight=self.enable_syntax_highlighting, + paginate=self.enable_pager, + ) + def run(self) -> None: """ Run the REPL loop. @@ -153,27 +159,58 @@ class PythonRepl(PythonInput): clear_title() self._remove_from_namespace() - async def run_and_show_expression_async(self, text: str): - loop = asyncio.get_event_loop() + async def run_and_show_expression_async(self, text: str) -> Any: + loop = asyncio.get_running_loop() + system_exit: SystemExit | None = None try: - result = await self.eval_async(text) - except KeyboardInterrupt: # KeyboardInterrupt doesn't inherit from Exception. - raise - except SystemExit: - return - except BaseException as e: - self._handle_exception(e) - else: - # Print. - if result is not None: - await loop.run_in_executor(None, lambda: self.show_result(result)) + try: + # Create `eval` task. Ensure that control-c will cancel this + # task. + async def eval() -> Any: + nonlocal system_exit + try: + return await self.eval_async(text) + except SystemExit as e: + # Don't propagate SystemExit in `create_task()`. That + # will kill the event loop. We want to handle it + # gracefully. + system_exit = e + + task = asyncio.create_task(eval()) + loop.add_signal_handler(signal.SIGINT, lambda *_: task.cancel()) + result = await task + + if system_exit is not None: + raise system_exit + except KeyboardInterrupt: + # KeyboardInterrupt doesn't inherit from Exception. + raise + except SystemExit: + raise + except BaseException as e: + self._handle_exception(e) + else: + # Print. + if result is not None: + await loop.run_in_executor(None, lambda: self._show_result(result)) - # Loop. - self.current_statement_index += 1 - self.signatures = [] - # Return the result for future consumers. - return result + # Loop. + self.current_statement_index += 1 + self.signatures = [] + # Return the result for future consumers. + return result + finally: + loop.remove_signal_handler(signal.SIGINT) + + except KeyboardInterrupt as e: + # Handle all possible `KeyboardInterrupt` errors. This can + # happen during the `eval`, but also during the + # `show_result` if something takes too long. + # (Try/catch is around the whole block, because we want to + # prevent that a Control-C keypress terminates the REPL in + # any case.) + self._handle_keyboard_interrupt(e) async def run_async(self) -> None: """ @@ -187,7 +224,7 @@ class PythonRepl(PythonInput): (Both for control-C to work, as well as for the code to see the right thread in which it was embedded). """ - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() if self.terminal_title: set_title(self.terminal_title) @@ -217,6 +254,8 @@ class PythonRepl(PythonInput): # `KeyboardInterrupt` exceptions can end up in the event # loop selector. self._handle_keyboard_interrupt(e) + except SystemExit: + return finally: if self.terminal_title: clear_title() @@ -245,7 +284,7 @@ class PythonRepl(PythonInput): result = eval(code, self.get_globals(), self.get_locals()) if _has_coroutine_flag(code): - result = asyncio.get_event_loop().run_until_complete(result) + result = asyncio.get_running_loop().run_until_complete(result) self._store_eval_result(result) return result @@ -258,7 +297,7 @@ class PythonRepl(PythonInput): result = eval(code, self.get_globals(), self.get_locals()) if _has_coroutine_flag(code): - result = asyncio.get_event_loop().run_until_complete(result) + result = asyncio.get_running_loop().run_until_complete(result) return None @@ -318,264 +357,12 @@ class PythonRepl(PythonInput): dont_inherit=True, ) - def _format_result_output(self, result: object) -> StyleAndTextTuples: - """ - Format __repr__ for an `eval` result. - - Note: this can raise `KeyboardInterrupt` if either calling `__repr__`, - `__pt_repr__` or formatting the output with "Black" takes to long - and the user presses Control-C. - """ - out_prompt = to_formatted_text(self.get_output_prompt()) - - # If the repr is valid Python code, use the Pygments lexer. - try: - result_repr = repr(result) - except KeyboardInterrupt: - raise # Don't catch here. - except BaseException as e: - # Calling repr failed. - self._handle_exception(e) - return [] - - try: - compile(result_repr, "", "eval") - except SyntaxError: - formatted_result_repr = to_formatted_text(result_repr) - else: - # Syntactically correct. Format with black and syntax highlight. - if self.enable_output_formatting: - # Inline import. Slightly speed up start-up time if black is - # not used. - try: - import black - - if not hasattr(black, "Mode"): - raise ImportError - except ImportError: - pass # no Black package in your installation - else: - result_repr = black.format_str( - result_repr, - mode=black.Mode(line_length=self.app.output.get_size().columns), - ) - - formatted_result_repr = to_formatted_text( - PygmentsTokens(list(_lex_python_result(result_repr))) - ) - - # If __pt_repr__ is present, take this. This can return prompt_toolkit - # formatted text. - try: - if hasattr(result, "__pt_repr__"): - formatted_result_repr = to_formatted_text( - getattr(result, "__pt_repr__")() - ) - if isinstance(formatted_result_repr, list): - formatted_result_repr = FormattedText(formatted_result_repr) - except KeyboardInterrupt: - raise # Don't catch here. - except: - # For bad code, `__getattr__` can raise something that's not an - # `AttributeError`. This happens already when calling `hasattr()`. - pass - - # Align every line to the prompt. - line_sep = "\n" + " " * fragment_list_width(out_prompt) - indented_repr: StyleAndTextTuples = [] - - lines = list(split_lines(formatted_result_repr)) - - for i, fragment in enumerate(lines): - indented_repr.extend(fragment) - - # Add indentation separator between lines, not after the last line. - if i != len(lines) - 1: - indented_repr.append(("", line_sep)) - - # Write output tokens. - if self.enable_syntax_highlighting: - formatted_output = merge_formatted_text([out_prompt, indented_repr]) - else: - formatted_output = FormattedText( - out_prompt + [("", fragment_list_to_text(formatted_result_repr))] - ) - - return to_formatted_text(formatted_output) - - def show_result(self, result: object) -> None: - """ - Show __repr__ for an `eval` result and print to output. - """ - formatted_text_output = self._format_result_output(result) - - if self.enable_pager: - self.print_paginated_formatted_text(formatted_text_output) - else: - self.print_formatted_text(formatted_text_output) - - self.app.output.flush() - - if self.insert_blank_line_after_output: - self.app.output.write("\n") - - def print_formatted_text( - self, formatted_text: StyleAndTextTuples, end: str = "\n" - ) -> None: - print_formatted_text( - FormattedText(formatted_text), - style=self._current_style, - style_transformation=self.style_transformation, - include_default_pygments_style=False, - output=self.app.output, - end=end, - ) - - def print_paginated_formatted_text( - self, - formatted_text: StyleAndTextTuples, - end: str = "\n", - ) -> None: - """ - Print formatted text, using --MORE-- style pagination. - (Avoid filling up the terminal's scrollback buffer.) - """ - pager_prompt = self.create_pager_prompt() - size = self.app.output.get_size() - - abort = False - print_all = False - - # Max number of lines allowed in the buffer before painting. - max_rows = size.rows - 1 - - # Page buffer. - rows_in_buffer = 0 - columns_in_buffer = 0 - page: StyleAndTextTuples = [] - - def flush_page() -> None: - nonlocal page, columns_in_buffer, rows_in_buffer - self.print_formatted_text(page, end="") - page = [] - columns_in_buffer = 0 - rows_in_buffer = 0 - - def show_pager() -> None: - nonlocal abort, max_rows, print_all - - # Run pager prompt in another thread. - # Same as for the input. This prevents issues with nested event - # loops. - pager_result = pager_prompt.prompt(in_thread=True) - - if pager_result == PagerResult.ABORT: - print("...") - abort = True - - elif pager_result == PagerResult.NEXT_LINE: - max_rows = 1 - - elif pager_result == PagerResult.NEXT_PAGE: - max_rows = size.rows - 1 - - elif pager_result == PagerResult.PRINT_ALL: - print_all = True - - # Loop over lines. Show --MORE-- prompt when page is filled. - - formatted_text = formatted_text + [("", end)] - lines = list(split_lines(formatted_text)) - - for lineno, line in enumerate(lines): - for style, text, *_ in line: - for c in text: - width = get_cwidth(c) - - # (Soft) wrap line if it doesn't fit. - if columns_in_buffer + width > size.columns: - # Show pager first if we get too many lines after - # wrapping. - if rows_in_buffer + 1 >= max_rows and not print_all: - page.append(("", "\n")) - flush_page() - show_pager() - if abort: - return - - rows_in_buffer += 1 - columns_in_buffer = 0 - - columns_in_buffer += width - page.append((style, c)) - - if rows_in_buffer + 1 >= max_rows and not print_all: - page.append(("", "\n")) - flush_page() - show_pager() - if abort: - return - else: - # Add line ending between lines (if `end="\n"` was given, one - # more empty line is added in `split_lines` automatically to - # take care of the final line ending). - if lineno != len(lines) - 1: - page.append(("", "\n")) - rows_in_buffer += 1 - columns_in_buffer = 0 - - flush_page() - - def create_pager_prompt(self) -> PromptSession[PagerResult]: - """ - Create pager --MORE-- prompt. - """ - return create_pager_prompt(self._current_style, self.title) - - def _format_exception_output(self, e: BaseException) -> PygmentsTokens: - # Instead of just calling ``traceback.format_exc``, we take the - # traceback and skip the bottom calls of this framework. - t, v, tb = sys.exc_info() - - # Required for pdb.post_mortem() to work. - sys.last_type, sys.last_value, sys.last_traceback = t, v, tb - - tblist = list(traceback.extract_tb(tb)) - - for line_nr, tb_tuple in enumerate(tblist): - if tb_tuple[0] == "<stdin>": - tblist = tblist[line_nr:] - break - - l = traceback.format_list(tblist) - if l: - l.insert(0, "Traceback (most recent call last):\n") - l.extend(traceback.format_exception_only(t, v)) - - tb_str = "".join(l) - - # Format exception and write to output. - # (We use the default style. Most other styles result - # in unreadable colors for the traceback.) - if self.enable_syntax_highlighting: - tokens = list(_lex_python_traceback(tb_str)) - else: - tokens = [(Token, tb_str)] - return PygmentsTokens(tokens) - def _handle_exception(self, e: BaseException) -> None: - output = self.app.output - - tokens = self._format_exception_output(e) - - print_formatted_text( - tokens, - style=self._current_style, - style_transformation=self.style_transformation, - include_default_pygments_style=False, - output=output, + self._get_output_printer().display_exception( + e, + highlight=self.enable_syntax_highlighting, + paginate=self.enable_pager, ) - output.flush() def _handle_keyboard_interrupt(self, e: KeyboardInterrupt) -> None: output = self.app.output @@ -602,21 +389,16 @@ class PythonRepl(PythonInput): globals = self.get_globals() del globals["get_ptpython"] - -def _lex_python_traceback(tb): - "Return token list for traceback string." - lexer = PythonTracebackLexer() - return lexer.get_tokens(tb) - - -def _lex_python_result(tb): - "Return token list for Python string." - lexer = PythonLexer() - # Use `get_tokens_unprocessed`, so that we get exactly the same string, - # without line endings appended. `print_formatted_text` already appends a - # line ending, and otherwise we'll have two line endings. - tokens = lexer.get_tokens_unprocessed(tb) - return [(tokentype, value) for index, tokentype, value in tokens] + def print_paginated_formatted_text( + self, + formatted_text: Iterable[OneStyleAndTextTuple], + end: str = "\n", + ) -> None: + # Warning: This is mainly here backwards-compatibility. Some projects + # call `print_paginated_formatted_text` on the Repl object. + self._get_output_printer().display_style_and_text_tuples( + formatted_text, paginate=True + ) def enable_deprecation_warnings() -> None: @@ -630,25 +412,31 @@ def enable_deprecation_warnings() -> None: warnings.filterwarnings("default", category=DeprecationWarning, module="__main__") -def run_config( - repl: PythonInput, config_file: str = "~/.config/ptpython/config.py" -) -> None: +DEFAULT_CONFIG_FILE = "~/.config/ptpython/config.py" + + +def run_config(repl: PythonInput, config_file: str | None = None) -> None: """ Execute REPL config file. :param repl: `PythonInput` instance. :param config_file: Path of the configuration file. """ + explicit_config_file = config_file is not None + # Expand tildes. - config_file = os.path.expanduser(config_file) + config_file = os.path.expanduser( + config_file if config_file is not None else DEFAULT_CONFIG_FILE + ) def enter_to_continue() -> None: input("\nPress ENTER to continue...") # Check whether this file exists. if not os.path.exists(config_file): - print("Impossible to read %r" % config_file) - enter_to_continue() + if explicit_config_file: + print(f"Impossible to read {config_file}") + enter_to_continue() return # Run the config file in an empty namespace. @@ -741,67 +529,3 @@ def embed( else: with patch_context: repl.run() - - -class PagerResult(Enum): - ABORT = "ABORT" - NEXT_LINE = "NEXT_LINE" - NEXT_PAGE = "NEXT_PAGE" - PRINT_ALL = "PRINT_ALL" - - -def create_pager_prompt( - style: BaseStyle, title: AnyFormattedText = "" -) -> PromptSession[PagerResult]: - """ - Create a "continue" prompt for paginated output. - """ - bindings = KeyBindings() - - @bindings.add("enter") - @bindings.add("down") - def next_line(event: KeyPressEvent) -> None: - event.app.exit(result=PagerResult.NEXT_LINE) - - @bindings.add("space") - def next_page(event: KeyPressEvent) -> None: - event.app.exit(result=PagerResult.NEXT_PAGE) - - @bindings.add("a") - def print_all(event: KeyPressEvent) -> None: - event.app.exit(result=PagerResult.PRINT_ALL) - - @bindings.add("q") - @bindings.add("c-c") - @bindings.add("c-d") - @bindings.add("escape", eager=True) - def no(event: KeyPressEvent) -> None: - event.app.exit(result=PagerResult.ABORT) - - @bindings.add("<any>") - def _(event: KeyPressEvent) -> None: - "Disallow inserting other text." - pass - - style - - session: PromptSession[PagerResult] = PromptSession( - merge_formatted_text( - [ - title, - HTML( - "<status-toolbar>" - "<more> -- MORE -- </more> " - "<key>[Enter]</key> Scroll " - "<key>[Space]</key> Next page " - "<key>[a]</key> Print all " - "<key>[q]</key> Quit " - "</status-toolbar>: " - ), - ] - ), - key_bindings=bindings, - erase_when_done=True, - style=style, - ) - return session diff --git a/ptpython/signatures.py b/ptpython/signatures.py index 5a6f286..d4cb98c 100644 --- a/ptpython/signatures.py +++ b/ptpython/signatures.py @@ -10,7 +10,7 @@ from __future__ import annotations import inspect from inspect import Signature as InspectSignature from inspect import _ParameterKind as ParameterKind -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple +from typing import TYPE_CHECKING, Any, Sequence from prompt_toolkit.document import Document @@ -203,7 +203,6 @@ def get_signatures_using_eval( running `eval()` over the detected function name. """ # Look for open parenthesis, before cursor position. - text = document.text_before_cursor pos = document.cursor_position - 1 paren_mapping = {")": "(", "}": "{", "]": "["} diff --git a/ptpython/style.py b/ptpython/style.py index 199d5ab..c5a04e5 100644 --- a/ptpython/style.py +++ b/ptpython/style.py @@ -1,7 +1,5 @@ from __future__ import annotations -from typing import Dict - from prompt_toolkit.styles import BaseStyle, Style, merge_styles from prompt_toolkit.styles.pygments import style_from_pygments_cls from prompt_toolkit.utils import is_conemu_ansi, is_windows, is_windows_vt100_supported diff --git a/ptpython/utils.py b/ptpython/utils.py index 5348899..28887d2 100644 --- a/ptpython/utils.py +++ b/ptpython/utils.py @@ -4,17 +4,7 @@ For internal use only. from __future__ import annotations import re -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Iterable, - Optional, - Type, - TypeVar, - cast, -) +from typing import TYPE_CHECKING, Any, Callable, Iterable, TypeVar, cast from prompt_toolkit.document import Document from prompt_toolkit.formatted_text import to_formatted_text @@ -91,7 +81,7 @@ def get_jedi_script_from_document( # Workaround Jedi issue #514: for https://github.com/davidhalter/jedi/issues/514 return None except KeyError: - # Workaroud for a crash when the input is "u'", the start of a unicode string. + # Workaround for a crash when the input is "u'", the start of a unicode string. return None except Exception: # Workaround for: https://github.com/jonathanslenders/ptpython/issues/91 diff --git a/ptpython/validator.py b/ptpython/validator.py index 3b36d27..91b9c28 100644 --- a/ptpython/validator.py +++ b/ptpython/validator.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Callable, Optional +from typing import Callable from prompt_toolkit.document import Document from prompt_toolkit.validation import ValidationError, Validator diff --git a/pyproject.toml b/pyproject.toml index b356239..5421c45 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,13 +1,35 @@ -[tool.black] -target-version = ['py36'] - - -[tool.isort] -# isort configuration that is compatible with Black. -multi_line_output = 3 -include_trailing_comma = true -known_first_party = "ptpython" -known_third_party = "prompt_toolkit,pygments,asyncssh" -force_grid_wrap = 0 -use_parentheses = true -line_length = 88 +[tool.ruff] +target-version = "py37" +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "C", # flake8-comprehensions + "T", # Print. + "I", # isort + # "B", # flake8-bugbear + "UP", # pyupgrade + "RUF100", # unused-noqa + "Q", # quotes +] +ignore = [ + "E501", # Line too long, handled by black + "C901", # Too complex + "E722", # bare except. +] + + +[tool.ruff.per-file-ignores] +"examples/*" = ["T201"] # Print allowed in examples. +"examples/ptpython_config/config.py" = ["F401"] # Unused imports in config. +"ptpython/entry_points/run_ptipython.py" = ["T201", "F401"] # Print, import usage. +"ptpython/entry_points/run_ptpython.py" = ["T201"] # Print usage. +"ptpython/ipython.py" = ["T100"] # Import usage. +"ptpython/repl.py" = ["T201"] # Print usage. +"ptpython/printer.py" = ["T201"] # Print usage. +"tests/run_tests.py" = ["F401"] # Unused imports. + + +[tool.ruff.isort] +known-first-party = ["ptpython"] +known-third-party = ["prompt_toolkit", "pygments", "asyncssh"] @@ -11,7 +11,7 @@ with open(os.path.join(os.path.dirname(__file__), "README.rst")) as f: setup( name="ptpython", author="Jonathan Slenders", - version="3.0.23", + version="3.0.25", url="https://github.com/prompt-toolkit/ptpython", description="Python REPL build on top of prompt_toolkit", long_description=long_description, @@ -21,12 +21,13 @@ setup( "appdirs", "importlib_metadata;python_version<'3.8'", "jedi>=0.16.0", - # Use prompt_toolkit 3.0.28, because of cursor shape support. - "prompt_toolkit>=3.0.28,<3.1.0", + # Use prompt_toolkit 3.0.34, because of `OneStyleAndTextTuple` import. + "prompt_toolkit>=3.0.34,<3.1.0", "pygments", ], python_requires=">=3.7", classifiers=[ + "License :: OSI Approved :: BSD License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7", @@ -39,12 +40,14 @@ setup( "ptpython = ptpython.entry_points.run_ptpython:run", "ptipython = ptpython.entry_points.run_ptipython:run", "ptpython%s = ptpython.entry_points.run_ptpython:run" % sys.version_info[0], - "ptpython%s.%s = ptpython.entry_points.run_ptpython:run" - % sys.version_info[:2], + "ptpython{}.{} = ptpython.entry_points.run_ptpython:run".format( + *sys.version_info[:2] + ), "ptipython%s = ptpython.entry_points.run_ptipython:run" % sys.version_info[0], - "ptipython%s.%s = ptpython.entry_points.run_ptipython:run" - % sys.version_info[:2], + "ptipython{}.{} = ptpython.entry_points.run_ptipython:run".format( + *sys.version_info[:2] + ), ] }, extras_require={ |