/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */

#include "perfdata/gelfwriter.hpp"
#include "perfdata/gelfwriter-ti.cpp"
#include "icinga/service.hpp"
#include "icinga/notification.hpp"
#include "icinga/checkcommand.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/compatutility.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/logger.hpp"
#include "base/utility.hpp"
#include "base/perfdatavalue.hpp"
#include "base/application.hpp"
#include "base/stream.hpp"
#include "base/networkstream.hpp"
#include "base/context.hpp"
#include "base/exception.hpp"
#include "base/json.hpp"
#include "base/statsfunction.hpp"
/*
 * NOTE(review): in the reviewed copy of this file everything between angle brackets
 * was missing (the targets of the system includes below and explicit template
 * arguments such as ObjectImpl<GelfWriter>). They have been reconstructed from the
 * names used in this file - confirm against the build before merging.
 */
#include <boost/algorithm/string/replace.hpp>
#include <utility>
#include "base/io-engine.hpp"
#include <boost/asio/write.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/system/error_code.hpp>
#include <boost/asio/error.hpp>

using namespace icinga;

REGISTER_TYPE(GelfWriter);

REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);

/**
 * Names the work queue after this object and selects the HA mode:
 * HARunEverywhere when enable_ha is off, HARunOnce otherwise.
 */
void GelfWriter::OnConfigLoaded()
{
	ObjectImpl<GelfWriter>::OnConfigLoaded();

	m_WorkQueue.SetName("GelfWriter, " + GetName());

	if (!GetEnableHa()) {
		Log(LogDebug, "GelfWriter")
			<< "HA functionality disabled. Won't pause connection: " << GetName();

		SetHAMode(HARunEverywhere);
	} else {
		SetHAMode(HARunOnce);
	}
}

/**
 * Publishes per-object status (work queue length and 60s rate, connection state,
 * source) under the "gelfwriter" key and adds the two work queue metrics as perfdata.
 *
 * @param status Dictionary receiving the "gelfwriter" status entry.
 * @param perfdata Array receiving the PerfdataValue objects.
 */
void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
	DictionaryData nodes;

	for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
		size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
		double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;

		nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
			{ "work_queue_items", workQueueItems },
			{ "work_queue_item_rate", workQueueItemRate },
			{ "connected", gelfwriter->GetConnected() },
			{ "source", gelfwriter->GetSource() }
		}));

		perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
		perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
	}

	status->Set("gelfwriter", new Dictionary(std::move(nodes)));
}

/**
 * Installs the work queue exception callback, starts the 10 second reconnect timer
 * (rescheduled to fire immediately) and subscribes to check result, notification
 * and state change signals.
 */
void GelfWriter::Resume()
{
	ObjectImpl<GelfWriter>::Resume();

	Log(LogInformation, "GelfWriter")
		<< "'" << GetName() << "' resumed.";

	/* Register exception handler for WQ tasks. */
	m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });

	/* Timer for reconnecting */
	m_ReconnectTimer = Timer::Create();
	m_ReconnectTimer->SetInterval(10);
	m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
	m_ReconnectTimer->Start();
	m_ReconnectTimer->Reschedule(0);

	/* Register event handlers. */
	m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
		const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
		CheckResultHandler(checkable, cr);
	});
	m_HandleNotifications = Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification,
		const Checkable::Ptr& checkable, const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr,
		const String& author, const String& commentText, const String& commandName, const MessageOrigin::Ptr&) {
		NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName);
	});
	m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
		const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
		StateChangeHandler(checkable, cr, type);
	});
}

/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
void GelfWriter::Pause()
{
	m_HandleCheckResults.disconnect();
	m_HandleNotifications.disconnect();
	m_HandleStateChanges.disconnect();

	m_ReconnectTimer->Stop(true);

	/* Try to (re)connect with immediate priority so that already queued items can still be sent. */
	m_WorkQueue.Enqueue([this]() {
		try {
			ReconnectInternal();
		} catch (const std::exception&) {
			Log(LogInformation, "GelfWriter")
				<< "Unable to connect, not flushing buffers. Data may be lost.";
		}
	}, PriorityImmediate);

	/* The disconnect is queued with low priority and Join() waits for the queue. */
	m_WorkQueue.Enqueue([this]() { DisconnectInternal(); }, PriorityLow);
	m_WorkQueue.Join();

	Log(LogInformation, "GelfWriter")
		<< "'" << GetName() << "' paused.";

	ObjectImpl<GelfWriter>::Pause();
}

