diff options
Diffstat (limited to 'dnsdist-tcp.hh')
-rw-r--r-- | dnsdist-tcp.hh | 60 |
1 files changed, 32 insertions, 28 deletions
diff --git a/dnsdist-tcp.hh b/dnsdist-tcp.hh index 3d11f1a..f3d827e 100644 --- a/dnsdist-tcp.hh +++ b/dnsdist-tcp.hh @@ -21,9 +21,12 @@ */ #pragma once +#include <optional> #include <unistd.h> +#include "channel.hh" #include "iputils.hh" #include "dnsdist.hh" +#include "dnsdist-metrics.hh" struct ConnectionInfo { @@ -98,12 +101,11 @@ public: InternalQueryState d_idstate; std::string d_proxyProtocolPayload; PacketBuffer d_buffer; - uint32_t d_proxyProtocolPayloadAddedSize{0}; uint32_t d_ixfrQuerySerial{0}; - uint32_t d_xfrMasterSerial{0}; + uint32_t d_xfrPrimarySerial{0}; uint32_t d_xfrSerialCount{0}; uint32_t d_downstreamFailures{0}; - uint8_t d_xfrMasterSerialCount{0}; + uint8_t d_xfrPrimarySerialCount{0}; bool d_xfrStarted{false}; bool d_proxyProtocolPayloadAdded{false}; }; @@ -121,10 +123,23 @@ struct TCPResponse : public TCPQuery } TCPResponse(PacketBuffer&& buffer, InternalQueryState&& state, std::shared_ptr<ConnectionToBackend> conn, std::shared_ptr<DownstreamState> ds) : - TCPQuery(std::move(buffer), std::move(state)), d_connection(conn), d_ds(ds) + TCPQuery(std::move(buffer), std::move(state)), d_connection(std::move(conn)), d_ds(std::move(ds)) { if (d_buffer.size() >= sizeof(dnsheader)) { - memcpy(&d_cleartextDH, reinterpret_cast<const dnsheader*>(d_buffer.data()), sizeof(d_cleartextDH)); + dnsheader_aligned header(d_buffer.data()); + memcpy(&d_cleartextDH, header.get(), sizeof(d_cleartextDH)); + } + else { + memset(&d_cleartextDH, 0, sizeof(d_cleartextDH)); + } + } + + TCPResponse(TCPQuery&& query) : + TCPQuery(std::move(query)) + { + if (d_buffer.size() >= sizeof(dnsheader)) { + dnsheader_aligned header(d_buffer.data()); + memcpy(&d_cleartextDH, header.get(), sizeof(d_cleartextDH)); } else { memset(&d_cleartextDH, 0, sizeof(d_cleartextDH)); @@ -152,7 +167,7 @@ public: virtual bool active() const = 0; virtual void handleResponse(const struct timeval& now, TCPResponse&& response) = 0; virtual void handleXFRResponse(const struct timeval& now, TCPResponse&& response) = 0; - virtual void notifyIOError(InternalQueryState&& query, const struct timeval& now) = 0; + virtual void notifyIOError(const struct timeval& now, TCPResponse&& response) = 0; /* whether the connection should be automatically released to the pool after handleResponse() has been called */ @@ -197,14 +212,13 @@ struct CrossProtocolQuery InternalQuery query; std::shared_ptr<DownstreamState> downstream{nullptr}; - size_t proxyProtocolPayloadSize{0}; bool d_isResponse{false}; }; class TCPClientCollection { public: - TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpStates); + TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpAcceptStates); bool passConnectionToThread(std::unique_ptr<ConnectionInfo>&& conn) { @@ -213,20 +227,16 @@ public: } uint64_t pos = d_pos++; - auto pipe = d_tcpclientthreads.at(pos % d_numthreads).d_newConnectionPipe.getHandle(); - auto tmp = conn.release(); - /* we need to increment this counter _before_ writing to the pipe, otherwise there is a very real possiblity that the other end decrement the counter before we can increment it, leading to an underflow */ ++d_queued; - if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) { + if (!d_tcpclientthreads.at(pos % d_numthreads).d_querySender.send(std::move(conn))) { --d_queued; - ++g_stats.tcpQueryPipeFull; - delete tmp; - tmp = nullptr; + ++dnsdist::metrics::g_stats.tcpQueryPipeFull; return false; } + return true; } @@ -237,13 +247,8 @@ public: } uint64_t pos = d_pos++; - auto pipe = d_tcpclientthreads.at(pos % d_numthreads).d_crossProtocolQueriesPipe.getHandle(); - auto tmp = cpq.release(); - - if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) { - ++g_stats.tcpCrossProtocolQueryPipeFull; - delete tmp; - tmp = nullptr; + if (!d_tcpclientthreads.at(pos % d_numthreads).d_crossProtocolQuerySender.send(std::move(cpq))) { + ++dnsdist::metrics::g_stats.tcpCrossProtocolQueryPipeFull; return false; } @@ -279,8 +284,8 @@ private: { } - TCPWorkerThread(int newConnPipe, int crossProtocolQueriesPipe, int crossProtocolResponsesPipe) : - d_newConnectionPipe(newConnPipe), d_crossProtocolQueriesPipe(crossProtocolQueriesPipe), d_crossProtocolResponsesPipe(crossProtocolResponsesPipe) + TCPWorkerThread(pdns::channel::Sender<ConnectionInfo>&& querySender, pdns::channel::Sender<CrossProtocolQuery>&& crossProtocolQuerySender) : + d_querySender(std::move(querySender)), d_crossProtocolQuerySender(std::move(crossProtocolQuerySender)) { } @@ -289,9 +294,8 @@ private: TCPWorkerThread(const TCPWorkerThread& rhs) = delete; TCPWorkerThread& operator=(const TCPWorkerThread&) = delete; - FDWrapper d_newConnectionPipe; - FDWrapper d_crossProtocolQueriesPipe; - FDWrapper d_crossProtocolResponsesPipe; + pdns::channel::Sender<ConnectionInfo> d_querySender; + pdns::channel::Sender<CrossProtocolQuery> d_crossProtocolQuerySender; }; std::vector<TCPWorkerThread> d_tcpclientthreads; @@ -303,4 +307,4 @@ private: extern std::unique_ptr<TCPClientCollection> g_tcpclientthreads; -std::unique_ptr<CrossProtocolQuery> getTCPCrossProtocolQueryFromDQ(DNSQuestion& dq); +std::unique_ptr<CrossProtocolQuery> getTCPCrossProtocolQueryFromDQ(DNSQuestion& dnsQuestion); |