summaryrefslogtreecommitdiffstats
path: root/lib/icingadb/redisconnection.cpp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 11:32:39 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 11:32:39 +0000
commit56ae875861ab260b80a030f50c4aff9f9dc8fff0 (patch)
tree531412110fc901a5918c7f7442202804a83cada9 /lib/icingadb/redisconnection.cpp
parentInitial commit. (diff)
downloadicinga2-56ae875861ab260b80a030f50c4aff9f9dc8fff0.tar.xz
icinga2-56ae875861ab260b80a030f50c4aff9f9dc8fff0.zip
Adding upstream version 2.14.2.upstream/2.14.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/icingadb/redisconnection.cpp')
-rw-r--r--lib/icingadb/redisconnection.cpp773
1 files changed, 773 insertions, 0 deletions
diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp
new file mode 100644
index 0000000..798a827
--- /dev/null
+++ b/lib/icingadb/redisconnection.cpp
@@ -0,0 +1,773 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "icingadb/redisconnection.hpp"
+#include "base/array.hpp"
+#include "base/convert.hpp"
+#include "base/defer.hpp"
+#include "base/exception.hpp"
+#include "base/io-engine.hpp"
+#include "base/logger.hpp"
+#include "base/objectlock.hpp"
+#include "base/string.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/tlsutility.hpp"
+#include "base/utility.hpp"
+#include <boost/asio.hpp>
+#include <boost/coroutine/exceptions.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
+#include <boost/utility/string_view.hpp>
+#include <boost/variant/get.hpp>
+#include <exception>
+#include <future>
+#include <iterator>
+#include <memory>
+#include <openssl/ssl.h>
+#include <openssl/x509_vfy.h>
+#include <utility>
+
+using namespace icinga;
+namespace asio = boost::asio;
+
+boost::regex RedisConnection::m_ErrAuth ("\\AERR AUTH ");
+
+RedisConnection::RedisConnection(const String& host, int port, const String& path, const String& password, int db,
+ bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath,
+ const String& tlsProtocolmin, const String& cipherList, double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent)
+ : RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db,
+ useTls, insecure, certPath, keyPath, caPath, crlPath, tlsProtocolmin, cipherList, connectTimeout, std::move(di), parent)
+{
+}
+
+RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password,
+ int db, bool useTls, bool insecure, String certPath, String keyPath, String caPath, String crlPath,
+ String tlsProtocolmin, String cipherList, double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent)
+ : m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)),
+ m_DbIndex(db), m_CertPath(std::move(certPath)), m_KeyPath(std::move(keyPath)), m_Insecure(insecure),
+ m_CaPath(std::move(caPath)), m_CrlPath(std::move(crlPath)), m_TlsProtocolmin(std::move(tlsProtocolmin)),
+ m_CipherList(std::move(cipherList)), m_ConnectTimeout(connectTimeout), m_DebugInfo(std::move(di)), m_Connecting(false), m_Connected(false),
+ m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io), m_Parent(parent)
+{
+ if (useTls && m_Path.IsEmpty()) {
+ UpdateTLSContext();
+ }
+}
+
+void RedisConnection::UpdateTLSContext()
+{
+ m_TLSContext = SetupSslContext(m_CertPath, m_KeyPath, m_CaPath,
+ m_CrlPath, m_CipherList, m_TlsProtocolmin, m_DebugInfo);
+}
+
+void RedisConnection::Start()
+{
+ if (!m_Started.exchange(true)) {
+ Ptr keepAlive (this);
+
+ IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
+ IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });
+
+ if (!m_Parent) {
+ IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); });
+ }
+ }
+
+ if (!m_Connecting.exchange(true)) {
+ Ptr keepAlive (this);
+
+ IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); });
+ }
+}
+
+bool RedisConnection::IsConnected() {
+ return m_Connected.load();
+}
+
+/**
+ * Append a Redis query to a log message
+ *
+ * @param query Redis query
+ * @param msg Log message
+ */
+static inline
+void LogQuery(RedisConnection::Query& query, Log& msg)
+{
+ int i = 0;
+
+ for (auto& arg : query) {
+ if (++i == 8) {
+ msg << " ...";
+ break;
+ }
+
+ if (arg.GetLength() > 64) {
+ msg << " '" << arg.SubStr(0, 61) << "...'";
+ } else {
+ msg << " '" << arg << '\'';
+ }
+ }
+}
+
+/**
+ * Queue a Redis query for sending
+ *
+ * @param query Redis query
+ * @param priority The query's priority
+ */
+void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects)
+{
+ if (LogDebug >= Logger::GetMinLogSeverity()) {
+ Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
+ LogQuery(query, msg);
+ }
+
+ auto item (Shared<Query>::Make(std::move(query)));
+ auto ctime (Utility::GetTime());
+
+ asio::post(m_Strand, [this, item, priority, ctime, affects]() {
+ m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr, nullptr, ctime, affects});
+ m_QueuedWrites.Set();
+ IncreasePendingQueries(1);
+ });
+}
+
+/**
+ * Queue Redis queries for sending
+ *
+ * @param queries Redis queries
+ * @param priority The queries' priority
+ */
+void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects)
+{
+ if (LogDebug >= Logger::GetMinLogSeverity()) {
+ for (auto& query : queries) {
+ Log msg(LogDebug, "IcingaDB", "Firing and forgetting query:");
+ LogQuery(query, msg);
+ }
+ }
+
+ auto item (Shared<Queries>::Make(std::move(queries)));
+ auto ctime (Utility::GetTime());
+
+ asio::post(m_Strand, [this, item, priority, ctime, affects]() {
+ m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr, nullptr, ctime, affects});
+ m_QueuedWrites.Set();
+ IncreasePendingQueries(item->size());
+ });
+}
+
+/**
+ * Queue a Redis query for sending, wait for the response and return (or throw) it
+ *
+ * @param query Redis query
+ * @param priority The query's priority
+ *
+ * @return The response
+ */
+RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects)
+{
+ if (LogDebug >= Logger::GetMinLogSeverity()) {
+ Log msg (LogDebug, "IcingaDB", "Executing query:");
+ LogQuery(query, msg);
+ }
+
+ std::promise<Reply> promise;
+ auto future (promise.get_future());
+ auto item (Shared<std::pair<Query, std::promise<Reply>>>::Make(std::move(query), std::move(promise)));
+ auto ctime (Utility::GetTime());
+
+ asio::post(m_Strand, [this, item, priority, ctime, affects]() {
+ m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr, nullptr, ctime, affects});
+ m_QueuedWrites.Set();
+ IncreasePendingQueries(1);
+ });
+
+ item = nullptr;
+ future.wait();
+ return future.get();
+}
+
+/**
+ * Queue Redis queries for sending, wait for the responses and return (or throw) them
+ *
+ * @param queries Redis queries
+ * @param priority The queries' priority
+ *
+ * @return The responses
+ */
+RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects)
+{
+ if (LogDebug >= Logger::GetMinLogSeverity()) {
+ for (auto& query : queries) {
+ Log msg(LogDebug, "IcingaDB", "Executing query:");
+ LogQuery(query, msg);
+ }
+ }
+
+ std::promise<Replies> promise;
+ auto future (promise.get_future());
+ auto item (Shared<std::pair<Queries, std::promise<Replies>>>::Make(std::move(queries), std::move(promise)));
+ auto ctime (Utility::GetTime());
+
+ asio::post(m_Strand, [this, item, priority, ctime, affects]() {
+ m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item, nullptr, ctime, affects});
+ m_QueuedWrites.Set();
+ IncreasePendingQueries(item->first.size());
+ });
+
+ item = nullptr;
+ future.wait();
+ return future.get();
+}
+
+void RedisConnection::EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback, RedisConnection::QueryPriority priority)
+{
+ auto ctime (Utility::GetTime());
+
+ asio::post(m_Strand, [this, callback, priority, ctime]() {
+ m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback, ctime});
+ m_QueuedWrites.Set();
+ });
+}
+
+/**
+ * Puts a no-op command with a result at the end of the queue and wait for the result,
+ * i.e. for everything enqueued to be processed by the server.
+ *
+ * @ingroup icingadb
+ */
+void RedisConnection::Sync()
+{
+ GetResultOfQuery({"PING"}, RedisConnection::QueryPriority::SyncConnection);
+}
+
+/**
+ * Get the enqueue time of the oldest still queued Redis query
+ *
+ * @return *nix timestamp or 0
+ */
+double RedisConnection::GetOldestPendingQueryTs()
+{
+ auto promise (Shared<std::promise<double>>::Make());
+ auto future (promise->get_future());
+
+ asio::post(m_Strand, [this, promise]() {
+ double oldest = 0;
+
+ for (auto& queue : m_Queues.Writes) {
+ if (m_SuppressedQueryKinds.find(queue.first) == m_SuppressedQueryKinds.end() && !queue.second.empty()) {
+ auto ctime (queue.second.front().CTime);
+
+ if (ctime < oldest || oldest == 0) {
+ oldest = ctime;
+ }
+ }
+ }
+
+ promise->set_value(oldest);
+ });
+
+ future.wait();
+ return future.get();
+}
+
+/**
+ * Mark kind as kind of queries not to actually send yet
+ *
+ * @param kind Query kind
+ */
+void RedisConnection::SuppressQueryKind(RedisConnection::QueryPriority kind)
+{
+ asio::post(m_Strand, [this, kind]() { m_SuppressedQueryKinds.emplace(kind); });
+}
+
+/**
+ * Unmark kind as kind of queries not to actually send yet
+ *
+ * @param kind Query kind
+ */
+void RedisConnection::UnsuppressQueryKind(RedisConnection::QueryPriority kind)
+{
+ asio::post(m_Strand, [this, kind]() {
+ m_SuppressedQueryKinds.erase(kind);
+ m_QueuedWrites.Set();
+ });
+}
+
+/**
+ * Try to connect to Redis
+ */
+void RedisConnection::Connect(asio::yield_context& yc)
+{
+ Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); });
+
+ boost::asio::deadline_timer timer (m_Strand.context());
+
+ auto waitForReadLoop ([this, &yc]() {
+ while (!m_Queues.FutureResponseActions.empty()) {
+ IoEngine::YieldCurrentCoroutine(yc);
+ }
+ });
+
+ for (;;) {
+ try {
+ if (m_Path.IsEmpty()) {
+ if (m_TLSContext) {
+ Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
+ << "Trying to connect to Redis server (async, TLS) on host '" << m_Host << ":" << m_Port << "'";
+
+ auto conn (Shared<AsioTlsStream>::Make(m_Strand.context(), *m_TLSContext, m_Host));
+ auto& tlsConn (conn->next_layer());
+ auto connectTimeout (MakeTimeout(conn));
+ Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
+
+ icinga::Connect(conn->lowest_layer(), m_Host, Convert::ToString(m_Port), yc);
+ tlsConn.async_handshake(tlsConn.client, yc);
+
+ if (!m_Insecure) {
+ std::shared_ptr<X509> cert (tlsConn.GetPeerCertificate());
+
+ if (!cert) {
+ BOOST_THROW_EXCEPTION(std::runtime_error(
+ "Redis didn't present any TLS certificate."
+ ));
+ }
+
+ if (!tlsConn.IsVerifyOK()) {
+ BOOST_THROW_EXCEPTION(std::runtime_error(
+ "TLS certificate validation failed: " + std::string(tlsConn.GetVerifyError())
+ ));
+ }
+ }
+
+ Handshake(conn, yc);
+ waitForReadLoop();
+ m_TlsConn = std::move(conn);
+ } else {
+ Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
+ << "Trying to connect to Redis server (async) on host '" << m_Host << ":" << m_Port << "'";
+
+ auto conn (Shared<TcpConn>::Make(m_Strand.context()));
+ auto connectTimeout (MakeTimeout(conn));
+ Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
+
+ icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
+ Handshake(conn, yc);
+ waitForReadLoop();
+ m_TcpConn = std::move(conn);
+ }
+ } else {
+ Log(LogInformation, "IcingaDB")
+ << "Trying to connect to Redis server (async) on unix socket path '" << m_Path << "'";
+
+ auto conn (Shared<UnixConn>::Make(m_Strand.context()));
+ auto connectTimeout (MakeTimeout(conn));
+ Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
+
+ conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
+ Handshake(conn, yc);
+ waitForReadLoop();
+ m_UnixConn = std::move(conn);
+ }
+
+ m_Connected.store(true);
+
+ Log(m_Parent ? LogNotice : LogInformation, "IcingaDB", "Connected to Redis server");
+
+ // Operate on a copy so that the callback can set a new callback without destroying itself while running.
+ auto callback (m_ConnectedCallback);
+ if (callback) {
+ callback(yc);
+ }
+
+ break;
+ } catch (const boost::coroutines::detail::forced_unwind&) {
+ throw;
+ } catch (const std::exception& ex) {
+ Log(LogCritical, "IcingaDB")
+ << "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what();
+ }
+
+ timer.expires_from_now(boost::posix_time::seconds(5));
+ timer.async_wait(yc);
+ }
+
+}
+
+/**
+ * Actually receive the responses to the Redis queries send by WriteItem() and handle them
+ */
+void RedisConnection::ReadLoop(asio::yield_context& yc)
+{
+ for (;;) {
+ m_QueuedReads.Wait(yc);
+
+ while (!m_Queues.FutureResponseActions.empty()) {
+ auto item (std::move(m_Queues.FutureResponseActions.front()));
+ m_Queues.FutureResponseActions.pop();
+
+ switch (item.Action) {
+ case ResponseAction::Ignore:
+ try {
+ for (auto i (item.Amount); i; --i) {
+ ReadOne(yc);
+ }
+ } catch (const boost::coroutines::detail::forced_unwind&) {
+ throw;
+ } catch (const std::exception& ex) {
+ Log(LogCritical, "IcingaDB")
+ << "Error during receiving the response to a query which has been fired and forgotten: " << ex.what();
+
+ continue;
+ } catch (...) {
+ Log(LogCritical, "IcingaDB")
+ << "Error during receiving the response to a query which has been fired and forgotten";
+
+ continue;
+ }
+
+ break;
+ case ResponseAction::Deliver:
+ for (auto i (item.Amount); i; --i) {
+ auto promise (std::move(m_Queues.ReplyPromises.front()));
+ m_Queues.ReplyPromises.pop();
+
+ Reply reply;
+
+ try {
+ reply = ReadOne(yc);
+ } catch (const boost::coroutines::detail::forced_unwind&) {
+ throw;
+ } catch (...) {
+ promise.set_exception(std::current_exception());
+
+ continue;
+ }
+
+ promise.set_value(std::move(reply));
+ }
+
+ break;
+ case ResponseAction::DeliverBulk:
+ {
+ auto promise (std::move(m_Queues.RepliesPromises.front()));
+ m_Queues.RepliesPromises.pop();
+
+ Replies replies;
+ replies.reserve(item.Amount);
+
+ for (auto i (item.Amount); i; --i) {
+ try {
+ replies.emplace_back(ReadOne(yc));
+ } catch (const boost::coroutines::detail::forced_unwind&) {
+ throw;
+ } catch (...) {
+ promise.set_exception(std::current_exception());
+ break;
+ }
+ }
+
+ try {
+ promise.set_value(std::move(replies));
+ } catch (const std::future_error&) {
+ // Complaint about the above op is not allowed
+ // due to promise.set_exception() was already called
+ }
+ }
+ }
+ }
+
+ m_QueuedReads.Clear();
+ }
+}
+
+/**
+ * Actually send the Redis queries queued by {FireAndForget,GetResultsOf}{Query,Queries}()
+ */
+void RedisConnection::WriteLoop(asio::yield_context& yc)
+{
+ for (;;) {
+ m_QueuedWrites.Wait(yc);
+
+ WriteFirstOfHighestPrio:
+ for (auto& queue : m_Queues.Writes) {
+ if (m_SuppressedQueryKinds.find(queue.first) != m_SuppressedQueryKinds.end() || queue.second.empty()) {
+ continue;
+ }
+
+ auto next (std::move(queue.second.front()));
+ queue.second.pop();
+
+ WriteItem(yc, std::move(next));
+
+ goto WriteFirstOfHighestPrio;
+ }
+
+ m_QueuedWrites.Clear();
+ }
+}
+
+/**
+ * Periodically log current query performance
+ */
+void RedisConnection::LogStats(asio::yield_context& yc)
+{
+ double lastMessage = 0;
+
+ m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));
+
+ for (;;) {
+ m_LogStatsTimer.async_wait(yc);
+ m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));
+
+ if (!IsConnected())
+ continue;
+
+ auto now (Utility::GetTime());
+ bool timeoutReached = now - lastMessage >= 5 * 60;
+
+ if (m_PendingQueries < 1 && !timeoutReached)
+ continue;
+
+ auto output (round(m_OutputQueries.CalculateRate(now, 10)));
+
+ if (m_PendingQueries < output * 5 && !timeoutReached)
+ continue;
+
+ Log(LogInformation, "IcingaDB")
+ << "Pending queries: " << m_PendingQueries << " (Input: "
+ << round(m_InputQueries.CalculateRate(now, 10)) << "/s; Output: " << output << "/s)";
+
+ lastMessage = now;
+ }
+}
+
+/**
+ * Send next and schedule receiving the response
+ *
+ * @param next Redis queries
+ */
+void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next)
+{
+ if (next.FireAndForgetQuery) {
+ auto& item (*next.FireAndForgetQuery);
+ DecreasePendingQueries(1);
+
+ try {
+ WriteOne(item, yc);
+ } catch (const boost::coroutines::detail::forced_unwind&) {
+ throw;
+ } catch (const std::exception& ex) {
+ Log msg (LogCritical, "IcingaDB", "Error during sending query");
+ LogQuery(item, msg);
+ msg << " which has been fired and forgotten: " << ex.what();
+
+ return;
+ } catch (...) {
+ Log msg (LogCritical, "IcingaDB", "Error during sending query");
+ LogQuery(item, msg);
+ msg << " which has been fired and forgotten";
+
+ return;
+ }
+
+ if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
+ m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
+ } else {
+ ++m_Queues.FutureResponseActions.back().Amount;
+ }
+
+ m_QueuedReads.Set();
+ }
+
+ if (next.FireAndForgetQueries) {
+ auto& item (*next.FireAndForgetQueries);
+ size_t i = 0;
+
+ DecreasePendingQueries(item.size());
+
+ try {
+ for (auto& query : item) {
+ WriteOne(query, yc);
+ ++i;
+ }
+ } catch (const boost::coroutines::detail::forced_unwind&) {
+ throw;
+ } catch (const std::exception& ex) {
+ Log msg (LogCritical, "IcingaDB", "Error during sending query");
+ LogQuery(item[i], msg);
+ msg << " which has been fired and forgotten: " << ex.what();
+
+ return;
+ } catch (...) {
+ Log msg (LogCritical, "IcingaDB", "Error during sending query");
+ LogQuery(item[i], msg);
+ msg << " which has been fired and forgotten";
+
+ return;
+ }
+
+ if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
+ m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
+ } else {
+ m_Queues.FutureResponseActions.back().Amount += item.size();
+ }
+
+ m_QueuedReads.Set();
+ }
+
+ if (next.GetResultOfQuery) {
+ auto& item (*next.GetResultOfQuery);
+ DecreasePendingQueries(1);
+
+ try {
+ WriteOne(item.first, yc);
+ } catch (const boost::coroutines::detail::forced_unwind&) {
+ throw;
+ } catch (...) {
+ item.second.set_exception(std::current_exception());
+
+ return;
+ }
+
+ m_Queues.ReplyPromises.emplace(std::move(item.second));
+
+ if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
+ m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
+ } else {
+ ++m_Queues.FutureResponseActions.back().Amount;
+ }
+
+ m_QueuedReads.Set();
+ }
+
+ if (next.GetResultsOfQueries) {
+ auto& item (*next.GetResultsOfQueries);
+ DecreasePendingQueries(item.first.size());
+
+ try {
+ for (auto& query : item.first) {
+ WriteOne(query, yc);
+ }
+ } catch (const boost::coroutines::detail::forced_unwind&) {
+ throw;
+ } catch (...) {
+ item.second.set_exception(std::current_exception());
+
+ return;
+ }
+
+ m_Queues.RepliesPromises.emplace(std::move(item.second));
+ m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
+
+ m_QueuedReads.Set();
+ }
+
+ if (next.Callback) {
+ next.Callback(yc);
+ }
+
+ RecordAffected(next.Affects, Utility::GetTime());
+}
+
+/**
+ * Receive the response to a Redis query
+ *
+ * @return The response
+ */
+RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
+{
+ if (m_Path.IsEmpty()) {
+ if (m_TLSContext) {
+ return ReadOne(m_TlsConn, yc);
+ } else {
+ return ReadOne(m_TcpConn, yc);
+ }
+ } else {
+ return ReadOne(m_UnixConn, yc);
+ }
+}
+
+/**
+ * Send query
+ *
+ * @param query Redis query
+ */
+void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc)
+{
+ if (m_Path.IsEmpty()) {
+ if (m_TLSContext) {
+ WriteOne(m_TlsConn, query, yc);
+ } else {
+ WriteOne(m_TcpConn, query, yc);
+ }
+ } else {
+ WriteOne(m_UnixConn, query, yc);
+ }
+}
+
+/**
+ * Specify a callback that is run each time a connection is successfully established
+ *
+ * The callback is executed from a Boost.Asio coroutine and should therefore not perform blocking operations.
+ *
+ * @param callback Callback to execute
+ */
+void RedisConnection::SetConnectedCallback(std::function<void(asio::yield_context& yc)> callback) {
+ m_ConnectedCallback = std::move(callback);
+}
+
+int RedisConnection::GetQueryCount(RingBuffer::SizeType span)
+{
+ return m_OutputQueries.UpdateAndGetValues(Utility::GetTime(), span);
+}
+
+void RedisConnection::IncreasePendingQueries(int count)
+{
+ if (m_Parent) {
+ auto parent (m_Parent);
+
+ asio::post(parent->m_Strand, [parent, count]() {
+ parent->IncreasePendingQueries(count);
+ });
+ } else {
+ m_PendingQueries += count;
+ m_InputQueries.InsertValue(Utility::GetTime(), count);
+ }
+}
+
+void RedisConnection::DecreasePendingQueries(int count)
+{
+ if (m_Parent) {
+ auto parent (m_Parent);
+
+ asio::post(parent->m_Strand, [parent, count]() {
+ parent->DecreasePendingQueries(count);
+ });
+ } else {
+ m_PendingQueries -= count;
+ m_OutputQueries.InsertValue(Utility::GetTime(), count);
+ }
+}
+
+void RedisConnection::RecordAffected(RedisConnection::QueryAffects affected, double when)
+{
+ if (m_Parent) {
+ auto parent (m_Parent);
+
+ asio::post(parent->m_Strand, [parent, affected, when]() {
+ parent->RecordAffected(affected, when);
+ });
+ } else {
+ if (affected.Config) {
+ m_WrittenConfig.InsertValue(when, affected.Config);
+ }
+
+ if (affected.State) {
+ m_WrittenState.InsertValue(when, affected.State);
+ }
+
+ if (affected.History) {
+ m_WrittenHistory.InsertValue(when, affected.History);
+ }
+ }
+}