diff options
Diffstat (limited to 'library/Director/Daemon/DaemonDb.php')
-rw-r--r-- | library/Director/Daemon/DaemonDb.php | 365 |
1 files changed, 365 insertions, 0 deletions
diff --git a/library/Director/Daemon/DaemonDb.php b/library/Director/Daemon/DaemonDb.php new file mode 100644 index 0000000..7772b3a --- /dev/null +++ b/library/Director/Daemon/DaemonDb.php @@ -0,0 +1,365 @@ +<?php + +namespace Icinga\Module\Director\Daemon; + +use Exception; +use gipfl\IcingaCliDaemon\DbResourceConfigWatch; +use gipfl\IcingaCliDaemon\RetryUnless; +use Icinga\Data\ConfigObject; +use Icinga\Module\Director\Db; +use Icinga\Module\Director\Db\Migrations; +use ipl\Stdlib\EventEmitter; +use React\EventLoop\LoopInterface; +use React\Promise\Deferred; +use RuntimeException; +use SplObjectStorage; +use function React\Promise\reject; +use function React\Promise\resolve; + +class DaemonDb +{ + use EventEmitter; + + /** @var LoopInterface */ + private $loop; + + /** @var Db */ + protected $connection; + + /** @var \Zend_Db_Adapter_Abstract */ + protected $db; + + /** @var DaemonProcessDetails */ + protected $details; + + /** @var DbBasedComponent[] */ + protected $registeredComponents = []; + + /** @var DbResourceConfigWatch|null */ + protected $configWatch; + + /** @var array|null */ + protected $dbConfig; + + /** @var RetryUnless|null */ + protected $pendingReconnection; + + /** @var Deferred|null */ + protected $pendingDisconnect; + + /** @var \React\EventLoop\TimerInterface */ + protected $refreshTimer; + + /** @var \React\EventLoop\TimerInterface */ + protected $schemaCheckTimer; + + /** @var int */ + protected $startupSchemaVersion; + + public function __construct(DaemonProcessDetails $details, $dbConfig = null) + { + $this->details = $details; + $this->dbConfig = $dbConfig; + } + + public function register(DbBasedComponent $component) + { + $this->registeredComponents[] = $component; + + return $this; + } + + public function setConfigWatch(DbResourceConfigWatch $configWatch) + { + $this->configWatch = $configWatch; + $configWatch->notify(function ($config) { + $this->disconnect()->then(function () use ($config) { + return $this->onNewConfig($config); + }); + }); + if ($this->loop) { + $configWatch->run($this->loop); + } + + return $this; + } + + public function run(LoopInterface $loop) + { + $this->loop = $loop; + $this->connect(); + $this->refreshTimer = $loop->addPeriodicTimer(3, function () { + $this->refreshMyState(); + }); + $this->schemaCheckTimer = $loop->addPeriodicTimer(15, function () { + $this->checkDbSchema(); + }); + if ($this->configWatch) { + $this->configWatch->run($this->loop); + } + } + + protected function onNewConfig($config) + { + if ($config === null) { + if ($this->dbConfig === null) { + Logger::error('DB configuration is not valid'); + } else { + Logger::error('DB configuration is no longer valid'); + } + $this->emitStatus('no configuration'); + $this->dbConfig = $config; + + return resolve(); + } else { + $this->emitStatus('configuration loaded'); + $this->dbConfig = $config; + + return $this->establishConnection($config); + } + } + + protected function establishConnection($config) + { + if ($this->connection !== null) { + Logger::error('Trying to establish a connection while being connected'); + return reject(); + } + $callback = function () use ($config) { + $this->reallyEstablishConnection($config); + }; + $onSuccess = function () { + $this->pendingReconnection = null; + $this->onConnected(); + }; + if ($this->pendingReconnection) { + $this->pendingReconnection->reset(); + $this->pendingReconnection = null; + } + $this->emitStatus('connecting'); + + return $this->pendingReconnection = RetryUnless::succeeding($callback) + ->setInterval(0.2) + ->slowDownAfter(10, 10) + ->run($this->loop) + ->then($onSuccess) + ; + } + + protected function reallyEstablishConnection($config) + { + $connection = new Db(new ConfigObject($config)); + $connection->getDbAdapter()->getConnection(); + $migrations = new Migrations($connection); + if (! $migrations->hasSchema()) { + $this->emitStatus('no schema', 'error'); + throw new RuntimeException('DB has no schema'); + } + $this->wipeOrphanedInstances($connection); + if ($this->hasAnyOtherActiveInstance($connection)) { + $this->emitStatus('locked by other instance', 'warning'); + throw new RuntimeException('DB is locked by a running daemon instance, will retry'); + } + $this->startupSchemaVersion = $migrations->getLastMigrationNumber(); + $this->details->set('schema_version', $this->startupSchemaVersion); + + $this->connection = $connection; + $this->db = $connection->getDbAdapter(); + $this->loop->futureTick(function () { + $this->refreshMyState(); + }); + + return $connection; + } + + protected function checkDbSchema() + { + if ($this->connection === null) { + return; + } + + if ($this->schemaIsOutdated()) { + $this->emit('schemaChange', [ + $this->getStartupSchemaVersion(), + $this->getDbSchemaVersion() + ]); + } + } + + protected function schemaIsOutdated() + { + return $this->getStartupSchemaVersion() < $this->getDbSchemaVersion(); + } + + protected function getStartupSchemaVersion() + { + return $this->startupSchemaVersion; + } + + protected function getDbSchemaVersion() + { + if ($this->connection === null) { + throw new RuntimeException( + 'Cannot determine DB schema version without an established DB connection' + ); + } + $migrations = new Migrations($this->connection); + + return $migrations->getLastMigrationNumber(); + } + + protected function onConnected() + { + $this->emitStatus('connected'); + Logger::info('Connected to the database'); + foreach ($this->registeredComponents as $component) { + $component->initDb($this->connection); + } + } + + /** + * @return \React\Promise\PromiseInterface + */ + protected function reconnect() + { + return $this->disconnect()->then(function () { + return $this->connect(); + }, function (Exception $e) { + Logger::error('Disconnect failed. This should never happen: ' . $e->getMessage()); + exit(1); + }); + } + + /** + * @return \React\Promise\ExtendedPromiseInterface + */ + public function connect() + { + if ($this->connection === null) { + if ($this->dbConfig) { + return $this->establishConnection($this->dbConfig); + } + } + + return resolve(); + } + + /** + * @return \React\Promise\ExtendedPromiseInterface + */ + public function disconnect() + { + if (! $this->connection) { + return resolve(); + } + if ($this->pendingDisconnect) { + return $this->pendingDisconnect->promise(); + } + + $this->eventuallySetStopped(); + $this->pendingDisconnect = new Deferred(); + $pendingComponents = new SplObjectStorage(); + foreach ($this->registeredComponents as $component) { + $pendingComponents->attach($component); + $resolve = function () use ($pendingComponents, $component) { + $pendingComponents->detach($component); + if ($pendingComponents->count() === 0) { + $this->pendingDisconnect->resolve(); + } + }; + // TODO: What should we do in case they don't? + $component->stopDb()->then($resolve); + } + + try { + if ($this->db) { + $this->db->closeConnection(); + } + } catch (Exception $e) { + Logger::error('Failed to disconnect: ' . $e->getMessage()); + } + + return $this->pendingDisconnect->promise()->then(function () { + $this->connection = null; + $this->db = null; + $this->pendingDisconnect = null; + }); + } + + protected function emitStatus($message, $level = 'info') + { + $this->emit('state', [$message, $level]); + + return $this; + } + + protected function hasAnyOtherActiveInstance(Db $connection) + { + $db = $connection->getDbAdapter(); + + return (int) $db->fetchOne( + $db->select() + ->from('director_daemon_info', 'COUNT(*)') + ->where('ts_stopped IS NULL') + ) > 0; + } + + protected function wipeOrphanedInstances(Db $connection) + { + $db = $connection->getDbAdapter(); + $db->delete('director_daemon_info', 'ts_stopped IS NOT NULL'); + $db->delete('director_daemon_info', $db->quoteInto( + 'instance_uuid_hex = ?', + $this->details->getInstanceUuid() + )); + $count = $db->delete( + 'director_daemon_info', + 'ts_stopped IS NULL AND ts_last_update < ' . ( + DaemonUtil::timestampWithMilliseconds() - (60 * 1000) + ) + ); + if ($count > 1) { + Logger::error("Removed $count orphaned daemon instance(s) from DB"); + } + } + + protected function refreshMyState() + { + if ($this->db === null || $this->pendingReconnection || $this->pendingDisconnect) { + return; + } + try { + $updated = $this->db->update( + 'director_daemon_info', + $this->details->getPropertiesToUpdate(), + $this->db->quoteInto('instance_uuid_hex = ?', $this->details->getInstanceUuid()) + ); + + if (! $updated) { + $this->db->insert( + 'director_daemon_info', + $this->details->getPropertiesToInsert() + ); + } + } catch (Exception $e) { + Logger::error($e->getMessage()); + $this->reconnect(); + } + } + + protected function eventuallySetStopped() + { + try { + if (! $this->db) { + return; + } + $this->db->update( + 'director_daemon_info', + ['ts_stopped' => DaemonUtil::timestampWithMilliseconds()], + $this->db->quoteInto('instance_uuid_hex = ?', $this->details->getInstanceUuid()) + ); + } catch (Exception $e) { + Logger::error('Failed to update daemon info (setting ts_stopped): ' . $e->getMessage()); + } + } +} |