From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/test/rgw/bench_rgw_ratelimit.cc | 247 ++++++++++++++++++++++++++++++++++++ 1 file changed, 247 insertions(+) create mode 100644 src/test/rgw/bench_rgw_ratelimit.cc (limited to 'src/test/rgw/bench_rgw_ratelimit.cc') diff --git a/src/test/rgw/bench_rgw_ratelimit.cc b/src/test/rgw/bench_rgw_ratelimit.cc new file mode 100644 index 000000000..2bf7753ad --- /dev/null +++ b/src/test/rgw/bench_rgw_ratelimit.cc @@ -0,0 +1,247 @@ +#include "rgw_ratelimit.h" +#include "rgw_common.h" +#include "random" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +using Executor = boost::asio::io_context::executor_type; +std::uniform_int_distribution dist(0, 1); +std::random_device rd; +std::default_random_engine rng{rd()}; +std::uniform_int_distribution disttenant(2, 100000000); +struct client_info { + uint64_t accepted = 0; + uint64_t rejected = 0; + uint64_t ops = 0; + uint64_t bytes = 0; + uint64_t num_retries = 0; + std::string tenant; +}; + +struct parameters { + int64_t req_size = 1; + int64_t backend_bandwidth = 1; + size_t wait_between_retries_ms = 1; + int num_clients = 1; +}; +std::shared_ptr> ds = std::make_shared>(std::vector()); + +std::string method[2] = {"PUT", "GET"}; +void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr ratelimit, const parameters& params, spawn::yield_context& yield, boost::asio::io_context& ioctx) +{ + auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: "); + boost::asio::steady_timer timer(ioctx); + int rw = 0; // will always use PUT method as there is no difference + std::string methodop(method[rw]); + auto req_size = params.req_size; + auto backend_bandwidth = params.backend_bandwidth; +// the 4 * 1024 * 1024 is the RGW default we are sending in a typical environment + while (req_size) { + if (req_size <= backend_bandwidth) { + while (req_size > 0) { + if(req_size > 4*1024*1024) { + ratelimit->decrease_bytes(methodop.c_str(),it.tenant, 4*1024*1024, info); + it.bytes += 4*1024*1024; + req_size = req_size - 4*1024*1024; + } + else { + ratelimit->decrease_bytes(methodop.c_str(),it.tenant, req_size, info); + req_size = 0; + } + } + } else { + int64_t total_bytes = 0; + while (req_size > 0) { + if (req_size >= 4*1024*1024) { + if (total_bytes >= backend_bandwidth) + { + timer.expires_after(std::chrono::seconds(1)); + timer.async_wait(yield); + total_bytes = 0; + } + ratelimit->decrease_bytes(methodop.c_str(),it.tenant, 4*1024*1024, info); + it.bytes += 4*1024*1024; + req_size = req_size - 4*1024*1024; + total_bytes += 4*1024*1024; + } + else { + ratelimit->decrease_bytes(methodop.c_str(),it.tenant, req_size, info); + it.bytes += req_size; + total_bytes += req_size; + req_size = 0; + } + } + } + } +} +bool simulate_request(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr ratelimit) +{ + boost::asio::io_context context; + auto time = ceph::coarse_real_clock::now(); + int rw = 0; // will always use PUT method as there is no different + std::string methodop = method[rw]; + auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: "); + bool to_fail = ratelimit->should_rate_limit(methodop.c_str(), it.tenant, time, &info); + if(to_fail) + { + it.rejected++; + it.ops++; + return true; + } + it.accepted++; + return false; +} +void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr ratelimit, const parameters& params, spawn::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx) +{ + for (;;) + { + bool to_retry = simulate_request(it, info, ratelimit); + while (to_retry && to_run) + { + if (params.wait_between_retries_ms) + { + boost::asio::steady_timer timer(ioctx); + timer.expires_after(std::chrono::milliseconds(params.wait_between_retries_ms)); + timer.async_wait(ctx); + } + to_retry = simulate_request(it, info, ratelimit); + } + if (!to_run) + { + return; + } + simulate_transfer(it, &info, ratelimit, params, ctx, ioctx); + } +} +void simulate_clients(boost::asio::io_context& context, std::string tenant, const RGWRateLimitInfo& info, std::shared_ptr ratelimit, const parameters& params, bool& to_run) +{ + for (int i = 0; i < params.num_clients; i++) + { + auto& it = ds->emplace_back(client_info()); + it.tenant = tenant; + int x = ds->size() - 1; + spawn::spawn(context, + [&to_run ,x, ratelimit, info, params, &context](spawn::yield_context ctx) + { + auto& it = ds.get()->operator[](x); + simulate_client(it, info, ratelimit, params, ctx, to_run, context); + }); + } +} +int main(int argc, char **argv) +{ + int num_ratelimit_classes = 1; + int64_t ops_limit = 1; + int64_t bw_limit = 1; + int thread_count = 512; + int runtime = 60; + parameters params; + try + { + using namespace boost::program_options; + options_description desc{"Options"}; + desc.add_options() + ("help,h", "Help screen") + ("num_ratelimit_classes", value()->default_value(1), "how many ratelimit tenants") + ("request_size", value()->default_value(1), "what is the request size we are testing if 0, it will be randomized") + ("backend_bandwidth", value()->default_value(1), "what is the backend bandwidth, so there will be wait between decrease_bytes") + ("wait_between_retries_ms", value()->default_value(1), "time in seconds to wait between retries") + ("ops_limit", value()->default_value(1), "ops limit for the tenants") + ("bw_limit", value()->default_value(1), "bytes per second limit") + ("threads", value()->default_value(512), "server's threads count") + ("runtime", value()->default_value(60), "For how many seconds the test will run") + ("num_clients", value()->default_value(1), "number of clients per tenant to run"); + variables_map vm; + store(parse_command_line(argc, argv, desc), vm); + if (vm.count("help")) { + std::cout << desc << std::endl; + return EXIT_SUCCESS; + } + num_ratelimit_classes = vm["num_ratelimit_classes"].as(); + params.req_size = vm["request_size"].as(); + params.backend_bandwidth = vm["backend_bandwidth"].as(); + params.wait_between_retries_ms = vm["wait_between_retries_ms"].as(); + params.num_clients = vm["num_clients"].as(); + ops_limit = vm["ops_limit"].as(); + bw_limit = vm["bw_limit"].as(); + thread_count = vm["threads"].as(); + runtime = vm["runtime"].as(); + } + catch (const boost::program_options::error &ex) + { + std::cerr << ex.what() << std::endl; + return EXIT_FAILURE; + } + RGWRateLimitInfo info; + info.enabled = true; + info.max_read_bytes = bw_limit; + info.max_write_bytes = bw_limit; + info.max_read_ops = ops_limit; + info.max_write_ops = ops_limit; + std::unique_ptr cct = std::make_unique(CEPH_ENTITY_TYPE_ANY); + if (!g_ceph_context) + { + g_ceph_context = cct.get(); + } + std::shared_ptr ratelimit(new ActiveRateLimiter(g_ceph_context)); + ratelimit->start(); + std::vector threads; + using Executor = boost::asio::io_context::executor_type; + std::optional> work; + threads.reserve(thread_count); + boost::asio::io_context context; + boost::asio::io_context stopme; + work.emplace(boost::asio::make_work_guard(context)); + // server execution + for (int i = 0; i < thread_count; i++) { + threads.emplace_back([&]() noexcept { + context.run(); + }); + } + //client execution + bool to_run = true; + ds->reserve(num_ratelimit_classes*params.num_clients); + for (int i = 0; i < num_ratelimit_classes; i++) + { + unsigned long long tenantid = disttenant(rng); + std::string tenantuser = "uuser" + std::to_string(tenantid); + simulate_clients(context, tenantuser, info, ratelimit->get_active(), params, to_run); + } + boost::asio::steady_timer timer_runtime(stopme); + timer_runtime.expires_after(std::chrono::seconds(runtime)); + timer_runtime.wait(); + work.reset(); + context.stop(); + to_run = false; + + for (auto& i : threads) + { + i.join(); + } + std::unordered_map metrics_by_tenant; + for(auto& i : *ds.get()) + { + auto it = metrics_by_tenant.emplace(i.tenant, client_info()).first; + std::cout << i.accepted << std::endl; + it->second.accepted += i.accepted; + it->second.rejected += i.rejected; + } + // TODO sum the results by tenant + for(auto& i : metrics_by_tenant) + { + std::cout << "Tenant is: " << i.first << std::endl; + std::cout << "Simulator finished accepted sum : " << i.second.accepted << std::endl; + std::cout << "Simulator finished rejected sum : " << i.second.rejected << std::endl; + } + + return 0; +} -- cgit v1.2.3