diff options
Diffstat (limited to 'library/Director/Daemon')
-rw-r--r-- | library/Director/Daemon/BackgroundDaemon.php | 235 | ||||
-rw-r--r-- | library/Director/Daemon/DaemonDb.php | 365 | ||||
-rw-r--r-- | library/Director/Daemon/DaemonProcessDetails.php | 122 | ||||
-rw-r--r-- | library/Director/Daemon/DaemonProcessState.php | 85 | ||||
-rw-r--r-- | library/Director/Daemon/DaemonUtil.php | 16 | ||||
-rw-r--r-- | library/Director/Daemon/DbBasedComponent.php | 19 | ||||
-rw-r--r-- | library/Director/Daemon/DeploymentChecker.php | 51 | ||||
-rw-r--r-- | library/Director/Daemon/JobRunner.php | 234 | ||||
-rw-r--r-- | library/Director/Daemon/JsonRpcLogWriter.php | 37 | ||||
-rw-r--r-- | library/Director/Daemon/LogProxy.php | 76 | ||||
-rw-r--r-- | library/Director/Daemon/Logger.php | 24 | ||||
-rw-r--r-- | library/Director/Daemon/ProcessList.php | 125 | ||||
-rw-r--r-- | library/Director/Daemon/RunningDaemonInfo.php | 154 | ||||
-rw-r--r-- | library/Director/Daemon/SystemdLogWriter.php | 27 |
14 files changed, 1570 insertions, 0 deletions
diff --git a/library/Director/Daemon/BackgroundDaemon.php b/library/Director/Daemon/BackgroundDaemon.php new file mode 100644 index 0000000..34cc28b --- /dev/null +++ b/library/Director/Daemon/BackgroundDaemon.php @@ -0,0 +1,235 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use Exception; +use gipfl\Cli\Process; +use gipfl\IcingaCliDaemon\DbResourceConfigWatch; +use gipfl\SystemD\NotifySystemD; +use React\EventLoop\Factory as Loop; +use React\EventLoop\LoopInterface; +use Ramsey\Uuid\Uuid; + +class BackgroundDaemon +{ + /** @var LoopInterface */ + private $loop; + + /** @var NotifySystemD|boolean */ + protected $systemd; + + /** @var JobRunner */ + protected $jobRunner; + + /** @var string|null */ + protected $dbResourceName; + + /** @var DaemonDb */ + protected $daemonDb; + + /** @var DaemonProcessState */ + protected $processState; + + /** @var DaemonProcessDetails */ + protected $processDetails; + + /** @var LogProxy */ + protected $logProxy; + + /** @var bool */ + protected $reloading = false; + + /** @var bool */ + protected $shuttingDown = false; + + public function run(LoopInterface $loop = null) + { + if ($ownLoop = ($loop === null)) { + $loop = Loop::create(); + } + $this->loop = $loop; + $this->loop->futureTick(function () { + $this->initialize(); + }); + if ($ownLoop) { + $loop->run(); + } + } + + public function setDbResourceName($name) + { + $this->dbResourceName = $name; + + return $this; + } + + protected function initialize() + { + $this->registerSignalHandlers($this->loop); + $this->processState = new DaemonProcessState('icinga::director'); + $this->jobRunner = new JobRunner($this->loop); + $this->systemd = $this->eventuallyInitializeSystemd(); + $this->processState->setSystemd($this->systemd); + if ($this->systemd) { + $this->systemd->setReady(); + } + $this->setState('ready'); + $this->processDetails = $this + ->initializeProcessDetails($this->systemd) + ->registerProcessList($this->jobRunner->getProcessList()); + $this->logProxy = new LogProxy($this->processDetails->getInstanceUuid()); + $this->jobRunner->forwardLog($this->logProxy); + $this->daemonDb = $this->initializeDb( + $this->processDetails, + $this->processState, + $this->dbResourceName + ); + $this->daemonDb + ->register($this->jobRunner) + ->register($this->logProxy) + ->register(new DeploymentChecker($this->loop)) + ->run($this->loop); + $this->setState('running'); + } + + /** + * @param NotifySystemD|false $systemd + * @return DaemonProcessDetails + */ + protected function initializeProcessDetails($systemd) + { + if ($systemd && $systemd->hasInvocationId()) { + $uuid = $systemd->getInvocationId(); + } else { + try { + $uuid = \bin2hex(Uuid::uuid4()->getBytes()); + } catch (Exception $e) { + $uuid = 'deadc0de' . \substr(\md5(\getmypid()), 0, 24); + } + } + $processDetails = new DaemonProcessDetails($uuid); + if ($systemd) { + $processDetails->set('running_with_systemd', 'y'); + } + + return $processDetails; + } + + protected function eventuallyInitializeSystemd() + { + $systemd = NotifySystemD::ifRequired($this->loop); + if ($systemd) { + Logger::replaceRunningInstance(new SystemdLogWriter()); + Logger::info(sprintf( + "Started by systemd, notifying watchdog every %0.2Gs via %s", + $systemd->getWatchdogInterval(), + $systemd->getSocketPath() + )); + } else { + Logger::debug('Running without systemd'); + } + + return $systemd; + } + + /** + * @return DaemonProcessDetails + */ + public function getProcessDetails() + { + return $this->processDetails; + } + + /** + * @return DaemonProcessState + */ + public function getProcessState() + { + return $this->processState; + } + + protected function initializeDb( + DaemonProcessDetails $processDetails, + DaemonProcessState $processState, + $dbResourceName = null + ) { + $db = new DaemonDb($processDetails); + $db->on('state', function ($state, $level = null) use ($processState) { + // TODO: level is sent but not used + $processState->setComponentState('db', $state); + }); + $db->on('schemaChange', function ($startupSchema, $dbSchema) { + Logger::info(sprintf( + "DB schema version changed. Started with %d, DB has %d. Restarting.", + $startupSchema, + $dbSchema + )); + $this->reload(); + }); + + $db->setConfigWatch( + $dbResourceName + ? DbResourceConfigWatch::name($dbResourceName) + : DbResourceConfigWatch::module('director') + ); + + return $db; + } + + protected function registerSignalHandlers(LoopInterface $loop) + { + $func = function ($signal) use (&$func) { + $this->shutdownWithSignal($signal, $func); + }; + $funcReload = function () { + $this->reload(); + }; + $loop->addSignal(SIGHUP, $funcReload); + $loop->addSignal(SIGINT, $func); + $loop->addSignal(SIGTERM, $func); + } + + protected function shutdownWithSignal($signal, &$func) + { + $this->loop->removeSignal($signal, $func); + $this->shutdown(); + } + + public function reload() + { + if ($this->reloading) { + Logger::error('Ignoring reload request, reload is already in progress'); + return; + } + $this->reloading = true; + Logger::info('Going gown for reload now'); + $this->setState('reloading the main process'); + $this->daemonDb->disconnect()->then(function () { + Process::restart(); + }); + } + + protected function shutdown() + { + if ($this->shuttingDown) { + Logger::error('Ignoring shutdown request, shutdown is already in progress'); + return; + } + Logger::info('Shutting down'); + $this->shuttingDown = true; + $this->setState('shutting down'); + $this->daemonDb->disconnect()->then(function () { + Logger::info('DB has been disconnected, shutdown finished'); + $this->loop->stop(); + }); + } + + protected function setState($state) + { + if ($this->processState) { + $this->processState->setState($state); + } + + return $this; + } +} diff --git a/library/Director/Daemon/DaemonDb.php b/library/Director/Daemon/DaemonDb.php new file mode 100644 index 0000000..7772b3a --- /dev/null +++ b/library/Director/Daemon/DaemonDb.php @@ -0,0 +1,365 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use Exception; +use gipfl\IcingaCliDaemon\DbResourceConfigWatch; +use gipfl\IcingaCliDaemon\RetryUnless; +use Icinga\Data\ConfigObject; +use Icinga\Module\Director\Db; +use Icinga\Module\Director\Db\Migrations; +use ipl\Stdlib\EventEmitter; +use React\EventLoop\LoopInterface; +use React\Promise\Deferred; +use RuntimeException; +use SplObjectStorage; +use function React\Promise\reject; +use function React\Promise\resolve; + +class DaemonDb +{ + use EventEmitter; + + /** @var LoopInterface */ + private $loop; + + /** @var Db */ + protected $connection; + + /** @var \Zend_Db_Adapter_Abstract */ + protected $db; + + /** @var DaemonProcessDetails */ + protected $details; + + /** @var DbBasedComponent[] */ + protected $registeredComponents = []; + + /** @var DbResourceConfigWatch|null */ + protected $configWatch; + + /** @var array|null */ + protected $dbConfig; + + /** @var RetryUnless|null */ + protected $pendingReconnection; + + /** @var Deferred|null */ + protected $pendingDisconnect; + + /** @var \React\EventLoop\TimerInterface */ + protected $refreshTimer; + + /** @var \React\EventLoop\TimerInterface */ + protected $schemaCheckTimer; + + /** @var int */ + protected $startupSchemaVersion; + + public function __construct(DaemonProcessDetails $details, $dbConfig = null) + { + $this->details = $details; + $this->dbConfig = $dbConfig; + } + + public function register(DbBasedComponent $component) + { + $this->registeredComponents[] = $component; + + return $this; + } + + public function setConfigWatch(DbResourceConfigWatch $configWatch) + { + $this->configWatch = $configWatch; + $configWatch->notify(function ($config) { + $this->disconnect()->then(function () use ($config) { + return $this->onNewConfig($config); + }); + }); + if ($this->loop) { + $configWatch->run($this->loop); + } + + return $this; + } + + public function run(LoopInterface $loop) + { + $this->loop = $loop; + $this->connect(); + $this->refreshTimer = $loop->addPeriodicTimer(3, function () { + $this->refreshMyState(); + }); + $this->schemaCheckTimer = $loop->addPeriodicTimer(15, function () { + $this->checkDbSchema(); + }); + if ($this->configWatch) { + $this->configWatch->run($this->loop); + } + } + + protected function onNewConfig($config) + { + if ($config === null) { + if ($this->dbConfig === null) { + Logger::error('DB configuration is not valid'); + } else { + Logger::error('DB configuration is no longer valid'); + } + $this->emitStatus('no configuration'); + $this->dbConfig = $config; + + return resolve(); + } else { + $this->emitStatus('configuration loaded'); + $this->dbConfig = $config; + + return $this->establishConnection($config); + } + } + + protected function establishConnection($config) + { + if ($this->connection !== null) { + Logger::error('Trying to establish a connection while being connected'); + return reject(); + } + $callback = function () use ($config) { + $this->reallyEstablishConnection($config); + }; + $onSuccess = function () { + $this->pendingReconnection = null; + $this->onConnected(); + }; + if ($this->pendingReconnection) { + $this->pendingReconnection->reset(); + $this->pendingReconnection = null; + } + $this->emitStatus('connecting'); + + return $this->pendingReconnection = RetryUnless::succeeding($callback) + ->setInterval(0.2) + ->slowDownAfter(10, 10) + ->run($this->loop) + ->then($onSuccess) + ; + } + + protected function reallyEstablishConnection($config) + { + $connection = new Db(new ConfigObject($config)); + $connection->getDbAdapter()->getConnection(); + $migrations = new Migrations($connection); + if (! $migrations->hasSchema()) { + $this->emitStatus('no schema', 'error'); + throw new RuntimeException('DB has no schema'); + } + $this->wipeOrphanedInstances($connection); + if ($this->hasAnyOtherActiveInstance($connection)) { + $this->emitStatus('locked by other instance', 'warning'); + throw new RuntimeException('DB is locked by a running daemon instance, will retry'); + } + $this->startupSchemaVersion = $migrations->getLastMigrationNumber(); + $this->details->set('schema_version', $this->startupSchemaVersion); + + $this->connection = $connection; + $this->db = $connection->getDbAdapter(); + $this->loop->futureTick(function () { + $this->refreshMyState(); + }); + + return $connection; + } + + protected function checkDbSchema() + { + if ($this->connection === null) { + return; + } + + if ($this->schemaIsOutdated()) { + $this->emit('schemaChange', [ + $this->getStartupSchemaVersion(), + $this->getDbSchemaVersion() + ]); + } + } + + protected function schemaIsOutdated() + { + return $this->getStartupSchemaVersion() < $this->getDbSchemaVersion(); + } + + protected function getStartupSchemaVersion() + { + return $this->startupSchemaVersion; + } + + protected function getDbSchemaVersion() + { + if ($this->connection === null) { + throw new RuntimeException( + 'Cannot determine DB schema version without an established DB connection' + ); + } + $migrations = new Migrations($this->connection); + + return $migrations->getLastMigrationNumber(); + } + + protected function onConnected() + { + $this->emitStatus('connected'); + Logger::info('Connected to the database'); + foreach ($this->registeredComponents as $component) { + $component->initDb($this->connection); + } + } + + /** + * @return \React\Promise\PromiseInterface + */ + protected function reconnect() + { + return $this->disconnect()->then(function () { + return $this->connect(); + }, function (Exception $e) { + Logger::error('Disconnect failed. This should never happen: ' . $e->getMessage()); + exit(1); + }); + } + + /** + * @return \React\Promise\ExtendedPromiseInterface + */ + public function connect() + { + if ($this->connection === null) { + if ($this->dbConfig) { + return $this->establishConnection($this->dbConfig); + } + } + + return resolve(); + } + + /** + * @return \React\Promise\ExtendedPromiseInterface + */ + public function disconnect() + { + if (! $this->connection) { + return resolve(); + } + if ($this->pendingDisconnect) { + return $this->pendingDisconnect->promise(); + } + + $this->eventuallySetStopped(); + $this->pendingDisconnect = new Deferred(); + $pendingComponents = new SplObjectStorage(); + foreach ($this->registeredComponents as $component) { + $pendingComponents->attach($component); + $resolve = function () use ($pendingComponents, $component) { + $pendingComponents->detach($component); + if ($pendingComponents->count() === 0) { + $this->pendingDisconnect->resolve(); + } + }; + // TODO: What should we do in case they don't? + $component->stopDb()->then($resolve); + } + + try { + if ($this->db) { + $this->db->closeConnection(); + } + } catch (Exception $e) { + Logger::error('Failed to disconnect: ' . $e->getMessage()); + } + + return $this->pendingDisconnect->promise()->then(function () { + $this->connection = null; + $this->db = null; + $this->pendingDisconnect = null; + }); + } + + protected function emitStatus($message, $level = 'info') + { + $this->emit('state', [$message, $level]); + + return $this; + } + + protected function hasAnyOtherActiveInstance(Db $connection) + { + $db = $connection->getDbAdapter(); + + return (int) $db->fetchOne( + $db->select() + ->from('director_daemon_info', 'COUNT(*)') + ->where('ts_stopped IS NULL') + ) > 0; + } + + protected function wipeOrphanedInstances(Db $connection) + { + $db = $connection->getDbAdapter(); + $db->delete('director_daemon_info', 'ts_stopped IS NOT NULL'); + $db->delete('director_daemon_info', $db->quoteInto( + 'instance_uuid_hex = ?', + $this->details->getInstanceUuid() + )); + $count = $db->delete( + 'director_daemon_info', + 'ts_stopped IS NULL AND ts_last_update < ' . ( + DaemonUtil::timestampWithMilliseconds() - (60 * 1000) + ) + ); + if ($count > 1) { + Logger::error("Removed $count orphaned daemon instance(s) from DB"); + } + } + + protected function refreshMyState() + { + if ($this->db === null || $this->pendingReconnection || $this->pendingDisconnect) { + return; + } + try { + $updated = $this->db->update( + 'director_daemon_info', + $this->details->getPropertiesToUpdate(), + $this->db->quoteInto('instance_uuid_hex = ?', $this->details->getInstanceUuid()) + ); + + if (! $updated) { + $this->db->insert( + 'director_daemon_info', + $this->details->getPropertiesToInsert() + ); + } + } catch (Exception $e) { + Logger::error($e->getMessage()); + $this->reconnect(); + } + } + + protected function eventuallySetStopped() + { + try { + if (! $this->db) { + return; + } + $this->db->update( + 'director_daemon_info', + ['ts_stopped' => DaemonUtil::timestampWithMilliseconds()], + $this->db->quoteInto('instance_uuid_hex = ?', $this->details->getInstanceUuid()) + ); + } catch (Exception $e) { + Logger::error('Failed to update daemon info (setting ts_stopped): ' . $e->getMessage()); + } + } +} diff --git a/library/Director/Daemon/DaemonProcessDetails.php b/library/Director/Daemon/DaemonProcessDetails.php new file mode 100644 index 0000000..454e31f --- /dev/null +++ b/library/Director/Daemon/DaemonProcessDetails.php @@ -0,0 +1,122 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use gipfl\LinuxHealth\Memory; +use Icinga\Application\Platform; +use React\ChildProcess\Process; +use gipfl\Cli\Process as CliProcess; + +class DaemonProcessDetails +{ + /** @var string */ + protected $instanceUuid; + + /** @var \stdClass */ + protected $info; + + /** @var ProcessList[] */ + protected $processLists = []; + + protected $myArgs; + + protected $myPid; + + public function __construct($instanceUuid) + { + $this->instanceUuid = $instanceUuid; + $this->initialize(); + } + + public function getInstanceUuid() + { + return $this->instanceUuid; + } + + public function getPropertiesToInsert() + { + return $this->getPropertiesToUpdate() + (array) $this->info; + } + + public function getPropertiesToUpdate() + { + return [ + 'ts_last_update' => DaemonUtil::timestampWithMilliseconds(), + 'ts_stopped' => null, + 'process_info' => \json_encode($this->collectProcessInfo()), + ]; + } + + public function set($property, $value) + { + if (\property_exists($this->info, $property)) { + $this->info->$property = $value; + } else { + throw new \InvalidArgumentException("Trying to set invalid daemon info property: $property"); + } + } + + public function registerProcessList(ProcessList $list) + { + $refresh = function (Process $process) { + $this->refreshProcessInfo(); + }; + $list->on('start', $refresh)->on('exit', $refresh); + $this->processLists[] = $list; + + return $this; + } + + protected function refreshProcessInfo() + { + $this->set('process_info', \json_encode($this->collectProcessInfo())); + } + + protected function collectProcessInfo() + { + $info = (object) [$this->myPid => (object) [ + 'command' => implode(' ', $this->myArgs), + 'running' => true, + 'memory' => Memory::getUsageForPid($this->myPid) + ]]; + + foreach ($this->processLists as $processList) { + foreach ($processList->getOverview() as $pid => $details) { + $info->$pid = $details; + } + } + + return $info; + } + + protected function initialize() + { + global $argv; + CliProcess::getInitialCwd(); + $this->myArgs = $argv; + $this->myPid = \posix_getpid(); + if (isset($_SERVER['_'])) { + $self = $_SERVER['_']; + } else { + // Process does a better job, but want the relative path (if such) + $self = $_SERVER['PHP_SELF']; + } + $this->info = (object) [ + 'instance_uuid_hex' => $this->instanceUuid, + 'running_with_systemd' => 'n', + 'ts_started' => (int) ((float) $_SERVER['REQUEST_TIME_FLOAT'] * 1000), + 'ts_stopped' => null, + 'pid' => \posix_getpid(), + 'fqdn' => Platform::getFqdn(), + 'username' => Platform::getPhpUser(), + 'schema_version' => null, + 'php_version' => Platform::getPhpVersion(), + 'binary_path' => $self, + 'binary_realpath' => CliProcess::getBinaryPath(), + 'php_integer_size' => PHP_INT_SIZE, + 'php_binary_path' => PHP_BINARY, + 'php_binary_realpath' => \realpath(PHP_BINARY), // TODO: useless? + 'process_info' => null, + ]; + } +} diff --git a/library/Director/Daemon/DaemonProcessState.php b/library/Director/Daemon/DaemonProcessState.php new file mode 100644 index 0000000..6ae3cd2 --- /dev/null +++ b/library/Director/Daemon/DaemonProcessState.php @@ -0,0 +1,85 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use gipfl\Cli\Process; +use gipfl\SystemD\NotifySystemD; + +class DaemonProcessState +{ + /** @var NotifySystemD|null */ + protected $systemd; + + protected $components = []; + + protected $currentMessage; + + protected $processTitle; + + protected $state; + + public function __construct($processTitle) + { + $this->processTitle = $processTitle; + $this->refreshMessage(); + } + + /** + * @param NotifySystemD|false $systemd + * @return $this + */ + public function setSystemd($systemd) + { + if ($systemd) { + $this->systemd = $systemd; + } else { + $this->systemd = null; + } + + return $this; + } + + public function setState($message) + { + $this->state = $message; + $this->refreshMessage(); + + return $this; + } + + public function setComponentState($name, $stateMessage) + { + if ($stateMessage === null) { + unset($this->components[$name]); + } else { + $this->components[$name] = $stateMessage; + } + $this->refreshMessage(); + } + + protected function refreshMessage() + { + $messageParts = []; + if ($this->state !== null && \strlen($this->state)) { + $messageParts[] = $this->state; + } + foreach ($this->components as $component => $message) { + $messageParts[] = "$component: $message"; + } + + $message = \implode(', ', $messageParts); + + if ($message !== $this->currentMessage) { + $this->currentMessage = $message; + if (\strlen($message) === 0) { + Process::setTitle($this->processTitle); + } else { + Process::setTitle($this->processTitle . ": $message"); + } + + if ($this->systemd) { + $this->systemd->setStatus($message); + } + } + } +} diff --git a/library/Director/Daemon/DaemonUtil.php b/library/Director/Daemon/DaemonUtil.php new file mode 100644 index 0000000..c978d11 --- /dev/null +++ b/library/Director/Daemon/DaemonUtil.php @@ -0,0 +1,16 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +class DaemonUtil +{ + /** + * @return int + */ + public static function timestampWithMilliseconds() + { + $mTime = explode(' ', microtime()); + + return (int) round($mTime[0] * 1000) + (int) $mTime[1] * 1000; + } +} diff --git a/library/Director/Daemon/DbBasedComponent.php b/library/Director/Daemon/DbBasedComponent.php new file mode 100644 index 0000000..c176c14 --- /dev/null +++ b/library/Director/Daemon/DbBasedComponent.php @@ -0,0 +1,19 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use Icinga\Module\Director\Db; + +interface DbBasedComponent +{ + /** + * @param Db $db + * @return \React\Promise\ExtendedPromiseInterface; + */ + public function initDb(Db $db); + + /** + * @return \React\Promise\ExtendedPromiseInterface; + */ + public function stopDb(); +} diff --git a/library/Director/Daemon/DeploymentChecker.php b/library/Director/Daemon/DeploymentChecker.php new file mode 100644 index 0000000..82d6d05 --- /dev/null +++ b/library/Director/Daemon/DeploymentChecker.php @@ -0,0 +1,51 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use Exception; +use Icinga\Module\Director\Db; +use Icinga\Module\Director\Objects\DirectorDeploymentLog; +use React\EventLoop\LoopInterface; +use function React\Promise\resolve; + +class DeploymentChecker implements DbBasedComponent +{ + /** @var Db */ + protected $connection; + + public function __construct(LoopInterface $loop) + { + $loop->addPeriodicTimer(5, function () { + if ($db = $this->connection) { + try { + if (DirectorDeploymentLog::hasUncollected($db)) { + $db->getDeploymentEndpoint()->api()->collectLogFiles($db); + } + } catch (Exception $e) { + // Ignore eventual issues while talking to Icinga + } + } + }); + } + + /** + * @param Db $connection + * @return \React\Promise\ExtendedPromiseInterface + */ + public function initDb(Db $connection) + { + $this->connection = $connection; + + return resolve(); + } + + /** + * @return \React\Promise\ExtendedPromiseInterface + */ + public function stopDb() + { + $this->connection = null; + + return resolve(); + } +} diff --git a/library/Director/Daemon/JobRunner.php b/library/Director/Daemon/JobRunner.php new file mode 100644 index 0000000..78d7747 --- /dev/null +++ b/library/Director/Daemon/JobRunner.php @@ -0,0 +1,234 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use gipfl\IcingaCliDaemon\FinishedProcessState; +use gipfl\IcingaCliDaemon\IcingaCliRpc; +use Icinga\Application\Logger; +use Icinga\Module\Director\Db; +use Icinga\Module\Director\Objects\DirectorJob; +use React\ChildProcess\Process; +use React\EventLoop\LoopInterface; +use React\Promise\Promise; +use function React\Promise\resolve; + +class JobRunner implements DbBasedComponent +{ + /** @var Db */ + protected $db; + + /** @var LoopInterface */ + protected $loop; + + /** @var int[] */ + protected $scheduledIds = []; + + /** @var Promise[] */ + protected $runningIds = []; + + protected $checkInterval = 10; + + /** @var \React\EventLoop\TimerInterface */ + protected $timer; + + /** @var LogProxy */ + protected $logProxy; + + /** @var ProcessList */ + protected $running; + + public function __construct(LoopInterface $loop) + { + $this->loop = $loop; + $this->running = new ProcessList($loop); + } + + public function forwardLog(LogProxy $logProxy) + { + $this->logProxy = $logProxy; + + return $this; + } + + /** + * @param Db $db + * @return \React\Promise\ExtendedPromiseInterface + */ + public function initDb(Db $db) + { + $this->db = $db; + $check = function () { + try { + $this->checkForPendingJobs(); + $this->runNextPendingJob(); + } catch (\Exception $e) { + Logger::error($e->getMessage()); + } + }; + if ($this->timer === null) { + $this->loop->futureTick($check); + } + if ($this->timer !== null) { + Logger::info('Cancelling former timer'); + $this->loop->cancelTimer($this->timer); + } + $this->timer = $this->loop->addPeriodicTimer($this->checkInterval, $check); + + return resolve(); + } + + /** + * @return \React\Promise\ExtendedPromiseInterface + */ + public function stopDb() + { + $this->scheduledIds = []; + if ($this->timer !== null) { + $this->loop->cancelTimer($this->timer); + $this->timer = null; + } + $allFinished = $this->running->killOrTerminate(); + foreach ($this->runningIds as $id => $promise) { + $promise->cancel(); + } + $this->runningIds = []; + + return $allFinished; + } + + protected function hasBeenDisabled() + { + $db = $this->db->getDbAdapter(); + return $db->fetchOne( + $db->select() + ->from('director_setting', 'setting_value') + ->where('setting_name = ?', 'disable_all_jobs') + ) === 'y'; + } + + protected function checkForPendingJobs() + { + if ($this->hasBeenDisabled()) { + $this->scheduledIds = []; + // TODO: disable jobs currently going on? + return; + } + if (empty($this->scheduledIds)) { + $this->loadNextIds(); + } + } + + protected function runNextPendingJob() + { + if ($this->timer === null) { + // Reset happened. Stopping? + return; + } + + if (! empty($this->runningIds)) { + return; + } + while (! empty($this->scheduledIds)) { + if ($this->runNextJob()) { + break; + } + } + } + + protected function loadNextIds() + { + $db = $this->db->getDbAdapter(); + + foreach ($db->fetchCol( + $db->select()->from('director_job', 'id')->where('disabled = ?', 'n') + ) as $id) { + $this->scheduledIds[] = (int) $id; + }; + } + + /** + * @return bool + */ + protected function runNextJob() + { + $id = \array_shift($this->scheduledIds); + try { + $job = DirectorJob::loadWithAutoIncId((int) $id, $this->db); + if ($job->shouldRun()) { + $this->runJob($job); + return true; + } + } catch (\Exception $e) { + Logger::error('Trying to schedule Job failed: ' . $e->getMessage()); + } + + return false; + } + + /** + * @param DirectorJob $job + */ + protected function runJob(DirectorJob $job) + { + $id = $job->get('id'); + $jobName = $job->get('job_name'); + Logger::debug("Job ($jobName) starting"); + $arguments = [ + 'director', + 'job', + 'run', + '--id', + $job->get('id'), + '--debug', + '--rpc' + ]; + $cli = new IcingaCliRpc(); + $cli->setArguments($arguments); + $cli->on('start', function (Process $process) { + $this->onProcessStarted($process); + }); + + // Happens on protocol (Netstring) errors or similar: + $cli->on('error', function (\Exception $e) { + Logger::error('UNEXPECTED: ' . rtrim($e->getMessage())); + }); + if ($this->logProxy) { + $logger = clone($this->logProxy); + $logger->setPrefix("Job ($jobName): "); + $cli->rpc()->setHandler($logger, 'logger'); + } + unset($this->scheduledIds[$id]); + $this->runningIds[$id] = $cli->run($this->loop)->then(function () use ($id, $jobName) { + Logger::debug("Job ($jobName) finished"); + })->otherwise(function (\Exception $e) use ($id, $jobName) { + Logger::error("Job ($jobName) failed: " . $e->getMessage()); + })->otherwise(function (FinishedProcessState $state) use ($jobName) { + Logger::error("Job ($jobName) failed: " . $state->getReason()); + })->always(function () use ($id) { + unset($this->runningIds[$id]); + $this->loop->futureTick(function () { + $this->runNextPendingJob(); + }); + }); + } + + /** + * @return ProcessList + */ + public function getProcessList() + { + return $this->running; + } + + protected function onProcessStarted(Process $process) + { + $this->running->attach($process); + } + + public function __destruct() + { + $this->stopDb(); + $this->logProxy = null; + $this->loop = null; + } +} diff --git a/library/Director/Daemon/JsonRpcLogWriter.php b/library/Director/Daemon/JsonRpcLogWriter.php new file mode 100644 index 0000000..edfa23e --- /dev/null +++ b/library/Director/Daemon/JsonRpcLogWriter.php @@ -0,0 +1,37 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use gipfl\Protocol\JsonRpc\Connection; +use gipfl\Protocol\JsonRpc\Notification; +use Icinga\Application\Logger\LogWriter; +use Icinga\Data\ConfigObject; + +class JsonRpcLogWriter extends LogWriter +{ + protected $connection; + + protected static $severityMap = [ + Logger::DEBUG => 'debug', + Logger::INFO => 'info', + Logger::WARNING => 'warning', + Logger::ERROR => 'error', + ]; + + public function __construct(Connection $connection) + { + parent::__construct(new ConfigObject([])); + $this->connection = $connection; + } + + public function log($severity, $message) + { + $message = \iconv('UTF-8', 'UTF-8//IGNORE', $message); + $this->connection->sendNotification( + Notification::create('logger.log', [ + static::$severityMap[$severity], + $message + ]) + ); + } +} diff --git a/library/Director/Daemon/LogProxy.php b/library/Director/Daemon/LogProxy.php new file mode 100644 index 0000000..0b58ae8 --- /dev/null +++ b/library/Director/Daemon/LogProxy.php @@ -0,0 +1,76 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use Exception; +use Icinga\Module\Director\Db; +use function React\Promise\resolve; + +class LogProxy implements DbBasedComponent +{ + protected $connection; + + protected $db; + + protected $server; + + protected $instanceUuid; + + protected $prefix = ''; + + public function __construct($instanceUuid) + { + $this->instanceUuid = $instanceUuid; + } + + public function setPrefix($prefix) + { + $this->prefix = $prefix; + + return $this; + } + + /** + * @param Db $connection + * @return \React\Promise\ExtendedPromiseInterface + */ + public function initDb(Db $connection) + { + $this->connection = $connection; + $this->db = $connection->getDbAdapter(); + + return resolve(); + } + + /** + * @return \React\Promise\ExtendedPromiseInterface + */ + public function stopDb() + { + $this->connection = null; + $this->db = null; + + return resolve(); + } + + public function log($severity, $message) + { + Logger::$severity($this->prefix . $message); + /* + // Not yet + try { + if ($this->db) { + $this->db->insert('director_daemonlog', [ + // environment/installation/db? + 'instance_uuid' => $this->instanceUuid, + 'ts_create' => DaemonUtil::timestampWithMilliseconds(), + 'level' => $severity, + 'message' => $message, + ]); + } + } catch (Exception $e) { + Logger::error($e->getMessage()); + } + */ + } +} diff --git a/library/Director/Daemon/Logger.php b/library/Director/Daemon/Logger.php new file mode 100644 index 0000000..27fcbf5 --- /dev/null +++ b/library/Director/Daemon/Logger.php @@ -0,0 +1,24 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use Icinga\Application\Logger as IcingaLogger; +use Icinga\Application\Logger\LogWriter; +use Icinga\Exception\ConfigurationError; + +class Logger extends IcingaLogger +{ + public static function replaceRunningInstance(LogWriter $writer, $level = null) + { + try { + $instance = static::$instance; + if ($level !== null) { + $instance->setLevel($level); + } + + $instance->writer = $writer; + } catch (ConfigurationError $e) { + self::$instance->error($e->getMessage()); + } + } +} diff --git a/library/Director/Daemon/ProcessList.php b/library/Director/Daemon/ProcessList.php new file mode 100644 index 0000000..85b9aac --- /dev/null +++ b/library/Director/Daemon/ProcessList.php @@ -0,0 +1,125 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use gipfl\LinuxHealth\Memory; +use Icinga\Application\Logger; +use ipl\Stdlib\EventEmitter; +use React\ChildProcess\Process; +use React\EventLoop\LoopInterface; +use React\Promise\Deferred; +use function React\Promise\resolve; + +class ProcessList +{ + use EventEmitter; + + /** @var LoopInterface */ + protected $loop; + + /** @var \SplObjectStorage */ + protected $processes; + + /** + * ProcessList constructor. + * @param LoopInterface $loop + * @param Process[] $processes + */ + public function __construct(LoopInterface $loop, array $processes = []) + { + $this->loop = $loop; + $this->processes = new \SplObjectStorage(); + foreach ($processes as $process) { + $this->attach($process); + } + } + + public function attach(Process $process) + { + $this->processes->attach($process); + $this->emit('start', [$process]); + $process->on('exit', function () use ($process) { + $this->detach($process); + $this->emit('exit', [$process]); + }); + + return $this; + } + + public function detach(Process $process) + { + $this->processes->detach($process); + + return $this; + } + + /** + * @param int $timeout + * @return \React\Promise\ExtendedPromiseInterface + */ + public function killOrTerminate($timeout = 5) + { + if ($this->processes->count() === 0) { + return resolve(); + } + $deferred = new Deferred(); + $killTimer = $this->loop->addTimer($timeout, function () use ($deferred) { + /** @var Process $process */ + foreach ($this->processes as $process) { + $pid = $process->getPid(); + Logger::error("Process $pid is still running, sending SIGKILL"); + $process->terminate(SIGKILL); + } + + // Let's a little bit of delay after KILLing + $this->loop->addTimer(0.1, function () use ($deferred) { + $deferred->resolve(); + }); + }); + + $timer = $this->loop->addPeriodicTimer($timeout / 20, function () use ( + $deferred, + &$timer, + $killTimer + ) { + $stopped = []; + /** @var Process $process */ + foreach ($this->processes as $process) { + if (! $process->isRunning()) { + $stopped[] = $process; + } + } + foreach ($stopped as $process) { + $this->processes->detach($process); + } + if ($this->processes->count() === 0) { + $this->loop->cancelTimer($timer); + $this->loop->cancelTimer($killTimer); + $deferred->resolve(); + } + }); + /** @var Process $process */ + foreach ($this->processes as $process) { + $process->terminate(SIGTERM); + } + + return $deferred->promise(); + } + + public function getOverview() + { + $info = []; + + /** @var Process $process */ + foreach ($this->processes as $process) { + $pid = $process->getPid(); + $info[$pid] = (object) [ + 'command' => preg_replace('/^exec /', '', $process->getCommand()), + 'running' => $process->isRunning(), + 'memory' => Memory::getUsageForPid($pid) + ]; + } + + return $info; + } +} diff --git a/library/Director/Daemon/RunningDaemonInfo.php b/library/Director/Daemon/RunningDaemonInfo.php new file mode 100644 index 0000000..adb3549 --- /dev/null +++ b/library/Director/Daemon/RunningDaemonInfo.php @@ -0,0 +1,154 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +class RunningDaemonInfo +{ + /** @var object */ + protected $info; + + public function __construct($info = null) + { + $this->setInfo($info); + } + + public function setInfo($info) + { + if (empty($info)) { + $this->info = $this->createEmptyInfo(); + } else { + $this->info = $info; + } + + return $this; + } + + public function isRunning() + { + return $this->getPid() !== null && ! $this->isOutdated(); + } + + public function getPid() + { + return (int) $this->info->pid; + } + + public function getUsername() + { + return $this->info->username; + } + + public function getFqdn() + { + return $this->info->fqdn; + } + + public function getLastUpdate() + { + return $this->info->ts_last_update; + } + + public function getLastModification() + { + return $this->info->ts_last_modification; + } + + public function getPhpVersion() + { + return $this->info->php_version; + } + + public function hasBeenStopped() + { + return $this->getTimestampStopped() !== null; + } + + public function getTimestampStarted() + { + return $this->info->ts_started; + } + + public function getTimestampStopped() + { + return $this->info->ts_stopped; + } + + public function isOutdated($seconds = 5) + { + return ( + DaemonUtil::timestampWithMilliseconds() - $this->info->ts_last_update + ) > $seconds * 1000; + } + + public function isRunningWithSystemd() + { + return $this->info->running_with_systemd === 'y'; + } + + public function getBinaryPath() + { + return $this->info->binary_path; + } + + public function getBinaryRealpath() + { + return $this->info->binary_realpath; + } + + public function binaryRealpathDiffers() + { + return $this->getBinaryPath() !== $this->getBinaryRealpath(); + } + + public function getPhpBinaryPath() + { + return $this->info->php_binary_path; + } + + public function getPhpBinaryRealpath() + { + return $this->info->php_binary_realpath; + } + + public function phpBinaryRealpathDiffers() + { + return $this->getPhpBinaryPath() !== $this->getPhpBinaryRealpath(); + } + + public function getPhpIntegerSize() + { + return (int) $this->info->php_integer_size; + } + + public function has64bitIntegers() + { + return $this->getPhpIntegerSize() === 8; + } + + /* + // TODO: not yet + public function isMaster() + { + return $this->info->is_master === 'y'; + } + + public function isStandby() + { + return ! $this->isMaster(); + } + */ + + protected function createEmptyInfo() + { + return (object) [ + 'pid' => null, + 'fqdn' => null, + 'username' => null, + 'php_version' => null, + // 'is_master' => null, + // Only if not running. Does this make any sense in 'empty info'? + 'ts_last_update' => null, + 'ts_last_modification' => null + ]; + } +} diff --git a/library/Director/Daemon/SystemdLogWriter.php b/library/Director/Daemon/SystemdLogWriter.php new file mode 100644 index 0000000..8b64442 --- /dev/null +++ b/library/Director/Daemon/SystemdLogWriter.php @@ -0,0 +1,27 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use Icinga\Application\Logger\LogWriter; +use Icinga\Data\ConfigObject; + +class SystemdLogWriter extends LogWriter +{ + protected static $severityMap = [ + Logger::DEBUG => 7, + Logger::INFO => 6, + Logger::WARNING => 4, + Logger::ERROR => 3, + ]; + + public function __construct() + { + parent::__construct(new ConfigObject([])); + } + + public function log($severity, $message) + { + $severity = self::$severityMap[$severity]; + echo "<$severity>$message\n"; + } +} |