// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2020 Red Hat, Inc. * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include #include #include #include #include #include #include #include #include #include #undef FMT_HEADER_ONLY #define FMT_HEADER_ONLY 1 #include #include #include #include #include "include/neorados/RADOS.hpp" #include "neorados/cls/fifo.h" namespace ba = boost::asio; namespace bs = boost::system; namespace bpo = boost::program_options; namespace cb = ceph::buffer; namespace R = neorados; namespace RCf = neorados::cls::fifo; namespace fifo = rados::cls::fifo; namespace s = spawn; namespace sc = std::chrono; namespace { static constexpr auto PUSH = 0x01 << 0; static constexpr auto PULL = 0x01 << 1; static constexpr auto BOTH = PUSH | PULL; static constexpr auto CLEAN = 0x01 << 2; static constexpr auto METADATA = 0x01 << 3; static constexpr auto PARTINFO = 0x01 << 4; static constexpr auto LIST = 0x01 << 5; struct benchmark { std::uint32_t entries = 0; sc::duration elapsed = 0ns; std::uint64_t ratio() const { return entries/std::max(elapsed, sc::duration(1ns)).count(); } benchmark() = default; benchmark(std::uint32_t entries, sc::duration elapsed) : entries(entries), elapsed(elapsed) {} }; benchmark push(RCf::FIFO& f, const std::uint32_t count, const std::uint32_t entry_size, const std::uint32_t push_entries, s::yield_context y) { cb::list entry; entry.push_back(cb::create_small_page_aligned(entry_size)); entry.zero(); std::vector entries(std::min(count, push_entries), entry); auto remaining = count; auto start = sc::steady_clock::now(); while (remaining) { if (entries.size() > remaining) { entries.resize(remaining); } f.push(entries, y); remaining -= entries.size(); } auto finish = sc::steady_clock::now(); return benchmark(count, (finish - start)); } benchmark pull(RCf::FIFO& f, const std::uint32_t count, const std::uint32_t pull_entries, s::yield_context y) { auto remaining = count; std::uint32_t got = 0; auto start = sc::steady_clock::now(); while (remaining) { auto [result, more] = f.list(std::min(remaining, pull_entries), std::nullopt, y); if (result.empty()) break; got += result.size(); remaining -= result.size(); f.trim(result.back().marker, false, y); } auto finish = sc::steady_clock::now(); return benchmark(got, (finish - start)); } void concurpull(const std::string& oid, const std::int64_t pool, const std::uint32_t count, const std::uint32_t pull_entries, std::promise notify, const bool* const exit_early) { ba::io_context c; benchmark bench; std::exception_ptr ex; s::spawn( c, [&](s::yield_context y) { try { auto r = R::RADOS::Builder{}.build(c, y); R::IOContext ioc(pool); auto f = RCf::FIFO::open(r, ioc, oid, y); auto remaining = count; std::uint32_t got = 0; auto start = sc::steady_clock::now(); while (remaining) { if (*exit_early) break; auto [result, more] = f->list(std::min(remaining, pull_entries), std::nullopt, y); if (result.empty()) { // We just keep going assuming they'll push more. continue; } got += result.size(); remaining -= result.size(); if (*exit_early) break; f->trim(result.back().marker, false, y); } auto finish = sc::steady_clock::now(); bench.entries = got; bench.elapsed = finish - start; } catch (const std::exception&) { ex = std::current_exception(); } }); c.run(); if (ex) { notify.set_exception(std::current_exception()); } else { notify.set_value(bench); } } void clean(R::RADOS& r, const R::IOContext& ioc, RCf::FIFO& f, s::yield_context y) { f.read_meta(y); const auto info = f.meta(); if (info.head_part_num > -1) { for (auto i = info.tail_part_num; i <= info.head_part_num; ++i) { R::WriteOp op; op.remove(); r.execute(info.part_oid(i), ioc, std::move(op), y); } } R::WriteOp op; op.remove(); r.execute(info.id, ioc, std::move(op), y); } } int main(int argc, char* argv[]) { const std::string_view prog(argv[0]); std::string command; try { std::uint32_t count = 0; std::string oid; std::string pool; std::uint32_t entry_size = 0; std::uint32_t push_entries = 0; std::uint32_t pull_entries = 0; std::uint64_t max_part_size = 0; std::uint64_t max_entry_size = 0; std::int64_t part_num = 0; std::string marker; bpo::options_description desc(fmt::format("{} options", prog)); desc.add_options() ("help", "show help") ("oid", bpo::value(&oid)->default_value("fifo"s), "the base oid for the fifo") ("pool", bpo::value(&pool)->default_value("fifo_benchmark"s), "the base oid for the fifo") ("count", bpo::value(&count)->default_value(1024), "total count of items") ("entry-size", bpo::value(&entry_size)->default_value(64), "size of entries to push") ("push-entries", bpo::value(&push_entries) ->default_value(512), "entries to push per call") ("max-part-size", bpo::value(&max_part_size) ->default_value(RCf::default_max_part_size), "maximum entry size allowed by FIFO") ("max-entry-size", bpo::value(&max_entry_size) ->default_value(RCf::default_max_entry_size), "maximum entry size allowed by FIFO") ("pull-entries", bpo::value(&pull_entries) ->default_value(512), "entries to pull per call") ("part-num", bpo::value(&part_num) ->default_value(-1), "partition number, -1 for head") ("marker", bpo::value(&marker), "marker to begin list") ("command", bpo::value(&command), "the operation to perform"); bpo::positional_options_description p; p.add("command", 1); bpo::variables_map vm; bpo::store(bpo::command_line_parser(argc, argv). options(desc).positional(p).run(), vm); bpo::notify(vm); if (vm.count("help")) { fmt::print(std::cout, "{}", desc); fmt::print(std::cout, "\n{} commands:\n", prog); fmt::print(std::cout, " push\t\t\t push entries into fifo\n"); fmt::print(std::cout, " pull\t\t\t retrieve and trim entries\n"); fmt::print(std::cout, " both\t\t\t both at once, in two threads\n"); fmt::print(std::cout, " metadata\t\t\t print metadata\n"); fmt::print(std::cout, " partinfo\t\t\t print metadata\n"); fmt::print(std::cout, " list\t\t\t list entries\n"); fmt::print(std::cout, " clean\t\t\t clean up\n"); return 0; } if (vm.find("command") == vm.end()) { fmt::print(std::cerr, "{}: a command is required\n", prog); return 1; } int op = 0; if (command == "push"s) { op = PUSH; } else if (command == "pull"s) { op = PULL; } else if (command == "both"s) { op = BOTH; } else if (command == "clean"s) { op = CLEAN; } else if (command == "metadata"s) { op = METADATA; } else if (command == "partinfo"s) { op = PARTINFO; } else if (command == "list"s) { op = LIST; } else { fmt::print(std::cerr, "{}: {} is not a valid command\n", prog, command); return 1; } if (!(op & PULL) && !vm["pull-entries"].defaulted()) { fmt::print(std::cerr, "{}: pull-entries is only meaningful when pulling\n", prog); return 1; } if (!(op & PUSH)) { for (const auto& p : { "entry-size"s, "push-entries"s, "max-part-size"s, "max-entry-size"s }) { if (!vm[p].defaulted()) { fmt::print(std::cerr, "{}: {} is only meaningful when pushing\n", prog, p); return 1; } } } if (!(op & BOTH) && !(op & LIST) && !vm["count"].defaulted()) { fmt::print(std::cerr, "{}: count is only meaningful when pulling, pushing, both, or listing\n", prog); return 1; } if (!(op & PARTINFO) && !vm["part-num"].defaulted()) { fmt::print(std::cerr, "{}: part-num is only meaningful when getting part info\n", prog); return 1; } if (count == 0) { fmt::print(std::cerr, "{}: count must be nonzero\n", prog); return 1; } if ((op & PULL) && (pull_entries == 0)) { fmt::print(std::cerr, "{}: pull-entries must be nonzero\n", prog); return 1; } if (!(op & LIST) && vm.count("marker") > 0) { fmt::print(std::cerr, "{}: marker is only meaningful when listing\n", prog); return 1; } if (op & PUSH) { if (entry_size == 0) { fmt::print(std::cerr, "{}: entry-size must be nonzero\n", prog); return 1; } if (push_entries== 0) { fmt::print(std::cerr, "{}: push-entries must be nonzero\n", prog); return 1; } if (max_entry_size == 0) { fmt::print(std::cerr, "{}: max-entry-size must be nonzero\n", prog); return 1; } if (max_part_size == 0) { fmt::print(std::cerr, "{}: max-part-size must be nonzero\n", prog); return 1; } if (entry_size > max_entry_size) { fmt::print(std::cerr, "{}: entry-size may not be greater than max-entry-size\n", prog); return 1; } if (max_entry_size >= max_part_size) { fmt::print(std::cerr, "{}: max-entry-size may be less than max-part-size\n", prog); return 1; } } ba::io_context c; benchmark pushmark, pullmark; fifo::info meta; fifo::part_header partinfo; bool more = false; std::vector entries; s::spawn( c, [&](s::yield_context y) { auto r = R::RADOS::Builder{}.build(c, y); bs::error_code ec; std::int64_t pid; pid = r.lookup_pool(pool, y[ec]); if (ec) { r.create_pool(pool, std::nullopt, y); pid = r.lookup_pool(pool, y); } const R::IOContext ioc(pid); auto f = RCf::FIFO::create(r, ioc, oid, y, std::nullopt, std::nullopt, false, max_part_size, max_entry_size); switch (op) { case PUSH: pushmark = push(*f, count, entry_size, push_entries, y); break; case PULL: pullmark = pull(*f, count, pull_entries, y); break; case METADATA: meta = f->meta(); break; case PARTINFO: meta = f->meta(); if (part_num == -1) { part_num = meta.head_part_num; } partinfo = f->get_part_info(part_num, y); break; case LIST: if (vm.count("marker") == 0) { std::tie(entries, more) = f->list(count, std::nullopt, y); } else { std::tie(entries, more) = f->list(count, marker, y); } break; case BOTH: { std::promise notify; bool exit_early = false; auto notifier = notify.get_future(); std::thread t(concurpull, oid, pid, count, pull_entries, std::move(notify), &exit_early); t.detach(); try { pushmark = push(*f, count, entry_size, push_entries, y); } catch (const std::exception&) { exit_early = true; notifier.wait(); throw; } pullmark = notifier.get(); } } if (op & CLEAN) clean(r, ioc, *f, y); }); c.run(); if (op & PUSH) { fmt::print("Pushed {} in {} at {}/s\n", pushmark.entries, pushmark.elapsed, pushmark.ratio()); } if (op & PULL) { if (pullmark.entries == count) { fmt::print(std::cout, "Pulled {} in {} at {}/s\n", pullmark.entries, pullmark.elapsed, pullmark.ratio()); } else { fmt::print(std::cout, "Pulled {} (of {} requested), in {} at {}/s\n", pullmark.entries, count, pullmark.elapsed, pullmark.ratio()); } } if (op & METADATA) { fmt::print(std::cout, "Metadata: [{}]\n", meta); } if (op & PARTINFO) { fmt::print(std::cout, "Info for partition {}: [{}]\n", part_num, partinfo); } if (op & LIST) { for (const auto& entry : entries) { fmt::print(std::cout, "{}\t{}\n", entry.marker, entry.mtime); } if (more) { fmt::print(std::cout, "..."); } } } catch (const std::exception& e) { if (command.empty()) { fmt::print(std::cerr, "{}: {}\n", prog, e.what()); } else { fmt::print(std::cerr, "{}: {}: {}\n", prog, command, e.what()); } return 1; } return 0; }