summaryrefslogtreecommitdiffstats
path: root/lib/remote/jsonrpcconnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/remote/jsonrpcconnection.cpp')
-rw-r--r--lib/remote/jsonrpcconnection.cpp388
1 files changed, 388 insertions, 0 deletions
diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp
new file mode 100644
index 0000000..3bae3ca
--- /dev/null
+++ b/lib/remote/jsonrpcconnection.cpp
@@ -0,0 +1,388 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "remote/jsonrpcconnection.hpp"
+#include "remote/apilistener.hpp"
+#include "remote/apifunction.hpp"
+#include "remote/jsonrpc.hpp"
+#include "base/defer.hpp"
+#include "base/configtype.hpp"
+#include "base/io-engine.hpp"
+#include "base/json.hpp"
+#include "base/objectlock.hpp"
+#include "base/utility.hpp"
+#include "base/logger.hpp"
+#include "base/exception.hpp"
+#include "base/convert.hpp"
+#include "base/tlsstream.hpp"
+#include <memory>
+#include <utility>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
+#include <boost/system/system_error.hpp>
+#include <boost/thread/once.hpp>
+
+using namespace icinga;
+
+static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
+REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
+
+static RingBuffer l_TaskStats (15 * 60);
+
+JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
+ const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role)
+ : JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoContext())
+{
+}
+
+JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
+ const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io)
+ : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role),
+ m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(io),
+ m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false),
+ m_CheckLivenessTimer(io), m_HeartbeatTimer(io)
+{
+ if (authenticated)
+ m_Endpoint = Endpoint::GetByName(identity);
+}
+
+void JsonRpcConnection::Start()
+{
+ namespace asio = boost::asio;
+
+ JsonRpcConnection::Ptr keepAlive (this);
+
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
+}
+
+void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
+{
+ m_Stream->next_layer().SetSeen(&m_Seen);
+
+ for (;;) {
+ String message;
+
+ try {
+ message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
+ } catch (const std::exception& ex) {
+ Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection")
+ << "Error while reading JSON-RPC message for identity '" << m_Identity
+ << "': " << DiagnosticInformation(ex);
+
+ break;
+ }
+
+ m_Seen = Utility::GetTime();
+
+ try {
+ CpuBoundWork handleMessage (yc);
+
+ MessageHandler(message);
+ } catch (const std::exception& ex) {
+ Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
+ << "Error while processing JSON-RPC message for identity '" << m_Identity
+ << "': " << DiagnosticInformation(ex);
+
+ break;
+ }
+
+ CpuBoundWork taskStats (yc);
+
+ l_TaskStats.InsertValue(Utility::GetTime(), 1);
+ }
+
+ Disconnect();
+}
+
+void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
+{
+ Defer signalWriterDone ([this]() { m_WriterDone.Set(); });
+
+ do {
+ m_OutgoingMessagesQueued.Wait(yc);
+
+ auto queue (std::move(m_OutgoingMessagesQueue));
+
+ m_OutgoingMessagesQueue.clear();
+ m_OutgoingMessagesQueued.Clear();
+
+ if (!queue.empty()) {
+ try {
+ for (auto& message : queue) {
+ size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc);
+
+ if (m_Endpoint) {
+ m_Endpoint->AddMessageSent(bytesSent);
+ }
+ }
+
+ m_Stream->async_flush(yc);
+ } catch (const std::exception& ex) {
+ Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
+ << "Error while sending JSON-RPC message for identity '"
+ << m_Identity << "'\n" << DiagnosticInformation(ex);
+
+ break;
+ }
+ }
+ } while (!m_ShuttingDown);
+
+ Disconnect();
+}
+
+double JsonRpcConnection::GetTimestamp() const
+{
+ return m_Timestamp;
+}
+
+String JsonRpcConnection::GetIdentity() const
+{
+ return m_Identity;
+}
+
+bool JsonRpcConnection::IsAuthenticated() const
+{
+ return m_Authenticated;
+}
+
+Endpoint::Ptr JsonRpcConnection::GetEndpoint() const
+{
+ return m_Endpoint;
+}
+
+Shared<AsioTlsStream>::Ptr JsonRpcConnection::GetStream() const
+{
+ return m_Stream;
+}
+
+ConnectionRole JsonRpcConnection::GetRole() const
+{
+ return m_Role;
+}
+
+void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
+{
+ Ptr keepAlive (this);
+
+ m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); });
+}
+
+void JsonRpcConnection::SendRawMessage(const String& message)
+{
+ Ptr keepAlive (this);
+
+ m_IoStrand.post([this, keepAlive, message]() {
+ m_OutgoingMessagesQueue.emplace_back(message);
+ m_OutgoingMessagesQueued.Set();
+ });
+}
+
+void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
+{
+ m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
+ m_OutgoingMessagesQueued.Set();
+}
+
+void JsonRpcConnection::Disconnect()
+{
+ namespace asio = boost::asio;
+
+ JsonRpcConnection::Ptr keepAlive (this);
+
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
+ if (!m_ShuttingDown) {
+ m_ShuttingDown = true;
+
+ Log(LogWarning, "JsonRpcConnection")
+ << "API client disconnected for identity '" << m_Identity << "'";
+
+ {
+ CpuBoundWork removeClient (yc);
+
+ if (m_Endpoint) {
+ m_Endpoint->RemoveClient(this);
+ } else {
+ ApiListener::GetInstance()->RemoveAnonymousClient(this);
+ }
+ }
+
+ m_OutgoingMessagesQueued.Set();
+
+ m_WriterDone.Wait(yc);
+
+ /*
+ * Do not swallow exceptions in a coroutine.
+ * https://github.com/Icinga/icinga2/issues/7351
+ * We must not catch `detail::forced_unwind exception` as
+ * this is used for unwinding the stack.
+ *
+ * Just use the error_code dummy here.
+ */
+ boost::system::error_code ec;
+
+ m_CheckLivenessTimer.cancel();
+ m_HeartbeatTimer.cancel();
+
+ m_Stream->lowest_layer().cancel(ec);
+
+ Timeout::Ptr shutdownTimeout (new Timeout(
+ m_IoStrand.context(),
+ m_IoStrand,
+ boost::posix_time::seconds(10),
+ [this, keepAlive](asio::yield_context yc) {
+ boost::system::error_code ec;
+ m_Stream->lowest_layer().cancel(ec);
+ }
+ ));
+
+ m_Stream->next_layer().async_shutdown(yc[ec]);
+
+ shutdownTimeout->Cancel();
+
+ m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec);
+ }
+ });
+}
+
+void JsonRpcConnection::MessageHandler(const String& jsonString)
+{
+ Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
+
+ if (m_Endpoint && message->Contains("ts")) {
+ double ts = message->Get("ts");
+
+ /* ignore old messages */
+ if (ts < m_Endpoint->GetRemoteLogPosition())
+ return;
+
+ m_Endpoint->SetRemoteLogPosition(ts);
+ }
+
+ MessageOrigin::Ptr origin = new MessageOrigin();
+ origin->FromClient = this;
+
+ if (m_Endpoint) {
+ if (m_Endpoint->GetZone() != Zone::GetLocalZone())
+ origin->FromZone = m_Endpoint->GetZone();
+ else
+ origin->FromZone = Zone::GetByName(message->Get("originZone"));
+
+ m_Endpoint->AddMessageReceived(jsonString.GetLength());
+ }
+
+ Value vmethod;
+
+ if (!message->Get("method", &vmethod)) {
+ Value vid;
+
+ if (!message->Get("id", &vid))
+ return;
+
+ Log(LogWarning, "JsonRpcConnection",
+ "We received a JSON-RPC response message. This should never happen because we're only ever sending notifications.");
+
+ return;
+ }
+
+ String method = vmethod;
+
+ Log(LogNotice, "JsonRpcConnection")
+ << "Received '" << method << "' message from identity '" << m_Identity << "'.";
+
+ Dictionary::Ptr resultMessage = new Dictionary();
+
+ try {
+ ApiFunction::Ptr afunc = ApiFunction::GetByName(method);
+
+ if (!afunc) {
+ Log(LogNotice, "JsonRpcConnection")
+ << "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'.";
+ } else {
+ Dictionary::Ptr params = message->Get("params");
+ if (params)
+ resultMessage->Set("result", afunc->Invoke(origin, params));
+ else
+ resultMessage->Set("result", Empty);
+ }
+ } catch (const std::exception& ex) {
+ /* TODO: Add a user readable error message for the remote caller */
+ String diagInfo = DiagnosticInformation(ex);
+ resultMessage->Set("error", diagInfo);
+ Log(LogWarning, "JsonRpcConnection")
+ << "Error while processing message for identity '" << m_Identity << "'\n" << diagInfo;
+ }
+
+ if (message->Contains("id")) {
+ resultMessage->Set("jsonrpc", "2.0");
+ resultMessage->Set("id", message->Get("id"));
+
+ SendMessageInternal(resultMessage);
+ }
+}
+
+Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
+{
+ double log_position = params->Get("log_position");
+ Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint();
+
+ if (!endpoint)
+ return Empty;
+
+ if (log_position > endpoint->GetLocalLogPosition())
+ endpoint->SetLocalLogPosition(log_position);
+
+ return Empty;
+}
+
+void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
+{
+ boost::system::error_code ec;
+
+ if (!m_Authenticated) {
+ /* Anonymous connections are normally only used for requesting a certificate and are closed after this request
+ * is received. However, the request is only sent if the child has successfully verified the certificate of its
+ * parent so that it is an authenticated connection from its perspective. In case this verification fails, both
+ * ends view it as an anonymous connection and never actually use it but attempt a reconnect after 10 seconds
+ * leaking the connection. Therefore close it after a timeout.
+ */
+
+ m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(10));
+ m_CheckLivenessTimer.async_wait(yc[ec]);
+
+ if (m_ShuttingDown) {
+ return;
+ }
+
+ auto remote (m_Stream->lowest_layer().remote_endpoint());
+
+ Log(LogInformation, "JsonRpcConnection")
+ << "Closing anonymous connection [" << remote.address() << "]:" << remote.port() << " after 10 seconds.";
+
+ Disconnect();
+ } else {
+ for (;;) {
+ m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30));
+ m_CheckLivenessTimer.async_wait(yc[ec]);
+
+ if (m_ShuttingDown) {
+ break;
+ }
+
+ if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
+ Log(LogInformation, "JsonRpcConnection")
+ << "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
+
+ Disconnect();
+ break;
+ }
+ }
+ }
+}
+
+double JsonRpcConnection::GetWorkQueueRate()
+{
+ return l_TaskStats.UpdateAndGetValues(Utility::GetTime(), 60) / 60.0;
+}