summaryrefslogtreecommitdiffstats
path: root/dnsdist-tcp.hh
diff options
context:
space:
mode:
Diffstat (limited to 'dnsdist-tcp.hh')
-rw-r--r--dnsdist-tcp.hh60
1 files changed, 32 insertions, 28 deletions
diff --git a/dnsdist-tcp.hh b/dnsdist-tcp.hh
index 3d11f1a..f3d827e 100644
--- a/dnsdist-tcp.hh
+++ b/dnsdist-tcp.hh
@@ -21,9 +21,12 @@
*/
#pragma once
+#include <optional>
#include <unistd.h>
+#include "channel.hh"
#include "iputils.hh"
#include "dnsdist.hh"
+#include "dnsdist-metrics.hh"
struct ConnectionInfo
{
@@ -98,12 +101,11 @@ public:
InternalQueryState d_idstate;
std::string d_proxyProtocolPayload;
PacketBuffer d_buffer;
- uint32_t d_proxyProtocolPayloadAddedSize{0};
uint32_t d_ixfrQuerySerial{0};
- uint32_t d_xfrMasterSerial{0};
+ uint32_t d_xfrPrimarySerial{0};
uint32_t d_xfrSerialCount{0};
uint32_t d_downstreamFailures{0};
- uint8_t d_xfrMasterSerialCount{0};
+ uint8_t d_xfrPrimarySerialCount{0};
bool d_xfrStarted{false};
bool d_proxyProtocolPayloadAdded{false};
};
@@ -121,10 +123,23 @@ struct TCPResponse : public TCPQuery
}
TCPResponse(PacketBuffer&& buffer, InternalQueryState&& state, std::shared_ptr<ConnectionToBackend> conn, std::shared_ptr<DownstreamState> ds) :
- TCPQuery(std::move(buffer), std::move(state)), d_connection(conn), d_ds(ds)
+ TCPQuery(std::move(buffer), std::move(state)), d_connection(std::move(conn)), d_ds(std::move(ds))
{
if (d_buffer.size() >= sizeof(dnsheader)) {
- memcpy(&d_cleartextDH, reinterpret_cast<const dnsheader*>(d_buffer.data()), sizeof(d_cleartextDH));
+ dnsheader_aligned header(d_buffer.data());
+ memcpy(&d_cleartextDH, header.get(), sizeof(d_cleartextDH));
+ }
+ else {
+ memset(&d_cleartextDH, 0, sizeof(d_cleartextDH));
+ }
+ }
+
+ TCPResponse(TCPQuery&& query) :
+ TCPQuery(std::move(query))
+ {
+ if (d_buffer.size() >= sizeof(dnsheader)) {
+ dnsheader_aligned header(d_buffer.data());
+ memcpy(&d_cleartextDH, header.get(), sizeof(d_cleartextDH));
}
else {
memset(&d_cleartextDH, 0, sizeof(d_cleartextDH));
@@ -152,7 +167,7 @@ public:
virtual bool active() const = 0;
virtual void handleResponse(const struct timeval& now, TCPResponse&& response) = 0;
virtual void handleXFRResponse(const struct timeval& now, TCPResponse&& response) = 0;
- virtual void notifyIOError(InternalQueryState&& query, const struct timeval& now) = 0;
+ virtual void notifyIOError(const struct timeval& now, TCPResponse&& response) = 0;
/* whether the connection should be automatically released to the pool after handleResponse()
has been called */
@@ -197,14 +212,13 @@ struct CrossProtocolQuery
InternalQuery query;
std::shared_ptr<DownstreamState> downstream{nullptr};
- size_t proxyProtocolPayloadSize{0};
bool d_isResponse{false};
};
class TCPClientCollection
{
public:
- TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpStates);
+ TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpAcceptStates);
bool passConnectionToThread(std::unique_ptr<ConnectionInfo>&& conn)
{
@@ -213,20 +227,16 @@ public:
}
uint64_t pos = d_pos++;
- auto pipe = d_tcpclientthreads.at(pos % d_numthreads).d_newConnectionPipe.getHandle();
- auto tmp = conn.release();
-
/* we need to increment this counter _before_ writing to the pipe,
otherwise there is a very real possiblity that the other end
decrement the counter before we can increment it, leading to an underflow */
++d_queued;
- if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+ if (!d_tcpclientthreads.at(pos % d_numthreads).d_querySender.send(std::move(conn))) {
--d_queued;
- ++g_stats.tcpQueryPipeFull;
- delete tmp;
- tmp = nullptr;
+ ++dnsdist::metrics::g_stats.tcpQueryPipeFull;
return false;
}
+
return true;
}
@@ -237,13 +247,8 @@ public:
}
uint64_t pos = d_pos++;
- auto pipe = d_tcpclientthreads.at(pos % d_numthreads).d_crossProtocolQueriesPipe.getHandle();
- auto tmp = cpq.release();
-
- if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) {
- ++g_stats.tcpCrossProtocolQueryPipeFull;
- delete tmp;
- tmp = nullptr;
+ if (!d_tcpclientthreads.at(pos % d_numthreads).d_crossProtocolQuerySender.send(std::move(cpq))) {
+ ++dnsdist::metrics::g_stats.tcpCrossProtocolQueryPipeFull;
return false;
}
@@ -279,8 +284,8 @@ private:
{
}
- TCPWorkerThread(int newConnPipe, int crossProtocolQueriesPipe, int crossProtocolResponsesPipe) :
- d_newConnectionPipe(newConnPipe), d_crossProtocolQueriesPipe(crossProtocolQueriesPipe), d_crossProtocolResponsesPipe(crossProtocolResponsesPipe)
+ TCPWorkerThread(pdns::channel::Sender<ConnectionInfo>&& querySender, pdns::channel::Sender<CrossProtocolQuery>&& crossProtocolQuerySender) :
+ d_querySender(std::move(querySender)), d_crossProtocolQuerySender(std::move(crossProtocolQuerySender))
{
}
@@ -289,9 +294,8 @@ private:
TCPWorkerThread(const TCPWorkerThread& rhs) = delete;
TCPWorkerThread& operator=(const TCPWorkerThread&) = delete;
- FDWrapper d_newConnectionPipe;
- FDWrapper d_crossProtocolQueriesPipe;
- FDWrapper d_crossProtocolResponsesPipe;
+ pdns::channel::Sender<ConnectionInfo> d_querySender;
+ pdns::channel::Sender<CrossProtocolQuery> d_crossProtocolQuerySender;
};
std::vector<TCPWorkerThread> d_tcpclientthreads;
@@ -303,4 +307,4 @@ private:
extern std::unique_ptr<TCPClientCollection> g_tcpclientthreads;
-std::unique_ptr<CrossProtocolQuery> getTCPCrossProtocolQueryFromDQ(DNSQuestion& dq);
+std::unique_ptr<CrossProtocolQuery> getTCPCrossProtocolQueryFromDQ(DNSQuestion& dnsQuestion);