diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:32:39 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:32:39 +0000 |
commit | 56ae875861ab260b80a030f50c4aff9f9dc8fff0 (patch) | |
tree | 531412110fc901a5918c7f7442202804a83cada9 /lib/perfdata | |
parent | Initial commit. (diff) | |
download | icinga2-56ae875861ab260b80a030f50c4aff9f9dc8fff0.tar.xz icinga2-56ae875861ab260b80a030f50c4aff9f9dc8fff0.zip |
Adding upstream version 2.14.2. (refs: upstream/2.14.2, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/perfdata')
25 files changed, 4105 insertions, 0 deletions
diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt new file mode 100644 index 0000000..168938c --- /dev/null +++ b/lib/perfdata/CMakeLists.txt @@ -0,0 +1,74 @@ +# Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ + +mkclass_target(gelfwriter.ti gelfwriter-ti.cpp gelfwriter-ti.hpp) +mkclass_target(graphitewriter.ti graphitewriter-ti.cpp graphitewriter-ti.hpp) +mkclass_target(influxdbcommonwriter.ti influxdbcommonwriter-ti.cpp influxdbcommonwriter-ti.hpp) +mkclass_target(influxdbwriter.ti influxdbwriter-ti.cpp influxdbwriter-ti.hpp) +mkclass_target(influxdb2writer.ti influxdb2writer-ti.cpp influxdb2writer-ti.hpp) +mkclass_target(elasticsearchwriter.ti elasticsearchwriter-ti.cpp elasticsearchwriter-ti.hpp) +mkclass_target(opentsdbwriter.ti opentsdbwriter-ti.cpp opentsdbwriter-ti.hpp) +mkclass_target(perfdatawriter.ti perfdatawriter-ti.cpp perfdatawriter-ti.hpp) + +set(perfdata_SOURCES + elasticsearchwriter.cpp elasticsearchwriter.hpp elasticsearchwriter-ti.hpp + gelfwriter.cpp gelfwriter.hpp gelfwriter-ti.hpp + graphitewriter.cpp graphitewriter.hpp graphitewriter-ti.hpp + influxdbcommonwriter.cpp influxdbcommonwriter.hpp influxdbcommonwriter-ti.hpp + influxdbwriter.cpp influxdbwriter.hpp influxdbwriter-ti.hpp + influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp + opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp + perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp +) + +if(ICINGA2_UNITY_BUILD) + mkunity_target(perfdata perfdata perfdata_SOURCES) +endif() + +add_library(perfdata OBJECT ${perfdata_SOURCES}) + +add_dependencies(perfdata base config icinga) + +set_target_properties ( + perfdata PROPERTIES + FOLDER Components +) + +install_if_not_exists( + ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/gelf.conf + ${ICINGA2_CONFIGDIR}/features-available +) + +install_if_not_exists( + ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/graphite.conf + ${ICINGA2_CONFIGDIR}/features-available +) + +install_if_not_exists( + 
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/influxdb.conf + ${ICINGA2_CONFIGDIR}/features-available +) + +install_if_not_exists( + ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/influxdb2.conf + ${ICINGA2_CONFIGDIR}/features-available +) + +install_if_not_exists( + ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elasticsearch.conf + ${ICINGA2_CONFIGDIR}/features-available +) + +install_if_not_exists( + ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/opentsdb.conf + ${ICINGA2_CONFIGDIR}/features-available +) + +install_if_not_exists( + ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/perfdata.conf + ${ICINGA2_CONFIGDIR}/features-available +) + +install(CODE "file(MAKE_DIRECTORY \"\$ENV{DESTDIR}${ICINGA2_FULL_SPOOLDIR}/perfdata\")") +install(CODE "file(MAKE_DIRECTORY \"\$ENV{DESTDIR}${ICINGA2_FULL_SPOOLDIR}/tmp\")") + +set(CPACK_NSIS_EXTRA_INSTALL_COMMANDS "${CPACK_NSIS_EXTRA_INSTALL_COMMANDS}" PARENT_SCOPE) diff --git a/lib/perfdata/elasticsearchwriter.cpp b/lib/perfdata/elasticsearchwriter.cpp new file mode 100644 index 0000000..9fb2aa9 --- /dev/null +++ b/lib/perfdata/elasticsearchwriter.cpp @@ -0,0 +1,685 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "perfdata/elasticsearchwriter.hpp" +#include "perfdata/elasticsearchwriter-ti.cpp" +#include "remote/url.hpp" +#include "icinga/compatutility.hpp" +#include "icinga/service.hpp" +#include "icinga/checkcommand.hpp" +#include "base/application.hpp" +#include "base/defer.hpp" +#include "base/io-engine.hpp" +#include "base/tcpsocket.hpp" +#include "base/stream.hpp" +#include "base/base64.hpp" +#include "base/json.hpp" +#include "base/utility.hpp" +#include "base/networkstream.hpp" +#include "base/perfdatavalue.hpp" +#include "base/exception.hpp" +#include "base/statsfunction.hpp" +#include <boost/algorithm/string.hpp> +#include <boost/asio/ssl/context.hpp> +#include <boost/beast/core/flat_buffer.hpp> +#include <boost/beast/http/field.hpp> +#include 
<boost/beast/http/message.hpp> +#include <boost/beast/http/parser.hpp> +#include <boost/beast/http/read.hpp> +#include <boost/beast/http/status.hpp> +#include <boost/beast/http/string_body.hpp> +#include <boost/beast/http/verb.hpp> +#include <boost/beast/http/write.hpp> +#include <boost/scoped_array.hpp> +#include <memory> +#include <string> +#include <utility> + +using namespace icinga; + +REGISTER_TYPE(ElasticsearchWriter); + +REGISTER_STATSFUNCTION(ElasticsearchWriter, &ElasticsearchWriter::StatsFunc); + +void ElasticsearchWriter::OnConfigLoaded() +{ + ObjectImpl<ElasticsearchWriter>::OnConfigLoaded(); + + m_WorkQueue.SetName("ElasticsearchWriter, " + GetName()); + + if (!GetEnableHa()) { + Log(LogDebug, "ElasticsearchWriter") + << "HA functionality disabled. Won't pause connection: " << GetName(); + + SetHAMode(HARunEverywhere); + } else { + SetHAMode(HARunOnce); + } +} + +void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + DictionaryData nodes; + + for (const ElasticsearchWriter::Ptr& elasticsearchwriter : ConfigType::GetObjectsByType<ElasticsearchWriter>()) { + size_t workQueueItems = elasticsearchwriter->m_WorkQueue.GetLength(); + double workQueueItemRate = elasticsearchwriter->m_WorkQueue.GetTaskCount(60) / 60.0; + + nodes.emplace_back(elasticsearchwriter->GetName(), new Dictionary({ + { "work_queue_items", workQueueItems }, + { "work_queue_item_rate", workQueueItemRate } + })); + + perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_items", workQueueItems)); + perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); + } + + status->Set("elasticsearchwriter", new Dictionary(std::move(nodes))); +} + +void ElasticsearchWriter::Resume() +{ + ObjectImpl<ElasticsearchWriter>::Resume(); + + m_EventPrefix = "icinga2.event."; + + Log(LogInformation, "ElasticsearchWriter") + << "'" << 
GetName() << "' resumed."; + + m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); + + /* Setup timer for periodically flushing m_DataBuffer */ + m_FlushTimer = Timer::Create(); + m_FlushTimer->SetInterval(GetFlushInterval()); + m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); }); + m_FlushTimer->Start(); + m_FlushTimer->Reschedule(0); + + /* Register for new metrics. */ + m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, + const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { + CheckResultHandler(checkable, cr); + }); + m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable, + const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) { + StateChangeHandler(checkable, cr, type); + }); + m_HandleNotifications = Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr& notification, + const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, const NotificationType& type, + const CheckResult::Ptr& cr, const String& author, const String& text, const MessageOrigin::Ptr&) { + NotificationSentToAllUsersHandler(notification, checkable, users, type, cr, author, text); + }); +} + +/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. 
*/ +void ElasticsearchWriter::Pause() +{ + m_HandleCheckResults.disconnect(); + m_HandleStateChanges.disconnect(); + m_HandleNotifications.disconnect(); + + m_FlushTimer->Stop(true); + m_WorkQueue.Join(); + + { + std::unique_lock<std::mutex> lock (m_DataBufferMutex); + Flush(); + } + + Log(LogInformation, "ElasticsearchWriter") + << "'" << GetName() << "' paused."; + + ObjectImpl<ElasticsearchWriter>::Pause(); +} + +void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + String prefix = "check_result."; + + fields->Set(prefix + "output", cr->GetOutput()); + fields->Set(prefix + "check_source", cr->GetCheckSource()); + fields->Set(prefix + "exit_status", cr->GetExitStatus()); + fields->Set(prefix + "command", cr->GetCommand()); + fields->Set(prefix + "state", cr->GetState()); + fields->Set(prefix + "vars_before", cr->GetVarsBefore()); + fields->Set(prefix + "vars_after", cr->GetVarsAfter()); + + fields->Set(prefix + "execution_start", FormatTimestamp(cr->GetExecutionStart())); + fields->Set(prefix + "execution_end", FormatTimestamp(cr->GetExecutionEnd())); + fields->Set(prefix + "schedule_start", FormatTimestamp(cr->GetScheduleStart())); + fields->Set(prefix + "schedule_end", FormatTimestamp(cr->GetScheduleEnd())); + + /* Add extra calculated field. 
*/ + fields->Set(prefix + "latency", cr->CalculateLatency()); + fields->Set(prefix + "execution_time", cr->CalculateExecutionTime()); + + if (!GetEnableSendPerfdata()) + return; + + Array::Ptr perfdata = cr->GetPerformanceData(); + + CheckCommand::Ptr checkCommand = checkable->GetCheckCommand(); + + if (perfdata) { + ObjectLock olock(perfdata); + for (const Value& val : perfdata) { + PerfdataValue::Ptr pdv; + + if (val.IsObjectType<PerfdataValue>()) + pdv = val; + else { + try { + pdv = PerfdataValue::Parse(val); + } catch (const std::exception&) { + /* The check command pointer is null-checked by the event handlers in this file, so guard it here as well instead of dereferencing it blindly while logging. */ + Log(LogWarning, "ElasticsearchWriter") + << "Ignoring invalid perfdata for checkable '" + << checkable->GetName() << "' and command '" + << (checkCommand ? checkCommand->GetName() : String()) << "' with value: " << val; + continue; + } + } + + String escapedKey = pdv->GetLabel(); + boost::replace_all(escapedKey, " ", "_"); + boost::replace_all(escapedKey, ".", "_"); + boost::replace_all(escapedKey, "\\", "_"); + boost::algorithm::replace_all(escapedKey, "::", "."); + + String perfdataPrefix = prefix + "perfdata." 
+ escapedKey; + + fields->Set(perfdataPrefix + ".value", pdv->GetValue()); + + if (!pdv->GetMin().IsEmpty()) + fields->Set(perfdataPrefix + ".min", pdv->GetMin()); + if (!pdv->GetMax().IsEmpty()) + fields->Set(perfdataPrefix + ".max", pdv->GetMax()); + if (!pdv->GetWarn().IsEmpty()) + fields->Set(perfdataPrefix + ".warn", pdv->GetWarn()); + if (!pdv->GetCrit().IsEmpty()) + fields->Set(perfdataPrefix + ".crit", pdv->GetCrit()); + + if (!pdv->GetUnit().IsEmpty()) + fields->Set(perfdataPrefix + ".unit", pdv->GetUnit()); + } + } +} + +void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + if (IsPaused()) + return; + + m_WorkQueue.Enqueue([this, checkable, cr]() { InternalCheckResultHandler(checkable, cr); }); +} + +void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + AssertOnWorkQueue(); + + CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'"); + + if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr fields = new Dictionary(); + + if (service) { + fields->Set("service", service->GetShortName()); + fields->Set("state", service->GetState()); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("state", host->GetState()); + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); + } + + fields->Set("host", host->GetName()); + fields->Set("state_type", checkable->GetStateType()); + + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + + fields->Set("reachable", checkable->IsReachable()); + + CheckCommand::Ptr commandObj = 
checkable->GetCheckCommand(); + + if (commandObj) + fields->Set("check_command", commandObj->GetName()); + + double ts = Utility::GetTime(); + + if (cr) { + AddCheckResult(fields, checkable, cr); + ts = cr->GetExecutionEnd(); + } + + Enqueue(checkable, "checkresult", fields, ts); +} + +void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) +{ + if (IsPaused()) + return; + + m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); }); +} + +void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) +{ + AssertOnWorkQueue(); + + CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'"); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr fields = new Dictionary(); + + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + fields->Set("host", host->GetName()); + + if (service) { + fields->Set("service", service->GetShortName()); + fields->Set("state", service->GetState()); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("state", host->GetState()); + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); + } + + CheckCommand::Ptr commandObj = checkable->GetCheckCommand(); + + if (commandObj) + fields->Set("check_command", commandObj->GetName()); + + double ts = Utility::GetTime(); + + if (cr) { + AddCheckResult(fields, checkable, cr); + ts = cr->GetExecutionEnd(); + } + + Enqueue(checkable, "statechange", fields, ts); +} + +void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification, + const Checkable::Ptr& checkable, const 
std::set<User::Ptr>& users, NotificationType type, + const CheckResult::Ptr& cr, const String& author, const String& text) +{ + if (IsPaused()) + return; + + m_WorkQueue.Enqueue([this, notification, checkable, users, type, cr, author, text]() { + NotificationSentToAllUsersHandlerInternal(notification, checkable, users, type, cr, author, text); + }); +} + +void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification, + const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type, + const CheckResult::Ptr& cr, const String& author, const String& text) +{ + AssertOnWorkQueue(); + + CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'"); + + Log(LogDebug, "ElasticsearchWriter") + << "Processing notification for '" << checkable->GetName() << "'"; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + String notificationTypeString = Notification::NotificationTypeToStringCompat(type); //TODO: Change that to our own types. 
+ + Dictionary::Ptr fields = new Dictionary(); + + if (service) { + fields->Set("service", service->GetShortName()); + fields->Set("state", service->GetState()); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("state", host->GetState()); + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); + } + + fields->Set("host", host->GetName()); + + ArrayData userNames; + + for (const User::Ptr& user : users) { + userNames.push_back(user->GetName()); + } + + fields->Set("users", new Array(std::move(userNames))); + fields->Set("notification_type", notificationTypeString); + fields->Set("author", author); + fields->Set("text", text); + + CheckCommand::Ptr commandObj = checkable->GetCheckCommand(); + + if (commandObj) + fields->Set("check_command", commandObj->GetName()); + + double ts = Utility::GetTime(); + + if (cr) { + AddCheckResult(fields, checkable, cr); + ts = cr->GetExecutionEnd(); + } + + Enqueue(checkable, "notification", fields, ts); +} + +void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& type, + const Dictionary::Ptr& fields, double ts) +{ + /* Atomically buffer the data point. */ + std::unique_lock<std::mutex> lock(m_DataBufferMutex); + + /* Format the timestamps to dynamically select the date datatype inside the index. */ + fields->Set("@timestamp", FormatTimestamp(ts)); + fields->Set("timestamp", FormatTimestamp(ts)); + + String eventType = m_EventPrefix + type; + fields->Set("type", eventType); + + /* Every payload needs a line describing the index. + * We do it this way to avoid problems with a near full queue. 
+ */ + String indexBody = "{\"index\": {} }\n"; + String fieldsBody = JsonEncode(fields); + + Log(LogDebug, "ElasticsearchWriter") + << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << fieldsBody << "'."; + + m_DataBuffer.emplace_back(indexBody + fieldsBody); + + /* Flush if we've buffered too much to prevent excessive memory use. */ + if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) { + Log(LogDebug, "ElasticsearchWriter") + << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; + Flush(); + } +} + +void ElasticsearchWriter::FlushTimeout() +{ + /* Prevent new data points from being added to the array, there is a + * race condition where they could disappear. + */ + std::unique_lock<std::mutex> lock(m_DataBufferMutex); + + /* Flush if there are any data available. */ + if (m_DataBuffer.size() > 0) { + Log(LogDebug, "ElasticsearchWriter") + << "Timer expired writing " << m_DataBuffer.size() << " data points"; + Flush(); + } +} + +void ElasticsearchWriter::Flush() +{ + /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */ + if (m_DataBuffer.empty()) + return; + + /* Ensure you hold a lock against m_DataBuffer so that things + * don't go missing after creating the body and clearing the buffer. + */ + String body = boost::algorithm::join(m_DataBuffer, "\n"); + m_DataBuffer.clear(); + + /* Elasticsearch 6.x requires a new line. This is compatible to 5.x. + * Tested with 6.0.0 and 5.6.4. + */ + body += "\n"; + + SendRequest(body); +} + +void ElasticsearchWriter::SendRequest(const String& body) +{ + namespace beast = boost::beast; + namespace http = beast::http; + + Url::Ptr url = new Url(); + + url->SetScheme(GetEnableTls() ? "https" : "http"); + url->SetHost(GetHost()); + url->SetPort(GetPort()); + + std::vector<String> path; + + /* Specify the index path. Best practice is a daily rotation. 
+ * Example: http://localhost:9200/icinga2-2017.09.11?pretty=1 + */ + path.emplace_back(GetIndex() + "-" + Utility::FormatDateTime("%Y.%m.%d", Utility::GetTime())); + + /* Use the bulk message format. */ + path.emplace_back("_bulk"); + + url->SetPath(path); + + OptionalTlsStream stream; + + try { + stream = Connect(); + } catch (const std::exception& ex) { + Log(LogWarning, "ElasticsearchWriter") + << "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false); + return; + } + + Defer s ([&stream]() { + if (stream.first) { + stream.first->next_layer().shutdown(); + } + }); + + http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10); + + request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion()); + request.set(http::field::host, url->GetHost() + ":" + url->GetPort()); + + /* Specify required headers by Elasticsearch. */ + request.set(http::field::accept, "application/json"); + + /* Use application/x-ndjson for bulk streams. While ES + * is able to handle application/json, the newline separator + * causes problems with Logstash (#6609). + */ + request.set(http::field::content_type, "application/x-ndjson"); + + /* Send authentication if configured. */ + String username = GetUsername(); + String password = GetPassword(); + + if (!username.IsEmpty() && !password.IsEmpty()) + request.set(http::field::authorization, "Basic " + Base64::Encode(username + ":" + password)); + + request.body() = body; + request.content_length(request.body().size()); + + /* Don't log the request body to debug log, this is already done above. */ + Log(LogDebug, "ElasticsearchWriter") + << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? 
" with basic auth" : "" ) + << " to '" << url->Format() << "'."; + + try { + if (stream.first) { + http::write(*stream.first, request); + stream.first->flush(); + } else { + http::write(*stream.second, request); + stream.second->flush(); + } + } catch (const std::exception&) { + Log(LogWarning, "ElasticsearchWriter") + << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + http::parser<false, http::string_body> parser; + beast::flat_buffer buf; + + try { + if (stream.first) { + http::read(*stream.first, buf, parser); + } else { + http::read(*stream.second, buf, parser); + } + } catch (const std::exception& ex) { + Log(LogWarning, "ElasticsearchWriter") + << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false); + throw; + } + + auto& response (parser.get()); + + if (response.result_int() > 299) { + if (response.result() == http::status::unauthorized) { + /* More verbose error logging with Elasticsearch is hidden behind a proxy. */ + if (!username.IsEmpty() && !password.IsEmpty()) { + Log(LogCritical, "ElasticsearchWriter") + << "401 Unauthorized. Please ensure that the user '" << username + << "' is able to authenticate against the HTTP API/Proxy."; + } else { + Log(LogCritical, "ElasticsearchWriter") + << "401 Unauthorized. 
The HTTP API requires authentication but no username/password has been configured."; + } + + return; + } + + std::ostringstream msgbuf; + msgbuf << "Unexpected response code " << response.result_int() << " from URL '" << url->Format() << "'"; + + auto& contentType (response[http::field::content_type]); + + if (contentType != "application/json" && contentType != "application/json; charset=utf-8") { + msgbuf << "; Unexpected Content-Type: '" << contentType << "'"; + } + + auto& body (response.body()); + +#ifdef I2_DEBUG + msgbuf << "; Response body: '" << body << "'"; +#endif /* I2_DEBUG */ + + Dictionary::Ptr jsonResponse; + + try { + jsonResponse = JsonDecode(body); + } catch (...) { + Log(LogWarning, "ElasticsearchWriter") + << "Unable to parse JSON response:\n" << body; + return; + } + + String error = jsonResponse->Get("error"); + + Log(LogCritical, "ElasticsearchWriter") + << "Error: '" << error << "'. " << msgbuf.str(); + } +} + +OptionalTlsStream ElasticsearchWriter::Connect() +{ + Log(LogNotice, "ElasticsearchWriter") + << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; + + OptionalTlsStream stream; + bool tls = GetEnableTls(); + + if (tls) { + Shared<boost::asio::ssl::context>::Ptr sslContext; + + try { + sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); + } catch (const std::exception&) { + Log(LogWarning, "ElasticsearchWriter") + << "Unable to create SSL context."; + throw; + } + + stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); + + } else { + stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext()); + } + + try { + icinga::Connect(tls ? 
stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception&) { + Log(LogWarning, "ElasticsearchWriter") + << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + if (tls) { + auto& tlsStream (stream.first->next_layer()); + + try { + tlsStream.handshake(tlsStream.client); + } catch (const std::exception&) { + Log(LogWarning, "ElasticsearchWriter") + << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed."; + throw; + } + + if (!GetInsecureNoverify()) { + if (!tlsStream.GetPeerCertificate()) { + BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate.")); + } + + if (!tlsStream.IsVerifyOK()) { + BOOST_THROW_EXCEPTION(std::runtime_error( + "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) + )); + } + } + } + + return stream; +} + +void ElasticsearchWriter::AssertOnWorkQueue() +{ + ASSERT(m_WorkQueue.IsWorkerThread()); +} + +void ElasticsearchWriter::ExceptionHandler(boost::exception_ptr exp) +{ + Log(LogCritical, "ElasticsearchWriter", "Exception during Elastic operation: Verify that your backend is operational!"); + + Log(LogDebug, "ElasticsearchWriter") + << "Exception during Elasticsearch operation: " << DiagnosticInformation(std::move(exp)); +} + +String ElasticsearchWriter::FormatTimestamp(double ts) +{ + /* The date format must match the default dynamic date detection + * pattern in indexes. This enables applications like Kibana to + * detect a qualified timestamp index for time-series data. 
+ * + * Example: 2017-09-11T10:56:21.463+0200 + * + * References: + * https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection + * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html + * https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html + * + * The millisecond part is always written with three digits: an unpadded "5" would be read as a fraction of .5 seconds (500 ms) instead of 5 ms. The whole seconds are truncated via long long so that epoch values beyond the int range do not overflow. + */ + auto milliSeconds = static_cast<int>((ts - static_cast<long long>(ts)) * 1000); + + return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + "." + Convert::ToString(1000 + milliSeconds).SubStr(1) + Utility::FormatDateTime("%z", ts); +} diff --git a/lib/perfdata/elasticsearchwriter.hpp b/lib/perfdata/elasticsearchwriter.hpp new file mode 100644 index 0000000..a988094 --- /dev/null +++ b/lib/perfdata/elasticsearchwriter.hpp @@ -0,0 +1,65 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef ELASTICSEARCHWRITER_H +#define ELASTICSEARCHWRITER_H + +#include "perfdata/elasticsearchwriter-ti.hpp" +#include "icinga/service.hpp" +#include "base/configobject.hpp" +#include "base/workqueue.hpp" +#include "base/timer.hpp" +#include "base/tlsstream.hpp" + +namespace icinga +{ + +class ElasticsearchWriter final : public ObjectImpl<ElasticsearchWriter> +{ +public: + DECLARE_OBJECT(ElasticsearchWriter); + DECLARE_OBJECTNAME(ElasticsearchWriter); + + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + + static String FormatTimestamp(double ts); + +protected: + void OnConfigLoaded() override; + void Resume() override; + void Pause() override; + +private: + String m_EventPrefix; + WorkQueue m_WorkQueue{10000000, 1}; + boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications; + Timer::Ptr m_FlushTimer; + std::vector<String> m_DataBuffer; + std::mutex m_DataBufferMutex; + + void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + + void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType 
type); + void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type); + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void NotificationSentToAllUsersHandler(const Notification::Ptr& notification, + const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type, + const CheckResult::Ptr& cr, const String& author, const String& text); + void NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification, + const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type, + const CheckResult::Ptr& cr, const String& author, const String& text); + + void Enqueue(const Checkable::Ptr& checkable, const String& type, + const Dictionary::Ptr& fields, double ts); + + OptionalTlsStream Connect(); + void AssertOnWorkQueue(); + void ExceptionHandler(boost::exception_ptr exp); + void FlushTimeout(); + void Flush(); + void SendRequest(const String& body); +}; + +} + +#endif /* ELASTICSEARCHWRITER_H */ diff --git a/lib/perfdata/elasticsearchwriter.ti b/lib/perfdata/elasticsearchwriter.ti new file mode 100644 index 0000000..e3b8e27 --- /dev/null +++ b/lib/perfdata/elasticsearchwriter.ti @@ -0,0 +1,50 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "base/configobject.hpp" + +library perfdata; + +namespace icinga +{ + +class ElasticsearchWriter : ConfigObject +{ + activation_priority 100; + + [config, required] String host { + default {{{ return "127.0.0.1"; }}} + }; + [config, required] String port { + default {{{ return "9200"; }}} + }; + [config, required] String index { + default {{{ return "icinga2"; }}} + }; + [config] bool enable_send_perfdata { + default {{{ return false; }}} + }; + [config] String username; + [config, no_user_view, no_user_modify] String password; + + [config] bool enable_tls { + default {{{ 
return false; }}} + }; + [config] bool insecure_noverify { + default {{{ return false; }}} + }; + [config] String ca_path; + [config] String cert_path; + [config] String key_path; + + [config] int flush_interval { + default {{{ return 10; }}} + }; + [config] int flush_threshold { + default {{{ return 1024; }}} + }; + [config] bool enable_ha { + default {{{ return false; }}} + }; +}; + +} diff --git a/lib/perfdata/gelfwriter.cpp b/lib/perfdata/gelfwriter.cpp new file mode 100644 index 0000000..c5b2bbd --- /dev/null +++ b/lib/perfdata/gelfwriter.cpp @@ -0,0 +1,535 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "perfdata/gelfwriter.hpp" +#include "perfdata/gelfwriter-ti.cpp" +#include "icinga/service.hpp" +#include "icinga/notification.hpp" +#include "icinga/checkcommand.hpp" +#include "icinga/macroprocessor.hpp" +#include "icinga/compatutility.hpp" +#include "base/tcpsocket.hpp" +#include "base/configtype.hpp" +#include "base/objectlock.hpp" +#include "base/logger.hpp" +#include "base/utility.hpp" +#include "base/perfdatavalue.hpp" +#include "base/application.hpp" +#include "base/stream.hpp" +#include "base/networkstream.hpp" +#include "base/context.hpp" +#include "base/exception.hpp" +#include "base/json.hpp" +#include "base/statsfunction.hpp" +#include <boost/algorithm/string/replace.hpp> +#include <utility> +#include "base/io-engine.hpp" +#include <boost/asio/write.hpp> +#include <boost/asio/buffer.hpp> +#include <boost/system/error_code.hpp> +#include <boost/asio/error.hpp> + +using namespace icinga; + +REGISTER_TYPE(GelfWriter); + +REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc); + +void GelfWriter::OnConfigLoaded() +{ + ObjectImpl<GelfWriter>::OnConfigLoaded(); + + m_WorkQueue.SetName("GelfWriter, " + GetName()); + + if (!GetEnableHa()) { + Log(LogDebug, "GelfWriter") + << "HA functionality disabled. 
Won't pause connection: " << GetName(); + + SetHAMode(HARunEverywhere); + } else { + SetHAMode(HARunOnce); + } +} + +void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + DictionaryData nodes; + + for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) { + size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength(); + double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0; + + nodes.emplace_back(gelfwriter->GetName(), new Dictionary({ + { "work_queue_items", workQueueItems }, + { "work_queue_item_rate", workQueueItemRate }, + { "connected", gelfwriter->GetConnected() }, + { "source", gelfwriter->GetSource() } + })); + + perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems)); + perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); + } + + status->Set("gelfwriter", new Dictionary(std::move(nodes))); +} + +void GelfWriter::Resume() +{ + ObjectImpl<GelfWriter>::Resume(); + + Log(LogInformation, "GelfWriter") + << "'" << GetName() << "' resumed."; + + /* Register exception handler for WQ tasks. */ + m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); + + /* Timer for reconnecting */ + m_ReconnectTimer = Timer::Create(); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); + + /* Register event handlers. 
*/ + m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, + const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { + CheckResultHandler(checkable, cr); + }); + m_HandleNotifications = Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification, + const Checkable::Ptr& checkable, const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr, + const String& author, const String& commentText, const String& commandName, const MessageOrigin::Ptr&) { + NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName); + }); + m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable, + const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) { + StateChangeHandler(checkable, cr, type); + }); +} + +/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ +void GelfWriter::Pause() +{ + m_HandleCheckResults.disconnect(); + m_HandleNotifications.disconnect(); + m_HandleStateChanges.disconnect(); + + m_ReconnectTimer->Stop(true); + + m_WorkQueue.Enqueue([this]() { + try { + ReconnectInternal(); + } catch (const std::exception&) { + Log(LogInformation, "GelfWriter") + << "Unable to connect, not flushing buffers. 
Data may be lost."; + } + }, PriorityImmediate); + + m_WorkQueue.Enqueue([this]() { DisconnectInternal(); }, PriorityLow); + m_WorkQueue.Join(); + + Log(LogInformation, "GelfWriter") + << "'" << GetName() << "' paused."; + + ObjectImpl<GelfWriter>::Pause(); +} + +void GelfWriter::AssertOnWorkQueue() +{ + ASSERT(m_WorkQueue.IsWorkerThread()); +} + +void GelfWriter::ExceptionHandler(boost::exception_ptr exp) +{ + Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false); + Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true); + + DisconnectInternal(); +} + +void GelfWriter::Reconnect() +{ + AssertOnWorkQueue(); + + if (IsPaused()) { + SetConnected(false); + return; + } + + ReconnectInternal(); +} + +void GelfWriter::ReconnectInternal() +{ + double startTime = Utility::GetTime(); + + CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'"); + + SetShouldConnect(true); + + if (GetConnected()) + return; + + Log(LogNotice, "GelfWriter") + << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'."; + + bool ssl = GetEnableTls(); + + if (ssl) { + Shared<boost::asio::ssl::context>::Ptr sslContext; + + try { + sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); + } catch (const std::exception& ex) { + Log(LogWarning, "GelfWriter") + << "Unable to create SSL context."; + throw; + } + + m_Stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); + + } else { + m_Stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext()); + } + + try { + icinga::Connect(ssl ? 
m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, "GelfWriter") + << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'"; + throw; + } + + if (ssl) { + auto& tlsStream (m_Stream.first->next_layer()); + + try { + tlsStream.handshake(tlsStream.client); + } catch (const std::exception& ex) { + Log(LogWarning, "GelfWriter") + << "TLS handshake with host '" << GetHost() << " failed.'"; + throw; + } + + if (!GetInsecureNoverify()) { + if (!tlsStream.GetPeerCertificate()) { + BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate.")); + } + + if (!tlsStream.IsVerifyOK()) { + BOOST_THROW_EXCEPTION(std::runtime_error( + "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) + )); + } + } + } + + SetConnected(true); + + Log(LogInformation, "GelfWriter") + << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; +} + +void GelfWriter::ReconnectTimerHandler() +{ + m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal); +} + +void GelfWriter::Disconnect() +{ + AssertOnWorkQueue(); + + DisconnectInternal(); +} + +void GelfWriter::DisconnectInternal() +{ + if (!GetConnected()) + return; + + if (m_Stream.first) { + boost::system::error_code ec; + m_Stream.first->next_layer().shutdown(ec); + + // https://stackoverflow.com/a/25703699 + // As long as the error code's category is not an SSL category, then the protocol was securely shutdown + if (ec.category() == boost::asio::error::get_ssl_category()) { + Log(LogCritical, "GelfWriter") + << "TLS shutdown with host '" << GetHost() << "' could not be done securely."; + } + } else if (m_Stream.second) { + m_Stream.second->close(); + } + + SetConnected(false); + +} + +void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + if (IsPaused()) 
+ return; + + m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); }); +} + +void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + AssertOnWorkQueue(); + + CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'"); + + Log(LogDebug, "GelfWriter") + << "Processing check result for '" << checkable->GetName() << "'"; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr fields = new Dictionary(); + + if (service) { + fields->Set("_service_name", service->GetShortName()); + fields->Set("_service_state", Service::StateToString(service->GetState())); + fields->Set("_last_state", service->GetLastState()); + fields->Set("_last_hard_state", service->GetLastHardState()); + } else { + fields->Set("_last_state", host->GetLastState()); + fields->Set("_last_hard_state", host->GetLastHardState()); + } + + fields->Set("_hostname", host->GetName()); + fields->Set("_type", "CHECK RESULT"); + fields->Set("_state", service ? 
Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + + fields->Set("_current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts()); + + fields->Set("_reachable", checkable->IsReachable()); + + CheckCommand::Ptr checkCommand = checkable->GetCheckCommand(); + + if (checkCommand) + fields->Set("_check_command", checkCommand->GetName()); + + double ts = Utility::GetTime(); /* fallback timestamp, replaced by the check's execution end when a result is present */ + + if (cr) { + fields->Set("_latency", cr->CalculateLatency()); + fields->Set("_execution_time", cr->CalculateExecutionTime()); + fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr)); + fields->Set("full_message", cr->GetOutput()); + fields->Set("_check_source", cr->GetCheckSource()); + ts = cr->GetExecutionEnd(); + } + + if (cr && GetEnableSendPerfdata()) { + Array::Ptr perfdata = cr->GetPerformanceData(); + + if (perfdata) { + ObjectLock olock(perfdata); + for (const Value& val : perfdata) { + PerfdataValue::Ptr pdv; + + if (val.IsObjectType<PerfdataValue>()) + pdv = val; + else { + try { + pdv = PerfdataValue::Parse(val); + } catch (const std::exception&) { + Log(LogWarning, "GelfWriter") + << "Ignoring invalid perfdata for checkable '" + << checkable->GetName() << "' and command '" + << (checkCommand ? checkCommand->GetName() : String("<unknown>")) << "' with value: " << val; /* checkCommand may be unset, cf. the null check above */ + continue; + } + } + + String escaped_key = pdv->GetLabel(); + boost::replace_all(escaped_key, " ", "_"); + boost::replace_all(escaped_key, ".", "_"); + boost::replace_all(escaped_key, "\\", "_"); + boost::algorithm::replace_all(escaped_key, "::", "."); /* must run after the '.' replacement so the new dots survive */ + + fields->Set("_" + escaped_key, pdv->GetValue()); + + if (!pdv->GetMin().IsEmpty()) + fields->Set("_" + escaped_key + "_min", pdv->GetMin()); + if (!pdv->GetMax().IsEmpty()) + fields->Set("_" + escaped_key + "_max", pdv->GetMax()); + if (!pdv->GetWarn().IsEmpty()) + fields->Set("_" + escaped_key + "_warn", pdv->GetWarn()); + if (!pdv->GetCrit().IsEmpty()) + fields->Set("_" + escaped_key + 
"_crit", pdv->GetCrit()); + + if (!pdv->GetUnit().IsEmpty()) + fields->Set("_" + escaped_key + "_unit", pdv->GetUnit()); + } + } + } + + SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts)); +} + +void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, + const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr, + const String& author, const String& commentText, const String& commandName) +{ + if (IsPaused()) + return; + + m_WorkQueue.Enqueue([this, notification, checkable, user, notificationType, cr, author, commentText, commandName]() { + NotificationToUserHandlerInternal(notification, checkable, user, notificationType, cr, author, commentText, commandName); + }); +} + +void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable, + const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr, + const String& author, const String& commentText, const String& commandName) +{ + AssertOnWorkQueue(); + + CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'"); + + Log(LogDebug, "GelfWriter") + << "Processing notification for '" << checkable->GetName() << "'"; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + String notificationTypeString = Notification::NotificationTypeToStringCompat(notificationType); //TODO: Change that to our own types. 
+ + String authorComment = ""; + + if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) { + authorComment = author + ";" + commentText; + } + + String output; + double ts = Utility::GetTime(); + + if (cr) { + output = CompatUtility::GetCheckResultOutput(cr); + ts = cr->GetExecutionEnd(); + } + + Dictionary::Ptr fields = new Dictionary(); + + if (service) { + fields->Set("_type", "SERVICE NOTIFICATION"); + //TODO: fix this to _service_name + fields->Set("_service", service->GetShortName()); + fields->Set("short_message", output); + } else { + fields->Set("_type", "HOST NOTIFICATION"); + fields->Set("short_message", output); + } + + fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + + fields->Set("_hostname", host->GetName()); + fields->Set("_command", commandName); + fields->Set("_notification_type", notificationTypeString); + fields->Set("_comment", authorComment); + + CheckCommand::Ptr commandObj = checkable->GetCheckCommand(); + + if (commandObj) + fields->Set("_check_command", commandObj->GetName()); + + SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts)); +} + +void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) +{ + if (IsPaused()) + return; + + m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); }); +} + +void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) +{ + AssertOnWorkQueue(); + + CONTEXT("GELF Processing state change '" << checkable->GetName() << "'"); + + Log(LogDebug, "GelfWriter") + << "Processing state change for '" << checkable->GetName() << "'"; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr fields = new Dictionary(); + + fields->Set("_state", service ? 
Service::StateToString(service->GetState()) : Host::StateToString(host->GetState())); + fields->Set("_type", "STATE CHANGE"); + fields->Set("_current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts()); + fields->Set("_hostname", host->GetName()); + + if (service) { + fields->Set("_service_name", service->GetShortName()); + fields->Set("_service_state", Service::StateToString(service->GetState())); + fields->Set("_last_state", service->GetLastState()); + fields->Set("_last_hard_state", service->GetLastHardState()); + } else { + fields->Set("_last_state", host->GetLastState()); + fields->Set("_last_hard_state", host->GetLastHardState()); + } + + CheckCommand::Ptr commandObj = checkable->GetCheckCommand(); + + if (commandObj) + fields->Set("_check_command", commandObj->GetName()); + + double ts = Utility::GetTime(); + + if (cr) { + fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr)); + fields->Set("full_message", cr->GetOutput()); + fields->Set("_check_source", cr->GetCheckSource()); + ts = cr->GetExecutionEnd(); + } + + SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts)); +} + +String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts) +{ + fields->Set("version", "1.1"); + fields->Set("host", source); + fields->Set("timestamp", ts); + + return JsonEncode(fields); +} + +void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage) +{ + std::ostringstream msgbuf; + msgbuf << gelfMessage; + msgbuf << '\0'; + + String log = msgbuf.str(); + + if (!GetConnected()) + return; + + try { + Log(LogDebug, "GelfWriter") + << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'."; + + if (m_Stream.first) { + boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str())); + m_Stream.first->flush(); + } else { + boost::asio::write(*m_Stream.second, 
boost::asio::buffer(msgbuf.str())); + m_Stream.second->flush(); + } + } catch (const std::exception&) { + Log(LogCritical, "GelfWriter") + << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + + throw; /* rethrow the original exception; 'throw ex;' would throw a sliced std::exception copy */ + } +} diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp new file mode 100644 index 0000000..ce9ee35 --- /dev/null +++ b/lib/perfdata/gelfwriter.hpp @@ -0,0 +1,70 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef GELFWRITER_H +#define GELFWRITER_H + +#include "perfdata/gelfwriter-ti.hpp" +#include "icinga/service.hpp" +#include "base/configobject.hpp" +#include "base/tcpsocket.hpp" +#include "base/timer.hpp" +#include "base/workqueue.hpp" +#include <fstream> + +namespace icinga +{ + +/** + * An Icinga Gelf writer for Graylog. + * + * @ingroup perfdata + */ +class GelfWriter final : public ObjectImpl<GelfWriter> +{ +public: + DECLARE_OBJECT(GelfWriter); + DECLARE_OBJECTNAME(GelfWriter); + + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + +protected: + void OnConfigLoaded() override; + void Resume() override; + void Pause() override; + +private: + OptionalTlsStream m_Stream; + WorkQueue m_WorkQueue{10000000, 1}; + + boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges; + Timer::Ptr m_ReconnectTimer; + + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, + const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr, + const String& author, const String& commentText, const String& commandName); + void NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable, + const User::Ptr& user, NotificationType 
notification_type, const CheckResult::Ptr& cr, + const String& author, const String& comment_text, const String& command_name); + void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type); + void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type); + + String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts); + void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage); + + void ReconnectTimerHandler(); + + void Disconnect(); + void DisconnectInternal(); + void Reconnect(); + void ReconnectInternal(); + + void AssertOnWorkQueue(); + + void ExceptionHandler(boost::exception_ptr exp); +}; + +} + +#endif /* GELFWRITER_H */ diff --git a/lib/perfdata/gelfwriter.ti b/lib/perfdata/gelfwriter.ti new file mode 100644 index 0000000..387ee14 --- /dev/null +++ b/lib/perfdata/gelfwriter.ti @@ -0,0 +1,45 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "base/configobject.hpp" + +library perfdata; + +namespace icinga +{ + +class GelfWriter : ConfigObject +{ + activation_priority 100; + + [config] String host { + default {{{ return "127.0.0.1"; }}} + }; + [config] String port { + default {{{ return "12201"; }}} + }; + [config] String source { + default {{{ return "icinga2"; }}} + }; + [config] bool enable_send_perfdata { + default {{{ return false; }}} + }; + + [no_user_modify] bool connected; + [no_user_modify] bool should_connect { + default {{{ return true; }}} + }; + [config] bool enable_ha { + default {{{ return false; }}} + }; + [config] bool enable_tls { + default {{{ return false; }}} + }; + [config] bool insecure_noverify { + default {{{ return false; }}} + }; + [config] String ca_path; + [config] String cert_path; + [config] String key_path; +}; + +} diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp new file mode 100644 index 0000000..6adae02 --- /dev/null +++ 
b/lib/perfdata/graphitewriter.cpp @@ -0,0 +1,514 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "perfdata/graphitewriter.hpp" +#include "perfdata/graphitewriter-ti.cpp" +#include "icinga/service.hpp" +#include "icinga/checkcommand.hpp" +#include "icinga/macroprocessor.hpp" +#include "icinga/icingaapplication.hpp" +#include "base/tcpsocket.hpp" +#include "base/configtype.hpp" +#include "base/objectlock.hpp" +#include "base/logger.hpp" +#include "base/convert.hpp" +#include "base/utility.hpp" +#include "base/perfdatavalue.hpp" +#include "base/application.hpp" +#include "base/stream.hpp" +#include "base/networkstream.hpp" +#include "base/exception.hpp" +#include "base/statsfunction.hpp" +#include <boost/algorithm/string.hpp> +#include <boost/algorithm/string/replace.hpp> +#include <utility> + +using namespace icinga; + +REGISTER_TYPE(GraphiteWriter); + +REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc); + +/* + * Enable HA capabilities once the config object is loaded. + */ +void GraphiteWriter::OnConfigLoaded() +{ + ObjectImpl<GraphiteWriter>::OnConfigLoaded(); + + m_WorkQueue.SetName("GraphiteWriter, " + GetName()); + + if (!GetEnableHa()) { + Log(LogDebug, "GraphiteWriter") + << "HA functionality disabled. 
Won't pause connection: " << GetName(); + + SetHAMode(HARunEverywhere); + } else { + SetHAMode(HARunOnce); + } +} + +/** + * Feature stats interface + * + * @param status Key value pairs for feature stats + * @param perfdata Array of PerfdataValue objects + */ +void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + DictionaryData nodes; + + for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType<GraphiteWriter>()) { + size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength(); + double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0; + + nodes.emplace_back(graphitewriter->GetName(), new Dictionary({ + { "work_queue_items", workQueueItems }, + { "work_queue_item_rate", workQueueItemRate }, + { "connected", graphitewriter->GetConnected() } + })); + + perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems)); + perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); + } + + status->Set("graphitewriter", new Dictionary(std::move(nodes))); +} + +/** + * Resume is equivalent to Start, but with HA capabilities to resume at runtime. + */ +void GraphiteWriter::Resume() +{ + ObjectImpl<GraphiteWriter>::Resume(); + + Log(LogInformation, "GraphiteWriter") + << "'" << GetName() << "' resumed."; + + /* Register exception handler for WQ tasks. */ + m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); + + /* Timer for reconnecting */ + m_ReconnectTimer = Timer::Create(); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); + + /* Register event handlers. 
*/ + m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, + const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { + CheckResultHandler(checkable, cr); + }); +} + +/** + * Pause is equivalent to Stop, but with HA capabilities to resume at runtime. + */ +void GraphiteWriter::Pause() +{ + m_HandleCheckResults.disconnect(); + m_ReconnectTimer->Stop(true); + + try { + ReconnectInternal(); + } catch (const std::exception&) { + Log(LogInformation, "GraphiteWriter") + << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload."; + + ObjectImpl<GraphiteWriter>::Pause(); + return; + } + + m_WorkQueue.Join(); + DisconnectInternal(); + + Log(LogInformation, "GraphiteWriter") + << "'" << GetName() << "' paused."; + + ObjectImpl<GraphiteWriter>::Pause(); +} + +/** + * Check if method is called inside the WQ thread. + */ +void GraphiteWriter::AssertOnWorkQueue() +{ + ASSERT(m_WorkQueue.IsWorkerThread()); +} + +/** + * Exception handler for the WQ. + * + * Closes the connection if connected. + * + * @param exp Exception pointer + */ +void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp) +{ + Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!"); + + Log(LogDebug, "GraphiteWriter") + << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp)); + + if (GetConnected()) { + m_Stream->close(); + + SetConnected(false); + } +} + +/** + * Reconnect method, stops when the feature is paused in HA zones. + * + * Called inside the WQ. 
+ */ +void GraphiteWriter::Reconnect() +{ + AssertOnWorkQueue(); + + if (IsPaused()) { + SetConnected(false); + return; + } + + ReconnectInternal(); +} + +/** + * Reconnect method, connects to a TCP Stream. + * + * Does nothing when already connected; rethrows connection errors after logging them. + */ +void GraphiteWriter::ReconnectInternal() +{ + double startTime = Utility::GetTime(); + + CONTEXT("Reconnecting to Graphite '" << GetName() << "'"); + + SetShouldConnect(true); + + if (GetConnected()) + return; + + Log(LogNotice, "GraphiteWriter") + << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'."; + + m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext()); + + try { + icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, "GraphiteWriter") + << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'."; + + SetConnected(false); + + throw; + } + + SetConnected(true); + + Log(LogInformation, "GraphiteWriter") + << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; +} + +/** + * Reconnect handler called by the timer. + * + * Enqueues a reconnect task into the WQ. + */ +void GraphiteWriter::ReconnectTimerHandler() +{ + if (IsPaused()) + return; + + m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh); +} + +/** + * Disconnect the stream. + * + * Called inside the WQ. + */ +void GraphiteWriter::Disconnect() +{ + AssertOnWorkQueue(); + + DisconnectInternal(); +} + +/** + * Disconnect the stream. + * + * Called outside the WQ. + */ +void GraphiteWriter::DisconnectInternal() +{ + if (!GetConnected()) + return; + + m_Stream->close(); + + SetConnected(false); +} + +/** + * Check result event handler, checks whether feature is not paused in HA setups. 
+ * + * @param checkable Host/Service object + * @param cr Check result including performance data + */ +void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + if (IsPaused()) + return; + + m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); }); +} + +/** + * Check result event handler, prepares metadata and perfdata values and calls Send*() + * + * Called inside the WQ. + * + * @param checkable Host/Service object + * @param cr Check result including performance data + */ +void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + AssertOnWorkQueue(); + + CONTEXT("Processing check result for '" << checkable->GetName() << "'"); + + /* TODO: Deal with missing connection here. Needs refactoring + * into parsing the actual performance data and then putting it + * into a queue for re-inserting. */ + + if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + MacroProcessor::ResolverList resolvers; + if (service) + resolvers.emplace_back("service", service); + resolvers.emplace_back("host", host); + + String prefix; + + if (service) { + prefix = MacroProcessor::ResolveMacros(GetServiceNameTemplate(), resolvers, cr, nullptr, [](const Value& value) -> Value { + return EscapeMacroMetric(value); + }); + } else { + prefix = MacroProcessor::ResolveMacros(GetHostNameTemplate(), resolvers, cr, nullptr, [](const Value& value) -> Value { + return EscapeMacroMetric(value); + }); + } + + String prefixPerfdata = prefix + ".perfdata"; + String prefixMetadata = prefix + ".metadata"; + + double ts = cr->GetExecutionEnd(); + + if (GetEnableSendMetadata()) { + if (service) { + SendMetric(checkable, prefixMetadata, "state", service->GetState(), ts); + } else { + SendMetric(checkable, prefixMetadata, 
"state", host->GetState(), ts); + } + + SendMetric(checkable, prefixMetadata, "current_attempt", checkable->GetCheckAttempt(), ts); + SendMetric(checkable, prefixMetadata, "max_check_attempts", checkable->GetMaxCheckAttempts(), ts); + SendMetric(checkable, prefixMetadata, "state_type", checkable->GetStateType(), ts); + SendMetric(checkable, prefixMetadata, "reachable", checkable->IsReachable(), ts); + SendMetric(checkable, prefixMetadata, "downtime_depth", checkable->GetDowntimeDepth(), ts); + SendMetric(checkable, prefixMetadata, "acknowledgement", checkable->GetAcknowledgement(), ts); + SendMetric(checkable, prefixMetadata, "latency", cr->CalculateLatency(), ts); + SendMetric(checkable, prefixMetadata, "execution_time", cr->CalculateExecutionTime(), ts); + } + + SendPerfdata(checkable, prefixPerfdata, cr, ts); +} + +/** + * Parse performance data from check result and call SendMetric() + * + * Entries that cannot be parsed are logged and skipped. + * + * @param checkable Host/service object + * @param prefix Metric prefix string + * @param cr Check result including performance data + * @param ts Timestamp when the check result was created + */ +void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts) +{ + Array::Ptr perfdata = cr->GetPerformanceData(); + + if (!perfdata) + return; + + CheckCommand::Ptr checkCommand = checkable->GetCheckCommand(); + + ObjectLock olock(perfdata); + for (const Value& val : perfdata) { + PerfdataValue::Ptr pdv; + + if (val.IsObjectType<PerfdataValue>()) + pdv = val; + else { + try { + pdv = PerfdataValue::Parse(val); + } catch (const std::exception&) { + Log(LogWarning, "GraphiteWriter") + << "Ignoring invalid perfdata for checkable '" + << checkable->GetName() << "' and command '" + << (checkCommand ? checkCommand->GetName() : String("<unknown>")) << "' with value: " << val; /* do not dereference an unset check command */ + continue; + } + } + + String escapedKey = EscapeMetricLabel(pdv->GetLabel()); + + SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts); + + if 
(GetEnableSendThresholds()) { + if (!pdv->GetCrit().IsEmpty()) + SendMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts); + if (!pdv->GetWarn().IsEmpty()) + SendMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts); + if (!pdv->GetMin().IsEmpty()) + SendMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts); + if (!pdv->GetMax().IsEmpty()) + SendMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts); + } + } +} + +/** + * Computes metric data and sends to Graphite + * + * The metric is dropped when the writer is not connected; write errors are logged and rethrown. + * + * @param checkable Host/service object + * @param prefix Computed metric prefix string + * @param name Metric name + * @param value Metric value + * @param ts Timestamp when the check result was created + */ +void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts) +{ + namespace asio = boost::asio; + + std::ostringstream msgbuf; + msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts); + + Log(LogDebug, "GraphiteWriter") + << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'."; + + // do not send \n to debug log + msgbuf << "\n"; + + std::unique_lock<std::mutex> lock(m_StreamMutex); + + if (!GetConnected()) + return; + + try { + asio::write(*m_Stream, asio::buffer(msgbuf.str())); + m_Stream->flush(); + } catch (const std::exception&) { + Log(LogCritical, "GraphiteWriter") + << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + + throw; /* rethrow the original exception; 'throw ex;' would throw a sliced std::exception copy */ + } +} + +/** + * Escape metric tree elements + * + * Dots are not allowed, e.g. in host names + * + * @param str Metric part name + * @return Escape string + */ +String GraphiteWriter::EscapeMetric(const String& str) +{ + String result = str; + + //don't allow '.' 
in metric prefixes + boost::replace_all(result, " ", "_"); + boost::replace_all(result, ".", "_"); + boost::replace_all(result, "\\", "_"); + boost::replace_all(result, "/", "_"); + + return result; +} + +/** + * Escape metric label + * + * Dots are allowed - users can create trees from perfdata labels + * + * @param str Metric label name + * @return Escaped string + */ +String GraphiteWriter::EscapeMetricLabel(const String& str) +{ + String result = str; + + //allow to pass '.' in perfdata labels + boost::replace_all(result, " ", "_"); + boost::replace_all(result, "\\", "_"); + boost::replace_all(result, "/", "_"); + boost::replace_all(result, "::", "."); + + return result; +} + +/** + * Escape macro metrics found via host/service name templates + * + * @param value Array or string with macro metric names + * @return Escaped string. Arrays are joined with dots. + */ +Value GraphiteWriter::EscapeMacroMetric(const Value& value) +{ + if (value.IsObjectType<Array>()) { + Array::Ptr arr = value; + ArrayData result; + + ObjectLock olock(arr); + for (const Value& arg : arr) { + result.push_back(EscapeMetric(arg)); + } + + return Utility::Join(new Array(std::move(result)), '.'); + } else + return EscapeMetric(value); +} + +/** + * Validate the configuration setting 'host_name_template' + * + * @param lvalue String containing runtime macros. + * @param utils Helper, unused + */ +void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils); + + if (!MacroProcessor::ValidateMacroString(lvalue())) + BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'.")); +} + +/** + * Validate the configuration setting 'service_name_template' + * + * @param lvalue String containing runtime macros. 
+ * @param utils Helper, unused + */ +void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils); + + if (!MacroProcessor::ValidateMacroString(lvalue())) + BOOST_THROW_EXCEPTION(ValidationError(this, { "service_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'.")); +} diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp new file mode 100644 index 0000000..e0c8b78 --- /dev/null +++ b/lib/perfdata/graphitewriter.hpp @@ -0,0 +1,69 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef GRAPHITEWRITER_H +#define GRAPHITEWRITER_H + +#include "perfdata/graphitewriter-ti.hpp" +#include "icinga/service.hpp" +#include "base/configobject.hpp" +#include "base/tcpsocket.hpp" +#include "base/timer.hpp" +#include "base/workqueue.hpp" +#include <fstream> +#include <mutex> + +namespace icinga +{ + +/** + * An Icinga graphite writer. 
+ * + * @ingroup perfdata + */ +class GraphiteWriter final : public ObjectImpl<GraphiteWriter> +{ +public: + DECLARE_OBJECT(GraphiteWriter); + DECLARE_OBJECTNAME(GraphiteWriter); + + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + + void ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) override; + void ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) override; + +protected: + void OnConfigLoaded() override; + void Resume() override; + void Pause() override; + +private: + Shared<AsioTcpStream>::Ptr m_Stream; + std::mutex m_StreamMutex; + WorkQueue m_WorkQueue{10000000, 1}; + + boost::signals2::connection m_HandleCheckResults; + Timer::Ptr m_ReconnectTimer; + + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts); + void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts); + static String EscapeMetric(const String& str); + static String EscapeMetricLabel(const String& str); + static Value EscapeMacroMetric(const Value& value); + + void ReconnectTimerHandler(); + + void Disconnect(); + void DisconnectInternal(); + void Reconnect(); + void ReconnectInternal(); + + void AssertOnWorkQueue(); + + void ExceptionHandler(boost::exception_ptr exp); +}; + +} + +#endif /* GRAPHITEWRITER_H */ diff --git a/lib/perfdata/graphitewriter.ti b/lib/perfdata/graphitewriter.ti new file mode 100644 index 0000000..c8db067 --- /dev/null +++ b/lib/perfdata/graphitewriter.ti @@ -0,0 +1,38 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "base/configobject.hpp" + +library perfdata; + +namespace icinga +{ + +class GraphiteWriter : ConfigObject +{ + activation_priority 100; + + 
[config] String host { + default {{{ return "127.0.0.1"; }}} + }; + [config] String port { + default {{{ return "2003"; }}} + }; + [config] String host_name_template { + default {{{ return "icinga2.$host.name$.host.$host.check_command$"; }}} + }; + [config] String service_name_template { + default {{{ return "icinga2.$host.name$.services.$service.name$.$service.check_command$"; }}} + }; + [config] bool enable_send_thresholds; + [config] bool enable_send_metadata; + + [no_user_modify] bool connected; + [no_user_modify] bool should_connect { + default {{{ return true; }}} + }; + [config] bool enable_ha { + default {{{ return false; }}} + }; +}; + +} diff --git a/lib/perfdata/influxdb2writer.cpp b/lib/perfdata/influxdb2writer.cpp new file mode 100644 index 0000000..c92d7d4 --- /dev/null +++ b/lib/perfdata/influxdb2writer.cpp @@ -0,0 +1,44 @@ +/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */ + +#include "perfdata/influxdb2writer.hpp" +#include "perfdata/influxdb2writer-ti.cpp" +#include "remote/url.hpp" +#include "base/configtype.hpp" +#include "base/perfdatavalue.hpp" +#include "base/statsfunction.hpp" +#include <utility> +#include <boost/beast/http/message.hpp> +#include <boost/beast/http/string_body.hpp> + +using namespace icinga; + +REGISTER_TYPE(Influxdb2Writer); + +REGISTER_STATSFUNCTION(Influxdb2Writer, &Influxdb2Writer::StatsFunc); + +void Influxdb2Writer::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + InfluxdbCommonWriter::StatsFunc<Influxdb2Writer>(status, perfdata); +} + +boost::beast::http::request<boost::beast::http::string_body> Influxdb2Writer::AssembleRequest(String body) +{ + auto request (AssembleBaseRequest(std::move(body))); + + request.set(boost::beast::http::field::authorization, "Token " + GetAuthToken()); + + return request; +} + +Url::Ptr Influxdb2Writer::AssembleUrl() +{ + auto url (AssembleBaseUrl()); + + std::vector<String> path ({"api", "v2", "write"}); + url->SetPath(path); + + url->AddQueryElement("org", 
GetOrganization()); + url->AddQueryElement("bucket", GetBucket()); + + return url; +} diff --git a/lib/perfdata/influxdb2writer.hpp b/lib/perfdata/influxdb2writer.hpp new file mode 100644 index 0000000..3b20f8b --- /dev/null +++ b/lib/perfdata/influxdb2writer.hpp @@ -0,0 +1,33 @@ +/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */ + +#ifndef INFLUXDB2WRITER_H +#define INFLUXDB2WRITER_H + +#include "perfdata/influxdb2writer-ti.hpp" +#include <boost/beast/http/message.hpp> +#include <boost/beast/http/string_body.hpp> + +namespace icinga +{ + +/** + * An Icinga InfluxDB v2 writer. + * + * @ingroup perfdata + */ +class Influxdb2Writer final : public ObjectImpl<Influxdb2Writer> +{ +public: + DECLARE_OBJECT(Influxdb2Writer); + DECLARE_OBJECTNAME(Influxdb2Writer); + + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + +protected: + boost::beast::http::request<boost::beast::http::string_body> AssembleRequest(String body) override; + Url::Ptr AssembleUrl() override; +}; + +} + +#endif /* INFLUXDB2WRITER_H */ diff --git a/lib/perfdata/influxdb2writer.ti b/lib/perfdata/influxdb2writer.ti new file mode 100644 index 0000000..f806187 --- /dev/null +++ b/lib/perfdata/influxdb2writer.ti @@ -0,0 +1,19 @@ +/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */ + +#include "perfdata/influxdbcommonwriter.hpp" + +library perfdata; + +namespace icinga +{ + +class Influxdb2Writer : InfluxdbCommonWriter +{ + activation_priority 100; + + [config, required] String organization; + [config, required] String bucket; + [config, required, no_user_view] String auth_token; +}; + +} diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp new file mode 100644 index 0000000..fb0bcc9 --- /dev/null +++ b/lib/perfdata/influxdbcommonwriter.cpp @@ -0,0 +1,596 @@ +/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */ + +#include "perfdata/influxdbcommonwriter.hpp" +#include "perfdata/influxdbcommonwriter-ti.cpp" +#include "remote/url.hpp" +#include 
"icinga/service.hpp" +#include "icinga/macroprocessor.hpp" +#include "icinga/icingaapplication.hpp" +#include "icinga/checkcommand.hpp" +#include "base/application.hpp" +#include "base/defer.hpp" +#include "base/io-engine.hpp" +#include "base/tcpsocket.hpp" +#include "base/configtype.hpp" +#include "base/objectlock.hpp" +#include "base/logger.hpp" +#include "base/convert.hpp" +#include "base/utility.hpp" +#include "base/stream.hpp" +#include "base/json.hpp" +#include "base/networkstream.hpp" +#include "base/exception.hpp" +#include "base/statsfunction.hpp" +#include "base/tlsutility.hpp" +#include <boost/algorithm/string.hpp> +#include <boost/algorithm/string/replace.hpp> +#include <boost/asio/ssl/context.hpp> +#include <boost/beast/core/flat_buffer.hpp> +#include <boost/beast/http/field.hpp> +#include <boost/beast/http/message.hpp> +#include <boost/beast/http/parser.hpp> +#include <boost/beast/http/read.hpp> +#include <boost/beast/http/status.hpp> +#include <boost/beast/http/string_body.hpp> +#include <boost/beast/http/verb.hpp> +#include <boost/beast/http/write.hpp> +#include <boost/math/special_functions/fpclassify.hpp> +#include <boost/regex.hpp> +#include <boost/scoped_array.hpp> +#include <memory> +#include <string> +#include <utility> + +using namespace icinga; + +REGISTER_TYPE(InfluxdbCommonWriter); + +class InfluxdbInteger final : public Object +{ +public: + DECLARE_PTR_TYPEDEFS(InfluxdbInteger); + + InfluxdbInteger(int value) + : m_Value(value) + { } + + int GetValue() const + { + return m_Value; + } + +private: + int m_Value; +}; + +void InfluxdbCommonWriter::OnConfigLoaded() +{ + ObjectImpl<InfluxdbCommonWriter>::OnConfigLoaded(); + + m_WorkQueue.SetName(GetReflectionType()->GetName() + ", " + GetName()); + + if (!GetEnableHa()) { + Log(LogDebug, GetReflectionType()->GetName()) + << "HA functionality disabled. 
Won't pause connection: " << GetName(); + + SetHAMode(HARunEverywhere); + } else { + SetHAMode(HARunOnce); + } +} + +void InfluxdbCommonWriter::Resume() +{ + ObjectImpl<InfluxdbCommonWriter>::Resume(); + + Log(LogInformation, GetReflectionType()->GetName()) + << "'" << GetName() << "' resumed."; + + /* Register exception handler for WQ tasks. */ + m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); + + /* Setup timer for periodically flushing m_DataBuffer */ + m_FlushTimer = Timer::Create(); + m_FlushTimer->SetInterval(GetFlushInterval()); + m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); }); + m_FlushTimer->Start(); + m_FlushTimer->Reschedule(0); + + /* Register for new metrics. */ + m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, + const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { + CheckResultHandler(checkable, cr); + }); +} + +/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ +void InfluxdbCommonWriter::Pause() +{ + m_HandleCheckResults.disconnect(); + + /* Force a flush. */ + Log(LogDebug, GetReflectionType()->GetName()) + << "Processing pending tasks and flushing data buffers."; + + m_FlushTimer->Stop(true); + m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow); + + /* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. 
*/ + m_WorkQueue.Join(); + + Log(LogInformation, GetReflectionType()->GetName()) + << "'" << GetName() << "' paused."; + + ObjectImpl<InfluxdbCommonWriter>::Pause(); +} + +void InfluxdbCommonWriter::AssertOnWorkQueue() +{ + ASSERT(m_WorkQueue.IsWorkerThread()); +} + +void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp) +{ + Log(LogCritical, GetReflectionType()->GetName(), "Exception during InfluxDB operation: Verify that your backend is operational!"); + + Log(LogDebug, GetReflectionType()->GetName()) + << "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp)); + + //TODO: Close the connection, if we keep it open. +} + +OptionalTlsStream InfluxdbCommonWriter::Connect() +{ + Log(LogNotice, GetReflectionType()->GetName()) + << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; + + OptionalTlsStream stream; + bool ssl = GetSslEnable(); + + if (ssl) { + Shared<boost::asio::ssl::context>::Ptr sslContext; + + try { + sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Unable to create SSL context."; + throw; + } + + stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); + + } else { + stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext()); + } + + try { + icinga::Connect(ssl ? 
stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + if (ssl) { + auto& tlsStream (stream.first->next_layer()); + + try { + tlsStream.handshake(tlsStream.client); + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "TLS handshake with host '" << GetHost() << "' failed."; + throw; + } + + if (!GetSslInsecureNoverify()) { + if (!tlsStream.GetPeerCertificate()) { + BOOST_THROW_EXCEPTION(std::runtime_error("InfluxDB didn't present any TLS certificate.")); + } + + if (!tlsStream.IsVerifyOK()) { + BOOST_THROW_EXCEPTION(std::runtime_error( + "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) + )); + } + } + } + + return stream; +} + +void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + if (IsPaused()) + return; + + m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerWQ(checkable, cr); }, PriorityLow); +} + +void InfluxdbCommonWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + AssertOnWorkQueue(); + + CONTEXT("Processing check result for '" << checkable->GetName() << "'"); + + if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + MacroProcessor::ResolverList resolvers; + if (service) + resolvers.emplace_back("service", service); + resolvers.emplace_back("host", host); + + String prefix; + + double ts = cr->GetExecutionEnd(); + + // Clone the template and perform an in-place macro expansion of measurement and tag values + Dictionary::Ptr tmpl_clean = service ? 
GetServiceTemplate() : GetHostTemplate(); + Dictionary::Ptr tmpl = static_pointer_cast<Dictionary>(tmpl_clean->ShallowClone()); + tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr)); + + Dictionary::Ptr tagsClean = tmpl->Get("tags"); + if (tagsClean) { + Dictionary::Ptr tags = new Dictionary(); + + { + ObjectLock olock(tagsClean); + for (const Dictionary::Pair& pair : tagsClean) { + String missing_macro; + Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro); + + if (missing_macro.IsEmpty()) { + tags->Set(pair.first, value); + } + } + } + + tmpl->Set("tags", tags); + } + + CheckCommand::Ptr checkCommand = checkable->GetCheckCommand(); + + Array::Ptr perfdata = cr->GetPerformanceData(); + + if (perfdata) { + ObjectLock olock(perfdata); + for (const Value& val : perfdata) { + PerfdataValue::Ptr pdv; + + if (val.IsObjectType<PerfdataValue>()) + pdv = val; + else { + try { + pdv = PerfdataValue::Parse(val); + } catch (const std::exception&) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Ignoring invalid perfdata for checkable '" + << checkable->GetName() << "' and command '" + << checkCommand->GetName() << "' with value: " << val; + continue; + } + } + + Dictionary::Ptr fields = new Dictionary(); + fields->Set("value", pdv->GetValue()); + + if (GetEnableSendThresholds()) { + if (!pdv->GetCrit().IsEmpty()) + fields->Set("crit", pdv->GetCrit()); + if (!pdv->GetWarn().IsEmpty()) + fields->Set("warn", pdv->GetWarn()); + if (!pdv->GetMin().IsEmpty()) + fields->Set("min", pdv->GetMin()); + if (!pdv->GetMax().IsEmpty()) + fields->Set("max", pdv->GetMax()); + } + if (!pdv->GetUnit().IsEmpty()) { + fields->Set("unit", pdv->GetUnit()); + } + + SendMetric(checkable, tmpl, pdv->GetLabel(), fields, ts); + } + } + + if (GetEnableSendMetadata()) { + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr fields = new Dictionary(); + + if 
(service) + fields->Set("state", new InfluxdbInteger(service->GetState())); + else + fields->Set("state", new InfluxdbInteger(host->GetState())); + + fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt())); + fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts())); + fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType())); + fields->Set("reachable", checkable->IsReachable()); + fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth())); + fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement())); + fields->Set("latency", cr->CalculateLatency()); + fields->Set("execution_time", cr->CalculateExecutionTime()); + + SendMetric(checkable, tmpl, Empty, fields, ts); + } +} + +String InfluxdbCommonWriter::EscapeKeyOrTagValue(const String& str) +{ + // Iterate over the key name and escape commas and spaces with a backslash + String result = str; + boost::algorithm::replace_all(result, "\"", "\\\""); + boost::algorithm::replace_all(result, "=", "\\="); + boost::algorithm::replace_all(result, ",", "\\,"); + boost::algorithm::replace_all(result, " ", "\\ "); + + // InfluxDB 'feature': although backslashes are allowed in keys they also act + // as escape sequences when followed by ',' or ' '. When your tag is like + // 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped + // and through experimentation they also escape '='. To be safe we replace + // trailing backslashes with and underscore. 
+ // See https://github.com/influxdata/influxdb/issues/8587 for more info + size_t length = result.GetLength(); + if (result[length - 1] == '\\') + result[length - 1] = '_'; + + return result; +} + +String InfluxdbCommonWriter::EscapeValue(const Value& value) +{ + if (value.IsObjectType<InfluxdbInteger>()) { + std::ostringstream os; + os << static_cast<InfluxdbInteger::Ptr>(value)->GetValue() << "i"; + return os.str(); + } + + if (value.IsBoolean()) + return value ? "true" : "false"; + + if (value.IsString()) + return "\"" + EscapeKeyOrTagValue(value) + "\""; + + return value; +} + +void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl, + const String& label, const Dictionary::Ptr& fields, double ts) +{ + std::ostringstream msgbuf; + msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement")); + + Dictionary::Ptr tags = tmpl->Get("tags"); + if (tags) { + ObjectLock olock(tags); + for (const Dictionary::Pair& pair : tags) { + // Empty macro expansion, no tag + if (!pair.second.IsEmpty()) { + msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second); + } + } + } + + // Label may be empty in the case of metadata + if (!label.IsEmpty()) + msgbuf << ",metric=" << EscapeKeyOrTagValue(label); + + msgbuf << " "; + + { + bool first = true; + + ObjectLock fieldLock(fields); + for (const Dictionary::Pair& pair : fields) { + if (first) + first = false; + else + msgbuf << ","; + + msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second); + } + } + + msgbuf << " " << static_cast<unsigned long>(ts); + + Log(LogDebug, GetReflectionType()->GetName()) + << "Checkable '" << checkable->GetName() << "' adds to metric list:'" << msgbuf.str() << "'."; + + // Buffer the data point + m_DataBuffer.emplace_back(msgbuf.str()); + m_DataBufferSize = m_DataBuffer.size(); + + // Flush if we've buffered too much to prevent excessive memory use + if (static_cast<int>(m_DataBuffer.size()) >= 
GetFlushThreshold()) { + Log(LogDebug, GetReflectionType()->GetName()) + << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; + + try { + FlushWQ(); + } catch (...) { + /* Do nothing. */ + } + } +} + +void InfluxdbCommonWriter::FlushTimeout() +{ + m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh); +} + +void InfluxdbCommonWriter::FlushTimeoutWQ() +{ + AssertOnWorkQueue(); + + Log(LogDebug, GetReflectionType()->GetName()) + << "Timer expired writing " << m_DataBuffer.size() << " data points"; + + FlushWQ(); +} + +void InfluxdbCommonWriter::FlushWQ() +{ + AssertOnWorkQueue(); + + namespace beast = boost::beast; + namespace http = beast::http; + + /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */ + if (m_DataBuffer.empty()) + return; + + Log(LogDebug, GetReflectionType()->GetName()) + << "Flushing data buffer to InfluxDB."; + + String body = boost::algorithm::join(m_DataBuffer, "\n"); + m_DataBuffer.clear(); + m_DataBufferSize = 0; + + OptionalTlsStream stream; + + try { + stream = Connect(); + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false); + return; + } + + Defer s ([&stream]() { + if (stream.first) { + stream.first->next_layer().shutdown(); + } + }); + + auto request (AssembleRequest(std::move(body))); + + try { + if (stream.first) { + http::write(*stream.first, request); + stream.first->flush(); + } else { + http::write(*stream.second, request); + stream.second->flush(); + } + } catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + http::parser<false, http::string_body> parser; + beast::flat_buffer buf; + + try { + if (stream.first) { + http::read(*stream.first, buf, parser); + } else { + http::read(*stream.second, buf, parser); + } + 
} catch (const std::exception& ex) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex); + throw; + } + + auto& response (parser.get()); + + if (response.result() != http::status::no_content) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Unexpected response code: " << response.result(); + + auto& contentType (response[http::field::content_type]); + if (contentType != "application/json") { + Log(LogWarning, GetReflectionType()->GetName()) + << "Unexpected Content-Type: " << contentType; + return; + } + + Dictionary::Ptr jsonResponse; + auto& body (response.body()); + + try { + jsonResponse = JsonDecode(body); + } catch (...) { + Log(LogWarning, GetReflectionType()->GetName()) + << "Unable to parse JSON response:\n" << body; + return; + } + + String error = jsonResponse->Get("error"); + + Log(LogCritical, GetReflectionType()->GetName()) + << "InfluxDB error message:\n" << error; + } +} + +boost::beast::http::request<boost::beast::http::string_body> InfluxdbCommonWriter::AssembleBaseRequest(String body) +{ + namespace http = boost::beast::http; + + auto url (AssembleUrl()); + http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10); + + request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion()); + request.set(http::field::host, url->GetHost() + ":" + url->GetPort()); + request.body() = std::move(body); + request.content_length(request.body().size()); + + return request; +} + +Url::Ptr InfluxdbCommonWriter::AssembleBaseUrl() +{ + Url::Ptr url = new Url(); + + url->SetScheme(GetSslEnable() ? 
"https" : "http"); + url->SetHost(GetHost()); + url->SetPort(GetPort()); + url->AddQueryElement("precision", "s"); + + return url; +} + +void InfluxdbCommonWriter::ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<InfluxdbCommonWriter>::ValidateHostTemplate(lvalue, utils); + + String measurement = lvalue()->Get("measurement"); + if (!MacroProcessor::ValidateMacroString(measurement)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'.")); + + Dictionary::Ptr tags = lvalue()->Get("tags"); + if (tags) { + ObjectLock olock(tags); + for (const Dictionary::Pair& pair : tags) { + if (!MacroProcessor::ValidateMacroString(pair.second)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second)); + } + } +} + +void InfluxdbCommonWriter::ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<InfluxdbCommonWriter>::ValidateServiceTemplate(lvalue, utils); + + String measurement = lvalue()->Get("measurement"); + if (!MacroProcessor::ValidateMacroString(measurement)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'.")); + + Dictionary::Ptr tags = lvalue()->Get("tags"); + if (tags) { + ObjectLock olock(tags); + for (const Dictionary::Pair& pair : tags) { + if (!MacroProcessor::ValidateMacroString(pair.second)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second)); + } + } +} + diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp new file mode 100644 index 0000000..380b20c --- /dev/null +++ b/lib/perfdata/influxdbcommonwriter.hpp @@ -0,0 +1,101 @@ +/* Icinga 2 
| (c) 2021 Icinga GmbH | GPLv2+ */ + +#ifndef INFLUXDBCOMMONWRITER_H +#define INFLUXDBCOMMONWRITER_H + +#include "perfdata/influxdbcommonwriter-ti.hpp" +#include "icinga/service.hpp" +#include "base/configobject.hpp" +#include "base/perfdatavalue.hpp" +#include "base/tcpsocket.hpp" +#include "base/timer.hpp" +#include "base/tlsstream.hpp" +#include "base/workqueue.hpp" +#include "remote/url.hpp" +#include <boost/beast/http/message.hpp> +#include <boost/beast/http/string_body.hpp> +#include <atomic> +#include <fstream> + +namespace icinga +{ + +/** + * Common base class for InfluxDB v1/v2 writers. + * + * @ingroup perfdata + */ +class InfluxdbCommonWriter : public ObjectImpl<InfluxdbCommonWriter> +{ +public: + DECLARE_OBJECT(InfluxdbCommonWriter); + + template<class InfluxWriter> + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + + void ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override; + void ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override; + +protected: + void OnConfigLoaded() override; + void Resume() override; + void Pause() override; + + boost::beast::http::request<boost::beast::http::string_body> AssembleBaseRequest(String body); + Url::Ptr AssembleBaseUrl(); + virtual boost::beast::http::request<boost::beast::http::string_body> AssembleRequest(String body) = 0; + virtual Url::Ptr AssembleUrl() = 0; + +private: + boost::signals2::connection m_HandleCheckResults; + Timer::Ptr m_FlushTimer; + WorkQueue m_WorkQueue{10000000, 1}; + std::vector<String> m_DataBuffer; + std::atomic_size_t m_DataBufferSize{0}; + + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl, + const String& label, const Dictionary::Ptr& fields, double ts); + void 
FlushTimeout(); + void FlushTimeoutWQ(); + void FlushWQ(); + + static String EscapeKeyOrTagValue(const String& str); + static String EscapeValue(const Value& value); + + OptionalTlsStream Connect(); + + void AssertOnWorkQueue(); + + void ExceptionHandler(boost::exception_ptr exp); +}; + +template<class InfluxWriter> +void InfluxdbCommonWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + DictionaryData nodes; + auto typeName (InfluxWriter::TypeInstance->GetName().ToLower()); + + for (const typename InfluxWriter::Ptr& influxwriter : ConfigType::GetObjectsByType<InfluxWriter>()) { + size_t workQueueItems = influxwriter->m_WorkQueue.GetLength(); + double workQueueItemRate = influxwriter->m_WorkQueue.GetTaskCount(60) / 60.0; + size_t dataBufferItems = influxwriter->m_DataBufferSize; + + nodes.emplace_back(influxwriter->GetName(), new Dictionary({ + { "work_queue_items", workQueueItems }, + { "work_queue_item_rate", workQueueItemRate }, + { "data_buffer_items", dataBufferItems } + })); + + perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_items", workQueueItems)); + perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); + perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_data_queue_items", dataBufferItems)); + } + + status->Set(typeName, new Dictionary(std::move(nodes))); +} + +} + +#endif /* INFLUXDBCOMMONWRITER_H */ diff --git a/lib/perfdata/influxdbcommonwriter.ti b/lib/perfdata/influxdbcommonwriter.ti new file mode 100644 index 0000000..5cfe83f --- /dev/null +++ b/lib/perfdata/influxdbcommonwriter.ti @@ -0,0 +1,88 @@ +/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */ + +#include "base/configobject.hpp" + +library perfdata; + +namespace icinga +{ + +abstract class InfluxdbCommonWriter : ConfigObject +{ + [config, required] String host { + default {{{ return "127.0.0.1"; }}} + }; + [config, required] String port { 
+ default {{{ return "8086"; }}} + }; + [config] bool ssl_enable { + default {{{ return false; }}} + }; + [config] bool ssl_insecure_noverify { + default {{{ return false; }}} + }; + [config] String ssl_ca_cert { + default {{{ return ""; }}} + }; + [config] String ssl_cert { + default {{{ return ""; }}} + }; + [config] String ssl_key{ + default {{{ return ""; }}} + }; + [config, required] Dictionary::Ptr host_template { + default {{{ + return new Dictionary({ + { "measurement", "$host.check_command$" }, + { "tags", new Dictionary({ + { "hostname", "$host.name$" } + }) } + }); + }}} + }; + [config, required] Dictionary::Ptr service_template { + default {{{ + return new Dictionary({ + { "measurement", "$service.check_command$" }, + { "tags", new Dictionary({ + { "hostname", "$host.name$" }, + { "service", "$service.name$" } + }) } + }); + }}} + }; + [config] bool enable_send_thresholds { + default {{{ return false; }}} + }; + [config] bool enable_send_metadata { + default {{{ return false; }}} + }; + [config] int flush_interval { + default {{{ return 10; }}} + }; + [config] int flush_threshold { + default {{{ return 1024; }}} + }; + [config] bool enable_ha { + default {{{ return false; }}} + }; +}; + +validator InfluxdbCommonWriter { + Dictionary host_template { + required measurement; + String measurement; + Dictionary "tags" { + String "*"; + }; + }; + Dictionary service_template { + required measurement; + String measurement; + Dictionary "tags" { + String "*"; + }; + }; +}; + +} diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp new file mode 100644 index 0000000..4bc992d --- /dev/null +++ b/lib/perfdata/influxdbwriter.cpp @@ -0,0 +1,56 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "perfdata/influxdbwriter.hpp" +#include "perfdata/influxdbwriter-ti.cpp" +#include "base/base64.hpp" +#include "remote/url.hpp" +#include "base/configtype.hpp" +#include "base/perfdatavalue.hpp" +#include "base/statsfunction.hpp" +#include 
<boost/beast/http/message.hpp> +#include <boost/beast/http/string_body.hpp> +#include <utility> + +using namespace icinga; + +REGISTER_TYPE(InfluxdbWriter); + +REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc); + +void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + InfluxdbCommonWriter::StatsFunc<InfluxdbWriter>(status, perfdata); +} + +boost::beast::http::request<boost::beast::http::string_body> InfluxdbWriter::AssembleRequest(String body) +{ + auto request (AssembleBaseRequest(std::move(body))); + Dictionary::Ptr basicAuth = GetBasicAuth(); + + if (basicAuth) { + request.set( + boost::beast::http::field::authorization, + "Basic " + Base64::Encode(basicAuth->Get("username") + ":" + basicAuth->Get("password")) + ); + } + + return request; +} + +Url::Ptr InfluxdbWriter::AssembleUrl() +{ + auto url (AssembleBaseUrl()); + + std::vector<String> path; + path.emplace_back("write"); + url->SetPath(path); + + url->AddQueryElement("db", GetDatabase()); + + if (!GetUsername().IsEmpty()) + url->AddQueryElement("u", GetUsername()); + if (!GetPassword().IsEmpty()) + url->AddQueryElement("p", GetPassword()); + + return url; +} diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp new file mode 100644 index 0000000..48676cc --- /dev/null +++ b/lib/perfdata/influxdbwriter.hpp @@ -0,0 +1,31 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef INFLUXDBWRITER_H +#define INFLUXDBWRITER_H + +#include "perfdata/influxdbwriter-ti.hpp" + +namespace icinga +{ + +/** + * An Icinga InfluxDB v1 writer. 
+ * + * @ingroup perfdata + */ +class InfluxdbWriter final : public ObjectImpl<InfluxdbWriter> +{ +public: + DECLARE_OBJECT(InfluxdbWriter); + DECLARE_OBJECTNAME(InfluxdbWriter); + + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + +protected: + boost::beast::http::request<boost::beast::http::string_body> AssembleRequest(String body) override; + Url::Ptr AssembleUrl() override; +}; + +} + +#endif /* INFLUXDBWRITER_H */ diff --git a/lib/perfdata/influxdbwriter.ti b/lib/perfdata/influxdbwriter.ti new file mode 100644 index 0000000..e6fc84e --- /dev/null +++ b/lib/perfdata/influxdbwriter.ti @@ -0,0 +1,35 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "perfdata/influxdbcommonwriter.hpp" + +library perfdata; + +namespace icinga +{ + +class InfluxdbWriter : InfluxdbCommonWriter +{ + activation_priority 100; + + [config, required] String database { + default {{{ return "icinga2"; }}} + }; + [config] String username { + default {{{ return ""; }}} + }; + [config, no_user_view] String password { + default {{{ return ""; }}} + }; + [config, no_user_view] Dictionary::Ptr basic_auth; +}; + +validator InfluxdbWriter { + Dictionary basic_auth { + required username; + String username; + required password; + String password; + }; +}; + +} diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp new file mode 100644 index 0000000..2a9cfc0 --- /dev/null +++ b/lib/perfdata/opentsdbwriter.cpp @@ -0,0 +1,525 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "perfdata/opentsdbwriter.hpp" +#include "perfdata/opentsdbwriter-ti.cpp" +#include "icinga/service.hpp" +#include "icinga/checkcommand.hpp" +#include "icinga/macroprocessor.hpp" +#include "icinga/icingaapplication.hpp" +#include "icinga/compatutility.hpp" +#include "base/tcpsocket.hpp" +#include "base/configtype.hpp" +#include "base/objectlock.hpp" +#include "base/logger.hpp" +#include "base/convert.hpp" +#include "base/utility.hpp" +#include 
"base/perfdatavalue.hpp" +#include "base/application.hpp" +#include "base/stream.hpp" +#include "base/networkstream.hpp" +#include "base/exception.hpp" +#include "base/statsfunction.hpp" +#include <boost/algorithm/string.hpp> +#include <boost/algorithm/string/replace.hpp> + +using namespace icinga; + +REGISTER_TYPE(OpenTsdbWriter); + +REGISTER_STATSFUNCTION(OpenTsdbWriter, &OpenTsdbWriter::StatsFunc); + +/* + * Enable HA capabilities once the config object is loaded. + */ +void OpenTsdbWriter::OnConfigLoaded() +{ + ObjectImpl<OpenTsdbWriter>::OnConfigLoaded(); + + if (!GetEnableHa()) { + Log(LogDebug, "OpenTsdbWriter") + << "HA functionality disabled. Won't pause connection: " << GetName(); + + SetHAMode(HARunEverywhere); + } else { + SetHAMode(HARunOnce); + } +} + +/** + * Feature stats interface + * + * @param status Key value pairs for feature stats + */ +void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) +{ + DictionaryData nodes; + + for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) { + nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({ + { "connected", opentsdbwriter->GetConnected() } + })); + } + + status->Set("opentsdbwriter", new Dictionary(std::move(nodes))); +} + +/** + * Resume is equivalent to Start, but with HA capabilities to resume at runtime. 
+ */ +void OpenTsdbWriter::Resume() +{ + ObjectImpl<OpenTsdbWriter>::Resume(); + + Log(LogInformation, "OpentsdbWriter") + << "'" << GetName() << "' resumed."; + + ReadConfigTemplate(m_ServiceConfigTemplate, m_HostConfigTemplate); + + m_ReconnectTimer = Timer::Create(); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); + + m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { + CheckResultHandler(checkable, cr); + }); +} + +/** + * Pause is equivalent to Stop, but with HA capabilities to resume at runtime. + */ +void OpenTsdbWriter::Pause() +{ + m_HandleCheckResults.disconnect(); + m_ReconnectTimer->Stop(true); + + Log(LogInformation, "OpentsdbWriter") + << "'" << GetName() << "' paused."; + + m_Stream->close(); + + SetConnected(false); + + ObjectImpl<OpenTsdbWriter>::Pause(); +} + +/** + * Reconnect handler called by the timer. + * Handles TLS + */ +void OpenTsdbWriter::ReconnectTimerHandler() +{ + if (IsPaused()) + return; + + SetShouldConnect(true); + + if (GetConnected()) + return; + + double startTime = Utility::GetTime(); + + Log(LogNotice, "OpenTsdbWriter") + << "Reconnecting to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'."; + + /* + * We're using telnet as input method. Future PRs may change this into using the HTTP API. 
+ * http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet + */ + m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext()); + + try { + icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, "OpenTsdbWriter") + << "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << "'."; + + SetConnected(false); + + return; + } + + SetConnected(true); + + Log(LogInformation, "OpenTsdbWriter") + << "Finished reconnecting to OpenTSDB in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; +} + +/** + * Registered check result handler processing data. + * Calculates tags from the config. + * + * @param checkable Host/service object + * @param cr Check result + */ +void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + if (IsPaused()) + return; + + CONTEXT("Processing check result for '" << checkable->GetName() << "'"); + + if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) + return; + + Service::Ptr service = dynamic_pointer_cast<Service>(checkable); + Host::Ptr host; + Dictionary::Ptr config_tmpl; + Dictionary::Ptr config_tmpl_tags; + String config_tmpl_metric; + + if (service) { + host = service->GetHost(); + config_tmpl = m_ServiceConfigTemplate; + } + else { + host = static_pointer_cast<Host>(checkable); + config_tmpl = m_HostConfigTemplate; + } + + // Get the tags nested dictionary in the service/host template in the config + if (config_tmpl) { + config_tmpl_tags = config_tmpl->Get("tags"); + config_tmpl_metric = config_tmpl->Get("metric"); + } + + String metric; + std::map<String, String> tags; + + // Resolve macros in configuration template and build custom tag list + if (config_tmpl_tags || !config_tmpl_metric.IsEmpty()) { + + // Configure config template macro resolver + MacroProcessor::ResolverList resolvers; + if (service) + 
resolvers.emplace_back("service", service); + resolvers.emplace_back("host", host); + + // Resolve macros for the service and host template config line + if (config_tmpl_tags) { + ObjectLock olock(config_tmpl_tags); + + for (const Dictionary::Pair& pair : config_tmpl_tags) { + + String missing_macro; + Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro); + + if (!missing_macro.IsEmpty()) { + Log(LogDebug, "OpenTsdbWriter") + << "Unable to resolve macro:'" << missing_macro + << "' for this host or service."; + + continue; + } + + String tagname = Convert::ToString(pair.first); + tags[tagname] = EscapeTag(value); + + } + } + + // Resolve macros for the metric config line + if (!config_tmpl_metric.IsEmpty()) { + + String missing_macro; + Value value = MacroProcessor::ResolveMacros(config_tmpl_metric, resolvers, cr, &missing_macro); + + if (!missing_macro.IsEmpty()) { + Log(LogDebug, "OpenTsdbWriter") + << "Unable to resolve macro:'" << missing_macro + << "' for this host or service."; + + } + else { + + config_tmpl_metric = Convert::ToString(value); + + } + } + } + + String escaped_hostName = EscapeTag(host->GetName()); + tags["host"] = escaped_hostName; + + double ts = cr->GetExecutionEnd(); + + if (service) { + + if (!config_tmpl_metric.IsEmpty()) { + metric = config_tmpl_metric; + } else { + String serviceName = service->GetShortName(); + String escaped_serviceName = EscapeMetric(serviceName); + metric = "icinga.service." 
+ escaped_serviceName; + } + + SendMetric(checkable, metric + ".state", tags, service->GetState(), ts); + + } else { + if (!config_tmpl_metric.IsEmpty()) { + metric = config_tmpl_metric; + } else { + metric = "icinga.host"; + } + SendMetric(checkable, metric + ".state", tags, host->GetState(), ts); + } + + SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts); + SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts); + SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts); + SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts); + + SendPerfdata(checkable, metric, tags, cr, ts); + + metric = "icinga.check"; + + if (service) { + tags["type"] = "service"; + String serviceName = service->GetShortName(); + String escaped_serviceName = EscapeTag(serviceName); + tags["service"] = escaped_serviceName; + } else { + tags["type"] = "host"; + } + + SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts); + SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts); + SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); + SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); +} + +/** + * Parse and send performance data metrics to OpenTSDB + * + * @param checkable Host/service object + * @param metric Full metric name + * @param tags Tag key pairs + * @param cr Check result containing performance data + * @param ts Timestamp when the check result was received + */ +void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric, + const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts) +{ + Array::Ptr perfdata = cr->GetPerformanceData(); + + if (!perfdata) + return; + + CheckCommand::Ptr checkCommand = checkable->GetCheckCommand(); + + ObjectLock olock(perfdata); + for 
(const Value& val : perfdata) { + PerfdataValue::Ptr pdv; + + if (val.IsObjectType<PerfdataValue>()) + pdv = val; + else { + try { + pdv = PerfdataValue::Parse(val); + } catch (const std::exception&) { + Log(LogWarning, "OpenTsdbWriter") + << "Ignoring invalid perfdata for checkable '" + << checkable->GetName() << "' and command '" + << checkCommand->GetName() << "' with value: " << val; + continue; + } + } + + String metric_name; + std::map<String, String> tags_new = tags; + + // Do not break original functionality where perfdata labels form + // part of the metric name + if (!GetEnableGenericMetrics()) { + String escaped_key = EscapeMetric(pdv->GetLabel()); + boost::algorithm::replace_all(escaped_key, "::", "."); + metric_name = metric + "." + escaped_key; + } else { + String escaped_key = EscapeTag(pdv->GetLabel()); + metric_name = metric; + tags_new["label"] = escaped_key; + } + + SendMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts); + + if (!pdv->GetCrit().IsEmpty()) + SendMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts); + if (!pdv->GetWarn().IsEmpty()) + SendMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts); + if (!pdv->GetMin().IsEmpty()) + SendMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts); + if (!pdv->GetMax().IsEmpty()) + SendMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts); + } +} + +/** + * Send given metric to OpenTSDB + * + * @param checkable Host/service object + * @param metric Full metric name + * @param tags Tag key pairs + * @param value Floating point metric value + * @param ts Timestamp where the metric was received from the check result + */ +void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric, + const std::map<String, String>& tags, double value, double ts) +{ + String tags_string = ""; + + for (const Dictionary::Pair& tag : tags) { + tags_string += " " + tag.first + "=" + Convert::ToString(tag.second); + } + + 
std::ostringstream msgbuf; + /* + * must be (http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html) + * put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]> + * "tags" must include at least one tag, we use "host=HOSTNAME" + */ + msgbuf << "put " << metric << " " << static_cast<long>(ts) << " " << Convert::ToString(value) << tags_string; + + Log(LogDebug, "OpenTsdbWriter") + << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'."; + + /* do not send \n to debug log */ + msgbuf << "\n"; + String put = msgbuf.str(); + + ObjectLock olock(this); + + if (!GetConnected()) + return; + + try { + Log(LogDebug, "OpenTsdbWriter") + << "Checkable '" << checkable->GetName() << "' sending message '" << put << "'."; + + boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str())); + m_Stream->flush(); + } catch (const std::exception& ex) { + Log(LogCritical, "OpenTsdbWriter") + << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + } +} + +/** + * Escape tags for OpenTSDB + * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags + * + * @param str Tag name + * @return Escaped tag + */ +String OpenTsdbWriter::EscapeTag(const String& str) +{ + String result = str; + + boost::replace_all(result, " ", "_"); + boost::replace_all(result, "\\", "_"); + boost::replace_all(result, ":", "_"); + + return result; +} + +/** + * Escape metric name for OpenTSDB + * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags + * + * @param str Metric name + * @return Escaped metric + */ +String OpenTsdbWriter::EscapeMetric(const String& str) +{ + String result = str; + + boost::replace_all(result, " ", "_"); + boost::replace_all(result, ".", "_"); + boost::replace_all(result, "\\", "_"); + boost::replace_all(result, ":", "_"); + + return result; +} + +/** +* Saves the template dictionaries 
defined in the config file into running memory +* +* @param stemplate The dictionary to save the service configuration to +* @param htemplate The dictionary to save the host configuration to +*/ +void OpenTsdbWriter::ReadConfigTemplate(const Dictionary::Ptr& stemplate, + const Dictionary::Ptr& htemplate) +{ + + m_ServiceConfigTemplate = GetServiceTemplate(); + + if (!m_ServiceConfigTemplate) { + Log(LogDebug, "OpenTsdbWriter") + << "Unable to locate service template configuration."; + } else if (m_ServiceConfigTemplate->GetLength() == 0) { + Log(LogDebug, "OpenTsdbWriter") + << "The service template configuration is empty."; + } + + m_HostConfigTemplate = GetHostTemplate(); + + if (!m_HostConfigTemplate) { + Log(LogDebug, "OpenTsdbWriter") + << "Unable to locate host template configuration."; + } else if (m_HostConfigTemplate->GetLength() == 0) { + Log(LogDebug, "OpenTsdbWriter") + << "The host template configuration is empty."; + } + +} + + +/** +* Validates the host_template configuration block in the configuration +* file and checks for syntax errors. 
+* +* @param lvalue The host_template dictionary +* @param utils Validation helper utilities +*/ +void OpenTsdbWriter::ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<OpenTsdbWriter>::ValidateHostTemplate(lvalue, utils); + + String metric = lvalue()->Get("metric"); + if (!MacroProcessor::ValidateMacroString(metric)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "metric" }, "Closing $ not found in macro format string '" + metric + "'.")); + + Dictionary::Ptr tags = lvalue()->Get("tags"); + if (tags) { + ObjectLock olock(tags); + for (const Dictionary::Pair& pair : tags) { + if (!MacroProcessor::ValidateMacroString(pair.second)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second)); + } + } +} + +/** +* Validates the service_template configuration block in the +* configuration file and checks for syntax errors. +* +* @param lvalue The service_template dictionary +* @param utils Validation helper utilities +*/ +void OpenTsdbWriter::ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<OpenTsdbWriter>::ValidateServiceTemplate(lvalue, utils); + + String metric = lvalue()->Get("metric"); + if (!MacroProcessor::ValidateMacroString(metric)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "metric" }, "Closing $ not found in macro format string '" + metric + "'.")); + + Dictionary::Ptr tags = lvalue()->Get("tags"); + if (tags) { + ObjectLock olock(tags); + for (const Dictionary::Pair& pair : tags) { + if (!MacroProcessor::ValidateMacroString(pair.second)) + BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second)); + } + } +} diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp new file mode 100644 index 
0000000..e37ef42 --- /dev/null +++ b/lib/perfdata/opentsdbwriter.hpp @@ -0,0 +1,62 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef OPENTSDBWRITER_H +#define OPENTSDBWRITER_H + +#include "perfdata/opentsdbwriter-ti.hpp" +#include "icinga/service.hpp" +#include "base/configobject.hpp" +#include "base/tcpsocket.hpp" +#include "base/timer.hpp" +#include <fstream> + +namespace icinga +{ + +/** + * An Icinga opentsdb writer. + * + * @ingroup perfdata + */ +class OpenTsdbWriter final : public ObjectImpl<OpenTsdbWriter> +{ +public: + DECLARE_OBJECT(OpenTsdbWriter); + DECLARE_OBJECTNAME(OpenTsdbWriter); + + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + + void ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override; + void ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override; + +protected: + void OnConfigLoaded() override; + void Resume() override; + void Pause() override; + +private: + Shared<AsioTcpStream>::Ptr m_Stream; + + boost::signals2::connection m_HandleCheckResults; + Timer::Ptr m_ReconnectTimer; + + Dictionary::Ptr m_ServiceConfigTemplate; + Dictionary::Ptr m_HostConfigTemplate; + + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void SendMetric(const Checkable::Ptr& checkable, const String& metric, + const std::map<String, String>& tags, double value, double ts); + void SendPerfdata(const Checkable::Ptr& checkable, const String& metric, + const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts); + static String EscapeTag(const String& str); + static String EscapeMetric(const String& str); + + void ReconnectTimerHandler(); + + void ReadConfigTemplate(const Dictionary::Ptr& stemplate, + const Dictionary::Ptr& htemplate); +}; + +} + +#endif /* OPENTSDBWRITER_H */ diff --git a/lib/perfdata/opentsdbwriter.ti b/lib/perfdata/opentsdbwriter.ti new file mode 100644 index 
0000000..626350a --- /dev/null +++ b/lib/perfdata/opentsdbwriter.ti @@ -0,0 +1,55 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "base/configobject.hpp" + +library perfdata; + +namespace icinga +{ + +class OpenTsdbWriter : ConfigObject +{ + activation_priority 100; + + [config] String host { + default {{{ return "127.0.0.1"; }}} + }; + [config] String port { + default {{{ return "4242"; }}} + }; + [config] bool enable_ha { + default {{{ return false; }}} + }; + [config] Dictionary::Ptr host_template { + default {{{ return new Dictionary(); }}} + + }; + [config] Dictionary::Ptr service_template { + default {{{ return new Dictionary(); }}} + }; + [config] bool enable_generic_metrics { + default {{{ return false; }}} + }; + + [no_user_modify] bool connected; + [no_user_modify] bool should_connect { + default {{{ return true; }}} + }; +}; + +validator OpenTsdbWriter { + Dictionary host_template { + String metric; + Dictionary "tags" { + String "*"; + }; + }; + Dictionary service_template { + String metric; + Dictionary "tags" { + String "*"; + }; + }; +}; + +} diff --git a/lib/perfdata/perfdatawriter.cpp b/lib/perfdata/perfdatawriter.cpp new file mode 100644 index 0000000..849f19e --- /dev/null +++ b/lib/perfdata/perfdatawriter.cpp @@ -0,0 +1,201 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "perfdata/perfdatawriter.hpp" +#include "perfdata/perfdatawriter-ti.cpp" +#include "icinga/service.hpp" +#include "icinga/macroprocessor.hpp" +#include "icinga/icingaapplication.hpp" +#include "base/configtype.hpp" +#include "base/objectlock.hpp" +#include "base/logger.hpp" +#include "base/convert.hpp" +#include "base/utility.hpp" +#include "base/context.hpp" +#include "base/exception.hpp" +#include "base/application.hpp" +#include "base/statsfunction.hpp" + +using namespace icinga; + +REGISTER_TYPE(PerfdataWriter); + +REGISTER_STATSFUNCTION(PerfdataWriter, &PerfdataWriter::StatsFunc); + +void PerfdataWriter::OnConfigLoaded() +{ + 
ObjectImpl<PerfdataWriter>::OnConfigLoaded(); + + if (!GetEnableHa()) { + Log(LogDebug, "PerfdataWriter") + << "HA functionality disabled. Won't pause connection: " << GetName(); + + SetHAMode(HARunEverywhere); + } else { + SetHAMode(HARunOnce); + } +} + +void PerfdataWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) +{ + DictionaryData nodes; + + for (const PerfdataWriter::Ptr& perfdatawriter : ConfigType::GetObjectsByType<PerfdataWriter>()) { + nodes.emplace_back(perfdatawriter->GetName(), 1); //add more stats + } + + status->Set("perfdatawriter", new Dictionary(std::move(nodes))); +} + +void PerfdataWriter::Resume() +{ + ObjectImpl<PerfdataWriter>::Resume(); + + Log(LogInformation, "PerfdataWriter") + << "'" << GetName() << "' resumed."; + + m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, + const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { + CheckResultHandler(checkable, cr); + }); + + m_RotationTimer = Timer::Create(); + m_RotationTimer->OnTimerExpired.connect([this](const Timer * const&) { RotationTimerHandler(); }); + m_RotationTimer->SetInterval(GetRotationInterval()); + m_RotationTimer->Start(); + + RotateFile(m_ServiceOutputFile, GetServiceTempPath(), GetServicePerfdataPath()); + RotateFile(m_HostOutputFile, GetHostTempPath(), GetHostPerfdataPath()); +} + +void PerfdataWriter::Pause() +{ + m_HandleCheckResults.disconnect(); + m_RotationTimer->Stop(true); + +#ifdef I2_DEBUG + //m_HostOutputFile << "\n# Pause the feature" << "\n\n"; + //m_ServiceOutputFile << "\n# Pause the feature" << "\n\n"; +#endif /* I2_DEBUG */ + + /* Force a rotation closing the file stream. 
*/ + RotateAllFiles(); + + Log(LogInformation, "PerfdataWriter") + << "'" << GetName() << "' paused."; + + ObjectImpl<PerfdataWriter>::Pause(); +} + +Value PerfdataWriter::EscapeMacroMetric(const Value& value) +{ + if (value.IsObjectType<Array>()) + return Utility::Join(value, ';'); + else + return value; +} + +void PerfdataWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + if (IsPaused()) + return; + + CONTEXT("Writing performance data for object '" << checkable->GetName() << "'"); + + if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) + return; + + Service::Ptr service = dynamic_pointer_cast<Service>(checkable); + Host::Ptr host; + + if (service) + host = service->GetHost(); + else + host = static_pointer_cast<Host>(checkable); + + MacroProcessor::ResolverList resolvers; + if (service) + resolvers.emplace_back("service", service); + resolvers.emplace_back("host", host); + + if (service) { + String line = MacroProcessor::ResolveMacros(GetServiceFormatTemplate(), resolvers, cr, nullptr, &PerfdataWriter::EscapeMacroMetric); + + { + std::unique_lock<std::mutex> lock(m_StreamMutex); + + if (!m_ServiceOutputFile.good()) + return; + + m_ServiceOutputFile << line << "\n"; + } + } else { + String line = MacroProcessor::ResolveMacros(GetHostFormatTemplate(), resolvers, cr, nullptr, &PerfdataWriter::EscapeMacroMetric); + + { + std::unique_lock<std::mutex> lock(m_StreamMutex); + + if (!m_HostOutputFile.good()) + return; + + m_HostOutputFile << line << "\n"; + } + } +} + +void PerfdataWriter::RotateFile(std::ofstream& output, const String& temp_path, const String& perfdata_path) +{ + Log(LogDebug, "PerfdataWriter") + << "Rotating perfdata files."; + + std::unique_lock<std::mutex> lock(m_StreamMutex); + + if (output.good()) { + output.close(); + + if (Utility::PathExists(temp_path)) { + String finalFile = perfdata_path + "." 
+ Convert::ToString((long)Utility::GetTime()); + + Log(LogDebug, "PerfdataWriter") + << "Closed output file and renaming into '" << finalFile << "'."; + + Utility::RenameFile(temp_path, finalFile); + } + } + + output.open(temp_path.CStr()); + + if (!output.good()) { + Log(LogWarning, "PerfdataWriter") + << "Could not open perfdata file '" << temp_path << "' for writing. Perfdata will be lost."; + } +} + +void PerfdataWriter::RotationTimerHandler() +{ + if (IsPaused()) + return; + + RotateAllFiles(); +} + +void PerfdataWriter::RotateAllFiles() +{ + RotateFile(m_ServiceOutputFile, GetServiceTempPath(), GetServicePerfdataPath()); + RotateFile(m_HostOutputFile, GetHostTempPath(), GetHostPerfdataPath()); +} + +void PerfdataWriter::ValidateHostFormatTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<PerfdataWriter>::ValidateHostFormatTemplate(lvalue, utils); + + if (!MacroProcessor::ValidateMacroString(lvalue())) + BOOST_THROW_EXCEPTION(ValidationError(this, { "host_format_template" }, "Closing $ not found in macro format string '" + lvalue() + "'.")); +} + +void PerfdataWriter::ValidateServiceFormatTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<PerfdataWriter>::ValidateServiceFormatTemplate(lvalue, utils); + + if (!MacroProcessor::ValidateMacroString(lvalue())) + BOOST_THROW_EXCEPTION(ValidationError(this, { "service_format_template" }, "Closing $ not found in macro format string '" + lvalue() + "'.")); +} diff --git a/lib/perfdata/perfdatawriter.hpp b/lib/perfdata/perfdatawriter.hpp new file mode 100644 index 0000000..961d4e9 --- /dev/null +++ b/lib/perfdata/perfdatawriter.hpp @@ -0,0 +1,53 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef PERFDATAWRITER_H +#define PERFDATAWRITER_H + +#include "perfdata/perfdatawriter-ti.hpp" +#include "icinga/service.hpp" +#include "base/configobject.hpp" +#include "base/timer.hpp" +#include <fstream> + +namespace icinga +{ + +/** + * An Icinga perfdata 
writer. + * + * @ingroup icinga + */ +class PerfdataWriter final : public ObjectImpl<PerfdataWriter> +{ +public: + DECLARE_OBJECT(PerfdataWriter); + DECLARE_OBJECTNAME(PerfdataWriter); + + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + + void ValidateHostFormatTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) override; + void ValidateServiceFormatTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) override; + +protected: + void OnConfigLoaded() override; + void Resume() override; + void Pause() override; + +private: + boost::signals2::connection m_HandleCheckResults; + Timer::Ptr m_RotationTimer; + std::ofstream m_ServiceOutputFile; + std::ofstream m_HostOutputFile; + std::mutex m_StreamMutex; + + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + static Value EscapeMacroMetric(const Value& value); + + void RotationTimerHandler(); + void RotateAllFiles(); + void RotateFile(std::ofstream& output, const String& temp_path, const String& perfdata_path); +}; + +} + +#endif /* PERFDATAWRITER_H */ diff --git a/lib/perfdata/perfdatawriter.ti b/lib/perfdata/perfdatawriter.ti new file mode 100644 index 0000000..d6d99e8 --- /dev/null +++ b/lib/perfdata/perfdatawriter.ti @@ -0,0 +1,61 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "base/configobject.hpp" +#include "base/application.hpp" + +library perfdata; + +namespace icinga +{ + +class PerfdataWriter : ConfigObject +{ + activation_priority 100; + + [config] String host_perfdata_path { + default {{{ return Configuration::SpoolDir + "/perfdata/host-perfdata"; }}} + }; + [config] String service_perfdata_path { + default {{{ return Configuration::SpoolDir + "/perfdata/service-perfdata"; }}} + }; + [config] String host_temp_path { + default {{{ return Configuration::SpoolDir + "/tmp/host-perfdata"; }}} + }; + [config] String service_temp_path { + default {{{ return Configuration::SpoolDir + 
"/tmp/service-perfdata"; }}} + }; + [config] String host_format_template { + default {{{ + return "DATATYPE::HOSTPERFDATA\t" + "TIMET::$host.last_check$\t" + "HOSTNAME::$host.name$\t" + "HOSTPERFDATA::$host.perfdata$\t" + "HOSTCHECKCOMMAND::$host.check_command$\t" + "HOSTSTATE::$host.state$\t" + "HOSTSTATETYPE::$host.state_type$"; + }}} + }; + [config] String service_format_template { + default {{{ + return "DATATYPE::SERVICEPERFDATA\t" + "TIMET::$service.last_check$\t" + "HOSTNAME::$host.name$\t" + "SERVICEDESC::$service.name$\t" + "SERVICEPERFDATA::$service.perfdata$\t" + "SERVICECHECKCOMMAND::$service.check_command$\t" + "HOSTSTATE::$host.state$\t" + "HOSTSTATETYPE::$host.state_type$\t" + "SERVICESTATE::$service.state$\t" + "SERVICESTATETYPE::$service.state_type$"; + }}} + }; + + [config] double rotation_interval { + default {{{ return 30; }}} + }; + [config] bool enable_ha { + default {{{ return false; }}} + }; +}; + +} |