/** Asserts that the caller is running on a work queue worker thread. */
void GelfWriter::AssertOnWorkQueue()
{
	ASSERT(m_WorkQueue.IsWorkerThread());
}

/**
 * Work queue exception callback: logs the exception (short form as critical,
 * verbose form as debug) and drops the connection.
 *
 * @param exp The exception raised by a work queue task.
 */
void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
{
	Log(LogCritical, "GelfWriter")
		<< "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false);

	Log(LogDebug, "GelfWriter")
		<< "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true);

	DisconnectInternal();
}

/**
 * Work queue entry point for reconnecting. While paused it only marks the
 * object as disconnected instead of opening a connection.
 */
void GelfWriter::Reconnect()
{
	AssertOnWorkQueue();

	if (IsPaused()) {
		SetConnected(false);
		return;
	}

	ReconnectInternal();
}

/**
 * Opens the TCP (optionally TLS) connection to the configured host/port unless
 * already connected, and marks the object as connected on success.
 *
 * Failures while creating the TLS context, connecting or handshaking are logged
 * and rethrown; a missing or unverifiable peer certificate raises
 * std::runtime_error unless insecure_noverify is set.
 */
void GelfWriter::ReconnectInternal()
{
	double startTime = Utility::GetTime();

	CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'");

	SetShouldConnect(true);

	if (GetConnected())
		return;

	Log(LogNotice, "GelfWriter")
		<< "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";

	bool ssl = GetEnableTls();

	if (ssl) {
		Shared<boost::asio::ssl::context>::Ptr sslContext;

		try {
			sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
		} catch (const std::exception&) {
			Log(LogWarning, "GelfWriter")
				<< "Unable to create SSL context.";
			throw;
		}

		m_Stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
	} else {
		m_Stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
	}

	try {
		icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
	} catch (const std::exception&) {
		Log(LogWarning, "GelfWriter")
			<< "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
		throw;
	}

	if (ssl) {
		auto& tlsStream (m_Stream.first->next_layer());

		try {
			tlsStream.handshake(tlsStream.client);
		} catch (const std::exception&) {
			Log(LogWarning, "GelfWriter")
				<< "TLS handshake with host '" << GetHost() << "' failed.";
			throw;
		}

		if (!GetInsecureNoverify()) {
			if (!tlsStream.GetPeerCertificate()) {
				BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate."));
			}

			if (!tlsStream.IsVerifyOK()) {
				BOOST_THROW_EXCEPTION(std::runtime_error(
					"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
				));
			}
		}
	}

	SetConnected(true);

	Log(LogInformation, "GelfWriter")
		<< "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
}

/** Timer callback: queues a reconnect attempt on the work queue. */
void GelfWriter::ReconnectTimerHandler()
{
	m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal);
}

/** Work queue entry point for disconnecting. */
void GelfWriter::Disconnect()
{
	AssertOnWorkQueue();

	DisconnectInternal();
}

/**
 * Shuts down the TLS stream or closes the plain TCP stream (whichever exists)
 * and marks the object as disconnected. Does nothing when not connected.
 */
void GelfWriter::DisconnectInternal()
{
	if (!GetConnected())
		return;

	if (m_Stream.first) {
		boost::system::error_code ec;
		m_Stream.first->next_layer().shutdown(ec);

		// https://stackoverflow.com/a/25703699
		// As long as the error code's category is not an SSL category, then the protocol was securely shutdown
		if (ec.category() == boost::asio::error::get_ssl_category()) {
			Log(LogCritical, "GelfWriter")
				<< "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
		}
	} else if (m_Stream.second) {
		m_Stream.second->close();
	}

	SetConnected(false);
}

/**
 * Signal handler for new check results: queues the processing on the work
 * queue unless the object is paused.
 *
 * @param checkable The checkable the result belongs to.
 * @param cr The check result (may be empty).
 */
void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
	if (IsPaused())
		return;

	m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
}

/**
 * Builds the "CHECK RESULT" GELF message for a checkable, optionally including
 * its performance data, and sends it.
 *
 * @param checkable The checkable the result belongs to.
 * @param cr The check result; when empty only the checkable fields are sent and
 *           the current time is used as timestamp.
 */
