summaryrefslogtreecommitdiffstats
path: root/library/Director/Daemon/JobRunner.php
diff options
context:
space:
mode:
Diffstat (limited to 'library/Director/Daemon/JobRunner.php')
-rw-r--r--library/Director/Daemon/JobRunner.php234
1 files changed, 234 insertions, 0 deletions
diff --git a/library/Director/Daemon/JobRunner.php b/library/Director/Daemon/JobRunner.php
new file mode 100644
index 0000000..78d7747
--- /dev/null
+++ b/library/Director/Daemon/JobRunner.php
@@ -0,0 +1,234 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use gipfl\IcingaCliDaemon\FinishedProcessState;
+use gipfl\IcingaCliDaemon\IcingaCliRpc;
+use Icinga\Application\Logger;
+use Icinga\Module\Director\Db;
+use Icinga\Module\Director\Objects\DirectorJob;
+use React\ChildProcess\Process;
+use React\EventLoop\LoopInterface;
+use React\Promise\Promise;
+use function React\Promise\resolve;
+
+class JobRunner implements DbBasedComponent
+{
+ /** @var Db */
+ protected $db;
+
+ /** @var LoopInterface */
+ protected $loop;
+
+ /** @var int[] */
+ protected $scheduledIds = [];
+
+ /** @var Promise[] */
+ protected $runningIds = [];
+
+ protected $checkInterval = 10;
+
+ /** @var \React\EventLoop\TimerInterface */
+ protected $timer;
+
+ /** @var LogProxy */
+ protected $logProxy;
+
+ /** @var ProcessList */
+ protected $running;
+
+ public function __construct(LoopInterface $loop)
+ {
+ $this->loop = $loop;
+ $this->running = new ProcessList($loop);
+ }
+
+ public function forwardLog(LogProxy $logProxy)
+ {
+ $this->logProxy = $logProxy;
+
+ return $this;
+ }
+
+ /**
+ * @param Db $db
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function initDb(Db $db)
+ {
+ $this->db = $db;
+ $check = function () {
+ try {
+ $this->checkForPendingJobs();
+ $this->runNextPendingJob();
+ } catch (\Exception $e) {
+ Logger::error($e->getMessage());
+ }
+ };
+ if ($this->timer === null) {
+ $this->loop->futureTick($check);
+ }
+ if ($this->timer !== null) {
+ Logger::info('Cancelling former timer');
+ $this->loop->cancelTimer($this->timer);
+ }
+ $this->timer = $this->loop->addPeriodicTimer($this->checkInterval, $check);
+
+ return resolve();
+ }
+
+ /**
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function stopDb()
+ {
+ $this->scheduledIds = [];
+ if ($this->timer !== null) {
+ $this->loop->cancelTimer($this->timer);
+ $this->timer = null;
+ }
+ $allFinished = $this->running->killOrTerminate();
+ foreach ($this->runningIds as $id => $promise) {
+ $promise->cancel();
+ }
+ $this->runningIds = [];
+
+ return $allFinished;
+ }
+
+ protected function hasBeenDisabled()
+ {
+ $db = $this->db->getDbAdapter();
+ return $db->fetchOne(
+ $db->select()
+ ->from('director_setting', 'setting_value')
+ ->where('setting_name = ?', 'disable_all_jobs')
+ ) === 'y';
+ }
+
+ protected function checkForPendingJobs()
+ {
+ if ($this->hasBeenDisabled()) {
+ $this->scheduledIds = [];
+ // TODO: disable jobs currently going on?
+ return;
+ }
+ if (empty($this->scheduledIds)) {
+ $this->loadNextIds();
+ }
+ }
+
+ protected function runNextPendingJob()
+ {
+ if ($this->timer === null) {
+ // Reset happened. Stopping?
+ return;
+ }
+
+ if (! empty($this->runningIds)) {
+ return;
+ }
+ while (! empty($this->scheduledIds)) {
+ if ($this->runNextJob()) {
+ break;
+ }
+ }
+ }
+
+ protected function loadNextIds()
+ {
+ $db = $this->db->getDbAdapter();
+
+ foreach ($db->fetchCol(
+ $db->select()->from('director_job', 'id')->where('disabled = ?', 'n')
+ ) as $id) {
+ $this->scheduledIds[] = (int) $id;
+ };
+ }
+
+ /**
+ * @return bool
+ */
+ protected function runNextJob()
+ {
+ $id = \array_shift($this->scheduledIds);
+ try {
+ $job = DirectorJob::loadWithAutoIncId((int) $id, $this->db);
+ if ($job->shouldRun()) {
+ $this->runJob($job);
+ return true;
+ }
+ } catch (\Exception $e) {
+ Logger::error('Trying to schedule Job failed: ' . $e->getMessage());
+ }
+
+ return false;
+ }
+
+ /**
+ * @param DirectorJob $job
+ */
+ protected function runJob(DirectorJob $job)
+ {
+ $id = $job->get('id');
+ $jobName = $job->get('job_name');
+ Logger::debug("Job ($jobName) starting");
+ $arguments = [
+ 'director',
+ 'job',
+ 'run',
+ '--id',
+ $job->get('id'),
+ '--debug',
+ '--rpc'
+ ];
+ $cli = new IcingaCliRpc();
+ $cli->setArguments($arguments);
+ $cli->on('start', function (Process $process) {
+ $this->onProcessStarted($process);
+ });
+
+ // Happens on protocol (Netstring) errors or similar:
+ $cli->on('error', function (\Exception $e) {
+ Logger::error('UNEXPECTED: ' . rtrim($e->getMessage()));
+ });
+ if ($this->logProxy) {
+ $logger = clone($this->logProxy);
+ $logger->setPrefix("Job ($jobName): ");
+ $cli->rpc()->setHandler($logger, 'logger');
+ }
+ unset($this->scheduledIds[$id]);
+ $this->runningIds[$id] = $cli->run($this->loop)->then(function () use ($id, $jobName) {
+ Logger::debug("Job ($jobName) finished");
+ })->otherwise(function (\Exception $e) use ($id, $jobName) {
+ Logger::error("Job ($jobName) failed: " . $e->getMessage());
+ })->otherwise(function (FinishedProcessState $state) use ($jobName) {
+ Logger::error("Job ($jobName) failed: " . $state->getReason());
+ })->always(function () use ($id) {
+ unset($this->runningIds[$id]);
+ $this->loop->futureTick(function () {
+ $this->runNextPendingJob();
+ });
+ });
+ }
+
+ /**
+ * @return ProcessList
+ */
+ public function getProcessList()
+ {
+ return $this->running;
+ }
+
+ protected function onProcessStarted(Process $process)
+ {
+ $this->running->attach($process);
+ }
+
+ public function __destruct()
+ {
+ $this->stopDb();
+ $this->logProxy = null;
+ $this->loop = null;
+ }
+}