diff options
Diffstat (limited to 'ptpython/python_input.py')
-rw-r--r-- | ptpython/python_input.py | 158 |
1 files changed, 84 insertions, 74 deletions
diff --git a/ptpython/python_input.py b/ptpython/python_input.py index e63cdf1..1785f52 100644 --- a/ptpython/python_input.py +++ b/ptpython/python_input.py @@ -4,7 +4,6 @@ This can be used for creation of Python REPLs. """ import __future__ -import threading from asyncio import get_event_loop from functools import partial from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, List, Optional, TypeVar @@ -174,6 +173,11 @@ class PythonInput: python_input = PythonInput(...) python_code = python_input.app.run() + + :param create_app: When `False`, don't create and manage a prompt_toolkit + application. The default is `True` and should only be set + to false if PythonInput is being embedded in a separate + prompt_toolkit application. """ def __init__( @@ -188,6 +192,7 @@ class PythonInput: output: Optional[Output] = None, # For internal use. extra_key_bindings: Optional[KeyBindings] = None, + create_app=True, _completer: Optional[Completer] = None, _validator: Optional[Validator] = None, _lexer: Optional[Lexer] = None, @@ -380,10 +385,16 @@ class PythonInput: extra_toolbars=self._extra_toolbars, ) - self.app = self._create_application(input, output) - - if vi_mode: - self.app.editing_mode = EditingMode.VI + # Create an app if requested. If not, the global get_app() is returned + # for self.app via property getter. + if create_app: + self._app: Optional[Application] = self._create_application(input, output) + # Setting vi_mode will not work unless the prompt_toolkit + # application has been created. + if vi_mode: + self.app.editing_mode = EditingMode.VI + else: + self._app = None def _accept_handler(self, buff: Buffer) -> bool: app = get_app() @@ -393,12 +404,12 @@ class PythonInput: @property def option_count(self) -> int: - " Return the total amount of options. (In all categories together.) " + "Return the total amount of options. (In all categories together.)" return sum(len(category.options) for category in self.options) @property def selected_option(self) -> Option: - " Return the currently selected option. " + "Return the currently selected option." i = 0 for category in self.options: for o in category.options: @@ -521,7 +532,7 @@ class PythonInput: def simple_option( title: str, description: str, field_name: str, values: Optional[List] = None ) -> Option: - " Create Simple on/of option. " + "Create Simple on/of option." values = values or ["off", "on"] def get_current_value(): @@ -914,23 +925,19 @@ class PythonInput: else: self.editing_mode = EditingMode.EMACS - def _on_input_timeout(self, buff: Buffer, loop=None) -> None: + @property + def app(self) -> Application: + if self._app is None: + return get_app() + return self._app + + def _on_input_timeout(self, buff: Buffer) -> None: """ When there is no input activity, in another thread, get the signature of the current code. """ - app = self.app - - # Never run multiple get-signature threads. - if self._get_signatures_thread_running: - return - self._get_signatures_thread_running = True - document = buff.document - - loop = loop or get_event_loop() - - def run(): + def get_signatures_in_executor(document: Document) -> List[Signature]: # First, get signatures from Jedi. If we didn't found any and if # "dictionary completion" (eval-based completion) is enabled, then # get signatures using eval. @@ -942,26 +949,47 @@ class PythonInput: document, self.get_locals(), self.get_globals() ) - self._get_signatures_thread_running = False + return signatures + + app = self.app - # Set signatures and redraw if the text didn't change in the - # meantime. Otherwise request new signatures. - if buff.text == document.text: - self.signatures = signatures + async def on_timeout_task() -> None: + loop = get_event_loop() - # Set docstring in docstring buffer. - if signatures: - self.docstring_buffer.reset( - document=Document(signatures[0].docstring, cursor_position=0) + # Never run multiple get-signature threads. + if self._get_signatures_thread_running: + return + self._get_signatures_thread_running = True + + try: + while True: + document = buff.document + signatures = await loop.run_in_executor( + None, get_signatures_in_executor, document ) - else: - self.docstring_buffer.reset() - app.invalidate() + # If the text didn't change in the meantime, take these + # signatures. Otherwise, try again. + if buff.text == document.text: + break + finally: + self._get_signatures_thread_running = False + + # Set signatures and redraw. + self.signatures = signatures + + # Set docstring in docstring buffer. + if signatures: + self.docstring_buffer.reset( + document=Document(signatures[0].docstring, cursor_position=0) + ) else: - self._on_input_timeout(buff, loop=loop) + self.docstring_buffer.reset() + + app.invalidate() - loop.run_in_executor(None, run) + if app.is_running: + app.create_background_task(on_timeout_task()) def on_reset(self) -> None: self.signatures = [] @@ -970,7 +998,7 @@ class PythonInput: """ Display the history. """ - app = get_app() + app = self.app app.vi_state.input_mode = InputMode.NAVIGATION history = PythonHistory(self, self.default_buffer.document) @@ -1012,43 +1040,25 @@ class PythonInput: self.app.vi_state.input_mode = InputMode.NAVIGATION # Run the UI. - result: str = "" - exception: Optional[BaseException] = None - - def in_thread() -> None: - nonlocal result, exception + while True: try: - while True: - try: - result = self.app.run(pre_run=pre_run) - - if result.lstrip().startswith("\x1a"): - # When the input starts with Ctrl-Z, quit the REPL. - # (Important for Windows users.) - raise EOFError - - # Remove leading whitespace. - # (Users can add extra indentation, which happens for - # instance because of copy/pasting code.) - result = unindent_code(result) - - if result and not result.isspace(): - return - except KeyboardInterrupt: - # Abort - try again. - self.default_buffer.document = Document() - except BaseException as e: - exception = e - return - - finally: - if self.insert_blank_line_after_input: - self.app.output.write("\n") - - thread = threading.Thread(target=in_thread) - thread.start() - thread.join() - - if exception is not None: - raise exception - return result + result = self.app.run(pre_run=pre_run, in_thread=True) + + if result.lstrip().startswith("\x1a"): + # When the input starts with Ctrl-Z, quit the REPL. + # (Important for Windows users.) + raise EOFError + + # Remove leading whitespace. + # (Users can add extra indentation, which happens for + # instance because of copy/pasting code.) + result = unindent_code(result) + + if result and not result.isspace(): + if self.insert_blank_line_after_input: + self.app.output.write("\n") + + return result + except KeyboardInterrupt: + # Abort - try again. + self.default_buffer.document = Document() |