diff options
Diffstat (limited to 'src/seastar/tests/unit/fair_queue_test.cc')
-rw-r--r-- | src/seastar/tests/unit/fair_queue_test.cc | 449 |
1 files changed, 449 insertions, 0 deletions
diff --git a/src/seastar/tests/unit/fair_queue_test.cc b/src/seastar/tests/unit/fair_queue_test.cc new file mode 100644 index 000000000..b2e65230a --- /dev/null +++ b/src/seastar/tests/unit/fair_queue_test.cc @@ -0,0 +1,449 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Copyright (C) 2016 ScyllaDB + */ + +#include <seastar/core/thread.hh> +#include <seastar/testing/test_case.hh> +#include <seastar/testing/thread_test_case.hh> +#include <seastar/testing/test_runner.hh> +#include <seastar/core/sstring.hh> +#include <seastar/core/fair_queue.hh> +#include <seastar/core/do_with.hh> +#include <seastar/util/later.hh> +#include <seastar/core/sleep.hh> +#include <seastar/core/print.hh> +#include <boost/range/irange.hpp> +#include <chrono> + +using namespace seastar; +using namespace std::chrono_literals; + +struct request { + fair_queue_entry fqent; + std::function<void(request& req)> handle; + unsigned index; + + template <typename Func> + request(unsigned weight, unsigned index, Func&& h) + : fqent(fair_queue_ticket(weight, 0)) + , handle(std::move(h)) + , index(index) + {} + + void submit() { + handle(*this); + delete this; + } +}; + +class test_env { + fair_group _fg; + fair_queue _fq; + std::vector<int> _results; + std::vector<std::vector<std::exception_ptr>> _exceptions; + fair_queue::class_id _nr_classes = 0; + std::vector<request> _inflight; + + static fair_group::config fg_config(unsigned cap) { + fair_group::config cfg; + cfg.weight_rate = 1'000'000; + cfg.size_rate = std::numeric_limits<int>::max(); + cfg.rate_limit_duration = std::chrono::microseconds(cap); + return cfg; + } + + static fair_queue::config fq_config() { + fair_queue::config cfg; + cfg.tau = std::chrono::microseconds(50); + return cfg; + } + + void drain() { + do {} while (tick() != 0); + } +public: + test_env(unsigned capacity) + : _fg(fg_config(capacity)) + , _fq(_fg, fq_config()) + {} + + // As long as there is a request sitting in the queue, tick() will process + // at least one request. The only situation in which tick() will return nothing + // is if no requests were sent to the fair_queue (obviously). + // + // Because of this property, one useful use of tick() is to implement a drain() + // method (see above) in which all requests currently sent to the queue are drained + // before the queue is destroyed. + unsigned tick(unsigned n = 1) { + unsigned processed = 0; + _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1)); + _fq.dispatch_requests([] (fair_queue_entry& ent) { + boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit(); + }); + + for (unsigned i = 0; i < n; ++i) { + std::vector<request> curr; + curr.swap(_inflight); + + for (auto& req : curr) { + processed++; + _results[req.index]++; + _fq.notify_request_finished(req.fqent.ticket()); + } + + _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1)); + _fq.dispatch_requests([] (fair_queue_entry& ent) { + boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit(); + }); + } + return processed; + } + + ~test_env() { + drain(); + for (fair_queue::class_id id = 0; id < _nr_classes; id++) { + _fq.unregister_priority_class(id); + } + } + + size_t register_priority_class(uint32_t shares) { + _results.push_back(0); + _exceptions.push_back(std::vector<std::exception_ptr>()); + _fq.register_priority_class(_nr_classes, shares); + return _nr_classes++; + } + + void do_op(fair_queue::class_id id, unsigned weight) { + unsigned index = id; + auto req = std::make_unique<request>(weight, index, [this, index] (request& req) mutable noexcept { + try { + _inflight.push_back(std::move(req)); + } catch (...) { + auto eptr = std::current_exception(); + _exceptions[index].push_back(eptr); + _fq.notify_request_finished(req.fqent.ticket()); + } + }); + + _fq.queue(id, req->fqent); + req.release(); + } + + void update_shares(fair_queue::class_id id, uint32_t shares) { + _fq.update_shares_for_class(id, shares); + } + + void reset_results(unsigned index) { + _results[index] = 0; + } + + // Verify if the ratios are what we expect. Because we can't be sure about + // precise timing issues, we can always be off by some percentage. In simpler + // tests we really expect it to very low, but in more complex tests, with share + // changes, for instance, they can accumulate + // + // The ratios argument is the ratios towards the first class + void verify(sstring name, std::vector<unsigned> ratios, unsigned expected_error = 1) { + assert(ratios.size() == _results.size()); + auto str = name + ":"; + for (auto i = 0ul; i < _results.size(); ++i) { + str += format(" r[{:d}] = {:d}", i, _results[i]); + } + std::cout << str << std::endl; + for (auto i = 0ul; i < ratios.size(); ++i) { + int min_expected = ratios[i] * (_results[0] - expected_error); + int max_expected = ratios[i] * (_results[0] + expected_error); + BOOST_REQUIRE(_results[i] >= min_expected); + BOOST_REQUIRE(_results[i] <= max_expected); + BOOST_REQUIRE(_exceptions[i].size() == 0); + } + } +}; + +// Equal ratios. Expected equal results. +SEASTAR_THREAD_TEST_CASE(test_fair_queue_equal_2classes) { + test_env env(1); + + auto a = env.register_priority_class(10); + auto b = env.register_priority_class(10); + + for (int i = 0; i < 100; ++i) { + env.do_op(a, 1); + env.do_op(b, 1); + } + + yield().get(); + // allow half the requests in + env.tick(100); + env.verify("equal_2classes", {1, 1}); +} + +// Equal results, spread among 4 classes. +SEASTAR_THREAD_TEST_CASE(test_fair_queue_equal_4classes) { + test_env env(1); + + auto a = env.register_priority_class(10); + auto b = env.register_priority_class(10); + auto c = env.register_priority_class(10); + auto d = env.register_priority_class(10); + + for (int i = 0; i < 100; ++i) { + env.do_op(a, 1); + env.do_op(b, 1); + env.do_op(c, 1); + env.do_op(d, 1); + } + yield().get(); + // allow half the requests in + env.tick(200); + env.verify("equal_4classes", {1, 1, 1, 1}); +} + +// Class2 twice as powerful. Expected class2 to have 2 x more requests. +SEASTAR_THREAD_TEST_CASE(test_fair_queue_different_shares) { + test_env env(1); + + auto a = env.register_priority_class(10); + auto b = env.register_priority_class(20); + + for (int i = 0; i < 100; ++i) { + env.do_op(a, 1); + env.do_op(b, 1); + } + yield().get(); + // allow half the requests in + env.tick(100); + return env.verify("different_shares", {1, 2}); +} + +// Equal ratios, high capacity queue. Should still divide equally. +// +// Note that we sleep less because now more requests will be going through the +// queue. +SEASTAR_THREAD_TEST_CASE(test_fair_queue_equal_hi_capacity_2classes) { + test_env env(10); + + auto a = env.register_priority_class(10); + auto b = env.register_priority_class(10); + + for (int i = 0; i < 100; ++i) { + env.do_op(a, 1); + env.do_op(b, 1); + } + yield().get(); + + // queue has capacity 10, 10 x 10 = 100, allow half the requests in + env.tick(10); + env.verify("hi_capacity_2classes", {1, 1}); +} + +// Class2 twice as powerful, queue is high capacity. Still expected class2 to +// have 2 x more requests. +// +// Note that we sleep less because now more requests will be going through the +// queue. +SEASTAR_THREAD_TEST_CASE(test_fair_queue_different_shares_hi_capacity) { + test_env env(10); + + auto a = env.register_priority_class(10); + auto b = env.register_priority_class(20); + + for (int i = 0; i < 100; ++i) { + env.do_op(a, 1); + env.do_op(b, 1); + } + yield().get(); + // queue has capacity 10, 10 x 10 = 100, allow half the requests in + env.tick(10); + env.verify("different_shares_hi_capacity", {1, 2}); +} + +// Classes equally powerful. But Class1 issues twice as expensive requests. Expected Class2 to have 2 x more requests. +SEASTAR_THREAD_TEST_CASE(test_fair_queue_different_weights) { + test_env env(2); + + auto a = env.register_priority_class(10); + auto b = env.register_priority_class(10); + + for (int i = 0; i < 100; ++i) { + env.do_op(a, 2); + env.do_op(b, 1); + } + yield().get(); + // allow half the requests in + env.tick(100); + env.verify("different_weights", {1, 2}); +} + +// Class2 pushes many requests over. Right after, don't expect Class2 to be able to push anything else. +SEASTAR_THREAD_TEST_CASE(test_fair_queue_dominant_queue) { + test_env env(1); + + auto a = env.register_priority_class(10); + auto b = env.register_priority_class(10); + + for (int i = 0; i < 100; ++i) { + env.do_op(b, 1); + } + yield().get(); + + // consume all requests + env.tick(100); + // zero statistics. + env.reset_results(b); + for (int i = 0; i < 20; ++i) { + env.do_op(a, 1); + env.do_op(b, 1); + } + // allow half the requests in + env.tick(20); + env.verify("dominant_queue", {1, 0}); +} + +// Class2 pushes many requests at first. Right after, don't expect Class1 to be able to do the same +SEASTAR_THREAD_TEST_CASE(test_fair_queue_forgiving_queue) { + test_env env(1); + + // The fair_queue preemption logic allows one class to gain exclusive + // queue access for at most tau duration. Test queue configures the + // request rate to be 1/us and tau to be 50us, so after (re-)activation + // a queue can overrun its peer by at most 50 requests. + + auto a = env.register_priority_class(10); + auto b = env.register_priority_class(10); + + for (int i = 0; i < 100; ++i) { + env.do_op(a, 1); + } + yield().get(); + + // consume all requests + env.tick(100); + env.reset_results(a); + + for (int i = 0; i < 100; ++i) { + env.do_op(a, 1); + env.do_op(b, 1); + } + yield().get(); + + // allow half the requests in + env.tick(100); + // 50 requests should be passed from b, other 100 should be shared 1:1 + env.verify("forgiving_queue", {1, 3}, 2); +} + +// Classes push requests and then update swap their shares. In the end, should have executed +// the same number of requests. +SEASTAR_THREAD_TEST_CASE(test_fair_queue_update_shares) { + test_env env(1); + + auto a = env.register_priority_class(20); + auto b = env.register_priority_class(10); + + for (int i = 0; i < 500; ++i) { + env.do_op(a, 1); + env.do_op(b, 1); + } + + yield().get(); + // allow 25% of the requests in + env.tick(250); + env.update_shares(a, 10); + env.update_shares(b, 20); + + yield().get(); + // allow 25% of the requests in + env.tick(250); + env.verify("update_shares", {1, 1}, 2); +} + +// Classes run for a longer period of time. Balance must be kept over many timer +// periods. +SEASTAR_THREAD_TEST_CASE(test_fair_queue_longer_run) { + test_env env(1); + + auto a = env.register_priority_class(10); + auto b = env.register_priority_class(10); + + for (int i = 0; i < 20000; ++i) { + env.do_op(a, 1); + env.do_op(b, 1); + } + // In total allow half the requests in, but do it over a + // long period of time, ticking slowly + for (int i = 0; i < 1000; ++i) { + sleep(1ms).get(); + env.tick(2); + } + env.verify("longer_run", {1, 1}, 2); +} + +// Classes run for a longer period of time. Proportional balance must be kept over many timer +// periods, despite unequal shares.. +SEASTAR_THREAD_TEST_CASE(test_fair_queue_longer_run_different_shares) { + test_env env(1); + + auto a = env.register_priority_class(10); + auto b = env.register_priority_class(20); + + for (int i = 0; i < 20000; ++i) { + env.do_op(a, 1); + env.do_op(b, 1); + } + + // In total allow half the requests in, but do it over a + // long period of time, ticking slowly + for (int i = 0; i < 1000; ++i) { + sleep(1ms).get(); + env.tick(3); + } + env.verify("longer_run_different_shares", {1, 2}, 2); +} + +// Classes run for a random period of time. Equal operations expected. +SEASTAR_THREAD_TEST_CASE(test_fair_queue_random_run) { + test_env env(1); + + auto a = env.register_priority_class(1); + auto b = env.register_priority_class(1); + + std::default_random_engine& generator = testing::local_random_engine; + // multiples of 100usec - which is the approximate length of the request. We will + // put a minimum of 10. Below that, it is hard to guarantee anything. The maximum is + // about 50 seconds. + std::uniform_int_distribution<uint32_t> distribution(10, 500 * 1000); + auto reqs = distribution(generator); + + // Enough requests for the maximum run (half per queue, + leeway) + for (uint32_t i = 0; i < reqs; ++i) { + env.do_op(a, 1); + env.do_op(b, 1); + } + + yield().get(); + // In total allow half the requests in + env.tick(reqs); + + // Accept 5 % error. + auto expected_error = std::max(1, int(round(reqs * 0.05))); + env.verify(format("random_run ({:d} requests)", reqs), {1, 1}, expected_error); +} |