summaryrefslogtreecommitdiffstats
path: root/library/Director/Daemon/DaemonDb.php
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--library/Director/Daemon/DaemonDb.php365
1 files changed, 365 insertions, 0 deletions
diff --git a/library/Director/Daemon/DaemonDb.php b/library/Director/Daemon/DaemonDb.php
new file mode 100644
index 0000000..7772b3a
--- /dev/null
+++ b/library/Director/Daemon/DaemonDb.php
@@ -0,0 +1,365 @@
+<?php
+
+namespace Icinga\Module\Director\Daemon;
+
+use Exception;
+use gipfl\IcingaCliDaemon\DbResourceConfigWatch;
+use gipfl\IcingaCliDaemon\RetryUnless;
+use Icinga\Data\ConfigObject;
+use Icinga\Module\Director\Db;
+use Icinga\Module\Director\Db\Migrations;
+use ipl\Stdlib\EventEmitter;
+use React\EventLoop\LoopInterface;
+use React\Promise\Deferred;
+use RuntimeException;
+use SplObjectStorage;
+use function React\Promise\reject;
+use function React\Promise\resolve;
+
+class DaemonDb
+{
+ use EventEmitter;
+
+ /** @var LoopInterface */
+ private $loop;
+
+ /** @var Db */
+ protected $connection;
+
+ /** @var \Zend_Db_Adapter_Abstract */
+ protected $db;
+
+ /** @var DaemonProcessDetails */
+ protected $details;
+
+ /** @var DbBasedComponent[] */
+ protected $registeredComponents = [];
+
+ /** @var DbResourceConfigWatch|null */
+ protected $configWatch;
+
+ /** @var array|null */
+ protected $dbConfig;
+
+ /** @var RetryUnless|null */
+ protected $pendingReconnection;
+
+ /** @var Deferred|null */
+ protected $pendingDisconnect;
+
+ /** @var \React\EventLoop\TimerInterface */
+ protected $refreshTimer;
+
+ /** @var \React\EventLoop\TimerInterface */
+ protected $schemaCheckTimer;
+
+ /** @var int */
+ protected $startupSchemaVersion;
+
+ public function __construct(DaemonProcessDetails $details, $dbConfig = null)
+ {
+ $this->details = $details;
+ $this->dbConfig = $dbConfig;
+ }
+
+ public function register(DbBasedComponent $component)
+ {
+ $this->registeredComponents[] = $component;
+
+ return $this;
+ }
+
+ public function setConfigWatch(DbResourceConfigWatch $configWatch)
+ {
+ $this->configWatch = $configWatch;
+ $configWatch->notify(function ($config) {
+ $this->disconnect()->then(function () use ($config) {
+ return $this->onNewConfig($config);
+ });
+ });
+ if ($this->loop) {
+ $configWatch->run($this->loop);
+ }
+
+ return $this;
+ }
+
+ public function run(LoopInterface $loop)
+ {
+ $this->loop = $loop;
+ $this->connect();
+ $this->refreshTimer = $loop->addPeriodicTimer(3, function () {
+ $this->refreshMyState();
+ });
+ $this->schemaCheckTimer = $loop->addPeriodicTimer(15, function () {
+ $this->checkDbSchema();
+ });
+ if ($this->configWatch) {
+ $this->configWatch->run($this->loop);
+ }
+ }
+
+ protected function onNewConfig($config)
+ {
+ if ($config === null) {
+ if ($this->dbConfig === null) {
+ Logger::error('DB configuration is not valid');
+ } else {
+ Logger::error('DB configuration is no longer valid');
+ }
+ $this->emitStatus('no configuration');
+ $this->dbConfig = $config;
+
+ return resolve();
+ } else {
+ $this->emitStatus('configuration loaded');
+ $this->dbConfig = $config;
+
+ return $this->establishConnection($config);
+ }
+ }
+
+ protected function establishConnection($config)
+ {
+ if ($this->connection !== null) {
+ Logger::error('Trying to establish a connection while being connected');
+ return reject();
+ }
+ $callback = function () use ($config) {
+ $this->reallyEstablishConnection($config);
+ };
+ $onSuccess = function () {
+ $this->pendingReconnection = null;
+ $this->onConnected();
+ };
+ if ($this->pendingReconnection) {
+ $this->pendingReconnection->reset();
+ $this->pendingReconnection = null;
+ }
+ $this->emitStatus('connecting');
+
+ return $this->pendingReconnection = RetryUnless::succeeding($callback)
+ ->setInterval(0.2)
+ ->slowDownAfter(10, 10)
+ ->run($this->loop)
+ ->then($onSuccess)
+ ;
+ }
+
+ protected function reallyEstablishConnection($config)
+ {
+ $connection = new Db(new ConfigObject($config));
+ $connection->getDbAdapter()->getConnection();
+ $migrations = new Migrations($connection);
+ if (! $migrations->hasSchema()) {
+ $this->emitStatus('no schema', 'error');
+ throw new RuntimeException('DB has no schema');
+ }
+ $this->wipeOrphanedInstances($connection);
+ if ($this->hasAnyOtherActiveInstance($connection)) {
+ $this->emitStatus('locked by other instance', 'warning');
+ throw new RuntimeException('DB is locked by a running daemon instance, will retry');
+ }
+ $this->startupSchemaVersion = $migrations->getLastMigrationNumber();
+ $this->details->set('schema_version', $this->startupSchemaVersion);
+
+ $this->connection = $connection;
+ $this->db = $connection->getDbAdapter();
+ $this->loop->futureTick(function () {
+ $this->refreshMyState();
+ });
+
+ return $connection;
+ }
+
+ protected function checkDbSchema()
+ {
+ if ($this->connection === null) {
+ return;
+ }
+
+ if ($this->schemaIsOutdated()) {
+ $this->emit('schemaChange', [
+ $this->getStartupSchemaVersion(),
+ $this->getDbSchemaVersion()
+ ]);
+ }
+ }
+
+ protected function schemaIsOutdated()
+ {
+ return $this->getStartupSchemaVersion() < $this->getDbSchemaVersion();
+ }
+
+ protected function getStartupSchemaVersion()
+ {
+ return $this->startupSchemaVersion;
+ }
+
+ protected function getDbSchemaVersion()
+ {
+ if ($this->connection === null) {
+ throw new RuntimeException(
+ 'Cannot determine DB schema version without an established DB connection'
+ );
+ }
+ $migrations = new Migrations($this->connection);
+
+ return $migrations->getLastMigrationNumber();
+ }
+
+ protected function onConnected()
+ {
+ $this->emitStatus('connected');
+ Logger::info('Connected to the database');
+ foreach ($this->registeredComponents as $component) {
+ $component->initDb($this->connection);
+ }
+ }
+
+ /**
+ * @return \React\Promise\PromiseInterface
+ */
+ protected function reconnect()
+ {
+ return $this->disconnect()->then(function () {
+ return $this->connect();
+ }, function (Exception $e) {
+ Logger::error('Disconnect failed. This should never happen: ' . $e->getMessage());
+ exit(1);
+ });
+ }
+
+ /**
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function connect()
+ {
+ if ($this->connection === null) {
+ if ($this->dbConfig) {
+ return $this->establishConnection($this->dbConfig);
+ }
+ }
+
+ return resolve();
+ }
+
+ /**
+ * @return \React\Promise\ExtendedPromiseInterface
+ */
+ public function disconnect()
+ {
+ if (! $this->connection) {
+ return resolve();
+ }
+ if ($this->pendingDisconnect) {
+ return $this->pendingDisconnect->promise();
+ }
+
+ $this->eventuallySetStopped();
+ $this->pendingDisconnect = new Deferred();
+ $pendingComponents = new SplObjectStorage();
+ foreach ($this->registeredComponents as $component) {
+ $pendingComponents->attach($component);
+ $resolve = function () use ($pendingComponents, $component) {
+ $pendingComponents->detach($component);
+ if ($pendingComponents->count() === 0) {
+ $this->pendingDisconnect->resolve();
+ }
+ };
+ // TODO: What should we do in case they don't?
+ $component->stopDb()->then($resolve);
+ }
+
+ try {
+ if ($this->db) {
+ $this->db->closeConnection();
+ }
+ } catch (Exception $e) {
+ Logger::error('Failed to disconnect: ' . $e->getMessage());
+ }
+
+ return $this->pendingDisconnect->promise()->then(function () {
+ $this->connection = null;
+ $this->db = null;
+ $this->pendingDisconnect = null;
+ });
+ }
+
+ protected function emitStatus($message, $level = 'info')
+ {
+ $this->emit('state', [$message, $level]);
+
+ return $this;
+ }
+
+ protected function hasAnyOtherActiveInstance(Db $connection)
+ {
+ $db = $connection->getDbAdapter();
+
+ return (int) $db->fetchOne(
+ $db->select()
+ ->from('director_daemon_info', 'COUNT(*)')
+ ->where('ts_stopped IS NULL')
+ ) > 0;
+ }
+
+ protected function wipeOrphanedInstances(Db $connection)
+ {
+ $db = $connection->getDbAdapter();
+ $db->delete('director_daemon_info', 'ts_stopped IS NOT NULL');
+ $db->delete('director_daemon_info', $db->quoteInto(
+ 'instance_uuid_hex = ?',
+ $this->details->getInstanceUuid()
+ ));
+ $count = $db->delete(
+ 'director_daemon_info',
+ 'ts_stopped IS NULL AND ts_last_update < ' . (
+ DaemonUtil::timestampWithMilliseconds() - (60 * 1000)
+ )
+ );
+ if ($count > 1) {
+ Logger::error("Removed $count orphaned daemon instance(s) from DB");
+ }
+ }
+
+ protected function refreshMyState()
+ {
+ if ($this->db === null || $this->pendingReconnection || $this->pendingDisconnect) {
+ return;
+ }
+ try {
+ $updated = $this->db->update(
+ 'director_daemon_info',
+ $this->details->getPropertiesToUpdate(),
+ $this->db->quoteInto('instance_uuid_hex = ?', $this->details->getInstanceUuid())
+ );
+
+ if (! $updated) {
+ $this->db->insert(
+ 'director_daemon_info',
+ $this->details->getPropertiesToInsert()
+ );
+ }
+ } catch (Exception $e) {
+ Logger::error($e->getMessage());
+ $this->reconnect();
+ }
+ }
+
+ protected function eventuallySetStopped()
+ {
+ try {
+ if (! $this->db) {
+ return;
+ }
+ $this->db->update(
+ 'director_daemon_info',
+ ['ts_stopped' => DaemonUtil::timestampWithMilliseconds()],
+ $this->db->quoteInto('instance_uuid_hex = ?', $this->details->getInstanceUuid())
+ );
+ } catch (Exception $e) {
+ Logger::error('Failed to update daemon info (setting ts_stopped): ' . $e->getMessage());
+ }
+ }
+}