From c3ca98e1b35123f226c7f4c596b5dee78caa4223 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 14:38:42 +0200 Subject: Adding upstream version 0.11.0. Signed-off-by: Daniel Baumann --- vendor/clue/redis-react/src/Client.php | 54 ++++++ vendor/clue/redis-react/src/Factory.php | 191 +++++++++++++++++++++ vendor/clue/redis-react/src/LazyClient.php | 219 ++++++++++++++++++++++++ vendor/clue/redis-react/src/StreamingClient.php | 203 ++++++++++++++++++++++ 4 files changed, 667 insertions(+) create mode 100644 vendor/clue/redis-react/src/Client.php create mode 100644 vendor/clue/redis-react/src/Factory.php create mode 100644 vendor/clue/redis-react/src/LazyClient.php create mode 100644 vendor/clue/redis-react/src/StreamingClient.php (limited to 'vendor/clue/redis-react/src') diff --git a/vendor/clue/redis-react/src/Client.php b/vendor/clue/redis-react/src/Client.php new file mode 100644 index 0000000..ec54229 --- /dev/null +++ b/vendor/clue/redis-react/src/Client.php @@ -0,0 +1,54 @@ + + */ + public function __call($name, $args); + + /** + * end connection once all pending requests have been replied to + * + * @return void + * @uses self::close() once all replies have been received + * @see self::close() for closing the connection immediately + */ + public function end(); + + /** + * close connection immediately + * + * This will emit the "close" event. + * + * @return void + * @see self::end() for closing the connection once the client is idle + */ + public function close(); +} diff --git a/vendor/clue/redis-react/src/Factory.php b/vendor/clue/redis-react/src/Factory.php new file mode 100644 index 0000000..4e94905 --- /dev/null +++ b/vendor/clue/redis-react/src/Factory.php @@ -0,0 +1,191 @@ +loop = $loop ?: Loop::get(); + $this->connector = $connector ?: new Connector(array(), $this->loop); + $this->protocol = $protocol ?: new ProtocolFactory(); + } + + /** + * Create Redis client connected to address of given redis instance + * + * @param string $uri Redis server URI to connect to + * @return \React\Promise\PromiseInterface Promise that will + * be fulfilled with `Client` on success or rejects with `\Exception` on error. + */ + public function createClient($uri) + { + // support `redis+unix://` scheme for Unix domain socket (UDS) paths + if (preg_match('/^(redis\+unix:\/\/(?:[^:]*:[^@]*@)?)(.+?)?$/', $uri, $match)) { + $parts = parse_url($match[1] . 'localhost/' . $match[2]); + } else { + if (strpos($uri, '://') === false) { + $uri = 'redis://' . $uri; + } + + $parts = parse_url($uri); + } + + $uri = preg_replace(array('/(:)[^:\/]*(@)/', '/([?&]password=).*?($|&)/'), '$1***$2', $uri); + if ($parts === false || !isset($parts['scheme'], $parts['host']) || !in_array($parts['scheme'], array('redis', 'rediss', 'redis+unix'))) { + return \React\Promise\reject(new \InvalidArgumentException( + 'Invalid Redis URI given (EINVAL)', + defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22 + )); + } + + $args = array(); + parse_str(isset($parts['query']) ? $parts['query'] : '', $args); + + $authority = $parts['host'] . ':' . (isset($parts['port']) ? $parts['port'] : 6379); + if ($parts['scheme'] === 'rediss') { + $authority = 'tls://' . $authority; + } elseif ($parts['scheme'] === 'redis+unix') { + $authority = 'unix://' . substr($parts['path'], 1); + unset($parts['path']); + } + $connecting = $this->connector->connect($authority); + + $deferred = new Deferred(function ($_, $reject) use ($connecting, $uri) { + // connection cancelled, start with rejecting attempt, then clean up + $reject(new \RuntimeException( + 'Connection to ' . $uri . ' cancelled (ECONNABORTED)', + defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103 + )); + + // either close successful connection or cancel pending connection attempt + $connecting->then(function (ConnectionInterface $connection) { + $connection->close(); + }); + $connecting->cancel(); + }); + + $protocol = $this->protocol; + $promise = $connecting->then(function (ConnectionInterface $stream) use ($protocol) { + return new StreamingClient($stream, $protocol->createResponseParser(), $protocol->createSerializer()); + }, function (\Exception $e) use ($uri) { + throw new \RuntimeException( + 'Connection to ' . $uri . ' failed: ' . $e->getMessage(), + $e->getCode(), + $e + ); + }); + + // use `?password=secret` query or `user:secret@host` password form URL + $pass = isset($args['password']) ? $args['password'] : (isset($parts['pass']) ? rawurldecode($parts['pass']) : null); + if (isset($args['password']) || isset($parts['pass'])) { + $pass = isset($args['password']) ? $args['password'] : rawurldecode($parts['pass']); + $promise = $promise->then(function (StreamingClient $redis) use ($pass, $uri) { + return $redis->auth($pass)->then( + function () use ($redis) { + return $redis; + }, + function (\Exception $e) use ($redis, $uri) { + $redis->close(); + + $const = ''; + $errno = $e->getCode(); + if ($errno === 0) { + $const = ' (EACCES)'; + $errno = $e->getCode() ?: (defined('SOCKET_EACCES') ? SOCKET_EACCES : 13); + } + + throw new \RuntimeException( + 'Connection to ' . $uri . ' failed during AUTH command: ' . $e->getMessage() . $const, + $errno, + $e + ); + } + ); + }); + } + + // use `?db=1` query or `/1` path (skip first slash) + if (isset($args['db']) || (isset($parts['path']) && $parts['path'] !== '/')) { + $db = isset($args['db']) ? $args['db'] : substr($parts['path'], 1); + $promise = $promise->then(function (StreamingClient $redis) use ($db, $uri) { + return $redis->select($db)->then( + function () use ($redis) { + return $redis; + }, + function (\Exception $e) use ($redis, $uri) { + $redis->close(); + + $const = ''; + $errno = $e->getCode(); + if ($errno === 0 && strpos($e->getMessage(), 'NOAUTH ') === 0) { + $const = ' (EACCES)'; + $errno = defined('SOCKET_EACCES') ? SOCKET_EACCES : 13; + } elseif ($errno === 0) { + $const = ' (ENOENT)'; + $errno = defined('SOCKET_ENOENT') ? SOCKET_ENOENT : 2; + } + + throw new \RuntimeException( + 'Connection to ' . $uri . ' failed during SELECT command: ' . $e->getMessage() . $const, + $errno, + $e + ); + } + ); + }); + } + + $promise->then(array($deferred, 'resolve'), array($deferred, 'reject')); + + // use timeout from explicit ?timeout=x parameter or default to PHP's default_socket_timeout (60) + $timeout = isset($args['timeout']) ? (float) $args['timeout'] : (int) ini_get("default_socket_timeout"); + if ($timeout < 0) { + return $deferred->promise(); + } + + return \React\Promise\Timer\timeout($deferred->promise(), $timeout, $this->loop)->then(null, function ($e) use ($uri) { + if ($e instanceof TimeoutException) { + throw new \RuntimeException( + 'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)', + defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110 + ); + } + throw $e; + }); + } + + /** + * Create Redis client connected to address of given redis instance + * + * @param string $target + * @return Client + */ + public function createLazyClient($target) + { + return new LazyClient($target, $this, $this->loop); + } +} diff --git a/vendor/clue/redis-react/src/LazyClient.php b/vendor/clue/redis-react/src/LazyClient.php new file mode 100644 index 0000000..d82b257 --- /dev/null +++ b/vendor/clue/redis-react/src/LazyClient.php @@ -0,0 +1,219 @@ +idlePeriod = (float)$args['idle']; + } + + $this->target = $target; + $this->factory = $factory; + $this->loop = $loop; + } + + private function client() + { + if ($this->promise !== null) { + return $this->promise; + } + + $self = $this; + $pending =& $this->promise; + $idleTimer=& $this->idleTimer; + $subscribed =& $this->subscribed; + $psubscribed =& $this->psubscribed; + $loop = $this->loop; + return $pending = $this->factory->createClient($this->target)->then(function (Client $redis) use ($self, &$pending, &$idleTimer, &$subscribed, &$psubscribed, $loop) { + // connection completed => remember only until closed + $redis->on('close', function () use (&$pending, $self, &$subscribed, &$psubscribed, &$idleTimer, $loop) { + $pending = null; + + // foward unsubscribe/punsubscribe events when underlying connection closes + $n = count($subscribed); + foreach ($subscribed as $channel => $_) { + $self->emit('unsubscribe', array($channel, --$n)); + } + $n = count($psubscribed); + foreach ($psubscribed as $pattern => $_) { + $self->emit('punsubscribe', array($pattern, --$n)); + } + $subscribed = array(); + $psubscribed = array(); + + if ($idleTimer !== null) { + $loop->cancelTimer($idleTimer); + $idleTimer = null; + } + }); + + // keep track of all channels and patterns this connection is subscribed to + $redis->on('subscribe', function ($channel) use (&$subscribed) { + $subscribed[$channel] = true; + }); + $redis->on('psubscribe', function ($pattern) use (&$psubscribed) { + $psubscribed[$pattern] = true; + }); + $redis->on('unsubscribe', function ($channel) use (&$subscribed) { + unset($subscribed[$channel]); + }); + $redis->on('punsubscribe', function ($pattern) use (&$psubscribed) { + unset($psubscribed[$pattern]); + }); + + Util::forwardEvents( + $redis, + $self, + array( + 'message', + 'subscribe', + 'unsubscribe', + 'pmessage', + 'psubscribe', + 'punsubscribe', + ) + ); + + return $redis; + }, function (\Exception $e) use (&$pending) { + // connection failed => discard connection attempt + $pending = null; + + throw $e; + }); + } + + public function __call($name, $args) + { + if ($this->closed) { + return \React\Promise\reject(new \RuntimeException( + 'Connection closed (ENOTCONN)', + defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107 + )); + } + + $that = $this; + return $this->client()->then(function (Client $redis) use ($name, $args, $that) { + $that->awake(); + return \call_user_func_array(array($redis, $name), $args)->then( + function ($result) use ($that) { + $that->idle(); + return $result; + }, + function ($error) use ($that) { + $that->idle(); + throw $error; + } + ); + }); + } + + public function end() + { + if ($this->promise === null) { + $this->close(); + } + + if ($this->closed) { + return; + } + + $that = $this; + return $this->client()->then(function (Client $redis) use ($that) { + $redis->on('close', function () use ($that) { + $that->close(); + }); + $redis->end(); + }); + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + + // either close active connection or cancel pending connection attempt + if ($this->promise !== null) { + $this->promise->then(function (Client $redis) { + $redis->close(); + }); + if ($this->promise !== null) { + $this->promise->cancel(); + $this->promise = null; + } + } + + if ($this->idleTimer !== null) { + $this->loop->cancelTimer($this->idleTimer); + $this->idleTimer = null; + } + + $this->emit('close'); + $this->removeAllListeners(); + } + + /** + * @internal + */ + public function awake() + { + ++$this->pending; + + if ($this->idleTimer !== null) { + $this->loop->cancelTimer($this->idleTimer); + $this->idleTimer = null; + } + } + + /** + * @internal + */ + public function idle() + { + --$this->pending; + + if ($this->pending < 1 && $this->idlePeriod >= 0 && !$this->subscribed && !$this->psubscribed && $this->promise !== null) { + $idleTimer =& $this->idleTimer; + $promise =& $this->promise; + $idleTimer = $this->loop->addTimer($this->idlePeriod, function () use (&$idleTimer, &$promise) { + $promise->then(function (Client $redis) { + $redis->close(); + }); + $promise = null; + $idleTimer = null; + }); + } + } +} diff --git a/vendor/clue/redis-react/src/StreamingClient.php b/vendor/clue/redis-react/src/StreamingClient.php new file mode 100644 index 0000000..8afd84d --- /dev/null +++ b/vendor/clue/redis-react/src/StreamingClient.php @@ -0,0 +1,203 @@ +createResponseParser(); + } + if ($serializer === null) { + $serializer = $factory->createSerializer(); + } + } + + $that = $this; + $stream->on('data', function($chunk) use ($parser, $that) { + try { + $models = $parser->pushIncoming($chunk); + } catch (ParserException $error) { + $that->emit('error', array(new \UnexpectedValueException( + 'Invalid data received: ' . $error->getMessage() . ' (EBADMSG)', + defined('SOCKET_EBADMSG') ? SOCKET_EBADMSG : 77, + $error + ))); + $that->close(); + return; + } + + foreach ($models as $data) { + try { + $that->handleMessage($data); + } catch (\UnderflowException $error) { + $that->emit('error', array($error)); + $that->close(); + return; + } + } + }); + + $stream->on('close', array($this, 'close')); + + $this->stream = $stream; + $this->parser = $parser; + $this->serializer = $serializer; + } + + public function __call($name, $args) + { + $request = new Deferred(); + $promise = $request->promise(); + + $name = strtolower($name); + + // special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied + static $pubsubs = array('subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'); + + if ($this->ending) { + $request->reject(new \RuntimeException( + 'Connection ' . ($this->closed ? 'closed' : 'closing'). ' (ENOTCONN)', + defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107 + )); + } elseif (count($args) !== 1 && in_array($name, $pubsubs)) { + $request->reject(new \InvalidArgumentException( + 'PubSub commands limited to single argument (EINVAL)', + defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22 + )); + } elseif ($name === 'monitor') { + $request->reject(new \BadMethodCallException( + 'MONITOR command explicitly not supported (ENOTSUP)', + defined('SOCKET_ENOTSUP') ? SOCKET_ENOTSUP : (defined('SOCKET_EOPNOTSUPP') ? SOCKET_EOPNOTSUPP : 95) + )); + } else { + $this->stream->write($this->serializer->getRequestMessage($name, $args)); + $this->requests []= $request; + } + + if (in_array($name, $pubsubs)) { + $that = $this; + $subscribed =& $this->subscribed; + $psubscribed =& $this->psubscribed; + + $promise->then(function ($array) use ($that, &$subscribed, &$psubscribed) { + $first = array_shift($array); + + // (p)(un)subscribe messages are to be forwarded + $that->emit($first, $array); + + // remember number of (p)subscribe topics + if ($first === 'subscribe' || $first === 'unsubscribe') { + $subscribed = $array[1]; + } else { + $psubscribed = $array[1]; + } + }); + } + + return $promise; + } + + public function handleMessage(ModelInterface $message) + { + if (($this->subscribed !== 0 || $this->psubscribed !== 0) && $message instanceof MultiBulkReply) { + $array = $message->getValueNative(); + $first = array_shift($array); + + // pub/sub messages are to be forwarded and should not be processed as request responses + if (in_array($first, array('message', 'pmessage'))) { + $this->emit($first, $array); + return; + } + } + + if (!$this->requests) { + throw new \UnderflowException( + 'Unexpected reply received, no matching request found (ENOMSG)', + defined('SOCKET_ENOMSG') ? SOCKET_ENOMSG : 42 + ); + } + + $request = array_shift($this->requests); + assert($request instanceof Deferred); + + if ($message instanceof ErrorReply) { + $request->reject($message); + } else { + $request->resolve($message->getValueNative()); + } + + if ($this->ending && !$this->requests) { + $this->close(); + } + } + + public function end() + { + $this->ending = true; + + if (!$this->requests) { + $this->close(); + } + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->ending = true; + $this->closed = true; + + $remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false; + $this->stream->close(); + + $this->emit('close'); + + // reject all remaining requests in the queue + while ($this->requests) { + $request = array_shift($this->requests); + assert($request instanceof Deferred); + + if ($remoteClosed) { + $request->reject(new \RuntimeException( + 'Connection closed by peer (ECONNRESET)', + defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104 + )); + } else { + $request->reject(new \RuntimeException( + 'Connection closing (ECONNABORTED)', + defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103 + )); + } + } + } +} -- cgit v1.2.3