diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/test/journal | |
parent | Initial commit. (diff) | |
download | ceph-upstream/16.2.11+ds.tar.xz ceph-upstream/16.2.11+ds.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/journal')
-rw-r--r-- | src/test/journal/CMakeLists.txt | 32 | ||||
-rw-r--r-- | src/test/journal/RadosTestFixture.cc | 133 | ||||
-rw-r--r-- | src/test/journal/RadosTestFixture.h | 74 | ||||
-rw-r--r-- | src/test/journal/mock/MockJournaler.cc | 16 | ||||
-rw-r--r-- | src/test/journal/mock/MockJournaler.h | 313 | ||||
-rw-r--r-- | src/test/journal/test_Entry.cc | 96 | ||||
-rw-r--r-- | src/test/journal/test_FutureImpl.cc | 268 | ||||
-rw-r--r-- | src/test/journal/test_JournalMetadata.cc | 210 | ||||
-rw-r--r-- | src/test/journal/test_JournalPlayer.cc | 994 | ||||
-rw-r--r-- | src/test/journal/test_JournalRecorder.cc | 174 | ||||
-rw-r--r-- | src/test/journal/test_JournalTrimmer.cc | 197 | ||||
-rw-r--r-- | src/test/journal/test_Journaler.cc | 198 | ||||
-rw-r--r-- | src/test/journal/test_ObjectPlayer.cc | 281 | ||||
-rw-r--r-- | src/test/journal/test_ObjectRecorder.cc | 463 | ||||
-rw-r--r-- | src/test/journal/test_main.cc | 27 |
15 files changed, 3476 insertions, 0 deletions
diff --git a/src/test/journal/CMakeLists.txt b/src/test/journal/CMakeLists.txt new file mode 100644 index 000000000..99e0f8ae6 --- /dev/null +++ b/src/test/journal/CMakeLists.txt @@ -0,0 +1,32 @@ +add_library(journal_test_mock STATIC mock/MockJournaler.cc) +target_link_libraries(journal_test_mock + PUBLIC GMock::GMock) + +# unittest_journal +set(unittest_journal_srcs + test_main.cc + test_Entry.cc + test_FutureImpl.cc + test_Journaler.cc + test_JournalMetadata.cc + test_JournalPlayer.cc + test_JournalRecorder.cc + test_JournalTrimmer.cc + test_ObjectPlayer.cc + test_ObjectRecorder.cc + RadosTestFixture.cc + ) + +add_executable(unittest_journal + ${unittest_journal_srcs} + ) +add_ceph_unittest(unittest_journal) +target_link_libraries(unittest_journal + journal + cls_journal + cls_journal_client + rados_test_stub + librados + radostest-cxx + global + ) diff --git a/src/test/journal/RadosTestFixture.cc b/src/test/journal/RadosTestFixture.cc new file mode 100644 index 000000000..50e35e871 --- /dev/null +++ b/src/test/journal/RadosTestFixture.cc @@ -0,0 +1,133 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/librados/test_cxx.h" +#include "test/journal/RadosTestFixture.h" +#include "cls/journal/cls_journal_client.h" +#include "include/stringify.h" +#include "common/WorkQueue.h" +#include "journal/Settings.h" + +RadosTestFixture::RadosTestFixture() + : m_timer_lock(ceph::make_mutex("m_timer_lock")), + m_listener(this) { +} + +void RadosTestFixture::SetUpTestCase() { + _pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(_pool_name, _rados)); + + CephContext* cct = reinterpret_cast<CephContext*>(_rados.cct()); + _thread_pool = new ThreadPool(cct, "RadosTestFixture::_thread_pool", + "tp_test", 1); + _thread_pool->start(); +} + +void RadosTestFixture::TearDownTestCase() { + _thread_pool->stop(); + delete _thread_pool; + + ASSERT_EQ(0, destroy_one_pool_pp(_pool_name, _rados)); +} + +std::string RadosTestFixture::get_temp_oid() { + ++_oid_number; + return "oid" + stringify(_oid_number); +} + +void RadosTestFixture::SetUp() { + ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx)); + + CephContext* cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); + m_work_queue = new ContextWQ("RadosTestFixture::m_work_queue", + ceph::make_timespan(60), + _thread_pool); + + m_timer = new SafeTimer(cct, m_timer_lock, true); + m_timer->init(); +} + +void RadosTestFixture::TearDown() { + for (auto metadata : m_metadatas) { + C_SaferCond ctx; + metadata->shut_down(&ctx); + ASSERT_EQ(0, ctx.wait()); + } + + { + std::lock_guard locker{m_timer_lock}; + m_timer->shutdown(); + } + delete m_timer; + + m_work_queue->drain(); + delete m_work_queue; +} + +int RadosTestFixture::create(const std::string &oid, uint8_t order, + uint8_t splay_width) { + return cls::journal::client::create(m_ioctx, oid, order, splay_width, -1); +} + +ceph::ref_t<journal::JournalMetadata> RadosTestFixture::create_metadata( + const std::string &oid, const std::string &client_id, + double commit_interval, int max_concurrent_object_sets) { + journal::Settings settings; + settings.commit_interval = commit_interval; + settings.max_concurrent_object_sets = max_concurrent_object_sets; + + auto metadata = ceph::make_ref<journal::JournalMetadata>( + m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id, settings); + m_metadatas.push_back(metadata); + return metadata; +} + +int RadosTestFixture::append(const std::string &oid, const bufferlist &bl) { + librados::ObjectWriteOperation op; + op.append(bl); + return m_ioctx.operate(oid, &op); +} + +int RadosTestFixture::client_register(const std::string &oid, + const std::string &id, + const std::string &description) { + bufferlist data; + data.append(description); + return cls::journal::client::client_register(m_ioctx, oid, id, data); +} + +int RadosTestFixture::client_commit(const std::string &oid, + const std::string &id, + const cls::journal::ObjectSetPosition &commit_position) { + librados::ObjectWriteOperation op; + cls::journal::client::client_commit(&op, id, commit_position); + return m_ioctx.operate(oid, &op); +} + +bufferlist RadosTestFixture::create_payload(const std::string &payload) { + bufferlist bl; + bl.append(payload); + return bl; +} + +int RadosTestFixture::init_metadata(const ceph::ref_t<journal::JournalMetadata>& metadata) { + C_SaferCond cond; + metadata->init(&cond); + return cond.wait(); +} + +bool RadosTestFixture::wait_for_update(const ceph::ref_t<journal::JournalMetadata>& metadata) { + std::unique_lock locker{m_listener.mutex}; + while (m_listener.updates[metadata.get()] == 0) { + if (m_listener.cond.wait_for(locker, 10s) == std::cv_status::timeout) { + return false; + } + } + --m_listener.updates[metadata.get()]; + return true; +} + +std::string RadosTestFixture::_pool_name; +librados::Rados RadosTestFixture::_rados; +uint64_t RadosTestFixture::_oid_number = 0; +ThreadPool *RadosTestFixture::_thread_pool = nullptr; diff --git a/src/test/journal/RadosTestFixture.h b/src/test/journal/RadosTestFixture.h new file mode 100644 index 000000000..8ec662931 --- /dev/null +++ b/src/test/journal/RadosTestFixture.h @@ -0,0 +1,74 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/librados/test.h" +#include "common/ceph_mutex.h" +#include "common/Timer.h" +#include "journal/JournalMetadata.h" +#include "cls/journal/cls_journal_types.h" +#include "gtest/gtest.h" + +class ThreadPool; + +class RadosTestFixture : public ::testing::Test { +public: + static void SetUpTestCase(); + static void TearDownTestCase(); + + static std::string get_temp_oid(); + + RadosTestFixture(); + void SetUp() override; + void TearDown() override; + + int create(const std::string &oid, uint8_t order = 14, + uint8_t splay_width = 2); + ceph::ref_t<journal::JournalMetadata> create_metadata(const std::string &oid, + const std::string &client_id = "client", + double commit_internal = 0.1, + int max_concurrent_object_sets = 0); + int append(const std::string &oid, const bufferlist &bl); + + int client_register(const std::string &oid, const std::string &id = "client", + const std::string &description = ""); + int client_commit(const std::string &oid, const std::string &id, + const cls::journal::ObjectSetPosition &commit_position); + + bufferlist create_payload(const std::string &payload); + + struct Listener : public journal::JournalMetadataListener { + RadosTestFixture *test_fixture; + ceph::mutex mutex = ceph::make_mutex("mutex"); + ceph::condition_variable cond; + std::map<journal::JournalMetadata*, uint32_t> updates; + + Listener(RadosTestFixture *_test_fixture) + : test_fixture(_test_fixture) {} + + void handle_update(journal::JournalMetadata *metadata) override { + std::lock_guard locker{mutex}; + ++updates[metadata]; + cond.notify_all(); + } + }; + + int init_metadata(const ceph::ref_t<journal::JournalMetadata>& metadata); + + bool wait_for_update(const ceph::ref_t<journal::JournalMetadata>& metadata); + + static std::string _pool_name; + static librados::Rados _rados; + static uint64_t _oid_number; + static ThreadPool *_thread_pool; + + librados::IoCtx m_ioctx; + + ContextWQ *m_work_queue = nullptr; + + ceph::mutex m_timer_lock; + SafeTimer *m_timer = nullptr; + + Listener m_listener; + + std::list<ceph::ref_t<journal::JournalMetadata>> m_metadatas; +}; diff --git a/src/test/journal/mock/MockJournaler.cc b/src/test/journal/mock/MockJournaler.cc new file mode 100644 index 000000000..90649440d --- /dev/null +++ b/src/test/journal/mock/MockJournaler.cc @@ -0,0 +1,16 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "MockJournaler.h" + +namespace journal { + +MockFuture *MockFuture::s_instance = nullptr; +MockReplayEntry *MockReplayEntry::s_instance = nullptr; +MockJournaler *MockJournaler::s_instance = nullptr; + +std::ostream &operator<<(std::ostream &os, const MockJournalerProxy &) { + return os; +} + +} // namespace journal diff --git a/src/test/journal/mock/MockJournaler.h b/src/test/journal/mock/MockJournaler.h new file mode 100644 index 000000000..d4e0f6c2a --- /dev/null +++ b/src/test/journal/mock/MockJournaler.h @@ -0,0 +1,313 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef TEST_RBD_MIRROR_MOCK_JOURNALER_H +#define TEST_RBD_MIRROR_MOCK_JOURNALER_H + +#include <gmock/gmock.h> +#include "include/int_types.h" +#include "include/rados/librados.hpp" +#include "cls/journal/cls_journal_types.h" +#include "journal/Journaler.h" +#include <iosfwd> +#include <string> + +class Context; + +namespace journal { + +struct ReplayHandler; +struct Settings; + +struct MockFuture { + static MockFuture *s_instance; + static MockFuture &get_instance() { + ceph_assert(s_instance != nullptr); + return *s_instance; + } + + MockFuture() { + s_instance = this; + } + + MOCK_CONST_METHOD0(is_valid, bool()); + MOCK_METHOD1(flush, void(Context *)); + MOCK_METHOD1(wait, void(Context *)); +}; + +struct MockFutureProxy { + bool is_valid() const { + return MockFuture::get_instance().is_valid(); + } + + void flush(Context *on_safe) { + MockFuture::get_instance().flush(on_safe); + } + + void wait(Context *on_safe) { + MockFuture::get_instance().wait(on_safe); + } +}; + +struct MockReplayEntry { + static MockReplayEntry *s_instance; + static MockReplayEntry &get_instance() { + ceph_assert(s_instance != nullptr); + return *s_instance; + } + + MockReplayEntry() { + s_instance = this; + } + + MOCK_CONST_METHOD0(get_commit_tid, uint64_t()); + MOCK_CONST_METHOD0(get_data, bufferlist()); +}; + +struct MockReplayEntryProxy { + uint64_t get_commit_tid() const { + return MockReplayEntry::get_instance().get_commit_tid(); + } + + bufferlist get_data() const { + return MockReplayEntry::get_instance().get_data(); + } +}; + +struct MockJournaler { + static MockJournaler *s_instance; + static MockJournaler &get_instance() { + ceph_assert(s_instance != nullptr); + return *s_instance; + } + + MockJournaler() { + s_instance = this; + } + + MOCK_METHOD0(construct, void()); + + MOCK_METHOD1(init, void(Context *)); + MOCK_METHOD0(shut_down, void()); + MOCK_METHOD1(shut_down, void(Context *)); + MOCK_CONST_METHOD0(is_initialized, bool()); + + MOCK_METHOD3(get_metadata, void(uint8_t *order, uint8_t *splay_width, + int64_t *pool_id)); + MOCK_METHOD4(get_mutable_metadata, void(uint64_t*, uint64_t*, + std::set<cls::journal::Client> *, + Context*)); + + MOCK_METHOD2(register_client, void(const bufferlist &, Context *)); + MOCK_METHOD1(unregister_client, void(Context *)); + MOCK_METHOD3(get_client, void(const std::string &, cls::journal::Client *, + Context *)); + MOCK_METHOD2(get_cached_client, int(const std::string&, cls::journal::Client*)); + MOCK_METHOD2(update_client, void(const bufferlist &, Context *)); + + MOCK_METHOD4(allocate_tag, void(uint64_t, const bufferlist &, + cls::journal::Tag*, Context *)); + MOCK_METHOD3(get_tag, void(uint64_t, cls::journal::Tag *, Context *)); + MOCK_METHOD3(get_tags, void(uint64_t, journal::Journaler::Tags*, Context*)); + MOCK_METHOD4(get_tags, void(uint64_t, uint64_t, journal::Journaler::Tags*, + Context*)); + + MOCK_METHOD1(start_replay, void(::journal::ReplayHandler *replay_handler)); + MOCK_METHOD2(start_live_replay, void(ReplayHandler *, double)); + MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *)); + MOCK_METHOD2(try_pop_front, bool(MockReplayEntryProxy *, uint64_t *)); + MOCK_METHOD0(stop_replay, void()); + MOCK_METHOD1(stop_replay, void(Context *on_finish)); + + MOCK_METHOD1(start_append, void(uint64_t)); + MOCK_METHOD3(set_append_batch_options, void(int, uint64_t, double)); + MOCK_CONST_METHOD0(get_max_append_size, uint64_t()); + MOCK_METHOD2(append, MockFutureProxy(uint64_t tag_id, + const bufferlist &bl)); + MOCK_METHOD1(flush, void(Context *on_safe)); + MOCK_METHOD1(stop_append, void(Context *on_safe)); + + MOCK_METHOD1(committed, void(const MockReplayEntryProxy &)); + MOCK_METHOD1(committed, void(const MockFutureProxy &future)); + MOCK_METHOD1(flush_commit_position, void(Context*)); + + MOCK_METHOD1(add_listener, void(JournalMetadataListener *)); + MOCK_METHOD1(remove_listener, void(JournalMetadataListener *)); + +}; + +struct MockJournalerProxy { + MockJournalerProxy() { + MockJournaler::get_instance().construct(); + } + + template <typename IoCtxT> + MockJournalerProxy(IoCtxT &header_ioctx, const std::string &, + const std::string &, const Settings&, + journal::CacheManagerHandler *) { + MockJournaler::get_instance().construct(); + } + + template <typename WorkQueue, typename Timer> + MockJournalerProxy(WorkQueue *work_queue, Timer *timer, ceph::mutex *timer_lock, + librados::IoCtx &header_ioctx, + const std::string &journal_id, + const std::string &client_id, const Settings&, + journal::CacheManagerHandler *) { + MockJournaler::get_instance().construct(); + } + + void exists(Context *on_finish) const { + on_finish->complete(-EINVAL); + } + void create(uint8_t order, uint8_t splay_width, int64_t pool_id, Context *on_finish) { + on_finish->complete(-EINVAL); + } + void remove(bool force, Context *on_finish) { + on_finish->complete(-EINVAL); + } + int register_client(const bufferlist &data) { + return -EINVAL; + } + + void allocate_tag(uint64_t tag_class, const bufferlist &tag_data, + cls::journal::Tag* tag, Context *on_finish) { + MockJournaler::get_instance().allocate_tag(tag_class, tag_data, tag, + on_finish); + } + + void init(Context *on_finish) { + MockJournaler::get_instance().init(on_finish); + } + void shut_down() { + MockJournaler::get_instance().shut_down(); + } + void shut_down(Context *on_finish) { + MockJournaler::get_instance().shut_down(on_finish); + } + bool is_initialized() const { + return MockJournaler::get_instance().is_initialized(); + } + + void get_metadata(uint8_t *order, uint8_t *splay_width, int64_t *pool_id) { + MockJournaler::get_instance().get_metadata(order, splay_width, pool_id); + } + + void get_mutable_metadata(uint64_t *min, uint64_t *active, + std::set<cls::journal::Client> *clients, + Context *on_finish) { + MockJournaler::get_instance().get_mutable_metadata(min, active, clients, + on_finish); + } + + void register_client(const bufferlist &data, Context *on_finish) { + MockJournaler::get_instance().register_client(data, on_finish); + } + + void unregister_client(Context *on_finish) { + MockJournaler::get_instance().unregister_client(on_finish); + } + + void get_client(const std::string &client_id, cls::journal::Client *client, + Context *on_finish) { + MockJournaler::get_instance().get_client(client_id, client, on_finish); + } + + int get_cached_client(const std::string& client_id, + cls::journal::Client* client) { + return MockJournaler::get_instance().get_cached_client(client_id, client); + } + + void update_client(const bufferlist &client_data, Context *on_finish) { + MockJournaler::get_instance().update_client(client_data, on_finish); + } + + void get_tag(uint64_t tag_tid, cls::journal::Tag *tag, Context *on_finish) { + MockJournaler::get_instance().get_tag(tag_tid, tag, on_finish); + } + + void get_tags(uint64_t tag_class, journal::Journaler::Tags *tags, + Context *on_finish) { + MockJournaler::get_instance().get_tags(tag_class, tags, on_finish); + } + void get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, + journal::Journaler::Tags *tags, Context *on_finish) { + MockJournaler::get_instance().get_tags(start_after_tag_tid, tag_class, tags, + on_finish); + } + + void start_replay(::journal::ReplayHandler *replay_handler) { + MockJournaler::get_instance().start_replay(replay_handler); + } + + void start_live_replay(ReplayHandler *handler, double interval) { + MockJournaler::get_instance().start_live_replay(handler, interval); + } + + bool try_pop_front(MockReplayEntryProxy *replay_entry) { + return MockJournaler::get_instance().try_pop_front(replay_entry); + } + + bool try_pop_front(MockReplayEntryProxy *entry, uint64_t *tag_tid) { + return MockJournaler::get_instance().try_pop_front(entry, tag_tid); + } + + void stop_replay() { + MockJournaler::get_instance().stop_replay(); + } + void stop_replay(Context *on_finish) { + MockJournaler::get_instance().stop_replay(on_finish); + } + + void start_append(uint64_t max_in_flight_appends) { + MockJournaler::get_instance().start_append(max_in_flight_appends); + } + + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age) { + MockJournaler::get_instance().set_append_batch_options( + flush_interval, flush_bytes, flush_age); + } + + uint64_t get_max_append_size() const { + return MockJournaler::get_instance().get_max_append_size(); + } + + MockFutureProxy append(uint64_t tag_id, const bufferlist &bl) { + return MockJournaler::get_instance().append(tag_id, bl); + } + + void flush(Context *on_safe) { + MockJournaler::get_instance().flush(on_safe); + } + + void stop_append(Context *on_safe) { + MockJournaler::get_instance().stop_append(on_safe); + } + + void committed(const MockReplayEntryProxy &entry) { + MockJournaler::get_instance().committed(entry); + } + + void committed(const MockFutureProxy &future) { + MockJournaler::get_instance().committed(future); + } + + void flush_commit_position(Context *on_finish) { + MockJournaler::get_instance().flush_commit_position(on_finish); + } + + void add_listener(JournalMetadataListener *listener) { + MockJournaler::get_instance().add_listener(listener); + } + + void remove_listener(JournalMetadataListener *listener) { + MockJournaler::get_instance().remove_listener(listener); + } +}; + +std::ostream &operator<<(std::ostream &os, const MockJournalerProxy &); + +} // namespace journal + +#endif // TEST_RBD_MIRROR_MOCK_JOURNALER_H diff --git a/src/test/journal/test_Entry.cc b/src/test/journal/test_Entry.cc new file mode 100644 index 000000000..1fa3136f7 --- /dev/null +++ b/src/test/journal/test_Entry.cc @@ -0,0 +1,96 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/Entry.h" +#include "gtest/gtest.h" + +class TestEntry : public ::testing::Test { +}; + +TEST_F(TestEntry, DefaultConstructor) { + journal::Entry entry; + ASSERT_EQ(0U, entry.get_entry_tid()); + ASSERT_EQ(0U, entry.get_tag_tid()); + + bufferlist data(entry.get_data()); + bufferlist expected_data; + ASSERT_TRUE(data.contents_equal(expected_data)); +} + +TEST_F(TestEntry, Constructor) { + bufferlist data; + data.append("data"); + journal::Entry entry(234, 123, data); + + data.clear(); + data = entry.get_data(); + + bufferlist expected_data; + expected_data.append("data"); + + ASSERT_EQ(123U, entry.get_entry_tid()); + ASSERT_EQ(234U, entry.get_tag_tid()); + ASSERT_TRUE(data.contents_equal(expected_data)); +} + +TEST_F(TestEntry, IsReadable) { + bufferlist data; + data.append("data"); + journal::Entry entry(234, 123, data); + + bufferlist full_bl; + encode(entry, full_bl); + + uint32_t bytes_needed; + for (size_t i = 0; i < full_bl.length() - 1; ++i) { + bufferlist partial_bl; + if (i > 0) { + partial_bl.substr_of(full_bl, 0, i); + } + ASSERT_FALSE(journal::Entry::is_readable(partial_bl.begin(), + &bytes_needed)); + ASSERT_GT(bytes_needed, 0U); + } + ASSERT_TRUE(journal::Entry::is_readable(full_bl.begin(), &bytes_needed)); + ASSERT_EQ(0U, bytes_needed); +} + +TEST_F(TestEntry, IsReadableBadPreamble) { + bufferlist data; + data.append("data"); + journal::Entry entry(234, 123, data); + + uint64_t stray_bytes = 0x1122334455667788; + bufferlist full_bl; + encode(stray_bytes, full_bl); + encode(entry, full_bl); + + uint32_t bytes_needed; + bufferlist::iterator it = full_bl.begin(); + ASSERT_FALSE(journal::Entry::is_readable(it, &bytes_needed)); + ASSERT_EQ(0U, bytes_needed); + + it += sizeof(stray_bytes); + ASSERT_TRUE(journal::Entry::is_readable(it, &bytes_needed)); + ASSERT_EQ(0U, bytes_needed); +} + +TEST_F(TestEntry, IsReadableBadCRC) { + bufferlist data; + data.append("data"); + journal::Entry entry(234, 123, data); + + bufferlist full_bl; + encode(entry, full_bl); + + bufferlist bad_bl; + bad_bl.substr_of(full_bl, 0, full_bl.length() - 4); + encode(full_bl.crc32c(1), bad_bl); + + uint32_t bytes_needed; + ASSERT_FALSE(journal::Entry::is_readable(bad_bl.begin(), &bytes_needed)); + ASSERT_EQ(0U, bytes_needed); + + + +} diff --git a/src/test/journal/test_FutureImpl.cc b/src/test/journal/test_FutureImpl.cc new file mode 100644 index 000000000..1ff346dcf --- /dev/null +++ b/src/test/journal/test_FutureImpl.cc @@ -0,0 +1,268 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/FutureImpl.h" +#include "common/Cond.h" +#include "gtest/gtest.h" +#include "test/journal/RadosTestFixture.h" + +class TestFutureImpl : public RadosTestFixture { +public: + struct FlushHandler : public journal::FutureImpl::FlushHandler { + uint64_t flushes = 0; + void flush(const ceph::ref_t<journal::FutureImpl>& future) override { + ++flushes; + } + FlushHandler() = default; + }; + + TestFutureImpl() { + m_flush_handler = std::make_shared<FlushHandler>(); + } + + auto create_future(uint64_t tag_tid, uint64_t entry_tid, + uint64_t commit_tid, + ceph::ref_t<journal::FutureImpl> prev = nullptr) { + auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, commit_tid); + future->init(prev); + return future; + } + + void flush(const ceph::ref_t<journal::FutureImpl>& future) { + } + + std::shared_ptr<FlushHandler> m_flush_handler; +}; + +TEST_F(TestFutureImpl, Getters) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future = create_future(234, 123, 456); + ASSERT_EQ(234U, future->get_tag_tid()); + ASSERT_EQ(123U, future->get_entry_tid()); + ASSERT_EQ(456U, future->get_commit_tid()); +} + +TEST_F(TestFutureImpl, Attach) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future = create_future(234, 123, 456); + ASSERT_FALSE(future->attach(m_flush_handler)); + ASSERT_EQ(2U, m_flush_handler.use_count()); +} + +TEST_F(TestFutureImpl, AttachWithPendingFlush) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future = create_future(234, 123, 456); + future->flush(NULL); + + ASSERT_TRUE(future->attach(m_flush_handler)); + ASSERT_EQ(2U, m_flush_handler.use_count()); +} + +TEST_F(TestFutureImpl, Detach) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future = create_future(234, 123, 456); + ASSERT_FALSE(future->attach(m_flush_handler)); + future->detach(); + ASSERT_EQ(1U, m_flush_handler.use_count()); +} + +TEST_F(TestFutureImpl, DetachImplicit) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future = create_future(234, 123, 456); + ASSERT_FALSE(future->attach(m_flush_handler)); + future.reset(); + ASSERT_EQ(1U, m_flush_handler.use_count()); +} + +TEST_F(TestFutureImpl, Flush) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future = create_future(234, 123, 456); + ASSERT_FALSE(future->attach(m_flush_handler)); + + C_SaferCond cond; + future->flush(&cond); + + ASSERT_EQ(1U, m_flush_handler->flushes); + future->safe(-EIO); + ASSERT_EQ(-EIO, cond.wait()); +} + +TEST_F(TestFutureImpl, FlushWithoutContext) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future = create_future(234, 123, 456); + ASSERT_FALSE(future->attach(m_flush_handler)); + + future->flush(NULL); + ASSERT_EQ(1U, m_flush_handler->flushes); + future->safe(-EIO); + ASSERT_TRUE(future->is_complete()); + ASSERT_EQ(-EIO, future->get_return_value()); +} + +TEST_F(TestFutureImpl, FlushChain) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future1 = create_future(234, 123, 456); + auto future2 = create_future(234, 124, 457, future1); + auto future3 = create_future(235, 1, 458, future2); + + auto flush_handler = std::make_shared<FlushHandler>(); + ASSERT_FALSE(future1->attach(m_flush_handler)); + ASSERT_FALSE(future2->attach(flush_handler)); + ASSERT_FALSE(future3->attach(m_flush_handler)); + + C_SaferCond cond; + future3->flush(&cond); + + ASSERT_EQ(1U, m_flush_handler->flushes); + ASSERT_EQ(1U, flush_handler->flushes); + + future3->safe(0); + ASSERT_FALSE(future3->is_complete()); + + future1->safe(0); + ASSERT_FALSE(future3->is_complete()); + + future2->safe(-EIO); + ASSERT_TRUE(future3->is_complete()); + ASSERT_EQ(-EIO, future3->get_return_value()); + ASSERT_EQ(-EIO, cond.wait()); + ASSERT_EQ(0, future1->get_return_value()); +} + +TEST_F(TestFutureImpl, FlushInProgress) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future1 = create_future(234, 123, 456); + auto future2 = create_future(234, 124, 457, future1); + ASSERT_FALSE(future1->attach(m_flush_handler)); + ASSERT_FALSE(future2->attach(m_flush_handler)); + + future1->set_flush_in_progress(); + ASSERT_TRUE(future1->is_flush_in_progress()); + + future1->flush(NULL); + ASSERT_EQ(0U, m_flush_handler->flushes); + + future1->safe(0); +} + +TEST_F(TestFutureImpl, FlushAlreadyComplete) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future = create_future(234, 123, 456); + future->safe(-EIO); + + C_SaferCond cond; + future->flush(&cond); + ASSERT_EQ(-EIO, cond.wait()); +} + +TEST_F(TestFutureImpl, Wait) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future = create_future(234, 1, 456); + + C_SaferCond cond; + future->wait(&cond); + future->safe(-EEXIST); + ASSERT_EQ(-EEXIST, cond.wait()); +} + +TEST_F(TestFutureImpl, WaitAlreadyComplete) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future = create_future(234, 1, 456); + future->safe(-EEXIST); + + C_SaferCond cond; + future->wait(&cond); + ASSERT_EQ(-EEXIST, cond.wait()); +} + +TEST_F(TestFutureImpl, SafePreservesError) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future1 = create_future(234, 123, 456); + auto future2 = create_future(234, 124, 457, future1); + + future1->safe(-EIO); + future2->safe(-EEXIST); + ASSERT_TRUE(future2->is_complete()); + ASSERT_EQ(-EIO, future2->get_return_value()); +} + +TEST_F(TestFutureImpl, ConsistentPreservesError) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + auto future1 = create_future(234, 123, 456); + auto future2 = create_future(234, 124, 457, future1); + + future2->safe(-EEXIST); + future1->safe(-EIO); + ASSERT_TRUE(future2->is_complete()); + ASSERT_EQ(-EEXIST, future2->get_return_value()); +} diff --git a/src/test/journal/test_JournalMetadata.cc b/src/test/journal/test_JournalMetadata.cc new file mode 100644 index 000000000..4108d4da3 --- /dev/null +++ b/src/test/journal/test_JournalMetadata.cc @@ -0,0 +1,210 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalMetadata.h" +#include "test/journal/RadosTestFixture.h" +#include "common/Cond.h" +#include <map> + +class TestJournalMetadata : public RadosTestFixture { +public: + void TearDown() override { + for (MetadataList::iterator it = m_metadata_list.begin(); + it != m_metadata_list.end(); ++it) { + (*it)->remove_listener(&m_listener); + } + m_metadata_list.clear(); + + RadosTestFixture::TearDown(); + } + + auto create_metadata(const std::string &oid, + const std::string &client_id, + double commit_interval = 0.1, + int max_concurrent_object_sets = 0) { + auto metadata = RadosTestFixture::create_metadata( + oid, client_id, commit_interval, max_concurrent_object_sets); + m_metadata_list.push_back(metadata); + metadata->add_listener(&m_listener); + return metadata; + } + + typedef std::list<ceph::ref_t<journal::JournalMetadata>> MetadataList; + MetadataList m_metadata_list; +}; + +TEST_F(TestJournalMetadata, JournalDNE) { + std::string oid = get_temp_oid(); + + auto metadata1 = create_metadata(oid, "client1"); + ASSERT_EQ(-ENOENT, init_metadata(metadata1)); +} + +TEST_F(TestJournalMetadata, ClientDNE) { + std::string oid = get_temp_oid(); + + ASSERT_EQ(0, create(oid, 14, 2)); + ASSERT_EQ(0, client_register(oid, "client1", "")); + + auto metadata1 = create_metadata(oid, "client1"); + ASSERT_EQ(0, init_metadata(metadata1)); + + auto metadata2 = create_metadata(oid, "client2"); + ASSERT_EQ(-ENOENT, init_metadata(metadata2)); +} + +TEST_F(TestJournalMetadata, Committed) { + std::string oid = get_temp_oid(); + + ASSERT_EQ(0, create(oid, 14, 2)); + ASSERT_EQ(0, client_register(oid, "client1", "")); + + auto metadata1 = create_metadata(oid, "client1", 600); + ASSERT_EQ(0, init_metadata(metadata1)); + + auto metadata2 = create_metadata(oid, "client1"); + ASSERT_EQ(0, init_metadata(metadata2)); + ASSERT_TRUE(wait_for_update(metadata2)); + + journal::JournalMetadata::ObjectSetPosition expect_commit_position; + journal::JournalMetadata::ObjectSetPosition read_commit_position; + metadata1->get_commit_position(&read_commit_position); + ASSERT_EQ(expect_commit_position, read_commit_position); + + uint64_t commit_tid1 = metadata1->allocate_commit_tid(0, 0, 0); + uint64_t commit_tid2 = metadata1->allocate_commit_tid(0, 1, 0); + uint64_t commit_tid3 = metadata1->allocate_commit_tid(1, 0, 1); + uint64_t commit_tid4 = metadata1->allocate_commit_tid(0, 0, 2); + + // cannot commit until tid1 + 2 committed + metadata1->committed(commit_tid2, []() { return nullptr; }); + metadata1->committed(commit_tid3, []() { return nullptr; }); + + C_SaferCond cond1; + metadata1->committed(commit_tid1, [&cond1]() { return &cond1; }); + + // given our 10 minute commit internal, this should override the + // in-flight commit + C_SaferCond cond2; + metadata1->committed(commit_tid4, [&cond2]() { return &cond2; }); + + ASSERT_EQ(-ESTALE, cond1.wait()); + metadata1->flush_commit_position(); + ASSERT_EQ(0, cond2.wait()); + + ASSERT_TRUE(wait_for_update(metadata2)); + metadata2->get_commit_position(&read_commit_position); + expect_commit_position = {{{0, 0, 2}, {1, 0, 1}}}; + ASSERT_EQ(expect_commit_position, read_commit_position); +} + +TEST_F(TestJournalMetadata, UpdateActiveObject) { + std::string oid = get_temp_oid(); + + ASSERT_EQ(0, create(oid, 14, 2)); + ASSERT_EQ(0, client_register(oid, "client1", "")); + + auto metadata1 = create_metadata(oid, "client1"); + ASSERT_EQ(0, init_metadata(metadata1)); + ASSERT_TRUE(wait_for_update(metadata1)); + + ASSERT_EQ(0U, metadata1->get_active_set()); + + ASSERT_EQ(0, metadata1->set_active_set(123)); + ASSERT_TRUE(wait_for_update(metadata1)); + + ASSERT_EQ(123U, metadata1->get_active_set()); +} + +TEST_F(TestJournalMetadata, DisconnectLaggyClient) { + std::string oid = get_temp_oid(); + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid, "client1", "")); + ASSERT_EQ(0, client_register(oid, "client2", "laggy")); + + int max_concurrent_object_sets = 100; + auto metadata = + create_metadata(oid, "client1", 0.1, max_concurrent_object_sets); + ASSERT_EQ(0, init_metadata(metadata)); + ASSERT_TRUE(wait_for_update(metadata)); + + ASSERT_EQ(0U, metadata->get_active_set()); + + journal::JournalMetadata::RegisteredClients clients; + +#define ASSERT_CLIENT_STATES(s1, s2) \ + ASSERT_EQ(2U, clients.size()); \ + for (auto &c : clients) { \ + if (c.id == "client1") { \ + ASSERT_EQ(c.state, s1); \ + } else if (c.id == "client2") { \ + ASSERT_EQ(c.state, s2); \ + } else { \ + ASSERT_TRUE(false); \ + } \ + } + + metadata->get_registered_clients(&clients); + ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED, + cls::journal::CLIENT_STATE_CONNECTED); + + // client2 is connected when active set <= max_concurrent_object_sets + ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets)); + ASSERT_TRUE(wait_for_update(metadata)); + uint64_t commit_tid = metadata->allocate_commit_tid(0, 0, 0); + C_SaferCond cond1; + metadata->committed(commit_tid, [&cond1]() { return &cond1; }); + ASSERT_EQ(0, cond1.wait()); + metadata->flush_commit_position(); + ASSERT_TRUE(wait_for_update(metadata)); + ASSERT_EQ(100U, metadata->get_active_set()); + clients.clear(); + metadata->get_registered_clients(&clients); + ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED, + cls::journal::CLIENT_STATE_CONNECTED); + + // client2 is disconnected when active set > max_concurrent_object_sets + ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets + 1)); + ASSERT_TRUE(wait_for_update(metadata)); + commit_tid = metadata->allocate_commit_tid(0, 0, 1); + C_SaferCond cond2; + metadata->committed(commit_tid, [&cond2]() { return &cond2; }); + ASSERT_EQ(0, cond2.wait()); + metadata->flush_commit_position(); + ASSERT_TRUE(wait_for_update(metadata)); + ASSERT_EQ(101U, metadata->get_active_set()); + clients.clear(); + metadata->get_registered_clients(&clients); + ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED, + cls::journal::CLIENT_STATE_DISCONNECTED); +} + +TEST_F(TestJournalMetadata, AssertActiveTag) { + std::string oid = get_temp_oid(); + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid, "client1", "")); + + auto metadata = create_metadata(oid, "client1"); + ASSERT_EQ(0, init_metadata(metadata)); + ASSERT_TRUE(wait_for_update(metadata)); + + C_SaferCond ctx1; + cls::journal::Tag tag1; + metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1); + ASSERT_EQ(0, ctx1.wait()); + + C_SaferCond ctx2; + metadata->assert_active_tag(tag1.tid, &ctx2); + ASSERT_EQ(0, ctx2.wait()); + + C_SaferCond ctx3; + cls::journal::Tag tag2; + metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx3); + ASSERT_EQ(0, ctx3.wait()); + + C_SaferCond ctx4; + metadata->assert_active_tag(tag1.tid, &ctx4); + ASSERT_EQ(-ESTALE, ctx4.wait()); +} diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc new file mode 100644 index 000000000..1601705da --- /dev/null +++ b/src/test/journal/test_JournalPlayer.cc @@ -0,0 +1,994 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalPlayer.h" +#include "journal/Entry.h" +#include "journal/JournalMetadata.h" +#include "journal/ReplayHandler.h" +#include "include/stringify.h" +#include "common/ceph_mutex.h" +#include "gtest/gtest.h" +#include "test/journal/RadosTestFixture.h" +#include <list> +#include <boost/scope_exit.hpp> + +typedef std::list<journal::Entry> Entries; + +template <typename T> +class TestJournalPlayer : public RadosTestFixture { +public: + typedef std::list<journal::JournalPlayer *> JournalPlayers; + + static const uint64_t max_fetch_bytes = T::max_fetch_bytes; + + struct ReplayHandler : public journal::ReplayHandler { + ceph::mutex lock = ceph::make_mutex("lock"); + ceph::condition_variable cond; + bool entries_available; + bool complete; + int complete_result; + + ReplayHandler() + : entries_available(false), complete(false), + complete_result(0) {} + + void handle_entries_available() override { + std::lock_guard locker{lock}; + entries_available = true; + cond.notify_all(); + } + + void handle_complete(int r) override { + std::lock_guard locker{lock}; + complete = true; + complete_result = r; + cond.notify_all(); + } + }; + + void TearDown() override { + for (JournalPlayers::iterator it = m_players.begin(); + it != m_players.end(); ++it) { + delete *it; + } + RadosTestFixture::TearDown(); + } + + auto create_metadata(const std::string &oid) { + return RadosTestFixture::create_metadata(oid, "client", 0.1, + max_fetch_bytes); + } + + int client_commit(const std::string &oid, + journal::JournalPlayer::ObjectSetPosition position) { + return RadosTestFixture::client_commit(oid, "client", position); + } + + journal::Entry create_entry(uint64_t tag_tid, uint64_t entry_tid) { + std::string payload(128, '0'); + bufferlist payload_bl; + payload_bl.append(payload); + return journal::Entry(tag_tid, entry_tid, payload_bl); + } + + journal::JournalPlayer *create_player(const std::string &oid, + const ceph::ref_t<journal::JournalMetadata>& metadata) { + journal::JournalPlayer *player(new journal::JournalPlayer( + m_ioctx, oid + ".", metadata, &m_replay_hander, nullptr)); + m_players.push_back(player); + return player; + } + + bool wait_for_entries(journal::JournalPlayer *player, uint32_t count, + Entries *entries) { + entries->clear(); + while (entries->size() < count) { + journal::Entry entry; + uint64_t commit_tid; + while (entries->size() < count && + player->try_pop_front(&entry, &commit_tid)) { + entries->push_back(entry); + } + if (entries->size() == count) { + break; + } + + std::unique_lock locker{m_replay_hander.lock}; + if (m_replay_hander.entries_available) { + m_replay_hander.entries_available = false; + } else if (m_replay_hander.cond.wait_for(locker, 10s) == + std::cv_status::timeout) { + break; + } + } + return entries->size() == count; + } + + bool wait_for_complete(journal::JournalPlayer *player) { + std::unique_lock locker{m_replay_hander.lock}; + while (!m_replay_hander.complete) { + journal::Entry entry; + uint64_t commit_tid; + player->try_pop_front(&entry, &commit_tid); + + if (m_replay_hander.cond.wait_for(locker, 10s) == + std::cv_status::timeout) { + return false; + } + } + m_replay_hander.complete = false; + return true; + } + + int write_entry(const std::string &oid, uint64_t object_num, + uint64_t tag_tid, uint64_t entry_tid) { + bufferlist bl; + encode(create_entry(tag_tid, entry_tid), bl); + return append(oid + "." + stringify(object_num), bl); + } + + JournalPlayers m_players; + ReplayHandler m_replay_hander; +}; + +template <uint64_t _max_fetch_bytes> +class TestJournalPlayerParams { +public: + static const uint64_t max_fetch_bytes = _max_fetch_bytes; +}; + +typedef ::testing::Types<TestJournalPlayerParams<0>, + TestJournalPlayerParams<16> > TestJournalPlayerTypes; +TYPED_TEST_SUITE(TestJournalPlayer, TestJournalPlayerTypes); + +TYPED_TEST(TestJournalPlayer, Prefetch) { + std::string oid = this->get_temp_oid(); + + journal::JournalPlayer::ObjectPositions positions; + positions = { + cls::journal::ObjectPosition(0, 234, 122) }; + cls::journal::ObjectSetPosition commit_position(positions); + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); + ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); + ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125)); + + player->prefetch(); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); + ASSERT_TRUE(this->wait_for_complete(player)); + + Entries expected_entries; + expected_entries = { + this->create_entry(234, 123), + this->create_entry(234, 124), + this->create_entry(234, 125)}; + ASSERT_EQ(expected_entries, entries); + + uint64_t last_tid; + ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); + ASSERT_EQ(125U, last_tid); +} + +TYPED_TEST(TestJournalPlayer, PrefetchSkip) { + std::string oid = this->get_temp_oid(); + + journal::JournalPlayer::ObjectPositions positions; + positions = { + cls::journal::ObjectPosition(0, 234, 125), + cls::journal::ObjectPosition(1, 234, 124) }; + cls::journal::ObjectSetPosition commit_position(positions); + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); + ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); + ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125)); + + player->prefetch(); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 0, &entries)); + ASSERT_TRUE(this->wait_for_complete(player)); + + uint64_t last_tid; + ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); + ASSERT_EQ(125U, last_tid); +} + +TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) { + std::string oid = this->get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); + ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); + + player->prefetch(); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 2, &entries)); + ASSERT_TRUE(this->wait_for_complete(player)); + + Entries expected_entries; + expected_entries = { + this->create_entry(234, 122), + this->create_entry(234, 123)}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) { + std::string oid = this->get_temp_oid(); + + journal::JournalPlayer::ObjectPositions positions; + positions = { + cls::journal::ObjectPosition(2, 234, 122), + cls::journal::ObjectPosition(1, 234, 121), + cls::journal::ObjectPosition(0, 234, 120)}; + cls::journal::ObjectSetPosition commit_position(positions); + + ASSERT_EQ(0, this->create(oid, 14, 3)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120)); + ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121)); + ASSERT_EQ(0, this->write_entry(oid, 2, 234, 122)); + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 123)); + ASSERT_EQ(0, this->write_entry(oid, 1, 234, 124)); + ASSERT_EQ(0, this->write_entry(oid, 0, 236, 0)); // new tag allocated + + player->prefetch(); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); + ASSERT_TRUE(this->wait_for_complete(player)); + + uint64_t last_tid; + ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); + ASSERT_EQ(124U, last_tid); + ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid)); + ASSERT_EQ(0U, last_tid); +} + +TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) { + std::string oid = this->get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120)); + ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121)); + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 2, &entries)); + + journal::Entry entry; + uint64_t commit_tid; + ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); + ASSERT_TRUE(this->wait_for_complete(player)); + ASSERT_EQ(-ENOMSG, this->m_replay_hander.complete_result); +} + +TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) { + std::string oid = this->get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, this->create(oid, 14, 4)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, metadata->set_active_set(1)); + ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852)); + ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856)); + ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860)); + ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853)); + ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857)); + ASSERT_EQ(0, this->write_entry(oid, 5, 2, 861)); + ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854)); + ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0)); + ASSERT_EQ(0, this->write_entry(oid, 5, 3, 1)); + ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2)); + ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 7, &entries)); + + Entries expected_entries = { + this->create_entry(2, 852), + this->create_entry(2, 853), + this->create_entry(2, 854), + this->create_entry(3, 0), + this->create_entry(3, 1), + this->create_entry(3, 2), + this->create_entry(3, 3)}; + ASSERT_EQ(expected_entries, entries); + + ASSERT_TRUE(this->wait_for_complete(player)); + ASSERT_EQ(0, this->m_replay_hander.complete_result); +} + +TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) { + std::string oid = this->get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, metadata->set_active_set(2)); + ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); + ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3)); + ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); + + Entries expected_entries = { + this->create_entry(0, 0), + this->create_entry(0, 1), + this->create_entry(1, 0)}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) { + std::string oid = this->get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); + ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2)); + ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4)); + ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 4, &entries)); + + Entries expected_entries = { + this->create_entry(0, 0), + this->create_entry(0, 1), + this->create_entry(0, 2), + this->create_entry(1, 0)}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) { + std::string oid = this->get_temp_oid(); + + journal::JournalPlayer::ObjectPositions positions = { + cls::journal::ObjectPosition(0, 1, 0) }; + cls::journal::ObjectSetPosition commit_position(positions); + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3)); + ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0)); + ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); + + Entries expected_entries = { + this->create_entry(1, 1)}; + ASSERT_EQ(expected_entries, entries); + + ASSERT_TRUE(this->wait_for_complete(player)); + ASSERT_EQ(0, this->m_replay_hander.complete_result); +} + +TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) { + std::string oid = this->get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120)); + ASSERT_EQ(0, this->write_entry(oid, 1, 235, 121)); + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); + + journal::Entry entry; + uint64_t commit_tid; + ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); + ASSERT_TRUE(this->wait_for_complete(player)); + ASSERT_EQ(0, this->m_replay_hander.complete_result); +} + +TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) { + std::string oid = this->get_temp_oid(); + + journal::JournalPlayer::ObjectPositions positions; + positions = { + cls::journal::ObjectPosition(0, 234, 122)}; + cls::journal::ObjectSetPosition commit_position(positions); + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); + + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); + ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); + + Entries expected_entries; + expected_entries = {this->create_entry(234, 123)}; + ASSERT_EQ(expected_entries, entries); + + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124)); + ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); + + expected_entries = {this->create_entry(234, 124)}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) { + std::string oid = this->get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, this->create(oid, 14, 3)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + ASSERT_EQ(0, metadata->set_active_set(2)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122)); + ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123)); + ASSERT_EQ(0, this->write_entry(oid, 5, 234, 124)); + ASSERT_EQ(0, this->write_entry(oid, 6, 234, 125)); + ASSERT_EQ(0, this->write_entry(oid, 7, 234, 126)); + + player->prefetch(); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 5, &entries)); + ASSERT_TRUE(this->wait_for_complete(player)); + + Entries expected_entries; + expected_entries = { + this->create_entry(234, 122), + this->create_entry(234, 123), + this->create_entry(234, 124), + this->create_entry(234, 125), + this->create_entry(234, 126)}; + ASSERT_EQ(expected_entries, entries); + + uint64_t last_tid; + ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); + ASSERT_EQ(126U, last_tid); +} + +TYPED_TEST(TestJournalPlayer, ImbalancedJournal) { + std::string oid = this->get_temp_oid(); + + journal::JournalPlayer::ObjectPositions positions = { + cls::journal::ObjectPosition(9, 300, 1), + cls::journal::ObjectPosition(8, 300, 0), + cls::journal::ObjectPosition(10, 200, 4334), + cls::journal::ObjectPosition(11, 200, 4331) }; + cls::journal::ObjectSetPosition commit_position(positions); + + ASSERT_EQ(0, this->create(oid, 14, 4)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + ASSERT_EQ(0, metadata->set_active_set(2)); + metadata->set_minimum_set(2); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 8, 300, 0)); + ASSERT_EQ(0, this->write_entry(oid, 8, 301, 0)); + ASSERT_EQ(0, this->write_entry(oid, 9, 300, 1)); + ASSERT_EQ(0, this->write_entry(oid, 9, 301, 1)); + ASSERT_EQ(0, this->write_entry(oid, 10, 200, 4334)); + ASSERT_EQ(0, this->write_entry(oid, 10, 301, 2)); + ASSERT_EQ(0, this->write_entry(oid, 11, 200, 4331)); + ASSERT_EQ(0, this->write_entry(oid, 11, 301, 3)); + + player->prefetch(); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 4, &entries)); + ASSERT_TRUE(this->wait_for_complete(player)); + + Entries expected_entries; + expected_entries = { + this->create_entry(301, 0), + this->create_entry(301, 1), + this->create_entry(301, 2), + this->create_entry(301, 3)}; + ASSERT_EQ(expected_entries, entries); + + uint64_t last_tid; + ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid)); + ASSERT_EQ(3U, last_tid); +} + +TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) { + std::string oid = this->get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); + ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2)); + ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4)); + ASSERT_EQ(0, this->write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1 + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); + + Entries expected_entries = { + this->create_entry(0, 0), + this->create_entry(0, 1), + this->create_entry(0, 2)}; + ASSERT_EQ(expected_entries, entries); + + journal::Entry entry; + uint64_t commit_tid; + ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); + + ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3)); + ASSERT_EQ(0, metadata->set_active_set(1)); + ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); + + expected_entries = { + this->create_entry(0, 3), + this->create_entry(0, 4), + this->create_entry(0, 5)}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) { + std::string oid = this->get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, this->create(oid, 14, 4)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852)); + ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856)); + ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860)); + ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853)); + ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857)); + ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854)); + ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856)); + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); + + Entries expected_entries = { + this->create_entry(2, 852), + this->create_entry(2, 853), + this->create_entry(2, 854)}; + ASSERT_EQ(expected_entries, entries); + + journal::Entry entry; + uint64_t commit_tid; + ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); + + ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3)); + ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2)); + ASSERT_EQ(0, this->write_entry(oid, 1, 3, 1)); + ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0)); + ASSERT_TRUE(this->wait_for_entries(player, 4, &entries)); + + expected_entries = { + this->create_entry(3, 0), + this->create_entry(3, 1), + this->create_entry(3, 2), + this->create_entry(3, 3)}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) { + std::string oid = this->get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, metadata->set_active_set(2)); + ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); + ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3)); + ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0)); + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); + + Entries expected_entries = { + this->create_entry(0, 0), + this->create_entry(0, 1), + this->create_entry(1, 0)}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) { + std::string oid = this->get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + C_SaferCond ctx1; + cls::journal::Tag tag1; + metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1); + ASSERT_EQ(0, ctx1.wait()); + + ASSERT_EQ(0, metadata->set_active_set(0)); + ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 0)); + ASSERT_EQ(0, this->write_entry(oid, 1, tag1.tid, 1)); + ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 2)); + ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 4)); + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 3, &entries)); + + Entries expected_entries = { + this->create_entry(tag1.tid, 0), + this->create_entry(tag1.tid, 1), + this->create_entry(tag1.tid, 2)}; + ASSERT_EQ(expected_entries, entries); + + journal::Entry entry; + uint64_t commit_tid; + ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); + + C_SaferCond ctx2; + cls::journal::Tag tag2; + metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2); + ASSERT_EQ(0, ctx2.wait()); + + ASSERT_EQ(0, this->write_entry(oid, 0, tag2.tid, 0)); + ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); + + expected_entries = { + this->create_entry(tag2.tid, 0)}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) { + std::string oid = this->get_temp_oid(); + + journal::JournalPlayer::ObjectPositions positions = { + cls::journal::ObjectPosition(0, 1, 0) }; + cls::journal::ObjectSetPosition commit_position(positions); + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3)); + ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0)); + ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1)); + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); + + Entries expected_entries = { + this->create_entry(1, 1)}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) { + std::string oid = this->get_temp_oid(); + + journal::JournalPlayer::ObjectPositions positions = { + cls::journal::ObjectPosition(1, 0, 1), + cls::journal::ObjectPosition(0, 0, 0)}; + cls::journal::ObjectSetPosition commit_position(positions); + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, commit_position)); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + + ASSERT_EQ(0, metadata->set_active_set(1)); + ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0)); + ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3)); + ASSERT_EQ(0, this->write_entry(oid, 2, 1, 0)); + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); + + Entries expected_entries = { + this->create_entry(1, 0)}; + ASSERT_EQ(expected_entries, entries); + + // should remove player for offset 3 after refetching + ASSERT_EQ(0, metadata->set_active_set(3)); + ASSERT_EQ(0, this->write_entry(oid, 7, 1, 1)); + + ASSERT_TRUE(this->wait_for_entries(player, 1, &entries)); + + expected_entries = { + this->create_entry(1, 1)}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestJournalPlayer, PrefechShutDown) { + std::string oid = this->get_temp_oid(); + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, {})); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + player->prefetch(); +} + +TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) { + std::string oid = this->get_temp_oid(); + + ASSERT_EQ(0, this->create(oid)); + ASSERT_EQ(0, this->client_register(oid)); + ASSERT_EQ(0, this->client_commit(oid, {})); + + auto metadata = this->create_metadata(oid); + ASSERT_EQ(0, this->init_metadata(metadata)); + + journal::JournalPlayer *player = this->create_player(oid, metadata); + BOOST_SCOPE_EXIT_ALL( (player) ) { + C_SaferCond unwatch_ctx; + player->shut_down(&unwatch_ctx); + ASSERT_EQ(0, unwatch_ctx.wait()); + }; + player->prefetch_and_watch(0.25); +} + diff --git a/src/test/journal/test_JournalRecorder.cc b/src/test/journal/test_JournalRecorder.cc new file mode 100644 index 000000000..466ee2741 --- /dev/null +++ b/src/test/journal/test_JournalRecorder.cc @@ -0,0 +1,174 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalRecorder.h" +#include "journal/Entry.h" +#include "journal/JournalMetadata.h" +#include "test/journal/RadosTestFixture.h" +#include <limits> +#include <list> +#include <memory> + +class TestJournalRecorder : public RadosTestFixture { +public: + using JournalRecorderPtr = std::unique_ptr<journal::JournalRecorder, + std::function<void(journal::JournalRecorder*)>>; + JournalRecorderPtr create_recorder( + const std::string &oid, const ceph::ref_t<journal::JournalMetadata>& metadata) { + JournalRecorderPtr recorder{ + new journal::JournalRecorder(m_ioctx, oid + ".", metadata, 0), + [](journal::JournalRecorder* recorder) { + C_SaferCond cond; + recorder->shut_down(&cond); + cond.wait(); + delete recorder; + } + }; + recorder->set_append_batch_options(0, std::numeric_limits<uint32_t>::max(), 0); + return recorder; + } +}; + +TEST_F(TestJournalRecorder, Append) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid, 12, 2)); + ASSERT_EQ(0, client_register(oid)); + + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + JournalRecorderPtr recorder = create_recorder(oid, metadata); + + journal::Future future1 = recorder->append(123, create_payload("payload")); + + C_SaferCond cond; + future1.flush(&cond); + ASSERT_EQ(0, cond.wait()); +} + +TEST_F(TestJournalRecorder, AppendKnownOverflow) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid, 12, 2)); + ASSERT_EQ(0, client_register(oid)); + + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + ASSERT_EQ(0U, metadata->get_active_set()); + + JournalRecorderPtr recorder = create_recorder(oid, metadata); + + recorder->append(123, create_payload(std::string(metadata->get_object_size() - + journal::Entry::get_fixed_size(), '1'))); + journal::Future future2 = recorder->append(123, create_payload(std::string(1, '2'))); + + C_SaferCond cond; + future2.flush(&cond); + ASSERT_EQ(0, cond.wait()); + + ASSERT_EQ(1U, metadata->get_active_set()); +} + +TEST_F(TestJournalRecorder, AppendDelayedOverflow) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid, 12, 2)); + ASSERT_EQ(0, client_register(oid)); + + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + ASSERT_EQ(0U, metadata->get_active_set()); + + JournalRecorderPtr recorder1 = create_recorder(oid, metadata); + JournalRecorderPtr recorder2 = create_recorder(oid, metadata); + + recorder1->append(234, create_payload(std::string(1, '1'))); + recorder2->append(123, create_payload(std::string(metadata->get_object_size() - + journal::Entry::get_fixed_size(), '2'))); + + journal::Future future = recorder2->append(123, create_payload(std::string(1, '3'))); + + C_SaferCond cond; + future.flush(&cond); + ASSERT_EQ(0, cond.wait()); + + ASSERT_EQ(1U, metadata->get_active_set()); +} + +TEST_F(TestJournalRecorder, FutureFlush) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid, 12, 2)); + ASSERT_EQ(0, client_register(oid)); + + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + JournalRecorderPtr recorder = create_recorder(oid, metadata); + + journal::Future future1 = recorder->append(123, create_payload("payload1")); + journal::Future future2 = recorder->append(123, create_payload("payload2")); + + C_SaferCond cond; + future2.flush(&cond); + ASSERT_EQ(0, cond.wait()); + ASSERT_TRUE(future1.is_complete()); + ASSERT_TRUE(future2.is_complete()); +} + +TEST_F(TestJournalRecorder, Flush) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid, 12, 2)); + ASSERT_EQ(0, client_register(oid)); + + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + JournalRecorderPtr recorder = create_recorder(oid, metadata); + + journal::Future future1 = recorder->append(123, create_payload("payload1")); + journal::Future future2 = recorder->append(123, create_payload("payload2")); + + C_SaferCond cond1; + recorder->flush(&cond1); + ASSERT_EQ(0, cond1.wait()); + + C_SaferCond cond2; + future2.wait(&cond2); + ASSERT_EQ(0, cond2.wait()); + ASSERT_TRUE(future1.is_complete()); + ASSERT_TRUE(future2.is_complete()); +} + +TEST_F(TestJournalRecorder, OverflowCommitObjectNumber) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid, 12, 2)); + ASSERT_EQ(0, client_register(oid)); + + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + ASSERT_EQ(0U, metadata->get_active_set()); + + JournalRecorderPtr recorder = create_recorder(oid, metadata); + + recorder->append(123, create_payload(std::string(metadata->get_object_size() - + journal::Entry::get_fixed_size(), '1'))); + journal::Future future2 = recorder->append(124, create_payload(std::string(1, '2'))); + + C_SaferCond cond; + future2.flush(&cond); + ASSERT_EQ(0, cond.wait()); + + ASSERT_EQ(1U, metadata->get_active_set()); + + uint64_t object_num; + uint64_t tag_tid; + uint64_t entry_tid; + metadata->get_commit_entry(1, &object_num, &tag_tid, &entry_tid); + ASSERT_EQ(0U, object_num); + ASSERT_EQ(123U, tag_tid); + ASSERT_EQ(0U, entry_tid); + + metadata->get_commit_entry(2, &object_num, &tag_tid, &entry_tid); + ASSERT_EQ(2U, object_num); + ASSERT_EQ(124U, tag_tid); + ASSERT_EQ(0U, entry_tid); +} + diff --git a/src/test/journal/test_JournalTrimmer.cc b/src/test/journal/test_JournalTrimmer.cc new file mode 100644 index 000000000..aaf10979f --- /dev/null +++ b/src/test/journal/test_JournalTrimmer.cc @@ -0,0 +1,197 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalTrimmer.h" +#include "journal/JournalMetadata.h" +#include "include/stringify.h" +#include "test/journal/RadosTestFixture.h" +#include <limits> +#include <list> + +class TestJournalTrimmer : public RadosTestFixture { +public: + + void TearDown() override { + for (MetadataList::iterator it = m_metadata_list.begin(); + it != m_metadata_list.end(); ++it) { + (*it)->remove_listener(&m_listener); + } + m_metadata_list.clear(); + + for (std::list<journal::JournalTrimmer*>::iterator it = m_trimmers.begin(); + it != m_trimmers.end(); ++it) { + C_SaferCond ctx; + (*it)->shut_down(&ctx); + ASSERT_EQ(0, ctx.wait()); + delete *it; + } + RadosTestFixture::TearDown(); + } + + int append_payload(const ceph::ref_t<journal::JournalMetadata>& metadata, + const std::string &oid, uint64_t object_num, + const std::string &payload, uint64_t *commit_tid) { + int r = append(oid + "." + stringify(object_num), create_payload(payload)); + uint64_t tid = metadata->allocate_commit_tid(object_num, 234, 123); + if (commit_tid != NULL) { + *commit_tid = tid; + } + return r; + } + + auto create_metadata(const std::string &oid) { + auto metadata = RadosTestFixture::create_metadata(oid); + m_metadata_list.push_back(metadata); + metadata->add_listener(&m_listener); + return metadata; + } + + journal::JournalTrimmer *create_trimmer(const std::string &oid, + const ceph::ref_t<journal::JournalMetadata>& metadata) { + journal::JournalTrimmer *trimmer(new journal::JournalTrimmer( + m_ioctx, oid + ".", metadata)); + m_trimmers.push_back(trimmer); + return trimmer; + } + + int assert_exists(const std::string &oid) { + librados::ObjectWriteOperation op; + op.assert_exists(); + return m_ioctx.operate(oid, &op); + } + + typedef std::list<ceph::ref_t<journal::JournalMetadata>> MetadataList; + MetadataList m_metadata_list; + std::list<journal::JournalTrimmer*> m_trimmers; +}; + +TEST_F(TestJournalTrimmer, Committed) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid, 12, 2)); + ASSERT_EQ(0, client_register(oid)); + + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + ASSERT_TRUE(wait_for_update(metadata)); + + ASSERT_EQ(0, metadata->set_active_set(10)); + ASSERT_TRUE(wait_for_update(metadata)); + + uint64_t commit_tid1; + uint64_t commit_tid2; + uint64_t commit_tid3; + uint64_t commit_tid4; + uint64_t commit_tid5; + uint64_t commit_tid6; + ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid1)); + ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid2)); + ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid3)); + ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid4)); + ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid5)); + ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid6)); + + journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata); + + trimmer->committed(commit_tid4); + trimmer->committed(commit_tid6); + trimmer->committed(commit_tid2); + trimmer->committed(commit_tid5); + trimmer->committed(commit_tid3); + trimmer->committed(commit_tid1); + while (metadata->get_minimum_set() != 2U) { + ASSERT_TRUE(wait_for_update(metadata)); + } + + ASSERT_EQ(-ENOENT, assert_exists(oid + ".0")); + ASSERT_EQ(-ENOENT, assert_exists(oid + ".2")); + ASSERT_EQ(0, assert_exists(oid + ".5")); +} + +TEST_F(TestJournalTrimmer, CommittedWithOtherClient) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid, 12, 2)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_register(oid, "client2", "slow client")); + + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + ASSERT_TRUE(wait_for_update(metadata)); + + ASSERT_EQ(0, metadata->set_active_set(10)); + ASSERT_TRUE(wait_for_update(metadata)); + + uint64_t commit_tid1; + uint64_t commit_tid2; + uint64_t commit_tid3; + uint64_t commit_tid4; + ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid1)); + ASSERT_EQ(0, append_payload(metadata, oid, 2, "payload", &commit_tid2)); + ASSERT_EQ(0, append_payload(metadata, oid, 3, "payload", &commit_tid3)); + ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid4)); + + journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata); + + trimmer->committed(commit_tid1); + trimmer->committed(commit_tid2); + trimmer->committed(commit_tid3); + trimmer->committed(commit_tid4); + ASSERT_TRUE(wait_for_update(metadata)); + + ASSERT_EQ(0, assert_exists(oid + ".0")); + ASSERT_EQ(0, assert_exists(oid + ".2")); + ASSERT_EQ(0, assert_exists(oid + ".3")); + ASSERT_EQ(0, assert_exists(oid + ".5")); +} + +TEST_F(TestJournalTrimmer, RemoveObjects) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid, 12, 2)); + ASSERT_EQ(0, client_register(oid)); + + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + ASSERT_TRUE(wait_for_update(metadata)); + + ASSERT_EQ(0, metadata->set_active_set(10)); + ASSERT_TRUE(wait_for_update(metadata)); + + ASSERT_EQ(0, append(oid + ".0", create_payload("payload"))); + ASSERT_EQ(0, append(oid + ".2", create_payload("payload"))); + ASSERT_EQ(0, append(oid + ".3", create_payload("payload"))); + ASSERT_EQ(0, append(oid + ".5", create_payload("payload"))); + + journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata); + + C_SaferCond cond; + trimmer->remove_objects(false, &cond); + ASSERT_EQ(0, cond.wait()); + + ASSERT_TRUE(wait_for_update(metadata)); + + ASSERT_EQ(-ENOENT, assert_exists(oid + ".0")); + ASSERT_EQ(-ENOENT, assert_exists(oid + ".2")); + ASSERT_EQ(-ENOENT, assert_exists(oid + ".3")); + ASSERT_EQ(-ENOENT, assert_exists(oid + ".5")); +} + +TEST_F(TestJournalTrimmer, RemoveObjectsWithOtherClient) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid, 12, 2)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_register(oid, "client2", "other client")); + + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + ASSERT_TRUE(wait_for_update(metadata)); + + journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata); + + C_SaferCond ctx1; + trimmer->remove_objects(false, &ctx1); + ASSERT_EQ(-EBUSY, ctx1.wait()); + + C_SaferCond ctx2; + trimmer->remove_objects(true, &ctx2); + ASSERT_EQ(0, ctx2.wait()); +} + diff --git a/src/test/journal/test_Journaler.cc b/src/test/journal/test_Journaler.cc new file mode 100644 index 000000000..836816581 --- /dev/null +++ b/src/test/journal/test_Journaler.cc @@ -0,0 +1,198 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/stringify.h" + +#include "journal/Journaler.h" +#include "journal/Settings.h" + +#include "test/librados/test.h" +#include "test/journal/RadosTestFixture.h" + +#include "gtest/gtest.h" + +// reinclude our assert to clobber the system one +#include "include/ceph_assert.h" + +class TestJournaler : public RadosTestFixture { +public: + + static const std::string CLIENT_ID; + + static std::string get_temp_journal_id() { + return stringify(++_journal_id); + } + + void SetUp() override { + RadosTestFixture::SetUp(); + m_journal_id = get_temp_journal_id(); + m_journaler = new journal::Journaler(m_work_queue, m_timer, &m_timer_lock, + m_ioctx, m_journal_id, CLIENT_ID, {}, + nullptr); + } + + void TearDown() override { + delete m_journaler; + RadosTestFixture::TearDown(); + } + + int create_journal(uint8_t order, uint8_t splay_width) { + C_SaferCond cond; + m_journaler->create(order, splay_width, -1, &cond); + return cond.wait(); + } + + int init_journaler() { + C_SaferCond cond; + m_journaler->init(&cond); + return cond.wait(); + } + + int shut_down_journaler() { + C_SaferCond ctx; + m_journaler->shut_down(&ctx); + return ctx.wait(); + } + + int register_client(const std::string &client_id, const std::string &desc) { + journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock, m_ioctx, + m_journal_id, client_id, {}, nullptr); + bufferlist data; + data.append(desc); + C_SaferCond cond; + journaler.register_client(data, &cond); + return cond.wait(); + } + + int update_client(const std::string &client_id, const std::string &desc) { + journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock, m_ioctx, + m_journal_id, client_id, {}, nullptr); + bufferlist data; + data.append(desc); + C_SaferCond cond; + journaler.update_client(data, &cond); + return cond.wait(); + } + + int unregister_client(const std::string &client_id) { + journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock, m_ioctx, + m_journal_id, client_id, {}, nullptr); + C_SaferCond cond; + journaler.unregister_client(&cond); + return cond.wait(); + } + + static uint64_t _journal_id; + + std::string m_journal_id; + journal::Journaler *m_journaler; +}; + +const std::string TestJournaler::CLIENT_ID = "client1"; +uint64_t TestJournaler::_journal_id = 0; + +TEST_F(TestJournaler, Create) { + ASSERT_EQ(0, create_journal(12, 8)); +} + +TEST_F(TestJournaler, CreateDuplicate) { + ASSERT_EQ(0, create_journal(12, 8)); + ASSERT_EQ(-EEXIST, create_journal(12, 8)); +} + +TEST_F(TestJournaler, CreateInvalidParams) { + ASSERT_EQ(-EDOM, create_journal(1, 8)); + ASSERT_EQ(-EDOM, create_journal(123, 8)); + ASSERT_EQ(-EINVAL, create_journal(12, 0)); +} + +TEST_F(TestJournaler, Init) { + ASSERT_EQ(0, create_journal(12, 8)); + ASSERT_EQ(0, register_client(CLIENT_ID, "foo")); + ASSERT_EQ(0, init_journaler()); + ASSERT_EQ(0, shut_down_journaler()); +} + +TEST_F(TestJournaler, InitDNE) { + ASSERT_EQ(-ENOENT, init_journaler()); + ASSERT_EQ(0, shut_down_journaler()); +} + +TEST_F(TestJournaler, RegisterClientDuplicate) { + ASSERT_EQ(0, create_journal(12, 8)); + ASSERT_EQ(0, register_client(CLIENT_ID, "foo")); + ASSERT_EQ(-EEXIST, register_client(CLIENT_ID, "foo2")); +} + +TEST_F(TestJournaler, UpdateClient) { + ASSERT_EQ(0, create_journal(12, 8)); + ASSERT_EQ(0, register_client(CLIENT_ID, "foo")); + ASSERT_EQ(0, update_client(CLIENT_ID, "foo2")); +} + +TEST_F(TestJournaler, UpdateClientDNE) { + ASSERT_EQ(0, create_journal(12, 8)); + ASSERT_EQ(-ENOENT, update_client(CLIENT_ID, "foo")); +} + +TEST_F(TestJournaler, UnregisterClient) { + ASSERT_EQ(0, create_journal(12, 8)); + ASSERT_EQ(0, register_client(CLIENT_ID, "foo")); + ASSERT_EQ(0, unregister_client(CLIENT_ID)); + // Test it does not exist and can be registered again + ASSERT_EQ(-ENOENT, update_client(CLIENT_ID, "foo")); + ASSERT_EQ(0, register_client(CLIENT_ID, "foo")); +} + +TEST_F(TestJournaler, UnregisterClientDNE) { + ASSERT_EQ(0, create_journal(12, 8)); + ASSERT_EQ(-ENOENT, unregister_client(CLIENT_ID)); +} + +TEST_F(TestJournaler, AllocateTag) { + ASSERT_EQ(0, create_journal(12, 8)); + + cls::journal::Tag tag; + + bufferlist data; + data.append(std::string(128, '1')); + + // allocate a new tag class + C_SaferCond ctx1; + m_journaler->allocate_tag(data, &tag, &ctx1); + ASSERT_EQ(0, ctx1.wait()); + ASSERT_EQ(cls::journal::Tag(0, 0, data), tag); + + // re-use an existing tag class + C_SaferCond ctx2; + m_journaler->allocate_tag(tag.tag_class, bufferlist(), &tag, &ctx2); + ASSERT_EQ(0, ctx2.wait()); + ASSERT_EQ(cls::journal::Tag(1, 0, bufferlist()), tag); +} + +TEST_F(TestJournaler, GetTags) { + ASSERT_EQ(0, create_journal(12, 8)); + ASSERT_EQ(0, register_client(CLIENT_ID, "foo")); + + std::list<cls::journal::Tag> expected_tags; + for (size_t i = 0; i < 256; ++i) { + C_SaferCond ctx; + cls::journal::Tag tag; + if (i < 2) { + m_journaler->allocate_tag(bufferlist(), &tag, &ctx); + } else { + m_journaler->allocate_tag(i % 2, bufferlist(), &tag, &ctx); + } + ASSERT_EQ(0, ctx.wait()); + + if (i % 2 == 0) { + expected_tags.push_back(tag); + } + } + + std::list<cls::journal::Tag> tags; + C_SaferCond ctx; + m_journaler->get_tags(0, &tags, &ctx); + ASSERT_EQ(0, ctx.wait()); + ASSERT_EQ(expected_tags, tags); +} diff --git a/src/test/journal/test_ObjectPlayer.cc b/src/test/journal/test_ObjectPlayer.cc new file mode 100644 index 000000000..5ac3d8b12 --- /dev/null +++ b/src/test/journal/test_ObjectPlayer.cc @@ -0,0 +1,281 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/ObjectPlayer.h" +#include "journal/Entry.h" +#include "include/stringify.h" +#include "common/Timer.h" +#include "gtest/gtest.h" +#include "test/librados/test.h" +#include "test/journal/RadosTestFixture.h" + +template <typename T> +class TestObjectPlayer : public RadosTestFixture, public T { +public: + auto create_object(const std::string &oid, uint8_t order) { + auto object = ceph::make_ref<journal::ObjectPlayer>( + m_ioctx, oid + ".", 0, *m_timer, m_timer_lock, order, + T::max_fetch_bytes); + return object; + } + + int fetch(const ceph::ref_t<journal::ObjectPlayer>& object_player) { + while (true) { + C_SaferCond ctx; + object_player->set_refetch_state( + journal::ObjectPlayer::REFETCH_STATE_NONE); + object_player->fetch(&ctx); + int r = ctx.wait(); + if (r < 0 || !object_player->refetch_required()) { + return r; + } + } + return 0; + } + + int watch_and_wait_for_entries(const ceph::ref_t<journal::ObjectPlayer>& object_player, + journal::ObjectPlayer::Entries *entries, + size_t count) { + for (size_t i = 0; i < 50; ++i) { + object_player->get_entries(entries); + if (entries->size() == count) { + break; + } + + C_SaferCond ctx; + object_player->watch(&ctx, 0.1); + + int r = ctx.wait(); + if (r < 0) { + return r; + } + } + return 0; + } + + std::string get_object_name(const std::string &oid) { + return oid + ".0"; + } +}; + +template <uint32_t _max_fetch_bytes> +struct TestObjectPlayerParams { + static inline const uint32_t max_fetch_bytes = _max_fetch_bytes; +}; + +typedef ::testing::Types<TestObjectPlayerParams<0>, + TestObjectPlayerParams<10> > TestObjectPlayerTypes; +TYPED_TEST_SUITE(TestObjectPlayer, TestObjectPlayerTypes); + +TYPED_TEST(TestObjectPlayer, Fetch) { + std::string oid = this->get_temp_oid(); + + journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1'))); + journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1'))); + + bufferlist bl; + encode(entry1, bl); + encode(entry2, bl); + ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); + + auto object = this->create_object(oid, 14); + ASSERT_LE(0, this->fetch(object)); + + journal::ObjectPlayer::Entries entries; + object->get_entries(&entries); + ASSERT_EQ(2U, entries.size()); + + journal::ObjectPlayer::Entries expected_entries = {entry1, entry2}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestObjectPlayer, FetchLarge) { + std::string oid = this->get_temp_oid(); + + journal::Entry entry1(234, 123, + this->create_payload(std::string(8192 - 32, '1'))); + journal::Entry entry2(234, 124, this->create_payload("")); + + bufferlist bl; + encode(entry1, bl); + encode(entry2, bl); + ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); + + auto object = this->create_object(oid, 12); + ASSERT_LE(0, this->fetch(object)); + + journal::ObjectPlayer::Entries entries; + object->get_entries(&entries); + ASSERT_EQ(2U, entries.size()); + + journal::ObjectPlayer::Entries expected_entries = {entry1, entry2}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestObjectPlayer, FetchDeDup) { + std::string oid = this->get_temp_oid(); + + journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1'))); + journal::Entry entry2(234, 123, this->create_payload(std::string(24, '2'))); + + bufferlist bl; + encode(entry1, bl); + encode(entry2, bl); + ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); + + auto object = this->create_object(oid, 14); + ASSERT_LE(0, this->fetch(object)); + + journal::ObjectPlayer::Entries entries; + object->get_entries(&entries); + ASSERT_EQ(1U, entries.size()); + + journal::ObjectPlayer::Entries expected_entries = {entry2}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestObjectPlayer, FetchEmpty) { + std::string oid = this->get_temp_oid(); + + bufferlist bl; + ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); + + auto object = this->create_object(oid, 14); + + ASSERT_EQ(0, this->fetch(object)); + ASSERT_TRUE(object->empty()); +} + +TYPED_TEST(TestObjectPlayer, FetchCorrupt) { + std::string oid = this->get_temp_oid(); + + journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1'))); + journal::Entry entry2(234, 124, this->create_payload(std::string(24, '2'))); + + bufferlist bl; + encode(entry1, bl); + encode(this->create_payload("corruption" + std::string(1024, 'X')), bl); + encode(entry2, bl); + ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); + + auto object = this->create_object(oid, 14); + ASSERT_EQ(-EBADMSG, this->fetch(object)); + + journal::ObjectPlayer::Entries entries; + object->get_entries(&entries); + ASSERT_EQ(1U, entries.size()); + + journal::ObjectPlayer::Entries expected_entries = {entry1}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestObjectPlayer, FetchAppend) { + std::string oid = this->get_temp_oid(); + + journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1'))); + journal::Entry entry2(234, 124, this->create_payload(std::string(24, '2'))); + + bufferlist bl; + encode(entry1, bl); + ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); + + auto object = this->create_object(oid, 14); + ASSERT_LE(0, this->fetch(object)); + + journal::ObjectPlayer::Entries entries; + object->get_entries(&entries); + ASSERT_EQ(1U, entries.size()); + + journal::ObjectPlayer::Entries expected_entries = {entry1}; + ASSERT_EQ(expected_entries, entries); + + bl.clear(); + encode(entry2, bl); + ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); + ASSERT_LE(0, this->fetch(object)); + + object->get_entries(&entries); + ASSERT_EQ(2U, entries.size()); + + expected_entries = {entry1, entry2}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestObjectPlayer, PopEntry) { + std::string oid = this->get_temp_oid(); + + journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1'))); + journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1'))); + + bufferlist bl; + encode(entry1, bl); + encode(entry2, bl); + ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); + + auto object = this->create_object(oid, 14); + ASSERT_LE(0, this->fetch(object)); + + journal::ObjectPlayer::Entries entries; + object->get_entries(&entries); + ASSERT_EQ(2U, entries.size()); + + journal::Entry entry; + object->front(&entry); + object->pop_front(); + ASSERT_EQ(entry1, entry); + object->front(&entry); + object->pop_front(); + ASSERT_EQ(entry2, entry); + ASSERT_TRUE(object->empty()); +} + +TYPED_TEST(TestObjectPlayer, Watch) { + std::string oid = this->get_temp_oid(); + auto object = this->create_object(oid, 14); + + C_SaferCond cond1; + object->watch(&cond1, 0.1); + + journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1'))); + journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1'))); + + bufferlist bl; + encode(entry1, bl); + ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); + ASSERT_LE(0, cond1.wait()); + + journal::ObjectPlayer::Entries entries; + ASSERT_EQ(0, this->watch_and_wait_for_entries(object, &entries, 1U)); + ASSERT_EQ(1U, entries.size()); + + journal::ObjectPlayer::Entries expected_entries; + expected_entries = {entry1}; + ASSERT_EQ(expected_entries, entries); + + C_SaferCond cond2; + object->watch(&cond2, 0.1); + + bl.clear(); + encode(entry2, bl); + ASSERT_EQ(0, this->append(this->get_object_name(oid), bl)); + ASSERT_LE(0, cond2.wait()); + + ASSERT_EQ(0, this->watch_and_wait_for_entries(object, &entries, 2U)); + ASSERT_EQ(2U, entries.size()); + + expected_entries = {entry1, entry2}; + ASSERT_EQ(expected_entries, entries); +} + +TYPED_TEST(TestObjectPlayer, Unwatch) { + std::string oid = this->get_temp_oid(); + auto object = this->create_object(oid, 14); + + C_SaferCond watch_ctx; + object->watch(&watch_ctx, 600); + + usleep(200000); + + object->unwatch(); + ASSERT_EQ(-ECANCELED, watch_ctx.wait()); +} diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc new file mode 100644 index 000000000..ac110a23e --- /dev/null +++ b/src/test/journal/test_ObjectRecorder.cc @@ -0,0 +1,463 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/ObjectRecorder.h" +#include "common/Cond.h" +#include "common/ceph_mutex.h" +#include "common/Timer.h" +#include "gtest/gtest.h" +#include "test/librados/test.h" +#include "test/journal/RadosTestFixture.h" +#include <limits> + +using std::shared_ptr; + +class TestObjectRecorder : public RadosTestFixture { +public: + TestObjectRecorder() = default; + + struct Handler : public journal::ObjectRecorder::Handler { + ceph::mutex lock = ceph::make_mutex("lock"); + ceph::mutex* object_lock = nullptr; + ceph::condition_variable cond; + bool is_closed = false; + uint32_t overflows = 0; + + Handler() = default; + + void closed(journal::ObjectRecorder *object_recorder) override { + std::lock_guard locker{lock}; + is_closed = true; + cond.notify_all(); + } + void overflow(journal::ObjectRecorder *object_recorder) override { + std::lock_guard locker{lock}; + journal::AppendBuffers append_buffers; + object_lock->lock(); + object_recorder->claim_append_buffers(&append_buffers); + object_lock->unlock(); + + ++overflows; + cond.notify_all(); + } + }; + + // flush the pending buffers in dtor + class ObjectRecorderFlusher { + public: + ObjectRecorderFlusher(librados::IoCtx& ioctx, + ContextWQ* work_queue) + : m_ioctx{ioctx}, + m_work_queue{work_queue} + {} + ObjectRecorderFlusher(librados::IoCtx& ioctx, + ContextWQ* work_queue, + uint32_t flush_interval, + uint16_t flush_bytes, + double flush_age, + int max_in_flight) + : m_ioctx{ioctx}, + m_work_queue{work_queue}, + m_flush_interval{flush_interval}, + m_flush_bytes{flush_bytes}, + m_flush_age{flush_age}, + m_max_in_flight_appends{max_in_flight < 0 ? + std::numeric_limits<uint64_t>::max() : + static_cast<uint64_t>(max_in_flight)} + {} + ~ObjectRecorderFlusher() { + for (auto& [object_recorder, m] : m_object_recorders) { + C_SaferCond cond; + object_recorder->flush(&cond); + cond.wait(); + std::scoped_lock l{*m}; + if (!object_recorder->is_closed()) { + object_recorder->close(); + } + } + } + auto create_object(std::string_view oid, uint8_t order, ceph::mutex* lock) { + auto object = ceph::make_ref<journal::ObjectRecorder>( + m_ioctx, oid, 0, lock, m_work_queue, &m_handler, + order, m_max_in_flight_appends); + { + std::lock_guard locker{*lock}; + object->set_append_batch_options(m_flush_interval, + m_flush_bytes, + m_flush_age); + } + m_object_recorders.emplace_back(object, lock); + m_handler.object_lock = lock; + return object; + } + bool wait_for_closed() { + std::unique_lock locker{m_handler.lock}; + return m_handler.cond.wait_for(locker, 10s, + [this] { return m_handler.is_closed; }); + } + bool wait_for_overflow() { + std::unique_lock locker{m_handler.lock}; + if (m_handler.cond.wait_for(locker, 10s, + [this] { return m_handler.overflows > 0; })) { + m_handler.overflows = 0; + return true; + } else { + return false; + } + } + private: + librados::IoCtx& m_ioctx; + ContextWQ *m_work_queue; + uint32_t m_flush_interval = std::numeric_limits<uint32_t>::max(); + uint64_t m_flush_bytes = std::numeric_limits<uint64_t>::max(); + double m_flush_age = 600; + uint64_t m_max_in_flight_appends = 0; + using ObjectRecorders = + std::list<std::pair<ceph::ref_t<journal::ObjectRecorder>, ceph::mutex*>>; + ObjectRecorders m_object_recorders; + Handler m_handler; + }; + + journal::AppendBuffer create_append_buffer(uint64_t tag_tid, + uint64_t entry_tid, + const std::string &payload) { + auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, 456); + future->init(ceph::ref_t<journal::FutureImpl>()); + + bufferlist bl; + bl.append(payload); + return std::make_pair(future, bl); + } +}; + +TEST_F(TestObjectRecorder, Append) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0, 0); + auto object = flusher.create_object(oid, 24, &lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + ASSERT_EQ(0U, object->get_pending_appends()); + + journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, + "payload"); + append_buffers = {append_buffer2}; + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + ASSERT_EQ(0U, object->get_pending_appends()); + + C_SaferCond cond; + append_buffer2.first->flush(&cond); + ASSERT_EQ(0, cond.wait()); + ASSERT_EQ(0U, object->get_pending_appends()); +} + +TEST_F(TestObjectRecorder, AppendFlushByCount) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1); + auto object = flusher.create_object(oid, 24, &lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + ASSERT_EQ(1U, object->get_pending_appends()); + + journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, + "payload"); + append_buffers = {append_buffer2}; + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + ASSERT_EQ(0U, object->get_pending_appends()); + + C_SaferCond cond; + append_buffer2.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); +} + +TEST_F(TestObjectRecorder, AppendFlushByBytes) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1); + auto object = flusher.create_object(oid, 24, &lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + ASSERT_EQ(1U, object->get_pending_appends()); + + journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, + "payload"); + append_buffers = {append_buffer2}; + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + ASSERT_EQ(0U, object->get_pending_appends()); + + C_SaferCond cond; + append_buffer2.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); +} + +TEST_F(TestObjectRecorder, AppendFlushByAge) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0005, -1); + auto object = flusher.create_object(oid, 24, &lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + + uint32_t offset = 0; + journal::AppendBuffer append_buffer2; + while (!append_buffer1.first->is_flush_in_progress() && + !append_buffer1.first->is_complete()) { + usleep(1000); + + append_buffer2 = create_append_buffer(234, 124 + offset, "payload"); + ++offset; + append_buffers = {append_buffer2}; + + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + } + + C_SaferCond cond; + append_buffer2.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); + ASSERT_EQ(0U, object->get_pending_appends()); +} + +TEST_F(TestObjectRecorder, AppendFilledObject) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0, -1); + auto object = flusher.create_object(oid, 12, &lock); + + std::string payload(2048, '1'); + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + payload); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + + journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, + payload); + append_buffers = {append_buffer2}; + lock.lock(); + ASSERT_TRUE(object->append(std::move(append_buffers))); + lock.unlock(); + + C_SaferCond cond; + append_buffer2.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); + ASSERT_EQ(0U, object->get_pending_appends()); +} + +TEST_F(TestObjectRecorder, Flush) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1); + auto object = flusher.create_object(oid, 24, &lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + ASSERT_EQ(1U, object->get_pending_appends()); + + C_SaferCond cond1; + object->flush(&cond1); + ASSERT_EQ(0, cond1.wait()); + + C_SaferCond cond2; + append_buffer1.first->wait(&cond2); + ASSERT_EQ(0, cond2.wait()); + ASSERT_EQ(0U, object->get_pending_appends()); +} + +TEST_F(TestObjectRecorder, FlushFuture) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1); + auto object = flusher.create_object(oid, 24, &lock); + + journal::AppendBuffer append_buffer = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer}; + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + ASSERT_EQ(1U, object->get_pending_appends()); + + C_SaferCond cond; + append_buffer.first->wait(&cond); + object->flush(append_buffer.first); + ASSERT_TRUE(append_buffer.first->is_flush_in_progress() || + append_buffer.first->is_complete()); + ASSERT_EQ(0, cond.wait()); +} + +TEST_F(TestObjectRecorder, FlushDetachedFuture) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue); + auto object = flusher.create_object(oid, 24, &lock); + + journal::AppendBuffer append_buffer = create_append_buffer(234, 123, + "payload"); + + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer}; + + object->flush(append_buffer.first); + ASSERT_FALSE(append_buffer.first->is_flush_in_progress()); + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + + // should automatically flush once its attached to the object + C_SaferCond cond; + append_buffer.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); +} + +TEST_F(TestObjectRecorder, Close) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1); + auto object = flusher.create_object(oid, 24, &lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + ASSERT_EQ(1U, object->get_pending_appends()); + + lock.lock(); + ASSERT_FALSE(object->close()); + ASSERT_TRUE(ceph_mutex_is_locked(lock)); + lock.unlock(); + + ASSERT_TRUE(flusher.wait_for_closed()); + + ASSERT_EQ(0U, object->get_pending_appends()); +} + +TEST_F(TestObjectRecorder, Overflow) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + auto metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + ceph::mutex lock1 = ceph::make_mutex("object_recorder_lock_1"); + ceph::mutex lock2 = ceph::make_mutex("object_recorder_lock_2"); + + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue); + auto object1 = flusher.create_object(oid, 12, &lock1); + + std::string payload(1 << 11, '1'); + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + payload); + journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, + payload); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1, append_buffer2}; + lock1.lock(); + ASSERT_TRUE(object1->append(std::move(append_buffers))); + lock1.unlock(); + + C_SaferCond cond; + append_buffer2.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); + ASSERT_EQ(0U, object1->get_pending_appends()); + + auto object2 = flusher.create_object(oid, 12, &lock2); + + journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123, + payload); + append_buffers = {append_buffer3}; + lock2.lock(); + ASSERT_FALSE(object2->append(std::move(append_buffers))); + lock2.unlock(); + append_buffer3.first->flush(NULL); + + ASSERT_TRUE(flusher.wait_for_overflow()); +} diff --git a/src/test/journal/test_main.cc b/src/test/journal/test_main.cc new file mode 100644 index 000000000..afb535d31 --- /dev/null +++ b/src/test/journal/test_main.cc @@ -0,0 +1,27 @@ +// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "gtest/gtest.h" +#include "common/ceph_argparse.h" +#include "common/ceph_crypto.h" +#include "common/config_proxy.h" +#include "global/global_context.h" +#include "global/global_init.h" +#include <vector> + +int main(int argc, char **argv) +{ + ::testing::InitGoogleTest(&argc, argv); + + std::vector<const char*> args; + argv_to_vec(argc, (const char **)argv, args); + + auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_OSD, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_MON_CONFIG); + g_conf().set_val("lockdep", "true"); + common_init_finish(g_ceph_context); + + int r = RUN_ALL_TESTS(); + return r; +} |