summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/test.yaml6
-rw-r--r--CHANGELOG29
-rw-r--r--LICENSE2
-rw-r--r--README.rst15
-rwxr-xr-xexamples/asyncio-python-embed.py15
-rwxr-xr-xexamples/asyncio-ssh-python-embed.py18
-rw-r--r--examples/ptpython_config/config.py3
-rwxr-xr-xexamples/python-embed-with-custom-prompt.py12
-rwxr-xr-xexamples/python-embed.py2
-rwxr-xr-xexamples/ssh-and-telnet-embed.py11
-rw-r--r--ptpython/completer.py8
-rw-r--r--ptpython/contrib/asyncssh_repl.py28
-rw-r--r--ptpython/entry_points/run_ptpython.py21
-rw-r--r--ptpython/history_browser.py3
-rw-r--r--ptpython/ipython.py11
-rw-r--r--ptpython/layout.py22
-rw-r--r--ptpython/lexer.py2
-rw-r--r--ptpython/printer.py435
-rw-r--r--ptpython/python_input.py85
-rw-r--r--ptpython/repl.py492
-rw-r--r--ptpython/signatures.py3
-rw-r--r--ptpython/style.py2
-rw-r--r--ptpython/utils.py14
-rw-r--r--ptpython/validator.py2
-rw-r--r--pyproject.toml48
-rw-r--r--setup.py17
26 files changed, 773 insertions, 533 deletions
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 31837db..9a50f3b 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -22,13 +22,13 @@ jobs:
run: |
sudo apt remove python3-pip
python -m pip install --upgrade pip
- python -m pip install . black isort mypy pytest readme_renderer
+ python -m pip install . ruff mypy pytest readme_renderer
pip list
- name: Type Checker
run: |
mypy ptpython
- isort -c --profile black ptpython examples setup.py
- black --check ptpython examples setup.py
+ ruff .
+ ruff format --check .
- name: Run Tests
run: |
./tests/run_tests.py
diff --git a/CHANGELOG b/CHANGELOG
index 645ca60..e827700 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,33 @@
CHANGELOG
=========
+3.0.25: 2023-12-14
+------------------
+
+Fixes:
+- Fix handling of 'config file does not exist' when embedding ptpython.
+
+
+3.0.24: 2023-12-13
+------------------
+
+Fixes:
+- Don't show "Impossible to read config file" warnings when no config file was
+ passed to `run_config()`.
+- IPython integration fixes:
+ * Fix top-level await in IPython.
+ * Fix IPython `DeprecationWarning`.
+- Output printing fixes:
+ * Paginate exceptions if pagination is enabled.
+ * Handle big outputs without running out of memory.
+- Asyncio REPL improvements:
+ * From now on, passing `--asyncio` is required to activate the asyncio-REPL.
+ This will ensure that an event loop is created at the start in which we can
+ run top-level await statements.
+ * Use `get_running_loop()` instead of `get_event_loop()`.
+ * Better handling of `SystemExit` and control-c in the async REPL.
+
+
3.0.23: 2023-02-22
------------------
@@ -191,7 +218,7 @@ New features:
- Optional pager for displaying outputs that don't fit on the screen.
- Added --light-bg and --dark-bg flags to automatically optimize the brightness
of the colors according to the terminal background.
-- Addd `PTPYTHON_CONFIG_HOME` for explicitely setting the config directory.
+- Add `PTPYTHON_CONFIG_HOME` for explicitly setting the config directory.
- Show completion suffixes (like '(' for functions).
Fixes:
diff --git a/LICENSE b/LICENSE
index 910b80a..89a5114 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2015, Jonathan Slenders
+Copyright (c) 2015-2023, Jonathan Slenders
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
diff --git a/README.rst b/README.rst
index 2db3f69..8ec9aca 100644
--- a/README.rst
+++ b/README.rst
@@ -71,6 +71,7 @@ The help menu shows basic command-line options.
-h, --help show this help message and exit
--vi Enable Vi key bindings
-i, --interactive Start interactive shell after executing this file.
+ --asyncio Run an asyncio event loop to support top-level "await".
--light-bg Run on a light background (use dark colors for text).
--dark-bg Run on a dark background (use light colors for text).
--config-file CONFIG_FILE
@@ -171,6 +172,20 @@ error.
.. image :: https://github.com/jonathanslenders/ptpython/raw/master/docs/images/validation.png
+Asyncio REPL and top level await
+********************************
+
+In order to get top-level ``await`` support, start ptpython as follows:
+
+.. code::
+
+ ptpython --asyncio
+
+This will spawn an asyncio event loop and embed the async REPL in the event
+loop. After this, top-level await will work and statements like ``await
+asyncio.sleep(10)`` will execute.
+
+
Additional features
*******************
diff --git a/examples/asyncio-python-embed.py b/examples/asyncio-python-embed.py
index 05f52f1..a8fbba5 100755
--- a/examples/asyncio-python-embed.py
+++ b/examples/asyncio-python-embed.py
@@ -19,7 +19,7 @@ loop = asyncio.get_event_loop()
counter = [0]
-async def print_counter():
+async def print_counter() -> None:
"""
Coroutine that prints counters and saves it in a global variable.
"""
@@ -29,7 +29,7 @@ async def print_counter():
await asyncio.sleep(3)
-async def interactive_shell():
+async def interactive_shell() -> None:
"""
Coroutine that starts a Python REPL from which we can access the global
counter variable.
@@ -44,13 +44,10 @@ async def interactive_shell():
loop.stop()
-def main():
- asyncio.ensure_future(print_counter())
- asyncio.ensure_future(interactive_shell())
-
- loop.run_forever()
- loop.close()
+async def main() -> None:
+ asyncio.create_task(print_counter())
+ await interactive_shell()
if __name__ == "__main__":
- main()
+ asyncio.run(main())
diff --git a/examples/asyncio-ssh-python-embed.py b/examples/asyncio-ssh-python-embed.py
index 86b5607..be0689e 100755
--- a/examples/asyncio-ssh-python-embed.py
+++ b/examples/asyncio-ssh-python-embed.py
@@ -32,31 +32,25 @@ class MySSHServer(asyncssh.SSHServer):
return ReplSSHServerSession(self.get_namespace)
-def main(port=8222):
+async def main(port: int = 8222) -> None:
"""
Example that starts the REPL through an SSH server.
"""
- loop = asyncio.get_event_loop()
-
# Namespace exposed in the REPL.
environ = {"hello": "world"}
# Start SSH server.
- def create_server():
+ def create_server() -> MySSHServer:
return MySSHServer(lambda: environ)
print("Listening on :%i" % port)
print('To connect, do "ssh localhost -p %i"' % port)
- loop.run_until_complete(
- asyncssh.create_server(
- create_server, "", port, server_host_keys=["/etc/ssh/ssh_host_dsa_key"]
- )
+ await asyncssh.create_server(
+ create_server, "", port, server_host_keys=["/etc/ssh/ssh_host_dsa_key"]
)
-
- # Run eventloop.
- loop.run_forever()
+ await asyncio.Future() # Wait forever.
if __name__ == "__main__":
- main()
+ asyncio.run(main())
diff --git a/examples/ptpython_config/config.py b/examples/ptpython_config/config.py
index 2f3f49d..b25850a 100644
--- a/examples/ptpython_config/config.py
+++ b/examples/ptpython_config/config.py
@@ -70,6 +70,9 @@ def configure(repl):
# Vi mode.
repl.vi_mode = False
+ # Enable the modal cursor (when using Vi mode). Other options are 'Block', 'Underline', 'Beam', 'Blink under', 'Blink block', and 'Blink beam'
+ repl.cursor_shape_config = "Modal (vi)"
+
# Paste mode. (When True, don't insert whitespace after new line.)
repl.paste_mode = False
diff --git a/examples/python-embed-with-custom-prompt.py b/examples/python-embed-with-custom-prompt.py
index 968aedc..d54da1d 100755
--- a/examples/python-embed-with-custom-prompt.py
+++ b/examples/python-embed-with-custom-prompt.py
@@ -2,26 +2,26 @@
"""
Example of embedding a Python REPL, and setting a custom prompt.
"""
-from prompt_toolkit.formatted_text import HTML
+from prompt_toolkit.formatted_text import HTML, AnyFormattedText
from ptpython.prompt_style import PromptStyle
from ptpython.repl import embed
-def configure(repl):
+def configure(repl) -> None:
# Probably, the best is to add a new PromptStyle to `all_prompt_styles` and
# activate it. This way, the other styles are still selectable from the
# menu.
class CustomPrompt(PromptStyle):
- def in_prompt(self):
+ def in_prompt(self) -> AnyFormattedText:
return HTML("<ansigreen>Input[%s]</ansigreen>: ") % (
repl.current_statement_index,
)
- def in2_prompt(self, width):
+ def in2_prompt(self, width: int) -> AnyFormattedText:
return "...: ".rjust(width)
- def out_prompt(self):
+ def out_prompt(self) -> AnyFormattedText:
return HTML("<ansired>Result[%s]</ansired>: ") % (
repl.current_statement_index,
)
@@ -30,7 +30,7 @@ def configure(repl):
repl.prompt_style = "custom"
-def main():
+def main() -> None:
embed(globals(), locals(), configure=configure)
diff --git a/examples/python-embed.py b/examples/python-embed.py
index ac2cd06..49224ac 100755
--- a/examples/python-embed.py
+++ b/examples/python-embed.py
@@ -4,7 +4,7 @@
from ptpython.repl import embed
-def main():
+def main() -> None:
embed(globals(), locals(), vi_mode=False)
diff --git a/examples/ssh-and-telnet-embed.py b/examples/ssh-and-telnet-embed.py
index 378784c..62fa76d 100755
--- a/examples/ssh-and-telnet-embed.py
+++ b/examples/ssh-and-telnet-embed.py
@@ -11,13 +11,16 @@ import pathlib
import asyncssh
from prompt_toolkit import print_formatted_text
-from prompt_toolkit.contrib.ssh.server import PromptToolkitSSHServer
+from prompt_toolkit.contrib.ssh.server import (
+ PromptToolkitSSHServer,
+ PromptToolkitSSHSession,
+)
from prompt_toolkit.contrib.telnet.server import TelnetServer
from ptpython.repl import embed
-def ensure_key(filename="ssh_host_key"):
+def ensure_key(filename: str = "ssh_host_key") -> str:
path = pathlib.Path(filename)
if not path.exists():
rsa_key = asyncssh.generate_private_key("ssh-rsa")
@@ -25,12 +28,12 @@ def ensure_key(filename="ssh_host_key"):
return str(path)
-async def interact(connection=None):
+async def interact(connection: PromptToolkitSSHSession) -> None:
global_dict = {**globals(), "print": print_formatted_text}
await embed(return_asyncio_coroutine=True, globals=global_dict)
-async def main(ssh_port=8022, telnet_port=8023):
+async def main(ssh_port: int = 8022, telnet_port: int = 8023) -> None:
ssh_server = PromptToolkitSSHServer(interact=interact)
await asyncssh.create_server(
lambda: ssh_server, "", ssh_port, server_host_keys=[ensure_key()]
diff --git a/ptpython/completer.py b/ptpython/completer.py
index f28d2b1..91d6647 100644
--- a/ptpython/completer.py
+++ b/ptpython/completer.py
@@ -6,7 +6,7 @@ import inspect
import keyword
import re
from enum import Enum
-from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Callable, Iterable
from prompt_toolkit.completion import (
CompleteEvent,
@@ -259,7 +259,7 @@ class JediCompleter(Completer):
# See: https://github.com/jonathanslenders/ptpython/issues/223
pass
except Exception:
- # Supress all other Jedi exceptions.
+ # Suppress all other Jedi exceptions.
pass
else:
# Move function parameters to the top.
@@ -367,7 +367,7 @@ class DictionaryCompleter(Completer):
rf"""
{expression}
- # Dict loopup to complete (square bracket open + start of
+ # Dict lookup to complete (square bracket open + start of
# string).
\[
\s* ([^\[\]]*)$
@@ -380,7 +380,7 @@ class DictionaryCompleter(Completer):
rf"""
{expression}
- # Attribute loopup to complete (dot + varname).
+ # Attribute lookup to complete (dot + varname).
\.
\s* ([a-zA-Z0-9_]*)$
""",
diff --git a/ptpython/contrib/asyncssh_repl.py b/ptpython/contrib/asyncssh_repl.py
index 0347ade..2f74eb2 100644
--- a/ptpython/contrib/asyncssh_repl.py
+++ b/ptpython/contrib/asyncssh_repl.py
@@ -9,20 +9,20 @@ package should be installable in Python 2 as well!
from __future__ import annotations
import asyncio
-from typing import Any, Optional, TextIO, cast
+from typing import Any, AnyStr, TextIO, cast
import asyncssh
from prompt_toolkit.data_structures import Size
from prompt_toolkit.input import create_pipe_input
from prompt_toolkit.output.vt100 import Vt100_Output
-from ptpython.python_input import _GetNamespace
+from ptpython.python_input import _GetNamespace, _Namespace
from ptpython.repl import PythonRepl
__all__ = ["ReplSSHServerSession"]
-class ReplSSHServerSession(asyncssh.SSHServerSession):
+class ReplSSHServerSession(asyncssh.SSHServerSession[str]):
"""
SSH server session that runs a Python REPL.
@@ -35,7 +35,7 @@ class ReplSSHServerSession(asyncssh.SSHServerSession):
) -> None:
self._chan: Any = None
- def _globals() -> dict:
+ def _globals() -> _Namespace:
data = get_globals()
data.setdefault("print", self._print)
return data
@@ -79,7 +79,7 @@ class ReplSSHServerSession(asyncssh.SSHServerSession):
width, height, pixwidth, pixheight = self._chan.get_terminal_size()
return Size(rows=height, columns=width)
- def connection_made(self, chan):
+ def connection_made(self, chan: Any) -> None:
"""
Client connected, run repl in coroutine.
"""
@@ -89,7 +89,7 @@ class ReplSSHServerSession(asyncssh.SSHServerSession):
f = asyncio.ensure_future(self.repl.run_async())
# Close channel when done.
- def done(_) -> None:
+ def done(_: object) -> None:
chan.close()
self._chan = None
@@ -98,24 +98,28 @@ class ReplSSHServerSession(asyncssh.SSHServerSession):
def shell_requested(self) -> bool:
return True
- def terminal_size_changed(self, width, height, pixwidth, pixheight):
+ def terminal_size_changed(
+ self, width: int, height: int, pixwidth: int, pixheight: int
+ ) -> None:
"""
When the terminal size changes, report back to CLI.
"""
self.repl.app._on_resize()
- def data_received(self, data, datatype):
+ def data_received(self, data: AnyStr, datatype: int | None) -> None:
"""
When data is received, send to inputstream of the CLI and repaint.
"""
- self._input_pipe.send(data)
+ self._input_pipe.send(data) # type: ignore
- def _print(self, *data, sep=" ", end="\n", file=None) -> None:
+ def _print(
+ self, *data: object, sep: str = " ", end: str = "\n", file: Any = None
+ ) -> None:
"""
Alternative 'print' function that prints back into the SSH channel.
"""
# Pop keyword-only arguments. (We cannot use the syntax from the
# signature. Otherwise, Python2 will give a syntax error message when
# installing.)
- data = sep.join(map(str, data))
- self._chan.write(data + end)
+ data_as_str = sep.join(map(str, data))
+ self._chan.write(data_as_str + end)
diff --git a/ptpython/entry_points/run_ptpython.py b/ptpython/entry_points/run_ptpython.py
index 1b4074d..1d4a532 100644
--- a/ptpython/entry_points/run_ptpython.py
+++ b/ptpython/entry_points/run_ptpython.py
@@ -9,6 +9,7 @@ optional arguments:
-h, --help show this help message and exit
--vi Enable Vi key bindings
-i, --interactive Start interactive shell after executing this file.
+ --asyncio Run an asyncio event loop to support top-level "await".
--light-bg Run on a light background (use dark colors for text).
--dark-bg Run on a dark background (use light colors for text).
--config-file CONFIG_FILE
@@ -24,11 +25,12 @@ environment variables:
from __future__ import annotations
import argparse
+import asyncio
import os
import pathlib
import sys
from textwrap import dedent
-from typing import IO, Optional, Tuple
+from typing import IO
import appdirs
from prompt_toolkit.formatted_text import HTML
@@ -69,15 +71,20 @@ def create_parser() -> _Parser:
help="Start interactive shell after executing this file.",
)
parser.add_argument(
+ "--asyncio",
+ action="store_true",
+ help='Run an asyncio event loop to support top-level "await".',
+ )
+ parser.add_argument(
"--light-bg",
action="store_true",
help="Run on a light background (use dark colors for text).",
- ),
+ )
parser.add_argument(
"--dark-bg",
action="store_true",
help="Run on a dark background (use light colors for text).",
- ),
+ )
parser.add_argument(
"--config-file", type=str, help="Location of configuration file."
)
@@ -206,7 +213,7 @@ def run() -> None:
import __main__
- embed(
+ embed_result = embed( # type: ignore
vi_mode=a.vi,
history_filename=history_file,
configure=configure,
@@ -214,8 +221,14 @@ def run() -> None:
globals=__main__.__dict__,
startup_paths=startup_paths,
title="Python REPL (ptpython)",
+ return_asyncio_coroutine=a.asyncio,
)
+ if a.asyncio:
+ print("Starting ptpython asyncio REPL")
+ print('Use "await" directly instead of "asyncio.run()".')
+ asyncio.run(embed_result)
+
if __name__ == "__main__":
run()
diff --git a/ptpython/history_browser.py b/ptpython/history_browser.py
index eea81c2..b667be1 100644
--- a/ptpython/history_browser.py
+++ b/ptpython/history_browser.py
@@ -7,7 +7,7 @@ run as a sub application of the Repl/PythonInput.
from __future__ import annotations
from functools import partial
-from typing import TYPE_CHECKING, Callable, List, Optional, Set
+from typing import TYPE_CHECKING, Callable
from prompt_toolkit.application import Application
from prompt_toolkit.application.current import get_app
@@ -107,6 +107,7 @@ Further, remember that searching works like in Emacs
class BORDER:
"Box drawing characters."
+
HORIZONTAL = "\u2501"
VERTICAL = "\u2503"
TOP_LEFT = "\u250f"
diff --git a/ptpython/ipython.py b/ptpython/ipython.py
index fb4b5ed..ad0516a 100644
--- a/ptpython/ipython.py
+++ b/ptpython/ipython.py
@@ -14,7 +14,7 @@ from typing import Iterable
from warnings import warn
from IPython import utils as ipy_utils
-from IPython.core.inputsplitter import IPythonInputSplitter
+from IPython.core.inputtransformer2 import TransformerManager
from IPython.terminal.embed import InteractiveShellEmbed as _InteractiveShellEmbed
from IPython.terminal.ipapp import load_default_config
from prompt_toolkit.completion import (
@@ -38,6 +38,7 @@ from ptpython.prompt_style import PromptStyle
from .completer import PythonCompleter
from .python_input import PythonInput
+from .repl import PyCF_ALLOW_TOP_LEVEL_AWAIT
from .style import default_ui_style
from .validator import PythonValidator
@@ -65,7 +66,7 @@ class IPythonPrompt(PromptStyle):
class IPythonValidator(PythonValidator):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- self.isp = IPythonInputSplitter()
+ self.isp = TransformerManager()
def validate(self, document: Document) -> None:
document = Document(text=self.isp.transform_cell(document.text))
@@ -211,6 +212,12 @@ class IPythonInput(PythonInput):
self.ui_styles = {"default": Style.from_dict(style_dict)}
self.use_ui_colorscheme("default")
+ def get_compiler_flags(self):
+ flags = super().get_compiler_flags()
+ if self.ipython_shell.autoawait:
+ flags |= PyCF_ALLOW_TOP_LEVEL_AWAIT
+ return flags
+
class InteractiveShellEmbed(_InteractiveShellEmbed):
"""
diff --git a/ptpython/layout.py b/ptpython/layout.py
index d15e52e..2c1ec15 100644
--- a/ptpython/layout.py
+++ b/ptpython/layout.py
@@ -7,7 +7,7 @@ import platform
import sys
from enum import Enum
from inspect import _ParameterKind as ParameterKind
-from typing import TYPE_CHECKING, Any, List, Optional, Type
+from typing import TYPE_CHECKING, Any
from prompt_toolkit.application import get_app
from prompt_toolkit.enums import DEFAULT_BUFFER, SEARCH_BUFFER
@@ -17,11 +17,7 @@ from prompt_toolkit.filters import (
is_done,
renderer_height_is_known,
)
-from prompt_toolkit.formatted_text import (
- AnyFormattedText,
- fragment_list_width,
- to_formatted_text,
-)
+from prompt_toolkit.formatted_text import fragment_list_width, to_formatted_text
from prompt_toolkit.formatted_text.base import StyleAndTextTuples
from prompt_toolkit.key_binding.vi_state import InputMode
from prompt_toolkit.layout.containers import (
@@ -60,7 +56,6 @@ from prompt_toolkit.widgets.toolbars import (
SystemToolbar,
ValidationToolbar,
)
-from pygments.lexers import PythonLexer
from .filters import HasSignature, ShowDocstring, ShowSidebar, ShowSignature
from .prompt_style import PromptStyle
@@ -74,6 +69,7 @@ __all__ = ["PtPythonLayout", "CompletionVisualisation"]
class CompletionVisualisation(Enum):
"Visualisation method for the completions."
+
NONE = "none"
POP_UP = "pop-up"
MULTI_COLUMN = "multi-column"
@@ -151,7 +147,7 @@ def python_sidebar(python_input: PythonInput) -> Window:
append_category(category)
for option in category.options:
- append(i, option.title, "%s" % (option.get_current_value(),))
+ append(i, option.title, str(option.get_current_value()))
i += 1
tokens.pop() # Remove last newline.
@@ -302,13 +298,15 @@ def signature_toolbar(python_input: PythonInput) -> Container:
content=Window(
FormattedTextControl(get_text_fragments), height=Dimension.exact(1)
),
- filter=
# Show only when there is a signature
- HasSignature(python_input) &
+ filter=HasSignature(python_input)
+ &
# Signature needs to be shown.
- ShowSignature(python_input) &
+ ShowSignature(python_input)
+ &
# And no sidebar is visible.
- ~ShowSidebar(python_input) &
+ ~ShowSidebar(python_input)
+ &
# Not done yet.
~is_done,
)
diff --git a/ptpython/lexer.py b/ptpython/lexer.py
index 81924c9..d925e95 100644
--- a/ptpython/lexer.py
+++ b/ptpython/lexer.py
@@ -1,6 +1,6 @@
from __future__ import annotations
-from typing import Callable, Optional
+from typing import Callable
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import StyleAndTextTuples
diff --git a/ptpython/printer.py b/ptpython/printer.py
new file mode 100644
index 0000000..3618934
--- /dev/null
+++ b/ptpython/printer.py
@@ -0,0 +1,435 @@
+from __future__ import annotations
+
+import sys
+import traceback
+from dataclasses import dataclass
+from enum import Enum
+from typing import Generator, Iterable
+
+from prompt_toolkit.formatted_text import (
+ HTML,
+ AnyFormattedText,
+ FormattedText,
+ OneStyleAndTextTuple,
+ StyleAndTextTuples,
+ fragment_list_width,
+ merge_formatted_text,
+ to_formatted_text,
+)
+from prompt_toolkit.formatted_text.utils import split_lines
+from prompt_toolkit.input import Input
+from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
+from prompt_toolkit.output import Output
+from prompt_toolkit.shortcuts import PromptSession, print_formatted_text
+from prompt_toolkit.styles import BaseStyle, StyleTransformation
+from prompt_toolkit.styles.pygments import pygments_token_to_classname
+from prompt_toolkit.utils import get_cwidth
+from pygments.lexers import PythonLexer, PythonTracebackLexer
+
+__all__ = ["OutputPrinter"]
+
+# Never reformat results larger than this:
+MAX_REFORMAT_SIZE = 1_000_000
+
+
+@dataclass
+class OutputPrinter:
+ """
+ Result printer.
+
+ Usage::
+
+ printer = OutputPrinter(...)
+ printer.display_result(...)
+ printer.display_exception(...)
+ """
+
+ output: Output
+ input: Input
+ style: BaseStyle
+ title: AnyFormattedText
+ style_transformation: StyleTransformation
+
+ def display_result(
+ self,
+ result: object,
+ *,
+ out_prompt: AnyFormattedText,
+ reformat: bool,
+ highlight: bool,
+ paginate: bool,
+ ) -> None:
+ """
+ Show __repr__ (or `__pt_repr__`) for an `eval` result and print to output.
+
+ :param reformat: Reformat result using 'black' before printing if the
+ result is parsable as Python code.
+ :param highlight: Syntax highlight the result.
+ :param paginate: Show paginator when the result does not fit on the
+ screen.
+ """
+ out_prompt = to_formatted_text(out_prompt)
+ out_prompt_width = fragment_list_width(out_prompt)
+
+ result = self._insert_out_prompt_and_split_lines(
+ self._format_result_output(
+ result,
+ reformat=reformat,
+ highlight=highlight,
+ line_length=self.output.get_size().columns - out_prompt_width,
+ paginate=paginate,
+ ),
+ out_prompt=out_prompt,
+ )
+ self._display_result(result, paginate=paginate)
+
+ def display_exception(
+ self, e: BaseException, *, highlight: bool, paginate: bool
+ ) -> None:
+ """
+ Render an exception.
+ """
+ result = self._insert_out_prompt_and_split_lines(
+ self._format_exception_output(e, highlight=highlight),
+ out_prompt="",
+ )
+ self._display_result(result, paginate=paginate)
+
+ def display_style_and_text_tuples(
+ self,
+ result: Iterable[OneStyleAndTextTuple],
+ *,
+ paginate: bool,
+ ) -> None:
+ self._display_result(
+ self._insert_out_prompt_and_split_lines(result, out_prompt=""),
+ paginate=paginate,
+ )
+
+ def _display_result(
+ self,
+ lines: Iterable[StyleAndTextTuples],
+ *,
+ paginate: bool,
+ ) -> None:
+ if paginate:
+ self._print_paginated_formatted_text(lines)
+ else:
+ for line in lines:
+ self._print_formatted_text(line)
+
+ self.output.flush()
+
+ def _print_formatted_text(self, line: StyleAndTextTuples, end: str = "\n") -> None:
+ print_formatted_text(
+ FormattedText(line),
+ style=self.style,
+ style_transformation=self.style_transformation,
+ include_default_pygments_style=False,
+ output=self.output,
+ end=end,
+ )
+
+ def _format_result_output(
+ self,
+ result: object,
+ *,
+ reformat: bool,
+ highlight: bool,
+ line_length: int,
+ paginate: bool,
+ ) -> Generator[OneStyleAndTextTuple, None, None]:
+ """
+ Format __repr__ for an `eval` result.
+
+ Note: this can raise `KeyboardInterrupt` if either calling `__repr__`,
+ `__pt_repr__` or formatting the output with "Black" takes to long
+ and the user presses Control-C.
+ """
+ # If __pt_repr__ is present, take this. This can return prompt_toolkit
+ # formatted text.
+ try:
+ if hasattr(result, "__pt_repr__"):
+ formatted_result_repr = to_formatted_text(
+ getattr(result, "__pt_repr__")()
+ )
+ yield from formatted_result_repr
+ return
+ except KeyboardInterrupt:
+ raise # Don't catch here.
+ except:
+ # For bad code, `__getattr__` can raise something that's not an
+ # `AttributeError`. This happens already when calling `hasattr()`.
+ pass
+
+ # Call `__repr__` of given object first, to turn it in a string.
+ try:
+ result_repr = repr(result)
+ except KeyboardInterrupt:
+ raise # Don't catch here.
+ except BaseException as e:
+ # Calling repr failed.
+ self.display_exception(e, highlight=highlight, paginate=paginate)
+ return
+
+ # Determine whether it's valid Python code. If not,
+ # reformatting/highlighting won't be applied.
+ if len(result_repr) < MAX_REFORMAT_SIZE:
+ try:
+ compile(result_repr, "", "eval")
+ except SyntaxError:
+ valid_python = False
+ else:
+ valid_python = True
+ else:
+ valid_python = False
+
+ if valid_python and reformat:
+ # Inline import. Slightly speed up start-up time if black is
+ # not used.
+ try:
+ import black
+
+ if not hasattr(black, "Mode"):
+ raise ImportError
+ except ImportError:
+ pass # no Black package in your installation
+ else:
+ result_repr = black.format_str(
+ result_repr,
+ mode=black.Mode(line_length=line_length),
+ )
+
+ if valid_python and highlight:
+ yield from _lex_python_result(result_repr)
+ else:
+ yield ("", result_repr)
+
+ def _insert_out_prompt_and_split_lines(
+ self, result: Iterable[OneStyleAndTextTuple], out_prompt: AnyFormattedText
+ ) -> Iterable[StyleAndTextTuples]:
+ r"""
+ Split styled result in lines (based on the \n characters in the result)
+ an insert output prompt on whitespace in front of each line. (This does
+ not yet do the soft wrapping.)
+
+ Yield lines as a result.
+ """
+ out_prompt = to_formatted_text(out_prompt)
+ out_prompt_width = fragment_list_width(out_prompt)
+ prefix = ("", " " * out_prompt_width)
+
+ for i, line in enumerate(split_lines(result)):
+ if i == 0:
+ line = [*out_prompt, *line]
+ else:
+ line = [prefix, *line]
+ yield line
+
+ def _apply_soft_wrapping(
+ self, lines: Iterable[StyleAndTextTuples]
+ ) -> Iterable[StyleAndTextTuples]:
+ """
+ Apply soft wrapping to the given lines. Wrap according to the terminal
+ width. Insert whitespace in front of each wrapped line to align it with
+ the output prompt.
+ """
+ line_length = self.output.get_size().columns
+
+ # Iterate over hard wrapped lines.
+ for lineno, line in enumerate(lines):
+ columns_in_buffer = 0
+ current_line: list[OneStyleAndTextTuple] = []
+
+ for style, text, *_ in line:
+ for c in text:
+ width = get_cwidth(c)
+
+ # (Soft) wrap line if it doesn't fit.
+ if columns_in_buffer + width > line_length:
+ yield current_line
+ columns_in_buffer = 0
+ current_line = []
+
+ columns_in_buffer += width
+ current_line.append((style, c))
+
+ if len(current_line) > 0:
+ yield current_line
+
+ def _print_paginated_formatted_text(
+ self, lines: Iterable[StyleAndTextTuples]
+ ) -> None:
+ """
+ Print formatted text, using --MORE-- style pagination.
+ (Avoid filling up the terminal's scrollback buffer.)
+ """
+ lines = self._apply_soft_wrapping(lines)
+ pager_prompt = create_pager_prompt(
+ self.style, self.title, output=self.output, input=self.input
+ )
+
+ abort = False
+ print_all = False
+
+ # Max number of lines allowed in the buffer before painting.
+ size = self.output.get_size()
+ max_rows = size.rows - 1
+
+ # Page buffer.
+ page: StyleAndTextTuples = []
+
+ def show_pager() -> None:
+ nonlocal abort, max_rows, print_all
+
+ # Run pager prompt in another thread.
+ # Same as for the input. This prevents issues with nested event
+ # loops.
+ pager_result = pager_prompt.prompt(in_thread=True)
+
+ if pager_result == PagerResult.ABORT:
+ print("...")
+ abort = True
+
+ elif pager_result == PagerResult.NEXT_LINE:
+ max_rows = 1
+
+ elif pager_result == PagerResult.NEXT_PAGE:
+ max_rows = size.rows - 1
+
+ elif pager_result == PagerResult.PRINT_ALL:
+ print_all = True
+
+ # Loop over lines. Show --MORE-- prompt when page is filled.
+ rows = 0
+
+ for lineno, line in enumerate(lines):
+ page.extend(line)
+ page.append(("", "\n"))
+ rows += 1
+
+ if rows >= max_rows:
+ self._print_formatted_text(page, end="")
+ page = []
+ rows = 0
+
+ if not print_all:
+ show_pager()
+ if abort:
+ return
+
+ self._print_formatted_text(page)
+
+ def _format_exception_output(
+ self, e: BaseException, highlight: bool
+ ) -> Generator[OneStyleAndTextTuple, None, None]:
+ # Instead of just calling ``traceback.format_exc``, we take the
+ # traceback and skip the bottom calls of this framework.
+ t, v, tb = sys.exc_info()
+
+ # Required for pdb.post_mortem() to work.
+ sys.last_type, sys.last_value, sys.last_traceback = t, v, tb
+
+ tblist = list(traceback.extract_tb(tb))
+
+ for line_nr, tb_tuple in enumerate(tblist):
+ if tb_tuple[0] == "<stdin>":
+ tblist = tblist[line_nr:]
+ break
+
+ tb_list = traceback.format_list(tblist)
+ if tb_list:
+ tb_list.insert(0, "Traceback (most recent call last):\n")
+ tb_list.extend(traceback.format_exception_only(t, v))
+
+ tb_str = "".join(tb_list)
+
+ # Format exception and write to output.
+ # (We use the default style. Most other styles result
+ # in unreadable colors for the traceback.)
+ if highlight:
+ for index, tokentype, text in PythonTracebackLexer().get_tokens_unprocessed(
+ tb_str
+ ):
+ yield ("class:" + pygments_token_to_classname(tokentype), text)
+ else:
+ yield ("", tb_str)
+
+
+class PagerResult(Enum):
+ ABORT = "ABORT"
+ NEXT_LINE = "NEXT_LINE"
+ NEXT_PAGE = "NEXT_PAGE"
+ PRINT_ALL = "PRINT_ALL"
+
+
+def create_pager_prompt(
+ style: BaseStyle,
+ title: AnyFormattedText = "",
+ input: Input | None = None,
+ output: Output | None = None,
+) -> PromptSession[PagerResult]:
+ """
+ Create a "--MORE--" prompt for paginated output.
+ """
+ bindings = KeyBindings()
+
+ @bindings.add("enter")
+ @bindings.add("down")
+ def next_line(event: KeyPressEvent) -> None:
+ event.app.exit(result=PagerResult.NEXT_LINE)
+
+ @bindings.add("space")
+ def next_page(event: KeyPressEvent) -> None:
+ event.app.exit(result=PagerResult.NEXT_PAGE)
+
+ @bindings.add("a")
+ def print_all(event: KeyPressEvent) -> None:
+ event.app.exit(result=PagerResult.PRINT_ALL)
+
+ @bindings.add("q")
+ @bindings.add("c-c")
+ @bindings.add("c-d")
+ @bindings.add("escape", eager=True)
+ def no(event: KeyPressEvent) -> None:
+ event.app.exit(result=PagerResult.ABORT)
+
+ @bindings.add("<any>")
+ def _(event: KeyPressEvent) -> None:
+ "Disallow inserting other text."
+ pass
+
+ session: PromptSession[PagerResult] = PromptSession(
+ merge_formatted_text(
+ [
+ title,
+ HTML(
+ "<status-toolbar>"
+ "<more> -- MORE -- </more> "
+ "<key>[Enter]</key> Scroll "
+ "<key>[Space]</key> Next page "
+ "<key>[a]</key> Print all "
+ "<key>[q]</key> Quit "
+ "</status-toolbar>: "
+ ),
+ ]
+ ),
+ key_bindings=bindings,
+ erase_when_done=True,
+ style=style,
+ input=input,
+ output=output,
+ )
+ return session
+
+
+def _lex_python_result(result: str) -> Generator[tuple[str, str], None, None]:
+ "Return token list for Python string."
+ lexer = PythonLexer()
+ # Use `get_tokens_unprocessed`, so that we get exactly the same string,
+ # without line endings appended. `print_formatted_text` already appends a
+ # line ending, and otherwise we'll have two line endings.
+ tokens = lexer.get_tokens_unprocessed(result)
+
+ for index, tokentype, text in tokens:
+ yield ("class:" + pygments_token_to_classname(tokentype), text)
diff --git a/ptpython/python_input.py b/ptpython/python_input.py
index da19076..54ddbef 100644
--- a/ptpython/python_input.py
+++ b/ptpython/python_input.py
@@ -4,20 +4,9 @@ This can be used for creation of Python REPLs.
"""
from __future__ import annotations
-from asyncio import get_event_loop
+from asyncio import get_running_loop
from functools import partial
-from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Dict,
- Generic,
- List,
- Mapping,
- Optional,
- Tuple,
- TypeVar,
-)
+from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Mapping, TypeVar, Union
from prompt_toolkit.application import Application, get_app
from prompt_toolkit.auto_suggest import (
@@ -42,7 +31,7 @@ from prompt_toolkit.cursor_shapes import (
)
from prompt_toolkit.document import Document
from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode
-from prompt_toolkit.filters import Condition
+from prompt_toolkit.filters import Condition, FilterOrBool
from prompt_toolkit.formatted_text import AnyFormattedText
from prompt_toolkit.history import (
FileHistory,
@@ -60,8 +49,13 @@ from prompt_toolkit.key_binding.bindings.auto_suggest import load_auto_suggest_b
from prompt_toolkit.key_binding.bindings.open_in_editor import (
load_open_in_editor_bindings,
)
+from prompt_toolkit.key_binding.key_bindings import Binding, KeyHandlerCallable
+from prompt_toolkit.key_binding.key_processor import KeyPressEvent
from prompt_toolkit.key_binding.vi_state import InputMode
+from prompt_toolkit.keys import Keys
from prompt_toolkit.layout.containers import AnyContainer
+from prompt_toolkit.layout.dimension import AnyDimension
+from prompt_toolkit.layout.processors import Processor
from prompt_toolkit.lexers import DynamicLexer, Lexer, SimpleLexer
from prompt_toolkit.output import ColorDepth, Output
from prompt_toolkit.styles import (
@@ -102,22 +96,23 @@ if TYPE_CHECKING:
from typing_extensions import Protocol
class _SupportsLessThan(Protocol):
- # Taken from typeshed. _T is used by "sorted", which needs anything
+ # Taken from typeshed. _T_lt is used by "sorted", which needs anything
# sortable.
def __lt__(self, __other: Any) -> bool:
...
-_T = TypeVar("_T", bound="_SupportsLessThan")
+_T_lt = TypeVar("_T_lt", bound="_SupportsLessThan")
+_T_kh = TypeVar("_T_kh", bound=Union[KeyHandlerCallable, Binding])
-class OptionCategory(Generic[_T]):
- def __init__(self, title: str, options: list[Option[_T]]) -> None:
+class OptionCategory(Generic[_T_lt]):
+ def __init__(self, title: str, options: list[Option[_T_lt]]) -> None:
self.title = title
self.options = options
-class Option(Generic[_T]):
+class Option(Generic[_T_lt]):
"""
Ptpython configuration option that can be shown and modified from the
sidebar.
@@ -133,10 +128,10 @@ class Option(Generic[_T]):
self,
title: str,
description: str,
- get_current_value: Callable[[], _T],
+ get_current_value: Callable[[], _T_lt],
# We accept `object` as return type for the select functions, because
# often they return an unused boolean. Maybe this can be improved.
- get_values: Callable[[], Mapping[_T, Callable[[], object]]],
+ get_values: Callable[[], Mapping[_T_lt, Callable[[], object]]],
) -> None:
self.title = title
self.description = description
@@ -144,7 +139,7 @@ class Option(Generic[_T]):
self.get_values = get_values
@property
- def values(self) -> Mapping[_T, Callable[[], object]]:
+ def values(self) -> Mapping[_T_lt, Callable[[], object]]:
return self.get_values()
def activate_next(self, _previous: bool = False) -> None:
@@ -219,10 +214,10 @@ class PythonInput:
_completer: Completer | None = None,
_validator: Validator | None = None,
_lexer: Lexer | None = None,
- _extra_buffer_processors=None,
+ _extra_buffer_processors: list[Processor] | None = None,
_extra_layout_body: AnyContainer | None = None,
- _extra_toolbars=None,
- _input_buffer_height=None,
+ _extra_toolbars: list[AnyContainer] | None = None,
+ _input_buffer_height: AnyDimension | None = None,
) -> None:
self.get_globals: _GetNamespace = get_globals or (lambda: {})
self.get_locals: _GetNamespace = get_locals or self.get_globals
@@ -333,7 +328,7 @@ class PythonInput:
# Cursor shapes.
self.cursor_shape_config = "Block"
- self.all_cursor_shape_configs: Dict[str, AnyCursorShapeConfig] = {
+ self.all_cursor_shape_configs: dict[str, AnyCursorShapeConfig] = {
"Block": CursorShape.BLOCK,
"Underline": CursorShape.UNDERLINE,
"Beam": CursorShape.BEAM,
@@ -379,7 +374,7 @@ class PythonInput:
self.options = self._create_options()
self.selected_option_index: int = 0
- #: Incremeting integer counting the current statement.
+ #: Incrementing integer counting the current statement.
self.current_statement_index: int = 1
# Code signatures. (This is set asynchronously after a timeout.)
@@ -477,24 +472,36 @@ class PythonInput:
return flags
- @property
- def add_key_binding(self) -> Callable[[_T], _T]:
+ def add_key_binding(
+ self,
+ *keys: Keys | str,
+ filter: FilterOrBool = True,
+ eager: FilterOrBool = False,
+ is_global: FilterOrBool = False,
+ save_before: Callable[[KeyPressEvent], bool] = (lambda e: True),
+ record_in_macro: FilterOrBool = True,
+ ) -> Callable[[_T_kh], _T_kh]:
"""
Shortcut for adding new key bindings.
(Mostly useful for a config.py file, that receives
a PythonInput/Repl instance as input.)
+ All arguments are identical to prompt_toolkit's `KeyBindings.add`.
+
::
@python_input.add_key_binding(Keys.ControlX, filter=...)
def handler(event):
...
"""
-
- def add_binding_decorator(*k, **kw):
- return self.extra_key_bindings.add(*k, **kw)
-
- return add_binding_decorator
+ return self.extra_key_bindings.add(
+ *keys,
+ filter=filter,
+ eager=eager,
+ is_global=is_global,
+ save_before=save_before,
+ record_in_macro=record_in_macro,
+ )
def install_code_colorscheme(self, name: str, style: BaseStyle) -> None:
"""
@@ -607,10 +614,10 @@ class PythonInput:
description="Change the cursor style, possibly according "
"to the Vi input mode.",
get_current_value=lambda: self.cursor_shape_config,
- get_values=lambda: dict(
- (s, partial(enable, "cursor_shape_config", s))
+ get_values=lambda: {
+ s: partial(enable, "cursor_shape_config", s)
for s in self.all_cursor_shape_configs
- ),
+ },
),
simple_option(
title="Paste mode",
@@ -835,7 +842,7 @@ class PythonInput:
[
simple_option(
title="Syntax highlighting",
- description="Use colors for syntax highligthing",
+ description="Use colors for syntax highlighting",
field_name="enable_syntax_highlighting",
),
simple_option(
@@ -1003,7 +1010,7 @@ class PythonInput:
app = self.app
async def on_timeout_task() -> None:
- loop = get_event_loop()
+ loop = get_running_loop()
# Never run multiple get-signature threads.
if self._get_signatures_thread_running:
diff --git a/ptpython/repl.py b/ptpython/repl.py
index 02a5075..fc9b9da 100644
--- a/ptpython/repl.py
+++ b/ptpython/repl.py
@@ -12,38 +12,24 @@ from __future__ import annotations
import asyncio
import builtins
import os
+import signal
import sys
import traceback
import types
import warnings
from dis import COMPILER_FLAG_NAMES
-from enum import Enum
-from typing import Any, Callable, ContextManager, Dict, Optional
-
-from prompt_toolkit.formatted_text import (
- HTML,
- AnyFormattedText,
- FormattedText,
- PygmentsTokens,
- StyleAndTextTuples,
- fragment_list_width,
- merge_formatted_text,
- to_formatted_text,
-)
-from prompt_toolkit.formatted_text.utils import fragment_list_to_text, split_lines
-from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
+from typing import Any, Callable, ContextManager, Iterable
+
+from prompt_toolkit.formatted_text import OneStyleAndTextTuple
from prompt_toolkit.patch_stdout import patch_stdout as patch_stdout_context
from prompt_toolkit.shortcuts import (
- PromptSession,
clear_title,
- print_formatted_text,
set_title,
)
-from prompt_toolkit.styles import BaseStyle
-from prompt_toolkit.utils import DummyContext, get_cwidth
-from pygments.lexers import PythonLexer, PythonTracebackLexer
-from pygments.token import Token
+from prompt_toolkit.utils import DummyContext
+from pygments.lexers import PythonTracebackLexer # noqa: F401
+from .printer import OutputPrinter
from .python_input import PythonInput
PyCF_ALLOW_TOP_LEVEL_AWAIT: int
@@ -108,7 +94,9 @@ class PythonRepl(PythonInput):
else:
# Print.
if result is not None:
- self.show_result(result)
+ self._show_result(result)
+ if self.insert_blank_line_after_output:
+ self.app.output.write("\n")
# Loop.
self.current_statement_index += 1
@@ -123,6 +111,24 @@ class PythonRepl(PythonInput):
# any case.)
self._handle_keyboard_interrupt(e)
+ def _get_output_printer(self) -> OutputPrinter:
+ return OutputPrinter(
+ output=self.app.output,
+ input=self.app.input,
+ style=self._current_style,
+ style_transformation=self.style_transformation,
+ title=self.title,
+ )
+
+ def _show_result(self, result: object) -> None:
+ self._get_output_printer().display_result(
+ result=result,
+ out_prompt=self.get_output_prompt(),
+ reformat=self.enable_output_formatting,
+ highlight=self.enable_syntax_highlighting,
+ paginate=self.enable_pager,
+ )
+
def run(self) -> None:
"""
Run the REPL loop.
@@ -153,27 +159,58 @@ class PythonRepl(PythonInput):
clear_title()
self._remove_from_namespace()
- async def run_and_show_expression_async(self, text: str):
- loop = asyncio.get_event_loop()
+ async def run_and_show_expression_async(self, text: str) -> Any:
+ loop = asyncio.get_running_loop()
+ system_exit: SystemExit | None = None
try:
- result = await self.eval_async(text)
- except KeyboardInterrupt: # KeyboardInterrupt doesn't inherit from Exception.
- raise
- except SystemExit:
- return
- except BaseException as e:
- self._handle_exception(e)
- else:
- # Print.
- if result is not None:
- await loop.run_in_executor(None, lambda: self.show_result(result))
+ try:
+ # Create `eval` task. Ensure that control-c will cancel this
+ # task.
+ async def eval() -> Any:
+ nonlocal system_exit
+ try:
+ return await self.eval_async(text)
+ except SystemExit as e:
+ # Don't propagate SystemExit in `create_task()`. That
+ # will kill the event loop. We want to handle it
+ # gracefully.
+ system_exit = e
+
+ task = asyncio.create_task(eval())
+ loop.add_signal_handler(signal.SIGINT, lambda *_: task.cancel())
+ result = await task
+
+ if system_exit is not None:
+ raise system_exit
+ except KeyboardInterrupt:
+ # KeyboardInterrupt doesn't inherit from Exception.
+ raise
+ except SystemExit:
+ raise
+ except BaseException as e:
+ self._handle_exception(e)
+ else:
+ # Print.
+ if result is not None:
+ await loop.run_in_executor(None, lambda: self._show_result(result))
- # Loop.
- self.current_statement_index += 1
- self.signatures = []
- # Return the result for future consumers.
- return result
+ # Loop.
+ self.current_statement_index += 1
+ self.signatures = []
+ # Return the result for future consumers.
+ return result
+ finally:
+ loop.remove_signal_handler(signal.SIGINT)
+
+ except KeyboardInterrupt as e:
+ # Handle all possible `KeyboardInterrupt` errors. This can
+ # happen during the `eval`, but also during the
+ # `show_result` if something takes too long.
+ # (Try/catch is around the whole block, because we want to
+ # prevent that a Control-C keypress terminates the REPL in
+ # any case.)
+ self._handle_keyboard_interrupt(e)
async def run_async(self) -> None:
"""
@@ -187,7 +224,7 @@ class PythonRepl(PythonInput):
(Both for control-C to work, as well as for the code to see the right
thread in which it was embedded).
"""
- loop = asyncio.get_event_loop()
+ loop = asyncio.get_running_loop()
if self.terminal_title:
set_title(self.terminal_title)
@@ -217,6 +254,8 @@ class PythonRepl(PythonInput):
# `KeyboardInterrupt` exceptions can end up in the event
# loop selector.
self._handle_keyboard_interrupt(e)
+ except SystemExit:
+ return
finally:
if self.terminal_title:
clear_title()
@@ -245,7 +284,7 @@ class PythonRepl(PythonInput):
result = eval(code, self.get_globals(), self.get_locals())
if _has_coroutine_flag(code):
- result = asyncio.get_event_loop().run_until_complete(result)
+ result = asyncio.get_running_loop().run_until_complete(result)
self._store_eval_result(result)
return result
@@ -258,7 +297,7 @@ class PythonRepl(PythonInput):
result = eval(code, self.get_globals(), self.get_locals())
if _has_coroutine_flag(code):
- result = asyncio.get_event_loop().run_until_complete(result)
+ result = asyncio.get_running_loop().run_until_complete(result)
return None
@@ -318,264 +357,12 @@ class PythonRepl(PythonInput):
dont_inherit=True,
)
- def _format_result_output(self, result: object) -> StyleAndTextTuples:
- """
- Format __repr__ for an `eval` result.
-
- Note: this can raise `KeyboardInterrupt` if either calling `__repr__`,
- `__pt_repr__` or formatting the output with "Black" takes to long
- and the user presses Control-C.
- """
- out_prompt = to_formatted_text(self.get_output_prompt())
-
- # If the repr is valid Python code, use the Pygments lexer.
- try:
- result_repr = repr(result)
- except KeyboardInterrupt:
- raise # Don't catch here.
- except BaseException as e:
- # Calling repr failed.
- self._handle_exception(e)
- return []
-
- try:
- compile(result_repr, "", "eval")
- except SyntaxError:
- formatted_result_repr = to_formatted_text(result_repr)
- else:
- # Syntactically correct. Format with black and syntax highlight.
- if self.enable_output_formatting:
- # Inline import. Slightly speed up start-up time if black is
- # not used.
- try:
- import black
-
- if not hasattr(black, "Mode"):
- raise ImportError
- except ImportError:
- pass # no Black package in your installation
- else:
- result_repr = black.format_str(
- result_repr,
- mode=black.Mode(line_length=self.app.output.get_size().columns),
- )
-
- formatted_result_repr = to_formatted_text(
- PygmentsTokens(list(_lex_python_result(result_repr)))
- )
-
- # If __pt_repr__ is present, take this. This can return prompt_toolkit
- # formatted text.
- try:
- if hasattr(result, "__pt_repr__"):
- formatted_result_repr = to_formatted_text(
- getattr(result, "__pt_repr__")()
- )
- if isinstance(formatted_result_repr, list):
- formatted_result_repr = FormattedText(formatted_result_repr)
- except KeyboardInterrupt:
- raise # Don't catch here.
- except:
- # For bad code, `__getattr__` can raise something that's not an
- # `AttributeError`. This happens already when calling `hasattr()`.
- pass
-
- # Align every line to the prompt.
- line_sep = "\n" + " " * fragment_list_width(out_prompt)
- indented_repr: StyleAndTextTuples = []
-
- lines = list(split_lines(formatted_result_repr))
-
- for i, fragment in enumerate(lines):
- indented_repr.extend(fragment)
-
- # Add indentation separator between lines, not after the last line.
- if i != len(lines) - 1:
- indented_repr.append(("", line_sep))
-
- # Write output tokens.
- if self.enable_syntax_highlighting:
- formatted_output = merge_formatted_text([out_prompt, indented_repr])
- else:
- formatted_output = FormattedText(
- out_prompt + [("", fragment_list_to_text(formatted_result_repr))]
- )
-
- return to_formatted_text(formatted_output)
-
- def show_result(self, result: object) -> None:
- """
- Show __repr__ for an `eval` result and print to output.
- """
- formatted_text_output = self._format_result_output(result)
-
- if self.enable_pager:
- self.print_paginated_formatted_text(formatted_text_output)
- else:
- self.print_formatted_text(formatted_text_output)
-
- self.app.output.flush()
-
- if self.insert_blank_line_after_output:
- self.app.output.write("\n")
-
- def print_formatted_text(
- self, formatted_text: StyleAndTextTuples, end: str = "\n"
- ) -> None:
- print_formatted_text(
- FormattedText(formatted_text),
- style=self._current_style,
- style_transformation=self.style_transformation,
- include_default_pygments_style=False,
- output=self.app.output,
- end=end,
- )
-
- def print_paginated_formatted_text(
- self,
- formatted_text: StyleAndTextTuples,
- end: str = "\n",
- ) -> None:
- """
- Print formatted text, using --MORE-- style pagination.
- (Avoid filling up the terminal's scrollback buffer.)
- """
- pager_prompt = self.create_pager_prompt()
- size = self.app.output.get_size()
-
- abort = False
- print_all = False
-
- # Max number of lines allowed in the buffer before painting.
- max_rows = size.rows - 1
-
- # Page buffer.
- rows_in_buffer = 0
- columns_in_buffer = 0
- page: StyleAndTextTuples = []
-
- def flush_page() -> None:
- nonlocal page, columns_in_buffer, rows_in_buffer
- self.print_formatted_text(page, end="")
- page = []
- columns_in_buffer = 0
- rows_in_buffer = 0
-
- def show_pager() -> None:
- nonlocal abort, max_rows, print_all
-
- # Run pager prompt in another thread.
- # Same as for the input. This prevents issues with nested event
- # loops.
- pager_result = pager_prompt.prompt(in_thread=True)
-
- if pager_result == PagerResult.ABORT:
- print("...")
- abort = True
-
- elif pager_result == PagerResult.NEXT_LINE:
- max_rows = 1
-
- elif pager_result == PagerResult.NEXT_PAGE:
- max_rows = size.rows - 1
-
- elif pager_result == PagerResult.PRINT_ALL:
- print_all = True
-
- # Loop over lines. Show --MORE-- prompt when page is filled.
-
- formatted_text = formatted_text + [("", end)]
- lines = list(split_lines(formatted_text))
-
- for lineno, line in enumerate(lines):
- for style, text, *_ in line:
- for c in text:
- width = get_cwidth(c)
-
- # (Soft) wrap line if it doesn't fit.
- if columns_in_buffer + width > size.columns:
- # Show pager first if we get too many lines after
- # wrapping.
- if rows_in_buffer + 1 >= max_rows and not print_all:
- page.append(("", "\n"))
- flush_page()
- show_pager()
- if abort:
- return
-
- rows_in_buffer += 1
- columns_in_buffer = 0
-
- columns_in_buffer += width
- page.append((style, c))
-
- if rows_in_buffer + 1 >= max_rows and not print_all:
- page.append(("", "\n"))
- flush_page()
- show_pager()
- if abort:
- return
- else:
- # Add line ending between lines (if `end="\n"` was given, one
- # more empty line is added in `split_lines` automatically to
- # take care of the final line ending).
- if lineno != len(lines) - 1:
- page.append(("", "\n"))
- rows_in_buffer += 1
- columns_in_buffer = 0
-
- flush_page()
-
- def create_pager_prompt(self) -> PromptSession[PagerResult]:
- """
- Create pager --MORE-- prompt.
- """
- return create_pager_prompt(self._current_style, self.title)
-
- def _format_exception_output(self, e: BaseException) -> PygmentsTokens:
- # Instead of just calling ``traceback.format_exc``, we take the
- # traceback and skip the bottom calls of this framework.
- t, v, tb = sys.exc_info()
-
- # Required for pdb.post_mortem() to work.
- sys.last_type, sys.last_value, sys.last_traceback = t, v, tb
-
- tblist = list(traceback.extract_tb(tb))
-
- for line_nr, tb_tuple in enumerate(tblist):
- if tb_tuple[0] == "<stdin>":
- tblist = tblist[line_nr:]
- break
-
- l = traceback.format_list(tblist)
- if l:
- l.insert(0, "Traceback (most recent call last):\n")
- l.extend(traceback.format_exception_only(t, v))
-
- tb_str = "".join(l)
-
- # Format exception and write to output.
- # (We use the default style. Most other styles result
- # in unreadable colors for the traceback.)
- if self.enable_syntax_highlighting:
- tokens = list(_lex_python_traceback(tb_str))
- else:
- tokens = [(Token, tb_str)]
- return PygmentsTokens(tokens)
-
def _handle_exception(self, e: BaseException) -> None:
- output = self.app.output
-
- tokens = self._format_exception_output(e)
-
- print_formatted_text(
- tokens,
- style=self._current_style,
- style_transformation=self.style_transformation,
- include_default_pygments_style=False,
- output=output,
+ self._get_output_printer().display_exception(
+ e,
+ highlight=self.enable_syntax_highlighting,
+ paginate=self.enable_pager,
)
- output.flush()
def _handle_keyboard_interrupt(self, e: KeyboardInterrupt) -> None:
output = self.app.output
@@ -602,21 +389,16 @@ class PythonRepl(PythonInput):
globals = self.get_globals()
del globals["get_ptpython"]
-
-def _lex_python_traceback(tb):
- "Return token list for traceback string."
- lexer = PythonTracebackLexer()
- return lexer.get_tokens(tb)
-
-
-def _lex_python_result(tb):
- "Return token list for Python string."
- lexer = PythonLexer()
- # Use `get_tokens_unprocessed`, so that we get exactly the same string,
- # without line endings appended. `print_formatted_text` already appends a
- # line ending, and otherwise we'll have two line endings.
- tokens = lexer.get_tokens_unprocessed(tb)
- return [(tokentype, value) for index, tokentype, value in tokens]
+ def print_paginated_formatted_text(
+ self,
+ formatted_text: Iterable[OneStyleAndTextTuple],
+ end: str = "\n",
+ ) -> None:
+ # Warning: This is mainly here backwards-compatibility. Some projects
+ # call `print_paginated_formatted_text` on the Repl object.
+ self._get_output_printer().display_style_and_text_tuples(
+ formatted_text, paginate=True
+ )
def enable_deprecation_warnings() -> None:
@@ -630,25 +412,31 @@ def enable_deprecation_warnings() -> None:
warnings.filterwarnings("default", category=DeprecationWarning, module="__main__")
-def run_config(
- repl: PythonInput, config_file: str = "~/.config/ptpython/config.py"
-) -> None:
+DEFAULT_CONFIG_FILE = "~/.config/ptpython/config.py"
+
+
+def run_config(repl: PythonInput, config_file: str | None = None) -> None:
"""
Execute REPL config file.
:param repl: `PythonInput` instance.
:param config_file: Path of the configuration file.
"""
+ explicit_config_file = config_file is not None
+
# Expand tildes.
- config_file = os.path.expanduser(config_file)
+ config_file = os.path.expanduser(
+ config_file if config_file is not None else DEFAULT_CONFIG_FILE
+ )
def enter_to_continue() -> None:
input("\nPress ENTER to continue...")
# Check whether this file exists.
if not os.path.exists(config_file):
- print("Impossible to read %r" % config_file)
- enter_to_continue()
+ if explicit_config_file:
+ print(f"Impossible to read {config_file}")
+ enter_to_continue()
return
# Run the config file in an empty namespace.
@@ -741,67 +529,3 @@ def embed(
else:
with patch_context:
repl.run()
-
-
-class PagerResult(Enum):
- ABORT = "ABORT"
- NEXT_LINE = "NEXT_LINE"
- NEXT_PAGE = "NEXT_PAGE"
- PRINT_ALL = "PRINT_ALL"
-
-
-def create_pager_prompt(
- style: BaseStyle, title: AnyFormattedText = ""
-) -> PromptSession[PagerResult]:
- """
- Create a "continue" prompt for paginated output.
- """
- bindings = KeyBindings()
-
- @bindings.add("enter")
- @bindings.add("down")
- def next_line(event: KeyPressEvent) -> None:
- event.app.exit(result=PagerResult.NEXT_LINE)
-
- @bindings.add("space")
- def next_page(event: KeyPressEvent) -> None:
- event.app.exit(result=PagerResult.NEXT_PAGE)
-
- @bindings.add("a")
- def print_all(event: KeyPressEvent) -> None:
- event.app.exit(result=PagerResult.PRINT_ALL)
-
- @bindings.add("q")
- @bindings.add("c-c")
- @bindings.add("c-d")
- @bindings.add("escape", eager=True)
- def no(event: KeyPressEvent) -> None:
- event.app.exit(result=PagerResult.ABORT)
-
- @bindings.add("<any>")
- def _(event: KeyPressEvent) -> None:
- "Disallow inserting other text."
- pass
-
- style
-
- session: PromptSession[PagerResult] = PromptSession(
- merge_formatted_text(
- [
- title,
- HTML(
- "<status-toolbar>"
- "<more> -- MORE -- </more> "
- "<key>[Enter]</key> Scroll "
- "<key>[Space]</key> Next page "
- "<key>[a]</key> Print all "
- "<key>[q]</key> Quit "
- "</status-toolbar>: "
- ),
- ]
- ),
- key_bindings=bindings,
- erase_when_done=True,
- style=style,
- )
- return session
diff --git a/ptpython/signatures.py b/ptpython/signatures.py
index 5a6f286..d4cb98c 100644
--- a/ptpython/signatures.py
+++ b/ptpython/signatures.py
@@ -10,7 +10,7 @@ from __future__ import annotations
import inspect
from inspect import Signature as InspectSignature
from inspect import _ParameterKind as ParameterKind
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple
+from typing import TYPE_CHECKING, Any, Sequence
from prompt_toolkit.document import Document
@@ -203,7 +203,6 @@ def get_signatures_using_eval(
running `eval()` over the detected function name.
"""
# Look for open parenthesis, before cursor position.
- text = document.text_before_cursor
pos = document.cursor_position - 1
paren_mapping = {")": "(", "}": "{", "]": "["}
diff --git a/ptpython/style.py b/ptpython/style.py
index 199d5ab..c5a04e5 100644
--- a/ptpython/style.py
+++ b/ptpython/style.py
@@ -1,7 +1,5 @@
from __future__ import annotations
-from typing import Dict
-
from prompt_toolkit.styles import BaseStyle, Style, merge_styles
from prompt_toolkit.styles.pygments import style_from_pygments_cls
from prompt_toolkit.utils import is_conemu_ansi, is_windows, is_windows_vt100_supported
diff --git a/ptpython/utils.py b/ptpython/utils.py
index 5348899..28887d2 100644
--- a/ptpython/utils.py
+++ b/ptpython/utils.py
@@ -4,17 +4,7 @@ For internal use only.
from __future__ import annotations
import re
-from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Dict,
- Iterable,
- Optional,
- Type,
- TypeVar,
- cast,
-)
+from typing import TYPE_CHECKING, Any, Callable, Iterable, TypeVar, cast
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import to_formatted_text
@@ -91,7 +81,7 @@ def get_jedi_script_from_document(
# Workaround Jedi issue #514: for https://github.com/davidhalter/jedi/issues/514
return None
except KeyError:
- # Workaroud for a crash when the input is "u'", the start of a unicode string.
+ # Workaround for a crash when the input is "u'", the start of a unicode string.
return None
except Exception:
# Workaround for: https://github.com/jonathanslenders/ptpython/issues/91
diff --git a/ptpython/validator.py b/ptpython/validator.py
index 3b36d27..91b9c28 100644
--- a/ptpython/validator.py
+++ b/ptpython/validator.py
@@ -1,6 +1,6 @@
from __future__ import annotations
-from typing import Callable, Optional
+from typing import Callable
from prompt_toolkit.document import Document
from prompt_toolkit.validation import ValidationError, Validator
diff --git a/pyproject.toml b/pyproject.toml
index b356239..5421c45 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,13 +1,35 @@
-[tool.black]
-target-version = ['py36']
-
-
-[tool.isort]
-# isort configuration that is compatible with Black.
-multi_line_output = 3
-include_trailing_comma = true
-known_first_party = "ptpython"
-known_third_party = "prompt_toolkit,pygments,asyncssh"
-force_grid_wrap = 0
-use_parentheses = true
-line_length = 88
+[tool.ruff]
+target-version = "py37"
+select = [
+ "E", # pycodestyle errors
+ "W", # pycodestyle warnings
+ "F", # pyflakes
+ "C", # flake8-comprehensions
+ "T", # Print.
+ "I", # isort
+ # "B", # flake8-bugbear
+ "UP", # pyupgrade
+ "RUF100", # unused-noqa
+ "Q", # quotes
+]
+ignore = [
+ "E501", # Line too long, handled by black
+ "C901", # Too complex
+ "E722", # bare except.
+]
+
+
+[tool.ruff.per-file-ignores]
+"examples/*" = ["T201"] # Print allowed in examples.
+"examples/ptpython_config/config.py" = ["F401"] # Unused imports in config.
+"ptpython/entry_points/run_ptipython.py" = ["T201", "F401"] # Print, import usage.
+"ptpython/entry_points/run_ptpython.py" = ["T201"] # Print usage.
+"ptpython/ipython.py" = ["T100"] # Import usage.
+"ptpython/repl.py" = ["T201"] # Print usage.
+"ptpython/printer.py" = ["T201"] # Print usage.
+"tests/run_tests.py" = ["F401"] # Unused imports.
+
+
+[tool.ruff.isort]
+known-first-party = ["ptpython"]
+known-third-party = ["prompt_toolkit", "pygments", "asyncssh"]
diff --git a/setup.py b/setup.py
index 18d2911..bc1241b 100644
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@ with open(os.path.join(os.path.dirname(__file__), "README.rst")) as f:
setup(
name="ptpython",
author="Jonathan Slenders",
- version="3.0.23",
+ version="3.0.25",
url="https://github.com/prompt-toolkit/ptpython",
description="Python REPL build on top of prompt_toolkit",
long_description=long_description,
@@ -21,12 +21,13 @@ setup(
"appdirs",
"importlib_metadata;python_version<'3.8'",
"jedi>=0.16.0",
- # Use prompt_toolkit 3.0.28, because of cursor shape support.
- "prompt_toolkit>=3.0.28,<3.1.0",
+ # Use prompt_toolkit 3.0.34, because of `OneStyleAndTextTuple` import.
+ "prompt_toolkit>=3.0.34,<3.1.0",
"pygments",
],
python_requires=">=3.7",
classifiers=[
+ "License :: OSI Approved :: BSD License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.7",
@@ -39,12 +40,14 @@ setup(
"ptpython = ptpython.entry_points.run_ptpython:run",
"ptipython = ptpython.entry_points.run_ptipython:run",
"ptpython%s = ptpython.entry_points.run_ptpython:run" % sys.version_info[0],
- "ptpython%s.%s = ptpython.entry_points.run_ptpython:run"
- % sys.version_info[:2],
+ "ptpython{}.{} = ptpython.entry_points.run_ptpython:run".format(
+ *sys.version_info[:2]
+ ),
"ptipython%s = ptpython.entry_points.run_ptipython:run"
% sys.version_info[0],
- "ptipython%s.%s = ptpython.entry_points.run_ptipython:run"
- % sys.version_info[:2],
+ "ptipython{}.{} = ptpython.entry_points.run_ptipython:run".format(
+ *sys.version_info[:2]
+ ),
]
},
extras_require={