diff options
Diffstat (limited to 'vendor/react/stream/src/DuplexResourceStream.php')
-rw-r--r-- | vendor/react/stream/src/DuplexResourceStream.php | 227 |
1 files changed, 227 insertions, 0 deletions
diff --git a/vendor/react/stream/src/DuplexResourceStream.php b/vendor/react/stream/src/DuplexResourceStream.php new file mode 100644 index 0000000..c3163c6 --- /dev/null +++ b/vendor/react/stream/src/DuplexResourceStream.php @@ -0,0 +1,227 @@ +<?php + +namespace React\Stream; + +use Evenement\EventEmitter; +use React\EventLoop\Loop; +use React\EventLoop\LoopInterface; +use InvalidArgumentException; + +final class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface +{ + private $stream; + + /** @var LoopInterface */ + private $loop; + + /** + * Controls the maximum buffer size in bytes to read at once from the stream. + * + * This can be a positive number which means that up to X bytes will be read + * at once from the underlying stream resource. Note that the actual number + * of bytes read may be lower if the stream resource has less than X bytes + * currently available. + * + * This can be `-1` which means read everything available from the + * underlying stream resource. + * This should read until the stream resource is not readable anymore + * (i.e. underlying buffer drained), note that this does not neccessarily + * mean it reached EOF. + * + * @var int + */ + private $bufferSize; + private $buffer; + + private $readable = true; + private $writable = true; + private $closing = false; + private $listening = false; + + public function __construct($stream, LoopInterface $loop = null, $readChunkSize = null, WritableStreamInterface $buffer = null) + { + if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") { + throw new InvalidArgumentException('First parameter must be a valid stream resource'); + } + + // ensure resource is opened for reading and wrting (fopen mode must contain "+") + $meta = \stream_get_meta_data($stream); + if (isset($meta['mode']) && $meta['mode'] !== '' && \strpos($meta['mode'], '+') === false) { + throw new InvalidArgumentException('Given stream resource is not opened in read and write mode'); + } + + // this class relies on non-blocking I/O in order to not interrupt the event loop + // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918 + if (\stream_set_blocking($stream, false) !== true) { + throw new \RuntimeException('Unable to set stream resource to non-blocking mode'); + } + + // Use unbuffered read operations on the underlying stream resource. + // Reading chunks from the stream may otherwise leave unread bytes in + // PHP's stream buffers which some event loop implementations do not + // trigger events on (edge triggered). + // This does not affect the default event loop implementation (level + // triggered), so we can ignore platforms not supporting this (HHVM). + // Pipe streams (such as STDIN) do not seem to require this and legacy + // PHP versions cause SEGFAULTs on unbuffered pipe streams, so skip this. + if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) { + \stream_set_read_buffer($stream, 0); + } + + if ($buffer === null) { + $buffer = new WritableResourceStream($stream, $loop); + } + + $this->stream = $stream; + $this->loop = $loop ?: Loop::get(); + $this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize; + $this->buffer = $buffer; + + $that = $this; + + $this->buffer->on('error', function ($error) use ($that) { + $that->emit('error', array($error)); + }); + + $this->buffer->on('close', array($this, 'close')); + + $this->buffer->on('drain', function () use ($that) { + $that->emit('drain'); + }); + + $this->resume(); + } + + public function isReadable() + { + return $this->readable; + } + + public function isWritable() + { + return $this->writable; + } + + public function pause() + { + if ($this->listening) { + $this->loop->removeReadStream($this->stream); + $this->listening = false; + } + } + + public function resume() + { + if (!$this->listening && $this->readable) { + $this->loop->addReadStream($this->stream, array($this, 'handleData')); + $this->listening = true; + } + } + + public function write($data) + { + if (!$this->writable) { + return false; + } + + return $this->buffer->write($data); + } + + public function close() + { + if (!$this->writable && !$this->closing) { + return; + } + + $this->closing = false; + + $this->readable = false; + $this->writable = false; + + $this->emit('close'); + $this->pause(); + $this->buffer->close(); + $this->removeAllListeners(); + + if (\is_resource($this->stream)) { + \fclose($this->stream); + } + } + + public function end($data = null) + { + if (!$this->writable) { + return; + } + + $this->closing = true; + + $this->readable = false; + $this->writable = false; + $this->pause(); + + $this->buffer->end($data); + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + return Util::pipe($this, $dest, $options); + } + + /** @internal */ + public function handleData($stream) + { + $error = null; + \set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) { + $error = new \ErrorException( + $errstr, + 0, + $errno, + $errfile, + $errline + ); + }); + + $data = \stream_get_contents($stream, $this->bufferSize); + + \restore_error_handler(); + + if ($error !== null) { + $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error))); + $this->close(); + return; + } + + if ($data !== '') { + $this->emit('data', array($data)); + } elseif (\feof($this->stream)) { + // no data read => we reached the end and close the stream + $this->emit('end'); + $this->close(); + } + } + + /** + * Returns whether this is a pipe resource in a legacy environment + * + * This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+ + * and PHP 5.5.12+ and newer. + * + * @param resource $resource + * @return bool + * @link https://github.com/reactphp/child-process/issues/40 + * + * @codeCoverageIgnore + */ + private function isLegacyPipe($resource) + { + if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) { + $meta = \stream_get_meta_data($resource); + + if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') { + return true; + } + } + return false; + } +} |