diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:32:39 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:32:39 +0000 |
commit | 56ae875861ab260b80a030f50c4aff9f9dc8fff0 (patch) | |
tree | 531412110fc901a5918c7f7442202804a83cada9 /lib/icingadb | |
parent | Initial commit. (diff) | |
download | icinga2-upstream/2.14.2.tar.xz icinga2-upstream/2.14.2.zip |
Adding upstream version 2.14.2.upstream/2.14.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | lib/icingadb/CMakeLists.txt | 32 | ||||
-rw-r--r-- | lib/icingadb/icingadb-itl.conf | 24 | ||||
-rw-r--r-- | lib/icingadb/icingadb-objects.cpp | 2966 | ||||
-rw-r--r-- | lib/icingadb/icingadb-stats.cpp | 54 | ||||
-rw-r--r-- | lib/icingadb/icingadb-utility.cpp | 319 | ||||
-rw-r--r-- | lib/icingadb/icingadb.cpp | 311 | ||||
-rw-r--r-- | lib/icingadb/icingadb.hpp | 241 | ||||
-rw-r--r-- | lib/icingadb/icingadb.ti | 63 | ||||
-rw-r--r-- | lib/icingadb/icingadbchecktask.cpp | 513 | ||||
-rw-r--r-- | lib/icingadb/icingadbchecktask.hpp | 29 | ||||
-rw-r--r-- | lib/icingadb/redisconnection.cpp | 773 | ||||
-rw-r--r-- | lib/icingadb/redisconnection.hpp | 678 |
12 files changed, 6003 insertions, 0 deletions
diff --git a/lib/icingadb/CMakeLists.txt b/lib/icingadb/CMakeLists.txt new file mode 100644 index 0000000..de8e4ad --- /dev/null +++ b/lib/icingadb/CMakeLists.txt @@ -0,0 +1,32 @@ +# Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ + +mkclass_target(icingadb.ti icingadb-ti.cpp icingadb-ti.hpp) + +mkembedconfig_target(icingadb-itl.conf icingadb-itl.cpp) + +set(icingadb_SOURCES + icingadb.cpp icingadb-objects.cpp icingadb-stats.cpp icingadb-utility.cpp redisconnection.cpp icingadb-ti.hpp + icingadbchecktask.cpp icingadb-itl.cpp +) + +if(ICINGA2_UNITY_BUILD) + mkunity_target(icingadb icingadb icingadb_SOURCES) +endif() + +add_library(icingadb OBJECT ${icingadb_SOURCES}) + +include_directories(${icinga2_SOURCE_DIR}/third-party) + +add_dependencies(icingadb base config icinga remote) + +set_target_properties ( + icingadb PROPERTIES + FOLDER Components +) + +install_if_not_exists( + ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/icingadb.conf + ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available +) + +set(CPACK_NSIS_EXTRA_INSTALL_COMMANDS "${CPACK_NSIS_EXTRA_INSTALL_COMMANDS}" PARENT_SCOPE) diff --git a/lib/icingadb/icingadb-itl.conf b/lib/icingadb/icingadb-itl.conf new file mode 100644 index 0000000..5f3950e --- /dev/null +++ b/lib/icingadb/icingadb-itl.conf @@ -0,0 +1,24 @@ +/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */ + +System.assert(Internal.run_with_activation_context(function() { + template CheckCommand "icingadb-check-command" use (checkFunc = Internal.IcingadbCheck) { + execute = checkFunc + } + + object CheckCommand "icingadb" { + import "icingadb-check-command" + + vars.icingadb_name = "icingadb" + + vars.icingadb_full_dump_duration_warning = 5m + vars.icingadb_full_dump_duration_critical = 10m + vars.icingadb_full_sync_duration_warning = 5m + vars.icingadb_full_sync_duration_critical = 10m + vars.icingadb_redis_backlog_warning = 5m + vars.icingadb_redis_backlog_critical = 15m + vars.icingadb_database_backlog_warning = 5m + vars.icingadb_database_backlog_critical = 15m + } +})) + +Internal.remove("IcingadbCheck") diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp new file mode 100644 index 0000000..ff7a833 --- /dev/null +++ b/lib/icingadb/icingadb-objects.cpp @@ -0,0 +1,2966 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "icingadb/icingadb.hpp" +#include "icingadb/redisconnection.hpp" +#include "base/configtype.hpp" +#include "base/configobject.hpp" +#include "base/defer.hpp" +#include "base/json.hpp" +#include "base/logger.hpp" +#include "base/serializer.hpp" +#include "base/shared.hpp" +#include "base/tlsutility.hpp" +#include "base/initialize.hpp" +#include "base/convert.hpp" +#include "base/array.hpp" +#include "base/exception.hpp" +#include "base/utility.hpp" +#include "base/object-packer.hpp" +#include "icinga/command.hpp" +#include "icinga/compatutility.hpp" +#include "icinga/customvarobject.hpp" +#include "icinga/host.hpp" +#include "icinga/service.hpp" +#include "icinga/hostgroup.hpp" +#include "icinga/servicegroup.hpp" +#include "icinga/usergroup.hpp" +#include "icinga/checkcommand.hpp" +#include "icinga/eventcommand.hpp" +#include "icinga/notificationcommand.hpp" +#include "icinga/timeperiod.hpp" +#include "icinga/pluginutility.hpp" +#include "remote/zone.hpp" +#include <algorithm> +#include <chrono> +#include <cmath> +#include <cstdint> +#include <iterator> +#include <map> +#include <memory> +#include <mutex> +#include <set> +#include <utility> +#include <type_traits> + +using namespace icinga; + +using Prio = RedisConnection::QueryPriority; + +std::unordered_set<Type*> IcingaDB::m_IndexedTypes; + +INITIALIZE_ONCE(&IcingaDB::ConfigStaticInitialize); + +std::vector<Type::Ptr> IcingaDB::GetTypes() +{ + // The initial config sync will queue the types in the following order. + return { + // Sync them first to get their states ASAP. + Host::TypeInstance, + Service::TypeInstance, + + // Then sync them for similar reasons. + Downtime::TypeInstance, + Comment::TypeInstance, + + HostGroup::TypeInstance, + ServiceGroup::TypeInstance, + CheckCommand::TypeInstance, + Endpoint::TypeInstance, + EventCommand::TypeInstance, + Notification::TypeInstance, + NotificationCommand::TypeInstance, + TimePeriod::TypeInstance, + User::TypeInstance, + UserGroup::TypeInstance, + Zone::TypeInstance + }; +} + +void IcingaDB::ConfigStaticInitialize() +{ + for (auto& type : GetTypes()) { + m_IndexedTypes.emplace(type.get()); + } + + /* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */ + Checkable::OnStateChange.connect([](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) { + IcingaDB::StateChangeHandler(checkable, cr, type); + }); + + Checkable::OnAcknowledgementSet.connect([](const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool, bool persistent, double changeTime, double expiry, const MessageOrigin::Ptr&) { + AcknowledgementSetHandler(checkable, author, comment, type, persistent, changeTime, expiry); + }); + Checkable::OnAcknowledgementCleared.connect([](const Checkable::Ptr& checkable, const String& removedBy, double changeTime, const MessageOrigin::Ptr&) { + AcknowledgementClearedHandler(checkable, removedBy, changeTime); + }); + + Checkable::OnReachabilityChanged.connect([](const Checkable::Ptr&, const CheckResult::Ptr&, std::set<Checkable::Ptr> children, const MessageOrigin::Ptr&) { + IcingaDB::ReachabilityChangeHandler(children); + }); + + /* triggered on create, update and delete objects */ + ConfigObject::OnActiveChanged.connect([](const ConfigObject::Ptr& object, const Value&) { + IcingaDB::VersionChangedHandler(object); + }); + ConfigObject::OnVersionChanged.connect([](const ConfigObject::Ptr& object, const Value&) { + IcingaDB::VersionChangedHandler(object); + }); + + /* downtime start */ + Downtime::OnDowntimeTriggered.connect(&IcingaDB::DowntimeStartedHandler); + /* fixed/flexible downtime end or remove */ + Downtime::OnDowntimeRemoved.connect(&IcingaDB::DowntimeRemovedHandler); + + Checkable::OnNotificationSentToAllUsers.connect([]( + const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, + const NotificationType& type, const CheckResult::Ptr& cr, const String& author, const String& text, + const MessageOrigin::Ptr& + ) { + IcingaDB::NotificationSentToAllUsersHandler(notification, checkable, users, type, cr, author, text); + }); + + Comment::OnCommentAdded.connect(&IcingaDB::CommentAddedHandler); + Comment::OnCommentRemoved.connect(&IcingaDB::CommentRemovedHandler); + + Checkable::OnFlappingChange.connect(&IcingaDB::FlappingChangeHandler); + + Checkable::OnNewCheckResult.connect([](const Checkable::Ptr& checkable, const CheckResult::Ptr&, const MessageOrigin::Ptr&) { + IcingaDB::NewCheckResultHandler(checkable); + }); + + Checkable::OnNextCheckUpdated.connect([](const Checkable::Ptr& checkable) { + IcingaDB::NextCheckUpdatedHandler(checkable); + }); + + Service::OnHostProblemChanged.connect([](const Service::Ptr& service, const CheckResult::Ptr&, const MessageOrigin::Ptr&) { + IcingaDB::HostProblemChangedHandler(service); + }); + + Notification::OnUsersRawChangedWithOldValue.connect([](const Notification::Ptr& notification, const Value& oldValues, const Value& newValues) { + IcingaDB::NotificationUsersChangedHandler(notification, oldValues, newValues); + }); + Notification::OnUserGroupsRawChangedWithOldValue.connect([](const Notification::Ptr& notification, const Value& oldValues, const Value& newValues) { + IcingaDB::NotificationUserGroupsChangedHandler(notification, oldValues, newValues); + }); + TimePeriod::OnRangesChangedWithOldValue.connect([](const TimePeriod::Ptr& timeperiod, const Value& oldValues, const Value& newValues) { + IcingaDB::TimePeriodRangesChangedHandler(timeperiod, oldValues, newValues); + }); + TimePeriod::OnIncludesChangedWithOldValue.connect([](const TimePeriod::Ptr& timeperiod, const Value& oldValues, const Value& newValues) { + IcingaDB::TimePeriodIncludesChangedHandler(timeperiod, oldValues, newValues); + }); + TimePeriod::OnExcludesChangedWithOldValue.connect([](const TimePeriod::Ptr& timeperiod, const Value& oldValues, const Value& newValues) { + IcingaDB::TimePeriodExcludesChangedHandler(timeperiod, oldValues, newValues); + }); + User::OnGroupsChangedWithOldValue.connect([](const User::Ptr& user, const Value& oldValues, const Value& newValues) { + IcingaDB::UserGroupsChangedHandler(user, oldValues, newValues); + }); + Host::OnGroupsChangedWithOldValue.connect([](const Host::Ptr& host, const Value& oldValues, const Value& newValues) { + IcingaDB::HostGroupsChangedHandler(host, oldValues, newValues); + }); + Service::OnGroupsChangedWithOldValue.connect([](const Service::Ptr& service, const Value& oldValues, const Value& newValues) { + IcingaDB::ServiceGroupsChangedHandler(service, oldValues, newValues); + }); + Command::OnEnvChangedWithOldValue.connect([](const ConfigObject::Ptr& command, const Value& oldValues, const Value& newValues) { + IcingaDB::CommandEnvChangedHandler(command, oldValues, newValues); + }); + Command::OnArgumentsChangedWithOldValue.connect([](const ConfigObject::Ptr& command, const Value& oldValues, const Value& newValues) { + IcingaDB::CommandArgumentsChangedHandler(command, oldValues, newValues); + }); + CustomVarObject::OnVarsChangedWithOldValue.connect([](const ConfigObject::Ptr& object, const Value& oldValues, const Value& newValues) { + IcingaDB::CustomVarsChangedHandler(object, oldValues, newValues); + }); +} + +void IcingaDB::UpdateAllConfigObjects() +{ + m_Rcon->Sync(); + m_Rcon->FireAndForgetQuery({"XADD", "icinga:schema", "MAXLEN", "1", "*", "version", "5"}, Prio::Heartbeat); + + Log(LogInformation, "IcingaDB") << "Starting initial config/status dump"; + double startTime = Utility::GetTime(); + + SetOngoingDumpStart(startTime); + + Defer resetOngoingDumpStart ([this]() { + SetOngoingDumpStart(0); + }); + + // Use a Workqueue to pack objects in parallel + WorkQueue upq(25000, Configuration::Concurrency, LogNotice); + upq.SetName("IcingaDB:ConfigDump"); + + std::vector<Type::Ptr> types = GetTypes(); + + m_Rcon->SuppressQueryKind(Prio::CheckResult); + m_Rcon->SuppressQueryKind(Prio::RuntimeStateSync); + + Defer unSuppress ([this]() { + m_Rcon->UnsuppressQueryKind(Prio::RuntimeStateSync); + m_Rcon->UnsuppressQueryKind(Prio::CheckResult); + }); + + // Add a new type=* state=wip entry to the stream and remove all previous entries (MAXLEN 1). + m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "MAXLEN", "1", "*", "key", "*", "state", "wip"}, Prio::Config); + + const std::vector<String> globalKeys = { + m_PrefixConfigObject + "customvar", + m_PrefixConfigObject + "action:url", + m_PrefixConfigObject + "notes:url", + m_PrefixConfigObject + "icon:image", + }; + DeleteKeys(m_Rcon, globalKeys, Prio::Config); + DeleteKeys(m_Rcon, {"icinga:nextupdate:host", "icinga:nextupdate:service"}, Prio::Config); + m_Rcon->Sync(); + + Defer resetDumpedGlobals ([this]() { + m_DumpedGlobals.CustomVar.Reset(); + m_DumpedGlobals.ActionUrl.Reset(); + m_DumpedGlobals.NotesUrl.Reset(); + m_DumpedGlobals.IconImage.Reset(); + }); + + upq.ParallelFor(types, false, [this](const Type::Ptr& type) { + String lcType = type->GetName().ToLower(); + ConfigType *ctype = dynamic_cast<ConfigType *>(type.get()); + if (!ctype) + return; + + auto& rcon (m_Rcons.at(ctype)); + + std::vector<String> keys = GetTypeOverwriteKeys(lcType); + DeleteKeys(rcon, keys, Prio::Config); + + WorkQueue upqObjectType(25000, Configuration::Concurrency, LogNotice); + upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType); + + std::map<String, String> redisCheckSums; + String configCheckSum = m_PrefixConfigCheckSum + lcType; + + upqObjectType.Enqueue([&rcon, &configCheckSum, &redisCheckSums]() { + String cursor = "0"; + + do { + Array::Ptr res = rcon->GetResultOfQuery({ + "HSCAN", configCheckSum, cursor, "COUNT", "1000" + }, Prio::Config); + + AddKvsToMap(res->Get(1), redisCheckSums); + + cursor = res->Get(0); + } while (cursor != "0"); + }); + + auto objectChunks (ChunkObjects(ctype->GetObjects(), 500)); + String configObject = m_PrefixConfigObject + lcType; + + // Skimmed away attributes and checksums HMSETs' keys and values by Redis key. + std::map<String, std::vector<std::vector<String>>> ourContentRaw {{configCheckSum, {}}, {configObject, {}}}; + std::mutex ourContentMutex; + + upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) { + std::map<String, std::vector<String>> hMSets; + // Two values are appended per object: Object ID (Hash encoded) and Object State (IcingaDB::SerializeState() -> JSON encoded) + std::vector<String> states = {"HMSET", m_PrefixConfigObject + lcType + ":state"}; + // Two values are appended per object: Object ID (Hash encoded) and State Checksum ({ "checksum": checksum } -> JSON encoded) + std::vector<String> statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"}; + std::vector<std::vector<String> > transaction = {{"MULTI"}}; + std::vector<String> hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"}; + + auto skimObjects ([&]() { + std::lock_guard<std::mutex> l (ourContentMutex); + + for (auto& kv : ourContentRaw) { + auto pos (hMSets.find(kv.first)); + + if (pos != hMSets.end()) { + kv.second.emplace_back(std::move(pos->second)); + hMSets.erase(pos); + } + } + }); + + bool dumpState = (lcType == "host" || lcType == "service"); + + size_t bulkCounter = 0; + for (const ConfigObject::Ptr& object : chunk) { + if (lcType != GetLowerCaseTypeNameDB(object)) + continue; + + std::vector<Dictionary::Ptr> runtimeUpdates; + CreateConfigUpdate(object, lcType, hMSets, runtimeUpdates, false); + + // Write out inital state for checkables + if (dumpState) { + String objectKey = GetObjectIdentifier(object); + Dictionary::Ptr state = SerializeState(dynamic_pointer_cast<Checkable>(object)); + + states.emplace_back(objectKey); + states.emplace_back(JsonEncode(state)); + + statesChksms.emplace_back(objectKey); + statesChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(state)}}))); + } + + bulkCounter++; + if (!(bulkCounter % 100)) { + skimObjects(); + + for (auto& kv : hMSets) { + if (!kv.second.empty()) { + kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); + transaction.emplace_back(std::move(kv.second)); + } + } + + if (states.size() > 2) { + transaction.emplace_back(std::move(states)); + transaction.emplace_back(std::move(statesChksms)); + states = {"HMSET", m_PrefixConfigObject + lcType + ":state"}; + statesChksms = {"HMSET", m_PrefixConfigCheckSum + lcType + ":state"}; + } + + hMSets = decltype(hMSets)(); + + if (transaction.size() > 1) { + transaction.push_back({"EXEC"}); + rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); + transaction = {{"MULTI"}}; + } + } + + auto checkable (dynamic_pointer_cast<Checkable>(object)); + + if (checkable && checkable->GetEnableActiveChecks()) { + auto zAdds (dynamic_pointer_cast<Service>(checkable) ? &serviceZAdds : &hostZAdds); + + zAdds->emplace_back(Convert::ToString(checkable->GetNextUpdate())); + zAdds->emplace_back(GetObjectIdentifier(checkable)); + + if (zAdds->size() >= 102u) { + std::vector<String> header (zAdds->begin(), zAdds->begin() + 2u); + + rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult); + + *zAdds = std::move(header); + } + } + } + + skimObjects(); + + for (auto& kv : hMSets) { + if (!kv.second.empty()) { + kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); + transaction.emplace_back(std::move(kv.second)); + } + } + + if (states.size() > 2) { + transaction.emplace_back(std::move(states)); + transaction.emplace_back(std::move(statesChksms)); + } + + if (transaction.size() > 1) { + transaction.push_back({"EXEC"}); + rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); + } + + for (auto zAdds : {&hostZAdds, &serviceZAdds}) { + if (zAdds->size() > 2u) { + rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult); + } + } + + Log(LogNotice, "IcingaDB") + << "Dumped " << bulkCounter << " objects of type " << lcType; + }); + + upqObjectType.Join(); + + if (upqObjectType.HasExceptions()) { + for (boost::exception_ptr exc : upqObjectType.GetExceptions()) { + if (exc) { + boost::rethrow_exception(exc); + } + } + } + + std::map<String, std::map<String, String>> ourContent; + + for (auto& source : ourContentRaw) { + auto& dest (ourContent[source.first]); + + upqObjectType.Enqueue([&]() { + for (auto& hMSet : source.second) { + for (decltype(hMSet.size()) i = 0, stop = hMSet.size() - 1u; i < stop; i += 2u) { + dest.emplace(std::move(hMSet[i]), std::move(hMSet[i + 1u])); + } + + hMSet.clear(); + } + + source.second.clear(); + }); + } + + upqObjectType.Join(); + ourContentRaw.clear(); + + auto& ourCheckSums (ourContent[configCheckSum]); + auto& ourObjects (ourContent[configObject]); + std::vector<String> setChecksum, setObject, delChecksum, delObject; + + auto redisCurrent (redisCheckSums.begin()); + auto redisEnd (redisCheckSums.end()); + auto ourCurrent (ourCheckSums.begin()); + auto ourEnd (ourCheckSums.end()); + + auto flushSets ([&]() { + auto affectedConfig (setObject.size() / 2u); + + setChecksum.insert(setChecksum.begin(), {"HMSET", configCheckSum}); + setObject.insert(setObject.begin(), {"HMSET", configObject}); + + std::vector<std::vector<String>> transaction; + + transaction.emplace_back(std::vector<String>{"MULTI"}); + transaction.emplace_back(std::move(setChecksum)); + transaction.emplace_back(std::move(setObject)); + transaction.emplace_back(std::vector<String>{"EXEC"}); + + setChecksum.clear(); + setObject.clear(); + + rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {affectedConfig}); + }); + + auto flushDels ([&]() { + auto affectedConfig (delObject.size()); + + delChecksum.insert(delChecksum.begin(), {"HDEL", configCheckSum}); + delObject.insert(delObject.begin(), {"HDEL", configObject}); + + std::vector<std::vector<String>> transaction; + + transaction.emplace_back(std::vector<String>{"MULTI"}); + transaction.emplace_back(std::move(delChecksum)); + transaction.emplace_back(std::move(delObject)); + transaction.emplace_back(std::vector<String>{"EXEC"}); + + delChecksum.clear(); + delObject.clear(); + + rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {affectedConfig}); + }); + + auto setOne ([&]() { + setChecksum.emplace_back(ourCurrent->first); + setChecksum.emplace_back(ourCurrent->second); + setObject.emplace_back(ourCurrent->first); + setObject.emplace_back(ourObjects[ourCurrent->first]); + + if (setChecksum.size() == 100u) { + flushSets(); + } + }); + + auto delOne ([&]() { + delChecksum.emplace_back(redisCurrent->first); + delObject.emplace_back(redisCurrent->first); + + if (delChecksum.size() == 100u) { + flushDels(); + } + }); + + for (;;) { + if (redisCurrent == redisEnd) { + for (; ourCurrent != ourEnd; ++ourCurrent) { + setOne(); + } + + break; + } else if (ourCurrent == ourEnd) { + for (; redisCurrent != redisEnd; ++redisCurrent) { + delOne(); + } + + break; + } else if (redisCurrent->first < ourCurrent->first) { + delOne(); + ++redisCurrent; + } else if (redisCurrent->first > ourCurrent->first) { + setOne(); + ++ourCurrent; + } else { + if (redisCurrent->second != ourCurrent->second) { + setOne(); + } + + ++redisCurrent; + ++ourCurrent; + } + } + + if (delChecksum.size()) { + flushDels(); + } + + if (setChecksum.size()) { + flushSets(); + } + + for (auto& key : GetTypeDumpSignalKeys(type)) { + rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}, Prio::Config); + } + rcon->Sync(); + }); + + upq.Join(); + + if (upq.HasExceptions()) { + for (boost::exception_ptr exc : upq.GetExceptions()) { + try { + if (exc) { + boost::rethrow_exception(exc); + } + } catch(const std::exception& e) { + Log(LogCritical, "IcingaDB") + << "Exception during ConfigDump: " << e.what(); + } + } + } + + for (auto& key : globalKeys) { + m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}, Prio::Config); + } + + m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", "*", "state", "done"}, Prio::Config); + + // enqueue a callback that will notify us once all previous queries were executed and wait for this event + std::promise<void> p; + m_Rcon->EnqueueCallback([&p](boost::asio::yield_context& yc) { p.set_value(); }, Prio::Config); + p.get_future().wait(); + + auto endTime (Utility::GetTime()); + auto took (endTime - startTime); + + SetLastdumpTook(took); + SetLastdumpEnd(endTime); + + Log(LogInformation, "IcingaDB") + << "Initial config/status dump finished in " << took << " seconds."; +} + +std::vector<std::vector<intrusive_ptr<ConfigObject>>> IcingaDB::ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize) { + std::vector<std::vector<intrusive_ptr<ConfigObject>>> chunks; + auto offset (objects.begin()); + auto end (objects.end()); + + chunks.reserve((std::distance(offset, end) + chunkSize - 1) / chunkSize); + + while (std::distance(offset, end) >= chunkSize) { + auto until (offset + chunkSize); + chunks.emplace_back(offset, until); + offset = until; + } + + if (offset != end) { + chunks.emplace_back(offset, end); + } + + return chunks; +} + +void IcingaDB::DeleteKeys(const RedisConnection::Ptr& conn, const std::vector<String>& keys, RedisConnection::QueryPriority priority) { + std::vector<String> query = {"DEL"}; + for (auto& key : keys) { + query.emplace_back(key); + } + + conn->FireAndForgetQuery(std::move(query), priority); +} + +std::vector<String> IcingaDB::GetTypeOverwriteKeys(const String& type) +{ + std::vector<String> keys = { + m_PrefixConfigObject + type + ":customvar", + }; + + if (type == "host" || type == "service" || type == "user") { + keys.emplace_back(m_PrefixConfigObject + type + "group:member"); + keys.emplace_back(m_PrefixConfigObject + type + ":state"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":state"); + } else if (type == "timeperiod") { + keys.emplace_back(m_PrefixConfigObject + type + ":override:include"); + keys.emplace_back(m_PrefixConfigObject + type + ":override:exclude"); + keys.emplace_back(m_PrefixConfigObject + type + ":range"); + } else if (type == "notification") { + keys.emplace_back(m_PrefixConfigObject + type + ":user"); + keys.emplace_back(m_PrefixConfigObject + type + ":usergroup"); + keys.emplace_back(m_PrefixConfigObject + type + ":recipient"); + } else if (type == "checkcommand" || type == "notificationcommand" || type == "eventcommand") { + keys.emplace_back(m_PrefixConfigObject + type + ":envvar"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":envvar"); + keys.emplace_back(m_PrefixConfigObject + type + ":argument"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":argument"); + } + + return keys; +} + +std::vector<String> IcingaDB::GetTypeDumpSignalKeys(const Type::Ptr& type) +{ + String lcType = type->GetName().ToLower(); + std::vector<String> keys = {m_PrefixConfigObject + lcType}; + + if (CustomVarObject::TypeInstance->IsAssignableFrom(type)) { + keys.emplace_back(m_PrefixConfigObject + lcType + ":customvar"); + } + + if (type == Host::TypeInstance || type == Service::TypeInstance) { + keys.emplace_back(m_PrefixConfigObject + lcType + "group:member"); + keys.emplace_back(m_PrefixConfigObject + lcType + ":state"); + } else if (type == User::TypeInstance) { + keys.emplace_back(m_PrefixConfigObject + lcType + "group:member"); + } else if (type == TimePeriod::TypeInstance) { + keys.emplace_back(m_PrefixConfigObject + lcType + ":override:include"); + keys.emplace_back(m_PrefixConfigObject + lcType + ":override:exclude"); + keys.emplace_back(m_PrefixConfigObject + lcType + ":range"); + } else if (type == Notification::TypeInstance) { + keys.emplace_back(m_PrefixConfigObject + lcType + ":user"); + keys.emplace_back(m_PrefixConfigObject + lcType + ":usergroup"); + keys.emplace_back(m_PrefixConfigObject + lcType + ":recipient"); + } else if (type == CheckCommand::TypeInstance || type == NotificationCommand::TypeInstance || type == EventCommand::TypeInstance) { + keys.emplace_back(m_PrefixConfigObject + lcType + ":envvar"); + keys.emplace_back(m_PrefixConfigObject + lcType + ":argument"); + } + + return keys; +} + +template<typename ConfigType> +static ConfigObject::Ptr GetObjectByName(const String& name) +{ + return ConfigObject::GetObject<ConfigType>(name); +} + +void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets, + std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate) +{ + String objectKey = GetObjectIdentifier(object); + String objectKeyName = typeName + "_id"; + + Type::Ptr type = object->GetReflectionType(); + + CustomVarObject::Ptr customVarObject = dynamic_pointer_cast<CustomVarObject>(object); + + if (customVarObject) { + auto vars(SerializeVars(customVarObject->GetVars())); + if (vars) { + auto& typeCvs (hMSets[m_PrefixConfigObject + typeName + ":customvar"]); + auto& allCvs (hMSets[m_PrefixConfigObject + "customvar"]); + + ObjectLock varsLock(vars); + Array::Ptr varsArray(new Array); + + varsArray->Reserve(vars->GetLength()); + + for (auto& kv : vars) { + if (runtimeUpdate || m_DumpedGlobals.CustomVar.IsNew(kv.first)) { + allCvs.emplace_back(kv.first); + allCvs.emplace_back(JsonEncode(kv.second)); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, kv.first, m_PrefixConfigObject + "customvar", kv.second); + } + } + + String id = HashValue(new Array({m_EnvironmentId, kv.first, object->GetName()})); + typeCvs.emplace_back(id); + + Dictionary::Ptr data = new Dictionary({{objectKeyName, objectKey}, {"environment_id", m_EnvironmentId}, {"customvar_id", kv.first}}); + typeCvs.emplace_back(JsonEncode(data)); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":customvar", data); + } + } + } + } + + if (type == Host::TypeInstance || type == Service::TypeInstance) { + Checkable::Ptr checkable = static_pointer_cast<Checkable>(object); + + String actionUrl = checkable->GetActionUrl(); + String notesUrl = checkable->GetNotesUrl(); + String iconImage = checkable->GetIconImage(); + if (!actionUrl.IsEmpty()) { + auto& actionUrls (hMSets[m_PrefixConfigObject + "action:url"]); + + auto id (HashValue(new Array({m_EnvironmentId, actionUrl}))); + + if (runtimeUpdate || m_DumpedGlobals.ActionUrl.IsNew(id)) { + actionUrls.emplace_back(std::move(id)); + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"action_url", actionUrl}}); + actionUrls.emplace_back(JsonEncode(data)); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, actionUrls.at(actionUrls.size() - 2u), m_PrefixConfigObject + "action:url", data); + } + } + } + if (!notesUrl.IsEmpty()) { + auto& notesUrls (hMSets[m_PrefixConfigObject + "notes:url"]); + + auto id (HashValue(new Array({m_EnvironmentId, notesUrl}))); + + if (runtimeUpdate || m_DumpedGlobals.NotesUrl.IsNew(id)) { + notesUrls.emplace_back(std::move(id)); + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"notes_url", notesUrl}}); + notesUrls.emplace_back(JsonEncode(data)); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, notesUrls.at(notesUrls.size() - 2u), m_PrefixConfigObject + "notes:url", data); + } + } + } + if (!iconImage.IsEmpty()) { + auto& iconImages (hMSets[m_PrefixConfigObject + "icon:image"]); + + auto id (HashValue(new Array({m_EnvironmentId, iconImage}))); + + if (runtimeUpdate || m_DumpedGlobals.IconImage.IsNew(id)) { + iconImages.emplace_back(std::move(id)); + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"icon_image", iconImage}}); + iconImages.emplace_back(JsonEncode(data)); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, iconImages.at(iconImages.size() - 2u), m_PrefixConfigObject + "icon:image", data); + } + } + } + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + ConfigObject::Ptr (*getGroup)(const String& name); + Array::Ptr groups; + if (service) { + groups = service->GetGroups(); + getGroup = &::GetObjectByName<ServiceGroup>; + } else { + groups = host->GetGroups(); + getGroup = &::GetObjectByName<HostGroup>; + } + + if (groups) { + ObjectLock groupsLock(groups); + Array::Ptr groupIds(new Array); + + groupIds->Reserve(groups->GetLength()); + + auto& members (hMSets[m_PrefixConfigObject + typeName + "group:member"]); + + for (auto& group : groups) { + auto groupObj ((*getGroup)(group)); + String groupId = GetObjectIdentifier(groupObj); + String id = HashValue(new Array({m_EnvironmentId, groupObj->GetName(), object->GetName()})); + members.emplace_back(id); + Dictionary::Ptr data = new Dictionary({{objectKeyName, objectKey}, {"environment_id", m_EnvironmentId}, {typeName + "group_id", groupId}}); + members.emplace_back(JsonEncode(data)); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + "group:member", data); + } + + groupIds->Add(groupId); + } + } + + return; + } + + if (type == TimePeriod::TypeInstance) { + TimePeriod::Ptr timeperiod = static_pointer_cast<TimePeriod>(object); + + Dictionary::Ptr ranges = timeperiod->GetRanges(); + if (ranges) { + ObjectLock rangesLock(ranges); + Array::Ptr rangeIds(new Array); + auto& typeRanges (hMSets[m_PrefixConfigObject + typeName + ":range"]); + + rangeIds->Reserve(ranges->GetLength()); + + for (auto& kv : ranges) { + String rangeId = HashValue(new Array({m_EnvironmentId, kv.first, kv.second})); + rangeIds->Add(rangeId); + + String id = HashValue(new Array({m_EnvironmentId, kv.first, kv.second, object->GetName()})); + typeRanges.emplace_back(id); + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"range_key", kv.first}, {"range_value", kv.second}}); + typeRanges.emplace_back(JsonEncode(data)); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":range", data); + } + } + } + + Array::Ptr includes; + ConfigObject::Ptr (*getInclude)(const String& name); + includes = timeperiod->GetIncludes(); + getInclude = &::GetObjectByName<TimePeriod>; + + Array::Ptr includeChecksums = new Array(); + + ObjectLock includesLock(includes); + ObjectLock includeChecksumsLock(includeChecksums); + + includeChecksums->Reserve(includes->GetLength()); + + + auto& includs (hMSets[m_PrefixConfigObject + typeName + ":override:include"]); + for (auto include : includes) { + auto includeTp ((*getInclude)(include.Get<String>())); + String includeId = GetObjectIdentifier(includeTp); + includeChecksums->Add(includeId); + + String id = HashValue(new Array({m_EnvironmentId, includeTp->GetName(), object->GetName()})); + includs.emplace_back(id); + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"include_id", includeId}}); + includs.emplace_back(JsonEncode(data)); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":override:include", data); + } + } + + Array::Ptr excludes; + ConfigObject::Ptr (*getExclude)(const String& name); + + excludes = timeperiod->GetExcludes(); + getExclude = &::GetObjectByName<TimePeriod>; + + Array::Ptr excludeChecksums = new Array(); + + ObjectLock excludesLock(excludes); + ObjectLock excludeChecksumsLock(excludeChecksums); + + excludeChecksums->Reserve(excludes->GetLength()); + + auto& excluds (hMSets[m_PrefixConfigObject + typeName + ":override:exclude"]); + + for (auto exclude : excludes) { + auto excludeTp ((*getExclude)(exclude.Get<String>())); + String excludeId = GetObjectIdentifier(excludeTp); + excludeChecksums->Add(excludeId); + + String id = HashValue(new Array({m_EnvironmentId, excludeTp->GetName(), object->GetName()})); + excluds.emplace_back(id); + Dictionary::Ptr data = new Dictionary({{"environment_id", m_EnvironmentId}, {"timeperiod_id", objectKey}, {"exclude_id", excludeId}}); + excluds.emplace_back(JsonEncode(data)); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":override:exclude", data); + } + } + + return; + } + + if (type == User::TypeInstance) { + User::Ptr user = static_pointer_cast<User>(object); + Array::Ptr groups = user->GetGroups(); + + if (groups) { + ObjectLock groupsLock(groups); + Array::Ptr groupIds(new Array); + + groupIds->Reserve(groups->GetLength()); + + auto& members (hMSets[m_PrefixConfigObject + typeName + "group:member"]); + auto& notificationRecipients (hMSets[m_PrefixConfigObject + "notification:recipient"]); + + for (auto& group : groups) { + UserGroup::Ptr groupObj = UserGroup::GetByName(group); + String groupId = GetObjectIdentifier(groupObj); + String id = HashValue(new Array({m_EnvironmentId, groupObj->GetName(), object->GetName()})); + members.emplace_back(id); + Dictionary::Ptr data = new Dictionary({{"user_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", groupId}}); + members.emplace_back(JsonEncode(data)); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + "group:member", data); + + // Recipients are handled by notifications during initial dumps and only need to be handled here during runtime (e.g. User creation). + for (auto& notification : groupObj->GetNotifications()) { + String recipientId = HashValue(new Array({m_EnvironmentId, "usergroupuser", user->GetName(), groupObj->GetName(), notification->GetName()})); + notificationRecipients.emplace_back(recipientId); + Dictionary::Ptr recipientData = new Dictionary({{"notification_id", GetObjectIdentifier(notification)}, {"environment_id", m_EnvironmentId}, {"user_id", objectKey}, {"usergroup_id", groupId}}); + notificationRecipients.emplace_back(JsonEncode(recipientData)); + + AddObjectDataToRuntimeUpdates(runtimeUpdates, recipientId, m_PrefixConfigObject + "notification:recipient", recipientData); + } + } + + groupIds->Add(groupId); + } + } + + return; + } + + if (type == Notification::TypeInstance) { + Notification::Ptr notification = static_pointer_cast<Notification>(object); + + std::set<User::Ptr> users = notification->GetUsers(); + Array::Ptr userIds = new Array(); + + auto usergroups(notification->GetUserGroups()); + Array::Ptr usergroupIds = new Array(); + + userIds->Reserve(users.size()); + + auto& usrs (hMSets[m_PrefixConfigObject + typeName + ":user"]); + auto& notificationRecipients (hMSets[m_PrefixConfigObject + typeName + ":recipient"]); + + for (auto& user : users) { + String userId = GetObjectIdentifier(user); + String id = HashValue(new Array({m_EnvironmentId, "user", user->GetName(), object->GetName()})); + usrs.emplace_back(id); + notificationRecipients.emplace_back(id); + + Dictionary::Ptr data = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}}); + String dataJson = JsonEncode(data); + usrs.emplace_back(dataJson); + notificationRecipients.emplace_back(dataJson); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":user", data); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":recipient", data); + } + + userIds->Add(userId); + } + + usergroupIds->Reserve(usergroups.size()); + + auto& groups (hMSets[m_PrefixConfigObject + typeName + ":usergroup"]); + + for (auto& usergroup : usergroups) { + String usergroupId = GetObjectIdentifier(usergroup); + String id = HashValue(new Array({m_EnvironmentId, "usergroup", usergroup->GetName(), object->GetName()})); + groups.emplace_back(id); + notificationRecipients.emplace_back(id); + + Dictionary::Ptr groupData = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"usergroup_id", usergroupId}}); + String groupDataJson = JsonEncode(groupData); + groups.emplace_back(groupDataJson); + notificationRecipients.emplace_back(groupDataJson); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":usergroup", groupData); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":recipient", groupData); + } + + for (const User::Ptr& user : usergroup->GetMembers()) { + String userId = GetObjectIdentifier(user); + String recipientId = HashValue(new Array({m_EnvironmentId, "usergroupuser", user->GetName(), usergroup->GetName(), notification->GetName()})); + notificationRecipients.emplace_back(recipientId); + Dictionary::Ptr userData = new Dictionary({{"notification_id", objectKey}, {"environment_id", m_EnvironmentId}, {"user_id", userId}, {"usergroup_id", usergroupId}}); + notificationRecipients.emplace_back(JsonEncode(userData)); + + if (runtimeUpdate) { + AddObjectDataToRuntimeUpdates(runtimeUpdates, recipientId, m_PrefixConfigObject + typeName + ":recipient", userData); + } + } + + usergroupIds->Add(usergroupId); + } + + return; + } + + if (type == CheckCommand::TypeInstance || type == NotificationCommand::TypeInstance || type == EventCommand::TypeInstance) { + Command::Ptr command = static_pointer_cast<Command>(object); + + Dictionary::Ptr arguments = command->GetArguments(); + if (arguments) { + ObjectLock argumentsLock(arguments); + auto& typeArgs (hMSets[m_PrefixConfigObject + typeName + ":argument"]); + auto& argChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":argument"]); + + for (auto& kv : arguments) { + Dictionary::Ptr values; + if (kv.second.IsObjectType<Dictionary>()) { + values = kv.second; + values = values->ShallowClone(); + } else if (kv.second.IsObjectType<Array>()) { + values = new Dictionary({{"value", JsonEncode(kv.second)}}); + } else { + values = new Dictionary({{"value", kv.second}}); + } + + for (const char *attr : {"value", "set_if", "separator"}) { + Value value; + + // Stringify if set. + if (values->Get(attr, &value)) { + switch (value.GetType()) { + case ValueEmpty: + case ValueString: + break; + case ValueObject: + values->Set(attr, value.Get<Object::Ptr>()->ToString()); + break; + default: + values->Set(attr, JsonEncode(value)); + } + } + } + + for (const char *attr : {"repeat_key", "required", "skip_key"}) { + Value value; + + // Boolify if set. + if (values->Get(attr, &value)) { + values->Set(attr, value.ToBool()); + } + } + + { + Value order; + + // Intify if set. + if (values->Get("order", &order)) { + values->Set("order", (int)order); + } + } + + values->Set(objectKeyName, objectKey); + values->Set("argument_key", kv.first); + values->Set("environment_id", m_EnvironmentId); + + String id = HashValue(new Array({m_EnvironmentId, kv.first, object->GetName()})); + + typeArgs.emplace_back(id); + typeArgs.emplace_back(JsonEncode(values)); + + argChksms.emplace_back(id); + String checksum = HashValue(kv.second); + argChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}}))); + + if (runtimeUpdate) { + values->Set("checksum", checksum); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":argument", values); + } + } + } + + Dictionary::Ptr envvars = command->GetEnv(); + if (envvars) { + ObjectLock envvarsLock(envvars); + Array::Ptr envvarIds(new Array); + auto& typeVars (hMSets[m_PrefixConfigObject + typeName + ":envvar"]); + auto& varChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":envvar"]); + + envvarIds->Reserve(envvars->GetLength()); + + for (auto& kv : envvars) { + Dictionary::Ptr values; + if (kv.second.IsObjectType<Dictionary>()) { + values = kv.second; + values = values->ShallowClone(); + } else if (kv.second.IsObjectType<Array>()) { + values = new Dictionary({{"value", JsonEncode(kv.second)}}); + } else { + values = new Dictionary({{"value", kv.second}}); + } + + { + Value value; + + // JsonEncode() the value if it's set. + if (values->Get("value", &value)) { + values->Set("value", JsonEncode(value)); + } + } + + values->Set(objectKeyName, objectKey); + values->Set("envvar_key", kv.first); + values->Set("environment_id", m_EnvironmentId); + + String id = HashValue(new Array({m_EnvironmentId, kv.first, object->GetName()})); + + typeVars.emplace_back(id); + typeVars.emplace_back(JsonEncode(values)); + + varChksms.emplace_back(id); + String checksum = HashValue(kv.second); + varChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}}))); + + if (runtimeUpdate) { + values->Set("checksum", checksum); + AddObjectDataToRuntimeUpdates(runtimeUpdates, id, m_PrefixConfigObject + typeName + ":envvar", values); + } + } + } + + return; + } +} + +/** + * Update the state information of a checkable in Redis. + * + * What is updated exactly depends on the mode parameter: + * - Volatile: Update the volatile state information stored in icinga:host:state or icinga:service:state as well as + * the corresponding checksum stored in icinga:checksum:host:state or icinga:checksum:service:state. + * - RuntimeOnly: Write a runtime update to the icinga:runtime:state stream. It is up to the caller to ensure that + * identical volatile state information was already written before to avoid inconsistencies. This mode is only + * useful to upgrade a previous Volatile to a Full operation, otherwise Full should be used. + * - Full: Perform an update of all state information in Redis, that is updating the volatile information and sending + * a corresponding runtime update so that this state update gets written through to the persistent database by a + * running icingadb process. + * + * @param checkable State of this checkable is updated in Redis + * @param mode Mode of operation (StateUpdate::Volatile, StateUpdate::RuntimeOnly, or StateUpdate::Full) + */ +void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + String objectType = GetLowerCaseTypeNameDB(checkable); + String objectKey = GetObjectIdentifier(checkable); + + Dictionary::Ptr stateAttrs = SerializeState(checkable); + + String redisStateKey = m_PrefixConfigObject + objectType + ":state"; + String redisChecksumKey = m_PrefixConfigCheckSum + objectType + ":state"; + String checksum = HashValue(stateAttrs); + + if (mode & StateUpdate::Volatile) { + m_Rcon->FireAndForgetQueries({ + {"HSET", redisStateKey, objectKey, JsonEncode(stateAttrs)}, + {"HSET", redisChecksumKey, objectKey, JsonEncode(new Dictionary({{"checksum", checksum}}))}, + }, Prio::RuntimeStateSync); + } + + if (mode & StateUpdate::RuntimeOnly) { + ObjectLock olock(stateAttrs); + + std::vector<String> streamadd({ + "XADD", "icinga:runtime:state", "MAXLEN", "~", "1000000", "*", + "runtime_type", "upsert", + "redis_key", redisStateKey, + "checksum", checksum, + }); + + for (const Dictionary::Pair& kv : stateAttrs) { + streamadd.emplace_back(kv.first); + streamadd.emplace_back(IcingaToStreamValue(kv.second)); + } + + m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::RuntimeStateStream, {0, 1}); + } +} + +// Used to update a single object, used for runtime updates +void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + String typeName = GetLowerCaseTypeNameDB(object); + + std::map<String, std::vector<String>> hMSets; + std::vector<Dictionary::Ptr> runtimeUpdates; + + CreateConfigUpdate(object, typeName, hMSets, runtimeUpdates, runtimeUpdate); + Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object); + if (checkable) { + UpdateState(checkable, runtimeUpdate ? StateUpdate::Full : StateUpdate::Volatile); + } + + std::vector<std::vector<String> > transaction = {{"MULTI"}}; + + for (auto& kv : hMSets) { + if (!kv.second.empty()) { + kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); + transaction.emplace_back(std::move(kv.second)); + } + } + + for (auto& objectAttributes : runtimeUpdates) { + std::vector<String> xAdd({"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*"}); + ObjectLock olock(objectAttributes); + + for (const Dictionary::Pair& kv : objectAttributes) { + String value = IcingaToStreamValue(kv.second); + if (!value.IsEmpty()) { + xAdd.emplace_back(kv.first); + xAdd.emplace_back(value); + } + } + + transaction.emplace_back(std::move(xAdd)); + } + + if (transaction.size() > 1) { + transaction.push_back({"EXEC"}); + m_Rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {1}); + } + + if (checkable) { + SendNextUpdate(checkable); + } +} + +void IcingaDB::AddObjectDataToRuntimeUpdates(std::vector<Dictionary::Ptr>& runtimeUpdates, const String& objectKey, + const String& redisKey, const Dictionary::Ptr& data) +{ + Dictionary::Ptr dataClone = data->ShallowClone(); + dataClone->Set("id", objectKey); + dataClone->Set("redis_key", redisKey); + dataClone->Set("runtime_type", "upsert"); + runtimeUpdates.emplace_back(dataClone); +} + +// Takes object and collects IcingaDB relevant attributes and computes checksums. Returns whether the object is relevant +// for IcingaDB. +bool IcingaDB::PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& attributes, Dictionary::Ptr& checksums) +{ + auto originalAttrs (object->GetOriginalAttributes()); + + if (originalAttrs) { + originalAttrs = originalAttrs->ShallowClone(); + } + + attributes->Set("name_checksum", SHA1(object->GetName())); + attributes->Set("environment_id", m_EnvironmentId); + attributes->Set("name", object->GetName()); + attributes->Set("original_attributes", originalAttrs); + + Zone::Ptr ObjectsZone; + Type::Ptr type = object->GetReflectionType(); + + if (type == Endpoint::TypeInstance) { + ObjectsZone = static_cast<Endpoint*>(object.get())->GetZone(); + } else { + ObjectsZone = static_pointer_cast<Zone>(object->GetZone()); + } + + if (ObjectsZone) { + attributes->Set("zone_id", GetObjectIdentifier(ObjectsZone)); + attributes->Set("zone_name", ObjectsZone->GetName()); + } + + if (type == Endpoint::TypeInstance) { + return true; + } + + if (type == Zone::TypeInstance) { + Zone::Ptr zone = static_pointer_cast<Zone>(object); + + attributes->Set("is_global", zone->GetGlobal()); + + Zone::Ptr parent = zone->GetParent(); + if (parent) { + attributes->Set("parent_id", GetObjectIdentifier(parent)); + } + + auto parentsRaw (zone->GetAllParentsRaw()); + attributes->Set("depth", parentsRaw.size()); + + return true; + } + + if (type == Host::TypeInstance || type == Service::TypeInstance) { + Checkable::Ptr checkable = static_pointer_cast<Checkable>(object); + auto checkTimeout (checkable->GetCheckTimeout()); + + attributes->Set("checkcommand_name", checkable->GetCheckCommand()->GetName()); + attributes->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + attributes->Set("check_timeout", checkTimeout.IsEmpty() ? checkable->GetCheckCommand()->GetTimeout() : (double)checkTimeout); + attributes->Set("check_interval", checkable->GetCheckInterval()); + attributes->Set("check_retry_interval", checkable->GetRetryInterval()); + attributes->Set("active_checks_enabled", checkable->GetEnableActiveChecks()); + attributes->Set("passive_checks_enabled", checkable->GetEnablePassiveChecks()); + attributes->Set("event_handler_enabled", checkable->GetEnableEventHandler()); + attributes->Set("notifications_enabled", checkable->GetEnableNotifications()); + attributes->Set("flapping_enabled", checkable->GetEnableFlapping()); + attributes->Set("flapping_threshold_low", checkable->GetFlappingThresholdLow()); + attributes->Set("flapping_threshold_high", checkable->GetFlappingThresholdHigh()); + attributes->Set("perfdata_enabled", checkable->GetEnablePerfdata()); + attributes->Set("is_volatile", checkable->GetVolatile()); + attributes->Set("notes", checkable->GetNotes()); + attributes->Set("icon_image_alt", checkable->GetIconImageAlt()); + + attributes->Set("checkcommand_id", GetObjectIdentifier(checkable->GetCheckCommand())); + + Endpoint::Ptr commandEndpoint = checkable->GetCommandEndpoint(); + if (commandEndpoint) { + attributes->Set("command_endpoint_id", GetObjectIdentifier(commandEndpoint)); + attributes->Set("command_endpoint_name", commandEndpoint->GetName()); + } + + TimePeriod::Ptr timePeriod = checkable->GetCheckPeriod(); + if (timePeriod) { + attributes->Set("check_timeperiod_id", GetObjectIdentifier(timePeriod)); + attributes->Set("check_timeperiod_name", timePeriod->GetName()); + } + + EventCommand::Ptr eventCommand = checkable->GetEventCommand(); + if (eventCommand) { + attributes->Set("eventcommand_id", GetObjectIdentifier(eventCommand)); + attributes->Set("eventcommand_name", eventCommand->GetName()); + } + + String actionUrl = checkable->GetActionUrl(); + String notesUrl = checkable->GetNotesUrl(); + String iconImage = checkable->GetIconImage(); + if (!actionUrl.IsEmpty()) + attributes->Set("action_url_id", HashValue(new Array({m_EnvironmentId, actionUrl}))); + if (!notesUrl.IsEmpty()) + attributes->Set("notes_url_id", HashValue(new Array({m_EnvironmentId, notesUrl}))); + if (!iconImage.IsEmpty()) + attributes->Set("icon_image_id", HashValue(new Array({m_EnvironmentId, iconImage}))); + + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + if (service) { + attributes->Set("host_id", GetObjectIdentifier(service->GetHost())); + attributes->Set("display_name", service->GetDisplayName()); + + // Overwrite name here, `object->name` is 'HostName!ServiceName' but we only want the name of the Service + attributes->Set("name", service->GetShortName()); + } else { + attributes->Set("display_name", host->GetDisplayName()); + attributes->Set("address", host->GetAddress()); + attributes->Set("address6", host->GetAddress6()); + } + + return true; + } + + if (type == User::TypeInstance) { + User::Ptr user = static_pointer_cast<User>(object); + + attributes->Set("display_name", user->GetDisplayName()); + attributes->Set("email", user->GetEmail()); + attributes->Set("pager", user->GetPager()); + attributes->Set("notifications_enabled", user->GetEnableNotifications()); + attributes->Set("states", user->GetStates()); + attributes->Set("types", user->GetTypes()); + + if (user->GetPeriod()) + attributes->Set("timeperiod_id", GetObjectIdentifier(user->GetPeriod())); + + return true; + } + + if (type == TimePeriod::TypeInstance) { + TimePeriod::Ptr timeperiod = static_pointer_cast<TimePeriod>(object); + + attributes->Set("display_name", timeperiod->GetDisplayName()); + attributes->Set("prefer_includes", timeperiod->GetPreferIncludes()); + return true; + } + + if (type == Notification::TypeInstance) { + Notification::Ptr notification = static_pointer_cast<Notification>(object); + + Host::Ptr host; + Service::Ptr service; + + tie(host, service) = GetHostService(notification->GetCheckable()); + + attributes->Set("notificationcommand_id", GetObjectIdentifier(notification->GetCommand())); + + attributes->Set("host_id", GetObjectIdentifier(host)); + if (service) + attributes->Set("service_id", GetObjectIdentifier(service)); + + TimePeriod::Ptr timeperiod = notification->GetPeriod(); + if (timeperiod) + attributes->Set("timeperiod_id", GetObjectIdentifier(timeperiod)); + + if (notification->GetTimes()) { + auto begin (notification->GetTimes()->Get("begin")); + auto end (notification->GetTimes()->Get("end")); + + if (begin != Empty && (double)begin >= 0) { + attributes->Set("times_begin", std::round((double)begin)); + } + + if (end != Empty && (double)end >= 0) { + attributes->Set("times_end", std::round((double)end)); + } + } + + attributes->Set("notification_interval", std::max(0.0, std::round(notification->GetInterval()))); + attributes->Set("states", notification->GetStates()); + attributes->Set("types", notification->GetTypes()); + + return true; + } + + if (type == Comment::TypeInstance) { + Comment::Ptr comment = static_pointer_cast<Comment>(object); + + attributes->Set("author", comment->GetAuthor()); + attributes->Set("text", comment->GetText()); + attributes->Set("entry_type", comment->GetEntryType()); + attributes->Set("entry_time", TimestampToMilliseconds(comment->GetEntryTime())); + attributes->Set("is_persistent", comment->GetPersistent()); + attributes->Set("is_sticky", comment->GetSticky()); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(comment->GetCheckable()); + + attributes->Set("host_id", GetObjectIdentifier(host)); + if (service) { + attributes->Set("object_type", "service"); + attributes->Set("service_id", GetObjectIdentifier(service)); + } else + attributes->Set("object_type", "host"); + + auto expireTime (comment->GetExpireTime()); + + if (expireTime > 0) { + attributes->Set("expire_time", TimestampToMilliseconds(expireTime)); + } + + return true; + } + + if (type == Downtime::TypeInstance) { + Downtime::Ptr downtime = static_pointer_cast<Downtime>(object); + + attributes->Set("author", downtime->GetAuthor()); + attributes->Set("comment", downtime->GetComment()); + attributes->Set("entry_time", TimestampToMilliseconds(downtime->GetEntryTime())); + attributes->Set("scheduled_start_time", TimestampToMilliseconds(downtime->GetStartTime())); + attributes->Set("scheduled_end_time", TimestampToMilliseconds(downtime->GetEndTime())); + attributes->Set("scheduled_duration", TimestampToMilliseconds(std::max(0.0, downtime->GetEndTime() - downtime->GetStartTime()))); + attributes->Set("flexible_duration", TimestampToMilliseconds(std::max(0.0, downtime->GetDuration()))); + attributes->Set("is_flexible", !downtime->GetFixed()); + attributes->Set("is_in_effect", downtime->IsInEffect()); + if (downtime->IsInEffect()) { + attributes->Set("start_time", TimestampToMilliseconds(downtime->GetTriggerTime())); + + attributes->Set("end_time", TimestampToMilliseconds( + downtime->GetFixed() ? downtime->GetEndTime() : (downtime->GetTriggerTime() + std::max(0.0, downtime->GetDuration())) + )); + } + + auto duration = downtime->GetDuration(); + if (downtime->GetFixed()) { + duration = downtime->GetEndTime() - downtime->GetStartTime(); + } + attributes->Set("duration", TimestampToMilliseconds(std::max(0.0, duration))); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(downtime->GetCheckable()); + + attributes->Set("host_id", GetObjectIdentifier(host)); + if (service) { + attributes->Set("object_type", "service"); + attributes->Set("service_id", GetObjectIdentifier(service)); + } else + attributes->Set("object_type", "host"); + + auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy())); + if (triggeredBy) { + attributes->Set("triggered_by_id", GetObjectIdentifier(triggeredBy)); + } + + auto scheduledBy (downtime->GetScheduledBy()); + if (!scheduledBy.IsEmpty()) { + attributes->Set("scheduled_by", scheduledBy); + } + + auto parent (Downtime::GetByName(downtime->GetParent())); + if (parent) { + attributes->Set("parent_id", GetObjectIdentifier(parent)); + } + + return true; + } + + if (type == UserGroup::TypeInstance) { + UserGroup::Ptr userGroup = static_pointer_cast<UserGroup>(object); + + attributes->Set("display_name", userGroup->GetDisplayName()); + + return true; + } + + if (type == HostGroup::TypeInstance) { + HostGroup::Ptr hostGroup = static_pointer_cast<HostGroup>(object); + + attributes->Set("display_name", hostGroup->GetDisplayName()); + + return true; + } + + if (type == ServiceGroup::TypeInstance) { + ServiceGroup::Ptr serviceGroup = static_pointer_cast<ServiceGroup>(object); + + attributes->Set("display_name", serviceGroup->GetDisplayName()); + + return true; + } + + if (type == CheckCommand::TypeInstance || type == NotificationCommand::TypeInstance || type == EventCommand::TypeInstance) { + Command::Ptr command = static_pointer_cast<Command>(object); + + attributes->Set("command", JsonEncode(command->GetCommandLine())); + attributes->Set("timeout", std::max(0, command->GetTimeout())); + + return true; + } + + return false; +} + +/* Creates a config update with computed checksums etc. + * Writes attributes, customVars and checksums into the respective supplied vectors. Adds two values to each vector + * (if applicable), first the key then the value. To use in a Redis command the command (e.g. HSET) and the key (e.g. + * icinga:config:object:downtime) need to be prepended. There is nothing to indicate success or failure. + */ +void +IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets, + std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate) +{ + /* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup. + if (!runtimeUpdate && m_ConfigDumpInProgress) + return; + */ + + if (m_Rcon == nullptr) + return; + + Dictionary::Ptr attr = new Dictionary; + Dictionary::Ptr chksm = new Dictionary; + + if (!PrepareObject(object, attr, chksm)) + return; + + InsertObjectDependencies(object, typeName, hMSets, runtimeUpdates, runtimeUpdate); + + String objectKey = GetObjectIdentifier(object); + auto& attrs (hMSets[m_PrefixConfigObject + typeName]); + auto& chksms (hMSets[m_PrefixConfigCheckSum + typeName]); + + attrs.emplace_back(objectKey); + attrs.emplace_back(JsonEncode(attr)); + + String checksum = HashValue(attr); + chksms.emplace_back(objectKey); + chksms.emplace_back(JsonEncode(new Dictionary({{"checksum", checksum}}))); + + /* Send an update event to subscribers. */ + if (runtimeUpdate) { + attr->Set("checksum", checksum); + AddObjectDataToRuntimeUpdates(runtimeUpdates, objectKey, m_PrefixConfigObject + typeName, attr); + } +} + +void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + Type::Ptr type = object->GetReflectionType(); + String typeName = type->GetName().ToLower(); + String objectKey = GetObjectIdentifier(object); + + m_Rcon->FireAndForgetQueries({ + {"HDEL", m_PrefixConfigObject + typeName, objectKey}, + {"HDEL", m_PrefixConfigCheckSum + typeName, objectKey}, + { + "XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*", + "redis_key", m_PrefixConfigObject + typeName, "id", objectKey, "runtime_type", "delete" + } + }, Prio::Config); + + CustomVarObject::Ptr customVarObject = dynamic_pointer_cast<CustomVarObject>(object); + + if (customVarObject) { + Dictionary::Ptr vars = customVarObject->GetVars(); + SendCustomVarsChanged(object, vars, nullptr); + } + + if (type == Host::TypeInstance || type == Service::TypeInstance) { + Checkable::Ptr checkable = static_pointer_cast<Checkable>(object); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + m_Rcon->FireAndForgetQuery({ + "ZREM", + service ? "icinga:nextupdate:service" : "icinga:nextupdate:host", + GetObjectIdentifier(checkable) + }, Prio::CheckResult); + + m_Rcon->FireAndForgetQueries({ + {"HDEL", m_PrefixConfigObject + typeName + ":state", objectKey}, + {"HDEL", m_PrefixConfigCheckSum + typeName + ":state", objectKey} + }, Prio::RuntimeStateSync); + + if (service) { + SendGroupsChanged<ServiceGroup>(checkable, service->GetGroups(), nullptr); + } else { + SendGroupsChanged<HostGroup>(checkable, host->GetGroups(), nullptr); + } + + return; + } + + if (type == TimePeriod::TypeInstance) { + TimePeriod::Ptr timeperiod = static_pointer_cast<TimePeriod>(object); + SendTimePeriodRangesChanged(timeperiod, timeperiod->GetRanges(), nullptr); + SendTimePeriodIncludesChanged(timeperiod, timeperiod->GetIncludes(), nullptr); + SendTimePeriodExcludesChanged(timeperiod, timeperiod->GetExcludes(), nullptr); + return; + } + + if (type == User::TypeInstance) { + User::Ptr user = static_pointer_cast<User>(object); + SendGroupsChanged<UserGroup>(user, user->GetGroups(), nullptr); + return; + } + + if (type == Notification::TypeInstance) { + Notification::Ptr notification = static_pointer_cast<Notification>(object); + SendNotificationUsersChanged(notification, notification->GetUsersRaw(), nullptr); + SendNotificationUserGroupsChanged(notification, notification->GetUserGroupsRaw(), nullptr); + return; + } + + if (type == CheckCommand::TypeInstance || type == NotificationCommand::TypeInstance || type == EventCommand::TypeInstance) { + Command::Ptr command = static_pointer_cast<Command>(object); + SendCommandArgumentsChanged(command, command->GetArguments(), nullptr); + SendCommandEnvChanged(command, command->GetEnv(), nullptr); + return; + } +} + +static inline +unsigned short GetPreviousState(const Checkable::Ptr& checkable, const Service::Ptr& service, StateType type) +{ + auto phs ((type == StateTypeHard ? checkable->GetLastHardStatesRaw() : checkable->GetLastSoftStatesRaw()) % 100u); + + if (service) { + return phs; + } else { + return phs == 99 ? phs : Host::CalculateState(ServiceState(phs)); + } +} + +void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type) +{ + if (!GetActive()) { + return; + } + + Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object); + if (!checkable) + return; + + if (!cr) + return; + + Host::Ptr host; + Service::Ptr service; + + tie(host, service) = GetHostService(checkable); + + UpdateState(checkable, StateUpdate::RuntimeOnly); + + int hard_state; + if (!cr) { + hard_state = 99; + } else { + hard_state = service ? Convert::ToLong(service->GetLastHardState()) : Convert::ToLong(host->GetLastHardState()); + } + + auto eventTime (cr->GetExecutionEnd()); + auto eventTs (TimestampToMilliseconds(eventTime)); + + Array::Ptr rawId = new Array({m_EnvironmentId, object->GetName()}); + rawId->Add(eventTs); + + std::vector<String> xAdd ({ + "XADD", "icinga:history:stream:state", "*", + "id", HashValue(rawId), + "environment_id", m_EnvironmentId, + "host_id", GetObjectIdentifier(host), + "state_type", Convert::ToString(type), + "soft_state", Convert::ToString(cr ? service ? Convert::ToLong(cr->GetState()) : Convert::ToLong(Host::CalculateState(cr->GetState())) : 99), + "hard_state", Convert::ToString(hard_state), + "check_attempt", Convert::ToString(checkable->GetCheckAttempt()), + "previous_soft_state", Convert::ToString(GetPreviousState(checkable, service, StateTypeSoft)), + "previous_hard_state", Convert::ToString(GetPreviousState(checkable, service, StateTypeHard)), + "max_check_attempts", Convert::ToString(checkable->GetMaxCheckAttempts()), + "event_time", Convert::ToString(eventTs), + "event_id", CalcEventID("state_change", object, eventTime), + "event_type", "state_change" + }); + + if (cr) { + auto output (cr->GetOutput()); + auto pos (output.Find("\n")); + + if (pos != String::NPos) { + auto longOutput (output.SubStr(pos + 1u)); + output.erase(output.Begin() + pos, output.End()); + + xAdd.emplace_back("long_output"); + xAdd.emplace_back(Utility::ValidateUTF8(std::move(longOutput))); + } + + xAdd.emplace_back("output"); + xAdd.emplace_back(Utility::ValidateUTF8(std::move(output))); + xAdd.emplace_back("check_source"); + xAdd.emplace_back(cr->GetCheckSource()); + xAdd.emplace_back("scheduling_source"); + xAdd.emplace_back(cr->GetSchedulingSource()); + } + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + m_HistoryBulker.ProduceOne(std::move(xAdd)); +} + +void IcingaDB::SendSentNotification( + const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, + NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text, double sendTime +) +{ + if (!GetActive()) { + return; + } + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + auto finalText = text; + if (finalText == "" && cr) { + finalText = cr->GetOutput(); + } + + auto usersAmount (users.size()); + auto sendTs (TimestampToMilliseconds(sendTime)); + + Array::Ptr rawId = new Array({m_EnvironmentId, notification->GetName()}); + rawId->Add(GetNotificationTypeByEnum(type)); + rawId->Add(sendTs); + + auto notificationHistoryId (HashValue(rawId)); + + std::vector<String> xAdd ({ + "XADD", "icinga:history:stream:notification", "*", + "id", notificationHistoryId, + "environment_id", m_EnvironmentId, + "notification_id", GetObjectIdentifier(notification), + "host_id", GetObjectIdentifier(host), + "type", Convert::ToString(type), + "state", Convert::ToString(cr ? service ? Convert::ToLong(cr->GetState()) : Convert::ToLong(Host::CalculateState(cr->GetState())) : 99), + "previous_hard_state", Convert::ToString(cr ? Convert::ToLong(service ? cr->GetPreviousHardState() : Host::CalculateState(cr->GetPreviousHardState())) : 99), + "author", Utility::ValidateUTF8(author), + "text", Utility::ValidateUTF8(finalText), + "users_notified", Convert::ToString(usersAmount), + "send_time", Convert::ToString(sendTs), + "event_id", CalcEventID("notification", notification, sendTime, type), + "event_type", "notification" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + if (!users.empty()) { + Array::Ptr users_notified = new Array(); + for (const User::Ptr& user : users) { + users_notified->Add(GetObjectIdentifier(user)); + } + xAdd.emplace_back("users_notified_ids"); + xAdd.emplace_back(JsonEncode(users_notified)); + } + + m_HistoryBulker.ProduceOne(std::move(xAdd)); +} + +void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime) +{ + if (!GetActive()) { + return; + } + + SendConfigUpdate(downtime, true); + + auto checkable (downtime->GetCheckable()); + auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy())); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + /* Update checkable state as in_downtime may have changed. */ + UpdateState(checkable, StateUpdate::Full); + + std::vector<String> xAdd ({ + "XADD", "icinga:history:stream:downtime", "*", + "downtime_id", GetObjectIdentifier(downtime), + "environment_id", m_EnvironmentId, + "host_id", GetObjectIdentifier(host), + "entry_time", Convert::ToString(TimestampToMilliseconds(downtime->GetEntryTime())), + "author", Utility::ValidateUTF8(downtime->GetAuthor()), + "comment", Utility::ValidateUTF8(downtime->GetComment()), + "is_flexible", Convert::ToString((unsigned short)!downtime->GetFixed()), + "flexible_duration", Convert::ToString(TimestampToMilliseconds(std::max(0.0, downtime->GetDuration()))), + "scheduled_start_time", Convert::ToString(TimestampToMilliseconds(downtime->GetStartTime())), + "scheduled_end_time", Convert::ToString(TimestampToMilliseconds(downtime->GetEndTime())), + "has_been_cancelled", Convert::ToString((unsigned short)downtime->GetWasCancelled()), + "trigger_time", Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime())), + "cancel_time", Convert::ToString(TimestampToMilliseconds(downtime->GetRemoveTime())), + "event_id", CalcEventID("downtime_start", downtime), + "event_type", "downtime_start" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + } + + if (triggeredBy) { + xAdd.emplace_back("triggered_by_id"); + xAdd.emplace_back(GetObjectIdentifier(triggeredBy)); + } + + if (downtime->GetFixed()) { + xAdd.emplace_back("start_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetStartTime()))); + xAdd.emplace_back("end_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetEndTime()))); + } else { + xAdd.emplace_back("start_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime()))); + xAdd.emplace_back("end_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime() + std::max(0.0, downtime->GetDuration())))); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + auto parent (Downtime::GetByName(downtime->GetParent())); + + if (parent) { + xAdd.emplace_back("parent_id"); + xAdd.emplace_back(GetObjectIdentifier(parent)); + } + + auto scheduledBy (downtime->GetScheduledBy()); + + if (!scheduledBy.IsEmpty()) { + xAdd.emplace_back("scheduled_by"); + xAdd.emplace_back(scheduledBy); + } + + m_HistoryBulker.ProduceOne(std::move(xAdd)); +} + +void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime) +{ + if (!GetActive()) { + return; + } + + auto checkable (downtime->GetCheckable()); + auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy())); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + // Downtime never got triggered (didn't send "downtime_start") so we don't want to send "downtime_end" + if (downtime->GetTriggerTime() == 0) + return; + + /* Update checkable state as in_downtime may have changed. */ + UpdateState(checkable, StateUpdate::Full); + + std::vector<String> xAdd ({ + "XADD", "icinga:history:stream:downtime", "*", + "downtime_id", GetObjectIdentifier(downtime), + "environment_id", m_EnvironmentId, + "host_id", GetObjectIdentifier(host), + "entry_time", Convert::ToString(TimestampToMilliseconds(downtime->GetEntryTime())), + "author", Utility::ValidateUTF8(downtime->GetAuthor()), + "cancelled_by", Utility::ValidateUTF8(downtime->GetRemovedBy()), + "comment", Utility::ValidateUTF8(downtime->GetComment()), + "is_flexible", Convert::ToString((unsigned short)!downtime->GetFixed()), + "flexible_duration", Convert::ToString(TimestampToMilliseconds(std::max(0.0, downtime->GetDuration()))), + "scheduled_start_time", Convert::ToString(TimestampToMilliseconds(downtime->GetStartTime())), + "scheduled_end_time", Convert::ToString(TimestampToMilliseconds(downtime->GetEndTime())), + "has_been_cancelled", Convert::ToString((unsigned short)downtime->GetWasCancelled()), + "trigger_time", Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime())), + "cancel_time", Convert::ToString(TimestampToMilliseconds(downtime->GetRemoveTime())), + "event_id", CalcEventID("downtime_end", downtime), + "event_type", "downtime_end" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + } + + if (triggeredBy) { + xAdd.emplace_back("triggered_by_id"); + xAdd.emplace_back(GetObjectIdentifier(triggeredBy)); + } + + if (downtime->GetFixed()) { + xAdd.emplace_back("start_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetStartTime()))); + xAdd.emplace_back("end_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetEndTime()))); + } else { + xAdd.emplace_back("start_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime()))); + xAdd.emplace_back("end_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime() + std::max(0.0, downtime->GetDuration())))); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + auto parent (Downtime::GetByName(downtime->GetParent())); + + if (parent) { + xAdd.emplace_back("parent_id"); + xAdd.emplace_back(GetObjectIdentifier(parent)); + } + + auto scheduledBy (downtime->GetScheduledBy()); + + if (!scheduledBy.IsEmpty()) { + xAdd.emplace_back("scheduled_by"); + xAdd.emplace_back(scheduledBy); + } + + m_HistoryBulker.ProduceOne(std::move(xAdd)); +} + +void IcingaDB::SendAddedComment(const Comment::Ptr& comment) +{ + if (comment->GetEntryType() != CommentUser || !GetActive()) + return; + + auto checkable (comment->GetCheckable()); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + std::vector<String> xAdd ({ + "XADD", "icinga:history:stream:comment", "*", + "comment_id", GetObjectIdentifier(comment), + "environment_id", m_EnvironmentId, + "host_id", GetObjectIdentifier(host), + "entry_time", Convert::ToString(TimestampToMilliseconds(comment->GetEntryTime())), + "author", Utility::ValidateUTF8(comment->GetAuthor()), + "comment", Utility::ValidateUTF8(comment->GetText()), + "entry_type", Convert::ToString(comment->GetEntryType()), + "is_persistent", Convert::ToString((unsigned short)comment->GetPersistent()), + "is_sticky", Convert::ToString((unsigned short)comment->GetSticky()), + "event_id", CalcEventID("comment_add", comment), + "event_type", "comment_add" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + { + auto expireTime (comment->GetExpireTime()); + + if (expireTime > 0) { + xAdd.emplace_back("expire_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(expireTime))); + } + } + + m_HistoryBulker.ProduceOne(std::move(xAdd)); + UpdateState(checkable, StateUpdate::Full); +} + +void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) +{ + if (comment->GetEntryType() != CommentUser || !GetActive()) { + return; + } + + double removeTime = comment->GetRemoveTime(); + bool wasRemoved = removeTime > 0; + + double expireTime = comment->GetExpireTime(); + bool hasExpireTime = expireTime > 0; + bool isExpired = hasExpireTime && expireTime <= Utility::GetTime(); + + if (!wasRemoved && !isExpired) { + /* The comment object disappeared for no apparent reason, most likely because it simply was deleted instead + * of using the proper remove-comment API action. In this case, information that should normally be set is + * missing and a proper history event cannot be generated. + */ + return; + } + + auto checkable (comment->GetCheckable()); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + std::vector<String> xAdd ({ + "XADD", "icinga:history:stream:comment", "*", + "comment_id", GetObjectIdentifier(comment), + "environment_id", m_EnvironmentId, + "host_id", GetObjectIdentifier(host), + "entry_time", Convert::ToString(TimestampToMilliseconds(comment->GetEntryTime())), + "author", Utility::ValidateUTF8(comment->GetAuthor()), + "comment", Utility::ValidateUTF8(comment->GetText()), + "entry_type", Convert::ToString(comment->GetEntryType()), + "is_persistent", Convert::ToString((unsigned short)comment->GetPersistent()), + "is_sticky", Convert::ToString((unsigned short)comment->GetSticky()), + "event_id", CalcEventID("comment_remove", comment), + "event_type", "comment_remove" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + if (wasRemoved) { + xAdd.emplace_back("remove_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(removeTime))); + xAdd.emplace_back("has_been_removed"); + xAdd.emplace_back("1"); + xAdd.emplace_back("removed_by"); + xAdd.emplace_back(Utility::ValidateUTF8(comment->GetRemovedBy())); + } else { + xAdd.emplace_back("has_been_removed"); + xAdd.emplace_back("0"); + } + + if (hasExpireTime) { + xAdd.emplace_back("expire_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(expireTime))); + } + + m_HistoryBulker.ProduceOne(std::move(xAdd)); + UpdateState(checkable, StateUpdate::Full); +} + +void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double changeTime, double flappingLastChange) +{ + if (!GetActive()) { + return; + } + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + std::vector<String> xAdd ({ + "XADD", "icinga:history:stream:flapping", "*", + "environment_id", m_EnvironmentId, + "host_id", GetObjectIdentifier(host), + "flapping_threshold_low", Convert::ToString(checkable->GetFlappingThresholdLow()), + "flapping_threshold_high", Convert::ToString(checkable->GetFlappingThresholdHigh()) + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + long long startTime; + + if (checkable->IsFlapping()) { + startTime = TimestampToMilliseconds(changeTime); + + xAdd.emplace_back("event_type"); + xAdd.emplace_back("flapping_start"); + xAdd.emplace_back("percent_state_change_start"); + xAdd.emplace_back(Convert::ToString(checkable->GetFlappingCurrent())); + } else { + startTime = TimestampToMilliseconds(flappingLastChange); + + xAdd.emplace_back("event_type"); + xAdd.emplace_back("flapping_end"); + xAdd.emplace_back("end_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(changeTime))); + xAdd.emplace_back("percent_state_change_end"); + xAdd.emplace_back(Convert::ToString(checkable->GetFlappingCurrent())); + } + + xAdd.emplace_back("start_time"); + xAdd.emplace_back(Convert::ToString(startTime)); + xAdd.emplace_back("event_id"); + xAdd.emplace_back(CalcEventID(checkable->IsFlapping() ? "flapping_start" : "flapping_end", checkable, startTime)); + xAdd.emplace_back("id"); + xAdd.emplace_back(HashValue(new Array({m_EnvironmentId, checkable->GetName(), startTime}))); + + m_HistoryBulker.ProduceOne(std::move(xAdd)); +} + +void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + if (checkable->GetEnableActiveChecks()) { + m_Rcon->FireAndForgetQuery( + { + "ZADD", + dynamic_pointer_cast<Service>(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host", + Convert::ToString(checkable->GetNextUpdate()), + GetObjectIdentifier(checkable) + }, + Prio::CheckResult + ); + } else { + m_Rcon->FireAndForgetQuery( + { + "ZREM", + dynamic_pointer_cast<Service>(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host", + GetObjectIdentifier(checkable) + }, + Prio::CheckResult + ); + } +} + +void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool persistent, double changeTime, double expiry) +{ + if (!GetActive()) { + return; + } + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + /* Update checkable state as is_acknowledged may have changed. */ + UpdateState(checkable, StateUpdate::Full); + + std::vector<String> xAdd ({ + "XADD", "icinga:history:stream:acknowledgement", "*", + "environment_id", m_EnvironmentId, + "host_id", GetObjectIdentifier(host), + "event_type", "ack_set", + "author", author, + "comment", comment, + "is_sticky", Convert::ToString((unsigned short)(type == AcknowledgementSticky)), + "is_persistent", Convert::ToString((unsigned short)persistent) + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + if (expiry > 0) { + xAdd.emplace_back("expire_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(expiry))); + } + + long long setTime = TimestampToMilliseconds(changeTime); + + xAdd.emplace_back("set_time"); + xAdd.emplace_back(Convert::ToString(setTime)); + xAdd.emplace_back("event_id"); + xAdd.emplace_back(CalcEventID("ack_set", checkable, setTime)); + xAdd.emplace_back("id"); + xAdd.emplace_back(HashValue(new Array({m_EnvironmentId, checkable->GetName(), setTime}))); + + m_HistoryBulker.ProduceOne(std::move(xAdd)); +} + +void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const String& removedBy, double changeTime, double ackLastChange) +{ + if (!GetActive()) { + return; + } + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + /* Update checkable state as is_acknowledged may have changed. */ + UpdateState(checkable, StateUpdate::Full); + + std::vector<String> xAdd ({ + "XADD", "icinga:history:stream:acknowledgement", "*", + "environment_id", m_EnvironmentId, + "host_id", GetObjectIdentifier(host), + "clear_time", Convert::ToString(TimestampToMilliseconds(changeTime)), + "event_type", "ack_clear" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + long long setTime = TimestampToMilliseconds(ackLastChange); + + xAdd.emplace_back("set_time"); + xAdd.emplace_back(Convert::ToString(setTime)); + xAdd.emplace_back("event_id"); + xAdd.emplace_back(CalcEventID("ack_clear", checkable, setTime)); + xAdd.emplace_back("id"); + xAdd.emplace_back(HashValue(new Array({m_EnvironmentId, checkable->GetName(), setTime}))); + + if (!removedBy.IsEmpty()) { + xAdd.emplace_back("cleared_by"); + xAdd.emplace_back(removedBy); + } + + m_HistoryBulker.ProduceOne(std::move(xAdd)); +} + +void IcingaDB::ForwardHistoryEntries() +{ + using clock = std::chrono::steady_clock; + + const std::chrono::seconds logInterval (10); + auto nextLog (clock::now() + logInterval); + + auto logPeriodically ([this, logInterval, &nextLog]() { + if (clock::now() > nextLog) { + nextLog += logInterval; + + auto size (m_HistoryBulker.Size()); + + Log(size > m_HistoryBulker.GetBulkSize() ? LogInformation : LogNotice, "IcingaDB") + << "Pending history queries: " << size; + } + }); + + for (;;) { + logPeriodically(); + + auto haystack (m_HistoryBulker.ConsumeMany()); + + if (haystack.empty()) { + if (!GetActive()) { + break; + } + + continue; + } + + uintmax_t attempts = 0; + + auto logFailure ([&haystack, &attempts](const char* err = nullptr) { + Log msg (LogNotice, "IcingaDB"); + + msg << "history: " << haystack.size() << " queries failed temporarily (attempt #" << ++attempts << ")"; + + if (err) { + msg << ": " << err; + } + }); + + for (;;) { + logPeriodically(); + + if (m_Rcon && m_Rcon->IsConnected()) { + try { + m_Rcon->GetResultsOfQueries(haystack, Prio::History, {0, 0, haystack.size()}); + break; + } catch (const std::exception& ex) { + logFailure(ex.what()); + } catch (...) { + logFailure(); + } + } else { + logFailure("not connected to Redis"); + } + + if (!GetActive()) { + Log(LogCritical, "IcingaDB") << "history: " << haystack.size() << " queries failed (attempt #" << attempts + << ") while we're about to shut down. Giving up and discarding additional " + << m_HistoryBulker.Size() << " queued history queries."; + + return; + } + + Utility::Sleep(2); + } + } +} + +void IcingaDB::SendNotificationUsersChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + return; + } + + std::vector<Value> deletedUsers = GetArrayDeletedValues(oldValues, newValues); + + for (const auto& userName : deletedUsers) { + String id = HashValue(new Array({m_EnvironmentId, "user", userName, notification->GetName()})); + DeleteRelationship(id, "notification:user"); + DeleteRelationship(id, "notification:recipient"); + } +} + +void IcingaDB::SendNotificationUserGroupsChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + return; + } + + std::vector<Value> deletedUserGroups = GetArrayDeletedValues(oldValues, newValues); + + for (const auto& userGroupName : deletedUserGroups) { + UserGroup::Ptr userGroup = UserGroup::GetByName(userGroupName); + String id = HashValue(new Array({m_EnvironmentId, "usergroup", userGroupName, notification->GetName()})); + DeleteRelationship(id, "notification:usergroup"); + DeleteRelationship(id, "notification:recipient"); + + for (const User::Ptr& user : userGroup->GetMembers()) { + String userId = HashValue(new Array({m_EnvironmentId, "usergroupuser", user->GetName(), userGroupName, notification->GetName()})); + DeleteRelationship(userId, "notification:recipient"); + } + } +} + +void IcingaDB::SendTimePeriodRangesChanged(const TimePeriod::Ptr& timeperiod, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { + if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + return; + } + + std::vector<String> deletedKeys = GetDictionaryDeletedKeys(oldValues, newValues); + String typeName = GetLowerCaseTypeNameDB(timeperiod); + + for (const auto& rangeKey : deletedKeys) { + String id = HashValue(new Array({m_EnvironmentId, rangeKey, oldValues->Get(rangeKey), timeperiod->GetName()})); + DeleteRelationship(id, "timeperiod:range"); + } +} + +void IcingaDB::SendTimePeriodIncludesChanged(const TimePeriod::Ptr& timeperiod, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + return; + } + + std::vector<Value> deletedIncludes = GetArrayDeletedValues(oldValues, newValues); + + for (const auto& includeName : deletedIncludes) { + String id = HashValue(new Array({m_EnvironmentId, includeName, timeperiod->GetName()})); + DeleteRelationship(id, "timeperiod:override:include"); + } +} + +void IcingaDB::SendTimePeriodExcludesChanged(const TimePeriod::Ptr& timeperiod, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + return; + } + + std::vector<Value> deletedExcludes = GetArrayDeletedValues(oldValues, newValues); + + for (const auto& excludeName : deletedExcludes) { + String id = HashValue(new Array({m_EnvironmentId, excludeName, timeperiod->GetName()})); + DeleteRelationship(id, "timeperiod:override:exclude"); + } +} + +template<typename T> +void IcingaDB::SendGroupsChanged(const ConfigObject::Ptr& object, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + return; + } + + std::vector<Value> deletedGroups = GetArrayDeletedValues(oldValues, newValues); + String typeName = GetLowerCaseTypeNameDB(object); + + for (const auto& groupName : deletedGroups) { + typename T::Ptr group = ConfigObject::GetObject<T>(groupName); + String id = HashValue(new Array({m_EnvironmentId, group->GetName(), object->GetName()})); + DeleteRelationship(id, typeName + "group:member"); + + if (std::is_same<T, UserGroup>::value) { + UserGroup::Ptr userGroup = dynamic_pointer_cast<UserGroup>(group); + + for (const auto& notification : userGroup->GetNotifications()) { + String userId = HashValue(new Array({m_EnvironmentId, "usergroupuser", object->GetName(), groupName, notification->GetName()})); + DeleteRelationship(userId, "notification:recipient"); + } + } + } +} + +void IcingaDB::SendCommandEnvChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { + if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + return; + } + + std::vector<String> deletedKeys = GetDictionaryDeletedKeys(oldValues, newValues); + String typeName = GetLowerCaseTypeNameDB(command); + + for (const auto& envvarKey : deletedKeys) { + String id = HashValue(new Array({m_EnvironmentId, envvarKey, command->GetName()})); + DeleteRelationship(id, typeName + ":envvar", true); + } +} + +void IcingaDB::SendCommandArgumentsChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { + if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + return; + } + + std::vector<String> deletedKeys = GetDictionaryDeletedKeys(oldValues, newValues); + String typeName = GetLowerCaseTypeNameDB(command); + + for (const auto& argumentKey : deletedKeys) { + String id = HashValue(new Array({m_EnvironmentId, argumentKey, command->GetName()})); + DeleteRelationship(id, typeName + ":argument", true); + } +} + +void IcingaDB::SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { + if (m_IndexedTypes.find(object->GetReflectionType().get()) == m_IndexedTypes.end()) { + return; + } + + if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + return; + } + + Dictionary::Ptr oldVars = SerializeVars(oldValues); + Dictionary::Ptr newVars = SerializeVars(newValues); + + std::vector<String> deletedVars = GetDictionaryDeletedKeys(oldVars, newVars); + String typeName = GetLowerCaseTypeNameDB(object); + + for (const auto& varId : deletedVars) { + String id = HashValue(new Array({m_EnvironmentId, varId, object->GetName()})); + DeleteRelationship(id, typeName + ":customvar"); + } +} + +Dictionary::Ptr IcingaDB::SerializeState(const Checkable::Ptr& checkable) +{ + Dictionary::Ptr attrs = new Dictionary(); + + Host::Ptr host; + Service::Ptr service; + + tie(host, service) = GetHostService(checkable); + + String id = GetObjectIdentifier(checkable); + + /* + * As there is a 1:1 relationship between host and host state, the host ID ('host_id') + * is also used as the host state ID ('id'). These are duplicated to 1) avoid having + * special handling for this in Icinga DB and 2) to have both a primary key and a foreign key + * in the SQL database in the end. In the database 'host_id' ends up as foreign key 'host_state.host_id' + * referring to 'host.id' while 'id' ends up as the primary key 'host_state.id'. This also applies for service. + */ + attrs->Set("id", id); + attrs->Set("environment_id", m_EnvironmentId); + attrs->Set("state_type", checkable->HasBeenChecked() ? checkable->GetStateType() : StateTypeHard); + + // TODO: last_hard/soft_state should be "previous". + if (service) { + attrs->Set("service_id", id); + auto state = service->HasBeenChecked() ? service->GetState() : 99; + attrs->Set("soft_state", state); + attrs->Set("hard_state", service->HasBeenChecked() ? service->GetLastHardState() : 99); + attrs->Set("severity", service->GetSeverity()); + attrs->Set("host_id", GetObjectIdentifier(host)); + } else { + attrs->Set("host_id", id); + auto state = host->HasBeenChecked() ? host->GetState() : 99; + attrs->Set("soft_state", state); + attrs->Set("hard_state", host->HasBeenChecked() ? host->GetLastHardState() : 99); + attrs->Set("severity", host->GetSeverity()); + } + + attrs->Set("previous_soft_state", GetPreviousState(checkable, service, StateTypeSoft)); + attrs->Set("previous_hard_state", GetPreviousState(checkable, service, StateTypeHard)); + attrs->Set("check_attempt", checkable->GetCheckAttempt()); + + attrs->Set("is_active", checkable->IsActive()); + + CheckResult::Ptr cr = checkable->GetLastCheckResult(); + + if (cr) { + String rawOutput = cr->GetOutput(); + if (!rawOutput.IsEmpty()) { + size_t lineBreak = rawOutput.Find("\n"); + String output = rawOutput.SubStr(0, lineBreak); + if (!output.IsEmpty()) + attrs->Set("output", rawOutput.SubStr(0, lineBreak)); + + if (lineBreak > 0 && lineBreak != String::NPos) { + String longOutput = rawOutput.SubStr(lineBreak+1, rawOutput.GetLength()); + if (!longOutput.IsEmpty()) + attrs->Set("long_output", longOutput); + } + } + + String perfData = PluginUtility::FormatPerfdata(cr->GetPerformanceData()); + if (!perfData.IsEmpty()) + attrs->Set("performance_data", perfData); + + String normedPerfData = PluginUtility::FormatPerfdata(cr->GetPerformanceData(), true); + if (!normedPerfData.IsEmpty()) + attrs->Set("normalized_performance_data", normedPerfData); + + if (!cr->GetCommand().IsEmpty()) + attrs->Set("check_commandline", FormatCommandLine(cr->GetCommand())); + attrs->Set("execution_time", TimestampToMilliseconds(fmax(0.0, cr->CalculateExecutionTime()))); + attrs->Set("latency", TimestampToMilliseconds(cr->CalculateLatency())); + attrs->Set("check_source", cr->GetCheckSource()); + attrs->Set("scheduling_source", cr->GetSchedulingSource()); + } + + attrs->Set("is_problem", checkable->GetProblem()); + attrs->Set("is_handled", checkable->GetHandled()); + attrs->Set("is_reachable", checkable->IsReachable()); + attrs->Set("is_flapping", checkable->IsFlapping()); + + attrs->Set("is_acknowledged", checkable->GetAcknowledgement()); + if (checkable->IsAcknowledged()) { + Timestamp entry = 0; + Comment::Ptr AckComment; + for (const Comment::Ptr& c : checkable->GetComments()) { + if (c->GetEntryType() == CommentAcknowledgement) { + if (c->GetEntryTime() > entry) { + entry = c->GetEntryTime(); + AckComment = c; + } + } + } + if (AckComment != nullptr) { + attrs->Set("acknowledgement_comment_id", GetObjectIdentifier(AckComment)); + } + } + + { + auto lastComment (checkable->GetLastComment()); + + if (lastComment) { + attrs->Set("last_comment_id", GetObjectIdentifier(lastComment)); + } + } + + attrs->Set("in_downtime", checkable->IsInDowntime()); + + if (checkable->GetCheckTimeout().IsEmpty()) + attrs->Set("check_timeout", TimestampToMilliseconds(checkable->GetCheckCommand()->GetTimeout())); + else + attrs->Set("check_timeout", TimestampToMilliseconds(checkable->GetCheckTimeout())); + + long long lastCheck = TimestampToMilliseconds(checkable->GetLastCheck()); + if (lastCheck > 0) + attrs->Set("last_update", lastCheck); + + attrs->Set("last_state_change", TimestampToMilliseconds(checkable->GetLastStateChange())); + attrs->Set("next_check", TimestampToMilliseconds(checkable->GetNextCheck())); + attrs->Set("next_update", TimestampToMilliseconds(checkable->GetNextUpdate())); + + return attrs; +} + +std::vector<String> +IcingaDB::UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, + const String& typeNameOverride) +{ + Type::Ptr type = object->GetReflectionType(); + Dictionary::Ptr attrs(new Dictionary); + + for (int fid = 0; fid < type->GetFieldCount(); fid++) { + Field field = type->GetFieldInfo(fid); + + if ((field.Attributes & fieldType) == 0) + continue; + + Value val = object->GetField(fid); + + /* hide attributes which shouldn't be user-visible */ + if (field.Attributes & FANoUserView) + continue; + + /* hide internal navigation fields */ + if (field.Attributes & FANavigation && !(field.Attributes & (FAConfig | FAState))) + continue; + + attrs->Set(field.Name, Serialize(val)); + } + + /* Downtimes require in_effect, which is not an attribute */ + Downtime::Ptr downtime = dynamic_pointer_cast<Downtime>(object); + if (downtime) { + attrs->Set("in_effect", Serialize(downtime->IsInEffect())); + attrs->Set("trigger_time", Serialize(TimestampToMilliseconds(downtime->GetTriggerTime()))); + } + + + /* Use the name checksum as unique key. */ + String typeName = type->GetName().ToLower(); + if (!typeNameOverride.IsEmpty()) + typeName = typeNameOverride.ToLower(); + + return {GetObjectIdentifier(object), JsonEncode(attrs)}; + //m_Rcon->FireAndForgetQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)}); +} + +void IcingaDB::StateChangeHandler(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type) +{ + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendStateChange(object, cr, type); + } +} + +void IcingaDB::ReachabilityChangeHandler(const std::set<Checkable::Ptr>& children) +{ + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + for (auto& checkable : children) { + rw->UpdateState(checkable, StateUpdate::Full); + } + } +} + +void IcingaDB::VersionChangedHandler(const ConfigObject::Ptr& object) +{ + Type::Ptr type = object->GetReflectionType(); + + if (m_IndexedTypes.find(type.get()) == m_IndexedTypes.end()) { + return; + } + + if (object->IsActive()) { + // Create or update the object config + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + if (rw) + rw->SendConfigUpdate(object, true); + } + } else if (!object->IsActive() && + object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp + // Delete object config + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + if (rw) + rw->SendConfigDelete(object); + } + } +} + +void IcingaDB::DowntimeStartedHandler(const Downtime::Ptr& downtime) +{ + for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendStartedDowntime(downtime); + } +} + +void IcingaDB::DowntimeRemovedHandler(const Downtime::Ptr& downtime) +{ + for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendRemovedDowntime(downtime); + } +} + +void IcingaDB::NotificationSentToAllUsersHandler( + const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, + NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text +) +{ + auto rws (ConfigType::GetObjectsByType<IcingaDB>()); + auto sendTime (notification->GetLastNotification()); + + if (!rws.empty()) { + for (auto& rw : rws) { + rw->SendSentNotification(notification, checkable, users, type, cr, author, text, sendTime); + } + } +} + +void IcingaDB::CommentAddedHandler(const Comment::Ptr& comment) +{ + for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendAddedComment(comment); + } +} + +void IcingaDB::CommentRemovedHandler(const Comment::Ptr& comment) +{ + for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendRemovedComment(comment); + } +} + +void IcingaDB::FlappingChangeHandler(const Checkable::Ptr& checkable, double changeTime) +{ + auto flappingLastChange (checkable->GetFlappingLastChange()); + + for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendFlappingChange(checkable, changeTime, flappingLastChange); + } +} + +void IcingaDB::NewCheckResultHandler(const Checkable::Ptr& checkable) +{ + for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->UpdateState(checkable, StateUpdate::Volatile); + rw->SendNextUpdate(checkable); + } +} + +void IcingaDB::NextCheckUpdatedHandler(const Checkable::Ptr& checkable) +{ + for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->UpdateState(checkable, StateUpdate::Volatile); + rw->SendNextUpdate(checkable); + } +} + +void IcingaDB::HostProblemChangedHandler(const Service::Ptr& service) { + for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + /* Host state changes affect is_handled and severity of services. */ + rw->UpdateState(service, StateUpdate::Full); + } +} + +void IcingaDB::AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool persistent, double changeTime, double expiry) +{ + auto rws (ConfigType::GetObjectsByType<IcingaDB>()); + + if (!rws.empty()) { + for (auto& rw : rws) { + rw->SendAcknowledgementSet(checkable, author, comment, type, persistent, changeTime, expiry); + } + } +} + +void IcingaDB::AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const String& removedBy, double changeTime) +{ + auto rws (ConfigType::GetObjectsByType<IcingaDB>()); + + if (!rws.empty()) { + auto rb (Shared<String>::Make(removedBy)); + auto ackLastChange (checkable->GetAcknowledgementLastChange()); + + for (auto& rw : rws) { + rw->SendAcknowledgementCleared(checkable, *rb, changeTime, ackLastChange); + } + } +} + +void IcingaDB::NotificationUsersChangedHandler(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendNotificationUsersChanged(notification, oldValues, newValues); + } +} + +void IcingaDB::NotificationUserGroupsChangedHandler(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendNotificationUserGroupsChanged(notification, oldValues, newValues); + } +} + +void IcingaDB::TimePeriodRangesChangedHandler(const TimePeriod::Ptr& timeperiod, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendTimePeriodRangesChanged(timeperiod, oldValues, newValues); + } +} + +void IcingaDB::TimePeriodIncludesChangedHandler(const TimePeriod::Ptr& timeperiod, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendTimePeriodIncludesChanged(timeperiod, oldValues, newValues); + } +} + +void IcingaDB::TimePeriodExcludesChangedHandler(const TimePeriod::Ptr& timeperiod, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendTimePeriodExcludesChanged(timeperiod, oldValues, newValues); + } +} + +void IcingaDB::UserGroupsChangedHandler(const User::Ptr& user, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendGroupsChanged<UserGroup>(user, oldValues, newValues); + } +} + +void IcingaDB::HostGroupsChangedHandler(const Host::Ptr& host, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendGroupsChanged<HostGroup>(host, oldValues, newValues); + } +} + +void IcingaDB::ServiceGroupsChangedHandler(const Service::Ptr& service, const Array::Ptr& oldValues, const Array::Ptr& newValues) { + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendGroupsChanged<ServiceGroup>(service, oldValues, newValues); + } +} + +void IcingaDB::CommandEnvChangedHandler(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendCommandEnvChanged(command, oldValues, newValues); + } +} + +void IcingaDB::CommandArgumentsChangedHandler(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendCommandArgumentsChanged(command, oldValues, newValues); + } +} + +void IcingaDB::CustomVarsChangedHandler(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) { + rw->SendCustomVarsChanged(object, oldValues, newValues); + } +} + +void IcingaDB::DeleteRelationship(const String& id, const String& redisKeyWithoutPrefix, bool hasChecksum) { + Log(LogNotice, "IcingaDB") << "Deleting relationship '" << redisKeyWithoutPrefix << " -> '" << id << "'"; + + String redisKey = m_PrefixConfigObject + redisKeyWithoutPrefix; + + std::vector<std::vector<String>> queries; + + if (hasChecksum) { + queries.push_back({"HDEL", m_PrefixConfigCheckSum + redisKeyWithoutPrefix, id}); + } + + queries.push_back({"HDEL", redisKey, id}); + queries.push_back({ + "XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*", + "redis_key", redisKey, "id", id, "runtime_type", "delete" + }); + + m_Rcon->FireAndForgetQueries(queries, Prio::Config); +} diff --git a/lib/icingadb/icingadb-stats.cpp b/lib/icingadb/icingadb-stats.cpp new file mode 100644 index 0000000..476756b --- /dev/null +++ b/lib/icingadb/icingadb-stats.cpp @@ -0,0 +1,54 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "icingadb/icingadb.hpp" +#include "base/application.hpp" +#include "base/json.hpp" +#include "base/logger.hpp" +#include "base/serializer.hpp" +#include "base/statsfunction.hpp" +#include "base/convert.hpp" + +using namespace icinga; + +Dictionary::Ptr IcingaDB::GetStats() +{ + Dictionary::Ptr stats = new Dictionary(); + + //TODO: Figure out if more stats can be useful here. + Namespace::Ptr statsFunctions = ScriptGlobal::Get("StatsFunctions", &Empty); + + if (!statsFunctions) + Dictionary::Ptr(); + + ObjectLock olock(statsFunctions); + + for (auto& kv : statsFunctions) + { + Function::Ptr func = kv.second.Val; + + if (!func) + BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid status function name.")); + + Dictionary::Ptr status = new Dictionary(); + Array::Ptr perfdata = new Array(); + func->Invoke({ status, perfdata }); + + stats->Set(kv.first, new Dictionary({ + { "status", status }, + { "perfdata", Serialize(perfdata, FAState) } + })); + } + + typedef Dictionary::Ptr DP; + DP app = DP(DP(DP(stats->Get("IcingaApplication"))->Get("status"))->Get("icingaapplication"))->Get("app"); + + app->Set("program_start", TimestampToMilliseconds(Application::GetStartTime())); + + auto localEndpoint (Endpoint::GetLocalEndpoint()); + if (localEndpoint) { + app->Set("endpoint_id", GetObjectIdentifier(localEndpoint)); + } + + return stats; +} + diff --git a/lib/icingadb/icingadb-utility.cpp b/lib/icingadb/icingadb-utility.cpp new file mode 100644 index 0000000..b247ed8 --- /dev/null +++ b/lib/icingadb/icingadb-utility.cpp @@ -0,0 +1,319 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "icingadb/icingadb.hpp" +#include "base/configtype.hpp" +#include "base/object-packer.hpp" +#include "base/logger.hpp" +#include "base/serializer.hpp" +#include "base/tlsutility.hpp" +#include "base/initialize.hpp" +#include "base/objectlock.hpp" +#include "base/array.hpp" +#include "base/scriptglobal.hpp" +#include "base/convert.hpp" +#include "base/json.hpp" +#include "icinga/customvarobject.hpp" +#include "icinga/checkcommand.hpp" +#include "icinga/notificationcommand.hpp" +#include "icinga/eventcommand.hpp" +#include "icinga/host.hpp" +#include <boost/algorithm/string.hpp> +#include <map> +#include <utility> +#include <vector> + +using namespace icinga; + +String IcingaDB::FormatCheckSumBinary(const String& str) +{ + char output[20*2+1]; + for (int i = 0; i < 20; i++) + sprintf(output + 2 * i, "%02x", str[i]); + + return output; +} + +String IcingaDB::FormatCommandLine(const Value& commandLine) +{ + String result; + if (commandLine.IsObjectType<Array>()) { + Array::Ptr args = commandLine; + bool first = true; + + ObjectLock olock(args); + for (const Value& arg : args) { + String token = "'" + Convert::ToString(arg) + "'"; + + if (first) + first = false; + else + result += String(1, ' '); + + result += token; + } + } else if (!commandLine.IsEmpty()) { + result = commandLine; + boost::algorithm::replace_all(result, "\'", "\\'"); + result = "'" + result + "'"; + } + + return result; +} + +String IcingaDB::GetObjectIdentifier(const ConfigObject::Ptr& object) +{ + String identifier = object->GetIcingadbIdentifier(); + if (identifier.IsEmpty()) { + identifier = HashValue(new Array({m_EnvironmentId, object->GetName()})); + object->SetIcingadbIdentifier(identifier); + } + + return identifier; +} + +/** + * Calculates a deterministic history event ID like SHA1(env, eventType, x...[, nt][, eventTime]) + * + * Where SHA1(env, x...) = GetObjectIdentifier(object) + */ +String IcingaDB::CalcEventID(const char* eventType, const ConfigObject::Ptr& object, double eventTime, NotificationType nt) +{ + Array::Ptr rawId = new Array({object->GetName()}); + rawId->Insert(0, m_EnvironmentId); + rawId->Insert(1, eventType); + + if (nt) { + rawId->Add(GetNotificationTypeByEnum(nt)); + } + + if (eventTime) { + rawId->Add(TimestampToMilliseconds(eventTime)); + } + + return HashValue(std::move(rawId)); +} + +static const std::set<String> metadataWhitelist ({"package", "source_location", "templates"}); + +/** + * Prepare custom vars for being written to Redis + * + * object.vars = { + * "disks": { + * "disk": {}, + * "disk /": { + * "disk_partitions": "/" + * } + * } + * } + * + * return { + * SHA1(PackObject([ + * EnvironmentId, + * "disks", + * { + * "disk": {}, + * "disk /": { + * "disk_partitions": "/" + * } + * } + * ])): { + * "environment_id": EnvironmentId, + * "name_checksum": SHA1("disks"), + * "name": "disks", + * "value": { + * "disk": {}, + * "disk /": { + * "disk_partitions": "/" + * } + * } + * } + * } + * + * @param Dictionary Config object with custom vars + * + * @return JSON-like data structure for Redis + */ +Dictionary::Ptr IcingaDB::SerializeVars(const Dictionary::Ptr& vars) +{ + if (!vars) + return nullptr; + + Dictionary::Ptr res = new Dictionary(); + + ObjectLock olock(vars); + + for (auto& kv : vars) { + res->Set( + SHA1(PackObject((Array::Ptr)new Array({m_EnvironmentId, kv.first, kv.second}))), + (Dictionary::Ptr)new Dictionary({ + {"environment_id", m_EnvironmentId}, + {"name_checksum", SHA1(kv.first)}, + {"name", kv.first}, + {"value", JsonEncode(kv.second)}, + }) + ); + } + + return res; +} + +const char* IcingaDB::GetNotificationTypeByEnum(NotificationType type) +{ + switch (type) { + case NotificationDowntimeStart: + return "downtime_start"; + case NotificationDowntimeEnd: + return "downtime_end"; + case NotificationDowntimeRemoved: + return "downtime_removed"; + case NotificationCustom: + return "custom"; + case NotificationAcknowledgement: + return "acknowledgement"; + case NotificationProblem: + return "problem"; + case NotificationRecovery: + return "recovery"; + case NotificationFlappingStart: + return "flapping_start"; + case NotificationFlappingEnd: + return "flapping_end"; + } + + VERIFY(!"Invalid notification type."); +} + +static const std::set<String> propertiesBlacklistEmpty; + +String IcingaDB::HashValue(const Value& value) +{ + return HashValue(value, propertiesBlacklistEmpty); +} + +String IcingaDB::HashValue(const Value& value, const std::set<String>& propertiesBlacklist, bool propertiesWhitelist) +{ + Value temp; + bool mutabl; + + Type::Ptr type = value.GetReflectionType(); + + if (ConfigObject::TypeInstance->IsAssignableFrom(type)) { + temp = Serialize(value, FAConfig); + mutabl = true; + } else { + temp = value; + mutabl = false; + } + + if (propertiesBlacklist.size() && temp.IsObject()) { + Dictionary::Ptr dict = dynamic_pointer_cast<Dictionary>((Object::Ptr)temp); + + if (dict) { + if (!mutabl) + dict = dict->ShallowClone(); + + ObjectLock olock(dict); + + if (propertiesWhitelist) { + auto current = dict->Begin(); + auto propertiesBlacklistEnd = propertiesBlacklist.end(); + + while (current != dict->End()) { + if (propertiesBlacklist.find(current->first) == propertiesBlacklistEnd) { + dict->Remove(current++); + } else { + ++current; + } + } + } else { + for (auto& property : propertiesBlacklist) + dict->Remove(property); + } + + if (!mutabl) + temp = dict; + } + } + + return SHA1(PackObject(temp)); +} + +String IcingaDB::GetLowerCaseTypeNameDB(const ConfigObject::Ptr& obj) +{ + return obj->GetReflectionType()->GetName().ToLower(); +} + +long long IcingaDB::TimestampToMilliseconds(double timestamp) { + return static_cast<long long>(timestamp * 1000); +} + +String IcingaDB::IcingaToStreamValue(const Value& value) +{ + switch (value.GetType()) { + case ValueBoolean: + return Convert::ToString(int(value)); + case ValueString: + return Utility::ValidateUTF8(value); + case ValueNumber: + case ValueEmpty: + return Convert::ToString(value); + default: + return JsonEncode(value); + } +} + +// Returns the items that exist in "arrayOld" but not in "arrayNew" +std::vector<Value> IcingaDB::GetArrayDeletedValues(const Array::Ptr& arrayOld, const Array::Ptr& arrayNew) { + std::vector<Value> deletedValues; + + if (!arrayOld) { + return deletedValues; + } + + if (!arrayNew) { + ObjectLock olock (arrayOld); + return std::vector<Value>(arrayOld->Begin(), arrayOld->End()); + } + + std::vector<Value> vectorOld; + { + ObjectLock olock (arrayOld); + vectorOld.assign(arrayOld->Begin(), arrayOld->End()); + } + std::sort(vectorOld.begin(), vectorOld.end()); + vectorOld.erase(std::unique(vectorOld.begin(), vectorOld.end()), vectorOld.end()); + + std::vector<Value> vectorNew; + { + ObjectLock olock (arrayNew); + vectorNew.assign(arrayNew->Begin(), arrayNew->End()); + } + std::sort(vectorNew.begin(), vectorNew.end()); + vectorNew.erase(std::unique(vectorNew.begin(), vectorNew.end()), vectorNew.end()); + + std::set_difference(vectorOld.begin(), vectorOld.end(), vectorNew.begin(), vectorNew.end(), std::back_inserter(deletedValues)); + + return deletedValues; +} + +// Returns the keys that exist in "dictOld" but not in "dictNew" +std::vector<String> IcingaDB::GetDictionaryDeletedKeys(const Dictionary::Ptr& dictOld, const Dictionary::Ptr& dictNew) { + std::vector<String> deletedKeys; + + if (!dictOld) { + return deletedKeys; + } + + std::vector<String> oldKeys = dictOld->GetKeys(); + + if (!dictNew) { + return oldKeys; + } + + std::vector<String> newKeys = dictNew->GetKeys(); + + std::set_difference(oldKeys.begin(), oldKeys.end(), newKeys.begin(), newKeys.end(), std::back_inserter(deletedKeys)); + + return deletedKeys; +} diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp new file mode 100644 index 0000000..6d5ded9 --- /dev/null +++ b/lib/icingadb/icingadb.cpp @@ -0,0 +1,311 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "icingadb/icingadb.hpp" +#include "icingadb/icingadb-ti.cpp" +#include "icingadb/redisconnection.hpp" +#include "remote/apilistener.hpp" +#include "remote/eventqueue.hpp" +#include "base/configuration.hpp" +#include "base/json.hpp" +#include "base/perfdatavalue.hpp" +#include "base/statsfunction.hpp" +#include "base/tlsutility.hpp" +#include "base/utility.hpp" +#include "icinga/checkable.hpp" +#include "icinga/host.hpp" +#include <boost/algorithm/string.hpp> +#include <fstream> +#include <memory> +#include <utility> + +using namespace icinga; + +#define MAX_EVENTS_DEFAULT 5000 + +using Prio = RedisConnection::QueryPriority; + +String IcingaDB::m_EnvironmentId; +std::mutex IcingaDB::m_EnvironmentIdInitMutex; + +REGISTER_TYPE(IcingaDB); + +IcingaDB::IcingaDB() + : m_Rcon(nullptr) +{ + m_RconLocked.store(nullptr); + + m_WorkQueue.SetName("IcingaDB"); + + m_PrefixConfigObject = "icinga:"; + m_PrefixConfigCheckSum = "icinga:checksum:"; +} + +void IcingaDB::Validate(int types, const ValidationUtils& utils) +{ + ObjectImpl<IcingaDB>::Validate(types, utils); + + if (!(types & FAConfig)) + return; + + if (GetEnableTls() && GetCertPath().IsEmpty() != GetKeyPath().IsEmpty()) { + BOOST_THROW_EXCEPTION(ValidationError(this, std::vector<String>(), "Validation failed: Either both a client certificate (cert_path) and its private key (key_path) or none of them must be given.")); + } + + try { + InitEnvironmentId(); + } catch (const std::exception& e) { + BOOST_THROW_EXCEPTION(ValidationError(this, std::vector<String>(), + String("Validation failed: ") + e.what())); + } +} + +/** + * Starts the component. + */ +void IcingaDB::Start(bool runtimeCreated) +{ + ObjectImpl<IcingaDB>::Start(runtimeCreated); + + VERIFY(!m_EnvironmentId.IsEmpty()); + PersistEnvironmentId(); + + Log(LogInformation, "IcingaDB") + << "'" << GetName() << "' started."; + + m_ConfigDumpInProgress = false; + m_ConfigDumpDone = false; + + m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); + + m_Rcon = new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex(), + GetEnableTls(), GetInsecureNoverify(), GetCertPath(), GetKeyPath(), GetCaPath(), GetCrlPath(), + GetTlsProtocolmin(), GetCipherList(), GetConnectTimeout(), GetDebugInfo()); + m_RconLocked.store(m_Rcon); + + for (const Type::Ptr& type : GetTypes()) { + auto ctype (dynamic_cast<ConfigType*>(type.get())); + if (!ctype) + continue; + + RedisConnection::Ptr con = new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex(), + GetEnableTls(), GetInsecureNoverify(), GetCertPath(), GetKeyPath(), GetCaPath(), GetCrlPath(), + GetTlsProtocolmin(), GetCipherList(), GetConnectTimeout(), GetDebugInfo(), m_Rcon); + + con->SetConnectedCallback([this, con](boost::asio::yield_context& yc) { + con->SetConnectedCallback(nullptr); + + size_t pending = --m_PendingRcons; + Log(LogDebug, "IcingaDB") << pending << " pending child connections remaining"; + if (pending == 0) { + m_WorkQueue.Enqueue([this]() { OnConnectedHandler(); }); + } + }); + + m_Rcons[ctype] = std::move(con); + } + + m_PendingRcons = m_Rcons.size(); + + m_Rcon->SetConnectedCallback([this](boost::asio::yield_context& yc) { + m_Rcon->SetConnectedCallback(nullptr); + + for (auto& kv : m_Rcons) { + kv.second->Start(); + } + }); + m_Rcon->Start(); + + m_StatsTimer = Timer::Create(); + m_StatsTimer->SetInterval(1); + m_StatsTimer->OnTimerExpired.connect([this](const Timer * const&) { PublishStatsTimerHandler(); }); + m_StatsTimer->Start(); + + m_WorkQueue.SetName("IcingaDB"); + + m_Rcon->SuppressQueryKind(Prio::CheckResult); + m_Rcon->SuppressQueryKind(Prio::RuntimeStateSync); + + Ptr keepAlive (this); + + m_HistoryThread = std::async(std::launch::async, [this, keepAlive]() { ForwardHistoryEntries(); }); +} + +void IcingaDB::ExceptionHandler(boost::exception_ptr exp) +{ + Log(LogCritical, "IcingaDB", "Exception during redis query. Verify that Redis is operational."); + + Log(LogDebug, "IcingaDB") + << "Exception during redis operation: " << DiagnosticInformation(exp); +} + +void IcingaDB::OnConnectedHandler() +{ + AssertOnWorkQueue(); + + if (m_ConfigDumpInProgress || m_ConfigDumpDone) + return; + + /* Config dump */ + m_ConfigDumpInProgress = true; + PublishStats(); + + UpdateAllConfigObjects(); + + m_ConfigDumpDone = true; + + m_ConfigDumpInProgress = false; +} + +void IcingaDB::PublishStatsTimerHandler(void) +{ + PublishStats(); +} + +void IcingaDB::PublishStats() +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + Dictionary::Ptr status = GetStats(); + status->Set("config_dump_in_progress", m_ConfigDumpInProgress); + status->Set("timestamp", TimestampToMilliseconds(Utility::GetTime())); + status->Set("icingadb_environment", m_EnvironmentId); + + std::vector<String> query {"XADD", "icinga:stats", "MAXLEN", "1", "*"}; + + { + ObjectLock statusLock (status); + for (auto& kv : status) { + query.emplace_back(kv.first); + query.emplace_back(JsonEncode(kv.second)); + } + } + + m_Rcon->FireAndForgetQuery(std::move(query), Prio::Heartbeat); +} + +void IcingaDB::Stop(bool runtimeRemoved) +{ + Log(LogInformation, "IcingaDB") + << "Flushing history data buffer to Redis."; + + if (m_HistoryThread.wait_for(std::chrono::minutes(1)) == std::future_status::timeout) { + Log(LogCritical, "IcingaDB") + << "Flushing takes more than one minute (while we're about to shut down). Giving up and discarding " + << m_HistoryBulker.Size() << " queued history queries."; + } + + m_StatsTimer->Stop(true); + + Log(LogInformation, "IcingaDB") + << "'" << GetName() << "' stopped."; + + ObjectImpl<IcingaDB>::Stop(runtimeRemoved); +} + +void IcingaDB::ValidateTlsProtocolmin(const Lazy<String>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<IcingaDB>::ValidateTlsProtocolmin(lvalue, utils); + + try { + ResolveTlsProtocolVersion(lvalue()); + } catch (const std::exception& ex) { + BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_protocolmin" }, ex.what())); + } +} + +void IcingaDB::ValidateConnectTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<IcingaDB>::ValidateConnectTimeout(lvalue, utils); + + if (lvalue() <= 0) { + BOOST_THROW_EXCEPTION(ValidationError(this, { "connect_timeout" }, "Value must be greater than 0.")); + } +} + +void IcingaDB::AssertOnWorkQueue() +{ + ASSERT(m_WorkQueue.IsWorkerThread()); +} + +void IcingaDB::DumpedGlobals::Reset() +{ + std::lock_guard<std::mutex> l (m_Mutex); + m_Ids.clear(); +} + +String IcingaDB::GetEnvironmentId() const { + return m_EnvironmentId; +} + +bool IcingaDB::DumpedGlobals::IsNew(const String& id) +{ + std::lock_guard<std::mutex> l (m_Mutex); + return m_Ids.emplace(id).second; +} + +/** + * Initializes the m_EnvironmentId attribute or throws an exception on failure to do so. Can be called concurrently. + */ +void IcingaDB::InitEnvironmentId() +{ + // Initialize m_EnvironmentId once across all IcingaDB objects. In theory, this could be done using + // std::call_once, however, due to a bug in libstdc++ (https://gcc.gnu.org/bugzilla/show_bug.cgi?id=66146), + // this can result in a deadlock when an exception is thrown (which is explicitly allowed by the standard). + std::unique_lock<std::mutex> lock (m_EnvironmentIdInitMutex); + + if (m_EnvironmentId.IsEmpty()) { + String path = Configuration::DataDir + "/icingadb.env"; + String envId; + + if (Utility::PathExists(path)) { + envId = Utility::LoadJsonFile(path); + + if (envId.GetLength() != 2*SHA_DIGEST_LENGTH) { + throw std::runtime_error("environment ID stored at " + path + " is corrupt: wrong length."); + } + + for (unsigned char c : envId) { + if (!std::isxdigit(c)) { + throw std::runtime_error("environment ID stored at " + path + " is corrupt: invalid hex string."); + } + } + } else { + String caPath = ApiListener::GetDefaultCaPath(); + + if (!Utility::PathExists(caPath)) { + throw std::runtime_error("Cannot find the CA certificate at '" + caPath + "'. " + "Please ensure the ApiListener is enabled first using 'icinga2 api setup'."); + } + + std::shared_ptr<X509> cert = GetX509Certificate(caPath); + + unsigned int n; + unsigned char digest[EVP_MAX_MD_SIZE]; + if (X509_pubkey_digest(cert.get(), EVP_sha1(), digest, &n) != 1) { + BOOST_THROW_EXCEPTION(openssl_error() + << boost::errinfo_api_function("X509_pubkey_digest") + << errinfo_openssl_error(ERR_peek_error())); + } + + envId = BinaryToHex(digest, n); + } + + m_EnvironmentId = envId.ToLower(); + } +} + +/** + * Ensures that the environment ID is persisted on disk or throws an exception on failure to do so. + * Can be called concurrently. + */ +void IcingaDB::PersistEnvironmentId() +{ + String path = Configuration::DataDir + "/icingadb.env"; + + std::unique_lock<std::mutex> lock (m_EnvironmentIdInitMutex); + + if (!Utility::PathExists(path)) { + Utility::SaveJsonFile(path, 0600, m_EnvironmentId); + } +} diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp new file mode 100644 index 0000000..6652d9c --- /dev/null +++ b/lib/icingadb/icingadb.hpp @@ -0,0 +1,241 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef ICINGADB_H +#define ICINGADB_H + +#include "icingadb/icingadb-ti.hpp" +#include "icingadb/redisconnection.hpp" +#include "base/atomic.hpp" +#include "base/bulker.hpp" +#include "base/timer.hpp" +#include "base/workqueue.hpp" +#include "icinga/customvarobject.hpp" +#include "icinga/checkable.hpp" +#include "icinga/service.hpp" +#include "icinga/downtime.hpp" +#include "remote/messageorigin.hpp" +#include <atomic> +#include <chrono> +#include <future> +#include <memory> +#include <mutex> +#include <set> +#include <unordered_map> +#include <unordered_set> +#include <utility> + +namespace icinga +{ + +/** + * @ingroup icingadb + */ +class IcingaDB : public ObjectImpl<IcingaDB> +{ +public: + DECLARE_OBJECT(IcingaDB); + DECLARE_OBJECTNAME(IcingaDB); + + IcingaDB(); + + static void ConfigStaticInitialize(); + + void Validate(int types, const ValidationUtils& utils) override; + virtual void Start(bool runtimeCreated) override; + virtual void Stop(bool runtimeRemoved) override; + + String GetEnvironmentId() const override; + + inline RedisConnection::Ptr GetConnection() + { + return m_RconLocked.load(); + } + + template<class T> + static void AddKvsToMap(const Array::Ptr& kvs, T& map) + { + Value* key = nullptr; + ObjectLock oLock (kvs); + + for (auto& kv : kvs) { + if (key) { + map.emplace(std::move(*key), std::move(kv)); + key = nullptr; + } else { + key = &kv; + } + } + } + +protected: + void ValidateTlsProtocolmin(const Lazy<String>& lvalue, const ValidationUtils& utils) override; + void ValidateConnectTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils) override; + +private: + class DumpedGlobals + { + public: + void Reset(); + bool IsNew(const String& id); + + private: + std::set<String> m_Ids; + std::mutex m_Mutex; + }; + + enum StateUpdate + { + Volatile = 1ull << 0, + RuntimeOnly = 1ull << 1, + Full = Volatile | RuntimeOnly, + }; + + void OnConnectedHandler(); + + void PublishStatsTimerHandler(); + void PublishStats(); + + /* config & status dump */ + void UpdateAllConfigObjects(); + std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize); + void DeleteKeys(const RedisConnection::Ptr& conn, const std::vector<String>& keys, RedisConnection::QueryPriority priority); + std::vector<String> GetTypeOverwriteKeys(const String& type); + std::vector<String> GetTypeDumpSignalKeys(const Type::Ptr& type); + void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets, + std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate); + void UpdateState(const Checkable::Ptr& checkable, StateUpdate mode); + void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate); + void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map<String, std::vector<String>>& hMSets, + std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate); + void SendConfigDelete(const ConfigObject::Ptr& object); + void SendStateChange(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type); + void AddObjectDataToRuntimeUpdates(std::vector<Dictionary::Ptr>& runtimeUpdates, const String& objectKey, + const String& redisKey, const Dictionary::Ptr& data); + void DeleteRelationship(const String& id, const String& redisKeyWithoutPrefix, bool hasChecksum = false); + + void SendSentNotification( + const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, + NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text, double sendTime + ); + + void SendStartedDowntime(const Downtime::Ptr& downtime); + void SendRemovedDowntime(const Downtime::Ptr& downtime); + void SendAddedComment(const Comment::Ptr& comment); + void SendRemovedComment(const Comment::Ptr& comment); + void SendFlappingChange(const Checkable::Ptr& checkable, double changeTime, double flappingLastChange); + void SendNextUpdate(const Checkable::Ptr& checkable); + void SendAcknowledgementSet(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool persistent, double changeTime, double expiry); + void SendAcknowledgementCleared(const Checkable::Ptr& checkable, const String& removedBy, double changeTime, double ackLastChange); + void SendNotificationUsersChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues); + void SendNotificationUserGroupsChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues); + void SendTimePeriodRangesChanged(const TimePeriod::Ptr& timeperiod, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + void SendTimePeriodIncludesChanged(const TimePeriod::Ptr& timeperiod, const Array::Ptr& oldValues, const Array::Ptr& newValues); + void SendTimePeriodExcludesChanged(const TimePeriod::Ptr& timeperiod, const Array::Ptr& oldValues, const Array::Ptr& newValues); + template<class T> + void SendGroupsChanged(const ConfigObject::Ptr& command, const Array::Ptr& oldValues, const Array::Ptr& newValues); + void SendCommandEnvChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + void SendCommandArgumentsChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + void SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + + void ForwardHistoryEntries(); + + std::vector<String> UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride); + Dictionary::Ptr SerializeState(const Checkable::Ptr& checkable); + + /* Stats */ + Dictionary::Ptr GetStats(); + + /* utilities */ + static String FormatCheckSumBinary(const String& str); + static String FormatCommandLine(const Value& commandLine); + static long long TimestampToMilliseconds(double timestamp); + static String IcingaToStreamValue(const Value& value); + static std::vector<Value> GetArrayDeletedValues(const Array::Ptr& arrayOld, const Array::Ptr& arrayNew); + static std::vector<String> GetDictionaryDeletedKeys(const Dictionary::Ptr& dictOld, const Dictionary::Ptr& dictNew); + + static String GetObjectIdentifier(const ConfigObject::Ptr& object); + static String CalcEventID(const char* eventType, const ConfigObject::Ptr& object, double eventTime = 0, NotificationType nt = NotificationType(0)); + static const char* GetNotificationTypeByEnum(NotificationType type); + static Dictionary::Ptr SerializeVars(const Dictionary::Ptr& vars); + + static String HashValue(const Value& value); + static String HashValue(const Value& value, const std::set<String>& propertiesBlacklist, bool propertiesWhitelist = false); + + static String GetLowerCaseTypeNameDB(const ConfigObject::Ptr& obj); + static bool PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& attributes, Dictionary::Ptr& checkSums); + + static void ReachabilityChangeHandler(const std::set<Checkable::Ptr>& children); + static void StateChangeHandler(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type); + static void VersionChangedHandler(const ConfigObject::Ptr& object); + static void DowntimeStartedHandler(const Downtime::Ptr& downtime); + static void DowntimeRemovedHandler(const Downtime::Ptr& downtime); + + static void NotificationSentToAllUsersHandler( + const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, + NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text + ); + + static void CommentAddedHandler(const Comment::Ptr& comment); + static void CommentRemovedHandler(const Comment::Ptr& comment); + static void FlappingChangeHandler(const Checkable::Ptr& checkable, double changeTime); + static void NewCheckResultHandler(const Checkable::Ptr& checkable); + static void NextCheckUpdatedHandler(const Checkable::Ptr& checkable); + static void HostProblemChangedHandler(const Service::Ptr& service); + static void AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool persistent, double changeTime, double expiry); + static void AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const String& removedBy, double changeTime); + static void NotificationUsersChangedHandler(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues); + static void NotificationUserGroupsChangedHandler(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues); + static void TimePeriodRangesChangedHandler(const TimePeriod::Ptr& timeperiod, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + static void TimePeriodIncludesChangedHandler(const TimePeriod::Ptr& timeperiod, const Array::Ptr& oldValues, const Array::Ptr& newValues); + static void TimePeriodExcludesChangedHandler(const TimePeriod::Ptr& timeperiod, const Array::Ptr& oldValues, const Array::Ptr& newValues); + static void UserGroupsChangedHandler(const User::Ptr& user, const Array::Ptr&, const Array::Ptr& newValues); + static void HostGroupsChangedHandler(const Host::Ptr& host, const Array::Ptr& oldValues, const Array::Ptr& newValues); + static void ServiceGroupsChangedHandler(const Service::Ptr& service, const Array::Ptr& oldValues, const Array::Ptr& newValues); + static void CommandEnvChangedHandler(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + static void CommandArgumentsChangedHandler(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + static void CustomVarsChangedHandler(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + + void AssertOnWorkQueue(); + + void ExceptionHandler(boost::exception_ptr exp); + + static std::vector<Type::Ptr> GetTypes(); + + static void InitEnvironmentId(); + static void PersistEnvironmentId(); + + Timer::Ptr m_StatsTimer; + WorkQueue m_WorkQueue{0, 1, LogNotice}; + + std::future<void> m_HistoryThread; + Bulker<RedisConnection::Query> m_HistoryBulker {4096, std::chrono::milliseconds(250)}; + + String m_PrefixConfigObject; + String m_PrefixConfigCheckSum; + + bool m_ConfigDumpInProgress; + bool m_ConfigDumpDone; + + RedisConnection::Ptr m_Rcon; + // m_RconLocked containes a copy of the value in m_Rcon where all accesses are guarded by a mutex to allow safe + // concurrent access like from the icingadb check command. It's a copy to still allow fast access without additional + // syncronization to m_Rcon within the IcingaDB feature itself. + Locked<RedisConnection::Ptr> m_RconLocked; + std::unordered_map<ConfigType*, RedisConnection::Ptr> m_Rcons; + std::atomic_size_t m_PendingRcons; + + struct { + DumpedGlobals CustomVar, ActionUrl, NotesUrl, IconImage; + } m_DumpedGlobals; + + // m_EnvironmentId is shared across all IcingaDB objects (typically there is at most one, but it is perfectly fine + // to have multiple ones). It is initialized once (synchronized using m_EnvironmentIdInitMutex). After successful + // initialization, the value is read-only and can be accessed without further synchronization. + static String m_EnvironmentId; + static std::mutex m_EnvironmentIdInitMutex; + + static std::unordered_set<Type*> m_IndexedTypes; +}; +} + +#endif /* ICINGADB_H */ diff --git a/lib/icingadb/icingadb.ti b/lib/icingadb/icingadb.ti new file mode 100644 index 0000000..1c649c8 --- /dev/null +++ b/lib/icingadb/icingadb.ti @@ -0,0 +1,63 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "base/configobject.hpp" +#include "base/tlsutility.hpp" + +library icingadb; + +namespace icinga +{ + +class IcingaDB : ConfigObject +{ + activation_priority 100; + + [config] String host { + default {{{ return "127.0.0.1"; }}} + }; + [config] int port { + default {{{ return 6380; }}} + }; + [config] String path; + [config, no_user_view, no_user_modify] String password; + [config] int db_index; + + [config] bool enable_tls { + default {{{ return false; }}} + }; + + [config] bool insecure_noverify { + default {{{ return false; }}} + }; + + [config] String cert_path; + [config] String key_path; + [config] String ca_path; + [config] String crl_path; + [config] String cipher_list { + default {{{ return DEFAULT_TLS_CIPHERS; }}} + }; + [config] String tls_protocolmin { + default {{{ return DEFAULT_TLS_PROTOCOLMIN; }}} + }; + + [config] double connect_timeout { + default {{{ return DEFAULT_CONNECT_TIMEOUT; }}} + }; + + [no_storage] String environment_id { + get; + }; + + [set_protected] double ongoing_dump_start { + default {{{ return 0; }}} + }; + [state, set_protected] double lastdump_end { + default {{{ return 0; }}} + }; + [state, set_protected] double lastdump_took { + default {{{ return 0; }}} + }; +}; + +} diff --git a/lib/icingadb/icingadbchecktask.cpp b/lib/icingadb/icingadbchecktask.cpp new file mode 100644 index 0000000..f7c5964 --- /dev/null +++ b/lib/icingadb/icingadbchecktask.cpp @@ -0,0 +1,513 @@ +/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */ + +#include "icingadb/icingadbchecktask.hpp" +#include "icinga/host.hpp" +#include "icinga/checkcommand.hpp" +#include "icinga/macroprocessor.hpp" +#include "icinga/pluginutility.hpp" +#include "base/function.hpp" +#include "base/utility.hpp" +#include "base/perfdatavalue.hpp" +#include "base/convert.hpp" +#include <utility> + +using namespace icinga; + +REGISTER_FUNCTION_NONCONST(Internal, IcingadbCheck, &IcingadbCheckTask::ScriptFunc, "checkable:cr:resolvedMacros:useResolvedMacros"); + +static void ReportIcingadbCheck( + const Checkable::Ptr& checkable, const CheckCommand::Ptr& commandObj, + const CheckResult::Ptr& cr, String output, ServiceState state) +{ + if (Checkable::ExecuteCommandProcessFinishedHandler) { + double now = Utility::GetTime(); + ProcessResult pr; + pr.PID = -1; + pr.Output = std::move(output); + pr.ExecutionStart = now; + pr.ExecutionEnd = now; + pr.ExitStatus = state; + + Checkable::ExecuteCommandProcessFinishedHandler(commandObj->GetName(), pr); + } else { + cr->SetState(state); + cr->SetOutput(output); + checkable->ProcessCheckResult(cr); + } +} + +static inline +double GetXMessageTs(const Array::Ptr& xMessage) +{ + return Convert::ToLong(String(xMessage->Get(0)).Split("-")[0]) / 1000.0; +} + +void IcingadbCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, + const Dictionary::Ptr& resolvedMacros, bool useResolvedMacros) +{ + CheckCommand::Ptr commandObj = CheckCommand::ExecuteOverride ? CheckCommand::ExecuteOverride : checkable->GetCheckCommand(); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + MacroProcessor::ResolverList resolvers; + String silenceMissingMacroWarning; + + if (MacroResolver::OverrideMacros) + resolvers.emplace_back("override", MacroResolver::OverrideMacros); + + if (service) + resolvers.emplace_back("service", service); + resolvers.emplace_back("host", host); + resolvers.emplace_back("command", commandObj); + + auto resolve ([&](const String& macro) { + return MacroProcessor::ResolveMacros(macro, resolvers, checkable->GetLastCheckResult(), + &silenceMissingMacroWarning, MacroProcessor::EscapeCallback(), resolvedMacros, useResolvedMacros); + }); + + struct Thresholds + { + Value Warning, Critical; + }; + + auto resolveThresholds ([&resolve](const String& wmacro, const String& cmacro) { + return Thresholds{resolve(wmacro), resolve(cmacro)}; + }); + + String icingadbName = resolve("$icingadb_name$"); + + auto dumpTakesThresholds (resolveThresholds("$icingadb_full_dump_duration_warning$", "$icingadb_full_dump_duration_critical$")); + auto syncTakesThresholds (resolveThresholds("$icingadb_full_sync_duration_warning$", "$icingadb_full_sync_duration_critical$")); + auto icingaBacklogThresholds (resolveThresholds("$icingadb_redis_backlog_warning$", "$icingadb_redis_backlog_critical$")); + auto icingadbBacklogThresholds (resolveThresholds("$icingadb_database_backlog_warning$", "$icingadb_database_backlog_critical$")); + + if (resolvedMacros && !useResolvedMacros) + return; + + if (icingadbName.IsEmpty()) { + ReportIcingadbCheck(checkable, commandObj, cr, "Icinga DB UNKNOWN: Attribute 'icingadb_name' must be set.", ServiceUnknown); + return; + } + + auto conn (IcingaDB::GetByName(icingadbName)); + + if (!conn) { + ReportIcingadbCheck(checkable, commandObj, cr, "Icinga DB UNKNOWN: Icinga DB connection '" + icingadbName + "' does not exist.", ServiceUnknown); + return; + } + + auto redis (conn->GetConnection()); + + if (!redis || !redis->GetConnected()) { + ReportIcingadbCheck(checkable, commandObj, cr, "Icinga DB CRITICAL: Not connected to Redis.", ServiceCritical); + return; + } + + auto now (Utility::GetTime()); + Array::Ptr redisTime, xReadHeartbeat, xReadStats, xReadRuntimeBacklog, xReadHistoryBacklog; + + try { + auto replies (redis->GetResultsOfQueries( + { + {"TIME"}, + {"XREAD", "STREAMS", "icingadb:telemetry:heartbeat", "0-0"}, + {"XREAD", "STREAMS", "icingadb:telemetry:stats", "0-0"}, + {"XREAD", "COUNT", "1", "STREAMS", "icinga:runtime", "icinga:runtime:state", "0-0", "0-0"}, + { + "XREAD", "COUNT", "1", "STREAMS", + "icinga:history:stream:acknowledgement", + "icinga:history:stream:comment", + "icinga:history:stream:downtime", + "icinga:history:stream:flapping", + "icinga:history:stream:notification", + "icinga:history:stream:state", + "0-0", "0-0", "0-0", "0-0", "0-0", "0-0", + } + }, + RedisConnection::QueryPriority::Heartbeat + )); + + redisTime = std::move(replies.at(0)); + xReadHeartbeat = std::move(replies.at(1)); + xReadStats = std::move(replies.at(2)); + xReadRuntimeBacklog = std::move(replies.at(3)); + xReadHistoryBacklog = std::move(replies.at(4)); + } catch (const std::exception& ex) { + ReportIcingadbCheck( + checkable, commandObj, cr, + String("Icinga DB CRITICAL: Could not query Redis: ") + ex.what(), ServiceCritical + ); + return; + } + + if (!xReadHeartbeat) { + ReportIcingadbCheck( + checkable, commandObj, cr, + "Icinga DB CRITICAL: The Icinga DB daemon seems to have never run. (Missing heartbeat)", + ServiceCritical + ); + + return; + } + + auto redisOldestPending (redis->GetOldestPendingQueryTs()); + auto ongoingDumpStart (conn->GetOngoingDumpStart()); + auto dumpWhen (conn->GetLastdumpEnd()); + auto dumpTook (conn->GetLastdumpTook()); + + auto redisNow (Convert::ToLong(redisTime->Get(0)) + Convert::ToLong(redisTime->Get(1)) / 1000000.0); + Array::Ptr heartbeatMessage = Array::Ptr(Array::Ptr(xReadHeartbeat->Get(0))->Get(1))->Get(0); + auto heartbeatTime (GetXMessageTs(heartbeatMessage)); + std::map<String, String> heartbeatData; + + IcingaDB::AddKvsToMap(heartbeatMessage->Get(1), heartbeatData); + + String version = heartbeatData.at("version"); + auto icingadbNow (Convert::ToLong(heartbeatData.at("time")) / 1000.0 + (redisNow - heartbeatTime)); + auto icingadbStartTime (Convert::ToLong(heartbeatData.at("start-time")) / 1000.0); + String errMsg (heartbeatData.at("error")); + auto errSince (Convert::ToLong(heartbeatData.at("error-since")) / 1000.0); + String perfdataFromRedis = heartbeatData.at("performance-data"); + auto heartbeatLastReceived (Convert::ToLong(heartbeatData.at("last-heartbeat-received")) / 1000.0); + bool weResponsible = Convert::ToLong(heartbeatData.at("ha-responsible")); + auto weResponsibleTs (Convert::ToLong(heartbeatData.at("ha-responsible-ts")) / 1000.0); + bool otherResponsible = Convert::ToLong(heartbeatData.at("ha-other-responsible")); + auto syncOngoingSince (Convert::ToLong(heartbeatData.at("sync-ongoing-since")) / 1000.0); + auto syncSuccessWhen (Convert::ToLong(heartbeatData.at("sync-success-finish")) / 1000.0); + auto syncSuccessTook (Convert::ToLong(heartbeatData.at("sync-success-duration")) / 1000.0); + + std::ostringstream i2okmsgs, idbokmsgs, warnmsgs, critmsgs; + Array::Ptr perfdata = new Array(); + + i2okmsgs << std::fixed << std::setprecision(3); + idbokmsgs << std::fixed << std::setprecision(3); + warnmsgs << std::fixed << std::setprecision(3); + critmsgs << std::fixed << std::setprecision(3); + + const auto downForCritical (10); + auto downFor (redisNow - heartbeatTime); + bool down = false; + + if (downFor > downForCritical) { + down = true; + + critmsgs << " Last seen " << Utility::FormatDuration(downFor) + << " ago, greater than CRITICAL threshold (" << Utility::FormatDuration(downForCritical) << ")!"; + } else { + idbokmsgs << "\n* Last seen: " << Utility::FormatDuration(downFor) << " ago"; + } + + perfdata->Add(new PerfdataValue("icingadb_heartbeat_age", downFor, false, "seconds", Empty, downForCritical, 0)); + + const auto errForCritical (10); + auto err (!errMsg.IsEmpty()); + auto errFor (icingadbNow - errSince); + + if (err) { + if (errFor > errForCritical) { + critmsgs << " ERROR: " << errMsg << "!"; + } + + perfdata->Add(new PerfdataValue("error_for", errFor * (err ? 1 : -1), false, "seconds", Empty, errForCritical, 0)); + } + + if (!down) { + const auto heartbeatLagWarning (3/* Icinga DB read freq. */ + 1/* Icinga DB write freq. */ + 2/* threshold */); + auto heartbeatLag (fmin(icingadbNow - heartbeatLastReceived, 10 * 60)); + + if (!heartbeatLastReceived) { + critmsgs << " Lost Icinga 2 heartbeat!"; + } else if (heartbeatLag > heartbeatLagWarning) { + warnmsgs << " Icinga 2 heartbeat lag: " << Utility::FormatDuration(heartbeatLag) + << ", greater than WARNING threshold (" << Utility::FormatDuration(heartbeatLagWarning) << ")."; + } + + perfdata->Add(new PerfdataValue("icinga2_heartbeat_age", heartbeatLag, false, "seconds", heartbeatLagWarning, Empty, 0)); + } + + if (weResponsible) { + idbokmsgs << "\n* Responsible"; + } else if (otherResponsible) { + idbokmsgs << "\n* Not responsible, but another instance is"; + } else { + critmsgs << " No instance is responsible!"; + } + + perfdata->Add(new PerfdataValue("icingadb_responsible_instances", int(weResponsible || otherResponsible), false, "", Empty, Empty, 0, 1)); + + const auto clockDriftWarning (5); + const auto clockDriftCritical (30); + auto clockDrift (std::max({ + fabs(now - redisNow), + fabs(redisNow - icingadbNow), + fabs(icingadbNow - now), + })); + + if (clockDrift > clockDriftCritical) { + critmsgs << " Icinga 2/Redis/Icinga DB clock drift: " << Utility::FormatDuration(clockDrift) + << ", greater than CRITICAL threshold (" << Utility::FormatDuration(clockDriftCritical) << ")!"; + } else if (clockDrift > clockDriftWarning) { + warnmsgs << " Icinga 2/Redis/Icinga DB clock drift: " << Utility::FormatDuration(clockDrift) + << ", greater than WARNING threshold (" << Utility::FormatDuration(clockDriftWarning) << ")."; + } + + perfdata->Add(new PerfdataValue("clock_drift", clockDrift, false, "seconds", clockDriftWarning, clockDriftCritical, 0)); + + if (ongoingDumpStart) { + auto ongoingDumpTakes (now - ongoingDumpStart); + + if (!dumpTakesThresholds.Critical.IsEmpty() && ongoingDumpTakes > dumpTakesThresholds.Critical) { + critmsgs << " Current Icinga 2 full dump already takes " << Utility::FormatDuration(ongoingDumpTakes) + << ", greater than CRITICAL threshold (" << Utility::FormatDuration(dumpTakesThresholds.Critical) << ")!"; + } else if (!dumpTakesThresholds.Warning.IsEmpty() && ongoingDumpTakes > dumpTakesThresholds.Warning) { + warnmsgs << " Current Icinga 2 full dump already takes " << Utility::FormatDuration(ongoingDumpTakes) + << ", greater than WARNING threshold (" << Utility::FormatDuration(dumpTakesThresholds.Warning) << ")."; + } else { + i2okmsgs << "\n* Current full dump running for " << Utility::FormatDuration(ongoingDumpTakes); + } + + perfdata->Add(new PerfdataValue("icinga2_current_full_dump_duration", ongoingDumpTakes, false, "seconds", + dumpTakesThresholds.Warning, dumpTakesThresholds.Critical, 0)); + } + + if (!down && syncOngoingSince) { + auto ongoingSyncTakes (icingadbNow - syncOngoingSince); + + if (!syncTakesThresholds.Critical.IsEmpty() && ongoingSyncTakes > syncTakesThresholds.Critical) { + critmsgs << " Current full sync already takes " << Utility::FormatDuration(ongoingSyncTakes) + << ", greater than CRITICAL threshold (" << Utility::FormatDuration(syncTakesThresholds.Critical) << ")!"; + } else if (!syncTakesThresholds.Warning.IsEmpty() && ongoingSyncTakes > syncTakesThresholds.Warning) { + warnmsgs << " Current full sync already takes " << Utility::FormatDuration(ongoingSyncTakes) + << ", greater than WARNING threshold (" << Utility::FormatDuration(syncTakesThresholds.Warning) << ")."; + } else { + idbokmsgs << "\n* Current full sync running for " << Utility::FormatDuration(ongoingSyncTakes); + } + + perfdata->Add(new PerfdataValue("icingadb_current_full_sync_duration", ongoingSyncTakes, false, "seconds", + syncTakesThresholds.Warning, syncTakesThresholds.Critical, 0)); + } + + auto redisBacklog (now - redisOldestPending); + + if (!redisOldestPending) { + redisBacklog = 0; + } + + if (!icingaBacklogThresholds.Critical.IsEmpty() && redisBacklog > icingaBacklogThresholds.Critical) { + critmsgs << " Icinga 2 Redis query backlog: " << Utility::FormatDuration(redisBacklog) + << ", greater than CRITICAL threshold (" << Utility::FormatDuration(icingaBacklogThresholds.Critical) << ")!"; + } else if (!icingaBacklogThresholds.Warning.IsEmpty() && redisBacklog > icingaBacklogThresholds.Warning) { + warnmsgs << " Icinga 2 Redis query backlog: " << Utility::FormatDuration(redisBacklog) + << ", greater than WARNING threshold (" << Utility::FormatDuration(icingaBacklogThresholds.Warning) << ")."; + } + + perfdata->Add(new PerfdataValue("icinga2_redis_query_backlog", redisBacklog, false, "seconds", + icingaBacklogThresholds.Warning, icingaBacklogThresholds.Critical, 0)); + + if (!down) { + auto getBacklog = [redisNow](const Array::Ptr& streams) -> double { + if (!streams) { + return 0; + } + + double minTs = 0; + ObjectLock lock (streams); + + for (Array::Ptr stream : streams) { + auto ts (GetXMessageTs(Array::Ptr(stream->Get(1))->Get(0))); + + if (minTs == 0 || ts < minTs) { + minTs = ts; + } + } + + if (minTs > 0) { + return redisNow - minTs; + } else { + return 0; + } + }; + + double historyBacklog = getBacklog(xReadHistoryBacklog); + + if (!icingadbBacklogThresholds.Critical.IsEmpty() && historyBacklog > icingadbBacklogThresholds.Critical) { + critmsgs << " History backlog: " << Utility::FormatDuration(historyBacklog) + << ", greater than CRITICAL threshold (" << Utility::FormatDuration(icingadbBacklogThresholds.Critical) << ")!"; + } else if (!icingadbBacklogThresholds.Warning.IsEmpty() && historyBacklog > icingadbBacklogThresholds.Warning) { + warnmsgs << " History backlog: " << Utility::FormatDuration(historyBacklog) + << ", greater than WARNING threshold (" << Utility::FormatDuration(icingadbBacklogThresholds.Warning) << ")."; + } + + perfdata->Add(new PerfdataValue("icingadb_history_backlog", historyBacklog, false, "seconds", + icingadbBacklogThresholds.Warning, icingadbBacklogThresholds.Critical, 0)); + + double runtimeBacklog = 0; + + if (weResponsible && !syncOngoingSince) { + // These streams are only processed by the responsible instance after the full sync finished, + // it's fine for some backlog to exist otherwise. + runtimeBacklog = getBacklog(xReadRuntimeBacklog); + + if (!icingadbBacklogThresholds.Critical.IsEmpty() && runtimeBacklog > icingadbBacklogThresholds.Critical) { + critmsgs << " Runtime update backlog: " << Utility::FormatDuration(runtimeBacklog) + << ", greater than CRITICAL threshold (" << Utility::FormatDuration(icingadbBacklogThresholds.Critical) << ")!"; + } else if (!icingadbBacklogThresholds.Warning.IsEmpty() && runtimeBacklog > icingadbBacklogThresholds.Warning) { + warnmsgs << " Runtime update backlog: " << Utility::FormatDuration(runtimeBacklog) + << ", greater than WARNING threshold (" << Utility::FormatDuration(icingadbBacklogThresholds.Warning) << ")."; + } + } + + // Also report the perfdata value on the standby instance or during a full sync (as 0 in this case). + perfdata->Add(new PerfdataValue("icingadb_runtime_update_backlog", runtimeBacklog, false, "seconds", + icingadbBacklogThresholds.Warning, icingadbBacklogThresholds.Critical, 0)); + } + + auto dumpAgo (now - dumpWhen); + + if (dumpWhen) { + perfdata->Add(new PerfdataValue("icinga2_last_full_dump_ago", dumpAgo, false, "seconds", Empty, Empty, 0)); + } + + if (dumpTook) { + perfdata->Add(new PerfdataValue("icinga2_last_full_dump_duration", dumpTook, false, "seconds", Empty, Empty, 0)); + } + + if (dumpWhen && dumpTook) { + i2okmsgs << "\n* Last full dump: " << Utility::FormatDuration(dumpAgo) + << " ago, took " << Utility::FormatDuration(dumpTook); + } + + auto icingadbUptime (icingadbNow - icingadbStartTime); + + if (!down) { + perfdata->Add(new PerfdataValue("icingadb_uptime", icingadbUptime, false, "seconds", Empty, Empty, 0)); + } + + { + Array::Ptr values = PluginUtility::SplitPerfdata(perfdataFromRedis); + ObjectLock lock (values); + + for (auto& v : values) { + perfdata->Add(PerfdataValue::Parse(v)); + } + } + + if (weResponsibleTs) { + perfdata->Add(new PerfdataValue("icingadb_responsible_for", + (weResponsible ? 1 : -1) * (icingadbNow - weResponsibleTs), false, "seconds")); + } + + auto syncAgo (icingadbNow - syncSuccessWhen); + + if (syncSuccessWhen) { + perfdata->Add(new PerfdataValue("icingadb_last_full_sync_ago", syncAgo, false, "seconds", Empty, Empty, 0)); + } + + if (syncSuccessTook) { + perfdata->Add(new PerfdataValue("icingadb_last_full_sync_duration", syncSuccessTook, false, "seconds", Empty, Empty, 0)); + } + + if (syncSuccessWhen && syncSuccessTook) { + idbokmsgs << "\n* Last full sync: " << Utility::FormatDuration(syncAgo) + << " ago, took " << Utility::FormatDuration(syncSuccessTook); + } + + std::map<String, RingBuffer> statsPerOp; + + const char * const icingadbKnownStats[] = { + "config_sync", "state_sync", "history_sync", "overdue_sync", "history_cleanup" + }; + + for (auto metric : icingadbKnownStats) { + statsPerOp.emplace(std::piecewise_construct, std::forward_as_tuple(metric), std::forward_as_tuple(15 * 60)); + } + + if (xReadStats) { + Array::Ptr messages = Array::Ptr(xReadStats->Get(0))->Get(1); + ObjectLock lock (messages); + + for (Array::Ptr message : messages) { + auto ts (GetXMessageTs(message)); + std::map<String, String> opsPerSec; + + IcingaDB::AddKvsToMap(message->Get(1), opsPerSec); + + for (auto& kv : opsPerSec) { + auto buf (statsPerOp.find(kv.first)); + + if (buf == statsPerOp.end()) { + buf = statsPerOp.emplace( + std::piecewise_construct, + std::forward_as_tuple(kv.first), std::forward_as_tuple(15 * 60) + ).first; + } + + buf->second.InsertValue(ts, Convert::ToLong(kv.second)); + } + } + } + + for (auto& kv : statsPerOp) { + perfdata->Add(new PerfdataValue("icingadb_" + kv.first + "_items_1min", kv.second.UpdateAndGetValues(now, 60), false, "", Empty, Empty, 0)); + perfdata->Add(new PerfdataValue("icingadb_" + kv.first + "_items_5mins", kv.second.UpdateAndGetValues(now, 5 * 60), false, "", Empty, Empty, 0)); + perfdata->Add(new PerfdataValue("icingadb_" + kv.first + "_items_15mins", kv.second.UpdateAndGetValues(now, 15 * 60), false, "", Empty, Empty, 0)); + } + + perfdata->Add(new PerfdataValue("icinga2_redis_queries_1min", redis->GetQueryCount(60), false, "", Empty, Empty, 0)); + perfdata->Add(new PerfdataValue("icinga2_redis_queries_5mins", redis->GetQueryCount(5 * 60), false, "", Empty, Empty, 0)); + perfdata->Add(new PerfdataValue("icinga2_redis_queries_15mins", redis->GetQueryCount(15 * 60), false, "", Empty, Empty, 0)); + + perfdata->Add(new PerfdataValue("icinga2_redis_pending_queries", redis->GetPendingQueryCount(), false, "", Empty, Empty, 0)); + + struct { + const char * Name; + int (RedisConnection::* Getter)(RingBuffer::SizeType span, RingBuffer::SizeType tv); + } const icingaWriteSubjects[] = { + {"config_dump", &RedisConnection::GetWrittenConfigFor}, + {"state_dump", &RedisConnection::GetWrittenStateFor}, + {"history_dump", &RedisConnection::GetWrittenHistoryFor} + }; + + for (auto subject : icingaWriteSubjects) { + perfdata->Add(new PerfdataValue(String("icinga2_") + subject.Name + "_items_1min", (redis.get()->*subject.Getter)(60, now), false, "", Empty, Empty, 0)); + perfdata->Add(new PerfdataValue(String("icinga2_") + subject.Name + "_items_5mins", (redis.get()->*subject.Getter)(5 * 60, now), false, "", Empty, Empty, 0)); + perfdata->Add(new PerfdataValue(String("icinga2_") + subject.Name + "_items_15mins", (redis.get()->*subject.Getter)(15 * 60, now), false, "", Empty, Empty, 0)); + } + + ServiceState state; + std::ostringstream msgbuf; + auto i2okmsg (i2okmsgs.str()); + auto idbokmsg (idbokmsgs.str()); + auto warnmsg (warnmsgs.str()); + auto critmsg (critmsgs.str()); + + msgbuf << "Icinga DB "; + + if (!critmsg.empty()) { + state = ServiceCritical; + msgbuf << "CRITICAL:" << critmsg; + + if (!warnmsg.empty()) { + msgbuf << "\n\nWARNING:" << warnmsg; + } + } else if (!warnmsg.empty()) { + state = ServiceWarning; + msgbuf << "WARNING:" << warnmsg; + } else { + state = ServiceOK; + msgbuf << "OK: Uptime: " << Utility::FormatDuration(icingadbUptime) << ". Version: " << version << "."; + } + + if (!i2okmsg.empty()) { + msgbuf << "\n\nIcinga 2:\n" << i2okmsg; + } + + if (!idbokmsg.empty()) { + msgbuf << "\n\nIcinga DB:\n" << idbokmsg; + } + + cr->SetPerformanceData(perfdata); + ReportIcingadbCheck(checkable, commandObj, cr, msgbuf.str(), state); +} diff --git a/lib/icingadb/icingadbchecktask.hpp b/lib/icingadb/icingadbchecktask.hpp new file mode 100644 index 0000000..ba7d61b --- /dev/null +++ b/lib/icingadb/icingadbchecktask.hpp @@ -0,0 +1,29 @@ +/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */ + +#ifndef ICINGADBCHECKTASK_H +#define ICINGADBCHECKTASK_H + +#include "icingadb/icingadb.hpp" +#include "icinga/checkable.hpp" + +namespace icinga +{ + +/** + * Icinga DB check. + * + * @ingroup icingadb + */ +class IcingadbCheckTask +{ +public: + static void ScriptFunc(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, + const Dictionary::Ptr& resolvedMacros, bool useResolvedMacros); + +private: + IcingadbCheckTask(); +}; + +} + +#endif /* ICINGADBCHECKTASK_H */ diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp new file mode 100644 index 0000000..798a827 --- /dev/null +++ b/lib/icingadb/redisconnection.cpp @@ -0,0 +1,773 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "icingadb/redisconnection.hpp" +#include "base/array.hpp" +#include "base/convert.hpp" +#include "base/defer.hpp" +#include "base/exception.hpp" +#include "base/io-engine.hpp" +#include "base/logger.hpp" +#include "base/objectlock.hpp" +#include "base/string.hpp" +#include "base/tcpsocket.hpp" +#include "base/tlsutility.hpp" +#include "base/utility.hpp" +#include <boost/asio.hpp> +#include <boost/coroutine/exceptions.hpp> +#include <boost/date_time/posix_time/posix_time_duration.hpp> +#include <boost/utility/string_view.hpp> +#include <boost/variant/get.hpp> +#include <exception> +#include <future> +#include <iterator> +#include <memory> +#include <openssl/ssl.h> +#include <openssl/x509_vfy.h> +#include <utility> + +using namespace icinga; +namespace asio = boost::asio; + +boost::regex RedisConnection::m_ErrAuth ("\\AERR AUTH "); + +RedisConnection::RedisConnection(const String& host, int port, const String& path, const String& password, int db, + bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath, + const String& tlsProtocolmin, const String& cipherList, double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent) + : RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db, + useTls, insecure, certPath, keyPath, caPath, crlPath, tlsProtocolmin, cipherList, connectTimeout, std::move(di), parent) +{ +} + +RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, + int db, bool useTls, bool insecure, String certPath, String keyPath, String caPath, String crlPath, + String tlsProtocolmin, String cipherList, double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent) + : m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), + m_DbIndex(db), m_CertPath(std::move(certPath)), m_KeyPath(std::move(keyPath)), m_Insecure(insecure), + m_CaPath(std::move(caPath)), m_CrlPath(std::move(crlPath)), m_TlsProtocolmin(std::move(tlsProtocolmin)), + m_CipherList(std::move(cipherList)), m_ConnectTimeout(connectTimeout), m_DebugInfo(std::move(di)), m_Connecting(false), m_Connected(false), + m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io), m_Parent(parent) +{ + if (useTls && m_Path.IsEmpty()) { + UpdateTLSContext(); + } +} + +void RedisConnection::UpdateTLSContext() +{ + m_TLSContext = SetupSslContext(m_CertPath, m_KeyPath, m_CaPath, + m_CrlPath, m_CipherList, m_TlsProtocolmin, m_DebugInfo); +} + +void RedisConnection::Start() +{ + if (!m_Started.exchange(true)) { + Ptr keepAlive (this); + + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); }); + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); }); + + if (!m_Parent) { + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); }); + } + } + + if (!m_Connecting.exchange(true)) { + Ptr keepAlive (this); + + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); + } +} + +bool RedisConnection::IsConnected() { + return m_Connected.load(); +} + +/** + * Append a Redis query to a log message + * + * @param query Redis query + * @param msg Log message + */ +static inline +void LogQuery(RedisConnection::Query& query, Log& msg) +{ + int i = 0; + + for (auto& arg : query) { + if (++i == 8) { + msg << " ..."; + break; + } + + if (arg.GetLength() > 64) { + msg << " '" << arg.SubStr(0, 61) << "...'"; + } else { + msg << " '" << arg << '\''; + } + } +} + +/** + * Queue a Redis query for sending + * + * @param query Redis query + * @param priority The query's priority + */ +void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects) +{ + if (LogDebug >= Logger::GetMinLogSeverity()) { + Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:"); + LogQuery(query, msg); + } + + auto item (Shared<Query>::Make(std::move(query))); + auto ctime (Utility::GetTime()); + + asio::post(m_Strand, [this, item, priority, ctime, affects]() { + m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr, nullptr, ctime, affects}); + m_QueuedWrites.Set(); + IncreasePendingQueries(1); + }); +} + +/** + * Queue Redis queries for sending + * + * @param queries Redis queries + * @param priority The queries' priority + */ +void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects) +{ + if (LogDebug >= Logger::GetMinLogSeverity()) { + for (auto& query : queries) { + Log msg(LogDebug, "IcingaDB", "Firing and forgetting query:"); + LogQuery(query, msg); + } + } + + auto item (Shared<Queries>::Make(std::move(queries))); + auto ctime (Utility::GetTime()); + + asio::post(m_Strand, [this, item, priority, ctime, affects]() { + m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr, nullptr, ctime, affects}); + m_QueuedWrites.Set(); + IncreasePendingQueries(item->size()); + }); +} + +/** + * Queue a Redis query for sending, wait for the response and return (or throw) it + * + * @param query Redis query + * @param priority The query's priority + * + * @return The response + */ +RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects) +{ + if (LogDebug >= Logger::GetMinLogSeverity()) { + Log msg (LogDebug, "IcingaDB", "Executing query:"); + LogQuery(query, msg); + } + + std::promise<Reply> promise; + auto future (promise.get_future()); + auto item (Shared<std::pair<Query, std::promise<Reply>>>::Make(std::move(query), std::move(promise))); + auto ctime (Utility::GetTime()); + + asio::post(m_Strand, [this, item, priority, ctime, affects]() { + m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr, nullptr, ctime, affects}); + m_QueuedWrites.Set(); + IncreasePendingQueries(1); + }); + + item = nullptr; + future.wait(); + return future.get(); +} + +/** + * Queue Redis queries for sending, wait for the responses and return (or throw) them + * + * @param queries Redis queries + * @param priority The queries' priority + * + * @return The responses + */ +RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects) +{ + if (LogDebug >= Logger::GetMinLogSeverity()) { + for (auto& query : queries) { + Log msg(LogDebug, "IcingaDB", "Executing query:"); + LogQuery(query, msg); + } + } + + std::promise<Replies> promise; + auto future (promise.get_future()); + auto item (Shared<std::pair<Queries, std::promise<Replies>>>::Make(std::move(queries), std::move(promise))); + auto ctime (Utility::GetTime()); + + asio::post(m_Strand, [this, item, priority, ctime, affects]() { + m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item, nullptr, ctime, affects}); + m_QueuedWrites.Set(); + IncreasePendingQueries(item->first.size()); + }); + + item = nullptr; + future.wait(); + return future.get(); +} + +void RedisConnection::EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback, RedisConnection::QueryPriority priority) +{ + auto ctime (Utility::GetTime()); + + asio::post(m_Strand, [this, callback, priority, ctime]() { + m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback, ctime}); + m_QueuedWrites.Set(); + }); +} + +/** + * Puts a no-op command with a result at the end of the queue and wait for the result, + * i.e. for everything enqueued to be processed by the server. + * + * @ingroup icingadb + */ +void RedisConnection::Sync() +{ + GetResultOfQuery({"PING"}, RedisConnection::QueryPriority::SyncConnection); +} + +/** + * Get the enqueue time of the oldest still queued Redis query + * + * @return *nix timestamp or 0 + */ +double RedisConnection::GetOldestPendingQueryTs() +{ + auto promise (Shared<std::promise<double>>::Make()); + auto future (promise->get_future()); + + asio::post(m_Strand, [this, promise]() { + double oldest = 0; + + for (auto& queue : m_Queues.Writes) { + if (m_SuppressedQueryKinds.find(queue.first) == m_SuppressedQueryKinds.end() && !queue.second.empty()) { + auto ctime (queue.second.front().CTime); + + if (ctime < oldest || oldest == 0) { + oldest = ctime; + } + } + } + + promise->set_value(oldest); + }); + + future.wait(); + return future.get(); +} + +/** + * Mark kind as kind of queries not to actually send yet + * + * @param kind Query kind + */ +void RedisConnection::SuppressQueryKind(RedisConnection::QueryPriority kind) +{ + asio::post(m_Strand, [this, kind]() { m_SuppressedQueryKinds.emplace(kind); }); +} + +/** + * Unmark kind as kind of queries not to actually send yet + * + * @param kind Query kind + */ +void RedisConnection::UnsuppressQueryKind(RedisConnection::QueryPriority kind) +{ + asio::post(m_Strand, [this, kind]() { + m_SuppressedQueryKinds.erase(kind); + m_QueuedWrites.Set(); + }); +} + +/** + * Try to connect to Redis + */ +void RedisConnection::Connect(asio::yield_context& yc) +{ + Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); }); + + boost::asio::deadline_timer timer (m_Strand.context()); + + auto waitForReadLoop ([this, &yc]() { + while (!m_Queues.FutureResponseActions.empty()) { + IoEngine::YieldCurrentCoroutine(yc); + } + }); + + for (;;) { + try { + if (m_Path.IsEmpty()) { + if (m_TLSContext) { + Log(m_Parent ? LogNotice : LogInformation, "IcingaDB") + << "Trying to connect to Redis server (async, TLS) on host '" << m_Host << ":" << m_Port << "'"; + + auto conn (Shared<AsioTlsStream>::Make(m_Strand.context(), *m_TLSContext, m_Host)); + auto& tlsConn (conn->next_layer()); + auto connectTimeout (MakeTimeout(conn)); + Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); }); + + icinga::Connect(conn->lowest_layer(), m_Host, Convert::ToString(m_Port), yc); + tlsConn.async_handshake(tlsConn.client, yc); + + if (!m_Insecure) { + std::shared_ptr<X509> cert (tlsConn.GetPeerCertificate()); + + if (!cert) { + BOOST_THROW_EXCEPTION(std::runtime_error( + "Redis didn't present any TLS certificate." + )); + } + + if (!tlsConn.IsVerifyOK()) { + BOOST_THROW_EXCEPTION(std::runtime_error( + "TLS certificate validation failed: " + std::string(tlsConn.GetVerifyError()) + )); + } + } + + Handshake(conn, yc); + waitForReadLoop(); + m_TlsConn = std::move(conn); + } else { + Log(m_Parent ? LogNotice : LogInformation, "IcingaDB") + << "Trying to connect to Redis server (async) on host '" << m_Host << ":" << m_Port << "'"; + + auto conn (Shared<TcpConn>::Make(m_Strand.context())); + auto connectTimeout (MakeTimeout(conn)); + Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); }); + + icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc); + Handshake(conn, yc); + waitForReadLoop(); + m_TcpConn = std::move(conn); + } + } else { + Log(LogInformation, "IcingaDB") + << "Trying to connect to Redis server (async) on unix socket path '" << m_Path << "'"; + + auto conn (Shared<UnixConn>::Make(m_Strand.context())); + auto connectTimeout (MakeTimeout(conn)); + Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); }); + + conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc); + Handshake(conn, yc); + waitForReadLoop(); + m_UnixConn = std::move(conn); + } + + m_Connected.store(true); + + Log(m_Parent ? LogNotice : LogInformation, "IcingaDB", "Connected to Redis server"); + + // Operate on a copy so that the callback can set a new callback without destroying itself while running. + auto callback (m_ConnectedCallback); + if (callback) { + callback(yc); + } + + break; + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (const std::exception& ex) { + Log(LogCritical, "IcingaDB") + << "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what(); + } + + timer.expires_from_now(boost::posix_time::seconds(5)); + timer.async_wait(yc); + } + +} + +/** + * Actually receive the responses to the Redis queries send by WriteItem() and handle them + */ +void RedisConnection::ReadLoop(asio::yield_context& yc) +{ + for (;;) { + m_QueuedReads.Wait(yc); + + while (!m_Queues.FutureResponseActions.empty()) { + auto item (std::move(m_Queues.FutureResponseActions.front())); + m_Queues.FutureResponseActions.pop(); + + switch (item.Action) { + case ResponseAction::Ignore: + try { + for (auto i (item.Amount); i; --i) { + ReadOne(yc); + } + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (const std::exception& ex) { + Log(LogCritical, "IcingaDB") + << "Error during receiving the response to a query which has been fired and forgotten: " << ex.what(); + + continue; + } catch (...) { + Log(LogCritical, "IcingaDB") + << "Error during receiving the response to a query which has been fired and forgotten"; + + continue; + } + + break; + case ResponseAction::Deliver: + for (auto i (item.Amount); i; --i) { + auto promise (std::move(m_Queues.ReplyPromises.front())); + m_Queues.ReplyPromises.pop(); + + Reply reply; + + try { + reply = ReadOne(yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + promise.set_exception(std::current_exception()); + + continue; + } + + promise.set_value(std::move(reply)); + } + + break; + case ResponseAction::DeliverBulk: + { + auto promise (std::move(m_Queues.RepliesPromises.front())); + m_Queues.RepliesPromises.pop(); + + Replies replies; + replies.reserve(item.Amount); + + for (auto i (item.Amount); i; --i) { + try { + replies.emplace_back(ReadOne(yc)); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + promise.set_exception(std::current_exception()); + break; + } + } + + try { + promise.set_value(std::move(replies)); + } catch (const std::future_error&) { + // Complaint about the above op is not allowed + // due to promise.set_exception() was already called + } + } + } + } + + m_QueuedReads.Clear(); + } +} + +/** + * Actually send the Redis queries queued by {FireAndForget,GetResultsOf}{Query,Queries}() + */ +void RedisConnection::WriteLoop(asio::yield_context& yc) +{ + for (;;) { + m_QueuedWrites.Wait(yc); + + WriteFirstOfHighestPrio: + for (auto& queue : m_Queues.Writes) { + if (m_SuppressedQueryKinds.find(queue.first) != m_SuppressedQueryKinds.end() || queue.second.empty()) { + continue; + } + + auto next (std::move(queue.second.front())); + queue.second.pop(); + + WriteItem(yc, std::move(next)); + + goto WriteFirstOfHighestPrio; + } + + m_QueuedWrites.Clear(); + } +} + +/** + * Periodically log current query performance + */ +void RedisConnection::LogStats(asio::yield_context& yc) +{ + double lastMessage = 0; + + m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10)); + + for (;;) { + m_LogStatsTimer.async_wait(yc); + m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10)); + + if (!IsConnected()) + continue; + + auto now (Utility::GetTime()); + bool timeoutReached = now - lastMessage >= 5 * 60; + + if (m_PendingQueries < 1 && !timeoutReached) + continue; + + auto output (round(m_OutputQueries.CalculateRate(now, 10))); + + if (m_PendingQueries < output * 5 && !timeoutReached) + continue; + + Log(LogInformation, "IcingaDB") + << "Pending queries: " << m_PendingQueries << " (Input: " + << round(m_InputQueries.CalculateRate(now, 10)) << "/s; Output: " << output << "/s)"; + + lastMessage = now; + } +} + +/** + * Send next and schedule receiving the response + * + * @param next Redis queries + */ +void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next) +{ + if (next.FireAndForgetQuery) { + auto& item (*next.FireAndForgetQuery); + DecreasePendingQueries(1); + + try { + WriteOne(item, yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (const std::exception& ex) { + Log msg (LogCritical, "IcingaDB", "Error during sending query"); + LogQuery(item, msg); + msg << " which has been fired and forgotten: " << ex.what(); + + return; + } catch (...) { + Log msg (LogCritical, "IcingaDB", "Error during sending query"); + LogQuery(item, msg); + msg << " which has been fired and forgotten"; + + return; + } + + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore}); + } else { + ++m_Queues.FutureResponseActions.back().Amount; + } + + m_QueuedReads.Set(); + } + + if (next.FireAndForgetQueries) { + auto& item (*next.FireAndForgetQueries); + size_t i = 0; + + DecreasePendingQueries(item.size()); + + try { + for (auto& query : item) { + WriteOne(query, yc); + ++i; + } + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (const std::exception& ex) { + Log msg (LogCritical, "IcingaDB", "Error during sending query"); + LogQuery(item[i], msg); + msg << " which has been fired and forgotten: " << ex.what(); + + return; + } catch (...) { + Log msg (LogCritical, "IcingaDB", "Error during sending query"); + LogQuery(item[i], msg); + msg << " which has been fired and forgotten"; + + return; + } + + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore}); + } else { + m_Queues.FutureResponseActions.back().Amount += item.size(); + } + + m_QueuedReads.Set(); + } + + if (next.GetResultOfQuery) { + auto& item (*next.GetResultOfQuery); + DecreasePendingQueries(1); + + try { + WriteOne(item.first, yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + item.second.set_exception(std::current_exception()); + + return; + } + + m_Queues.ReplyPromises.emplace(std::move(item.second)); + + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver}); + } else { + ++m_Queues.FutureResponseActions.back().Amount; + } + + m_QueuedReads.Set(); + } + + if (next.GetResultsOfQueries) { + auto& item (*next.GetResultsOfQueries); + DecreasePendingQueries(item.first.size()); + + try { + for (auto& query : item.first) { + WriteOne(query, yc); + } + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + item.second.set_exception(std::current_exception()); + + return; + } + + m_Queues.RepliesPromises.emplace(std::move(item.second)); + m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk}); + + m_QueuedReads.Set(); + } + + if (next.Callback) { + next.Callback(yc); + } + + RecordAffected(next.Affects, Utility::GetTime()); +} + +/** + * Receive the response to a Redis query + * + * @return The response + */ +RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc) +{ + if (m_Path.IsEmpty()) { + if (m_TLSContext) { + return ReadOne(m_TlsConn, yc); + } else { + return ReadOne(m_TcpConn, yc); + } + } else { + return ReadOne(m_UnixConn, yc); + } +} + +/** + * Send query + * + * @param query Redis query + */ +void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc) +{ + if (m_Path.IsEmpty()) { + if (m_TLSContext) { + WriteOne(m_TlsConn, query, yc); + } else { + WriteOne(m_TcpConn, query, yc); + } + } else { + WriteOne(m_UnixConn, query, yc); + } +} + +/** + * Specify a callback that is run each time a connection is successfully established + * + * The callback is executed from a Boost.Asio coroutine and should therefore not perform blocking operations. + * + * @param callback Callback to execute + */ +void RedisConnection::SetConnectedCallback(std::function<void(asio::yield_context& yc)> callback) { + m_ConnectedCallback = std::move(callback); +} + +int RedisConnection::GetQueryCount(RingBuffer::SizeType span) +{ + return m_OutputQueries.UpdateAndGetValues(Utility::GetTime(), span); +} + +void RedisConnection::IncreasePendingQueries(int count) +{ + if (m_Parent) { + auto parent (m_Parent); + + asio::post(parent->m_Strand, [parent, count]() { + parent->IncreasePendingQueries(count); + }); + } else { + m_PendingQueries += count; + m_InputQueries.InsertValue(Utility::GetTime(), count); + } +} + +void RedisConnection::DecreasePendingQueries(int count) +{ + if (m_Parent) { + auto parent (m_Parent); + + asio::post(parent->m_Strand, [parent, count]() { + parent->DecreasePendingQueries(count); + }); + } else { + m_PendingQueries -= count; + m_OutputQueries.InsertValue(Utility::GetTime(), count); + } +} + +void RedisConnection::RecordAffected(RedisConnection::QueryAffects affected, double when) +{ + if (m_Parent) { + auto parent (m_Parent); + + asio::post(parent->m_Strand, [parent, affected, when]() { + parent->RecordAffected(affected, when); + }); + } else { + if (affected.Config) { + m_WrittenConfig.InsertValue(when, affected.Config); + } + + if (affected.State) { + m_WrittenState.InsertValue(when, affected.State); + } + + if (affected.History) { + m_WrittenHistory.InsertValue(when, affected.History); + } + } +} diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp new file mode 100644 index 0000000..f346ba2 --- /dev/null +++ b/lib/icingadb/redisconnection.hpp @@ -0,0 +1,678 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef REDISCONNECTION_H +#define REDISCONNECTION_H + +#include "base/array.hpp" +#include "base/atomic.hpp" +#include "base/convert.hpp" +#include "base/io-engine.hpp" +#include "base/object.hpp" +#include "base/ringbuffer.hpp" +#include "base/shared.hpp" +#include "base/string.hpp" +#include "base/tlsstream.hpp" +#include "base/value.hpp" +#include <boost/asio/buffer.hpp> +#include <boost/asio/buffered_stream.hpp> +#include <boost/asio/deadline_timer.hpp> +#include <boost/asio/io_context.hpp> +#include <boost/asio/io_context_strand.hpp> +#include <boost/asio/ip/tcp.hpp> +#include <boost/asio/local/stream_protocol.hpp> +#include <boost/asio/read.hpp> +#include <boost/asio/read_until.hpp> +#include <boost/asio/ssl/context.hpp> +#include <boost/asio/streambuf.hpp> +#include <boost/asio/write.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/regex.hpp> +#include <boost/utility/string_view.hpp> +#include <cstddef> +#include <cstdint> +#include <cstdio> +#include <cstring> +#include <future> +#include <map> +#include <memory> +#include <queue> +#include <set> +#include <stdexcept> +#include <utility> +#include <vector> + +namespace icinga +{ +/** + * An Async Redis connection. + * + * @ingroup icingadb + */ + class RedisConnection final : public Object + { + public: + DECLARE_PTR_TYPEDEFS(RedisConnection); + + typedef std::vector<String> Query; + typedef std::vector<Query> Queries; + typedef Value Reply; + typedef std::vector<Reply> Replies; + + /** + * Redis query priorities, highest first. + * + * @ingroup icingadb + */ + enum class QueryPriority : unsigned char + { + Heartbeat, + RuntimeStateStream, // runtime state updates, doesn't affect initially synced states + Config, // includes initially synced states + RuntimeStateSync, // updates initially synced states at runtime, in parallel to config dump, therefore must be < Config + History, + CheckResult, + SyncConnection = 255 + }; + + struct QueryAffects + { + size_t Config; + size_t State; + size_t History; + + QueryAffects(size_t config = 0, size_t state = 0, size_t history = 0) + : Config(config), State(state), History(history) { } + }; + + RedisConnection(const String& host, int port, const String& path, const String& password, int db, + bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath, + const String& tlsProtocolmin, const String& cipherList, double connectTimeout, DebugInfo di, const Ptr& parent = nullptr); + + void UpdateTLSContext(); + + void Start(); + + bool IsConnected(); + + void FireAndForgetQuery(Query query, QueryPriority priority, QueryAffects affects = {}); + void FireAndForgetQueries(Queries queries, QueryPriority priority, QueryAffects affects = {}); + + Reply GetResultOfQuery(Query query, QueryPriority priority, QueryAffects affects = {}); + Replies GetResultsOfQueries(Queries queries, QueryPriority priority, QueryAffects affects = {}); + + void EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback, QueryPriority priority); + void Sync(); + double GetOldestPendingQueryTs(); + + void SuppressQueryKind(QueryPriority kind); + void UnsuppressQueryKind(QueryPriority kind); + + void SetConnectedCallback(std::function<void(boost::asio::yield_context& yc)> callback); + + inline bool GetConnected() + { + return m_Connected.load(); + } + + int GetQueryCount(RingBuffer::SizeType span); + + inline int GetPendingQueryCount() + { + return m_PendingQueries; + } + + inline int GetWrittenConfigFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime()) + { + return m_WrittenConfig.UpdateAndGetValues(tv, span); + } + + inline int GetWrittenStateFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime()) + { + return m_WrittenState.UpdateAndGetValues(tv, span); + } + + inline int GetWrittenHistoryFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime()) + { + return m_WrittenHistory.UpdateAndGetValues(tv, span); + } + + private: + /** + * What to do with the responses to Redis queries. + * + * @ingroup icingadb + */ + enum class ResponseAction : unsigned char + { + Ignore, // discard + Deliver, // submit to the requestor + DeliverBulk // submit multiple responses to the requestor at once + }; + + /** + * What to do with how many responses to Redis queries. + * + * @ingroup icingadb + */ + struct FutureResponseAction + { + size_t Amount; + ResponseAction Action; + }; + + /** + * Something to be send to Redis. + * + * @ingroup icingadb + */ + struct WriteQueueItem + { + Shared<Query>::Ptr FireAndForgetQuery; + Shared<Queries>::Ptr FireAndForgetQueries; + Shared<std::pair<Query, std::promise<Reply>>>::Ptr GetResultOfQuery; + Shared<std::pair<Queries, std::promise<Replies>>>::Ptr GetResultsOfQueries; + std::function<void(boost::asio::yield_context&)> Callback; + + double CTime; + QueryAffects Affects; + }; + + typedef boost::asio::ip::tcp Tcp; + typedef boost::asio::local::stream_protocol Unix; + + typedef boost::asio::buffered_stream<Tcp::socket> TcpConn; + typedef boost::asio::buffered_stream<Unix::socket> UnixConn; + + Shared<boost::asio::ssl::context>::Ptr m_TLSContext; + + template<class AsyncReadStream> + static Value ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc); + + template<class AsyncReadStream> + static std::vector<char> ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint = 0); + + template<class AsyncWriteStream> + static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc); + + static boost::regex m_ErrAuth; + + RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, + int db, bool useTls, bool insecure, String certPath, String keyPath, String caPath, String crlPath, + String tlsProtocolmin, String cipherList, double connectTimeout, DebugInfo di, const Ptr& parent); + + void Connect(boost::asio::yield_context& yc); + void ReadLoop(boost::asio::yield_context& yc); + void WriteLoop(boost::asio::yield_context& yc); + void LogStats(boost::asio::yield_context& yc); + void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item); + Reply ReadOne(boost::asio::yield_context& yc); + void WriteOne(Query& query, boost::asio::yield_context& yc); + + template<class StreamPtr> + Reply ReadOne(StreamPtr& stream, boost::asio::yield_context& yc); + + template<class StreamPtr> + void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc); + + void IncreasePendingQueries(int count); + void DecreasePendingQueries(int count); + void RecordAffected(QueryAffects affected, double when); + + template<class StreamPtr> + void Handshake(StreamPtr& stream, boost::asio::yield_context& yc); + + template<class StreamPtr> + Timeout::Ptr MakeTimeout(StreamPtr& stream); + + String m_Path; + String m_Host; + int m_Port; + String m_Password; + int m_DbIndex; + + String m_CertPath; + String m_KeyPath; + bool m_Insecure; + String m_CaPath; + String m_CrlPath; + String m_TlsProtocolmin; + String m_CipherList; + double m_ConnectTimeout; + DebugInfo m_DebugInfo; + + boost::asio::io_context::strand m_Strand; + Shared<TcpConn>::Ptr m_TcpConn; + Shared<UnixConn>::Ptr m_UnixConn; + Shared<AsioTlsStream>::Ptr m_TlsConn; + Atomic<bool> m_Connecting, m_Connected, m_Started; + + struct { + // Items to be send to Redis + std::map<QueryPriority, std::queue<WriteQueueItem>> Writes; + // Requestors, each waiting for a single response + std::queue<std::promise<Reply>> ReplyPromises; + // Requestors, each waiting for multiple responses at once + std::queue<std::promise<Replies>> RepliesPromises; + // Metadata about all of the above + std::queue<FutureResponseAction> FutureResponseActions; + } m_Queues; + + // Kinds of queries not to actually send yet + std::set<QueryPriority> m_SuppressedQueryKinds; + + // Indicate that there's something to send/receive + AsioConditionVariable m_QueuedWrites, m_QueuedReads; + + std::function<void(boost::asio::yield_context& yc)> m_ConnectedCallback; + + // Stats + RingBuffer m_InputQueries{10}; + RingBuffer m_OutputQueries{15 * 60}; + RingBuffer m_WrittenConfig{15 * 60}; + RingBuffer m_WrittenState{15 * 60}; + RingBuffer m_WrittenHistory{15 * 60}; + int m_PendingQueries{0}; + boost::asio::deadline_timer m_LogStatsTimer; + Ptr m_Parent; + }; + +/** + * An error response from the Redis server. + * + * @ingroup icingadb + */ +class RedisError final : public Object +{ +public: + DECLARE_PTR_TYPEDEFS(RedisError); + + inline RedisError(String message) : m_Message(std::move(message)) + { + } + + inline const String& GetMessage() + { + return m_Message; + } + +private: + String m_Message; +}; + +/** + * Thrown if the connection to the Redis server has already been lost. + * + * @ingroup icingadb + */ +class RedisDisconnected : public std::runtime_error +{ +public: + inline RedisDisconnected() : runtime_error("") + { + } +}; + +/** + * Thrown on malformed Redis server responses. + * + * @ingroup icingadb + */ +class RedisProtocolError : public std::runtime_error +{ +protected: + inline RedisProtocolError() : runtime_error("") + { + } +}; + +/** + * Thrown on malformed types in Redis server responses. + * + * @ingroup icingadb + */ +class BadRedisType : public RedisProtocolError +{ +public: + inline BadRedisType(char type) : m_What{type, 0} + { + } + + virtual const char * what() const noexcept override + { + return m_What; + } + +private: + char m_What[2]; +}; + +/** + * Thrown on malformed ints in Redis server responses. + * + * @ingroup icingadb + */ +class BadRedisInt : public RedisProtocolError +{ +public: + inline BadRedisInt(std::vector<char> intStr) : m_What(std::move(intStr)) + { + m_What.emplace_back(0); + } + + virtual const char * what() const noexcept override + { + return m_What.data(); + } + +private: + std::vector<char> m_What; +}; + +/** + * Read a Redis server response from stream + * + * @param stream Redis server connection + * + * @return The response + */ +template<class StreamPtr> +RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc) +{ + namespace asio = boost::asio; + + if (!stream) { + throw RedisDisconnected(); + } + + auto strm (stream); + + try { + return ReadRESP(*strm, yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + if (m_Connecting.exchange(false)) { + m_Connected.store(false); + stream = nullptr; + + if (!m_Connecting.exchange(true)) { + Ptr keepAlive (this); + + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); + } + } + + throw; + } +} + +/** + * Write a Redis query to stream + * + * @param stream Redis server connection + * @param query Redis query + */ +template<class StreamPtr> +void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc) +{ + namespace asio = boost::asio; + + if (!stream) { + throw RedisDisconnected(); + } + + auto strm (stream); + + try { + WriteRESP(*strm, query, yc); + strm->async_flush(yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + if (m_Connecting.exchange(false)) { + m_Connected.store(false); + stream = nullptr; + + if (!m_Connecting.exchange(true)) { + Ptr keepAlive (this); + + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); + } + } + + throw; + } +} + +/** + * Initialize a Redis stream + * + * @param stream Redis server connection + * @param query Redis query + */ +template<class StreamPtr> +void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc) +{ + if (m_Password.IsEmpty() && !m_DbIndex) { + // Trigger NOAUTH + WriteRESP(*strm, {"PING"}, yc); + } else { + if (!m_Password.IsEmpty()) { + WriteRESP(*strm, {"AUTH", m_Password}, yc); + } + + if (m_DbIndex) { + WriteRESP(*strm, {"SELECT", Convert::ToString(m_DbIndex)}, yc); + } + } + + strm->async_flush(yc); + + if (m_Password.IsEmpty() && !m_DbIndex) { + Reply pong (ReadRESP(*strm, yc)); + + if (pong.IsObjectType<RedisError>()) { + // Likely NOAUTH + BOOST_THROW_EXCEPTION(std::runtime_error(RedisError::Ptr(pong)->GetMessage())); + } + } else { + if (!m_Password.IsEmpty()) { + Reply auth (ReadRESP(*strm, yc)); + + if (auth.IsObjectType<RedisError>()) { + auto& authErr (RedisError::Ptr(auth)->GetMessage().GetData()); + boost::smatch what; + + if (boost::regex_search(authErr, what, m_ErrAuth)) { + Log(LogWarning, "IcingaDB") << authErr; + } else { + // Likely WRONGPASS + BOOST_THROW_EXCEPTION(std::runtime_error(authErr)); + } + } + } + + if (m_DbIndex) { + Reply select (ReadRESP(*strm, yc)); + + if (select.IsObjectType<RedisError>()) { + // Likely NOAUTH or ERR DB + BOOST_THROW_EXCEPTION(std::runtime_error(RedisError::Ptr(select)->GetMessage())); + } + } + } +} + +/** + * Creates a Timeout which cancels stream's I/O after m_ConnectTimeout + * + * @param stream Redis server connection + */ +template<class StreamPtr> +Timeout::Ptr RedisConnection::MakeTimeout(StreamPtr& stream) +{ + Ptr keepAlive (this); + + return new Timeout( + m_Strand.context(), + m_Strand, + boost::posix_time::microseconds(intmax_t(m_ConnectTimeout * 1000000)), + [keepAlive, stream](boost::asio::yield_context yc) { + boost::system::error_code ec; + stream->lowest_layer().cancel(ec); + } + ); +} + +/** + * Read a Redis protocol value from stream + * + * @param stream Redis server connection + * + * @return The value + */ +template<class AsyncReadStream> +Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc) +{ + namespace asio = boost::asio; + + char type = 0; + asio::async_read(stream, asio::mutable_buffer(&type, 1), yc); + + switch (type) { + case '+': + { + auto buf (ReadLine(stream, yc)); + return String(buf.begin(), buf.end()); + } + case '-': + { + auto buf (ReadLine(stream, yc)); + return new RedisError(String(buf.begin(), buf.end())); + } + case ':': + { + auto buf (ReadLine(stream, yc, 21)); + intmax_t i = 0; + + try { + i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size())); + } catch (...) { + throw BadRedisInt(std::move(buf)); + } + + return (double)i; + } + case '$': + { + auto buf (ReadLine(stream, yc, 21)); + intmax_t i = 0; + + try { + i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size())); + } catch (...) { + throw BadRedisInt(std::move(buf)); + } + + if (i < 0) { + return Value(); + } + + buf.clear(); + buf.insert(buf.end(), i, 0); + asio::async_read(stream, asio::mutable_buffer(buf.data(), buf.size()), yc); + + { + char crlf[2]; + asio::async_read(stream, asio::mutable_buffer(crlf, 2), yc); + } + + return String(buf.begin(), buf.end()); + } + case '*': + { + auto buf (ReadLine(stream, yc, 21)); + intmax_t i = 0; + + try { + i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size())); + } catch (...) { + throw BadRedisInt(std::move(buf)); + } + + if (i < 0) { + return Empty; + } + + Array::Ptr arr = new Array(); + + arr->Reserve(i); + + for (; i; --i) { + arr->Add(ReadRESP(stream, yc)); + } + + return arr; + } + default: + throw BadRedisType(type); + } +} + +/** + * Read from stream until \r\n + * + * @param stream Redis server connection + * @param hint Expected amount of data + * + * @return Read data ex. \r\n + */ +template<class AsyncReadStream> +std::vector<char> RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint) +{ + namespace asio = boost::asio; + + std::vector<char> line; + line.reserve(hint); + + char next = 0; + asio::mutable_buffer buf (&next, 1); + + for (;;) { + asio::async_read(stream, buf, yc); + + if (next == '\r') { + asio::async_read(stream, buf, yc); + return line; + } + + line.emplace_back(next); + } +} + +/** + * Write a Redis protocol value to stream + * + * @param stream Redis server connection + * @param query Redis protocol value + */ +template<class AsyncWriteStream> +void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc) +{ + namespace asio = boost::asio; + + asio::streambuf writeBuffer; + std::ostream msg(&writeBuffer); + + msg << "*" << query.size() << "\r\n"; + + for (auto& arg : query) { + msg << "$" << arg.GetLength() << "\r\n" << arg << "\r\n"; + } + + asio::async_write(stream, writeBuffer, yc); +} + +} + +#endif //REDISCONNECTION_H |