From c3ca98e1b35123f226c7f4c596b5dee78caa4223 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 14:38:42 +0200 Subject: Adding upstream version 0.11.0. Signed-off-by: Daniel Baumann --- vendor/clue/redis-react/src/StreamingClient.php | 203 ++++++++++++++++++++++++ 1 file changed, 203 insertions(+) create mode 100644 vendor/clue/redis-react/src/StreamingClient.php (limited to 'vendor/clue/redis-react/src/StreamingClient.php') diff --git a/vendor/clue/redis-react/src/StreamingClient.php b/vendor/clue/redis-react/src/StreamingClient.php new file mode 100644 index 0000000..8afd84d --- /dev/null +++ b/vendor/clue/redis-react/src/StreamingClient.php @@ -0,0 +1,203 @@ +createResponseParser(); + } + if ($serializer === null) { + $serializer = $factory->createSerializer(); + } + } + + $that = $this; + $stream->on('data', function($chunk) use ($parser, $that) { + try { + $models = $parser->pushIncoming($chunk); + } catch (ParserException $error) { + $that->emit('error', array(new \UnexpectedValueException( + 'Invalid data received: ' . $error->getMessage() . ' (EBADMSG)', + defined('SOCKET_EBADMSG') ? SOCKET_EBADMSG : 77, + $error + ))); + $that->close(); + return; + } + + foreach ($models as $data) { + try { + $that->handleMessage($data); + } catch (\UnderflowException $error) { + $that->emit('error', array($error)); + $that->close(); + return; + } + } + }); + + $stream->on('close', array($this, 'close')); + + $this->stream = $stream; + $this->parser = $parser; + $this->serializer = $serializer; + } + + public function __call($name, $args) + { + $request = new Deferred(); + $promise = $request->promise(); + + $name = strtolower($name); + + // special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied + static $pubsubs = array('subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'); + + if ($this->ending) { + $request->reject(new \RuntimeException( + 'Connection ' . ($this->closed ? 'closed' : 'closing'). ' (ENOTCONN)', + defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107 + )); + } elseif (count($args) !== 1 && in_array($name, $pubsubs)) { + $request->reject(new \InvalidArgumentException( + 'PubSub commands limited to single argument (EINVAL)', + defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22 + )); + } elseif ($name === 'monitor') { + $request->reject(new \BadMethodCallException( + 'MONITOR command explicitly not supported (ENOTSUP)', + defined('SOCKET_ENOTSUP') ? SOCKET_ENOTSUP : (defined('SOCKET_EOPNOTSUPP') ? SOCKET_EOPNOTSUPP : 95) + )); + } else { + $this->stream->write($this->serializer->getRequestMessage($name, $args)); + $this->requests []= $request; + } + + if (in_array($name, $pubsubs)) { + $that = $this; + $subscribed =& $this->subscribed; + $psubscribed =& $this->psubscribed; + + $promise->then(function ($array) use ($that, &$subscribed, &$psubscribed) { + $first = array_shift($array); + + // (p)(un)subscribe messages are to be forwarded + $that->emit($first, $array); + + // remember number of (p)subscribe topics + if ($first === 'subscribe' || $first === 'unsubscribe') { + $subscribed = $array[1]; + } else { + $psubscribed = $array[1]; + } + }); + } + + return $promise; + } + + public function handleMessage(ModelInterface $message) + { + if (($this->subscribed !== 0 || $this->psubscribed !== 0) && $message instanceof MultiBulkReply) { + $array = $message->getValueNative(); + $first = array_shift($array); + + // pub/sub messages are to be forwarded and should not be processed as request responses + if (in_array($first, array('message', 'pmessage'))) { + $this->emit($first, $array); + return; + } + } + + if (!$this->requests) { + throw new \UnderflowException( + 'Unexpected reply received, no matching request found (ENOMSG)', + defined('SOCKET_ENOMSG') ? SOCKET_ENOMSG : 42 + ); + } + + $request = array_shift($this->requests); + assert($request instanceof Deferred); + + if ($message instanceof ErrorReply) { + $request->reject($message); + } else { + $request->resolve($message->getValueNative()); + } + + if ($this->ending && !$this->requests) { + $this->close(); + } + } + + public function end() + { + $this->ending = true; + + if (!$this->requests) { + $this->close(); + } + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->ending = true; + $this->closed = true; + + $remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false; + $this->stream->close(); + + $this->emit('close'); + + // reject all remaining requests in the queue + while ($this->requests) { + $request = array_shift($this->requests); + assert($request instanceof Deferred); + + if ($remoteClosed) { + $request->reject(new \RuntimeException( + 'Connection closed by peer (ECONNRESET)', + defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104 + )); + } else { + $request->reject(new \RuntimeException( + 'Connection closing (ECONNABORTED)', + defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103 + )); + } + } + } +} -- cgit v1.2.3