summaryrefslogtreecommitdiffstats
path: root/dnsdist-backend.cc
diff options
context:
space:
mode:
Diffstat (limited to 'dnsdist-backend.cc')
-rw-r--r--dnsdist-backend.cc291
1 files changed, 291 insertions, 0 deletions
diff --git a/dnsdist-backend.cc b/dnsdist-backend.cc
new file mode 100644
index 0000000..2af756f
--- /dev/null
+++ b/dnsdist-backend.cc
@@ -0,0 +1,291 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "dnsdist.hh"
+#include "dnsdist-nghttp2.hh"
+#include "dnsdist-tcp.hh"
+#include "dolog.hh"
+
+
+bool DownstreamState::passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq)
+{
+ if (d_dohPath.empty()) {
+ return g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq));
+ }
+ else {
+ return g_dohClientThreads && g_dohClientThreads->passCrossProtocolQueryToThread(std::move(cpq));
+ }
+}
+
+bool DownstreamState::reconnect()
+{
+ std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
+ if (!tl.owns_lock() || isStopped()) {
+ /* we are already reconnecting or stopped anyway */
+ return false;
+ }
+
+ connected = false;
+ for (auto& fd : sockets) {
+ if (fd != -1) {
+ if (sockets.size() > 1) {
+ (*mplexer.lock())->removeReadFD(fd);
+ }
+ /* shutdown() is needed to wake up recv() in the responderThread */
+ shutdown(fd, SHUT_RDWR);
+ close(fd);
+ fd = -1;
+ }
+ if (!IsAnyAddress(remote)) {
+ fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
+ if (!IsAnyAddress(sourceAddr)) {
+ SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
+ if (!sourceItfName.empty()) {
+#ifdef SO_BINDTODEVICE
+ int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
+ if (res != 0) {
+ infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
+ }
+#endif
+ }
+
+ SBind(fd, sourceAddr);
+ }
+ try {
+ SConnect(fd, remote);
+ if (sockets.size() > 1) {
+ (*mplexer.lock())->addReadFD(fd, [](int, boost::any) {});
+ }
+ connected = true;
+ }
+ catch(const std::runtime_error& error) {
+ infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
+ connected = false;
+ break;
+ }
+ }
+ }
+
+ /* if at least one (re-)connection failed, close all sockets */
+ if (!connected) {
+ for (auto& fd : sockets) {
+ if (fd != -1) {
+ if (sockets.size() > 1) {
+ try {
+ (*mplexer.lock())->removeReadFD(fd);
+ }
+ catch (const FDMultiplexerException& e) {
+ /* some sockets might not have been added to the multiplexer
+ yet, that's fine */
+ }
+ }
+ /* shutdown() is needed to wake up recv() in the responderThread */
+ shutdown(fd, SHUT_RDWR);
+ close(fd);
+ fd = -1;
+ }
+ }
+ }
+
+ return connected;
+}
+
+void DownstreamState::stop()
+{
+ d_stopped = true;
+
+ {
+ std::lock_guard<std::mutex> tl(connectLock);
+ auto slock = mplexer.lock();
+
+ for (auto& fd : sockets) {
+ if (fd != -1) {
+ /* shutdown() is needed to wake up recv() in the responderThread */
+ shutdown(fd, SHUT_RDWR);
+ }
+ }
+ }
+}
+
+void DownstreamState::hash()
+{
+ vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
+ auto w = weight;
+ auto lockedHashes = hashes.write_lock();
+ lockedHashes->clear();
+ lockedHashes->reserve(w);
+ while (w > 0) {
+ std::string uuid = boost::str(boost::format("%s-%d") % id % w);
+ unsigned int wshash = burtleCI(reinterpret_cast<const unsigned char*>(uuid.c_str()), uuid.size(), g_hashperturb);
+ lockedHashes->push_back(wshash);
+ --w;
+ }
+ std::sort(lockedHashes->begin(), lockedHashes->end());
+ hashesComputed = true;
+}
+
+void DownstreamState::setId(const boost::uuids::uuid& newId)
+{
+ id = newId;
+ // compute hashes only if already done
+ if (hashesComputed) {
+ hash();
+ }
+}
+
+void DownstreamState::setWeight(int newWeight)
+{
+ if (newWeight < 1) {
+ errlog("Error setting server's weight: downstream weight value must be greater than 0.");
+ return ;
+ }
+ weight = newWeight;
+ if (hashesComputed) {
+ hash();
+ }
+}
+
+DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_): remote(remote_), sourceAddr(sourceAddr_), sourceItfName(sourceItfName_), name(remote_.toStringWithPort()), nameWithAddr(remote_.toStringWithPort()), sourceItf(sourceItf_)
+{
+ id = getUniqueID();
+ threadStarted.clear();
+
+ sw.start();
+}
+
+void DownstreamState::connectUDPSockets(size_t numberOfSockets)
+{
+ idStates.resize(g_maxOutstanding);
+ sockets.resize(numberOfSockets);
+
+ if (sockets.size() > 1) {
+ *(mplexer.lock()) = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+ }
+
+ for (auto& fd : sockets) {
+ fd = -1;
+ }
+
+ reconnect();
+}
+
+DownstreamState::~DownstreamState()
+{
+ for (auto& fd : sockets) {
+ if (fd >= 0) {
+ close(fd);
+ fd = -1;
+ }
+ }
+
+ // we need to either detach or join the thread before it
+ // is destroyed
+ if (threadStarted.test_and_set()) {
+ tid.detach();
+ }
+}
+
+void DownstreamState::incCurrentConnectionsCount()
+{
+ auto currentConnectionsCount = ++tcpCurrentConnections;
+ if (currentConnectionsCount > tcpMaxConcurrentConnections) {
+ tcpMaxConcurrentConnections.store(currentConnectionsCount);
+ }
+}
+
+size_t ServerPool::countServers(bool upOnly)
+{
+ size_t count = 0;
+ auto servers = d_servers.read_lock();
+ for (const auto& server : **servers) {
+ if (!upOnly || std::get<1>(server)->isUp() ) {
+ count++;
+ }
+ }
+ return count;
+}
+
+size_t ServerPool::poolLoad()
+{
+ size_t load = 0;
+ auto servers = d_servers.read_lock();
+ for (const auto& server : **servers) {
+ size_t serverOutstanding = std::get<1>(server)->outstanding.load();
+ load += serverOutstanding;
+ }
+ return load;
+}
+
+const std::shared_ptr<ServerPolicy::NumberedServerVector> ServerPool::getServers()
+{
+ std::shared_ptr<ServerPolicy::NumberedServerVector> result;
+ {
+ result = *(d_servers.read_lock());
+ }
+ return result;
+}
+
+void ServerPool::addServer(shared_ptr<DownstreamState>& server)
+{
+ auto servers = d_servers.write_lock();
+ /* we can't update the content of the shared pointer directly even when holding the lock,
+ as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
+ unsigned int count = static_cast<unsigned int>((*servers)->size());
+ auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*(*servers));
+ newServers->emplace_back(++count, server);
+ /* we need to reorder based on the server 'order' */
+ std::stable_sort(newServers->begin(), newServers->end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
+ return a.second->order < b.second->order;
+ });
+ /* and now we need to renumber for Lua (custom policies) */
+ size_t idx = 1;
+ for (auto& serv : *newServers) {
+ serv.first = idx++;
+ }
+ *servers = std::move(newServers);
+}
+
+void ServerPool::removeServer(shared_ptr<DownstreamState>& server)
+{
+ auto servers = d_servers.write_lock();
+ /* we can't update the content of the shared pointer directly even when holding the lock,
+ as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
+ auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*(*servers));
+ size_t idx = 1;
+ bool found = false;
+ for (auto it = newServers->begin(); it != newServers->end();) {
+ if (found) {
+ /* we need to renumber the servers placed
+ after the removed one, for Lua (custom policies) */
+ it->first = idx++;
+ it++;
+ }
+ else if (it->second == server) {
+ it = newServers->erase(it);
+ found = true;
+ } else {
+ idx++;
+ it++;
+ }
+ }
+ *servers = std::move(newServers);
+}