#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_NO_MAIN

#include <thread>

#include <boost/test/unit_test.hpp>

#include "dnsdist-rings.hh"
#include "gettime.hh"

BOOST_AUTO_TEST_SUITE(dnsdistrings_cc)

static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTries)
{
  Rings rings(maxEntries, numberOfShards, nbLockTries);
  size_t entriesPerShard = maxEntries / numberOfShards;

  BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards);
  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), 0U);
  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
  BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards());
  for (const auto& shard : rings.d_shards) {
    BOOST_CHECK(shard != nullptr);
  }

  dnsheader dh;
  memset(&dh, 0, sizeof(dh));
  DNSName qname("rings.powerdns.com.");
  ComboAddress requestor1("192.0.2.1");
  ComboAddress requestor2("192.0.2.2");
  uint16_t qtype = QType::AAAA;
  uint16_t size = 42;
  dnsdist::Protocol protocol = dnsdist::Protocol::DoUDP;
  dnsdist::Protocol outgoingProtocol = dnsdist::Protocol::DoUDP;
  struct timespec now;
  gettime(&now);

  /* fill the query ring */
  for (size_t idx = 0; idx < maxEntries; idx++) {
    rings.insertQuery(now, requestor1, qname, qtype, size, dh, protocol);
  }
  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
  for (const auto& shard : rings.d_shards) {
    auto ring = shard->queryRing.lock();
    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
    for (const auto& entry : *ring) {
      BOOST_CHECK_EQUAL(entry.name, qname);
      BOOST_CHECK_EQUAL(entry.qtype, qtype);
      BOOST_CHECK_EQUAL(entry.size, size);
      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort());
    }
  }

  /* push enough queries to get rid of the existing ones */
  for (size_t idx = 0; idx < maxEntries; idx++) {
    rings.insertQuery(now, requestor2, qname, qtype, size, dh, protocol);
  }
  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
  for (const auto& shard : rings.d_shards) {
    auto ring = shard->queryRing.lock();
    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
    for (const auto& entry : *ring) {
      BOOST_CHECK_EQUAL(entry.name, qname);
      BOOST_CHECK_EQUAL(entry.qtype, qtype);
      BOOST_CHECK_EQUAL(entry.size, size);
      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort());
    }
  }

  ComboAddress server("192.0.2.42");
  unsigned int latency = 100;

  /* fill the response ring */
  for (size_t idx = 0; idx < maxEntries; idx++) {
    rings.insertResponse(now, requestor1, qname, qtype, latency, size, dh, server, outgoingProtocol);
  }
  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
  for (const auto& shard : rings.d_shards) {
    auto ring = shard->respRing.lock();
    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
    for (const auto& entry : *ring) {
      BOOST_CHECK_EQUAL(entry.name, qname);
      BOOST_CHECK_EQUAL(entry.qtype, qtype);
      BOOST_CHECK_EQUAL(entry.size, size);
      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort());
      BOOST_CHECK_EQUAL(entry.usec, latency);
      BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
    }
  }

  /* push enough responses to get rid of the existing ones */
  for (size_t idx = 0; idx < maxEntries; idx++) {
    rings.insertResponse(now, requestor2, qname, qtype, latency, size, dh, server, outgoingProtocol);
  }
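  /* the rings are fixed-size circular buffers, so inserting maxEntries new
     responses should have evicted every entry from requestor1: the checks
     below expect to see only requestor2 */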
  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
  for (const auto& shard : rings.d_shards) {
    auto ring = shard->respRing.lock();
    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
    for (const auto& entry : *ring) {
      BOOST_CHECK_EQUAL(entry.name, qname);
      BOOST_CHECK_EQUAL(entry.qtype, qtype);
      BOOST_CHECK_EQUAL(entry.size, size);
      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort());
      BOOST_CHECK_EQUAL(entry.usec, latency);
      BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
    }
  }
}

BOOST_AUTO_TEST_CASE(test_Rings_Simple)
{
  /* 5 entries over 1 shard */
  test_ring(5, 1, 0);
  /* 500 entries over 10 shards */
  test_ring(500, 10, 0);
  /* 5000 entries over 100 shards, max 5 try-lock attempts */
  test_ring(5000, 100, 5);
}

static void ringReaderThread(Rings& rings, std::atomic<bool>& done, size_t numberOfEntries, uint16_t qtype)
{
  size_t iterationsDone = 0;

  while (done == false) {
    size_t numberOfQueries = 0;
    size_t numberOfResponses = 0;

    for (const auto& shard : rings.d_shards) {
      {
        auto rl = shard->queryRing.lock();
        for (const auto& c : *rl) {
          numberOfQueries++;
          // BOOST_CHECK* is slow as hell..
          if (c.qtype != qtype) {
            cerr<<"Invalid query QType!"<<endl;
            return;
          }
        }
      }
      {
        auto rl = shard->respRing.lock();
        for (const auto& c : *rl) {
          if (c.qtype != qtype) {
            cerr<<"Invalid response QType!"<<endl;
            return;
          }
          numberOfResponses++;
        }
      }
    }

    /* the writers might still be filling the rings, so we can only
       check that we never see more entries than the rings can hold */
    BOOST_CHECK_LE(numberOfQueries, numberOfEntries);
    BOOST_CHECK_LE(numberOfResponses, numberOfEntries);
    iterationsDone++;
  }

  BOOST_CHECK_GT(iterationsDone, 0U);
}

static void ringWriterThread(Rings& rings, size_t numberOfEntries, const Rings::Query& query, const Rings::Response& response)
{
  for (size_t idx = 0; idx < numberOfEntries; idx++) {
    rings.insertQuery(query.when, query.requestor, query.name, query.qtype, query.size, query.dh, query.protocol);
    rings.insertResponse(response.when, response.requestor, response.name, response.qtype, response.usec, response.size, response.dh, response.ds, response.protocol);
  }
}

BOOST_AUTO_TEST_CASE(test_Rings_Threaded)
{
  /* a large ring spread over many shards, filled by several writer threads
     while a reader scans it concurrently; the exact sizes are illustrative */
  size_t numberOfEntries = 1000000;
  size_t numberOfShards = 50;
  size_t lockAttempts = 5;
  size_t numberOfWriterThreads = 4;
  size_t entriesPerShard = numberOfEntries / numberOfShards;

  struct timespec now;
  gettime(&now);
  dnsheader dh;
  memset(&dh, 0, sizeof(dh));
  DNSName qname("rings.powerdns.com.");
  ComboAddress requestor("192.0.2.1");
  ComboAddress server("192.0.2.42");
  unsigned int latency = 100;
  uint16_t qtype = QType::AAAA;
  uint16_t size = 42;
  dnsdist::Protocol protocol = dnsdist::Protocol::DoUDP;
  dnsdist::Protocol outgoingProtocol = dnsdist::Protocol::DoUDP;

  Rings rings(numberOfEntries, numberOfShards, lockAttempts);

  /* pre-build the query and response objects handed to the writer threads;
     the dh and protocol member names are assumed to mirror the insertQuery()
     and insertResponse() parameters, the others are checked below */
  Rings::Query query;
  query.requestor = requestor;
  query.name = qname;
  query.when = now;
  query.dh = dh;
  query.size = size;
  query.qtype = qtype;
  query.protocol = protocol;

  Rings::Response response;
  response.requestor = requestor;
  response.ds = server;
  response.name = qname;
  response.when = now;
  response.dh = dh;
  response.usec = latency;
  response.size = size;
  response.qtype = qtype;
  response.protocol = outgoingProtocol;

  std::atomic<bool> done(false);
  std::vector<std::thread> writerThreads;
  std::thread readerThread(ringReaderThread, std::ref(rings), std::ref(done), numberOfEntries, qtype);

  /* we need to overcommit a bit to account for the fact that, due to contention,
     we might not distribute the entries perfectly over the shards, so some of
     them might get full while others still have some room left */
  size_t insertionsPerThread = (1.2 * numberOfEntries) / numberOfWriterThreads;
  for (size_t idx = 0; idx < numberOfWriterThreads; idx++) {
    writerThreads.push_back(std::thread(ringWriterThread, std::ref(rings), insertionsPerThread, query, response));
  }

  /* wait for the writers to be finished */
  for (auto& t : writerThreads) {
    t.join();
  }

  /* we can stop the reader thread now */
  done = true;
  readerThread.join();

  BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards);
  BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards());
  BOOST_CHECK_LE(rings.getNumberOfQueryEntries(), numberOfEntries);
  BOOST_CHECK_GT(rings.getNumberOfQueryEntries(), numberOfEntries * 0.75);
  BOOST_WARN_GT(rings.getNumberOfQueryEntries(), numberOfEntries * 0.99);
  BOOST_CHECK_LE(rings.getNumberOfResponseEntries(), numberOfEntries);
  BOOST_CHECK_GT(rings.getNumberOfResponseEntries(), numberOfEntries * 0.75);
  BOOST_WARN_GT(rings.getNumberOfResponseEntries(), numberOfEntries * 0.99);

  size_t totalQueries = 0;
  size_t totalResponses = 0;
  for (const auto& shard : rings.d_shards) {
    {
      auto ring = shard->queryRing.lock();
      BOOST_CHECK_LE(ring->size(), entriesPerShard);
      // verify that the shard is not empty
      BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1);
      // this would be optimal
      BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95);
      totalQueries += ring->size();
      for (const auto& entry : *ring) {
        BOOST_CHECK_EQUAL(entry.name, qname);
        BOOST_CHECK_EQUAL(entry.qtype, qtype);
        BOOST_CHECK_EQUAL(entry.size, size);
        BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
        BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
      }
    }
    {
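      /* now apply the same shard-level checks to the response ring */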
      auto ring = shard->respRing.lock();
      BOOST_CHECK_LE(ring->size(), entriesPerShard);
      // verify that the shard is not empty
      BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1);
      // this would be optimal
      BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95);
      totalResponses += ring->size();
      for (const auto& entry : *ring) {
        BOOST_CHECK_EQUAL(entry.name, qname);
        BOOST_CHECK_EQUAL(entry.qtype, qtype);
        BOOST_CHECK_EQUAL(entry.size, size);
        BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
        BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
        BOOST_CHECK_EQUAL(entry.usec, latency);
        BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
      }
    }
  }

  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), totalQueries);
  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), totalResponses);

#if 0
  cerr<<"Done "<<(insertionsPerThread*numberOfWriterThreads)<<" insertions"<<endl;
#endif
}

BOOST_AUTO_TEST_SUITE_END()