diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/test/cls_fifo/bench_cls_fifo.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream/16.2.11+ds.tar.xz ceph-upstream/16.2.11+ds.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/cls_fifo/bench_cls_fifo.cc')
-rw-r--r-- | src/test/cls_fifo/bench_cls_fifo.cc | 462 |
1 files changed, 462 insertions, 0 deletions
diff --git a/src/test/cls_fifo/bench_cls_fifo.cc b/src/test/cls_fifo/bench_cls_fifo.cc new file mode 100644 index 000000000..df390cd31 --- /dev/null +++ b/src/test/cls_fifo/bench_cls_fifo.cc @@ -0,0 +1,462 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include <cerrno> +#include <chrono> +#include <cstdint> +#include <exception> +#include <future> +#include <iostream> +#include <string_view> + +#include <boost/asio.hpp> +#include <boost/system/error_code.hpp> +#include <boost/program_options.hpp> + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include <fmt/chrono.h> +#include <fmt/format.h> +#include <fmt/ostream.h> + +#include <spawn/spawn.hpp> + +#include "include/neorados/RADOS.hpp" + +#include "neorados/cls/fifo.h" + +namespace ba = boost::asio; +namespace bs = boost::system; +namespace bpo = boost::program_options; +namespace cb = ceph::buffer; +namespace R = neorados; +namespace RCf = neorados::cls::fifo; +namespace fifo = rados::cls::fifo; +namespace s = spawn; +namespace sc = std::chrono; + +namespace { +static constexpr auto PUSH = 0x01 << 0; +static constexpr auto PULL = 0x01 << 1; +static constexpr auto BOTH = PUSH | PULL; +static constexpr auto CLEAN = 0x01 << 2; +static constexpr auto METADATA = 0x01 << 3; +static constexpr auto PARTINFO = 0x01 << 4; +static constexpr auto LIST = 0x01 << 5; + +struct benchmark { + std::uint32_t entries = 0; + sc::duration<double> elapsed = 0ns; + + std::uint64_t ratio() const { + return entries/std::max(elapsed, + sc::duration<double>(1ns)).count(); + } + benchmark() = default; + benchmark(std::uint32_t entries, sc::duration<double> elapsed) + : entries(entries), elapsed(elapsed) {} +}; + +benchmark push(RCf::FIFO& f, const std::uint32_t count, + const std::uint32_t entry_size, const std::uint32_t push_entries, + s::yield_context y) +{ + cb::list entry; + entry.push_back(cb::create_small_page_aligned(entry_size)); + entry.zero(); + + std::vector entries(std::min(count, push_entries), entry); + auto remaining = count; + auto start = sc::steady_clock::now(); + while (remaining) { + if (entries.size() > remaining) { + entries.resize(remaining); + } + f.push(entries, y); + remaining -= entries.size(); + } + auto finish = sc::steady_clock::now(); + return benchmark(count, (finish - start)); +} + +benchmark pull(RCf::FIFO& f, const std::uint32_t count, + const std::uint32_t pull_entries, s::yield_context y) +{ + auto remaining = count; + std::uint32_t got = 0; + + auto start = sc::steady_clock::now(); + while (remaining) { + auto [result, more] = f.list(std::min(remaining, pull_entries), + std::nullopt, y); + if (result.empty()) + break; + got += result.size(); + remaining -= result.size(); + f.trim(result.back().marker, false, y); + } + auto finish = sc::steady_clock::now(); + return benchmark(got, (finish - start)); +} + +void concurpull(const std::string& oid, const std::int64_t pool, + const std::uint32_t count, const std::uint32_t pull_entries, + std::promise<benchmark> notify, const bool* const exit_early) +{ + ba::io_context c; + benchmark bench; + std::exception_ptr ex; + s::spawn( + c, + [&](s::yield_context y) { + try { + auto r = R::RADOS::Builder{}.build(c, y); + R::IOContext ioc(pool); + auto f = RCf::FIFO::open(r, ioc, oid, y); + auto remaining = count; + std::uint32_t got = 0; + + auto start = sc::steady_clock::now(); + while (remaining) { + if (*exit_early) break; + auto [result, more] = + f->list(std::min(remaining, pull_entries), std::nullopt, y); + if (result.empty()) { + // We just keep going assuming they'll push more. + continue; + } + got += result.size(); + remaining -= result.size(); + if (*exit_early) break; + f->trim(result.back().marker, false, y); + } + auto finish = sc::steady_clock::now(); + bench.entries = got; + bench.elapsed = finish - start; + } catch (const std::exception&) { + ex = std::current_exception(); + } + }); + c.run(); + if (ex) { + notify.set_exception(std::current_exception()); + } else { + notify.set_value(bench); + } +} + +void clean(R::RADOS& r, const R::IOContext& ioc, RCf::FIFO& f, + s::yield_context y) +{ + f.read_meta(y); + const auto info = f.meta(); + if (info.head_part_num > -1) { + for (auto i = info.tail_part_num; i <= info.head_part_num; ++i) { + R::WriteOp op; + op.remove(); + r.execute(info.part_oid(i), ioc, std::move(op), y); + } + } + R::WriteOp op; + op.remove(); + r.execute(info.id, ioc, std::move(op), y); +} +} + +int main(int argc, char* argv[]) +{ + const std::string_view prog(argv[0]); + std::string command; + try { + std::uint32_t count = 0; + std::string oid; + std::string pool; + std::uint32_t entry_size = 0; + std::uint32_t push_entries = 0; + std::uint32_t pull_entries = 0; + std::uint64_t max_part_size = 0; + std::uint64_t max_entry_size = 0; + std::int64_t part_num = 0; + std::string marker; + + bpo::options_description desc(fmt::format("{} options", prog)); + desc.add_options() + ("help", "show help") + ("oid", bpo::value<std::string>(&oid)->default_value("fifo"s), + "the base oid for the fifo") + ("pool", bpo::value<std::string>(&pool)->default_value("fifo_benchmark"s), + "the base oid for the fifo") + ("count", bpo::value<std::uint32_t>(&count)->default_value(1024), + "total count of items") + ("entry-size", bpo::value<std::uint32_t>(&entry_size)->default_value(64), + "size of entries to push") + ("push-entries", + bpo::value<std::uint32_t>(&push_entries) + ->default_value(512), "entries to push per call") + ("max-part-size", bpo::value<std::uint64_t>(&max_part_size) + ->default_value(RCf::default_max_part_size), + "maximum entry size allowed by FIFO") + ("max-entry-size", bpo::value<std::uint64_t>(&max_entry_size) + ->default_value(RCf::default_max_entry_size), + "maximum entry size allowed by FIFO") + ("pull-entries", + bpo::value<uint32_t>(&pull_entries) + ->default_value(512), "entries to pull per call") + ("part-num", + bpo::value<int64_t>(&part_num) + ->default_value(-1), "partition number, -1 for head") + ("marker", bpo::value<std::string>(&marker), "marker to begin list") + ("command", bpo::value<std::string>(&command), + "the operation to perform"); + + bpo::positional_options_description p; + p.add("command", 1); + + bpo::variables_map vm; + + bpo::store(bpo::command_line_parser(argc, argv). + options(desc).positional(p).run(), vm); + + bpo::notify(vm); + + if (vm.count("help")) { + fmt::print(std::cout, "{}", desc); + fmt::print(std::cout, "\n{} commands:\n", prog); + fmt::print(std::cout, " push\t\t\t push entries into fifo\n"); + fmt::print(std::cout, " pull\t\t\t retrieve and trim entries\n"); + fmt::print(std::cout, " both\t\t\t both at once, in two threads\n"); + fmt::print(std::cout, " metadata\t\t\t print metadata\n"); + fmt::print(std::cout, " partinfo\t\t\t print metadata\n"); + fmt::print(std::cout, " list\t\t\t list entries\n"); + fmt::print(std::cout, " clean\t\t\t clean up\n"); + return 0; + } + + + if (vm.find("command") == vm.end()) { + fmt::print(std::cerr, "{}: a command is required\n", prog); + return 1; + } + + int op = 0; + if (command == "push"s) { + op = PUSH; + } else if (command == "pull"s) { + op = PULL; + } else if (command == "both"s) { + op = BOTH; + } else if (command == "clean"s) { + op = CLEAN; + } else if (command == "metadata"s) { + op = METADATA; + } else if (command == "partinfo"s) { + op = PARTINFO; + } else if (command == "list"s) { + op = LIST; + } else { + fmt::print(std::cerr, "{}: {} is not a valid command\n", + prog, command); + return 1; + } + + if (!(op & PULL) && !vm["pull-entries"].defaulted()) { + fmt::print(std::cerr, "{}: pull-entries is only meaningful when pulling\n", + prog); + return 1; + } + + if (!(op & PUSH)) { + for (const auto& p : { "entry-size"s, "push-entries"s, "max-part-size"s, + "max-entry-size"s }) { + if (!vm[p].defaulted()) { + fmt::print(std::cerr, "{}: {} is only meaningful when pushing\n", + prog, p); + return 1; + } + } + } + + if (!(op & BOTH) && !(op & LIST) && !vm["count"].defaulted()) { + fmt::print(std::cerr, "{}: count is only meaningful when pulling, pushing, both, or listing\n", + prog); + return 1; + } + + if (!(op & PARTINFO) && !vm["part-num"].defaulted()) { + fmt::print(std::cerr, "{}: part-num is only meaningful when getting part info\n", + prog); + return 1; + } + + if (count == 0) { + fmt::print(std::cerr, "{}: count must be nonzero\n", prog); + return 1; + } + + if ((op & PULL) && (pull_entries == 0)) { + fmt::print(std::cerr, + "{}: pull-entries must be nonzero\n", prog); + return 1; + } + + if (!(op & LIST) && vm.count("marker") > 0) { + fmt::print(std::cerr, "{}: marker is only meaningful when listing\n", + prog); + return 1; + } + + if (op & PUSH) { + if (entry_size == 0) { + fmt::print(std::cerr, "{}: entry-size must be nonzero\n", prog); + return 1; + } + if (push_entries== 0) { + fmt::print(std::cerr, "{}: push-entries must be nonzero\n", prog); + return 1; + } + if (max_entry_size == 0) { + fmt::print(std::cerr, "{}: max-entry-size must be nonzero\n", prog); + return 1; + } + if (max_part_size == 0) { + fmt::print(std::cerr, "{}: max-part-size must be nonzero\n", prog); + return 1; + } + if (entry_size > max_entry_size) { + fmt::print(std::cerr, + "{}: entry-size may not be greater than max-entry-size\n", + prog); + return 1; + } + if (max_entry_size >= max_part_size) { + fmt::print(std::cerr, + "{}: max-entry-size may be less than max-part-size\n", + prog); + return 1; + } + } + + ba::io_context c; + benchmark pushmark, pullmark; + fifo::info meta; + fifo::part_header partinfo; + bool more = false; + std::vector<RCf::list_entry> entries; + s::spawn( + c, + [&](s::yield_context y) { + auto r = R::RADOS::Builder{}.build(c, y); + bs::error_code ec; + std::int64_t pid; + pid = r.lookup_pool(pool, y[ec]); + if (ec) { + r.create_pool(pool, std::nullopt, y); + pid = r.lookup_pool(pool, y); + } + const R::IOContext ioc(pid); + auto f = RCf::FIFO::create(r, ioc, oid, y, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + + switch (op) { + case PUSH: + pushmark = push(*f, count, entry_size, push_entries, y); + break; + + case PULL: + pullmark = pull(*f, count, pull_entries, y); + break; + + case METADATA: + meta = f->meta(); + break; + + case PARTINFO: + meta = f->meta(); + if (part_num == -1) { + part_num = meta.head_part_num; + } + partinfo = f->get_part_info(part_num, y); + break; + + case LIST: + if (vm.count("marker") == 0) { + std::tie(entries, more) = f->list(count, std::nullopt, y); + } else { + std::tie(entries, more) = f->list(count, marker, y); + } + break; + + case BOTH: { + std::promise<benchmark> notify; + bool exit_early = false; + + auto notifier = notify.get_future(); + std::thread t(concurpull, oid, pid, count, pull_entries, + std::move(notify), &exit_early); + t.detach(); + try { + pushmark = push(*f, count, entry_size, push_entries, y); + } catch (const std::exception&) { + exit_early = true; + notifier.wait(); + throw; + } + pullmark = notifier.get(); + } + } + + if (op & CLEAN) + clean(r, ioc, *f, y); + }); + c.run(); + if (op & PUSH) { + fmt::print("Pushed {} in {} at {}/s\n", + pushmark.entries, pushmark.elapsed, pushmark.ratio()); + } + if (op & PULL) { + if (pullmark.entries == count) { + fmt::print(std::cout, "Pulled {} in {} at {}/s\n", + pullmark.entries, pullmark.elapsed, pullmark.ratio()); + } else { + fmt::print(std::cout, "Pulled {} (of {} requested), in {} at {}/s\n", + pullmark.entries, count, pullmark.elapsed, pullmark.ratio()); + } + } + if (op & METADATA) { + fmt::print(std::cout, "Metadata: [{}]\n", meta); + } + if (op & PARTINFO) { + fmt::print(std::cout, "Info for partition {}: [{}]\n", part_num, partinfo); + } + if (op & LIST) { + for (const auto& entry : entries) { + fmt::print(std::cout, "{}\t{}\n", entry.marker, entry.mtime); + } + if (more) { + fmt::print(std::cout, "..."); + } + } + } catch (const std::exception& e) { + if (command.empty()) { + fmt::print(std::cerr, "{}: {}\n", prog, e.what()); + } else { + fmt::print(std::cerr, "{}: {}: {}\n", prog, command, e.what()); + } + return 1; + } + + return 0; +} |