void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
	AssertOnWorkQueue();

	CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");

	Log(LogDebug, "GelfWriter")
		<< "Processing check result for '" << checkable->GetName() << "'";

	Host::Ptr host;
	Service::Ptr service;
	tie(host, service) = GetHostService(checkable);

	Dictionary::Ptr fields = new Dictionary();

	if (service) {
		fields->Set("_service_name", service->GetShortName());
		fields->Set("_service_state", Service::StateToString(service->GetState()));
		fields->Set("_last_state", service->GetLastState());
		fields->Set("_last_hard_state", service->GetLastHardState());
	} else {
		fields->Set("_last_state", host->GetLastState());
		fields->Set("_last_hard_state", host->GetLastHardState());
	}

	fields->Set("_hostname", host->GetName());
	fields->Set("_type", "CHECK RESULT");
	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));

	fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
	fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());

	fields->Set("_reachable", checkable->IsReachable());

	CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();

	/* Resolved once behind the null check so the warning below can never dereference a null command. */
	String checkCommandName;

	if (checkCommand) {
		checkCommandName = checkCommand->GetName();
		fields->Set("_check_command", checkCommandName);
	}

	double ts = Utility::GetTime();

	if (cr) {
		fields->Set("_latency", cr->CalculateLatency());
		fields->Set("_execution_time", cr->CalculateExecutionTime());
		fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
		fields->Set("full_message", cr->GetOutput());
		fields->Set("_check_source", cr->GetCheckSource());
		ts = cr->GetExecutionEnd();
	}

	if (cr && GetEnableSendPerfdata()) {
		Array::Ptr perfdata = cr->GetPerformanceData();

		if (perfdata) {
			ObjectLock olock(perfdata);
			for (const Value& val : perfdata) {
				PerfdataValue::Ptr pdv;

				if (val.IsObjectType<PerfdataValue>())
					pdv = val;
				else {
					try {
						pdv = PerfdataValue::Parse(val);
					} catch (const std::exception&) {
						Log(LogWarning, "GelfWriter")
							<< "Ignoring invalid perfdata for checkable '"
							<< checkable->GetName() << "' and command '"
							<< checkCommandName << "' with value: " << val;
						continue;
					}
				}

				/* Derive the GELF field name from the perfdata label. */
				String escaped_key = pdv->GetLabel();
				boost::replace_all(escaped_key, " ", "_");
				boost::replace_all(escaped_key, ".", "_");
				boost::replace_all(escaped_key, "\\", "_");
				boost::algorithm::replace_all(escaped_key, "::", ".");

				fields->Set("_" + escaped_key, pdv->GetValue());

				if (!pdv->GetMin().IsEmpty())
					fields->Set("_" + escaped_key + "_min", pdv->GetMin());
				if (!pdv->GetMax().IsEmpty())
					fields->Set("_" + escaped_key + "_max", pdv->GetMax());
				if (!pdv->GetWarn().IsEmpty())
					fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
				if (!pdv->GetCrit().IsEmpty())
					fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());

				if (!pdv->GetUnit().IsEmpty())
					fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
			}
		}
	}

	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
}

/**
 * Signal handler for notifications sent to a user: queues the processing on the
 * work queue unless the object is paused.
 */
void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
	const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
	const String& author, const String& commentText, const String& commandName)
{
	if (IsPaused())
		return;

	m_WorkQueue.Enqueue([this, notification, checkable, user, notificationType, cr, author, commentText, commandName]() {
		NotificationToUserHandlerInternal(notification, checkable, user, notificationType, cr, author, commentText, commandName);
	});
}

/**
 * Builds the "SERVICE NOTIFICATION" / "HOST NOTIFICATION" GELF message and sends it.
 * The author/comment pair is only included for custom and acknowledgement notifications.
 */
