diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 21:11:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 21:11:59 +0000 |
commit | 3cd01b932e1c85394272ae64fae67ebeda92fb00 (patch) | |
tree | c5a3115d710afc1879ddea5349362a2bc651733c /dnsdist-healthchecks.cc | |
parent | Initial commit. (diff) | |
download | dnsdist-3cd01b932e1c85394272ae64fae67ebeda92fb00.tar.xz dnsdist-3cd01b932e1c85394272ae64fae67ebeda92fb00.zip |
Adding upstream version 1.8.3.upstream/1.8.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'dnsdist-healthchecks.cc')
-rw-r--r-- | dnsdist-healthchecks.cc | 488 |
1 files changed, 488 insertions, 0 deletions
diff --git a/dnsdist-healthchecks.cc b/dnsdist-healthchecks.cc new file mode 100644 index 0000000..dc77022 --- /dev/null +++ b/dnsdist-healthchecks.cc @@ -0,0 +1,488 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "dnsdist-healthchecks.hh" +#include "tcpiohandler-mplexer.hh" +#include "dnswriter.hh" +#include "dolog.hh" +#include "dnsdist-random.hh" +#include "dnsdist-tcp.hh" +#include "dnsdist-nghttp2.hh" +#include "dnsdist-session-cache.hh" + +bool g_verboseHealthChecks{false}; + +struct HealthCheckData +{ + enum class TCPState : uint8_t { WritingQuery, ReadingResponseSize, ReadingResponse }; + + HealthCheckData(FDMultiplexer& mplexer, const std::shared_ptr<DownstreamState>& ds, DNSName&& checkName, uint16_t checkType, uint16_t checkClass, uint16_t queryID): d_ds(ds), d_mplexer(mplexer), d_udpSocket(-1), d_checkName(std::move(checkName)), d_checkType(checkType), d_checkClass(checkClass), d_queryID(queryID) + { + } + + const std::shared_ptr<DownstreamState> d_ds; + FDMultiplexer& d_mplexer; + std::unique_ptr<TCPIOHandler> d_tcpHandler{nullptr}; + std::unique_ptr<IOStateHandler> d_ioState{nullptr}; + PacketBuffer d_buffer; + Socket d_udpSocket; + DNSName d_checkName; + struct timeval d_ttd{0, 0}; + size_t d_bufferPos{0}; + uint16_t d_checkType; + uint16_t d_checkClass; + uint16_t d_queryID; + TCPState d_tcpState{TCPState::WritingQuery}; + bool d_initial{false}; +}; + +static bool handleResponse(std::shared_ptr<HealthCheckData>& data) +{ + auto& ds = data->d_ds; + try { + if (data->d_buffer.size() < sizeof(dnsheader)) { + if (g_verboseHealthChecks) { + infolog("Invalid health check response of size %d from backend %s, expecting at least %d", data->d_buffer.size(), ds->getNameWithAddr(), sizeof(dnsheader)); + } + return false; + } + + const dnsheader * responseHeader = reinterpret_cast<const dnsheader*>(data->d_buffer.data()); + if (responseHeader->id != data->d_queryID) { + if (g_verboseHealthChecks) { + infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), data->d_queryID); + } + return false; + } + + if (!responseHeader->qr) { + if (g_verboseHealthChecks) { + infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr()); + } + return false; + } + + if (responseHeader->rcode == RCode::ServFail) { + if (g_verboseHealthChecks) { + infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr()); + } + return false; + } + + if (ds->d_config.mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) { + if (g_verboseHealthChecks) { + infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused"); + } + return false; + } + + uint16_t receivedType; + uint16_t receivedClass; + DNSName receivedName(reinterpret_cast<const char*>(data->d_buffer.data()), data->d_buffer.size(), sizeof(dnsheader), false, &receivedType, &receivedClass); + + if (receivedName != data->d_checkName || receivedType != data->d_checkType || receivedClass != data->d_checkClass) { + if (g_verboseHealthChecks) { + infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), data->d_checkName.toLogString(), QType(receivedType).toString(), QType(data->d_checkType).toString(), receivedClass, data->d_checkClass); + } + return false; + } + } + catch(const std::exception& e) { + if (g_verboseHealthChecks) { + infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what()); + } + return false; + } + catch (...) { + if (g_verboseHealthChecks) { + infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr()); + } + return false; + } + + return true; +} + +class HealthCheckQuerySender : public TCPQuerySender +{ +public: + HealthCheckQuerySender(std::shared_ptr<HealthCheckData>& data): d_data(data) + { + } + + ~HealthCheckQuerySender() + { + } + + bool active() const override + { + return true; + } + + void handleResponse(const struct timeval& now, TCPResponse&& response) override + { + d_data->d_buffer = std::move(response.d_buffer); + d_data->d_ds->submitHealthCheckResult(d_data->d_initial, ::handleResponse(d_data)); + } + + void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override + { + throw std::runtime_error("Unexpected XFR reponse to a health check query"); + } + + void notifyIOError(InternalQueryState&& query, const struct timeval& now) override + { + d_data->d_ds->submitHealthCheckResult(d_data->d_initial, false); + } + +private: + std::shared_ptr<HealthCheckData> d_data; +}; + +static void healthCheckUDPCallback(int fd, FDMultiplexer::funcparam_t& param) +{ + auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(param); + data->d_mplexer.removeReadFD(fd); + + ComboAddress from; + from.sin4.sin_family = data->d_ds->d_config.remote.sin4.sin_family; + auto fromlen = from.getSocklen(); + data->d_buffer.resize(512); + auto got = recvfrom(data->d_udpSocket.getHandle(), &data->d_buffer.at(0), data->d_buffer.size(), 0, reinterpret_cast<sockaddr *>(&from), &fromlen); + if (got < 0) { + int savederrno = errno; + if (g_verboseHealthChecks) { + infolog("Error receiving health check response from %s: %s", data->d_ds->d_config.remote.toStringWithPort(), stringerror(savederrno)); + } + data->d_ds->submitHealthCheckResult(data->d_initial, false); + return; + } + data->d_buffer.resize(static_cast<size_t>(got)); + + /* we are using a connected socket but hey.. */ + if (from != data->d_ds->d_config.remote) { + if (g_verboseHealthChecks) { + infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), data->d_ds->d_config.remote.toStringWithPort()); + } + data->d_ds->submitHealthCheckResult(data->d_initial, false); + return; + } + + data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data)); +} + +static void healthCheckTCPCallback(int fd, FDMultiplexer::funcparam_t& param) +{ + auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(param); + + IOStateGuard ioGuard(data->d_ioState); + try { + auto ioState = IOState::Done; + + if (data->d_tcpState == HealthCheckData::TCPState::WritingQuery) { + ioState = data->d_tcpHandler->tryWrite(data->d_buffer, data->d_bufferPos, data->d_buffer.size()); + if (ioState == IOState::Done) { + data->d_bufferPos = 0; + data->d_buffer.resize(sizeof(uint16_t)); + data->d_tcpState = HealthCheckData::TCPState::ReadingResponseSize; + } + } + + if (data->d_tcpState == HealthCheckData::TCPState::ReadingResponseSize) { + ioState = data->d_tcpHandler->tryRead(data->d_buffer, data->d_bufferPos, data->d_buffer.size()); + if (ioState == IOState::Done) { + data->d_bufferPos = 0; + uint16_t responseSize; + memcpy(&responseSize, &data->d_buffer.at(0), sizeof(responseSize)); + data->d_buffer.resize(ntohs(responseSize)); + data->d_tcpState = HealthCheckData::TCPState::ReadingResponse; + } + } + + if (data->d_tcpState == HealthCheckData::TCPState::ReadingResponse) { + ioState = data->d_tcpHandler->tryRead(data->d_buffer, data->d_bufferPos, data->d_buffer.size()); + if (ioState == IOState::Done) { + data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data)); + } + } + + if (ioState == IOState::Done) { + /* remove us from the mplexer, we are done */ + data->d_ioState->update(ioState, healthCheckTCPCallback, data); + if (data->d_tcpHandler->isTLS()) { + try { + auto sessions = data->d_tcpHandler->getTLSSessions(); + if (!sessions.empty()) { + g_sessionCache.putSessions(data->d_ds->getID(), time(nullptr), std::move(sessions)); + } + } + catch (const std::exception& e) { + vinfolog("Unable to get a TLS session from the DoT healthcheck: %s", e.what()); + } + } + } + else { + data->d_ioState->update(ioState, healthCheckTCPCallback, data, data->d_ttd); + } + + /* the state has been updated, we can release the guard */ + ioGuard.release(); + } + catch (const std::exception& e) { + data->d_ds->submitHealthCheckResult(data->d_initial, false); + if (g_verboseHealthChecks) { + infolog("Error checking the health of backend %s: %s", data->d_ds->getNameWithAddr(), e.what()); + } + } + catch (...) { + data->d_ds->submitHealthCheckResult(data->d_initial, false); + if (g_verboseHealthChecks) { + infolog("Unknown exception while checking the health of backend %s", data->d_ds->getNameWithAddr()); + } + } +} + +bool queueHealthCheck(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, bool initialCheck) +{ + try { + uint16_t queryID = dnsdist::getRandomDNSID(); + DNSName checkName = ds->d_config.checkName; + uint16_t checkType = ds->d_config.checkType.getCode(); + uint16_t checkClass = ds->d_config.checkClass; + dnsheader checkHeader; + memset(&checkHeader, 0, sizeof(checkHeader)); + + checkHeader.qdcount = htons(1); + checkHeader.id = queryID; + + checkHeader.rd = true; + if (ds->d_config.setCD) { + checkHeader.cd = true; + } + + if (ds->d_config.checkFunction) { + auto lock = g_lua.lock(); + auto ret = ds->d_config.checkFunction(checkName, checkType, checkClass, &checkHeader); + checkName = std::get<0>(ret); + checkType = std::get<1>(ret); + checkClass = std::get<2>(ret); + } + + PacketBuffer packet; + GenericDNSPacketWriter<PacketBuffer> dpw(packet, checkName, checkType, checkClass); + dnsheader* requestHeader = dpw.getHeader(); + *requestHeader = checkHeader; + + /* we need to compute that _before_ adding the proxy protocol payload */ + uint16_t packetSize = packet.size(); + std::string proxyProtocolPayload; + size_t proxyProtocolPayloadSize = 0; + if (ds->d_config.useProxyProtocol) { + proxyProtocolPayload = makeLocalProxyHeader(); + proxyProtocolPayloadSize = proxyProtocolPayload.size(); + if (!ds->isDoH()) { + packet.insert(packet.begin(), proxyProtocolPayload.begin(), proxyProtocolPayload.end()); + } + } + + Socket sock(ds->d_config.remote.sin4.sin_family, ds->doHealthcheckOverTCP() ? SOCK_STREAM : SOCK_DGRAM); + + sock.setNonBlocking(); + +#ifdef SO_BINDTODEVICE + if (!ds->d_config.sourceItfName.empty()) { + int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->d_config.sourceItfName.c_str(), ds->d_config.sourceItfName.length()); + if (res != 0 && g_verboseHealthChecks) { + infolog("Error setting SO_BINDTODEVICE on the health check socket for backend '%s': %s", ds->getNameWithAddr(), stringerror()); + } + } +#endif + + if (!IsAnyAddress(ds->d_config.sourceAddr)) { + if (ds->doHealthcheckOverTCP()) { + sock.setReuseAddr(); + } +#ifdef IP_BIND_ADDRESS_NO_PORT + if (ds->d_config.ipBindAddrNoPort) { + SSetsockopt(sock.getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1); + } +#endif + sock.bind(ds->d_config.sourceAddr, false); + } + + auto data = std::make_shared<HealthCheckData>(*mplexer, ds, std::move(checkName), checkType, checkClass, queryID); + data->d_initial = initialCheck; + + gettimeofday(&data->d_ttd, nullptr); + data->d_ttd.tv_sec += ds->d_config.checkTimeout / 1000; /* ms to seconds */ + data->d_ttd.tv_usec += (ds->d_config.checkTimeout % 1000) * 1000; /* remaining ms to us */ + normalizeTV(data->d_ttd); + + if (!ds->doHealthcheckOverTCP()) { + sock.connect(ds->d_config.remote); + data->d_udpSocket = std::move(sock); + ssize_t sent = udpClientSendRequestToBackend(ds, data->d_udpSocket.getHandle(), packet, true); + if (sent < 0) { + int ret = errno; + if (g_verboseHealthChecks) { + infolog("Error while sending a health check query (ID %d) to backend %s: %d", queryID, ds->getNameWithAddr(), ret); + } + return false; + } + + mplexer->addReadFD(data->d_udpSocket.getHandle(), &healthCheckUDPCallback, data, &data->d_ttd); + } + else if (ds->isDoH()) { + InternalQuery query(std::move(packet), InternalQueryState()); + query.d_proxyProtocolPayload = std::move(proxyProtocolPayload); + auto sender = std::shared_ptr<TCPQuerySender>(new HealthCheckQuerySender(data)); + if (!sendH2Query(ds, mplexer, sender, std::move(query), true)) { + data->d_ds->submitHealthCheckResult(data->d_initial, false); + } + } + else { + data->d_tcpHandler = std::make_unique<TCPIOHandler>(ds->d_config.d_tlsSubjectName, ds->d_config.d_tlsSubjectIsAddr, sock.releaseHandle(), timeval{ds->d_config.checkTimeout,0}, ds->d_tlsCtx); + data->d_ioState = std::make_unique<IOStateHandler>(*mplexer, data->d_tcpHandler->getDescriptor()); + if (ds->d_tlsCtx) { + try { + time_t now = time(nullptr); + auto tlsSession = g_sessionCache.getSession(ds->getID(), now); + if (tlsSession) { + data->d_tcpHandler->setTLSSession(tlsSession); + } + } + catch (const std::exception& e) { + vinfolog("Unable to restore a TLS session for the DoT healthcheck for backend %s: %s", ds->getNameWithAddr(), e.what()); + } + } + data->d_tcpHandler->tryConnect(ds->d_config.tcpFastOpen, ds->d_config.remote); + + const uint8_t sizeBytes[] = { static_cast<uint8_t>(packetSize / 256), static_cast<uint8_t>(packetSize % 256) }; + packet.insert(packet.begin() + proxyProtocolPayloadSize, sizeBytes, sizeBytes + 2); + data->d_buffer = std::move(packet); + + auto ioState = data->d_tcpHandler->tryWrite(data->d_buffer, data->d_bufferPos, data->d_buffer.size()); + if (ioState == IOState::Done) { + data->d_bufferPos = 0; + data->d_buffer.resize(sizeof(uint16_t)); + data->d_tcpState = HealthCheckData::TCPState::ReadingResponseSize; + ioState = IOState::NeedRead; + } + + data->d_ioState->update(ioState, healthCheckTCPCallback, data, data->d_ttd); + } + + return true; + } + catch (const std::exception& e) { + if (g_verboseHealthChecks) { + infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what()); + } + return false; + } + catch (...) { + if (g_verboseHealthChecks) { + infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr()); + } + return false; + } +} + +void handleQueuedHealthChecks(FDMultiplexer& mplexer, bool initial) +{ + while (mplexer.getWatchedFDCount(false) > 0 || mplexer.getWatchedFDCount(true) > 0) { + struct timeval now; + int ret = mplexer.run(&now, 100); + if (ret == -1) { + if (g_verboseHealthChecks) { + infolog("Error while waiting for the health check response from backends: %d", ret); + } + break; + } + if (ret > 0) { + /* we got at least one event other than a timeout */ + continue; + } + + handleH2Timeouts(mplexer, now); + + auto timeouts = mplexer.getTimeouts(now); + for (const auto& timeout : timeouts) { + if (timeout.second.type() != typeid(std::shared_ptr<HealthCheckData>)) { + continue; + } + + auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(timeout.second); + try { + /* UDP does not have an IO state, H2 is handled separately */ + if (data->d_ioState) { + data->d_ioState.reset(); + } + else { + mplexer.removeReadFD(timeout.first); + } + if (g_verboseHealthChecks) { + infolog("Timeout while waiting for the health check response (ID %d) from backend %s", data->d_queryID, data->d_ds->getNameWithAddr()); + } + + data->d_ds->submitHealthCheckResult(initial, false); + } + catch (const std::exception& e) { + if (g_verboseHealthChecks) { + infolog("Error while dealing with a timeout for the health check response (ID %d) from backend %s: %s", data->d_queryID, data->d_ds->getNameWithAddr(), e.what()); + } + } + catch (...) { + if (g_verboseHealthChecks) { + infolog("Error while dealing with a timeout for the health check response (ID %d) from backend %s", data->d_queryID, data->d_ds->getNameWithAddr()); + } + } + } + + timeouts = mplexer.getTimeouts(now, true); + for (const auto& timeout : timeouts) { + if (timeout.second.type() != typeid(std::shared_ptr<HealthCheckData>)) { + continue; + } + auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(timeout.second); + try { + /* UDP does not block while writing, H2 is handled separately */ + data->d_ioState.reset(); + if (g_verboseHealthChecks) { + infolog("Timeout while waiting for the health check response (ID %d) from backend %s", data->d_queryID, data->d_ds->getNameWithAddr()); + } + + data->d_ds->submitHealthCheckResult(initial, false); + } + catch (const std::exception& e) { + if (g_verboseHealthChecks) { + infolog("Error while dealing with a timeout for the health check response (ID %d) from backend %s: %s", data->d_queryID, data->d_ds->getNameWithAddr(), e.what()); + } + } + catch (...) { + if (g_verboseHealthChecks) { + infolog("Error while dealing with a timeout for the health check response (ID %d) from backend %s", data->d_queryID, data->d_ds->getNameWithAddr()); + } + } + } + } +} |