summaryrefslogtreecommitdiffstats
path: root/ptpython/repl.py
diff options
context:
space:
mode:
Diffstat (limited to 'ptpython/repl.py')
-rw-r--r--ptpython/repl.py492
1 files changed, 108 insertions, 384 deletions
diff --git a/ptpython/repl.py b/ptpython/repl.py
index 02a5075..fc9b9da 100644
--- a/ptpython/repl.py
+++ b/ptpython/repl.py
@@ -12,38 +12,24 @@ from __future__ import annotations
import asyncio
import builtins
import os
+import signal
import sys
import traceback
import types
import warnings
from dis import COMPILER_FLAG_NAMES
-from enum import Enum
-from typing import Any, Callable, ContextManager, Dict, Optional
-
-from prompt_toolkit.formatted_text import (
- HTML,
- AnyFormattedText,
- FormattedText,
- PygmentsTokens,
- StyleAndTextTuples,
- fragment_list_width,
- merge_formatted_text,
- to_formatted_text,
-)
-from prompt_toolkit.formatted_text.utils import fragment_list_to_text, split_lines
-from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
+from typing import Any, Callable, ContextManager, Iterable
+
+from prompt_toolkit.formatted_text import OneStyleAndTextTuple
from prompt_toolkit.patch_stdout import patch_stdout as patch_stdout_context
from prompt_toolkit.shortcuts import (
- PromptSession,
clear_title,
- print_formatted_text,
set_title,
)
-from prompt_toolkit.styles import BaseStyle
-from prompt_toolkit.utils import DummyContext, get_cwidth
-from pygments.lexers import PythonLexer, PythonTracebackLexer
-from pygments.token import Token
+from prompt_toolkit.utils import DummyContext
+from pygments.lexers import PythonTracebackLexer # noqa: F401
+from .printer import OutputPrinter
from .python_input import PythonInput
PyCF_ALLOW_TOP_LEVEL_AWAIT: int
@@ -108,7 +94,9 @@ class PythonRepl(PythonInput):
else:
# Print.
if result is not None:
- self.show_result(result)
+ self._show_result(result)
+ if self.insert_blank_line_after_output:
+ self.app.output.write("\n")
# Loop.
self.current_statement_index += 1
@@ -123,6 +111,24 @@ class PythonRepl(PythonInput):
# any case.)
self._handle_keyboard_interrupt(e)
+ def _get_output_printer(self) -> OutputPrinter:
+ return OutputPrinter(
+ output=self.app.output,
+ input=self.app.input,
+ style=self._current_style,
+ style_transformation=self.style_transformation,
+ title=self.title,
+ )
+
+ def _show_result(self, result: object) -> None:
+ self._get_output_printer().display_result(
+ result=result,
+ out_prompt=self.get_output_prompt(),
+ reformat=self.enable_output_formatting,
+ highlight=self.enable_syntax_highlighting,
+ paginate=self.enable_pager,
+ )
+
def run(self) -> None:
"""
Run the REPL loop.
@@ -153,27 +159,58 @@ class PythonRepl(PythonInput):
clear_title()
self._remove_from_namespace()
- async def run_and_show_expression_async(self, text: str):
- loop = asyncio.get_event_loop()
+ async def run_and_show_expression_async(self, text: str) -> Any:
+ loop = asyncio.get_running_loop()
+ system_exit: SystemExit | None = None
try:
- result = await self.eval_async(text)
- except KeyboardInterrupt: # KeyboardInterrupt doesn't inherit from Exception.
- raise
- except SystemExit:
- return
- except BaseException as e:
- self._handle_exception(e)
- else:
- # Print.
- if result is not None:
- await loop.run_in_executor(None, lambda: self.show_result(result))
+ try:
+ # Create `eval` task. Ensure that control-c will cancel this
+ # task.
+ async def eval() -> Any:
+ nonlocal system_exit
+ try:
+ return await self.eval_async(text)
+ except SystemExit as e:
+ # Don't propagate SystemExit in `create_task()`. That
+ # will kill the event loop. We want to handle it
+ # gracefully.
+ system_exit = e
+
+ task = asyncio.create_task(eval())
+ loop.add_signal_handler(signal.SIGINT, lambda *_: task.cancel())
+ result = await task
+
+ if system_exit is not None:
+ raise system_exit
+ except KeyboardInterrupt:
+ # KeyboardInterrupt doesn't inherit from Exception.
+ raise
+ except SystemExit:
+ raise
+ except BaseException as e:
+ self._handle_exception(e)
+ else:
+ # Print.
+ if result is not None:
+ await loop.run_in_executor(None, lambda: self._show_result(result))
- # Loop.
- self.current_statement_index += 1
- self.signatures = []
- # Return the result for future consumers.
- return result
+ # Loop.
+ self.current_statement_index += 1
+ self.signatures = []
+ # Return the result for future consumers.
+ return result
+ finally:
+ loop.remove_signal_handler(signal.SIGINT)
+
+ except KeyboardInterrupt as e:
+ # Handle all possible `KeyboardInterrupt` errors. This can
+ # happen during the `eval`, but also during the
+ # `show_result` if something takes too long.
+ # (Try/catch is around the whole block, because we want to
+ # prevent that a Control-C keypress terminates the REPL in
+ # any case.)
+ self._handle_keyboard_interrupt(e)
async def run_async(self) -> None:
"""
@@ -187,7 +224,7 @@ class PythonRepl(PythonInput):
(Both for control-C to work, as well as for the code to see the right
thread in which it was embedded).
"""
- loop = asyncio.get_event_loop()
+ loop = asyncio.get_running_loop()
if self.terminal_title:
set_title(self.terminal_title)
@@ -217,6 +254,8 @@ class PythonRepl(PythonInput):
# `KeyboardInterrupt` exceptions can end up in the event
# loop selector.
self._handle_keyboard_interrupt(e)
+ except SystemExit:
+ return
finally:
if self.terminal_title:
clear_title()
@@ -245,7 +284,7 @@ class PythonRepl(PythonInput):
result = eval(code, self.get_globals(), self.get_locals())
if _has_coroutine_flag(code):
- result = asyncio.get_event_loop().run_until_complete(result)
+ result = asyncio.get_running_loop().run_until_complete(result)
self._store_eval_result(result)
return result
@@ -258,7 +297,7 @@ class PythonRepl(PythonInput):
result = eval(code, self.get_globals(), self.get_locals())
if _has_coroutine_flag(code):
- result = asyncio.get_event_loop().run_until_complete(result)
+ result = asyncio.get_running_loop().run_until_complete(result)
return None
@@ -318,264 +357,12 @@ class PythonRepl(PythonInput):
dont_inherit=True,
)
- def _format_result_output(self, result: object) -> StyleAndTextTuples:
- """
- Format __repr__ for an `eval` result.
-
- Note: this can raise `KeyboardInterrupt` if either calling `__repr__`,
- `__pt_repr__` or formatting the output with "Black" takes to long
- and the user presses Control-C.
- """
- out_prompt = to_formatted_text(self.get_output_prompt())
-
- # If the repr is valid Python code, use the Pygments lexer.
- try:
- result_repr = repr(result)
- except KeyboardInterrupt:
- raise # Don't catch here.
- except BaseException as e:
- # Calling repr failed.
- self._handle_exception(e)
- return []
-
- try:
- compile(result_repr, "", "eval")
- except SyntaxError:
- formatted_result_repr = to_formatted_text(result_repr)
- else:
- # Syntactically correct. Format with black and syntax highlight.
- if self.enable_output_formatting:
- # Inline import. Slightly speed up start-up time if black is
- # not used.
- try:
- import black
-
- if not hasattr(black, "Mode"):
- raise ImportError
- except ImportError:
- pass # no Black package in your installation
- else:
- result_repr = black.format_str(
- result_repr,
- mode=black.Mode(line_length=self.app.output.get_size().columns),
- )
-
- formatted_result_repr = to_formatted_text(
- PygmentsTokens(list(_lex_python_result(result_repr)))
- )
-
- # If __pt_repr__ is present, take this. This can return prompt_toolkit
- # formatted text.
- try:
- if hasattr(result, "__pt_repr__"):
- formatted_result_repr = to_formatted_text(
- getattr(result, "__pt_repr__")()
- )
- if isinstance(formatted_result_repr, list):
- formatted_result_repr = FormattedText(formatted_result_repr)
- except KeyboardInterrupt:
- raise # Don't catch here.
- except:
- # For bad code, `__getattr__` can raise something that's not an
- # `AttributeError`. This happens already when calling `hasattr()`.
- pass
-
- # Align every line to the prompt.
- line_sep = "\n" + " " * fragment_list_width(out_prompt)
- indented_repr: StyleAndTextTuples = []
-
- lines = list(split_lines(formatted_result_repr))
-
- for i, fragment in enumerate(lines):
- indented_repr.extend(fragment)
-
- # Add indentation separator between lines, not after the last line.
- if i != len(lines) - 1:
- indented_repr.append(("", line_sep))
-
- # Write output tokens.
- if self.enable_syntax_highlighting:
- formatted_output = merge_formatted_text([out_prompt, indented_repr])
- else:
- formatted_output = FormattedText(
- out_prompt + [("", fragment_list_to_text(formatted_result_repr))]
- )
-
- return to_formatted_text(formatted_output)
-
- def show_result(self, result: object) -> None:
- """
- Show __repr__ for an `eval` result and print to output.
- """
- formatted_text_output = self._format_result_output(result)
-
- if self.enable_pager:
- self.print_paginated_formatted_text(formatted_text_output)
- else:
- self.print_formatted_text(formatted_text_output)
-
- self.app.output.flush()
-
- if self.insert_blank_line_after_output:
- self.app.output.write("\n")
-
- def print_formatted_text(
- self, formatted_text: StyleAndTextTuples, end: str = "\n"
- ) -> None:
- print_formatted_text(
- FormattedText(formatted_text),
- style=self._current_style,
- style_transformation=self.style_transformation,
- include_default_pygments_style=False,
- output=self.app.output,
- end=end,
- )
-
- def print_paginated_formatted_text(
- self,
- formatted_text: StyleAndTextTuples,
- end: str = "\n",
- ) -> None:
- """
- Print formatted text, using --MORE-- style pagination.
- (Avoid filling up the terminal's scrollback buffer.)
- """
- pager_prompt = self.create_pager_prompt()
- size = self.app.output.get_size()
-
- abort = False
- print_all = False
-
- # Max number of lines allowed in the buffer before painting.
- max_rows = size.rows - 1
-
- # Page buffer.
- rows_in_buffer = 0
- columns_in_buffer = 0
- page: StyleAndTextTuples = []
-
- def flush_page() -> None:
- nonlocal page, columns_in_buffer, rows_in_buffer
- self.print_formatted_text(page, end="")
- page = []
- columns_in_buffer = 0
- rows_in_buffer = 0
-
- def show_pager() -> None:
- nonlocal abort, max_rows, print_all
-
- # Run pager prompt in another thread.
- # Same as for the input. This prevents issues with nested event
- # loops.
- pager_result = pager_prompt.prompt(in_thread=True)
-
- if pager_result == PagerResult.ABORT:
- print("...")
- abort = True
-
- elif pager_result == PagerResult.NEXT_LINE:
- max_rows = 1
-
- elif pager_result == PagerResult.NEXT_PAGE:
- max_rows = size.rows - 1
-
- elif pager_result == PagerResult.PRINT_ALL:
- print_all = True
-
- # Loop over lines. Show --MORE-- prompt when page is filled.
-
- formatted_text = formatted_text + [("", end)]
- lines = list(split_lines(formatted_text))
-
- for lineno, line in enumerate(lines):
- for style, text, *_ in line:
- for c in text:
- width = get_cwidth(c)
-
- # (Soft) wrap line if it doesn't fit.
- if columns_in_buffer + width > size.columns:
- # Show pager first if we get too many lines after
- # wrapping.
- if rows_in_buffer + 1 >= max_rows and not print_all:
- page.append(("", "\n"))
- flush_page()
- show_pager()
- if abort:
- return
-
- rows_in_buffer += 1
- columns_in_buffer = 0
-
- columns_in_buffer += width
- page.append((style, c))
-
- if rows_in_buffer + 1 >= max_rows and not print_all:
- page.append(("", "\n"))
- flush_page()
- show_pager()
- if abort:
- return
- else:
- # Add line ending between lines (if `end="\n"` was given, one
- # more empty line is added in `split_lines` automatically to
- # take care of the final line ending).
- if lineno != len(lines) - 1:
- page.append(("", "\n"))
- rows_in_buffer += 1
- columns_in_buffer = 0
-
- flush_page()
-
- def create_pager_prompt(self) -> PromptSession[PagerResult]:
- """
- Create pager --MORE-- prompt.
- """
- return create_pager_prompt(self._current_style, self.title)
-
- def _format_exception_output(self, e: BaseException) -> PygmentsTokens:
- # Instead of just calling ``traceback.format_exc``, we take the
- # traceback and skip the bottom calls of this framework.
- t, v, tb = sys.exc_info()
-
- # Required for pdb.post_mortem() to work.
- sys.last_type, sys.last_value, sys.last_traceback = t, v, tb
-
- tblist = list(traceback.extract_tb(tb))
-
- for line_nr, tb_tuple in enumerate(tblist):
- if tb_tuple[0] == "<stdin>":
- tblist = tblist[line_nr:]
- break
-
- l = traceback.format_list(tblist)
- if l:
- l.insert(0, "Traceback (most recent call last):\n")
- l.extend(traceback.format_exception_only(t, v))
-
- tb_str = "".join(l)
-
- # Format exception and write to output.
- # (We use the default style. Most other styles result
- # in unreadable colors for the traceback.)
- if self.enable_syntax_highlighting:
- tokens = list(_lex_python_traceback(tb_str))
- else:
- tokens = [(Token, tb_str)]
- return PygmentsTokens(tokens)
-
def _handle_exception(self, e: BaseException) -> None:
- output = self.app.output
-
- tokens = self._format_exception_output(e)
-
- print_formatted_text(
- tokens,
- style=self._current_style,
- style_transformation=self.style_transformation,
- include_default_pygments_style=False,
- output=output,
+ self._get_output_printer().display_exception(
+ e,
+ highlight=self.enable_syntax_highlighting,
+ paginate=self.enable_pager,
)
- output.flush()
def _handle_keyboard_interrupt(self, e: KeyboardInterrupt) -> None:
output = self.app.output
@@ -602,21 +389,16 @@ class PythonRepl(PythonInput):
globals = self.get_globals()
del globals["get_ptpython"]
-
-def _lex_python_traceback(tb):
- "Return token list for traceback string."
- lexer = PythonTracebackLexer()
- return lexer.get_tokens(tb)
-
-
-def _lex_python_result(tb):
- "Return token list for Python string."
- lexer = PythonLexer()
- # Use `get_tokens_unprocessed`, so that we get exactly the same string,
- # without line endings appended. `print_formatted_text` already appends a
- # line ending, and otherwise we'll have two line endings.
- tokens = lexer.get_tokens_unprocessed(tb)
- return [(tokentype, value) for index, tokentype, value in tokens]
+ def print_paginated_formatted_text(
+ self,
+ formatted_text: Iterable[OneStyleAndTextTuple],
+ end: str = "\n",
+ ) -> None:
+ # Warning: This is mainly here backwards-compatibility. Some projects
+ # call `print_paginated_formatted_text` on the Repl object.
+ self._get_output_printer().display_style_and_text_tuples(
+ formatted_text, paginate=True
+ )
def enable_deprecation_warnings() -> None:
@@ -630,25 +412,31 @@ def enable_deprecation_warnings() -> None:
warnings.filterwarnings("default", category=DeprecationWarning, module="__main__")
-def run_config(
- repl: PythonInput, config_file: str = "~/.config/ptpython/config.py"
-) -> None:
+DEFAULT_CONFIG_FILE = "~/.config/ptpython/config.py"
+
+
+def run_config(repl: PythonInput, config_file: str | None = None) -> None:
"""
Execute REPL config file.
:param repl: `PythonInput` instance.
:param config_file: Path of the configuration file.
"""
+ explicit_config_file = config_file is not None
+
# Expand tildes.
- config_file = os.path.expanduser(config_file)
+ config_file = os.path.expanduser(
+ config_file if config_file is not None else DEFAULT_CONFIG_FILE
+ )
def enter_to_continue() -> None:
input("\nPress ENTER to continue...")
# Check whether this file exists.
if not os.path.exists(config_file):
- print("Impossible to read %r" % config_file)
- enter_to_continue()
+ if explicit_config_file:
+ print(f"Impossible to read {config_file}")
+ enter_to_continue()
return
# Run the config file in an empty namespace.
@@ -741,67 +529,3 @@ def embed(
else:
with patch_context:
repl.run()
-
-
-class PagerResult(Enum):
- ABORT = "ABORT"
- NEXT_LINE = "NEXT_LINE"
- NEXT_PAGE = "NEXT_PAGE"
- PRINT_ALL = "PRINT_ALL"
-
-
-def create_pager_prompt(
- style: BaseStyle, title: AnyFormattedText = ""
-) -> PromptSession[PagerResult]:
- """
- Create a "continue" prompt for paginated output.
- """
- bindings = KeyBindings()
-
- @bindings.add("enter")
- @bindings.add("down")
- def next_line(event: KeyPressEvent) -> None:
- event.app.exit(result=PagerResult.NEXT_LINE)
-
- @bindings.add("space")
- def next_page(event: KeyPressEvent) -> None:
- event.app.exit(result=PagerResult.NEXT_PAGE)
-
- @bindings.add("a")
- def print_all(event: KeyPressEvent) -> None:
- event.app.exit(result=PagerResult.PRINT_ALL)
-
- @bindings.add("q")
- @bindings.add("c-c")
- @bindings.add("c-d")
- @bindings.add("escape", eager=True)
- def no(event: KeyPressEvent) -> None:
- event.app.exit(result=PagerResult.ABORT)
-
- @bindings.add("<any>")
- def _(event: KeyPressEvent) -> None:
- "Disallow inserting other text."
- pass
-
- style
-
- session: PromptSession[PagerResult] = PromptSession(
- merge_formatted_text(
- [
- title,
- HTML(
- "<status-toolbar>"
- "<more> -- MORE -- </more> "
- "<key>[Enter]</key> Scroll "
- "<key>[Space]</key> Next page "
- "<key>[a]</key> Print all "
- "<key>[q]</key> Quit "
- "</status-toolbar>: "
- ),
- ]
- ),
- key_bindings=bindings,
- erase_when_done=True,
- style=style,
- )
- return session