diff options
Diffstat (limited to 'vendor/clue/redis-react')
-rw-r--r-- | vendor/clue/redis-react/LICENSE | 21 | ||||
-rw-r--r-- | vendor/clue/redis-react/composer.json | 32 | ||||
-rw-r--r-- | vendor/clue/redis-react/src/Client.php | 54 | ||||
-rw-r--r-- | vendor/clue/redis-react/src/Factory.php | 191 | ||||
-rw-r--r-- | vendor/clue/redis-react/src/LazyClient.php | 219 | ||||
-rw-r--r-- | vendor/clue/redis-react/src/StreamingClient.php | 203 |
6 files changed, 720 insertions, 0 deletions
diff --git a/vendor/clue/redis-react/LICENSE b/vendor/clue/redis-react/LICENSE new file mode 100644 index 0000000..da15612 --- /dev/null +++ b/vendor/clue/redis-react/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2013 Christian Lück + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/clue/redis-react/composer.json b/vendor/clue/redis-react/composer.json new file mode 100644 index 0000000..c1752cc --- /dev/null +++ b/vendor/clue/redis-react/composer.json @@ -0,0 +1,32 @@ +{ + "name": "clue/redis-react", + "description": "Async Redis client implementation, built on top of ReactPHP.", + "keywords": ["Redis", "database", "client", "async", "ReactPHP"], + "homepage": "https://github.com/clue/reactphp-redis", + "license": "MIT", + "authors": [ + { + "name": "Christian Lück", + "email": "christian@clue.engineering" + } + ], + "require": { + "php": ">=5.3", + "clue/redis-protocol": "0.3.*", + "evenement/evenement": "^3.0 || ^2.0 || ^1.0", + "react/event-loop": "^1.2", + "react/promise": "^2.0 || ^1.1", + "react/promise-timer": "^1.8", + "react/socket": "^1.9" + }, + "require-dev": { + "clue/block-react": "^1.1", + "phpunit/phpunit": "^9.3 || ^5.7 || ^4.8.35" + }, + "autoload": { + "psr-4": { "Clue\\React\\Redis\\": "src/" } + }, + "autoload-dev": { + "psr-4": { "Clue\\Tests\\React\\Redis\\": "tests/" } + } +} diff --git a/vendor/clue/redis-react/src/Client.php b/vendor/clue/redis-react/src/Client.php new file mode 100644 index 0000000..ec54229 --- /dev/null +++ b/vendor/clue/redis-react/src/Client.php @@ -0,0 +1,54 @@ +<?php + +namespace Clue\React\Redis; + +use Evenement\EventEmitterInterface; +use React\Promise\PromiseInterface; + +/** + * Simple interface for executing redis commands + * + * @event error(Exception $error) + * @event close() + * + * @event message($channel, $message) + * @event subscribe($channel, $numberOfChannels) + * @event unsubscribe($channel, $numberOfChannels) + * + * @event pmessage($pattern, $channel, $message) + * @event psubscribe($channel, $numberOfChannels) + * @event punsubscribe($channel, $numberOfChannels) + */ +interface Client extends EventEmitterInterface +{ + /** + * Invoke the given command and return a Promise that will be fulfilled when the request has been replied to + * + * This is a magic method that will be invoked when calling any redis + * command on this instance. + * + * @param string $name + * @param string[] $args + * @return PromiseInterface Promise<mixed,Exception> + */ + public function __call($name, $args); + + /** + * end connection once all pending requests have been replied to + * + * @return void + * @uses self::close() once all replies have been received + * @see self::close() for closing the connection immediately + */ + public function end(); + + /** + * close connection immediately + * + * This will emit the "close" event. + * + * @return void + * @see self::end() for closing the connection once the client is idle + */ + public function close(); +} diff --git a/vendor/clue/redis-react/src/Factory.php b/vendor/clue/redis-react/src/Factory.php new file mode 100644 index 0000000..4e94905 --- /dev/null +++ b/vendor/clue/redis-react/src/Factory.php @@ -0,0 +1,191 @@ +<?php + +namespace Clue\React\Redis; + +use Clue\Redis\Protocol\Factory as ProtocolFactory; +use React\EventLoop\Loop; +use React\EventLoop\LoopInterface; +use React\Promise\Deferred; +use React\Promise\Timer\TimeoutException; +use React\Socket\ConnectionInterface; +use React\Socket\Connector; +use React\Socket\ConnectorInterface; + +class Factory +{ + /** @var LoopInterface */ + private $loop; + + /** @var ConnectorInterface */ + private $connector; + + /** @var ProtocolFactory */ + private $protocol; + + /** + * @param ?LoopInterface $loop + * @param ?ConnectorInterface $connector + * @param ?ProtocolFactory $protocol + */ + public function __construct(LoopInterface $loop = null, ConnectorInterface $connector = null, ProtocolFactory $protocol = null) + { + $this->loop = $loop ?: Loop::get(); + $this->connector = $connector ?: new Connector(array(), $this->loop); + $this->protocol = $protocol ?: new ProtocolFactory(); + } + + /** + * Create Redis client connected to address of given redis instance + * + * @param string $uri Redis server URI to connect to + * @return \React\Promise\PromiseInterface<Client,\Exception> Promise that will + * be fulfilled with `Client` on success or rejects with `\Exception` on error. + */ + public function createClient($uri) + { + // support `redis+unix://` scheme for Unix domain socket (UDS) paths + if (preg_match('/^(redis\+unix:\/\/(?:[^:]*:[^@]*@)?)(.+?)?$/', $uri, $match)) { + $parts = parse_url($match[1] . 'localhost/' . $match[2]); + } else { + if (strpos($uri, '://') === false) { + $uri = 'redis://' . $uri; + } + + $parts = parse_url($uri); + } + + $uri = preg_replace(array('/(:)[^:\/]*(@)/', '/([?&]password=).*?($|&)/'), '$1***$2', $uri); + if ($parts === false || !isset($parts['scheme'], $parts['host']) || !in_array($parts['scheme'], array('redis', 'rediss', 'redis+unix'))) { + return \React\Promise\reject(new \InvalidArgumentException( + 'Invalid Redis URI given (EINVAL)', + defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22 + )); + } + + $args = array(); + parse_str(isset($parts['query']) ? $parts['query'] : '', $args); + + $authority = $parts['host'] . ':' . (isset($parts['port']) ? $parts['port'] : 6379); + if ($parts['scheme'] === 'rediss') { + $authority = 'tls://' . $authority; + } elseif ($parts['scheme'] === 'redis+unix') { + $authority = 'unix://' . substr($parts['path'], 1); + unset($parts['path']); + } + $connecting = $this->connector->connect($authority); + + $deferred = new Deferred(function ($_, $reject) use ($connecting, $uri) { + // connection cancelled, start with rejecting attempt, then clean up + $reject(new \RuntimeException( + 'Connection to ' . $uri . ' cancelled (ECONNABORTED)', + defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103 + )); + + // either close successful connection or cancel pending connection attempt + $connecting->then(function (ConnectionInterface $connection) { + $connection->close(); + }); + $connecting->cancel(); + }); + + $protocol = $this->protocol; + $promise = $connecting->then(function (ConnectionInterface $stream) use ($protocol) { + return new StreamingClient($stream, $protocol->createResponseParser(), $protocol->createSerializer()); + }, function (\Exception $e) use ($uri) { + throw new \RuntimeException( + 'Connection to ' . $uri . ' failed: ' . $e->getMessage(), + $e->getCode(), + $e + ); + }); + + // use `?password=secret` query or `user:secret@host` password form URL + $pass = isset($args['password']) ? $args['password'] : (isset($parts['pass']) ? rawurldecode($parts['pass']) : null); + if (isset($args['password']) || isset($parts['pass'])) { + $pass = isset($args['password']) ? $args['password'] : rawurldecode($parts['pass']); + $promise = $promise->then(function (StreamingClient $redis) use ($pass, $uri) { + return $redis->auth($pass)->then( + function () use ($redis) { + return $redis; + }, + function (\Exception $e) use ($redis, $uri) { + $redis->close(); + + $const = ''; + $errno = $e->getCode(); + if ($errno === 0) { + $const = ' (EACCES)'; + $errno = $e->getCode() ?: (defined('SOCKET_EACCES') ? SOCKET_EACCES : 13); + } + + throw new \RuntimeException( + 'Connection to ' . $uri . ' failed during AUTH command: ' . $e->getMessage() . $const, + $errno, + $e + ); + } + ); + }); + } + + // use `?db=1` query or `/1` path (skip first slash) + if (isset($args['db']) || (isset($parts['path']) && $parts['path'] !== '/')) { + $db = isset($args['db']) ? $args['db'] : substr($parts['path'], 1); + $promise = $promise->then(function (StreamingClient $redis) use ($db, $uri) { + return $redis->select($db)->then( + function () use ($redis) { + return $redis; + }, + function (\Exception $e) use ($redis, $uri) { + $redis->close(); + + $const = ''; + $errno = $e->getCode(); + if ($errno === 0 && strpos($e->getMessage(), 'NOAUTH ') === 0) { + $const = ' (EACCES)'; + $errno = defined('SOCKET_EACCES') ? SOCKET_EACCES : 13; + } elseif ($errno === 0) { + $const = ' (ENOENT)'; + $errno = defined('SOCKET_ENOENT') ? SOCKET_ENOENT : 2; + } + + throw new \RuntimeException( + 'Connection to ' . $uri . ' failed during SELECT command: ' . $e->getMessage() . $const, + $errno, + $e + ); + } + ); + }); + } + + $promise->then(array($deferred, 'resolve'), array($deferred, 'reject')); + + // use timeout from explicit ?timeout=x parameter or default to PHP's default_socket_timeout (60) + $timeout = isset($args['timeout']) ? (float) $args['timeout'] : (int) ini_get("default_socket_timeout"); + if ($timeout < 0) { + return $deferred->promise(); + } + + return \React\Promise\Timer\timeout($deferred->promise(), $timeout, $this->loop)->then(null, function ($e) use ($uri) { + if ($e instanceof TimeoutException) { + throw new \RuntimeException( + 'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)', + defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110 + ); + } + throw $e; + }); + } + + /** + * Create Redis client connected to address of given redis instance + * + * @param string $target + * @return Client + */ + public function createLazyClient($target) + { + return new LazyClient($target, $this, $this->loop); + } +} diff --git a/vendor/clue/redis-react/src/LazyClient.php b/vendor/clue/redis-react/src/LazyClient.php new file mode 100644 index 0000000..d82b257 --- /dev/null +++ b/vendor/clue/redis-react/src/LazyClient.php @@ -0,0 +1,219 @@ +<?php + +namespace Clue\React\Redis; + +use Evenement\EventEmitter; +use React\Stream\Util; +use React\EventLoop\LoopInterface; + +/** + * @internal + */ +class LazyClient extends EventEmitter implements Client +{ + private $target; + /** @var Factory */ + private $factory; + private $closed = false; + private $promise; + + private $loop; + private $idlePeriod = 60.0; + private $idleTimer; + private $pending = 0; + + private $subscribed = array(); + private $psubscribed = array(); + + /** + * @param $target + */ + public function __construct($target, Factory $factory, LoopInterface $loop) + { + $args = array(); + \parse_str((string) \parse_url($target, \PHP_URL_QUERY), $args); + if (isset($args['idle'])) { + $this->idlePeriod = (float)$args['idle']; + } + + $this->target = $target; + $this->factory = $factory; + $this->loop = $loop; + } + + private function client() + { + if ($this->promise !== null) { + return $this->promise; + } + + $self = $this; + $pending =& $this->promise; + $idleTimer=& $this->idleTimer; + $subscribed =& $this->subscribed; + $psubscribed =& $this->psubscribed; + $loop = $this->loop; + return $pending = $this->factory->createClient($this->target)->then(function (Client $redis) use ($self, &$pending, &$idleTimer, &$subscribed, &$psubscribed, $loop) { + // connection completed => remember only until closed + $redis->on('close', function () use (&$pending, $self, &$subscribed, &$psubscribed, &$idleTimer, $loop) { + $pending = null; + + // foward unsubscribe/punsubscribe events when underlying connection closes + $n = count($subscribed); + foreach ($subscribed as $channel => $_) { + $self->emit('unsubscribe', array($channel, --$n)); + } + $n = count($psubscribed); + foreach ($psubscribed as $pattern => $_) { + $self->emit('punsubscribe', array($pattern, --$n)); + } + $subscribed = array(); + $psubscribed = array(); + + if ($idleTimer !== null) { + $loop->cancelTimer($idleTimer); + $idleTimer = null; + } + }); + + // keep track of all channels and patterns this connection is subscribed to + $redis->on('subscribe', function ($channel) use (&$subscribed) { + $subscribed[$channel] = true; + }); + $redis->on('psubscribe', function ($pattern) use (&$psubscribed) { + $psubscribed[$pattern] = true; + }); + $redis->on('unsubscribe', function ($channel) use (&$subscribed) { + unset($subscribed[$channel]); + }); + $redis->on('punsubscribe', function ($pattern) use (&$psubscribed) { + unset($psubscribed[$pattern]); + }); + + Util::forwardEvents( + $redis, + $self, + array( + 'message', + 'subscribe', + 'unsubscribe', + 'pmessage', + 'psubscribe', + 'punsubscribe', + ) + ); + + return $redis; + }, function (\Exception $e) use (&$pending) { + // connection failed => discard connection attempt + $pending = null; + + throw $e; + }); + } + + public function __call($name, $args) + { + if ($this->closed) { + return \React\Promise\reject(new \RuntimeException( + 'Connection closed (ENOTCONN)', + defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107 + )); + } + + $that = $this; + return $this->client()->then(function (Client $redis) use ($name, $args, $that) { + $that->awake(); + return \call_user_func_array(array($redis, $name), $args)->then( + function ($result) use ($that) { + $that->idle(); + return $result; + }, + function ($error) use ($that) { + $that->idle(); + throw $error; + } + ); + }); + } + + public function end() + { + if ($this->promise === null) { + $this->close(); + } + + if ($this->closed) { + return; + } + + $that = $this; + return $this->client()->then(function (Client $redis) use ($that) { + $redis->on('close', function () use ($that) { + $that->close(); + }); + $redis->end(); + }); + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + + // either close active connection or cancel pending connection attempt + if ($this->promise !== null) { + $this->promise->then(function (Client $redis) { + $redis->close(); + }); + if ($this->promise !== null) { + $this->promise->cancel(); + $this->promise = null; + } + } + + if ($this->idleTimer !== null) { + $this->loop->cancelTimer($this->idleTimer); + $this->idleTimer = null; + } + + $this->emit('close'); + $this->removeAllListeners(); + } + + /** + * @internal + */ + public function awake() + { + ++$this->pending; + + if ($this->idleTimer !== null) { + $this->loop->cancelTimer($this->idleTimer); + $this->idleTimer = null; + } + } + + /** + * @internal + */ + public function idle() + { + --$this->pending; + + if ($this->pending < 1 && $this->idlePeriod >= 0 && !$this->subscribed && !$this->psubscribed && $this->promise !== null) { + $idleTimer =& $this->idleTimer; + $promise =& $this->promise; + $idleTimer = $this->loop->addTimer($this->idlePeriod, function () use (&$idleTimer, &$promise) { + $promise->then(function (Client $redis) { + $redis->close(); + }); + $promise = null; + $idleTimer = null; + }); + } + } +} diff --git a/vendor/clue/redis-react/src/StreamingClient.php b/vendor/clue/redis-react/src/StreamingClient.php new file mode 100644 index 0000000..8afd84d --- /dev/null +++ b/vendor/clue/redis-react/src/StreamingClient.php @@ -0,0 +1,203 @@ +<?php + +namespace Clue\React\Redis; + +use Clue\Redis\Protocol\Factory as ProtocolFactory; +use Clue\Redis\Protocol\Model\ErrorReply; +use Clue\Redis\Protocol\Model\ModelInterface; +use Clue\Redis\Protocol\Model\MultiBulkReply; +use Clue\Redis\Protocol\Parser\ParserException; +use Clue\Redis\Protocol\Parser\ParserInterface; +use Clue\Redis\Protocol\Serializer\SerializerInterface; +use Evenement\EventEmitter; +use React\Promise\Deferred; +use React\Stream\DuplexStreamInterface; + +/** + * @internal + */ +class StreamingClient extends EventEmitter implements Client +{ + private $stream; + private $parser; + private $serializer; + private $requests = array(); + private $ending = false; + private $closed = false; + + private $subscribed = 0; + private $psubscribed = 0; + + public function __construct(DuplexStreamInterface $stream, ParserInterface $parser = null, SerializerInterface $serializer = null) + { + if ($parser === null || $serializer === null) { + $factory = new ProtocolFactory(); + if ($parser === null) { + $parser = $factory->createResponseParser(); + } + if ($serializer === null) { + $serializer = $factory->createSerializer(); + } + } + + $that = $this; + $stream->on('data', function($chunk) use ($parser, $that) { + try { + $models = $parser->pushIncoming($chunk); + } catch (ParserException $error) { + $that->emit('error', array(new \UnexpectedValueException( + 'Invalid data received: ' . $error->getMessage() . ' (EBADMSG)', + defined('SOCKET_EBADMSG') ? SOCKET_EBADMSG : 77, + $error + ))); + $that->close(); + return; + } + + foreach ($models as $data) { + try { + $that->handleMessage($data); + } catch (\UnderflowException $error) { + $that->emit('error', array($error)); + $that->close(); + return; + } + } + }); + + $stream->on('close', array($this, 'close')); + + $this->stream = $stream; + $this->parser = $parser; + $this->serializer = $serializer; + } + + public function __call($name, $args) + { + $request = new Deferred(); + $promise = $request->promise(); + + $name = strtolower($name); + + // special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied + static $pubsubs = array('subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'); + + if ($this->ending) { + $request->reject(new \RuntimeException( + 'Connection ' . ($this->closed ? 'closed' : 'closing'). ' (ENOTCONN)', + defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107 + )); + } elseif (count($args) !== 1 && in_array($name, $pubsubs)) { + $request->reject(new \InvalidArgumentException( + 'PubSub commands limited to single argument (EINVAL)', + defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22 + )); + } elseif ($name === 'monitor') { + $request->reject(new \BadMethodCallException( + 'MONITOR command explicitly not supported (ENOTSUP)', + defined('SOCKET_ENOTSUP') ? SOCKET_ENOTSUP : (defined('SOCKET_EOPNOTSUPP') ? SOCKET_EOPNOTSUPP : 95) + )); + } else { + $this->stream->write($this->serializer->getRequestMessage($name, $args)); + $this->requests []= $request; + } + + if (in_array($name, $pubsubs)) { + $that = $this; + $subscribed =& $this->subscribed; + $psubscribed =& $this->psubscribed; + + $promise->then(function ($array) use ($that, &$subscribed, &$psubscribed) { + $first = array_shift($array); + + // (p)(un)subscribe messages are to be forwarded + $that->emit($first, $array); + + // remember number of (p)subscribe topics + if ($first === 'subscribe' || $first === 'unsubscribe') { + $subscribed = $array[1]; + } else { + $psubscribed = $array[1]; + } + }); + } + + return $promise; + } + + public function handleMessage(ModelInterface $message) + { + if (($this->subscribed !== 0 || $this->psubscribed !== 0) && $message instanceof MultiBulkReply) { + $array = $message->getValueNative(); + $first = array_shift($array); + + // pub/sub messages are to be forwarded and should not be processed as request responses + if (in_array($first, array('message', 'pmessage'))) { + $this->emit($first, $array); + return; + } + } + + if (!$this->requests) { + throw new \UnderflowException( + 'Unexpected reply received, no matching request found (ENOMSG)', + defined('SOCKET_ENOMSG') ? SOCKET_ENOMSG : 42 + ); + } + + $request = array_shift($this->requests); + assert($request instanceof Deferred); + + if ($message instanceof ErrorReply) { + $request->reject($message); + } else { + $request->resolve($message->getValueNative()); + } + + if ($this->ending && !$this->requests) { + $this->close(); + } + } + + public function end() + { + $this->ending = true; + + if (!$this->requests) { + $this->close(); + } + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->ending = true; + $this->closed = true; + + $remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false; + $this->stream->close(); + + $this->emit('close'); + + // reject all remaining requests in the queue + while ($this->requests) { + $request = array_shift($this->requests); + assert($request instanceof Deferred); + + if ($remoteClosed) { + $request->reject(new \RuntimeException( + 'Connection closed by peer (ECONNRESET)', + defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104 + )); + } else { + $request->reject(new \RuntimeException( + 'Connection closing (ECONNABORTED)', + defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103 + )); + } + } + } +} |