summaryrefslogtreecommitdiffstats
path: root/src/test/cls_fifo
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/test/cls_fifo
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/cls_fifo')
-rw-r--r--src/test/cls_fifo/CMakeLists.txt34
-rw-r--r--src/test/cls_fifo/bench_cls_fifo.cc462
-rw-r--r--src/test/cls_fifo/test_cls_fifo.cc739
3 files changed, 1235 insertions, 0 deletions
diff --git a/src/test/cls_fifo/CMakeLists.txt b/src/test/cls_fifo/CMakeLists.txt
new file mode 100644
index 000000000..3abf7634a
--- /dev/null
+++ b/src/test/cls_fifo/CMakeLists.txt
@@ -0,0 +1,34 @@
+add_executable(ceph_test_cls_fifo
+ test_cls_fifo.cc
+ )
+target_link_libraries(ceph_test_cls_fifo
+ neorados_cls_fifo
+ libneorados
+ spawn
+ ${UNITTEST_LIBS}
+ ${BLKID_LIBRARIES}
+ ${CMAKE_DL_LIBS}
+ ${CRYPTO_LIBS}
+ ${EXTRALIBS}
+ neoradostest-support
+ )
+install(TARGETS
+ ceph_test_cls_fifo
+ DESTINATION ${CMAKE_INSTALL_BINDIR})
+
+add_executable(ceph_bench_cls_fifo
+ bench_cls_fifo.cc
+ )
+target_link_libraries(ceph_bench_cls_fifo
+ neorados_cls_fifo
+ libneorados
+ spawn
+ ${UNITTEST_LIBS}
+ ${BLKID_LIBRARIES}
+ ${CMAKE_DL_LIBS}
+ ${CRYPTO_LIBS}
+ ${EXTRALIBS}
+ )
+install(TARGETS
+ ceph_test_cls_fifo
+ DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/src/test/cls_fifo/bench_cls_fifo.cc b/src/test/cls_fifo/bench_cls_fifo.cc
new file mode 100644
index 000000000..df390cd31
--- /dev/null
+++ b/src/test/cls_fifo/bench_cls_fifo.cc
@@ -0,0 +1,462 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <cerrno>
+#include <chrono>
+#include <cstdint>
+#include <exception>
+#include <future>
+#include <iostream>
+#include <string_view>
+
+#include <boost/asio.hpp>
+#include <boost/system/error_code.hpp>
+#include <boost/program_options.hpp>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include <fmt/chrono.h>
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+
+#include <spawn/spawn.hpp>
+
+#include "include/neorados/RADOS.hpp"
+
+#include "neorados/cls/fifo.h"
+
+namespace ba = boost::asio;
+namespace bs = boost::system;
+namespace bpo = boost::program_options;
+namespace cb = ceph::buffer;
+namespace R = neorados;
+namespace RCf = neorados::cls::fifo;
+namespace fifo = rados::cls::fifo;
+namespace s = spawn;
+namespace sc = std::chrono;
+
+namespace {
+static constexpr auto PUSH = 0x01 << 0;
+static constexpr auto PULL = 0x01 << 1;
+static constexpr auto BOTH = PUSH | PULL;
+static constexpr auto CLEAN = 0x01 << 2;
+static constexpr auto METADATA = 0x01 << 3;
+static constexpr auto PARTINFO = 0x01 << 4;
+static constexpr auto LIST = 0x01 << 5;
+
+struct benchmark {
+ std::uint32_t entries = 0;
+ sc::duration<double> elapsed = 0ns;
+
+ std::uint64_t ratio() const {
+ return entries/std::max(elapsed,
+ sc::duration<double>(1ns)).count();
+ }
+ benchmark() = default;
+ benchmark(std::uint32_t entries, sc::duration<double> elapsed)
+ : entries(entries), elapsed(elapsed) {}
+};
+
+benchmark push(RCf::FIFO& f, const std::uint32_t count,
+ const std::uint32_t entry_size, const std::uint32_t push_entries,
+ s::yield_context y)
+{
+ cb::list entry;
+ entry.push_back(cb::create_small_page_aligned(entry_size));
+ entry.zero();
+
+ std::vector entries(std::min(count, push_entries), entry);
+ auto remaining = count;
+ auto start = sc::steady_clock::now();
+ while (remaining) {
+ if (entries.size() > remaining) {
+ entries.resize(remaining);
+ }
+ f.push(entries, y);
+ remaining -= entries.size();
+ }
+ auto finish = sc::steady_clock::now();
+ return benchmark(count, (finish - start));
+}
+
+benchmark pull(RCf::FIFO& f, const std::uint32_t count,
+ const std::uint32_t pull_entries, s::yield_context y)
+{
+ auto remaining = count;
+ std::uint32_t got = 0;
+
+ auto start = sc::steady_clock::now();
+ while (remaining) {
+ auto [result, more] = f.list(std::min(remaining, pull_entries),
+ std::nullopt, y);
+ if (result.empty())
+ break;
+ got += result.size();
+ remaining -= result.size();
+ f.trim(result.back().marker, false, y);
+ }
+ auto finish = sc::steady_clock::now();
+ return benchmark(got, (finish - start));
+}
+
+void concurpull(const std::string& oid, const std::int64_t pool,
+ const std::uint32_t count, const std::uint32_t pull_entries,
+ std::promise<benchmark> notify, const bool* const exit_early)
+{
+ ba::io_context c;
+ benchmark bench;
+ std::exception_ptr ex;
+ s::spawn(
+ c,
+ [&](s::yield_context y) {
+ try {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ R::IOContext ioc(pool);
+ auto f = RCf::FIFO::open(r, ioc, oid, y);
+ auto remaining = count;
+ std::uint32_t got = 0;
+
+ auto start = sc::steady_clock::now();
+ while (remaining) {
+ if (*exit_early) break;
+ auto [result, more] =
+ f->list(std::min(remaining, pull_entries), std::nullopt, y);
+ if (result.empty()) {
+ // We just keep going assuming they'll push more.
+ continue;
+ }
+ got += result.size();
+ remaining -= result.size();
+ if (*exit_early) break;
+ f->trim(result.back().marker, false, y);
+ }
+ auto finish = sc::steady_clock::now();
+ bench.entries = got;
+ bench.elapsed = finish - start;
+ } catch (const std::exception&) {
+ ex = std::current_exception();
+ }
+ });
+ c.run();
+ if (ex) {
+ notify.set_exception(std::current_exception());
+ } else {
+ notify.set_value(bench);
+ }
+}
+
+void clean(R::RADOS& r, const R::IOContext& ioc, RCf::FIFO& f,
+ s::yield_context y)
+{
+ f.read_meta(y);
+ const auto info = f.meta();
+ if (info.head_part_num > -1) {
+ for (auto i = info.tail_part_num; i <= info.head_part_num; ++i) {
+ R::WriteOp op;
+ op.remove();
+ r.execute(info.part_oid(i), ioc, std::move(op), y);
+ }
+ }
+ R::WriteOp op;
+ op.remove();
+ r.execute(info.id, ioc, std::move(op), y);
+}
+}
+
+int main(int argc, char* argv[])
+{
+ const std::string_view prog(argv[0]);
+ std::string command;
+ try {
+ std::uint32_t count = 0;
+ std::string oid;
+ std::string pool;
+ std::uint32_t entry_size = 0;
+ std::uint32_t push_entries = 0;
+ std::uint32_t pull_entries = 0;
+ std::uint64_t max_part_size = 0;
+ std::uint64_t max_entry_size = 0;
+ std::int64_t part_num = 0;
+ std::string marker;
+
+ bpo::options_description desc(fmt::format("{} options", prog));
+ desc.add_options()
+ ("help", "show help")
+ ("oid", bpo::value<std::string>(&oid)->default_value("fifo"s),
+ "the base oid for the fifo")
+ ("pool", bpo::value<std::string>(&pool)->default_value("fifo_benchmark"s),
+ "the base oid for the fifo")
+ ("count", bpo::value<std::uint32_t>(&count)->default_value(1024),
+ "total count of items")
+ ("entry-size", bpo::value<std::uint32_t>(&entry_size)->default_value(64),
+ "size of entries to push")
+ ("push-entries",
+ bpo::value<std::uint32_t>(&push_entries)
+ ->default_value(512), "entries to push per call")
+ ("max-part-size", bpo::value<std::uint64_t>(&max_part_size)
+ ->default_value(RCf::default_max_part_size),
+ "maximum entry size allowed by FIFO")
+ ("max-entry-size", bpo::value<std::uint64_t>(&max_entry_size)
+ ->default_value(RCf::default_max_entry_size),
+ "maximum entry size allowed by FIFO")
+ ("pull-entries",
+ bpo::value<uint32_t>(&pull_entries)
+ ->default_value(512), "entries to pull per call")
+ ("part-num",
+ bpo::value<int64_t>(&part_num)
+ ->default_value(-1), "partition number, -1 for head")
+ ("marker", bpo::value<std::string>(&marker), "marker to begin list")
+ ("command", bpo::value<std::string>(&command),
+ "the operation to perform");
+
+ bpo::positional_options_description p;
+ p.add("command", 1);
+
+ bpo::variables_map vm;
+
+ bpo::store(bpo::command_line_parser(argc, argv).
+ options(desc).positional(p).run(), vm);
+
+ bpo::notify(vm);
+
+ if (vm.count("help")) {
+ fmt::print(std::cout, "{}", desc);
+ fmt::print(std::cout, "\n{} commands:\n", prog);
+ fmt::print(std::cout, " push\t\t\t push entries into fifo\n");
+ fmt::print(std::cout, " pull\t\t\t retrieve and trim entries\n");
+ fmt::print(std::cout, " both\t\t\t both at once, in two threads\n");
+ fmt::print(std::cout, " metadata\t\t\t print metadata\n");
+ fmt::print(std::cout, " partinfo\t\t\t print metadata\n");
+ fmt::print(std::cout, " list\t\t\t list entries\n");
+ fmt::print(std::cout, " clean\t\t\t clean up\n");
+ return 0;
+ }
+
+
+ if (vm.find("command") == vm.end()) {
+ fmt::print(std::cerr, "{}: a command is required\n", prog);
+ return 1;
+ }
+
+ int op = 0;
+ if (command == "push"s) {
+ op = PUSH;
+ } else if (command == "pull"s) {
+ op = PULL;
+ } else if (command == "both"s) {
+ op = BOTH;
+ } else if (command == "clean"s) {
+ op = CLEAN;
+ } else if (command == "metadata"s) {
+ op = METADATA;
+ } else if (command == "partinfo"s) {
+ op = PARTINFO;
+ } else if (command == "list"s) {
+ op = LIST;
+ } else {
+ fmt::print(std::cerr, "{}: {} is not a valid command\n",
+ prog, command);
+ return 1;
+ }
+
+ if (!(op & PULL) && !vm["pull-entries"].defaulted()) {
+ fmt::print(std::cerr, "{}: pull-entries is only meaningful when pulling\n",
+ prog);
+ return 1;
+ }
+
+ if (!(op & PUSH)) {
+ for (const auto& p : { "entry-size"s, "push-entries"s, "max-part-size"s,
+ "max-entry-size"s }) {
+ if (!vm[p].defaulted()) {
+ fmt::print(std::cerr, "{}: {} is only meaningful when pushing\n",
+ prog, p);
+ return 1;
+ }
+ }
+ }
+
+ if (!(op & BOTH) && !(op & LIST) && !vm["count"].defaulted()) {
+ fmt::print(std::cerr, "{}: count is only meaningful when pulling, pushing, both, or listing\n",
+ prog);
+ return 1;
+ }
+
+ if (!(op & PARTINFO) && !vm["part-num"].defaulted()) {
+ fmt::print(std::cerr, "{}: part-num is only meaningful when getting part info\n",
+ prog);
+ return 1;
+ }
+
+ if (count == 0) {
+ fmt::print(std::cerr, "{}: count must be nonzero\n", prog);
+ return 1;
+ }
+
+ if ((op & PULL) && (pull_entries == 0)) {
+ fmt::print(std::cerr,
+ "{}: pull-entries must be nonzero\n", prog);
+ return 1;
+ }
+
+ if (!(op & LIST) && vm.count("marker") > 0) {
+ fmt::print(std::cerr, "{}: marker is only meaningful when listing\n",
+ prog);
+ return 1;
+ }
+
+ if (op & PUSH) {
+ if (entry_size == 0) {
+ fmt::print(std::cerr, "{}: entry-size must be nonzero\n", prog);
+ return 1;
+ }
+ if (push_entries== 0) {
+ fmt::print(std::cerr, "{}: push-entries must be nonzero\n", prog);
+ return 1;
+ }
+ if (max_entry_size == 0) {
+ fmt::print(std::cerr, "{}: max-entry-size must be nonzero\n", prog);
+ return 1;
+ }
+ if (max_part_size == 0) {
+ fmt::print(std::cerr, "{}: max-part-size must be nonzero\n", prog);
+ return 1;
+ }
+ if (entry_size > max_entry_size) {
+ fmt::print(std::cerr,
+ "{}: entry-size may not be greater than max-entry-size\n",
+ prog);
+ return 1;
+ }
+ if (max_entry_size >= max_part_size) {
+ fmt::print(std::cerr,
+ "{}: max-entry-size may be less than max-part-size\n",
+ prog);
+ return 1;
+ }
+ }
+
+ ba::io_context c;
+ benchmark pushmark, pullmark;
+ fifo::info meta;
+ fifo::part_header partinfo;
+ bool more = false;
+ std::vector<RCf::list_entry> entries;
+ s::spawn(
+ c,
+ [&](s::yield_context y) {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ bs::error_code ec;
+ std::int64_t pid;
+ pid = r.lookup_pool(pool, y[ec]);
+ if (ec) {
+ r.create_pool(pool, std::nullopt, y);
+ pid = r.lookup_pool(pool, y);
+ }
+ const R::IOContext ioc(pid);
+ auto f = RCf::FIFO::create(r, ioc, oid, y, std::nullopt,
+ std::nullopt, false, max_part_size,
+ max_entry_size);
+
+ switch (op) {
+ case PUSH:
+ pushmark = push(*f, count, entry_size, push_entries, y);
+ break;
+
+ case PULL:
+ pullmark = pull(*f, count, pull_entries, y);
+ break;
+
+ case METADATA:
+ meta = f->meta();
+ break;
+
+ case PARTINFO:
+ meta = f->meta();
+ if (part_num == -1) {
+ part_num = meta.head_part_num;
+ }
+ partinfo = f->get_part_info(part_num, y);
+ break;
+
+ case LIST:
+ if (vm.count("marker") == 0) {
+ std::tie(entries, more) = f->list(count, std::nullopt, y);
+ } else {
+ std::tie(entries, more) = f->list(count, marker, y);
+ }
+ break;
+
+ case BOTH: {
+ std::promise<benchmark> notify;
+ bool exit_early = false;
+
+ auto notifier = notify.get_future();
+ std::thread t(concurpull, oid, pid, count, pull_entries,
+ std::move(notify), &exit_early);
+ t.detach();
+ try {
+ pushmark = push(*f, count, entry_size, push_entries, y);
+ } catch (const std::exception&) {
+ exit_early = true;
+ notifier.wait();
+ throw;
+ }
+ pullmark = notifier.get();
+ }
+ }
+
+ if (op & CLEAN)
+ clean(r, ioc, *f, y);
+ });
+ c.run();
+ if (op & PUSH) {
+ fmt::print("Pushed {} in {} at {}/s\n",
+ pushmark.entries, pushmark.elapsed, pushmark.ratio());
+ }
+ if (op & PULL) {
+ if (pullmark.entries == count) {
+ fmt::print(std::cout, "Pulled {} in {} at {}/s\n",
+ pullmark.entries, pullmark.elapsed, pullmark.ratio());
+ } else {
+ fmt::print(std::cout, "Pulled {} (of {} requested), in {} at {}/s\n",
+ pullmark.entries, count, pullmark.elapsed, pullmark.ratio());
+ }
+ }
+ if (op & METADATA) {
+ fmt::print(std::cout, "Metadata: [{}]\n", meta);
+ }
+ if (op & PARTINFO) {
+ fmt::print(std::cout, "Info for partition {}: [{}]\n", part_num, partinfo);
+ }
+ if (op & LIST) {
+ for (const auto& entry : entries) {
+ fmt::print(std::cout, "{}\t{}\n", entry.marker, entry.mtime);
+ }
+ if (more) {
+ fmt::print(std::cout, "...");
+ }
+ }
+ } catch (const std::exception& e) {
+ if (command.empty()) {
+ fmt::print(std::cerr, "{}: {}\n", prog, e.what());
+ } else {
+ fmt::print(std::cerr, "{}: {}: {}\n", prog, command, e.what());
+ }
+ return 1;
+ }
+
+ return 0;
+}
diff --git a/src/test/cls_fifo/test_cls_fifo.cc b/src/test/cls_fifo/test_cls_fifo.cc
new file mode 100644
index 000000000..484248c0c
--- /dev/null
+++ b/src/test/cls_fifo/test_cls_fifo.cc
@@ -0,0 +1,739 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <cerrno>
+#include <iostream>
+#include <string_view>
+
+#include <boost/asio.hpp>
+#include <boost/system/error_code.hpp>
+
+#include <spawn/spawn.hpp>
+
+#include "include/scope_guard.h"
+#include "include/types.h"
+#include "include/neorados/RADOS.hpp"
+
+#include "cls/fifo/cls_fifo_ops.h"
+
+#include "neorados/cls/fifo.h"
+
+#include "test/neorados/common_tests.h"
+
+#include "gtest/gtest.h"
+
+namespace R = neorados;
+namespace ba = boost::asio;
+namespace bs = boost::system;
+namespace cb = ceph::buffer;
+namespace fifo = rados::cls::fifo;
+namespace RCf = neorados::cls::fifo;
+namespace s = spawn;
+
+namespace {
+void fifo_create(R::RADOS& r,
+ const R::IOContext& ioc,
+ const R::Object& oid,
+ std::string_view id,
+ s::yield_context y,
+ std::optional<fifo::objv> objv = std::nullopt,
+ std::optional<std::string_view> oid_prefix = std::nullopt,
+ bool exclusive = false,
+ std::uint64_t max_part_size = RCf::default_max_part_size,
+ std::uint64_t max_entry_size = RCf::default_max_entry_size)
+{
+ R::WriteOp op;
+ RCf::create_meta(op, id, objv, oid_prefix, exclusive, max_part_size,
+ max_entry_size);
+ r.execute(oid, ioc, std::move(op), y);
+}
+}
+
+TEST(ClsFIFO, TestCreate) {
+ ba::io_context c;
+ auto fifo_id = "fifo"sv;
+ R::Object oid(fifo_id);
+
+ s::spawn(c, [&](s::yield_context y) {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+ bs::error_code ec;
+ fifo_create(r, ioc, oid, ""s, y[ec]);
+ EXPECT_EQ(bs::errc::invalid_argument, ec);
+ fifo_create(r, ioc, oid, fifo_id, y[ec], std::nullopt,
+ std::nullopt, false, 0);
+ EXPECT_EQ(bs::errc::invalid_argument, ec);
+ fifo_create(r, ioc, oid, {}, y[ec],
+ std::nullopt, std::nullopt,
+ false, RCf::default_max_part_size, 0);
+ EXPECT_EQ(bs::errc::invalid_argument, ec);
+ fifo_create(r, ioc, oid, fifo_id, y);
+ {
+ std::uint64_t size;
+ std::uint64_t size2;
+ {
+ R::ReadOp op;
+ op.stat(&size, nullptr);
+ r.execute(oid, ioc, std::move(op),
+ nullptr, y);
+ EXPECT_GT(size, 0);
+ }
+
+ {
+ R::ReadOp op;
+ op.stat(&size2, nullptr);
+ r.execute(oid, ioc, std::move(op), nullptr, y);
+ }
+ EXPECT_EQ(size2, size);
+ }
+ /* test idempotency */
+ fifo_create(r, ioc, oid, fifo_id, y);
+ fifo_create(r, ioc, oid, {}, y[ec], std::nullopt,
+ std::nullopt, false);
+ EXPECT_EQ(bs::errc::invalid_argument, ec);
+ fifo_create(r, ioc, oid, {}, y[ec], std::nullopt,
+ "myprefix"sv, false);
+ EXPECT_EQ(bs::errc::invalid_argument, ec);
+ fifo_create(r, ioc, oid, "foo"sv, y[ec],
+ std::nullopt, std::nullopt, false);
+ EXPECT_EQ(bs::errc::file_exists, ec);
+ });
+ c.run();
+}
+
+TEST(ClsFIFO, TestGetInfo) {
+ ba::io_context c;
+ auto fifo_id = "fifo"sv;
+ R::Object oid(fifo_id);
+
+ s::spawn(c, [&](s::yield_context y) {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+ /* first successful create */
+ fifo_create(r, ioc, oid, fifo_id, y);
+
+ fifo::info info;
+ std::uint32_t part_header_size;
+ std::uint32_t part_entry_overhead;
+ {
+ R::ReadOp op;
+ RCf::get_meta(op, std::nullopt,
+ nullptr, &info, &part_header_size,
+ &part_entry_overhead);
+ r.execute(oid, ioc, std::move(op), nullptr, y);
+ EXPECT_GT(part_header_size, 0);
+ EXPECT_GT(part_entry_overhead, 0);
+ EXPECT_FALSE(info.version.instance.empty());
+ }
+ {
+ R::ReadOp op;
+ RCf::get_meta(op, info.version,
+ nullptr, &info, &part_header_size,
+ &part_entry_overhead);
+ r.execute(oid, ioc, std::move(op), nullptr, y);
+ }
+ {
+ R::ReadOp op;
+ fifo::objv objv;
+ objv.instance = "foo";
+ objv.ver = 12;
+ RCf::get_meta(op, objv,
+ nullptr, &info, &part_header_size,
+ &part_entry_overhead);
+ ASSERT_ANY_THROW(r.execute(oid, ioc, std::move(op),
+ nullptr, y));
+ }
+ });
+ c.run();
+}
+
+TEST(FIFO, TestOpenDefault) {
+ ba::io_context c;
+ auto fifo_id = "fifo"s;
+
+ s::spawn(c, [&](s::yield_context y) {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+ auto fifo = RCf::FIFO::create(r, ioc, fifo_id, y);
+ // force reading from backend
+ fifo->read_meta(y);
+ auto info = fifo->meta();
+ EXPECT_EQ(info.id, fifo_id);
+ });
+ c.run();
+}
+
+TEST(FIFO, TestOpenParams) {
+ ba::io_context c;
+ auto fifo_id = "fifo"sv;
+
+ s::spawn(c, [&](s::yield_context y) {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+
+ const std::uint64_t max_part_size = 10 * 1024;
+ const std::uint64_t max_entry_size = 128;
+ auto oid_prefix = "foo.123."sv;
+ fifo::objv objv;
+ objv.instance = "fooz"s;
+ objv.ver = 10;
+
+ /* first successful create */
+ auto f = RCf::FIFO::create(r, ioc, fifo_id, y, objv, oid_prefix,
+ false, max_part_size,
+ max_entry_size);
+
+
+ /* force reading from backend */
+ f->read_meta(y);
+ auto info = f->meta();
+ ASSERT_EQ(info.id, fifo_id);
+ ASSERT_EQ(info.params.max_part_size, max_part_size);
+ ASSERT_EQ(info.params.max_entry_size, max_entry_size);
+ ASSERT_EQ(info.version, objv);
+ });
+ c.run();
+}
+
+namespace {
+template<class T>
+std::pair<T, std::string> decode_entry(const RCf::list_entry& entry)
+{
+ T val;
+ auto iter = entry.data.cbegin();
+ decode(val, iter);
+ return std::make_pair(std::move(val), entry.marker);
+}
+}
+
+
+
+TEST(FIFO, TestPushListTrim) {
+ ba::io_context c;
+ auto fifo_id = "fifo"sv;
+
+ s::spawn(c, [&](s::yield_context y) mutable {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+ auto f = RCf::FIFO::create(r, ioc, fifo_id, y);
+ static constexpr auto max_entries = 10u;
+ for (uint32_t i = 0; i < max_entries; ++i) {
+ cb::list bl;
+ encode(i, bl);
+ f->push(bl, y);
+ }
+
+ std::optional<std::string> marker;
+ /* get entries one by one */
+
+ for (auto i = 0u; i < max_entries; ++i) {
+ auto [result, more] = f->list(1, marker, y);
+
+ bool expected_more = (i != (max_entries - 1));
+ ASSERT_EQ(expected_more, more);
+ ASSERT_EQ(1, result.size());
+
+ std::uint32_t val;
+ std::tie(val, marker) =
+ decode_entry<std::uint32_t>(result.front());
+
+ ASSERT_EQ(i, val);
+ }
+
+ /* get all entries at once */
+ std::string markers[max_entries];
+ std::uint32_t min_entry = 0;
+ {
+ auto [result, more] = f->list(max_entries * 10, std::nullopt,
+ y);
+
+ ASSERT_FALSE(more);
+ ASSERT_EQ(max_entries, result.size());
+
+
+ for (auto i = 0u; i < max_entries; ++i) {
+ std::uint32_t val;
+
+ std::tie(val, markers[i]) =
+ decode_entry<std::uint32_t>(result[i]);
+ ASSERT_EQ(i, val);
+ }
+
+
+ /* trim one entry */
+ f->trim(markers[min_entry], false, y);
+ ++min_entry;
+ }
+
+ auto [result, more] = f->list(max_entries * 10,
+ std::nullopt, y);
+
+ ASSERT_FALSE(more);
+ ASSERT_EQ(max_entries - min_entry, result.size());
+
+ for (auto i = min_entry; i < max_entries; ++i) {
+ std::uint32_t val;
+
+ std::tie(val, markers[i - min_entry]) =
+ decode_entry<std::uint32_t>(result[i - min_entry]);
+ ASSERT_EQ(i, val);
+ }
+
+ });
+ c.run();
+}
+
+
+TEST(FIFO, TestPushTooBig) {
+ ba::io_context c;
+ auto fifo_id = "fifo"sv;
+ static constexpr auto max_part_size = 2048ull;
+ static constexpr auto max_entry_size = 128ull;
+
+ s::spawn(c, [&](s::yield_context y) {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+
+ auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
+ std::nullopt, false, max_part_size,
+ max_entry_size);
+
+ char buf[max_entry_size + 1];
+ memset(buf, 0, sizeof(buf));
+
+ cb::list bl;
+ bl.append(buf, sizeof(buf));
+
+ bs::error_code ec;
+ f->push(bl, y[ec]);
+ EXPECT_EQ(RCf::errc::entry_too_large, ec);
+ });
+ c.run();
+}
+
+
+TEST(FIFO, TestMultipleParts) {
+ ba::io_context c;
+ auto fifo_id = "fifo"sv;
+ static constexpr auto max_part_size = 2048ull;
+ static constexpr auto max_entry_size = 128ull;
+
+ s::spawn(c, [&](s::yield_context y) mutable {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+
+ auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
+ std::nullopt, false, max_part_size,
+ max_entry_size);
+
+
+ char buf[max_entry_size];
+ memset(buf, 0, sizeof(buf));
+
+ const auto [part_header_size, part_entry_overhead] =
+ f->get_part_layout_info();
+
+ const auto entries_per_part =
+ (max_part_size - part_header_size) /
+ (max_entry_size + part_entry_overhead);
+
+ const auto max_entries = entries_per_part * 4 + 1;
+
+ /* push enough entries */
+ for (auto i = 0u; i < max_entries; ++i) {
+ cb::list bl;
+
+ *(int *)buf = i;
+ bl.append(buf, sizeof(buf));
+
+ f->push(bl, y);
+ }
+
+ auto info = f->meta();
+
+ ASSERT_EQ(info.id, fifo_id);
+ /* head should have advanced */
+ ASSERT_GT(info.head_part_num, 0);
+
+
+ /* list all at once */
+ auto [result, more] = f->list(max_entries, std::nullopt, y);
+ EXPECT_EQ(false, more);
+
+ ASSERT_EQ(max_entries, result.size());
+
+ for (auto i = 0u; i < max_entries; ++i) {
+ auto& bl = result[i].data;
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ }
+
+ std::optional<std::string> marker;
+ /* get entries one by one */
+
+ for (auto i = 0u; i < max_entries; ++i) {
+ auto [result, more] = f->list(1, marker, y);
+ ASSERT_EQ(result.size(), 1);
+ const bool expected_more = (i != (max_entries - 1));
+ ASSERT_EQ(expected_more, more);
+
+ std::uint32_t val;
+ std::tie(val, marker) =
+ decode_entry<std::uint32_t>(result.front());
+
+ auto& entry = result.front();
+ auto& bl = entry.data;
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ marker = entry.marker;
+ }
+
+ /* trim one at a time */
+ marker.reset();
+ for (auto i = 0u; i < max_entries; ++i) {
+ /* read single entry */
+ {
+ auto [result, more] = f->list(1, marker, y);
+ ASSERT_EQ(result.size(), 1);
+ const bool expected_more = (i != (max_entries - 1));
+ ASSERT_EQ(expected_more, more);
+
+ marker = result.front().marker;
+
+ f->trim(*marker, false, y);
+ }
+
+ /* check tail */
+ info = f->meta();
+ ASSERT_EQ(info.tail_part_num, i / entries_per_part);
+
+ /* try to read all again, see how many entries left */
+ auto [result, more] = f->list(max_entries, marker, y);
+ ASSERT_EQ(max_entries - i - 1, result.size());
+ ASSERT_EQ(false, more);
+ }
+
+ /* tail now should point at head */
+ info = f->meta();
+ ASSERT_EQ(info.head_part_num, info.tail_part_num);
+
+ /* check old tails are removed */
+ for (auto i = 0; i < info.tail_part_num; ++i) {
+ bs::error_code ec;
+ f->get_part_info(i, y[ec]);
+ ASSERT_EQ(bs::errc::no_such_file_or_directory, ec);
+ }
+ /* check current tail exists */
+ f->get_part_info(info.tail_part_num, y);
+ });
+ c.run();
+}
+
+
+TEST(FIFO, TestTwoPushers) {
+ ba::io_context c;
+ auto fifo_id = "fifo"sv;
+ static constexpr auto max_part_size = 2048ull;
+ static constexpr auto max_entry_size = 128ull;
+
+ s::spawn(c, [&](s::yield_context y) {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+
+ auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
+ std::nullopt, false, max_part_size,
+ max_entry_size);
+
+
+
+ char buf[max_entry_size];
+ memset(buf, 0, sizeof(buf));
+
+
+ auto [part_header_size, part_entry_overhead] =
+ f->get_part_layout_info();
+
+ const auto entries_per_part =
+ (max_part_size - part_header_size) /
+ (max_entry_size + part_entry_overhead);
+
+ const auto max_entries = entries_per_part * 4 + 1;
+
+ auto f2 = RCf::FIFO::open(r, ioc, fifo_id, y);
+
+ std::vector fifos{&f, &f2};
+
+ for (auto i = 0u; i < max_entries; ++i) {
+ cb::list bl;
+ *(int *)buf = i;
+ bl.append(buf, sizeof(buf));
+
+ auto& f = fifos[i % fifos.size()];
+
+ (*f)->push(bl, y);
+ }
+
+ /* list all by both */
+ {
+ auto [result, more] = f2->list(max_entries, std::nullopt, y);
+
+ ASSERT_EQ(false, more);
+ ASSERT_EQ(max_entries, result.size());
+ }
+ auto [result, more] = f2->list(max_entries, std::nullopt, y);
+ ASSERT_EQ(false, more);
+ ASSERT_EQ(max_entries, result.size());
+
+ for (auto i = 0u; i < max_entries; ++i) {
+ auto& bl = result[i].data;
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ }
+ });
+ c.run();
+}
+
+
+TEST(FIFO, TestTwoPushersTrim) {
+ ba::io_context c;
+ auto fifo_id = "fifo"sv;
+ static constexpr auto max_part_size = 2048ull;
+ static constexpr auto max_entry_size = 128ull;
+
+ s::spawn(c, [&](s::yield_context y) {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+
+ auto f1 = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
+ std::nullopt, false, max_part_size,
+ max_entry_size);
+
+ char buf[max_entry_size];
+ memset(buf, 0, sizeof(buf));
+
+
+ auto [part_header_size, part_entry_overhead] =
+ f1->get_part_layout_info();
+
+ const auto entries_per_part =
+ (max_part_size - part_header_size) /
+ (max_entry_size + part_entry_overhead);
+
+ const auto max_entries = entries_per_part * 4 + 1;
+
+ auto f2 = RCf::FIFO::open(r, ioc, fifo_id, y);
+
+ /* push one entry to f2 and the rest to f1 */
+
+ for (auto i = 0u; i < max_entries; ++i) {
+ cb::list bl;
+
+ *(int *)buf = i;
+ bl.append(buf, sizeof(buf));
+
+ auto f = (i < 1 ? &f2 : &f1);
+ (*f)->push(bl, y);
+ }
+
+ /* trim half by fifo1 */
+ auto num = max_entries / 2;
+
+ std::string marker;
+ {
+ auto [result, more] = f1->list(num, std::nullopt, y);
+
+ ASSERT_EQ(true, more);
+ ASSERT_EQ(num, result.size());
+
+ for (auto i = 0u; i < num; ++i) {
+ auto& bl = result[i].data;
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ }
+
+ auto& entry = result[num - 1];
+ marker = entry.marker;
+
+ f1->trim(marker, false, y);
+
+ /* list what's left by fifo2 */
+
+ }
+
+ const auto left = max_entries - num;
+ auto [result, more] = f2->list(left, marker, y);
+ ASSERT_EQ(left, result.size());
+ ASSERT_EQ(false, more);
+
+ for (auto i = num; i < max_entries; ++i) {
+ auto& bl = result[i - num].data;
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ }
+ });
+ c.run();
+}
+
+TEST(FIFO, TestPushBatch) {
+ ba::io_context c;
+ auto fifo_id = "fifo"sv;
+ static constexpr auto max_part_size = 2048ull;
+ static constexpr auto max_entry_size = 128ull;
+
+ s::spawn(c, [&](s::yield_context y) {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+
+ auto f = RCf::FIFO::create(r, ioc, fifo_id, y, std::nullopt,
+ std::nullopt, false, max_part_size,
+ max_entry_size);
+
+
+ char buf[max_entry_size];
+ memset(buf, 0, sizeof(buf));
+
+ auto [part_header_size, part_entry_overhead]
+ = f->get_part_layout_info();
+
+ auto entries_per_part =
+ (max_part_size - part_header_size) /
+ (max_entry_size + part_entry_overhead);
+
+ auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */
+
+ std::vector<cb::list> bufs;
+
+ for (auto i = 0u; i < max_entries; ++i) {
+ cb::list bl;
+
+ *(int *)buf = i;
+ bl.append(buf, sizeof(buf));
+
+ bufs.push_back(bl);
+ }
+
+ f->push(bufs, y);
+
+ /* list all */
+
+ auto [result, more] = f->list(max_entries, std::nullopt, y);
+ ASSERT_EQ(false, more);
+ ASSERT_EQ(max_entries, result.size());
+
+ for (auto i = 0u; i < max_entries; ++i) {
+ auto& bl = result[i].data;
+ ASSERT_EQ(i, *(int *)bl.c_str());
+ }
+
+ auto& info = f->meta();
+ ASSERT_EQ(info.head_part_num, 4);
+ });
+ c.run();
+}
+
+TEST(FIFO, TestTrimExclusive) {
+ ba::io_context c;
+ auto fifo_id = "fifo"sv;
+
+ s::spawn(c, [&](s::yield_context y) mutable {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+ auto f = RCf::FIFO::create(r, ioc, fifo_id, y);
+ static constexpr auto max_entries = 10u;
+ for (uint32_t i = 0; i < max_entries; ++i) {
+ cb::list bl;
+ encode(i, bl);
+ f->push(bl, y);
+ }
+
+ {
+ auto [result, more] = f->list(1, std::nullopt, y);
+ auto [val, marker] =
+ decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(0, val);
+ f->trim(marker, true, y);
+ }
+ {
+ auto [result, more] = f->list(max_entries, std::nullopt, y);
+ auto [val, marker] = decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(0, val);
+ f->trim(result[4].marker, true, y);
+ }
+ {
+ auto [result, more] = f->list(max_entries, std::nullopt, y);
+ auto [val, marker] =
+ decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(4, val);
+ f->trim(result.back().marker, true, y);
+ }
+ {
+ auto [result, more] = f->list(max_entries, std::nullopt, y);
+ auto [val, marker] =
+ decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(result.size(), 1);
+ ASSERT_EQ(max_entries - 1, val);
+ }
+ });
+ c.run();
+}