summaryrefslogtreecommitdiffstats
path: root/src/prompt_toolkit/input/posix_pipe.py
blob: c131fb816e3461088643b4d79797e75c96129b65 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from __future__ import annotations

import sys

assert sys.platform != "win32"

import os
from contextlib import contextmanager
from typing import ContextManager, Iterator, TextIO, cast

from ..utils import DummyContext
from .base import PipeInput
from .vt100 import Vt100Input

__all__ = [
    "PosixPipeInput",
]


class _Pipe:
    "Wrapper around os.pipe, that ensures we don't double close any end."

    def __init__(self) -> None:
        self.read_fd, self.write_fd = os.pipe()
        self._read_closed = False
        self._write_closed = False

    def close_read(self) -> None:
        "Close read-end if not yet closed."
        if self._read_closed:
            return

        os.close(self.read_fd)
        self._read_closed = True

    def close_write(self) -> None:
        "Close write-end if not yet closed."
        if self._write_closed:
            return

        os.close(self.write_fd)
        self._write_closed = True

    def close(self) -> None:
        "Close both read and write ends."
        self.close_read()
        self.close_write()


class PosixPipeInput(Vt100Input, PipeInput):
    """
    Input that is send through a pipe.
    This is useful if we want to send the input programmatically into the
    application. Mostly useful for unit testing.

    Usage::

        with PosixPipeInput.create() as input:
            input.send_text('inputdata')
    """

    _id = 0

    def __init__(self, _pipe: _Pipe, _text: str = "") -> None:
        # Private constructor. Users should use the public `.create()` method.
        self.pipe = _pipe

        class Stdin:
            encoding = "utf-8"

            def isatty(stdin) -> bool:
                return True

            def fileno(stdin) -> int:
                return self.pipe.read_fd

        super().__init__(cast(TextIO, Stdin()))
        self.send_text(_text)

        # Identifier for every PipeInput for the hash.
        self.__class__._id += 1
        self._id = self.__class__._id

    @classmethod
    @contextmanager
    def create(cls, text: str = "") -> Iterator[PosixPipeInput]:
        pipe = _Pipe()
        try:
            yield PosixPipeInput(_pipe=pipe, _text=text)
        finally:
            pipe.close()

    def send_bytes(self, data: bytes) -> None:
        os.write(self.pipe.write_fd, data)

    def send_text(self, data: str) -> None:
        "Send text to the input."
        os.write(self.pipe.write_fd, data.encode("utf-8"))

    def raw_mode(self) -> ContextManager[None]:
        return DummyContext()

    def cooked_mode(self) -> ContextManager[None]:
        return DummyContext()

    def close(self) -> None:
        "Close pipe fds."
        # Only close the write-end of the pipe. This will unblock the reader
        # callback (in vt100.py > _attached_input), which eventually will raise
        # `EOFError`. If we'd also close the read-end, then the event loop
        # won't wake up the corresponding callback because of this.
        self.pipe.close_write()

    def typeahead_hash(self) -> str:
        """
        This needs to be unique for every `PipeInput`.
        """
        return f"pipe-input-{self._id}"