summaryrefslogtreecommitdiffstats
path: root/ptpython/python_input.py
diff options
context:
space:
mode:
Diffstat (limited to 'ptpython/python_input.py')
-rw-r--r--ptpython/python_input.py158
1 files changed, 84 insertions, 74 deletions
diff --git a/ptpython/python_input.py b/ptpython/python_input.py
index e63cdf1..1785f52 100644
--- a/ptpython/python_input.py
+++ b/ptpython/python_input.py
@@ -4,7 +4,6 @@ This can be used for creation of Python REPLs.
"""
import __future__
-import threading
from asyncio import get_event_loop
from functools import partial
from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, List, Optional, TypeVar
@@ -174,6 +173,11 @@ class PythonInput:
python_input = PythonInput(...)
python_code = python_input.app.run()
+
+ :param create_app: When `False`, don't create and manage a prompt_toolkit
+ application. The default is `True` and should only be set
+ to false if PythonInput is being embedded in a separate
+ prompt_toolkit application.
"""
def __init__(
@@ -188,6 +192,7 @@ class PythonInput:
output: Optional[Output] = None,
# For internal use.
extra_key_bindings: Optional[KeyBindings] = None,
+ create_app=True,
_completer: Optional[Completer] = None,
_validator: Optional[Validator] = None,
_lexer: Optional[Lexer] = None,
@@ -380,10 +385,16 @@ class PythonInput:
extra_toolbars=self._extra_toolbars,
)
- self.app = self._create_application(input, output)
-
- if vi_mode:
- self.app.editing_mode = EditingMode.VI
+ # Create an app if requested. If not, the global get_app() is returned
+ # for self.app via property getter.
+ if create_app:
+ self._app: Optional[Application] = self._create_application(input, output)
+ # Setting vi_mode will not work unless the prompt_toolkit
+ # application has been created.
+ if vi_mode:
+ self.app.editing_mode = EditingMode.VI
+ else:
+ self._app = None
def _accept_handler(self, buff: Buffer) -> bool:
app = get_app()
@@ -393,12 +404,12 @@ class PythonInput:
@property
def option_count(self) -> int:
- " Return the total amount of options. (In all categories together.) "
+ "Return the total amount of options. (In all categories together.)"
return sum(len(category.options) for category in self.options)
@property
def selected_option(self) -> Option:
- " Return the currently selected option. "
+ "Return the currently selected option."
i = 0
for category in self.options:
for o in category.options:
@@ -521,7 +532,7 @@ class PythonInput:
def simple_option(
title: str, description: str, field_name: str, values: Optional[List] = None
) -> Option:
- " Create Simple on/of option. "
+ "Create Simple on/of option."
values = values or ["off", "on"]
def get_current_value():
@@ -914,23 +925,19 @@ class PythonInput:
else:
self.editing_mode = EditingMode.EMACS
- def _on_input_timeout(self, buff: Buffer, loop=None) -> None:
+ @property
+ def app(self) -> Application:
+ if self._app is None:
+ return get_app()
+ return self._app
+
+ def _on_input_timeout(self, buff: Buffer) -> None:
"""
When there is no input activity,
in another thread, get the signature of the current code.
"""
- app = self.app
-
- # Never run multiple get-signature threads.
- if self._get_signatures_thread_running:
- return
- self._get_signatures_thread_running = True
- document = buff.document
-
- loop = loop or get_event_loop()
-
- def run():
+ def get_signatures_in_executor(document: Document) -> List[Signature]:
# First, get signatures from Jedi. If we didn't found any and if
# "dictionary completion" (eval-based completion) is enabled, then
# get signatures using eval.
@@ -942,26 +949,47 @@ class PythonInput:
document, self.get_locals(), self.get_globals()
)
- self._get_signatures_thread_running = False
+ return signatures
+
+ app = self.app
- # Set signatures and redraw if the text didn't change in the
- # meantime. Otherwise request new signatures.
- if buff.text == document.text:
- self.signatures = signatures
+ async def on_timeout_task() -> None:
+ loop = get_event_loop()
- # Set docstring in docstring buffer.
- if signatures:
- self.docstring_buffer.reset(
- document=Document(signatures[0].docstring, cursor_position=0)
+ # Never run multiple get-signature threads.
+ if self._get_signatures_thread_running:
+ return
+ self._get_signatures_thread_running = True
+
+ try:
+ while True:
+ document = buff.document
+ signatures = await loop.run_in_executor(
+ None, get_signatures_in_executor, document
)
- else:
- self.docstring_buffer.reset()
- app.invalidate()
+ # If the text didn't change in the meantime, take these
+ # signatures. Otherwise, try again.
+ if buff.text == document.text:
+ break
+ finally:
+ self._get_signatures_thread_running = False
+
+ # Set signatures and redraw.
+ self.signatures = signatures
+
+ # Set docstring in docstring buffer.
+ if signatures:
+ self.docstring_buffer.reset(
+ document=Document(signatures[0].docstring, cursor_position=0)
+ )
else:
- self._on_input_timeout(buff, loop=loop)
+ self.docstring_buffer.reset()
+
+ app.invalidate()
- loop.run_in_executor(None, run)
+ if app.is_running:
+ app.create_background_task(on_timeout_task())
def on_reset(self) -> None:
self.signatures = []
@@ -970,7 +998,7 @@ class PythonInput:
"""
Display the history.
"""
- app = get_app()
+ app = self.app
app.vi_state.input_mode = InputMode.NAVIGATION
history = PythonHistory(self, self.default_buffer.document)
@@ -1012,43 +1040,25 @@ class PythonInput:
self.app.vi_state.input_mode = InputMode.NAVIGATION
# Run the UI.
- result: str = ""
- exception: Optional[BaseException] = None
-
- def in_thread() -> None:
- nonlocal result, exception
+ while True:
try:
- while True:
- try:
- result = self.app.run(pre_run=pre_run)
-
- if result.lstrip().startswith("\x1a"):
- # When the input starts with Ctrl-Z, quit the REPL.
- # (Important for Windows users.)
- raise EOFError
-
- # Remove leading whitespace.
- # (Users can add extra indentation, which happens for
- # instance because of copy/pasting code.)
- result = unindent_code(result)
-
- if result and not result.isspace():
- return
- except KeyboardInterrupt:
- # Abort - try again.
- self.default_buffer.document = Document()
- except BaseException as e:
- exception = e
- return
-
- finally:
- if self.insert_blank_line_after_input:
- self.app.output.write("\n")
-
- thread = threading.Thread(target=in_thread)
- thread.start()
- thread.join()
-
- if exception is not None:
- raise exception
- return result
+ result = self.app.run(pre_run=pre_run, in_thread=True)
+
+ if result.lstrip().startswith("\x1a"):
+ # When the input starts with Ctrl-Z, quit the REPL.
+ # (Important for Windows users.)
+ raise EOFError
+
+ # Remove leading whitespace.
+ # (Users can add extra indentation, which happens for
+ # instance because of copy/pasting code.)
+ result = unindent_code(result)
+
+ if result and not result.isspace():
+ if self.insert_blank_line_after_input:
+ self.app.output.write("\n")
+
+ return result
+ except KeyboardInterrupt:
+ # Abort - try again.
+ self.default_buffer.document = Document()