summaryrefslogtreecommitdiffstats
path: root/src/test/journal
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/test/journal
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/journal')
-rw-r--r--src/test/journal/CMakeLists.txt32
-rw-r--r--src/test/journal/RadosTestFixture.cc133
-rw-r--r--src/test/journal/RadosTestFixture.h74
-rw-r--r--src/test/journal/mock/MockJournaler.cc16
-rw-r--r--src/test/journal/mock/MockJournaler.h313
-rw-r--r--src/test/journal/test_Entry.cc96
-rw-r--r--src/test/journal/test_FutureImpl.cc268
-rw-r--r--src/test/journal/test_JournalMetadata.cc210
-rw-r--r--src/test/journal/test_JournalPlayer.cc994
-rw-r--r--src/test/journal/test_JournalRecorder.cc174
-rw-r--r--src/test/journal/test_JournalTrimmer.cc197
-rw-r--r--src/test/journal/test_Journaler.cc198
-rw-r--r--src/test/journal/test_ObjectPlayer.cc281
-rw-r--r--src/test/journal/test_ObjectRecorder.cc463
-rw-r--r--src/test/journal/test_main.cc27
15 files changed, 3476 insertions, 0 deletions
diff --git a/src/test/journal/CMakeLists.txt b/src/test/journal/CMakeLists.txt
new file mode 100644
index 000000000..99e0f8ae6
--- /dev/null
+++ b/src/test/journal/CMakeLists.txt
@@ -0,0 +1,32 @@
+add_library(journal_test_mock STATIC mock/MockJournaler.cc)
+target_link_libraries(journal_test_mock
+ PUBLIC GMock::GMock)
+
+# unittest_journal
+set(unittest_journal_srcs
+ test_main.cc
+ test_Entry.cc
+ test_FutureImpl.cc
+ test_Journaler.cc
+ test_JournalMetadata.cc
+ test_JournalPlayer.cc
+ test_JournalRecorder.cc
+ test_JournalTrimmer.cc
+ test_ObjectPlayer.cc
+ test_ObjectRecorder.cc
+ RadosTestFixture.cc
+ )
+
+add_executable(unittest_journal
+ ${unittest_journal_srcs}
+ )
+add_ceph_unittest(unittest_journal)
+target_link_libraries(unittest_journal
+ journal
+ cls_journal
+ cls_journal_client
+ rados_test_stub
+ librados
+ radostest-cxx
+ global
+ )
diff --git a/src/test/journal/RadosTestFixture.cc b/src/test/journal/RadosTestFixture.cc
new file mode 100644
index 000000000..50e35e871
--- /dev/null
+++ b/src/test/journal/RadosTestFixture.cc
@@ -0,0 +1,133 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librados/test_cxx.h"
+#include "test/journal/RadosTestFixture.h"
+#include "cls/journal/cls_journal_client.h"
+#include "include/stringify.h"
+#include "common/WorkQueue.h"
+#include "journal/Settings.h"
+
+RadosTestFixture::RadosTestFixture()
+ : m_timer_lock(ceph::make_mutex("m_timer_lock")),
+ m_listener(this) {
+}
+
+void RadosTestFixture::SetUpTestCase() {
+ _pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(_pool_name, _rados));
+
+ CephContext* cct = reinterpret_cast<CephContext*>(_rados.cct());
+ _thread_pool = new ThreadPool(cct, "RadosTestFixture::_thread_pool",
+ "tp_test", 1);
+ _thread_pool->start();
+}
+
+void RadosTestFixture::TearDownTestCase() {
+ _thread_pool->stop();
+ delete _thread_pool;
+
+ ASSERT_EQ(0, destroy_one_pool_pp(_pool_name, _rados));
+}
+
+std::string RadosTestFixture::get_temp_oid() {
+ ++_oid_number;
+ return "oid" + stringify(_oid_number);
+}
+
+void RadosTestFixture::SetUp() {
+ ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx));
+
+ CephContext* cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+ m_work_queue = new ContextWQ("RadosTestFixture::m_work_queue",
+ ceph::make_timespan(60),
+ _thread_pool);
+
+ m_timer = new SafeTimer(cct, m_timer_lock, true);
+ m_timer->init();
+}
+
+void RadosTestFixture::TearDown() {
+ for (auto metadata : m_metadatas) {
+ C_SaferCond ctx;
+ metadata->shut_down(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+ }
+
+ {
+ std::lock_guard locker{m_timer_lock};
+ m_timer->shutdown();
+ }
+ delete m_timer;
+
+ m_work_queue->drain();
+ delete m_work_queue;
+}
+
+int RadosTestFixture::create(const std::string &oid, uint8_t order,
+ uint8_t splay_width) {
+ return cls::journal::client::create(m_ioctx, oid, order, splay_width, -1);
+}
+
+ceph::ref_t<journal::JournalMetadata> RadosTestFixture::create_metadata(
+ const std::string &oid, const std::string &client_id,
+ double commit_interval, int max_concurrent_object_sets) {
+ journal::Settings settings;
+ settings.commit_interval = commit_interval;
+ settings.max_concurrent_object_sets = max_concurrent_object_sets;
+
+ auto metadata = ceph::make_ref<journal::JournalMetadata>(
+ m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id, settings);
+ m_metadatas.push_back(metadata);
+ return metadata;
+}
+
+int RadosTestFixture::append(const std::string &oid, const bufferlist &bl) {
+ librados::ObjectWriteOperation op;
+ op.append(bl);
+ return m_ioctx.operate(oid, &op);
+}
+
+int RadosTestFixture::client_register(const std::string &oid,
+ const std::string &id,
+ const std::string &description) {
+ bufferlist data;
+ data.append(description);
+ return cls::journal::client::client_register(m_ioctx, oid, id, data);
+}
+
+int RadosTestFixture::client_commit(const std::string &oid,
+ const std::string &id,
+ const cls::journal::ObjectSetPosition &commit_position) {
+ librados::ObjectWriteOperation op;
+ cls::journal::client::client_commit(&op, id, commit_position);
+ return m_ioctx.operate(oid, &op);
+}
+
+bufferlist RadosTestFixture::create_payload(const std::string &payload) {
+ bufferlist bl;
+ bl.append(payload);
+ return bl;
+}
+
+int RadosTestFixture::init_metadata(const ceph::ref_t<journal::JournalMetadata>& metadata) {
+ C_SaferCond cond;
+ metadata->init(&cond);
+ return cond.wait();
+}
+
+bool RadosTestFixture::wait_for_update(const ceph::ref_t<journal::JournalMetadata>& metadata) {
+ std::unique_lock locker{m_listener.mutex};
+ while (m_listener.updates[metadata.get()] == 0) {
+ if (m_listener.cond.wait_for(locker, 10s) == std::cv_status::timeout) {
+ return false;
+ }
+ }
+ --m_listener.updates[metadata.get()];
+ return true;
+}
+
+std::string RadosTestFixture::_pool_name;
+librados::Rados RadosTestFixture::_rados;
+uint64_t RadosTestFixture::_oid_number = 0;
+ThreadPool *RadosTestFixture::_thread_pool = nullptr;
diff --git a/src/test/journal/RadosTestFixture.h b/src/test/journal/RadosTestFixture.h
new file mode 100644
index 000000000..8ec662931
--- /dev/null
+++ b/src/test/journal/RadosTestFixture.h
@@ -0,0 +1,74 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librados/test.h"
+#include "common/ceph_mutex.h"
+#include "common/Timer.h"
+#include "journal/JournalMetadata.h"
+#include "cls/journal/cls_journal_types.h"
+#include "gtest/gtest.h"
+
+class ThreadPool;
+
+class RadosTestFixture : public ::testing::Test {
+public:
+ static void SetUpTestCase();
+ static void TearDownTestCase();
+
+ static std::string get_temp_oid();
+
+ RadosTestFixture();
+ void SetUp() override;
+ void TearDown() override;
+
+ int create(const std::string &oid, uint8_t order = 14,
+ uint8_t splay_width = 2);
+ ceph::ref_t<journal::JournalMetadata> create_metadata(const std::string &oid,
+ const std::string &client_id = "client",
+ double commit_internal = 0.1,
+ int max_concurrent_object_sets = 0);
+ int append(const std::string &oid, const bufferlist &bl);
+
+ int client_register(const std::string &oid, const std::string &id = "client",
+ const std::string &description = "");
+ int client_commit(const std::string &oid, const std::string &id,
+ const cls::journal::ObjectSetPosition &commit_position);
+
+ bufferlist create_payload(const std::string &payload);
+
+ struct Listener : public journal::JournalMetadataListener {
+ RadosTestFixture *test_fixture;
+ ceph::mutex mutex = ceph::make_mutex("mutex");
+ ceph::condition_variable cond;
+ std::map<journal::JournalMetadata*, uint32_t> updates;
+
+ Listener(RadosTestFixture *_test_fixture)
+ : test_fixture(_test_fixture) {}
+
+ void handle_update(journal::JournalMetadata *metadata) override {
+ std::lock_guard locker{mutex};
+ ++updates[metadata];
+ cond.notify_all();
+ }
+ };
+
+ int init_metadata(const ceph::ref_t<journal::JournalMetadata>& metadata);
+
+ bool wait_for_update(const ceph::ref_t<journal::JournalMetadata>& metadata);
+
+ static std::string _pool_name;
+ static librados::Rados _rados;
+ static uint64_t _oid_number;
+ static ThreadPool *_thread_pool;
+
+ librados::IoCtx m_ioctx;
+
+ ContextWQ *m_work_queue = nullptr;
+
+ ceph::mutex m_timer_lock;
+ SafeTimer *m_timer = nullptr;
+
+ Listener m_listener;
+
+ std::list<ceph::ref_t<journal::JournalMetadata>> m_metadatas;
+};
diff --git a/src/test/journal/mock/MockJournaler.cc b/src/test/journal/mock/MockJournaler.cc
new file mode 100644
index 000000000..90649440d
--- /dev/null
+++ b/src/test/journal/mock/MockJournaler.cc
@@ -0,0 +1,16 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "MockJournaler.h"
+
+namespace journal {
+
+MockFuture *MockFuture::s_instance = nullptr;
+MockReplayEntry *MockReplayEntry::s_instance = nullptr;
+MockJournaler *MockJournaler::s_instance = nullptr;
+
+std::ostream &operator<<(std::ostream &os, const MockJournalerProxy &) {
+ return os;
+}
+
+} // namespace journal
diff --git a/src/test/journal/mock/MockJournaler.h b/src/test/journal/mock/MockJournaler.h
new file mode 100644
index 000000000..d4e0f6c2a
--- /dev/null
+++ b/src/test/journal/mock/MockJournaler.h
@@ -0,0 +1,313 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef TEST_RBD_MIRROR_MOCK_JOURNALER_H
+#define TEST_RBD_MIRROR_MOCK_JOURNALER_H
+
+#include <gmock/gmock.h>
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+#include "cls/journal/cls_journal_types.h"
+#include "journal/Journaler.h"
+#include <iosfwd>
+#include <string>
+
+class Context;
+
+namespace journal {
+
+struct ReplayHandler;
+struct Settings;
+
+struct MockFuture {
+ static MockFuture *s_instance;
+ static MockFuture &get_instance() {
+ ceph_assert(s_instance != nullptr);
+ return *s_instance;
+ }
+
+ MockFuture() {
+ s_instance = this;
+ }
+
+ MOCK_CONST_METHOD0(is_valid, bool());
+ MOCK_METHOD1(flush, void(Context *));
+ MOCK_METHOD1(wait, void(Context *));
+};
+
+struct MockFutureProxy {
+ bool is_valid() const {
+ return MockFuture::get_instance().is_valid();
+ }
+
+ void flush(Context *on_safe) {
+ MockFuture::get_instance().flush(on_safe);
+ }
+
+ void wait(Context *on_safe) {
+ MockFuture::get_instance().wait(on_safe);
+ }
+};
+
+struct MockReplayEntry {
+ static MockReplayEntry *s_instance;
+ static MockReplayEntry &get_instance() {
+ ceph_assert(s_instance != nullptr);
+ return *s_instance;
+ }
+
+ MockReplayEntry() {
+ s_instance = this;
+ }
+
+ MOCK_CONST_METHOD0(get_commit_tid, uint64_t());
+ MOCK_CONST_METHOD0(get_data, bufferlist());
+};
+
+struct MockReplayEntryProxy {
+ uint64_t get_commit_tid() const {
+ return MockReplayEntry::get_instance().get_commit_tid();
+ }
+
+ bufferlist get_data() const {
+ return MockReplayEntry::get_instance().get_data();
+ }
+};
+
+struct MockJournaler {
+ static MockJournaler *s_instance;
+ static MockJournaler &get_instance() {
+ ceph_assert(s_instance != nullptr);
+ return *s_instance;
+ }
+
+ MockJournaler() {
+ s_instance = this;
+ }
+
+ MOCK_METHOD0(construct, void());
+
+ MOCK_METHOD1(init, void(Context *));
+ MOCK_METHOD0(shut_down, void());
+ MOCK_METHOD1(shut_down, void(Context *));
+ MOCK_CONST_METHOD0(is_initialized, bool());
+
+ MOCK_METHOD3(get_metadata, void(uint8_t *order, uint8_t *splay_width,
+ int64_t *pool_id));
+ MOCK_METHOD4(get_mutable_metadata, void(uint64_t*, uint64_t*,
+ std::set<cls::journal::Client> *,
+ Context*));
+
+ MOCK_METHOD2(register_client, void(const bufferlist &, Context *));
+ MOCK_METHOD1(unregister_client, void(Context *));
+ MOCK_METHOD3(get_client, void(const std::string &, cls::journal::Client *,
+ Context *));
+ MOCK_METHOD2(get_cached_client, int(const std::string&, cls::journal::Client*));
+ MOCK_METHOD2(update_client, void(const bufferlist &, Context *));
+
+ MOCK_METHOD4(allocate_tag, void(uint64_t, const bufferlist &,
+ cls::journal::Tag*, Context *));
+ MOCK_METHOD3(get_tag, void(uint64_t, cls::journal::Tag *, Context *));
+ MOCK_METHOD3(get_tags, void(uint64_t, journal::Journaler::Tags*, Context*));
+ MOCK_METHOD4(get_tags, void(uint64_t, uint64_t, journal::Journaler::Tags*,
+ Context*));
+
+ MOCK_METHOD1(start_replay, void(::journal::ReplayHandler *replay_handler));
+ MOCK_METHOD2(start_live_replay, void(ReplayHandler *, double));
+ MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *));
+ MOCK_METHOD2(try_pop_front, bool(MockReplayEntryProxy *, uint64_t *));
+ MOCK_METHOD0(stop_replay, void());
+ MOCK_METHOD1(stop_replay, void(Context *on_finish));
+
+ MOCK_METHOD1(start_append, void(uint64_t));
+ MOCK_METHOD3(set_append_batch_options, void(int, uint64_t, double));
+ MOCK_CONST_METHOD0(get_max_append_size, uint64_t());
+ MOCK_METHOD2(append, MockFutureProxy(uint64_t tag_id,
+ const bufferlist &bl));
+ MOCK_METHOD1(flush, void(Context *on_safe));
+ MOCK_METHOD1(stop_append, void(Context *on_safe));
+
+ MOCK_METHOD1(committed, void(const MockReplayEntryProxy &));
+ MOCK_METHOD1(committed, void(const MockFutureProxy &future));
+ MOCK_METHOD1(flush_commit_position, void(Context*));
+
+ MOCK_METHOD1(add_listener, void(JournalMetadataListener *));
+ MOCK_METHOD1(remove_listener, void(JournalMetadataListener *));
+
+};
+
+struct MockJournalerProxy {
+ MockJournalerProxy() {
+ MockJournaler::get_instance().construct();
+ }
+
+ template <typename IoCtxT>
+ MockJournalerProxy(IoCtxT &header_ioctx, const std::string &,
+ const std::string &, const Settings&,
+ journal::CacheManagerHandler *) {
+ MockJournaler::get_instance().construct();
+ }
+
+ template <typename WorkQueue, typename Timer>
+ MockJournalerProxy(WorkQueue *work_queue, Timer *timer, ceph::mutex *timer_lock,
+ librados::IoCtx &header_ioctx,
+ const std::string &journal_id,
+ const std::string &client_id, const Settings&,
+ journal::CacheManagerHandler *) {
+ MockJournaler::get_instance().construct();
+ }
+
+ void exists(Context *on_finish) const {
+ on_finish->complete(-EINVAL);
+ }
+ void create(uint8_t order, uint8_t splay_width, int64_t pool_id, Context *on_finish) {
+ on_finish->complete(-EINVAL);
+ }
+ void remove(bool force, Context *on_finish) {
+ on_finish->complete(-EINVAL);
+ }
+ int register_client(const bufferlist &data) {
+ return -EINVAL;
+ }
+
+ void allocate_tag(uint64_t tag_class, const bufferlist &tag_data,
+ cls::journal::Tag* tag, Context *on_finish) {
+ MockJournaler::get_instance().allocate_tag(tag_class, tag_data, tag,
+ on_finish);
+ }
+
+ void init(Context *on_finish) {
+ MockJournaler::get_instance().init(on_finish);
+ }
+ void shut_down() {
+ MockJournaler::get_instance().shut_down();
+ }
+ void shut_down(Context *on_finish) {
+ MockJournaler::get_instance().shut_down(on_finish);
+ }
+ bool is_initialized() const {
+ return MockJournaler::get_instance().is_initialized();
+ }
+
+ void get_metadata(uint8_t *order, uint8_t *splay_width, int64_t *pool_id) {
+ MockJournaler::get_instance().get_metadata(order, splay_width, pool_id);
+ }
+
+ void get_mutable_metadata(uint64_t *min, uint64_t *active,
+ std::set<cls::journal::Client> *clients,
+ Context *on_finish) {
+ MockJournaler::get_instance().get_mutable_metadata(min, active, clients,
+ on_finish);
+ }
+
+ void register_client(const bufferlist &data, Context *on_finish) {
+ MockJournaler::get_instance().register_client(data, on_finish);
+ }
+
+ void unregister_client(Context *on_finish) {
+ MockJournaler::get_instance().unregister_client(on_finish);
+ }
+
+ void get_client(const std::string &client_id, cls::journal::Client *client,
+ Context *on_finish) {
+ MockJournaler::get_instance().get_client(client_id, client, on_finish);
+ }
+
+ int get_cached_client(const std::string& client_id,
+ cls::journal::Client* client) {
+ return MockJournaler::get_instance().get_cached_client(client_id, client);
+ }
+
+ void update_client(const bufferlist &client_data, Context *on_finish) {
+ MockJournaler::get_instance().update_client(client_data, on_finish);
+ }
+
+ void get_tag(uint64_t tag_tid, cls::journal::Tag *tag, Context *on_finish) {
+ MockJournaler::get_instance().get_tag(tag_tid, tag, on_finish);
+ }
+
+ void get_tags(uint64_t tag_class, journal::Journaler::Tags *tags,
+ Context *on_finish) {
+ MockJournaler::get_instance().get_tags(tag_class, tags, on_finish);
+ }
+ void get_tags(uint64_t start_after_tag_tid, uint64_t tag_class,
+ journal::Journaler::Tags *tags, Context *on_finish) {
+ MockJournaler::get_instance().get_tags(start_after_tag_tid, tag_class, tags,
+ on_finish);
+ }
+
+ void start_replay(::journal::ReplayHandler *replay_handler) {
+ MockJournaler::get_instance().start_replay(replay_handler);
+ }
+
+ void start_live_replay(ReplayHandler *handler, double interval) {
+ MockJournaler::get_instance().start_live_replay(handler, interval);
+ }
+
+ bool try_pop_front(MockReplayEntryProxy *replay_entry) {
+ return MockJournaler::get_instance().try_pop_front(replay_entry);
+ }
+
+ bool try_pop_front(MockReplayEntryProxy *entry, uint64_t *tag_tid) {
+ return MockJournaler::get_instance().try_pop_front(entry, tag_tid);
+ }
+
+ void stop_replay() {
+ MockJournaler::get_instance().stop_replay();
+ }
+ void stop_replay(Context *on_finish) {
+ MockJournaler::get_instance().stop_replay(on_finish);
+ }
+
+ void start_append(uint64_t max_in_flight_appends) {
+ MockJournaler::get_instance().start_append(max_in_flight_appends);
+ }
+
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age) {
+ MockJournaler::get_instance().set_append_batch_options(
+ flush_interval, flush_bytes, flush_age);
+ }
+
+ uint64_t get_max_append_size() const {
+ return MockJournaler::get_instance().get_max_append_size();
+ }
+
+ MockFutureProxy append(uint64_t tag_id, const bufferlist &bl) {
+ return MockJournaler::get_instance().append(tag_id, bl);
+ }
+
+ void flush(Context *on_safe) {
+ MockJournaler::get_instance().flush(on_safe);
+ }
+
+ void stop_append(Context *on_safe) {
+ MockJournaler::get_instance().stop_append(on_safe);
+ }
+
+ void committed(const MockReplayEntryProxy &entry) {
+ MockJournaler::get_instance().committed(entry);
+ }
+
+ void committed(const MockFutureProxy &future) {
+ MockJournaler::get_instance().committed(future);
+ }
+
+ void flush_commit_position(Context *on_finish) {
+ MockJournaler::get_instance().flush_commit_position(on_finish);
+ }
+
+ void add_listener(JournalMetadataListener *listener) {
+ MockJournaler::get_instance().add_listener(listener);
+ }
+
+ void remove_listener(JournalMetadataListener *listener) {
+ MockJournaler::get_instance().remove_listener(listener);
+ }
+};
+
+std::ostream &operator<<(std::ostream &os, const MockJournalerProxy &);
+
+} // namespace journal
+
+#endif // TEST_RBD_MIRROR_MOCK_JOURNALER_H
diff --git a/src/test/journal/test_Entry.cc b/src/test/journal/test_Entry.cc
new file mode 100644
index 000000000..1fa3136f7
--- /dev/null
+++ b/src/test/journal/test_Entry.cc
@@ -0,0 +1,96 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Entry.h"
+#include "gtest/gtest.h"
+
+class TestEntry : public ::testing::Test {
+};
+
+TEST_F(TestEntry, DefaultConstructor) {
+ journal::Entry entry;
+ ASSERT_EQ(0U, entry.get_entry_tid());
+ ASSERT_EQ(0U, entry.get_tag_tid());
+
+ bufferlist data(entry.get_data());
+ bufferlist expected_data;
+ ASSERT_TRUE(data.contents_equal(expected_data));
+}
+
+TEST_F(TestEntry, Constructor) {
+ bufferlist data;
+ data.append("data");
+ journal::Entry entry(234, 123, data);
+
+ data.clear();
+ data = entry.get_data();
+
+ bufferlist expected_data;
+ expected_data.append("data");
+
+ ASSERT_EQ(123U, entry.get_entry_tid());
+ ASSERT_EQ(234U, entry.get_tag_tid());
+ ASSERT_TRUE(data.contents_equal(expected_data));
+}
+
+TEST_F(TestEntry, IsReadable) {
+ bufferlist data;
+ data.append("data");
+ journal::Entry entry(234, 123, data);
+
+ bufferlist full_bl;
+ encode(entry, full_bl);
+
+ uint32_t bytes_needed;
+ for (size_t i = 0; i < full_bl.length() - 1; ++i) {
+ bufferlist partial_bl;
+ if (i > 0) {
+ partial_bl.substr_of(full_bl, 0, i);
+ }
+ ASSERT_FALSE(journal::Entry::is_readable(partial_bl.begin(),
+ &bytes_needed));
+ ASSERT_GT(bytes_needed, 0U);
+ }
+ ASSERT_TRUE(journal::Entry::is_readable(full_bl.begin(), &bytes_needed));
+ ASSERT_EQ(0U, bytes_needed);
+}
+
+TEST_F(TestEntry, IsReadableBadPreamble) {
+ bufferlist data;
+ data.append("data");
+ journal::Entry entry(234, 123, data);
+
+ uint64_t stray_bytes = 0x1122334455667788;
+ bufferlist full_bl;
+ encode(stray_bytes, full_bl);
+ encode(entry, full_bl);
+
+ uint32_t bytes_needed;
+ bufferlist::iterator it = full_bl.begin();
+ ASSERT_FALSE(journal::Entry::is_readable(it, &bytes_needed));
+ ASSERT_EQ(0U, bytes_needed);
+
+ it += sizeof(stray_bytes);
+ ASSERT_TRUE(journal::Entry::is_readable(it, &bytes_needed));
+ ASSERT_EQ(0U, bytes_needed);
+}
+
+TEST_F(TestEntry, IsReadableBadCRC) {
+ bufferlist data;
+ data.append("data");
+ journal::Entry entry(234, 123, data);
+
+ bufferlist full_bl;
+ encode(entry, full_bl);
+
+ bufferlist bad_bl;
+ bad_bl.substr_of(full_bl, 0, full_bl.length() - 4);
+ encode(full_bl.crc32c(1), bad_bl);
+
+ uint32_t bytes_needed;
+ ASSERT_FALSE(journal::Entry::is_readable(bad_bl.begin(), &bytes_needed));
+ ASSERT_EQ(0U, bytes_needed);
+
+
+
+}
diff --git a/src/test/journal/test_FutureImpl.cc b/src/test/journal/test_FutureImpl.cc
new file mode 100644
index 000000000..1ff346dcf
--- /dev/null
+++ b/src/test/journal/test_FutureImpl.cc
@@ -0,0 +1,268 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/FutureImpl.h"
+#include "common/Cond.h"
+#include "gtest/gtest.h"
+#include "test/journal/RadosTestFixture.h"
+
+class TestFutureImpl : public RadosTestFixture {
+public:
+ struct FlushHandler : public journal::FutureImpl::FlushHandler {
+ uint64_t flushes = 0;
+ void flush(const ceph::ref_t<journal::FutureImpl>& future) override {
+ ++flushes;
+ }
+ FlushHandler() = default;
+ };
+
+ TestFutureImpl() {
+ m_flush_handler = std::make_shared<FlushHandler>();
+ }
+
+ auto create_future(uint64_t tag_tid, uint64_t entry_tid,
+ uint64_t commit_tid,
+ ceph::ref_t<journal::FutureImpl> prev = nullptr) {
+ auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, commit_tid);
+ future->init(prev);
+ return future;
+ }
+
+ void flush(const ceph::ref_t<journal::FutureImpl>& future) {
+ }
+
+ std::shared_ptr<FlushHandler> m_flush_handler;
+};
+
+TEST_F(TestFutureImpl, Getters) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future = create_future(234, 123, 456);
+ ASSERT_EQ(234U, future->get_tag_tid());
+ ASSERT_EQ(123U, future->get_entry_tid());
+ ASSERT_EQ(456U, future->get_commit_tid());
+}
+
+TEST_F(TestFutureImpl, Attach) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future = create_future(234, 123, 456);
+ ASSERT_FALSE(future->attach(m_flush_handler));
+ ASSERT_EQ(2U, m_flush_handler.use_count());
+}
+
+TEST_F(TestFutureImpl, AttachWithPendingFlush) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future = create_future(234, 123, 456);
+ future->flush(NULL);
+
+ ASSERT_TRUE(future->attach(m_flush_handler));
+ ASSERT_EQ(2U, m_flush_handler.use_count());
+}
+
+TEST_F(TestFutureImpl, Detach) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future = create_future(234, 123, 456);
+ ASSERT_FALSE(future->attach(m_flush_handler));
+ future->detach();
+ ASSERT_EQ(1U, m_flush_handler.use_count());
+}
+
+TEST_F(TestFutureImpl, DetachImplicit) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future = create_future(234, 123, 456);
+ ASSERT_FALSE(future->attach(m_flush_handler));
+ future.reset();
+ ASSERT_EQ(1U, m_flush_handler.use_count());
+}
+
+TEST_F(TestFutureImpl, Flush) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future = create_future(234, 123, 456);
+ ASSERT_FALSE(future->attach(m_flush_handler));
+
+ C_SaferCond cond;
+ future->flush(&cond);
+
+ ASSERT_EQ(1U, m_flush_handler->flushes);
+ future->safe(-EIO);
+ ASSERT_EQ(-EIO, cond.wait());
+}
+
+TEST_F(TestFutureImpl, FlushWithoutContext) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future = create_future(234, 123, 456);
+ ASSERT_FALSE(future->attach(m_flush_handler));
+
+ future->flush(NULL);
+ ASSERT_EQ(1U, m_flush_handler->flushes);
+ future->safe(-EIO);
+ ASSERT_TRUE(future->is_complete());
+ ASSERT_EQ(-EIO, future->get_return_value());
+}
+
+TEST_F(TestFutureImpl, FlushChain) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future1 = create_future(234, 123, 456);
+ auto future2 = create_future(234, 124, 457, future1);
+ auto future3 = create_future(235, 1, 458, future2);
+
+ auto flush_handler = std::make_shared<FlushHandler>();
+ ASSERT_FALSE(future1->attach(m_flush_handler));
+ ASSERT_FALSE(future2->attach(flush_handler));
+ ASSERT_FALSE(future3->attach(m_flush_handler));
+
+ C_SaferCond cond;
+ future3->flush(&cond);
+
+ ASSERT_EQ(1U, m_flush_handler->flushes);
+ ASSERT_EQ(1U, flush_handler->flushes);
+
+ future3->safe(0);
+ ASSERT_FALSE(future3->is_complete());
+
+ future1->safe(0);
+ ASSERT_FALSE(future3->is_complete());
+
+ future2->safe(-EIO);
+ ASSERT_TRUE(future3->is_complete());
+ ASSERT_EQ(-EIO, future3->get_return_value());
+ ASSERT_EQ(-EIO, cond.wait());
+ ASSERT_EQ(0, future1->get_return_value());
+}
+
+TEST_F(TestFutureImpl, FlushInProgress) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future1 = create_future(234, 123, 456);
+ auto future2 = create_future(234, 124, 457, future1);
+ ASSERT_FALSE(future1->attach(m_flush_handler));
+ ASSERT_FALSE(future2->attach(m_flush_handler));
+
+ future1->set_flush_in_progress();
+ ASSERT_TRUE(future1->is_flush_in_progress());
+
+ future1->flush(NULL);
+ ASSERT_EQ(0U, m_flush_handler->flushes);
+
+ future1->safe(0);
+}
+
+TEST_F(TestFutureImpl, FlushAlreadyComplete) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future = create_future(234, 123, 456);
+ future->safe(-EIO);
+
+ C_SaferCond cond;
+ future->flush(&cond);
+ ASSERT_EQ(-EIO, cond.wait());
+}
+
+TEST_F(TestFutureImpl, Wait) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future = create_future(234, 1, 456);
+
+ C_SaferCond cond;
+ future->wait(&cond);
+ future->safe(-EEXIST);
+ ASSERT_EQ(-EEXIST, cond.wait());
+}
+
+TEST_F(TestFutureImpl, WaitAlreadyComplete) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future = create_future(234, 1, 456);
+ future->safe(-EEXIST);
+
+ C_SaferCond cond;
+ future->wait(&cond);
+ ASSERT_EQ(-EEXIST, cond.wait());
+}
+
+TEST_F(TestFutureImpl, SafePreservesError) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future1 = create_future(234, 123, 456);
+ auto future2 = create_future(234, 124, 457, future1);
+
+ future1->safe(-EIO);
+ future2->safe(-EEXIST);
+ ASSERT_TRUE(future2->is_complete());
+ ASSERT_EQ(-EIO, future2->get_return_value());
+}
+
+TEST_F(TestFutureImpl, ConsistentPreservesError) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ auto future1 = create_future(234, 123, 456);
+ auto future2 = create_future(234, 124, 457, future1);
+
+ future2->safe(-EEXIST);
+ future1->safe(-EIO);
+ ASSERT_TRUE(future2->is_complete());
+ ASSERT_EQ(-EEXIST, future2->get_return_value());
+}
diff --git a/src/test/journal/test_JournalMetadata.cc b/src/test/journal/test_JournalMetadata.cc
new file mode 100644
index 000000000..4108d4da3
--- /dev/null
+++ b/src/test/journal/test_JournalMetadata.cc
@@ -0,0 +1,210 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalMetadata.h"
+#include "test/journal/RadosTestFixture.h"
+#include "common/Cond.h"
+#include <map>
+
+class TestJournalMetadata : public RadosTestFixture {
+public:
+ void TearDown() override {
+ for (MetadataList::iterator it = m_metadata_list.begin();
+ it != m_metadata_list.end(); ++it) {
+ (*it)->remove_listener(&m_listener);
+ }
+ m_metadata_list.clear();
+
+ RadosTestFixture::TearDown();
+ }
+
+ auto create_metadata(const std::string &oid,
+ const std::string &client_id,
+ double commit_interval = 0.1,
+ int max_concurrent_object_sets = 0) {
+ auto metadata = RadosTestFixture::create_metadata(
+ oid, client_id, commit_interval, max_concurrent_object_sets);
+ m_metadata_list.push_back(metadata);
+ metadata->add_listener(&m_listener);
+ return metadata;
+ }
+
+ typedef std::list<ceph::ref_t<journal::JournalMetadata>> MetadataList;
+ MetadataList m_metadata_list;
+};
+
+TEST_F(TestJournalMetadata, JournalDNE) {
+ std::string oid = get_temp_oid();
+
+ auto metadata1 = create_metadata(oid, "client1");
+ ASSERT_EQ(-ENOENT, init_metadata(metadata1));
+}
+
+TEST_F(TestJournalMetadata, ClientDNE) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid, 14, 2));
+ ASSERT_EQ(0, client_register(oid, "client1", ""));
+
+ auto metadata1 = create_metadata(oid, "client1");
+ ASSERT_EQ(0, init_metadata(metadata1));
+
+ auto metadata2 = create_metadata(oid, "client2");
+ ASSERT_EQ(-ENOENT, init_metadata(metadata2));
+}
+
+TEST_F(TestJournalMetadata, Committed) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid, 14, 2));
+ ASSERT_EQ(0, client_register(oid, "client1", ""));
+
+ auto metadata1 = create_metadata(oid, "client1", 600);
+ ASSERT_EQ(0, init_metadata(metadata1));
+
+ auto metadata2 = create_metadata(oid, "client1");
+ ASSERT_EQ(0, init_metadata(metadata2));
+ ASSERT_TRUE(wait_for_update(metadata2));
+
+ journal::JournalMetadata::ObjectSetPosition expect_commit_position;
+ journal::JournalMetadata::ObjectSetPosition read_commit_position;
+ metadata1->get_commit_position(&read_commit_position);
+ ASSERT_EQ(expect_commit_position, read_commit_position);
+
+ uint64_t commit_tid1 = metadata1->allocate_commit_tid(0, 0, 0);
+ uint64_t commit_tid2 = metadata1->allocate_commit_tid(0, 1, 0);
+ uint64_t commit_tid3 = metadata1->allocate_commit_tid(1, 0, 1);
+ uint64_t commit_tid4 = metadata1->allocate_commit_tid(0, 0, 2);
+
+ // cannot commit until tid1 + 2 committed
+ metadata1->committed(commit_tid2, []() { return nullptr; });
+ metadata1->committed(commit_tid3, []() { return nullptr; });
+
+ C_SaferCond cond1;
+ metadata1->committed(commit_tid1, [&cond1]() { return &cond1; });
+
+ // given our 10 minute commit internal, this should override the
+ // in-flight commit
+ C_SaferCond cond2;
+ metadata1->committed(commit_tid4, [&cond2]() { return &cond2; });
+
+ ASSERT_EQ(-ESTALE, cond1.wait());
+ metadata1->flush_commit_position();
+ ASSERT_EQ(0, cond2.wait());
+
+ ASSERT_TRUE(wait_for_update(metadata2));
+ metadata2->get_commit_position(&read_commit_position);
+ expect_commit_position = {{{0, 0, 2}, {1, 0, 1}}};
+ ASSERT_EQ(expect_commit_position, read_commit_position);
+}
+
+TEST_F(TestJournalMetadata, UpdateActiveObject) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid, 14, 2));
+ ASSERT_EQ(0, client_register(oid, "client1", ""));
+
+ auto metadata1 = create_metadata(oid, "client1");
+ ASSERT_EQ(0, init_metadata(metadata1));
+ ASSERT_TRUE(wait_for_update(metadata1));
+
+ ASSERT_EQ(0U, metadata1->get_active_set());
+
+ ASSERT_EQ(0, metadata1->set_active_set(123));
+ ASSERT_TRUE(wait_for_update(metadata1));
+
+ ASSERT_EQ(123U, metadata1->get_active_set());
+}
+
+TEST_F(TestJournalMetadata, DisconnectLaggyClient) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid, "client1", ""));
+ ASSERT_EQ(0, client_register(oid, "client2", "laggy"));
+
+ int max_concurrent_object_sets = 100;
+ auto metadata =
+ create_metadata(oid, "client1", 0.1, max_concurrent_object_sets);
+ ASSERT_EQ(0, init_metadata(metadata));
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ ASSERT_EQ(0U, metadata->get_active_set());
+
+ journal::JournalMetadata::RegisteredClients clients;
+
+#define ASSERT_CLIENT_STATES(s1, s2) \
+ ASSERT_EQ(2U, clients.size()); \
+ for (auto &c : clients) { \
+ if (c.id == "client1") { \
+ ASSERT_EQ(c.state, s1); \
+ } else if (c.id == "client2") { \
+ ASSERT_EQ(c.state, s2); \
+ } else { \
+ ASSERT_TRUE(false); \
+ } \
+ }
+
+ metadata->get_registered_clients(&clients);
+ ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
+ cls::journal::CLIENT_STATE_CONNECTED);
+
+ // client2 is connected when active set <= max_concurrent_object_sets
+ ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets));
+ ASSERT_TRUE(wait_for_update(metadata));
+ uint64_t commit_tid = metadata->allocate_commit_tid(0, 0, 0);
+ C_SaferCond cond1;
+ metadata->committed(commit_tid, [&cond1]() { return &cond1; });
+ ASSERT_EQ(0, cond1.wait());
+ metadata->flush_commit_position();
+ ASSERT_TRUE(wait_for_update(metadata));
+ ASSERT_EQ(100U, metadata->get_active_set());
+ clients.clear();
+ metadata->get_registered_clients(&clients);
+ ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
+ cls::journal::CLIENT_STATE_CONNECTED);
+
+ // client2 is disconnected when active set > max_concurrent_object_sets
+ ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets + 1));
+ ASSERT_TRUE(wait_for_update(metadata));
+ commit_tid = metadata->allocate_commit_tid(0, 0, 1);
+ C_SaferCond cond2;
+ metadata->committed(commit_tid, [&cond2]() { return &cond2; });
+ ASSERT_EQ(0, cond2.wait());
+ metadata->flush_commit_position();
+ ASSERT_TRUE(wait_for_update(metadata));
+ ASSERT_EQ(101U, metadata->get_active_set());
+ clients.clear();
+ metadata->get_registered_clients(&clients);
+ ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
+ cls::journal::CLIENT_STATE_DISCONNECTED);
+}
+
+TEST_F(TestJournalMetadata, AssertActiveTag) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid, "client1", ""));
+
+ auto metadata = create_metadata(oid, "client1");
+ ASSERT_EQ(0, init_metadata(metadata));
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ C_SaferCond ctx1;
+ cls::journal::Tag tag1;
+ metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ C_SaferCond ctx2;
+ metadata->assert_active_tag(tag1.tid, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+
+ C_SaferCond ctx3;
+ cls::journal::Tag tag2;
+ metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx3);
+ ASSERT_EQ(0, ctx3.wait());
+
+ C_SaferCond ctx4;
+ metadata->assert_active_tag(tag1.tid, &ctx4);
+ ASSERT_EQ(-ESTALE, ctx4.wait());
+}
diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc
new file mode 100644
index 000000000..1601705da
--- /dev/null
+++ b/src/test/journal/test_JournalPlayer.cc
@@ -0,0 +1,994 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalPlayer.h"
+#include "journal/Entry.h"
+#include "journal/JournalMetadata.h"
+#include "journal/ReplayHandler.h"
+#include "include/stringify.h"
+#include "common/ceph_mutex.h"
+#include "gtest/gtest.h"
+#include "test/journal/RadosTestFixture.h"
+#include <list>
+#include <boost/scope_exit.hpp>
+
+typedef std::list<journal::Entry> Entries;
+
+template <typename T>
+class TestJournalPlayer : public RadosTestFixture {
+public:
+ typedef std::list<journal::JournalPlayer *> JournalPlayers;
+
+ static const uint64_t max_fetch_bytes = T::max_fetch_bytes;
+
+ struct ReplayHandler : public journal::ReplayHandler {
+ ceph::mutex lock = ceph::make_mutex("lock");
+ ceph::condition_variable cond;
+ bool entries_available;
+ bool complete;
+ int complete_result;
+
+ ReplayHandler()
+ : entries_available(false), complete(false),
+ complete_result(0) {}
+
+ void handle_entries_available() override {
+ std::lock_guard locker{lock};
+ entries_available = true;
+ cond.notify_all();
+ }
+
+ void handle_complete(int r) override {
+ std::lock_guard locker{lock};
+ complete = true;
+ complete_result = r;
+ cond.notify_all();
+ }
+ };
+
+ void TearDown() override {
+ for (JournalPlayers::iterator it = m_players.begin();
+ it != m_players.end(); ++it) {
+ delete *it;
+ }
+ RadosTestFixture::TearDown();
+ }
+
+ auto create_metadata(const std::string &oid) {
+ return RadosTestFixture::create_metadata(oid, "client", 0.1,
+ max_fetch_bytes);
+ }
+
+ int client_commit(const std::string &oid,
+ journal::JournalPlayer::ObjectSetPosition position) {
+ return RadosTestFixture::client_commit(oid, "client", position);
+ }
+
+ journal::Entry create_entry(uint64_t tag_tid, uint64_t entry_tid) {
+ std::string payload(128, '0');
+ bufferlist payload_bl;
+ payload_bl.append(payload);
+ return journal::Entry(tag_tid, entry_tid, payload_bl);
+ }
+
+ journal::JournalPlayer *create_player(const std::string &oid,
+ const ceph::ref_t<journal::JournalMetadata>& metadata) {
+ journal::JournalPlayer *player(new journal::JournalPlayer(
+ m_ioctx, oid + ".", metadata, &m_replay_hander, nullptr));
+ m_players.push_back(player);
+ return player;
+ }
+
+ bool wait_for_entries(journal::JournalPlayer *player, uint32_t count,
+ Entries *entries) {
+ entries->clear();
+ while (entries->size() < count) {
+ journal::Entry entry;
+ uint64_t commit_tid;
+ while (entries->size() < count &&
+ player->try_pop_front(&entry, &commit_tid)) {
+ entries->push_back(entry);
+ }
+ if (entries->size() == count) {
+ break;
+ }
+
+ std::unique_lock locker{m_replay_hander.lock};
+ if (m_replay_hander.entries_available) {
+ m_replay_hander.entries_available = false;
+ } else if (m_replay_hander.cond.wait_for(locker, 10s) ==
+ std::cv_status::timeout) {
+ break;
+ }
+ }
+ return entries->size() == count;
+ }
+
+ bool wait_for_complete(journal::JournalPlayer *player) {
+ std::unique_lock locker{m_replay_hander.lock};
+ while (!m_replay_hander.complete) {
+ journal::Entry entry;
+ uint64_t commit_tid;
+ player->try_pop_front(&entry, &commit_tid);
+
+ if (m_replay_hander.cond.wait_for(locker, 10s) ==
+ std::cv_status::timeout) {
+ return false;
+ }
+ }
+ m_replay_hander.complete = false;
+ return true;
+ }
+
+ int write_entry(const std::string &oid, uint64_t object_num,
+ uint64_t tag_tid, uint64_t entry_tid) {
+ bufferlist bl;
+ encode(create_entry(tag_tid, entry_tid), bl);
+ return append(oid + "." + stringify(object_num), bl);
+ }
+
+ JournalPlayers m_players;
+ ReplayHandler m_replay_hander;
+};
+
+template <uint64_t _max_fetch_bytes>
+class TestJournalPlayerParams {
+public:
+ static const uint64_t max_fetch_bytes = _max_fetch_bytes;
+};
+
+typedef ::testing::Types<TestJournalPlayerParams<0>,
+ TestJournalPlayerParams<16> > TestJournalPlayerTypes;
+TYPED_TEST_SUITE(TestJournalPlayer, TestJournalPlayerTypes);
+
+TYPED_TEST(TestJournalPlayer, Prefetch) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions;
+ positions = {
+ cls::journal::ObjectPosition(0, 234, 122) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ Entries expected_entries;
+ expected_entries = {
+ this->create_entry(234, 123),
+ this->create_entry(234, 124),
+ this->create_entry(234, 125)};
+ ASSERT_EQ(expected_entries, entries);
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
+ ASSERT_EQ(125U, last_tid);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchSkip) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions;
+ positions = {
+ cls::journal::ObjectPosition(0, 234, 125),
+ cls::journal::ObjectPosition(1, 234, 124) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 0, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
+ ASSERT_EQ(125U, last_tid);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ Entries expected_entries;
+ expected_entries = {
+ this->create_entry(234, 122),
+ this->create_entry(234, 123)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions;
+ positions = {
+ cls::journal::ObjectPosition(2, 234, 122),
+ cls::journal::ObjectPosition(1, 234, 121),
+ cls::journal::ObjectPosition(0, 234, 120)};
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid, 14, 3));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 234, 122));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 123));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 124));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 236, 0)); // new tag allocated
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
+ ASSERT_EQ(124U, last_tid);
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid));
+ ASSERT_EQ(0U, last_tid);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+ ASSERT_TRUE(this->wait_for_complete(player));
+ ASSERT_EQ(-ENOMSG, this->m_replay_hander.complete_result);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid, 14, 4));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, metadata->set_active_set(1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
+ ASSERT_EQ(0, this->write_entry(oid, 5, 2, 861));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 5, 3, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 7, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(2, 852),
+ this->create_entry(2, 853),
+ this->create_entry(2, 854),
+ this->create_entry(3, 0),
+ this->create_entry(3, 1),
+ this->create_entry(3, 2),
+ this->create_entry(3, 3)};
+ ASSERT_EQ(expected_entries, entries);
+
+ ASSERT_TRUE(this->wait_for_complete(player));
+ ASSERT_EQ(0, this->m_replay_hander.complete_result);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, metadata->set_active_set(2));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(0, 0),
+ this->create_entry(0, 1),
+ this->create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(0, 0),
+ this->create_entry(0, 1),
+ this->create_entry(0, 2),
+ this->create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions = {
+ cls::journal::ObjectPosition(0, 1, 0) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(1, 1)};
+ ASSERT_EQ(expected_entries, entries);
+
+ ASSERT_TRUE(this->wait_for_complete(player));
+ ASSERT_EQ(0, this->m_replay_hander.complete_result);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 235, 121));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+ ASSERT_TRUE(this->wait_for_complete(player));
+ ASSERT_EQ(0, this->m_replay_hander.complete_result);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions;
+ positions = {
+ cls::journal::ObjectPosition(0, 234, 122)};
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries;
+ expected_entries = {this->create_entry(234, 123)};
+ ASSERT_EQ(expected_entries, entries);
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ expected_entries = {this->create_entry(234, 124)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid, 14, 3));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+ ASSERT_EQ(0, metadata->set_active_set(2));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+ ASSERT_EQ(0, this->write_entry(oid, 5, 234, 124));
+ ASSERT_EQ(0, this->write_entry(oid, 6, 234, 125));
+ ASSERT_EQ(0, this->write_entry(oid, 7, 234, 126));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 5, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ Entries expected_entries;
+ expected_entries = {
+ this->create_entry(234, 122),
+ this->create_entry(234, 123),
+ this->create_entry(234, 124),
+ this->create_entry(234, 125),
+ this->create_entry(234, 126)};
+ ASSERT_EQ(expected_entries, entries);
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
+ ASSERT_EQ(126U, last_tid);
+}
+
+TYPED_TEST(TestJournalPlayer, ImbalancedJournal) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions = {
+ cls::journal::ObjectPosition(9, 300, 1),
+ cls::journal::ObjectPosition(8, 300, 0),
+ cls::journal::ObjectPosition(10, 200, 4334),
+ cls::journal::ObjectPosition(11, 200, 4331) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid, 14, 4));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+ ASSERT_EQ(0, metadata->set_active_set(2));
+ metadata->set_minimum_set(2);
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 8, 300, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 8, 301, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 9, 300, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 9, 301, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 10, 200, 4334));
+ ASSERT_EQ(0, this->write_entry(oid, 10, 301, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 11, 200, 4331));
+ ASSERT_EQ(0, this->write_entry(oid, 11, 301, 3));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ Entries expected_entries;
+ expected_entries = {
+ this->create_entry(301, 0),
+ this->create_entry(301, 1),
+ this->create_entry(301, 2),
+ this->create_entry(301, 3)};
+ ASSERT_EQ(expected_entries, entries);
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid));
+ ASSERT_EQ(3U, last_tid);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
+ ASSERT_EQ(0, this->write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(0, 0),
+ this->create_entry(0, 1),
+ this->create_entry(0, 2)};
+ ASSERT_EQ(expected_entries, entries);
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
+ ASSERT_EQ(0, metadata->set_active_set(1));
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ expected_entries = {
+ this->create_entry(0, 3),
+ this->create_entry(0, 4),
+ this->create_entry(0, 5)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid, 14, 4));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(2, 852),
+ this->create_entry(2, 853),
+ this->create_entry(2, 854)};
+ ASSERT_EQ(expected_entries, entries);
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+ ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 3, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
+ ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
+
+ expected_entries = {
+ this->create_entry(3, 0),
+ this->create_entry(3, 1),
+ this->create_entry(3, 2),
+ this->create_entry(3, 3)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, metadata->set_active_set(2));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(0, 0),
+ this->create_entry(0, 1),
+ this->create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ C_SaferCond ctx1;
+ cls::journal::Tag tag1;
+ metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ ASSERT_EQ(0, metadata->set_active_set(0));
+ ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, tag1.tid, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 4));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(tag1.tid, 0),
+ this->create_entry(tag1.tid, 1),
+ this->create_entry(tag1.tid, 2)};
+ ASSERT_EQ(expected_entries, entries);
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+ C_SaferCond ctx2;
+ cls::journal::Tag tag2;
+ metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, tag2.tid, 0));
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ expected_entries = {
+ this->create_entry(tag2.tid, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions = {
+ cls::journal::ObjectPosition(0, 1, 0) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(1, 1)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions = {
+ cls::journal::ObjectPosition(1, 0, 1),
+ cls::journal::ObjectPosition(0, 0, 0)};
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, metadata->set_active_set(1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 1, 0));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+
+ // should remove player for offset 3 after refetching
+ ASSERT_EQ(0, metadata->set_active_set(3));
+ ASSERT_EQ(0, this->write_entry(oid, 7, 1, 1));
+
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ expected_entries = {
+ this->create_entry(1, 1)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefechShutDown) {
+ std::string oid = this->get_temp_oid();
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, {}));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+ player->prefetch();
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) {
+ std::string oid = this->get_temp_oid();
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, {}));
+
+ auto metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+ player->prefetch_and_watch(0.25);
+}
+
diff --git a/src/test/journal/test_JournalRecorder.cc b/src/test/journal/test_JournalRecorder.cc
new file mode 100644
index 000000000..466ee2741
--- /dev/null
+++ b/src/test/journal/test_JournalRecorder.cc
@@ -0,0 +1,174 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalRecorder.h"
+#include "journal/Entry.h"
+#include "journal/JournalMetadata.h"
+#include "test/journal/RadosTestFixture.h"
+#include <limits>
+#include <list>
+#include <memory>
+
+class TestJournalRecorder : public RadosTestFixture {
+public:
+ using JournalRecorderPtr = std::unique_ptr<journal::JournalRecorder,
+ std::function<void(journal::JournalRecorder*)>>;
+ JournalRecorderPtr create_recorder(
+ const std::string &oid, const ceph::ref_t<journal::JournalMetadata>& metadata) {
+ JournalRecorderPtr recorder{
+ new journal::JournalRecorder(m_ioctx, oid + ".", metadata, 0),
+ [](journal::JournalRecorder* recorder) {
+ C_SaferCond cond;
+ recorder->shut_down(&cond);
+ cond.wait();
+ delete recorder;
+ }
+ };
+ recorder->set_append_batch_options(0, std::numeric_limits<uint32_t>::max(), 0);
+ return recorder;
+ }
+};
+
+TEST_F(TestJournalRecorder, Append) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ JournalRecorderPtr recorder = create_recorder(oid, metadata);
+
+ journal::Future future1 = recorder->append(123, create_payload("payload"));
+
+ C_SaferCond cond;
+ future1.flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestJournalRecorder, AppendKnownOverflow) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+ ASSERT_EQ(0U, metadata->get_active_set());
+
+ JournalRecorderPtr recorder = create_recorder(oid, metadata);
+
+ recorder->append(123, create_payload(std::string(metadata->get_object_size() -
+ journal::Entry::get_fixed_size(), '1')));
+ journal::Future future2 = recorder->append(123, create_payload(std::string(1, '2')));
+
+ C_SaferCond cond;
+ future2.flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+
+ ASSERT_EQ(1U, metadata->get_active_set());
+}
+
+TEST_F(TestJournalRecorder, AppendDelayedOverflow) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+ ASSERT_EQ(0U, metadata->get_active_set());
+
+ JournalRecorderPtr recorder1 = create_recorder(oid, metadata);
+ JournalRecorderPtr recorder2 = create_recorder(oid, metadata);
+
+ recorder1->append(234, create_payload(std::string(1, '1')));
+ recorder2->append(123, create_payload(std::string(metadata->get_object_size() -
+ journal::Entry::get_fixed_size(), '2')));
+
+ journal::Future future = recorder2->append(123, create_payload(std::string(1, '3')));
+
+ C_SaferCond cond;
+ future.flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+
+ ASSERT_EQ(1U, metadata->get_active_set());
+}
+
+TEST_F(TestJournalRecorder, FutureFlush) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ JournalRecorderPtr recorder = create_recorder(oid, metadata);
+
+ journal::Future future1 = recorder->append(123, create_payload("payload1"));
+ journal::Future future2 = recorder->append(123, create_payload("payload2"));
+
+ C_SaferCond cond;
+ future2.flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_TRUE(future1.is_complete());
+ ASSERT_TRUE(future2.is_complete());
+}
+
+TEST_F(TestJournalRecorder, Flush) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ JournalRecorderPtr recorder = create_recorder(oid, metadata);
+
+ journal::Future future1 = recorder->append(123, create_payload("payload1"));
+ journal::Future future2 = recorder->append(123, create_payload("payload2"));
+
+ C_SaferCond cond1;
+ recorder->flush(&cond1);
+ ASSERT_EQ(0, cond1.wait());
+
+ C_SaferCond cond2;
+ future2.wait(&cond2);
+ ASSERT_EQ(0, cond2.wait());
+ ASSERT_TRUE(future1.is_complete());
+ ASSERT_TRUE(future2.is_complete());
+}
+
+TEST_F(TestJournalRecorder, OverflowCommitObjectNumber) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+ ASSERT_EQ(0U, metadata->get_active_set());
+
+ JournalRecorderPtr recorder = create_recorder(oid, metadata);
+
+ recorder->append(123, create_payload(std::string(metadata->get_object_size() -
+ journal::Entry::get_fixed_size(), '1')));
+ journal::Future future2 = recorder->append(124, create_payload(std::string(1, '2')));
+
+ C_SaferCond cond;
+ future2.flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+
+ ASSERT_EQ(1U, metadata->get_active_set());
+
+ uint64_t object_num;
+ uint64_t tag_tid;
+ uint64_t entry_tid;
+ metadata->get_commit_entry(1, &object_num, &tag_tid, &entry_tid);
+ ASSERT_EQ(0U, object_num);
+ ASSERT_EQ(123U, tag_tid);
+ ASSERT_EQ(0U, entry_tid);
+
+ metadata->get_commit_entry(2, &object_num, &tag_tid, &entry_tid);
+ ASSERT_EQ(2U, object_num);
+ ASSERT_EQ(124U, tag_tid);
+ ASSERT_EQ(0U, entry_tid);
+}
+
diff --git a/src/test/journal/test_JournalTrimmer.cc b/src/test/journal/test_JournalTrimmer.cc
new file mode 100644
index 000000000..aaf10979f
--- /dev/null
+++ b/src/test/journal/test_JournalTrimmer.cc
@@ -0,0 +1,197 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalTrimmer.h"
+#include "journal/JournalMetadata.h"
+#include "include/stringify.h"
+#include "test/journal/RadosTestFixture.h"
+#include <limits>
+#include <list>
+
+class TestJournalTrimmer : public RadosTestFixture {
+public:
+
+ void TearDown() override {
+ for (MetadataList::iterator it = m_metadata_list.begin();
+ it != m_metadata_list.end(); ++it) {
+ (*it)->remove_listener(&m_listener);
+ }
+ m_metadata_list.clear();
+
+ for (std::list<journal::JournalTrimmer*>::iterator it = m_trimmers.begin();
+ it != m_trimmers.end(); ++it) {
+ C_SaferCond ctx;
+ (*it)->shut_down(&ctx);
+ ASSERT_EQ(0, ctx.wait());
+ delete *it;
+ }
+ RadosTestFixture::TearDown();
+ }
+
+ int append_payload(const ceph::ref_t<journal::JournalMetadata>& metadata,
+ const std::string &oid, uint64_t object_num,
+ const std::string &payload, uint64_t *commit_tid) {
+ int r = append(oid + "." + stringify(object_num), create_payload(payload));
+ uint64_t tid = metadata->allocate_commit_tid(object_num, 234, 123);
+ if (commit_tid != NULL) {
+ *commit_tid = tid;
+ }
+ return r;
+ }
+
+ auto create_metadata(const std::string &oid) {
+ auto metadata = RadosTestFixture::create_metadata(oid);
+ m_metadata_list.push_back(metadata);
+ metadata->add_listener(&m_listener);
+ return metadata;
+ }
+
+ journal::JournalTrimmer *create_trimmer(const std::string &oid,
+ const ceph::ref_t<journal::JournalMetadata>& metadata) {
+ journal::JournalTrimmer *trimmer(new journal::JournalTrimmer(
+ m_ioctx, oid + ".", metadata));
+ m_trimmers.push_back(trimmer);
+ return trimmer;
+ }
+
+ int assert_exists(const std::string &oid) {
+ librados::ObjectWriteOperation op;
+ op.assert_exists();
+ return m_ioctx.operate(oid, &op);
+ }
+
+ typedef std::list<ceph::ref_t<journal::JournalMetadata>> MetadataList;
+ MetadataList m_metadata_list;
+ std::list<journal::JournalTrimmer*> m_trimmers;
+};
+
+TEST_F(TestJournalTrimmer, Committed) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ ASSERT_EQ(0, metadata->set_active_set(10));
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ uint64_t commit_tid1;
+ uint64_t commit_tid2;
+ uint64_t commit_tid3;
+ uint64_t commit_tid4;
+ uint64_t commit_tid5;
+ uint64_t commit_tid6;
+ ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid1));
+ ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid2));
+ ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid3));
+ ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid4));
+ ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid5));
+ ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid6));
+
+ journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata);
+
+ trimmer->committed(commit_tid4);
+ trimmer->committed(commit_tid6);
+ trimmer->committed(commit_tid2);
+ trimmer->committed(commit_tid5);
+ trimmer->committed(commit_tid3);
+ trimmer->committed(commit_tid1);
+ while (metadata->get_minimum_set() != 2U) {
+ ASSERT_TRUE(wait_for_update(metadata));
+ }
+
+ ASSERT_EQ(-ENOENT, assert_exists(oid + ".0"));
+ ASSERT_EQ(-ENOENT, assert_exists(oid + ".2"));
+ ASSERT_EQ(0, assert_exists(oid + ".5"));
+}
+
+TEST_F(TestJournalTrimmer, CommittedWithOtherClient) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_register(oid, "client2", "slow client"));
+
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ ASSERT_EQ(0, metadata->set_active_set(10));
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ uint64_t commit_tid1;
+ uint64_t commit_tid2;
+ uint64_t commit_tid3;
+ uint64_t commit_tid4;
+ ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid1));
+ ASSERT_EQ(0, append_payload(metadata, oid, 2, "payload", &commit_tid2));
+ ASSERT_EQ(0, append_payload(metadata, oid, 3, "payload", &commit_tid3));
+ ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid4));
+
+ journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata);
+
+ trimmer->committed(commit_tid1);
+ trimmer->committed(commit_tid2);
+ trimmer->committed(commit_tid3);
+ trimmer->committed(commit_tid4);
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ ASSERT_EQ(0, assert_exists(oid + ".0"));
+ ASSERT_EQ(0, assert_exists(oid + ".2"));
+ ASSERT_EQ(0, assert_exists(oid + ".3"));
+ ASSERT_EQ(0, assert_exists(oid + ".5"));
+}
+
+TEST_F(TestJournalTrimmer, RemoveObjects) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ ASSERT_EQ(0, metadata->set_active_set(10));
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ ASSERT_EQ(0, append(oid + ".0", create_payload("payload")));
+ ASSERT_EQ(0, append(oid + ".2", create_payload("payload")));
+ ASSERT_EQ(0, append(oid + ".3", create_payload("payload")));
+ ASSERT_EQ(0, append(oid + ".5", create_payload("payload")));
+
+ journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata);
+
+ C_SaferCond cond;
+ trimmer->remove_objects(false, &cond);
+ ASSERT_EQ(0, cond.wait());
+
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ ASSERT_EQ(-ENOENT, assert_exists(oid + ".0"));
+ ASSERT_EQ(-ENOENT, assert_exists(oid + ".2"));
+ ASSERT_EQ(-ENOENT, assert_exists(oid + ".3"));
+ ASSERT_EQ(-ENOENT, assert_exists(oid + ".5"));
+}
+
+TEST_F(TestJournalTrimmer, RemoveObjectsWithOtherClient) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_register(oid, "client2", "other client"));
+
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata);
+
+ C_SaferCond ctx1;
+ trimmer->remove_objects(false, &ctx1);
+ ASSERT_EQ(-EBUSY, ctx1.wait());
+
+ C_SaferCond ctx2;
+ trimmer->remove_objects(true, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+}
+
diff --git a/src/test/journal/test_Journaler.cc b/src/test/journal/test_Journaler.cc
new file mode 100644
index 000000000..836816581
--- /dev/null
+++ b/src/test/journal/test_Journaler.cc
@@ -0,0 +1,198 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/stringify.h"
+
+#include "journal/Journaler.h"
+#include "journal/Settings.h"
+
+#include "test/librados/test.h"
+#include "test/journal/RadosTestFixture.h"
+
+#include "gtest/gtest.h"
+
+// reinclude our assert to clobber the system one
+#include "include/ceph_assert.h"
+
+class TestJournaler : public RadosTestFixture {
+public:
+
+ static const std::string CLIENT_ID;
+
+ static std::string get_temp_journal_id() {
+ return stringify(++_journal_id);
+ }
+
+ void SetUp() override {
+ RadosTestFixture::SetUp();
+ m_journal_id = get_temp_journal_id();
+ m_journaler = new journal::Journaler(m_work_queue, m_timer, &m_timer_lock,
+ m_ioctx, m_journal_id, CLIENT_ID, {},
+ nullptr);
+ }
+
+ void TearDown() override {
+ delete m_journaler;
+ RadosTestFixture::TearDown();
+ }
+
+ int create_journal(uint8_t order, uint8_t splay_width) {
+ C_SaferCond cond;
+ m_journaler->create(order, splay_width, -1, &cond);
+ return cond.wait();
+ }
+
+ int init_journaler() {
+ C_SaferCond cond;
+ m_journaler->init(&cond);
+ return cond.wait();
+ }
+
+ int shut_down_journaler() {
+ C_SaferCond ctx;
+ m_journaler->shut_down(&ctx);
+ return ctx.wait();
+ }
+
+ int register_client(const std::string &client_id, const std::string &desc) {
+ journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock, m_ioctx,
+ m_journal_id, client_id, {}, nullptr);
+ bufferlist data;
+ data.append(desc);
+ C_SaferCond cond;
+ journaler.register_client(data, &cond);
+ return cond.wait();
+ }
+
+ int update_client(const std::string &client_id, const std::string &desc) {
+ journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock, m_ioctx,
+ m_journal_id, client_id, {}, nullptr);
+ bufferlist data;
+ data.append(desc);
+ C_SaferCond cond;
+ journaler.update_client(data, &cond);
+ return cond.wait();
+ }
+
+ int unregister_client(const std::string &client_id) {
+ journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock, m_ioctx,
+ m_journal_id, client_id, {}, nullptr);
+ C_SaferCond cond;
+ journaler.unregister_client(&cond);
+ return cond.wait();
+ }
+
+ static uint64_t _journal_id;
+
+ std::string m_journal_id;
+ journal::Journaler *m_journaler;
+};
+
+const std::string TestJournaler::CLIENT_ID = "client1";
+uint64_t TestJournaler::_journal_id = 0;
+
+TEST_F(TestJournaler, Create) {
+ ASSERT_EQ(0, create_journal(12, 8));
+}
+
+TEST_F(TestJournaler, CreateDuplicate) {
+ ASSERT_EQ(0, create_journal(12, 8));
+ ASSERT_EQ(-EEXIST, create_journal(12, 8));
+}
+
+TEST_F(TestJournaler, CreateInvalidParams) {
+ ASSERT_EQ(-EDOM, create_journal(1, 8));
+ ASSERT_EQ(-EDOM, create_journal(123, 8));
+ ASSERT_EQ(-EINVAL, create_journal(12, 0));
+}
+
+TEST_F(TestJournaler, Init) {
+ ASSERT_EQ(0, create_journal(12, 8));
+ ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+ ASSERT_EQ(0, init_journaler());
+ ASSERT_EQ(0, shut_down_journaler());
+}
+
+TEST_F(TestJournaler, InitDNE) {
+ ASSERT_EQ(-ENOENT, init_journaler());
+ ASSERT_EQ(0, shut_down_journaler());
+}
+
+TEST_F(TestJournaler, RegisterClientDuplicate) {
+ ASSERT_EQ(0, create_journal(12, 8));
+ ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+ ASSERT_EQ(-EEXIST, register_client(CLIENT_ID, "foo2"));
+}
+
+TEST_F(TestJournaler, UpdateClient) {
+ ASSERT_EQ(0, create_journal(12, 8));
+ ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+ ASSERT_EQ(0, update_client(CLIENT_ID, "foo2"));
+}
+
+TEST_F(TestJournaler, UpdateClientDNE) {
+ ASSERT_EQ(0, create_journal(12, 8));
+ ASSERT_EQ(-ENOENT, update_client(CLIENT_ID, "foo"));
+}
+
+TEST_F(TestJournaler, UnregisterClient) {
+ ASSERT_EQ(0, create_journal(12, 8));
+ ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+ ASSERT_EQ(0, unregister_client(CLIENT_ID));
+ // Test it does not exist and can be registered again
+ ASSERT_EQ(-ENOENT, update_client(CLIENT_ID, "foo"));
+ ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+}
+
+TEST_F(TestJournaler, UnregisterClientDNE) {
+ ASSERT_EQ(0, create_journal(12, 8));
+ ASSERT_EQ(-ENOENT, unregister_client(CLIENT_ID));
+}
+
+TEST_F(TestJournaler, AllocateTag) {
+ ASSERT_EQ(0, create_journal(12, 8));
+
+ cls::journal::Tag tag;
+
+ bufferlist data;
+ data.append(std::string(128, '1'));
+
+ // allocate a new tag class
+ C_SaferCond ctx1;
+ m_journaler->allocate_tag(data, &tag, &ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+ ASSERT_EQ(cls::journal::Tag(0, 0, data), tag);
+
+ // re-use an existing tag class
+ C_SaferCond ctx2;
+ m_journaler->allocate_tag(tag.tag_class, bufferlist(), &tag, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+ ASSERT_EQ(cls::journal::Tag(1, 0, bufferlist()), tag);
+}
+
+TEST_F(TestJournaler, GetTags) {
+ ASSERT_EQ(0, create_journal(12, 8));
+ ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+
+ std::list<cls::journal::Tag> expected_tags;
+ for (size_t i = 0; i < 256; ++i) {
+ C_SaferCond ctx;
+ cls::journal::Tag tag;
+ if (i < 2) {
+ m_journaler->allocate_tag(bufferlist(), &tag, &ctx);
+ } else {
+ m_journaler->allocate_tag(i % 2, bufferlist(), &tag, &ctx);
+ }
+ ASSERT_EQ(0, ctx.wait());
+
+ if (i % 2 == 0) {
+ expected_tags.push_back(tag);
+ }
+ }
+
+ std::list<cls::journal::Tag> tags;
+ C_SaferCond ctx;
+ m_journaler->get_tags(0, &tags, &ctx);
+ ASSERT_EQ(0, ctx.wait());
+ ASSERT_EQ(expected_tags, tags);
+}
diff --git a/src/test/journal/test_ObjectPlayer.cc b/src/test/journal/test_ObjectPlayer.cc
new file mode 100644
index 000000000..5ac3d8b12
--- /dev/null
+++ b/src/test/journal/test_ObjectPlayer.cc
@@ -0,0 +1,281 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectPlayer.h"
+#include "journal/Entry.h"
+#include "include/stringify.h"
+#include "common/Timer.h"
+#include "gtest/gtest.h"
+#include "test/librados/test.h"
+#include "test/journal/RadosTestFixture.h"
+
+template <typename T>
+class TestObjectPlayer : public RadosTestFixture, public T {
+public:
+ auto create_object(const std::string &oid, uint8_t order) {
+ auto object = ceph::make_ref<journal::ObjectPlayer>(
+ m_ioctx, oid + ".", 0, *m_timer, m_timer_lock, order,
+ T::max_fetch_bytes);
+ return object;
+ }
+
+ int fetch(const ceph::ref_t<journal::ObjectPlayer>& object_player) {
+ while (true) {
+ C_SaferCond ctx;
+ object_player->set_refetch_state(
+ journal::ObjectPlayer::REFETCH_STATE_NONE);
+ object_player->fetch(&ctx);
+ int r = ctx.wait();
+ if (r < 0 || !object_player->refetch_required()) {
+ return r;
+ }
+ }
+ return 0;
+ }
+
+ int watch_and_wait_for_entries(const ceph::ref_t<journal::ObjectPlayer>& object_player,
+ journal::ObjectPlayer::Entries *entries,
+ size_t count) {
+ for (size_t i = 0; i < 50; ++i) {
+ object_player->get_entries(entries);
+ if (entries->size() == count) {
+ break;
+ }
+
+ C_SaferCond ctx;
+ object_player->watch(&ctx, 0.1);
+
+ int r = ctx.wait();
+ if (r < 0) {
+ return r;
+ }
+ }
+ return 0;
+ }
+
+ std::string get_object_name(const std::string &oid) {
+ return oid + ".0";
+ }
+};
+
+template <uint32_t _max_fetch_bytes>
+struct TestObjectPlayerParams {
+ static inline const uint32_t max_fetch_bytes = _max_fetch_bytes;
+};
+
+typedef ::testing::Types<TestObjectPlayerParams<0>,
+ TestObjectPlayerParams<10> > TestObjectPlayerTypes;
+TYPED_TEST_SUITE(TestObjectPlayer, TestObjectPlayerTypes);
+
+TYPED_TEST(TestObjectPlayer, Fetch) {
+ std::string oid = this->get_temp_oid();
+
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1')));
+
+ bufferlist bl;
+ encode(entry1, bl);
+ encode(entry2, bl);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+
+ auto object = this->create_object(oid, 14);
+ ASSERT_LE(0, this->fetch(object));
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(2U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries = {entry1, entry2};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestObjectPlayer, FetchLarge) {
+ std::string oid = this->get_temp_oid();
+
+ journal::Entry entry1(234, 123,
+ this->create_payload(std::string(8192 - 32, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(""));
+
+ bufferlist bl;
+ encode(entry1, bl);
+ encode(entry2, bl);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+
+ auto object = this->create_object(oid, 12);
+ ASSERT_LE(0, this->fetch(object));
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(2U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries = {entry1, entry2};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestObjectPlayer, FetchDeDup) {
+ std::string oid = this->get_temp_oid();
+
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 123, this->create_payload(std::string(24, '2')));
+
+ bufferlist bl;
+ encode(entry1, bl);
+ encode(entry2, bl);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+
+ auto object = this->create_object(oid, 14);
+ ASSERT_LE(0, this->fetch(object));
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(1U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries = {entry2};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestObjectPlayer, FetchEmpty) {
+ std::string oid = this->get_temp_oid();
+
+ bufferlist bl;
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+
+ auto object = this->create_object(oid, 14);
+
+ ASSERT_EQ(0, this->fetch(object));
+ ASSERT_TRUE(object->empty());
+}
+
+TYPED_TEST(TestObjectPlayer, FetchCorrupt) {
+ std::string oid = this->get_temp_oid();
+
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(std::string(24, '2')));
+
+ bufferlist bl;
+ encode(entry1, bl);
+ encode(this->create_payload("corruption" + std::string(1024, 'X')), bl);
+ encode(entry2, bl);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+
+ auto object = this->create_object(oid, 14);
+ ASSERT_EQ(-EBADMSG, this->fetch(object));
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(1U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries = {entry1};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestObjectPlayer, FetchAppend) {
+ std::string oid = this->get_temp_oid();
+
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(std::string(24, '2')));
+
+ bufferlist bl;
+ encode(entry1, bl);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+
+ auto object = this->create_object(oid, 14);
+ ASSERT_LE(0, this->fetch(object));
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(1U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries = {entry1};
+ ASSERT_EQ(expected_entries, entries);
+
+ bl.clear();
+ encode(entry2, bl);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+ ASSERT_LE(0, this->fetch(object));
+
+ object->get_entries(&entries);
+ ASSERT_EQ(2U, entries.size());
+
+ expected_entries = {entry1, entry2};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestObjectPlayer, PopEntry) {
+ std::string oid = this->get_temp_oid();
+
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1')));
+
+ bufferlist bl;
+ encode(entry1, bl);
+ encode(entry2, bl);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+
+ auto object = this->create_object(oid, 14);
+ ASSERT_LE(0, this->fetch(object));
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(2U, entries.size());
+
+ journal::Entry entry;
+ object->front(&entry);
+ object->pop_front();
+ ASSERT_EQ(entry1, entry);
+ object->front(&entry);
+ object->pop_front();
+ ASSERT_EQ(entry2, entry);
+ ASSERT_TRUE(object->empty());
+}
+
+TYPED_TEST(TestObjectPlayer, Watch) {
+ std::string oid = this->get_temp_oid();
+ auto object = this->create_object(oid, 14);
+
+ C_SaferCond cond1;
+ object->watch(&cond1, 0.1);
+
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1')));
+
+ bufferlist bl;
+ encode(entry1, bl);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+ ASSERT_LE(0, cond1.wait());
+
+ journal::ObjectPlayer::Entries entries;
+ ASSERT_EQ(0, this->watch_and_wait_for_entries(object, &entries, 1U));
+ ASSERT_EQ(1U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries;
+ expected_entries = {entry1};
+ ASSERT_EQ(expected_entries, entries);
+
+ C_SaferCond cond2;
+ object->watch(&cond2, 0.1);
+
+ bl.clear();
+ encode(entry2, bl);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+ ASSERT_LE(0, cond2.wait());
+
+ ASSERT_EQ(0, this->watch_and_wait_for_entries(object, &entries, 2U));
+ ASSERT_EQ(2U, entries.size());
+
+ expected_entries = {entry1, entry2};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestObjectPlayer, Unwatch) {
+ std::string oid = this->get_temp_oid();
+ auto object = this->create_object(oid, 14);
+
+ C_SaferCond watch_ctx;
+ object->watch(&watch_ctx, 600);
+
+ usleep(200000);
+
+ object->unwatch();
+ ASSERT_EQ(-ECANCELED, watch_ctx.wait());
+}
diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc
new file mode 100644
index 000000000..ac110a23e
--- /dev/null
+++ b/src/test/journal/test_ObjectRecorder.cc
@@ -0,0 +1,463 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectRecorder.h"
+#include "common/Cond.h"
+#include "common/ceph_mutex.h"
+#include "common/Timer.h"
+#include "gtest/gtest.h"
+#include "test/librados/test.h"
+#include "test/journal/RadosTestFixture.h"
+#include <limits>
+
+using std::shared_ptr;
+
+class TestObjectRecorder : public RadosTestFixture {
+public:
+ TestObjectRecorder() = default;
+
+ struct Handler : public journal::ObjectRecorder::Handler {
+ ceph::mutex lock = ceph::make_mutex("lock");
+ ceph::mutex* object_lock = nullptr;
+ ceph::condition_variable cond;
+ bool is_closed = false;
+ uint32_t overflows = 0;
+
+ Handler() = default;
+
+ void closed(journal::ObjectRecorder *object_recorder) override {
+ std::lock_guard locker{lock};
+ is_closed = true;
+ cond.notify_all();
+ }
+ void overflow(journal::ObjectRecorder *object_recorder) override {
+ std::lock_guard locker{lock};
+ journal::AppendBuffers append_buffers;
+ object_lock->lock();
+ object_recorder->claim_append_buffers(&append_buffers);
+ object_lock->unlock();
+
+ ++overflows;
+ cond.notify_all();
+ }
+ };
+
+ // flush the pending buffers in dtor
+ class ObjectRecorderFlusher {
+ public:
+ ObjectRecorderFlusher(librados::IoCtx& ioctx,
+ ContextWQ* work_queue)
+ : m_ioctx{ioctx},
+ m_work_queue{work_queue}
+ {}
+ ObjectRecorderFlusher(librados::IoCtx& ioctx,
+ ContextWQ* work_queue,
+ uint32_t flush_interval,
+ uint16_t flush_bytes,
+ double flush_age,
+ int max_in_flight)
+ : m_ioctx{ioctx},
+ m_work_queue{work_queue},
+ m_flush_interval{flush_interval},
+ m_flush_bytes{flush_bytes},
+ m_flush_age{flush_age},
+ m_max_in_flight_appends{max_in_flight < 0 ?
+ std::numeric_limits<uint64_t>::max() :
+ static_cast<uint64_t>(max_in_flight)}
+ {}
+ ~ObjectRecorderFlusher() {
+ for (auto& [object_recorder, m] : m_object_recorders) {
+ C_SaferCond cond;
+ object_recorder->flush(&cond);
+ cond.wait();
+ std::scoped_lock l{*m};
+ if (!object_recorder->is_closed()) {
+ object_recorder->close();
+ }
+ }
+ }
+ auto create_object(std::string_view oid, uint8_t order, ceph::mutex* lock) {
+ auto object = ceph::make_ref<journal::ObjectRecorder>(
+ m_ioctx, oid, 0, lock, m_work_queue, &m_handler,
+ order, m_max_in_flight_appends);
+ {
+ std::lock_guard locker{*lock};
+ object->set_append_batch_options(m_flush_interval,
+ m_flush_bytes,
+ m_flush_age);
+ }
+ m_object_recorders.emplace_back(object, lock);
+ m_handler.object_lock = lock;
+ return object;
+ }
+ bool wait_for_closed() {
+ std::unique_lock locker{m_handler.lock};
+ return m_handler.cond.wait_for(locker, 10s,
+ [this] { return m_handler.is_closed; });
+ }
+ bool wait_for_overflow() {
+ std::unique_lock locker{m_handler.lock};
+ if (m_handler.cond.wait_for(locker, 10s,
+ [this] { return m_handler.overflows > 0; })) {
+ m_handler.overflows = 0;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ private:
+ librados::IoCtx& m_ioctx;
+ ContextWQ *m_work_queue;
+ uint32_t m_flush_interval = std::numeric_limits<uint32_t>::max();
+ uint64_t m_flush_bytes = std::numeric_limits<uint64_t>::max();
+ double m_flush_age = 600;
+ uint64_t m_max_in_flight_appends = 0;
+ using ObjectRecorders =
+ std::list<std::pair<ceph::ref_t<journal::ObjectRecorder>, ceph::mutex*>>;
+ ObjectRecorders m_object_recorders;
+ Handler m_handler;
+ };
+
+ journal::AppendBuffer create_append_buffer(uint64_t tag_tid,
+ uint64_t entry_tid,
+ const std::string &payload) {
+ auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, 456);
+ future->init(ceph::ref_t<journal::FutureImpl>());
+
+ bufferlist bl;
+ bl.append(payload);
+ return std::make_pair(future, bl);
+ }
+};
+
+TEST_F(TestObjectRecorder, Append) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0, 0);
+ auto object = flusher.create_object(oid, 24, &lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
+ "payload");
+ append_buffers = {append_buffer2};
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer2.first->flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByCount) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
+ "payload");
+ append_buffers = {append_buffer2};
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByBytes) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
+ "payload");
+ append_buffers = {append_buffer2};
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByAge) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0005, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+
+ uint32_t offset = 0;
+ journal::AppendBuffer append_buffer2;
+ while (!append_buffer1.first->is_flush_in_progress() &&
+ !append_buffer1.first->is_complete()) {
+ usleep(1000);
+
+ append_buffer2 = create_append_buffer(234, 124 + offset, "payload");
+ ++offset;
+ append_buffers = {append_buffer2};
+
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ }
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, AppendFilledObject) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0, -1);
+ auto object = flusher.create_object(oid, 12, &lock);
+
+ std::string payload(2048, '1');
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ payload);
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
+ payload);
+ append_buffers = {append_buffer2};
+ lock.lock();
+ ASSERT_TRUE(object->append(std::move(append_buffers)));
+ lock.unlock();
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, Flush) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ C_SaferCond cond1;
+ object->flush(&cond1);
+ ASSERT_EQ(0, cond1.wait());
+
+ C_SaferCond cond2;
+ append_buffer1.first->wait(&cond2);
+ ASSERT_EQ(0, cond2.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, FlushFuture) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
+
+ journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer};
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer.first->wait(&cond);
+ object->flush(append_buffer.first);
+ ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
+ append_buffer.first->is_complete());
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, FlushDetachedFuture) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
+ auto object = flusher.create_object(oid, 24, &lock);
+
+ journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
+ "payload");
+
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer};
+
+ object->flush(append_buffer.first);
+ ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+
+ // should automatically flush once its attached to the object
+ C_SaferCond cond;
+ append_buffer.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, Close) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ lock.lock();
+ ASSERT_FALSE(object->close());
+ ASSERT_TRUE(ceph_mutex_is_locked(lock));
+ lock.unlock();
+
+ ASSERT_TRUE(flusher.wait_for_closed());
+
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, Overflow) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ auto metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ ceph::mutex lock1 = ceph::make_mutex("object_recorder_lock_1");
+ ceph::mutex lock2 = ceph::make_mutex("object_recorder_lock_2");
+
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
+ auto object1 = flusher.create_object(oid, 12, &lock1);
+
+ std::string payload(1 << 11, '1');
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ payload);
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
+ payload);
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1, append_buffer2};
+ lock1.lock();
+ ASSERT_TRUE(object1->append(std::move(append_buffers)));
+ lock1.unlock();
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object1->get_pending_appends());
+
+ auto object2 = flusher.create_object(oid, 12, &lock2);
+
+ journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
+ payload);
+ append_buffers = {append_buffer3};
+ lock2.lock();
+ ASSERT_FALSE(object2->append(std::move(append_buffers)));
+ lock2.unlock();
+ append_buffer3.first->flush(NULL);
+
+ ASSERT_TRUE(flusher.wait_for_overflow());
+}
diff --git a/src/test/journal/test_main.cc b/src/test/journal/test_main.cc
new file mode 100644
index 000000000..afb535d31
--- /dev/null
+++ b/src/test/journal/test_main.cc
@@ -0,0 +1,27 @@
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "gtest/gtest.h"
+#include "common/ceph_argparse.h"
+#include "common/ceph_crypto.h"
+#include "common/config_proxy.h"
+#include "global/global_context.h"
+#include "global/global_init.h"
+#include <vector>
+
+int main(int argc, char **argv)
+{
+ ::testing::InitGoogleTest(&argc, argv);
+
+ std::vector<const char*> args;
+ argv_to_vec(argc, (const char **)argv, args);
+
+ auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_OSD,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_MON_CONFIG);
+ g_conf().set_val("lockdep", "true");
+ common_init_finish(g_ceph_context);
+
+ int r = RUN_ALL_TESTS();
+ return r;
+}