summaryrefslogtreecommitdiffstats
path: root/vendor/react/event-loop/src/ExtUvLoop.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/react/event-loop/src/ExtUvLoop.php')
-rw-r--r--vendor/react/event-loop/src/ExtUvLoop.php342
1 files changed, 342 insertions, 0 deletions
diff --git a/vendor/react/event-loop/src/ExtUvLoop.php b/vendor/react/event-loop/src/ExtUvLoop.php
new file mode 100644
index 0000000..4434720
--- /dev/null
+++ b/vendor/react/event-loop/src/ExtUvLoop.php
@@ -0,0 +1,342 @@
+<?php
+
+namespace React\EventLoop;
+
+use React\EventLoop\Tick\FutureTickQueue;
+use React\EventLoop\Timer\Timer;
+use SplObjectStorage;
+
+/**
+ * An `ext-uv` based event loop.
+ *
+ * This loop uses the [`uv` PECL extension](https://pecl.php.net/package/uv),
+ * that provides an interface to `libuv` library.
+ * `libuv` itself supports a number of system-specific backends (epoll, kqueue).
+ *
+ * This loop is known to work with PHP 7+.
+ *
+ * @see https://github.com/bwoebi/php-uv
+ */
+final class ExtUvLoop implements LoopInterface
+{
+ private $uv;
+ private $futureTickQueue;
+ private $timers;
+ private $streamEvents = array();
+ private $readStreams = array();
+ private $writeStreams = array();
+ private $running;
+ private $signals;
+ private $signalEvents = array();
+ private $streamListener;
+
+ public function __construct()
+ {
+ if (!\function_exists('uv_loop_new')) {
+ throw new \BadMethodCallException('Cannot create LibUvLoop, ext-uv extension missing');
+ }
+
+ $this->uv = \uv_loop_new();
+ $this->futureTickQueue = new FutureTickQueue();
+ $this->timers = new SplObjectStorage();
+ $this->streamListener = $this->createStreamListener();
+ $this->signals = new SignalsHandler();
+ }
+
+ /**
+ * Returns the underlying ext-uv event loop. (Internal ReactPHP use only.)
+ *
+ * @internal
+ *
+ * @return resource
+ */
+ public function getUvLoop()
+ {
+ return $this->uv;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function addReadStream($stream, $listener)
+ {
+ if (isset($this->readStreams[(int) $stream])) {
+ return;
+ }
+
+ $this->readStreams[(int) $stream] = $listener;
+ $this->addStream($stream);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function addWriteStream($stream, $listener)
+ {
+ if (isset($this->writeStreams[(int) $stream])) {
+ return;
+ }
+
+ $this->writeStreams[(int) $stream] = $listener;
+ $this->addStream($stream);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function removeReadStream($stream)
+ {
+ if (!isset($this->streamEvents[(int) $stream])) {
+ return;
+ }
+
+ unset($this->readStreams[(int) $stream]);
+ $this->removeStream($stream);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function removeWriteStream($stream)
+ {
+ if (!isset($this->streamEvents[(int) $stream])) {
+ return;
+ }
+
+ unset($this->writeStreams[(int) $stream]);
+ $this->removeStream($stream);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function addTimer($interval, $callback)
+ {
+ $timer = new Timer($interval, $callback, false);
+
+ $that = $this;
+ $timers = $this->timers;
+ $callback = function () use ($timer, $timers, $that) {
+ \call_user_func($timer->getCallback(), $timer);
+
+ if ($timers->contains($timer)) {
+ $that->cancelTimer($timer);
+ }
+ };
+
+ $event = \uv_timer_init($this->uv);
+ $this->timers->attach($timer, $event);
+ \uv_timer_start(
+ $event,
+ $this->convertFloatSecondsToMilliseconds($interval),
+ 0,
+ $callback
+ );
+
+ return $timer;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function addPeriodicTimer($interval, $callback)
+ {
+ $timer = new Timer($interval, $callback, true);
+
+ $callback = function () use ($timer) {
+ \call_user_func($timer->getCallback(), $timer);
+ };
+
+ $interval = $this->convertFloatSecondsToMilliseconds($interval);
+ $event = \uv_timer_init($this->uv);
+ $this->timers->attach($timer, $event);
+ \uv_timer_start(
+ $event,
+ $interval,
+ (int) $interval === 0 ? 1 : $interval,
+ $callback
+ );
+
+ return $timer;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function cancelTimer(TimerInterface $timer)
+ {
+ if (isset($this->timers[$timer])) {
+ @\uv_timer_stop($this->timers[$timer]);
+ $this->timers->detach($timer);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function futureTick($listener)
+ {
+ $this->futureTickQueue->add($listener);
+ }
+
+ public function addSignal($signal, $listener)
+ {
+ $this->signals->add($signal, $listener);
+
+ if (!isset($this->signalEvents[$signal])) {
+ $signals = $this->signals;
+ $this->signalEvents[$signal] = \uv_signal_init($this->uv);
+ \uv_signal_start($this->signalEvents[$signal], function () use ($signals, $signal) {
+ $signals->call($signal);
+ }, $signal);
+ }
+ }
+
+ public function removeSignal($signal, $listener)
+ {
+ $this->signals->remove($signal, $listener);
+
+ if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
+ \uv_signal_stop($this->signalEvents[$signal]);
+ unset($this->signalEvents[$signal]);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function run()
+ {
+ $this->running = true;
+
+ while ($this->running) {
+ $this->futureTickQueue->tick();
+
+ $hasPendingCallbacks = !$this->futureTickQueue->isEmpty();
+ $wasJustStopped = !$this->running;
+ $nothingLeftToDo = !$this->readStreams
+ && !$this->writeStreams
+ && !$this->timers->count()
+ && $this->signals->isEmpty();
+
+ // Use UV::RUN_ONCE when there are only I/O events active in the loop and block until one of those triggers,
+ // otherwise use UV::RUN_NOWAIT.
+ // @link http://docs.libuv.org/en/v1.x/loop.html#c.uv_run
+ $flags = \UV::RUN_ONCE;
+ if ($wasJustStopped || $hasPendingCallbacks) {
+ $flags = \UV::RUN_NOWAIT;
+ } elseif ($nothingLeftToDo) {
+ break;
+ }
+
+ \uv_run($this->uv, $flags);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function stop()
+ {
+ $this->running = false;
+ }
+
+ private function addStream($stream)
+ {
+ if (!isset($this->streamEvents[(int) $stream])) {
+ $this->streamEvents[(int)$stream] = \uv_poll_init_socket($this->uv, $stream);
+ }
+
+ if ($this->streamEvents[(int) $stream] !== false) {
+ $this->pollStream($stream);
+ }
+ }
+
+ private function removeStream($stream)
+ {
+ if (!isset($this->streamEvents[(int) $stream])) {
+ return;
+ }
+
+ if (!isset($this->readStreams[(int) $stream])
+ && !isset($this->writeStreams[(int) $stream])) {
+ \uv_poll_stop($this->streamEvents[(int) $stream]);
+ \uv_close($this->streamEvents[(int) $stream]);
+ unset($this->streamEvents[(int) $stream]);
+ return;
+ }
+
+ $this->pollStream($stream);
+ }
+
+ private function pollStream($stream)
+ {
+ if (!isset($this->streamEvents[(int) $stream])) {
+ return;
+ }
+
+ $flags = 0;
+ if (isset($this->readStreams[(int) $stream])) {
+ $flags |= \UV::READABLE;
+ }
+
+ if (isset($this->writeStreams[(int) $stream])) {
+ $flags |= \UV::WRITABLE;
+ }
+
+ \uv_poll_start($this->streamEvents[(int) $stream], $flags, $this->streamListener);
+ }
+
+ /**
+ * Create a stream listener
+ *
+ * @return callable Returns a callback
+ */
+ private function createStreamListener()
+ {
+ $callback = function ($event, $status, $events, $stream) {
+ // libuv automatically stops polling on error, re-enable polling to match other loop implementations
+ if ($status !== 0) {
+ $this->pollStream($stream);
+
+ // libuv may report no events on error, but this should still invoke stream listeners to report closed connections
+ // re-enable both readable and writable, correct listeners will be checked below anyway
+ if ($events === 0) {
+ $events = \UV::READABLE | \UV::WRITABLE;
+ }
+ }
+
+ if (isset($this->readStreams[(int) $stream]) && ($events & \UV::READABLE)) {
+ \call_user_func($this->readStreams[(int) $stream], $stream);
+ }
+
+ if (isset($this->writeStreams[(int) $stream]) && ($events & \UV::WRITABLE)) {
+ \call_user_func($this->writeStreams[(int) $stream], $stream);
+ }
+ };
+
+ return $callback;
+ }
+
+ /**
+ * @param float $interval
+ * @return int
+ */
+ private function convertFloatSecondsToMilliseconds($interval)
+ {
+ if ($interval < 0) {
+ return 0;
+ }
+
+ $maxValue = (int) (\PHP_INT_MAX / 1000);
+ $intInterval = (int) $interval;
+
+ if (($intInterval <= 0 && $interval > 1) || $intInterval >= $maxValue) {
+ throw new \InvalidArgumentException(
+ "Interval overflow, value must be lower than '{$maxValue}', but '{$interval}' passed."
+ );
+ }
+
+ return (int) \floor($interval * 1000);
+ }
+}