loop = $loop; $this->running = new ProcessList($loop); }

    /**
     * Remember the log proxy that job output should be forwarded to
     *
     * Every job started by runJob() gets its own clone of this proxy.
     *
     * @param LogProxy $logProxy
     * @return $this
     */
    public function forwardLog(LogProxy $logProxy)
    {
        $this->logProxy = $logProxy;

        return $this;
    }

    /**
     * Store the DB connection and (re-)register the periodic job check
     *
     * On the very first call (no timer yet) one check is additionally
     * scheduled for the next loop tick. On subsequent calls the former timer
     * is cancelled before a new one is registered.
     *
     * @param Db $db
     * @return \React\Promise\ExtendedPromiseInterface
     */
    public function initDb(Db $db)
    {
        $this->db = $db;
        $check = function () {
            try {
                $this->checkForPendingJobs();
                $this->runNextPendingJob();
            } catch (\Exception $e) {
                // Only log the problem, the periodic timer triggers the next attempt
                Logger::error($e->getMessage());
            }
        };

        if ($this->timer === null) {
            // Runs on a future tick, by then the timer below has been registered
            $this->loop->futureTick($check);
        } else {
            Logger::info('Cancelling former timer');
            $this->loop->cancelTimer($this->timer);
        }
        $this->timer = $this->loop->addPeriodicTimer($this->checkInterval, $check);

        return resolve();
    }

    /**
     * Stop scheduling: drop pending ids, cancel the timer and all running jobs
     *
     * @return \React\Promise\ExtendedPromiseInterface whatever the process
     *         list returns from killOrTerminate()
     */
    public function stopDb()
    {
        $this->scheduledIds = [];
        if ($this->timer !== null) {
            $this->loop->cancelTimer($this->timer);
            $this->timer = null;
        }

        $allFinished = $this->running->killOrTerminate();
        foreach ($this->runningIds as $id => $promise) {
            $promise->cancel();
        }
        $this->runningIds = [];

        return $allFinished;
    }

    /**
     * Whether the 'disable_all_jobs' setting in director_setting is 'y'
     *
     * @return bool
     */
    protected function hasBeenDisabled()
    {
        $db = $this->db->getDbAdapter();
        $query = $db->select()
            ->from('director_setting', 'setting_value')
            ->where('setting_name = ?', 'disable_all_jobs');

        return $db->fetchOne($query) === 'y';
    }

    /**
     * Refill the list of pending job ids once it ran empty
     *
     * When all jobs have been disabled the pending list is cleared instead.
     */
    protected function checkForPendingJobs()
    {
        if ($this->hasBeenDisabled()) {
            $this->scheduledIds = [];
            // TODO: disable jobs currently going on?
            return;
        }

        if (empty($this->scheduledIds)) {
            $this->loadNextIds();
        }
    }

    /**
     * Walk through the pending ids until one job has been started
     *
     * Does nothing while the timer is unset or while another job is running.
     */
    protected function runNextPendingJob()
    {
        if ($this->timer === null) {
            // Reset happened. Stopping?
            return;
        }

        if (! empty($this->runningIds)) {
            return;
        }

        while (! empty($this->scheduledIds)) {
            if ($this->runNextJob()) {
                break;
            }
        }
    }

    /**
     * Append the ids of all jobs with disabled = 'n' to the pending list
     */
    protected function loadNextIds()
    {
        $db = $this->db->getDbAdapter();
        $query = $db->select()->from('director_job', 'id')->where('disabled = ?', 'n');
        foreach ($db->fetchCol($query) as $id) {
            $this->scheduledIds[] = (int) $id;
        }
    }

    /**
     * Take the next id from the pending list and start that job if it should run
     *
     * Failures while loading or starting the job are logged, not thrown.
     *
     * @return bool true if a job has been started
     */
    protected function runNextJob()
    {
        $id = \array_shift($this->scheduledIds);
        try {
            $job = DirectorJob::loadWithAutoIncId((int) $id, $this->db);
            if ($job->shouldRun()) {
                $this->runJob($job);
                return true;
            }
        } catch (\Exception $e) {
            Logger::error('Trying to schedule Job failed: ' . $e->getMessage());
        }

        return false;
    }

    /**
     * Launch the given job as an IcingaCliRpc child process
     *
     * The resulting promise is stored in runningIds, keyed by job id. Once it
     * settles, the entry is removed and the next pending job is triggered on
     * a future loop tick.
     *
     * @param DirectorJob $job
     */
    protected function runJob(DirectorJob $job)
    {
        $id = $job->get('id');
        $jobName = $job->get('job_name');
        Logger::debug("Job ($jobName) starting");
        $arguments = [
            'director',
            'job',
            'run',
            '--id',
            $id,
            '--debug',
            '--rpc'
        ];
        $cli = new IcingaCliRpc();
        $cli->setArguments($arguments);
        $cli->on('start', function (Process $process) {
            $this->onProcessStarted($process);
        });

        // Happens on protocol (Netstring) errors or similar:
        $cli->on('error', function (\Exception $e) {
            Logger::error('UNEXPECTED: ' . rtrim($e->getMessage()));
        });
        if ($this->logProxy) {
            $logger = clone $this->logProxy;
            $logger->setPrefix("Job ($jobName): ");
            $cli->rpc()->setHandler($logger, 'logger');
        }

        // scheduledIds is a plain list of ids, so the job must be removed by
        // value. Unsetting the key $id would drop an unrelated pending job.
        $this->scheduledIds = \array_values(\array_diff($this->scheduledIds, [(int) $id]));

        $this->runningIds[$id] = $cli->run($this->loop)->then(function () use ($jobName) {
            Logger::debug("Job ($jobName) finished");
        })->otherwise(function (\Exception $e) use ($jobName) {
            Logger::error("Job ($jobName) failed: " . $e->getMessage());
        })->otherwise(function (FinishedProcessState $state) use ($jobName) {
            Logger::error("Job ($jobName) failed: " . $state->getReason());
        })->always(function () use ($id) {
            unset($this->runningIds[$id]);
            $this->loop->futureTick(function () {
                $this->runNextPendingJob();
            });
        });
    }

    /**
     * @return ProcessList
     */
    public function getProcessList()
    {
        return $this->running;
    }

    /**
     * Attach a freshly started process to the list of running processes
     *
     * @param Process $process
     */
    protected function onProcessStarted(Process $process)
    {
        $this->running->attach($process);
    }

    /**
     * Stop everything and drop the references to log proxy and loop
     */
    public function __destruct()
    {
        $this->stopDb();
        $this->logProxy = null;
        $this->loop = null;
    }
}