summaryrefslogtreecommitdiffstats
path: root/dnsdist-rings.cc
diff options
context:
space:
mode:
Diffstat (limited to 'dnsdist-rings.cc')
-rw-r--r--dnsdist-rings.cc216
1 files changed, 216 insertions, 0 deletions
diff --git a/dnsdist-rings.cc b/dnsdist-rings.cc
new file mode 100644
index 0000000..5485b33
--- /dev/null
+++ b/dnsdist-rings.cc
@@ -0,0 +1,216 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <fstream>
+
+#include "dnsdist-rings.hh"
+
+void Rings::setCapacity(size_t newCapacity, size_t numberOfShards)
+{
+ if (d_initialized) {
+ throw std::runtime_error("Rings::setCapacity() should not be called once the rings have been initialized");
+ }
+ d_capacity = newCapacity;
+ d_numberOfShards = numberOfShards;
+}
+
+void Rings::init()
+{
+ if (d_initialized.exchange(true)) {
+ throw std::runtime_error("Rings::init() should only be called once");
+ }
+
+ if (d_numberOfShards <= 1) {
+ d_nbLockTries = 0;
+ }
+
+ d_shards.resize(d_numberOfShards);
+
+ /* resize all the rings */
+ for (auto& shard : d_shards) {
+ shard = std::make_unique<Shard>();
+ if (shouldRecordQueries()) {
+ shard->queryRing.lock()->set_capacity(d_capacity / d_numberOfShards);
+ }
+ if (shouldRecordResponses()) {
+ shard->respRing.lock()->set_capacity(d_capacity / d_numberOfShards);
+ }
+ }
+
+ /* we just recreated the shards so they are now empty */
+ d_nbQueryEntries = 0;
+ d_nbResponseEntries = 0;
+}
+
+void Rings::setNumberOfLockRetries(size_t retries)
+{
+ if (d_numberOfShards <= 1) {
+ d_nbLockTries = 0;
+ } else {
+ d_nbLockTries = retries;
+ }
+}
+
+void Rings::setRecordQueries(bool record)
+{
+ d_recordQueries = record;
+}
+
+void Rings::setRecordResponses(bool record)
+{
+ d_recordResponses = record;
+}
+
+size_t Rings::numDistinctRequestors()
+{
+ std::set<ComboAddress, ComboAddress::addressOnlyLessThan> s;
+ for (const auto& shard : d_shards) {
+ auto rl = shard->queryRing.lock();
+ for (const auto& q : *rl) {
+ s.insert(q.requestor);
+ }
+ }
+ return s.size();
+}
+
+std::unordered_map<int, vector<boost::variant<string,double>>> Rings::getTopBandwidth(unsigned int numentries)
+{
+ map<ComboAddress, unsigned int, ComboAddress::addressOnlyLessThan> counts;
+ uint64_t total=0;
+ for (const auto& shard : d_shards) {
+ {
+ auto rl = shard->queryRing.lock();
+ for(const auto& q : *rl) {
+ counts[q.requestor] += q.size;
+ total+=q.size;
+ }
+ }
+ {
+ auto rl = shard->respRing.lock();
+ for(const auto& r : *rl) {
+ counts[r.requestor] += r.size;
+ total+=r.size;
+ }
+ }
+ }
+
+ typedef vector<pair<unsigned int, ComboAddress>> ret_t;
+ ret_t rcounts;
+ rcounts.reserve(counts.size());
+ for(const auto& p : counts)
+ rcounts.push_back({p.second, p.first});
+ numentries = rcounts.size() < numentries ? rcounts.size() : numentries;
+ partial_sort(rcounts.begin(), rcounts.begin()+numentries, rcounts.end(), [](const ret_t::value_type&a, const ret_t::value_type&b)
+ {
+ return(b.first < a.first);
+ });
+ std::unordered_map<int, vector<boost::variant<string,double>>> ret;
+ uint64_t rest = 0;
+ int count = 1;
+ for(const auto& rc : rcounts) {
+ if (count == static_cast<int>(numentries + 1)) {
+ rest+=rc.first;
+ }
+ else {
+ ret.insert({count++, {rc.second.toString(), rc.first, 100.0*rc.first/total}});
+ }
+ }
+
+ if (total > 0) {
+ ret.insert({count, {"Rest", rest, 100.0*rest/total}});
+ }
+ else {
+ ret.insert({count, {"Rest", rest, 100.0 }});
+ }
+
+ return ret;
+}
+
+size_t Rings::loadFromFile(const std::string& filepath, const struct timespec& now)
+{
+ ifstream ifs(filepath);
+ if (!ifs) {
+ throw std::runtime_error("unable to open the file at " + filepath);
+ }
+
+ size_t inserted = 0;
+ string line;
+ dnsheader dh;
+ memset(&dh, 0, sizeof(dh));
+
+ while (std::getline(ifs, line)) {
+ boost::trim_right_if(line, boost::is_any_of(" \r\n\x1a"));
+ boost::trim_left(line);
+ bool isResponse = false;
+ vector<string> parts;
+ stringtok(parts, line, " \t,");
+
+ if (parts.size() == 8) {
+ }
+ else if (parts.size() >= 11 && parts.size() <= 13) {
+ isResponse = true;
+ }
+ else {
+ cerr<<"skipping line with "<<parts.size()<<"parts: "<<line<<endl;
+ continue;
+ }
+
+ size_t idx = 0;
+ vector<string> timeStr;
+ stringtok(timeStr, parts.at(idx++), ".");
+ if (timeStr.size() != 2) {
+ cerr<<"skipping invalid time "<<parts.at(0)<<endl;
+ continue;
+ }
+
+ struct timespec when;
+ try {
+ when.tv_sec = now.tv_sec + std::stoi(timeStr.at(0));
+ when.tv_nsec = now.tv_nsec + std::stoi(timeStr.at(1)) * 100 * 1000 * 1000;
+ }
+ catch (const std::exception& e) {
+ cerr<<"error parsing time "<<parts.at(idx-1)<<" from line "<<line<<endl;
+ continue;
+ }
+
+ ComboAddress from(parts.at(idx++));
+ ComboAddress to;
+ dnsdist::Protocol protocol(parts.at(idx++));
+ if (isResponse) {
+ to = ComboAddress(parts.at(idx++));
+ }
+ /* skip ID */
+ idx++;
+ DNSName qname(parts.at(idx++));
+ QType qtype(QType::chartocode(parts.at(idx++).c_str()));
+
+ if (isResponse) {
+ insertResponse(when, from, qname, qtype.getCode(), 0, 0, dh, to, protocol);
+ }
+ else {
+ insertQuery(when, from, qname, qtype.getCode(), 0, dh, protocol);
+ }
+ ++inserted;
+ }
+
+ return inserted;
+}