/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ #ifndef REDISCONNECTION_H #define REDISCONNECTION_H #include "base/array.hpp" #include "base/atomic.hpp" #include "base/convert.hpp" #include "base/io-engine.hpp" #include "base/object.hpp" #include "base/ringbuffer.hpp" #include "base/shared.hpp" #include "base/string.hpp" #include "base/tlsstream.hpp" #include "base/value.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace icinga { /** * An Async Redis connection. * * @ingroup icingadb */ class RedisConnection final : public Object { public: DECLARE_PTR_TYPEDEFS(RedisConnection); typedef std::vector Query; typedef std::vector Queries; typedef Value Reply; typedef std::vector Replies; /** * Redis query priorities, highest first. * * @ingroup icingadb */ enum class QueryPriority : unsigned char { Heartbeat, RuntimeStateStream, // runtime state updates, doesn't affect initially synced states Config, // includes initially synced states RuntimeStateSync, // updates initially synced states at runtime, in parallel to config dump, therefore must be < Config History, CheckResult, SyncConnection = 255 }; struct QueryAffects { size_t Config; size_t State; size_t History; QueryAffects(size_t config = 0, size_t state = 0, size_t history = 0) : Config(config), State(state), History(history) { } }; RedisConnection(const String& host, int port, const String& path, const String& password, int db, bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath, const String& tlsProtocolmin, const String& cipherList, double connectTimeout, DebugInfo di, const Ptr& parent = nullptr); void UpdateTLSContext(); void Start(); bool IsConnected(); void FireAndForgetQuery(Query query, QueryPriority priority, QueryAffects affects = {}); void FireAndForgetQueries(Queries queries, QueryPriority priority, QueryAffects affects = {}); Reply GetResultOfQuery(Query query, QueryPriority priority, QueryAffects affects = {}); Replies GetResultsOfQueries(Queries queries, QueryPriority priority, QueryAffects affects = {}); void EnqueueCallback(const std::function& callback, QueryPriority priority); void Sync(); double GetOldestPendingQueryTs(); void SuppressQueryKind(QueryPriority kind); void UnsuppressQueryKind(QueryPriority kind); void SetConnectedCallback(std::function callback); inline bool GetConnected() { return m_Connected.load(); } int GetQueryCount(RingBuffer::SizeType span); inline int GetPendingQueryCount() { return m_PendingQueries; } inline int GetWrittenConfigFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime()) { return m_WrittenConfig.UpdateAndGetValues(tv, span); } inline int GetWrittenStateFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime()) { return m_WrittenState.UpdateAndGetValues(tv, span); } inline int GetWrittenHistoryFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime()) { return m_WrittenHistory.UpdateAndGetValues(tv, span); } private: /** * What to do with the responses to Redis queries. * * @ingroup icingadb */ enum class ResponseAction : unsigned char { Ignore, // discard Deliver, // submit to the requestor DeliverBulk // submit multiple responses to the requestor at once }; /** * What to do with how many responses to Redis queries. * * @ingroup icingadb */ struct FutureResponseAction { size_t Amount; ResponseAction Action; }; /** * Something to be send to Redis. * * @ingroup icingadb */ struct WriteQueueItem { Shared::Ptr FireAndForgetQuery; Shared::Ptr FireAndForgetQueries; Shared>>::Ptr GetResultOfQuery; Shared>>::Ptr GetResultsOfQueries; std::function Callback; double CTime; QueryAffects Affects; }; typedef boost::asio::ip::tcp Tcp; typedef boost::asio::local::stream_protocol Unix; typedef boost::asio::buffered_stream TcpConn; typedef boost::asio::buffered_stream UnixConn; Shared::Ptr m_TLSContext; template static Value ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc); template static std::vector ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint = 0); template static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc); static boost::regex m_ErrAuth; RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db, bool useTls, bool insecure, String certPath, String keyPath, String caPath, String crlPath, String tlsProtocolmin, String cipherList, double connectTimeout, DebugInfo di, const Ptr& parent); void Connect(boost::asio::yield_context& yc); void ReadLoop(boost::asio::yield_context& yc); void WriteLoop(boost::asio::yield_context& yc); void LogStats(boost::asio::yield_context& yc); void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item); Reply ReadOne(boost::asio::yield_context& yc); void WriteOne(Query& query, boost::asio::yield_context& yc); template Reply ReadOne(StreamPtr& stream, boost::asio::yield_context& yc); template void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc); void IncreasePendingQueries(int count); void DecreasePendingQueries(int count); void RecordAffected(QueryAffects affected, double when); template void Handshake(StreamPtr& stream, boost::asio::yield_context& yc); template Timeout::Ptr MakeTimeout(StreamPtr& stream); String m_Path; String m_Host; int m_Port; String m_Password; int m_DbIndex; String m_CertPath; String m_KeyPath; bool m_Insecure; String m_CaPath; String m_CrlPath; String m_TlsProtocolmin; String m_CipherList; double m_ConnectTimeout; DebugInfo m_DebugInfo; boost::asio::io_context::strand m_Strand; Shared::Ptr m_TcpConn; Shared::Ptr m_UnixConn; Shared::Ptr m_TlsConn; Atomic m_Connecting, m_Connected, m_Started; struct { // Items to be send to Redis std::map> Writes; // Requestors, each waiting for a single response std::queue> ReplyPromises; // Requestors, each waiting for multiple responses at once std::queue> RepliesPromises; // Metadata about all of the above std::queue FutureResponseActions; } m_Queues; // Kinds of queries not to actually send yet std::set m_SuppressedQueryKinds; // Indicate that there's something to send/receive AsioConditionVariable m_QueuedWrites, m_QueuedReads; std::function m_ConnectedCallback; // Stats RingBuffer m_InputQueries{10}; RingBuffer m_OutputQueries{15 * 60}; RingBuffer m_WrittenConfig{15 * 60}; RingBuffer m_WrittenState{15 * 60}; RingBuffer m_WrittenHistory{15 * 60}; int m_PendingQueries{0}; boost::asio::deadline_timer m_LogStatsTimer; Ptr m_Parent; }; /** * An error response from the Redis server. * * @ingroup icingadb */ class RedisError final : public Object { public: DECLARE_PTR_TYPEDEFS(RedisError); inline RedisError(String message) : m_Message(std::move(message)) { } inline const String& GetMessage() { return m_Message; } private: String m_Message; }; /** * Thrown if the connection to the Redis server has already been lost. * * @ingroup icingadb */ class RedisDisconnected : public std::runtime_error { public: inline RedisDisconnected() : runtime_error("") { } }; /** * Thrown on malformed Redis server responses. * * @ingroup icingadb */ class RedisProtocolError : public std::runtime_error { protected: inline RedisProtocolError() : runtime_error("") { } }; /** * Thrown on malformed types in Redis server responses. * * @ingroup icingadb */ class BadRedisType : public RedisProtocolError { public: inline BadRedisType(char type) : m_What{type, 0} { } virtual const char * what() const noexcept override { return m_What; } private: char m_What[2]; }; /** * Thrown on malformed ints in Redis server responses. * * @ingroup icingadb */ class BadRedisInt : public RedisProtocolError { public: inline BadRedisInt(std::vector intStr) : m_What(std::move(intStr)) { m_What.emplace_back(0); } virtual const char * what() const noexcept override { return m_What.data(); } private: std::vector m_What; }; /** * Read a Redis server response from stream * * @param stream Redis server connection * * @return The response */ template RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc) { namespace asio = boost::asio; if (!stream) { throw RedisDisconnected(); } auto strm (stream); try { return ReadRESP(*strm, yc); } catch (const boost::coroutines::detail::forced_unwind&) { throw; } catch (...) { if (m_Connecting.exchange(false)) { m_Connected.store(false); stream = nullptr; if (!m_Connecting.exchange(true)) { Ptr keepAlive (this); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); } } throw; } } /** * Write a Redis query to stream * * @param stream Redis server connection * @param query Redis query */ template void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc) { namespace asio = boost::asio; if (!stream) { throw RedisDisconnected(); } auto strm (stream); try { WriteRESP(*strm, query, yc); strm->async_flush(yc); } catch (const boost::coroutines::detail::forced_unwind&) { throw; } catch (...) { if (m_Connecting.exchange(false)) { m_Connected.store(false); stream = nullptr; if (!m_Connecting.exchange(true)) { Ptr keepAlive (this); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); } } throw; } } /** * Initialize a Redis stream * * @param stream Redis server connection * @param query Redis query */ template void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc) { if (m_Password.IsEmpty() && !m_DbIndex) { // Trigger NOAUTH WriteRESP(*strm, {"PING"}, yc); } else { if (!m_Password.IsEmpty()) { WriteRESP(*strm, {"AUTH", m_Password}, yc); } if (m_DbIndex) { WriteRESP(*strm, {"SELECT", Convert::ToString(m_DbIndex)}, yc); } } strm->async_flush(yc); if (m_Password.IsEmpty() && !m_DbIndex) { Reply pong (ReadRESP(*strm, yc)); if (pong.IsObjectType()) { // Likely NOAUTH BOOST_THROW_EXCEPTION(std::runtime_error(RedisError::Ptr(pong)->GetMessage())); } } else { if (!m_Password.IsEmpty()) { Reply auth (ReadRESP(*strm, yc)); if (auth.IsObjectType()) { auto& authErr (RedisError::Ptr(auth)->GetMessage().GetData()); boost::smatch what; if (boost::regex_search(authErr, what, m_ErrAuth)) { Log(LogWarning, "IcingaDB") << authErr; } else { // Likely WRONGPASS BOOST_THROW_EXCEPTION(std::runtime_error(authErr)); } } } if (m_DbIndex) { Reply select (ReadRESP(*strm, yc)); if (select.IsObjectType()) { // Likely NOAUTH or ERR DB BOOST_THROW_EXCEPTION(std::runtime_error(RedisError::Ptr(select)->GetMessage())); } } } } /** * Creates a Timeout which cancels stream's I/O after m_ConnectTimeout * * @param stream Redis server connection */ template Timeout::Ptr RedisConnection::MakeTimeout(StreamPtr& stream) { Ptr keepAlive (this); return new Timeout( m_Strand.context(), m_Strand, boost::posix_time::microseconds(intmax_t(m_ConnectTimeout * 1000000)), [keepAlive, stream](boost::asio::yield_context yc) { boost::system::error_code ec; stream->lowest_layer().cancel(ec); } ); } /** * Read a Redis protocol value from stream * * @param stream Redis server connection * * @return The value */ template Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc) { namespace asio = boost::asio; char type = 0; asio::async_read(stream, asio::mutable_buffer(&type, 1), yc); switch (type) { case '+': { auto buf (ReadLine(stream, yc)); return String(buf.begin(), buf.end()); } case '-': { auto buf (ReadLine(stream, yc)); return new RedisError(String(buf.begin(), buf.end())); } case ':': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } return (double)i; } case '$': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } if (i < 0) { return Value(); } buf.clear(); buf.insert(buf.end(), i, 0); asio::async_read(stream, asio::mutable_buffer(buf.data(), buf.size()), yc); { char crlf[2]; asio::async_read(stream, asio::mutable_buffer(crlf, 2), yc); } return String(buf.begin(), buf.end()); } case '*': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } if (i < 0) { return Empty; } Array::Ptr arr = new Array(); arr->Reserve(i); for (; i; --i) { arr->Add(ReadRESP(stream, yc)); } return arr; } default: throw BadRedisType(type); } } /** * Read from stream until \r\n * * @param stream Redis server connection * @param hint Expected amount of data * * @return Read data ex. \r\n */ template std::vector RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint) { namespace asio = boost::asio; std::vector line; line.reserve(hint); char next = 0; asio::mutable_buffer buf (&next, 1); for (;;) { asio::async_read(stream, buf, yc); if (next == '\r') { asio::async_read(stream, buf, yc); return line; } line.emplace_back(next); } } /** * Write a Redis protocol value to stream * * @param stream Redis server connection * @param query Redis protocol value */ template void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc) { namespace asio = boost::asio; asio::streambuf writeBuffer; std::ostream msg(&writeBuffer); msg << "*" << query.size() << "\r\n"; for (auto& arg : query) { msg << "$" << arg.GetLength() << "\r\n" << arg << "\r\n"; } asio::async_write(stream, writeBuffer, yc); } } #endif //REDISCONNECTION_H