summaryrefslogtreecommitdiffstats
path: root/src/tools/rbd_mirror/image_replayer/journal
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/tools/rbd_mirror/image_replayer/journal
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tools/rbd_mirror/image_replayer/journal')
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.cc162
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.h116
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.cc206
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h127
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.cc316
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.h115
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc284
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h70
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/Replayer.cc1303
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/Replayer.h323
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/StateBuilder.cc149
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/StateBuilder.h94
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.cc109
-rw-r--r--src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.h55
14 files changed, 3429 insertions, 0 deletions
diff --git a/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.cc b/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.cc
new file mode 100644
index 000000000..087cf4f5f
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.cc
@@ -0,0 +1,162 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "CreateLocalImageRequest.h"
+#include "include/rados/librados.hpp"
+#include "common/debug.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "journal/Journaler.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/journal/Types.h"
+#include "tools/rbd_mirror/PoolMetaCache.h"
+#include "tools/rbd_mirror/ProgressContext.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/image_replayer/CreateImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
+ << "CreateLocalImageRequest: " << this << " " \
+ << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+
+// Entry point: kick off the state machine by discarding any stale
+// registration of this peer in the remote journal.
+template <typename I>
+void CreateLocalImageRequest<I>::send() {
+  unregister_client();
+}
+
+// Remove this mirror peer's client record from the remote journal so a
+// fresh record (carrying a newly generated local image id) can be
+// registered. Completion is delivered to handle_unregister_client().
+template <typename I>
+void CreateLocalImageRequest<I>::unregister_client() {
+  dout(10) << dendl;
+  update_progress("UNREGISTER_CLIENT");
+
+  auto ctx = create_context_callback<
+    CreateLocalImageRequest<I>,
+    &CreateLocalImageRequest<I>::handle_unregister_client>(this);
+  m_state_builder->remote_journaler->unregister_client(ctx);
+}
+
+// -ENOENT (no existing client record) is not an error; any other failure
+// aborts the request.
+template <typename I>
+void CreateLocalImageRequest<I>::handle_unregister_client(int r) {
+  dout(10) << "r=" << r << dendl;
+  if (r < 0 && r != -ENOENT) {
+    derr << "failed to unregister with remote journal: " << cpp_strerror(r)
+         << dendl;
+    finish(r);
+    return;
+  }
+
+  // reset cached state before generating a new identity for the local image
+  m_state_builder->local_image_id = "";
+  m_state_builder->remote_client_meta = {};
+  register_client();
+}
+
+// Generate a new local image id and register it with the remote journal in
+// the SYNCING state so a crashed/interrupted sync can be detected later.
+template <typename I>
+void CreateLocalImageRequest<I>::register_client() {
+  ceph_assert(m_state_builder->local_image_id.empty());
+  m_state_builder->local_image_id =
+    librbd::util::generate_image_id<I>(m_local_io_ctx);
+  dout(10) << "local_image_id=" << m_state_builder->local_image_id << dendl;
+  update_progress("REGISTER_CLIENT");
+
+  librbd::journal::MirrorPeerClientMeta client_meta{
+    m_state_builder->local_image_id};
+  client_meta.state = librbd::journal::MIRROR_PEER_STATE_SYNCING;
+
+  librbd::journal::ClientData client_data{client_meta};
+  bufferlist client_data_bl;
+  encode(client_data, client_data_bl);
+
+  auto ctx = create_context_callback<
+    CreateLocalImageRequest<I>,
+    &CreateLocalImageRequest<I>::handle_register_client>(this);
+  m_state_builder->remote_journaler->register_client(client_data_bl, ctx);
+}
+
+// On success, mirror the registered metadata into the state builder so the
+// rest of the replayer sees a CONNECTED/SYNCING peer, then create the image.
+template <typename I>
+void CreateLocalImageRequest<I>::handle_register_client(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to register with remote journal: " << cpp_strerror(r)
+         << dendl;
+    finish(r);
+    return;
+  }
+
+  m_state_builder->remote_client_state = cls::journal::CLIENT_STATE_CONNECTED;
+  m_state_builder->remote_client_meta = {m_state_builder->local_image_id};
+  m_state_builder->remote_client_meta.state =
+    librbd::journal::MIRROR_PEER_STATE_SYNCING;
+
+  create_local_image();
+}
+
+// Create the local image (named after the remote image) via
+// CreateImageRequest using the id registered above.
+template <typename I>
+void CreateLocalImageRequest<I>::create_local_image() {
+  dout(10) << "local_image_id=" << m_state_builder->local_image_id << dendl;
+  update_progress("CREATE_LOCAL_IMAGE");
+
+  // snapshot the remote image name under its image lock
+  m_remote_image_ctx->image_lock.lock_shared();
+  std::string image_name = m_remote_image_ctx->name;
+  m_remote_image_ctx->image_lock.unlock_shared();
+
+  auto ctx = create_context_callback<
+    CreateLocalImageRequest<I>,
+    &CreateLocalImageRequest<I>::handle_create_local_image>(this);
+  auto request = CreateImageRequest<I>::create(
+    m_threads, m_local_io_ctx, m_global_image_id,
+    m_state_builder->remote_mirror_uuid, image_name,
+    m_state_builder->local_image_id, m_remote_image_ctx,
+    m_pool_meta_cache, cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, ctx);
+  request->send();
+}
+// -EBADF means the generated image id collided with an existing one: loop
+// back to UNREGISTER_CLIENT (see state diagram in the header) and retry
+// with a new id. -ENOENT (missing parent for a clone) and other errors end
+// the request.
+template <typename I>
+void CreateLocalImageRequest<I>::handle_create_local_image(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r == -EBADF) {
+    dout(5) << "image id " << m_state_builder->local_image_id << " "
+            << "already in-use" << dendl;
+    unregister_client();
+    return;
+  } else if (r < 0) {
+    if (r == -ENOENT) {
+      dout(10) << "parent image does not exist" << dendl;
+    } else {
+      derr << "failed to create local image: " << cpp_strerror(r) << dendl;
+    }
+    finish(r);
+    return;
+  }
+
+  finish(0);
+}
+
+// Report the current state-machine stage to the optional progress sink.
+template <typename I>
+void CreateLocalImageRequest<I>::update_progress(
+    const std::string& description) {
+  dout(15) << description << dendl;
+  if (m_progress_ctx != nullptr) {
+    m_progress_ctx->update_progress(description);
+  }
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::image_replayer::journal::CreateLocalImageRequest<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.h b/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.h
new file mode 100644
index 000000000..fc776ecc3
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.h
@@ -0,0 +1,116 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_CREATE_LOCAL_IMAGE_REQUEST_H
+#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_CREATE_LOCAL_IMAGE_REQUEST_H
+
+#include "include/rados/librados_fwd.hpp"
+#include "tools/rbd_mirror/BaseRequest.h"
+#include <string>
+
+struct Context;
+namespace journal { class Journaler; }
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+class PoolMetaCache;
+class ProgressContext;
+template <typename> struct Threads;
+
+namespace image_replayer {
+namespace journal {
+
+template <typename> class StateBuilder;
+
+// Async request that creates the local mirror copy of a remote journaled
+// image: it (re)registers this peer in the remote journal under a freshly
+// generated local image id and then creates the local image. Completion is
+// delivered through the Context passed at construction (BaseRequest).
+template <typename ImageCtxT>
+class CreateLocalImageRequest : public BaseRequest {
+public:
+  typedef rbd::mirror::ProgressContext ProgressContext;
+
+  // Factory helper; the request deletes itself upon completion via
+  // BaseRequest::finish().
+  static CreateLocalImageRequest* create(
+      Threads<ImageCtxT>* threads,
+      librados::IoCtx& local_io_ctx,
+      ImageCtxT* remote_image_ctx,
+      const std::string& global_image_id,
+      PoolMetaCache* pool_meta_cache,
+      ProgressContext* progress_ctx,
+      StateBuilder<ImageCtxT>* state_builder,
+      Context* on_finish) {
+    return new CreateLocalImageRequest(threads, local_io_ctx, remote_image_ctx,
+                                       global_image_id, pool_meta_cache,
+                                       progress_ctx, state_builder, on_finish);
+  }
+
+  CreateLocalImageRequest(
+      Threads<ImageCtxT>* threads,
+      librados::IoCtx& local_io_ctx,
+      ImageCtxT* remote_image_ctx,
+      const std::string& global_image_id,
+      PoolMetaCache* pool_meta_cache,
+      ProgressContext* progress_ctx,
+      StateBuilder<ImageCtxT>* state_builder,
+      Context* on_finish)
+    : BaseRequest(on_finish),
+      m_threads(threads),
+      m_local_io_ctx(local_io_ctx),
+      m_remote_image_ctx(remote_image_ctx),
+      m_global_image_id(global_image_id),
+      m_pool_meta_cache(pool_meta_cache),
+      m_progress_ctx(progress_ctx),
+      m_state_builder(state_builder) {
+  }
+
+  // Starts the state machine (see diagram below). Marked 'override' for
+  // correctness/consistency with the sibling BaseRequest subclasses
+  // (e.g. PrepareReplayRequest::send()), which already use it.
+  void send() override;
+
+private:
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    v
+   * UNREGISTER_CLIENT < * * * * * * * *
+   *    |                              *
+   *    v                              *
+   * REGISTER_CLIENT                   *
+   *    |                              *
+   *    v                 (id exists)  *
+   * CREATE_LOCAL_IMAGE * * * * * * * *
+   *    |
+   *    v
+   * <finish>
+   *
+   * @endverbatim
+   */
+
+  Threads<ImageCtxT>* m_threads;
+  librados::IoCtx& m_local_io_ctx;
+  ImageCtxT* m_remote_image_ctx;
+  std::string m_global_image_id;
+  PoolMetaCache* m_pool_meta_cache;
+  ProgressContext* m_progress_ctx;      // optional; may be nullptr
+  StateBuilder<ImageCtxT>* m_state_builder;
+
+  void unregister_client();
+  void handle_unregister_client(int r);
+
+  void register_client();
+  void handle_register_client(int r);
+
+  void create_local_image();
+  void handle_create_local_image(int r);
+
+  void update_progress(const std::string& description);
+
+};
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::image_replayer::journal::CreateLocalImageRequest<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_CREATE_LOCAL_IMAGE_REQUEST_H
diff --git a/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.cc b/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.cc
new file mode 100644
index 000000000..f5d49048e
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.cc
@@ -0,0 +1,206 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "EventPreprocessor.h"
+#include "common/debug.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "journal/Journaler.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/journal/Types.h"
+#include <boost/variant.hpp>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
+ << "EventPreprocessor: " << this << " " << __func__ \
+ << ": "
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+using librbd::util::create_context_callback;
+
+// Captures references to the local image, the remote journaler, and the
+// shared peer client metadata; the work queue is used only to dispatch the
+// completion callback in finish().
+template <typename I>
+EventPreprocessor<I>::EventPreprocessor(I &local_image_ctx,
+                                        Journaler &remote_journaler,
+                                        const std::string &local_mirror_uuid,
+                                        MirrorPeerClientMeta *client_meta,
+                                        librbd::asio::ContextWQ *work_queue)
+  : m_local_image_ctx(local_image_ctx), m_remote_journaler(remote_journaler),
+    m_local_mirror_uuid(local_mirror_uuid), m_client_meta(client_meta),
+    m_work_queue(work_queue) {
+}
+
+// Destruction while a preprocess() is still in flight is a programming
+// error.
+template <typename I>
+EventPreprocessor<I>::~EventPreprocessor() {
+  ceph_assert(!m_in_progress);
+}
+
+// Returns true if the event needs preprocessing: either the cached snapshot
+// sequence map contains stale entries (checked against a scratch copy so
+// the shared meta is not mutated here), or the event is a snap rename.
+template <typename I>
+bool EventPreprocessor<I>::is_required(const EventEntry &event_entry) {
+  SnapSeqs snap_seqs(m_client_meta->snap_seqs);
+  return (prune_snap_map(&snap_seqs) ||
+          event_entry.get_event_type() ==
+            librbd::journal::EVENT_TYPE_SNAP_RENAME);
+}
+
+// Begin preprocessing: only one event may be in flight at a time. The
+// event entry may be rewritten in place; on_finish fires via finish().
+template <typename I>
+void EventPreprocessor<I>::preprocess(EventEntry *event_entry,
+                                      Context *on_finish) {
+  ceph_assert(!m_in_progress);
+  m_in_progress = true;
+  m_event_entry = event_entry;
+  m_on_finish = on_finish;
+
+  refresh_image();
+}
+
+// Refresh the local image so snapshot tables consulted below are current.
+template <typename I>
+void EventPreprocessor<I>::refresh_image() {
+  dout(20) << dendl;
+
+  Context *ctx = create_context_callback<
+    EventPreprocessor<I>, &EventPreprocessor<I>::handle_refresh_image>(this);
+  m_local_image_ctx.state->refresh(ctx);
+}
+
+template <typename I>
+void EventPreprocessor<I>::handle_refresh_image(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "error encountered during image refresh: " << cpp_strerror(r)
+         << dendl;
+    finish(r);
+    return;
+  }
+
+  preprocess_event();
+}
+
+// Prune stale snap mappings and dispatch to the event-type-specific
+// handler (currently only snap rename needs rewriting; all other event
+// types are a no-op via the visitor's generic operator()).
+template <typename I>
+void EventPreprocessor<I>::preprocess_event() {
+  dout(20) << dendl;
+
+  m_snap_seqs = m_client_meta->snap_seqs;
+  m_snap_seqs_updated = prune_snap_map(&m_snap_seqs);
+
+  int r = boost::apply_visitor(PreprocessEventVisitor(this),
+                               m_event_entry->event);
+  if (r < 0) {
+    finish(r);
+    return;
+  }
+
+  update_client();
+}
+
+// Rewrite a snap-rename event's remote snap id into the corresponding
+// local snap id, learning (and recording) a new mapping from the snapshot's
+// source name if it isn't already cached. Returns -ENOENT if the snapshot
+// cannot be resolved locally.
+template <typename I>
+int EventPreprocessor<I>::preprocess_snap_rename(
+    librbd::journal::SnapRenameEvent &event) {
+  dout(20) << "remote_snap_id=" << event.snap_id << ", "
+           << "src_snap_name=" << event.src_snap_name << ", "
+           << "dest_snap_name=" << event.dst_snap_name << dendl;
+
+  auto snap_seq_it = m_snap_seqs.find(event.snap_id);
+  if (snap_seq_it != m_snap_seqs.end()) {
+    dout(20) << "remapping remote snap id " << snap_seq_it->first << " "
+             << "to local snap id " << snap_seq_it->second << dendl;
+    event.snap_id = snap_seq_it->second;
+    return 0;
+  }
+
+  auto snap_id_it = m_local_image_ctx.snap_ids.find({cls::rbd::UserSnapshotNamespace(),
+                                                     event.src_snap_name});
+  if (snap_id_it == m_local_image_ctx.snap_ids.end()) {
+    dout(20) << "cannot map remote snapshot '" << event.src_snap_name << "' "
+             << "to local snapshot" << dendl;
+    event.snap_id = CEPH_NOSNAP;
+    return -ENOENT;
+  }
+
+  dout(20) << "mapping remote snap id " << event.snap_id << " "
+           << "to local snap id " << snap_id_it->second << dendl;
+  m_snap_seqs_updated = true;
+  m_snap_seqs[event.snap_id] = snap_id_it->second;
+  event.snap_id = snap_id_it->second;
+  return 0;
+}
+
+// Persist the updated snap sequence map to the remote journal's client
+// record; skipped entirely when nothing changed.
+template <typename I>
+void EventPreprocessor<I>::update_client() {
+  if (!m_snap_seqs_updated) {
+    finish(0);
+    return;
+  }
+
+  dout(20) << dendl;
+  librbd::journal::MirrorPeerClientMeta client_meta(*m_client_meta);
+  client_meta.snap_seqs = m_snap_seqs;
+
+  librbd::journal::ClientData client_data(client_meta);
+  bufferlist data_bl;
+  encode(client_data, data_bl);
+
+  Context *ctx = create_context_callback<
+    EventPreprocessor<I>, &EventPreprocessor<I>::handle_update_client>(
+      this);
+  m_remote_journaler.update_client(data_bl, ctx);
+}
+
+// Only commit the new map to the in-memory shared meta after the remote
+// update succeeded, keeping the two views consistent.
+template <typename I>
+void EventPreprocessor<I>::handle_update_client(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to update mirror peer journal client: "
+         << cpp_strerror(r) << dendl;
+    finish(r);
+    return;
+  }
+
+  m_client_meta->snap_seqs = m_snap_seqs;
+  finish(0);
+}
+
+// Drop map entries whose local snapshot no longer exists (checked under
+// the image lock). Returns true if anything was removed. Note the iterator
+// is advanced before erase to stay valid.
+template <typename I>
+bool EventPreprocessor<I>::prune_snap_map(SnapSeqs *snap_seqs) {
+  bool pruned = false;
+
+  std::shared_lock image_locker{m_local_image_ctx.image_lock};
+  for (auto it = snap_seqs->begin(); it != snap_seqs->end(); ) {
+    auto current_it(it++);
+    if (m_local_image_ctx.snap_info.count(current_it->second) == 0) {
+      snap_seqs->erase(current_it);
+      pruned = true;
+    }
+  }
+  return pruned;
+}
+
+// Reset in-flight state and queue the completion on the work queue (the
+// callback is never invoked inline).
+template <typename I>
+void EventPreprocessor<I>::finish(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  Context *on_finish = m_on_finish;
+  m_on_finish = nullptr;
+  m_event_entry = nullptr;
+  m_in_progress = false;
+  m_snap_seqs_updated = false;
+  m_work_queue->queue(on_finish, r);
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::image_replayer::journal::EventPreprocessor<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h b/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h
new file mode 100644
index 000000000..12f70eb93
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h
@@ -0,0 +1,127 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_EVENT_PREPROCESSOR_H
+#define RBD_MIRROR_IMAGE_REPLAYER_EVENT_PREPROCESSOR_H
+
+#include "include/int_types.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+#include <map>
+#include <string>
+#include <boost/variant/static_visitor.hpp>
+
+struct Context;
+namespace journal { class Journaler; }
+namespace librbd {
+class ImageCtx;
+namespace asio { struct ContextWQ; }
+} // namespace librbd
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+// Rewrites remote journal events so they can be replayed against the local
+// image (currently: remapping snap ids in snap-rename events) and keeps the
+// remote peer client's snap sequence map pruned and persisted.
+template <typename ImageCtxT = librbd::ImageCtx>
+class EventPreprocessor {
+public:
+  using Journaler = typename librbd::journal::TypeTraits<ImageCtxT>::Journaler;
+  using EventEntry = librbd::journal::EventEntry;
+  using MirrorPeerClientMeta = librbd::journal::MirrorPeerClientMeta;
+
+  // Factory/destroy pair; the caller owns the preprocessor's lifetime.
+  static EventPreprocessor *create(ImageCtxT &local_image_ctx,
+                                   Journaler &remote_journaler,
+                                   const std::string &local_mirror_uuid,
+                                   MirrorPeerClientMeta *client_meta,
+                                   librbd::asio::ContextWQ *work_queue) {
+    return new EventPreprocessor(local_image_ctx, remote_journaler,
+                                 local_mirror_uuid, client_meta, work_queue);
+  }
+
+  static void destroy(EventPreprocessor* processor) {
+    delete processor;
+  }
+
+  EventPreprocessor(ImageCtxT &local_image_ctx, Journaler &remote_journaler,
+                    const std::string &local_mirror_uuid,
+                    MirrorPeerClientMeta *client_meta,
+                    librbd::asio::ContextWQ *work_queue);
+  ~EventPreprocessor();
+
+  // Cheap check (no I/O) whether preprocess() must be called for the event.
+  bool is_required(const EventEntry &event_entry);
+  // Asynchronously rewrite *event_entry in place; only one call may be
+  // outstanding at a time.
+  void preprocess(EventEntry *event_entry, Context *on_finish);
+
+private:
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    v (skip if not required)
+   * REFRESH_IMAGE
+   *    |
+   *    v (skip if not required)
+   * PREPROCESS_EVENT
+   *    |
+   *    v (skip if not required)
+   * UPDATE_CLIENT
+   *
+   * @endverbatim
+   */
+
+  // remote snap id -> local snap id
+  typedef std::map<uint64_t, uint64_t> SnapSeqs;
+
+  // Visitor dispatching on the journal event variant: snap-rename events
+  // get rewritten, everything else is a no-op.
+  class PreprocessEventVisitor : public boost::static_visitor<int> {
+  public:
+    EventPreprocessor *event_preprocessor;
+
+    PreprocessEventVisitor(EventPreprocessor *event_preprocessor)
+      : event_preprocessor(event_preprocessor) {
+    }
+
+    template <typename T>
+    inline int operator()(T&) const {
+      return 0;
+    }
+    inline int operator()(librbd::journal::SnapRenameEvent &event) const {
+      return event_preprocessor->preprocess_snap_rename(event);
+    }
+  };
+
+  ImageCtxT &m_local_image_ctx;
+  Journaler &m_remote_journaler;
+  std::string m_local_mirror_uuid;
+  MirrorPeerClientMeta *m_client_meta;   // shared with the replayer
+  librbd::asio::ContextWQ *m_work_queue; // used to dispatch completions
+
+  // in-flight preprocess() state
+  bool m_in_progress = false;
+  EventEntry *m_event_entry = nullptr;
+  Context *m_on_finish = nullptr;
+
+  // working copy of the snap map and its dirty flag
+  SnapSeqs m_snap_seqs;
+  bool m_snap_seqs_updated = false;
+
+  bool prune_snap_map(SnapSeqs *snap_seqs);
+
+  void refresh_image();
+  void handle_refresh_image(int r);
+
+  void preprocess_event();
+  int preprocess_snap_rename(librbd::journal::SnapRenameEvent &event);
+
+  void update_client();
+  void handle_update_client(int r);
+
+  void finish(int r);
+
+};
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::image_replayer::journal::EventPreprocessor<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_EVENT_PREPROCESSOR_H
diff --git a/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.cc b/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.cc
new file mode 100644
index 000000000..c8a96a4ad
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.cc
@@ -0,0 +1,316 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "PrepareReplayRequest.h"
+#include "common/debug.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "journal/Journaler.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+#include "tools/rbd_mirror/ProgressContext.h"
+#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
+ << "PrepareReplayRequest: " << this << " " \
+ << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+using librbd::util::create_context_callback;
+
+// Entry point: validate the local image against the registered remote
+// client metadata and short-circuit for resync / still-syncing cases
+// before walking the remote tag chain.
+template <typename I>
+void PrepareReplayRequest<I>::send() {
+  *m_resync_requested = false;
+  *m_syncing = false;
+
+  if (m_state_builder->local_image_id !=
+        m_state_builder->remote_client_meta.image_id) {
+    // somehow our local image has a different image id than the image id
+    // registered in the remote image
+    derr << "split-brain detected: local_image_id="
+         << m_state_builder->local_image_id << ", "
+         << "registered local_image_id="
+         << m_state_builder->remote_client_meta.image_id << dendl;
+    finish(-EEXIST);
+    return;
+  }
+
+  std::shared_lock image_locker(m_state_builder->local_image_ctx->image_lock);
+  if (m_state_builder->local_image_ctx->journal == nullptr) {
+    image_locker.unlock();
+
+    derr << "local image does not support journaling" << dendl;
+    finish(-EINVAL);
+    return;
+  }
+
+  int r = m_state_builder->local_image_ctx->journal->is_resync_requested(
+    m_resync_requested);
+  if (r < 0) {
+    image_locker.unlock();
+
+    derr << "failed to check if a resync was requested" << dendl;
+    finish(r);
+    return;
+  }
+
+  // capture the most recent local tag while still under the image lock
+  m_local_tag_tid = m_state_builder->local_image_ctx->journal->get_tag_tid();
+  m_local_tag_data = m_state_builder->local_image_ctx->journal->get_tag_data();
+  dout(10) << "local tag=" << m_local_tag_tid << ", "
+           << "local tag data=" << m_local_tag_data << dendl;
+  image_locker.unlock();
+
+  if (*m_resync_requested) {
+    finish(0);
+    return;
+  } else if (m_state_builder->remote_client_meta.state ==
+               librbd::journal::MIRROR_PEER_STATE_SYNCING &&
+             m_local_tag_data.mirror_uuid ==
+               m_state_builder->remote_mirror_uuid) {
+    // if the initial sync hasn't completed, we cannot replay
+    *m_syncing = true;
+    finish(0);
+    return;
+  }
+
+  update_client_state();
+}
+
+// Flip a stale SYNCING peer record to REPLAYING when the local image is no
+// longer tied to the remote; otherwise fall through to tag-class lookup.
+template <typename I>
+void PrepareReplayRequest<I>::update_client_state() {
+  if (m_state_builder->remote_client_meta.state !=
+        librbd::journal::MIRROR_PEER_STATE_SYNCING ||
+      m_local_tag_data.mirror_uuid == m_state_builder->remote_mirror_uuid) {
+    get_remote_tag_class();
+    return;
+  }
+
+  // our local image is not primary, is flagged as syncing on the remote side,
+  // but is no longer tied to the remote -- this implies we were forced
+  // promoted and then demoted at some point
+  dout(15) << dendl;
+  update_progress("UPDATE_CLIENT_STATE");
+
+  auto client_meta = m_state_builder->remote_client_meta;
+  client_meta.state = librbd::journal::MIRROR_PEER_STATE_REPLAYING;
+
+  librbd::journal::ClientData client_data(client_meta);
+  bufferlist data_bl;
+  encode(client_data, data_bl);
+
+  auto ctx = create_context_callback<
+    PrepareReplayRequest<I>,
+    &PrepareReplayRequest<I>::handle_update_client_state>(this);
+  m_state_builder->remote_journaler->update_client(data_bl, ctx);
+}
+
+// Commit the REPLAYING state to the cached metadata only after the remote
+// update succeeded.
+template <typename I>
+void PrepareReplayRequest<I>::handle_update_client_state(int r) {
+  dout(15) << "r=" << r << dendl;
+  if (r < 0) {
+    derr << "failed to update client: " << cpp_strerror(r) << dendl;
+    finish(r);
+    return;
+  }
+
+  m_state_builder->remote_client_meta.state =
+    librbd::journal::MIRROR_PEER_STATE_REPLAYING;
+  get_remote_tag_class();
+}
+
+// Look up the remote image's own (master) journal client to learn which
+// tag class its tags belong to.
+template <typename I>
+void PrepareReplayRequest<I>::get_remote_tag_class() {
+  dout(10) << dendl;
+  update_progress("GET_REMOTE_TAG_CLASS");
+
+  auto ctx = create_context_callback<
+    PrepareReplayRequest<I>,
+    &PrepareReplayRequest<I>::handle_get_remote_tag_class>(this);
+  m_state_builder->remote_journaler->get_client(
+    librbd::Journal<>::IMAGE_CLIENT_ID, &m_client, ctx);
+}
+
+// Decode the master client's metadata and extract the tag class; any
+// decode/shape mismatch is treated as a hard error.
+template <typename I>
+void PrepareReplayRequest<I>::handle_get_remote_tag_class(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to retrieve remote client: " << cpp_strerror(r) << dendl;
+    finish(r);
+    return;
+  }
+
+  librbd::journal::ClientData client_data;
+  auto it = m_client.data.cbegin();
+  try {
+    decode(client_data, it);
+  } catch (const buffer::error &err) {
+    derr << "failed to decode remote client meta data: " << err.what()
+         << dendl;
+    finish(-EBADMSG);
+    return;
+  }
+
+  librbd::journal::ImageClientMeta *client_meta =
+    boost::get<librbd::journal::ImageClientMeta>(&client_data.client_meta);
+  if (client_meta == nullptr) {
+    derr << "unknown remote client registration" << dendl;
+    finish(-EINVAL);
+    return;
+  }
+
+  m_remote_tag_class = client_meta->tag_class;
+  dout(10) << "remote tag class=" << m_remote_tag_class << dendl;
+
+  get_remote_tags();
+}
+
+// Fetch all remote tags in the discovered tag class for chain analysis.
+template <typename I>
+void PrepareReplayRequest<I>::get_remote_tags() {
+  dout(10) << dendl;
+  update_progress("GET_REMOTE_TAGS");
+
+  auto ctx = create_context_callback<
+    PrepareReplayRequest<I>,
+    &PrepareReplayRequest<I>::handle_get_remote_tags>(this);
+  m_state_builder->remote_journaler->get_tags(m_remote_tag_class,
+                                              &m_remote_tags, ctx);
+}
+
+// Walk the remote tag chain and decide whether the local image's most
+// recent tag can legitimately connect to it (clean replay, or a matched
+// demotion/promotion sequence); otherwise declare split-brain (-EEXIST).
+template <typename I>
+void PrepareReplayRequest<I>::handle_get_remote_tags(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to retrieve remote tags: " << cpp_strerror(r) << dendl;
+    finish(r);
+    return;
+  }
+
+  // At this point, the local image was existing, non-primary, and replaying;
+  // and the remote image is primary.  Attempt to link the local image's most
+  // recent tag to the remote image's tag chain.
+  bool remote_tag_data_valid = false;
+  librbd::journal::TagData remote_tag_data;
+  boost::optional<uint64_t> remote_orphan_tag_tid =
+    boost::make_optional<uint64_t>(false, 0U);
+  bool reconnect_orphan = false;
+
+  // decode the remote tags
+  for (auto &remote_tag : m_remote_tags) {
+    if (m_local_tag_data.predecessor.commit_valid &&
+        m_local_tag_data.predecessor.mirror_uuid ==
+          m_state_builder->remote_mirror_uuid &&
+        m_local_tag_data.predecessor.tag_tid > remote_tag.tid) {
+      dout(10) << "skipping processed predecessor remote tag "
+               << remote_tag.tid << dendl;
+      continue;
+    }
+
+    try {
+      auto it = remote_tag.data.cbegin();
+      decode(remote_tag_data, it);
+      remote_tag_data_valid = true;
+    } catch (const buffer::error &err) {
+      derr << "failed to decode remote tag " << remote_tag.tid << ": "
+           << err.what() << dendl;
+      finish(-EBADMSG);
+      return;
+    }
+
+    dout(10) << "decoded remote tag " << remote_tag.tid << ": "
+             << remote_tag_data << dendl;
+
+    if (!m_local_tag_data.predecessor.commit_valid) {
+      // newly synced local image (no predecessor) replays from the first tag
+      if (remote_tag_data.mirror_uuid != librbd::Journal<>::LOCAL_MIRROR_UUID) {
+        dout(10) << "skipping non-primary remote tag" << dendl;
+        continue;
+      }
+
+      dout(10) << "using initial primary remote tag" << dendl;
+      break;
+    }
+
+    if (m_local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
+      // demotion last available local epoch
+
+      if (remote_tag_data.mirror_uuid == m_local_tag_data.mirror_uuid &&
+          remote_tag_data.predecessor.commit_valid &&
+          remote_tag_data.predecessor.tag_tid ==
+            m_local_tag_data.predecessor.tag_tid) {
+        // demotion matches remote epoch
+
+        if (remote_tag_data.predecessor.mirror_uuid == m_local_mirror_uuid &&
+            m_local_tag_data.predecessor.mirror_uuid ==
+              librbd::Journal<>::LOCAL_MIRROR_UUID) {
+          // local demoted and remote has matching event
+          dout(10) << "found matching local demotion tag" << dendl;
+          remote_orphan_tag_tid = remote_tag.tid;
+          continue;
+        }
+
+        if (m_local_tag_data.predecessor.mirror_uuid ==
+              m_state_builder->remote_mirror_uuid &&
+            remote_tag_data.predecessor.mirror_uuid ==
+              librbd::Journal<>::LOCAL_MIRROR_UUID) {
+          // remote demoted and local has matching event
+          dout(10) << "found matching remote demotion tag" << dendl;
+          remote_orphan_tag_tid = remote_tag.tid;
+          continue;
+        }
+      }
+
+      if (remote_tag_data.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID &&
+          remote_tag_data.predecessor.mirror_uuid ==
+            librbd::Journal<>::ORPHAN_MIRROR_UUID &&
+          remote_tag_data.predecessor.commit_valid && remote_orphan_tag_tid &&
+          remote_tag_data.predecessor.tag_tid == *remote_orphan_tag_tid) {
+        // remote promotion tag chained to remote/local demotion tag
+        dout(10) << "found chained remote promotion tag" << dendl;
+        reconnect_orphan = true;
+        break;
+      }
+
+      // promotion must follow demotion
+      remote_orphan_tag_tid = boost::none;
+    }
+  }
+
+  if (remote_tag_data_valid &&
+      m_local_tag_data.mirror_uuid == m_state_builder->remote_mirror_uuid) {
+    dout(10) << "local image is in clean replay state" << dendl;
+  } else if (reconnect_orphan) {
+    dout(10) << "remote image was demoted/promoted" << dendl;
+  } else {
+    derr << "split-brain detected -- skipping image replay" << dendl;
+    finish(-EEXIST);
+    return;
+  }
+
+  finish(0);
+}
+
+// Report the current state-machine stage to the optional progress sink.
+template <typename I>
+void PrepareReplayRequest<I>::update_progress(const std::string &description) {
+  dout(10) << description << dendl;
+
+  if (m_progress_ctx != nullptr) {
+    m_progress_ctx->update_progress(description);
+  }
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::image_replayer::journal::PrepareReplayRequest<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.h b/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.h
new file mode 100644
index 000000000..2b6fb659b
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.h
@@ -0,0 +1,115 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_PREPARE_REPLAY_REQUEST_H
+#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_PREPARE_REPLAY_REQUEST_H
+
+#include "include/int_types.h"
+#include "cls/journal/cls_journal_types.h"
+#include "librbd/journal/Types.h"
+#include "librbd/mirror/Types.h"
+#include "tools/rbd_mirror/BaseRequest.h"
+#include <list>
+#include <string>
+
+struct Context;
+namespace librbd { struct ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+class ProgressContext;
+
+namespace image_replayer {
+namespace journal {
+
+template <typename> class StateBuilder;
+
+// Async request that decides whether journal replay may start for an
+// existing local image: it reports resync/sync-in-progress via the output
+// flags and verifies the local tag chain links to the remote one,
+// returning -EEXIST on split-brain.
+template <typename ImageCtxT>
+class PrepareReplayRequest : public BaseRequest {
+public:
+  static PrepareReplayRequest* create(
+      const std::string& local_mirror_uuid,
+      ProgressContext* progress_ctx,
+      StateBuilder<ImageCtxT>* state_builder,
+      bool* resync_requested,
+      bool* syncing,
+      Context* on_finish) {
+    return new PrepareReplayRequest(
+      local_mirror_uuid, progress_ctx, state_builder, resync_requested,
+      syncing, on_finish);
+  }
+
+  PrepareReplayRequest(
+      const std::string& local_mirror_uuid,
+      ProgressContext* progress_ctx,
+      StateBuilder<ImageCtxT>* state_builder,
+      bool* resync_requested,
+      bool* syncing,
+      Context* on_finish)
+    : BaseRequest(on_finish),
+      m_local_mirror_uuid(local_mirror_uuid),
+      m_progress_ctx(progress_ctx),
+      m_state_builder(state_builder),
+      m_resync_requested(resync_requested),
+      m_syncing(syncing) {
+  }
+
+  void send() override;
+
+private:
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    v
+   * UPDATE_CLIENT_STATE
+   *    |
+   *    v
+   * GET_REMOTE_TAG_CLASS
+   *    |
+   *    v
+   * GET_REMOTE_TAGS
+   *    |
+   *    v
+   * <finish>
+   *
+   * @endverbatim
+   */
+  typedef std::list<cls::journal::Tag> Tags;
+
+  std::string m_local_mirror_uuid;
+  ProgressContext* m_progress_ctx;          // optional; may be nullptr
+  StateBuilder<ImageCtxT>* m_state_builder;
+  bool* m_resync_requested;                 // out: caller-owned result flag
+  bool* m_syncing;                          // out: caller-owned result flag
+
+  // snapshot of the local journal's most recent tag
+  uint64_t m_local_tag_tid = 0;
+  librbd::journal::TagData m_local_tag_data;
+
+  // remote journal state gathered during the request
+  uint64_t m_remote_tag_class = 0;
+  Tags m_remote_tags;
+  cls::journal::Client m_client;
+
+  void update_client_state();
+  void handle_update_client_state(int r);
+
+  void get_remote_tag_class();
+  void handle_get_remote_tag_class(int r);
+
+  void get_remote_tags();
+  void handle_get_remote_tags(int r);
+
+  void update_progress(const std::string& description);
+
+};
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::image_replayer::journal::PrepareReplayRequest<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_PREPARE_REPLAY_REQUEST_H
diff --git a/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc b/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc
new file mode 100644
index 000000000..eb99d5add
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc
@@ -0,0 +1,284 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ReplayStatusFormatter.h"
+#include "common/debug.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "journal/Journaler.h"
+#include "json_spirit/json_spirit.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
+ << "ReplayStatusFormatter: " << this << " " \
+ << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+using librbd::util::unique_lock_name;
+
+namespace {
+
+double round_to_two_places(double value) {
+ return abs(round(value * 100) / 100);
+}
+
+json_spirit::mObject to_json_object(
+ const cls::journal::ObjectPosition& position) {
+ json_spirit::mObject object;
+ if (position != cls::journal::ObjectPosition{}) {
+ object["object_number"] = position.object_number;
+ object["tag_tid"] = position.tag_tid;
+ object["entry_tid"] = position.entry_tid;
+ }
+ return object;
+}
+
+} // anonymous namespace
+
// Bind the formatter to the remote journaler and the local mirror peer's
// client id (used to look up the mirror's commit position).
template <typename I>
ReplayStatusFormatter<I>::ReplayStatusFormatter(Journaler *journaler,
                                                const std::string &mirror_uuid)
  : m_journaler(journaler),
    m_mirror_uuid(mirror_uuid),
    m_lock(ceph::make_mutex(unique_lock_name("ReplayStatusFormatter::m_lock", this))) {
}
+
// Fold one replayed journal entry into the rolling throughput averages.
// |bytes| is the size attributed to the entry just committed.
template <typename I>
void ReplayStatusFormatter<I>::handle_entry_processed(uint32_t bytes) {
  dout(20) << dendl;

  m_bytes_per_second(bytes);
  m_entries_per_second(1);
}
+
// Produce a JSON replay-status |description|. Returns true and completes
// |on_finish| inline (with -EEXIST) when the description could be built
// from cached data; returns false when a previous request is still pending
// (completed with -EAGAIN) or when the tag cache must be refreshed
// asynchronously first (|on_finish| fires once that completes).
template <typename I>
bool ReplayStatusFormatter<I>::get_or_send_update(std::string *description,
                                                  Context *on_finish) {
  dout(20) << dendl;

  // only one in-flight status request at a time; m_on_finish doubles as
  // the "busy" flag
  bool in_progress = false;
  {
    std::lock_guard locker{m_lock};
    if (m_on_finish) {
      in_progress = true;
    } else {
      m_on_finish = on_finish;
    }
  }

  if (in_progress) {
    dout(10) << "previous request is still in progress, ignoring" << dendl;
    on_finish->complete(-EAGAIN);
    return false;
  }

  m_master_position = cls::journal::ObjectPosition();
  m_mirror_position = cls::journal::ObjectPosition();

  // best-effort: lookup failures leave the positions unset and are only
  // logged -- the status is still formatted below
  cls::journal::Client master_client, mirror_client;
  int r;

  r = m_journaler->get_cached_client(librbd::Journal<>::IMAGE_CLIENT_ID,
                                     &master_client);
  if (r < 0) {
    derr << "error retrieving registered master client: "
         << cpp_strerror(r) << dendl;
  } else {
    r = m_journaler->get_cached_client(m_mirror_uuid, &mirror_client);
    if (r < 0) {
      derr << "error retrieving registered mirror client: "
           << cpp_strerror(r) << dendl;
    }
  }

  if (!master_client.commit_position.object_positions.empty()) {
    m_master_position =
      *(master_client.commit_position.object_positions.begin());
  }

  if (!mirror_client.commit_position.object_positions.empty()) {
    m_mirror_position =
      *(mirror_client.commit_position.object_positions.begin());
  }

  // may kick off an async tag fetch; m_on_finish is completed from that path
  if (!calculate_behind_master_or_send_update()) {
    dout(20) << "need to update tag cache" << dendl;
    return false;
  }

  format(description);

  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_on_finish == on_finish);
    m_on_finish = nullptr;
  }

  on_finish->complete(-EEXIST);
  return true;
}
+
+template <typename I>
+bool ReplayStatusFormatter<I>::calculate_behind_master_or_send_update() {
+ dout(20) << "m_master_position=" << m_master_position
+ << ", m_mirror_position=" << m_mirror_position << dendl;
+
+ m_entries_behind_master = 0;
+
+ if (m_master_position == cls::journal::ObjectPosition() ||
+ m_master_position.tag_tid < m_mirror_position.tag_tid) {
+ return true;
+ }
+
+ cls::journal::ObjectPosition master = m_master_position;
+ uint64_t mirror_tag_tid = m_mirror_position.tag_tid;
+
+ while (master.tag_tid > mirror_tag_tid) {
+ auto tag_it = m_tag_cache.find(master.tag_tid);
+ if (tag_it == m_tag_cache.end()) {
+ send_update_tag_cache(master.tag_tid, mirror_tag_tid);
+ return false;
+ }
+ librbd::journal::TagData &tag_data = tag_it->second;
+ m_entries_behind_master += master.entry_tid;
+ master = {0, tag_data.predecessor.tag_tid, tag_data.predecessor.entry_tid};
+ }
+ if (master.tag_tid == mirror_tag_tid &&
+ master.entry_tid > m_mirror_position.entry_tid) {
+ m_entries_behind_master += master.entry_tid - m_mirror_position.entry_tid;
+ }
+
+ dout(20) << "clearing tags not needed any more (below mirror position)"
+ << dendl;
+
+ uint64_t tag_tid = mirror_tag_tid;
+ size_t old_size = m_tag_cache.size();
+ while (tag_tid != 0) {
+ auto tag_it = m_tag_cache.find(tag_tid);
+ if (tag_it == m_tag_cache.end()) {
+ break;
+ }
+ librbd::journal::TagData &tag_data = tag_it->second;
+
+ dout(20) << "erasing tag " << tag_data << "for tag_tid " << tag_tid
+ << dendl;
+
+ tag_tid = tag_data.predecessor.tag_tid;
+ m_tag_cache.erase(tag_it);
+ }
+
+ dout(20) << old_size - m_tag_cache.size() << " entries cleared" << dendl;
+
+ return true;
+}
+
// Asynchronously fetch tag |master_tag_tid| from the remote journaler into
// the tag cache, recursing (via handle_update_tag_cache) down the
// predecessor chain until reaching |mirror_tag_tid| or an already-cached
// tag, at which point the pending m_on_finish is completed.
template <typename I>
void ReplayStatusFormatter<I>::send_update_tag_cache(uint64_t master_tag_tid,
                                                     uint64_t mirror_tag_tid) {
  // terminal case: chain fully walked (or tag already cached) -- hand the
  // stashed completion back to the original requester
  if (master_tag_tid <= mirror_tag_tid ||
      m_tag_cache.find(master_tag_tid) != m_tag_cache.end()) {
    Context *on_finish = nullptr;
    {
      std::lock_guard locker{m_lock};
      std::swap(m_on_finish, on_finish);
    }

    ceph_assert(on_finish);
    on_finish->complete(0);
    return;
  }

  dout(20) << "master_tag_tid=" << master_tag_tid << ", mirror_tag_tid="
           << mirror_tag_tid << dendl;

  auto ctx = new LambdaContext(
    [this, master_tag_tid, mirror_tag_tid](int r) {
      handle_update_tag_cache(master_tag_tid, mirror_tag_tid, r);
    });
  m_journaler->get_tag(master_tag_tid, &m_tag, ctx);
}
+
// Completion for a single get_tag() fetch: decode the tag, cache it, and
// continue walking the predecessor chain. Fetch/decode failures are logged
// and a default TagData is cached, which terminates the walk (predecessor
// tag_tid 0).
template <typename I>
void ReplayStatusFormatter<I>::handle_update_tag_cache(uint64_t master_tag_tid,
                                                       uint64_t mirror_tag_tid,
                                                       int r) {
  librbd::journal::TagData tag_data;

  if (r < 0) {
    derr << "error retrieving tag " << master_tag_tid << ": " << cpp_strerror(r)
         << dendl;
  } else {
    dout(20) << "retrieved tag " << master_tag_tid << ": " << m_tag << dendl;

    auto it = m_tag.data.cbegin();
    try {
      decode(tag_data, it);
    } catch (const buffer::error &err) {
      derr << "error decoding tag " << master_tag_tid << ": " << err.what()
           << dendl;
    }
  }

  // a predecessor owned by neither the local nor the orphan mirror uuid
  // belongs to a remote non-primary epoch -- stop the chain walk there
  if (tag_data.predecessor.mirror_uuid !=
        librbd::Journal<>::LOCAL_MIRROR_UUID &&
      tag_data.predecessor.mirror_uuid !=
        librbd::Journal<>::ORPHAN_MIRROR_UUID) {
    dout(20) << "hit remote image non-primary epoch" << dendl;
    tag_data.predecessor = {};
  }

  dout(20) << "decoded tag " << master_tag_tid << ": " << tag_data << dendl;

  m_tag_cache[master_tag_tid] = tag_data;
  send_update_tag_cache(tag_data.predecessor.tag_tid, mirror_tag_tid);
}
+
+template <typename I>
+void ReplayStatusFormatter<I>::format(std::string *description) {
+ dout(20) << "m_master_position=" << m_master_position
+ << ", m_mirror_position=" << m_mirror_position
+ << ", m_entries_behind_master=" << m_entries_behind_master << dendl;
+
+ json_spirit::mObject root_obj;
+ root_obj["primary_position"] = to_json_object(m_master_position);
+ root_obj["non_primary_position"] = to_json_object(m_mirror_position);
+ root_obj["entries_behind_primary"] = (
+ m_entries_behind_master > 0 ? m_entries_behind_master : 0);
+
+ m_bytes_per_second(0);
+ root_obj["bytes_per_second"] = round_to_two_places(
+ m_bytes_per_second.get_average());
+
+ m_entries_per_second(0);
+ auto entries_per_second = m_entries_per_second.get_average();
+ root_obj["entries_per_second"] = round_to_two_places(entries_per_second);
+
+ if (m_entries_behind_master > 0 && entries_per_second > 0) {
+ std::uint64_t seconds_until_synced = round_to_two_places(
+ m_entries_behind_master / entries_per_second);
+ if (seconds_until_synced >= std::numeric_limits<uint64_t>::max()) {
+ seconds_until_synced = std::numeric_limits<uint64_t>::max();
+ }
+
+ root_obj["seconds_until_synced"] = seconds_until_synced;
+ }
+
+ *description = json_spirit::write(
+ root_obj, json_spirit::remove_trailing_zeros);
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::image_replayer::journal::ReplayStatusFormatter<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h b/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h
new file mode 100644
index 000000000..5dbbfe10d
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h
@@ -0,0 +1,70 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
+#define RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
+
+#include "include/Context.h"
+#include "common/ceph_mutex.h"
+#include "cls/journal/cls_journal_types.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+#include "tools/rbd_mirror/image_replayer/TimeRollingMean.h"
+
+namespace journal { class Journaler; }
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
// Builds JSON replay-status descriptions for a journal-based image
// replayer, tracking how far the local (mirror) commit position lags the
// remote (master) position along with rolling throughput statistics.
template <typename ImageCtxT = librbd::ImageCtx>
class ReplayStatusFormatter {
public:
  typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;

  static ReplayStatusFormatter* create(Journaler *journaler,
                                       const std::string &mirror_uuid) {
    return new ReplayStatusFormatter(journaler, mirror_uuid);
  }

  static void destroy(ReplayStatusFormatter* formatter) {
    delete formatter;
  }

  ReplayStatusFormatter(Journaler *journaler, const std::string &mirror_uuid);

  // record one replayed entry (of |bytes| size) in the throughput averages
  void handle_entry_processed(uint32_t bytes);

  // build a status description; may complete asynchronously via |on_finish|
  // if the tag cache must first be refreshed (see .cc for return semantics)
  bool get_or_send_update(std::string *description, Context *on_finish);

private:
  Journaler *m_journaler;
  std::string m_mirror_uuid;
  ceph::mutex m_lock;
  // pending status-request completion; non-null acts as the "busy" flag
  Context *m_on_finish = nullptr;
  cls::journal::ObjectPosition m_master_position;
  cls::journal::ObjectPosition m_mirror_position;
  int64_t m_entries_behind_master = 0;
  // scratch buffer for in-flight remote tag fetches
  cls::journal::Tag m_tag;
  // decoded tags keyed by tag_tid, used to walk the predecessor chain
  std::map<uint64_t, librbd::journal::TagData> m_tag_cache;

  TimeRollingMean m_bytes_per_second;
  TimeRollingMean m_entries_per_second;

  bool calculate_behind_master_or_send_update();
  void send_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid);
  void handle_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid,
                               int r);
  void format(std::string *description);
};
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::image_replayer::journal::ReplayStatusFormatter<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H
diff --git a/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc b/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc
new file mode 100644
index 000000000..3ce9104d2
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc
@@ -0,0 +1,1303 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Replayer.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/journal/Replay.h"
+#include "journal/Journaler.h"
+#include "journal/JournalMetadataListener.h"
+#include "journal/ReplayHandler.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/Types.h"
+#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h"
+#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h"
+#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
+ << "Replayer: " << this << " " << __func__ << ": "
+
+extern PerfCounters *g_journal_perf_counters;
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+namespace {
+
// Compute how many seconds to postpone replay of an event so that the
// configured mirroring replay delay is honored relative to the event's
// original timestamp. Returns 0 when no (further) delay is required.
uint32_t calculate_replay_delay(const utime_t &event_time,
                                int mirroring_replay_delay) {
  if (mirroring_replay_delay <= 0) {
    return 0;
  }

  utime_t now = ceph_clock_now();
  if (event_time + mirroring_replay_delay <= now) {
    return 0;
  }

  // ensure it is rounded up when converting to integer
  return (event_time + mirroring_replay_delay - now) + 1;
}
+
+} // anonymous namespace
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+
// Completion fired once a replayed journal entry is safely committed
// locally; forwards the entry, its size, and the replay start time back to
// the Replayer for commit-position and perf accounting.
template <typename I>
struct Replayer<I>::C_ReplayCommitted : public Context {
  Replayer* replayer;
  ReplayEntry replay_entry;
  uint64_t replay_bytes;
  utime_t replay_start_time;

  C_ReplayCommitted(Replayer* replayer, ReplayEntry &&replay_entry,
                    uint64_t replay_bytes, const utime_t &replay_start_time)
    : replayer(replayer), replay_entry(std::move(replay_entry)),
      replay_bytes(replay_bytes), replay_start_time(replay_start_time) {
  }

  void finish(int r) override {
    replayer->handle_process_entry_safe(replay_entry, replay_bytes,
                                        replay_start_time, r);
  }
};
+
// Observes remote journal metadata updates (e.g. client disconnect flags)
// and re-dispatches them to the Replayer on the work queue, tracked so
// shutdown can wait for any in-flight notification.
template <typename I>
struct Replayer<I>::RemoteJournalerListener
  : public ::journal::JournalMetadataListener {
  Replayer* replayer;

  RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {}

  void handle_update(::journal::JournalMetadata*) override {
    auto ctx = new C_TrackedOp(
      replayer->m_in_flight_op_tracker,
      new LambdaContext([this](int r) {
        replayer->handle_remote_journal_metadata_updated();
      }));
    replayer->m_threads->work_queue->queue(ctx, 0);
  }
};
+
+template <typename I>
+struct Replayer<I>::RemoteReplayHandler : public ::journal::ReplayHandler {
+ Replayer* replayer;
+
+ RemoteReplayHandler(Replayer* replayer) : replayer(replayer) {}
+ ~RemoteReplayHandler() override {};
+
+ void handle_entries_available() override {
+ replayer->handle_replay_ready();
+ }
+
+ void handle_complete(int r) override {
+ std::string error;
+ if (r == -ENOMEM) {
+ error = "not enough memory in autotune cache";
+ } else if (r < 0) {
+ error = "replay completed with error: " + cpp_strerror(r);
+ }
+ replayer->handle_replay_complete(r, error);
+ }
+};
+
// Observes local journal events: journal close and force-promotion both
// terminate replay (the latter with a descriptive reason); a resync
// request is forwarded for the resync workflow.
template <typename I>
struct Replayer<I>::LocalJournalListener
  : public librbd::journal::Listener {
  Replayer* replayer;

  LocalJournalListener(Replayer* replayer) : replayer(replayer) {
  }

  void handle_close() override {
    replayer->handle_replay_complete(0, "");
  }

  void handle_promoted() override {
    replayer->handle_replay_complete(0, "force promoted");
  }

  void handle_resync() override {
    replayer->handle_resync_image();
  }
};
+
// Construct a journal-based replayer. All pointer parameters are owned by
// the caller and must outlive this object; state transitions are guarded
// by the per-instance m_lock.
template <typename I>
Replayer<I>::Replayer(
    Threads<I>* threads,
    const std::string& local_mirror_uuid,
    StateBuilder<I>* state_builder,
    ReplayerListener* replayer_listener)
  : m_threads(threads),
    m_local_mirror_uuid(local_mirror_uuid),
    m_state_builder(state_builder),
    m_replayer_listener(replayer_listener),
    m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
      "rbd::mirror::image_replayer::journal::Replayer", this))) {
  dout(10) << dendl;
}
+
// Destructor: unregisters perf counters and asserts that shut_down()
// already tore down every listener, handler, task, and the local image --
// destruction while any of these remain is a programming error.
template <typename I>
Replayer<I>::~Replayer() {
  dout(10) << dendl;

  {
    std::unique_lock locker{m_lock};
    unregister_perf_counters();
  }

  ceph_assert(m_remote_listener == nullptr);
  ceph_assert(m_local_journal_listener == nullptr);
  ceph_assert(m_local_journal_replay == nullptr);
  ceph_assert(m_remote_replay_handler == nullptr);
  ceph_assert(m_event_preprocessor == nullptr);
  ceph_assert(m_replay_status_formatter == nullptr);
  ceph_assert(m_delayed_preprocess_task == nullptr);
  ceph_assert(m_flush_local_replay_task == nullptr);
  ceph_assert(m_state_builder->local_image_ctx == nullptr);
}
+
// Begin initialization: capture the image spec for status reporting,
// register perf counters, stash |on_finish| (completed when the init state
// machine finishes), and start by initializing the remote journaler.
template <typename I>
void Replayer<I>::init(Context* on_finish) {
  dout(10) << dendl;

  {
    auto local_image_ctx = m_state_builder->local_image_ctx;
    std::shared_lock image_locker{local_image_ctx->image_lock};
    m_image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
                                            local_image_ctx->name);
  }

  {
    std::unique_lock locker{m_lock};
    register_perf_counters();
  }

  ceph_assert(m_on_init_shutdown == nullptr);
  m_on_init_shutdown = on_finish;

  init_remote_journaler();
}
+
// Begin shutdown. If init is still running, the init completion path will
// notice the stashed m_on_init_shutdown and continue the teardown; otherwise
// cancel pending timer tasks and start the teardown chain (flush -> replay
// shutdown -> close image -> stop journaler -> drain ops).
template <typename I>
void Replayer<I>::shut_down(Context* on_finish) {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  ceph_assert(m_on_init_shutdown == nullptr);
  m_on_init_shutdown = on_finish;

  if (m_state == STATE_INIT) {
    // raced with the last piece of the init state machine
    return;
  } else if (m_state == STATE_REPLAYING) {
    m_state = STATE_COMPLETE;
  }

  // if shutting down due to an error notification, we don't
  // need to propagate the same error again
  m_error_code = 0;
  m_error_description = "";

  cancel_delayed_preprocess_task();
  cancel_flush_local_replay_task();
  wait_for_flush();
}
+
// Flush pending local replay and the remote commit position; |on_finish|
// is tracked so shutdown waits for it.
template <typename I>
void Replayer<I>::flush(Context* on_finish) {
  dout(10) << dendl;

  flush_local_replay(new C_TrackedOp(m_in_flight_op_tracker, on_finish));
}
+
// Retrieve a JSON replay-status description. Fails with -EAGAIN when
// replay is not running; otherwise delegates to the status formatter
// (see ReplayStatusFormatter::get_or_send_update for return semantics).
template <typename I>
bool Replayer<I>::get_replay_status(std::string* description,
                                    Context* on_finish) {
  dout(10) << dendl;

  std::unique_lock locker{m_lock};
  if (m_replay_status_formatter == nullptr) {
    derr << "replay not running" << dendl;
    locker.unlock();

    on_finish->complete(-EAGAIN);
    return false;
  }

  on_finish = new C_TrackedOp(m_in_flight_op_tracker, on_finish);
  return m_replay_status_formatter->get_or_send_update(description,
                                                       on_finish);
}
+
// Kick off async initialization of the remote journaler.
template <typename I>
void Replayer<I>::init_remote_journaler() {
  dout(10) << dendl;

  Context *ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_init_remote_journaler>(this);
  m_state_builder->remote_journaler->init(ctx);
}
+
// Completion of remote journaler init: register the metadata listener,
// fetch/validate this peer's registered client state (resync/disconnect
// checks), then start external replay. Any failure records the error and
// tears down via close_local_image().
template <typename I>
void Replayer<I>::handle_init_remote_journaler(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
    handle_replay_complete(locker, r, "error initializing remote journal");
    close_local_image();
    return;
  }

  // listen for metadata updates to check for disconnect events
  ceph_assert(m_remote_listener == nullptr);
  m_remote_listener = new RemoteJournalerListener(this);
  m_state_builder->remote_journaler->add_listener(m_remote_listener);

  cls::journal::Client remote_client;
  r = m_state_builder->remote_journaler->get_cached_client(m_local_mirror_uuid,
                                                           &remote_client);
  if (r < 0) {
    derr << "error retrieving remote journal client: " << cpp_strerror(r)
         << dendl;
    handle_replay_complete(locker, r, "error retrieving remote journal client");
    close_local_image();
    return;
  }

  std::string error;
  r = validate_remote_client_state(remote_client,
                                   &m_state_builder->remote_client_meta,
                                   &m_resync_requested, &error);
  if (r < 0) {
    handle_replay_complete(locker, r, error);
    close_local_image();
    return;
  }

  start_external_replay(locker);
}
+
// Grab the local image's journal and request external replay mode on it.
// Fails (and tears down) if the local journal has already been closed.
// |locker| holds m_lock throughout.
template <typename I>
void Replayer<I>::start_external_replay(std::unique_lock<ceph::mutex>& locker) {
  dout(10) << dendl;

  auto local_image_ctx = m_state_builder->local_image_ctx;
  std::shared_lock local_image_locker{local_image_ctx->image_lock};

  ceph_assert(m_local_journal == nullptr);
  m_local_journal = local_image_ctx->journal;
  if (m_local_journal == nullptr) {
    local_image_locker.unlock();

    derr << "local image journal closed" << dendl;
    handle_replay_complete(locker, -EINVAL, "error accessing local journal");
    close_local_image();
    return;
  }

  // safe to hold pointer to journal after external playback starts
  Context *start_ctx = create_context_callback<
    Replayer, &Replayer<I>::handle_start_external_replay>(this);
  m_local_journal->start_external_replay(&m_local_journal_replay, start_ctx);
}
+
// Completion of start_external_replay: notify the caller that init is done,
// transition to REPLAYING, attach the local journal listener (re-checking
// promotion/resync), then create the event preprocessor / status formatter
// and begin live replay of the remote journal.
template <typename I>
void Replayer<I>::handle_start_external_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    ceph_assert(m_local_journal_replay == nullptr);
    derr << "error starting external replay on local image "
         << m_state_builder->local_image_ctx->id << ": "
         << cpp_strerror(r) << dendl;

    handle_replay_complete(locker, r, "error starting replay on local image");
    close_local_image();
    return;
  }

  // init completion may race with a shutdown request; bail out if so
  if (!notify_init_complete(locker)) {
    return;
  }

  m_state = STATE_REPLAYING;

  // check for resync/promotion state after adding listener
  if (!add_local_journal_listener(locker)) {
    return;
  }

  // start remote journal replay
  m_event_preprocessor = EventPreprocessor<I>::create(
    *m_state_builder->local_image_ctx, *m_state_builder->remote_journaler,
    m_local_mirror_uuid, &m_state_builder->remote_client_meta,
    m_threads->work_queue);
  m_replay_status_formatter = ReplayStatusFormatter<I>::create(
    m_state_builder->remote_journaler, m_local_mirror_uuid);

  auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
  double poll_seconds = cct->_conf.get_val<double>(
    "rbd_mirror_journal_poll_age");
  m_remote_replay_handler = new RemoteReplayHandler(this);
  m_state_builder->remote_journaler->start_live_replay(m_remote_replay_handler,
                                                       poll_seconds);

  notify_status_updated();
}
+
// Attach the local journal listener, then re-verify that no force-promotion
// or resync request happened before we started listening (events prior to
// registration would otherwise be missed). Returns false when replay was
// completed due to one of those conditions.
template <typename I>
bool Replayer<I>::add_local_journal_listener(
    std::unique_lock<ceph::mutex>& locker) {
  dout(10) << dendl;

  // listen for promotion and resync requests against local journal
  ceph_assert(m_local_journal_listener == nullptr);
  m_local_journal_listener = new LocalJournalListener(this);
  m_local_journal->add_listener(m_local_journal_listener);

  // verify that the local image wasn't force-promoted and that a resync hasn't
  // been requested now that we are listening for events
  if (m_local_journal->is_tag_owner()) {
    dout(10) << "local image force-promoted" << dendl;
    handle_replay_complete(locker, 0, "force promoted");
    return false;
  }

  bool resync_requested = false;
  int r = m_local_journal->is_resync_requested(&resync_requested);
  if (r < 0) {
    dout(10) << "failed to determine resync state: " << cpp_strerror(r)
             << dendl;
    handle_replay_complete(locker, r, "error parsing resync state");
    return false;
  } else if (resync_requested) {
    dout(10) << "local image resync requested" << dendl;
    handle_replay_complete(locker, 0, "resync requested");
    return false;
  }

  return true;
}
+
// Complete the stashed init context. m_lock must be dropped while invoking
// the callback (it may re-enter this object); after re-acquiring, a
// non-null m_on_init_shutdown means shut_down() raced in, so begin
// teardown and return false.
template <typename I>
bool Replayer<I>::notify_init_complete(std::unique_lock<ceph::mutex>& locker) {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(m_state == STATE_INIT);

  // notify that init has completed
  Context *on_finish = nullptr;
  std::swap(m_on_init_shutdown, on_finish);

  locker.unlock();
  on_finish->complete(0);
  locker.lock();

  if (m_on_init_shutdown != nullptr) {
    // shut down requested after we notified init complete but before we
    // grabbed the lock
    close_local_image();
    return false;
  }

  return true;
}
+
// Teardown step: wait for any in-flight flush operations before shutting
// down the local journal replay.
template <typename I>
void Replayer<I>::wait_for_flush() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  // ensure that we don't have two concurrent local journal replay shut downs
  dout(10) << dendl;
  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_flush>(this));
  m_flush_tracker.wait_for_ops(ctx);
}
+
// Flushes drained -- continue teardown with the local journal replay.
template <typename I>
void Replayer<I>::handle_wait_for_flush(int r) {
  dout(10) << "r=" << r << dendl;

  shut_down_local_journal_replay();
}
+
// Teardown step: stop the local journal replay state machine (if one was
// created), otherwise skip ahead to draining event replay.
template <typename I>
void Replayer<I>::shut_down_local_journal_replay() {
  std::unique_lock locker{m_lock};

  if (m_local_journal_replay == nullptr) {
    wait_for_event_replay();
    return;
  }

  // It's required to stop the local journal replay state machine prior to
  // waiting for the events to complete. This is to ensure that IO is properly
  // flushed (it might be batched), wait for any running ops to complete, and
  // to cancel any ops waiting for their associated OnFinish events.
  dout(10) << dendl;
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_shut_down_local_journal_replay>(this);
  m_local_journal_replay->shut_down(true, ctx);
}
+
// Completion of local journal replay shutdown: record any error (first
// error wins) and continue by draining in-flight event replays.
template <typename I>
void Replayer<I>::handle_shut_down_local_journal_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "error shutting down journal replay: " << cpp_strerror(r) << dendl;
    handle_replay_error(r, "failed to shut down local journal replay");
  }

  wait_for_event_replay();
}
+
// Teardown step: wait for all outstanding journal-event replays to finish
// before closing the local image.
template <typename I>
void Replayer<I>::wait_for_event_replay() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  dout(10) << dendl;
  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_event_replay>(this));
  m_event_replay_tracker.wait_for_ops(ctx);
}
+
// Event replays drained -- continue teardown by closing the local image.
template <typename I>
void Replayer<I>::handle_wait_for_event_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  close_local_image();
}
+
// Teardown step: detach the local journal listener and external replay,
// destroy the event preprocessor, and asynchronously close the local
// image. Skips straight to stopping the remote journaler when the local
// image was never opened.
template <typename I>
void Replayer<I>::close_local_image() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  if (m_state_builder->local_image_ctx == nullptr) {
    stop_remote_journaler_replay();
    return;
  }

  dout(10) << dendl;
  if (m_local_journal_listener != nullptr) {
    // blocks if listener notification is in-progress
    m_local_journal->remove_listener(m_local_journal_listener);
    delete m_local_journal_listener;
    m_local_journal_listener = nullptr;
  }

  if (m_local_journal_replay != nullptr) {
    m_local_journal->stop_external_replay();
    m_local_journal_replay = nullptr;
  }

  if (m_event_preprocessor != nullptr) {
    image_replayer::journal::EventPreprocessor<I>::destroy(
      m_event_preprocessor);
    m_event_preprocessor = nullptr;
  }

  m_local_journal.reset();

  // NOTE: it's important to ensure that the local image is fully
  // closed before attempting to close the remote journal in
  // case the remote cluster is unreachable
  ceph_assert(m_state_builder->local_image_ctx != nullptr);
  auto ctx = create_context_callback<
    Replayer<I>, &Replayer<I>::handle_close_local_image>(this);
  auto request = image_replayer::CloseImageRequest<I>::create(
    &m_state_builder->local_image_ctx, ctx);
  request->send();
}
+
+
+template <typename I>
+void Replayer<I>::handle_close_local_image(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ std::unique_lock locker{m_lock};
+ if (r < 0) {
+ derr << "error closing local iamge: " << cpp_strerror(r) << dendl;
+ handle_replay_error(r, "failed to close local image");
+ }
+
+ ceph_assert(m_state_builder->local_image_ctx == nullptr);
+ stop_remote_journaler_replay();
+}
+
+template <typename I>
+void Replayer<I>::stop_remote_journaler_replay() {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ if (m_state_builder->remote_journaler == nullptr) {
+ wait_for_in_flight_ops();
+ return;
+ } else if (m_remote_replay_handler == nullptr) {
+ wait_for_in_flight_ops();
+ return;
+ }
+
+ dout(10) << dendl;
+ auto ctx = create_async_context_callback(
+ m_threads->work_queue, create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_stop_remote_journaler_replay>(this));
+ m_state_builder->remote_journaler->stop_replay(ctx);
+}
+
// Completion of remote replay stop: record any error, release the replay
// handler, and continue draining in-flight ops.
template <typename I>
void Replayer<I>::handle_stop_remote_journaler_replay(int r) {
  dout(10) << "r=" << r << dendl;

  std::unique_lock locker{m_lock};
  if (r < 0) {
    derr << "failed to stop remote journaler replay : " << cpp_strerror(r)
         << dendl;
    handle_replay_error(r, "failed to stop remote journaler replay");
  }

  delete m_remote_replay_handler;
  m_remote_replay_handler = nullptr;

  wait_for_in_flight_ops();
}
+
// Teardown step: detach the remote metadata listener and wait for all
// tracked in-flight operations (status updates, flushes, notifications).
template <typename I>
void Replayer<I>::wait_for_in_flight_ops() {
  dout(10) << dendl;
  if (m_remote_listener != nullptr) {
    m_state_builder->remote_journaler->remove_listener(m_remote_listener);
    delete m_remote_listener;
    m_remote_listener = nullptr;
  }

  auto ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
  m_in_flight_op_tracker.wait_for_ops(ctx);
}
+
// Final teardown step: destroy the status formatter, mark the replayer
// COMPLETE, and complete the init/shutdown context with the first recorded
// error code (0 on clean shutdown).
template <typename I>
void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
  dout(10) << "r=" << r << dendl;

  ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
  m_replay_status_formatter = nullptr;

  Context* on_init_shutdown = nullptr;
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_on_init_shutdown != nullptr);
    std::swap(m_on_init_shutdown, on_init_shutdown);
    m_state = STATE_COMPLETE;
  }
  // completed outside the lock since the callback may re-enter this object
  on_init_shutdown->complete(m_error_code);
}
+
// Invoked (via work queue) when the remote journal metadata changes:
// re-validate this peer's registered client to detect a disconnect flag
// and stop replay if the client is no longer valid. No-op unless actively
// replaying.
template <typename I>
void Replayer<I>::handle_remote_journal_metadata_updated() {
  dout(20) << dendl;

  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    return;
  }

  cls::journal::Client remote_client;
  int r = m_state_builder->remote_journaler->get_cached_client(
    m_local_mirror_uuid, &remote_client);
  if (r < 0) {
    derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
    return;
  }

  librbd::journal::MirrorPeerClientMeta remote_client_meta;
  std::string error;
  r = validate_remote_client_state(remote_client, &remote_client_meta,
                                   &m_resync_requested, &error);
  if (r < 0) {
    dout(0) << "client flagged disconnected, stopping image replay" << dendl;
    handle_replay_complete(locker, r, error);
  }
}
+
// Schedule a one-shot timer task (30s, hard-coded) to flush the local
// replay and refresh the status. No-op if not replaying or a task is
// already pending.
template <typename I>
void Replayer<I>::schedule_flush_local_replay_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  std::unique_lock timer_locker{m_threads->timer_lock};
  if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) {
    return;
  }

  dout(15) << dendl;
  m_flush_local_replay_task = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Replayer<I>, &Replayer<I>::handle_flush_local_replay_task>(this));
  m_threads->timer->add_event_after(30, m_flush_local_replay_task);
}
+
// Cancel a pending flush timer task, if any (e.g. during shutdown).
template <typename I>
void Replayer<I>::cancel_flush_local_replay_task() {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  std::unique_lock timer_locker{m_threads->timer_lock};
  if (m_flush_local_replay_task != nullptr) {
    dout(10) << dendl;
    m_threads->timer->cancel_event(m_flush_local_replay_task);
    m_flush_local_replay_task = nullptr;
  }
}
+
// Timer callback: flush the local replay and, on completion, clear the
// task slot and push a status update. Tracked so shutdown can wait for it.
template <typename I>
void Replayer<I>::handle_flush_local_replay_task(int) {
  dout(15) << dendl;

  m_in_flight_op_tracker.start_op();
  auto on_finish = new LambdaContext([this](int) {
      std::unique_lock locker{m_lock};

      {
        std::unique_lock timer_locker{m_threads->timer_lock};
        m_flush_local_replay_task = nullptr;
      }

      notify_status_updated();
      m_in_flight_op_tracker.finish_op();
    });
  flush_local_replay(on_finish);
}
+
// Flush batched local replay IO, then the remote commit position. Skips
// straight to success when not replaying, and straight to the commit
// position flush when the replay state machine is (transiently) absent.
template <typename I>
void Replayer<I>::flush_local_replay(Context* on_flush) {
  std::unique_lock locker{m_lock};
  if (m_state != STATE_REPLAYING) {
    locker.unlock();
    on_flush->complete(0);
    return;
  } else if (m_local_journal_replay == nullptr) {
    // raced w/ a tag creation stop/start, which implies that
    // the replay is flushed
    locker.unlock();
    flush_commit_position(on_flush);
    return;
  }

  dout(15) << dendl;
  auto ctx = new LambdaContext(
    [this, on_flush](int r) {
      handle_flush_local_replay(on_flush, r);
    });
  m_local_journal_replay->flush(ctx);
}
+
// Completion of the local replay flush: on success, continue by flushing
// the remote journal commit position; otherwise propagate the error.
template <typename I>
void Replayer<I>::handle_flush_local_replay(Context* on_flush, int r) {
  dout(15) << "r=" << r << dendl;
  if (r < 0) {
    derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
    on_flush->complete(r);
    return;
  }

  flush_commit_position(on_flush);
}
+
+template <typename I>
+void Replayer<I>::flush_commit_position(Context* on_flush) {
+  // persist the remote journaler's commit position; no-op (success) if
+  // replay has already stopped
+  std::unique_lock locker{m_lock};
+  if (m_state != STATE_REPLAYING) {
+    locker.unlock();
+    on_flush->complete(0);
+    return;
+  }
+
+  dout(15) << dendl;
+  auto ctx = new LambdaContext(
+    [this, on_flush](int r) {
+      handle_flush_commit_position(on_flush, r);
+    });
+  m_state_builder->remote_journaler->flush_commit_position(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_flush_commit_position(Context* on_flush, int r) {
+  // log (but don't mask) any commit-position flush error and finish the
+  // caller-provided context with the result
+  dout(15) << "r=" << r << dendl;
+  if (r < 0) {
+    derr << "error flushing remote journal commit position: "
+         << cpp_strerror(r) << dendl;
+  }
+
+  on_flush->complete(r);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_error(int r, const std::string &error) {
+  // record only the first error encountered; later errors are dropped so
+  // the root cause is reported. caller must hold m_lock
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  if (m_error_code == 0) {
+    m_error_code = r;
+    m_error_description = error;
+  }
+}
+
+template <typename I>
+bool Replayer<I>::is_replay_complete() const {
+  // locking wrapper for the lock-held overload below
+  std::unique_lock locker{m_lock};
+  return is_replay_complete(locker);
+}
+
+template <typename I>
+bool Replayer<I>::is_replay_complete(
+    const std::unique_lock<ceph::mutex>&) const {
+  // lock-held overload; the unused parameter documents the requirement
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  return (m_state == STATE_COMPLETE);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_complete(int r, const std::string &error) {
+  // locking wrapper for the lock-held overload below
+  std::unique_lock locker{m_lock};
+  handle_replay_complete(locker, r, error);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_complete(
+    const std::unique_lock<ceph::mutex>&, int r, const std::string &error) {
+  // transition REPLAYING -> COMPLETE (idempotent), recording the first
+  // error if one occurred, and notify the status listener
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  dout(10) << "r=" << r << ", error=" << error << dendl;
+  if (r < 0) {
+    derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
+    handle_replay_error(r, error);
+  }
+
+  if (m_state != STATE_REPLAYING) {
+    return;
+  }
+
+  m_state = STATE_COMPLETE;
+  notify_status_updated();
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_ready() {
+  // locking wrapper for the lock-held overload below
+  std::unique_lock locker{m_lock};
+  handle_replay_ready(locker);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_ready(
+    std::unique_lock<ceph::mutex>& locker) {
+  // attempt to pop the next remote journal entry and route it either to
+  // the tag (re)allocation path or straight to preprocessing
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  dout(20) << dendl;
+  if (is_replay_complete(locker)) {
+    return;
+  }
+
+  if (!m_state_builder->remote_journaler->try_pop_front(&m_replay_entry,
+                                                        &m_replay_tag_tid)) {
+    dout(20) << "no entries ready for replay" << dendl;
+    return;
+  }
+
+  // can safely drop lock once the entry is tracked
+  m_event_replay_tracker.start_op();
+  locker.unlock();
+
+  // fixed missing ", " separator so the log doesn't emit "tid=5tag_tid=3"
+  dout(20) << "entry tid=" << m_replay_entry.get_commit_tid() << ", "
+           << "tag_tid=" << m_replay_tag_tid << dendl;
+  if (!m_replay_tag_valid || m_replay_tag.tid != m_replay_tag_tid) {
+    // must allocate a new local journal tag prior to processing
+    replay_flush();
+    return;
+  }
+
+  preprocess_entry();
+}
+
+template <typename I>
+void Replayer<I>::replay_flush() {
+  // begin a tag-epoch transition: flush outstanding replay state before
+  // the local replay handler is re-created
+  dout(10) << dendl;
+  m_flush_tracker.start_op();
+
+  // shut down the replay to flush all IO and ops and create a new
+  // replayer to handle the new tag epoch
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_replay_flush_shut_down>(this);
+  ceph_assert(m_local_journal_replay != nullptr);
+  m_local_journal_replay->shut_down(false, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_flush_shut_down(int r) {
+  // tear down the old listener/replay handler and restart external
+  // replay against the (possibly re-opened) local journal
+  std::unique_lock locker{m_lock};
+  dout(10) << "r=" << r << dendl;
+
+  ceph_assert(m_local_journal != nullptr);
+  ceph_assert(m_local_journal_listener != nullptr);
+
+  // blocks if listener notification is in-progress
+  m_local_journal->remove_listener(m_local_journal_listener);
+  delete m_local_journal_listener;
+  m_local_journal_listener = nullptr;
+
+  m_local_journal->stop_external_replay();
+  m_local_journal_replay = nullptr;
+  m_local_journal.reset();
+
+  if (r < 0) {
+    locker.unlock();
+
+    handle_replay_flush(r);
+    return;
+  }
+
+  // journal might have been closed now that we stopped external replay
+  auto local_image_ctx = m_state_builder->local_image_ctx;
+  std::shared_lock local_image_locker{local_image_ctx->image_lock};
+  m_local_journal = local_image_ctx->journal;
+  if (m_local_journal == nullptr) {
+    local_image_locker.unlock();
+    locker.unlock();
+
+    derr << "local image journal closed" << dendl;
+    handle_replay_flush(-EINVAL);
+    return;
+  }
+
+  auto ctx = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_replay_flush>(this);
+  m_local_journal->start_external_replay(&m_local_journal_replay, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_replay_flush(int r) {
+  // completion of the replay-flush/tag-epoch restart; on every early
+  // return the in-flight event op must be released
+  std::unique_lock locker{m_lock};
+  dout(10) << "r=" << r << dendl;
+  m_flush_tracker.finish_op();
+
+  if (r < 0) {
+    derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
+    handle_replay_complete(locker, r, "replay flush encountered an error");
+    m_event_replay_tracker.finish_op();
+    return;
+  } else if (is_replay_complete(locker)) {
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  // check for resync/promotion state after adding listener
+  if (!add_local_journal_listener(locker)) {
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+  locker.unlock();
+
+  get_remote_tag();
+}
+
+template <typename I>
+void Replayer<I>::get_remote_tag() {
+  // fetch the remote journal tag referenced by the popped entry
+  dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;
+
+  Context *ctx = create_context_callback<
+    Replayer, &Replayer<I>::handle_get_remote_tag>(this);
+  m_state_builder->remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag,
+                                             ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_get_remote_tag(int r) {
+  // decode the fetched tag data; a decode failure is treated the same as
+  // a fetch failure (-EBADMSG)
+  dout(15) << "r=" << r << dendl;
+
+  if (r == 0) {
+    try {
+      auto it = m_replay_tag.data.cbegin();
+      decode(m_replay_tag_data, it);
+    } catch (const buffer::error &err) {
+      r = -EBADMSG;
+    }
+  }
+
+  if (r < 0) {
+    derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
+         << cpp_strerror(r) << dendl;
+    handle_replay_complete(r, "failed to retrieve remote tag");
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  m_replay_tag_valid = true;
+  dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
+           << m_replay_tag_data << dendl;
+
+  allocate_local_tag();
+}
+
+template <typename I>
+void Replayer<I>::allocate_local_tag() {
+  // translate the remote tag's mirror uuids into the local journal's
+  // frame of reference, then allocate a matching local tag
+  dout(15) << dendl;
+
+  std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
+  if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
+    // remote's "local" events originate from the remote peer
+    mirror_uuid = m_state_builder->remote_mirror_uuid;
+  } else if (mirror_uuid == m_local_mirror_uuid) {
+    // events we originated are "local" from this journal's perspective
+    mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
+  } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
+    // handle possible edge condition where daemon can failover and
+    // the local image has already been promoted/demoted
+    auto local_tag_data = m_local_journal->get_tag_data();
+    if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
+        (local_tag_data.predecessor.commit_valid &&
+         local_tag_data.predecessor.mirror_uuid ==
+           librbd::Journal<>::LOCAL_MIRROR_UUID)) {
+      dout(15) << "skipping stale demotion event" << dendl;
+      // commit the entry without replaying it and pull the next one
+      handle_process_entry_safe(m_replay_entry, m_replay_bytes,
+                                m_replay_start_time, 0);
+      handle_replay_ready();
+      return;
+    } else {
+      dout(5) << "encountered image demotion: stopping" << dendl;
+      handle_replay_complete(0, "");
+    }
+  }
+
+  librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
+  if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
+    predecessor.mirror_uuid = m_state_builder->remote_mirror_uuid;
+  } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
+    predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
+  }
+
+  dout(15) << "mirror_uuid=" << mirror_uuid << ", "
+           << "predecessor=" << predecessor << ", "
+           << "replay_tag_tid=" << m_replay_tag_tid << dendl;
+  Context *ctx = create_context_callback<
+    Replayer, &Replayer<I>::handle_allocate_local_tag>(this);
+  m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_allocate_local_tag(int r) {
+  // local tag allocated (or failed); on success resume per-entry replay
+  dout(15) << "r=" << r << ", "
+           << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
+  if (r < 0) {
+    derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
+    handle_replay_complete(r, "failed to allocate journal tag");
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  preprocess_entry();
+}
+
+template <typename I>
+void Replayer<I>::preprocess_entry() {
+  // decode the popped replay entry and either process it immediately or
+  // schedule it after the configured mirroring replay delay
+  dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
+           << dendl;
+
+  bufferlist data = m_replay_entry.get_data();
+  auto it = data.cbegin();
+  int r = m_local_journal_replay->decode(&it, &m_event_entry);
+  if (r < 0) {
+    derr << "failed to decode journal event" << dendl;
+    handle_replay_complete(r, "failed to decode journal event");
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  m_replay_bytes = data.length();
+  uint32_t delay = calculate_replay_delay(
+    m_event_entry.timestamp,
+    m_state_builder->local_image_ctx->mirroring_replay_delay);
+  if (delay == 0) {
+    handle_preprocess_entry_ready(0);
+    return;
+  }
+
+  std::unique_lock locker{m_lock};
+  if (is_replay_complete(locker)) {
+    // don't schedule a delayed replay task if a shut-down is in-progress
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  dout(20) << "delaying replay by " << delay << " sec" << dendl;
+  std::unique_lock timer_locker{m_threads->timer_lock};
+  ceph_assert(m_delayed_preprocess_task == nullptr);
+  m_delayed_preprocess_task = create_context_callback<
+    Replayer<I>, &Replayer<I>::handle_delayed_preprocess_task>(this);
+  m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
+}
+
+template <typename I>
+void Replayer<I>::handle_delayed_preprocess_task(int r) {
+  // timer fired under timer_lock; re-queue onto the work queue so the
+  // remainder of preprocessing runs outside the timer thread
+  dout(20) << "r=" << r << dendl;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
+  m_delayed_preprocess_task = nullptr;
+
+  m_threads->work_queue->queue(create_context_callback<
+    Replayer, &Replayer<I>::handle_preprocess_entry_ready>(this), 0);
+}
+
+template <typename I>
+void Replayer<I>::handle_preprocess_entry_ready(int r) {
+  // begin timing the replay and run the event preprocessor only when the
+  // event actually requires it
+  dout(20) << "r=" << r << dendl;
+  ceph_assert(r == 0);
+
+  m_replay_start_time = ceph_clock_now();
+  if (!m_event_preprocessor->is_required(m_event_entry)) {
+    process_entry();
+    return;
+  }
+
+  Context *ctx = create_context_callback<
+    Replayer, &Replayer<I>::handle_preprocess_entry_safe>(this);
+  m_event_preprocessor->preprocess(&m_event_entry, ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_preprocess_entry_safe(int r) {
+  // -ECANCELED (lost exclusive lock) is a clean stop, not an error
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    if (r == -ECANCELED) {
+      handle_replay_complete(0, "lost exclusive lock");
+    } else {
+      derr << "failed to preprocess journal event" << dendl;
+      handle_replay_complete(r, "failed to preprocess journal event");
+    }
+
+    m_event_replay_tracker.finish_op();
+    return;
+  }
+
+  process_entry();
+}
+
+template <typename I>
+void Replayer<I>::process_entry() {
+  // apply the event to the local journal: on_ready fires when the next
+  // entry may be popped, on_commit when the event is safely committed
+  dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
+           << dendl;
+
+  Context *on_ready = create_context_callback<
+    Replayer, &Replayer<I>::handle_process_entry_ready>(this);
+  Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
+                                             m_replay_bytes,
+                                             m_replay_start_time);
+
+  m_local_journal_replay->process(m_event_entry, on_ready, on_commit);
+}
+
+template <typename I>
+void Replayer<I>::handle_process_entry_ready(int r) {
+  // entry accepted for replay: refresh the image spec (the image may have
+  // been renamed), update status counters, and pull the next entry
+  std::unique_lock locker{m_lock};
+
+  dout(20) << dendl;
+  ceph_assert(r == 0);
+
+  bool update_status = false;
+  {
+    auto local_image_ctx = m_state_builder->local_image_ctx;
+    std::shared_lock image_locker{local_image_ctx->image_lock};
+    auto image_spec = util::compute_image_spec(local_image_ctx->md_ctx,
+                                               local_image_ctx->name);
+    if (m_image_spec != image_spec) {
+      m_image_spec = image_spec;
+      update_status = true;
+    }
+  }
+
+  m_replay_status_formatter->handle_entry_processed(m_replay_bytes);
+
+  if (update_status) {
+    // perf counter names embed the image spec, so re-register on change
+    unregister_perf_counters();
+    register_perf_counters();
+    notify_status_updated();
+  }
+
+  // attempt to process the next event
+  handle_replay_ready(locker);
+}
+
+template <typename I>
+void Replayer<I>::handle_process_entry_safe(
+    const ReplayEntry &replay_entry, uint64_t replay_bytes,
+    const utime_t &replay_start_time, int r) {
+  // event committed locally: acknowledge to the remote journaler, record
+  // perf metrics, and schedule a periodic local replay flush
+  dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
+           << dendl;
+
+  if (r < 0) {
+    derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
+    handle_replay_complete(r, "failed to commit journal event");
+  } else {
+    ceph_assert(m_state_builder->remote_journaler != nullptr);
+    m_state_builder->remote_journaler->committed(replay_entry);
+  }
+
+  auto latency = ceph_clock_now() - replay_start_time;
+  if (g_journal_perf_counters) {
+    g_journal_perf_counters->inc(l_rbd_mirror_replay);
+    g_journal_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
+    g_journal_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
+  }
+
+  // defer the per-image bookkeeping to the work queue to avoid holding up
+  // the commit callback thread
+  auto ctx = new LambdaContext(
+    [this, replay_bytes, latency](int r) {
+      std::unique_lock locker{m_lock};
+      schedule_flush_local_replay_task();
+
+      if (m_perf_counters) {
+        m_perf_counters->inc(l_rbd_mirror_replay);
+        m_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes);
+        m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
+      }
+
+      m_event_replay_tracker.finish_op();
+    });
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void Replayer<I>::handle_resync_image() {
+  // local journal listener requested a resync: flag it and stop replay
+  dout(10) << dendl;
+
+  std::unique_lock locker{m_lock};
+  m_resync_requested = true;
+  handle_replay_complete(locker, 0, "resync requested");
+}
+
+template <typename I>
+void Replayer<I>::notify_status_updated() {
+  // queue the listener notification so it runs outside m_lock; tracked
+  // so shut-down can wait for it. caller must hold m_lock
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  dout(10) << dendl;
+
+  auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
+    [this](int) {
+      m_replayer_listener->handle_notification();
+    }));
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void Replayer<I>::cancel_delayed_preprocess_task() {
+  // cancel a pending delayed-replay timer; if one was actually canceled,
+  // release the event op it was holding. caller must hold m_lock
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  bool canceled_delayed_preprocess_task = false;
+  {
+    std::unique_lock timer_locker{m_threads->timer_lock};
+    if (m_delayed_preprocess_task != nullptr) {
+      dout(10) << dendl;
+      canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
+        m_delayed_preprocess_task);
+      ceph_assert(canceled_delayed_preprocess_task);
+      m_delayed_preprocess_task = nullptr;
+    }
+  }
+
+  if (canceled_delayed_preprocess_task) {
+    // wake up sleeping replay
+    m_event_replay_tracker.finish_op();
+  }
+}
+
+template <typename I>
+int Replayer<I>::validate_remote_client_state(
+    const cls::journal::Client& remote_client,
+    librbd::journal::MirrorPeerClientMeta* remote_client_meta,
+    bool* resync_requested, std::string* error) {
+  // decode and sanity-check the remote journal client registration;
+  // returns 0, -EBADMSG (corrupt meta), or -ENOTCONN (disconnected).
+  // caller must hold m_lock
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  if (!util::decode_client_meta(remote_client, remote_client_meta)) {
+    // require operator intervention since the data is corrupt
+    *error = "error retrieving remote journal client";
+    return -EBADMSG;
+  }
+
+  auto local_image_ctx = m_state_builder->local_image_ctx;
+  dout(5) << "image_id=" << local_image_ctx->id << ", "
+          << "remote_client_meta.image_id="
+          << remote_client_meta->image_id << ", "
+          << "remote_client.state=" << remote_client.state << dendl;
+  if (remote_client_meta->image_id == local_image_ctx->id &&
+      remote_client.state != cls::journal::CLIENT_STATE_CONNECTED) {
+    dout(5) << "client flagged disconnected, stopping image replay" << dendl;
+    if (local_image_ctx->config.template get_val<bool>(
+          "rbd_mirroring_resync_after_disconnect")) {
+      dout(10) << "disconnected: automatic resync" << dendl;
+      *resync_requested = true;
+      *error = "disconnected: automatic resync";
+      return -ENOTCONN;
+    } else {
+      dout(10) << "disconnected" << dendl;
+      *error = "disconnected";
+      return -ENOTCONN;
+    }
+  }
+
+  return 0;
+}
+
+template <typename I>
+void Replayer<I>::register_perf_counters() {
+  // create and register per-image perf counters named after the current
+  // image spec. caller must hold m_lock
+  dout(5) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  ceph_assert(m_perf_counters == nullptr);
+
+  auto cct = static_cast<CephContext *>(m_state_builder->local_image_ctx->cct);
+  auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_image_perf_stats_prio");
+  PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_image_" + m_image_spec,
+                          l_rbd_mirror_journal_first, l_rbd_mirror_journal_last);
+  plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
+  plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
+                      "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
+  plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
+                   "Replay latency", "rl", prio);
+  m_perf_counters = plb.create_perf_counters();
+  g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
+}
+
+template <typename I>
+void Replayer<I>::unregister_perf_counters() {
+  // remove and destroy the per-image perf counters (no-op if none).
+  // caller must hold m_lock
+  dout(5) << dendl;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  PerfCounters *perf_counters = nullptr;
+  std::swap(perf_counters, m_perf_counters);
+
+  if (perf_counters != nullptr) {
+    g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
+    delete perf_counters;
+  }
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/image_replayer/journal/Replayer.h b/src/tools/rbd_mirror/image_replayer/journal/Replayer.h
new file mode 100644
index 000000000..6b1f36d9c
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/Replayer.h
@@ -0,0 +1,323 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
+#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
+
+#include "tools/rbd_mirror/image_replayer/Replayer.h"
+#include "include/utime.h"
+#include "common/AsyncOpTracker.h"
+#include "common/ceph_mutex.h"
+#include "common/RefCountedObj.h"
+#include "cls/journal/cls_journal_types.h"
+#include "journal/ReplayEntry.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+#include <string>
+#include <type_traits>
+
+namespace journal { class Journaler; }
+namespace librbd {
+
+struct ImageCtx;
+namespace journal { template <typename I> class Replay; }
+
+} // namespace librbd
+
+namespace rbd {
+namespace mirror {
+
+template <typename> struct Threads;
+
+namespace image_replayer {
+
+struct ReplayerListener;
+
+namespace journal {
+
+template <typename> class EventPreprocessor;
+template <typename> class ReplayStatusFormatter;
+template <typename> class StateBuilder;
+
+// Journal-based image replayer: pulls events from the remote journal and
+// applies them to the local image, tracking commit position, status and
+// per-image perf counters.
+template <typename ImageCtxT>
+class Replayer : public image_replayer::Replayer {
+public:
+  typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
+
+  static Replayer* create(
+      Threads<ImageCtxT>* threads,
+      const std::string& local_mirror_uuid,
+      StateBuilder<ImageCtxT>* state_builder,
+      ReplayerListener* replayer_listener) {
+    return new Replayer(threads, local_mirror_uuid, state_builder,
+                        replayer_listener);
+  }
+
+  Replayer(
+      Threads<ImageCtxT>* threads,
+      const std::string& local_mirror_uuid,
+      StateBuilder<ImageCtxT>* state_builder,
+      ReplayerListener* replayer_listener);
+  ~Replayer();
+
+  void destroy() override {
+    delete this;
+  }
+
+  void init(Context* on_finish) override;
+  void shut_down(Context* on_finish) override;
+
+  void flush(Context* on_finish) override;
+
+  bool get_replay_status(std::string* description, Context* on_finish) override;
+
+  bool is_replaying() const override {
+    std::unique_lock locker{m_lock};
+    return (m_state == STATE_REPLAYING);
+  }
+
+  bool is_resync_requested() const override {
+    std::unique_lock locker(m_lock);
+    return m_resync_requested;
+  }
+
+  int get_error_code() const override {
+    std::unique_lock locker(m_lock);
+    return m_error_code;
+  }
+
+  std::string get_error_description() const override {
+    std::unique_lock locker(m_lock);
+    return m_error_description;
+  }
+
+  std::string get_image_spec() const {
+    std::unique_lock locker(m_lock);
+    return m_image_spec;
+  }
+
+private:
+  /**
+   * @verbatim
+   *
+   * <init>
+   *    |
+   *    v                                       (error)
+   * INIT_REMOTE_JOURNALER  * * * * * * * * * * * * * * * * * * *
+   *    |                                                       *
+   *    v                                       (error)         *
+   * START_EXTERNAL_REPLAY  * * * * * * * * * * * * * * * * * * *
+   *    |                                                       *
+   *    |  /--------------------------------------------\       *
+   *    |  |                                            |       *
+   *    v  v   (asok flush)                             |       *
+   * REPLAYING -------------> LOCAL_REPLAY_FLUSH        |       *
+   *    |       \                 |                     |       *
+   *    |       |                 v                     |       *
+   *    |       |             FLUSH_COMMIT_POSITION     |       *
+   *    |       |                 |                     |       *
+   *    |       |                 \--------------------/|       *
+   *    |       |                                       |       *
+   *    |       | (entries available)                   |       *
+   *    |       \-----------> REPLAY_READY              |       *
+   *    |                         |                     |       *
+   *    |                         | (skip if not        |       *
+   *    |                         v  needed)        (error)     *
+   *    |                     REPLAY_FLUSH  * * * * * * * * *   *
+   *    |                         |                     |   *   *
+   *    |                         | (skip if not        |   *   *
+   *    |                         v  needed)        (error) *   *
+   *    |                     GET_REMOTE_TAG  * * * * * * * *   *
+   *    |                         |                     |   *   *
+   *    |                         | (skip if not        |   *   *
+   *    |                         v  needed)        (error) *   *
+   *    |                     ALLOCATE_LOCAL_TAG  * * * * * *   *
+   *    |                         |                     |   *   *
+   *    |                         v                 (error) *   *
+   *    |                     PREPROCESS_ENTRY  * * * * * * *   *
+   *    |                         |                     |   *   *
+   *    |                         v                 (error) *   *
+   *    |                     PROCESS_ENTRY * * * * * * * * *   *
+   *    |                         |                     |   *   *
+   *    |                         \---------------------/   *   *
+   *    v (shutdown)                                        *   *
+   * REPLAY_COMPLETE  < * * * * * * * * * * * * * * * * * * *   *
+   *    |                                                       *
+   *    v                                                       *
+   * WAIT_FOR_FLUSH                                             *
+   *    |                                                       *
+   *    v                                                       *
+   * SHUT_DOWN_LOCAL_JOURNAL_REPLAY                             *
+   *    |                                                       *
+   *    v                                                       *
+   * WAIT_FOR_REPLAY                                            *
+   *    |                                                       *
+   *    v                                                       *
+   * CLOSE_LOCAL_IMAGE  < * * * * * * * * * * * * * * * * * * * *
+   *    |
+   *    v (skip if not started)
+   * STOP_REMOTE_JOURNALER_REPLAY
+   *    |
+   *    v
+   * WAIT_FOR_IN_FLIGHT_OPS
+   *    |
+   *    v
+   * <shutdown>
+   *
+   * @endverbatim
+   */
+
+  typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
+
+  enum State {
+    STATE_INIT,
+    STATE_REPLAYING,
+    STATE_COMPLETE
+  };
+
+  struct C_ReplayCommitted;
+  struct RemoteJournalerListener;
+  struct RemoteReplayHandler;
+  struct LocalJournalListener;
+
+  Threads<ImageCtxT>* m_threads;
+  std::string m_local_mirror_uuid;
+  StateBuilder<ImageCtxT>* m_state_builder;
+  ReplayerListener* m_replayer_listener;
+
+  // guards all mutable state below
+  mutable ceph::mutex m_lock;
+
+  std::string m_image_spec;
+  Context* m_on_init_shutdown = nullptr;
+
+  State m_state = STATE_INIT;
+  int m_error_code = 0;
+  std::string m_error_description;
+  bool m_resync_requested = false;
+
+  ceph::ref_t<typename std::remove_pointer<decltype(ImageCtxT::journal)>::type>
+    m_local_journal;
+  RemoteJournalerListener* m_remote_listener = nullptr;
+
+  librbd::journal::Replay<ImageCtxT>* m_local_journal_replay = nullptr;
+  EventPreprocessor<ImageCtxT>* m_event_preprocessor = nullptr;
+  ReplayStatusFormatter<ImageCtxT>* m_replay_status_formatter = nullptr;
+  RemoteReplayHandler* m_remote_replay_handler = nullptr;
+  LocalJournalListener* m_local_journal_listener = nullptr;
+
+  PerfCounters *m_perf_counters = nullptr;
+
+  // state for the entry currently being replayed
+  ReplayEntry m_replay_entry;
+  uint64_t m_replay_bytes = 0;
+  utime_t m_replay_start_time;
+  bool m_replay_tag_valid = false;
+  uint64_t m_replay_tag_tid = 0;
+  cls::journal::Tag m_replay_tag;
+  librbd::journal::TagData m_replay_tag_data;
+  librbd::journal::EventEntry m_event_entry;
+
+  AsyncOpTracker m_flush_tracker;
+
+  AsyncOpTracker m_event_replay_tracker;
+  Context *m_delayed_preprocess_task = nullptr;
+
+  AsyncOpTracker m_in_flight_op_tracker;
+  Context *m_flush_local_replay_task = nullptr;
+
+  void handle_remote_journal_metadata_updated();
+
+  void schedule_flush_local_replay_task();
+  void cancel_flush_local_replay_task();
+  void handle_flush_local_replay_task(int r);
+
+  void flush_local_replay(Context* on_flush);
+  void handle_flush_local_replay(Context* on_flush, int r);
+
+  void flush_commit_position(Context* on_flush);
+  void handle_flush_commit_position(Context* on_flush, int r);
+
+  void init_remote_journaler();
+  void handle_init_remote_journaler(int r);
+
+  void start_external_replay(std::unique_lock<ceph::mutex>& locker);
+  void handle_start_external_replay(int r);
+
+  bool add_local_journal_listener(std::unique_lock<ceph::mutex>& locker);
+
+  bool notify_init_complete(std::unique_lock<ceph::mutex>& locker);
+
+  void wait_for_flush();
+  void handle_wait_for_flush(int r);
+
+  void shut_down_local_journal_replay();
+  void handle_shut_down_local_journal_replay(int r);
+
+  void wait_for_event_replay();
+  void handle_wait_for_event_replay(int r);
+
+  void close_local_image();
+  void handle_close_local_image(int r);
+
+  void stop_remote_journaler_replay();
+  void handle_stop_remote_journaler_replay(int r);
+
+  void wait_for_in_flight_ops();
+  void handle_wait_for_in_flight_ops(int r);
+
+  void replay_flush();
+  void handle_replay_flush_shut_down(int r);
+  void handle_replay_flush(int r);
+
+  void get_remote_tag();
+  void handle_get_remote_tag(int r);
+
+  void allocate_local_tag();
+  void handle_allocate_local_tag(int r);
+
+  void handle_replay_error(int r, const std::string &error);
+
+  bool is_replay_complete() const;
+  bool is_replay_complete(const std::unique_lock<ceph::mutex>& locker) const;
+
+  void handle_replay_complete(int r, const std::string &error_desc);
+  void handle_replay_complete(const std::unique_lock<ceph::mutex>&,
+                              int r, const std::string &error_desc);
+  void handle_replay_ready();
+  void handle_replay_ready(std::unique_lock<ceph::mutex>& locker);
+
+  void preprocess_entry();
+  void handle_delayed_preprocess_task(int r);
+  void handle_preprocess_entry_ready(int r);
+  void handle_preprocess_entry_safe(int r);
+
+  void process_entry();
+  void handle_process_entry_ready(int r);
+  // fixed parameter-name typo ("relay_bytes") to match the definition
+  void handle_process_entry_safe(const ReplayEntry& replay_entry,
+                                 uint64_t replay_bytes,
+                                 const utime_t &replay_start_time, int r);
+
+  void handle_resync_image();
+
+  void notify_status_updated();
+
+  void cancel_delayed_preprocess_task();
+
+  int validate_remote_client_state(
+      const cls::journal::Client& remote_client,
+      librbd::journal::MirrorPeerClientMeta* remote_client_meta,
+      bool* resync_requested, std::string* error);
+
+  void register_perf_counters();
+  void unregister_perf_counters();
+
+};
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
diff --git a/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.cc b/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.cc
new file mode 100644
index 000000000..5f1fb0e2f
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.cc
@@ -0,0 +1,149 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "StateBuilder.h"
+#include "include/ceph_assert.h"
+#include "include/Context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "journal/Journaler.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.h"
+#include "tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.h"
+#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
+#include "tools/rbd_mirror/image_replayer/journal/SyncPointHandler.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
+ << "StateBuilder: " << this << " " \
+ << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+template <typename I>
+StateBuilder<I>::StateBuilder(const std::string& global_image_id)
+  // delegate all common state to the generic state builder base
+  : image_replayer::StateBuilder<I>(global_image_id) {
+}
+
+template <typename I>
+StateBuilder<I>::~StateBuilder() {
+  // close() must have shut down the remote journaler before destruction
+  ceph_assert(remote_journaler == nullptr);
+}
+
+template <typename I>
+void StateBuilder<I>::close(Context* on_finish) {
+  // tear down in order: remote image -> local image -> remote journaler
+  dout(10) << dendl;
+
+  // close the remote journaler after closing the local image
+  // in case we have lost contact w/ the remote cluster and
+  // will block
+  on_finish = new LambdaContext([this, on_finish](int) {
+      shut_down_remote_journaler(on_finish);
+    });
+  on_finish = new LambdaContext([this, on_finish](int) {
+      this->close_local_image(on_finish);
+    });
+  this->close_remote_image(on_finish);
+}
+
+template <typename I>
+bool StateBuilder<I>::is_disconnected() const {
+  // true when the remote journal client was flagged disconnected
+  return (remote_client_state == cls::journal::CLIENT_STATE_DISCONNECTED);
+}
+
+template <typename I>
+bool StateBuilder<I>::is_linked_impl() const {
+  // linked when the local image's primary mirror uuid matches the remote
+  ceph_assert(!this->remote_mirror_uuid.empty());
+  return (local_primary_mirror_uuid == this->remote_mirror_uuid);
+}
+
+template <typename I>
+cls::rbd::MirrorImageMode StateBuilder<I>::get_mirror_image_mode() const {
+  // this state builder always represents journal-based mirroring
+  return cls::rbd::MIRROR_IMAGE_MODE_JOURNAL;
+}
+
+template <typename I>
+image_sync::SyncPointHandler* StateBuilder<I>::create_sync_point_handler() {
+  // handler is cached on the base class; ownership stays with the builder
+  dout(10) << dendl;
+
+  this->m_sync_point_handler = SyncPointHandler<I>::create(this);
+  return this->m_sync_point_handler;
+}
+
+template <typename I>
+BaseRequest* StateBuilder<I>::create_local_image_request(
+    Threads<I>* threads,
+    librados::IoCtx& local_io_ctx,
+    const std::string& global_image_id,
+    PoolMetaCache* pool_meta_cache,
+    ProgressContext* progress_ctx,
+    Context* on_finish) {
+  // note: uses this->global_image_id, not the parameter of the same name
+  return CreateLocalImageRequest<I>::create(
+    threads, local_io_ctx, this->remote_image_ctx, this->global_image_id,
+    pool_meta_cache, progress_ctx, this, on_finish);
+}
+
+template <typename I>
+BaseRequest* StateBuilder<I>::create_prepare_replay_request(
+    const std::string& local_mirror_uuid,
+    ProgressContext* progress_ctx,
+    bool* resync_requested,
+    bool* syncing,
+    Context* on_finish) {
+  // factory for the journal-specific prepare-replay state machine
+  return PrepareReplayRequest<I>::create(
+    local_mirror_uuid, progress_ctx, this, resync_requested, syncing,
+    on_finish);
+}
+
+template <typename I>
+image_replayer::Replayer* StateBuilder<I>::create_replayer(
+    Threads<I>* threads,
+    InstanceWatcher<I>* instance_watcher,
+    const std::string& local_mirror_uuid,
+    PoolMetaCache* pool_meta_cache,
+    ReplayerListener* replayer_listener) {
+  // instance_watcher and pool_meta_cache are unused by the journal replayer
+  return Replayer<I>::create(
+    threads, local_mirror_uuid, this, replayer_listener);
+}
+
+template <typename I>
+void StateBuilder<I>::shut_down_remote_journaler(Context* on_finish) {
+  // asynchronously shut down the remote journaler; no-op if never created
+  if (remote_journaler == nullptr) {
+    on_finish->complete(0);
+    return;
+  }
+
+  dout(10) << dendl;
+  auto ctx = new LambdaContext([this, on_finish](int r) {
+      handle_shut_down_remote_journaler(r, on_finish);
+    });
+  remote_journaler->shut_down(ctx);
+}
+
+template <typename I>
+void StateBuilder<I>::handle_shut_down_remote_journaler(int r,
+                                                        Context* on_finish) {
+  // always release the journaler, even on shutdown error, so the
+  // destructor's assertion holds; propagate the result to the caller
+  dout(10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "failed to shut down remote journaler: " << cpp_strerror(r)
+         << dendl;
+  }
+
+  delete remote_journaler;
+  remote_journaler = nullptr;
+  on_finish->complete(r);
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::image_replayer::journal::StateBuilder<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.h b/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.h
new file mode 100644
index 000000000..790d1390b
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.h
@@ -0,0 +1,94 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_STATE_BUILDER_H
+#define CEPH_RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_STATE_BUILDER_H
+
+#include "tools/rbd_mirror/image_replayer/StateBuilder.h"
+#include "cls/journal/cls_journal_types.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+#include <string>
+
+struct Context;
+
+namespace librbd { struct ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+template <typename> class SyncPointHandler;
+
+template <typename ImageCtxT>
+class StateBuilder : public image_replayer::StateBuilder<ImageCtxT> {  // journal-mirroring specialization of the generic state builder
+public:
+  typedef librbd::journal::TypeTraits<ImageCtxT> TypeTraits;
+  typedef typename TypeTraits::Journaler Journaler;
+
+  static StateBuilder* create(const std::string& global_image_id) {
+    return new StateBuilder(global_image_id);  // caller owns the returned instance
+  }
+
+  StateBuilder(const std::string& global_image_id);
+  ~StateBuilder() override;
+
+  void close(Context* on_finish) override;
+
+  bool is_disconnected() const override;
+
+  cls::rbd::MirrorImageMode get_mirror_image_mode() const override;
+
+  image_sync::SyncPointHandler* create_sync_point_handler() override;
+
+  bool replay_requires_remote_image() const override {
+    return false;  // journal-based replay proceeds without an open remote image
+  }
+
+  BaseRequest* create_local_image_request(
+    Threads<ImageCtxT>* threads,
+    librados::IoCtx& local_io_ctx,
+    const std::string& global_image_id,
+    PoolMetaCache* pool_meta_cache,
+    ProgressContext* progress_ctx,
+    Context* on_finish) override;
+
+  BaseRequest* create_prepare_replay_request(
+    const std::string& local_mirror_uuid,
+    ProgressContext* progress_ctx,
+    bool* resync_requested,
+    bool* syncing,
+    Context* on_finish) override;
+
+  image_replayer::Replayer* create_replayer(
+    Threads<ImageCtxT>* threads,
+    InstanceWatcher<ImageCtxT>* instance_watcher,
+    const std::string& local_mirror_uuid,
+    PoolMetaCache* pool_meta_cache,
+    ReplayerListener* replayer_listener) override;
+
+  std::string local_primary_mirror_uuid;  // NOTE(review): presumably the primary peer's mirror uuid -- confirm against callers
+
+  Journaler* remote_journaler = nullptr;  // owned; torn down via shut_down_remote_journaler()
+  cls::journal::ClientState remote_client_state =
+    cls::journal::CLIENT_STATE_CONNECTED;
+  librbd::journal::MirrorPeerClientMeta remote_client_meta;  // snapshotted by SyncPointHandler on construction
+
+  SyncPointHandler<ImageCtxT>* sync_point_handler = nullptr;
+
+private:
+  bool is_linked_impl() const override;
+
+  void shut_down_remote_journaler(Context* on_finish);
+  void handle_shut_down_remote_journaler(int r, Context* on_finish);
+};
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::image_replayer::journal::StateBuilder<librbd::ImageCtx>;
+
+#endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_STATE_BUILDER_H
diff --git a/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.cc b/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.cc
new file mode 100644
index 000000000..66d13e555
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.cc
@@ -0,0 +1,109 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "SyncPointHandler.h"
+#include "StateBuilder.h"
+#include "include/ceph_assert.h"
+#include "include/Context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "journal/Journaler.h"
+#include "librbd/ImageCtx.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \
+ << "SyncPointHandler: " << this << " " \
+ << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+template <typename I>
+SyncPointHandler<I>::SyncPointHandler(StateBuilder<I>* state_builder)
+  : m_state_builder(state_builder),
+    m_client_meta_copy(state_builder->remote_client_meta) {  // stage a private copy; committed back only after a successful update
+}
+
+template <typename I>
+typename SyncPointHandler<I>::SyncPoints
+SyncPointHandler<I>::get_sync_points() const {  // convert staged journal client-meta sync points to generic image_sync ones
+  SyncPoints sync_points;
+  for (auto& sync_point : m_client_meta_copy.sync_points) {
+    sync_points.emplace_back(
+      sync_point.snap_namespace,
+      sync_point.snap_name,
+      sync_point.from_snap_name,
+      sync_point.object_number);
+  }
+  return sync_points;
+}
+
+template <typename I>
+librbd::SnapSeqs SyncPointHandler<I>::get_snap_seqs() const {
+  return m_client_meta_copy.snap_seqs;  // returns the staged copy, not the live state-builder data
+}
+
+template <typename I>
+void SyncPointHandler<I>::update_sync_points(  // stage new sync-point state and persist it to the remote journal client
+    const librbd::SnapSeqs& snap_seqs, const SyncPoints& sync_points,
+    bool sync_complete, Context* on_finish) {
+  dout(10) << dendl;
+
+  if (sync_complete && sync_points.empty()) {
+    m_client_meta_copy.state = librbd::journal::MIRROR_PEER_STATE_REPLAYING;  // sync finished: mark peer as replaying
+  }
+
+  m_client_meta_copy.snap_seqs = snap_seqs;
+  m_client_meta_copy.sync_points.clear();
+  for (auto& sync_point : sync_points) {
+    m_client_meta_copy.sync_points.emplace_back(
+      sync_point.snap_namespace,
+      sync_point.snap_name,
+      sync_point.from_snap_name,
+      sync_point.object_number);
+
+    if (sync_point.object_number) {
+      m_client_meta_copy.sync_object_count = std::max(
+        m_client_meta_copy.sync_object_count, *sync_point.object_number + 1);  // track the highest object index any sync point reached
+    }
+  }
+
+  dout(20) << "client_meta=" << m_client_meta_copy << dendl;
+  bufferlist client_data_bl;
+  librbd::journal::ClientData client_data{m_client_meta_copy};
+  encode(client_data, client_data_bl);  // serialize staged client meta for the journal client registry
+
+  auto ctx = new LambdaContext([this, on_finish](int r) {
+    handle_update_sync_points(r, on_finish);
+  });
+  m_state_builder->remote_journaler->update_client(client_data_bl, ctx);  // async; result handled in handle_update_sync_points()
+}
+
+template <typename I>
+void SyncPointHandler<I>::handle_update_sync_points(int r, Context* on_finish) {  // r: result of Journaler::update_client
+  dout(10) << "r=" << r << dendl;
+
+  if (r >= 0) {
+    m_state_builder->remote_client_meta.snap_seqs =
+      m_client_meta_copy.snap_seqs;  // commit staged snap_seqs to the shared state builder
+    m_state_builder->remote_client_meta.sync_points =
+      m_client_meta_copy.sync_points;  // NOTE(review): state/sync_object_count are NOT propagated back -- confirm intended
+  } else {
+    derr << "failed to update remote journal client meta for image "
+         << m_state_builder->global_image_id << ": " << cpp_strerror(r)
+         << dendl;
+  }
+
+  on_finish->complete(r);  // propagate the update result to the caller
+}
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::image_replayer::journal::SyncPointHandler<librbd::ImageCtx>;
diff --git a/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.h b/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.h
new file mode 100644
index 000000000..b4f492c19
--- /dev/null
+++ b/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.h
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_SYNC_POINT_HANDLER_H
+#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_SYNC_POINT_HANDLER_H
+
+#include "tools/rbd_mirror/image_sync/Types.h"
+#include "librbd/journal/Types.h"
+
+struct Context;
+namespace librbd { struct ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+namespace image_replayer {
+namespace journal {
+
+template <typename> class StateBuilder;
+
+template <typename ImageCtxT>
+class SyncPointHandler : public image_sync::SyncPointHandler {  // journal-backed sync-point persistence for image sync
+public:
+  using SyncPoint = image_sync::SyncPoint;
+  using SyncPoints = image_sync::SyncPoints;
+
+  static SyncPointHandler* create(StateBuilder<ImageCtxT>* state_builder) {
+    return new SyncPointHandler(state_builder);  // caller owns the returned instance
+  }
+  SyncPointHandler(StateBuilder<ImageCtxT>* state_builder);
+
+  SyncPoints get_sync_points() const override;
+  librbd::SnapSeqs get_snap_seqs() const override;
+
+  void update_sync_points(const librbd::SnapSeqs& snap_seqs,
+                          const SyncPoints& sync_points,
+                          bool sync_complete,
+                          Context* on_finish) override;
+
+private:
+  StateBuilder<ImageCtxT>* m_state_builder;  // not owned; dereferenced in async callbacks, so it must outlive this handler
+
+  librbd::journal::MirrorPeerClientMeta m_client_meta_copy;  // staged copy; committed back on successful update only
+
+  void handle_update_sync_points(int r, Context* on_finish);
+
+};
+
+} // namespace journal
+} // namespace image_replayer
+} // namespace mirror
+} // namespace rbd
+
+extern template class rbd::mirror::image_replayer::journal::SyncPointHandler<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_SYNC_POINT_HANDLER_H