summaryrefslogtreecommitdiffstats
path: root/dnsdist-backend.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 21:14:49 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 21:14:49 +0000
commit2f230033794fafdf10822568e763d4db68cf6c6b (patch)
tree39ca5c2325b7b43c9a28ca6d4ad4026a61e7eb97 /dnsdist-backend.cc
parentAdding debian version 1.8.3-3. (diff)
downloaddnsdist-2f230033794fafdf10822568e763d4db68cf6c6b.tar.xz
dnsdist-2f230033794fafdf10822568e763d4db68cf6c6b.zip
Merging upstream version 1.9.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'dnsdist-backend.cc')
-rw-r--r--dnsdist-backend.cc174
1 files changed, 149 insertions, 25 deletions
diff --git a/dnsdist-backend.cc b/dnsdist-backend.cc
index eca469a..7f56034 100644
--- a/dnsdist-backend.cc
+++ b/dnsdist-backend.cc
@@ -19,25 +19,62 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-
+#include "config.h"
#include "dnsdist.hh"
#include "dnsdist-backoff.hh"
+#include "dnsdist-metrics.hh"
#include "dnsdist-nghttp2.hh"
#include "dnsdist-random.hh"
#include "dnsdist-rings.hh"
#include "dnsdist-tcp.hh"
+#include "dnsdist-xsk.hh"
#include "dolog.hh"
+#include "xsk.hh"
bool DownstreamState::passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq)
{
- if (d_config.d_dohPath.empty()) {
- return g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq));
- }
- else {
+#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
+ if (!d_config.d_dohPath.empty()) {
return g_dohClientThreads && g_dohClientThreads->passCrossProtocolQueryToThread(std::move(cpq));
}
+#endif
+ return g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq));
+}
+
+#ifdef HAVE_XSK
+void DownstreamState::addXSKDestination(int fd)
+{
+ auto socklen = d_config.remote.getSocklen();
+ ComboAddress local;
+ if (getsockname(fd, reinterpret_cast<sockaddr*>(&local), &socklen)) {
+ return;
+ }
+
+ {
+ auto addresses = d_socketSourceAddresses.write_lock();
+ addresses->push_back(local);
+ }
+ dnsdist::xsk::addDestinationAddress(local);
+ for (size_t idx = 0; idx < d_xskSockets.size(); idx++) {
+ d_xskSockets.at(idx)->addWorkerRoute(d_xskInfos.at(idx), local);
+ }
}
+void DownstreamState::removeXSKDestination(int fd)
+{
+ auto socklen = d_config.remote.getSocklen();
+ ComboAddress local;
+ if (getsockname(fd, reinterpret_cast<sockaddr*>(&local), &socklen)) {
+ return;
+ }
+
+ dnsdist::xsk::removeDestinationAddress(local);
+ for (auto& xskSocket : d_xskSockets) {
+ xskSocket->removeWorkerRoute(local);
+ }
+}
+#endif /* HAVE_XSK */
+
bool DownstreamState::reconnect(bool initialAttempt)
{
std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
@@ -51,11 +88,23 @@ bool DownstreamState::reconnect(bool initialAttempt)
}
connected = false;
+#ifdef HAVE_XSK
+ if (!d_xskInfos.empty()) {
+ auto addresses = d_socketSourceAddresses.write_lock();
+ addresses->clear();
+ }
+#endif /* HAVE_XSK */
+
for (auto& fd : sockets) {
if (fd != -1) {
if (sockets.size() > 1) {
(*mplexer.lock())->removeReadFD(fd);
}
+#ifdef HAVE_XSK
+ if (!d_xskInfos.empty()) {
+ removeXSKDestination(fd);
+ }
+#endif /* HAVE_XSK */
/* shutdown() is needed to wake up recv() in the responderThread */
shutdown(fd, SHUT_RDWR);
close(fd);
@@ -86,6 +135,11 @@ bool DownstreamState::reconnect(bool initialAttempt)
if (sockets.size() > 1) {
(*mplexer.lock())->addReadFD(fd, [](int, boost::any) {});
}
+#ifdef HAVE_XSK
+ if (!d_xskInfos.empty()) {
+ addXSKDestination(fd);
+ }
+#endif /* HAVE_XSK */
connected = true;
}
catch (const std::runtime_error& error) {
@@ -99,8 +153,19 @@ bool DownstreamState::reconnect(bool initialAttempt)
/* if at least one (re-)connection failed, close all sockets */
if (!connected) {
+#ifdef HAVE_XSK
+ if (!d_xskInfos.empty()) {
+ auto addresses = d_socketSourceAddresses.write_lock();
+ addresses->clear();
+ }
+#endif /* HAVE_XSK */
for (auto& fd : sockets) {
if (fd != -1) {
+#ifdef HAVE_XSK
+ if (!d_xskInfos.empty()) {
+ removeXSKDestination(fd);
+ }
+#endif /* HAVE_XSK */
if (sockets.size() > 1) {
try {
(*mplexer.lock())->removeReadFD(fd);
@@ -267,12 +332,20 @@ DownstreamState::DownstreamState(DownstreamState::Config&& config, std::shared_p
void DownstreamState::start()
{
if (connected && !threadStarted.test_and_set()) {
- tid = std::thread(responderThread, shared_from_this());
+#ifdef HAVE_XSK
+ for (auto& xskInfo : d_xskInfos) {
+ auto xskResponderThread = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this(), xskInfo);
+ if (!d_config.d_cpus.empty()) {
+ mapThreadToCPUList(xskResponderThread.native_handle(), d_config.d_cpus);
+ }
+ xskResponderThread.detach();
+ }
+#endif /* HAVE_XSK */
+ auto tid = std::thread(responderThread, shared_from_this());
if (!d_config.d_cpus.empty()) {
mapThreadToCPUList(tid.native_handle(), d_config.d_cpus);
}
-
tid.detach();
}
}
@@ -360,10 +433,10 @@ void DownstreamState::handleUDPTimeout(IDState& ids)
{
ids.age = 0;
ids.inUse = false;
- handleDOHTimeout(std::move(ids.internal.du));
+ DOHUnitInterface::handleTimeout(std::move(ids.internal.du));
++reuseds;
--outstanding;
- ++g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
+ ++dnsdist::metrics::g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
d_config.remote.toStringWithPort(), getName(),
ids.internal.qname.toLogString(), QType(ids.internal.qtype).toString(), ids.internal.origRemote.toStringWithPort());
@@ -462,8 +535,8 @@ uint16_t DownstreamState::saveState(InternalQueryState&& state)
auto oldDU = std::move(it->second.internal.du);
++reuseds;
- ++g_stats.downstreamTimeouts;
- handleDOHTimeout(std::move(oldDU));
+ ++dnsdist::metrics::g_stats.downstreamTimeouts;
+ DOHUnitInterface::handleTimeout(std::move(oldDU));
}
else {
++outstanding;
@@ -489,8 +562,8 @@ uint16_t DownstreamState::saveState(InternalQueryState&& state)
to handle it because it's about to be overwritten. */
auto oldDU = std::move(ids.internal.du);
++reuseds;
- ++g_stats.downstreamTimeouts;
- handleDOHTimeout(std::move(oldDU));
+ ++dnsdist::metrics::g_stats.downstreamTimeouts;
+ DOHUnitInterface::handleTimeout(std::move(oldDU));
}
else {
++outstanding;
@@ -512,8 +585,8 @@ void DownstreamState::restoreState(uint16_t id, InternalQueryState&& state)
if (!inserted) {
/* already used */
++reuseds;
- ++g_stats.downstreamTimeouts;
- handleDOHTimeout(std::move(state.du));
+ ++dnsdist::metrics::g_stats.downstreamTimeouts;
+ DOHUnitInterface::handleTimeout(std::move(state.du));
}
else {
it->second.internal = std::move(state);
@@ -527,15 +600,15 @@ void DownstreamState::restoreState(uint16_t id, InternalQueryState&& state)
if (!guard) {
/* already used */
++reuseds;
- ++g_stats.downstreamTimeouts;
- handleDOHTimeout(std::move(state.du));
+ ++dnsdist::metrics::g_stats.downstreamTimeouts;
+ DOHUnitInterface::handleTimeout(std::move(state.du));
return;
}
if (ids.isInUse()) {
/* already used */
++reuseds;
- ++g_stats.downstreamTimeouts;
- handleDOHTimeout(std::move(state.du));
+ ++dnsdist::metrics::g_stats.downstreamTimeouts;
+ DOHUnitInterface::handleTimeout(std::move(state.du));
return;
}
ids.internal = std::move(state);
@@ -619,6 +692,7 @@ bool DownstreamState::healthCheckRequired(std::optional<time_t> currentTime)
lastResults.clear();
vinfolog("Backend %s reached the lazy health-check threshold (%f%% out of %f%%, looking at sample of %d items with %d failures), moving to Potential Failure state", getNameWithAddr(), current, maxFailureRate, totalCount, failures);
stats->d_status = LazyHealthCheckStats::LazyStatus::PotentialFailure;
+ consecutiveSuccessfulChecks = 0;
/* we update the next check time here because the check might time out,
and we do not want to send a second check during that time unless
the timer is actually very short */
@@ -678,7 +752,7 @@ void DownstreamState::updateNextLazyHealthCheck(LazyHealthCheckStats& stats, boo
time_t backOff = d_config.d_lazyHealthCheckMaxBackOff;
const ExponentialBackOffTimer backOffTimer(d_config.d_lazyHealthCheckMaxBackOff);
- auto backOffCoeffTmp = backOffTimer.get(failedTests);
+ auto backOffCoeffTmp = backOffTimer.get(failedTests - 1);
/* backOffCoeffTmp cannot be higher than d_config.d_lazyHealthCheckMaxBackOff */
const auto backOffCoeff = static_cast<time_t>(backOffCoeffTmp);
if ((std::numeric_limits<time_t>::max() / d_config.d_lazyHealthCheckFailedInterval) >= backOffCoeff) {
@@ -700,6 +774,10 @@ void DownstreamState::updateNextLazyHealthCheck(LazyHealthCheckStats& stats, boo
void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
{
+ if (!newResult) {
+ ++d_healthCheckMetrics.d_failures;
+ }
+
if (initial) {
/* if this is the initial health-check, at startup, we do not care
about the minimum number of failed/successful health-checks */
@@ -709,9 +787,11 @@ void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
setUpStatus(newResult);
if (newResult == false) {
currentCheckFailures++;
- auto stats = d_lazyHealthCheckStats.lock();
- stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
- updateNextLazyHealthCheck(*stats, false);
+ if (d_config.availability == DownstreamState::Availability::Lazy) {
+ auto stats = d_lazyHealthCheckStats.lock();
+ stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
+ updateNextLazyHealthCheck(*stats, false);
+ }
}
return;
}
@@ -721,12 +801,12 @@ void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
if (newResult) {
/* check succeeded */
currentCheckFailures = 0;
+ consecutiveSuccessfulChecks++;
if (!upStatus) {
/* we were previously marked as "down" and had a successful health-check,
let's see if this is enough to move to the "up" state or if we need
more successful health-checks for that */
- consecutiveSuccessfulChecks++;
if (consecutiveSuccessfulChecks < d_config.minRiseSuccesses) {
/* we need more than one successful check to rise
and we didn't reach the threshold yet, let's stay down */
@@ -767,7 +847,7 @@ void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
auto stats = d_lazyHealthCheckStats.lock();
vinfolog("Backend %s failed its health-check, moving from Potential failure to Failed", getNameWithAddr());
stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
- currentCheckFailures = 0;
+ currentCheckFailures = 1;
updateNextLazyHealthCheck(*stats, false);
}
}
@@ -790,6 +870,50 @@ void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
}
}
+#ifdef HAVE_XSK
+[[nodiscard]] ComboAddress DownstreamState::pickSourceAddressForSending()
+{
+ if (!connected) {
+ waitUntilConnected();
+ }
+
+ auto addresses = d_socketSourceAddresses.read_lock();
+ auto numberOfAddresses = addresses->size();
+ if (numberOfAddresses == 0) {
+ throw std::runtime_error("No source address available for sending XSK data to backend " + getNameWithAddr());
+ }
+ size_t idx = dnsdist::getRandomValue(numberOfAddresses);
+ return (*addresses)[idx % numberOfAddresses];
+}
+
+void DownstreamState::registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks)
+{
+ d_xskSockets = xsks;
+
+ if (d_config.sourceAddr.sin4.sin_family == 0 || (IsAnyAddress(d_config.sourceAddr))) {
+ const auto& ifName = xsks.at(0)->getInterfaceName();
+ auto addresses = getListOfAddressesOfNetworkInterface(ifName);
+ if (addresses.empty()) {
+ throw std::runtime_error("Unable to get source address from interface " + ifName);
+ }
+
+ if (addresses.size() > 1) {
+ warnlog("More than one address configured on interface %s, picking the first one (%s) for XSK. Set the 'source' parameter on 'newServer' if you want to use a different address.", ifName, addresses.at(0).toString());
+ }
+ d_config.sourceAddr = addresses.at(0);
+ }
+ d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress();
+
+ for (auto& xsk : d_xskSockets) {
+ auto xskInfo = XskWorker::create();
+ d_xskInfos.push_back(xskInfo);
+ xsk->addWorker(xskInfo);
+ xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
+ }
+ reconnect(false);
+}
+#endif /* HAVE_XSK */
+
size_t ServerPool::countServers(bool upOnly)
{
std::shared_ptr<const ServerPolicy::NumberedServerVector> servers = nullptr;