diff options
Diffstat (limited to '')
-rw-r--r-- | dnsdist-nghttp2.cc | 386 |
1 files changed, 150 insertions, 236 deletions
diff --git a/dnsdist-nghttp2.cc b/dnsdist-nghttp2.cc index 5c745ea..bf1666f 100644 --- a/dnsdist-nghttp2.cc +++ b/dnsdist-nghttp2.cc @@ -22,16 +22,18 @@ #include "config.h" -#ifdef HAVE_NGHTTP2 +#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) #include <nghttp2/nghttp2.h> -#endif /* HAVE_NGHTTP2 */ +#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ #include "dnsdist-nghttp2.hh" +#include "dnsdist-nghttp2-in.hh" #include "dnsdist-tcp.hh" #include "dnsdist-tcp-downstream.hh" #include "dnsdist-downstream-connection.hh" #include "dolog.hh" +#include "channel.hh" #include "iputils.hh" #include "libssl.hh" #include "noinitvector.hh" @@ -43,7 +45,7 @@ std::atomic<uint64_t> g_dohStatesDumpRequested{0}; std::unique_ptr<DoHClientCollection> g_dohClientThreads{nullptr}; std::optional<uint16_t> g_outgoingDoHWorkerThreads{std::nullopt}; -#ifdef HAVE_NGHTTP2 +#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) class DoHConnectionToBackend : public ConnectionToBackend { public: @@ -82,9 +84,6 @@ private: static void handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param); static void handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param); - static void addStaticHeader(std::vector<nghttp2_nv>& headers, const std::string& nameKey, const std::string& valueKey); - static void addDynamicHeader(std::vector<nghttp2_nv>& headers, const std::string& nameKey, const std::string& value); - class PendingRequest { public: @@ -95,8 +94,7 @@ private: uint16_t d_responseCode{0}; bool d_finished{false}; }; - void addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback); - void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD = false); + void updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback, bool noTTD = false); void watchForRemoteHostClosingConnection(); void handleResponse(PendingRequest&& request); void handleResponseError(PendingRequest&& request, const struct timeval& now); @@ -132,7 +130,11 @@ uint32_t DoHConnectionToBackend::getConcurrentStreamsCount() const void DoHConnectionToBackend::handleResponse(PendingRequest&& request) { - struct timeval now; + struct timeval now + { + .tv_sec = 0, .tv_usec = 0 + }; + gettimeofday(&now, nullptr); try { if (!d_healthCheckQuery) { @@ -148,7 +150,11 @@ void DoHConnectionToBackend::handleResponse(PendingRequest&& request) } } - request.d_sender->handleResponse(now, TCPResponse(std::move(request.d_buffer), std::move(request.d_query.d_idstate), shared_from_this(), d_ds)); + TCPResponse response(std::move(request.d_query)); + response.d_buffer = std::move(request.d_buffer); + response.d_connection = shared_from_this(); + response.d_ds = d_ds; + request.d_sender->handleResponse(now, std::move(response)); } catch (const std::exception& e) { vinfolog("Got exception while handling response for cross-protocol DoH: %s", e.what()); @@ -162,7 +168,8 @@ void DoHConnectionToBackend::handleResponseError(PendingRequest&& request, const d_ds->reportTimeoutOrError(); } - request.d_sender->notifyIOError(std::move(request.d_query.d_idstate), now); + TCPResponse response(PacketBuffer(), std::move(request.d_query.d_idstate), nullptr, nullptr); + request.d_sender->notifyIOError(now, std::move(response)); } catch (const std::exception& e) { vinfolog("Got exception while handling response for cross-protocol DoH: %s", e.what()); @@ -174,7 +181,11 @@ void DoHConnectionToBackend::handleIOError() d_connectionDied = true; nghttp2_session_terminate_session(d_session.get(), NGHTTP2_PROTOCOL_ERROR); - struct timeval now; + struct timeval now + { + .tv_sec = 0, .tv_usec = 0 + }; + gettimeofday(&now, nullptr); for (auto& request : d_currentStreams) { handleResponseError(std::move(request.second), now); @@ -221,45 +232,6 @@ bool DoHConnectionToBackend::isIdle() const return getConcurrentStreamsCount() == 0; } -const std::unordered_map<std::string, std::string> DoHConnectionToBackend::s_constants = { - {"method-name", ":method"}, - {"method-value", "POST"}, - {"scheme-name", ":scheme"}, - {"scheme-value", "https"}, - {"accept-name", "accept"}, - {"accept-value", "application/dns-message"}, - {"content-type-name", "content-type"}, - {"content-type-value", "application/dns-message"}, - {"user-agent-name", "user-agent"}, - {"user-agent-value", "nghttp2-" NGHTTP2_VERSION "/dnsdist"}, - {"authority-name", ":authority"}, - {"path-name", ":path"}, - {"content-length-name", "content-length"}, - {"x-forwarded-for-name", "x-forwarded-for"}, - {"x-forwarded-port-name", "x-forwarded-port"}, - {"x-forwarded-proto-name", "x-forwarded-proto"}, - {"x-forwarded-proto-value-dns-over-udp", "dns-over-udp"}, - {"x-forwarded-proto-value-dns-over-tcp", "dns-over-tcp"}, - {"x-forwarded-proto-value-dns-over-tls", "dns-over-tls"}, - {"x-forwarded-proto-value-dns-over-http", "dns-over-http"}, - {"x-forwarded-proto-value-dns-over-https", "dns-over-https"}, -}; - -void DoHConnectionToBackend::addStaticHeader(std::vector<nghttp2_nv>& headers, const std::string& nameKey, const std::string& valueKey) -{ - const auto& name = s_constants.at(nameKey); - const auto& value = s_constants.at(valueKey); - - headers.push_back({const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(name.c_str())), const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(value.c_str())), name.size(), value.size(), NGHTTP2_NV_FLAG_NO_COPY_NAME | NGHTTP2_NV_FLAG_NO_COPY_VALUE}); -} - -void DoHConnectionToBackend::addDynamicHeader(std::vector<nghttp2_nv>& headers, const std::string& nameKey, const std::string& value) -{ - const auto& name = s_constants.at(nameKey); - - headers.push_back({const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(name.c_str())), const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(value.c_str())), name.size(), value.size(), NGHTTP2_NV_FLAG_NO_COPY_NAME | NGHTTP2_NV_FLAG_NO_COPY_VALUE}); -} - void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) { auto payloadSize = std::to_string(query.d_buffer.size()); @@ -275,37 +247,37 @@ void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, headers.reserve(8 + (addXForwarded ? 3 : 0)); /* Pseudo-headers need to come first (rfc7540 8.1.2.1) */ - addStaticHeader(headers, "method-name", "method-value"); - addStaticHeader(headers, "scheme-name", "scheme-value"); - addDynamicHeader(headers, "authority-name", d_ds->d_config.d_tlsSubjectName); - addDynamicHeader(headers, "path-name", d_ds->d_config.d_dohPath); - addStaticHeader(headers, "accept-name", "accept-value"); - addStaticHeader(headers, "content-type-name", "content-type-value"); - addStaticHeader(headers, "user-agent-name", "user-agent-value"); - addDynamicHeader(headers, "content-length-name", payloadSize); + NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::METHOD_NAME, NGHTTP2Headers::HeaderConstantIndexes::METHOD_VALUE); + NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_NAME, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_VALUE); + NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::AUTHORITY_NAME, d_ds->d_config.d_tlsSubjectName); + NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::PATH_NAME, d_ds->d_config.d_dohPath); + NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_NAME, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_VALUE); + NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_NAME, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_VALUE); + NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_NAME, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_VALUE); + NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_LENGTH_NAME, payloadSize); /* no need to add these headers for health-check queries */ if (addXForwarded && query.d_idstate.origRemote.getPort() != 0) { remote = query.d_idstate.origRemote.toString(); remotePort = std::to_string(query.d_idstate.origRemote.getPort()); - addDynamicHeader(headers, "x-forwarded-for-name", remote); - addDynamicHeader(headers, "x-forwarded-port-name", remotePort); + NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_FOR_NAME, remote); + NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PORT_NAME, remotePort); if (query.d_idstate.cs != nullptr) { if (query.d_idstate.cs->isUDP()) { - addStaticHeader(headers, "x-forwarded-proto-name", "x-forwarded-proto-value-dns-over-udp"); + NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_UDP); } else if (query.d_idstate.cs->isDoH()) { if (query.d_idstate.cs->hasTLS()) { - addStaticHeader(headers, "x-forwarded-proto-name", "x-forwarded-proto-value-dns-over-https"); + NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTPS); } else { - addStaticHeader(headers, "x-forwarded-proto-name", "x-forwarded-proto-value-dns-over-http"); + NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTP); } } else if (query.d_idstate.cs->hasTLS()) { - addStaticHeader(headers, "x-forwarded-proto-name", "x-forwarded-proto-value-dns-over-tls"); + NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TLS); } else { - addStaticHeader(headers, "x-forwarded-proto-name", "x-forwarded-proto-value-dns-over-tcp"); + NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TCP); } } } @@ -327,10 +299,9 @@ void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, */ nghttp2_data_provider data_provider; - /* we will not use this pointer */ data_provider.source.ptr = this; data_provider.read_callback = [](nghttp2_session* session, int32_t stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) -> ssize_t { - auto conn = reinterpret_cast<DoHConnectionToBackend*>(user_data); + auto* conn = static_cast<DoHConnectionToBackend*>(user_data); auto& request = conn->d_currentStreams.at(stream_id); size_t toCopy = 0; if (request.d_queryPos < request.d_query.d_buffer.size()) { @@ -368,12 +339,14 @@ void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, class DoHClientThreadData { public: - DoHClientThreadData() : - mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent())) + DoHClientThreadData(pdns::channel::Receiver<CrossProtocolQuery>&& receiver) : + mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent())), + d_receiver(std::move(receiver)) { } std::unique_ptr<FDMultiplexer> mplexer{nullptr}; + pdns::channel::Receiver<CrossProtocolQuery> d_receiver; }; void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param) @@ -403,7 +376,11 @@ void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::fun throw std::runtime_error("Fatal error while passing received data to nghttp2: " + std::string(nghttp2_strerror((int)readlen))); } - struct timeval now; + struct timeval now + { + .tv_sec = 0, .tv_usec = 0 + }; + gettimeofday(&now, nullptr); conn->d_lastDataReceivedTime = now; @@ -491,9 +468,13 @@ void DoHConnectionToBackend::stopIO() } } -void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD) +void DoHConnectionToBackend::updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback, bool noTTD) { - struct timeval now; + struct timeval now + { + .tv_sec = 0, .tv_usec = 0 + }; + gettimeofday(&now, nullptr); boost::optional<struct timeval> ttd{boost::none}; if (!noTTD) { @@ -515,10 +496,10 @@ void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackf auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this()); if (shared) { if (newState == IOState::NeedRead) { - d_ioState->update(newState, callback, shared, ttd); + d_ioState->update(newState, callback, std::move(shared), ttd); } else if (newState == IOState::NeedWrite) { - d_ioState->update(newState, callback, shared, ttd); + d_ioState->update(newState, callback, std::move(shared), ttd); } } } @@ -530,33 +511,6 @@ void DoHConnectionToBackend::watchForRemoteHostClosingConnection() } } -void DoHConnectionToBackend::addToIOState(IOState state, FDMultiplexer::callbackfunc_t callback) -{ - struct timeval now; - gettimeofday(&now, nullptr); - boost::optional<struct timeval> ttd{boost::none}; - if (state == IOState::NeedRead) { - ttd = getBackendReadTTD(now); - } - else if (isFresh() && d_firstWrite == 0) { - /* first write just after the non-blocking connect */ - ttd = getBackendConnectTTD(now); - } - else { - ttd = getBackendWriteTTD(now); - } - - auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this()); - if (shared) { - if (state == IOState::NeedRead) { - d_ioState->add(state, callback, shared, ttd); - } - else if (state == IOState::NeedWrite) { - d_ioState->add(state, callback, shared, ttd); - } - } -} - ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data) { DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data); @@ -646,7 +600,11 @@ int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, con } else { vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode); - struct timeval now; + struct timeval now + { + .tv_sec = 0, .tv_usec = 0 + }; + gettimeofday(&now, nullptr); conn->handleResponseError(std::move(request), now); @@ -698,7 +656,11 @@ int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session } else { vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode); - struct timeval now; + struct timeval now + { + .tv_sec = 0, .tv_usec = 0 + }; + gettimeofday(&now, nullptr); conn->handleResponseError(std::move(request), now); @@ -730,7 +692,11 @@ int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, i return 0; } - struct timeval now; + struct timeval now + { + .tv_sec = 0, .tv_usec = 0 + }; + gettimeofday(&now, nullptr); auto request = std::move(stream->second); conn->d_currentStreams.erase(stream->first); @@ -856,55 +822,53 @@ DoHConnectionToBackend::DoHConnectionToBackend(const std::shared_ptr<DownstreamS static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param) { auto threadData = boost::any_cast<DoHClientThreadData*>(param); - CrossProtocolQuery* tmp{nullptr}; - ssize_t got = read(pipefd, &tmp, sizeof(tmp)); - if (got == 0) { - throw std::runtime_error("EOF while reading from the DoH cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode"); - } - else if (got == -1) { - if (errno == EAGAIN || errno == EINTR) { + std::unique_ptr<CrossProtocolQuery> cpq{nullptr}; + try { + auto tmp = threadData->d_receiver.receive(); + if (!tmp) { return; } - throw std::runtime_error("Error while reading from the DoH cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode:" + stringerror()); + cpq = std::move(*tmp); } - else if (got != sizeof(tmp)) { - throw std::runtime_error("Partial read while reading from the DoH cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode"); + catch (const std::exception& e) { + throw std::runtime_error("Error while reading from the DoH cross-protocol channel:" + std::string(e.what())); } - try { - struct timeval now; - gettimeofday(&now, nullptr); + struct timeval now + { + .tv_sec = 0, .tv_usec = 0 + }; + gettimeofday(&now, nullptr); - std::shared_ptr<TCPQuerySender> tqs = tmp->getTCPQuerySender(); - auto query = std::move(tmp->query); - auto downstreamServer = std::move(tmp->downstream); - delete tmp; - tmp = nullptr; + std::shared_ptr<TCPQuerySender> tqs = cpq->getTCPQuerySender(); + auto query = std::move(cpq->query); + auto downstreamServer = std::move(cpq->downstream); + cpq.reset(); - try { - auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload)); - downstream->queueQuery(tqs, std::move(query)); - } - catch (...) { - tqs->notifyIOError(std::move(query.d_idstate), now); - } + try { + auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload)); + downstream->queueQuery(tqs, std::move(query)); } catch (...) { - delete tmp; - tmp = nullptr; + TCPResponse response(std::move(query)); + tqs->notifyIOError(now, std::move(response)); } } -static void dohClientThread(int crossProtocolPipeFD) +static void dohClientThread(pdns::channel::Receiver<CrossProtocolQuery>&& receiver) { setThreadName("dnsdist/dohClie"); try { - DoHClientThreadData data; - data.mplexer->addReadFD(crossProtocolPipeFD, handleCrossProtocolQuery, &data); + DoHClientThreadData data(std::move(receiver)); + data.mplexer->addReadFD(data.d_receiver.getDescriptor(), handleCrossProtocolQuery, &data); + + struct timeval now + { + .tv_sec = 0, .tv_usec = 0 + }; - struct timeval now; gettimeofday(&now, nullptr); time_t lastTimeoutScan = now.tv_sec; @@ -925,31 +889,31 @@ static void dohClientThread(int crossProtocolPipeFD) if (g_dohStatesDumpRequested > 0) { /* no race here, we took the lock so it can only be increased in the meantime */ --g_dohStatesDumpRequested; - errlog("Dumping the DoH client states, as requested:"); + infolog("Dumping the DoH client states, as requested:"); data.mplexer->runForAllWatchedFDs([](bool isRead, int fd, const FDMultiplexer::funcparam_t& param, struct timeval ttd) { struct timeval lnow; gettimeofday(&lnow, nullptr); if (ttd.tv_sec > 0) { - errlog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec)); + infolog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec)); } else { - errlog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write")); + infolog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write")); } if (param.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) { auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param); - errlog(" - %s", conn->toString()); + infolog(" - %s", conn->toString()); } else if (param.type() == typeid(DoHClientThreadData*)) { - errlog(" - Worker thread pipe"); + infolog(" - Worker thread pipe"); } }); - errlog("The DoH client cache has %d active and %d idle outgoing connections cached", t_downstreamDoHConnectionsManager.getActiveCount(), t_downstreamDoHConnectionsManager.getIdleCount()); + infolog("The DoH client cache has %d active and %d idle outgoing connections cached", t_downstreamDoHConnectionsManager.getActiveCount(), t_downstreamDoHConnectionsManager.getIdleCount()); } } } catch (const std::exception& e) { - errlog("Error in outgoing DoH thread: %s", e.what()); + warnlog("Error in outgoing DoH thread: %s", e.what()); } } } @@ -968,7 +932,7 @@ static bool select_next_proto_callback(unsigned char** out, unsigned char* outle return true; } -#endif /* HAVE_NGHTTP2 */ +#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ struct DoHClientCollection::DoHWorkerThread { @@ -976,40 +940,26 @@ struct DoHClientCollection::DoHWorkerThread { } - DoHWorkerThread(int crossProtocolPipe) : - d_crossProtocolQueryPipe(crossProtocolPipe) + DoHWorkerThread(pdns::channel::Sender<CrossProtocolQuery>&& sender) : + d_sender(std::move(sender)) { } DoHWorkerThread(DoHWorkerThread&& rhs) : - d_crossProtocolQueryPipe(rhs.d_crossProtocolQueryPipe) + d_sender(std::move(rhs.d_sender)) { - rhs.d_crossProtocolQueryPipe = -1; } DoHWorkerThread& operator=(DoHWorkerThread&& rhs) { - if (d_crossProtocolQueryPipe != -1) { - close(d_crossProtocolQueryPipe); - } - - d_crossProtocolQueryPipe = rhs.d_crossProtocolQueryPipe; - rhs.d_crossProtocolQueryPipe = -1; - + d_sender = std::move(rhs.d_sender); return *this; } DoHWorkerThread(const DoHWorkerThread& rhs) = delete; DoHWorkerThread& operator=(const DoHWorkerThread&) = delete; - ~DoHWorkerThread() - { - if (d_crossProtocolQueryPipe != -1) { - close(d_crossProtocolQueryPipe); - } - } - - int d_crossProtocolQueryPipe{-1}; + pdns::channel::Sender<CrossProtocolQuery> d_sender; }; DoHClientCollection::DoHClientCollection(size_t numberOfThreads) : @@ -1024,13 +974,8 @@ bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr<CrossPr } uint64_t pos = d_pos++; - auto pipe = d_clientThreads.at(pos % d_numberOfThreads).d_crossProtocolQueryPipe; - auto tmp = cpq.release(); - - if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) { - delete tmp; - ++g_stats.outgoingDoHQueryPipeFull; - tmp = nullptr; + if (!d_clientThreads.at(pos % d_numberOfThreads).d_sender.send(std::move(cpq))) { + ++dnsdist::metrics::g_stats.outgoingDoHQueryPipeFull; return false; } @@ -1039,78 +984,44 @@ bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr<CrossPr void DoHClientCollection::addThread() { -#ifdef HAVE_NGHTTP2 - auto preparePipe = [](int fds[2], const std::string& type) -> bool { - if (pipe(fds) < 0) { - errlog("Error creating the DoH thread %s pipe: %s", type, stringerror()); - return false; - } - - if (!setNonBlocking(fds[0])) { - int err = errno; - close(fds[0]); - close(fds[1]); - errlog("Error setting the DoH thread %s pipe non-blocking: %s", type, stringerror(err)); - return false; - } - - if (!setNonBlocking(fds[1])) { - int err = errno; - close(fds[0]); - close(fds[1]); - errlog("Error setting the DoH thread %s pipe non-blocking: %s", type, stringerror(err)); - return false; - } - - if (g_tcpInternalPipeBufferSize > 0 && getPipeBufferSize(fds[0]) < g_tcpInternalPipeBufferSize) { - setPipeBufferSize(fds[0], g_tcpInternalPipeBufferSize); - } - - return true; - }; - - int crossProtocolFDs[2] = {-1, -1}; - if (!preparePipe(crossProtocolFDs, "cross-protocol")) { - return; - } - - vinfolog("Adding DoH Client thread"); +#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) + try { + auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize); - { + vinfolog("Adding DoH Client thread"); std::lock_guard<std::mutex> lock(d_mutex); if (d_numberOfThreads >= d_clientThreads.size()) { vinfolog("Adding a new DoH client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of DoH client threads with setMaxDoHClientThreads() in the configuration.", d_numberOfThreads, d_clientThreads.size()); - close(crossProtocolFDs[0]); - close(crossProtocolFDs[1]); return; } - /* from now on this side of the pipe will be managed by that object, - no need to worry about it */ - DoHWorkerThread worker(crossProtocolFDs[1]); + DoHWorkerThread worker(std::move(sender)); try { - std::thread t1(dohClientThread, crossProtocolFDs[0]); + std::thread t1(dohClientThread, std::move(receiver)); t1.detach(); } catch (const std::runtime_error& e) { - /* the thread creation failed, don't leak */ + /* the thread creation failed */ errlog("Error creating a DoH thread: %s", e.what()); - close(crossProtocolFDs[0]); return; } d_clientThreads.at(d_numberOfThreads) = std::move(worker); ++d_numberOfThreads; } -#else /* HAVE_NGHTTP2 */ + catch (const std::exception& e) { + errlog("Error creating the DoH channel: %s", e.what()); + return; + } +#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ throw std::runtime_error("DoHClientCollection::addThread() called but nghttp2 support is not available"); -#endif /* HAVE_NGHTTP2 */ +#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ } bool initDoHWorkers() { -#ifdef HAVE_NGHTTP2 +#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) if (!g_outgoingDoHWorkerThreads) { /* Unless the value has been set to 0 explicitly, always start at least one outgoing DoH worker thread, in case a DoH backend is added at a later time. */ @@ -1126,7 +1037,7 @@ bool initDoHWorkers() return true; #else return false; -#endif /* HAVE_NGHTTP2 */ +#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ } bool setupDoHClientProtocolNegotiation(std::shared_ptr<TLSCtx>& ctx) @@ -1134,21 +1045,24 @@ bool setupDoHClientProtocolNegotiation(std::shared_ptr<TLSCtx>& ctx) if (ctx == nullptr) { return false; } -#ifdef HAVE_NGHTTP2 +#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) /* we want to set the ALPN to h2, if only to mitigate the ALPACA attack */ const std::vector<std::vector<uint8_t>> h2Alpns = {{'h', '2'}}; ctx->setALPNProtos(h2Alpns); ctx->setNextProtocolSelectCallback(select_next_proto_callback); return true; -#else /* HAVE_NGHTTP2 */ +#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ return false; -#endif /* HAVE_NGHTTP2 */ +#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ } bool sendH2Query(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<TCPQuerySender>& sender, InternalQuery&& query, bool healthCheck) { -#ifdef HAVE_NGHTTP2 - struct timeval now; +#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) + struct timeval now + { + .tv_sec = 0, .tv_usec = 0 + }; gettimeofday(&now, nullptr); if (healthCheck) { @@ -1163,24 +1077,24 @@ bool sendH2Query(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDM } return true; -#else /* HAVE_NGHTTP2 */ +#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ return false; -#endif /* HAVE_NGHTTP2 */ +#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ } size_t clearH2Connections() { size_t cleared = 0; -#ifdef HAVE_NGHTTP2 +#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) cleared = t_downstreamDoHConnectionsManager.clear(); -#endif /* HAVE_NGHTTP2 */ +#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ return cleared; } size_t handleH2Timeouts(FDMultiplexer& mplexer, const struct timeval& now) { size_t got = 0; -#ifdef HAVE_NGHTTP2 +#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) auto expiredReadConns = mplexer.getTimeouts(now, false); for (const auto& cbData : expiredReadConns) { if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) { @@ -1200,27 +1114,27 @@ size_t handleH2Timeouts(FDMultiplexer& mplexer, const struct timeval& now) ++got; } } -#endif /* HAVE_NGHTTP2 */ +#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ return got; } void setDoHDownstreamCleanupInterval(uint16_t max) { -#ifdef HAVE_NGHTTP2 +#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) DownstreamDoHConnectionsManager::setCleanupInterval(max); -#endif /* HAVE_NGHTTP2 */ +#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ } void setDoHDownstreamMaxIdleTime(uint16_t max) { -#ifdef HAVE_NGHTTP2 +#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) DownstreamDoHConnectionsManager::setMaxIdleTime(max); -#endif /* HAVE_NGHTTP2 */ +#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ } void setDoHDownstreamMaxIdleConnectionsPerBackend(size_t max) { -#ifdef HAVE_NGHTTP2 +#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) DownstreamDoHConnectionsManager::setMaxIdleConnectionsPerDownstream(max); -#endif /* HAVE_NGHTTP2 */ +#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ } |