loop = new EvLoop();
        $this->futureTickQueue = new FutureTickQueue();
        $this->timers = new SplObjectStorage();
        $this->signals = new SignalsHandler();
    }

    /**
     * Starts watching a stream for readability.
     *
     * Only one read watcher is kept per stream: if the stream (keyed by its
     * integer resource id) is already registered, the call is a no-op and the
     * previously registered listener stays in place.
     *
     * @param resource $stream
     * @param callable $listener invoked with the stream as its only argument
     *
     * @return void
     */
    public function addReadStream($stream, $listener)
    {
        $key = (int)$stream;

        if (isset($this->readStreams[$key])) {
            return;
        }

        $callback = $this->getStreamListenerClosure($stream, $listener);
        $event = $this->loop->io($stream, Ev::READ, $callback);
        $this->readStreams[$key] = $event;
    }

    /**
     * Wraps a stream listener so that the watcher callback, whatever arguments
     * Ev passes to it, always invokes the listener with the stream only.
     *
     * @param resource $stream
     * @param callable $listener
     *
     * @return \Closure
     */
    private function getStreamListenerClosure($stream, $listener)
    {
        return function () use ($stream, $listener) {
            \call_user_func($listener, $stream);
        };
    }

    /**
     * Starts watching a stream for writability.
     *
     * Only one write watcher is kept per stream: if the stream (keyed by its
     * integer resource id) is already registered, the call is a no-op and the
     * previously registered listener stays in place.
     *
     * @param resource $stream
     * @param callable $listener invoked with the stream as its only argument
     *
     * @return void
     */
    public function addWriteStream($stream, $listener)
    {
        $key = (int)$stream;

        if (isset($this->writeStreams[$key])) {
            return;
        }

        $callback = $this->getStreamListenerClosure($stream, $listener);
        $event = $this->loop->io($stream, Ev::WRITE, $callback);
        $this->writeStreams[$key] = $event;
    }

    /**
     * Stops and forgets the read watcher of a stream, if one is registered.
     *
     * @param resource|int $stream the stream, or the integer key it was stored
     *                             under (the destructor passes the key)
     *
     * @return void
     */
    public function removeReadStream($stream)
    {
        $key = (int)$stream;

        if (!isset($this->readStreams[$key])) {
            return;
        }

        $this->readStreams[$key]->stop();
        unset($this->readStreams[$key]);
    }

    /**
     * Stops and forgets the write watcher of a stream, if one is registered.
     *
     * @param resource|int $stream the stream, or the integer key it was stored
     *                             under (the destructor passes the key)
     *
     * @return void
     */
    public function removeWriteStream($stream)
    {
        $key = (int)$stream;

        if (!isset($this->writeStreams[$key])) {
            return;
        }

        $this->writeStreams[$key]->stop();
        unset($this->writeStreams[$key]);
    }

    /**
     * Schedules a one-shot timer.
     *
     * @param int|float $interval delay before the callback fires
     * @param callable  $callback invoked with the Timer instance
     *
     * @return Timer the handle to pass to cancelTimer()
     */
    public function addTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, false);

        $that = $this;
        $timers = $this->timers;
        $callback = function () use ($timer, $timers, $that) {
            \call_user_func($timer->getCallback(), $timer);

            // The user callback may already have cancelled the timer; only
            // clean up when it is still registered.
            if ($timers->contains($timer)) {
                $that->cancelTimer($timer);
            }
        };

        // A repeat value of 0.0 makes the Ev timer fire once.
        $event = $this->loop->timer($timer->getInterval(), 0.0, $callback);
        $this->timers->attach($timer, $event);

        return $timer;
    }

    /**
     * Schedules a timer that fires repeatedly until it is cancelled.
     *
     * @param int|float $interval delay before the first run and between runs
     * @param callable  $callback invoked with the Timer instance
     *
     * @return Timer the handle to pass to cancelTimer()
     */
    public function addPeriodicTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, true);

        $callback = function () use ($timer) {
            \call_user_func($timer->getCallback(), $timer);
        };

        // The interval is used both as the initial delay and as the repeat.
        $event = $this->loop->timer($timer->getInterval(), $timer->getInterval(), $callback);
        $this->timers->attach($timer, $event);

        return $timer;
    }

    /**
     * Stops the Ev watcher behind a timer and forgets the timer.
     *
     * Timers that are not (or no longer) registered are ignored.
     *
     * @param TimerInterface $timer
     *
     * @return void
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if (!isset($this->timers[$timer])) {
            return;
        }

        $event = $this->timers[$timer];
        $event->stop();
        $this->timers->detach($timer);
    }

    /**
     * Queues a listener on the future-tick queue, which run() processes at the
     * start of every loop iteration.
     *
     * @param callable $listener
     *
     * @return void
     */
    public function futureTick($listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Runs the loop until stop() is called or nothing is left to wait for.
     *
     * Each iteration first processes the future-tick queue, then hands control
     * to the Ev loop for a single pass.
     *
     * @return void
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();

            $hasPendingCallbacks = !$this->futureTickQueue->isEmpty();
            // A tick callback may have called stop() during tick() above.
            $wasJustStopped = !$this->running;
            $nothingLeftToDo = !$this->readStreams
                && !$this->writeStreams
                && !$this->timers->count()
                && $this->signals->isEmpty();

            $flags = Ev::RUN_ONCE;
            if ($wasJustStopped || $hasPendingCallbacks) {
                // Do not block: either we are about to exit, or more tick
                // callbacks are waiting to be processed.
                $flags |= Ev::RUN_NOWAIT;
            } elseif ($nothingLeftToDo) {
                break;
            }

            $this->loop->run($flags);
        }
    }

    /**
     * Asks run() to return; the flag is checked by its loop condition.
     *
     * @return void
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Stops every timer and stream watcher that is still registered.
     */
    public function __destruct()
    {
        // Iterate over a snapshot: cancelTimer() detaches from $this->timers,
        // and detaching the current element while iterating the live
        // SplObjectStorage makes the iterator skip the following entry.
        /** @var TimerInterface $timer */
        foreach (\iterator_to_array($this->timers, false) as $timer) {
            $this->cancelTimer($timer);
        }

        // The array keys are the integer stream ids, which the remove methods
        // accept because they cast their argument to int.
        foreach (\array_keys($this->readStreams) as $key) {
            $this->removeReadStream($key);
        }

        foreach (\array_keys($this->writeStreams) as $key) {
            $this->removeWriteStream($key);
        }
    }

    /**
     * Registers a listener for a signal.
     *
     * The first listener for a given signal also creates the Ev signal watcher
     * that dispatches to all listeners registered for that signal.
     *
     * @param int      $signal
     * @param callable $listener
     *
     * @return void
     */
    public function addSignal($signal, $listener)
    {
        $this->signals->add($signal, $listener);

        if (!isset($this->signalEvents[$signal])) {
            $this->signalEvents[$signal] = $this->loop->signal($signal, function () use ($signal) {
                $this->signals->call($signal);
            });
        }
    }

    /**
     * Removes a listener for a signal.
     *
     * The Ev signal watcher is only stopped once no listener is left for the
     * signal; stopping it earlier would silence the remaining listeners.
     *
     * @param int      $signal
     * @param callable $listener
     *
     * @return void
     */
    public function removeSignal($signal, $listener)
    {
        $this->signals->remove($signal, $listener);

        if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
            $this->signalEvents[$signal]->stop();
            unset($this->signalEvents[$signal]);
        }
    }
}