summaryrefslogtreecommitdiffstats
path: root/test-dnsdistrings_cc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test-dnsdistrings_cc.cc')
-rw-r--r--test-dnsdistrings_cc.cc290
1 files changed, 290 insertions, 0 deletions
diff --git a/test-dnsdistrings_cc.cc b/test-dnsdistrings_cc.cc
new file mode 100644
index 0000000..db665bb
--- /dev/null
+++ b/test-dnsdistrings_cc.cc
@@ -0,0 +1,290 @@
+
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_NO_MAIN
+
+#include <thread>
+#include <boost/test/unit_test.hpp>
+
+#include "dnsdist-rings.hh"
+#include "gettime.hh"
+
+BOOST_AUTO_TEST_SUITE(dnsdistrings_cc)
+
+static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTries)
+{
+ Rings rings(maxEntries, numberOfShards, nbLockTries);
+ size_t entriesPerShard = maxEntries / numberOfShards;
+
+ BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards);
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), 0U);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
+ BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards());
+ for (const auto& shard : rings.d_shards) {
+ BOOST_CHECK(shard != nullptr);
+ }
+
+ dnsheader dh;
+ memset(&dh, 0, sizeof(dh));
+ DNSName qname("rings.powerdns.com.");
+ ComboAddress requestor1("192.0.2.1");
+ ComboAddress requestor2("192.0.2.2");
+ uint16_t qtype = QType::AAAA;
+ uint16_t size = 42;
+ dnsdist::Protocol protocol = dnsdist::Protocol::DoUDP;
+ dnsdist::Protocol outgoingProtocol = dnsdist::Protocol::DoUDP;
+ struct timespec now;
+ gettime(&now);
+
+ /* fill the query ring */
+ for (size_t idx = 0; idx < maxEntries; idx++) {
+ rings.insertQuery(now, requestor1, qname, qtype, size, dh, protocol);
+ }
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
+ for (const auto& shard : rings.d_shards) {
+ auto ring = shard->queryRing.lock();
+ BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+ for (const auto& entry : *ring) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort());
+ }
+ }
+
+ /* push enough queries to get rid of the existing ones */
+ for (size_t idx = 0; idx < maxEntries; idx++) {
+ rings.insertQuery(now, requestor2, qname, qtype, size, dh, protocol);
+ }
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
+ for (const auto& shard : rings.d_shards) {
+ auto ring = shard->queryRing.lock();
+ BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+ for (const auto& entry : *ring) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort());
+ }
+ }
+
+ ComboAddress server("192.0.2.42");
+ unsigned int latency = 100;
+
+ /* fill the response ring */
+ for (size_t idx = 0; idx < maxEntries; idx++) {
+ rings.insertResponse(now, requestor1, qname, qtype, latency, size, dh, server, outgoingProtocol);
+ }
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
+ for (const auto& shard : rings.d_shards) {
+ auto ring = shard->respRing.lock();
+ BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+ for (const auto& entry : *ring) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort());
+ BOOST_CHECK_EQUAL(entry.usec, latency);
+ BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+ }
+ }
+
+ /* push enough responses to get rid of the existing ones */
+ for (size_t idx = 0; idx < maxEntries; idx++) {
+ rings.insertResponse(now, requestor2, qname, qtype, latency, size, dh, server, outgoingProtocol);
+ }
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
+ for (const auto& shard : rings.d_shards) {
+ auto ring = shard->respRing.lock();
+ BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+ for (const auto& entry : *ring) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort());
+ BOOST_CHECK_EQUAL(entry.usec, latency);
+ BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+ }
+ }
+}
+
+
+BOOST_AUTO_TEST_CASE(test_Rings_Simple) {
+
+ /* 5 entries over 1 shard */
+ test_ring(5, 1, 0);
+ /* 500 entries over 10 shards */
+ test_ring(500, 10, 0);
+ /* 5000 entries over 100 shards, max 5 try-lock attempts */
+ test_ring(500, 100, 5);
+}
+
+static void ringReaderThread(Rings& rings, std::atomic<bool>& done, size_t numberOfEntries, uint16_t qtype)
+{
+ size_t iterationsDone = 0;
+
+ while (done == false) {
+ size_t numberOfQueries = 0;
+ size_t numberOfResponses = 0;
+
+ for (const auto& shard : rings.d_shards) {
+ {
+ auto rl = shard->queryRing.lock();
+ for(const auto& c : *rl) {
+ numberOfQueries++;
+ // BOOST_CHECK* is slow as hell..
+ if(c.qtype != qtype) {
+ cerr<<"Invalid query QType!"<<endl;
+ return;
+ }
+ }
+ }
+ {
+ auto rl = shard->respRing.lock();
+ for(const auto& c : *rl) {
+ if(c.qtype != qtype) {
+ cerr<<"Invalid response QType!"<<endl;
+ return;
+ }
+ numberOfResponses++;
+ }
+ }
+ }
+
+ BOOST_CHECK_LE(numberOfQueries, numberOfEntries);
+ BOOST_CHECK_LE(numberOfResponses, numberOfEntries);
+ iterationsDone++;
+ usleep(10000);
+ }
+
+ BOOST_CHECK_GT(iterationsDone, 1U);
+#if 0
+ cerr<<"Done "<<iterationsDone<<" reading iterations"<<endl;
+#endif
+}
+
+static void ringWriterThread(Rings& rings, size_t numberOfEntries, const Rings::Query& query, const Rings::Response& response)
+{
+ for (size_t idx = 0; idx < numberOfEntries; idx++) {
+ rings.insertQuery(query.when, query.requestor, query.name, query.qtype, query.size, query.dh, query.protocol);
+ rings.insertResponse(response.when, response.requestor, response.name, response.qtype, response.usec, response.size, response.dh, response.ds, response.protocol);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_Rings_Threaded) {
+ size_t numberOfEntries = 1000000;
+ size_t numberOfShards = 50;
+ size_t lockAttempts = 5;
+ size_t numberOfWriterThreads = 4;
+ size_t entriesPerShard = numberOfEntries / numberOfShards;
+
+ struct timespec now;
+ gettime(&now);
+ dnsheader dh;
+ memset(&dh, 0, sizeof(dh));
+ dh.id = htons(4242);
+ dh.qr = 0;
+ dh.tc = 0;
+ dh.rd = 0;
+ dh.rcode = 0;
+ dh.qdcount = htons(1);
+ DNSName qname("rings.powerdns.com.");
+ ComboAddress requestor("192.0.2.1");
+ ComboAddress server("192.0.2.42");
+ unsigned int latency = 100;
+ uint16_t qtype = QType::AAAA;
+ uint16_t size = 42;
+ dnsdist::Protocol protocol = dnsdist::Protocol::DoUDP;
+ dnsdist::Protocol outgoingProtocol = dnsdist::Protocol::DoUDP;
+
+ Rings rings(numberOfEntries, numberOfShards, lockAttempts, true);
+ Rings::Query query({requestor, qname, now, dh, size, qtype, protocol});
+ Rings::Response response({requestor, server, qname, now, dh, latency, size, qtype, outgoingProtocol});
+
+ std::atomic<bool> done(false);
+ std::vector<std::thread> writerThreads;
+ std::thread readerThread(ringReaderThread, std::ref(rings), std::ref(done), numberOfEntries, qtype);
+
+ /* we need to overcommit a bit to account for the fact that due to contention,
+ we might not perfectly distribute the entries over the shards,
+ so some of them might get full while other still have some place left */
+ size_t insertionsPerThread = (1.2 * numberOfEntries) / numberOfWriterThreads;
+ for (size_t idx = 0; idx < numberOfWriterThreads; idx++) {
+ writerThreads.push_back(std::thread(ringWriterThread, std::ref(rings), insertionsPerThread, query, response));
+ }
+
+ /* wait for the writers to be finished */
+ for (auto& t : writerThreads) {
+ t.join();
+ }
+
+ /* we can stop the reader thread now */
+ done = true;
+ readerThread.join();
+
+ BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards);
+ BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards());
+ BOOST_CHECK_LE(rings.getNumberOfQueryEntries(), numberOfEntries);
+ BOOST_CHECK_GT(rings.getNumberOfQueryEntries(), numberOfEntries * 0.75);
+ BOOST_WARN_GT(rings.getNumberOfQueryEntries(), numberOfEntries * 0.99);
+ BOOST_CHECK_LE(rings.getNumberOfResponseEntries(), numberOfEntries);
+ BOOST_CHECK_GT(rings.getNumberOfResponseEntries(), numberOfEntries * 0.75);
+ BOOST_WARN_GT(rings.getNumberOfResponseEntries(), numberOfEntries * 0.99);
+
+ size_t totalQueries = 0;
+ size_t totalResponses = 0;
+ for (const auto& shard : rings.d_shards) {
+ {
+ auto ring = shard->queryRing.lock();
+ BOOST_CHECK_LE(ring->size(), entriesPerShard);
+ // verify that the shard is not empty
+ BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1);
+ // this would be optimal
+ BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95);
+ totalQueries += ring->size();
+ for (const auto& entry : *ring) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+ }
+ }
+ {
+ auto ring = shard->respRing.lock();
+ BOOST_CHECK_LE(ring->size(), entriesPerShard);
+ // verify that the shard is not empty
+ BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1);
+ // this would be optimal
+ BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95);
+ totalResponses += ring->size();
+ for (const auto& entry : *ring) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+ BOOST_CHECK_EQUAL(entry.usec, latency);
+ BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+ }
+ }
+ }
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), totalQueries);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), totalResponses);
+#if 0
+ cerr<<"Done "<<(insertionsPerThread*numberOfWriterThreads)<<" insertions"<<endl;
+ cerr<<"Got "<<rings.d_deferredQueryInserts<<" deferred query insertions"<<endl;
+ cerr<<"Got "<<rings.d_blockingQueryInserts<<" blocking query insertions"<<endl;
+ cerr<<"Got "<<rings.d_deferredResponseInserts<<" deferred response insertions"<<endl;
+ cerr<<"Got "<<rings.d_blockingResponseInserts<<" blocking response insertions"<<endl;
+#endif
+}
+
+BOOST_AUTO_TEST_SUITE_END()