summaryrefslogtreecommitdiffstats
path: root/vendor/react/event-loop/src/StreamSelectLoop.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/react/event-loop/src/StreamSelectLoop.php')
-rw-r--r--vendor/react/event-loop/src/StreamSelectLoop.php330
1 files changed, 330 insertions, 0 deletions
diff --git a/vendor/react/event-loop/src/StreamSelectLoop.php b/vendor/react/event-loop/src/StreamSelectLoop.php
new file mode 100644
index 0000000..1686fd7
--- /dev/null
+++ b/vendor/react/event-loop/src/StreamSelectLoop.php
@@ -0,0 +1,330 @@
+<?php
+
+namespace React\EventLoop;
+
+use React\EventLoop\Tick\FutureTickQueue;
+use React\EventLoop\Timer\Timer;
+use React\EventLoop\Timer\Timers;
+
+/**
+ * A `stream_select()` based event loop.
+ *
+ * This uses the [`stream_select()`](https://www.php.net/manual/en/function.stream-select.php)
+ * function and is the only implementation that works out of the box with PHP.
+ *
+ * This event loop works out of the box on PHP 5.4 through PHP 8+ and HHVM.
+ * This means that no installation is required and this library works on all
+ * platforms and supported PHP versions.
+ * Accordingly, the [`Loop` class](#loop) and the deprecated [`Factory`](#factory)
+ * will use this event loop by default if you do not install any of the event loop
+ * extensions listed below.
+ *
+ * Under the hood, it does a simple `select` system call.
+ * This system call is limited to the maximum file descriptor number of
+ * `FD_SETSIZE` (platform dependent, commonly 1024) and scales with `O(m)`
+ * (`m` being the maximum file descriptor number passed).
+ * This means that you may run into issues when handling thousands of streams
+ * concurrently and you may want to look into using one of the alternative
+ * event loop implementations listed below in this case.
+ * If your use case is among the many common use cases that involve handling only
+ * dozens or a few hundred streams at once, then this event loop implementation
+ * performs really well.
+ *
+ * If you want to use signal handling (see also [`addSignal()`](#addsignal) below),
+ * this event loop implementation requires `ext-pcntl`.
+ * This extension is only available for Unix-like platforms and does not support
+ * Windows.
+ * It is commonly installed as part of many PHP distributions.
+ * If this extension is missing (or you're running on Windows), signal handling is
+ * not supported and throws a `BadMethodCallException` instead.
+ *
+ * This event loop is known to rely on wall-clock time to schedule future timers
+ * when using any version before PHP 7.3, because a monotonic time source is
+ * only available as of PHP 7.3 (`hrtime()`).
+ * While this does not affect many common use cases, this is an important
+ * distinction for programs that rely on a high time precision or on systems
+ * that are subject to discontinuous time adjustments (time jumps).
+ * This means that if you schedule a timer to trigger in 30s on PHP < 7.3 and
+ * then adjust your system time forward by 20s, the timer may trigger in 10s.
+ * See also [`addTimer()`](#addtimer) for more details.
+ *
+ * @link https://www.php.net/manual/en/function.stream-select.php
+ */
+final class StreamSelectLoop implements LoopInterface
+{
+ /** @internal */
+ const MICROSECONDS_PER_SECOND = 1000000;
+
+ private $futureTickQueue;
+ private $timers;
+ private $readStreams = array();
+ private $readListeners = array();
+ private $writeStreams = array();
+ private $writeListeners = array();
+ private $running;
+ private $pcntl = false;
+ private $pcntlPoll = false;
+ private $signals;
+
+ public function __construct()
+ {
+ $this->futureTickQueue = new FutureTickQueue();
+ $this->timers = new Timers();
+ $this->pcntl = \function_exists('pcntl_signal') && \function_exists('pcntl_signal_dispatch');
+ $this->pcntlPoll = $this->pcntl && !\function_exists('pcntl_async_signals');
+ $this->signals = new SignalsHandler();
+
+ // prefer async signals if available (PHP 7.1+) or fall back to dispatching on each tick
+ if ($this->pcntl && !$this->pcntlPoll) {
+ \pcntl_async_signals(true);
+ }
+ }
+
+ public function addReadStream($stream, $listener)
+ {
+ $key = (int) $stream;
+
+ if (!isset($this->readStreams[$key])) {
+ $this->readStreams[$key] = $stream;
+ $this->readListeners[$key] = $listener;
+ }
+ }
+
+ public function addWriteStream($stream, $listener)
+ {
+ $key = (int) $stream;
+
+ if (!isset($this->writeStreams[$key])) {
+ $this->writeStreams[$key] = $stream;
+ $this->writeListeners[$key] = $listener;
+ }
+ }
+
+ public function removeReadStream($stream)
+ {
+ $key = (int) $stream;
+
+ unset(
+ $this->readStreams[$key],
+ $this->readListeners[$key]
+ );
+ }
+
+ public function removeWriteStream($stream)
+ {
+ $key = (int) $stream;
+
+ unset(
+ $this->writeStreams[$key],
+ $this->writeListeners[$key]
+ );
+ }
+
+ public function addTimer($interval, $callback)
+ {
+ $timer = new Timer($interval, $callback, false);
+
+ $this->timers->add($timer);
+
+ return $timer;
+ }
+
+ public function addPeriodicTimer($interval, $callback)
+ {
+ $timer = new Timer($interval, $callback, true);
+
+ $this->timers->add($timer);
+
+ return $timer;
+ }
+
+ public function cancelTimer(TimerInterface $timer)
+ {
+ $this->timers->cancel($timer);
+ }
+
+ public function futureTick($listener)
+ {
+ $this->futureTickQueue->add($listener);
+ }
+
+ public function addSignal($signal, $listener)
+ {
+ if ($this->pcntl === false) {
+ throw new \BadMethodCallException('Event loop feature "signals" isn\'t supported by the "StreamSelectLoop"');
+ }
+
+ $first = $this->signals->count($signal) === 0;
+ $this->signals->add($signal, $listener);
+
+ if ($first) {
+ \pcntl_signal($signal, array($this->signals, 'call'));
+ }
+ }
+
+ public function removeSignal($signal, $listener)
+ {
+ if (!$this->signals->count($signal)) {
+ return;
+ }
+
+ $this->signals->remove($signal, $listener);
+
+ if ($this->signals->count($signal) === 0) {
+ \pcntl_signal($signal, \SIG_DFL);
+ }
+ }
+
+ public function run()
+ {
+ $this->running = true;
+
+ while ($this->running) {
+ $this->futureTickQueue->tick();
+
+ $this->timers->tick();
+
+ // Future-tick queue has pending callbacks ...
+ if (!$this->running || !$this->futureTickQueue->isEmpty()) {
+ $timeout = 0;
+
+ // There is a pending timer, only block until it is due ...
+ } elseif ($scheduledAt = $this->timers->getFirst()) {
+ $timeout = $scheduledAt - $this->timers->getTime();
+ if ($timeout < 0) {
+ $timeout = 0;
+ } else {
+ // Convert float seconds to int microseconds.
+ // Ensure we do not exceed maximum integer size, which may
+ // cause the loop to tick once every ~35min on 32bit systems.
+ $timeout *= self::MICROSECONDS_PER_SECOND;
+ $timeout = $timeout > \PHP_INT_MAX ? \PHP_INT_MAX : (int)$timeout;
+ }
+
+ // The only possible event is stream or signal activity, so wait forever ...
+ } elseif ($this->readStreams || $this->writeStreams || !$this->signals->isEmpty()) {
+ $timeout = null;
+
+ // There's nothing left to do ...
+ } else {
+ break;
+ }
+
+ $this->waitForStreamActivity($timeout);
+ }
+ }
+
+ public function stop()
+ {
+ $this->running = false;
+ }
+
+ /**
+ * Wait/check for stream activity, or until the next timer is due.
+ *
+ * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
+ */
+ private function waitForStreamActivity($timeout)
+ {
+ $read = $this->readStreams;
+ $write = $this->writeStreams;
+
+ $available = $this->streamSelect($read, $write, $timeout);
+ if ($this->pcntlPoll) {
+ \pcntl_signal_dispatch();
+ }
+ if (false === $available) {
+ // if a system call has been interrupted,
+ // we cannot rely on it's outcome
+ return;
+ }
+
+ foreach ($read as $stream) {
+ $key = (int) $stream;
+
+ if (isset($this->readListeners[$key])) {
+ \call_user_func($this->readListeners[$key], $stream);
+ }
+ }
+
+ foreach ($write as $stream) {
+ $key = (int) $stream;
+
+ if (isset($this->writeListeners[$key])) {
+ \call_user_func($this->writeListeners[$key], $stream);
+ }
+ }
+ }
+
+ /**
+ * Emulate a stream_select() implementation that does not break when passed
+ * empty stream arrays.
+ *
+ * @param array $read An array of read streams to select upon.
+ * @param array $write An array of write streams to select upon.
+ * @param int|null $timeout Activity timeout in microseconds, or null to wait forever.
+ *
+ * @return int|false The total number of streams that are ready for read/write.
+ * Can return false if stream_select() is interrupted by a signal.
+ */
+ private function streamSelect(array &$read, array &$write, $timeout)
+ {
+ if ($read || $write) {
+ // We do not usually use or expose the `exceptfds` parameter passed to the underlying `select`.
+ // However, Windows does not report failed connection attempts in `writefds` passed to `select` like most other platforms.
+ // Instead, it uses `writefds` only for successful connection attempts and `exceptfds` for failed connection attempts.
+ // We work around this by adding all sockets that look like a pending connection attempt to `exceptfds` automatically on Windows and merge it back later.
+ // This ensures the public API matches other loop implementations across all platforms (see also test suite or rather test matrix).
+ // Lacking better APIs, every write-only socket that has not yet read any data is assumed to be in a pending connection attempt state.
+ // @link https://docs.microsoft.com/de-de/windows/win32/api/winsock2/nf-winsock2-select
+ $except = null;
+ if (\DIRECTORY_SEPARATOR === '\\') {
+ $except = array();
+ foreach ($write as $key => $socket) {
+ if (!isset($read[$key]) && @\ftell($socket) === 0) {
+ $except[$key] = $socket;
+ }
+ }
+ }
+
+ /** @var ?callable $previous */
+ $previous = \set_error_handler(function ($errno, $errstr) use (&$previous) {
+ // suppress warnings that occur when `stream_select()` is interrupted by a signal
+ // PHP defines `EINTR` through `ext-sockets` or `ext-pcntl`, otherwise use common default (Linux & Mac)
+ $eintr = \defined('SOCKET_EINTR') ? \SOCKET_EINTR : (\defined('PCNTL_EINTR') ? \PCNTL_EINTR : 4);
+ if ($errno === \E_WARNING && \strpos($errstr, '[' . $eintr .']: ') !== false) {
+ return;
+ }
+
+ // forward any other error to registered error handler or print warning
+ return ($previous !== null) ? \call_user_func_array($previous, \func_get_args()) : false;
+ });
+
+ try {
+ $ret = \stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
+ \restore_error_handler();
+ } catch (\Throwable $e) { // @codeCoverageIgnoreStart
+ \restore_error_handler();
+ throw $e;
+ } catch (\Exception $e) {
+ \restore_error_handler();
+ throw $e;
+ } // @codeCoverageIgnoreEnd
+
+ if ($except) {
+ $write = \array_merge($write, $except);
+ }
+ return $ret;
+ }
+
+ if ($timeout > 0) {
+ \usleep($timeout);
+ } elseif ($timeout === null) {
+ // wait forever (we only reach this if we're only awaiting signals)
+ // this may be interrupted and return earlier when a signal is received
+ \sleep(PHP_INT_MAX);
+ }
+
+ return 0;
+ }
+}