summaryrefslogtreecommitdiffstats
path: root/lib/perfdata/gelfwriter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/perfdata/gelfwriter.cpp')
-rw-r--r--lib/perfdata/gelfwriter.cpp535
1 files changed, 535 insertions, 0 deletions
diff --git a/lib/perfdata/gelfwriter.cpp b/lib/perfdata/gelfwriter.cpp
new file mode 100644
index 0000000..c5b2bbd
--- /dev/null
+++ b/lib/perfdata/gelfwriter.cpp
@@ -0,0 +1,535 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "perfdata/gelfwriter.hpp"
+#include "perfdata/gelfwriter-ti.cpp"
+#include "icinga/service.hpp"
+#include "icinga/notification.hpp"
+#include "icinga/checkcommand.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "icinga/compatutility.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/configtype.hpp"
+#include "base/objectlock.hpp"
+#include "base/logger.hpp"
+#include "base/utility.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/application.hpp"
+#include "base/stream.hpp"
+#include "base/networkstream.hpp"
+#include "base/context.hpp"
+#include "base/exception.hpp"
+#include "base/json.hpp"
+#include "base/statsfunction.hpp"
+#include <boost/algorithm/string/replace.hpp>
+#include <utility>
+#include "base/io-engine.hpp"
+#include <boost/asio/write.hpp>
+#include <boost/asio/buffer.hpp>
+#include <boost/system/error_code.hpp>
+#include <boost/asio/error.hpp>
+
+using namespace icinga;
+
+/* Register the GelfWriter type (REGISTER_TYPE is defined outside this file). */
+REGISTER_TYPE(GelfWriter);
+
+/* Register GelfWriter::StatsFunc as the stats function associated with GelfWriter. */
+REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
+
+/**
+ * Runs the base class hook, names the work queue after this object and
+ * selects the HA mode depending on GetEnableHa().
+ */
+void GelfWriter::OnConfigLoaded()
+{
+	ObjectImpl<GelfWriter>::OnConfigLoaded();
+
+	m_WorkQueue.SetName("GelfWriter, " + GetName());
+
+	/* With HA enabled the object runs in HARunOnce mode. */
+	if (GetEnableHa()) {
+		SetHAMode(HARunOnce);
+		return;
+	}
+
+	/* HA disabled: log it and run in HARunEverywhere mode. */
+	Log(LogDebug, "GelfWriter")
+		<< "HA functionality disabled. Won't pause connection: " << GetName();
+
+	SetHAMode(HARunEverywhere);
+}
+
+/**
+ * Collects per-object statistics for every configured GelfWriter.
+ *
+ * @param status Receives a "gelfwriter" dictionary keyed by object name, each entry holding
+ *               work queue length, work queue item rate, connection flag and source.
+ * @param perfdata Receives two PerfdataValue entries per object (queue length and item rate).
+ */
+void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+	DictionaryData perWriterStatus;
+
+	for (const GelfWriter::Ptr& writer : ConfigType::GetObjectsByType<GelfWriter>()) {
+		const String writerName = writer->GetName();
+		const String perfdataPrefix = "gelfwriter_" + writerName;
+
+		size_t pendingItems = writer->m_WorkQueue.GetLength();
+		/* Task count requested for a span of 60, divided by 60 to obtain a rate. */
+		double itemRate = writer->m_WorkQueue.GetTaskCount(60) / 60.0;
+
+		perWriterStatus.emplace_back(writerName, new Dictionary({
+			{ "work_queue_items", pendingItems },
+			{ "work_queue_item_rate", itemRate },
+			{ "connected", writer->GetConnected() },
+			{ "source", writer->GetSource() }
+		}));
+
+		perfdata->Add(new PerfdataValue(perfdataPrefix + "_work_queue_items", pendingItems));
+		perfdata->Add(new PerfdataValue(perfdataPrefix + "_work_queue_item_rate", itemRate));
+	}
+
+	status->Set("gelfwriter", new Dictionary(std::move(perWriterStatus)));
+}
+
+/**
+ * Activates the writer: installs the work queue exception callback, starts the
+ * reconnect timer and subscribes to the check result, notification and state change signals.
+ */
+void GelfWriter::Resume()
+{
+	ObjectImpl<GelfWriter>::Resume();
+
+	Log(LogInformation, "GelfWriter")
+		<< "'" << GetName() << "' resumed.";
+
+	/* Exceptions raised by work queue tasks are forwarded to ExceptionHandler(). */
+	m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr error) {
+		ExceptionHandler(std::move(error));
+	});
+
+	/* Reconnect timer with an interval of 10 (presumably seconds); Reschedule(0) is
+	 * called right after Start(), presumably so the first attempt is not delayed. */
+	m_ReconnectTimer = Timer::Create();
+	m_ReconnectTimer->SetInterval(10);
+	m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) {
+		ReconnectTimerHandler();
+	});
+	m_ReconnectTimer->Start();
+	m_ReconnectTimer->Reschedule(0);
+
+	/* Subscribe to the events that get forwarded; the connections are stored so Pause() can drop them. */
+	m_HandleCheckResults = Checkable::OnNewCheckResult.connect(
+		[this](const Checkable::Ptr& target, const CheckResult::Ptr& result, const MessageOrigin::Ptr&) {
+			CheckResultHandler(target, result);
+		}
+	);
+
+	m_HandleNotifications = Checkable::OnNotificationSentToUser.connect(
+		[this](const Notification::Ptr& sentNotification, const Checkable::Ptr& target, const User::Ptr& recipient,
+			const NotificationType& sentType, const CheckResult::Ptr& result, const String& sentAuthor,
+			const String& sentComment, const String& sentCommand, const MessageOrigin::Ptr&) {
+			NotificationToUserHandler(sentNotification, target, recipient, sentType, result,
+				sentAuthor, sentComment, sentCommand);
+		}
+	);
+
+	m_HandleStateChanges = Checkable::OnStateChange.connect(
+		[this](const Checkable::Ptr& target, const CheckResult::Ptr& result, StateType newType, const MessageOrigin::Ptr&) {
+			StateChangeHandler(target, result, newType);
+		}
+	);
+}
+
+/**
+ * Pause mirrors Stop, but keeps the object able to resume at runtime (HA).
+ *
+ * Drops the signal connections, stops the reconnect timer, then queues one last
+ * reconnect attempt followed by a disconnect and joins the work queue.
+ */
+void GelfWriter::Pause()
+{
+	m_HandleCheckResults.disconnect();
+	m_HandleNotifications.disconnect();
+	m_HandleStateChanges.disconnect();
+
+	m_ReconnectTimer->Stop(true);
+
+	/* Queued with immediate priority: try to connect once more; a failure is only logged. */
+	m_WorkQueue.Enqueue(
+		[this]() {
+			try {
+				ReconnectInternal();
+			} catch (const std::exception&) {
+				Log(LogInformation, "GelfWriter")
+					<< "Unable to connect, not flushing buffers. Data may be lost.";
+			}
+		},
+		PriorityImmediate
+	);
+
+	/* Queued with low priority: close the connection. */
+	m_WorkQueue.Enqueue(
+		[this]() {
+			DisconnectInternal();
+		},
+		PriorityLow
+	);
+	m_WorkQueue.Join();
+
+	Log(LogInformation, "GelfWriter")
+		<< "'" << GetName() << "' paused.";
+
+	ObjectImpl<GelfWriter>::Pause();
+}
+
+/**
+ * Asserts (via ASSERT) that the calling thread is a worker thread of m_WorkQueue.
+ */
+void GelfWriter::AssertOnWorkQueue()
+{
+	ASSERT(
+		m_WorkQueue.IsWorkerThread()
+	);
+}
+
+/**
+ * Exception callback registered on the work queue: logs the exception at critical
+ * level and again at debug level with the verbose flag set, then disconnects.
+ *
+ * @param exp The exception that was raised.
+ */
+void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+	Log(LogCritical, "GelfWriter")
+		<< "Exception during Graylog Gelf operation: "
+		<< DiagnosticInformation(exp, false);
+
+	Log(LogDebug, "GelfWriter")
+		<< "Exception during Graylog Gelf operation: "
+		<< DiagnosticInformation(exp, true);
+
+	DisconnectInternal();
+}
+
+/**
+ * Work queue entry point for reconnecting. While the object is paused no
+ * connection is attempted and the connected flag is cleared instead.
+ */
+void GelfWriter::Reconnect()
+{
+	AssertOnWorkQueue();
+
+	if (!IsPaused()) {
+		ReconnectInternal();
+		return;
+	}
+
+	SetConnected(false);
+}
+
+/**
+ * Opens the connection to the configured host and port unless already connected.
+ *
+ * Creates a TLS stream or a plain TCP stream depending on GetEnableTls(), connects it
+ * and, for TLS, performs the client handshake. Unless GetInsecureNoverify() is set, the
+ * peer must have presented a certificate and verification must have succeeded.
+ * On success the connected flag is set.
+ *
+ * Failures while creating the SSL context, connecting or handshaking are logged as
+ * warnings and rethrown; certificate problems throw std::runtime_error.
+ */
+void GelfWriter::ReconnectInternal()
+{
+	double startTime = Utility::GetTime();
+
+	CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'");
+
+	SetShouldConnect(true);
+
+	if (GetConnected())
+		return;
+
+	Log(LogNotice, "GelfWriter")
+		<< "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+	bool ssl = GetEnableTls();
+
+	if (ssl) {
+		Shared<boost::asio::ssl::context>::Ptr sslContext;
+
+		try {
+			sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
+		} catch (const std::exception&) {
+			Log(LogWarning, "GelfWriter")
+				<< "Unable to create SSL context.";
+			throw;
+		}
+
+		m_Stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
+
+	} else {
+		m_Stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
+	}
+
+	try {
+		icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
+	} catch (const std::exception&) {
+		/* Closing quote goes directly after the port value, before the full stop. */
+		Log(LogWarning, "GelfWriter")
+			<< "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
+		throw;
+	}
+
+	if (ssl) {
+		auto& tlsStream (m_Stream.first->next_layer());
+
+		try {
+			tlsStream.handshake(tlsStream.client);
+		} catch (const std::exception&) {
+			/* Closing quote goes directly after the host value. */
+			Log(LogWarning, "GelfWriter")
+				<< "TLS handshake with host '" << GetHost() << "' failed.";
+			throw;
+		}
+
+		if (!GetInsecureNoverify()) {
+			if (!tlsStream.GetPeerCertificate()) {
+				BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate."));
+			}
+
+			if (!tlsStream.IsVerifyOK()) {
+				BOOST_THROW_EXCEPTION(std::runtime_error(
+					"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
+				));
+			}
+		}
+	}
+
+	SetConnected(true);
+
+	Log(LogInformation, "GelfWriter")
+		<< "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
+}
+
+/**
+ * Reconnect timer callback: queues Reconnect() on the work queue with normal priority.
+ */
+void GelfWriter::ReconnectTimerHandler()
+{
+	m_WorkQueue.Enqueue(
+		[this]() {
+			Reconnect();
+		},
+		PriorityNormal
+	);
+}
+
+/**
+ * Work queue entry point for disconnecting: asserts the caller is a work queue
+ * worker thread and delegates to DisconnectInternal().
+ */
+void GelfWriter::Disconnect()
+{
+	AssertOnWorkQueue();
+	DisconnectInternal();
+}
+
+/**
+ * Shuts down the TLS stream if one exists, otherwise closes the plain TCP stream,
+ * and clears the connected flag. Does nothing when not connected.
+ */
+void GelfWriter::DisconnectInternal()
+{
+	if (GetConnected()) {
+		if (m_Stream.first) {
+			boost::system::error_code shutdownError;
+			m_Stream.first->next_layer().shutdown(shutdownError);
+
+			// https://stackoverflow.com/a/25703699
+			// An error code outside the SSL category means the protocol was shut down securely.
+			const bool insecureShutdown = (shutdownError.category() == boost::asio::error::get_ssl_category());
+
+			if (insecureShutdown) {
+				Log(LogCritical, "GelfWriter")
+					<< "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
+			}
+		} else if (m_Stream.second) {
+			m_Stream.second->close();
+		}
+
+		SetConnected(false);
+	}
+}
+
+/**
+ * Signal handler for new check results: unless paused, queues the actual
+ * processing on the work queue.
+ *
+ * @param checkable The checkable the result belongs to.
+ * @param cr The check result.
+ */
+void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+	if (!IsPaused()) {
+		m_WorkQueue.Enqueue([this, checkable, cr]() {
+			CheckResultHandlerInternal(checkable, cr);
+		});
+	}
+}
+
+/**
+ * Builds the GELF fields for a check result and sends the message. Runs on the work queue.
+ *
+ * Always adds host/service identification, state, check attempts and reachability.
+ * When a check result is present, latency, execution time, output and check source are
+ * added and the result's execution end is used as timestamp (otherwise the current time).
+ * When GetEnableSendPerfdata() is set, each performance data value is added as
+ * "_<label>" plus optional "_min", "_max", "_warn", "_crit" and "_unit" variants.
+ *
+ * @param checkable The checkable the result belongs to.
+ * @param cr The check result; may be empty.
+ */
+void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+	AssertOnWorkQueue();
+
+	CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
+
+	Log(LogDebug, "GelfWriter")
+		<< "Processing check result for '" << checkable->GetName() << "'";
+
+	Host::Ptr host;
+	Service::Ptr service;
+	tie(host, service) = GetHostService(checkable);
+
+	Dictionary::Ptr fields = new Dictionary();
+
+	if (service) {
+		fields->Set("_service_name", service->GetShortName());
+		fields->Set("_service_state", Service::StateToString(service->GetState()));
+		fields->Set("_last_state", service->GetLastState());
+		fields->Set("_last_hard_state", service->GetLastHardState());
+	} else {
+		fields->Set("_last_state", host->GetLastState());
+		fields->Set("_last_hard_state", host->GetLastHardState());
+	}
+
+	fields->Set("_hostname", host->GetName());
+	fields->Set("_type", "CHECK RESULT");
+	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
+
+	fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
+	fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
+
+	fields->Set("_reachable", checkable->IsReachable());
+
+	/* The check command may be unset, so every use below has to be guarded. */
+	CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
+
+	if (checkCommand)
+		fields->Set("_check_command", checkCommand->GetName());
+
+	double ts = Utility::GetTime();
+
+	if (cr) {
+		fields->Set("_latency", cr->CalculateLatency());
+		fields->Set("_execution_time", cr->CalculateExecutionTime());
+		fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
+		fields->Set("full_message", cr->GetOutput());
+		fields->Set("_check_source", cr->GetCheckSource());
+		ts = cr->GetExecutionEnd();
+	}
+
+	if (cr && GetEnableSendPerfdata()) {
+		Array::Ptr perfdata = cr->GetPerformanceData();
+
+		if (perfdata) {
+			ObjectLock olock(perfdata);
+			for (const Value& val : perfdata) {
+				PerfdataValue::Ptr pdv;
+
+				if (val.IsObjectType<PerfdataValue>())
+					pdv = val;
+				else {
+					try {
+						pdv = PerfdataValue::Parse(val);
+					} catch (const std::exception&) {
+						/* Do not dereference a missing check command while reporting the bad value. */
+						Log(LogWarning, "GelfWriter")
+							<< "Ignoring invalid perfdata for checkable '"
+							<< checkable->GetName() << "' and command '"
+							<< (checkCommand ? checkCommand->GetName() : String("<unknown>"))
+							<< "' with value: " << val;
+						continue;
+					}
+				}
+
+				/* Spaces, dots and backslashes become '_'; afterwards "::" is mapped to '.'. */
+				String escaped_key = pdv->GetLabel();
+				boost::replace_all(escaped_key, " ", "_");
+				boost::replace_all(escaped_key, ".", "_");
+				boost::replace_all(escaped_key, "\\", "_");
+				boost::algorithm::replace_all(escaped_key, "::", ".");
+
+				fields->Set("_" + escaped_key, pdv->GetValue());
+
+				if (!pdv->GetMin().IsEmpty())
+					fields->Set("_" + escaped_key + "_min", pdv->GetMin());
+				if (!pdv->GetMax().IsEmpty())
+					fields->Set("_" + escaped_key + "_max", pdv->GetMax());
+				if (!pdv->GetWarn().IsEmpty())
+					fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
+				if (!pdv->GetCrit().IsEmpty())
+					fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
+
+				if (!pdv->GetUnit().IsEmpty())
+					fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
+			}
+		}
+	}
+
+	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
+}
+
+/**
+ * Signal handler for notifications sent to a user: unless paused, queues the
+ * actual processing on the work queue with copies of all arguments.
+ */
+void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+	const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
+	const String& author, const String& commentText, const String& commandName)
+{
+	if (!IsPaused()) {
+		m_WorkQueue.Enqueue(
+			[this, notification, checkable, user, notificationType, cr, author, commentText, commandName]() {
+				NotificationToUserHandlerInternal(
+					notification, checkable, user, notificationType,
+					cr, author, commentText, commandName
+				);
+			}
+		);
+	}
+}
+
+/**
+ * Builds the GELF fields for a sent notification and sends the message. Runs on the work queue.
+ *
+ * The "_comment" field is "<author>;<comment>" for custom and acknowledgement
+ * notifications and empty otherwise. The timestamp is the check result's execution
+ * end when a result is present, the current time otherwise.
+ */
+void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+	const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
+	const String& author, const String& commentText, const String& commandName)
+{
+	AssertOnWorkQueue();
+
+	CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
+
+	Log(LogDebug, "GelfWriter")
+		<< "Processing notification for '" << checkable->GetName() << "'";
+
+	Host::Ptr host;
+	Service::Ptr service;
+	tie(host, service) = GetHostService(checkable);
+
+	//TODO: Change that to our own types.
+	String typeLabel = Notification::NotificationTypeToStringCompat(notificationType);
+
+	const bool carriesAuthorComment =
+		(notificationType == NotificationCustom || notificationType == NotificationAcknowledgement);
+
+	String authorAndComment = "";
+
+	if (carriesAuthorComment)
+		authorAndComment = author + ";" + commentText;
+
+	String checkOutput;
+	double ts = Utility::GetTime();
+
+	if (cr) {
+		checkOutput = CompatUtility::GetCheckResultOutput(cr);
+		ts = cr->GetExecutionEnd();
+	}
+
+	Dictionary::Ptr gelfFields = new Dictionary();
+
+	if (service) {
+		gelfFields->Set("_type", "SERVICE NOTIFICATION");
+		//TODO: fix this to _service_name
+		gelfFields->Set("_service", service->GetShortName());
+	} else {
+		gelfFields->Set("_type", "HOST NOTIFICATION");
+	}
+
+	/* Set for hosts and services alike. */
+	gelfFields->Set("short_message", checkOutput);
+
+	gelfFields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
+
+	gelfFields->Set("_hostname", host->GetName());
+	gelfFields->Set("_command", commandName);
+	gelfFields->Set("_notification_type", typeLabel);
+	gelfFields->Set("_comment", authorAndComment);
+
+	CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
+
+	if (checkCommand)
+		gelfFields->Set("_check_command", checkCommand->GetName());
+
+	SendLogMessage(checkable, ComposeGelfMessage(gelfFields, GetSource(), ts));
+}
+
+/**
+ * Signal handler for state changes: unless paused, queues the actual
+ * processing on the work queue.
+ *
+ * @param checkable The checkable whose state changed.
+ * @param cr The associated check result.
+ * @param type The state type passed along by the signal.
+ */
+void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+	if (!IsPaused()) {
+		m_WorkQueue.Enqueue([this, checkable, cr, type]() {
+			StateChangeHandlerInternal(checkable, cr, type);
+		});
+	}
+}
+
+/**
+ * Builds the GELF fields for a state change and sends the message. Runs on the work queue.
+ *
+ * The timestamp is the check result's execution end when a result is present,
+ * the current time otherwise. The type parameter is not used in the body.
+ */
+void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+	AssertOnWorkQueue();
+
+	CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
+
+	Log(LogDebug, "GelfWriter")
+		<< "Processing state change for '" << checkable->GetName() << "'";
+
+	Host::Ptr host;
+	Service::Ptr service;
+	tie(host, service) = GetHostService(checkable);
+
+	Dictionary::Ptr gelfFields = new Dictionary();
+
+	/* Fields common to hosts and services. */
+	if (service)
+		gelfFields->Set("_state", Service::StateToString(service->GetState()));
+	else
+		gelfFields->Set("_state", Host::StateToString(host->GetState()));
+
+	gelfFields->Set("_type", "STATE CHANGE");
+	gelfFields->Set("_current_check_attempt", checkable->GetCheckAttempt());
+	gelfFields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
+	gelfFields->Set("_hostname", host->GetName());
+
+	/* Last (hard) state comes from the service if there is one, from the host otherwise. */
+	if (!service) {
+		gelfFields->Set("_last_state", host->GetLastState());
+		gelfFields->Set("_last_hard_state", host->GetLastHardState());
+	} else {
+		gelfFields->Set("_service_name", service->GetShortName());
+		gelfFields->Set("_service_state", Service::StateToString(service->GetState()));
+		gelfFields->Set("_last_state", service->GetLastState());
+		gelfFields->Set("_last_hard_state", service->GetLastHardState());
+	}
+
+	CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
+
+	if (checkCommand)
+		gelfFields->Set("_check_command", checkCommand->GetName());
+
+	double ts = Utility::GetTime();
+
+	if (cr) {
+		gelfFields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
+		gelfFields->Set("full_message", cr->GetOutput());
+		gelfFields->Set("_check_source", cr->GetCheckSource());
+		ts = cr->GetExecutionEnd();
+	}
+
+	SendLogMessage(checkable, ComposeGelfMessage(gelfFields, GetSource(), ts));
+}
+
+/**
+ * Adds the "version", "host" and "timestamp" entries to the given fields
+ * (modifying the passed dictionary) and returns its JSON encoding.
+ *
+ * @param fields Message fields; extended in place.
+ * @param source Value stored under "host".
+ * @param ts Value stored under "timestamp".
+ * @return The JSON-encoded fields.
+ */
+String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
+{
+	fields->Set("version", "1.1");
+	fields->Set("host", source);
+	fields->Set("timestamp", ts);
+
+	String encodedMessage = JsonEncode(fields);
+
+	return encodedMessage;
+}
+
+/**
+ * Writes one GELF message, followed by a null byte, to the active stream and flushes it.
+ * Does nothing when the writer is not connected.
+ *
+ * @param checkable The checkable the message belongs to (used for debug logging only).
+ * @param gelfMessage The encoded message to send.
+ *
+ * Write failures are logged at critical level and the original exception is rethrown
+ * unchanged, so its dynamic type and details stay available to the caller.
+ */
+void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
+{
+	/* Nothing to do without a connection; skip building the payload altogether. */
+	if (!GetConnected())
+		return;
+
+	/* Append the terminating null byte (frame delimiter for GELF over TCP per the Graylog GELF documentation). */
+	std::ostringstream msgbuf;
+	msgbuf << gelfMessage;
+	msgbuf << '\0';
+
+	/* Materialise the payload once; the same buffer is used for logging and writing. */
+	const std::string payload = msgbuf.str();
+
+	try {
+		Log(LogDebug, "GelfWriter")
+			<< "Checkable '" << checkable->GetName() << "' sending message '" << payload << "'.";
+
+		if (m_Stream.first) {
+			boost::asio::write(*m_Stream.first, boost::asio::buffer(payload));
+			m_Stream.first->flush();
+		} else {
+			boost::asio::write(*m_Stream.second, boost::asio::buffer(payload));
+			m_Stream.second->flush();
+		}
+	} catch (const std::exception&) {
+		Log(LogCritical, "GelfWriter")
+			<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+		/* Rethrow the active exception; `throw ex;` would copy it as a sliced std::exception. */
+		throw;
+	}
+}