summaryrefslogtreecommitdiffstats
path: root/vendor/react/http-client
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:38:42 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:38:42 +0000
commitc3ca98e1b35123f226c7f4c596b5dee78caa4223 (patch)
tree9b6eb109283da55e7d9064baa9fac795a40264cb /vendor/react/http-client
parentInitial commit. (diff)
downloadicinga-php-thirdparty-c3ca98e1b35123f226c7f4c596b5dee78caa4223.tar.xz
icinga-php-thirdparty-c3ca98e1b35123f226c7f4c596b5dee78caa4223.zip
Adding upstream version 0.11.0.upstream/0.11.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/react/http-client')
-rw-r--r--vendor/react/http-client/LICENSE19
-rw-r--r--vendor/react/http-client/composer.json30
-rw-r--r--vendor/react/http-client/src/ChunkedStreamDecoder.php207
-rw-r--r--vendor/react/http-client/src/Client.php28
-rw-r--r--vendor/react/http-client/src/Request.php294
-rw-r--r--vendor/react/http-client/src/RequestData.php125
-rw-r--r--vendor/react/http-client/src/Response.php174
7 files changed, 877 insertions, 0 deletions
diff --git a/vendor/react/http-client/LICENSE b/vendor/react/http-client/LICENSE
new file mode 100644
index 0000000..a808108
--- /dev/null
+++ b/vendor/react/http-client/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2012 Igor Wiedler, Chris Boden
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/vendor/react/http-client/composer.json b/vendor/react/http-client/composer.json
new file mode 100644
index 0000000..9207639
--- /dev/null
+++ b/vendor/react/http-client/composer.json
@@ -0,0 +1,30 @@
+{
+ "name": "react/http-client",
+ "description": "Event-driven, streaming HTTP client for ReactPHP",
+ "keywords": ["http"],
+ "license": "MIT",
+ "require": {
+ "php": ">=5.3.0",
+ "evenement/evenement": "^3.0 || ^2.0 || ^1.0",
+ "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3",
+ "react/promise": "^2.1 || ^1.2.1",
+ "react/socket": "^1.0 || ^0.8.4",
+ "react/stream": "^1.0 || ^0.7.1",
+ "ringcentral/psr7": "^1.2"
+ },
+ "require-dev": {
+ "clue/block-react": "^1.2",
+ "phpunit/phpunit": "^7.0 || ^6.4 || ^5.7 || ^4.8.35",
+ "react/promise-stream": "^1.1"
+ },
+ "autoload": {
+ "psr-4": {
+ "React\\HttpClient\\": "src"
+ }
+ },
+ "autoload-dev": {
+ "psr-4": {
+ "React\\Tests\\HttpClient\\": "tests"
+ }
+ }
+}
diff --git a/vendor/react/http-client/src/ChunkedStreamDecoder.php b/vendor/react/http-client/src/ChunkedStreamDecoder.php
new file mode 100644
index 0000000..bc150ad
--- /dev/null
+++ b/vendor/react/http-client/src/ChunkedStreamDecoder.php
@@ -0,0 +1,207 @@
+<?php
+
+namespace React\HttpClient;
+
+use Evenement\EventEmitter;
+use Exception;
+use React\Stream\ReadableStreamInterface;
+use React\Stream\Util;
+use React\Stream\WritableStreamInterface;
+
+/**
+ * @internal
+ */
+class ChunkedStreamDecoder extends EventEmitter implements ReadableStreamInterface
+{
+ const CRLF = "\r\n";
+
+ /**
+ * @var string
+ */
+ protected $buffer = '';
+
+ /**
+ * @var int
+ */
+ protected $remainingLength = 0;
+
+ /**
+ * @var bool
+ */
+ protected $nextChunkIsLength = true;
+
+ /**
+ * @var ReadableStreamInterface
+ */
+ protected $stream;
+
+ /**
+ * @var bool
+ */
+ protected $closed = false;
+
+ /**
+ * @var bool
+ */
+ protected $reachedEnd = false;
+
+ /**
+ * @param ReadableStreamInterface $stream
+ */
+ public function __construct(ReadableStreamInterface $stream)
+ {
+ $this->stream = $stream;
+ $this->stream->on('data', array($this, 'handleData'));
+ $this->stream->on('end', array($this, 'handleEnd'));
+ Util::forwardEvents($this->stream, $this, array(
+ 'error',
+ ));
+ }
+
+ /** @internal */
+ public function handleData($data)
+ {
+ $this->buffer .= $data;
+
+ do {
+ $bufferLength = strlen($this->buffer);
+ $continue = $this->iterateBuffer();
+ $iteratedBufferLength = strlen($this->buffer);
+ } while (
+ $continue &&
+ $bufferLength !== $iteratedBufferLength &&
+ $iteratedBufferLength > 0
+ );
+
+ if ($this->buffer === false) {
+ $this->buffer = '';
+ }
+ }
+
+ protected function iterateBuffer()
+ {
+ if (strlen($this->buffer) <= 1) {
+ return false;
+ }
+
+ if ($this->nextChunkIsLength) {
+ $crlfPosition = strpos($this->buffer, static::CRLF);
+ if ($crlfPosition === false && strlen($this->buffer) > 1024) {
+ $this->emit('error', array(
+ new Exception('Chunk length header longer then 1024 bytes'),
+ ));
+ $this->close();
+ return false;
+ }
+ if ($crlfPosition === false) {
+ return false; // Chunk header hasn't completely come in yet
+ }
+ $lengthChunk = substr($this->buffer, 0, $crlfPosition);
+ if (strpos($lengthChunk, ';') !== false) {
+ list($lengthChunk) = explode(';', $lengthChunk, 2);
+ }
+ if ($lengthChunk !== '') {
+ $lengthChunk = ltrim(trim($lengthChunk), "0");
+ if ($lengthChunk === '') {
+ // We've reached the end of the stream
+ $this->reachedEnd = true;
+ $this->emit('end');
+ $this->close();
+ return false;
+ }
+ }
+ $this->nextChunkIsLength = false;
+ if (dechex((int)@hexdec($lengthChunk)) !== strtolower($lengthChunk)) {
+ $this->emit('error', array(
+ new Exception('Unable to validate "' . $lengthChunk . '" as chunk length header'),
+ ));
+ $this->close();
+ return false;
+ }
+ $this->remainingLength = hexdec($lengthChunk);
+ $this->buffer = substr($this->buffer, $crlfPosition + 2);
+ return true;
+ }
+
+ if ($this->remainingLength > 0) {
+ $chunkLength = $this->getChunkLength();
+ if ($chunkLength === 0) {
+ return true;
+ }
+ $this->emit('data', array(
+ substr($this->buffer, 0, $chunkLength),
+ $this
+ ));
+ $this->remainingLength -= $chunkLength;
+ $this->buffer = substr($this->buffer, $chunkLength);
+ return true;
+ }
+
+ $this->nextChunkIsLength = true;
+ $this->buffer = substr($this->buffer, 2);
+ return true;
+ }
+
+ protected function getChunkLength()
+ {
+ $bufferLength = strlen($this->buffer);
+
+ if ($bufferLength >= $this->remainingLength) {
+ return $this->remainingLength;
+ }
+
+ return $bufferLength;
+ }
+
+ public function pause()
+ {
+ $this->stream->pause();
+ }
+
+ public function resume()
+ {
+ $this->stream->resume();
+ }
+
+ public function isReadable()
+ {
+ return $this->stream->isReadable();
+ }
+
+ public function pipe(WritableStreamInterface $dest, array $options = array())
+ {
+ Util::pipe($this, $dest, $options);
+
+ return $dest;
+ }
+
+ public function close()
+ {
+ $this->closed = true;
+ return $this->stream->close();
+ }
+
+ /** @internal */
+ public function handleEnd()
+ {
+ $this->handleData('');
+
+ if ($this->closed) {
+ return;
+ }
+
+ if ($this->buffer === '' && $this->reachedEnd) {
+ $this->emit('end');
+ $this->close();
+ return;
+ }
+
+ $this->emit(
+ 'error',
+ array(
+ new Exception('Stream ended with incomplete control code')
+ )
+ );
+ $this->close();
+ }
+}
diff --git a/vendor/react/http-client/src/Client.php b/vendor/react/http-client/src/Client.php
new file mode 100644
index 0000000..fc14426
--- /dev/null
+++ b/vendor/react/http-client/src/Client.php
@@ -0,0 +1,28 @@
+<?php
+
+namespace React\HttpClient;
+
+use React\EventLoop\LoopInterface;
+use React\Socket\ConnectorInterface;
+use React\Socket\Connector;
+
+class Client
+{
+ private $connector;
+
+ public function __construct(LoopInterface $loop, ConnectorInterface $connector = null)
+ {
+ if ($connector === null) {
+ $connector = new Connector($loop);
+ }
+
+ $this->connector = $connector;
+ }
+
+ public function request($method, $url, array $headers = array(), $protocolVersion = '1.0')
+ {
+ $requestData = new RequestData($method, $url, $headers, $protocolVersion);
+
+ return new Request($this->connector, $requestData);
+ }
+}
diff --git a/vendor/react/http-client/src/Request.php b/vendor/react/http-client/src/Request.php
new file mode 100644
index 0000000..caa242b
--- /dev/null
+++ b/vendor/react/http-client/src/Request.php
@@ -0,0 +1,294 @@
+<?php
+
+namespace React\HttpClient;
+
+use Evenement\EventEmitter;
+use React\Promise;
+use React\Socket\ConnectionInterface;
+use React\Socket\ConnectorInterface;
+use React\Stream\WritableStreamInterface;
+use RingCentral\Psr7 as gPsr;
+
+/**
+ * @event response
+ * @event drain
+ * @event error
+ * @event end
+ */
+class Request extends EventEmitter implements WritableStreamInterface
+{
+ const STATE_INIT = 0;
+ const STATE_WRITING_HEAD = 1;
+ const STATE_HEAD_WRITTEN = 2;
+ const STATE_END = 3;
+
+ private $connector;
+ private $requestData;
+
+ private $stream;
+ private $buffer;
+ private $responseFactory;
+ private $state = self::STATE_INIT;
+ private $ended = false;
+
+ private $pendingWrites = '';
+
+ public function __construct(ConnectorInterface $connector, RequestData $requestData)
+ {
+ $this->connector = $connector;
+ $this->requestData = $requestData;
+ }
+
+ public function isWritable()
+ {
+ return self::STATE_END > $this->state && !$this->ended;
+ }
+
+ private function writeHead()
+ {
+ $this->state = self::STATE_WRITING_HEAD;
+
+ $requestData = $this->requestData;
+ $streamRef = &$this->stream;
+ $stateRef = &$this->state;
+ $pendingWrites = &$this->pendingWrites;
+ $that = $this;
+
+ $promise = $this->connect();
+ $promise->then(
+ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRef, &$pendingWrites, $that) {
+ $streamRef = $stream;
+
+ $stream->on('drain', array($that, 'handleDrain'));
+ $stream->on('data', array($that, 'handleData'));
+ $stream->on('end', array($that, 'handleEnd'));
+ $stream->on('error', array($that, 'handleError'));
+ $stream->on('close', array($that, 'handleClose'));
+
+ $headers = (string) $requestData;
+
+ $more = $stream->write($headers . $pendingWrites);
+
+ $stateRef = Request::STATE_HEAD_WRITTEN;
+
+ // clear pending writes if non-empty
+ if ($pendingWrites !== '') {
+ $pendingWrites = '';
+
+ if ($more) {
+ $that->emit('drain');
+ }
+ }
+ },
+ array($this, 'closeError')
+ );
+
+ $this->on('close', function() use ($promise) {
+ $promise->cancel();
+ });
+ }
+
+ public function write($data)
+ {
+ if (!$this->isWritable()) {
+ return false;
+ }
+
+ // write directly to connection stream if already available
+ if (self::STATE_HEAD_WRITTEN <= $this->state) {
+ return $this->stream->write($data);
+ }
+
+ // otherwise buffer and try to establish connection
+ $this->pendingWrites .= $data;
+ if (self::STATE_WRITING_HEAD > $this->state) {
+ $this->writeHead();
+ }
+
+ return false;
+ }
+
+ public function end($data = null)
+ {
+ if (!$this->isWritable()) {
+ return;
+ }
+
+ if (null !== $data) {
+ $this->write($data);
+ } else if (self::STATE_WRITING_HEAD > $this->state) {
+ $this->writeHead();
+ }
+
+ $this->ended = true;
+ }
+
+ /** @internal */
+ public function handleDrain()
+ {
+ $this->emit('drain');
+ }
+
+ /** @internal */
+ public function handleData($data)
+ {
+ $this->buffer .= $data;
+
+ // buffer until double CRLF (or double LF for compatibility with legacy servers)
+ if (false !== strpos($this->buffer, "\r\n\r\n") || false !== strpos($this->buffer, "\n\n")) {
+ try {
+ list($response, $bodyChunk) = $this->parseResponse($this->buffer);
+ } catch (\InvalidArgumentException $exception) {
+ $this->emit('error', array($exception));
+ }
+
+ $this->buffer = null;
+
+ $this->stream->removeListener('drain', array($this, 'handleDrain'));
+ $this->stream->removeListener('data', array($this, 'handleData'));
+ $this->stream->removeListener('end', array($this, 'handleEnd'));
+ $this->stream->removeListener('error', array($this, 'handleError'));
+ $this->stream->removeListener('close', array($this, 'handleClose'));
+
+ if (!isset($response)) {
+ return;
+ }
+
+ $response->on('close', array($this, 'close'));
+ $that = $this;
+ $response->on('error', function (\Exception $error) use ($that) {
+ $that->closeError(new \RuntimeException(
+ "An error occured in the response",
+ 0,
+ $error
+ ));
+ });
+
+ $this->emit('response', array($response, $this));
+
+ $this->stream->emit('data', array($bodyChunk));
+ }
+ }
+
+ /** @internal */
+ public function handleEnd()
+ {
+ $this->closeError(new \RuntimeException(
+ "Connection ended before receiving response"
+ ));
+ }
+
+ /** @internal */
+ public function handleError(\Exception $error)
+ {
+ $this->closeError(new \RuntimeException(
+ "An error occurred in the underlying stream",
+ 0,
+ $error
+ ));
+ }
+
+ /** @internal */
+ public function handleClose()
+ {
+ $this->close();
+ }
+
+ /** @internal */
+ public function closeError(\Exception $error)
+ {
+ if (self::STATE_END <= $this->state) {
+ return;
+ }
+ $this->emit('error', array($error));
+ $this->close();
+ }
+
+ public function close()
+ {
+ if (self::STATE_END <= $this->state) {
+ return;
+ }
+
+ $this->state = self::STATE_END;
+ $this->pendingWrites = '';
+
+ if ($this->stream) {
+ $this->stream->close();
+ }
+
+ $this->emit('close');
+ $this->removeAllListeners();
+ }
+
+ protected function parseResponse($data)
+ {
+ $psrResponse = gPsr\parse_response($data);
+ $headers = array_map(function($val) {
+ if (1 === count($val)) {
+ $val = $val[0];
+ }
+
+ return $val;
+ }, $psrResponse->getHeaders());
+
+ $factory = $this->getResponseFactory();
+
+ $response = $factory(
+ 'HTTP',
+ $psrResponse->getProtocolVersion(),
+ $psrResponse->getStatusCode(),
+ $psrResponse->getReasonPhrase(),
+ $headers
+ );
+
+ return array($response, (string)($psrResponse->getBody()));
+ }
+
+ protected function connect()
+ {
+ $scheme = $this->requestData->getScheme();
+ if ($scheme !== 'https' && $scheme !== 'http') {
+ return Promise\reject(
+ new \InvalidArgumentException('Invalid request URL given')
+ );
+ }
+
+ $host = $this->requestData->getHost();
+ $port = $this->requestData->getPort();
+
+ if ($scheme === 'https') {
+ $host = 'tls://' . $host;
+ }
+
+ return $this->connector
+ ->connect($host . ':' . $port);
+ }
+
+ public function setResponseFactory($factory)
+ {
+ $this->responseFactory = $factory;
+ }
+
+ public function getResponseFactory()
+ {
+ if (null === $factory = $this->responseFactory) {
+ $stream = $this->stream;
+
+ $factory = function ($protocol, $version, $code, $reasonPhrase, $headers) use ($stream) {
+ return new Response(
+ $stream,
+ $protocol,
+ $version,
+ $code,
+ $reasonPhrase,
+ $headers
+ );
+ };
+
+ $this->responseFactory = $factory;
+ }
+
+ return $factory;
+ }
+}
diff --git a/vendor/react/http-client/src/RequestData.php b/vendor/react/http-client/src/RequestData.php
new file mode 100644
index 0000000..1c7d5eb
--- /dev/null
+++ b/vendor/react/http-client/src/RequestData.php
@@ -0,0 +1,125 @@
+<?php
+
+namespace React\HttpClient;
+
+class RequestData
+{
+ private $method;
+ private $url;
+ private $headers;
+ private $protocolVersion;
+
+ public function __construct($method, $url, array $headers = array(), $protocolVersion = '1.0')
+ {
+ $this->method = $method;
+ $this->url = $url;
+ $this->headers = $headers;
+ $this->protocolVersion = $protocolVersion;
+ }
+
+ private function mergeDefaultheaders(array $headers)
+ {
+ $port = ($this->getDefaultPort() === $this->getPort()) ? '' : ":{$this->getPort()}";
+ $connectionHeaders = ('1.1' === $this->protocolVersion) ? array('Connection' => 'close') : array();
+ $authHeaders = $this->getAuthHeaders();
+
+ $defaults = array_merge(
+ array(
+ 'Host' => $this->getHost().$port,
+ 'User-Agent' => 'React/alpha',
+ ),
+ $connectionHeaders,
+ $authHeaders
+ );
+
+ // remove all defaults that already exist in $headers
+ $lower = array_change_key_case($headers, CASE_LOWER);
+ foreach ($defaults as $key => $_) {
+ if (isset($lower[strtolower($key)])) {
+ unset($defaults[$key]);
+ }
+ }
+
+ return array_merge($defaults, $headers);
+ }
+
+ public function getScheme()
+ {
+ return parse_url($this->url, PHP_URL_SCHEME);
+ }
+
+ public function getHost()
+ {
+ return parse_url($this->url, PHP_URL_HOST);
+ }
+
+ public function getPort()
+ {
+ return (int) parse_url($this->url, PHP_URL_PORT) ?: $this->getDefaultPort();
+ }
+
+ public function getDefaultPort()
+ {
+ return ('https' === $this->getScheme()) ? 443 : 80;
+ }
+
+ public function getPath()
+ {
+ $path = parse_url($this->url, PHP_URL_PATH);
+ $queryString = parse_url($this->url, PHP_URL_QUERY);
+
+ // assume "/" path by default, but allow "OPTIONS *"
+ if ($path === null) {
+ $path = ($this->method === 'OPTIONS' && $queryString === null) ? '*': '/';
+ }
+ if ($queryString !== null) {
+ $path .= '?' . $queryString;
+ }
+
+ return $path;
+ }
+
+ public function setProtocolVersion($version)
+ {
+ $this->protocolVersion = $version;
+ }
+
+ public function __toString()
+ {
+ $headers = $this->mergeDefaultheaders($this->headers);
+
+ $data = '';
+ $data .= "{$this->method} {$this->getPath()} HTTP/{$this->protocolVersion}\r\n";
+ foreach ($headers as $name => $values) {
+ foreach ((array)$values as $value) {
+ $data .= "$name: $value\r\n";
+ }
+ }
+ $data .= "\r\n";
+
+ return $data;
+ }
+
+ private function getUrlUserPass()
+ {
+ $components = parse_url($this->url);
+
+ if (isset($components['user'])) {
+ return array(
+ 'user' => $components['user'],
+ 'pass' => isset($components['pass']) ? $components['pass'] : null,
+ );
+ }
+ }
+
+ private function getAuthHeaders()
+ {
+ if (null !== $auth = $this->getUrlUserPass()) {
+ return array(
+ 'Authorization' => 'Basic ' . base64_encode($auth['user'].':'.$auth['pass']),
+ );
+ }
+
+ return array();
+ }
+}
diff --git a/vendor/react/http-client/src/Response.php b/vendor/react/http-client/src/Response.php
new file mode 100644
index 0000000..5ed271f
--- /dev/null
+++ b/vendor/react/http-client/src/Response.php
@@ -0,0 +1,174 @@
+<?php
+
+namespace React\HttpClient;
+
+use Evenement\EventEmitter;
+use React\Stream\ReadableStreamInterface;
+use React\Stream\Util;
+use React\Stream\WritableStreamInterface;
+
+/**
+ * @event data ($bodyChunk)
+ * @event error
+ * @event end
+ */
+class Response extends EventEmitter implements ReadableStreamInterface
+{
+ private $stream;
+ private $protocol;
+ private $version;
+ private $code;
+ private $reasonPhrase;
+ private $headers;
+ private $readable = true;
+
+ public function __construct(ReadableStreamInterface $stream, $protocol, $version, $code, $reasonPhrase, $headers)
+ {
+ $this->stream = $stream;
+ $this->protocol = $protocol;
+ $this->version = $version;
+ $this->code = $code;
+ $this->reasonPhrase = $reasonPhrase;
+ $this->headers = $headers;
+
+ if (strtolower($this->getHeaderLine('Transfer-Encoding')) === 'chunked') {
+ $this->stream = new ChunkedStreamDecoder($stream);
+ $this->removeHeader('Transfer-Encoding');
+ }
+
+ $this->stream->on('data', array($this, 'handleData'));
+ $this->stream->on('error', array($this, 'handleError'));
+ $this->stream->on('end', array($this, 'handleEnd'));
+ $this->stream->on('close', array($this, 'handleClose'));
+ }
+
+ public function getProtocol()
+ {
+ return $this->protocol;
+ }
+
+ public function getVersion()
+ {
+ return $this->version;
+ }
+
+ public function getCode()
+ {
+ return $this->code;
+ }
+
+ public function getReasonPhrase()
+ {
+ return $this->reasonPhrase;
+ }
+
+ public function getHeaders()
+ {
+ return $this->headers;
+ }
+
+ private function removeHeader($name)
+ {
+ foreach ($this->headers as $key => $value) {
+ if (strcasecmp($name, $key) === 0) {
+ unset($this->headers[$key]);
+ break;
+ }
+ }
+ }
+
+ private function getHeader($name)
+ {
+ $name = strtolower($name);
+ $normalized = array_change_key_case($this->headers, CASE_LOWER);
+
+ return isset($normalized[$name]) ? (array)$normalized[$name] : array();
+ }
+
+ private function getHeaderLine($name)
+ {
+ return implode(', ' , $this->getHeader($name));
+ }
+
+ /** @internal */
+ public function handleData($data)
+ {
+ if ($this->readable) {
+ $this->emit('data', array($data));
+ }
+ }
+
+ /** @internal */
+ public function handleEnd()
+ {
+ if (!$this->readable) {
+ return;
+ }
+ $this->emit('end');
+ $this->close();
+ }
+
+ /** @internal */
+ public function handleError(\Exception $error)
+ {
+ if (!$this->readable) {
+ return;
+ }
+ $this->emit('error', array(new \RuntimeException(
+ "An error occurred in the underlying stream",
+ 0,
+ $error
+ )));
+
+ $this->close();
+ }
+
+ /** @internal */
+ public function handleClose()
+ {
+ $this->close();
+ }
+
+ public function close()
+ {
+ if (!$this->readable) {
+ return;
+ }
+
+ $this->readable = false;
+ $this->stream->close();
+
+ $this->emit('close');
+ $this->removeAllListeners();
+ }
+
+ public function isReadable()
+ {
+ return $this->readable;
+ }
+
+ public function pause()
+ {
+ if (!$this->readable) {
+ return;
+ }
+
+ $this->stream->pause();
+ }
+
+ public function resume()
+ {
+ if (!$this->readable) {
+ return;
+ }
+
+ $this->stream->resume();
+ }
+
+ public function pipe(WritableStreamInterface $dest, array $options = array())
+ {
+ Util::pipe($this, $dest, $options);
+
+ return $dest;
+ }
+}