diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/test/rgw/test_cls_fifo_legacy.cc | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/rgw/test_cls_fifo_legacy.cc')
-rw-r--r-- | src/test/rgw/test_cls_fifo_legacy.cc | 1184 |
1 files changed, 1184 insertions, 0 deletions
diff --git a/src/test/rgw/test_cls_fifo_legacy.cc b/src/test/rgw/test_cls_fifo_legacy.cc new file mode 100644 index 000000000..1fa5f8681 --- /dev/null +++ b/src/test/rgw/test_cls_fifo_legacy.cc @@ -0,0 +1,1184 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <cerrno> +#include <iostream> +#include <string_view> + +#include "include/scope_guard.h" +#include "include/types.h" +#include "include/rados/librados.hpp" +#include "common/ceph_context.h" + +#include "cls/fifo/cls_fifo_ops.h" +#include "test/librados/test_cxx.h" +#include "global/global_context.h" + +#include "rgw_tools.h" +#include "cls_fifo_legacy.h" + +#include "gtest/gtest.h" + +using namespace std::literals; +using namespace std::string_literals; + +namespace R = librados; +namespace cb = ceph::buffer; +namespace fifo = rados::cls::fifo; +namespace RCf = rgw::cls::fifo; + +auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT); +const DoutPrefix dp(cct, 1, "test legacy cls fifo: "); + +namespace { +int fifo_create(const DoutPrefixProvider *dpp, R::IoCtx& ioctx, + const std::string& oid, + std::string_view id, + optional_yield y, + std::optional<fifo::objv> objv = std::nullopt, + std::optional<std::string_view> oid_prefix = std::nullopt, + bool exclusive = false, + std::uint64_t max_part_size = RCf::default_max_part_size, + std::uint64_t max_entry_size = RCf::default_max_entry_size) +{ + R::ObjectWriteOperation op; + RCf::create_meta(&op, id, objv, oid_prefix, exclusive, max_part_size, + max_entry_size); + return rgw_rados_operate(dpp, ioctx, oid, &op, y); +} +} + +class LegacyFIFO : public testing::Test { +protected: + const std::string pool_name = get_temp_pool_name(); + const std::string fifo_id = "fifo"; + R::Rados rados; + librados::IoCtx ioctx; + + void SetUp() override { + ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); + ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); + } + void TearDown() override { + destroy_one_pool_pp(pool_name, rados); + } +}; + +using LegacyClsFIFO = LegacyFIFO; +using AioLegacyFIFO = LegacyFIFO; + + +TEST_F(LegacyClsFIFO, TestCreate) +{ + auto r = fifo_create(&dp, ioctx, fifo_id, ""s, null_yield); + EXPECT_EQ(-EINVAL, r); + r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield, std::nullopt, + std::nullopt, false, 0); + EXPECT_EQ(-EINVAL, r); + r = fifo_create(&dp, ioctx, fifo_id, {}, null_yield, + std::nullopt, std::nullopt, + false, RCf::default_max_part_size, 0); + EXPECT_EQ(-EINVAL, r); + r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield); + EXPECT_EQ(0, r); + std::uint64_t size; + ioctx.stat(fifo_id, &size, nullptr); + EXPECT_GT(size, 0); + /* test idempotency */ + r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield); + EXPECT_EQ(0, r); + r = fifo_create(&dp, ioctx, fifo_id, {}, null_yield, std::nullopt, + std::nullopt, false); + EXPECT_EQ(-EINVAL, r); + r = fifo_create(&dp, ioctx, fifo_id, {}, null_yield, std::nullopt, + "myprefix"sv, false); + EXPECT_EQ(-EINVAL, r); + r = fifo_create(&dp, ioctx, fifo_id, "foo"sv, null_yield, + std::nullopt, std::nullopt, false); + EXPECT_EQ(-EEXIST, r); +} + +TEST_F(LegacyClsFIFO, TestGetInfo) +{ + auto r = fifo_create(&dp, ioctx, fifo_id, fifo_id, null_yield); + fifo::info info; + std::uint32_t part_header_size; + std::uint32_t part_entry_overhead; + r = RCf::get_meta(&dp, ioctx, fifo_id, std::nullopt, &info, &part_header_size, + &part_entry_overhead, 0, null_yield); + EXPECT_EQ(0, r); + EXPECT_GT(part_header_size, 0); + EXPECT_GT(part_entry_overhead, 0); + EXPECT_FALSE(info.version.instance.empty()); + + r = RCf::get_meta(&dp, ioctx, fifo_id, info.version, &info, &part_header_size, + &part_entry_overhead, 0, null_yield); + EXPECT_EQ(0, r); + fifo::objv objv; + objv.instance = "foo"; + objv.ver = 12; + r = RCf::get_meta(&dp, ioctx, fifo_id, objv, &info, &part_header_size, + &part_entry_overhead, 0, null_yield); + EXPECT_EQ(-ECANCELED, r); +} + +TEST_F(LegacyFIFO, TestOpenDefault) +{ + std::unique_ptr<RCf::FIFO> fifo; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &fifo, null_yield); + ASSERT_EQ(0, r); + // force reading from backend + r = fifo->read_meta(&dp, null_yield); + EXPECT_EQ(0, r); + auto info = fifo->meta(); + EXPECT_EQ(info.id, fifo_id); +} + +TEST_F(LegacyFIFO, TestOpenParams) +{ + const std::uint64_t max_part_size = 10 * 1024; + const std::uint64_t max_entry_size = 128; + auto oid_prefix = "foo.123."sv; + fifo::objv objv; + objv.instance = "fooz"s; + objv.ver = 10; + + /* first successful create */ + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, objv, oid_prefix, + false, max_part_size, max_entry_size); + ASSERT_EQ(0, r); + + /* force reading from backend */ + r = f->read_meta(&dp, null_yield); + auto info = f->meta(); + EXPECT_EQ(info.id, fifo_id); + EXPECT_EQ(info.params.max_part_size, max_part_size); + EXPECT_EQ(info.params.max_entry_size, max_entry_size); + EXPECT_EQ(info.version, objv); +} + +namespace { +template<class T> +std::pair<T, std::string> decode_entry(const RCf::list_entry& entry) +{ + T val; + auto iter = entry.data.cbegin(); + decode(val, iter); + return std::make_pair(std::move(val), entry.marker); +} +} + + +TEST_F(LegacyFIFO, TestPushListTrim) +{ + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield); + ASSERT_EQ(0, r); + static constexpr auto max_entries = 10u; + for (uint32_t i = 0; i < max_entries; ++i) { + cb::list bl; + encode(i, bl); + r = f->push(&dp, bl, null_yield); + ASSERT_EQ(0, r); + } + + std::optional<std::string> marker; + /* get entries one by one */ + std::vector<RCf::list_entry> result; + bool more = false; + for (auto i = 0u; i < max_entries; ++i) { + + r = f->list(&dp, 1, marker, &result, &more, null_yield); + ASSERT_EQ(0, r); + + bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + ASSERT_EQ(1, result.size()); + + std::uint32_t val; + std::tie(val, marker) = decode_entry<std::uint32_t>(result.front()); + + ASSERT_EQ(i, val); + result.clear(); + } + + /* get all entries at once */ + std::string markers[max_entries]; + std::uint32_t min_entry = 0; + r = f->list(&dp, max_entries * 10, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + + ASSERT_FALSE(more); + ASSERT_EQ(max_entries, result.size()); + for (auto i = 0u; i < max_entries; ++i) { + std::uint32_t val; + std::tie(val, markers[i]) = decode_entry<std::uint32_t>(result[i]); + ASSERT_EQ(i, val); + } + + /* trim one entry */ + r = f->trim(&dp, markers[min_entry], false, null_yield); + ASSERT_EQ(0, r); + ++min_entry; + + r = f->list(&dp, max_entries * 10, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_FALSE(more); + ASSERT_EQ(max_entries - min_entry, result.size()); + + for (auto i = min_entry; i < max_entries; ++i) { + std::uint32_t val; + std::tie(val, markers[i - min_entry]) = + decode_entry<std::uint32_t>(result[i - min_entry]); + EXPECT_EQ(i, val); + } +} + + +TEST_F(LegacyFIFO, TestPushTooBig) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size + 1]; + memset(buf, 0, sizeof(buf)); + + cb::list bl; + bl.append(buf, sizeof(buf)); + + r = f->push(&dp, bl, null_yield); + EXPECT_EQ(-E2BIG, r); +} + + +TEST_F(LegacyFIFO, TestMultipleParts) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + const auto [part_header_size, part_entry_overhead] = + f->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + /* push enough entries */ + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + r = f->push(&dp, bl, null_yield); + ASSERT_EQ(0, r); + } + + auto info = f->meta(); + ASSERT_EQ(info.id, fifo_id); + /* head should have advanced */ + ASSERT_GT(info.head_part_num, 0); + + /* list all at once */ + std::vector<RCf::list_entry> result; + bool more = false; + r = f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + EXPECT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + std::optional<std::string> marker; + /* get entries one by one */ + + for (auto i = 0u; i < max_entries; ++i) { + r = f->list(&dp, 1, marker, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + std::uint32_t val; + std::tie(val, marker) = decode_entry<std::uint32_t>(result.front()); + + auto& entry = result.front(); + auto& bl = entry.data; + ASSERT_EQ(i, *(int *)bl.c_str()); + marker = entry.marker; + } + + /* trim one at a time */ + marker.reset(); + for (auto i = 0u; i < max_entries; ++i) { + /* read single entry */ + r = f->list(&dp, 1, marker, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + marker = result.front().marker; + r = f->trim(&dp, *marker, false, null_yield); + ASSERT_EQ(0, r); + + /* check tail */ + info = f->meta(); + ASSERT_EQ(info.tail_part_num, i / entries_per_part); + + /* try to read all again, see how many entries left */ + r = f->list(&dp, max_entries, marker, &result, &more, null_yield); + ASSERT_EQ(max_entries - i - 1, result.size()); + ASSERT_EQ(false, more); + } + + /* tail now should point at head */ + info = f->meta(); + ASSERT_EQ(info.head_part_num, info.tail_part_num); + + RCf::part_info partinfo; + /* check old tails are removed */ + for (auto i = 0; i < info.tail_part_num; ++i) { + r = f->get_part_info(&dp, i, &partinfo, null_yield); + ASSERT_EQ(-ENOENT, r); + } + /* check current tail exists */ + r = f->get_part_info(&dp, info.tail_part_num, &partinfo, null_yield); + ASSERT_EQ(0, r); +} + +TEST_F(LegacyFIFO, TestTwoPushers) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + std::unique_ptr<RCf::FIFO> f2; + r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield); + std::vector fifos{&f, &f2}; + + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + auto& f = *fifos[i % fifos.size()]; + r = f->push(&dp, bl, null_yield); + ASSERT_EQ(0, r); + } + + /* list all by both */ + std::vector<RCf::list_entry> result; + bool more = false; + r = f2->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + r = f2->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } +} + +TEST_F(LegacyFIFO, TestTwoPushersTrim) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + std::unique_ptr<RCf::FIFO> f1; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f1, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + + std::unique_ptr<RCf::FIFO> f2; + r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield); + ASSERT_EQ(0, r); + + /* push one entry to f2 and the rest to f1 */ + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + auto& f = (i < 1 ? f2 : f1); + r = f->push(&dp, bl, null_yield); + ASSERT_EQ(0, r); + } + + /* trim half by fifo1 */ + auto num = max_entries / 2; + std::string marker; + std::vector<RCf::list_entry> result; + bool more = false; + r = f1->list(&dp, num, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(true, more); + ASSERT_EQ(num, result.size()); + + for (auto i = 0u; i < num; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + auto& entry = result[num - 1]; + marker = entry.marker; + r = f1->trim(&dp, marker, false, null_yield); + /* list what's left by fifo2 */ + + const auto left = max_entries - num; + f2->list(&dp, left, marker, &result, &more, null_yield); + ASSERT_EQ(left, result.size()); + ASSERT_EQ(false, more); + + for (auto i = num; i < max_entries; ++i) { + auto& bl = result[i - num].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } +} + +TEST_F(LegacyFIFO, TestPushBatch) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); + auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */ + std::vector<cb::list> bufs; + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + bufs.push_back(bl); + } + ASSERT_EQ(max_entries, bufs.size()); + + r = f->push(&dp, bufs, null_yield); + ASSERT_EQ(0, r); + + /* list all */ + + std::vector<RCf::list_entry> result; + bool more = false; + r = f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + auto& info = f->meta(); + ASSERT_EQ(info.head_part_num, 4); +} + +TEST_F(LegacyFIFO, TestAioTrim) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + const auto [part_header_size, part_entry_overhead] = + f->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + /* push enough entries */ + std::vector<cb::list> bufs; + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + bufs.push_back(std::move(bl)); + } + ASSERT_EQ(max_entries, bufs.size()); + + r = f->push(&dp, bufs, null_yield); + ASSERT_EQ(0, r); + + auto info = f->meta(); + ASSERT_EQ(info.id, fifo_id); + /* head should have advanced */ + ASSERT_GT(info.head_part_num, 0); + + /* list all at once */ + std::vector<RCf::list_entry> result; + bool more = false; + r = f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + std::optional<std::string> marker; + /* trim one at a time */ + result.clear(); + more = false; + marker.reset(); + for (auto i = 0u; i < max_entries; ++i) { + /* read single entry */ + r = f->list(&dp, 1, marker, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + marker = result.front().marker; + std::unique_ptr<R::AioCompletion> c(rados.aio_create_completion(nullptr, + nullptr)); + f->trim(&dp, *marker, false, c.get()); + c->wait_for_complete(); + r = c->get_return_value(); + ASSERT_EQ(0, r); + + /* check tail */ + info = f->meta(); + ASSERT_EQ(info.tail_part_num, i / entries_per_part); + + /* try to read all again, see how many entries left */ + r = f->list(&dp, max_entries, marker, &result, &more, null_yield); + ASSERT_EQ(max_entries - i - 1, result.size()); + ASSERT_EQ(false, more); + } + + /* tail now should point at head */ + info = f->meta(); + ASSERT_EQ(info.head_part_num, info.tail_part_num); + + RCf::part_info partinfo; + /* check old tails are removed */ + for (auto i = 0; i < info.tail_part_num; ++i) { + r = f->get_part_info(&dp, i, &partinfo, null_yield); + ASSERT_EQ(-ENOENT, r); + } + /* check current tail exists */ + r = f->get_part_info(&dp, info.tail_part_num, &partinfo, null_yield); + ASSERT_EQ(0, r); +} + +TEST_F(LegacyFIFO, TestTrimExclusive) { + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield); + ASSERT_EQ(0, r); + std::vector<RCf::list_entry> result; + bool more = false; + + static constexpr auto max_entries = 10u; + for (uint32_t i = 0; i < max_entries; ++i) { + cb::list bl; + encode(i, bl); + f->push(&dp, bl, null_yield); + } + + f->list(&dp, 1, std::nullopt, &result, &more, null_yield); + auto [val, marker] = decode_entry<std::uint32_t>(result.front()); + ASSERT_EQ(0, val); + f->trim(&dp, marker, true, null_yield); + + result.clear(); + f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); + std::tie(val, marker) = decode_entry<std::uint32_t>(result.front()); + ASSERT_EQ(0, val); + f->trim(&dp, result[4].marker, true, null_yield); + + result.clear(); + f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); + std::tie(val, marker) = decode_entry<std::uint32_t>(result.front()); + ASSERT_EQ(4, val); + f->trim(&dp, result.back().marker, true, null_yield); + + result.clear(); + f->list(&dp, max_entries, std::nullopt, &result, &more, null_yield); + std::tie(val, marker) = decode_entry<std::uint32_t>(result.front()); + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(max_entries - 1, val); +} + +TEST_F(AioLegacyFIFO, TestPushListTrim) +{ + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield); + ASSERT_EQ(0, r); + static constexpr auto max_entries = 10u; + for (uint32_t i = 0; i < max_entries; ++i) { + cb::list bl; + encode(i, bl); + auto c = R::Rados::aio_create_completion(); + f->push(&dp, bl, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + } + + std::optional<std::string> marker; + /* get entries one by one */ + std::vector<RCf::list_entry> result; + bool more = false; + for (auto i = 0u; i < max_entries; ++i) { + auto c = R::Rados::aio_create_completion(); + f->list(&dp, 1, marker, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + + bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + ASSERT_EQ(1, result.size()); + + std::uint32_t val; + std::tie(val, marker) = decode_entry<std::uint32_t>(result.front()); + + ASSERT_EQ(i, val); + result.clear(); + } + + /* get all entries at once */ + std::string markers[max_entries]; + std::uint32_t min_entry = 0; + auto c = R::Rados::aio_create_completion(); + f->list(&dp, max_entries * 10, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + + ASSERT_FALSE(more); + ASSERT_EQ(max_entries, result.size()); + for (auto i = 0u; i < max_entries; ++i) { + std::uint32_t val; + std::tie(val, markers[i]) = decode_entry<std::uint32_t>(result[i]); + ASSERT_EQ(i, val); + } + + /* trim one entry */ + c = R::Rados::aio_create_completion(); + f->trim(&dp, markers[min_entry], false, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ++min_entry; + + c = R::Rados::aio_create_completion(); + f->list(&dp, max_entries * 10, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_FALSE(more); + ASSERT_EQ(max_entries - min_entry, result.size()); + + for (auto i = min_entry; i < max_entries; ++i) { + std::uint32_t val; + std::tie(val, markers[i - min_entry]) = + decode_entry<std::uint32_t>(result[i - min_entry]); + EXPECT_EQ(i, val); + } +} + + +TEST_F(AioLegacyFIFO, TestPushTooBig) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size + 1]; + memset(buf, 0, sizeof(buf)); + + cb::list bl; + bl.append(buf, sizeof(buf)); + + auto c = R::Rados::aio_create_completion(); + f->push(&dp, bl, c); + c->wait_for_complete(); + r = c->get_return_value(); + ASSERT_EQ(-E2BIG, r); + c->release(); + + c = R::Rados::aio_create_completion(); + f->push(&dp, std::vector<cb::list>{}, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); +} + + +TEST_F(AioLegacyFIFO, TestMultipleParts) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + { + auto c = R::Rados::aio_create_completion(); + f->get_head_info(&dp, [&](int r, RCf::part_info&& p) { + ASSERT_EQ(0, p.magic); + ASSERT_EQ(0, p.min_ofs); + ASSERT_EQ(0, p.last_ofs); + ASSERT_EQ(0, p.next_ofs); + ASSERT_EQ(0, p.min_index); + ASSERT_EQ(0, p.max_index); + ASSERT_EQ(ceph::real_time{}, p.max_time); + }, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + } + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + const auto [part_header_size, part_entry_overhead] = + f->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + /* push enough entries */ + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + auto c = R::Rados::aio_create_completion(); + f->push(&dp, bl, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + } + + auto info = f->meta(); + ASSERT_EQ(info.id, fifo_id); + /* head should have advanced */ + ASSERT_GT(info.head_part_num, 0); + + /* list all at once */ + std::vector<RCf::list_entry> result; + bool more = false; + auto c = R::Rados::aio_create_completion(); + f->list(&dp, max_entries, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + EXPECT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + std::optional<std::string> marker; + /* get entries one by one */ + + for (auto i = 0u; i < max_entries; ++i) { + c = R::Rados::aio_create_completion(); + f->list(&dp, 1, marker, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + std::uint32_t val; + std::tie(val, marker) = decode_entry<std::uint32_t>(result.front()); + + auto& entry = result.front(); + auto& bl = entry.data; + ASSERT_EQ(i, *(int *)bl.c_str()); + marker = entry.marker; + } + + /* trim one at a time */ + marker.reset(); + for (auto i = 0u; i < max_entries; ++i) { + /* read single entry */ + c = R::Rados::aio_create_completion(); + f->list(&dp, 1, marker, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + marker = result.front().marker; + c = R::Rados::aio_create_completion(); + f->trim(&dp, *marker, false, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + + /* check tail */ + info = f->meta(); + ASSERT_EQ(info.tail_part_num, i / entries_per_part); + + /* try to read all again, see how many entries left */ + c = R::Rados::aio_create_completion(); + f->list(&dp, max_entries, marker, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + ASSERT_EQ(max_entries - i - 1, result.size()); + ASSERT_EQ(false, more); + } + + /* tail now should point at head */ + info = f->meta(); + ASSERT_EQ(info.head_part_num, info.tail_part_num); + + /* check old tails are removed */ + for (auto i = 0; i < info.tail_part_num; ++i) { + c = R::Rados::aio_create_completion(); + RCf::part_info partinfo; + f->get_part_info(i, &partinfo, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(-ENOENT, r); + } + /* check current tail exists */ + std::uint64_t next_ofs; + { + c = R::Rados::aio_create_completion(); + RCf::part_info partinfo; + f->get_part_info(info.tail_part_num, &partinfo, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + next_ofs = partinfo.next_ofs; + } + ASSERT_EQ(0, r); + + c = R::Rados::aio_create_completion(); + f->get_head_info(&dp, [&](int r, RCf::part_info&& p) { + ASSERT_EQ(next_ofs, p.next_ofs); + }, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); +} + +TEST_F(AioLegacyFIFO, TestTwoPushers) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + std::unique_ptr<RCf::FIFO> f2; + r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield); + std::vector fifos{&f, &f2}; + + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + auto& f = *fifos[i % fifos.size()]; + auto c = R::Rados::aio_create_completion(); + f->push(&dp, bl, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + } + + /* list all by both */ + std::vector<RCf::list_entry> result; + bool more = false; + auto c = R::Rados::aio_create_completion(); + f2->list(&dp, max_entries, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + c = R::Rados::aio_create_completion(); + f2->list(&dp, max_entries, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } +} + +TEST_F(AioLegacyFIFO, TestTwoPushersTrim) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + std::unique_ptr<RCf::FIFO> f1; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f1, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + + std::unique_ptr<RCf::FIFO> f2; + r = RCf::FIFO::open(&dp, ioctx, fifo_id, &f2, null_yield); + ASSERT_EQ(0, r); + + /* push one entry to f2 and the rest to f1 */ + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + auto& f = (i < 1 ? f2 : f1); + auto c = R::Rados::aio_create_completion(); + f->push(&dp, bl, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + } + + /* trim half by fifo1 */ + auto num = max_entries / 2; + std::string marker; + std::vector<RCf::list_entry> result; + bool more = false; + auto c = R::Rados::aio_create_completion(); + f1->list(&dp, num, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_EQ(true, more); + ASSERT_EQ(num, result.size()); + + for (auto i = 0u; i < num; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + auto& entry = result[num - 1]; + marker = entry.marker; + c = R::Rados::aio_create_completion(); + f1->trim(&dp, marker, false, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + /* list what's left by fifo2 */ + + const auto left = max_entries - num; + c = R::Rados::aio_create_completion(); + f2->list(&dp, left, marker, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_EQ(left, result.size()); + ASSERT_EQ(false, more); + + for (auto i = num; i < max_entries; ++i) { + auto& bl = result[i - num].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } +} + +TEST_F(AioLegacyFIFO, TestPushBatch) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); + auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */ + std::vector<cb::list> bufs; + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + bufs.push_back(bl); + } + ASSERT_EQ(max_entries, bufs.size()); + + auto c = R::Rados::aio_create_completion(); + f->push(&dp, bufs, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + + /* list all */ + + std::vector<RCf::list_entry> result; + bool more = false; + c = R::Rados::aio_create_completion(); + f->list(&dp, max_entries, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + auto& info = f->meta(); + ASSERT_EQ(info.head_part_num, 4); +} + +TEST_F(LegacyFIFO, TrimAll) +{ + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield); + ASSERT_EQ(0, r); + static constexpr auto max_entries = 10u; + for (uint32_t i = 0; i < max_entries; ++i) { + cb::list bl; + encode(i, bl); + r = f->push(&dp, bl, null_yield); + ASSERT_EQ(0, r); + } + + /* trim one entry */ + r = f->trim(&dp, RCf::marker::max().to_string(), false, null_yield); + ASSERT_EQ(-ENODATA, r); + + std::vector<RCf::list_entry> result; + bool more; + r = f->list(&dp, 1, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_TRUE(result.empty()); +} + +TEST_F(LegacyFIFO, AioTrimAll) +{ + std::unique_ptr<RCf::FIFO> f; + auto r = RCf::FIFO::create(&dp, ioctx, fifo_id, &f, null_yield); + ASSERT_EQ(0, r); + static constexpr auto max_entries = 10u; + for (uint32_t i = 0; i < max_entries; ++i) { + cb::list bl; + encode(i, bl); + r = f->push(&dp, bl, null_yield); + ASSERT_EQ(0, r); + } + + auto c = R::Rados::aio_create_completion(); + f->trim(&dp, RCf::marker::max().to_string(), false, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(-ENODATA, r); + + std::vector<RCf::list_entry> result; + bool more; + r = f->list(&dp, 1, std::nullopt, &result, &more, null_yield); + ASSERT_EQ(0, r); + ASSERT_TRUE(result.empty()); +} |