author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 21:14:48 +0000
---|---|---
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 21:14:48 +0000
commit | e10ff189aca57bba91933088195d4edda199cb20 (patch) |
tree | 056237559582eba27e68fa864434436ac5b7f535 /dnsdist-backend.cc |
parent | Adding upstream version 1.8.3. (diff) |
download | dnsdist-e10ff189aca57bba91933088195d4edda199cb20.tar.xz, dnsdist-e10ff189aca57bba91933088195d4edda199cb20.zip |
Adding upstream version 1.9.3. (tag: upstream/1.9.3)

Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'dnsdist-backend.cc')
-rw-r--r-- | dnsdist-backend.cc | 174
1 file changed, 149 insertions(+), 25 deletions(-)
```diff
diff --git a/dnsdist-backend.cc b/dnsdist-backend.cc
index eca469a..7f56034 100644
--- a/dnsdist-backend.cc
+++ b/dnsdist-backend.cc
@@ -19,25 +19,62 @@
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
-
+#include "config.h"
 #include "dnsdist.hh"
 #include "dnsdist-backoff.hh"
+#include "dnsdist-metrics.hh"
 #include "dnsdist-nghttp2.hh"
 #include "dnsdist-random.hh"
 #include "dnsdist-rings.hh"
 #include "dnsdist-tcp.hh"
+#include "dnsdist-xsk.hh"
 #include "dolog.hh"
+#include "xsk.hh"
 
 bool DownstreamState::passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq)
 {
-  if (d_config.d_dohPath.empty()) {
-    return g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq));
-  }
-  else {
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
+  if (!d_config.d_dohPath.empty()) {
     return g_dohClientThreads && g_dohClientThreads->passCrossProtocolQueryToThread(std::move(cpq));
   }
+#endif
+  return g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq));
+}
+
+#ifdef HAVE_XSK
+void DownstreamState::addXSKDestination(int fd)
+{
+  auto socklen = d_config.remote.getSocklen();
+  ComboAddress local;
+  if (getsockname(fd, reinterpret_cast<sockaddr*>(&local), &socklen)) {
+    return;
+  }
+
+  {
+    auto addresses = d_socketSourceAddresses.write_lock();
+    addresses->push_back(local);
+  }
+  dnsdist::xsk::addDestinationAddress(local);
+  for (size_t idx = 0; idx < d_xskSockets.size(); idx++) {
+    d_xskSockets.at(idx)->addWorkerRoute(d_xskInfos.at(idx), local);
+  }
 }
 
+void DownstreamState::removeXSKDestination(int fd)
+{
+  auto socklen = d_config.remote.getSocklen();
+  ComboAddress local;
+  if (getsockname(fd, reinterpret_cast<sockaddr*>(&local), &socklen)) {
+    return;
+  }
+
+  dnsdist::xsk::removeDestinationAddress(local);
+  for (auto& xskSocket : d_xskSockets) {
+    xskSocket->removeWorkerRoute(local);
+  }
+}
+#endif /* HAVE_XSK */
+
 bool DownstreamState::reconnect(bool initialAttempt)
 {
   std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
@@ -51,11 +88,23 @@ bool DownstreamState::reconnect(bool initialAttempt)
   }
 
   connected = false;
+#ifdef HAVE_XSK
+  if (!d_xskInfos.empty()) {
+    auto addresses = d_socketSourceAddresses.write_lock();
+    addresses->clear();
+  }
+#endif /* HAVE_XSK */
+
   for (auto& fd : sockets) {
     if (fd != -1) {
       if (sockets.size() > 1) {
         (*mplexer.lock())->removeReadFD(fd);
       }
+#ifdef HAVE_XSK
+      if (!d_xskInfos.empty()) {
+        removeXSKDestination(fd);
+      }
+#endif /* HAVE_XSK */
       /* shutdown() is needed to wake up recv() in the responderThread */
       shutdown(fd, SHUT_RDWR);
       close(fd);
@@ -86,6 +135,11 @@ bool DownstreamState::reconnect(bool initialAttempt)
       if (sockets.size() > 1) {
         (*mplexer.lock())->addReadFD(fd, [](int, boost::any) {});
       }
+#ifdef HAVE_XSK
+      if (!d_xskInfos.empty()) {
+        addXSKDestination(fd);
+      }
+#endif /* HAVE_XSK */
       connected = true;
     }
     catch (const std::runtime_error& error) {
@@ -99,8 +153,19 @@ bool DownstreamState::reconnect(bool initialAttempt)
 
   /* if at least one (re-)connection failed, close all sockets */
   if (!connected) {
+#ifdef HAVE_XSK
+    if (!d_xskInfos.empty()) {
+      auto addresses = d_socketSourceAddresses.write_lock();
+      addresses->clear();
+    }
+#endif /* HAVE_XSK */
     for (auto& fd : sockets) {
       if (fd != -1) {
+#ifdef HAVE_XSK
+        if (!d_xskInfos.empty()) {
+          removeXSKDestination(fd);
+        }
+#endif /* HAVE_XSK */
         if (sockets.size() > 1) {
           try {
             (*mplexer.lock())->removeReadFD(fd);
@@ -267,12 +332,20 @@ DownstreamState::DownstreamState(DownstreamState::Config&& config, std::shared_p
 
 void DownstreamState::start()
 {
   if (connected && !threadStarted.test_and_set()) {
-    tid = std::thread(responderThread, shared_from_this());
+#ifdef HAVE_XSK
+    for (auto& xskInfo : d_xskInfos) {
+      auto xskResponderThread = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this(), xskInfo);
+      if (!d_config.d_cpus.empty()) {
+        mapThreadToCPUList(xskResponderThread.native_handle(), d_config.d_cpus);
+      }
+      xskResponderThread.detach();
+    }
+#endif /* HAVE_XSK */
+    auto tid = std::thread(responderThread, shared_from_this());
     if (!d_config.d_cpus.empty()) {
       mapThreadToCPUList(tid.native_handle(), d_config.d_cpus);
     }
-    tid.detach();
   }
 }
 
@@ -360,10 +433,10 @@ void DownstreamState::handleUDPTimeout(IDState& ids)
 {
   ids.age = 0;
   ids.inUse = false;
-  handleDOHTimeout(std::move(ids.internal.du));
+  DOHUnitInterface::handleTimeout(std::move(ids.internal.du));
   ++reuseds;
   --outstanding;
-  ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
+  ++dnsdist::metrics::g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
   vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
            d_config.remote.toStringWithPort(), getName(),
            ids.internal.qname.toLogString(), QType(ids.internal.qtype).toString(), ids.internal.origRemote.toStringWithPort());
@@ -462,8 +535,8 @@ uint16_t DownstreamState::saveState(InternalQueryState&& state)
       auto oldDU = std::move(it->second.internal.du);
       ++reuseds;
-      ++g_stats.downstreamTimeouts;
-      handleDOHTimeout(std::move(oldDU));
+      ++dnsdist::metrics::g_stats.downstreamTimeouts;
+      DOHUnitInterface::handleTimeout(std::move(oldDU));
     }
     else {
       ++outstanding;
@@ -489,8 +562,8 @@ uint16_t DownstreamState::saveState(InternalQueryState&& state)
          to handle it because it's about to be overwritten. */
       auto oldDU = std::move(ids.internal.du);
       ++reuseds;
-      ++g_stats.downstreamTimeouts;
-      handleDOHTimeout(std::move(oldDU));
+      ++dnsdist::metrics::g_stats.downstreamTimeouts;
+      DOHUnitInterface::handleTimeout(std::move(oldDU));
     }
     else {
       ++outstanding;
@@ -512,8 +585,8 @@ void DownstreamState::restoreState(uint16_t id, InternalQueryState&& state)
     if (!inserted) {
       /* already used */
       ++reuseds;
-      ++g_stats.downstreamTimeouts;
-      handleDOHTimeout(std::move(state.du));
+      ++dnsdist::metrics::g_stats.downstreamTimeouts;
+      DOHUnitInterface::handleTimeout(std::move(state.du));
     }
     else {
       it->second.internal = std::move(state);
@@ -527,15 +600,15 @@ void DownstreamState::restoreState(uint16_t id, InternalQueryState&& state)
   if (!guard) {
     /* already used */
     ++reuseds;
-    ++g_stats.downstreamTimeouts;
-    handleDOHTimeout(std::move(state.du));
+    ++dnsdist::metrics::g_stats.downstreamTimeouts;
+    DOHUnitInterface::handleTimeout(std::move(state.du));
     return;
   }
   if (ids.isInUse()) {
     /* already used */
     ++reuseds;
-    ++g_stats.downstreamTimeouts;
-    handleDOHTimeout(std::move(state.du));
+    ++dnsdist::metrics::g_stats.downstreamTimeouts;
+    DOHUnitInterface::handleTimeout(std::move(state.du));
     return;
   }
   ids.internal = std::move(state);
@@ -619,6 +692,7 @@ bool DownstreamState::healthCheckRequired(std::optional<time_t> currentTime)
       lastResults.clear();
       vinfolog("Backend %s reached the lazy health-check threshold (%f%% out of %f%%, looking at sample of %d items with %d failures), moving to Potential Failure state", getNameWithAddr(), current, maxFailureRate, totalCount, failures);
       stats->d_status = LazyHealthCheckStats::LazyStatus::PotentialFailure;
+      consecutiveSuccessfulChecks = 0;
       /* we update the next check time here because the check might time out,
          and we do not want to send a second check during that time unless
          the timer is actually very short */
@@ -678,7 +752,7 @@ void DownstreamState::updateNextLazyHealthCheck(LazyHealthCheckStats& stats, boo
     time_t backOff = d_config.d_lazyHealthCheckMaxBackOff;
     const ExponentialBackOffTimer backOffTimer(d_config.d_lazyHealthCheckMaxBackOff);
-    auto backOffCoeffTmp = backOffTimer.get(failedTests);
+    auto backOffCoeffTmp = backOffTimer.get(failedTests - 1);
     /* backOffCoeffTmp cannot be higher than d_config.d_lazyHealthCheckMaxBackOff */
     const auto backOffCoeff = static_cast<time_t>(backOffCoeffTmp);
     if ((std::numeric_limits<time_t>::max() / d_config.d_lazyHealthCheckFailedInterval) >= backOffCoeff) {
@@ -700,6 +774,10 @@ void DownstreamState::updateNextLazyHealthCheck(LazyHealthCheckStats& stats, boo
 
 void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
 {
+  if (!newResult) {
+    ++d_healthCheckMetrics.d_failures;
+  }
+
   if (initial) {
     /* if this is the initial health-check, at startup,
        we do not care about the minimum number of failed/successful health-checks */
@@ -709,9 +787,11 @@ void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
     setUpStatus(newResult);
     if (newResult == false) {
       currentCheckFailures++;
-      auto stats = d_lazyHealthCheckStats.lock();
-      stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
-      updateNextLazyHealthCheck(*stats, false);
+      if (d_config.availability == DownstreamState::Availability::Lazy) {
+        auto stats = d_lazyHealthCheckStats.lock();
+        stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
+        updateNextLazyHealthCheck(*stats, false);
+      }
     }
     return;
   }
@@ -721,12 +801,12 @@ void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
   if (newResult) {
     /* check succeeded */
     currentCheckFailures = 0;
+    consecutiveSuccessfulChecks++;
 
     if (!upStatus) {
       /* we were previously marked as "down" and had a successful health-check,
          let's see if this is enough to move to the "up" state or if we need
         more successful health-checks for that */
-      consecutiveSuccessfulChecks++;
       if (consecutiveSuccessfulChecks < d_config.minRiseSuccesses) {
         /* we need more than one successful check to rise
            and we didn't reach the threshold yet, let's stay down */
@@ -767,7 +847,7 @@ void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
       auto stats = d_lazyHealthCheckStats.lock();
       vinfolog("Backend %s failed its health-check, moving from Potential failure to Failed", getNameWithAddr());
       stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
-      currentCheckFailures = 0;
+      currentCheckFailures = 1;
       updateNextLazyHealthCheck(*stats, false);
     }
   }
@@ -790,6 +870,50 @@
   }
 }
 
+#ifdef HAVE_XSK
+[[nodiscard]] ComboAddress DownstreamState::pickSourceAddressForSending()
+{
+  if (!connected) {
+    waitUntilConnected();
+  }
+
+  auto addresses = d_socketSourceAddresses.read_lock();
+  auto numberOfAddresses = addresses->size();
+  if (numberOfAddresses == 0) {
+    throw std::runtime_error("No source address available for sending XSK data to backend " + getNameWithAddr());
+  }
+  size_t idx = dnsdist::getRandomValue(numberOfAddresses);
+  return (*addresses)[idx % numberOfAddresses];
+}
+
+void DownstreamState::registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks)
+{
+  d_xskSockets = xsks;
+
+  if (d_config.sourceAddr.sin4.sin_family == 0 || (IsAnyAddress(d_config.sourceAddr))) {
+    const auto& ifName = xsks.at(0)->getInterfaceName();
+    auto addresses = getListOfAddressesOfNetworkInterface(ifName);
+    if (addresses.empty()) {
+      throw std::runtime_error("Unable to get source address from interface " + ifName);
+    }
+
+    if (addresses.size() > 1) {
+      warnlog("More than one address configured on interface %s, picking the first one (%s) for XSK. Set the 'source' parameter on 'newServer' if you want to use a different address.", ifName, addresses.at(0).toString());
+    }
+    d_config.sourceAddr = addresses.at(0);
+  }
+  d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress();
+
+  for (auto& xsk : d_xskSockets) {
+    auto xskInfo = XskWorker::create();
+    d_xskInfos.push_back(xskInfo);
+    xsk->addWorker(xskInfo);
+    xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
+  }
+  reconnect(false);
+}
+#endif /* HAVE_XSK */
+
 size_t ServerPool::countServers(bool upOnly)
 {
   std::shared_ptr<const ServerPolicy::NumberedServerVector> servers = nullptr;
```
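`addXSKDestination()` relies on a classic POSIX trick: after `connect()` on a UDP socket, `getsockname()` reveals which local address and port the kernel picked, and it is that address that gets registered as an XSK worker route. A minimal self-contained demonstration of the trick, using plain sockets rather than dnsdist or XSK types (the 192.0.2.1 target is a documentation address chosen for illustration):

```cpp
#include <arpa/inet.h>
#include <cstdio>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// Sketch of the getsockname() pattern used by addXSKDestination():
// connect() a UDP socket, then ask the kernel which local address/port
// it actually bound for us.
int main()
{
  int fd = socket(AF_INET, SOCK_DGRAM, 0);
  if (fd < 0) {
    return 1;
  }

  sockaddr_in remote{};
  remote.sin_family = AF_INET;
  remote.sin_port = htons(53);
  inet_pton(AF_INET, "192.0.2.1", &remote.sin_addr);

  // connect() on a UDP socket sends nothing; it only fixes the destination
  // and makes the kernel pick a local address and source port
  if (connect(fd, reinterpret_cast<sockaddr*>(&remote), sizeof(remote)) == 0) {
    sockaddr_in local{};
    socklen_t len = sizeof(local);
    if (getsockname(fd, reinterpret_cast<sockaddr*>(&local), &len) == 0) {
      char buf[INET_ADDRSTRLEN];
      inet_ntop(AF_INET, &local.sin_addr, buf, sizeof(buf));
      std::printf("kernel picked %s:%u\n", buf, ntohs(local.sin_port));
    }
  }
  close(fd);
  return 0;
}
```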
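Among the smaller changes, the lazy health-check back-off now calls `backOffTimer.get(failedTests - 1)` instead of `backOffTimer.get(failedTests)`, so the first failed check schedules the next one after the base interval rather than after an already-doubled one. The real `ExponentialBackOffTimer` lives in `dnsdist-backoff.hh` and is not part of this diff; the sketch below only illustrates the off-by-one, assuming a timer that doubles a coefficient per consecutive failure and saturates at a configured maximum:

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>

// Illustration only, NOT the dnsdist implementation: a back-off timer that
// returns 2^consecutiveFailures, capped at d_maxBackOff.
struct SketchBackOffTimer
{
  explicit SketchBackOffTimer(uint64_t maxBackOff) :
    d_maxBackOff(maxBackOff) {}

  uint64_t get(size_t consecutiveFailures) const
  {
    // saturate before shifting to avoid undefined behaviour on overflow
    if (consecutiveFailures >= 64) {
      return d_maxBackOff;
    }
    const uint64_t coeff = uint64_t(1) << consecutiveFailures;
    return coeff > d_maxBackOff ? d_maxBackOff : coeff;
  }

  uint64_t d_maxBackOff;
};

int main()
{
  SketchBackOffTimer timer(3600);
  // With get(failedTests) the first failure already yields coefficient 2;
  // with get(failedTests - 1) it starts at 1 and doubles from there.
  for (size_t failedTests = 1; failedTests <= 5; failedTests++) {
    std::cout << failedTests << " failure(s): coeff "
              << timer.get(failedTests - 1) << "\n";
  }
  return 0;
}
```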
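`pickSourceAddressForSending()` is the reader side of the new `d_socketSourceAddresses` list: the writers (`addXSKDestination()` and `reconnect()`) take the write lock, while senders take the read lock and pick a random entry, with the final `idx % numberOfAddresses` acting as an extra guard on the random index (assuming `dnsdist::getRandomValue(n)` already returns a value below `n`). A standalone sketch of the same pattern, substituting `std::shared_mutex` and `<random>` for dnsdist's lock wrapper and random helper (both stand-ins for illustration):

```cpp
#include <mutex>
#include <random>
#include <shared_mutex>
#include <stdexcept>
#include <string>
#include <vector>

// Sketch of the reader/writer pattern around the source-address list.
class SourceAddressPicker
{
public:
  void add(const std::string& addr)
  {
    std::unique_lock lock(d_mutex); // writer side, as in addXSKDestination()
    d_addresses.push_back(addr);
  }

  std::string pick()
  {
    std::shared_lock lock(d_mutex); // reader side, as in pickSourceAddressForSending()
    if (d_addresses.empty()) {
      throw std::runtime_error("No source address available");
    }
    std::uniform_int_distribution<size_t> dist(0, d_addresses.size() - 1);
    return d_addresses[dist(d_gen)];
  }

private:
  std::shared_mutex d_mutex;
  std::vector<std::string> d_addresses;
  std::mt19937 d_gen{std::random_device{}()};
};

int main()
{
  SourceAddressPicker picker;
  picker.add("192.0.2.10");
  picker.add("192.0.2.11");
  // each send picks one of the registered source addresses at random
  return picker.pick().empty() ? 1 : 0;
}
```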