diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 12:34:54 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 12:34:54 +0000 |
commit | 0915b3ef56dfac3113cce55a59a5765dc94976be (patch) | |
tree | a8fea11d50b4f083e1bf0f90025ece7f0824784a /lib/remote/apilistener.cpp | |
parent | Initial commit. (diff) | |
download | icinga2-upstream.tar.xz icinga2-upstream.zip |
Adding upstream version 2.13.6.upstream/2.13.6upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | lib/remote/apilistener.cpp | 1929 |
1 files changed, 1929 insertions, 0 deletions
diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp new file mode 100644 index 0000000..472bd90 --- /dev/null +++ b/lib/remote/apilistener.cpp @@ -0,0 +1,1929 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "remote/apilistener.hpp" +#include "remote/apilistener-ti.cpp" +#include "remote/jsonrpcconnection.hpp" +#include "remote/endpoint.hpp" +#include "remote/jsonrpc.hpp" +#include "remote/apifunction.hpp" +#include "remote/configpackageutility.hpp" +#include "remote/configobjectutility.hpp" +#include "base/convert.hpp" +#include "base/defer.hpp" +#include "base/io-engine.hpp" +#include "base/netstring.hpp" +#include "base/json.hpp" +#include "base/configtype.hpp" +#include "base/logger.hpp" +#include "base/objectlock.hpp" +#include "base/stdiostream.hpp" +#include "base/perfdatavalue.hpp" +#include "base/application.hpp" +#include "base/context.hpp" +#include "base/statsfunction.hpp" +#include "base/exception.hpp" +#include "base/tcpsocket.hpp" +#include <boost/asio/buffer.hpp> +#include <boost/asio/io_context_strand.hpp> +#include <boost/asio/ip/tcp.hpp> +#include <boost/asio/spawn.hpp> +#include <boost/asio/ssl/context.hpp> +#include <boost/date_time/posix_time/posix_time_duration.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/regex.hpp> +#include <boost/system/error_code.hpp> +#include <boost/thread/locks.hpp> +#include <climits> +#include <cstdint> +#include <fstream> +#include <memory> +#include <openssl/ssl.h> +#include <openssl/tls1.h> +#include <openssl/x509.h> +#include <sstream> +#include <utility> + +using namespace icinga; + +REGISTER_TYPE(ApiListener); + +boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged; +ApiListener::Ptr ApiListener::m_Instance; + +REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc); + +REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler); + +ApiListener::ApiListener() +{ + m_RelayQueue.SetName("ApiListener, RelayQueue"); + m_SyncQueue.SetName("ApiListener, SyncQueue"); +} + +String ApiListener::GetApiDir() +{ + return Configuration::DataDir + "/api/"; +} + +String ApiListener::GetApiZonesDir() +{ + return GetApiDir() + "zones/"; +} + +String ApiListener::GetApiZonesStageDir() +{ + return GetApiDir() + "zones-stage/"; +} + +String ApiListener::GetCertsDir() +{ + return Configuration::DataDir + "/certs/"; +} + +String ApiListener::GetCaDir() +{ + return Configuration::DataDir + "/ca/"; +} + +String ApiListener::GetCertificateRequestsDir() +{ + return Configuration::DataDir + "/certificate-requests/"; +} + +String ApiListener::GetDefaultCertPath() +{ + return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".crt"; +} + +String ApiListener::GetDefaultKeyPath() +{ + return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".key"; +} + +String ApiListener::GetDefaultCaPath() +{ + return GetCertsDir() + "/ca.crt"; +} + +double ApiListener::GetTlsHandshakeTimeout() const +{ + return Configuration::TlsHandshakeTimeout; +} + +void ApiListener::SetTlsHandshakeTimeout(double value, bool suppress_events, const Value& cookie) +{ + Configuration::TlsHandshakeTimeout = value; +} + +void ApiListener::CopyCertificateFile(const String& oldCertPath, const String& newCertPath) +{ + struct stat st1, st2; + + if (!oldCertPath.IsEmpty() && stat(oldCertPath.CStr(), &st1) >= 0 && (stat(newCertPath.CStr(), &st2) < 0 || st1.st_mtime > st2.st_mtime)) { + Log(LogWarning, "ApiListener") + << "Copying '" << oldCertPath << "' certificate file to '" << newCertPath << "'"; + + Utility::MkDirP(Utility::DirName(newCertPath), 0700); + Utility::CopyFile(oldCertPath, newCertPath); + } +} + +void ApiListener::OnConfigLoaded() +{ + if (m_Instance) + BOOST_THROW_EXCEPTION(ScriptError("Only one ApiListener object is allowed.", GetDebugInfo())); + + m_Instance = this; + + String defaultCertPath = GetDefaultCertPath(); + String defaultKeyPath = GetDefaultKeyPath(); + String defaultCaPath = GetDefaultCaPath(); + + /* Migrate certificate location < 2.8 to the new default path. */ + String oldCertPath = GetCertPath(); + String oldKeyPath = GetKeyPath(); + String oldCaPath = GetCaPath(); + + CopyCertificateFile(oldCertPath, defaultCertPath); + CopyCertificateFile(oldKeyPath, defaultKeyPath); + CopyCertificateFile(oldCaPath, defaultCaPath); + + if (!oldCertPath.IsEmpty() && !oldKeyPath.IsEmpty() && !oldCaPath.IsEmpty()) { + Log(LogWarning, "ApiListener", "Please read the upgrading documentation for v2.8: https://icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/"); + } + + /* Create the internal API object storage. */ + ConfigObjectUtility::CreateStorage(); + + /* Cache API packages and their active stage name. */ + UpdateActivePackageStagesCache(); + + /* set up SSL context */ + std::shared_ptr<X509> cert; + try { + cert = GetX509Certificate(defaultCertPath); + } catch (const std::exception&) { + BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '" + + defaultCertPath + "'.", GetDebugInfo())); + } + + try { + SetIdentity(GetCertificateCN(cert)); + } catch (const std::exception&) { + BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '" + + defaultCertPath + "'.", GetDebugInfo())); + } + + Log(LogInformation, "ApiListener") + << "My API identity: " << GetIdentity(); + + UpdateSSLContext(); +} + +std::shared_ptr<X509> ApiListener::RenewCert(const std::shared_ptr<X509>& cert) +{ + std::shared_ptr<EVP_PKEY> pubkey (X509_get_pubkey(cert.get()), EVP_PKEY_free); + auto subject (X509_get_subject_name(cert.get())); + auto cacert (GetX509Certificate(GetDefaultCaPath())); + auto newcert (CreateCertIcingaCA(pubkey.get(), subject)); + + /* verify that the new cert matches the CA we're using for the ApiListener; + * this ensures that the CA we have in /var/lib/icinga2/ca matches the one + * we're using for cluster connections (there's no point in sending a client + * a certificate it wouldn't be able to use to connect to us anyway) */ + try { + if (!VerifyCertificate(cacert, newcert, GetCrlPath())) { + Log(LogWarning, "ApiListener") + << "The CA in '" << GetDefaultCaPath() << "' does not match the CA which Icinga uses " + << "for its own cluster connections. This is most likely a configuration problem."; + + return nullptr; + } + } catch (const std::exception&) { } /* Swallow the exception on purpose, cacert will never be a non-CA certificate. */ + + return newcert; +} + +void ApiListener::UpdateSSLContext() +{ + auto ctx (SetupSslContext(GetDefaultCertPath(), GetDefaultKeyPath(), GetDefaultCaPath(), GetCrlPath(), GetCipherList(), GetTlsProtocolmin(), GetDebugInfo())); + + { + boost::unique_lock<decltype(m_SSLContextMutex)> lock (m_SSLContextMutex); + + m_SSLContext = std::move(ctx); + } + + for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) { + for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { + client->Disconnect(); + } + } + + for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) { + client->Disconnect(); + } +} + +void ApiListener::OnAllConfigLoaded() +{ + m_LocalEndpoint = Endpoint::GetByName(GetIdentity()); + + if (!m_LocalEndpoint) + BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo())); +} + +/** + * Starts the component. + */ +void ApiListener::Start(bool runtimeCreated) +{ + Log(LogInformation, "ApiListener") + << "'" << GetName() << "' started."; + + SyncLocalZoneDirs(); + + m_RenewOwnCertTimer = new Timer(); + + if (Utility::PathExists(GetIcingaCADir() + "/ca.key")) { + RenewOwnCert(); + m_RenewOwnCertTimer->OnTimerExpired.connect([this](const Timer * const&) { RenewOwnCert(); }); + } else { + m_RenewOwnCertTimer->OnTimerExpired.connect([this](const Timer * const&) { + JsonRpcConnection::SendCertificateRequest(nullptr, nullptr, String()); + }); + } + + m_RenewOwnCertTimer->SetInterval(RENEW_INTERVAL); + m_RenewOwnCertTimer->Start(); + + ObjectImpl<ApiListener>::Start(runtimeCreated); + + { + std::unique_lock<std::mutex> lock(m_LogLock); + OpenLogFile(); + } + + /* create the primary JSON-RPC listener */ + if (!AddListener(GetBindHost(), GetBindPort())) { + Log(LogCritical, "ApiListener") + << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'."; + Application::Exit(EXIT_FAILURE); + } + + m_Timer = new Timer(); + m_Timer->OnTimerExpired.connect([this](const Timer * const&) { ApiTimerHandler(); }); + m_Timer->SetInterval(5); + m_Timer->Start(); + m_Timer->Reschedule(0); + + m_ReconnectTimer = new Timer(); + m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ApiReconnectTimerHandler(); }); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); + + /* Keep this in relative sync with the cold startup in UpdateObjectAuthority() and the reconnect interval above. + * Previous: 60s reconnect, 30s OA, 60s cold startup. + * Now: 10s reconnect, 10s OA, 30s cold startup. + */ + m_AuthorityTimer = new Timer(); + m_AuthorityTimer->OnTimerExpired.connect([](const Timer * const&) { UpdateObjectAuthority(); }); + m_AuthorityTimer->SetInterval(10); + m_AuthorityTimer->Start(); + + m_CleanupCertificateRequestsTimer = new Timer(); + m_CleanupCertificateRequestsTimer->OnTimerExpired.connect([this](const Timer * const&) { CleanupCertificateRequestsTimerHandler(); }); + m_CleanupCertificateRequestsTimer->SetInterval(3600); + m_CleanupCertificateRequestsTimer->Start(); + m_CleanupCertificateRequestsTimer->Reschedule(0); + + m_ApiPackageIntegrityTimer = new Timer(); + m_ApiPackageIntegrityTimer->OnTimerExpired.connect([this](const Timer * const&) { CheckApiPackageIntegrity(); }); + m_ApiPackageIntegrityTimer->SetInterval(300); + m_ApiPackageIntegrityTimer->Start(); + + OnMasterChanged(true); +} + +void ApiListener::RenewOwnCert() +{ + auto certPath (GetDefaultCertPath()); + auto cert (GetX509Certificate(certPath)); + + if (IsCertUptodate(cert)) { + return; + } + + Log(LogInformation, "ApiListener") + << "Our certificate will expire soon, but we own the CA. Renewing."; + + cert = RenewCert(cert); + + if (!cert) { + return; + } + + std::fstream certfp; + auto tempCertPath (Utility::CreateTempFile(certPath + ".XXXXXX", 0644, certfp)); + + certfp.exceptions(std::ofstream::failbit | std::ofstream::badbit); + certfp << CertificateToString(cert); + certfp.close(); + + Utility::RenameFile(tempCertPath, certPath); + UpdateSSLContext(); +} + +void ApiListener::Stop(bool runtimeDeleted) +{ + ObjectImpl<ApiListener>::Stop(runtimeDeleted); + + Log(LogInformation, "ApiListener") + << "'" << GetName() << "' stopped."; + + { + std::unique_lock<std::mutex> lock(m_LogLock); + CloseLogFile(); + RotateLogFile(); + } + + RemoveStatusFile(); +} + +ApiListener::Ptr ApiListener::GetInstance() +{ + return m_Instance; +} + +Endpoint::Ptr ApiListener::GetMaster() const +{ + Zone::Ptr zone = Zone::GetLocalZone(); + + if (!zone) + return nullptr; + + std::vector<String> names; + + for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) + if (endpoint->GetConnected() || endpoint->GetName() == GetIdentity()) + names.push_back(endpoint->GetName()); + + std::sort(names.begin(), names.end()); + + return Endpoint::GetByName(*names.begin()); +} + +bool ApiListener::IsMaster() const +{ + Endpoint::Ptr master = GetMaster(); + + if (!master) + return false; + + return master == GetLocalEndpoint(); +} + +/** + * Creates a new JSON-RPC listener on the specified port. + * + * @param node The host the listener should be bound to. + * @param service The port to listen on. + */ +bool ApiListener::AddListener(const String& node, const String& service) +{ + namespace asio = boost::asio; + namespace ip = asio::ip; + using ip::tcp; + + ObjectLock olock(this); + + if (!m_SSLContext) { + Log(LogCritical, "ApiListener", "SSL context is required for AddListener()"); + return false; + } + + auto& io (IoEngine::Get().GetIoContext()); + auto acceptor (Shared<tcp::acceptor>::Make(io)); + + try { + tcp::resolver resolver (io); + tcp::resolver::query query (node, service, tcp::resolver::query::passive); + + auto result (resolver.resolve(query)); + auto current (result.begin()); + + for (;;) { + try { + acceptor->open(current->endpoint().protocol()); + + { + auto fd (acceptor->native_handle()); + + const int optFalse = 0; + setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&optFalse), sizeof(optFalse)); + + const int optTrue = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char *>(&optTrue), sizeof(optTrue)); +#ifdef SO_REUSEPORT + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, reinterpret_cast<const char *>(&optTrue), sizeof(optTrue)); +#endif /* SO_REUSEPORT */ + } + + acceptor->bind(current->endpoint()); + + break; + } catch (const std::exception&) { + if (++current == result.end()) { + throw; + } + + if (acceptor->is_open()) { + acceptor->close(); + } + } + } + } catch (const std::exception& ex) { + Log(LogCritical, "ApiListener") + << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "': " << ex.what(); + return false; + } + + acceptor->listen(INT_MAX); + + auto localEndpoint (acceptor->local_endpoint()); + + Log(LogInformation, "ApiListener") + << "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'"; + + IoEngine::SpawnCoroutine(io, [this, acceptor](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor); }); + + UpdateStatusFile(localEndpoint); + + return true; +} + +void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Shared<boost::asio::ip::tcp::acceptor>::Ptr& server) +{ + namespace asio = boost::asio; + + auto& io (IoEngine::Get().GetIoContext()); + + time_t lastModified = -1; + const String crlPath = GetCrlPath(); + + if (!crlPath.IsEmpty()) { + lastModified = Utility::GetFileCreationTime(crlPath); + } + + for (;;) { + try { + asio::ip::tcp::socket socket (io); + + server->async_accept(socket.lowest_layer(), yc); + + if (!crlPath.IsEmpty()) { + time_t currentCreationTime = Utility::GetFileCreationTime(crlPath); + + if (lastModified != currentCreationTime) { + UpdateSSLContext(); + + lastModified = currentCreationTime; + } + } + + boost::shared_lock<decltype(m_SSLContextMutex)> lock (m_SSLContextMutex); + auto sslConn (Shared<AsioTlsStream>::Make(io, *m_SSLContext)); + + lock.unlock(); + sslConn->lowest_layer() = std::move(socket); + + auto strand (Shared<asio::io_context::strand>::Make(io)); + + IoEngine::SpawnCoroutine(*strand, [this, strand, sslConn](asio::yield_context yc) { + Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), + [sslConn](asio::yield_context yc) { + Log(LogWarning, "ApiListener") + << "Timeout while processing incoming connection from " + << sslConn->lowest_layer().remote_endpoint(); + + boost::system::error_code ec; + sslConn->lowest_layer().cancel(ec); + } + )); + Defer cancelTimeout([timeout]() { timeout->Cancel(); }); + + NewClientHandler(yc, strand, sslConn, String(), RoleServer); + }); + } catch (const std::exception& ex) { + Log(LogCritical, "ApiListener") + << "Cannot accept new connection: " << ex.what(); + } + } +} + +/** + * Creates a new JSON-RPC client and connects to the specified endpoint. + * + * @param endpoint The endpoint. + */ +void ApiListener::AddConnection(const Endpoint::Ptr& endpoint) +{ + namespace asio = boost::asio; + using asio::ip::tcp; + + if (!m_SSLContext) { + Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()"); + return; + } + + auto& io (IoEngine::Get().GetIoContext()); + auto strand (Shared<asio::io_context::strand>::Make(io)); + + IoEngine::SpawnCoroutine(*strand, [this, strand, endpoint, &io](asio::yield_context yc) { + String host = endpoint->GetHost(); + String port = endpoint->GetPort(); + + Log(LogInformation, "ApiListener") + << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'"; + + try { + boost::shared_lock<decltype(m_SSLContextMutex)> lock (m_SSLContextMutex); + auto sslConn (Shared<AsioTlsStream>::Make(io, *m_SSLContext, endpoint->GetName())); + + lock.unlock(); + + Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), + [sslConn, endpoint, host, port](asio::yield_context yc) { + Log(LogCritical, "ApiListener") + << "Timeout while reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host + << "' and port '" << port << "', cancelling attempt"; + + boost::system::error_code ec; + sslConn->lowest_layer().cancel(ec); + } + )); + Defer cancelTimeout([&timeout]() { timeout->Cancel(); }); + + Connect(sslConn->lowest_layer(), host, port, yc); + + NewClientHandler(yc, strand, sslConn, endpoint->GetName(), RoleClient); + + endpoint->SetConnecting(false); + Log(LogInformation, "ApiListener") + << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'"; + } catch (const std::exception& ex) { + endpoint->SetConnecting(false); + + Log(LogCritical, "ApiListener") + << "Cannot connect to host '" << host << "' on port '" << port << "': " << ex.what(); + } + }); +} + +void ApiListener::NewClientHandler( + boost::asio::yield_context yc, const Shared<boost::asio::io_context::strand>::Ptr& strand, + const Shared<AsioTlsStream>::Ptr& client, const String& hostname, ConnectionRole role +) +{ + try { + NewClientHandlerInternal(yc, strand, client, hostname, role); + } catch (const std::exception& ex) { + Log(LogCritical, "ApiListener") + << "Exception while handling new API client connection: " << DiagnosticInformation(ex, false); + + Log(LogDebug, "ApiListener") + << "Exception while handling new API client connection: " << DiagnosticInformation(ex); + } +} + +static const auto l_AppVersionInt (([]() -> unsigned long { + auto appVersion (Application::GetAppVersion()); + boost::regex rgx (R"EOF(^[rv]?(\d+)\.(\d+)\.(\d+))EOF"); + boost::smatch match; + + if (!boost::regex_search(appVersion.GetData(), match, rgx)) { + return 0; + } + + return 100u * 100u * boost::lexical_cast<unsigned long>(match[1].str()) + + 100u * boost::lexical_cast<unsigned long>(match[2].str()) + + boost::lexical_cast<unsigned long>(match[3].str()); +})()); + +static const auto l_MyCapabilities (ApiCapabilities::ExecuteArbitraryCommand); + +/** + * Processes a new client connection. + * + * @param client The new client. + */ +void ApiListener::NewClientHandlerInternal( + boost::asio::yield_context yc, const Shared<boost::asio::io_context::strand>::Ptr& strand, + const Shared<AsioTlsStream>::Ptr& client, const String& hostname, ConnectionRole role +) +{ + namespace asio = boost::asio; + namespace ssl = asio::ssl; + + String conninfo; + + { + std::ostringstream conninfo_; + + if (role == RoleClient) { + conninfo_ << "to"; + } else { + conninfo_ << "from"; + } + + auto endpoint (client->lowest_layer().remote_endpoint()); + + conninfo_ << " [" << endpoint.address() << "]:" << endpoint.port(); + + conninfo = conninfo_.str(); + } + + auto& sslConn (client->next_layer()); + + boost::system::error_code ec; + + { + Timeout::Ptr handshakeTimeout (new Timeout( + strand->context(), + *strand, + boost::posix_time::microseconds(intmax_t(Configuration::TlsHandshakeTimeout * 1000000)), + [strand, client](asio::yield_context yc) { + boost::system::error_code ec; + client->lowest_layer().cancel(ec); + } + )); + + sslConn.async_handshake(role == RoleClient ? sslConn.client : sslConn.server, yc[ec]); + + handshakeTimeout->Cancel(); + } + + if (ec) { + // https://github.com/boostorg/beast/issues/915 + // Google Chrome 73+ seems not close the connection properly, https://stackoverflow.com/questions/56272906/how-to-fix-certificate-unknown-error-from-chrome-v73 + if (ec == asio::ssl::error::stream_truncated) { + Log(LogNotice, "ApiListener") + << "TLS stream was truncated, ignoring connection from " << conninfo; + return; + } + + Log(LogCritical, "ApiListener") + << "Client TLS handshake failed (" << conninfo << "): " << ec.message(); + return; + } + + bool willBeShutDown = false; + + Defer shutDownIfNeeded ([&sslConn, &willBeShutDown, &yc]() { + if (!willBeShutDown) { + // Ignore the error, but do not throw an exception being swallowed at all cost. + // https://github.com/Icinga/icinga2/issues/7351 + boost::system::error_code ec; + sslConn.async_shutdown(yc[ec]); + } + }); + + std::shared_ptr<X509> cert (sslConn.GetPeerCertificate()); + bool verify_ok = false; + String identity; + Endpoint::Ptr endpoint; + + if (cert) { + verify_ok = sslConn.IsVerifyOK(); + + String verifyError = sslConn.GetVerifyError(); + + try { + identity = GetCertificateCN(cert); + } catch (const std::exception&) { + Log(LogCritical, "ApiListener") + << "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'."; + return; + } + + if (!hostname.IsEmpty()) { + if (identity != hostname) { + Log(LogWarning, "ApiListener") + << "Unexpected certificate common name while connecting to endpoint '" + << hostname << "': got '" << identity << "'"; + return; + } else if (!verify_ok) { + Log(LogWarning, "ApiListener") + << "Certificate validation failed for endpoint '" << hostname + << "': " << verifyError; + } + } + + if (verify_ok) { + endpoint = Endpoint::GetByName(identity); + } + + Log log(LogInformation, "ApiListener"); + + log << "New client connection for identity '" << identity << "' " << conninfo; + + if (!verify_ok) { + log << " (certificate validation failed: " << verifyError << ")"; + } else if (!endpoint) { + log << " (no Endpoint object found for identity)"; + } + } else { + Log(LogInformation, "ApiListener") + << "New client connection " << conninfo << " (no client certificate)"; + } + + ClientType ctype; + + if (role == RoleClient) { + JsonRpc::SendMessage(client, new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "icinga::Hello" }, + { "params", new Dictionary({ + { "version", (double)l_AppVersionInt }, + { "capabilities", (double)l_MyCapabilities } + }) } + }), yc); + + client->async_flush(yc); + + ctype = ClientJsonRpc; + } else { + { + boost::system::error_code ec; + + if (client->async_fill(yc[ec]) == 0u) { + if (identity.IsEmpty()) { + Log(LogInformation, "ApiListener") + << "No data received on new API connection " << conninfo << ". " + << "Ensure that the remote endpoints are properly configured in a cluster setup."; + } else { + Log(LogWarning, "ApiListener") + << "No data received on new API connection " << conninfo << " for identity '" << identity << "'. " + << "Ensure that the remote endpoints are properly configured in a cluster setup."; + } + + return; + } + } + + char firstByte = 0; + + { + asio::mutable_buffer firstByteBuf (&firstByte, 1); + client->peek(firstByteBuf); + } + + if (firstByte >= '0' && firstByte <= '9') { + JsonRpc::SendMessage(client, new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "icinga::Hello" }, + { "params", new Dictionary({ + { "version", (double)l_AppVersionInt }, + { "capabilities", (double)l_MyCapabilities } + }) } + }), yc); + + client->async_flush(yc); + + ctype = ClientJsonRpc; + } else { + ctype = ClientHttp; + } + } + + if (ctype == ClientJsonRpc) { + Log(LogNotice, "ApiListener", "New JSON-RPC client"); + + if (endpoint && endpoint->GetConnected()) { + Log(LogNotice, "ApiListener") + << "Ignoring JSON-RPC connection " << conninfo + << ". We're already connected to Endpoint '" << endpoint->GetName() << "'."; + return; + } + + JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, client, role); + + if (endpoint) { + endpoint->AddClient(aclient); + + Utility::QueueAsyncCallback([this, aclient, endpoint]() { + SyncClient(aclient, endpoint, true); + }); + } else if (!AddAnonymousClient(aclient)) { + Log(LogNotice, "ApiListener") + << "Ignoring anonymous JSON-RPC connection " << conninfo + << ". Max connections (" << GetMaxAnonymousClients() << ") exceeded."; + + aclient = nullptr; + } + + if (aclient) { + aclient->Start(); + + willBeShutDown = true; + } + } else { + Log(LogNotice, "ApiListener", "New HTTP client"); + + HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, client); + AddHttpClient(aclient); + aclient->Start(); + + willBeShutDown = true; + } +} + +void ApiListener::SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint, bool needSync) +{ + Zone::Ptr eZone = endpoint->GetZone(); + + try { + { + ObjectLock olock(endpoint); + + endpoint->SetSyncing(true); + } + + Zone::Ptr myZone = Zone::GetLocalZone(); + auto parent (myZone->GetParent()); + + if (parent == eZone || !parent && eZone == myZone) { + JsonRpcConnection::SendCertificateRequest(aclient, nullptr, String()); + + if (Utility::PathExists(ApiListener::GetCertificateRequestsDir())) { + Utility::Glob(ApiListener::GetCertificateRequestsDir() + "/*.json", [aclient](const String& newPath) { + JsonRpcConnection::SendCertificateRequest(aclient, nullptr, newPath); + }, GlobFile); + } + } + + /* Make sure that the config updates are synced + * before the logs are replayed. + */ + + Log(LogInformation, "ApiListener") + << "Sending config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'."; + + /* sync zone file config */ + SendConfigUpdate(aclient); + + Log(LogInformation, "ApiListener") + << "Finished sending config file updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'."; + + /* sync runtime config */ + SendRuntimeConfigObjects(aclient); + + Log(LogInformation, "ApiListener") + << "Finished sending runtime config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'."; + + if (!needSync) { + ObjectLock olock2(endpoint); + endpoint->SetSyncing(false); + return; + } + + Log(LogInformation, "ApiListener") + << "Sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'."; + + ReplayLog(aclient); + + if (eZone == Zone::GetLocalZone()) + UpdateObjectAuthority(); + + Log(LogInformation, "ApiListener") + << "Finished sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'."; + } catch (const std::exception& ex) { + { + ObjectLock olock2(endpoint); + endpoint->SetSyncing(false); + } + + Log(LogCritical, "ApiListener") + << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false); + + Log(LogDebug, "ApiListener") + << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex); + } + + Log(LogInformation, "ApiListener") + << "Finished syncing endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'."; +} + +void ApiListener::ApiTimerHandler() +{ + double now = Utility::GetTime(); + + std::vector<int> files; + Utility::Glob(GetApiDir() + "log/*", [&files](const String& file) { LogGlobHandler(files, file); }, GlobFile); + std::sort(files.begin(), files.end()); + + for (int ts : files) { + bool need = false; + auto localZone (GetLocalEndpoint()->GetZone()); + + for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) { + if (endpoint == GetLocalEndpoint()) + continue; + + auto zone (endpoint->GetZone()); + + /* only care for endpoints in a) the same zone b) our parent zone c) immediate child zones */ + if (!(zone == localZone || zone == localZone->GetParent() || zone->GetParent() == localZone)) { + continue; + } + + if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration()) + continue; + + if (ts > endpoint->GetLocalLogPosition()) { + need = true; + break; + } + } + + if (!need) { + String path = GetApiDir() + "log/" + Convert::ToString(ts); + Log(LogNotice, "ApiListener") + << "Removing old log file: " << path; + (void)unlink(path.CStr()); + } + } + + for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) { + if (!endpoint->GetConnected()) + continue; + + double ts = endpoint->GetRemoteLogPosition(); + + if (ts == 0) + continue; + + Dictionary::Ptr lmessage = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "log::SetLogPosition" }, + { "params", new Dictionary({ + { "log_position", ts } + }) } + }); + + double maxTs = 0; + + for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { + if (client->GetTimestamp() > maxTs) + maxTs = client->GetTimestamp(); + } + + for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { + if (client->GetTimestamp() == maxTs) { + client->SendMessage(lmessage); + } else { + client->Disconnect(); + } + } + + Log(LogNotice, "ApiListener") + << "Setting log position for identity '" << endpoint->GetName() << "': " + << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts); + } +} + +void ApiListener::ApiReconnectTimerHandler() +{ + Zone::Ptr my_zone = Zone::GetLocalZone(); + + for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) { + /* don't connect to global zones */ + if (zone->GetGlobal()) + continue; + + /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */ + if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) { + Log(LogDebug, "ApiListener") + << "Not connecting to Zone '" << zone->GetName() + << "' because it's not in the same zone, a parent or a child zone."; + continue; + } + + for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) { + /* don't connect to ourselves */ + if (endpoint == GetLocalEndpoint()) { + Log(LogDebug, "ApiListener") + << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us."; + continue; + } + + /* don't try to connect to endpoints which don't have a host and port */ + if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) { + Log(LogDebug, "ApiListener") + << "Not connecting to Endpoint '" << endpoint->GetName() + << "' because the host/port attributes are missing."; + continue; + } + + /* don't try to connect if there's already a connection attempt */ + if (endpoint->GetConnecting()) { + Log(LogDebug, "ApiListener") + << "Not connecting to Endpoint '" << endpoint->GetName() + << "' because we're already trying to connect to it."; + continue; + } + + /* don't try to connect if we're already connected */ + if (endpoint->GetConnected()) { + Log(LogDebug, "ApiListener") + << "Not connecting to Endpoint '" << endpoint->GetName() + << "' because we're already connected to it."; + continue; + } + + /* Set connecting state to prevent duplicated queue inserts later. */ + endpoint->SetConnecting(true); + + AddConnection(endpoint); + } + } + + Endpoint::Ptr master = GetMaster(); + + if (master) + Log(LogNotice, "ApiListener") + << "Current zone master: " << master->GetName(); + + std::vector<String> names; + for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) + if (endpoint->GetConnected()) + names.emplace_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")"); + + Log(LogNotice, "ApiListener") + << "Connected endpoints: " << Utility::NaturalJoin(names); +} + +static void CleanupCertificateRequest(const String& path, double expiryTime) +{ +#ifndef _WIN32 + struct stat statbuf; + if (lstat(path.CStr(), &statbuf) < 0) + return; +#else /* _WIN32 */ + struct _stat statbuf; + if (_stat(path.CStr(), &statbuf) < 0) + return; +#endif /* _WIN32 */ + + if (statbuf.st_mtime < expiryTime) + (void) unlink(path.CStr()); +} + +void ApiListener::CleanupCertificateRequestsTimerHandler() +{ + String requestsDir = GetCertificateRequestsDir(); + + if (Utility::PathExists(requestsDir)) { + /* remove certificate requests that are older than a week */ + double expiryTime = Utility::GetTime() - 7 * 24 * 60 * 60; + Utility::Glob(requestsDir + "/*.json", [expiryTime](const String& path) { + CleanupCertificateRequest(path, expiryTime); + }, GlobFile); + } +} + +void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin, + const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log) +{ + if (!IsActive()) + return; + + m_RelayQueue.Enqueue([this, origin, secobj, message, log]() { SyncRelayMessage(origin, secobj, message, log); }, PriorityNormal, true); +} + +void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj) +{ + double ts = message->Get("ts"); + + ASSERT(ts != 0); + + Dictionary::Ptr pmessage = new Dictionary(); + pmessage->Set("timestamp", ts); + + pmessage->Set("message", JsonEncode(message)); + + if (secobj) { + Dictionary::Ptr secname = new Dictionary(); + secname->Set("type", secobj->GetReflectionType()->GetName()); + secname->Set("name", secobj->GetName()); + pmessage->Set("secobj", secname); + } + + std::unique_lock<std::mutex> lock(m_LogLock); + if (m_LogFile) { + NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage)); + m_LogMessageCount++; + SetLogMessageTimestamp(ts); + + if (m_LogMessageCount > 50000) { + CloseLogFile(); + RotateLogFile(); + OpenLogFile(); + } + } +} + +void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message) +{ + ObjectLock olock(endpoint); + + if (!endpoint->GetSyncing()) { + Log(LogNotice, "ApiListener") + << "Sending message '" << message->Get("method") << "' to '" << endpoint->GetName() << "'"; + + double maxTs = 0; + + for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { + if (client->GetTimestamp() > maxTs) + maxTs = client->GetTimestamp(); + } + + for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { + if (client->GetTimestamp() != maxTs) + continue; + + client->SendMessage(message); + } + } +} + +/** + * Relay a message to a directly connected zone or to a global zone. + * If some other zone is passed as the target zone, it is not relayed. + * + * @param targetZone The zone to relay to + * @param origin Information about where this message is relayed from (if it was not generated locally) + * @param message The message to relay + * @param currentZoneMaster The current master node of the local zone + * @return true if the message has been relayed to all relevant endpoints, + * false if it hasn't and must be persisted in the replay log + */ +bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message, const Endpoint::Ptr& currentZoneMaster) +{ + ASSERT(targetZone); + + Zone::Ptr localZone = Zone::GetLocalZone(); + + /* only relay the message to a) the same local zone, b) the parent zone and c) direct child zones. Exception is a global zone. */ + if (!targetZone->GetGlobal() && + targetZone != localZone && + targetZone != localZone->GetParent() && + targetZone->GetParent() != localZone) { + return true; + } + + Endpoint::Ptr localEndpoint = GetLocalEndpoint(); + + std::vector<Endpoint::Ptr> skippedEndpoints; + + std::set<Zone::Ptr> allTargetZones; + if (targetZone->GetGlobal()) { + /* if the zone is global, the message has to be relayed to our local zone and direct children */ + allTargetZones.insert(localZone); + for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) { + if (zone->GetParent() == localZone) { + allTargetZones.insert(zone); + } + } + } else { + /* whereas if it's not global, the message is just relayed to the zone itself */ + allTargetZones.insert(targetZone); + } + + bool needsReplay = false; + + for (const Zone::Ptr& currentTargetZone : allTargetZones) { + bool relayed = false, log_needed = false, log_done = false; + + for (const Endpoint::Ptr& targetEndpoint : currentTargetZone->GetEndpoints()) { + /* Don't relay messages to ourselves. */ + if (targetEndpoint == localEndpoint) + continue; + + log_needed = true; + + /* Don't relay messages to disconnected endpoints. */ + if (!targetEndpoint->GetConnected()) { + if (currentTargetZone == localZone) + log_done = false; + + continue; + } + + log_done = true; + + /* Don't relay the message to the zone through more than one endpoint unless this is our own zone. + * 'relayed' is set to true on success below, enabling the checks in the second iteration. + */ + if (relayed && currentTargetZone != localZone) { + skippedEndpoints.push_back(targetEndpoint); + continue; + } + + /* Don't relay messages back to the endpoint which we got the message from. */ + if (origin && origin->FromClient && targetEndpoint == origin->FromClient->GetEndpoint()) { + skippedEndpoints.push_back(targetEndpoint); + continue; + } + + /* Don't relay messages back to the zone which we got the message from. */ + if (origin && origin->FromZone && currentTargetZone == origin->FromZone) { + skippedEndpoints.push_back(targetEndpoint); + continue; + } + + /* Only relay message to the zone master if we're not currently the zone master. + * e1 is zone master, e2 and e3 are zone members. + * + * Message is sent from e2 or e3: + * !isMaster == true + * targetEndpoint e1 is zone master -> send the message + * targetEndpoint e3 is not zone master -> skip it, avoid routing loops + * + * Message is sent from e1: + * !isMaster == false -> send the messages to e2 and e3 being the zone routing master. + */ + bool isMaster = (currentZoneMaster == localEndpoint); + + if (!isMaster && targetEndpoint != currentZoneMaster) { + skippedEndpoints.push_back(targetEndpoint); + continue; + } + + relayed = true; + + SyncSendMessage(targetEndpoint, message); + } + + if (log_needed && !log_done) { + needsReplay = true; + } + } + + if (!skippedEndpoints.empty()) { + double ts = message->Get("ts"); + + for (const Endpoint::Ptr& skippedEndpoint : skippedEndpoints) + skippedEndpoint->SetLocalLogPosition(ts); + } + + return !needsReplay; +} + +void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin, + const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log) +{ + double ts = Utility::GetTime(); + message->Set("ts", ts); + + Log(LogNotice, "ApiListener") + << "Relaying '" << message->Get("method") << "' message"; + + if (origin && origin->FromZone) + message->Set("originZone", origin->FromZone->GetName()); + + Zone::Ptr target_zone; + + if (secobj) { + if (secobj->GetReflectionType() == Zone::TypeInstance) + target_zone = static_pointer_cast<Zone>(secobj); + else + target_zone = static_pointer_cast<Zone>(secobj->GetZone()); + } + + if (!target_zone) + target_zone = Zone::GetLocalZone(); + + Endpoint::Ptr master = GetMaster(); + + bool need_log = !RelayMessageOne(target_zone, origin, message, master); + + for (const Zone::Ptr& zone : target_zone->GetAllParentsRaw()) { + if (!RelayMessageOne(zone, origin, message, master)) + need_log = true; + } + + if (log && need_log) + PersistMessage(message, secobj); +} + +/* must hold m_LogLock */ +void ApiListener::OpenLogFile() +{ + String path = GetApiDir() + "log/current"; + + Utility::MkDirP(Utility::DirName(path), 0750); + + auto *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app); + + if (!fp->good()) { + Log(LogWarning, "ApiListener") + << "Could not open spool file: " << path; + return; + } + + m_LogFile = new StdioStream(fp, true); + m_LogMessageCount = 0; + SetLogMessageTimestamp(Utility::GetTime()); +} + +/* must hold m_LogLock */ +void ApiListener::CloseLogFile() +{ + if (!m_LogFile) + return; + + m_LogFile->Close(); + m_LogFile.reset(); +} + +/* must hold m_LogLock */ +void ApiListener::RotateLogFile() +{ + double ts = GetLogMessageTimestamp(); + + if (ts == 0) + ts = Utility::GetTime(); + + String oldpath = GetApiDir() + "log/current"; + String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1); + + // If the log is being rotated more than once per second, + // don't overwrite the previous one, but silently deny rotation. + if (!Utility::PathExists(newpath)) { + try { + Utility::RenameFile(oldpath, newpath); + } catch (const std::exception& ex) { + Log(LogCritical, "ApiListener") + << "Cannot rotate replay log file from '" << oldpath << "' to '" + << newpath << "': " << ex.what(); + } + } +} + +void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file) +{ + String name = Utility::BaseName(file); + + if (name == "current") + return; + + int ts; + + try { + ts = Convert::ToLong(name); + } catch (const std::exception&) { + return; + } + + files.push_back(ts); +} + +void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) +{ + Endpoint::Ptr endpoint = client->GetEndpoint(); + + if (endpoint->GetLogDuration() == 0) { + ObjectLock olock2(endpoint); + endpoint->SetSyncing(false); + return; + } + + CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'"); + + int count = -1; + double peer_ts = endpoint->GetLocalLogPosition(); + double logpos_ts = peer_ts; + bool last_sync = false; + + Endpoint::Ptr target_endpoint = client->GetEndpoint(); + ASSERT(target_endpoint); + + Zone::Ptr target_zone = target_endpoint->GetZone(); + + if (!target_zone) { + ObjectLock olock2(endpoint); + endpoint->SetSyncing(false); + return; + } + + for (;;) { + std::unique_lock<std::mutex> lock(m_LogLock); + + CloseLogFile(); + + if (count == -1 || count > 50000) { + OpenLogFile(); + lock.unlock(); + } else { + last_sync = true; + } + + count = 0; + + std::vector<int> files; + Utility::Glob(GetApiDir() + "log/*", [&files](const String& file) { LogGlobHandler(files, file); }, GlobFile); + std::sort(files.begin(), files.end()); + + std::vector<std::pair<int, String>> allFiles; + + for (int ts : files) { + if (ts >= peer_ts) { + allFiles.emplace_back(ts, GetApiDir() + "log/" + Convert::ToString(ts)); + } + } + + allFiles.emplace_back(Utility::GetTime() + 1, GetApiDir() + "log/current"); + + for (auto& file : allFiles) { + Log(LogNotice, "ApiListener") + << "Replaying log: " << file.second; + + auto *fp = new std::fstream(file.second.CStr(), std::fstream::in | std::fstream::binary); + StdioStream::Ptr logStream = new StdioStream(fp, true); + + String message; + StreamReadContext src; + while (true) { + Dictionary::Ptr pmessage; + + try { + StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src); + + if (srs == StatusEof) + break; + + if (srs != StatusNewItem) + continue; + + pmessage = JsonDecode(message); + } catch (const std::exception&) { + Log(LogWarning, "ApiListener") + << "Unexpected end-of-file for cluster log: " << file.second; + + /* Log files may be incomplete or corrupted. This is perfectly OK. */ + break; + } + + if (pmessage->Get("timestamp") <= peer_ts) + continue; + + Dictionary::Ptr secname = pmessage->Get("secobj"); + + if (secname) { + ConfigObject::Ptr secobj = ConfigObject::GetObject(secname->Get("type"), secname->Get("name")); + + if (!secobj) + continue; + + if (!target_zone->CanAccessObject(secobj)) + continue; + } + + try { + client->SendRawMessage(pmessage->Get("message")); + count++; + } catch (const std::exception& ex) { + Log(LogWarning, "ApiListener") + << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false); + + Log(LogDebug, "ApiListener") + << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex); + + break; + } + + peer_ts = pmessage->Get("timestamp"); + + if (file.first > logpos_ts + 10) { + logpos_ts = file.first; + + Dictionary::Ptr lmessage = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "log::SetLogPosition" }, + { "params", new Dictionary({ + { "log_position", logpos_ts } + }) } + }); + + client->SendMessage(lmessage); + } + } + + logStream->Close(); + } + + if (count > 0) { + Log(LogInformation, "ApiListener") + << "Replayed " << count << " messages."; + } + else { + Log(LogNotice, "ApiListener") + << "Replayed " << count << " messages."; + } + + if (last_sync) { + { + ObjectLock olock2(endpoint); + endpoint->SetSyncing(false); + } + + OpenLogFile(); + + break; + } + } +} + +void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + std::pair<Dictionary::Ptr, Dictionary::Ptr> stats; + + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + stats = listener->GetStatus(); + + ObjectLock olock(stats.second); + for (const Dictionary::Pair& kv : stats.second) + perfdata->Add(new PerfdataValue("api_" + kv.first, kv.second)); + + status->Set("api", stats.first); +} + +std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus() +{ + Dictionary::Ptr perfdata = new Dictionary(); + + /* cluster stats */ + + double allEndpoints = 0; + Array::Ptr allNotConnectedEndpoints = new Array(); + Array::Ptr allConnectedEndpoints = new Array(); + + Zone::Ptr my_zone = Zone::GetLocalZone(); + + Dictionary::Ptr connectedZones = new Dictionary(); + + for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) { + /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */ + if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) { + Log(LogDebug, "ApiListener") + << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone."; + continue; + } + + bool zoneConnected = false; + int countZoneEndpoints = 0; + double zoneLag = 0; + + ArrayData zoneEndpoints; + + for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) { + zoneEndpoints.emplace_back(endpoint->GetName()); + + if (endpoint->GetName() == GetIdentity()) + continue; + + double eplag = CalculateZoneLag(endpoint); + + if (eplag > 0 && eplag > zoneLag) + zoneLag = eplag; + + allEndpoints++; + countZoneEndpoints++; + + if (!endpoint->GetConnected()) { + allNotConnectedEndpoints->Add(endpoint->GetName()); + } else { + allConnectedEndpoints->Add(endpoint->GetName()); + zoneConnected = true; + } + } + + /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */ + if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0) + zoneConnected = true; + + String parentZoneName; + Zone::Ptr parentZone = zone->GetParent(); + if (parentZone) + parentZoneName = parentZone->GetName(); + + Dictionary::Ptr zoneStats = new Dictionary({ + { "connected", zoneConnected }, + { "client_log_lag", zoneLag }, + { "endpoints", new Array(std::move(zoneEndpoints)) }, + { "parent_zone", parentZoneName } + }); + + connectedZones->Set(zone->GetName(), zoneStats); + } + + /* connection stats */ + size_t jsonRpcAnonymousClients = GetAnonymousClients().size(); + size_t httpClients = GetHttpClients().size(); + size_t syncQueueItems = m_SyncQueue.GetLength(); + size_t relayQueueItems = m_RelayQueue.GetLength(); + double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate(); + double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0; + double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0; + + Dictionary::Ptr status = new Dictionary({ + { "identity", GetIdentity() }, + { "num_endpoints", allEndpoints }, + { "num_conn_endpoints", allConnectedEndpoints->GetLength() }, + { "num_not_conn_endpoints", allNotConnectedEndpoints->GetLength() }, + { "conn_endpoints", allConnectedEndpoints }, + { "not_conn_endpoints", allNotConnectedEndpoints }, + + { "zones", connectedZones }, + + { "json_rpc", new Dictionary({ + { "anonymous_clients", jsonRpcAnonymousClients }, + { "sync_queue_items", syncQueueItems }, + { "relay_queue_items", relayQueueItems }, + { "work_queue_item_rate", workQueueItemRate }, + { "sync_queue_item_rate", syncQueueItemRate }, + { "relay_queue_item_rate", relayQueueItemRate } + }) }, + + { "http", new Dictionary({ + { "clients", httpClients } + }) } + }); + + /* performance data */ + perfdata->Set("num_endpoints", allEndpoints); + perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength())); + perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength())); + + perfdata->Set("num_json_rpc_anonymous_clients", jsonRpcAnonymousClients); + perfdata->Set("num_http_clients", httpClients); + perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems); + perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems); + + perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate); + perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate); + perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate); + + return std::make_pair(status, perfdata); +} + +double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint) +{ + double remoteLogPosition = endpoint->GetRemoteLogPosition(); + double eplag = Utility::GetTime() - remoteLogPosition; + + if ((endpoint->GetSyncing() || !endpoint->GetConnected()) && remoteLogPosition != 0) + return eplag; + + return 0; +} + +bool ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient) +{ + std::unique_lock<std::mutex> lock(m_AnonymousClientsLock); + + if (GetMaxAnonymousClients() >= 0 && (long)m_AnonymousClients.size() + 1 > (long)GetMaxAnonymousClients()) + return false; + + m_AnonymousClients.insert(aclient); + return true; +} + +void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient) +{ + std::unique_lock<std::mutex> lock(m_AnonymousClientsLock); + m_AnonymousClients.erase(aclient); +} + +std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients() const +{ + std::unique_lock<std::mutex> lock(m_AnonymousClientsLock); + return m_AnonymousClients; +} + +void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient) +{ + std::unique_lock<std::mutex> lock(m_HttpClientsLock); + m_HttpClients.insert(aclient); +} + +void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient) +{ + std::unique_lock<std::mutex> lock(m_HttpClientsLock); + m_HttpClients.erase(aclient); +} + +std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients() const +{ + std::unique_lock<std::mutex> lock(m_HttpClientsLock); + return m_HttpClients; +} + +static void LogAppVersion(unsigned long version, Log& log) +{ + log << version / 100u << "." << version % 100u << ".x"; +} + +Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + if (origin) { + auto client (origin->FromClient); + + if (client) { + auto endpoint (client->GetEndpoint()); + + if (endpoint) { + unsigned long nodeVersion = params->Get("version"); + + endpoint->SetIcingaVersion(nodeVersion); + endpoint->SetCapabilities((double)params->Get("capabilities")); + + if (nodeVersion == 0u) { + nodeVersion = 21200; + } + + if (endpoint->GetZone()->GetParent() == Zone::GetLocalZone()) { + switch (l_AppVersionInt / 100 - nodeVersion / 100) { + case 0: + case 1: + break; + default: + Log log (LogWarning, "ApiListener"); + log << "Unexpected Icinga version of endpoint '" << endpoint->GetName() << "': "; + + LogAppVersion(nodeVersion / 100u, log); + log << " Expected one of: "; + + LogAppVersion(l_AppVersionInt / 100u, log); + log << ", "; + + LogAppVersion((l_AppVersionInt / 100u - 1u), log); + } + } + } + } + } + + return Empty; +} + +Endpoint::Ptr ApiListener::GetLocalEndpoint() const +{ + return m_LocalEndpoint; +} + +void ApiListener::UpdateActivePackageStagesCache() +{ + std::unique_lock<std::mutex> lock(m_ActivePackageStagesLock); + + for (auto package : ConfigPackageUtility::GetPackages()) { + String activeStage; + + try { + activeStage = ConfigPackageUtility::GetActiveStageFromFile(package); + } catch (const std::exception& ex) { + Log(LogCritical, "ApiListener") + << ex.what(); + continue; + } + + Log(LogNotice, "ApiListener") + << "Updating cache: Config package '" << package << "' has active stage '" << activeStage << "'."; + + m_ActivePackageStages[package] = activeStage; + } +} + +void ApiListener::CheckApiPackageIntegrity() +{ + std::unique_lock<std::mutex> lock(m_ActivePackageStagesLock); + + for (auto package : ConfigPackageUtility::GetPackages()) { + String activeStage; + try { + activeStage = ConfigPackageUtility::GetActiveStageFromFile(package); + } catch (const std::exception& ex) { + /* An error means that the stage is broken, try to repair it. */ + auto it = m_ActivePackageStages.find(package); + + if (it == m_ActivePackageStages.end()) + continue; + + String activeStageCached = it->second; + + Log(LogInformation, "ApiListener") + << "Repairing broken API config package '" << package + << "', setting active stage '" << activeStageCached << "'."; + + ConfigPackageUtility::SetActiveStageToFile(package, activeStageCached); + } + } +} + +void ApiListener::SetActivePackageStage(const String& package, const String& stage) +{ + std::unique_lock<std::mutex> lock(m_ActivePackageStagesLock); + m_ActivePackageStages[package] = stage; +} + +String ApiListener::GetActivePackageStage(const String& package) +{ + std::unique_lock<std::mutex> lock(m_ActivePackageStagesLock); + + if (m_ActivePackageStages.find(package) == m_ActivePackageStages.end()) + BOOST_THROW_EXCEPTION(ScriptError("Package " + package + " has no active stage.")); + + return m_ActivePackageStages[package]; +} + +void ApiListener::RemoveActivePackageStage(const String& package) +{ + /* This is the rare occassion when a package has been deleted. */ + std::unique_lock<std::mutex> lock(m_ActivePackageStagesLock); + + auto it = m_ActivePackageStages.find(package); + + if (it == m_ActivePackageStages.end()) + return; + + m_ActivePackageStages.erase(it); +} + +void ApiListener::ValidateTlsProtocolmin(const Lazy<String>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<ApiListener>::ValidateTlsProtocolmin(lvalue, utils); + + try { + ResolveTlsProtocolVersion(lvalue()); + } catch (const std::exception& ex) { + BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_protocolmin" }, ex.what())); + } +} + +void ApiListener::ValidateTlsHandshakeTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils) +{ + ObjectImpl<ApiListener>::ValidateTlsHandshakeTimeout(lvalue, utils); + + if (lvalue() <= 0) + BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_handshake_timeout" }, "Value must be greater than 0.")); +} + +bool ApiListener::IsHACluster() +{ + Zone::Ptr zone = Zone::GetLocalZone(); + + if (!zone) + return false; + + return zone->IsSingleInstance(); +} + +/* Provide a helper function for zone origin name. */ +String ApiListener::GetFromZoneName(const Zone::Ptr& fromZone) +{ + String fromZoneName; + + if (fromZone) { + fromZoneName = fromZone->GetName(); + } else { + Zone::Ptr lzone = Zone::GetLocalZone(); + + if (lzone) + fromZoneName = lzone->GetName(); + } + + return fromZoneName; +} + +void ApiListener::UpdateStatusFile(boost::asio::ip::tcp::endpoint localEndpoint) +{ + String path = Configuration::CacheDir + "/api-state.json"; + + Utility::SaveJsonFile(path, 0644, new Dictionary({ + {"host", String(localEndpoint.address().to_string())}, + {"port", localEndpoint.port()} + })); +} + +void ApiListener::RemoveStatusFile() +{ + String path = Configuration::CacheDir + "/api-state.json"; + + Utility::Remove(path); +} |