summaryrefslogtreecommitdiffstats
path: root/dnsdist-tcp-downstream.hh
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 09:34:30 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 09:34:30 +0000
commit4fc2f55f761d71aae1f145d5aa94ba929cc39676 (patch)
tree5c1e1db3b46dd4edbe11f612d93cb94b96891ce3 /dnsdist-tcp-downstream.hh
parentInitial commit. (diff)
downloaddnsdist-4fc2f55f761d71aae1f145d5aa94ba929cc39676.tar.xz
dnsdist-4fc2f55f761d71aae1f145d5aa94ba929cc39676.zip
Adding upstream version 1.7.3.upstream/1.7.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'dnsdist-tcp-downstream.hh')
-rw-r--r--dnsdist-tcp-downstream.hh590
1 files changed, 590 insertions, 0 deletions
diff --git a/dnsdist-tcp-downstream.hh b/dnsdist-tcp-downstream.hh
new file mode 100644
index 0000000..547d266
--- /dev/null
+++ b/dnsdist-tcp-downstream.hh
@@ -0,0 +1,590 @@
+#pragma once
+
+#include <queue>
+
+#include "sstuff.hh"
+#include "tcpiohandler-mplexer.hh"
+#include "dnsdist.hh"
+#include "dnsdist-tcp.hh"
+
+class ConnectionToBackend : public std::enable_shared_from_this<ConnectionToBackend>
+{
+public:
+ ConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now): d_connectionStartTime(now), d_lastDataReceivedTime(now), d_ds(ds), d_mplexer(mplexer), d_enableFastOpen(ds->tcpFastOpen)
+ {
+ reconnect();
+ }
+
+ virtual ~ConnectionToBackend();
+
+ int getHandle() const
+ {
+ if (!d_handler) {
+ throw std::runtime_error("Attempt to get the socket handle from a non-established TCP connection");
+ }
+
+ return d_handler->getDescriptor();
+ }
+
+ /* whether the underlying socket has been closed under our feet, basically */
+ bool isUsable() const
+ {
+ if (!d_handler) {
+ return false;
+ }
+
+ return d_handler->isUsable();
+ }
+
+ const std::shared_ptr<DownstreamState>& getDS() const
+ {
+ return d_ds;
+ }
+
+ const ComboAddress& getRemote() const
+ {
+ return d_ds->remote;
+ }
+
+ const std::string& getBackendName() const
+ {
+ return d_ds->getName();
+ }
+
+ bool isFresh() const
+ {
+ return d_fresh;
+ }
+
+ void setReused()
+ {
+ d_fresh = false;
+ }
+
+ void disableFastOpen()
+ {
+ d_enableFastOpen = false;
+ }
+
+ bool isFastOpenEnabled()
+ {
+ return d_enableFastOpen;
+ }
+
+ /* whether a connection can be used now */
+ bool canBeReused(bool sameClient = false) const
+ {
+ if (d_connectionDied) {
+ return false;
+ }
+
+ /* we can't reuse a connection where a proxy protocol payload has been sent,
+ since:
+ - it cannot be reused for a different client
+ - we might have different TLV values for each query
+ */
+ if (d_ds && d_ds->useProxyProtocol == true && !sameClient) {
+ return false;
+ }
+
+ if (reachedMaxStreamID()) {
+ return false;
+ }
+
+ if (reachedMaxConcurrentQueries()) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /* full now but will become usable later */
+ bool willBeReusable(bool sameClient) const
+ {
+ if (d_connectionDied || reachedMaxStreamID()) {
+ return false;
+ }
+
+ if (d_ds && d_ds->useProxyProtocol == true) {
+ return sameClient;
+ }
+
+ return true;
+ }
+
+ virtual bool reachedMaxStreamID() const = 0;
+ virtual bool reachedMaxConcurrentQueries() const = 0;
+ virtual bool isIdle() const = 0;
+ virtual void release() = 0;
+ virtual void stopIO()
+ {
+ }
+
+ bool matches(const std::shared_ptr<DownstreamState>& ds) const
+ {
+ if (!ds || !d_ds) {
+ return false;
+ }
+ return ds == d_ds;
+ }
+
+ virtual void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) = 0;
+ virtual void handleTimeout(const struct timeval& now, bool write) = 0;
+
+ struct timeval getLastDataReceivedTime() const
+ {
+ return d_lastDataReceivedTime;
+ }
+
+ virtual std::string toString() const = 0;
+
+protected:
+ bool reconnect();
+
+ boost::optional<struct timeval> getBackendHealthCheckTTD(const struct timeval& now) const
+ {
+ if (d_ds == nullptr) {
+ throw std::runtime_error("getBackendReadTTD() without any backend selected");
+ }
+ if (d_ds->checkTimeout == 0) {
+ return boost::none;
+ }
+
+ struct timeval res = now;
+ res.tv_sec += d_ds->checkTimeout / 1000; /* ms to s */
+ res.tv_usec += (d_ds->checkTimeout % 1000) / 1000; /* remaining ms to µs */
+
+ return res;
+ }
+
+ boost::optional<struct timeval> getBackendReadTTD(const struct timeval& now) const
+ {
+ if (d_ds == nullptr) {
+ throw std::runtime_error("getBackendReadTTD() without any backend selected");
+ }
+ if (d_ds->tcpRecvTimeout == 0) {
+ return boost::none;
+ }
+
+ struct timeval res = now;
+ res.tv_sec += d_ds->tcpRecvTimeout;
+
+ return res;
+ }
+
+ boost::optional<struct timeval> getBackendWriteTTD(const struct timeval& now) const
+ {
+ if (d_ds == nullptr) {
+ throw std::runtime_error("getBackendWriteTTD() called without any backend selected");
+ }
+ if (d_ds->tcpSendTimeout == 0) {
+ return boost::none;
+ }
+
+ struct timeval res = now;
+ res.tv_sec += d_ds->tcpSendTimeout;
+
+ return res;
+ }
+
+ boost::optional<struct timeval> getBackendConnectTTD(const struct timeval& now) const
+ {
+ if (d_ds == nullptr) {
+ throw std::runtime_error("getBackendConnectTTD() called without any backend selected");
+ }
+ if (d_ds->tcpConnectTimeout == 0) {
+ return boost::none;
+ }
+
+ struct timeval res = now;
+ res.tv_sec += d_ds->tcpConnectTimeout;
+
+ return res;
+ }
+
+ struct timeval d_connectionStartTime;
+ struct timeval d_lastDataReceivedTime;
+ const std::shared_ptr<DownstreamState> d_ds{nullptr};
+ std::shared_ptr<TCPQuerySender> d_sender{nullptr};
+ std::unique_ptr<FDMultiplexer>& d_mplexer;
+ std::unique_ptr<TCPIOHandler> d_handler{nullptr};
+ std::unique_ptr<IOStateHandler> d_ioState{nullptr};
+ uint64_t d_queries{0};
+ uint32_t d_highestStreamID{0};
+ uint16_t d_downstreamFailures{0};
+ bool d_proxyProtocolPayloadSent{false};
+ bool d_enableFastOpen{false};
+ bool d_connectionDied{false};
+ bool d_fresh{true};
+};
+
+class TCPConnectionToBackend : public ConnectionToBackend
+{
+public:
+ TCPConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& /* proxyProtocolPayload*, unused but there to match the HTTP2 connections, so we can use the same templated connections manager class */): ConnectionToBackend(ds, mplexer, now), d_responseBuffer(s_maxPacketCacheEntrySize)
+ {
+ }
+
+ virtual ~TCPConnectionToBackend();
+
+ bool isIdle() const override
+ {
+ return d_state == State::idle && d_pendingQueries.size() == 0 && d_pendingResponses.size() == 0;
+ }
+
+ bool reachedMaxStreamID() const override
+ {
+ /* TCP/DoT has only 2^16 usable identifiers, DoH has 2^32 */
+ const uint32_t maximumStreamID = std::numeric_limits<uint16_t>::max() - 1;
+ return d_highestStreamID == maximumStreamID;
+ }
+
+ bool reachedMaxConcurrentQueries() const override
+ {
+ const size_t concurrent = d_pendingQueries.size() + d_pendingResponses.size();
+ if (concurrent > 0 && concurrent >= d_ds->d_maxInFlightQueriesPerConn) {
+ return true;
+ }
+ return false;
+ }
+ bool matchesTLVs(const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs) const;
+
+ void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) override;
+ void handleTimeout(const struct timeval& now, bool write) override;
+ void release() override;
+
+ std::string toString() const override
+ {
+ ostringstream o;
+ o << "TCP connection to backend "<<(d_ds ? d_ds->getName() : "empty")<<" over FD "<<(d_handler ? std::to_string(d_handler->getDescriptor()) : "no socket")<<", state is "<<(int)d_state<<", io state is "<<(d_ioState ? d_ioState->getState() : "empty")<<", queries count is "<<d_queries<<", pending queries count is "<<d_pendingQueries.size()<<", "<<d_pendingResponses.size()<<" pending responses";
+ return o.str();
+ }
+
+ void setProxyProtocolValuesSent(std::unique_ptr<std::vector<ProxyProtocolValue>>&& proxyProtocolValuesSent);
+
+private:
+ /* waitingForResponseFromBackend is a state where we have not yet started reading the size,
+ so we can still switch to sending instead */
+ enum class State : uint8_t { idle, sendingQueryToBackend, waitingForResponseFromBackend, readingResponseSizeFromBackend, readingResponseFromBackend };
+ enum class FailureReason : uint8_t { /* too many attempts */ gaveUp, timeout, unexpectedQueryID };
+
+ static void handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
+ static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);
+ static IOState queueNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn);
+ static IOState sendQuery(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
+ static bool isXFRFinished(const TCPResponse& response, TCPQuery& query);
+
+ IOState handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
+ uint16_t getQueryIdFromResponse() const;
+ void notifyAllQueriesFailed(const struct timeval& now, FailureReason reason);
+ bool needProxyProtocolPayload() const
+ {
+ return !d_proxyProtocolPayloadSent && (d_ds && d_ds->useProxyProtocol);
+ }
+
+ class PendingRequest
+ {
+ public:
+ std::shared_ptr<TCPQuerySender> d_sender{nullptr};
+ TCPQuery d_query;
+ };
+
+ PacketBuffer d_responseBuffer;
+ std::deque<PendingRequest> d_pendingQueries;
+ std::unordered_map<uint16_t, PendingRequest> d_pendingResponses;
+ std::unique_ptr<std::vector<ProxyProtocolValue>> d_proxyProtocolValuesSent{nullptr};
+ PendingRequest d_currentQuery;
+ size_t d_currentPos{0};
+ uint16_t d_responseSize{0};
+ State d_state{State::idle};
+};
+
+template <class T> class DownstreamConnectionsManager
+{
+ struct SequencedTag {};
+ struct OrderedTag {};
+
+ typedef multi_index_container<
+ std::shared_ptr<T>,
+ indexed_by <
+ ordered_unique<tag<OrderedTag>,
+ identity<std::shared_ptr<T>>
+ >,
+ /* new elements are added to the front of the sequence */
+ sequenced<tag<SequencedTag> >
+ >
+ > list_t;
+ struct ConnectionLists
+ {
+ list_t d_actives;
+ list_t d_idles;
+ };
+
+public:
+ static void setMaxIdleConnectionsPerDownstream(size_t max)
+ {
+ s_maxIdleConnectionsPerDownstream = max;
+ }
+
+ static void setCleanupInterval(uint16_t interval)
+ {
+ s_cleanupInterval = interval;
+ }
+
+ static void setMaxIdleTime(uint16_t max)
+ {
+ s_maxIdleTime = max;
+ }
+
+ std::shared_ptr<T> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, const struct timeval& now, std::string&& proxyProtocolPayload)
+ {
+ struct timeval freshCutOff = now;
+ freshCutOff.tv_sec -= 1;
+
+ auto backendId = ds->getID();
+
+ cleanupClosedConnections(now);
+
+ const bool haveProxyProtocol = ds->useProxyProtocol || !proxyProtocolPayload.empty();
+ if (!haveProxyProtocol) {
+ const auto& it = d_downstreamConnections.find(backendId);
+ if (it != d_downstreamConnections.end()) {
+ /* first scan idle connections, more recent first */
+ auto entry = findUsableConnectionInList(now, freshCutOff, it->second.d_idles, true);
+ if (entry) {
+ ++ds->tcpReusedConnections;
+ it->second.d_actives.insert(entry);
+ return entry;
+ }
+
+ /* then scan actives ones, more recent first as well */
+ entry = findUsableConnectionInList(now, freshCutOff, it->second.d_actives, false);
+ if (entry) {
+ ++ds->tcpReusedConnections;
+ return entry;
+ }
+ }
+ }
+
+ auto newConnection = std::make_shared<T>(ds, mplexer, now, std::move(proxyProtocolPayload));
+ if (!haveProxyProtocol) {
+ auto& list = d_downstreamConnections[backendId].d_actives;
+ list.template get<SequencedTag>().push_front(newConnection);
+ }
+
+ return newConnection;
+ }
+
+ void cleanupClosedConnections(const struct timeval& now)
+ {
+ if (s_cleanupInterval == 0 || (d_nextCleanup != 0 && d_nextCleanup > now.tv_sec)) {
+ return;
+ }
+
+ d_nextCleanup = now.tv_sec + s_cleanupInterval;
+
+ struct timeval freshCutOff = now;
+ freshCutOff.tv_sec -= 1;
+ struct timeval idleCutOff = now;
+ idleCutOff.tv_sec -= s_maxIdleTime;
+
+ for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end(); ) {
+ cleanUpList(dsIt->second.d_idles, now, freshCutOff, idleCutOff);
+ cleanUpList(dsIt->second.d_actives, now, freshCutOff, idleCutOff);
+
+ if (dsIt->second.d_idles.empty() && dsIt->second.d_actives.empty()) {
+ dsIt = d_downstreamConnections.erase(dsIt);
+ }
+ else {
+ ++dsIt;
+ }
+ }
+ }
+
+ size_t clear()
+ {
+ size_t count = 0;
+ for (const auto& downstream : d_downstreamConnections) {
+ count += downstream.second.d_actives.size();
+ for (auto& conn : downstream.second.d_actives) {
+ conn->stopIO();
+ }
+ count += downstream.second.d_idles.size();
+ for (auto& conn : downstream.second.d_idles) {
+ conn->stopIO();
+ }
+ }
+
+ d_downstreamConnections.clear();
+ return count;
+ }
+
+ size_t count() const
+ {
+ return getActiveCount() + getIdleCount();
+ }
+
+ size_t getActiveCount() const
+ {
+ size_t count = 0;
+ for (const auto& downstream : d_downstreamConnections) {
+ count += downstream.second.d_actives.size();
+ }
+ return count;
+ }
+
+ size_t getIdleCount() const
+ {
+ size_t count = 0;
+ for (const auto& downstream : d_downstreamConnections) {
+ count += downstream.second.d_idles.size();
+ }
+ return count;
+ }
+
+ bool removeDownstreamConnection(std::shared_ptr<T>& conn)
+ {
+ auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
+ if (backendIt == d_downstreamConnections.end()) {
+ return false;
+ }
+
+ /* idle list first */
+ {
+ auto it = backendIt->second.d_idles.find(conn);
+ if (it != backendIt->second.d_idles.end()) {
+ backendIt->second.d_idles.erase(it);
+ return true;
+ }
+ }
+ /* then active */
+ {
+ auto it = backendIt->second.d_actives.find(conn);
+ if (it != backendIt->second.d_actives.end()) {
+ backendIt->second.d_actives.erase(it);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ bool moveToIdle(std::shared_ptr<T>& conn)
+ {
+ auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
+ if (backendIt == d_downstreamConnections.end()) {
+ return false;
+ }
+
+ auto it = backendIt->second.d_actives.find(conn);
+ if (it == backendIt->second.d_actives.end()) {
+ return false;
+ }
+
+ backendIt->second.d_actives.erase(it);
+
+ if (backendIt->second.d_idles.size() >= s_maxIdleConnectionsPerDownstream) {
+ backendIt->second.d_idles.template get<SequencedTag>().pop_back();
+ }
+
+ backendIt->second.d_idles.template get<SequencedTag>().push_front(conn);
+ return true;
+ }
+
+protected:
+
+ void cleanUpList(list_t& list, const struct timeval& now, const struct timeval& freshCutOff, const struct timeval& idleCutOff)
+ {
+ auto& sidx = list.template get<SequencedTag>();
+ for (auto connIt = sidx.begin(); connIt != sidx.end(); ) {
+ if (!(*connIt)) {
+ connIt = sidx.erase(connIt);
+ continue;
+ }
+
+ auto& entry = *connIt;
+
+ /* don't bother checking freshly used connections */
+ if (freshCutOff < entry->getLastDataReceivedTime()) {
+ ++connIt;
+ continue;
+ }
+
+ if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) {
+ /* idle for too long */
+ connIt = sidx.erase(connIt);
+ continue;
+ }
+
+ if (entry->isUsable()) {
+ ++connIt;
+ continue;
+ }
+
+ connIt = sidx.erase(connIt);
+ }
+ }
+
+ std::shared_ptr<T> findUsableConnectionInList(const struct timeval& now, const struct timeval& freshCutOff, list_t& list, bool removeIfFound)
+ {
+ auto& sidx = list.template get<SequencedTag>();
+ for (auto listIt = sidx.begin(); listIt != sidx.end(); ) {
+ if (!(*listIt)) {
+ listIt = sidx.erase(listIt);
+ continue;
+ }
+
+ auto& entry = *listIt;
+ if (isConnectionUsable(entry, now, freshCutOff)) {
+ entry->setReused();
+ // make a copy since the iterator will be invalidated after erasing
+ auto result = entry;
+ if (removeIfFound) {
+ sidx.erase(listIt);
+ }
+ return result;
+ }
+
+ if (entry->willBeReusable(false)) {
+ ++listIt;
+ continue;
+ }
+
+ /* that connection will not be usable later, no need to keep it in that list */
+ listIt = sidx.erase(listIt);
+ }
+
+ return nullptr;
+ }
+
+ bool isConnectionUsable(const std::shared_ptr<T>& conn, const struct timeval& now, const struct timeval& freshCutOff)
+ {
+ if (!conn->canBeReused()) {
+ return false;
+ }
+
+ /* for connections that have not been used very recently,
+ check whether they have been closed in the meantime */
+ if (freshCutOff < conn->getLastDataReceivedTime()) {
+ /* used recently enough, skip the check */
+ return true;
+ }
+
+ return conn->isUsable();
+ }
+
+ static size_t s_maxIdleConnectionsPerDownstream;
+ static uint16_t s_cleanupInterval;
+ static uint16_t s_maxIdleTime;
+
+ std::map<boost::uuids::uuid, ConnectionLists> d_downstreamConnections;
+
+ time_t d_nextCleanup{0};
+};
+
+template <class T> size_t DownstreamConnectionsManager<T>::s_maxIdleConnectionsPerDownstream{10};
+template <class T> uint16_t DownstreamConnectionsManager<T>::s_cleanupInterval{60};
+template <class T> uint16_t DownstreamConnectionsManager<T>::s_maxIdleTime{300};
+
+using DownstreamTCPConnectionsManager = DownstreamConnectionsManager<TCPConnectionToBackend>;
+extern thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;