summaryrefslogtreecommitdiffstats
path: root/library/Director/Daemon
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:43:12 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:43:12 +0000
commitcd989f9c3aff968e19a3aeabc4eb9085787a6673 (patch)
treefbff2135e7013f196b891bbde54618eb050e4aaf /library/Director/Daemon
parentInitial commit. (diff)
downloadicingaweb2-module-director-cd989f9c3aff968e19a3aeabc4eb9085787a6673.tar.xz
icingaweb2-module-director-cd989f9c3aff968e19a3aeabc4eb9085787a6673.zip
Adding upstream version 1.10.2.upstream/1.10.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'library/Director/Daemon')
-rw-r--r--library/Director/Daemon/BackgroundDaemon.php235
-rw-r--r--library/Director/Daemon/DaemonDb.php365
-rw-r--r--library/Director/Daemon/DaemonProcessDetails.php122
-rw-r--r--library/Director/Daemon/DaemonProcessState.php85
-rw-r--r--library/Director/Daemon/DaemonUtil.php16
-rw-r--r--library/Director/Daemon/DbBasedComponent.php19
-rw-r--r--library/Director/Daemon/DeploymentChecker.php51
-rw-r--r--library/Director/Daemon/JobRunner.php234
-rw-r--r--library/Director/Daemon/JsonRpcLogWriter.php37
-rw-r--r--library/Director/Daemon/LogProxy.php76
-rw-r--r--library/Director/Daemon/Logger.php24
-rw-r--r--library/Director/Daemon/ProcessList.php125
-rw-r--r--library/Director/Daemon/RunningDaemonInfo.php154
-rw-r--r--library/Director/Daemon/SystemdLogWriter.php27
14 files changed, 1570 insertions, 0 deletions
diff --git a/library/Director/Daemon/BackgroundDaemon.php b/library/Director/Daemon/BackgroundDaemon.php
new file mode 100644
index 0000000..34cc28b
--- /dev/null
+++ b/library/Director/Daemon/BackgroundDaemon.php
@@ -0,0 +1,235 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use Exception;
+use gipfl\Cli\Process;
+use gipfl\IcingaCliDaemon\DbResourceConfigWatch;
+use gipfl\SystemD\NotifySystemD;
+use React\EventLoop\Factory as Loop;
+use React\EventLoop\LoopInterface;
+use Ramsey\Uuid\Uuid;
+
+class BackgroundDaemon
+{
+ /** @var LoopInterface */
+ private $loop;
+
+ /** @var NotifySystemD|boolean */
+ protected $systemd;
+
+ /** @var JobRunner */
+ protected $jobRunner;
+
+ /** @var string|null */
+ protected $dbResourceName;
+
+ /** @var DaemonDb */
+ protected $daemonDb;
+
+ /** @var DaemonProcessState */
+ protected $processState;
+
+ /** @var DaemonProcessDetails */
+ protected $processDetails;
+
+ /** @var LogProxy */
+ protected $logProxy;
+
+ /** @var bool */
+ protected $reloading = false;
+
+ /** @var bool */
+ protected $shuttingDown = false;
+
+ public function run(LoopInterface $loop = null)
+ {
+ if ($ownLoop = ($loop === null)) {
+ $loop = Loop::create();
+ }
+ $this->loop = $loop;
+ $this->loop->futureTick(function () {
+ $this->initialize();
+ });
+ if ($ownLoop) {
+ $loop->run();
+ }
+ }
+
+ public function setDbResourceName($name)
+ {
+ $this->dbResourceName = $name;
+
+ return $this;
+ }
+
+ protected function initialize()
+ {
+ $this->registerSignalHandlers($this->loop);
+ $this->processState = new DaemonProcessState('icinga::director');
+ $this->jobRunner = new JobRunner($this->loop);
+ $this->systemd = $this->eventuallyInitializeSystemd();
+ $this->processState->setSystemd($this->systemd);
+ if ($this->systemd) {
+ $this->systemd->setReady();
+ }
+ $this->setState('ready');
+ $this->processDetails = $this
+ ->initializeProcessDetails($this->systemd)
+ ->registerProcessList($this->jobRunner->getProcessList());
+ $this->logProxy = new LogProxy($this->processDetails->getInstanceUuid());
+ $this->jobRunner->forwardLog($this->logProxy);
+ $this->daemonDb = $this->initializeDb(
+ $this->processDetails,
+ $this->processState,
+ $this->dbResourceName
+ );
+ $this->daemonDb
+ ->register($this->jobRunner)
+ ->register($this->logProxy)
+ ->register(new DeploymentChecker($this->loop))
+ ->run($this->loop);
+ $this->setState('running');
+ }
+
+ /**
+ * @param NotifySystemD|false $systemd
+ * @return DaemonProcessDetails
+ */
+ protected function initializeProcessDetails($systemd)
+ {
+ if ($systemd && $systemd->hasInvocationId()) {
+ $uuid = $systemd->getInvocationId();
+ } else {
+ try {
+ $uuid = \bin2hex(Uuid::uuid4()->getBytes());
+ } catch (Exception $e) {
+ $uuid = 'deadc0de' . \substr(\md5(\getmypid()), 0, 24);
+ }
+ }
+ $processDetails = new DaemonProcessDetails($uuid);
+ if ($systemd) {
+ $processDetails->set('running_with_systemd', 'y');
+ }
+
+ return $processDetails;
+ }
+
+ protected function eventuallyInitializeSystemd()
+ {
+ $systemd = NotifySystemD::ifRequired($this->loop);
+ if ($systemd) {
+ Logger::replaceRunningInstance(new SystemdLogWriter());
+ Logger::info(sprintf(
+ "Started by systemd, notifying watchdog every %0.2Gs via %s",
+ $systemd->getWatchdogInterval(),
+ $systemd->getSocketPath()
+ ));
+ } else {
+ Logger::debug('Running without systemd');
+ }
+
+ return $systemd;
+ }
+
+ /**
+ * @return DaemonProcessDetails
+ */
+ public function getProcessDetails()
+ {
+ return $this->processDetails;
+ }
+
+ /**
+ * @return DaemonProcessState
+ */
+ public function getProcessState()
+ {
+ return $this->processState;
+ }
+
+ protected function initializeDb(
+ DaemonProcessDetails $processDetails,
+ DaemonProcessState $processState,
+ $dbResourceName = null
+ ) {
+ $db = new DaemonDb($processDetails);
+ $db->on('state', function ($state, $level = null) use ($processState) {
+ // TODO: level is sent but not used
+ $processState->setComponentState('db', $state);
+ });
+ $db->on('schemaChange', function ($startupSchema, $dbSchema) {
+ Logger::info(sprintf(
+ "DB schema version changed. Started with %d, DB has %d. Restarting.",
+ $startupSchema,
+ $dbSchema
+ ));
+ $this->reload();
+ });
+
+ $db->setConfigWatch(
+ $dbResourceName
+ ? DbResourceConfigWatch::name($dbResourceName)
+ : DbResourceConfigWatch::module('director')
+ );
+
+ return $db;
+ }
+
+ protected function registerSignalHandlers(LoopInterface $loop)
+ {
+ $func = function ($signal) use (&$func) {
+ $this->shutdownWithSignal($signal, $func);
+ };
+ $funcReload = function () {
+ $this->reload();
+ };
+ $loop->addSignal(SIGHUP, $funcReload);
+ $loop->addSignal(SIGINT, $func);
+ $loop->addSignal(SIGTERM, $func);
+ }
+
+ protected function shutdownWithSignal($signal, &$func)
+ {
+ $this->loop->removeSignal($signal, $func);
+ $this->shutdown();
+ }
+
+ public function reload()
+ {
+ if ($this->reloading) {
+ Logger::error('Ignoring reload request, reload is already in progress');
+ return;
+ }
+ $this->reloading = true;
+ Logger::info('Going gown for reload now');
+ $this->setState('reloading the main process');
+ $this->daemonDb->disconnect()->then(function () {
+ Process::restart();
+ });
+ }
+
+ protected function shutdown()
+ {
+ if ($this->shuttingDown) {
+ Logger::error('Ignoring shutdown request, shutdown is already in progress');
+ return;
+ }
+ Logger::info('Shutting down');
+ $this->shuttingDown = true;
+ $this->setState('shutting down');
+ $this->daemonDb->disconnect()->then(function () {
+ Logger::info('DB has been disconnected, shutdown finished');
+ $this->loop->stop();
+ });
+ }
+
+ protected function setState($state)
+ {
+ if ($this->processState) {
+ $this->processState->setState($state);
+ }
+
+ return $this;
+ }
+}
diff --git a/library/Director/Daemon/DaemonDb.php b/library/Director/Daemon/DaemonDb.php
new file mode 100644
index 0000000..7772b3a
--- /dev/null
+++ b/library/Director/Daemon/DaemonDb.php
@@ -0,0 +1,365 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use Exception;
+use gipfl\IcingaCliDaemon\DbResourceConfigWatch;
+use gipfl\IcingaCliDaemon\RetryUnless;
+use Icinga\Data\ConfigObject;
+use Icinga\Module\Director\Db;
+use Icinga\Module\Director\Db\Migrations;
+use ipl\Stdlib\EventEmitter;
+use React\EventLoop\LoopInterface;
+use React\Promise\Deferred;
+use RuntimeException;
+use SplObjectStorage;
+use function React\Promise\reject;
+use function React\Promise\resolve;
+
+class DaemonDb
+{
+ use EventEmitter;
+
+ /** @var LoopInterface */
+ private $loop;
+
+ /** @var Db */
+ protected $connection;
+
+ /** @var \Zend_Db_Adapter_Abstract */
+ protected $db;
+
+ /** @var DaemonProcessDetails */
+ protected $details;
+
+ /** @var DbBasedComponent[] */
+ protected $registeredComponents = [];
+
+ /** @var DbResourceConfigWatch|null */
+ protected $configWatch;
+
+ /** @var array|null */
+ protected $dbConfig;
+
+ /** @var RetryUnless|null */
+ protected $pendingReconnection;
+
+ /** @var Deferred|null */
+ protected $pendingDisconnect;
+
+ /** @var \React\EventLoop\TimerInterface */
+ protected $refreshTimer;
+
+ /** @var \React\EventLoop\TimerInterface */
+ protected $schemaCheckTimer;
+
+ /** @var int */
+ protected $startupSchemaVersion;
+
+ public function __construct(DaemonProcessDetails $details, $dbConfig = null)
+ {
+ $this->details = $details;
+ $this->dbConfig = $dbConfig;
+ }
+
+ public function register(DbBasedComponent $component)
+ {
+ $this->registeredComponents[] = $component;
+
+ return $this;
+ }
+
+ public function setConfigWatch(DbResourceConfigWatch $configWatch)
+ {
+ $this->configWatch = $configWatch;
+ $configWatch->notify(function ($config) {
+ $this->disconnect()->then(function () use ($config) {
+ return $this->onNewConfig($config);
+ });
+ });
+ if ($this->loop) {
+ $configWatch->run($this->loop);
+ }
+
+ return $this;
+ }
+
+ public function run(LoopInterface $loop)
+ {
+ $this->loop = $loop;
+ $this->connect();
+ $this->refreshTimer = $loop->addPeriodicTimer(3, function () {
+ $this->refreshMyState();
+ });
+ $this->schemaCheckTimer = $loop->addPeriodicTimer(15, function () {
+ $this->checkDbSchema();
+ });
+ if ($this->configWatch) {
+ $this->configWatch->run($this->loop);
+ }
+ }
+
+ protected function onNewConfig($config)
+ {
+ if ($config === null) {
+ if ($this->dbConfig === null) {
+ Logger::error('DB configuration is not valid');
+ } else {
+ Logger::error('DB configuration is no longer valid');
+ }
+ $this->emitStatus('no configuration');
+ $this->dbConfig = $config;
+
+ return resolve();
+ } else {
+ $this->emitStatus('configuration loaded');
+ $this->dbConfig = $config;
+
+ return $this->establishConnection($config);
+ }
+ }
+
+ protected function establishConnection($config)
+ {
+ if ($this->connection !== null) {
+ Logger::error('Trying to establish a connection while being connected');
+ return reject();
+ }
+ $callback = function () use ($config) {
+ $this->reallyEstablishConnection($config);
+ };
+ $onSuccess = function () {
+ $this->pendingReconnection = null;
+ $this->onConnected();
+ };
+ if ($this->pendingReconnection) {
+ $this->pendingReconnection->reset();
+ $this->pendingReconnection = null;
+ }
+ $this->emitStatus('connecting');
+
+ return $this->pendingReconnection = RetryUnless::succeeding($callback)
+ ->setInterval(0.2)
+ ->slowDownAfter(10, 10)
+ ->run($this->loop)
+ ->then($onSuccess)
+ ;
+ }
+
+ protected function reallyEstablishConnection($config)
+ {
+ $connection = new Db(new ConfigObject($config));
+ $connection->getDbAdapter()->getConnection();
+ $migrations = new Migrations($connection);
+ if (! $migrations->hasSchema()) {
+ $this->emitStatus('no schema', 'error');
+ throw new RuntimeException('DB has no schema');
+ }
+ $this->wipeOrphanedInstances($connection);
+ if ($this->hasAnyOtherActiveInstance($connection)) {
+ $this->emitStatus('locked by other instance', 'warning');
+ throw new RuntimeException('DB is locked by a running daemon instance, will retry');
+ }
+ $this->startupSchemaVersion = $migrations->getLastMigrationNumber();
+ $this->details->set('schema_version', $this->startupSchemaVersion);
+
+ $this->connection = $connection;
+ $this->db = $connection->getDbAdapter();
+ $this->loop->futureTick(function () {
+ $this->refreshMyState();
+ });
+
+ return $connection;
+ }
+
+ protected function checkDbSchema()
+ {
+ if ($this->connection === null) {
+ return;
+ }
+
+ if ($this->schemaIsOutdated()) {
+ $this->emit('schemaChange', [
+ $this->getStartupSchemaVersion(),
+ $this->getDbSchemaVersion()
+ ]);
+ }
+ }
+
+ protected function schemaIsOutdated()
+ {
+ return $this->getStartupSchemaVersion() < $this->getDbSchemaVersion();
+ }
+
+ protected function getStartupSchemaVersion()
+ {
+ return $this->startupSchemaVersion;
+ }
+
+ protected function getDbSchemaVersion()
+ {
+ if ($this->connection === null) {
+ throw new RuntimeException(
+ 'Cannot determine DB schema version without an established DB connection'
+ );
+ }
+ $migrations = new Migrations($this->connection);
+
+ return $migrations->getLastMigrationNumber();
+ }
+
+ protected function onConnected()
+ {
+ $this->emitStatus('connected');
+ Logger::info('Connected to the database');
+ foreach ($this->registeredComponents as $component) {
+ $component->initDb($this->connection);
+ }
+ }
+
+ /**
+ * @return \React\Promise\PromiseInterface
+ */
+ protected function reconnect()
+ {
+ return $this->disconnect()->then(function () {
+ return $this->connect();
+ }, function (Exception $e) {
+ Logger::error('Disconnect failed. This should never happen: ' . $e->getMessage());
+ exit(1);
+ });
+ }
+
+ /**
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function connect()
+ {
+ if ($this->connection === null) {
+ if ($this->dbConfig) {
+ return $this->establishConnection($this->dbConfig);
+ }
+ }
+
+ return resolve();
+ }
+
+ /**
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function disconnect()
+ {
+ if (! $this->connection) {
+ return resolve();
+ }
+ if ($this->pendingDisconnect) {
+ return $this->pendingDisconnect->promise();
+ }
+
+ $this->eventuallySetStopped();
+ $this->pendingDisconnect = new Deferred();
+ $pendingComponents = new SplObjectStorage();
+ foreach ($this->registeredComponents as $component) {
+ $pendingComponents->attach($component);
+ $resolve = function () use ($pendingComponents, $component) {
+ $pendingComponents->detach($component);
+ if ($pendingComponents->count() === 0) {
+ $this->pendingDisconnect->resolve();
+ }
+ };
+ // TODO: What should we do in case they don't?
+ $component->stopDb()->then($resolve);
+ }
+
+ try {
+ if ($this->db) {
+ $this->db->closeConnection();
+ }
+ } catch (Exception $e) {
+ Logger::error('Failed to disconnect: ' . $e->getMessage());
+ }
+
+ return $this->pendingDisconnect->promise()->then(function () {
+ $this->connection = null;
+ $this->db = null;
+ $this->pendingDisconnect = null;
+ });
+ }
+
+ protected function emitStatus($message, $level = 'info')
+ {
+ $this->emit('state', [$message, $level]);
+
+ return $this;
+ }
+
+ protected function hasAnyOtherActiveInstance(Db $connection)
+ {
+ $db = $connection->getDbAdapter();
+
+ return (int) $db->fetchOne(
+ $db->select()
+ ->from('director_daemon_info', 'COUNT(*)')
+ ->where('ts_stopped IS NULL')
+ ) > 0;
+ }
+
+ protected function wipeOrphanedInstances(Db $connection)
+ {
+ $db = $connection->getDbAdapter();
+ $db->delete('director_daemon_info', 'ts_stopped IS NOT NULL');
+ $db->delete('director_daemon_info', $db->quoteInto(
+ 'instance_uuid_hex = ?',
+ $this->details->getInstanceUuid()
+ ));
+ $count = $db->delete(
+ 'director_daemon_info',
+ 'ts_stopped IS NULL AND ts_last_update < ' . (
+ DaemonUtil::timestampWithMilliseconds() - (60 * 1000)
+ )
+ );
+ if ($count > 1) {
+ Logger::error("Removed $count orphaned daemon instance(s) from DB");
+ }
+ }
+
+ protected function refreshMyState()
+ {
+ if ($this->db === null || $this->pendingReconnection || $this->pendingDisconnect) {
+ return;
+ }
+ try {
+ $updated = $this->db->update(
+ 'director_daemon_info',
+ $this->details->getPropertiesToUpdate(),
+ $this->db->quoteInto('instance_uuid_hex = ?', $this->details->getInstanceUuid())
+ );
+
+ if (! $updated) {
+ $this->db->insert(
+ 'director_daemon_info',
+ $this->details->getPropertiesToInsert()
+ );
+ }
+ } catch (Exception $e) {
+ Logger::error($e->getMessage());
+ $this->reconnect();
+ }
+ }
+
+ protected function eventuallySetStopped()
+ {
+ try {
+ if (! $this->db) {
+ return;
+ }
+ $this->db->update(
+ 'director_daemon_info',
+ ['ts_stopped' => DaemonUtil::timestampWithMilliseconds()],
+ $this->db->quoteInto('instance_uuid_hex = ?', $this->details->getInstanceUuid())
+ );
+ } catch (Exception $e) {
+ Logger::error('Failed to update daemon info (setting ts_stopped): ' . $e->getMessage());
+ }
+ }
+}
diff --git a/library/Director/Daemon/DaemonProcessDetails.php b/library/Director/Daemon/DaemonProcessDetails.php
new file mode 100644
index 0000000..454e31f
--- /dev/null
+++ b/library/Director/Daemon/DaemonProcessDetails.php
@@ -0,0 +1,122 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use gipfl\LinuxHealth\Memory;
+use Icinga\Application\Platform;
+use React\ChildProcess\Process;
+use gipfl\Cli\Process as CliProcess;
+
+class DaemonProcessDetails
+{
+ /** @var string */
+ protected $instanceUuid;
+
+ /** @var \stdClass */
+ protected $info;
+
+ /** @var ProcessList[] */
+ protected $processLists = [];
+
+ protected $myArgs;
+
+ protected $myPid;
+
+ public function __construct($instanceUuid)
+ {
+ $this->instanceUuid = $instanceUuid;
+ $this->initialize();
+ }
+
+ public function getInstanceUuid()
+ {
+ return $this->instanceUuid;
+ }
+
+ public function getPropertiesToInsert()
+ {
+ return $this->getPropertiesToUpdate() + (array) $this->info;
+ }
+
+ public function getPropertiesToUpdate()
+ {
+ return [
+ 'ts_last_update' => DaemonUtil::timestampWithMilliseconds(),
+ 'ts_stopped' => null,
+ 'process_info' => \json_encode($this->collectProcessInfo()),
+ ];
+ }
+
+ public function set($property, $value)
+ {
+ if (\property_exists($this->info, $property)) {
+ $this->info->$property = $value;
+ } else {
+ throw new \InvalidArgumentException("Trying to set invalid daemon info property: $property");
+ }
+ }
+
+ public function registerProcessList(ProcessList $list)
+ {
+ $refresh = function (Process $process) {
+ $this->refreshProcessInfo();
+ };
+ $list->on('start', $refresh)->on('exit', $refresh);
+ $this->processLists[] = $list;
+
+ return $this;
+ }
+
+ protected function refreshProcessInfo()
+ {
+ $this->set('process_info', \json_encode($this->collectProcessInfo()));
+ }
+
+ protected function collectProcessInfo()
+ {
+ $info = (object) [$this->myPid => (object) [
+ 'command' => implode(' ', $this->myArgs),
+ 'running' => true,
+ 'memory' => Memory::getUsageForPid($this->myPid)
+ ]];
+
+ foreach ($this->processLists as $processList) {
+ foreach ($processList->getOverview() as $pid => $details) {
+ $info->$pid = $details;
+ }
+ }
+
+ return $info;
+ }
+
+ protected function initialize()
+ {
+ global $argv;
+ CliProcess::getInitialCwd();
+ $this->myArgs = $argv;
+ $this->myPid = \posix_getpid();
+ if (isset($_SERVER['_'])) {
+ $self = $_SERVER['_'];
+ } else {
+ // Process does a better job, but want the relative path (if such)
+ $self = $_SERVER['PHP_SELF'];
+ }
+ $this->info = (object) [
+ 'instance_uuid_hex' => $this->instanceUuid,
+ 'running_with_systemd' => 'n',
+ 'ts_started' => (int) ((float) $_SERVER['REQUEST_TIME_FLOAT'] * 1000),
+ 'ts_stopped' => null,
+ 'pid' => \posix_getpid(),
+ 'fqdn' => Platform::getFqdn(),
+ 'username' => Platform::getPhpUser(),
+ 'schema_version' => null,
+ 'php_version' => Platform::getPhpVersion(),
+ 'binary_path' => $self,
+ 'binary_realpath' => CliProcess::getBinaryPath(),
+ 'php_integer_size' => PHP_INT_SIZE,
+ 'php_binary_path' => PHP_BINARY,
+ 'php_binary_realpath' => \realpath(PHP_BINARY), // TODO: useless?
+ 'process_info' => null,
+ ];
+ }
+}
diff --git a/library/Director/Daemon/DaemonProcessState.php b/library/Director/Daemon/DaemonProcessState.php
new file mode 100644
index 0000000..6ae3cd2
--- /dev/null
+++ b/library/Director/Daemon/DaemonProcessState.php
@@ -0,0 +1,85 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use gipfl\Cli\Process;
+use gipfl\SystemD\NotifySystemD;
+
+class DaemonProcessState
+{
+ /** @var NotifySystemD|null */
+ protected $systemd;
+
+ protected $components = [];
+
+ protected $currentMessage;
+
+ protected $processTitle;
+
+ protected $state;
+
+ public function __construct($processTitle)
+ {
+ $this->processTitle = $processTitle;
+ $this->refreshMessage();
+ }
+
+ /**
+ * @param NotifySystemD|false $systemd
+ * @return $this
+ */
+ public function setSystemd($systemd)
+ {
+ if ($systemd) {
+ $this->systemd = $systemd;
+ } else {
+ $this->systemd = null;
+ }
+
+ return $this;
+ }
+
+ public function setState($message)
+ {
+ $this->state = $message;
+ $this->refreshMessage();
+
+ return $this;
+ }
+
+ public function setComponentState($name, $stateMessage)
+ {
+ if ($stateMessage === null) {
+ unset($this->components[$name]);
+ } else {
+ $this->components[$name] = $stateMessage;
+ }
+ $this->refreshMessage();
+ }
+
+ protected function refreshMessage()
+ {
+ $messageParts = [];
+ if ($this->state !== null && \strlen($this->state)) {
+ $messageParts[] = $this->state;
+ }
+ foreach ($this->components as $component => $message) {
+ $messageParts[] = "$component: $message";
+ }
+
+ $message = \implode(', ', $messageParts);
+
+ if ($message !== $this->currentMessage) {
+ $this->currentMessage = $message;
+ if (\strlen($message) === 0) {
+ Process::setTitle($this->processTitle);
+ } else {
+ Process::setTitle($this->processTitle . ": $message");
+ }
+
+ if ($this->systemd) {
+ $this->systemd->setStatus($message);
+ }
+ }
+ }
+}
diff --git a/library/Director/Daemon/DaemonUtil.php b/library/Director/Daemon/DaemonUtil.php
new file mode 100644
index 0000000..c978d11
--- /dev/null
+++ b/library/Director/Daemon/DaemonUtil.php
@@ -0,0 +1,16 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+class DaemonUtil
+{
+ /**
+ * @return int
+ */
+ public static function timestampWithMilliseconds()
+ {
+ $mTime = explode(' ', microtime());
+
+ return (int) round($mTime[0] * 1000) + (int) $mTime[1] * 1000;
+ }
+}
diff --git a/library/Director/Daemon/DbBasedComponent.php b/library/Director/Daemon/DbBasedComponent.php
new file mode 100644
index 0000000..c176c14
--- /dev/null
+++ b/library/Director/Daemon/DbBasedComponent.php
@@ -0,0 +1,19 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use Icinga\Module\Director\Db;
+
+interface DbBasedComponent
+{
+ /**
+ * @param Db $db
+ * @return \React\Promise\ExtendedPromiseInterface;
+ */
+ public function initDb(Db $db);
+
+ /**
+ * @return \React\Promise\ExtendedPromiseInterface;
+ */
+ public function stopDb();
+}
diff --git a/library/Director/Daemon/DeploymentChecker.php b/library/Director/Daemon/DeploymentChecker.php
new file mode 100644
index 0000000..82d6d05
--- /dev/null
+++ b/library/Director/Daemon/DeploymentChecker.php
@@ -0,0 +1,51 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use Exception;
+use Icinga\Module\Director\Db;
+use Icinga\Module\Director\Objects\DirectorDeploymentLog;
+use React\EventLoop\LoopInterface;
+use function React\Promise\resolve;
+
+class DeploymentChecker implements DbBasedComponent
+{
+ /** @var Db */
+ protected $connection;
+
+ public function __construct(LoopInterface $loop)
+ {
+ $loop->addPeriodicTimer(5, function () {
+ if ($db = $this->connection) {
+ try {
+ if (DirectorDeploymentLog::hasUncollected($db)) {
+ $db->getDeploymentEndpoint()->api()->collectLogFiles($db);
+ }
+ } catch (Exception $e) {
+ // Ignore eventual issues while talking to Icinga
+ }
+ }
+ });
+ }
+
+ /**
+ * @param Db $connection
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function initDb(Db $connection)
+ {
+ $this->connection = $connection;
+
+ return resolve();
+ }
+
+ /**
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function stopDb()
+ {
+ $this->connection = null;
+
+ return resolve();
+ }
+}
diff --git a/library/Director/Daemon/JobRunner.php b/library/Director/Daemon/JobRunner.php
new file mode 100644
index 0000000..78d7747
--- /dev/null
+++ b/library/Director/Daemon/JobRunner.php
@@ -0,0 +1,234 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use gipfl\IcingaCliDaemon\FinishedProcessState;
+use gipfl\IcingaCliDaemon\IcingaCliRpc;
+use Icinga\Application\Logger;
+use Icinga\Module\Director\Db;
+use Icinga\Module\Director\Objects\DirectorJob;
+use React\ChildProcess\Process;
+use React\EventLoop\LoopInterface;
+use React\Promise\Promise;
+use function React\Promise\resolve;
+
+class JobRunner implements DbBasedComponent
+{
+ /** @var Db */
+ protected $db;
+
+ /** @var LoopInterface */
+ protected $loop;
+
+ /** @var int[] */
+ protected $scheduledIds = [];
+
+ /** @var Promise[] */
+ protected $runningIds = [];
+
+ protected $checkInterval = 10;
+
+ /** @var \React\EventLoop\TimerInterface */
+ protected $timer;
+
+ /** @var LogProxy */
+ protected $logProxy;
+
+ /** @var ProcessList */
+ protected $running;
+
+ public function __construct(LoopInterface $loop)
+ {
+ $this->loop = $loop;
+ $this->running = new ProcessList($loop);
+ }
+
+ public function forwardLog(LogProxy $logProxy)
+ {
+ $this->logProxy = $logProxy;
+
+ return $this;
+ }
+
+ /**
+ * @param Db $db
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function initDb(Db $db)
+ {
+ $this->db = $db;
+ $check = function () {
+ try {
+ $this->checkForPendingJobs();
+ $this->runNextPendingJob();
+ } catch (\Exception $e) {
+ Logger::error($e->getMessage());
+ }
+ };
+ if ($this->timer === null) {
+ $this->loop->futureTick($check);
+ }
+ if ($this->timer !== null) {
+ Logger::info('Cancelling former timer');
+ $this->loop->cancelTimer($this->timer);
+ }
+ $this->timer = $this->loop->addPeriodicTimer($this->checkInterval, $check);
+
+ return resolve();
+ }
+
+ /**
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function stopDb()
+ {
+ $this->scheduledIds = [];
+ if ($this->timer !== null) {
+ $this->loop->cancelTimer($this->timer);
+ $this->timer = null;
+ }
+ $allFinished = $this->running->killOrTerminate();
+ foreach ($this->runningIds as $id => $promise) {
+ $promise->cancel();
+ }
+ $this->runningIds = [];
+
+ return $allFinished;
+ }
+
+ protected function hasBeenDisabled()
+ {
+ $db = $this->db->getDbAdapter();
+ return $db->fetchOne(
+ $db->select()
+ ->from('director_setting', 'setting_value')
+ ->where('setting_name = ?', 'disable_all_jobs')
+ ) === 'y';
+ }
+
+ protected function checkForPendingJobs()
+ {
+ if ($this->hasBeenDisabled()) {
+ $this->scheduledIds = [];
+ // TODO: disable jobs currently going on?
+ return;
+ }
+ if (empty($this->scheduledIds)) {
+ $this->loadNextIds();
+ }
+ }
+
+ protected function runNextPendingJob()
+ {
+ if ($this->timer === null) {
+ // Reset happened. Stopping?
+ return;
+ }
+
+ if (! empty($this->runningIds)) {
+ return;
+ }
+ while (! empty($this->scheduledIds)) {
+ if ($this->runNextJob()) {
+ break;
+ }
+ }
+ }
+
+ protected function loadNextIds()
+ {
+ $db = $this->db->getDbAdapter();
+
+ foreach ($db->fetchCol(
+ $db->select()->from('director_job', 'id')->where('disabled = ?', 'n')
+ ) as $id) {
+ $this->scheduledIds[] = (int) $id;
+ };
+ }
+
+ /**
+ * @return bool
+ */
+ protected function runNextJob()
+ {
+ $id = \array_shift($this->scheduledIds);
+ try {
+ $job = DirectorJob::loadWithAutoIncId((int) $id, $this->db);
+ if ($job->shouldRun()) {
+ $this->runJob($job);
+ return true;
+ }
+ } catch (\Exception $e) {
+ Logger::error('Trying to schedule Job failed: ' . $e->getMessage());
+ }
+
+ return false;
+ }
+
+ /**
+ * @param DirectorJob $job
+ */
+ protected function runJob(DirectorJob $job)
+ {
+ $id = $job->get('id');
+ $jobName = $job->get('job_name');
+ Logger::debug("Job ($jobName) starting");
+ $arguments = [
+ 'director',
+ 'job',
+ 'run',
+ '--id',
+ $job->get('id'),
+ '--debug',
+ '--rpc'
+ ];
+ $cli = new IcingaCliRpc();
+ $cli->setArguments($arguments);
+ $cli->on('start', function (Process $process) {
+ $this->onProcessStarted($process);
+ });
+
+ // Happens on protocol (Netstring) errors or similar:
+ $cli->on('error', function (\Exception $e) {
+ Logger::error('UNEXPECTED: ' . rtrim($e->getMessage()));
+ });
+ if ($this->logProxy) {
+ $logger = clone($this->logProxy);
+ $logger->setPrefix("Job ($jobName): ");
+ $cli->rpc()->setHandler($logger, 'logger');
+ }
+ unset($this->scheduledIds[$id]);
+ $this->runningIds[$id] = $cli->run($this->loop)->then(function () use ($id, $jobName) {
+ Logger::debug("Job ($jobName) finished");
+ })->otherwise(function (\Exception $e) use ($id, $jobName) {
+ Logger::error("Job ($jobName) failed: " . $e->getMessage());
+ })->otherwise(function (FinishedProcessState $state) use ($jobName) {
+ Logger::error("Job ($jobName) failed: " . $state->getReason());
+ })->always(function () use ($id) {
+ unset($this->runningIds[$id]);
+ $this->loop->futureTick(function () {
+ $this->runNextPendingJob();
+ });
+ });
+ }
+
+ /**
+ * @return ProcessList
+ */
+ public function getProcessList()
+ {
+ return $this->running;
+ }
+
+ protected function onProcessStarted(Process $process)
+ {
+ $this->running->attach($process);
+ }
+
+ public function __destruct()
+ {
+ $this->stopDb();
+ $this->logProxy = null;
+ $this->loop = null;
+ }
+}
diff --git a/library/Director/Daemon/JsonRpcLogWriter.php b/library/Director/Daemon/JsonRpcLogWriter.php
new file mode 100644
index 0000000..edfa23e
--- /dev/null
+++ b/library/Director/Daemon/JsonRpcLogWriter.php
@@ -0,0 +1,37 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use gipfl\Protocol\JsonRpc\Connection;
+use gipfl\Protocol\JsonRpc\Notification;
+use Icinga\Application\Logger\LogWriter;
+use Icinga\Data\ConfigObject;
+
+class JsonRpcLogWriter extends LogWriter
+{
+ protected $connection;
+
+ protected static $severityMap = [
+ Logger::DEBUG => 'debug',
+ Logger::INFO => 'info',
+ Logger::WARNING => 'warning',
+ Logger::ERROR => 'error',
+ ];
+
+ public function __construct(Connection $connection)
+ {
+ parent::__construct(new ConfigObject([]));
+ $this->connection = $connection;
+ }
+
+ public function log($severity, $message)
+ {
+ $message = \iconv('UTF-8', 'UTF-8//IGNORE', $message);
+ $this->connection->sendNotification(
+ Notification::create('logger.log', [
+ static::$severityMap[$severity],
+ $message
+ ])
+ );
+ }
+}
diff --git a/library/Director/Daemon/LogProxy.php b/library/Director/Daemon/LogProxy.php
new file mode 100644
index 0000000..0b58ae8
--- /dev/null
+++ b/library/Director/Daemon/LogProxy.php
@@ -0,0 +1,76 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use Exception;
+use Icinga\Module\Director\Db;
+use function React\Promise\resolve;
+
+class LogProxy implements DbBasedComponent
+{
+ protected $connection;
+
+ protected $db;
+
+ protected $server;
+
+ protected $instanceUuid;
+
+ protected $prefix = '';
+
+ public function __construct($instanceUuid)
+ {
+ $this->instanceUuid = $instanceUuid;
+ }
+
+ public function setPrefix($prefix)
+ {
+ $this->prefix = $prefix;
+
+ return $this;
+ }
+
+ /**
+ * @param Db $connection
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function initDb(Db $connection)
+ {
+ $this->connection = $connection;
+ $this->db = $connection->getDbAdapter();
+
+ return resolve();
+ }
+
+ /**
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function stopDb()
+ {
+ $this->connection = null;
+ $this->db = null;
+
+ return resolve();
+ }
+
+ public function log($severity, $message)
+ {
+ Logger::$severity($this->prefix . $message);
+ /*
+ // Not yet
+ try {
+ if ($this->db) {
+ $this->db->insert('director_daemonlog', [
+ // environment/installation/db?
+ 'instance_uuid' => $this->instanceUuid,
+ 'ts_create' => DaemonUtil::timestampWithMilliseconds(),
+ 'level' => $severity,
+ 'message' => $message,
+ ]);
+ }
+ } catch (Exception $e) {
+ Logger::error($e->getMessage());
+ }
+ */
+ }
+}
diff --git a/library/Director/Daemon/Logger.php b/library/Director/Daemon/Logger.php
new file mode 100644
index 0000000..27fcbf5
--- /dev/null
+++ b/library/Director/Daemon/Logger.php
@@ -0,0 +1,24 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use Icinga\Application\Logger as IcingaLogger;
+use Icinga\Application\Logger\LogWriter;
+use Icinga\Exception\ConfigurationError;
+
+class Logger extends IcingaLogger
+{
+ public static function replaceRunningInstance(LogWriter $writer, $level = null)
+ {
+ try {
+ $instance = static::$instance;
+ if ($level !== null) {
+ $instance->setLevel($level);
+ }
+
+ $instance->writer = $writer;
+ } catch (ConfigurationError $e) {
+ self::$instance->error($e->getMessage());
+ }
+ }
+}
diff --git a/library/Director/Daemon/ProcessList.php b/library/Director/Daemon/ProcessList.php
new file mode 100644
index 0000000..85b9aac
--- /dev/null
+++ b/library/Director/Daemon/ProcessList.php
@@ -0,0 +1,125 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use gipfl\LinuxHealth\Memory;
+use Icinga\Application\Logger;
+use ipl\Stdlib\EventEmitter;
+use React\ChildProcess\Process;
+use React\EventLoop\LoopInterface;
+use React\Promise\Deferred;
+use function React\Promise\resolve;
+
+class ProcessList
+{
+ use EventEmitter;
+
+ /** @var LoopInterface */
+ protected $loop;
+
+ /** @var \SplObjectStorage */
+ protected $processes;
+
+ /**
+ * ProcessList constructor.
+ * @param LoopInterface $loop
+ * @param Process[] $processes
+ */
+ public function __construct(LoopInterface $loop, array $processes = [])
+ {
+ $this->loop = $loop;
+ $this->processes = new \SplObjectStorage();
+ foreach ($processes as $process) {
+ $this->attach($process);
+ }
+ }
+
+ public function attach(Process $process)
+ {
+ $this->processes->attach($process);
+ $this->emit('start', [$process]);
+ $process->on('exit', function () use ($process) {
+ $this->detach($process);
+ $this->emit('exit', [$process]);
+ });
+
+ return $this;
+ }
+
+ public function detach(Process $process)
+ {
+ $this->processes->detach($process);
+
+ return $this;
+ }
+
+ /**
+ * @param int $timeout
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function killOrTerminate($timeout = 5)
+ {
+ if ($this->processes->count() === 0) {
+ return resolve();
+ }
+ $deferred = new Deferred();
+ $killTimer = $this->loop->addTimer($timeout, function () use ($deferred) {
+ /** @var Process $process */
+ foreach ($this->processes as $process) {
+ $pid = $process->getPid();
+ Logger::error("Process $pid is still running, sending SIGKILL");
+ $process->terminate(SIGKILL);
+ }
+
+ // Let's a little bit of delay after KILLing
+ $this->loop->addTimer(0.1, function () use ($deferred) {
+ $deferred->resolve();
+ });
+ });
+
+ $timer = $this->loop->addPeriodicTimer($timeout / 20, function () use (
+ $deferred,
+ &$timer,
+ $killTimer
+ ) {
+ $stopped = [];
+ /** @var Process $process */
+ foreach ($this->processes as $process) {
+ if (! $process->isRunning()) {
+ $stopped[] = $process;
+ }
+ }
+ foreach ($stopped as $process) {
+ $this->processes->detach($process);
+ }
+ if ($this->processes->count() === 0) {
+ $this->loop->cancelTimer($timer);
+ $this->loop->cancelTimer($killTimer);
+ $deferred->resolve();
+ }
+ });
+ /** @var Process $process */
+ foreach ($this->processes as $process) {
+ $process->terminate(SIGTERM);
+ }
+
+ return $deferred->promise();
+ }
+
+ public function getOverview()
+ {
+ $info = [];
+
+ /** @var Process $process */
+ foreach ($this->processes as $process) {
+ $pid = $process->getPid();
+ $info[$pid] = (object) [
+ 'command' => preg_replace('/^exec /', '', $process->getCommand()),
+ 'running' => $process->isRunning(),
+ 'memory' => Memory::getUsageForPid($pid)
+ ];
+ }
+
+ return $info;
+ }
+}
diff --git a/library/Director/Daemon/RunningDaemonInfo.php b/library/Director/Daemon/RunningDaemonInfo.php
new file mode 100644
index 0000000..adb3549
--- /dev/null
+++ b/library/Director/Daemon/RunningDaemonInfo.php
@@ -0,0 +1,154 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+class RunningDaemonInfo
+{
+ /** @var object */
+ protected $info;
+
+ public function __construct($info = null)
+ {
+ $this->setInfo($info);
+ }
+
+ public function setInfo($info)
+ {
+ if (empty($info)) {
+ $this->info = $this->createEmptyInfo();
+ } else {
+ $this->info = $info;
+ }
+
+ return $this;
+ }
+
+ public function isRunning()
+ {
+ return $this->getPid() !== null && ! $this->isOutdated();
+ }
+
+ public function getPid()
+ {
+ return (int) $this->info->pid;
+ }
+
+ public function getUsername()
+ {
+ return $this->info->username;
+ }
+
+ public function getFqdn()
+ {
+ return $this->info->fqdn;
+ }
+
+ public function getLastUpdate()
+ {
+ return $this->info->ts_last_update;
+ }
+
+ public function getLastModification()
+ {
+ return $this->info->ts_last_modification;
+ }
+
+ public function getPhpVersion()
+ {
+ return $this->info->php_version;
+ }
+
+ public function hasBeenStopped()
+ {
+ return $this->getTimestampStopped() !== null;
+ }
+
+ public function getTimestampStarted()
+ {
+ return $this->info->ts_started;
+ }
+
+ public function getTimestampStopped()
+ {
+ return $this->info->ts_stopped;
+ }
+
+ public function isOutdated($seconds = 5)
+ {
+ return (
+ DaemonUtil::timestampWithMilliseconds() - $this->info->ts_last_update
+ ) > $seconds * 1000;
+ }
+
+ public function isRunningWithSystemd()
+ {
+ return $this->info->running_with_systemd === 'y';
+ }
+
+ public function getBinaryPath()
+ {
+ return $this->info->binary_path;
+ }
+
+ public function getBinaryRealpath()
+ {
+ return $this->info->binary_realpath;
+ }
+
+ public function binaryRealpathDiffers()
+ {
+ return $this->getBinaryPath() !== $this->getBinaryRealpath();
+ }
+
+ public function getPhpBinaryPath()
+ {
+ return $this->info->php_binary_path;
+ }
+
+ public function getPhpBinaryRealpath()
+ {
+ return $this->info->php_binary_realpath;
+ }
+
+ public function phpBinaryRealpathDiffers()
+ {
+ return $this->getPhpBinaryPath() !== $this->getPhpBinaryRealpath();
+ }
+
+ public function getPhpIntegerSize()
+ {
+ return (int) $this->info->php_integer_size;
+ }
+
+ public function has64bitIntegers()
+ {
+ return $this->getPhpIntegerSize() === 8;
+ }
+
+ /*
+ // TODO: not yet
+ public function isMaster()
+ {
+ return $this->info->is_master === 'y';
+ }
+
+ public function isStandby()
+ {
+ return ! $this->isMaster();
+ }
+ */
+
+ protected function createEmptyInfo()
+ {
+ return (object) [
+ 'pid' => null,
+ 'fqdn' => null,
+ 'username' => null,
+ 'php_version' => null,
+ // 'is_master' => null,
+ // Only if not running. Does this make any sense in 'empty info'?
+ 'ts_last_update' => null,
+ 'ts_last_modification' => null
+ ];
+ }
+}
diff --git a/library/Director/Daemon/SystemdLogWriter.php b/library/Director/Daemon/SystemdLogWriter.php
new file mode 100644
index 0000000..8b64442
--- /dev/null
+++ b/library/Director/Daemon/SystemdLogWriter.php
@@ -0,0 +1,27 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use Icinga\Application\Logger\LogWriter;
+use Icinga\Data\ConfigObject;
+
+class SystemdLogWriter extends LogWriter
+{
+ protected static $severityMap = [
+ Logger::DEBUG => 7,
+ Logger::INFO => 6,
+ Logger::WARNING => 4,
+ Logger::ERROR => 3,
+ ];
+
+ public function __construct()
+ {
+ parent::__construct(new ConfigObject([]));
+ }
+
+ public function log($severity, $message)
+ {
+ $severity = self::$severityMap[$severity];
+ echo "<$severity>$message\n";
+ }
+}