void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
	const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
	const String& author, const String& commentText, const String& commandName)
{
	AssertOnWorkQueue();

	CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");

	Log(LogDebug, "GelfWriter")
		<< "Processing notification for '" << checkable->GetName() << "'";

	Host::Ptr host;
	Service::Ptr service;
	tie(host, service) = GetHostService(checkable);

	String notificationTypeString = Notification::NotificationTypeToStringCompat(notificationType); //TODO: Change that to our own types.

	String authorComment = "";

	if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
		authorComment = author + ";" + commentText;
	}

	String output;
	double ts = Utility::GetTime();

	if (cr) {
		output = CompatUtility::GetCheckResultOutput(cr);
		ts = cr->GetExecutionEnd();
	}

	Dictionary::Ptr fields = new Dictionary();

	if (service) {
		fields->Set("_type", "SERVICE NOTIFICATION");
		//TODO: fix this to _service_name
		fields->Set("_service", service->GetShortName());
		fields->Set("short_message", output);
	} else {
		fields->Set("_type", "HOST NOTIFICATION");
		fields->Set("short_message", output);
	}

	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));

	fields->Set("_hostname", host->GetName());
	fields->Set("_command", commandName);
	fields->Set("_notification_type", notificationTypeString);
	fields->Set("_comment", authorComment);

	CheckCommand::Ptr commandObj = checkable->GetCheckCommand();

	if (commandObj)
		fields->Set("_check_command", commandObj->GetName());

	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
}

/**
 * Signal handler for state changes: queues the processing on the work queue
 * unless the object is paused.
 */
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
	if (IsPaused())
		return;

	m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); });
}

/**
 * Builds the "STATE CHANGE" GELF message for a checkable and sends it.
 *
 * @param checkable The checkable whose state changed.
 * @param cr The check result; when empty the current time is used as timestamp.
 * @param type The state type passed by the signal (not used for the message).
 */
void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
	AssertOnWorkQueue();

	CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");

	Log(LogDebug, "GelfWriter")
		<< "Processing state change for '" << checkable->GetName() << "'";

	Host::Ptr host;
	Service::Ptr service;
	tie(host, service) = GetHostService(checkable);

	Dictionary::Ptr fields = new Dictionary();

	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
	fields->Set("_type", "STATE CHANGE");
	fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
	fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
	fields->Set("_hostname", host->GetName());

	if (service) {
		fields->Set("_service_name", service->GetShortName());
		fields->Set("_service_state", Service::StateToString(service->GetState()));
		fields->Set("_last_state", service->GetLastState());
		fields->Set("_last_hard_state", service->GetLastHardState());
	} else {
		fields->Set("_last_state", host->GetLastState());
		fields->Set("_last_hard_state", host->GetLastHardState());
	}

	CheckCommand::Ptr commandObj = checkable->GetCheckCommand();

	if (commandObj)
		fields->Set("_check_command", commandObj->GetName());

	double ts = Utility::GetTime();

	if (cr) {
		fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
		fields->Set("full_message", cr->GetOutput());
		fields->Set("_check_source", cr->GetCheckSource());
		ts = cr->GetExecutionEnd();
	}

	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
}

/**
 * Adds the "version", "host" and "timestamp" fields to the given dictionary
 * and returns it JSON-encoded.
 *
 * @param fields Message fields; modified in place.
 * @param source Value for the "host" field.
 * @param ts Value for the "timestamp" field.
 * @return The JSON-encoded message.
 */
String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
{
	fields->Set("version", "1.1");
	fields->Set("host", source);
	fields->Set("timestamp", ts);

	return JsonEncode(fields);
}

/**
 * Writes a GELF message followed by a NUL byte to the open stream and flushes it.
 * Does nothing when not connected. Write errors are logged and rethrown.
 *
 * @param checkable The checkable the message is about (used for logging only).
 * @param gelfMessage The encoded message without the trailing NUL byte.
 */
void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
{
	/* Check first so that no payload is built for a message that is dropped anyway. */
	if (!GetConnected())
		return;

	std::ostringstream msgbuf;
	msgbuf << gelfMessage;
	msgbuf << '\0';

	/* Materialize the payload once instead of copying the stream buffer per use. */
	const std::string payload = msgbuf.str();

	try {
		/* Log the message without the trailing NUL byte. */
		Log(LogDebug, "GelfWriter")
			<< "Checkable '" << checkable->GetName() << "' sending message '" << gelfMessage << "'.";

		if (m_Stream.first) {
			boost::asio::write(*m_Stream.first, boost::asio::buffer(payload));
			m_Stream.first->flush();
		} else {
			boost::asio::write(*m_Stream.second, boost::asio::buffer(payload));
			m_Stream.second->flush();
		}
	} catch (const std::exception&) {
		Log(LogCritical, "GelfWriter")
			<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";

		/* Rethrow the original exception; `throw ex;` would slice it down to std::exception. */
		throw;
	}
}