diff options
Diffstat (limited to 'vendor/react/event-loop/src/ExtUvLoop.php')
-rw-r--r-- | vendor/react/event-loop/src/ExtUvLoop.php | 342 |
1 files changed, 342 insertions, 0 deletions
diff --git a/vendor/react/event-loop/src/ExtUvLoop.php b/vendor/react/event-loop/src/ExtUvLoop.php new file mode 100644 index 0000000..631a459 --- /dev/null +++ b/vendor/react/event-loop/src/ExtUvLoop.php @@ -0,0 +1,342 @@ +<?php + +namespace React\EventLoop; + +use React\EventLoop\Tick\FutureTickQueue; +use React\EventLoop\Timer\Timer; +use SplObjectStorage; + +/** + * An `ext-uv` based event loop. + * + * This loop uses the [`uv` PECL extension](https://pecl.php.net/package/uv), + * that provides an interface to `libuv` library. + * `libuv` itself supports a number of system-specific backends (epoll, kqueue). + * + * This loop is known to work with PHP 7.x. + * + * @see https://github.com/bwoebi/php-uv + */ +final class ExtUvLoop implements LoopInterface +{ + private $uv; + private $futureTickQueue; + private $timers; + private $streamEvents = array(); + private $readStreams = array(); + private $writeStreams = array(); + private $running; + private $signals; + private $signalEvents = array(); + private $streamListener; + + public function __construct() + { + if (!\function_exists('uv_loop_new')) { + throw new \BadMethodCallException('Cannot create LibUvLoop, ext-uv extension missing'); + } + + $this->uv = \uv_loop_new(); + $this->futureTickQueue = new FutureTickQueue(); + $this->timers = new SplObjectStorage(); + $this->streamListener = $this->createStreamListener(); + $this->signals = new SignalsHandler(); + } + + /** + * Returns the underlying ext-uv event loop. (Internal ReactPHP use only.) + * + * @internal + * + * @return resource + */ + public function getUvLoop() + { + return $this->uv; + } + + /** + * {@inheritdoc} + */ + public function addReadStream($stream, $listener) + { + if (isset($this->readStreams[(int) $stream])) { + return; + } + + $this->readStreams[(int) $stream] = $listener; + $this->addStream($stream); + } + + /** + * {@inheritdoc} + */ + public function addWriteStream($stream, $listener) + { + if (isset($this->writeStreams[(int) $stream])) { + return; + } + + $this->writeStreams[(int) $stream] = $listener; + $this->addStream($stream); + } + + /** + * {@inheritdoc} + */ + public function removeReadStream($stream) + { + if (!isset($this->streamEvents[(int) $stream])) { + return; + } + + unset($this->readStreams[(int) $stream]); + $this->removeStream($stream); + } + + /** + * {@inheritdoc} + */ + public function removeWriteStream($stream) + { + if (!isset($this->streamEvents[(int) $stream])) { + return; + } + + unset($this->writeStreams[(int) $stream]); + $this->removeStream($stream); + } + + /** + * {@inheritdoc} + */ + public function addTimer($interval, $callback) + { + $timer = new Timer($interval, $callback, false); + + $that = $this; + $timers = $this->timers; + $callback = function () use ($timer, $timers, $that) { + \call_user_func($timer->getCallback(), $timer); + + if ($timers->contains($timer)) { + $that->cancelTimer($timer); + } + }; + + $event = \uv_timer_init($this->uv); + $this->timers->attach($timer, $event); + \uv_timer_start( + $event, + $this->convertFloatSecondsToMilliseconds($interval), + 0, + $callback + ); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function addPeriodicTimer($interval, $callback) + { + $timer = new Timer($interval, $callback, true); + + $callback = function () use ($timer) { + \call_user_func($timer->getCallback(), $timer); + }; + + $interval = $this->convertFloatSecondsToMilliseconds($interval); + $event = \uv_timer_init($this->uv); + $this->timers->attach($timer, $event); + \uv_timer_start( + $event, + $interval, + (int) $interval === 0 ? 1 : $interval, + $callback + ); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function cancelTimer(TimerInterface $timer) + { + if (isset($this->timers[$timer])) { + @\uv_timer_stop($this->timers[$timer]); + $this->timers->detach($timer); + } + } + + /** + * {@inheritdoc} + */ + public function futureTick($listener) + { + $this->futureTickQueue->add($listener); + } + + public function addSignal($signal, $listener) + { + $this->signals->add($signal, $listener); + + if (!isset($this->signalEvents[$signal])) { + $signals = $this->signals; + $this->signalEvents[$signal] = \uv_signal_init($this->uv); + \uv_signal_start($this->signalEvents[$signal], function () use ($signals, $signal) { + $signals->call($signal); + }, $signal); + } + } + + public function removeSignal($signal, $listener) + { + $this->signals->remove($signal, $listener); + + if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) { + \uv_signal_stop($this->signalEvents[$signal]); + unset($this->signalEvents[$signal]); + } + } + + /** + * {@inheritdoc} + */ + public function run() + { + $this->running = true; + + while ($this->running) { + $this->futureTickQueue->tick(); + + $hasPendingCallbacks = !$this->futureTickQueue->isEmpty(); + $wasJustStopped = !$this->running; + $nothingLeftToDo = !$this->readStreams + && !$this->writeStreams + && !$this->timers->count() + && $this->signals->isEmpty(); + + // Use UV::RUN_ONCE when there are only I/O events active in the loop and block until one of those triggers, + // otherwise use UV::RUN_NOWAIT. + // @link http://docs.libuv.org/en/v1.x/loop.html#c.uv_run + $flags = \UV::RUN_ONCE; + if ($wasJustStopped || $hasPendingCallbacks) { + $flags = \UV::RUN_NOWAIT; + } elseif ($nothingLeftToDo) { + break; + } + + \uv_run($this->uv, $flags); + } + } + + /** + * {@inheritdoc} + */ + public function stop() + { + $this->running = false; + } + + private function addStream($stream) + { + if (!isset($this->streamEvents[(int) $stream])) { + $this->streamEvents[(int)$stream] = \uv_poll_init_socket($this->uv, $stream); + } + + if ($this->streamEvents[(int) $stream] !== false) { + $this->pollStream($stream); + } + } + + private function removeStream($stream) + { + if (!isset($this->streamEvents[(int) $stream])) { + return; + } + + if (!isset($this->readStreams[(int) $stream]) + && !isset($this->writeStreams[(int) $stream])) { + \uv_poll_stop($this->streamEvents[(int) $stream]); + \uv_close($this->streamEvents[(int) $stream]); + unset($this->streamEvents[(int) $stream]); + return; + } + + $this->pollStream($stream); + } + + private function pollStream($stream) + { + if (!isset($this->streamEvents[(int) $stream])) { + return; + } + + $flags = 0; + if (isset($this->readStreams[(int) $stream])) { + $flags |= \UV::READABLE; + } + + if (isset($this->writeStreams[(int) $stream])) { + $flags |= \UV::WRITABLE; + } + + \uv_poll_start($this->streamEvents[(int) $stream], $flags, $this->streamListener); + } + + /** + * Create a stream listener + * + * @return callable Returns a callback + */ + private function createStreamListener() + { + $callback = function ($event, $status, $events, $stream) { + // libuv automatically stops polling on error, re-enable polling to match other loop implementations + if ($status !== 0) { + $this->pollStream($stream); + + // libuv may report no events on error, but this should still invoke stream listeners to report closed connections + // re-enable both readable and writable, correct listeners will be checked below anyway + if ($events === 0) { + $events = \UV::READABLE | \UV::WRITABLE; + } + } + + if (isset($this->readStreams[(int) $stream]) && ($events & \UV::READABLE)) { + \call_user_func($this->readStreams[(int) $stream], $stream); + } + + if (isset($this->writeStreams[(int) $stream]) && ($events & \UV::WRITABLE)) { + \call_user_func($this->writeStreams[(int) $stream], $stream); + } + }; + + return $callback; + } + + /** + * @param float $interval + * @return int + */ + private function convertFloatSecondsToMilliseconds($interval) + { + if ($interval < 0) { + return 0; + } + + $maxValue = (int) (\PHP_INT_MAX / 1000); + $intInterval = (int) $interval; + + if (($intInterval <= 0 && $interval > 1) || $intInterval >= $maxValue) { + throw new \InvalidArgumentException( + "Interval overflow, value must be lower than '{$maxValue}', but '{$interval}' passed." + ); + } + + return (int) \floor($interval * 1000); + } +} |