summaryrefslogtreecommitdiffstats
path: root/src/prompt_toolkit/input/posix_pipe.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/prompt_toolkit/input/posix_pipe.py')
-rw-r--r--src/prompt_toolkit/input/posix_pipe.py118
1 files changed, 118 insertions, 0 deletions
diff --git a/src/prompt_toolkit/input/posix_pipe.py b/src/prompt_toolkit/input/posix_pipe.py
new file mode 100644
index 0000000..c131fb8
--- /dev/null
+++ b/src/prompt_toolkit/input/posix_pipe.py
@@ -0,0 +1,118 @@
+from __future__ import annotations
+
+import sys
+
+assert sys.platform != "win32"
+
+import os
+from contextlib import contextmanager
+from typing import ContextManager, Iterator, TextIO, cast
+
+from ..utils import DummyContext
+from .base import PipeInput
+from .vt100 import Vt100Input
+
+__all__ = [
+ "PosixPipeInput",
+]
+
+
+class _Pipe:
+ "Wrapper around os.pipe, that ensures we don't double close any end."
+
+ def __init__(self) -> None:
+ self.read_fd, self.write_fd = os.pipe()
+ self._read_closed = False
+ self._write_closed = False
+
+ def close_read(self) -> None:
+ "Close read-end if not yet closed."
+ if self._read_closed:
+ return
+
+ os.close(self.read_fd)
+ self._read_closed = True
+
+ def close_write(self) -> None:
+ "Close write-end if not yet closed."
+ if self._write_closed:
+ return
+
+ os.close(self.write_fd)
+ self._write_closed = True
+
+ def close(self) -> None:
+ "Close both read and write ends."
+ self.close_read()
+ self.close_write()
+
+
+class PosixPipeInput(Vt100Input, PipeInput):
+ """
+ Input that is send through a pipe.
+ This is useful if we want to send the input programmatically into the
+ application. Mostly useful for unit testing.
+
+ Usage::
+
+ with PosixPipeInput.create() as input:
+ input.send_text('inputdata')
+ """
+
+ _id = 0
+
+ def __init__(self, _pipe: _Pipe, _text: str = "") -> None:
+ # Private constructor. Users should use the public `.create()` method.
+ self.pipe = _pipe
+
+ class Stdin:
+ encoding = "utf-8"
+
+ def isatty(stdin) -> bool:
+ return True
+
+ def fileno(stdin) -> int:
+ return self.pipe.read_fd
+
+ super().__init__(cast(TextIO, Stdin()))
+ self.send_text(_text)
+
+ # Identifier for every PipeInput for the hash.
+ self.__class__._id += 1
+ self._id = self.__class__._id
+
+ @classmethod
+ @contextmanager
+ def create(cls, text: str = "") -> Iterator[PosixPipeInput]:
+ pipe = _Pipe()
+ try:
+ yield PosixPipeInput(_pipe=pipe, _text=text)
+ finally:
+ pipe.close()
+
+ def send_bytes(self, data: bytes) -> None:
+ os.write(self.pipe.write_fd, data)
+
+ def send_text(self, data: str) -> None:
+ "Send text to the input."
+ os.write(self.pipe.write_fd, data.encode("utf-8"))
+
+ def raw_mode(self) -> ContextManager[None]:
+ return DummyContext()
+
+ def cooked_mode(self) -> ContextManager[None]:
+ return DummyContext()
+
+ def close(self) -> None:
+ "Close pipe fds."
+ # Only close the write-end of the pipe. This will unblock the reader
+ # callback (in vt100.py > _attached_input), which eventually will raise
+ # `EOFError`. If we'd also close the read-end, then the event loop
+ # won't wake up the corresponding callback because of this.
+ self.pipe.close_write()
+
+ def typeahead_hash(self) -> str:
+ """
+ This needs to be unique for every `PipeInput`.
+ """
+ return f"pipe-input-{self._id}"