summaryrefslogtreecommitdiffstats
path: root/dnsdist-nghttp2.cc
diff options
context:
space:
mode:
Diffstat (limited to 'dnsdist-nghttp2.cc')
-rw-r--r--dnsdist-nghttp2.cc386
1 files changed, 150 insertions, 236 deletions
diff --git a/dnsdist-nghttp2.cc b/dnsdist-nghttp2.cc
index 5c745ea..bf1666f 100644
--- a/dnsdist-nghttp2.cc
+++ b/dnsdist-nghttp2.cc
@@ -22,16 +22,18 @@
#include "config.h"
-#ifdef HAVE_NGHTTP2
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
#include <nghttp2/nghttp2.h>
-#endif /* HAVE_NGHTTP2 */
+#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
#include "dnsdist-nghttp2.hh"
+#include "dnsdist-nghttp2-in.hh"
#include "dnsdist-tcp.hh"
#include "dnsdist-tcp-downstream.hh"
#include "dnsdist-downstream-connection.hh"
#include "dolog.hh"
+#include "channel.hh"
#include "iputils.hh"
#include "libssl.hh"
#include "noinitvector.hh"
@@ -43,7 +45,7 @@ std::atomic<uint64_t> g_dohStatesDumpRequested{0};
std::unique_ptr<DoHClientCollection> g_dohClientThreads{nullptr};
std::optional<uint16_t> g_outgoingDoHWorkerThreads{std::nullopt};
-#ifdef HAVE_NGHTTP2
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
class DoHConnectionToBackend : public ConnectionToBackend
{
public:
@@ -82,9 +84,6 @@ private:
static void handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
static void handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
- static void addStaticHeader(std::vector<nghttp2_nv>& headers, const std::string& nameKey, const std::string& valueKey);
- static void addDynamicHeader(std::vector<nghttp2_nv>& headers, const std::string& nameKey, const std::string& value);
-
class PendingRequest
{
public:
@@ -95,8 +94,7 @@ private:
uint16_t d_responseCode{0};
bool d_finished{false};
};
- void addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback);
- void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD = false);
+ void updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback, bool noTTD = false);
void watchForRemoteHostClosingConnection();
void handleResponse(PendingRequest&& request);
void handleResponseError(PendingRequest&& request, const struct timeval& now);
@@ -132,7 +130,11 @@ uint32_t DoHConnectionToBackend::getConcurrentStreamsCount() const
void DoHConnectionToBackend::handleResponse(PendingRequest&& request)
{
- struct timeval now;
+ struct timeval now
+ {
+ .tv_sec = 0, .tv_usec = 0
+ };
+
gettimeofday(&now, nullptr);
try {
if (!d_healthCheckQuery) {
@@ -148,7 +150,11 @@ void DoHConnectionToBackend::handleResponse(PendingRequest&& request)
}
}
- request.d_sender->handleResponse(now, TCPResponse(std::move(request.d_buffer), std::move(request.d_query.d_idstate), shared_from_this(), d_ds));
+ TCPResponse response(std::move(request.d_query));
+ response.d_buffer = std::move(request.d_buffer);
+ response.d_connection = shared_from_this();
+ response.d_ds = d_ds;
+ request.d_sender->handleResponse(now, std::move(response));
}
catch (const std::exception& e) {
vinfolog("Got exception while handling response for cross-protocol DoH: %s", e.what());
@@ -162,7 +168,8 @@ void DoHConnectionToBackend::handleResponseError(PendingRequest&& request, const
d_ds->reportTimeoutOrError();
}
- request.d_sender->notifyIOError(std::move(request.d_query.d_idstate), now);
+ TCPResponse response(PacketBuffer(), std::move(request.d_query.d_idstate), nullptr, nullptr);
+ request.d_sender->notifyIOError(now, std::move(response));
}
catch (const std::exception& e) {
vinfolog("Got exception while handling response for cross-protocol DoH: %s", e.what());
@@ -174,7 +181,11 @@ void DoHConnectionToBackend::handleIOError()
d_connectionDied = true;
nghttp2_session_terminate_session(d_session.get(), NGHTTP2_PROTOCOL_ERROR);
- struct timeval now;
+ struct timeval now
+ {
+ .tv_sec = 0, .tv_usec = 0
+ };
+
gettimeofday(&now, nullptr);
for (auto& request : d_currentStreams) {
handleResponseError(std::move(request.second), now);
@@ -221,45 +232,6 @@ bool DoHConnectionToBackend::isIdle() const
return getConcurrentStreamsCount() == 0;
}
-const std::unordered_map<std::string, std::string> DoHConnectionToBackend::s_constants = {
- {"method-name", ":method"},
- {"method-value", "POST"},
- {"scheme-name", ":scheme"},
- {"scheme-value", "https"},
- {"accept-name", "accept"},
- {"accept-value", "application/dns-message"},
- {"content-type-name", "content-type"},
- {"content-type-value", "application/dns-message"},
- {"user-agent-name", "user-agent"},
- {"user-agent-value", "nghttp2-" NGHTTP2_VERSION "/dnsdist"},
- {"authority-name", ":authority"},
- {"path-name", ":path"},
- {"content-length-name", "content-length"},
- {"x-forwarded-for-name", "x-forwarded-for"},
- {"x-forwarded-port-name", "x-forwarded-port"},
- {"x-forwarded-proto-name", "x-forwarded-proto"},
- {"x-forwarded-proto-value-dns-over-udp", "dns-over-udp"},
- {"x-forwarded-proto-value-dns-over-tcp", "dns-over-tcp"},
- {"x-forwarded-proto-value-dns-over-tls", "dns-over-tls"},
- {"x-forwarded-proto-value-dns-over-http", "dns-over-http"},
- {"x-forwarded-proto-value-dns-over-https", "dns-over-https"},
-};
-
-void DoHConnectionToBackend::addStaticHeader(std::vector<nghttp2_nv>& headers, const std::string& nameKey, const std::string& valueKey)
-{
- const auto& name = s_constants.at(nameKey);
- const auto& value = s_constants.at(valueKey);
-
- headers.push_back({const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(name.c_str())), const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(value.c_str())), name.size(), value.size(), NGHTTP2_NV_FLAG_NO_COPY_NAME | NGHTTP2_NV_FLAG_NO_COPY_VALUE});
-}
-
-void DoHConnectionToBackend::addDynamicHeader(std::vector<nghttp2_nv>& headers, const std::string& nameKey, const std::string& value)
-{
- const auto& name = s_constants.at(nameKey);
-
- headers.push_back({const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(name.c_str())), const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(value.c_str())), name.size(), value.size(), NGHTTP2_NV_FLAG_NO_COPY_NAME | NGHTTP2_NV_FLAG_NO_COPY_VALUE});
-}
-
void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
{
auto payloadSize = std::to_string(query.d_buffer.size());
@@ -275,37 +247,37 @@ void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender,
headers.reserve(8 + (addXForwarded ? 3 : 0));
/* Pseudo-headers need to come first (rfc7540 8.1.2.1) */
- addStaticHeader(headers, "method-name", "method-value");
- addStaticHeader(headers, "scheme-name", "scheme-value");
- addDynamicHeader(headers, "authority-name", d_ds->d_config.d_tlsSubjectName);
- addDynamicHeader(headers, "path-name", d_ds->d_config.d_dohPath);
- addStaticHeader(headers, "accept-name", "accept-value");
- addStaticHeader(headers, "content-type-name", "content-type-value");
- addStaticHeader(headers, "user-agent-name", "user-agent-value");
- addDynamicHeader(headers, "content-length-name", payloadSize);
+ NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::METHOD_NAME, NGHTTP2Headers::HeaderConstantIndexes::METHOD_VALUE);
+ NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_NAME, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_VALUE);
+ NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::AUTHORITY_NAME, d_ds->d_config.d_tlsSubjectName);
+ NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::PATH_NAME, d_ds->d_config.d_dohPath);
+ NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_NAME, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_VALUE);
+ NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_NAME, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_VALUE);
+ NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_NAME, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_VALUE);
+ NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_LENGTH_NAME, payloadSize);
/* no need to add these headers for health-check queries */
if (addXForwarded && query.d_idstate.origRemote.getPort() != 0) {
remote = query.d_idstate.origRemote.toString();
remotePort = std::to_string(query.d_idstate.origRemote.getPort());
- addDynamicHeader(headers, "x-forwarded-for-name", remote);
- addDynamicHeader(headers, "x-forwarded-port-name", remotePort);
+ NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_FOR_NAME, remote);
+ NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PORT_NAME, remotePort);
if (query.d_idstate.cs != nullptr) {
if (query.d_idstate.cs->isUDP()) {
- addStaticHeader(headers, "x-forwarded-proto-name", "x-forwarded-proto-value-dns-over-udp");
+ NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_UDP);
}
else if (query.d_idstate.cs->isDoH()) {
if (query.d_idstate.cs->hasTLS()) {
- addStaticHeader(headers, "x-forwarded-proto-name", "x-forwarded-proto-value-dns-over-https");
+ NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTPS);
}
else {
- addStaticHeader(headers, "x-forwarded-proto-name", "x-forwarded-proto-value-dns-over-http");
+ NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTP);
}
}
else if (query.d_idstate.cs->hasTLS()) {
- addStaticHeader(headers, "x-forwarded-proto-name", "x-forwarded-proto-value-dns-over-tls");
+ NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TLS);
}
else {
- addStaticHeader(headers, "x-forwarded-proto-name", "x-forwarded-proto-value-dns-over-tcp");
+ NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TCP);
}
}
}
@@ -327,10 +299,9 @@ void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender,
*/
nghttp2_data_provider data_provider;
- /* we will not use this pointer */
data_provider.source.ptr = this;
data_provider.read_callback = [](nghttp2_session* session, int32_t stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) -> ssize_t {
- auto conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
+ auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
auto& request = conn->d_currentStreams.at(stream_id);
size_t toCopy = 0;
if (request.d_queryPos < request.d_query.d_buffer.size()) {
@@ -368,12 +339,14 @@ void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender,
class DoHClientThreadData
{
public:
- DoHClientThreadData() :
- mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent()))
+ DoHClientThreadData(pdns::channel::Receiver<CrossProtocolQuery>&& receiver) :
+ mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent())),
+ d_receiver(std::move(receiver))
{
}
std::unique_ptr<FDMultiplexer> mplexer{nullptr};
+ pdns::channel::Receiver<CrossProtocolQuery> d_receiver;
};
void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param)
@@ -403,7 +376,11 @@ void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::fun
throw std::runtime_error("Fatal error while passing received data to nghttp2: " + std::string(nghttp2_strerror((int)readlen)));
}
- struct timeval now;
+ struct timeval now
+ {
+ .tv_sec = 0, .tv_usec = 0
+ };
+
gettimeofday(&now, nullptr);
conn->d_lastDataReceivedTime = now;
@@ -491,9 +468,13 @@ void DoHConnectionToBackend::stopIO()
}
}
-void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD)
+void DoHConnectionToBackend::updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback, bool noTTD)
{
- struct timeval now;
+ struct timeval now
+ {
+ .tv_sec = 0, .tv_usec = 0
+ };
+
gettimeofday(&now, nullptr);
boost::optional<struct timeval> ttd{boost::none};
if (!noTTD) {
@@ -515,10 +496,10 @@ void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackf
auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
if (shared) {
if (newState == IOState::NeedRead) {
- d_ioState->update(newState, callback, shared, ttd);
+ d_ioState->update(newState, callback, std::move(shared), ttd);
}
else if (newState == IOState::NeedWrite) {
- d_ioState->update(newState, callback, shared, ttd);
+ d_ioState->update(newState, callback, std::move(shared), ttd);
}
}
}
@@ -530,33 +511,6 @@ void DoHConnectionToBackend::watchForRemoteHostClosingConnection()
}
}
-void DoHConnectionToBackend::addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback)
-{
- struct timeval now;
- gettimeofday(&now, nullptr);
- boost::optional<struct timeval> ttd{boost::none};
- if (state == IOState::NeedRead) {
- ttd = getBackendReadTTD(now);
- }
- else if (isFresh() && d_firstWrite == 0) {
- /* first write just after the non-blocking connect */
- ttd = getBackendConnectTTD(now);
- }
- else {
- ttd = getBackendWriteTTD(now);
- }
-
- auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
- if (shared) {
- if (state == IOState::NeedRead) {
- d_ioState->add(state, callback, shared, ttd);
- }
- else if (state == IOState::NeedWrite) {
- d_ioState->add(state, callback, shared, ttd);
- }
- }
-}
-
ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data)
{
DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
@@ -646,7 +600,11 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con
}
else {
vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode);
- struct timeval now;
+ struct timeval now
+ {
+ .tv_sec = 0, .tv_usec = 0
+ };
+
gettimeofday(&now, nullptr);
conn->handleResponseError(std::move(request), now);
@@ -698,7 +656,11 @@ int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session
}
else {
vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode);
- struct timeval now;
+ struct timeval now
+ {
+ .tv_sec = 0, .tv_usec = 0
+ };
+
gettimeofday(&now, nullptr);
conn->handleResponseError(std::move(request), now);
@@ -730,7 +692,11 @@ int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, i
return 0;
}
- struct timeval now;
+ struct timeval now
+ {
+ .tv_sec = 0, .tv_usec = 0
+ };
+
gettimeofday(&now, nullptr);
auto request = std::move(stream->second);
conn->d_currentStreams.erase(stream->first);
@@ -856,55 +822,53 @@ DoHConnectionToBackend::DoHConnectionToBackend(const std::shared_ptr<DownstreamS
static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
{
auto threadData = boost::any_cast<DoHClientThreadData*>(param);
- CrossProtocolQuery* tmp{nullptr};
- ssize_t got = read(pipefd, &tmp, sizeof(tmp));
- if (got == 0) {
- throw std::runtime_error("EOF while reading from the DoH cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
- }
- else if (got == -1) {
- if (errno == EAGAIN || errno == EINTR) {
+ std::unique_ptr<CrossProtocolQuery> cpq{nullptr};
+ try {
+ auto tmp = threadData->d_receiver.receive();
+ if (!tmp) {
return;
}
- throw std::runtime_error("Error while reading from the DoH cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode:" + stringerror());
+ cpq = std::move(*tmp);
}
- else if (got != sizeof(tmp)) {
- throw std::runtime_error("Partial read while reading from the DoH cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
+ catch (const std::exception& e) {
+ throw std::runtime_error("Error while reading from the DoH cross-protocol channel:" + std::string(e.what()));
}
- try {
- struct timeval now;
- gettimeofday(&now, nullptr);
+ struct timeval now
+ {
+ .tv_sec = 0, .tv_usec = 0
+ };
+ gettimeofday(&now, nullptr);
- std::shared_ptr<TCPQuerySender> tqs = tmp->getTCPQuerySender();
- auto query = std::move(tmp->query);
- auto downstreamServer = std::move(tmp->downstream);
- delete tmp;
- tmp = nullptr;
+ std::shared_ptr<TCPQuerySender> tqs = cpq->getTCPQuerySender();
+ auto query = std::move(cpq->query);
+ auto downstreamServer = std::move(cpq->downstream);
+ cpq.reset();
- try {
- auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload));
- downstream->queueQuery(tqs, std::move(query));
- }
- catch (...) {
- tqs->notifyIOError(std::move(query.d_idstate), now);
- }
+ try {
+ auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload));
+ downstream->queueQuery(tqs, std::move(query));
}
catch (...) {
- delete tmp;
- tmp = nullptr;
+ TCPResponse response(std::move(query));
+ tqs->notifyIOError(now, std::move(response));
}
}
-static void dohClientThread(int crossProtocolPipeFD)
+static void dohClientThread(pdns::channel::Receiver<CrossProtocolQuery>&& receiver)
{
setThreadName("dnsdist/dohClie");
try {
- DoHClientThreadData data;
- data.mplexer->addReadFD(crossProtocolPipeFD, handleCrossProtocolQuery, &data);
+ DoHClientThreadData data(std::move(receiver));
+ data.mplexer->addReadFD(data.d_receiver.getDescriptor(), handleCrossProtocolQuery, &data);
+
+ struct timeval now
+ {
+ .tv_sec = 0, .tv_usec = 0
+ };
- struct timeval now;
gettimeofday(&now, nullptr);
time_t lastTimeoutScan = now.tv_sec;
@@ -925,31 +889,31 @@ static void dohClientThread(int crossProtocolPipeFD)
if (g_dohStatesDumpRequested > 0) {
/* no race here, we took the lock so it can only be increased in the meantime */
--g_dohStatesDumpRequested;
- errlog("Dumping the DoH client states, as requested:");
+ infolog("Dumping the DoH client states, as requested:");
data.mplexer->runForAllWatchedFDs([](bool isRead, int fd, const FDMultiplexer::funcparam_t& param, struct timeval ttd) {
struct timeval lnow;
gettimeofday(&lnow, nullptr);
if (ttd.tv_sec > 0) {
- errlog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec));
+ infolog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec));
}
else {
- errlog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write"));
+ infolog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write"));
}
if (param.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
- errlog(" - %s", conn->toString());
+ infolog(" - %s", conn->toString());
}
else if (param.type() == typeid(DoHClientThreadData*)) {
- errlog(" - Worker thread pipe");
+ infolog(" - Worker thread pipe");
}
});
- errlog("The DoH client cache has %d active and %d idle outgoing connections cached", t_downstreamDoHConnectionsManager.getActiveCount(), t_downstreamDoHConnectionsManager.getIdleCount());
+ infolog("The DoH client cache has %d active and %d idle outgoing connections cached", t_downstreamDoHConnectionsManager.getActiveCount(), t_downstreamDoHConnectionsManager.getIdleCount());
}
}
}
catch (const std::exception& e) {
- errlog("Error in outgoing DoH thread: %s", e.what());
+ warnlog("Error in outgoing DoH thread: %s", e.what());
}
}
}
@@ -968,7 +932,7 @@ static bool select_next_proto_callback(unsigned char** out, unsigned char* outle
return true;
}
-#endif /* HAVE_NGHTTP2 */
+#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
struct DoHClientCollection::DoHWorkerThread
{
@@ -976,40 +940,26 @@ struct DoHClientCollection::DoHWorkerThread
{
}
- DoHWorkerThread(int crossProtocolPipe) :
- d_crossProtocolQueryPipe(crossProtocolPipe)
+ DoHWorkerThread(pdns::channel::Sender<CrossProtocolQuery>&& sender) :
+ d_sender(std::move(sender))
{
}
DoHWorkerThread(DoHWorkerThread&& rhs) :
- d_crossProtocolQueryPipe(rhs.d_crossProtocolQueryPipe)
+ d_sender(std::move(rhs.d_sender))
{
- rhs.d_crossProtocolQueryPipe = -1;
}
DoHWorkerThread& operator=(DoHWorkerThread&& rhs)
{
- if (d_crossProtocolQueryPipe != -1) {
- close(d_crossProtocolQueryPipe);
- }
-
- d_crossProtocolQueryPipe = rhs.d_crossProtocolQueryPipe;
- rhs.d_crossProtocolQueryPipe = -1;
-
+ d_sender = std::move(rhs.d_sender);
return *this;
}
DoHWorkerThread(const DoHWorkerThread& rhs) = delete;
DoHWorkerThread& operator=(const DoHWorkerThread&) = delete;
- ~DoHWorkerThread()
- {
- if (d_crossProtocolQueryPipe != -1) {
- close(d_crossProtocolQueryPipe);
- }
- }
-
- int d_crossProtocolQueryPipe{-1};
+ pdns::channel::Sender<CrossProtocolQuery> d_sender;
};
DoHClientCollection::DoHClientCollection(size_t numberOfThreads) :
@@ -1024,13 +974,8 @@ bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr<CrossPr
}
uint64_t pos = d_pos++;
- auto pipe = d_clientThreads.at(pos % d_numberOfThreads).d_crossProtocolQueryPipe;
- auto tmp = cpq.release();
-
- if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) {
- delete tmp;
- ++g_stats.outgoingDoHQueryPipeFull;
- tmp = nullptr;
+ if (!d_clientThreads.at(pos % d_numberOfThreads).d_sender.send(std::move(cpq))) {
+ ++dnsdist::metrics::g_stats.outgoingDoHQueryPipeFull;
return false;
}
@@ -1039,78 +984,44 @@ bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr<CrossPr
void DoHClientCollection::addThread()
{
-#ifdef HAVE_NGHTTP2
- auto preparePipe = [](int fds[2], const std::string& type) -> bool {
- if (pipe(fds) < 0) {
- errlog("Error creating the DoH thread %s pipe: %s", type, stringerror());
- return false;
- }
-
- if (!setNonBlocking(fds[0])) {
- int err = errno;
- close(fds[0]);
- close(fds[1]);
- errlog("Error setting the DoH thread %s pipe non-blocking: %s", type, stringerror(err));
- return false;
- }
-
- if (!setNonBlocking(fds[1])) {
- int err = errno;
- close(fds[0]);
- close(fds[1]);
- errlog("Error setting the DoH thread %s pipe non-blocking: %s", type, stringerror(err));
- return false;
- }
-
- if (g_tcpInternalPipeBufferSize > 0 && getPipeBufferSize(fds[0]) < g_tcpInternalPipeBufferSize) {
- setPipeBufferSize(fds[0], g_tcpInternalPipeBufferSize);
- }
-
- return true;
- };
-
- int crossProtocolFDs[2] = {-1, -1};
- if (!preparePipe(crossProtocolFDs, "cross-protocol")) {
- return;
- }
-
- vinfolog("Adding DoH Client thread");
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
+ try {
+ auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);
- {
+ vinfolog("Adding DoH Client thread");
std::lock_guard<std::mutex> lock(d_mutex);
if (d_numberOfThreads >= d_clientThreads.size()) {
vinfolog("Adding a new DoH client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of DoH client threads with setMaxDoHClientThreads() in the configuration.", d_numberOfThreads, d_clientThreads.size());
- close(crossProtocolFDs[0]);
- close(crossProtocolFDs[1]);
return;
}
- /* from now on this side of the pipe will be managed by that object,
- no need to worry about it */
- DoHWorkerThread worker(crossProtocolFDs[1]);
+ DoHWorkerThread worker(std::move(sender));
try {
- std::thread t1(dohClientThread, crossProtocolFDs[0]);
+ std::thread t1(dohClientThread, std::move(receiver));
t1.detach();
}
catch (const std::runtime_error& e) {
- /* the thread creation failed, don't leak */
+ /* the thread creation failed */
errlog("Error creating a DoH thread: %s", e.what());
- close(crossProtocolFDs[0]);
return;
}
d_clientThreads.at(d_numberOfThreads) = std::move(worker);
++d_numberOfThreads;
}
-#else /* HAVE_NGHTTP2 */
+ catch (const std::exception& e) {
+ errlog("Error creating the DoH channel: %s", e.what());
+ return;
+ }
+#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
throw std::runtime_error("DoHClientCollection::addThread() called but nghttp2 support is not available");
-#endif /* HAVE_NGHTTP2 */
+#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
bool initDoHWorkers()
{
-#ifdef HAVE_NGHTTP2
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
if (!g_outgoingDoHWorkerThreads) {
/* Unless the value has been set to 0 explicitly, always start at least one outgoing DoH worker thread, in case a DoH backend
is added at a later time. */
@@ -1126,7 +1037,7 @@ bool initDoHWorkers()
return true;
#else
return false;
-#endif /* HAVE_NGHTTP2 */
+#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
bool setupDoHClientProtocolNegotiation(std::shared_ptr<TLSCtx>& ctx)
@@ -1134,21 +1045,24 @@ bool setupDoHClientProtocolNegotiation(std::shared_ptr<TLSCtx>& ctx)
if (ctx == nullptr) {
return false;
}
-#ifdef HAVE_NGHTTP2
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
/* we want to set the ALPN to h2, if only to mitigate the ALPACA attack */
const std::vector<std::vector<uint8_t>> h2Alpns = {{'h', '2'}};
ctx->setALPNProtos(h2Alpns);
ctx->setNextProtocolSelectCallback(select_next_proto_callback);
return true;
-#else /* HAVE_NGHTTP2 */
+#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
return false;
-#endif /* HAVE_NGHTTP2 */
+#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
bool sendH2Query(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<TCPQuerySender>& sender, InternalQuery&& query, bool healthCheck)
{
-#ifdef HAVE_NGHTTP2
- struct timeval now;
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
+ struct timeval now
+ {
+ .tv_sec = 0, .tv_usec = 0
+ };
gettimeofday(&now, nullptr);
if (healthCheck) {
@@ -1163,24 +1077,24 @@ bool sendH2Query(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDM
}
return true;
-#else /* HAVE_NGHTTP2 */
+#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
return false;
-#endif /* HAVE_NGHTTP2 */
+#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
size_t clearH2Connections()
{
size_t cleared = 0;
-#ifdef HAVE_NGHTTP2
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
cleared = t_downstreamDoHConnectionsManager.clear();
-#endif /* HAVE_NGHTTP2 */
+#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
return cleared;
}
size_t handleH2Timeouts(FDMultiplexer& mplexer, const struct timeval& now)
{
size_t got = 0;
-#ifdef HAVE_NGHTTP2
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
auto expiredReadConns = mplexer.getTimeouts(now, false);
for (const auto& cbData : expiredReadConns) {
if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
@@ -1200,27 +1114,27 @@ size_t handleH2Timeouts(FDMultiplexer& mplexer, const struct timeval& now)
++got;
}
}
-#endif /* HAVE_NGHTTP2 */
+#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
return got;
}
void setDoHDownstreamCleanupInterval(uint16_t max)
{
-#ifdef HAVE_NGHTTP2
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
DownstreamDoHConnectionsManager::setCleanupInterval(max);
-#endif /* HAVE_NGHTTP2 */
+#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
void setDoHDownstreamMaxIdleTime(uint16_t max)
{
-#ifdef HAVE_NGHTTP2
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
DownstreamDoHConnectionsManager::setMaxIdleTime(max);
-#endif /* HAVE_NGHTTP2 */
+#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
void setDoHDownstreamMaxIdleConnectionsPerBackend(size_t max)
{
-#ifdef HAVE_NGHTTP2
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
DownstreamDoHConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
-#endif /* HAVE_NGHTTP2 */
+#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}