From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- .../image_replayer/journal/test_mock_Replayer.cc | 2162 ++++++++++++++++++++ 1 file changed, 2162 insertions(+) create mode 100644 src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc (limited to 'src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc') diff --git a/src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc b/src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc new file mode 100644 index 000000000..7c8defb3d --- /dev/null +++ b/src/test/rbd_mirror/image_replayer/journal/test_mock_Replayer.cc @@ -0,0 +1,2162 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/rbd_mirror/test_mock_fixture.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" +#include "tools/rbd_mirror/Threads.h" +#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" +#include "tools/rbd_mirror/image_replayer/ReplayerListener.h" +#include "tools/rbd_mirror/image_replayer/Utils.h" +#include "tools/rbd_mirror/image_replayer/journal/Replayer.h" +#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h" +#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h" +#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h" +#include "test/journal/mock/MockJournaler.h" +#include "test/librbd/mock/MockImageCtx.h" +#include "test/rbd_mirror/mock/MockContextWQ.h" +#include "test/rbd_mirror/mock/MockSafeTimer.h" +#include + +using namespace std::chrono_literals; + +namespace librbd { + +namespace { + +struct MockTestJournal; + +struct MockTestImageCtx : public librbd::MockImageCtx { + explicit MockTestImageCtx(librbd::ImageCtx &image_ctx, + MockTestJournal& mock_test_journal) + : librbd::MockImageCtx(image_ctx), journal(&mock_test_journal) { + } + + MockTestJournal* journal = nullptr; +}; + +struct MockTestJournal : public MockJournal { + MOCK_METHOD2(start_external_replay, void(journal::Replay **, + Context *on_start)); + MOCK_METHOD0(stop_external_replay, void()); +}; + +} // anonymous namespace + +namespace journal { + +template <> +struct TypeTraits { + typedef ::journal::MockJournaler Journaler; + typedef ::journal::MockReplayEntryProxy ReplayEntry; +}; + +template<> +struct Replay { + MOCK_METHOD2(decode, int(bufferlist::const_iterator *, EventEntry *)); + MOCK_METHOD3(process, void(const EventEntry &, Context *, Context *)); + MOCK_METHOD1(flush, void(Context*)); + MOCK_METHOD2(shut_down, void(bool, Context*)); +}; + +} // namespace journal +} // namespace librbd + +namespace boost { + +template<> +struct intrusive_ptr { + intrusive_ptr() { + } + intrusive_ptr(librbd::MockTestJournal* mock_test_journal) + : mock_test_journal(mock_test_journal) { + } + + librbd::MockTestJournal* operator->() { + return mock_test_journal; + } + + void reset() { + mock_test_journal = nullptr; + } + + const librbd::MockTestJournal* get() const { + return mock_test_journal; + } + + template + bool operator==(T* t) const { + return (mock_test_journal == t); + } + + librbd::MockTestJournal* mock_test_journal = nullptr; +}; + +} // namespace boost + +namespace rbd { +namespace mirror { + +template <> +struct Threads { + MockSafeTimer *timer; + ceph::mutex &timer_lock; + + MockContextWQ *work_queue; + + Threads(Threads* threads) + : timer(new MockSafeTimer()), + timer_lock(threads->timer_lock), + work_queue(new MockContextWQ()) { + } + ~Threads() { + delete timer; + delete work_queue; + } +}; + +namespace { + +struct MockReplayerListener : public image_replayer::ReplayerListener { + MOCK_METHOD0(handle_notification, void()); +}; + +} // anonymous namespace + +namespace image_replayer { + +template<> +struct CloseImageRequest { + static CloseImageRequest* s_instance; + librbd::MockTestImageCtx **image_ctx = nullptr; + Context *on_finish = nullptr; + + static CloseImageRequest* create(librbd::MockTestImageCtx **image_ctx, + Context *on_finish) { + ceph_assert(s_instance != nullptr); + s_instance->image_ctx = image_ctx; + s_instance->on_finish = on_finish; + return s_instance; + } + + CloseImageRequest() { + ceph_assert(s_instance == nullptr); + s_instance = this; + } + + ~CloseImageRequest() { + ceph_assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD0(send, void()); +}; + +CloseImageRequest* CloseImageRequest::s_instance = nullptr; + +namespace journal { + +template <> +struct EventPreprocessor { + static EventPreprocessor *s_instance; + + static EventPreprocessor *create(librbd::MockTestImageCtx &local_image_ctx, + ::journal::MockJournaler &remote_journaler, + const std::string &local_mirror_uuid, + librbd::journal::MirrorPeerClientMeta *client_meta, + MockContextWQ *work_queue) { + ceph_assert(s_instance != nullptr); + return s_instance; + } + + static void destroy(EventPreprocessor* processor) { + } + + EventPreprocessor() { + ceph_assert(s_instance == nullptr); + s_instance = this; + } + + ~EventPreprocessor() { + ceph_assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD1(is_required, bool(const librbd::journal::EventEntry &)); + MOCK_METHOD2(preprocess, void(librbd::journal::EventEntry *, Context *)); +}; + +template<> +struct ReplayStatusFormatter { + static ReplayStatusFormatter* s_instance; + + static ReplayStatusFormatter* create(::journal::MockJournaler *journaler, + const std::string &mirror_uuid) { + ceph_assert(s_instance != nullptr); + return s_instance; + } + + static void destroy(ReplayStatusFormatter* formatter) { + } + + ReplayStatusFormatter() { + ceph_assert(s_instance == nullptr); + s_instance = this; + } + + ~ReplayStatusFormatter() { + ceph_assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD1(handle_entry_processed, void(uint64_t)); + MOCK_METHOD2(get_or_send_update, bool(std::string *description, Context *on_finish)); +}; + +template<> +struct StateBuilder { + StateBuilder(librbd::MockTestImageCtx& local_image_ctx, + ::journal::MockJournaler& remote_journaler, + const librbd::journal::MirrorPeerClientMeta& remote_client_meta) + : local_image_ctx(&local_image_ctx), + remote_journaler(&remote_journaler), + remote_client_meta(remote_client_meta) { + } + + librbd::MockTestImageCtx* local_image_ctx; + std::string remote_mirror_uuid = "remote mirror uuid"; + ::journal::MockJournaler* remote_journaler = nullptr; + librbd::journal::MirrorPeerClientMeta remote_client_meta; +}; + +EventPreprocessor* EventPreprocessor::s_instance = nullptr; +ReplayStatusFormatter* ReplayStatusFormatter::s_instance = nullptr; + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +#include "tools/rbd_mirror/image_replayer/journal/Replayer.cc" + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +using ::testing::_; +using ::testing::AtLeast; +using ::testing::DoAll; +using ::testing::InSequence; +using ::testing::Invoke; +using ::testing::MatcherCast; +using ::testing::Return; +using ::testing::ReturnArg; +using ::testing::SaveArg; +using ::testing::SetArgPointee; +using ::testing::WithArg; + +class TestMockImageReplayerJournalReplayer : public TestMockFixture { +public: + typedef Replayer MockReplayer; + typedef EventPreprocessor MockEventPreprocessor; + typedef ReplayStatusFormatter MockReplayStatusFormatter; + typedef StateBuilder MockStateBuilder; + typedef Threads MockThreads; + typedef CloseImageRequest MockCloseImageRequest; + typedef librbd::journal::Replay MockReplay; + + void SetUp() override { + TestMockFixture::SetUp(); + + librbd::RBD rbd; + ASSERT_EQ(0, create_image(rbd, m_local_io_ctx, m_image_name, m_image_size)); + ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx)); + } + + bufferlist encode_tag_data(const librbd::journal::TagData &tag_data) { + bufferlist bl; + encode(tag_data, bl); + return bl; + } + + void expect_work_queue_repeatedly(MockThreads &mock_threads) { + EXPECT_CALL(*mock_threads.work_queue, queue(_, _)) + .WillRepeatedly(Invoke([this](Context *ctx, int r) { + m_threads->work_queue->queue(ctx, r); + })); + } + + void expect_add_event_after_repeatedly(MockThreads &mock_threads) { + EXPECT_CALL(*mock_threads.timer, add_event_after(_, _)) + .WillRepeatedly( + DoAll(Invoke([this](double seconds, Context *ctx) { + m_threads->timer->add_event_after(seconds, ctx); + }), + ReturnArg<1>())); + EXPECT_CALL(*mock_threads.timer, cancel_event(_)) + .WillRepeatedly( + Invoke([this](Context *ctx) { + return m_threads->timer->cancel_event(ctx); + })); + } + + void expect_init(::journal::MockJournaler &mock_journaler, int r) { + EXPECT_CALL(mock_journaler, init(_)) + .WillOnce(CompleteContext(m_threads->work_queue, r)); + } + + void expect_stop_replay(::journal::MockJournaler &mock_journaler, int r) { + EXPECT_CALL(mock_journaler, stop_replay(_)) + .WillOnce(CompleteContext(r)); + } + + void expect_shut_down(MockReplay &mock_replay, bool cancel_ops, int r) { + EXPECT_CALL(mock_replay, shut_down(cancel_ops, _)) + .WillOnce(WithArg<1>(CompleteContext(m_threads->work_queue, r))); + } + + void expect_get_cached_client(::journal::MockJournaler &mock_journaler, + const std::string& client_id, + const cls::journal::Client& client, + const librbd::journal::ClientMeta& client_meta, + int r) { + librbd::journal::ClientData client_data; + client_data.client_meta = client_meta; + + cls::journal::Client client_copy{client}; + encode(client_data, client_copy.data); + + EXPECT_CALL(mock_journaler, get_cached_client(client_id, _)) + .WillOnce(DoAll(SetArgPointee<1>(client_copy), + Return(r))); + } + + void expect_start_external_replay(librbd::MockTestJournal &mock_journal, + MockReplay *mock_replay, int r) { + EXPECT_CALL(mock_journal, start_external_replay(_, _)) + .WillOnce(DoAll(SetArgPointee<0>(mock_replay), + WithArg<1>(CompleteContext(m_threads->work_queue, r)))); + } + + void expect_is_tag_owner(librbd::MockTestJournal &mock_journal, + bool is_owner) { + EXPECT_CALL(mock_journal, is_tag_owner()).WillOnce(Return(is_owner)); + } + + void expect_is_resync_requested(librbd::MockTestJournal &mock_journal, + int r, bool resync_requested) { + EXPECT_CALL(mock_journal, is_resync_requested(_)).WillOnce( + DoAll(SetArgPointee<0>(resync_requested), + Return(r))); + } + + void expect_get_commit_tid_in_debug( + ::journal::MockReplayEntry &mock_replay_entry) { + // It is used in debug messages and depends on debug level + EXPECT_CALL(mock_replay_entry, get_commit_tid()) + .Times(AtLeast(0)) + .WillRepeatedly(Return(0)); + } + + void expect_get_tag_tid_in_debug(librbd::MockTestJournal &mock_journal) { + // It is used in debug messages and depends on debug level + EXPECT_CALL(mock_journal, get_tag_tid()).Times(AtLeast(0)) + .WillRepeatedly(Return(0)); + } + + void expect_committed(::journal::MockReplayEntry &mock_replay_entry, + ::journal::MockJournaler &mock_journaler, int times) { + EXPECT_CALL(mock_replay_entry, get_data()).Times(times); + EXPECT_CALL(mock_journaler, committed( + MatcherCast(_))) + .Times(times); + } + + void expect_try_pop_front(::journal::MockJournaler &mock_journaler, + uint64_t replay_tag_tid, bool entries_available) { + EXPECT_CALL(mock_journaler, try_pop_front(_, _)) + .WillOnce(DoAll(SetArgPointee<0>(::journal::MockReplayEntryProxy()), + SetArgPointee<1>(replay_tag_tid), + Return(entries_available))); + } + + void expect_try_pop_front_return_no_entries( + ::journal::MockJournaler &mock_journaler, Context *on_finish) { + EXPECT_CALL(mock_journaler, try_pop_front(_, _)) + .WillOnce(DoAll(Invoke([on_finish](::journal::MockReplayEntryProxy *e, + uint64_t *t) { + on_finish->complete(0); + }), + Return(false))); + } + + void expect_get_tag(::journal::MockJournaler &mock_journaler, + const cls::journal::Tag &tag, int r) { + EXPECT_CALL(mock_journaler, get_tag(_, _, _)) + .WillOnce(DoAll(SetArgPointee<1>(tag), + WithArg<2>(CompleteContext(r)))); + } + + void expect_allocate_tag(librbd::MockTestJournal &mock_journal, int r) { + EXPECT_CALL(mock_journal, allocate_tag(_, _, _)) + .WillOnce(WithArg<2>(CompleteContext(r))); + } + + void expect_preprocess(MockEventPreprocessor &mock_event_preprocessor, + bool required, int r) { + EXPECT_CALL(mock_event_preprocessor, is_required(_)) + .WillOnce(Return(required)); + if (required) { + EXPECT_CALL(mock_event_preprocessor, preprocess(_, _)) + .WillOnce(WithArg<1>(CompleteContext(r))); + } + } + + void expect_process(MockReplay &mock_replay, + int on_ready_r, int on_commit_r) { + EXPECT_CALL(mock_replay, process(_, _, _)) + .WillOnce(DoAll(WithArg<1>(CompleteContext(on_ready_r)), + WithArg<2>(CompleteContext(on_commit_r)))); + } + + void expect_flush(MockReplay& mock_replay, int r) { + EXPECT_CALL(mock_replay, flush(_)) + .WillOnce(CompleteContext(m_threads->work_queue, r)); + } + + void expect_flush_commit_position(::journal::MockJournaler& mock_journal, + int r) { + EXPECT_CALL(mock_journal, flush_commit_position(_)) + .WillOnce(CompleteContext(m_threads->work_queue, r)); + } + + void expect_get_tag_data(librbd::MockTestJournal& mock_local_journal, + const librbd::journal::TagData& tag_data) { + EXPECT_CALL(mock_local_journal, get_tag_data()) + .WillOnce(Return(tag_data)); + } + + void expect_send(MockCloseImageRequest &mock_close_image_request, int r) { + EXPECT_CALL(mock_close_image_request, send()) + .WillOnce(Invoke([this, &mock_close_image_request, r]() { + *mock_close_image_request.image_ctx = nullptr; + m_threads->work_queue->queue(mock_close_image_request.on_finish, r); + })); + } + + void expect_notification(MockThreads& mock_threads, + MockReplayerListener& mock_replayer_listener) { + EXPECT_CALL(mock_replayer_listener, handle_notification()) + .WillOnce(Invoke([this]() { + std::unique_lock locker{m_lock}; + m_notified = true; + m_cond.notify_all(); + })); + } + + int wait_for_notification() { + std::unique_lock locker{m_lock}; + while (!m_notified) { + if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) { + return -ETIMEDOUT; + } + } + m_notified = false; + return 0; + } + + void expect_local_journal_add_listener( + librbd::MockTestJournal& mock_local_journal, + librbd::journal::Listener** local_journal_listener) { + EXPECT_CALL(mock_local_journal, add_listener(_)) + .WillOnce(SaveArg<0>(local_journal_listener)); + expect_is_tag_owner(mock_local_journal, false); + expect_is_resync_requested(mock_local_journal, 0, false); + } + + int init_entry_replayer(MockReplayer& mock_replayer, + MockThreads& mock_threads, + MockReplayerListener& mock_replayer_listener, + librbd::MockTestJournal& mock_local_journal, + ::journal::MockJournaler& mock_remote_journaler, + MockReplay& mock_local_journal_replay, + librbd::journal::Listener** local_journal_listener, + ::journal::ReplayHandler** remote_replay_handler, + ::journal::JournalMetadataListener** remote_journal_listener) { + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)) + .WillOnce(SaveArg<0>(remote_journal_listener)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, 0); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_local_journal_add_listener(mock_local_journal, + local_journal_listener); + EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _)) + .WillOnce(SaveArg<0>(remote_replay_handler)); + expect_notification(mock_threads, mock_replayer_listener); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + int r = init_ctx.wait(); + if (r < 0) { + return r; + } + + return wait_for_notification(); + } + + int shut_down_entry_replayer(MockReplayer& mock_replayer, + MockThreads& mock_threads, + librbd::MockTestJournal& mock_local_journal, + ::journal::MockJournaler& mock_remote_journaler, + MockReplay& mock_local_journal_replay) { + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + return shutdown_ctx.wait(); + } + + librbd::ImageCtx* m_local_image_ctx = nullptr; + + ceph::mutex m_lock = ceph::make_mutex( + "TestMockImageReplayerJournalReplayer"); + ceph::condition_variable m_cond; + bool m_notified = false; +}; + +TEST_F(TestMockImageReplayerJournalReplayer, InitShutDown) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, -EINVAL); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-EINVAL, init_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitRemoteJournalerGetClientError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, -EINVAL); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-EINVAL, init_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitNoLocalJournal) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + + mock_local_image_ctx.journal = nullptr; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, 0); + + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-EINVAL, init_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitLocalJournalStartExternalReplayError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, 0); + expect_start_external_replay(mock_local_journal, nullptr, -EINVAL); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-EINVAL, init_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitIsPromoted) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, 0); + MockReplay mock_local_journal_replay; + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + EXPECT_CALL(mock_local_journal, add_listener(_)); + expect_is_tag_owner(mock_local_journal, true); + expect_notification(mock_threads, mock_replayer_listener); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(0, init_ctx.wait()); + ASSERT_EQ(0, wait_for_notification()); + + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(0, shutdown_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitDisconnected) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect", + "false"); + + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", + {{}, {}, {}, + cls::journal::CLIENT_STATE_DISCONNECTED}, + {librbd::journal::MirrorPeerClientMeta{ + mock_local_image_ctx.id}}, 0); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-ENOTCONN, init_ctx.wait()); + ASSERT_FALSE(mock_replayer.is_resync_requested()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitDisconnectedResync) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect", + "true"); + + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", + {{}, {}, {}, + cls::journal::CLIENT_STATE_DISCONNECTED}, + {librbd::journal::MirrorPeerClientMeta{ + mock_local_image_ctx.id}}, 0); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(-ENOTCONN, init_ctx.wait()); + ASSERT_TRUE(mock_replayer.is_resync_requested()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitResyncRequested) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, 0); + MockReplay mock_local_journal_replay; + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + EXPECT_CALL(mock_local_journal, add_listener(_)); + expect_is_tag_owner(mock_local_journal, false); + expect_is_resync_requested(mock_local_journal, 0, true); + expect_notification(mock_threads, mock_replayer_listener); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(0, init_ctx.wait()); + ASSERT_EQ(0, wait_for_notification()); + + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(0, shutdown_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, InitResyncRequestedError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + expect_init(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, add_listener(_)); + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", {}, + {librbd::journal::MirrorPeerClientMeta{}}, 0); + MockReplay mock_local_journal_replay; + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + EXPECT_CALL(mock_local_journal, add_listener(_)); + expect_is_tag_owner(mock_local_journal, false); + expect_is_resync_requested(mock_local_journal, -EINVAL, false); + expect_notification(mock_threads, mock_replayer_listener); + + C_SaferCond init_ctx; + mock_replayer.init(&init_ctx); + ASSERT_EQ(0, init_ctx.wait()); + ASSERT_EQ(0, wait_for_notification()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(0, shutdown_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, ShutDownLocalJournalReplayError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_shut_down(mock_local_journal_replay, true, -EINVAL); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(-EINVAL, shutdown_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, CloseLocalImageError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, -EINVAL); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(-EINVAL, shutdown_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, StopRemoteJournalerError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_shut_down(mock_local_journal_replay, true, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_stop_replay(mock_remote_journaler, -EPERM); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(-EPERM, shutdown_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, Replay) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_add_event_after_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + expect_committed(mock_replay_entry, mock_remote_journaler, 2); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + + // replay_flush + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_local_journal_add_listener(mock_local_journal, + &local_journal_listener); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + + // process + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, false, 0); + expect_process(mock_local_journal_replay, 0, 0); + EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_)); + + // the next event with preprocess + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, true, 0); + expect_process(mock_local_journal_replay, 0, 0); + EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_)); + + // attempt to process the next event + C_SaferCond replay_ctx; + expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx); + + // fire + remote_replay_handler->handle_entries_available(); + ASSERT_EQ(0, replay_ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, DecodeError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_add_event_after_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + + // replay_flush + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_local_journal_add_listener(mock_local_journal, + &local_journal_listener); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + + // process + EXPECT_CALL(mock_replay_entry, get_data()); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)) + .WillOnce(Return(-EINVAL)); + expect_notification(mock_threads, mock_replayer_listener); + + // fire + remote_replay_handler->handle_entries_available(); + wait_for_notification(); + + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, DelayedReplay) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_add_event_after_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + expect_committed(mock_replay_entry, mock_remote_journaler, 1); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + + // replay_flush + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_local_journal_add_listener(mock_local_journal, + &local_journal_listener); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + + // process with delay + EXPECT_CALL(mock_replay_entry, get_data()); + librbd::journal::EventEntry event_entry( + librbd::journal::AioDiscardEvent(123, 345, 0), ceph_clock_now()); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)) + .WillOnce(DoAll(SetArgPointee<1>(event_entry), + Return(0))); + + Context* delayed_task_ctx = nullptr; + EXPECT_CALL(*mock_threads.timer, add_event_after(_, _)) + .WillOnce( + DoAll(Invoke([this, &delayed_task_ctx](double seconds, Context *ctx) { + std::unique_lock locker{m_lock}; + delayed_task_ctx = ctx; + m_cond.notify_all(); + }), + ReturnArg<1>())); + expect_preprocess(mock_event_preprocessor, false, 0); + expect_process(mock_local_journal_replay, 0, 0); + EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_)); + + // attempt to process the next event + C_SaferCond replay_ctx; + expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx); + + // fire + mock_local_image_ctx.mirroring_replay_delay = 600; + remote_replay_handler->handle_entries_available(); + { + std::unique_lock locker{m_lock}; + while (delayed_task_ctx == nullptr) { + if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) { + FAIL() << "timed out waiting for task"; + break; + } + } + } + { + std::unique_lock timer_locker{mock_threads.timer_lock}; + delayed_task_ctx->complete(0); + } + ASSERT_EQ(0, replay_ctx.wait()); + + // add a pending (delayed) entry before stop + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + C_SaferCond decode_ctx; + EXPECT_CALL(mock_local_journal_replay, decode(_, _)) + .WillOnce(DoAll(Invoke([&decode_ctx](bufferlist::const_iterator* it, + librbd::journal::EventEntry *e) { + decode_ctx.complete(0); + }), + Return(0))); + + remote_replay_handler->handle_entries_available(); + ASSERT_EQ(0, decode_ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, ReplayNoMemoryError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_complete(-ENOMEM); + + wait_for_notification(); + ASSERT_EQ(false, mock_replayer.is_replaying()); + ASSERT_EQ(-ENOMEM, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, LocalJournalForcePromoted) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_notification(mock_threads, mock_replayer_listener); + local_journal_listener->handle_promoted(); + wait_for_notification(); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, LocalJournalResyncRequested) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_notification(mock_threads, mock_replayer_listener); + local_journal_listener->handle_resync(); + wait_for_notification(); + + ASSERT_TRUE(mock_replayer.is_resync_requested()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, RemoteJournalDisconnected) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + mock_local_image_ctx.config.set_val("rbd_mirroring_resync_after_disconnect", + "true"); + + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_get_cached_client(mock_remote_journaler, "local mirror uuid", + {{}, {}, {}, + cls::journal::CLIENT_STATE_DISCONNECTED}, + {librbd::journal::MirrorPeerClientMeta{ + mock_local_image_ctx.id}}, 0); + expect_notification(mock_threads, mock_replayer_listener); + + remote_journaler_listener->handle_update(nullptr); + wait_for_notification(); + + ASSERT_EQ(-ENOTCONN, mock_replayer.get_error_code()); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_TRUE(mock_replayer.is_resync_requested()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, Flush) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_flush(mock_local_journal_replay, 0); + expect_flush_commit_position(mock_remote_journaler, 0); + + C_SaferCond ctx; + mock_replayer.flush(&ctx); + ASSERT_EQ(0, ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, FlushError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_flush(mock_local_journal_replay, -EINVAL); + + C_SaferCond ctx; + mock_replayer.flush(&ctx); + ASSERT_EQ(-EINVAL, ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, FlushCommitPositionError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_flush(mock_local_journal_replay, 0); + expect_flush_commit_position(mock_remote_journaler, -EINVAL); + + C_SaferCond ctx; + mock_replayer.flush(&ctx); + ASSERT_EQ(-EINVAL, ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + + +TEST_F(TestMockImageReplayerJournalReplayer, ReplayFlushShutDownError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_try_pop_front(mock_remote_journaler, 1, true); + expect_shut_down(mock_local_journal_replay, false, -EINVAL); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(0, shutdown_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, ReplayFlushStartError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + expect_try_pop_front(mock_remote_journaler, 1, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, nullptr, -EINVAL); + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + MockCloseImageRequest mock_close_image_request; + expect_send(mock_close_image_request, 0); + expect_stop_replay(mock_remote_journaler, 0); + EXPECT_CALL(mock_remote_journaler, remove_listener(_)); + + C_SaferCond shutdown_ctx; + mock_replayer.shut_down(&shutdown_ctx); + ASSERT_EQ(0, shutdown_ctx.wait()); +} + +TEST_F(TestMockImageReplayerJournalReplayer, GetTagError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_work_queue_repeatedly(mock_threads); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_local_journal_add_listener(mock_local_journal, + &local_journal_listener); + expect_get_tag(mock_remote_journaler, tag, -EINVAL); + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, AllocateTagDemotion) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_notification(mock_threads, mock_replayer_listener); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + expect_committed(mock_replay_entry, mock_remote_journaler, 1); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::ORPHAN_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_local_journal_add_listener(mock_local_journal, + &local_journal_listener); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_get_tag_data(mock_local_journal, {}); + expect_allocate_tag(mock_local_journal, 0); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, false, 0); + expect_process(mock_local_journal_replay, 0, 0); + EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_)); + + remote_replay_handler->handle_entries_available(); + wait_for_notification(); + ASSERT_FALSE(mock_replayer.is_replaying()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, AllocateTagError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_local_journal_add_listener(mock_local_journal, + &local_journal_listener); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, -EINVAL); + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, PreprocessError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_local_journal_add_listener(mock_local_journal, + &local_journal_listener); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + EXPECT_CALL(mock_replay_entry, get_data()); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, true, -EINVAL); + + expect_notification(mock_threads, mock_replayer_listener); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, ProcessError) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_local_journal_add_listener(mock_local_journal, + &local_journal_listener); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + EXPECT_CALL(mock_replay_entry, get_data()); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, false, 0); + expect_process(mock_local_journal_replay, 0, -EINVAL); + EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_)); + + // attempt to process the next event + C_SaferCond replay_ctx; + expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx); + remote_replay_handler->handle_entries_available(); + + wait_for_notification(); + ASSERT_FALSE(mock_replayer.is_replaying()); + ASSERT_EQ(-EINVAL, mock_replayer.get_error_code()); + + ASSERT_EQ(0, replay_ctx.wait()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +TEST_F(TestMockImageReplayerJournalReplayer, ImageNameUpdated) { + librbd::MockTestJournal mock_local_journal; + librbd::MockTestImageCtx mock_local_image_ctx{*m_local_image_ctx, + mock_local_journal}; + ::journal::MockJournaler mock_remote_journaler; + MockReplayerListener mock_replayer_listener; + MockThreads mock_threads{m_threads}; + MockStateBuilder mock_state_builder(mock_local_image_ctx, + mock_remote_journaler, + {}); + MockReplayer mock_replayer{ + &mock_threads, "local mirror uuid", &mock_state_builder, + &mock_replayer_listener}; + + ::journal::MockReplayEntry mock_replay_entry; + expect_work_queue_repeatedly(mock_threads); + expect_add_event_after_repeatedly(mock_threads); + expect_get_commit_tid_in_debug(mock_replay_entry); + expect_get_tag_tid_in_debug(mock_local_journal); + expect_committed(mock_replay_entry, mock_remote_journaler, 1); + expect_notification(mock_threads, mock_replayer_listener); + + InSequence seq; + + MockReplay mock_local_journal_replay; + MockEventPreprocessor mock_event_preprocessor; + MockReplayStatusFormatter mock_replay_status_formatter; + librbd::journal::Listener* local_journal_listener = nullptr; + ::journal::ReplayHandler* remote_replay_handler = nullptr; + ::journal::JournalMetadataListener* remote_journaler_listener = nullptr; + ASSERT_EQ(0, init_entry_replayer(mock_replayer, mock_threads, + mock_replayer_listener, mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay, + &local_journal_listener, + &remote_replay_handler, + &remote_journaler_listener)); + + mock_local_image_ctx.name = "NEW NAME"; + cls::journal::Tag tag = + {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID, + librbd::Journal<>::LOCAL_MIRROR_UUID, + true, 0, 0})}; + + expect_try_pop_front(mock_remote_journaler, tag.tid, true); + expect_shut_down(mock_local_journal_replay, false, 0); + EXPECT_CALL(mock_local_journal, remove_listener(_)); + EXPECT_CALL(mock_local_journal, stop_external_replay()); + expect_start_external_replay(mock_local_journal, &mock_local_journal_replay, + 0); + expect_local_journal_add_listener(mock_local_journal, + &local_journal_listener); + expect_get_tag(mock_remote_journaler, tag, 0); + expect_allocate_tag(mock_local_journal, 0); + EXPECT_CALL(mock_local_journal_replay, decode(_, _)).WillOnce(Return(0)); + expect_preprocess(mock_event_preprocessor, false, 0); + expect_process(mock_local_journal_replay, 0, 0); + EXPECT_CALL(mock_replay_status_formatter, handle_entry_processed(_)); + + // attempt to process the next event + C_SaferCond replay_ctx; + expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx); + + remote_replay_handler->handle_entries_available(); + wait_for_notification(); + + auto image_spec = util::compute_image_spec(m_local_io_ctx, "NEW NAME"); + ASSERT_EQ(image_spec, mock_replayer.get_image_spec()); + + ASSERT_EQ(0, replay_ctx.wait()); + ASSERT_TRUE(mock_replayer.is_replaying()); + + ASSERT_EQ(0, shut_down_entry_replayer(mock_replayer, mock_threads, + mock_local_journal, + mock_remote_journaler, + mock_local_journal_replay)); +} + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd -- cgit v1.2.3