isLegacyPipe($stream)) {
            \stream_set_read_buffer($stream, 0);
        }

        // Fall back to a default write buffer on the same resource when the
        // caller did not supply one.
        if ($buffer === null) {
            $buffer = new WritableResourceStream($stream, $loop);
        }

        $this->stream = $stream;
        $this->loop = $loop ?: Loop::get();
        // Read chunk size defaults to 65536 bytes when not given.
        $this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;
        $this->buffer = $buffer;

        // Local alias of $this that the closures below capture via use().
        $that = $this;

        // Re-emit the write buffer's events on this stream.
        $this->buffer->on('error', function ($error) use ($that) {
            $that->emit('error', array($error));
        });

        $this->buffer->on('close', array($this, 'close'));

        $this->buffer->on('drain', function () use ($that) {
            $that->emit('drain');
        });

        // Start listening for incoming data right away.
        $this->resume();
    }

    /**
     * Reports whether the readable side of this stream is still open.
     *
     * @return bool
     */
    public function isReadable()
    {
        return $this->readable;
    }

    /**
     * Reports whether the writable side of this stream is still open.
     *
     * @return bool
     */
    public function isWritable()
    {
        return $this->writable;
    }

    /**
     * Stops watching the resource for incoming data.
     *
     * Does nothing when the stream is not currently registered with the loop.
     *
     * @return void
     */
    public function pause()
    {
        if ($this->listening) {
            $this->loop->removeReadStream($this->stream);
            $this->listening = false;
        }
    }

    /**
     * Starts (or restarts) watching the resource for incoming data.
     *
     * Only registers with the loop when the stream is still readable and is
     * not already registered, so repeated calls are harmless.
     *
     * @return void
     */
    public function resume()
    {
        if (!$this->listening && $this->readable) {
            $this->loop->addReadStream($this->stream, array($this, 'handleData'));
            $this->listening = true;
        }
    }

    /**
     * Hands data to the underlying write buffer.
     *
     * @param mixed $data payload forwarded unchanged to the buffer
     * @return bool false when this stream is no longer writable, otherwise
     *              whatever the buffer's write() returns
     */
    public function write($data)
    {
        if (!$this->writable) {
            return false;
        }

        return $this->buffer->write($data);
    }

    /**
     * Closes the stream: emits "close", detaches from the loop, closes the
     * write buffer, drops all listeners and closes the resource.
     *
     * The guard makes this a no-op once the stream has been closed. Because
     * the flags are reset before anything else happens, a re-entrant call
     * (the constructor wires the buffer's "close" event back to this method)
     * returns immediately instead of tearing down twice.
     *
     * @return void
     */
    public function close()
    {
        // After end() the stream is not writable but still "closing", in
        // which case the teardown below must still run.
        if (!$this->writable && !$this->closing) {
            return;
        }

        $this->closing = false;

        $this->readable = false;
        $this->writable = false;

        $this->emit('close');
        $this->pause();
        $this->buffer->close();
        $this->removeAllListeners();

        if (\is_resource($this->stream)) {
            \fclose($this->stream);
        }
    }

    /**
     * Ends the stream, optionally passing final data to the write buffer.
     *
     * Marks the stream as closing, stops reading and delegates to the
     * buffer's end(). Does nothing when the stream is not writable.
     *
     * @param mixed|null $data optional final payload forwarded to the buffer
     * @return void
     */
    public function end($data = null)
    {
        if (!$this->writable) {
            return;
        }

        // Keeps close() willing to run later even though writable is false.
        $this->closing = true;

        $this->readable = false;
        $this->writable = false;
        $this->pause();

        $this->buffer->end($data);
    }

    /**
     * Pipes this stream into the given destination via Util::pipe().
     *
     * @param WritableStreamInterface $dest
     * @param array $options passed through to Util::pipe()
     * @return mixed the value returned by Util::pipe()
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        return Util::pipe($this, $dest, $options);
    }

    /**
     * Read callback registered with the loop in resume().
     *
     * Reads up to $this->bufferSize bytes from the resource. A PHP error
     * raised during the read is turned into an "error" event followed by
     * close(). Non-empty data is emitted as "data"; an empty read on a
     * resource that reports EOF emits "end" and closes the stream.
     *
     * @internal
     * @param resource $stream
     * @return void
     */
    public function handleData($stream)
    {
        $error = null;

        // Temporarily capture any PHP error raised by the read so it can be
        // reported through the "error" event below.
        \set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
            $error = new \ErrorException(
                $errstr,
                0,
                $errno,
                $errfile,
                $errline
            );
        });

        $data = \stream_get_contents($stream, $this->bufferSize);

        \restore_error_handler();

        if ($error !== null) {
            $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
            $this->close();
            return;
        }

        if ($data !== '') {
            $this->emit('data', array($data));
        } elseif (\feof($this->stream)) {
            // no data read => we reached the end and close the stream
            $this->emit('end');
            $this->close();
        }
    }

    /**
     * Returns whether this is a pipe resource in a legacy environment
     *
     * This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+
     * and PHP 5.5.12+ and newer.
     *
     * @param resource $resource
     * @return bool
     * @link https://github.com/reactphp/child-process/issues/40
     *
     * @codeCoverageIgnore
     */
    private function isLegacyPipe($resource)
    {
        // Only PHP < 5.4.28 and 5.5.0 - 5.5.11 are affected.
        if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) {
            $meta = \stream_get_meta_data($resource);

            if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
                return true;
            }
        }

        return false;
    }
}