summaryrefslogtreecommitdiffstats
path: root/vendor/clue/redis-react
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:38:42 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:38:42 +0000
commitc3ca98e1b35123f226c7f4c596b5dee78caa4223 (patch)
tree9b6eb109283da55e7d9064baa9fac795a40264cb /vendor/clue/redis-react
parentInitial commit. (diff)
downloadicinga-php-thirdparty-c3ca98e1b35123f226c7f4c596b5dee78caa4223.tar.xz
icinga-php-thirdparty-c3ca98e1b35123f226c7f4c596b5dee78caa4223.zip
Adding upstream version 0.11.0.upstream/0.11.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/clue/redis-react')
-rw-r--r--vendor/clue/redis-react/LICENSE21
-rw-r--r--vendor/clue/redis-react/composer.json32
-rw-r--r--vendor/clue/redis-react/src/Client.php54
-rw-r--r--vendor/clue/redis-react/src/Factory.php191
-rw-r--r--vendor/clue/redis-react/src/LazyClient.php219
-rw-r--r--vendor/clue/redis-react/src/StreamingClient.php203
6 files changed, 720 insertions, 0 deletions
diff --git a/vendor/clue/redis-react/LICENSE b/vendor/clue/redis-react/LICENSE
new file mode 100644
index 0000000..da15612
--- /dev/null
+++ b/vendor/clue/redis-react/LICENSE
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2013 Christian Lück
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/vendor/clue/redis-react/composer.json b/vendor/clue/redis-react/composer.json
new file mode 100644
index 0000000..c1752cc
--- /dev/null
+++ b/vendor/clue/redis-react/composer.json
@@ -0,0 +1,32 @@
+{
+ "name": "clue/redis-react",
+ "description": "Async Redis client implementation, built on top of ReactPHP.",
+ "keywords": ["Redis", "database", "client", "async", "ReactPHP"],
+ "homepage": "https://github.com/clue/reactphp-redis",
+ "license": "MIT",
+ "authors": [
+ {
+ "name": "Christian Lück",
+ "email": "christian@clue.engineering"
+ }
+ ],
+ "require": {
+ "php": ">=5.3",
+ "clue/redis-protocol": "0.3.*",
+ "evenement/evenement": "^3.0 || ^2.0 || ^1.0",
+ "react/event-loop": "^1.2",
+ "react/promise": "^2.0 || ^1.1",
+ "react/promise-timer": "^1.8",
+ "react/socket": "^1.9"
+ },
+ "require-dev": {
+ "clue/block-react": "^1.1",
+ "phpunit/phpunit": "^9.3 || ^5.7 || ^4.8.35"
+ },
+ "autoload": {
+ "psr-4": { "Clue\\React\\Redis\\": "src/" }
+ },
+ "autoload-dev": {
+ "psr-4": { "Clue\\Tests\\React\\Redis\\": "tests/" }
+ }
+}
diff --git a/vendor/clue/redis-react/src/Client.php b/vendor/clue/redis-react/src/Client.php
new file mode 100644
index 0000000..ec54229
--- /dev/null
+++ b/vendor/clue/redis-react/src/Client.php
@@ -0,0 +1,54 @@
+<?php
+
+namespace Clue\React\Redis;
+
+use Evenement\EventEmitterInterface;
+use React\Promise\PromiseInterface;
+
+/**
+ * Simple interface for executing redis commands
+ *
+ * @event error(Exception $error)
+ * @event close()
+ *
+ * @event message($channel, $message)
+ * @event subscribe($channel, $numberOfChannels)
+ * @event unsubscribe($channel, $numberOfChannels)
+ *
+ * @event pmessage($pattern, $channel, $message)
+ * @event psubscribe($channel, $numberOfChannels)
+ * @event punsubscribe($channel, $numberOfChannels)
+ */
+interface Client extends EventEmitterInterface
+{
+ /**
+ * Invoke the given command and return a Promise that will be fulfilled when the request has been replied to
+ *
+ * This is a magic method that will be invoked when calling any redis
+ * command on this instance.
+ *
+ * @param string $name
+ * @param string[] $args
+ * @return PromiseInterface Promise<mixed,Exception>
+ */
+ public function __call($name, $args);
+
+ /**
+ * end connection once all pending requests have been replied to
+ *
+ * @return void
+ * @uses self::close() once all replies have been received
+ * @see self::close() for closing the connection immediately
+ */
+ public function end();
+
+ /**
+ * close connection immediately
+ *
+ * This will emit the "close" event.
+ *
+ * @return void
+ * @see self::end() for closing the connection once the client is idle
+ */
+ public function close();
+}
diff --git a/vendor/clue/redis-react/src/Factory.php b/vendor/clue/redis-react/src/Factory.php
new file mode 100644
index 0000000..4e94905
--- /dev/null
+++ b/vendor/clue/redis-react/src/Factory.php
@@ -0,0 +1,191 @@
+<?php
+
+namespace Clue\React\Redis;
+
+use Clue\Redis\Protocol\Factory as ProtocolFactory;
+use React\EventLoop\Loop;
+use React\EventLoop\LoopInterface;
+use React\Promise\Deferred;
+use React\Promise\Timer\TimeoutException;
+use React\Socket\ConnectionInterface;
+use React\Socket\Connector;
+use React\Socket\ConnectorInterface;
+
+class Factory
+{
+ /** @var LoopInterface */
+ private $loop;
+
+ /** @var ConnectorInterface */
+ private $connector;
+
+ /** @var ProtocolFactory */
+ private $protocol;
+
+ /**
+ * @param ?LoopInterface $loop
+ * @param ?ConnectorInterface $connector
+ * @param ?ProtocolFactory $protocol
+ */
+ public function __construct(LoopInterface $loop = null, ConnectorInterface $connector = null, ProtocolFactory $protocol = null)
+ {
+ $this->loop = $loop ?: Loop::get();
+ $this->connector = $connector ?: new Connector(array(), $this->loop);
+ $this->protocol = $protocol ?: new ProtocolFactory();
+ }
+
+ /**
+ * Create Redis client connected to address of given redis instance
+ *
+ * @param string $uri Redis server URI to connect to
+ * @return \React\Promise\PromiseInterface<Client,\Exception> Promise that will
+ * be fulfilled with `Client` on success or rejects with `\Exception` on error.
+ */
+ public function createClient($uri)
+ {
+ // support `redis+unix://` scheme for Unix domain socket (UDS) paths
+ if (preg_match('/^(redis\+unix:\/\/(?:[^:]*:[^@]*@)?)(.+?)?$/', $uri, $match)) {
+ $parts = parse_url($match[1] . 'localhost/' . $match[2]);
+ } else {
+ if (strpos($uri, '://') === false) {
+ $uri = 'redis://' . $uri;
+ }
+
+ $parts = parse_url($uri);
+ }
+
+ $uri = preg_replace(array('/(:)[^:\/]*(@)/', '/([?&]password=).*?($|&)/'), '$1***$2', $uri);
+ if ($parts === false || !isset($parts['scheme'], $parts['host']) || !in_array($parts['scheme'], array('redis', 'rediss', 'redis+unix'))) {
+ return \React\Promise\reject(new \InvalidArgumentException(
+ 'Invalid Redis URI given (EINVAL)',
+ defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22
+ ));
+ }
+
+ $args = array();
+ parse_str(isset($parts['query']) ? $parts['query'] : '', $args);
+
+ $authority = $parts['host'] . ':' . (isset($parts['port']) ? $parts['port'] : 6379);
+ if ($parts['scheme'] === 'rediss') {
+ $authority = 'tls://' . $authority;
+ } elseif ($parts['scheme'] === 'redis+unix') {
+ $authority = 'unix://' . substr($parts['path'], 1);
+ unset($parts['path']);
+ }
+ $connecting = $this->connector->connect($authority);
+
+ $deferred = new Deferred(function ($_, $reject) use ($connecting, $uri) {
+ // connection cancelled, start with rejecting attempt, then clean up
+ $reject(new \RuntimeException(
+ 'Connection to ' . $uri . ' cancelled (ECONNABORTED)',
+ defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103
+ ));
+
+ // either close successful connection or cancel pending connection attempt
+ $connecting->then(function (ConnectionInterface $connection) {
+ $connection->close();
+ });
+ $connecting->cancel();
+ });
+
+ $protocol = $this->protocol;
+ $promise = $connecting->then(function (ConnectionInterface $stream) use ($protocol) {
+ return new StreamingClient($stream, $protocol->createResponseParser(), $protocol->createSerializer());
+ }, function (\Exception $e) use ($uri) {
+ throw new \RuntimeException(
+ 'Connection to ' . $uri . ' failed: ' . $e->getMessage(),
+ $e->getCode(),
+ $e
+ );
+ });
+
+ // use `?password=secret` query or `user:secret@host` password form URL
+ $pass = isset($args['password']) ? $args['password'] : (isset($parts['pass']) ? rawurldecode($parts['pass']) : null);
+ if (isset($args['password']) || isset($parts['pass'])) {
+ $pass = isset($args['password']) ? $args['password'] : rawurldecode($parts['pass']);
+ $promise = $promise->then(function (StreamingClient $redis) use ($pass, $uri) {
+ return $redis->auth($pass)->then(
+ function () use ($redis) {
+ return $redis;
+ },
+ function (\Exception $e) use ($redis, $uri) {
+ $redis->close();
+
+ $const = '';
+ $errno = $e->getCode();
+ if ($errno === 0) {
+ $const = ' (EACCES)';
+ $errno = $e->getCode() ?: (defined('SOCKET_EACCES') ? SOCKET_EACCES : 13);
+ }
+
+ throw new \RuntimeException(
+ 'Connection to ' . $uri . ' failed during AUTH command: ' . $e->getMessage() . $const,
+ $errno,
+ $e
+ );
+ }
+ );
+ });
+ }
+
+ // use `?db=1` query or `/1` path (skip first slash)
+ if (isset($args['db']) || (isset($parts['path']) && $parts['path'] !== '/')) {
+ $db = isset($args['db']) ? $args['db'] : substr($parts['path'], 1);
+ $promise = $promise->then(function (StreamingClient $redis) use ($db, $uri) {
+ return $redis->select($db)->then(
+ function () use ($redis) {
+ return $redis;
+ },
+ function (\Exception $e) use ($redis, $uri) {
+ $redis->close();
+
+ $const = '';
+ $errno = $e->getCode();
+ if ($errno === 0 && strpos($e->getMessage(), 'NOAUTH ') === 0) {
+ $const = ' (EACCES)';
+ $errno = defined('SOCKET_EACCES') ? SOCKET_EACCES : 13;
+ } elseif ($errno === 0) {
+ $const = ' (ENOENT)';
+ $errno = defined('SOCKET_ENOENT') ? SOCKET_ENOENT : 2;
+ }
+
+ throw new \RuntimeException(
+ 'Connection to ' . $uri . ' failed during SELECT command: ' . $e->getMessage() . $const,
+ $errno,
+ $e
+ );
+ }
+ );
+ });
+ }
+
+ $promise->then(array($deferred, 'resolve'), array($deferred, 'reject'));
+
+ // use timeout from explicit ?timeout=x parameter or default to PHP's default_socket_timeout (60)
+ $timeout = isset($args['timeout']) ? (float) $args['timeout'] : (int) ini_get("default_socket_timeout");
+ if ($timeout < 0) {
+ return $deferred->promise();
+ }
+
+ return \React\Promise\Timer\timeout($deferred->promise(), $timeout, $this->loop)->then(null, function ($e) use ($uri) {
+ if ($e instanceof TimeoutException) {
+ throw new \RuntimeException(
+ 'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)',
+ defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110
+ );
+ }
+ throw $e;
+ });
+ }
+
+ /**
+ * Create Redis client connected to address of given redis instance
+ *
+ * @param string $target
+ * @return Client
+ */
+ public function createLazyClient($target)
+ {
+ return new LazyClient($target, $this, $this->loop);
+ }
+}
diff --git a/vendor/clue/redis-react/src/LazyClient.php b/vendor/clue/redis-react/src/LazyClient.php
new file mode 100644
index 0000000..d82b257
--- /dev/null
+++ b/vendor/clue/redis-react/src/LazyClient.php
@@ -0,0 +1,219 @@
+<?php
+
+namespace Clue\React\Redis;
+
+use Evenement\EventEmitter;
+use React\Stream\Util;
+use React\EventLoop\LoopInterface;
+
+/**
+ * @internal
+ */
+class LazyClient extends EventEmitter implements Client
+{
+ private $target;
+ /** @var Factory */
+ private $factory;
+ private $closed = false;
+ private $promise;
+
+ private $loop;
+ private $idlePeriod = 60.0;
+ private $idleTimer;
+ private $pending = 0;
+
+ private $subscribed = array();
+ private $psubscribed = array();
+
+ /**
+ * @param $target
+ */
+ public function __construct($target, Factory $factory, LoopInterface $loop)
+ {
+ $args = array();
+ \parse_str((string) \parse_url($target, \PHP_URL_QUERY), $args);
+ if (isset($args['idle'])) {
+ $this->idlePeriod = (float)$args['idle'];
+ }
+
+ $this->target = $target;
+ $this->factory = $factory;
+ $this->loop = $loop;
+ }
+
+ private function client()
+ {
+ if ($this->promise !== null) {
+ return $this->promise;
+ }
+
+ $self = $this;
+ $pending =& $this->promise;
+ $idleTimer=& $this->idleTimer;
+ $subscribed =& $this->subscribed;
+ $psubscribed =& $this->psubscribed;
+ $loop = $this->loop;
+ return $pending = $this->factory->createClient($this->target)->then(function (Client $redis) use ($self, &$pending, &$idleTimer, &$subscribed, &$psubscribed, $loop) {
+ // connection completed => remember only until closed
+ $redis->on('close', function () use (&$pending, $self, &$subscribed, &$psubscribed, &$idleTimer, $loop) {
+ $pending = null;
+
+ // foward unsubscribe/punsubscribe events when underlying connection closes
+ $n = count($subscribed);
+ foreach ($subscribed as $channel => $_) {
+ $self->emit('unsubscribe', array($channel, --$n));
+ }
+ $n = count($psubscribed);
+ foreach ($psubscribed as $pattern => $_) {
+ $self->emit('punsubscribe', array($pattern, --$n));
+ }
+ $subscribed = array();
+ $psubscribed = array();
+
+ if ($idleTimer !== null) {
+ $loop->cancelTimer($idleTimer);
+ $idleTimer = null;
+ }
+ });
+
+ // keep track of all channels and patterns this connection is subscribed to
+ $redis->on('subscribe', function ($channel) use (&$subscribed) {
+ $subscribed[$channel] = true;
+ });
+ $redis->on('psubscribe', function ($pattern) use (&$psubscribed) {
+ $psubscribed[$pattern] = true;
+ });
+ $redis->on('unsubscribe', function ($channel) use (&$subscribed) {
+ unset($subscribed[$channel]);
+ });
+ $redis->on('punsubscribe', function ($pattern) use (&$psubscribed) {
+ unset($psubscribed[$pattern]);
+ });
+
+ Util::forwardEvents(
+ $redis,
+ $self,
+ array(
+ 'message',
+ 'subscribe',
+ 'unsubscribe',
+ 'pmessage',
+ 'psubscribe',
+ 'punsubscribe',
+ )
+ );
+
+ return $redis;
+ }, function (\Exception $e) use (&$pending) {
+ // connection failed => discard connection attempt
+ $pending = null;
+
+ throw $e;
+ });
+ }
+
+ public function __call($name, $args)
+ {
+ if ($this->closed) {
+ return \React\Promise\reject(new \RuntimeException(
+ 'Connection closed (ENOTCONN)',
+ defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107
+ ));
+ }
+
+ $that = $this;
+ return $this->client()->then(function (Client $redis) use ($name, $args, $that) {
+ $that->awake();
+ return \call_user_func_array(array($redis, $name), $args)->then(
+ function ($result) use ($that) {
+ $that->idle();
+ return $result;
+ },
+ function ($error) use ($that) {
+ $that->idle();
+ throw $error;
+ }
+ );
+ });
+ }
+
+ public function end()
+ {
+ if ($this->promise === null) {
+ $this->close();
+ }
+
+ if ($this->closed) {
+ return;
+ }
+
+ $that = $this;
+ return $this->client()->then(function (Client $redis) use ($that) {
+ $redis->on('close', function () use ($that) {
+ $that->close();
+ });
+ $redis->end();
+ });
+ }
+
+ public function close()
+ {
+ if ($this->closed) {
+ return;
+ }
+
+ $this->closed = true;
+
+ // either close active connection or cancel pending connection attempt
+ if ($this->promise !== null) {
+ $this->promise->then(function (Client $redis) {
+ $redis->close();
+ });
+ if ($this->promise !== null) {
+ $this->promise->cancel();
+ $this->promise = null;
+ }
+ }
+
+ if ($this->idleTimer !== null) {
+ $this->loop->cancelTimer($this->idleTimer);
+ $this->idleTimer = null;
+ }
+
+ $this->emit('close');
+ $this->removeAllListeners();
+ }
+
+ /**
+ * @internal
+ */
+ public function awake()
+ {
+ ++$this->pending;
+
+ if ($this->idleTimer !== null) {
+ $this->loop->cancelTimer($this->idleTimer);
+ $this->idleTimer = null;
+ }
+ }
+
+ /**
+ * @internal
+ */
+ public function idle()
+ {
+ --$this->pending;
+
+ if ($this->pending < 1 && $this->idlePeriod >= 0 && !$this->subscribed && !$this->psubscribed && $this->promise !== null) {
+ $idleTimer =& $this->idleTimer;
+ $promise =& $this->promise;
+ $idleTimer = $this->loop->addTimer($this->idlePeriod, function () use (&$idleTimer, &$promise) {
+ $promise->then(function (Client $redis) {
+ $redis->close();
+ });
+ $promise = null;
+ $idleTimer = null;
+ });
+ }
+ }
+}
diff --git a/vendor/clue/redis-react/src/StreamingClient.php b/vendor/clue/redis-react/src/StreamingClient.php
new file mode 100644
index 0000000..8afd84d
--- /dev/null
+++ b/vendor/clue/redis-react/src/StreamingClient.php
@@ -0,0 +1,203 @@
+<?php
+
+namespace Clue\React\Redis;
+
+use Clue\Redis\Protocol\Factory as ProtocolFactory;
+use Clue\Redis\Protocol\Model\ErrorReply;
+use Clue\Redis\Protocol\Model\ModelInterface;
+use Clue\Redis\Protocol\Model\MultiBulkReply;
+use Clue\Redis\Protocol\Parser\ParserException;
+use Clue\Redis\Protocol\Parser\ParserInterface;
+use Clue\Redis\Protocol\Serializer\SerializerInterface;
+use Evenement\EventEmitter;
+use React\Promise\Deferred;
+use React\Stream\DuplexStreamInterface;
+
+/**
+ * @internal
+ */
+class StreamingClient extends EventEmitter implements Client
+{
+ private $stream;
+ private $parser;
+ private $serializer;
+ private $requests = array();
+ private $ending = false;
+ private $closed = false;
+
+ private $subscribed = 0;
+ private $psubscribed = 0;
+
+ public function __construct(DuplexStreamInterface $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
+ {
+ if ($parser === null || $serializer === null) {
+ $factory = new ProtocolFactory();
+ if ($parser === null) {
+ $parser = $factory->createResponseParser();
+ }
+ if ($serializer === null) {
+ $serializer = $factory->createSerializer();
+ }
+ }
+
+ $that = $this;
+ $stream->on('data', function($chunk) use ($parser, $that) {
+ try {
+ $models = $parser->pushIncoming($chunk);
+ } catch (ParserException $error) {
+ $that->emit('error', array(new \UnexpectedValueException(
+ 'Invalid data received: ' . $error->getMessage() . ' (EBADMSG)',
+ defined('SOCKET_EBADMSG') ? SOCKET_EBADMSG : 77,
+ $error
+ )));
+ $that->close();
+ return;
+ }
+
+ foreach ($models as $data) {
+ try {
+ $that->handleMessage($data);
+ } catch (\UnderflowException $error) {
+ $that->emit('error', array($error));
+ $that->close();
+ return;
+ }
+ }
+ });
+
+ $stream->on('close', array($this, 'close'));
+
+ $this->stream = $stream;
+ $this->parser = $parser;
+ $this->serializer = $serializer;
+ }
+
+ public function __call($name, $args)
+ {
+ $request = new Deferred();
+ $promise = $request->promise();
+
+ $name = strtolower($name);
+
+ // special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied
+ static $pubsubs = array('subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');
+
+ if ($this->ending) {
+ $request->reject(new \RuntimeException(
+ 'Connection ' . ($this->closed ? 'closed' : 'closing'). ' (ENOTCONN)',
+ defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107
+ ));
+ } elseif (count($args) !== 1 && in_array($name, $pubsubs)) {
+ $request->reject(new \InvalidArgumentException(
+ 'PubSub commands limited to single argument (EINVAL)',
+ defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22
+ ));
+ } elseif ($name === 'monitor') {
+ $request->reject(new \BadMethodCallException(
+ 'MONITOR command explicitly not supported (ENOTSUP)',
+ defined('SOCKET_ENOTSUP') ? SOCKET_ENOTSUP : (defined('SOCKET_EOPNOTSUPP') ? SOCKET_EOPNOTSUPP : 95)
+ ));
+ } else {
+ $this->stream->write($this->serializer->getRequestMessage($name, $args));
+ $this->requests []= $request;
+ }
+
+ if (in_array($name, $pubsubs)) {
+ $that = $this;
+ $subscribed =& $this->subscribed;
+ $psubscribed =& $this->psubscribed;
+
+ $promise->then(function ($array) use ($that, &$subscribed, &$psubscribed) {
+ $first = array_shift($array);
+
+ // (p)(un)subscribe messages are to be forwarded
+ $that->emit($first, $array);
+
+ // remember number of (p)subscribe topics
+ if ($first === 'subscribe' || $first === 'unsubscribe') {
+ $subscribed = $array[1];
+ } else {
+ $psubscribed = $array[1];
+ }
+ });
+ }
+
+ return $promise;
+ }
+
+ public function handleMessage(ModelInterface $message)
+ {
+ if (($this->subscribed !== 0 || $this->psubscribed !== 0) && $message instanceof MultiBulkReply) {
+ $array = $message->getValueNative();
+ $first = array_shift($array);
+
+ // pub/sub messages are to be forwarded and should not be processed as request responses
+ if (in_array($first, array('message', 'pmessage'))) {
+ $this->emit($first, $array);
+ return;
+ }
+ }
+
+ if (!$this->requests) {
+ throw new \UnderflowException(
+ 'Unexpected reply received, no matching request found (ENOMSG)',
+ defined('SOCKET_ENOMSG') ? SOCKET_ENOMSG : 42
+ );
+ }
+
+ $request = array_shift($this->requests);
+ assert($request instanceof Deferred);
+
+ if ($message instanceof ErrorReply) {
+ $request->reject($message);
+ } else {
+ $request->resolve($message->getValueNative());
+ }
+
+ if ($this->ending && !$this->requests) {
+ $this->close();
+ }
+ }
+
+ public function end()
+ {
+ $this->ending = true;
+
+ if (!$this->requests) {
+ $this->close();
+ }
+ }
+
+ public function close()
+ {
+ if ($this->closed) {
+ return;
+ }
+
+ $this->ending = true;
+ $this->closed = true;
+
+ $remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false;
+ $this->stream->close();
+
+ $this->emit('close');
+
+ // reject all remaining requests in the queue
+ while ($this->requests) {
+ $request = array_shift($this->requests);
+ assert($request instanceof Deferred);
+
+ if ($remoteClosed) {
+ $request->reject(new \RuntimeException(
+ 'Connection closed by peer (ECONNRESET)',
+ defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104
+ ));
+ } else {
+ $request->reject(new \RuntimeException(
+ 'Connection closing (ECONNABORTED)',
+ defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103
+ ));
+ }
+ }
+ }
+}