summaryrefslogtreecommitdiffstats
path: root/lib/perfdata
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 11:32:39 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 11:32:39 +0000
commit56ae875861ab260b80a030f50c4aff9f9dc8fff0 (patch)
tree531412110fc901a5918c7f7442202804a83cada9 /lib/perfdata
parentInitial commit. (diff)
downloadicinga2-56ae875861ab260b80a030f50c4aff9f9dc8fff0.tar.xz
icinga2-56ae875861ab260b80a030f50c4aff9f9dc8fff0.zip
Adding upstream version 2.14.2.upstream/2.14.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/perfdata')
-rw-r--r--lib/perfdata/CMakeLists.txt74
-rw-r--r--lib/perfdata/elasticsearchwriter.cpp685
-rw-r--r--lib/perfdata/elasticsearchwriter.hpp65
-rw-r--r--lib/perfdata/elasticsearchwriter.ti50
-rw-r--r--lib/perfdata/gelfwriter.cpp535
-rw-r--r--lib/perfdata/gelfwriter.hpp70
-rw-r--r--lib/perfdata/gelfwriter.ti45
-rw-r--r--lib/perfdata/graphitewriter.cpp514
-rw-r--r--lib/perfdata/graphitewriter.hpp69
-rw-r--r--lib/perfdata/graphitewriter.ti38
-rw-r--r--lib/perfdata/influxdb2writer.cpp44
-rw-r--r--lib/perfdata/influxdb2writer.hpp33
-rw-r--r--lib/perfdata/influxdb2writer.ti19
-rw-r--r--lib/perfdata/influxdbcommonwriter.cpp596
-rw-r--r--lib/perfdata/influxdbcommonwriter.hpp101
-rw-r--r--lib/perfdata/influxdbcommonwriter.ti88
-rw-r--r--lib/perfdata/influxdbwriter.cpp56
-rw-r--r--lib/perfdata/influxdbwriter.hpp31
-rw-r--r--lib/perfdata/influxdbwriter.ti35
-rw-r--r--lib/perfdata/opentsdbwriter.cpp525
-rw-r--r--lib/perfdata/opentsdbwriter.hpp62
-rw-r--r--lib/perfdata/opentsdbwriter.ti55
-rw-r--r--lib/perfdata/perfdatawriter.cpp201
-rw-r--r--lib/perfdata/perfdatawriter.hpp53
-rw-r--r--lib/perfdata/perfdatawriter.ti61
25 files changed, 4105 insertions, 0 deletions
diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt
new file mode 100644
index 0000000..168938c
--- /dev/null
+++ b/lib/perfdata/CMakeLists.txt
@@ -0,0 +1,74 @@
+# Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+
+
+mkclass_target(gelfwriter.ti gelfwriter-ti.cpp gelfwriter-ti.hpp)
+mkclass_target(graphitewriter.ti graphitewriter-ti.cpp graphitewriter-ti.hpp)
+mkclass_target(influxdbcommonwriter.ti influxdbcommonwriter-ti.cpp influxdbcommonwriter-ti.hpp)
+mkclass_target(influxdbwriter.ti influxdbwriter-ti.cpp influxdbwriter-ti.hpp)
+mkclass_target(influxdb2writer.ti influxdb2writer-ti.cpp influxdb2writer-ti.hpp)
+mkclass_target(elasticsearchwriter.ti elasticsearchwriter-ti.cpp elasticsearchwriter-ti.hpp)
+mkclass_target(opentsdbwriter.ti opentsdbwriter-ti.cpp opentsdbwriter-ti.hpp)
+mkclass_target(perfdatawriter.ti perfdatawriter-ti.cpp perfdatawriter-ti.hpp)
+
+set(perfdata_SOURCES
+ elasticsearchwriter.cpp elasticsearchwriter.hpp elasticsearchwriter-ti.hpp
+ gelfwriter.cpp gelfwriter.hpp gelfwriter-ti.hpp
+ graphitewriter.cpp graphitewriter.hpp graphitewriter-ti.hpp
+ influxdbcommonwriter.cpp influxdbcommonwriter.hpp influxdbcommonwriter-ti.hpp
+ influxdbwriter.cpp influxdbwriter.hpp influxdbwriter-ti.hpp
+ influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
+ opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
+ perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
+)
+
+if(ICINGA2_UNITY_BUILD)
+ mkunity_target(perfdata perfdata perfdata_SOURCES)
+endif()
+
+add_library(perfdata OBJECT ${perfdata_SOURCES})
+
+add_dependencies(perfdata base config icinga)
+
+set_target_properties (
+ perfdata PROPERTIES
+ FOLDER Components
+)
+
+install_if_not_exists(
+ ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/gelf.conf
+ ${ICINGA2_CONFIGDIR}/features-available
+)
+
+install_if_not_exists(
+ ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/graphite.conf
+ ${ICINGA2_CONFIGDIR}/features-available
+)
+
+install_if_not_exists(
+ ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/influxdb.conf
+ ${ICINGA2_CONFIGDIR}/features-available
+)
+
+install_if_not_exists(
+ ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/influxdb2.conf
+ ${ICINGA2_CONFIGDIR}/features-available
+)
+
+install_if_not_exists(
+ ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elasticsearch.conf
+ ${ICINGA2_CONFIGDIR}/features-available
+)
+
+install_if_not_exists(
+ ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/opentsdb.conf
+ ${ICINGA2_CONFIGDIR}/features-available
+)
+
+install_if_not_exists(
+ ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/perfdata.conf
+ ${ICINGA2_CONFIGDIR}/features-available
+)
+
+install(CODE "file(MAKE_DIRECTORY \"\$ENV{DESTDIR}${ICINGA2_FULL_SPOOLDIR}/perfdata\")")
+install(CODE "file(MAKE_DIRECTORY \"\$ENV{DESTDIR}${ICINGA2_FULL_SPOOLDIR}/tmp\")")
+
+set(CPACK_NSIS_EXTRA_INSTALL_COMMANDS "${CPACK_NSIS_EXTRA_INSTALL_COMMANDS}" PARENT_SCOPE)
diff --git a/lib/perfdata/elasticsearchwriter.cpp b/lib/perfdata/elasticsearchwriter.cpp
new file mode 100644
index 0000000..9fb2aa9
--- /dev/null
+++ b/lib/perfdata/elasticsearchwriter.cpp
@@ -0,0 +1,685 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/elasticsearchwriter.hpp"
+#include "perfdata/elasticsearchwriter-ti.cpp"
+#include "remote/url.hpp"
+#include "icinga/compatutility.hpp"
+#include "icinga/service.hpp"
+#include "icinga/checkcommand.hpp"
+#include "base/application.hpp"
+#include "base/defer.hpp"
+#include "base/io-engine.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/stream.hpp"
+#include "base/base64.hpp"
+#include "base/json.hpp"
+#include "base/utility.hpp"
+#include "base/networkstream.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/exception.hpp"
+#include "base/statsfunction.hpp"
+#include <boost/algorithm/string.hpp>
+#include <boost/asio/ssl/context.hpp>
+#include <boost/beast/core/flat_buffer.hpp>
+#include <boost/beast/http/field.hpp>
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/parser.hpp>
+#include <boost/beast/http/read.hpp>
+#include <boost/beast/http/status.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <boost/beast/http/verb.hpp>
+#include <boost/beast/http/write.hpp>
+#include <boost/scoped_array.hpp>
+#include <memory>
+#include <string>
+#include <utility>
+
+using namespace icinga;
+
+REGISTER_TYPE(ElasticsearchWriter);
+
+REGISTER_STATSFUNCTION(ElasticsearchWriter, &ElasticsearchWriter::StatsFunc);
+
+void ElasticsearchWriter::OnConfigLoaded()
+{
+ ObjectImpl<ElasticsearchWriter>::OnConfigLoaded();
+
+ m_WorkQueue.SetName("ElasticsearchWriter, " + GetName());
+
+ if (!GetEnableHa()) {
+ Log(LogDebug, "ElasticsearchWriter")
+ << "HA functionality disabled. Won't pause connection: " << GetName();
+
+ SetHAMode(HARunEverywhere);
+ } else {
+ SetHAMode(HARunOnce);
+ }
+}
+
+void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+ DictionaryData nodes;
+
+ for (const ElasticsearchWriter::Ptr& elasticsearchwriter : ConfigType::GetObjectsByType<ElasticsearchWriter>()) {
+ size_t workQueueItems = elasticsearchwriter->m_WorkQueue.GetLength();
+ double workQueueItemRate = elasticsearchwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
+
+ nodes.emplace_back(elasticsearchwriter->GetName(), new Dictionary({
+ { "work_queue_items", workQueueItems },
+ { "work_queue_item_rate", workQueueItemRate }
+ }));
+
+ perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_items", workQueueItems));
+ perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
+ }
+
+ status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
+}
+
+void ElasticsearchWriter::Resume()
+{
+ ObjectImpl<ElasticsearchWriter>::Resume();
+
+ m_EventPrefix = "icinga2.event.";
+
+ Log(LogInformation, "ElasticsearchWriter")
+ << "'" << GetName() << "' resumed.";
+
+ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
+
+ /* Setup timer for periodically flushing m_DataBuffer */
+ m_FlushTimer = Timer::Create();
+ m_FlushTimer->SetInterval(GetFlushInterval());
+ m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
+ m_FlushTimer->Start();
+ m_FlushTimer->Reschedule(0);
+
+ /* Register for new metrics. */
+ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
+ const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
+ CheckResultHandler(checkable, cr);
+ });
+ m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
+ const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
+ StateChangeHandler(checkable, cr, type);
+ });
+ m_HandleNotifications = Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr& notification,
+ const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, const NotificationType& type,
+ const CheckResult::Ptr& cr, const String& author, const String& text, const MessageOrigin::Ptr&) {
+ NotificationSentToAllUsersHandler(notification, checkable, users, type, cr, author, text);
+ });
+}
+
+/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
+void ElasticsearchWriter::Pause()
+{
+ m_HandleCheckResults.disconnect();
+ m_HandleStateChanges.disconnect();
+ m_HandleNotifications.disconnect();
+
+ m_FlushTimer->Stop(true);
+ m_WorkQueue.Join();
+
+ {
+ std::unique_lock<std::mutex> lock (m_DataBufferMutex);
+ Flush();
+ }
+
+ Log(LogInformation, "ElasticsearchWriter")
+ << "'" << GetName() << "' paused.";
+
+ ObjectImpl<ElasticsearchWriter>::Pause();
+}
+
+void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ String prefix = "check_result.";
+
+ fields->Set(prefix + "output", cr->GetOutput());
+ fields->Set(prefix + "check_source", cr->GetCheckSource());
+ fields->Set(prefix + "exit_status", cr->GetExitStatus());
+ fields->Set(prefix + "command", cr->GetCommand());
+ fields->Set(prefix + "state", cr->GetState());
+ fields->Set(prefix + "vars_before", cr->GetVarsBefore());
+ fields->Set(prefix + "vars_after", cr->GetVarsAfter());
+
+ fields->Set(prefix + "execution_start", FormatTimestamp(cr->GetExecutionStart()));
+ fields->Set(prefix + "execution_end", FormatTimestamp(cr->GetExecutionEnd()));
+ fields->Set(prefix + "schedule_start", FormatTimestamp(cr->GetScheduleStart()));
+ fields->Set(prefix + "schedule_end", FormatTimestamp(cr->GetScheduleEnd()));
+
+ /* Add extra calculated field. */
+ fields->Set(prefix + "latency", cr->CalculateLatency());
+ fields->Set(prefix + "execution_time", cr->CalculateExecutionTime());
+
+ if (!GetEnableSendPerfdata())
+ return;
+
+ Array::Ptr perfdata = cr->GetPerformanceData();
+
+ CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
+
+ if (perfdata) {
+ ObjectLock olock(perfdata);
+ for (const Value& val : perfdata) {
+ PerfdataValue::Ptr pdv;
+
+ if (val.IsObjectType<PerfdataValue>())
+ pdv = val;
+ else {
+ try {
+ pdv = PerfdataValue::Parse(val);
+ } catch (const std::exception&) {
+ Log(LogWarning, "ElasticsearchWriter")
+ << "Ignoring invalid perfdata for checkable '"
+ << checkable->GetName() << "' and command '"
+ << checkCommand->GetName() << "' with value: " << val;
+ continue;
+ }
+ }
+
+ String escapedKey = pdv->GetLabel();
+ boost::replace_all(escapedKey, " ", "_");
+ boost::replace_all(escapedKey, ".", "_");
+ boost::replace_all(escapedKey, "\\", "_");
+ boost::algorithm::replace_all(escapedKey, "::", ".");
+
+ String perfdataPrefix = prefix + "perfdata." + escapedKey;
+
+ fields->Set(perfdataPrefix + ".value", pdv->GetValue());
+
+ if (!pdv->GetMin().IsEmpty())
+ fields->Set(perfdataPrefix + ".min", pdv->GetMin());
+ if (!pdv->GetMax().IsEmpty())
+ fields->Set(perfdataPrefix + ".max", pdv->GetMax());
+ if (!pdv->GetWarn().IsEmpty())
+ fields->Set(perfdataPrefix + ".warn", pdv->GetWarn());
+ if (!pdv->GetCrit().IsEmpty())
+ fields->Set(perfdataPrefix + ".crit", pdv->GetCrit());
+
+ if (!pdv->GetUnit().IsEmpty())
+ fields->Set(perfdataPrefix + ".unit", pdv->GetUnit());
+ }
+ }
+}
+
+void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ if (IsPaused())
+ return;
+
+ m_WorkQueue.Enqueue([this, checkable, cr]() { InternalCheckResultHandler(checkable, cr); });
+}
+
+void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ AssertOnWorkQueue();
+
+ CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
+
+ if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
+ return;
+
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ Dictionary::Ptr fields = new Dictionary();
+
+ if (service) {
+ fields->Set("service", service->GetShortName());
+ fields->Set("state", service->GetState());
+ fields->Set("last_state", service->GetLastState());
+ fields->Set("last_hard_state", service->GetLastHardState());
+ } else {
+ fields->Set("state", host->GetState());
+ fields->Set("last_state", host->GetLastState());
+ fields->Set("last_hard_state", host->GetLastHardState());
+ }
+
+ fields->Set("host", host->GetName());
+ fields->Set("state_type", checkable->GetStateType());
+
+ fields->Set("current_check_attempt", checkable->GetCheckAttempt());
+ fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
+
+ fields->Set("reachable", checkable->IsReachable());
+
+ CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
+
+ if (commandObj)
+ fields->Set("check_command", commandObj->GetName());
+
+ double ts = Utility::GetTime();
+
+ if (cr) {
+ AddCheckResult(fields, checkable, cr);
+ ts = cr->GetExecutionEnd();
+ }
+
+ Enqueue(checkable, "checkresult", fields, ts);
+}
+
+void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+ if (IsPaused())
+ return;
+
+ m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); });
+}
+
+void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+ AssertOnWorkQueue();
+
+ CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
+
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ Dictionary::Ptr fields = new Dictionary();
+
+ fields->Set("current_check_attempt", checkable->GetCheckAttempt());
+ fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
+ fields->Set("host", host->GetName());
+
+ if (service) {
+ fields->Set("service", service->GetShortName());
+ fields->Set("state", service->GetState());
+ fields->Set("last_state", service->GetLastState());
+ fields->Set("last_hard_state", service->GetLastHardState());
+ } else {
+ fields->Set("state", host->GetState());
+ fields->Set("last_state", host->GetLastState());
+ fields->Set("last_hard_state", host->GetLastHardState());
+ }
+
+ CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
+
+ if (commandObj)
+ fields->Set("check_command", commandObj->GetName());
+
+ double ts = Utility::GetTime();
+
+ if (cr) {
+ AddCheckResult(fields, checkable, cr);
+ ts = cr->GetExecutionEnd();
+ }
+
+ Enqueue(checkable, "statechange", fields, ts);
+}
+
+void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
+ const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
+ const CheckResult::Ptr& cr, const String& author, const String& text)
+{
+ if (IsPaused())
+ return;
+
+ m_WorkQueue.Enqueue([this, notification, checkable, users, type, cr, author, text]() {
+ NotificationSentToAllUsersHandlerInternal(notification, checkable, users, type, cr, author, text);
+ });
+}
+
+void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
+ const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
+ const CheckResult::Ptr& cr, const String& author, const String& text)
+{
+ AssertOnWorkQueue();
+
+ CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
+
+ Log(LogDebug, "ElasticsearchWriter")
+ << "Processing notification for '" << checkable->GetName() << "'";
+
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ String notificationTypeString = Notification::NotificationTypeToStringCompat(type); //TODO: Change that to our own types.
+
+ Dictionary::Ptr fields = new Dictionary();
+
+ if (service) {
+ fields->Set("service", service->GetShortName());
+ fields->Set("state", service->GetState());
+ fields->Set("last_state", service->GetLastState());
+ fields->Set("last_hard_state", service->GetLastHardState());
+ } else {
+ fields->Set("state", host->GetState());
+ fields->Set("last_state", host->GetLastState());
+ fields->Set("last_hard_state", host->GetLastHardState());
+ }
+
+ fields->Set("host", host->GetName());
+
+ ArrayData userNames;
+
+ for (const User::Ptr& user : users) {
+ userNames.push_back(user->GetName());
+ }
+
+ fields->Set("users", new Array(std::move(userNames)));
+ fields->Set("notification_type", notificationTypeString);
+ fields->Set("author", author);
+ fields->Set("text", text);
+
+ CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
+
+ if (commandObj)
+ fields->Set("check_command", commandObj->GetName());
+
+ double ts = Utility::GetTime();
+
+ if (cr) {
+ AddCheckResult(fields, checkable, cr);
+ ts = cr->GetExecutionEnd();
+ }
+
+ Enqueue(checkable, "notification", fields, ts);
+}
+
+void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& type,
+ const Dictionary::Ptr& fields, double ts)
+{
+ /* Atomically buffer the data point. */
+ std::unique_lock<std::mutex> lock(m_DataBufferMutex);
+
+ /* Format the timestamps to dynamically select the date datatype inside the index. */
+ fields->Set("@timestamp", FormatTimestamp(ts));
+ fields->Set("timestamp", FormatTimestamp(ts));
+
+ String eventType = m_EventPrefix + type;
+ fields->Set("type", eventType);
+
+ /* Every payload needs a line describing the index.
+ * We do it this way to avoid problems with a near full queue.
+ */
+ String indexBody = "{\"index\": {} }\n";
+ String fieldsBody = JsonEncode(fields);
+
+ Log(LogDebug, "ElasticsearchWriter")
+ << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << fieldsBody << "'.";
+
+ m_DataBuffer.emplace_back(indexBody + fieldsBody);
+
+ /* Flush if we've buffered too much to prevent excessive memory use. */
+ if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
+ Log(LogDebug, "ElasticsearchWriter")
+ << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
+ Flush();
+ }
+}
+
+void ElasticsearchWriter::FlushTimeout()
+{
+ /* Prevent new data points from being added to the array, there is a
+ * race condition where they could disappear.
+ */
+ std::unique_lock<std::mutex> lock(m_DataBufferMutex);
+
+ /* Flush if there are any data available. */
+ if (m_DataBuffer.size() > 0) {
+ Log(LogDebug, "ElasticsearchWriter")
+ << "Timer expired writing " << m_DataBuffer.size() << " data points";
+ Flush();
+ }
+}
+
+void ElasticsearchWriter::Flush()
+{
+ /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
+ if (m_DataBuffer.empty())
+ return;
+
+ /* Ensure you hold a lock against m_DataBuffer so that things
+ * don't go missing after creating the body and clearing the buffer.
+ */
+ String body = boost::algorithm::join(m_DataBuffer, "\n");
+ m_DataBuffer.clear();
+
+ /* Elasticsearch 6.x requires a new line. This is compatible to 5.x.
+ * Tested with 6.0.0 and 5.6.4.
+ */
+ body += "\n";
+
+ SendRequest(body);
+}
+
+void ElasticsearchWriter::SendRequest(const String& body)
+{
+ namespace beast = boost::beast;
+ namespace http = beast::http;
+
+ Url::Ptr url = new Url();
+
+ url->SetScheme(GetEnableTls() ? "https" : "http");
+ url->SetHost(GetHost());
+ url->SetPort(GetPort());
+
+ std::vector<String> path;
+
+ /* Specify the index path. Best practice is a daily rotation.
+ * Example: http://localhost:9200/icinga2-2017.09.11?pretty=1
+ */
+ path.emplace_back(GetIndex() + "-" + Utility::FormatDateTime("%Y.%m.%d", Utility::GetTime()));
+
+ /* Use the bulk message format. */
+ path.emplace_back("_bulk");
+
+ url->SetPath(path);
+
+ OptionalTlsStream stream;
+
+ try {
+ stream = Connect();
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "ElasticsearchWriter")
+ << "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
+ return;
+ }
+
+ Defer s ([&stream]() {
+ if (stream.first) {
+ stream.first->next_layer().shutdown();
+ }
+ });
+
+ http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
+
+ request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
+ request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
+
+ /* Specify required headers by Elasticsearch. */
+ request.set(http::field::accept, "application/json");
+
+ /* Use application/x-ndjson for bulk streams. While ES
+ * is able to handle application/json, the newline separator
+ * causes problems with Logstash (#6609).
+ */
+ request.set(http::field::content_type, "application/x-ndjson");
+
+ /* Send authentication if configured. */
+ String username = GetUsername();
+ String password = GetPassword();
+
+ if (!username.IsEmpty() && !password.IsEmpty())
+ request.set(http::field::authorization, "Basic " + Base64::Encode(username + ":" + password));
+
+ request.body() = body;
+ request.content_length(request.body().size());
+
+ /* Don't log the request body to debug log, this is already done above. */
+ Log(LogDebug, "ElasticsearchWriter")
+ << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
+ << " to '" << url->Format() << "'.";
+
+ try {
+ if (stream.first) {
+ http::write(*stream.first, request);
+ stream.first->flush();
+ } else {
+ http::write(*stream.second, request);
+ stream.second->flush();
+ }
+ } catch (const std::exception&) {
+ Log(LogWarning, "ElasticsearchWriter")
+ << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ throw;
+ }
+
+ http::parser<false, http::string_body> parser;
+ beast::flat_buffer buf;
+
+ try {
+ if (stream.first) {
+ http::read(*stream.first, buf, parser);
+ } else {
+ http::read(*stream.second, buf, parser);
+ }
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "ElasticsearchWriter")
+ << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
+ throw;
+ }
+
+ auto& response (parser.get());
+
+ if (response.result_int() > 299) {
+ if (response.result() == http::status::unauthorized) {
+ /* More verbose error logging with Elasticsearch is hidden behind a proxy. */
+ if (!username.IsEmpty() && !password.IsEmpty()) {
+ Log(LogCritical, "ElasticsearchWriter")
+ << "401 Unauthorized. Please ensure that the user '" << username
+ << "' is able to authenticate against the HTTP API/Proxy.";
+ } else {
+ Log(LogCritical, "ElasticsearchWriter")
+ << "401 Unauthorized. The HTTP API requires authentication but no username/password has been configured.";
+ }
+
+ return;
+ }
+
+ std::ostringstream msgbuf;
+ msgbuf << "Unexpected response code " << response.result_int() << " from URL '" << url->Format() << "'";
+
+ auto& contentType (response[http::field::content_type]);
+
+ if (contentType != "application/json" && contentType != "application/json; charset=utf-8") {
+ msgbuf << "; Unexpected Content-Type: '" << contentType << "'";
+ }
+
+ auto& body (response.body());
+
+#ifdef I2_DEBUG
+ msgbuf << "; Response body: '" << body << "'";
+#endif /* I2_DEBUG */
+
+ Dictionary::Ptr jsonResponse;
+
+ try {
+ jsonResponse = JsonDecode(body);
+ } catch (...) {
+ Log(LogWarning, "ElasticsearchWriter")
+ << "Unable to parse JSON response:\n" << body;
+ return;
+ }
+
+ String error = jsonResponse->Get("error");
+
+ Log(LogCritical, "ElasticsearchWriter")
+ << "Error: '" << error << "'. " << msgbuf.str();
+ }
+}
+
+OptionalTlsStream ElasticsearchWriter::Connect()
+{
+ Log(LogNotice, "ElasticsearchWriter")
+ << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+ OptionalTlsStream stream;
+ bool tls = GetEnableTls();
+
+ if (tls) {
+ Shared<boost::asio::ssl::context>::Ptr sslContext;
+
+ try {
+ sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
+ } catch (const std::exception&) {
+ Log(LogWarning, "ElasticsearchWriter")
+ << "Unable to create SSL context.";
+ throw;
+ }
+
+ stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
+
+ } else {
+ stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
+ }
+
+ try {
+ icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
+ } catch (const std::exception&) {
+ Log(LogWarning, "ElasticsearchWriter")
+ << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ throw;
+ }
+
+ if (tls) {
+ auto& tlsStream (stream.first->next_layer());
+
+ try {
+ tlsStream.handshake(tlsStream.client);
+ } catch (const std::exception&) {
+ Log(LogWarning, "ElasticsearchWriter")
+ << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
+ throw;
+ }
+
+ if (!GetInsecureNoverify()) {
+ if (!tlsStream.GetPeerCertificate()) {
+ BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate."));
+ }
+
+ if (!tlsStream.IsVerifyOK()) {
+ BOOST_THROW_EXCEPTION(std::runtime_error(
+ "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
+ ));
+ }
+ }
+ }
+
+ return stream;
+}
+
+void ElasticsearchWriter::AssertOnWorkQueue()
+{
+ ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+void ElasticsearchWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+ Log(LogCritical, "ElasticsearchWriter", "Exception during Elastic operation: Verify that your backend is operational!");
+
+ Log(LogDebug, "ElasticsearchWriter")
+ << "Exception during Elasticsearch operation: " << DiagnosticInformation(std::move(exp));
+}
+
+String ElasticsearchWriter::FormatTimestamp(double ts)
+{
+ /* The date format must match the default dynamic date detection
+ * pattern in indexes. This enables applications like Kibana to
+ * detect a qualified timestamp index for time-series data.
+ *
+ * Example: 2017-09-11T10:56:21.463+0200
+ *
+ * References:
+ * https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection
+ * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
+ * https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html
+ */
+ auto milliSeconds = static_cast<int>((ts - static_cast<int>(ts)) * 1000);
+
+ return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + "." + Convert::ToString(milliSeconds) + Utility::FormatDateTime("%z", ts);
+}
diff --git a/lib/perfdata/elasticsearchwriter.hpp b/lib/perfdata/elasticsearchwriter.hpp
new file mode 100644
index 0000000..a988094
--- /dev/null
+++ b/lib/perfdata/elasticsearchwriter.hpp
@@ -0,0 +1,65 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#ifndef ELASTICSEARCHWRITER_H
+#define ELASTICSEARCHWRITER_H
+
+#include "perfdata/elasticsearchwriter-ti.hpp"
+#include "icinga/service.hpp"
+#include "base/configobject.hpp"
+#include "base/workqueue.hpp"
+#include "base/timer.hpp"
+#include "base/tlsstream.hpp"
+
+namespace icinga
+{
+
+class ElasticsearchWriter final : public ObjectImpl<ElasticsearchWriter>
+{
+public:
+ DECLARE_OBJECT(ElasticsearchWriter);
+ DECLARE_OBJECTNAME(ElasticsearchWriter);
+
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+ static String FormatTimestamp(double ts);
+
+protected:
+ void OnConfigLoaded() override;
+ void Resume() override;
+ void Pause() override;
+
+private:
+ String m_EventPrefix;
+ WorkQueue m_WorkQueue{10000000, 1};
+ boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications;
+ Timer::Ptr m_FlushTimer;
+ std::vector<String> m_DataBuffer;
+ std::mutex m_DataBufferMutex;
+
+ void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+
+ void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
+ void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
+ void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
+ const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
+ const CheckResult::Ptr& cr, const String& author, const String& text);
+ void NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
+ const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
+ const CheckResult::Ptr& cr, const String& author, const String& text);
+
+ void Enqueue(const Checkable::Ptr& checkable, const String& type,
+ const Dictionary::Ptr& fields, double ts);
+
+ OptionalTlsStream Connect();
+ void AssertOnWorkQueue();
+ void ExceptionHandler(boost::exception_ptr exp);
+ void FlushTimeout();
+ void Flush();
+ void SendRequest(const String& body);
+};
+
+}
+
+#endif /* ELASTICSEARCHWRITER_H */
diff --git a/lib/perfdata/elasticsearchwriter.ti b/lib/perfdata/elasticsearchwriter.ti
new file mode 100644
index 0000000..e3b8e27
--- /dev/null
+++ b/lib/perfdata/elasticsearchwriter.ti
@@ -0,0 +1,50 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "base/configobject.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class ElasticsearchWriter : ConfigObject
+{
+ activation_priority 100;
+
+ [config, required] String host {
+ default {{{ return "127.0.0.1"; }}}
+ };
+ [config, required] String port {
+ default {{{ return "9200"; }}}
+ };
+ [config, required] String index {
+ default {{{ return "icinga2"; }}}
+ };
+ [config] bool enable_send_perfdata {
+ default {{{ return false; }}}
+ };
+ [config] String username;
+ [config, no_user_view, no_user_modify] String password;
+
+ [config] bool enable_tls {
+ default {{{ return false; }}}
+ };
+ [config] bool insecure_noverify {
+ default {{{ return false; }}}
+ };
+ [config] String ca_path;
+ [config] String cert_path;
+ [config] String key_path;
+
+ [config] int flush_interval {
+ default {{{ return 10; }}}
+ };
+ [config] int flush_threshold {
+ default {{{ return 1024; }}}
+ };
+ [config] bool enable_ha {
+ default {{{ return false; }}}
+ };
+};
+
+}
diff --git a/lib/perfdata/gelfwriter.cpp b/lib/perfdata/gelfwriter.cpp
new file mode 100644
index 0000000..c5b2bbd
--- /dev/null
+++ b/lib/perfdata/gelfwriter.cpp
@@ -0,0 +1,535 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/gelfwriter.hpp"
+#include "perfdata/gelfwriter-ti.cpp"
+#include "icinga/service.hpp"
+#include "icinga/notification.hpp"
+#include "icinga/checkcommand.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "icinga/compatutility.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/configtype.hpp"
+#include "base/objectlock.hpp"
+#include "base/logger.hpp"
+#include "base/utility.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/application.hpp"
+#include "base/stream.hpp"
+#include "base/networkstream.hpp"
+#include "base/context.hpp"
+#include "base/exception.hpp"
+#include "base/json.hpp"
+#include "base/statsfunction.hpp"
+#include <boost/algorithm/string/replace.hpp>
+#include <utility>
+#include "base/io-engine.hpp"
+#include <boost/asio/write.hpp>
+#include <boost/asio/buffer.hpp>
+#include <boost/system/error_code.hpp>
+#include <boost/asio/error.hpp>
+
+using namespace icinga;
+
+REGISTER_TYPE(GelfWriter);
+
+REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
+
+void GelfWriter::OnConfigLoaded()
+{
+ ObjectImpl<GelfWriter>::OnConfigLoaded();
+
+ m_WorkQueue.SetName("GelfWriter, " + GetName());
+
+ if (!GetEnableHa()) {
+ Log(LogDebug, "GelfWriter")
+ << "HA functionality disabled. Won't pause connection: " << GetName();
+
+ SetHAMode(HARunEverywhere);
+ } else {
+ SetHAMode(HARunOnce);
+ }
+}
+
+void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+ DictionaryData nodes;
+
+ for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
+ size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
+ double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
+
+ nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
+ { "work_queue_items", workQueueItems },
+ { "work_queue_item_rate", workQueueItemRate },
+ { "connected", gelfwriter->GetConnected() },
+ { "source", gelfwriter->GetSource() }
+ }));
+
+ perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
+ perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
+ }
+
+ status->Set("gelfwriter", new Dictionary(std::move(nodes)));
+}
+
+void GelfWriter::Resume()
+{
+ ObjectImpl<GelfWriter>::Resume();
+
+ Log(LogInformation, "GelfWriter")
+ << "'" << GetName() << "' resumed.";
+
+ /* Register exception handler for WQ tasks. */
+ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
+
+ /* Timer for reconnecting */
+ m_ReconnectTimer = Timer::Create();
+ m_ReconnectTimer->SetInterval(10);
+ m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
+ m_ReconnectTimer->Start();
+ m_ReconnectTimer->Reschedule(0);
+
+ /* Register event handlers. */
+ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
+ const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
+ CheckResultHandler(checkable, cr);
+ });
+ m_HandleNotifications = Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification,
+ const Checkable::Ptr& checkable, const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr,
+ const String& author, const String& commentText, const String& commandName, const MessageOrigin::Ptr&) {
+ NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName);
+ });
+ m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
+ const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
+ StateChangeHandler(checkable, cr, type);
+ });
+}
+
+/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
+void GelfWriter::Pause()
+{
+ m_HandleCheckResults.disconnect();
+ m_HandleNotifications.disconnect();
+ m_HandleStateChanges.disconnect();
+
+ m_ReconnectTimer->Stop(true);
+
+ m_WorkQueue.Enqueue([this]() {
+ try {
+ ReconnectInternal();
+ } catch (const std::exception&) {
+ Log(LogInformation, "GelfWriter")
+ << "Unable to connect, not flushing buffers. Data may be lost.";
+ }
+ }, PriorityImmediate);
+
+ m_WorkQueue.Enqueue([this]() { DisconnectInternal(); }, PriorityLow);
+ m_WorkQueue.Join();
+
+ Log(LogInformation, "GelfWriter")
+ << "'" << GetName() << "' paused.";
+
+ ObjectImpl<GelfWriter>::Pause();
+}
+
+void GelfWriter::AssertOnWorkQueue()
+{
+ ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+ Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false);
+ Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true);
+
+ DisconnectInternal();
+}
+
+void GelfWriter::Reconnect()
+{
+ AssertOnWorkQueue();
+
+ if (IsPaused()) {
+ SetConnected(false);
+ return;
+ }
+
+ ReconnectInternal();
+}
+
+void GelfWriter::ReconnectInternal()
+{
+ double startTime = Utility::GetTime();
+
+ CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'");
+
+ SetShouldConnect(true);
+
+ if (GetConnected())
+ return;
+
+ Log(LogNotice, "GelfWriter")
+ << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+ bool ssl = GetEnableTls();
+
+ if (ssl) {
+ Shared<boost::asio::ssl::context>::Ptr sslContext;
+
+ try {
+ sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "GelfWriter")
+ << "Unable to create SSL context.";
+ throw;
+ }
+
+ m_Stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
+
+ } else {
+ m_Stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
+ }
+
+ try {
+ icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "GelfWriter")
+ << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'";
+ throw;
+ }
+
+ if (ssl) {
+ auto& tlsStream (m_Stream.first->next_layer());
+
+ try {
+ tlsStream.handshake(tlsStream.client);
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "GelfWriter")
+ << "TLS handshake with host '" << GetHost() << " failed.'";
+ throw;
+ }
+
+ if (!GetInsecureNoverify()) {
+ if (!tlsStream.GetPeerCertificate()) {
+ BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate."));
+ }
+
+ if (!tlsStream.IsVerifyOK()) {
+ BOOST_THROW_EXCEPTION(std::runtime_error(
+ "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
+ ));
+ }
+ }
+ }
+
+ SetConnected(true);
+
+ Log(LogInformation, "GelfWriter")
+ << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
+}
+
+void GelfWriter::ReconnectTimerHandler()
+{
+ m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal);
+}
+
+void GelfWriter::Disconnect()
+{
+ AssertOnWorkQueue();
+
+ DisconnectInternal();
+}
+
+void GelfWriter::DisconnectInternal()
+{
+ if (!GetConnected())
+ return;
+
+ if (m_Stream.first) {
+ boost::system::error_code ec;
+ m_Stream.first->next_layer().shutdown(ec);
+
+ // https://stackoverflow.com/a/25703699
+ // As long as the error code's category is not an SSL category, then the protocol was securely shutdown
+ if (ec.category() == boost::asio::error::get_ssl_category()) {
+ Log(LogCritical, "GelfWriter")
+ << "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
+ }
+ } else if (m_Stream.second) {
+ m_Stream.second->close();
+ }
+
+ SetConnected(false);
+
+}
+
+void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ if (IsPaused())
+ return;
+
+ m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
+}
+
+void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ AssertOnWorkQueue();
+
+ CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
+
+ Log(LogDebug, "GelfWriter")
+ << "Processing check result for '" << checkable->GetName() << "'";
+
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ Dictionary::Ptr fields = new Dictionary();
+
+ if (service) {
+ fields->Set("_service_name", service->GetShortName());
+ fields->Set("_service_state", Service::StateToString(service->GetState()));
+ fields->Set("_last_state", service->GetLastState());
+ fields->Set("_last_hard_state", service->GetLastHardState());
+ } else {
+ fields->Set("_last_state", host->GetLastState());
+ fields->Set("_last_hard_state", host->GetLastHardState());
+ }
+
+ fields->Set("_hostname", host->GetName());
+ fields->Set("_type", "CHECK RESULT");
+ fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
+
+ fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
+ fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
+
+ fields->Set("_reachable", checkable->IsReachable());
+
+ CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
+
+ if (checkCommand)
+ fields->Set("_check_command", checkCommand->GetName());
+
+ double ts = Utility::GetTime();
+
+ if (cr) {
+ fields->Set("_latency", cr->CalculateLatency());
+ fields->Set("_execution_time", cr->CalculateExecutionTime());
+ fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
+ fields->Set("full_message", cr->GetOutput());
+ fields->Set("_check_source", cr->GetCheckSource());
+ ts = cr->GetExecutionEnd();
+ }
+
+ if (cr && GetEnableSendPerfdata()) {
+ Array::Ptr perfdata = cr->GetPerformanceData();
+
+ if (perfdata) {
+ ObjectLock olock(perfdata);
+ for (const Value& val : perfdata) {
+ PerfdataValue::Ptr pdv;
+
+ if (val.IsObjectType<PerfdataValue>())
+ pdv = val;
+ else {
+ try {
+ pdv = PerfdataValue::Parse(val);
+ } catch (const std::exception&) {
+ Log(LogWarning, "GelfWriter")
+ << "Ignoring invalid perfdata for checkable '"
+ << checkable->GetName() << "' and command '"
+ << checkCommand->GetName() << "' with value: " << val;
+ continue;
+ }
+ }
+
+ String escaped_key = pdv->GetLabel();
+ boost::replace_all(escaped_key, " ", "_");
+ boost::replace_all(escaped_key, ".", "_");
+ boost::replace_all(escaped_key, "\\", "_");
+ boost::algorithm::replace_all(escaped_key, "::", ".");
+
+ fields->Set("_" + escaped_key, pdv->GetValue());
+
+ if (!pdv->GetMin().IsEmpty())
+ fields->Set("_" + escaped_key + "_min", pdv->GetMin());
+ if (!pdv->GetMax().IsEmpty())
+ fields->Set("_" + escaped_key + "_max", pdv->GetMax());
+ if (!pdv->GetWarn().IsEmpty())
+ fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
+ if (!pdv->GetCrit().IsEmpty())
+ fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
+
+ if (!pdv->GetUnit().IsEmpty())
+ fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
+ }
+ }
+ }
+
+ SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
+}
+
+void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+ const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
+ const String& author, const String& commentText, const String& commandName)
+{
+ if (IsPaused())
+ return;
+
+ m_WorkQueue.Enqueue([this, notification, checkable, user, notificationType, cr, author, commentText, commandName]() {
+ NotificationToUserHandlerInternal(notification, checkable, user, notificationType, cr, author, commentText, commandName);
+ });
+}
+
+void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+ const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
+ const String& author, const String& commentText, const String& commandName)
+{
+ AssertOnWorkQueue();
+
+ CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
+
+ Log(LogDebug, "GelfWriter")
+ << "Processing notification for '" << checkable->GetName() << "'";
+
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ String notificationTypeString = Notification::NotificationTypeToStringCompat(notificationType); //TODO: Change that to our own types.
+
+ String authorComment = "";
+
+ if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
+ authorComment = author + ";" + commentText;
+ }
+
+ String output;
+ double ts = Utility::GetTime();
+
+ if (cr) {
+ output = CompatUtility::GetCheckResultOutput(cr);
+ ts = cr->GetExecutionEnd();
+ }
+
+ Dictionary::Ptr fields = new Dictionary();
+
+ if (service) {
+ fields->Set("_type", "SERVICE NOTIFICATION");
+ //TODO: fix this to _service_name
+ fields->Set("_service", service->GetShortName());
+ fields->Set("short_message", output);
+ } else {
+ fields->Set("_type", "HOST NOTIFICATION");
+ fields->Set("short_message", output);
+ }
+
+ fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
+
+ fields->Set("_hostname", host->GetName());
+ fields->Set("_command", commandName);
+ fields->Set("_notification_type", notificationTypeString);
+ fields->Set("_comment", authorComment);
+
+ CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
+
+ if (commandObj)
+ fields->Set("_check_command", commandObj->GetName());
+
+ SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
+}
+
+void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+ if (IsPaused())
+ return;
+
+ m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); });
+}
+
+void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+ AssertOnWorkQueue();
+
+ CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
+
+ Log(LogDebug, "GelfWriter")
+ << "Processing state change for '" << checkable->GetName() << "'";
+
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ Dictionary::Ptr fields = new Dictionary();
+
+ fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
+ fields->Set("_type", "STATE CHANGE");
+ fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
+ fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
+ fields->Set("_hostname", host->GetName());
+
+ if (service) {
+ fields->Set("_service_name", service->GetShortName());
+ fields->Set("_service_state", Service::StateToString(service->GetState()));
+ fields->Set("_last_state", service->GetLastState());
+ fields->Set("_last_hard_state", service->GetLastHardState());
+ } else {
+ fields->Set("_last_state", host->GetLastState());
+ fields->Set("_last_hard_state", host->GetLastHardState());
+ }
+
+ CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
+
+ if (commandObj)
+ fields->Set("_check_command", commandObj->GetName());
+
+ double ts = Utility::GetTime();
+
+ if (cr) {
+ fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
+ fields->Set("full_message", cr->GetOutput());
+ fields->Set("_check_source", cr->GetCheckSource());
+ ts = cr->GetExecutionEnd();
+ }
+
+ SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
+}
+
+String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
+{
+ fields->Set("version", "1.1");
+ fields->Set("host", source);
+ fields->Set("timestamp", ts);
+
+ return JsonEncode(fields);
+}
+
+void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
+{
+ std::ostringstream msgbuf;
+ msgbuf << gelfMessage;
+ msgbuf << '\0';
+
+ String log = msgbuf.str();
+
+ if (!GetConnected())
+ return;
+
+ try {
+ Log(LogDebug, "GelfWriter")
+ << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
+
+ if (m_Stream.first) {
+ boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str()));
+ m_Stream.first->flush();
+ } else {
+ boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str()));
+ m_Stream.second->flush();
+ }
+ } catch (const std::exception& ex) {
+ Log(LogCritical, "GelfWriter")
+ << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+ throw ex;
+ }
+}
diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp
new file mode 100644
index 0000000..ce9ee35
--- /dev/null
+++ b/lib/perfdata/gelfwriter.hpp
@@ -0,0 +1,70 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#ifndef GELFWRITER_H
+#define GELFWRITER_H
+
+#include "perfdata/gelfwriter-ti.hpp"
+#include "icinga/service.hpp"
+#include "base/configobject.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/timer.hpp"
+#include "base/workqueue.hpp"
+#include <fstream>
+
+namespace icinga
+{
+
+/**
+ * An Icinga Gelf writer for Graylog.
+ *
+ * @ingroup perfdata
+ */
+class GelfWriter final : public ObjectImpl<GelfWriter>
+{
+public:
+ DECLARE_OBJECT(GelfWriter);
+ DECLARE_OBJECTNAME(GelfWriter);
+
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+protected:
+ void OnConfigLoaded() override;
+ void Resume() override;
+ void Pause() override;
+
+private:
+ OptionalTlsStream m_Stream;
+ WorkQueue m_WorkQueue{10000000, 1};
+
+ boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges;
+ Timer::Ptr m_ReconnectTimer;
+
+ void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+ const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr,
+ const String& author, const String& commentText, const String& commandName);
+ void NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+ const User::Ptr& user, NotificationType notification_type, const CheckResult::Ptr& cr,
+ const String& author, const String& comment_text, const String& command_name);
+ void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
+ void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
+
+ String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
+ void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage);
+
+ void ReconnectTimerHandler();
+
+ void Disconnect();
+ void DisconnectInternal();
+ void Reconnect();
+ void ReconnectInternal();
+
+ void AssertOnWorkQueue();
+
+ void ExceptionHandler(boost::exception_ptr exp);
+};
+
+}
+
+#endif /* GELFWRITER_H */
diff --git a/lib/perfdata/gelfwriter.ti b/lib/perfdata/gelfwriter.ti
new file mode 100644
index 0000000..387ee14
--- /dev/null
+++ b/lib/perfdata/gelfwriter.ti
@@ -0,0 +1,45 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "base/configobject.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class GelfWriter : ConfigObject
+{
+ activation_priority 100;
+
+ [config] String host {
+ default {{{ return "127.0.0.1"; }}}
+ };
+ [config] String port {
+ default {{{ return "12201"; }}}
+ };
+ [config] String source {
+ default {{{ return "icinga2"; }}}
+ };
+ [config] bool enable_send_perfdata {
+ default {{{ return false; }}}
+ };
+
+ [no_user_modify] bool connected;
+ [no_user_modify] bool should_connect {
+ default {{{ return true; }}}
+ };
+ [config] bool enable_ha {
+ default {{{ return false; }}}
+ };
+ [config] bool enable_tls {
+ default {{{ return false; }}}
+ };
+ [config] bool insecure_noverify {
+ default {{{ return false; }}}
+ };
+ [config] String ca_path;
+ [config] String cert_path;
+ [config] String key_path;
+};
+
+}
diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp
new file mode 100644
index 0000000..6adae02
--- /dev/null
+++ b/lib/perfdata/graphitewriter.cpp
@@ -0,0 +1,514 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/graphitewriter.hpp"
+#include "perfdata/graphitewriter-ti.cpp"
+#include "icinga/service.hpp"
+#include "icinga/checkcommand.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "icinga/icingaapplication.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/configtype.hpp"
+#include "base/objectlock.hpp"
+#include "base/logger.hpp"
+#include "base/convert.hpp"
+#include "base/utility.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/application.hpp"
+#include "base/stream.hpp"
+#include "base/networkstream.hpp"
+#include "base/exception.hpp"
+#include "base/statsfunction.hpp"
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/replace.hpp>
+#include <utility>
+
+using namespace icinga;
+
+REGISTER_TYPE(GraphiteWriter);
+
+REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
+
+/*
+ * Enable HA capabilities once the config object is loaded.
+ */
+void GraphiteWriter::OnConfigLoaded()
+{
+ ObjectImpl<GraphiteWriter>::OnConfigLoaded();
+
+ m_WorkQueue.SetName("GraphiteWriter, " + GetName());
+
+ if (!GetEnableHa()) {
+ Log(LogDebug, "GraphiteWriter")
+ << "HA functionality disabled. Won't pause connection: " << GetName();
+
+ SetHAMode(HARunEverywhere);
+ } else {
+ SetHAMode(HARunOnce);
+ }
+}
+
+/**
+ * Feature stats interface
+ *
+ * @param status Key value pairs for feature stats
+ * @param perfdata Array of PerfdataValue objects
+ */
+void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+ DictionaryData nodes;
+
+ for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType<GraphiteWriter>()) {
+ size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength();
+ double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0;
+
+ nodes.emplace_back(graphitewriter->GetName(), new Dictionary({
+ { "work_queue_items", workQueueItems },
+ { "work_queue_item_rate", workQueueItemRate },
+ { "connected", graphitewriter->GetConnected() }
+ }));
+
+ perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
+ perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
+ }
+
+ status->Set("graphitewriter", new Dictionary(std::move(nodes)));
+}
+
+/**
+ * Resume is equivalent to Start, but with HA capabilities to resume at runtime.
+ */
+void GraphiteWriter::Resume()
+{
+ ObjectImpl<GraphiteWriter>::Resume();
+
+ Log(LogInformation, "GraphiteWriter")
+ << "'" << GetName() << "' resumed.";
+
+ /* Register exception handler for WQ tasks. */
+ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
+
+ /* Timer for reconnecting */
+ m_ReconnectTimer = Timer::Create();
+ m_ReconnectTimer->SetInterval(10);
+ m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
+ m_ReconnectTimer->Start();
+ m_ReconnectTimer->Reschedule(0);
+
+ /* Register event handlers. */
+ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
+ const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
+ CheckResultHandler(checkable, cr);
+ });
+}
+
+/**
+ * Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
+ */
+void GraphiteWriter::Pause()
+{
+ m_HandleCheckResults.disconnect();
+ m_ReconnectTimer->Stop(true);
+
+ try {
+ ReconnectInternal();
+ } catch (const std::exception&) {
+ Log(LogInformation, "GraphiteWriter")
+ << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
+
+ ObjectImpl<GraphiteWriter>::Pause();
+ return;
+ }
+
+ m_WorkQueue.Join();
+ DisconnectInternal();
+
+ Log(LogInformation, "GraphiteWriter")
+ << "'" << GetName() << "' paused.";
+
+ ObjectImpl<GraphiteWriter>::Pause();
+}
+
+/**
+ * Check if method is called inside the WQ thread.
+ */
+void GraphiteWriter::AssertOnWorkQueue()
+{
+ ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+/**
+ * Exception handler for the WQ.
+ *
+ * Closes the connection if connected.
+ *
+ * @param exp Exception pointer
+ */
+void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+ Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
+
+ Log(LogDebug, "GraphiteWriter")
+ << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
+
+ if (GetConnected()) {
+ m_Stream->close();
+
+ SetConnected(false);
+ }
+}
+
+/**
+ * Reconnect method, stops when the feature is paused in HA zones.
+ *
+ * Called inside the WQ.
+ */
+void GraphiteWriter::Reconnect()
+{
+ AssertOnWorkQueue();
+
+ if (IsPaused()) {
+ SetConnected(false);
+ return;
+ }
+
+ ReconnectInternal();
+}
+
+/**
+ * Reconnect method, connects to a TCP Stream
+ */
+void GraphiteWriter::ReconnectInternal()
+{
+ double startTime = Utility::GetTime();
+
+ CONTEXT("Reconnecting to Graphite '" << GetName() << "'");
+
+ SetShouldConnect(true);
+
+ if (GetConnected())
+ return;
+
+ Log(LogNotice, "GraphiteWriter")
+ << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+ m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
+
+ try {
+ icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "GraphiteWriter")
+ << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
+
+ SetConnected(false);
+
+ throw;
+ }
+
+ SetConnected(true);
+
+ Log(LogInformation, "GraphiteWriter")
+ << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
+}
+
+/**
+ * Reconnect handler called by the timer.
+ *
+ * Enqueues a reconnect task into the WQ.
+ */
+void GraphiteWriter::ReconnectTimerHandler()
+{
+ if (IsPaused())
+ return;
+
+ m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh);
+}
+
+/**
+ * Disconnect the stream.
+ *
+ * Called inside the WQ.
+ */
+void GraphiteWriter::Disconnect()
+{
+ AssertOnWorkQueue();
+
+ DisconnectInternal();
+}
+
+/**
+ * Disconnect the stream.
+ *
+ * Called outside the WQ.
+ */
+void GraphiteWriter::DisconnectInternal()
+{
+ if (!GetConnected())
+ return;
+
+ m_Stream->close();
+
+ SetConnected(false);
+}
+
+/**
+ * Check result event handler, checks whether feature is not paused in HA setups.
+ *
+ * @param checkable Host/Service object
+ * @param cr Check result including performance data
+ */
+void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ if (IsPaused())
+ return;
+
+ m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
+}
+
+/**
+ * Check result event handler, prepares metadata and perfdata values and calls Send*()
+ *
+ * Called inside the WQ.
+ *
+ * @param checkable Host/Service object
+ * @param cr Check result including performance data
+ */
+void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ AssertOnWorkQueue();
+
+ CONTEXT("Processing check result for '" << checkable->GetName() << "'");
+
+ /* TODO: Deal with missing connection here. Needs refactoring
+ * into parsing the actual performance data and then putting it
+ * into a queue for re-inserting. */
+
+ if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
+ return;
+
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ MacroProcessor::ResolverList resolvers;
+ if (service)
+ resolvers.emplace_back("service", service);
+ resolvers.emplace_back("host", host);
+
+ String prefix;
+
+ if (service) {
+ prefix = MacroProcessor::ResolveMacros(GetServiceNameTemplate(), resolvers, cr, nullptr, [](const Value& value) -> Value {
+ return EscapeMacroMetric(value);
+ });
+ } else {
+ prefix = MacroProcessor::ResolveMacros(GetHostNameTemplate(), resolvers, cr, nullptr, [](const Value& value) -> Value {
+ return EscapeMacroMetric(value);
+ });
+ }
+
+ String prefixPerfdata = prefix + ".perfdata";
+ String prefixMetadata = prefix + ".metadata";
+
+ double ts = cr->GetExecutionEnd();
+
+ if (GetEnableSendMetadata()) {
+ if (service) {
+ SendMetric(checkable, prefixMetadata, "state", service->GetState(), ts);
+ } else {
+ SendMetric(checkable, prefixMetadata, "state", host->GetState(), ts);
+ }
+
+ SendMetric(checkable, prefixMetadata, "current_attempt", checkable->GetCheckAttempt(), ts);
+ SendMetric(checkable, prefixMetadata, "max_check_attempts", checkable->GetMaxCheckAttempts(), ts);
+ SendMetric(checkable, prefixMetadata, "state_type", checkable->GetStateType(), ts);
+ SendMetric(checkable, prefixMetadata, "reachable", checkable->IsReachable(), ts);
+ SendMetric(checkable, prefixMetadata, "downtime_depth", checkable->GetDowntimeDepth(), ts);
+ SendMetric(checkable, prefixMetadata, "acknowledgement", checkable->GetAcknowledgement(), ts);
+ SendMetric(checkable, prefixMetadata, "latency", cr->CalculateLatency(), ts);
+ SendMetric(checkable, prefixMetadata, "execution_time", cr->CalculateExecutionTime(), ts);
+ }
+
+ SendPerfdata(checkable, prefixPerfdata, cr, ts);
+}
+
+/**
+ * Parse performance data from check result and call SendMetric()
+ *
+ * @param checkable Host/service object
+ * @param prefix Metric prefix string
+ * @param cr Check result including performance data
+ * @param ts Timestamp when the check result was created
+ */
+void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
+{
+ Array::Ptr perfdata = cr->GetPerformanceData();
+
+ if (!perfdata)
+ return;
+
+ CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
+
+ ObjectLock olock(perfdata);
+ for (const Value& val : perfdata) {
+ PerfdataValue::Ptr pdv;
+
+ if (val.IsObjectType<PerfdataValue>())
+ pdv = val;
+ else {
+ try {
+ pdv = PerfdataValue::Parse(val);
+ } catch (const std::exception&) {
+ Log(LogWarning, "GraphiteWriter")
+ << "Ignoring invalid perfdata for checkable '"
+ << checkable->GetName() << "' and command '"
+ << checkCommand->GetName() << "' with value: " << val;
+ continue;
+ }
+ }
+
+ String escapedKey = EscapeMetricLabel(pdv->GetLabel());
+
+ SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
+
+ if (GetEnableSendThresholds()) {
+ if (!pdv->GetCrit().IsEmpty())
+ SendMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
+ if (!pdv->GetWarn().IsEmpty())
+ SendMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
+ if (!pdv->GetMin().IsEmpty())
+ SendMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
+ if (!pdv->GetMax().IsEmpty())
+ SendMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
+ }
+ }
+}
+
+/**
+ * Computes metric data and sends to Graphite
+ *
+ * @param checkable Host/service object
+ * @param prefix Computed metric prefix string
+ * @param name Metric name
+ * @param value Metric value
+ * @param ts Timestamp when the check result was created
+ */
+void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
+{
+ namespace asio = boost::asio;
+
+ std::ostringstream msgbuf;
+ msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
+
+ Log(LogDebug, "GraphiteWriter")
+ << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
+
+ // do not send \n to debug log
+ msgbuf << "\n";
+
+ std::unique_lock<std::mutex> lock(m_StreamMutex);
+
+ if (!GetConnected())
+ return;
+
+ try {
+ asio::write(*m_Stream, asio::buffer(msgbuf.str()));
+ m_Stream->flush();
+ } catch (const std::exception& ex) {
+ Log(LogCritical, "GraphiteWriter")
+ << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+ throw ex;
+ }
+}
+
+/**
+ * Escape metric tree elements
+ *
+ * Dots are not allowed, e.g. in host names
+ *
+ * @param str Metric part name
+ * @return Escape string
+ */
+String GraphiteWriter::EscapeMetric(const String& str)
+{
+ String result = str;
+
+ //don't allow '.' in metric prefixes
+ boost::replace_all(result, " ", "_");
+ boost::replace_all(result, ".", "_");
+ boost::replace_all(result, "\\", "_");
+ boost::replace_all(result, "/", "_");
+
+ return result;
+}
+
+/**
+ * Escape metric label
+ *
+ * Dots are allowed - users can create trees from perfdata labels
+ *
+ * @param str Metric label name
+ * @return Escaped string
+ */
+String GraphiteWriter::EscapeMetricLabel(const String& str)
+{
+ String result = str;
+
+ //allow to pass '.' in perfdata labels
+ boost::replace_all(result, " ", "_");
+ boost::replace_all(result, "\\", "_");
+ boost::replace_all(result, "/", "_");
+ boost::replace_all(result, "::", ".");
+
+ return result;
+}
+
+/**
+ * Escape macro metrics found via host/service name templates
+ *
+ * @param value Array or string with macro metric names
+ * @return Escaped string. Arrays are joined with dots.
+ */
+Value GraphiteWriter::EscapeMacroMetric(const Value& value)
+{
+ if (value.IsObjectType<Array>()) {
+ Array::Ptr arr = value;
+ ArrayData result;
+
+ ObjectLock olock(arr);
+ for (const Value& arg : arr) {
+ result.push_back(EscapeMetric(arg));
+ }
+
+ return Utility::Join(new Array(std::move(result)), '.');
+ } else
+ return EscapeMetric(value);
+}
+
+/**
+ * Validate the configuration setting 'host_name_template'
+ *
+ * @param lvalue String containing runtime macros.
+ * @param utils Helper, unused
+ */
+void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
+{
+ ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils);
+
+ if (!MacroProcessor::ValidateMacroString(lvalue()))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
+}
+
+/**
+ * Validate the configuration setting 'service_name_template'
+ *
+ * @param lvalue String containing runtime macros.
+ * @param utils Helper, unused
+ */
+void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
+{
+ ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils);
+
+ if (!MacroProcessor::ValidateMacroString(lvalue()))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "service_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
+}
diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp
new file mode 100644
index 0000000..e0c8b78
--- /dev/null
+++ b/lib/perfdata/graphitewriter.hpp
@@ -0,0 +1,69 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#ifndef GRAPHITEWRITER_H
+#define GRAPHITEWRITER_H
+
+#include "perfdata/graphitewriter-ti.hpp"
+#include "icinga/service.hpp"
+#include "base/configobject.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/timer.hpp"
+#include "base/workqueue.hpp"
+#include <fstream>
+#include <mutex>
+
+namespace icinga
+{
+
+/**
+ * An Icinga graphite writer.
+ *
+ * @ingroup perfdata
+ */
+class GraphiteWriter final : public ObjectImpl<GraphiteWriter>
+{
+public:
+ DECLARE_OBJECT(GraphiteWriter);
+ DECLARE_OBJECTNAME(GraphiteWriter);
+
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+ void ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) override;
+ void ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) override;
+
+protected:
+ void OnConfigLoaded() override;
+ void Resume() override;
+ void Pause() override;
+
+private:
+ Shared<AsioTcpStream>::Ptr m_Stream;
+ std::mutex m_StreamMutex;
+ WorkQueue m_WorkQueue{10000000, 1};
+
+ boost::signals2::connection m_HandleCheckResults;
+ Timer::Ptr m_ReconnectTimer;
+
+ void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
+ void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts);
+ static String EscapeMetric(const String& str);
+ static String EscapeMetricLabel(const String& str);
+ static Value EscapeMacroMetric(const Value& value);
+
+ void ReconnectTimerHandler();
+
+ void Disconnect();
+ void DisconnectInternal();
+ void Reconnect();
+ void ReconnectInternal();
+
+ void AssertOnWorkQueue();
+
+ void ExceptionHandler(boost::exception_ptr exp);
+};
+
+}
+
+#endif /* GRAPHITEWRITER_H */
diff --git a/lib/perfdata/graphitewriter.ti b/lib/perfdata/graphitewriter.ti
new file mode 100644
index 0000000..c8db067
--- /dev/null
+++ b/lib/perfdata/graphitewriter.ti
@@ -0,0 +1,38 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "base/configobject.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class GraphiteWriter : ConfigObject
+{
+ activation_priority 100;
+
+ [config] String host {
+ default {{{ return "127.0.0.1"; }}}
+ };
+ [config] String port {
+ default {{{ return "2003"; }}}
+ };
+ [config] String host_name_template {
+ default {{{ return "icinga2.$host.name$.host.$host.check_command$"; }}}
+ };
+ [config] String service_name_template {
+ default {{{ return "icinga2.$host.name$.services.$service.name$.$service.check_command$"; }}}
+ };
+ [config] bool enable_send_thresholds;
+ [config] bool enable_send_metadata;
+
+ [no_user_modify] bool connected;
+ [no_user_modify] bool should_connect {
+ default {{{ return true; }}}
+ };
+ [config] bool enable_ha {
+ default {{{ return false; }}}
+ };
+};
+
+}
diff --git a/lib/perfdata/influxdb2writer.cpp b/lib/perfdata/influxdb2writer.cpp
new file mode 100644
index 0000000..c92d7d4
--- /dev/null
+++ b/lib/perfdata/influxdb2writer.cpp
@@ -0,0 +1,44 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/influxdb2writer.hpp"
+#include "perfdata/influxdb2writer-ti.cpp"
+#include "remote/url.hpp"
+#include "base/configtype.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/statsfunction.hpp"
+#include <utility>
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/string_body.hpp>
+
+using namespace icinga;
+
+REGISTER_TYPE(Influxdb2Writer);
+
+REGISTER_STATSFUNCTION(Influxdb2Writer, &Influxdb2Writer::StatsFunc);
+
+void Influxdb2Writer::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+ InfluxdbCommonWriter::StatsFunc<Influxdb2Writer>(status, perfdata);
+}
+
+boost::beast::http::request<boost::beast::http::string_body> Influxdb2Writer::AssembleRequest(String body)
+{
+ auto request (AssembleBaseRequest(std::move(body)));
+
+ request.set(boost::beast::http::field::authorization, "Token " + GetAuthToken());
+
+ return request;
+}
+
+Url::Ptr Influxdb2Writer::AssembleUrl()
+{
+ auto url (AssembleBaseUrl());
+
+ std::vector<String> path ({"api", "v2", "write"});
+ url->SetPath(path);
+
+ url->AddQueryElement("org", GetOrganization());
+ url->AddQueryElement("bucket", GetBucket());
+
+ return url;
+}
diff --git a/lib/perfdata/influxdb2writer.hpp b/lib/perfdata/influxdb2writer.hpp
new file mode 100644
index 0000000..3b20f8b
--- /dev/null
+++ b/lib/perfdata/influxdb2writer.hpp
@@ -0,0 +1,33 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#ifndef INFLUXDB2WRITER_H
+#define INFLUXDB2WRITER_H
+
+#include "perfdata/influxdb2writer-ti.hpp"
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/string_body.hpp>
+
+namespace icinga
+{
+
+/**
+ * An Icinga InfluxDB v2 writer.
+ *
+ * @ingroup perfdata
+ */
+class Influxdb2Writer final : public ObjectImpl<Influxdb2Writer>
+{
+public:
+ DECLARE_OBJECT(Influxdb2Writer);
+ DECLARE_OBJECTNAME(Influxdb2Writer);
+
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+protected:
+ boost::beast::http::request<boost::beast::http::string_body> AssembleRequest(String body) override;
+ Url::Ptr AssembleUrl() override;
+};
+
+}
+
+#endif /* INFLUXDB2WRITER_H */
diff --git a/lib/perfdata/influxdb2writer.ti b/lib/perfdata/influxdb2writer.ti
new file mode 100644
index 0000000..f806187
--- /dev/null
+++ b/lib/perfdata/influxdb2writer.ti
@@ -0,0 +1,19 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/influxdbcommonwriter.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class Influxdb2Writer : InfluxdbCommonWriter
+{
+ activation_priority 100;
+
+ [config, required] String organization;
+ [config, required] String bucket;
+ [config, required, no_user_view] String auth_token;
+};
+
+}
diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp
new file mode 100644
index 0000000..fb0bcc9
--- /dev/null
+++ b/lib/perfdata/influxdbcommonwriter.cpp
@@ -0,0 +1,596 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/influxdbcommonwriter.hpp"
+#include "perfdata/influxdbcommonwriter-ti.cpp"
+#include "remote/url.hpp"
+#include "icinga/service.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "icinga/icingaapplication.hpp"
+#include "icinga/checkcommand.hpp"
+#include "base/application.hpp"
+#include "base/defer.hpp"
+#include "base/io-engine.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/configtype.hpp"
+#include "base/objectlock.hpp"
+#include "base/logger.hpp"
+#include "base/convert.hpp"
+#include "base/utility.hpp"
+#include "base/stream.hpp"
+#include "base/json.hpp"
+#include "base/networkstream.hpp"
+#include "base/exception.hpp"
+#include "base/statsfunction.hpp"
+#include "base/tlsutility.hpp"
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/replace.hpp>
+#include <boost/asio/ssl/context.hpp>
+#include <boost/beast/core/flat_buffer.hpp>
+#include <boost/beast/http/field.hpp>
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/parser.hpp>
+#include <boost/beast/http/read.hpp>
+#include <boost/beast/http/status.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <boost/beast/http/verb.hpp>
+#include <boost/beast/http/write.hpp>
+#include <boost/math/special_functions/fpclassify.hpp>
+#include <boost/regex.hpp>
+#include <boost/scoped_array.hpp>
+#include <memory>
+#include <string>
+#include <utility>
+
+using namespace icinga;
+
+REGISTER_TYPE(InfluxdbCommonWriter);
+
+class InfluxdbInteger final : public Object
+{
+public:
+ DECLARE_PTR_TYPEDEFS(InfluxdbInteger);
+
+ InfluxdbInteger(int value)
+ : m_Value(value)
+ { }
+
+ int GetValue() const
+ {
+ return m_Value;
+ }
+
+private:
+ int m_Value;
+};
+
+void InfluxdbCommonWriter::OnConfigLoaded()
+{
+ ObjectImpl<InfluxdbCommonWriter>::OnConfigLoaded();
+
+ m_WorkQueue.SetName(GetReflectionType()->GetName() + ", " + GetName());
+
+ if (!GetEnableHa()) {
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "HA functionality disabled. Won't pause connection: " << GetName();
+
+ SetHAMode(HARunEverywhere);
+ } else {
+ SetHAMode(HARunOnce);
+ }
+}
+
+void InfluxdbCommonWriter::Resume()
+{
+ ObjectImpl<InfluxdbCommonWriter>::Resume();
+
+ Log(LogInformation, GetReflectionType()->GetName())
+ << "'" << GetName() << "' resumed.";
+
+ /* Register exception handler for WQ tasks. */
+ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
+
+ /* Setup timer for periodically flushing m_DataBuffer */
+ m_FlushTimer = Timer::Create();
+ m_FlushTimer->SetInterval(GetFlushInterval());
+ m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
+ m_FlushTimer->Start();
+ m_FlushTimer->Reschedule(0);
+
+ /* Register for new metrics. */
+ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
+ const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
+ CheckResultHandler(checkable, cr);
+ });
+}
+
+/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
+void InfluxdbCommonWriter::Pause()
+{
+ m_HandleCheckResults.disconnect();
+
+ /* Force a flush. */
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Processing pending tasks and flushing data buffers.";
+
+ m_FlushTimer->Stop(true);
+ m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow);
+
+ /* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */
+ m_WorkQueue.Join();
+
+ Log(LogInformation, GetReflectionType()->GetName())
+ << "'" << GetName() << "' paused.";
+
+ ObjectImpl<InfluxdbCommonWriter>::Pause();
+}
+
+void InfluxdbCommonWriter::AssertOnWorkQueue()
+{
+ ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+ Log(LogCritical, GetReflectionType()->GetName(), "Exception during InfluxDB operation: Verify that your backend is operational!");
+
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp));
+
+ //TODO: Close the connection, if we keep it open.
+}
+
+OptionalTlsStream InfluxdbCommonWriter::Connect()
+{
+ Log(LogNotice, GetReflectionType()->GetName())
+ << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+ OptionalTlsStream stream;
+ bool ssl = GetSslEnable();
+
+ if (ssl) {
+ Shared<boost::asio::ssl::context>::Ptr sslContext;
+
+ try {
+ sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
+ } catch (const std::exception& ex) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Unable to create SSL context.";
+ throw;
+ }
+
+ stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
+
+ } else {
+ stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
+ }
+
+ try {
+ icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
+ } catch (const std::exception& ex) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ throw;
+ }
+
+ if (ssl) {
+ auto& tlsStream (stream.first->next_layer());
+
+ try {
+ tlsStream.handshake(tlsStream.client);
+ } catch (const std::exception& ex) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "TLS handshake with host '" << GetHost() << "' failed.";
+ throw;
+ }
+
+ if (!GetSslInsecureNoverify()) {
+ if (!tlsStream.GetPeerCertificate()) {
+ BOOST_THROW_EXCEPTION(std::runtime_error("InfluxDB didn't present any TLS certificate."));
+ }
+
+ if (!tlsStream.IsVerifyOK()) {
+ BOOST_THROW_EXCEPTION(std::runtime_error(
+ "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
+ ));
+ }
+ }
+ }
+
+ return stream;
+}
+
+void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ if (IsPaused())
+ return;
+
+ m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerWQ(checkable, cr); }, PriorityLow);
+}
+
+void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ AssertOnWorkQueue();
+
+ CONTEXT("Processing check result for '" << checkable->GetName() << "'");
+
+ if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
+ return;
+
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ MacroProcessor::ResolverList resolvers;
+ if (service)
+ resolvers.emplace_back("service", service);
+ resolvers.emplace_back("host", host);
+
+ String prefix;
+
+ double ts = cr->GetExecutionEnd();
+
+ // Clone the template and perform an in-place macro expansion of measurement and tag values
+ Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
+ Dictionary::Ptr tmpl = static_pointer_cast<Dictionary>(tmpl_clean->ShallowClone());
+ tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr));
+
+ Dictionary::Ptr tagsClean = tmpl->Get("tags");
+ if (tagsClean) {
+ Dictionary::Ptr tags = new Dictionary();
+
+ {
+ ObjectLock olock(tagsClean);
+ for (const Dictionary::Pair& pair : tagsClean) {
+ String missing_macro;
+ Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro);
+
+ if (missing_macro.IsEmpty()) {
+ tags->Set(pair.first, value);
+ }
+ }
+ }
+
+ tmpl->Set("tags", tags);
+ }
+
+ CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
+
+ Array::Ptr perfdata = cr->GetPerformanceData();
+
+ if (perfdata) {
+ ObjectLock olock(perfdata);
+ for (const Value& val : perfdata) {
+ PerfdataValue::Ptr pdv;
+
+ if (val.IsObjectType<PerfdataValue>())
+ pdv = val;
+ else {
+ try {
+ pdv = PerfdataValue::Parse(val);
+ } catch (const std::exception&) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Ignoring invalid perfdata for checkable '"
+ << checkable->GetName() << "' and command '"
+ << checkCommand->GetName() << "' with value: " << val;
+ continue;
+ }
+ }
+
+ Dictionary::Ptr fields = new Dictionary();
+ fields->Set("value", pdv->GetValue());
+
+ if (GetEnableSendThresholds()) {
+ if (!pdv->GetCrit().IsEmpty())
+ fields->Set("crit", pdv->GetCrit());
+ if (!pdv->GetWarn().IsEmpty())
+ fields->Set("warn", pdv->GetWarn());
+ if (!pdv->GetMin().IsEmpty())
+ fields->Set("min", pdv->GetMin());
+ if (!pdv->GetMax().IsEmpty())
+ fields->Set("max", pdv->GetMax());
+ }
+ if (!pdv->GetUnit().IsEmpty()) {
+ fields->Set("unit", pdv->GetUnit());
+ }
+
+ SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts);
+ }
+ }
+
+ if (GetEnableSendMetadata()) {
+ Host::Ptr host;
+ Service::Ptr service;
+ tie(host, service) = GetHostService(checkable);
+
+ Dictionary::Ptr fields = new Dictionary();
+
+ if (service)
+ fields->Set("state", new InfluxdbInteger(service->GetState()));
+ else
+ fields->Set("state", new InfluxdbInteger(host->GetState()));
+
+ fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt()));
+ fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts()));
+ fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType()));
+ fields->Set("reachable", checkable->IsReachable());
+ fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth()));
+ fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement()));
+ fields->Set("latency", cr->CalculateLatency());
+ fields->Set("execution_time", cr->CalculateExecutionTime());
+
+ SendMetric(checkable, tmpl, Empty, fields, ts);
+ }
+}
+
+String InfluxdbCommonWriter::EscapeKeyOrTagValue(const String& str)
+{
+ // Iterate over the key name and escape commas and spaces with a backslash
+ String result = str;
+ boost::algorithm::replace_all(result, "\"", "\\\"");
+ boost::algorithm::replace_all(result, "=", "\\=");
+ boost::algorithm::replace_all(result, ",", "\\,");
+ boost::algorithm::replace_all(result, " ", "\\ ");
+
+ // InfluxDB 'feature': although backslashes are allowed in keys they also act
+ // as escape sequences when followed by ',' or ' '. When your tag is like
+ // 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped
+ // and through experimentation they also escape '='. To be safe we replace
+ // trailing backslashes with and underscore.
+ // See https://github.com/influxdata/influxdb/issues/8587 for more info
+ size_t length = result.GetLength();
+ if (result[length - 1] == '\\')
+ result[length - 1] = '_';
+
+ return result;
+}
+
+String InfluxdbCommonWriter::EscapeValue(const Value& value)
+{
+ if (value.IsObjectType<InfluxdbInteger>()) {
+ std::ostringstream os;
+ os << static_cast<InfluxdbInteger::Ptr>(value)->GetValue() << "i";
+ return os.str();
+ }
+
+ if (value.IsBoolean())
+ return value ? "true" : "false";
+
+ if (value.IsString())
+ return "\"" + EscapeKeyOrTagValue(value) + "\"";
+
+ return value;
+}
+
+void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
+ const String& label, const Dictionary::Ptr& fields, double ts)
+{
+ std::ostringstream msgbuf;
+ msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement"));
+
+ Dictionary::Ptr tags = tmpl->Get("tags");
+ if (tags) {
+ ObjectLock olock(tags);
+ for (const Dictionary::Pair& pair : tags) {
+ // Empty macro expansion, no tag
+ if (!pair.second.IsEmpty()) {
+ msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second);
+ }
+ }
+ }
+
+ // Label may be empty in the case of metadata
+ if (!label.IsEmpty())
+ msgbuf << ",metric=" << EscapeKeyOrTagValue(label);
+
+ msgbuf << " ";
+
+ {
+ bool first = true;
+
+ ObjectLock fieldLock(fields);
+ for (const Dictionary::Pair& pair : fields) {
+ if (first)
+ first = false;
+ else
+ msgbuf << ",";
+
+ msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second);
+ }
+ }
+
+ msgbuf << " " << static_cast<unsigned long>(ts);
+
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Checkable '" << checkable->GetName() << "' adds to metric list:'" << msgbuf.str() << "'.";
+
+ // Buffer the data point
+ m_DataBuffer.emplace_back(msgbuf.str());
+ m_DataBufferSize = m_DataBuffer.size();
+
+ // Flush if we've buffered too much to prevent excessive memory use
+ if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
+
+ try {
+ FlushWQ();
+ } catch (...) {
+ /* Do nothing. */
+ }
+ }
+}
+
+void InfluxdbCommonWriter::FlushTimeout()
+{
+ m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh);
+}
+
+void InfluxdbCommonWriter::FlushTimeoutWQ()
+{
+ AssertOnWorkQueue();
+
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Timer expired writing " << m_DataBuffer.size() << " data points";
+
+ FlushWQ();
+}
+
+void InfluxdbCommonWriter::FlushWQ()
+{
+ AssertOnWorkQueue();
+
+ namespace beast = boost::beast;
+ namespace http = beast::http;
+
+ /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
+ if (m_DataBuffer.empty())
+ return;
+
+ Log(LogDebug, GetReflectionType()->GetName())
+ << "Flushing data buffer to InfluxDB.";
+
+ String body = boost::algorithm::join(m_DataBuffer, "\n");
+ m_DataBuffer.clear();
+ m_DataBufferSize = 0;
+
+ OptionalTlsStream stream;
+
+ try {
+ stream = Connect();
+ } catch (const std::exception& ex) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false);
+ return;
+ }
+
+ Defer s ([&stream]() {
+ if (stream.first) {
+ stream.first->next_layer().shutdown();
+ }
+ });
+
+ auto request (AssembleRequest(std::move(body)));
+
+ try {
+ if (stream.first) {
+ http::write(*stream.first, request);
+ stream.first->flush();
+ } else {
+ http::write(*stream.second, request);
+ stream.second->flush();
+ }
+ } catch (const std::exception& ex) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ throw;
+ }
+
+ http::parser<false, http::string_body> parser;
+ beast::flat_buffer buf;
+
+ try {
+ if (stream.first) {
+ http::read(*stream.first, buf, parser);
+ } else {
+ http::read(*stream.second, buf, parser);
+ }
+ } catch (const std::exception& ex) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
+ throw;
+ }
+
+ auto& response (parser.get());
+
+ if (response.result() != http::status::no_content) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Unexpected response code: " << response.result();
+
+ auto& contentType (response[http::field::content_type]);
+ if (contentType != "application/json") {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Unexpected Content-Type: " << contentType;
+ return;
+ }
+
+ Dictionary::Ptr jsonResponse;
+ auto& body (response.body());
+
+ try {
+ jsonResponse = JsonDecode(body);
+ } catch (...) {
+ Log(LogWarning, GetReflectionType()->GetName())
+ << "Unable to parse JSON response:\n" << body;
+ return;
+ }
+
+ String error = jsonResponse->Get("error");
+
+ Log(LogCritical, GetReflectionType()->GetName())
+ << "InfluxDB error message:\n" << error;
+ }
+}
+
+boost::beast::http::request<boost::beast::http::string_body> InfluxdbCommonWriter::AssembleBaseRequest(String body)
+{
+ namespace http = boost::beast::http;
+
+ auto url (AssembleUrl());
+ http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
+
+ request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
+ request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
+ request.body() = std::move(body);
+ request.content_length(request.body().size());
+
+ return request;
+}
+
+Url::Ptr InfluxdbCommonWriter::AssembleBaseUrl()
+{
+ Url::Ptr url = new Url();
+
+ url->SetScheme(GetSslEnable() ? "https" : "http");
+ url->SetHost(GetHost());
+ url->SetPort(GetPort());
+ url->AddQueryElement("precision", "s");
+
+ return url;
+}
+
+void InfluxdbCommonWriter::ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
+{
+ ObjectImpl<InfluxdbCommonWriter>::ValidateHostTemplate(lvalue, utils);
+
+ String measurement = lvalue()->Get("measurement");
+ if (!MacroProcessor::ValidateMacroString(measurement))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
+
+ Dictionary::Ptr tags = lvalue()->Get("tags");
+ if (tags) {
+ ObjectLock olock(tags);
+ for (const Dictionary::Pair& pair : tags) {
+ if (!MacroProcessor::ValidateMacroString(pair.second))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
+ }
+ }
+}
+
+void InfluxdbCommonWriter::ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
+{
+ ObjectImpl<InfluxdbCommonWriter>::ValidateServiceTemplate(lvalue, utils);
+
+ String measurement = lvalue()->Get("measurement");
+ if (!MacroProcessor::ValidateMacroString(measurement))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
+
+ Dictionary::Ptr tags = lvalue()->Get("tags");
+ if (tags) {
+ ObjectLock olock(tags);
+ for (const Dictionary::Pair& pair : tags) {
+ if (!MacroProcessor::ValidateMacroString(pair.second))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
+ }
+ }
+}
+
diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp
new file mode 100644
index 0000000..380b20c
--- /dev/null
+++ b/lib/perfdata/influxdbcommonwriter.hpp
@@ -0,0 +1,101 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#ifndef INFLUXDBCOMMONWRITER_H
+#define INFLUXDBCOMMONWRITER_H
+
+#include "perfdata/influxdbcommonwriter-ti.hpp"
+#include "icinga/service.hpp"
+#include "base/configobject.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/timer.hpp"
+#include "base/tlsstream.hpp"
+#include "base/workqueue.hpp"
+#include "remote/url.hpp"
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <atomic>
+#include <fstream>
+
+namespace icinga
+{
+
+/**
+ * Common base class for InfluxDB v1/v2 writers.
+ *
+ * @ingroup perfdata
+ */
+class InfluxdbCommonWriter : public ObjectImpl<InfluxdbCommonWriter>
+{
+public:
+ DECLARE_OBJECT(InfluxdbCommonWriter);
+
+ template<class InfluxWriter>
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+ void ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
+ void ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
+
+protected:
+ void OnConfigLoaded() override;
+ void Resume() override;
+ void Pause() override;
+
+ boost::beast::http::request<boost::beast::http::string_body> AssembleBaseRequest(String body);
+ Url::Ptr AssembleBaseUrl();
+ virtual boost::beast::http::request<boost::beast::http::string_body> AssembleRequest(String body) = 0;
+ virtual Url::Ptr AssembleUrl() = 0;
+
+private:
+ boost::signals2::connection m_HandleCheckResults;
+ Timer::Ptr m_FlushTimer;
+ WorkQueue m_WorkQueue{10000000, 1};
+ std::vector<String> m_DataBuffer;
+ std::atomic_size_t m_DataBufferSize{0};
+
+ void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
+ const String& label, const Dictionary::Ptr& fields, double ts);
+ void FlushTimeout();
+ void FlushTimeoutWQ();
+ void FlushWQ();
+
+ static String EscapeKeyOrTagValue(const String& str);
+ static String EscapeValue(const Value& value);
+
+ OptionalTlsStream Connect();
+
+ void AssertOnWorkQueue();
+
+ void ExceptionHandler(boost::exception_ptr exp);
+};
+
+template<class InfluxWriter>
+void InfluxdbCommonWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+ DictionaryData nodes;
+ auto typeName (InfluxWriter::TypeInstance->GetName().ToLower());
+
+ for (const typename InfluxWriter::Ptr& influxwriter : ConfigType::GetObjectsByType<InfluxWriter>()) {
+ size_t workQueueItems = influxwriter->m_WorkQueue.GetLength();
+ double workQueueItemRate = influxwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
+ size_t dataBufferItems = influxwriter->m_DataBufferSize;
+
+ nodes.emplace_back(influxwriter->GetName(), new Dictionary({
+ { "work_queue_items", workQueueItems },
+ { "work_queue_item_rate", workQueueItemRate },
+ { "data_buffer_items", dataBufferItems }
+ }));
+
+ perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_items", workQueueItems));
+ perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
+ perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_data_queue_items", dataBufferItems));
+ }
+
+ status->Set(typeName, new Dictionary(std::move(nodes)));
+}
+
+}
+
+#endif /* INFLUXDBCOMMONWRITER_H */
diff --git a/lib/perfdata/influxdbcommonwriter.ti b/lib/perfdata/influxdbcommonwriter.ti
new file mode 100644
index 0000000..5cfe83f
--- /dev/null
+++ b/lib/perfdata/influxdbcommonwriter.ti
@@ -0,0 +1,88 @@
+/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */
+
+#include "base/configobject.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+abstract class InfluxdbCommonWriter : ConfigObject
+{
+ [config, required] String host {
+ default {{{ return "127.0.0.1"; }}}
+ };
+ [config, required] String port {
+ default {{{ return "8086"; }}}
+ };
+ [config] bool ssl_enable {
+ default {{{ return false; }}}
+ };
+ [config] bool ssl_insecure_noverify {
+ default {{{ return false; }}}
+ };
+ [config] String ssl_ca_cert {
+ default {{{ return ""; }}}
+ };
+ [config] String ssl_cert {
+ default {{{ return ""; }}}
+ };
+ [config] String ssl_key{
+ default {{{ return ""; }}}
+ };
+ [config, required] Dictionary::Ptr host_template {
+ default {{{
+ return new Dictionary({
+ { "measurement", "$host.check_command$" },
+ { "tags", new Dictionary({
+ { "hostname", "$host.name$" }
+ }) }
+ });
+ }}}
+ };
+ [config, required] Dictionary::Ptr service_template {
+ default {{{
+ return new Dictionary({
+ { "measurement", "$service.check_command$" },
+ { "tags", new Dictionary({
+ { "hostname", "$host.name$" },
+ { "service", "$service.name$" }
+ }) }
+ });
+ }}}
+ };
+ [config] bool enable_send_thresholds {
+ default {{{ return false; }}}
+ };
+ [config] bool enable_send_metadata {
+ default {{{ return false; }}}
+ };
+ [config] int flush_interval {
+ default {{{ return 10; }}}
+ };
+ [config] int flush_threshold {
+ default {{{ return 1024; }}}
+ };
+ [config] bool enable_ha {
+ default {{{ return false; }}}
+ };
+};
+
+validator InfluxdbCommonWriter {
+ Dictionary host_template {
+ required measurement;
+ String measurement;
+ Dictionary "tags" {
+ String "*";
+ };
+ };
+ Dictionary service_template {
+ required measurement;
+ String measurement;
+ Dictionary "tags" {
+ String "*";
+ };
+ };
+};
+
+}
diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp
new file mode 100644
index 0000000..4bc992d
--- /dev/null
+++ b/lib/perfdata/influxdbwriter.cpp
@@ -0,0 +1,56 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/influxdbwriter.hpp"
+#include "perfdata/influxdbwriter-ti.cpp"
+#include "base/base64.hpp"
+#include "remote/url.hpp"
+#include "base/configtype.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/statsfunction.hpp"
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <utility>
+
+using namespace icinga;
+
+REGISTER_TYPE(InfluxdbWriter);
+
+REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
+
+void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+ InfluxdbCommonWriter::StatsFunc<InfluxdbWriter>(status, perfdata);
+}
+
+boost::beast::http::request<boost::beast::http::string_body> InfluxdbWriter::AssembleRequest(String body)
+{
+ auto request (AssembleBaseRequest(std::move(body)));
+ Dictionary::Ptr basicAuth = GetBasicAuth();
+
+ if (basicAuth) {
+ request.set(
+ boost::beast::http::field::authorization,
+ "Basic " + Base64::Encode(basicAuth->Get("username") + ":" + basicAuth->Get("password"))
+ );
+ }
+
+ return request;
+}
+
+Url::Ptr InfluxdbWriter::AssembleUrl()
+{
+ auto url (AssembleBaseUrl());
+
+ std::vector<String> path;
+ path.emplace_back("write");
+ url->SetPath(path);
+
+ url->AddQueryElement("db", GetDatabase());
+
+ if (!GetUsername().IsEmpty())
+ url->AddQueryElement("u", GetUsername());
+ if (!GetPassword().IsEmpty())
+ url->AddQueryElement("p", GetPassword());
+
+ return url;
+}
diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp
new file mode 100644
index 0000000..48676cc
--- /dev/null
+++ b/lib/perfdata/influxdbwriter.hpp
@@ -0,0 +1,31 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#ifndef INFLUXDBWRITER_H
+#define INFLUXDBWRITER_H
+
+#include "perfdata/influxdbwriter-ti.hpp"
+
+namespace icinga
+{
+
+/**
+ * An Icinga InfluxDB v1 writer.
+ *
+ * @ingroup perfdata
+ */
+class InfluxdbWriter final : public ObjectImpl<InfluxdbWriter>
+{
+public:
+ DECLARE_OBJECT(InfluxdbWriter);
+ DECLARE_OBJECTNAME(InfluxdbWriter);
+
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+protected:
+ boost::beast::http::request<boost::beast::http::string_body> AssembleRequest(String body) override;
+ Url::Ptr AssembleUrl() override;
+};
+
+}
+
+#endif /* INFLUXDBWRITER_H */
diff --git a/lib/perfdata/influxdbwriter.ti b/lib/perfdata/influxdbwriter.ti
new file mode 100644
index 0000000..e6fc84e
--- /dev/null
+++ b/lib/perfdata/influxdbwriter.ti
@@ -0,0 +1,35 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/influxdbcommonwriter.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class InfluxdbWriter : InfluxdbCommonWriter
+{
+ activation_priority 100;
+
+ [config, required] String database {
+ default {{{ return "icinga2"; }}}
+ };
+ [config] String username {
+ default {{{ return ""; }}}
+ };
+ [config, no_user_view] String password {
+ default {{{ return ""; }}}
+ };
+ [config, no_user_view] Dictionary::Ptr basic_auth;
+};
+
+validator InfluxdbWriter {
+ Dictionary basic_auth {
+ required username;
+ String username;
+ required password;
+ String password;
+ };
+};
+
+}
diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp
new file mode 100644
index 0000000..2a9cfc0
--- /dev/null
+++ b/lib/perfdata/opentsdbwriter.cpp
@@ -0,0 +1,525 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/opentsdbwriter.hpp"
+#include "perfdata/opentsdbwriter-ti.cpp"
+#include "icinga/service.hpp"
+#include "icinga/checkcommand.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "icinga/icingaapplication.hpp"
+#include "icinga/compatutility.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/configtype.hpp"
+#include "base/objectlock.hpp"
+#include "base/logger.hpp"
+#include "base/convert.hpp"
+#include "base/utility.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/application.hpp"
+#include "base/stream.hpp"
+#include "base/networkstream.hpp"
+#include "base/exception.hpp"
+#include "base/statsfunction.hpp"
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/replace.hpp>
+
+using namespace icinga;
+
+REGISTER_TYPE(OpenTsdbWriter);
+
+REGISTER_STATSFUNCTION(OpenTsdbWriter, &OpenTsdbWriter::StatsFunc);
+
+/*
+ * Enable HA capabilities once the config object is loaded.
+ */
+void OpenTsdbWriter::OnConfigLoaded()
+{
+ ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
+
+ if (!GetEnableHa()) {
+ Log(LogDebug, "OpenTsdbWriter")
+ << "HA functionality disabled. Won't pause connection: " << GetName();
+
+ SetHAMode(HARunEverywhere);
+ } else {
+ SetHAMode(HARunOnce);
+ }
+}
+
+/**
+ * Feature stats interface
+ *
+ * @param status Key value pairs for feature stats
+ */
+void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
+{
+ DictionaryData nodes;
+
+ for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) {
+ nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({
+ { "connected", opentsdbwriter->GetConnected() }
+ }));
+ }
+
+ status->Set("opentsdbwriter", new Dictionary(std::move(nodes)));
+}
+
+/**
+ * Resume is equivalent to Start, but with HA capabilities to resume at runtime.
+ */
+void OpenTsdbWriter::Resume()
+{
+ ObjectImpl<OpenTsdbWriter>::Resume();
+
+ Log(LogInformation, "OpentsdbWriter")
+ << "'" << GetName() << "' resumed.";
+
+ ReadConfigTemplate(m_ServiceConfigTemplate, m_HostConfigTemplate);
+
+ m_ReconnectTimer = Timer::Create();
+ m_ReconnectTimer->SetInterval(10);
+ m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
+ m_ReconnectTimer->Start();
+ m_ReconnectTimer->Reschedule(0);
+
+ m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
+ CheckResultHandler(checkable, cr);
+ });
+}
+
+/**
+ * Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
+ */
+void OpenTsdbWriter::Pause()
+{
+ m_HandleCheckResults.disconnect();
+ m_ReconnectTimer->Stop(true);
+
+ Log(LogInformation, "OpentsdbWriter")
+ << "'" << GetName() << "' paused.";
+
+ m_Stream->close();
+
+ SetConnected(false);
+
+ ObjectImpl<OpenTsdbWriter>::Pause();
+}
+
+/**
+ * Reconnect handler called by the timer.
+ * Handles TLS
+ */
+void OpenTsdbWriter::ReconnectTimerHandler()
+{
+ if (IsPaused())
+ return;
+
+ SetShouldConnect(true);
+
+ if (GetConnected())
+ return;
+
+ double startTime = Utility::GetTime();
+
+ Log(LogNotice, "OpenTsdbWriter")
+ << "Reconnecting to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+ /*
+ * We're using telnet as input method. Future PRs may change this into using the HTTP API.
+ * http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet
+ */
+ m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
+
+ try {
+ icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "OpenTsdbWriter")
+ << "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+ SetConnected(false);
+
+ return;
+ }
+
+ SetConnected(true);
+
+ Log(LogInformation, "OpenTsdbWriter")
+ << "Finished reconnecting to OpenTSDB in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
+}
+
+/**
+ * Registered check result handler processing data.
+ * Calculates tags from the config.
+ *
+ * @param checkable Host/service object
+ * @param cr Check result
+ */
+void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ if (IsPaused())
+ return;
+
+ CONTEXT("Processing check result for '" << checkable->GetName() << "'");
+
+ if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
+ return;
+
+ Service::Ptr service = dynamic_pointer_cast<Service>(checkable);
+ Host::Ptr host;
+ Dictionary::Ptr config_tmpl;
+ Dictionary::Ptr config_tmpl_tags;
+ String config_tmpl_metric;
+
+ if (service) {
+ host = service->GetHost();
+ config_tmpl = m_ServiceConfigTemplate;
+ }
+ else {
+ host = static_pointer_cast<Host>(checkable);
+ config_tmpl = m_HostConfigTemplate;
+ }
+
+ // Get the tags nested dictionary in the service/host template in the config
+ if (config_tmpl) {
+ config_tmpl_tags = config_tmpl->Get("tags");
+ config_tmpl_metric = config_tmpl->Get("metric");
+ }
+
+ String metric;
+ std::map<String, String> tags;
+
+ // Resolve macros in configuration template and build custom tag list
+ if (config_tmpl_tags || !config_tmpl_metric.IsEmpty()) {
+
+ // Configure config template macro resolver
+ MacroProcessor::ResolverList resolvers;
+ if (service)
+ resolvers.emplace_back("service", service);
+ resolvers.emplace_back("host", host);
+
+ // Resolve macros for the service and host template config line
+ if (config_tmpl_tags) {
+ ObjectLock olock(config_tmpl_tags);
+
+ for (const Dictionary::Pair& pair : config_tmpl_tags) {
+
+ String missing_macro;
+ Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro);
+
+ if (!missing_macro.IsEmpty()) {
+ Log(LogDebug, "OpenTsdbWriter")
+ << "Unable to resolve macro:'" << missing_macro
+ << "' for this host or service.";
+
+ continue;
+ }
+
+ String tagname = Convert::ToString(pair.first);
+ tags[tagname] = EscapeTag(value);
+
+ }
+ }
+
+ // Resolve macros for the metric config line
+ if (!config_tmpl_metric.IsEmpty()) {
+
+ String missing_macro;
+ Value value = MacroProcessor::ResolveMacros(config_tmpl_metric, resolvers, cr, &missing_macro);
+
+ if (!missing_macro.IsEmpty()) {
+ Log(LogDebug, "OpenTsdbWriter")
+ << "Unable to resolve macro:'" << missing_macro
+ << "' for this host or service.";
+
+ }
+ else {
+
+ config_tmpl_metric = Convert::ToString(value);
+
+ }
+ }
+ }
+
+ String escaped_hostName = EscapeTag(host->GetName());
+ tags["host"] = escaped_hostName;
+
+ double ts = cr->GetExecutionEnd();
+
+ if (service) {
+
+ if (!config_tmpl_metric.IsEmpty()) {
+ metric = config_tmpl_metric;
+ } else {
+ String serviceName = service->GetShortName();
+ String escaped_serviceName = EscapeMetric(serviceName);
+ metric = "icinga.service." + escaped_serviceName;
+ }
+
+ SendMetric(checkable, metric + ".state", tags, service->GetState(), ts);
+
+ } else {
+ if (!config_tmpl_metric.IsEmpty()) {
+ metric = config_tmpl_metric;
+ } else {
+ metric = "icinga.host";
+ }
+ SendMetric(checkable, metric + ".state", tags, host->GetState(), ts);
+ }
+
+ SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts);
+ SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts);
+ SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts);
+ SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts);
+
+ SendPerfdata(checkable, metric, tags, cr, ts);
+
+ metric = "icinga.check";
+
+ if (service) {
+ tags["type"] = "service";
+ String serviceName = service->GetShortName();
+ String escaped_serviceName = EscapeTag(serviceName);
+ tags["service"] = escaped_serviceName;
+ } else {
+ tags["type"] = "host";
+ }
+
+ SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
+ SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
+ SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
+ SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
+}
+
+/**
+ * Parse and send performance data metrics to OpenTSDB
+ *
+ * @param checkable Host/service object
+ * @param metric Full metric name
+ * @param tags Tag key pairs
+ * @param cr Check result containing performance data
+ * @param ts Timestamp when the check result was received
+ */
+void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
+ const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
+{
+ Array::Ptr perfdata = cr->GetPerformanceData();
+
+ if (!perfdata)
+ return;
+
+ CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
+
+ ObjectLock olock(perfdata);
+ for (const Value& val : perfdata) {
+ PerfdataValue::Ptr pdv;
+
+ if (val.IsObjectType<PerfdataValue>())
+ pdv = val;
+ else {
+ try {
+ pdv = PerfdataValue::Parse(val);
+ } catch (const std::exception&) {
+ Log(LogWarning, "OpenTsdbWriter")
+ << "Ignoring invalid perfdata for checkable '"
+ << checkable->GetName() << "' and command '"
+ << checkCommand->GetName() << "' with value: " << val;
+ continue;
+ }
+ }
+
+ String metric_name;
+ std::map<String, String> tags_new = tags;
+
+ // Do not break original functionality where perfdata labels form
+ // part of the metric name
+ if (!GetEnableGenericMetrics()) {
+ String escaped_key = EscapeMetric(pdv->GetLabel());
+ boost::algorithm::replace_all(escaped_key, "::", ".");
+ metric_name = metric + "." + escaped_key;
+ } else {
+ String escaped_key = EscapeTag(pdv->GetLabel());
+ metric_name = metric;
+ tags_new["label"] = escaped_key;
+ }
+
+ SendMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts);
+
+ if (!pdv->GetCrit().IsEmpty())
+ SendMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts);
+ if (!pdv->GetWarn().IsEmpty())
+ SendMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts);
+ if (!pdv->GetMin().IsEmpty())
+ SendMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts);
+ if (!pdv->GetMax().IsEmpty())
+ SendMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts);
+ }
+}
+
+/**
+ * Send given metric to OpenTSDB
+ *
+ * @param checkable Host/service object
+ * @param metric Full metric name
+ * @param tags Tag key pairs
+ * @param value Floating point metric value
+ * @param ts Timestamp where the metric was received from the check result
+ */
+void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
+ const std::map<String, String>& tags, double value, double ts)
+{
+ String tags_string = "";
+
+ for (const Dictionary::Pair& tag : tags) {
+ tags_string += " " + tag.first + "=" + Convert::ToString(tag.second);
+ }
+
+ std::ostringstream msgbuf;
+ /*
+ * must be (http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html)
+ * put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
+ * "tags" must include at least one tag, we use "host=HOSTNAME"
+ */
+ msgbuf << "put " << metric << " " << static_cast<long>(ts) << " " << Convert::ToString(value) << tags_string;
+
+ Log(LogDebug, "OpenTsdbWriter")
+ << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
+
+ /* do not send \n to debug log */
+ msgbuf << "\n";
+ String put = msgbuf.str();
+
+ ObjectLock olock(this);
+
+ if (!GetConnected())
+ return;
+
+ try {
+ Log(LogDebug, "OpenTsdbWriter")
+ << "Checkable '" << checkable->GetName() << "' sending message '" << put << "'.";
+
+ boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str()));
+ m_Stream->flush();
+ } catch (const std::exception& ex) {
+ Log(LogCritical, "OpenTsdbWriter")
+ << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ }
+}
+
+/**
+ * Escape tags for OpenTSDB
+ * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags
+ *
+ * @param str Tag name
+ * @return Escaped tag
+ */
+String OpenTsdbWriter::EscapeTag(const String& str)
+{
+ String result = str;
+
+ boost::replace_all(result, " ", "_");
+ boost::replace_all(result, "\\", "_");
+ boost::replace_all(result, ":", "_");
+
+ return result;
+}
+
+/**
+ * Escape metric name for OpenTSDB
+ * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags
+ *
+ * @param str Metric name
+ * @return Escaped metric
+ */
+String OpenTsdbWriter::EscapeMetric(const String& str)
+{
+ String result = str;
+
+ boost::replace_all(result, " ", "_");
+ boost::replace_all(result, ".", "_");
+ boost::replace_all(result, "\\", "_");
+ boost::replace_all(result, ":", "_");
+
+ return result;
+}
+
+/**
+* Saves the template dictionaries defined in the config file into running memory
+*
+* @param stemplate The dictionary to save the service configuration to
+* @param htemplate The dictionary to save the host configuration to
+*/
+void OpenTsdbWriter::ReadConfigTemplate(const Dictionary::Ptr& stemplate,
+ const Dictionary::Ptr& htemplate)
+{
+
+ m_ServiceConfigTemplate = GetServiceTemplate();
+
+ if (!m_ServiceConfigTemplate) {
+ Log(LogDebug, "OpenTsdbWriter")
+ << "Unable to locate service template configuration.";
+ } else if (m_ServiceConfigTemplate->GetLength() == 0) {
+ Log(LogDebug, "OpenTsdbWriter")
+ << "The service template configuration is empty.";
+ }
+
+ m_HostConfigTemplate = GetHostTemplate();
+
+ if (!m_HostConfigTemplate) {
+ Log(LogDebug, "OpenTsdbWriter")
+ << "Unable to locate host template configuration.";
+ } else if (m_HostConfigTemplate->GetLength() == 0) {
+ Log(LogDebug, "OpenTsdbWriter")
+ << "The host template configuration is empty.";
+ }
+
+}
+
+
+/**
+* Validates the host_template configuration block in the configuration
+* file and checks for syntax errors.
+*
+* @param lvalue The host_template dictionary
+* @param utils Validation helper utilities
+*/
+void OpenTsdbWriter::ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
+{
+ ObjectImpl<OpenTsdbWriter>::ValidateHostTemplate(lvalue, utils);
+
+ String metric = lvalue()->Get("metric");
+ if (!MacroProcessor::ValidateMacroString(metric))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "metric" }, "Closing $ not found in macro format string '" + metric + "'."));
+
+ Dictionary::Ptr tags = lvalue()->Get("tags");
+ if (tags) {
+ ObjectLock olock(tags);
+ for (const Dictionary::Pair& pair : tags) {
+ if (!MacroProcessor::ValidateMacroString(pair.second))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
+ }
+ }
+}
+
+/**
+* Validates the service_template configuration block in the
+* configuration file and checks for syntax errors.
+*
+* @param lvalue The service_template dictionary
+* @param utils Validation helper utilities
+*/
+void OpenTsdbWriter::ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
+{
+ ObjectImpl<OpenTsdbWriter>::ValidateServiceTemplate(lvalue, utils);
+
+ String metric = lvalue()->Get("metric");
+ if (!MacroProcessor::ValidateMacroString(metric))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "metric" }, "Closing $ not found in macro format string '" + metric + "'."));
+
+ Dictionary::Ptr tags = lvalue()->Get("tags");
+ if (tags) {
+ ObjectLock olock(tags);
+ for (const Dictionary::Pair& pair : tags) {
+ if (!MacroProcessor::ValidateMacroString(pair.second))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
+ }
+ }
+}
diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp
new file mode 100644
index 0000000..e37ef42
--- /dev/null
+++ b/lib/perfdata/opentsdbwriter.hpp
@@ -0,0 +1,62 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#ifndef OPENTSDBWRITER_H
+#define OPENTSDBWRITER_H
+
+#include "perfdata/opentsdbwriter-ti.hpp"
+#include "icinga/service.hpp"
+#include "base/configobject.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/timer.hpp"
+#include <fstream>
+
+namespace icinga
+{
+
+/**
+ * An Icinga opentsdb writer.
+ *
+ * @ingroup perfdata
+ */
+class OpenTsdbWriter final : public ObjectImpl<OpenTsdbWriter>
+{
+public:
+ DECLARE_OBJECT(OpenTsdbWriter);
+ DECLARE_OBJECTNAME(OpenTsdbWriter);
+
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+ void ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
+ void ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
+
+protected:
+ void OnConfigLoaded() override;
+ void Resume() override;
+ void Pause() override;
+
+private:
+ Shared<AsioTcpStream>::Ptr m_Stream;
+
+ boost::signals2::connection m_HandleCheckResults;
+ Timer::Ptr m_ReconnectTimer;
+
+ Dictionary::Ptr m_ServiceConfigTemplate;
+ Dictionary::Ptr m_HostConfigTemplate;
+
+ void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void SendMetric(const Checkable::Ptr& checkable, const String& metric,
+ const std::map<String, String>& tags, double value, double ts);
+ void SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
+ const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts);
+ static String EscapeTag(const String& str);
+ static String EscapeMetric(const String& str);
+
+ void ReconnectTimerHandler();
+
+ void ReadConfigTemplate(const Dictionary::Ptr& stemplate,
+ const Dictionary::Ptr& htemplate);
+};
+
+}
+
+#endif /* OPENTSDBWRITER_H */
diff --git a/lib/perfdata/opentsdbwriter.ti b/lib/perfdata/opentsdbwriter.ti
new file mode 100644
index 0000000..626350a
--- /dev/null
+++ b/lib/perfdata/opentsdbwriter.ti
@@ -0,0 +1,55 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "base/configobject.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class OpenTsdbWriter : ConfigObject
+{
+ activation_priority 100;
+
+ [config] String host {
+ default {{{ return "127.0.0.1"; }}}
+ };
+ [config] String port {
+ default {{{ return "4242"; }}}
+ };
+ [config] bool enable_ha {
+ default {{{ return false; }}}
+ };
+ [config] Dictionary::Ptr host_template {
+ default {{{ return new Dictionary(); }}}
+
+ };
+ [config] Dictionary::Ptr service_template {
+ default {{{ return new Dictionary(); }}}
+ };
+ [config] bool enable_generic_metrics {
+ default {{{ return false; }}}
+ };
+
+ [no_user_modify] bool connected;
+ [no_user_modify] bool should_connect {
+ default {{{ return true; }}}
+ };
+};
+
+validator OpenTsdbWriter {
+ Dictionary host_template {
+ String metric;
+ Dictionary "tags" {
+ String "*";
+ };
+ };
+ Dictionary service_template {
+ String metric;
+ Dictionary "tags" {
+ String "*";
+ };
+ };
+};
+
+}
diff --git a/lib/perfdata/perfdatawriter.cpp b/lib/perfdata/perfdatawriter.cpp
new file mode 100644
index 0000000..849f19e
--- /dev/null
+++ b/lib/perfdata/perfdatawriter.cpp
@@ -0,0 +1,201 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/perfdatawriter.hpp"
+#include "perfdata/perfdatawriter-ti.cpp"
+#include "icinga/service.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "icinga/icingaapplication.hpp"
+#include "base/configtype.hpp"
+#include "base/objectlock.hpp"
+#include "base/logger.hpp"
+#include "base/convert.hpp"
+#include "base/utility.hpp"
+#include "base/context.hpp"
+#include "base/exception.hpp"
+#include "base/application.hpp"
+#include "base/statsfunction.hpp"
+
+using namespace icinga;
+
+REGISTER_TYPE(PerfdataWriter);
+
+REGISTER_STATSFUNCTION(PerfdataWriter, &PerfdataWriter::StatsFunc);
+
+void PerfdataWriter::OnConfigLoaded()
+{
+ ObjectImpl<PerfdataWriter>::OnConfigLoaded();
+
+ if (!GetEnableHa()) {
+ Log(LogDebug, "PerfdataWriter")
+ << "HA functionality disabled. Won't pause connection: " << GetName();
+
+ SetHAMode(HARunEverywhere);
+ } else {
+ SetHAMode(HARunOnce);
+ }
+}
+
+void PerfdataWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
+{
+ DictionaryData nodes;
+
+ for (const PerfdataWriter::Ptr& perfdatawriter : ConfigType::GetObjectsByType<PerfdataWriter>()) {
+ nodes.emplace_back(perfdatawriter->GetName(), 1); //add more stats
+ }
+
+ status->Set("perfdatawriter", new Dictionary(std::move(nodes)));
+}
+
+void PerfdataWriter::Resume()
+{
+ ObjectImpl<PerfdataWriter>::Resume();
+
+ Log(LogInformation, "PerfdataWriter")
+ << "'" << GetName() << "' resumed.";
+
+ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
+ const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
+ CheckResultHandler(checkable, cr);
+ });
+
+ m_RotationTimer = Timer::Create();
+ m_RotationTimer->OnTimerExpired.connect([this](const Timer * const&) { RotationTimerHandler(); });
+ m_RotationTimer->SetInterval(GetRotationInterval());
+ m_RotationTimer->Start();
+
+ RotateFile(m_ServiceOutputFile, GetServiceTempPath(), GetServicePerfdataPath());
+ RotateFile(m_HostOutputFile, GetHostTempPath(), GetHostPerfdataPath());
+}
+
+void PerfdataWriter::Pause()
+{
+ m_HandleCheckResults.disconnect();
+ m_RotationTimer->Stop(true);
+
+#ifdef I2_DEBUG
+ //m_HostOutputFile << "\n# Pause the feature" << "\n\n";
+ //m_ServiceOutputFile << "\n# Pause the feature" << "\n\n";
+#endif /* I2_DEBUG */
+
+ /* Force a rotation closing the file stream. */
+ RotateAllFiles();
+
+ Log(LogInformation, "PerfdataWriter")
+ << "'" << GetName() << "' paused.";
+
+ ObjectImpl<PerfdataWriter>::Pause();
+}
+
+Value PerfdataWriter::EscapeMacroMetric(const Value& value)
+{
+ if (value.IsObjectType<Array>())
+ return Utility::Join(value, ';');
+ else
+ return value;
+}
+
+void PerfdataWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ if (IsPaused())
+ return;
+
+ CONTEXT("Writing performance data for object '" << checkable->GetName() << "'");
+
+ if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
+ return;
+
+ Service::Ptr service = dynamic_pointer_cast<Service>(checkable);
+ Host::Ptr host;
+
+ if (service)
+ host = service->GetHost();
+ else
+ host = static_pointer_cast<Host>(checkable);
+
+ MacroProcessor::ResolverList resolvers;
+ if (service)
+ resolvers.emplace_back("service", service);
+ resolvers.emplace_back("host", host);
+
+ if (service) {
+ String line = MacroProcessor::ResolveMacros(GetServiceFormatTemplate(), resolvers, cr, nullptr, &PerfdataWriter::EscapeMacroMetric);
+
+ {
+ std::unique_lock<std::mutex> lock(m_StreamMutex);
+
+ if (!m_ServiceOutputFile.good())
+ return;
+
+ m_ServiceOutputFile << line << "\n";
+ }
+ } else {
+ String line = MacroProcessor::ResolveMacros(GetHostFormatTemplate(), resolvers, cr, nullptr, &PerfdataWriter::EscapeMacroMetric);
+
+ {
+ std::unique_lock<std::mutex> lock(m_StreamMutex);
+
+ if (!m_HostOutputFile.good())
+ return;
+
+ m_HostOutputFile << line << "\n";
+ }
+ }
+}
+
+void PerfdataWriter::RotateFile(std::ofstream& output, const String& temp_path, const String& perfdata_path)
+{
+ Log(LogDebug, "PerfdataWriter")
+ << "Rotating perfdata files.";
+
+ std::unique_lock<std::mutex> lock(m_StreamMutex);
+
+ if (output.good()) {
+ output.close();
+
+ if (Utility::PathExists(temp_path)) {
+ String finalFile = perfdata_path + "." + Convert::ToString((long)Utility::GetTime());
+
+ Log(LogDebug, "PerfdataWriter")
+ << "Closed output file and renaming into '" << finalFile << "'.";
+
+ Utility::RenameFile(temp_path, finalFile);
+ }
+ }
+
+ output.open(temp_path.CStr());
+
+ if (!output.good()) {
+ Log(LogWarning, "PerfdataWriter")
+ << "Could not open perfdata file '" << temp_path << "' for writing. Perfdata will be lost.";
+ }
+}
+
+void PerfdataWriter::RotationTimerHandler()
+{
+ if (IsPaused())
+ return;
+
+ RotateAllFiles();
+}
+
+void PerfdataWriter::RotateAllFiles()
+{
+ RotateFile(m_ServiceOutputFile, GetServiceTempPath(), GetServicePerfdataPath());
+ RotateFile(m_HostOutputFile, GetHostTempPath(), GetHostPerfdataPath());
+}
+
+void PerfdataWriter::ValidateHostFormatTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
+{
+ ObjectImpl<PerfdataWriter>::ValidateHostFormatTemplate(lvalue, utils);
+
+ if (!MacroProcessor::ValidateMacroString(lvalue()))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "host_format_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
+}
+
+void PerfdataWriter::ValidateServiceFormatTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
+{
+ ObjectImpl<PerfdataWriter>::ValidateServiceFormatTemplate(lvalue, utils);
+
+ if (!MacroProcessor::ValidateMacroString(lvalue()))
+ BOOST_THROW_EXCEPTION(ValidationError(this, { "service_format_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
+}
diff --git a/lib/perfdata/perfdatawriter.hpp b/lib/perfdata/perfdatawriter.hpp
new file mode 100644
index 0000000..961d4e9
--- /dev/null
+++ b/lib/perfdata/perfdatawriter.hpp
@@ -0,0 +1,53 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#ifndef PERFDATAWRITER_H
+#define PERFDATAWRITER_H
+
+#include "perfdata/perfdatawriter-ti.hpp"
+#include "icinga/service.hpp"
+#include "base/configobject.hpp"
+#include "base/timer.hpp"
+#include <fstream>
+
+namespace icinga
+{
+
+/**
+ * An Icinga perfdata writer.
+ *
+ * @ingroup icinga
+ */
+class PerfdataWriter final : public ObjectImpl<PerfdataWriter>
+{
+public:
+ DECLARE_OBJECT(PerfdataWriter);
+ DECLARE_OBJECTNAME(PerfdataWriter);
+
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+ void ValidateHostFormatTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) override;
+ void ValidateServiceFormatTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) override;
+
+protected:
+ void OnConfigLoaded() override;
+ void Resume() override;
+ void Pause() override;
+
+private:
+ boost::signals2::connection m_HandleCheckResults;
+ Timer::Ptr m_RotationTimer;
+ std::ofstream m_ServiceOutputFile;
+ std::ofstream m_HostOutputFile;
+ std::mutex m_StreamMutex;
+
+ void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ static Value EscapeMacroMetric(const Value& value);
+
+ void RotationTimerHandler();
+ void RotateAllFiles();
+ void RotateFile(std::ofstream& output, const String& temp_path, const String& perfdata_path);
+};
+
+}
+
+#endif /* PERFDATAWRITER_H */
diff --git a/lib/perfdata/perfdatawriter.ti b/lib/perfdata/perfdatawriter.ti
new file mode 100644
index 0000000..d6d99e8
--- /dev/null
+++ b/lib/perfdata/perfdatawriter.ti
@@ -0,0 +1,61 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "base/configobject.hpp"
+#include "base/application.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class PerfdataWriter : ConfigObject
+{
+ activation_priority 100;
+
+ [config] String host_perfdata_path {
+ default {{{ return Configuration::SpoolDir + "/perfdata/host-perfdata"; }}}
+ };
+ [config] String service_perfdata_path {
+ default {{{ return Configuration::SpoolDir + "/perfdata/service-perfdata"; }}}
+ };
+ [config] String host_temp_path {
+ default {{{ return Configuration::SpoolDir + "/tmp/host-perfdata"; }}}
+ };
+ [config] String service_temp_path {
+ default {{{ return Configuration::SpoolDir + "/tmp/service-perfdata"; }}}
+ };
+ [config] String host_format_template {
+ default {{{
+ return "DATATYPE::HOSTPERFDATA\t"
+ "TIMET::$host.last_check$\t"
+ "HOSTNAME::$host.name$\t"
+ "HOSTPERFDATA::$host.perfdata$\t"
+ "HOSTCHECKCOMMAND::$host.check_command$\t"
+ "HOSTSTATE::$host.state$\t"
+ "HOSTSTATETYPE::$host.state_type$";
+ }}}
+ };
+ [config] String service_format_template {
+ default {{{
+ return "DATATYPE::SERVICEPERFDATA\t"
+ "TIMET::$service.last_check$\t"
+ "HOSTNAME::$host.name$\t"
+ "SERVICEDESC::$service.name$\t"
+ "SERVICEPERFDATA::$service.perfdata$\t"
+ "SERVICECHECKCOMMAND::$service.check_command$\t"
+ "HOSTSTATE::$host.state$\t"
+ "HOSTSTATETYPE::$host.state_type$\t"
+ "SERVICESTATE::$service.state$\t"
+ "SERVICESTATETYPE::$service.state_type$";
+ }}}
+ };
+
+ [config] double rotation_interval {
+ default {{{ return 30; }}}
+ };
+ [config] bool enable_ha {
+ default {{{ return false; }}}
+ };
+};
+
+}