summaryrefslogtreecommitdiffstats
path: root/src/test/journal/test_JournalPlayer.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/journal/test_JournalPlayer.cc')
-rw-r--r--src/test/journal/test_JournalPlayer.cc998
1 files changed, 998 insertions, 0 deletions
diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc
new file mode 100644
index 00000000..3ae04c07
--- /dev/null
+++ b/src/test/journal/test_JournalPlayer.cc
@@ -0,0 +1,998 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalPlayer.h"
+#include "journal/Entry.h"
+#include "journal/JournalMetadata.h"
+#include "journal/ReplayHandler.h"
+#include "include/stringify.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "gtest/gtest.h"
+#include "test/journal/RadosTestFixture.h"
+#include <list>
+#include <boost/scope_exit.hpp>
+
+typedef std::list<journal::Entry> Entries;
+
+template <typename T>
+class TestJournalPlayer : public RadosTestFixture {
+public:
+ typedef std::list<journal::JournalPlayer *> JournalPlayers;
+
+ static const uint64_t max_fetch_bytes = T::max_fetch_bytes;
+
+ struct ReplayHandler : public journal::ReplayHandler {
+ Mutex lock;
+ Cond cond;
+ bool entries_available;
+ bool complete;
+ int complete_result;
+
+ ReplayHandler()
+ : lock("lock"), entries_available(false), complete(false),
+ complete_result(0) {}
+
+ void get() override {}
+ void put() override {}
+
+ void handle_entries_available() override {
+ Mutex::Locker locker(lock);
+ entries_available = true;
+ cond.Signal();
+ }
+
+ void handle_complete(int r) override {
+ Mutex::Locker locker(lock);
+ complete = true;
+ complete_result = r;
+ cond.Signal();
+ }
+ };
+
+ void TearDown() override {
+ for (JournalPlayers::iterator it = m_players.begin();
+ it != m_players.end(); ++it) {
+ delete *it;
+ }
+ RadosTestFixture::TearDown();
+ }
+
+ journal::JournalMetadataPtr create_metadata(const std::string &oid) {
+ return RadosTestFixture::create_metadata(oid, "client", 0.1,
+ max_fetch_bytes);
+ }
+
+ int client_commit(const std::string &oid,
+ journal::JournalPlayer::ObjectSetPosition position) {
+ return RadosTestFixture::client_commit(oid, "client", position);
+ }
+
+ journal::Entry create_entry(uint64_t tag_tid, uint64_t entry_tid) {
+ std::string payload(128, '0');
+ bufferlist payload_bl;
+ payload_bl.append(payload);
+ return journal::Entry(tag_tid, entry_tid, payload_bl);
+ }
+
+ journal::JournalPlayer *create_player(const std::string &oid,
+ const journal::JournalMetadataPtr &metadata) {
+ journal::JournalPlayer *player(new journal::JournalPlayer(
+ m_ioctx, oid + ".", metadata, &m_replay_hander));
+ m_players.push_back(player);
+ return player;
+ }
+
+ bool wait_for_entries(journal::JournalPlayer *player, uint32_t count,
+ Entries *entries) {
+ entries->clear();
+ while (entries->size() < count) {
+ journal::Entry entry;
+ uint64_t commit_tid;
+ while (entries->size() < count &&
+ player->try_pop_front(&entry, &commit_tid)) {
+ entries->push_back(entry);
+ }
+ if (entries->size() == count) {
+ break;
+ }
+
+ Mutex::Locker locker(m_replay_hander.lock);
+ if (m_replay_hander.entries_available) {
+ m_replay_hander.entries_available = false;
+ } else if (m_replay_hander.cond.WaitInterval(
+ m_replay_hander.lock, utime_t(10, 0)) != 0) {
+ break;
+ }
+ }
+ return entries->size() == count;
+ }
+
+ bool wait_for_complete(journal::JournalPlayer *player) {
+ Mutex::Locker locker(m_replay_hander.lock);
+ while (!m_replay_hander.complete) {
+ journal::Entry entry;
+ uint64_t commit_tid;
+ player->try_pop_front(&entry, &commit_tid);
+
+ if (m_replay_hander.cond.WaitInterval(
+ m_replay_hander.lock, utime_t(10, 0)) != 0) {
+ return false;
+ }
+ }
+ m_replay_hander.complete = false;
+ return true;
+ }
+
+ int write_entry(const std::string &oid, uint64_t object_num,
+ uint64_t tag_tid, uint64_t entry_tid) {
+ bufferlist bl;
+ encode(create_entry(tag_tid, entry_tid), bl);
+ return append(oid + "." + stringify(object_num), bl);
+ }
+
+ JournalPlayers m_players;
+ ReplayHandler m_replay_hander;
+};
+
+template <uint64_t _max_fetch_bytes>
+class TestJournalPlayerParams {
+public:
+ static const uint64_t max_fetch_bytes = _max_fetch_bytes;
+};
+
+typedef ::testing::Types<TestJournalPlayerParams<0>,
+ TestJournalPlayerParams<16> > TestJournalPlayerTypes;
+TYPED_TEST_CASE(TestJournalPlayer, TestJournalPlayerTypes);
+
+TYPED_TEST(TestJournalPlayer, Prefetch) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions;
+ positions = {
+ cls::journal::ObjectPosition(0, 234, 122) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ Entries expected_entries;
+ expected_entries = {
+ this->create_entry(234, 123),
+ this->create_entry(234, 124),
+ this->create_entry(234, 125)};
+ ASSERT_EQ(expected_entries, entries);
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
+ ASSERT_EQ(125U, last_tid);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchSkip) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions;
+ positions = {
+ cls::journal::ObjectPosition(0, 234, 125),
+ cls::journal::ObjectPosition(1, 234, 124) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 125));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 0, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
+ ASSERT_EQ(125U, last_tid);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchWithoutCommit) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ Entries expected_entries;
+ expected_entries = {
+ this->create_entry(234, 122),
+ this->create_entry(234, 123)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchMultipleTags) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions;
+ positions = {
+ cls::journal::ObjectPosition(2, 234, 122),
+ cls::journal::ObjectPosition(1, 234, 121),
+ cls::journal::ObjectPosition(0, 234, 120)};
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid, 14, 3));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 234, 122));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 123));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 124));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 236, 0)); // new tag allocated
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
+ ASSERT_EQ(124U, last_tid);
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid));
+ ASSERT_EQ(0U, last_tid);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchCorruptSequence) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 121));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 2, &entries));
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+ ASSERT_TRUE(this->wait_for_complete(player));
+ ASSERT_EQ(-ENOMSG, this->m_replay_hander.complete_result);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchMissingSequence) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid, 14, 4));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, metadata->set_active_set(1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
+ ASSERT_EQ(0, this->write_entry(oid, 5, 2, 861));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 5, 3, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 7, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(2, 852),
+ this->create_entry(2, 853),
+ this->create_entry(2, 854),
+ this->create_entry(3, 0),
+ this->create_entry(3, 1),
+ this->create_entry(3, 2),
+ this->create_entry(3, 3)};
+ ASSERT_EQ(expected_entries, entries);
+
+ ASSERT_TRUE(this->wait_for_complete(player));
+ ASSERT_EQ(0, this->m_replay_hander.complete_result);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchLargeMissingSequence) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, metadata->set_active_set(2));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(0, 0),
+ this->create_entry(0, 1),
+ this->create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchBlockedNewTag) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(0, 0),
+ this->create_entry(0, 1),
+ this->create_entry(0, 2),
+ this->create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchStaleEntries) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions = {
+ cls::journal::ObjectPosition(0, 1, 0) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(1, 1)};
+ ASSERT_EQ(expected_entries, entries);
+
+ ASSERT_TRUE(this->wait_for_complete(player));
+ ASSERT_EQ(0, this->m_replay_hander.complete_result);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchUnexpectedTag) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 120));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 235, 121));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+ ASSERT_TRUE(this->wait_for_complete(player));
+ ASSERT_EQ(0, this->m_replay_hander.complete_result);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchAndWatch) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions;
+ positions = {
+ cls::journal::ObjectPosition(0, 234, 122)};
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries;
+ expected_entries = {this->create_entry(234, 123)};
+ ASSERT_EQ(expected_entries, entries);
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 124));
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ expected_entries = {this->create_entry(234, 124)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefetchSkippedObject) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid, 14, 3));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+ ASSERT_EQ(0, metadata->set_active_set(2));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 234, 123));
+ ASSERT_EQ(0, this->write_entry(oid, 5, 234, 124));
+ ASSERT_EQ(0, this->write_entry(oid, 6, 234, 125));
+ ASSERT_EQ(0, this->write_entry(oid, 7, 234, 126));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 5, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ Entries expected_entries;
+ expected_entries = {
+ this->create_entry(234, 122),
+ this->create_entry(234, 123),
+ this->create_entry(234, 124),
+ this->create_entry(234, 125),
+ this->create_entry(234, 126)};
+ ASSERT_EQ(expected_entries, entries);
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
+ ASSERT_EQ(126U, last_tid);
+}
+
+TYPED_TEST(TestJournalPlayer, ImbalancedJournal) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions = {
+ cls::journal::ObjectPosition(9, 300, 1),
+ cls::journal::ObjectPosition(8, 300, 0),
+ cls::journal::ObjectPosition(10, 200, 4334),
+ cls::journal::ObjectPosition(11, 200, 4331) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid, 14, 4));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+ ASSERT_EQ(0, metadata->set_active_set(2));
+ metadata->set_minimum_set(2);
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 8, 300, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 8, 301, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 9, 300, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 9, 301, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 10, 200, 4334));
+ ASSERT_EQ(0, this->write_entry(oid, 10, 301, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 11, 200, 4331));
+ ASSERT_EQ(0, this->write_entry(oid, 11, 301, 3));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
+ ASSERT_TRUE(this->wait_for_complete(player));
+
+ Entries expected_entries;
+ expected_entries = {
+ this->create_entry(301, 0),
+ this->create_entry(301, 1),
+ this->create_entry(301, 2),
+ this->create_entry(301, 3)};
+ ASSERT_EQ(expected_entries, entries);
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid));
+ ASSERT_EQ(3U, last_tid);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayLaggyAppend) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 4));
+ ASSERT_EQ(0, this->write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(0, 0),
+ this->create_entry(0, 1),
+ this->create_entry(0, 2)};
+ ASSERT_EQ(expected_entries, entries);
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
+ ASSERT_EQ(0, metadata->set_active_set(1));
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ expected_entries = {
+ this->create_entry(0, 3),
+ this->create_entry(0, 4),
+ this->create_entry(0, 5)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayMissingSequence) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid, 14, 4));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 852));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 860));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 2, 853));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 2, 857));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 2, 854));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 2, 856));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(2, 852),
+ this->create_entry(2, 853),
+ this->create_entry(2, 854)};
+ ASSERT_EQ(expected_entries, entries);
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+ ASSERT_EQ(0, this->write_entry(oid, 3, 3, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 3, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 3, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 3, 0));
+ ASSERT_TRUE(this->wait_for_entries(player, 4, &entries));
+
+ expected_entries = {
+ this->create_entry(3, 0),
+ this->create_entry(3, 1),
+ this->create_entry(3, 2),
+ this->create_entry(3, 3)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayLargeMissingSequence) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, metadata->set_active_set(2));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 4, 1, 0));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(0, 0),
+ this->create_entry(0, 1),
+ this->create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayBlockedNewTag) {
+ std::string oid = this->get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ C_SaferCond ctx1;
+ cls::journal::Tag tag1;
+ metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, {}, &tag1, &ctx1);
+ ASSERT_EQ(0, ctx1.wait());
+
+ ASSERT_EQ(0, metadata->set_active_set(0));
+ ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, tag1.tid, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 2));
+ ASSERT_EQ(0, this->write_entry(oid, 0, tag1.tid, 4));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(tag1.tid, 0),
+ this->create_entry(tag1.tid, 1),
+ this->create_entry(tag1.tid, 2)};
+ ASSERT_EQ(expected_entries, entries);
+
+ journal::Entry entry;
+ uint64_t commit_tid;
+ ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
+
+ C_SaferCond ctx2;
+ cls::journal::Tag tag2;
+ metadata->allocate_tag(tag1.tag_class, {}, &tag2, &ctx2);
+ ASSERT_EQ(0, ctx2.wait());
+
+ ASSERT_EQ(0, this->write_entry(oid, 0, tag2.tid, 0));
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ expected_entries = {
+ this->create_entry(tag2.tid, 0)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayStaleEntries) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions = {
+ cls::journal::ObjectPosition(0, 1, 0) };
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 1, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 1, 1));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(1, 1)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
+ std::string oid = this->get_temp_oid();
+
+ journal::JournalPlayer::ObjectPositions positions = {
+ cls::journal::ObjectPosition(1, 0, 1),
+ cls::journal::ObjectPosition(0, 0, 0)};
+ cls::journal::ObjectSetPosition commit_position(positions);
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+
+ ASSERT_EQ(0, metadata->set_active_set(1));
+ ASSERT_EQ(0, this->write_entry(oid, 0, 0, 0));
+ ASSERT_EQ(0, this->write_entry(oid, 1, 0, 1));
+ ASSERT_EQ(0, this->write_entry(oid, 3, 0, 3));
+ ASSERT_EQ(0, this->write_entry(oid, 2, 1, 0));
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries = {
+ this->create_entry(1, 0)};
+ ASSERT_EQ(expected_entries, entries);
+
+ // should remove player for offset 3 after refetching
+ ASSERT_EQ(0, metadata->set_active_set(3));
+ ASSERT_EQ(0, this->write_entry(oid, 7, 1, 1));
+
+ ASSERT_TRUE(this->wait_for_entries(player, 1, &entries));
+
+ expected_entries = {
+ this->create_entry(1, 1)};
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TYPED_TEST(TestJournalPlayer, PrefechShutDown) {
+ std::string oid = this->get_temp_oid();
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, {}));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+ player->prefetch();
+}
+
+TYPED_TEST(TestJournalPlayer, LiveReplayShutDown) {
+ std::string oid = this->get_temp_oid();
+
+ ASSERT_EQ(0, this->create(oid));
+ ASSERT_EQ(0, this->client_register(oid));
+ ASSERT_EQ(0, this->client_commit(oid, {}));
+
+ journal::JournalMetadataPtr metadata = this->create_metadata(oid);
+ ASSERT_EQ(0, this->init_metadata(metadata));
+
+ journal::JournalPlayer *player = this->create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+ player->prefetch_and_watch(0.25);
+}
+