diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/test/cls_fifo | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/cls_fifo')
-rw-r--r-- | src/test/cls_fifo/CMakeLists.txt | 34 | ||||
-rw-r--r-- | src/test/cls_fifo/bench_cls_fifo.cc | 462 | ||||
-rw-r--r-- | src/test/cls_fifo/test_cls_fifo.cc | 739 |
3 files changed, 1235 insertions, 0 deletions
diff --git a/src/test/cls_fifo/CMakeLists.txt b/src/test/cls_fifo/CMakeLists.txt new file mode 100644 index 000000000..3abf7634a --- /dev/null +++ b/src/test/cls_fifo/CMakeLists.txt @@ -0,0 +1,34 @@ +add_executable(ceph_test_cls_fifo + test_cls_fifo.cc + ) +target_link_libraries(ceph_test_cls_fifo + neorados_cls_fifo + libneorados + spawn + ${UNITTEST_LIBS} + ${BLKID_LIBRARIES} + ${CMAKE_DL_LIBS} + ${CRYPTO_LIBS} + ${EXTRALIBS} + neoradostest-support + ) +install(TARGETS + ceph_test_cls_fifo + DESTINATION ${CMAKE_INSTALL_BINDIR}) + +add_executable(ceph_bench_cls_fifo + bench_cls_fifo.cc + ) +target_link_libraries(ceph_bench_cls_fifo + neorados_cls_fifo + libneorados + spawn + ${UNITTEST_LIBS} + ${BLKID_LIBRARIES} + ${CMAKE_DL_LIBS} + ${CRYPTO_LIBS} + ${EXTRALIBS} + ) +install(TARGETS + ceph_test_cls_fifo + DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/test/cls_fifo/bench_cls_fifo.cc b/src/test/cls_fifo/bench_cls_fifo.cc new file mode 100644 index 000000000..df390cd31 --- /dev/null +++ b/src/test/cls_fifo/bench_cls_fifo.cc @@ -0,0 +1,462 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include <cerrno> +#include <chrono> +#include <cstdint> +#include <exception> +#include <future> +#include <iostream> +#include <string_view> + +#include <boost/asio.hpp> +#include <boost/system/error_code.hpp> +#include <boost/program_options.hpp> + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include <fmt/chrono.h> +#include <fmt/format.h> +#include <fmt/ostream.h> + +#include <spawn/spawn.hpp> + +#include "include/neorados/RADOS.hpp" + +#include "neorados/cls/fifo.h" + +namespace ba = boost::asio; +namespace bs = boost::system; +namespace bpo = boost::program_options; +namespace cb = ceph::buffer; +namespace R = neorados; +namespace RCf = neorados::cls::fifo; +namespace fifo = rados::cls::fifo; +namespace s = spawn; +namespace sc = std::chrono; + +namespace { +static constexpr auto PUSH = 0x01 << 0; +static constexpr auto PULL = 0x01 << 1; +static constexpr auto BOTH = PUSH | PULL; +static constexpr auto CLEAN = 0x01 << 2; +static constexpr auto METADATA = 0x01 << 3; +static constexpr auto PARTINFO = 0x01 << 4; +static constexpr auto LIST = 0x01 << 5; + +struct benchmark { + std::uint32_t entries = 0; + sc::duration<double> elapsed = 0ns; + + std::uint64_t ratio() const { + return entries/std::max(elapsed, + sc::duration<double>(1ns)).count(); + } + benchmark() = default; + benchmark(std::uint32_t entries, sc::duration<double> elapsed) + : entries(entries), elapsed(elapsed) {} +}; + +benchmark push(RCf::FIFO& f, const std::uint32_t count, + const std::uint32_t entry_size, const std::uint32_t push_entries, + s::yield_context y) +{ + cb::list entry; + entry.push_back(cb::create_small_page_aligned(entry_size)); + entry.zero(); + + std::vector entries(std::min(count, push_entries), entry); + auto remaining = count; + auto start = sc::steady_clock::now(); + while (remaining) { + if (entries.size() > remaining) { + entries.resize(remaining); + } + f.push(entries, y); + remaining -= entries.size(); + } + auto finish = sc::steady_clock::now(); + return benchmark(count, (finish - start)); +} + +benchmark pull(RCf::FIFO& f, const std::uint32_t count, + const std::uint32_t pull_entries, s::yield_context y) +{ + auto remaining = count; + std::uint32_t got = 0; + + auto start = sc::steady_clock::now(); + while (remaining) { + auto [result, more] = f.list(std::min(remaining, pull_entries), + std::nullopt, y); + if (result.empty()) + break; + got += result.size(); + remaining -= result.size(); + f.trim(result.back().marker, false, y); + } + auto finish = sc::steady_clock::now(); + return benchmark(got, (finish - start)); +} + +void concurpull(const std::string& oid, const std::int64_t pool, + const std::uint32_t count, const std::uint32_t pull_entries, + std::promise<benchmark> notify, const bool* const exit_early) +{ + ba::io_context c; + benchmark bench; + std::exception_ptr ex; + s::spawn( + c, + [&](s::yield_context y) { + try { + auto r = R::RADOS::Builder{}.build(c, y); + R::IOContext ioc(pool); + auto f = RCf::FIFO::open(r, ioc, oid, y); + auto remaining = count; + std::uint32_t got = 0; + + auto start = sc::steady_clock::now(); + while (remaining) { + if (*exit_early) break; + auto [result, more] = + f->list(std::min(remaining, pull_entries), std::nullopt, y); + if (result.empty()) { + // We just keep going assuming they'll push more. + continue; + } + got += result.size(); + remaining -= result.size(); + if (*exit_early) break; + f->trim(result.back().marker, false, y); + } + auto finish = sc::steady_clock::now(); + bench.entries = got; + bench.elapsed = finish - start; + } catch (const std::exception&) { + ex = std::current_exception(); + } + }); + c.run(); + if (ex) { + notify.set_exception(std::current_exception()); + } else { + notify.set_value(bench); + } +} + +void clean(R::RADOS& r, const R::IOContext& ioc, RCf::FIFO& f, + s::yield_context y) +{ + f.read_meta(y); + const auto info = f.meta(); + if (info.head_part_num > -1) { + for (auto i = info.tail_part_num; i <= info.head_part_num; ++i) { + R::WriteOp op; + op.remove(); + r.execute(info.part_oid(i), ioc, std::move(op), y); + } + } + R::WriteOp op; + op.remove(); + r.execute(info.id, ioc, std::move(op), y); +} +} + +int main(int argc, char* argv[]) +{ + const std::string_view prog(argv[0]); + std::string command; + try { + std::uint32_t count = 0; + std::string oid; + std::string pool; + std::uint32_t entry_size = 0; + std::uint32_t push_entries = 0; + std::uint32_t pull_entries = 0; + std::uint64_t max_part_size = 0; + std::uint64_t max_entry_size = 0; + std::int64_t part_num = 0; + std::string marker; + + bpo::options_description desc(fmt::format("{} options", prog)); + desc.add_options() + ("help", "show help") + ("oid", bpo::value<std::string>(&oid)->default_value("fifo"s), + "the base oid for the fifo") + ("pool", bpo::value<std::string>(&pool)->default_value("fifo_benchmark"s), + "the base oid for the fifo") + ("count", bpo::value<std::uint32_t>(&count)->default_value(1024), + "total count of items") + ("entry-size", bpo::value<std::uint32_t>(&entry_size)->default_value(64), + "size of entries to push") + ("push-entries", + bpo::value<std::uint32_t>(&push_entries) + ->default_value(512), "entries to push per call") + ("max-part-size", bpo::value<std::uint64_t>(&max_part_size) + ->default_value(RCf::default_max_part_size), + "maximum entry size allowed by FIFO") + ("max-entry-size", bpo::value<std::uint64_t>(&max_entry_size) + ->default_value(RCf::default_max_entry_size), + "maximum entry size allowed by FIFO") + ("pull-entries", + bpo::value<uint32_t>(&pull_entries) + ->default_value(512), "entries to pull per call") + ("part-num", + bpo::value<int64_t>(&part_num) + ->default_value(-1), "partition number, -1 for head") + ("marker", bpo::value<std::string>(&marker), "marker to begin list") + ("command", bpo::value<std::string>(&command), + "the operation to perform"); + + bpo::positional_options_description p; + p.add("command", 1); + + bpo::variables_map vm; + + bpo::store(bpo::command_line_parser(argc, argv). + options(desc).positional(p).run(), vm); + + bpo::notify(vm); + + if (vm.count("help")) { + fmt::print(std::cout, "{}", desc); + fmt::print(std::cout, "\n{} commands:\n", prog); + fmt::print(std::cout, " push\t\t\t push entries into fifo\n"); + fmt::print(std::cout, " pull\t\t\t retrieve and trim entries\n"); + fmt::print(std::cout, " both\t\t\t both at once, in two threads\n"); + fmt::print(std::cout, " metadata\t\t\t print metadata\n"); + fmt::print(std::cout, " partinfo\t\t\t print metadata\n"); + fmt::print(std::cout, " list\t\t\t list entries\n"); + fmt::print(std::cout, " clean\t\t\t clean up\n"); + return 0; + } + + + if (vm.find("command") == vm.end()) { + fmt::print(std::cerr, "{}: a command is required\n", prog); + return 1; + } + + int op = 0; + if (command == "push"s) { + op = PUSH; + } else if (command == "pull"s) { + op = PULL; + } else if (command == "both"s) { + op = BOTH; + } else if (command == "clean"s) { + op = CLEAN; + } else if (command == "metadata"s) { + op = METADATA; + } else if (command == "partinfo"s) { + op = PARTINFO; + } else if (command == "list"s) { + op = LIST; + } else { + fmt::print(std::cerr, "{}: {} is not a valid command\n", + prog, command); + return 1; + } + + if (!(op & PULL) && !vm["pull-entries"].defaulted()) { + fmt::print(std::cerr, "{}: pull-entries is only meaningful when pulling\n", + prog); + return 1; + } + + if (!(op & PUSH)) { + for (const auto& p : { "entry-size"s, "push-entries"s, "max-part-size"s, + "max-entry-size"s }) { + if (!vm[p].defaulted()) { + fmt::print(std::cerr, "{}: {} is only meaningful when pushing\n", + prog, p); + return 1; + } + } + } + + if (!(op & BOTH) && !(op & LIST) && !vm["count"].defaulted()) { + fmt::print(std::cerr, "{}: count is only meaningful when pulling, pushing, both, or listing\n", + prog); + return 1; + } + + if (!(op & PARTINFO) && !vm["part-num"].defaulted()) { + fmt::print(std::cerr, "{}: part-num is only meaningful when getting part info\n", + prog); + return 1; + } + + if (count == 0) { + fmt::print(std::cerr, "{}: count must be nonzero\n", prog); + return 1; + } + + if ((op & PULL) && (pull_entries == 0)) { + fmt::print(std::cerr, + "{}: pull-entries must be nonzero\n", prog); + return 1; + } + + if (!(op & LIST) && vm.count("marker") > 0) { + fmt::print(std::cerr, "{}: marker is only meaningful when listing\n", + prog); + return 1; + } + + if (op & PUSH) { + if (entry_size == 0) { + fmt::print(std::cerr, "{}: entry-size must be nonzero\n", prog); + return 1; + } + if (push_entries== 0) { + fmt::print(std::cerr, "{}: push-entries must be nonzero\n", prog); + return 1; + } + if (max_entry_size == 0) { + fmt::print(std::cerr, "{}: max-entry-size must be nonzero\n", prog); + return 1; + } + if (max_part_size == 0) { + fmt::print(std::cerr, "{}: max-part-size must be nonzero\n", prog); + return 1; + } + if (entry_size > max_entry_size) { + fmt::print(std::cerr, + "{}: entry-size may not be greater than max-entry-size\n", + prog); + return 1; + } + if (max_entry_size >= max_part_size) { + fmt::print(std::cerr, + "{}: max-entry-size may be less than max-part-size\n", + prog); + return 1; + } + } + + ba::io_context c; + benchmark pushmark, pullmark; + fifo::info meta; + fifo::part_header partinfo; + bool more = false; + std::vector<RCf::list_entry> entries; + s::spawn( + c, + [&](s::yield_context y) { + auto r = R::RADOS::Builder{}.build(c, y); + bs::error_code ec; + std::int64_t pid; + pid = r.lookup_pool(pool, y[ec]); + if (ec) { + r.create_pool(pool, std::nullopt, y); + pid = r.lookup_pool(pool, y); + } + const R::IOContext ioc(pid); + auto f = RCf::FIFO::create(r, ioc, oid, y, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + + switch (op) { + case PUSH: + pushmark = push(*f, count, entry_size, push_entries, y); + break; + + case PULL: + pullmark = pull(*f, count, pull_entries, y); + break; + + case METADATA: + meta = f->meta(); + break; + + case PARTINFO: + meta = f->meta(); + if (part_num == -1) { + part_num = meta.head_part_num; + } + partinfo = f->get_part_info(part_num, y); + break; + + case LIST: + if (vm.count("marker") == 0) { + std::tie(entries, more) = f->list(count, std::nullopt, y); + } else { + std::tie(entries, more) = f->list(count, marker, y); + } + break; + + case BOTH: { + std::promise<benchmark> notify; + bool exit_early = false; + + auto notifier = notify.get_future(); + std::thread t(concurpull, oid, pid, count, pull_entries, + std::move(notify), &exit_early); + t.detach(); + try { + pushmark = push(*f, count, entry_size, push_entries, y); + } catch (const std::exception&) { + exit_early = true; + notifier.wait(); + throw; + } + pullmark = notifier.get(); + } + } + + if (op & CLEAN) + clean(r, ioc, *f, y); + }); + c.run(); + if (op & PUSH) { + fmt::print("Pushed {} in {} at {}/s\n", + pushmark.entries, pushmark.elapsed, pushmark.ratio()); + } + if (op & PULL) { + if (pullmark.entries == count) { + fmt::print(std::cout, "Pulled {} in {} at {}/s\n", + pullmark.entries, pullmark.elapsed, pullmark.ratio()); + } else { + fmt::print(std::cout, "Pulled {} (of {} requested), in {} at {}/s\n", + pullmark.entries, count, pullmark.elapsed, pullmark.ratio()); + } + } + if (op & METADATA) { + fmt::print(std::cout, "Metadata: [{}]\n", meta); + } + if (op & PARTINFO) { + fmt::print(std::cout, "Info for partition {}: [{}]\n", part_num, partinfo); + } + if (op & LIST) { + for (const auto& entry : entries) { + fmt::print(std::cout, "{}\t{}\n", entry.marker, entry.mtime); + } + if (more) { + fmt::print(std::cout, "..."); + } + } + } catch (const std::exception& e) { + if (command.empty()) { + fmt::print(std::cerr, "{}: {}\n", prog, e.what()); + } else { + fmt::print(std::cerr, "{}: {}: {}\n", prog, command, e.what()); + } + return 1; + } + + return 0; +} diff --git a/src/test/cls_fifo/test_cls_fifo.cc b/src/test/cls_fifo/test_cls_fifo.cc new file mode 100644 index 000000000..484248c0c --- /dev/null +++ b/src/test/cls_fifo/test_cls_fifo.cc @@ -0,0 +1,739 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <cerrno> +#include <iostream> +#include <string_view> + +#include <boost/asio.hpp> +#include <boost/system/error_code.hpp> + +#include <spawn/spawn.hpp> + +#include "include/scope_guard.h" +#include "include/types.h" +#include "include/neorados/RADOS.hpp" + +#include "cls/fifo/cls_fifo_ops.h" + +#include "neorados/cls/fifo.h" + +#include "test/neorados/common_tests.h" + +#include "gtest/gtest.h" + +namespace R = neorados; +namespace ba = boost::asio; +namespace bs = boost::system; +namespace cb = ceph::buffer; +namespace fifo = rados::cls::fifo; +namespace RCf = neorados::cls::fifo; +namespace s = spawn; + +namespace { +void fifo_create(R::RADOS& r, + const R::IOContext& ioc, + const R::Object& oid, + std::string_view id, + s::yield_context y, + std::optional<fifo::objv> objv = std::nullopt, + std::optional<std::string_view> oid_prefix = std::nullopt, + bool exclusive = false, + std::uint64_t max_part_size = RCf::default_max_part_size, + std::uint64_t max_entry_size = RCf::default_max_entry_size) +{ + R::WriteOp op; + RCf::create_meta(op, id, objv, oid_prefix, exclusive, max_part_size, + max_entry_size); + r.execute(oid, ioc, std::move(op), y); +} +} + +TEST(ClsFIFO, TestCreate) { + ba::io_context c; + auto fifo_id = "fifo"sv; + R::Object oid(fifo_id); + + s::spawn(c, [&](s::yield_context y) { + auto r = R::RADOS::Builder{}.build(c, y); + auto pool = create_pool(r, get_temp_pool_name(), y); + auto sg = make_scope_guard( + [&] { + r.delete_pool(pool, y); + }); + R::IOContext ioc(pool); + bs::error_code ec; + fifo_create(r, ioc, oid, ""s, y[ec]); + EXPECT_EQ(bs::errc::invalid_argument, ec); + fifo_create(r, ioc, oid, fifo_id, y[ec], std::nullopt, + std::nullopt, false, 0); + EXPECT_EQ(bs::errc::invalid_argument, ec); + fifo_create(r, ioc, oid, {}, y[ec], + std::nullopt, std::nullopt, + false, RCf::default_max_part_size, 0); + EXPECT_EQ(bs::errc::invalid_argument, ec); + fifo_create(r, ioc, oid, fifo_id, y); + { + std::uint64_t size; + std::uint64_t size2; + { + R::ReadOp op; + op.stat(&size, nullptr); + r.execute(oid, ioc, std::move(op), + nullptr, y); + EXPECT_GT(size, 0); + } + + { + R::ReadOp op; + op.stat(&size2, nullptr); + r.execute(oid, ioc, std::move(op), nullptr, y); + } + EXPECT_EQ(size2, size); + } + /* test idempotency */ + fifo_create(r, ioc, oid, fifo_id, y); + fifo_create(r, ioc, oid, {}, y[ec], std::nullopt, + std::nullopt, false); + EXPECT_EQ(bs::errc::invalid_argument, ec); + fifo_create(r, ioc, oid, {}, y[ec], std::nullopt, + "myprefix"sv, false); + EXPECT_EQ(bs::errc::invalid_argument, ec); + fifo_create(r, ioc, oid, "foo"sv, y[ec], + std::nullopt, std::nullopt, false); + EXPECT_EQ(bs::errc::file_exists, ec); + }); + c.run(); +} + +TEST(ClsFIFO, TestGetInfo) { + ba::io_context c; + auto fifo_id = "fifo"sv; + R::Object oid(fifo_id); + + s::spawn(c, [&](s::yield_context y) { + auto r = R::RADOS::Builder{}.build(c, y); + auto pool = create_pool(r, get_temp_pool_name(), y); + auto sg = make_scope_guard( + [&] { + r.delete_pool(pool, y); + }); + R::IOContext ioc(pool); + /* first successful create */ + fifo_create(r, ioc, oid, fifo_id, y); + + fifo::info info; + std::uint32_t part_header_size; + std::uint32_t part_entry_overhead; + { + R::ReadOp op; + RCf::get_meta(op, std::nullopt, + nullptr, &info, &part_header_size, + &part_entry_overhead); + r.execute(oid, ioc, std::move(op), nullptr, y); + EXPECT_GT(part_header_size, 0); + EXPECT_GT(part_entry_overhead, 0); + EXPECT_FALSE(info.version.instance.empty()); + } + { + R::ReadOp op; + RCf::get_meta(op, info.version, + nullptr, &info, &part_header_size, + &part_entry_overhead); + r.execute(oid, ioc, std::move(op), nullptr, y); + } + { + R::ReadOp op; + fifo::objv objv; + objv.instance = "foo"; + objv.ver = 12; + RCf::get_meta(op, objv, + nullptr, &info, &part_header_size, + &part_entry_overhead); + ASSERT_ANY_THROW(r.execute(oid, ioc, std::move(op), + nullptr, y)); + } + }); + c.run(); +} + +TEST(FIFO, TestOpenDefault) { + ba::io_context c; + auto fifo_id = "fifo"s; + + s::spawn(c, [&](s::yield_context y) { + auto r = R::RADOS::Builder{}.build(c, y); + auto pool = create_pool(r, get_temp_pool_name(), y); + auto sg = make_scope_guard( + [&] { + r.delete_pool(pool, y); + }); + R::IOContext ioc(pool); + auto fifo = RCf::FIFO::create(r, ioc, fifo_id, y); + // force reading from backend + fifo->read_meta(y); + auto info = fifo->meta(); + EXPECT_EQ(info.id, fifo_id); + }); + c.run(); +} + +TEST(FIFO, TestOpenParams) { + ba::io_context c; + auto fifo_id = "fifo"sv; + + s::spawn(c, [&](s::yield_context y) { + auto r = R::RADOS::Builder{}.build(c, y); + auto pool = create_pool(r, get_temp_pool_name(), y); + auto sg = make_scope_guard( + [&] { + r.delete_pool(pool, y); + }); + R::IOContext ioc(pool); + + const std::uint64_t max_part_size = 10 * 1024; + const std::uint64_t max_entry_size = 128; + auto oid_prefix = "foo.123."sv; + fifo::objv objv; + objv.instance = "fooz"s; + objv.ver = 10; + + /* first successful create */ + auto f = RCf::FIFO::create(r, ioc, fifo_id, y, objv, oid_prefix, + false, max_part_size, + max_entry_size); + + + /* force reading from backend */ + f->read_meta(y); + auto info = f->meta(); + ASSERT_EQ(info.id, fifo_id); + ASSERT_EQ(info.params.max_part_size, max_part_size); + ASSERT_EQ(info.params.max_entry_size, max_entry_size); + ASSERT_EQ(info.version, objv); + }); + c.run(); +} + +namespace { +template<class T> +std::pair<T, std::string> decode_entry(const RCf::list_entry& entry) +{ + T val; + auto iter = entry.data.cbegin(); + decode(val, iter); + return std::make_pair(std::move(val), entry.marker); +} +} + + + +TEST(FIFO, TestPushListTrim) { + ba::io_context c; + auto fifo_id = "fifo"sv; + + s::spawn(c, [&](s::yield_context y) mutable { + auto r = R::RADOS::Builder{}.build(c, y); + auto pool = create_pool(r, get_temp_pool_name(), y); + auto sg = make_scope_guard( + [&] { + r.delete_pool(pool, y); + }); + R::IOContext ioc(pool); + auto f = RCf::FIFO::create(r, ioc, fifo_id, y); + static constexpr auto max_entries = 10u; + for (uint32_t i = 0; i < max_entries; ++i) { + cb::list bl; + encode(i, bl); + f->push(bl, y); + } + + std::optional<std::string> marker; + /* get entries one by one */ + + for (auto i = 0u; i < max_entries; ++i) { + auto [result, more] = f->list(1, marker, y); + + bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + ASSERT_EQ(1, result.size()); + + std::uint32_t val; + std::tie(val, marker) = + decode_entry<std::uint32_t>(result.front()); + + ASSERT_EQ(i, val); + } + + /* get all entries at once */ + std::string markers[max_entries]; + std::uint32_t min_entry = 0; + { + auto [result, more] = f->list(max_entries * 10, std::nullopt, + y); + + ASSERT_FALSE(more); + ASSERT_EQ(max_entries, result.size()); + + + for (auto i = 0u; i < max_entries; ++i) { + std::uint32_t val; + + std::tie(val, markers[i]) = + decode_entry<std::uint32_t>(result[i]); + ASSERT_EQ(i, val); + } + + + /* trim one entry */ + f->trim(markers[min_entry], false, y); + ++min_entry; + } + + auto [result, more] = f->list(max_entries * 10, + std::nullopt, y); + + ASSERT_FALSE(more); + ASSERT_EQ(max_entries - min_entry, result.size()); + + for (auto i = min_entry; i < max_entries; ++i) { + std::uint32_t val; + + std::tie(val, markers[i - min_entry]) = + decode_entry<std::uint32_t>(result[i - min_entry]); + ASSERT_EQ(i, val); + } + + }); + c.run(); +} + + +TEST(FIFO, TestPushTooBig) { + ba::io_context c; + auto fifo_id = "fifo"sv; + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + s::spawn(c, [&](s::yield_context y) { + auto r = R::RADOS::Builder{}.build(c, y); + auto pool = create_pool(r, get_temp_pool_name(), y); + auto sg = make_scope_guard( + [&] { + r.delete_pool(pool, y); + }); + R::IOContext ioc(pool); + + auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + + char buf[max_entry_size + 1]; + memset(buf, 0, sizeof(buf)); + + cb::list bl; + bl.append(buf, sizeof(buf)); + + bs::error_code ec; + f->push(bl, y[ec]); + EXPECT_EQ(RCf::errc::entry_too_large, ec); + }); + c.run(); +} + + +TEST(FIFO, TestMultipleParts) { + ba::io_context c; + auto fifo_id = "fifo"sv; + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + s::spawn(c, [&](s::yield_context y) mutable { + auto r = R::RADOS::Builder{}.build(c, y); + auto pool = create_pool(r, get_temp_pool_name(), y); + auto sg = make_scope_guard( + [&] { + r.delete_pool(pool, y); + }); + R::IOContext ioc(pool); + + auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + const auto [part_header_size, part_entry_overhead] = + f->get_part_layout_info(); + + const auto entries_per_part = + (max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead); + + const auto max_entries = entries_per_part * 4 + 1; + + /* push enough entries */ + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + + f->push(bl, y); + } + + auto info = f->meta(); + + ASSERT_EQ(info.id, fifo_id); + /* head should have advanced */ + ASSERT_GT(info.head_part_num, 0); + + + /* list all at once */ + auto [result, more] = f->list(max_entries, std::nullopt, y); + EXPECT_EQ(false, more); + + ASSERT_EQ(max_entries, result.size()); + + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + std::optional<std::string> marker; + /* get entries one by one */ + + for (auto i = 0u; i < max_entries; ++i) { + auto [result, more] = f->list(1, marker, y); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + std::uint32_t val; + std::tie(val, marker) = + decode_entry<std::uint32_t>(result.front()); + + auto& entry = result.front(); + auto& bl = entry.data; + ASSERT_EQ(i, *(int *)bl.c_str()); + marker = entry.marker; + } + + /* trim one at a time */ + marker.reset(); + for (auto i = 0u; i < max_entries; ++i) { + /* read single entry */ + { + auto [result, more] = f->list(1, marker, y); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + marker = result.front().marker; + + f->trim(*marker, false, y); + } + + /* check tail */ + info = f->meta(); + ASSERT_EQ(info.tail_part_num, i / entries_per_part); + + /* try to read all again, see how many entries left */ + auto [result, more] = f->list(max_entries, marker, y); + ASSERT_EQ(max_entries - i - 1, result.size()); + ASSERT_EQ(false, more); + } + + /* tail now should point at head */ + info = f->meta(); + ASSERT_EQ(info.head_part_num, info.tail_part_num); + + /* check old tails are removed */ + for (auto i = 0; i < info.tail_part_num; ++i) { + bs::error_code ec; + f->get_part_info(i, y[ec]); + ASSERT_EQ(bs::errc::no_such_file_or_directory, ec); + } + /* check current tail exists */ + f->get_part_info(info.tail_part_num, y); + }); + c.run(); +} + + +TEST(FIFO, TestTwoPushers) { + ba::io_context c; + auto fifo_id = "fifo"sv; + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + s::spawn(c, [&](s::yield_context y) { + auto r = R::RADOS::Builder{}.build(c, y); + auto pool = create_pool(r, get_temp_pool_name(), y); + auto sg = make_scope_guard( + [&] { + r.delete_pool(pool, y); + }); + R::IOContext ioc(pool); + + auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + + + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + + auto [part_header_size, part_entry_overhead] = + f->get_part_layout_info(); + + const auto entries_per_part = + (max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead); + + const auto max_entries = entries_per_part * 4 + 1; + + auto f2 = RCf::FIFO::open(r, ioc, fifo_id, y); + + std::vector fifos{&f, &f2}; + + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + + auto& f = fifos[i % fifos.size()]; + + (*f)->push(bl, y); + } + + /* list all by both */ + { + auto [result, more] = f2->list(max_entries, std::nullopt, y); + + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + } + auto [result, more] = f2->list(max_entries, std::nullopt, y); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + }); + c.run(); +} + + +TEST(FIFO, TestTwoPushersTrim) { + ba::io_context c; + auto fifo_id = "fifo"sv; + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + s::spawn(c, [&](s::yield_context y) { + auto r = R::RADOS::Builder{}.build(c, y); + auto pool = create_pool(r, get_temp_pool_name(), y); + auto sg = make_scope_guard( + [&] { + r.delete_pool(pool, y); + }); + R::IOContext ioc(pool); + + auto f1 = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + + auto [part_header_size, part_entry_overhead] = + f1->get_part_layout_info(); + + const auto entries_per_part = + (max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead); + + const auto max_entries = entries_per_part * 4 + 1; + + auto f2 = RCf::FIFO::open(r, ioc, fifo_id, y); + + /* push one entry to f2 and the rest to f1 */ + + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + + auto f = (i < 1 ? &f2 : &f1); + (*f)->push(bl, y); + } + + /* trim half by fifo1 */ + auto num = max_entries / 2; + + std::string marker; + { + auto [result, more] = f1->list(num, std::nullopt, y); + + ASSERT_EQ(true, more); + ASSERT_EQ(num, result.size()); + + for (auto i = 0u; i < num; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + auto& entry = result[num - 1]; + marker = entry.marker; + + f1->trim(marker, false, y); + + /* list what's left by fifo2 */ + + } + + const auto left = max_entries - num; + auto [result, more] = f2->list(left, marker, y); + ASSERT_EQ(left, result.size()); + ASSERT_EQ(false, more); + + for (auto i = num; i < max_entries; ++i) { + auto& bl = result[i - num].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + }); + c.run(); +} + +TEST(FIFO, TestPushBatch) { + ba::io_context c; + auto fifo_id = "fifo"sv; + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + s::spawn(c, [&](s::yield_context y) { + auto r = R::RADOS::Builder{}.build(c, y); + auto pool = create_pool(r, get_temp_pool_name(), y); + auto sg = make_scope_guard( + [&] { + r.delete_pool(pool, y); + }); + R::IOContext ioc(pool); + + auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + auto [part_header_size, part_entry_overhead] + = f->get_part_layout_info(); + + auto entries_per_part = + (max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead); + + auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */ + + std::vector<cb::list> bufs; + + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + + bufs.push_back(bl); + } + + f->push(bufs, y); + + /* list all */ + + auto [result, more] = f->list(max_entries, std::nullopt, y); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + auto& info = f->meta(); + ASSERT_EQ(info.head_part_num, 4); + }); + c.run(); +} + +TEST(FIFO, TestTrimExclusive) { + ba::io_context c; + auto fifo_id = "fifo"sv; + + s::spawn(c, [&](s::yield_context y) mutable { + auto r = R::RADOS::Builder{}.build(c, y); + auto pool = create_pool(r, get_temp_pool_name(), y); + auto sg = make_scope_guard( + [&] { + r.delete_pool(pool, y); + }); + R::IOContext ioc(pool); + auto f = RCf::FIFO::create(r, ioc, fifo_id, y); + static constexpr auto max_entries = 10u; + for (uint32_t i = 0; i < max_entries; ++i) { + cb::list bl; + encode(i, bl); + f->push(bl, y); + } + + { + auto [result, more] = f->list(1, std::nullopt, y); + auto [val, marker] = + decode_entry<std::uint32_t>(result.front()); + ASSERT_EQ(0, val); + f->trim(marker, true, y); + } + { + auto [result, more] = f->list(max_entries, std::nullopt, y); + auto [val, marker] = decode_entry<std::uint32_t>(result.front()); + ASSERT_EQ(0, val); + f->trim(result[4].marker, true, y); + } + { + auto [result, more] = f->list(max_entries, std::nullopt, y); + auto [val, marker] = + decode_entry<std::uint32_t>(result.front()); + ASSERT_EQ(4, val); + f->trim(result.back().marker, true, y); + } + { + auto [result, more] = f->list(max_entries, std::nullopt, y); + auto [val, marker] = + decode_entry<std::uint32_t>(result.front()); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(max_entries - 1, val); + } + }); + c.run(); +} |