From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../journal/CreateLocalImageRequest.cc | 162 +++ .../journal/CreateLocalImageRequest.h | 116 ++ .../image_replayer/journal/EventPreprocessor.cc | 206 ++++ .../image_replayer/journal/EventPreprocessor.h | 127 ++ .../image_replayer/journal/PrepareReplayRequest.cc | 316 +++++ .../image_replayer/journal/PrepareReplayRequest.h | 115 ++ .../journal/ReplayStatusFormatter.cc | 284 +++++ .../image_replayer/journal/ReplayStatusFormatter.h | 70 ++ .../rbd_mirror/image_replayer/journal/Replayer.cc | 1303 ++++++++++++++++++++ .../rbd_mirror/image_replayer/journal/Replayer.h | 323 +++++ .../image_replayer/journal/StateBuilder.cc | 149 +++ .../image_replayer/journal/StateBuilder.h | 94 ++ .../image_replayer/journal/SyncPointHandler.cc | 109 ++ .../image_replayer/journal/SyncPointHandler.h | 55 + 14 files changed, 3429 insertions(+) create mode 100644 src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.cc create mode 100644 src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.h create mode 100644 src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.cc create mode 100644 src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h create mode 100644 src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.cc create mode 100644 src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.h create mode 100644 src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc create mode 100644 src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h create mode 100644 src/tools/rbd_mirror/image_replayer/journal/Replayer.cc create mode 100644 src/tools/rbd_mirror/image_replayer/journal/Replayer.h create mode 100644 src/tools/rbd_mirror/image_replayer/journal/StateBuilder.cc create mode 100644 src/tools/rbd_mirror/image_replayer/journal/StateBuilder.h create mode 100644 src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.cc create mode 100644 src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.h (limited to 'src/tools/rbd_mirror/image_replayer/journal') diff --git a/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.cc b/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.cc new file mode 100644 index 000000000..087cf4f5f --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.cc @@ -0,0 +1,162 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "CreateLocalImageRequest.h" +#include "include/rados/librados.hpp" +#include "common/debug.h" +#include "common/dout.h" +#include "common/errno.h" +#include "journal/Journaler.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" +#include "librbd/journal/Types.h" +#include "tools/rbd_mirror/PoolMetaCache.h" +#include "tools/rbd_mirror/ProgressContext.h" +#include "tools/rbd_mirror/Threads.h" +#include "tools/rbd_mirror/image_replayer/CreateImageRequest.h" +#include "tools/rbd_mirror/image_replayer/Utils.h" +#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \ + << "CreateLocalImageRequest: " << this << " " \ + << __func__ << ": " + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; + +template +void CreateLocalImageRequest::send() { + unregister_client(); +} + +template +void CreateLocalImageRequest::unregister_client() { + dout(10) << dendl; + update_progress("UNREGISTER_CLIENT"); + + auto ctx = create_context_callback< + CreateLocalImageRequest, + &CreateLocalImageRequest::handle_unregister_client>(this); + m_state_builder->remote_journaler->unregister_client(ctx); +} + +template +void CreateLocalImageRequest::handle_unregister_client(int r) { + dout(10) << "r=" << r << dendl; + if (r < 0 && r != -ENOENT) { + derr << "failed to unregister with remote journal: " << cpp_strerror(r) + << dendl; + finish(r); + return; + } + + m_state_builder->local_image_id = ""; + m_state_builder->remote_client_meta = {}; + register_client(); +} + +template +void CreateLocalImageRequest::register_client() { + ceph_assert(m_state_builder->local_image_id.empty()); + m_state_builder->local_image_id = + librbd::util::generate_image_id(m_local_io_ctx); + dout(10) << "local_image_id=" << m_state_builder->local_image_id << dendl; + update_progress("REGISTER_CLIENT"); + + librbd::journal::MirrorPeerClientMeta client_meta{ + m_state_builder->local_image_id}; + client_meta.state = librbd::journal::MIRROR_PEER_STATE_SYNCING; + + librbd::journal::ClientData client_data{client_meta}; + bufferlist client_data_bl; + encode(client_data, client_data_bl); + + auto ctx = create_context_callback< + CreateLocalImageRequest, + &CreateLocalImageRequest::handle_register_client>(this); + m_state_builder->remote_journaler->register_client(client_data_bl, ctx); +} + +template +void CreateLocalImageRequest::handle_register_client(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to register with remote journal: " << cpp_strerror(r) + << dendl; + finish(r); + return; + } + + m_state_builder->remote_client_state = cls::journal::CLIENT_STATE_CONNECTED; + m_state_builder->remote_client_meta = {m_state_builder->local_image_id}; + m_state_builder->remote_client_meta.state = + librbd::journal::MIRROR_PEER_STATE_SYNCING; + + create_local_image(); +} + +template +void CreateLocalImageRequest::create_local_image() { + dout(10) << "local_image_id=" << m_state_builder->local_image_id << dendl; + update_progress("CREATE_LOCAL_IMAGE"); + + m_remote_image_ctx->image_lock.lock_shared(); + std::string image_name = m_remote_image_ctx->name; + m_remote_image_ctx->image_lock.unlock_shared(); + + auto ctx = create_context_callback< + CreateLocalImageRequest, + &CreateLocalImageRequest::handle_create_local_image>(this); + auto request = CreateImageRequest::create( + m_threads, m_local_io_ctx, m_global_image_id, + m_state_builder->remote_mirror_uuid, image_name, + m_state_builder->local_image_id, m_remote_image_ctx, + m_pool_meta_cache, cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, ctx); + request->send(); +} +template +void CreateLocalImageRequest::handle_create_local_image(int r) { + dout(10) << "r=" << r << dendl; + + if (r == -EBADF) { + dout(5) << "image id " << m_state_builder->local_image_id << " " + << "already in-use" << dendl; + unregister_client(); + return; + } else if (r < 0) { + if (r == -ENOENT) { + dout(10) << "parent image does not exist" << dendl; + } else { + derr << "failed to create local image: " << cpp_strerror(r) << dendl; + } + finish(r); + return; + } + + finish(0); +} + +template +void CreateLocalImageRequest::update_progress( + const std::string& description) { + dout(15) << description << dendl; + if (m_progress_ctx != nullptr) { + m_progress_ctx->update_progress(description); + } +} + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::image_replayer::journal::CreateLocalImageRequest; diff --git a/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.h b/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.h new file mode 100644 index 000000000..fc776ecc3 --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.h @@ -0,0 +1,116 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_CREATE_LOCAL_IMAGE_REQUEST_H +#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_CREATE_LOCAL_IMAGE_REQUEST_H + +#include "include/rados/librados_fwd.hpp" +#include "tools/rbd_mirror/BaseRequest.h" +#include + +struct Context; +namespace journal { class Journaler; } +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { + +class PoolMetaCache; +class ProgressContext; +template struct Threads; + +namespace image_replayer { +namespace journal { + +template class StateBuilder; + +template +class CreateLocalImageRequest : public BaseRequest { +public: + typedef rbd::mirror::ProgressContext ProgressContext; + + static CreateLocalImageRequest* create( + Threads* threads, + librados::IoCtx& local_io_ctx, + ImageCtxT* remote_image_ctx, + const std::string& global_image_id, + PoolMetaCache* pool_meta_cache, + ProgressContext* progress_ctx, + StateBuilder* state_builder, + Context* on_finish) { + return new CreateLocalImageRequest(threads, local_io_ctx, remote_image_ctx, + global_image_id, pool_meta_cache, + progress_ctx, state_builder, on_finish); + } + + CreateLocalImageRequest( + Threads* threads, + librados::IoCtx& local_io_ctx, + ImageCtxT* remote_image_ctx, + const std::string& global_image_id, + PoolMetaCache* pool_meta_cache, + ProgressContext* progress_ctx, + StateBuilder* state_builder, + Context* on_finish) + : BaseRequest(on_finish), + m_threads(threads), + m_local_io_ctx(local_io_ctx), + m_remote_image_ctx(remote_image_ctx), + m_global_image_id(global_image_id), + m_pool_meta_cache(pool_meta_cache), + m_progress_ctx(progress_ctx), + m_state_builder(state_builder) { + } + + void send(); + +private: + /** + * @verbatim + * + * + * | + * v + * UNREGISTER_CLIENT < * * * * * * * * + * | * + * v * + * REGISTER_CLIENT * + * | * + * v (id exists) * + * CREATE_LOCAL_IMAGE * * * * * * * * * + * | + * v + * + * + * @endverbatim + */ + + Threads* m_threads; + librados::IoCtx& m_local_io_ctx; + ImageCtxT* m_remote_image_ctx; + std::string m_global_image_id; + PoolMetaCache* m_pool_meta_cache; + ProgressContext* m_progress_ctx; + StateBuilder* m_state_builder; + + void unregister_client(); + void handle_unregister_client(int r); + + void register_client(); + void handle_register_client(int r); + + void create_local_image(); + void handle_create_local_image(int r); + + void update_progress(const std::string& description); + +}; + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::image_replayer::journal::CreateLocalImageRequest; + +#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_CREATE_LOCAL_IMAGE_REQUEST_H diff --git a/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.cc b/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.cc new file mode 100644 index 000000000..f5d49048e --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.cc @@ -0,0 +1,206 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "EventPreprocessor.h" +#include "common/debug.h" +#include "common/dout.h" +#include "common/errno.h" +#include "journal/Journaler.h" +#include "librbd/ImageCtx.h" +#include "librbd/ImageState.h" +#include "librbd/Utils.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/journal/Types.h" +#include + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror + +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \ + << "EventPreprocessor: " << this << " " << __func__ \ + << ": " + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +using librbd::util::create_context_callback; + +template +EventPreprocessor::EventPreprocessor(I &local_image_ctx, + Journaler &remote_journaler, + const std::string &local_mirror_uuid, + MirrorPeerClientMeta *client_meta, + librbd::asio::ContextWQ *work_queue) + : m_local_image_ctx(local_image_ctx), m_remote_journaler(remote_journaler), + m_local_mirror_uuid(local_mirror_uuid), m_client_meta(client_meta), + m_work_queue(work_queue) { +} + +template +EventPreprocessor::~EventPreprocessor() { + ceph_assert(!m_in_progress); +} + +template +bool EventPreprocessor::is_required(const EventEntry &event_entry) { + SnapSeqs snap_seqs(m_client_meta->snap_seqs); + return (prune_snap_map(&snap_seqs) || + event_entry.get_event_type() == + librbd::journal::EVENT_TYPE_SNAP_RENAME); +} + +template +void EventPreprocessor::preprocess(EventEntry *event_entry, + Context *on_finish) { + ceph_assert(!m_in_progress); + m_in_progress = true; + m_event_entry = event_entry; + m_on_finish = on_finish; + + refresh_image(); +} + +template +void EventPreprocessor::refresh_image() { + dout(20) << dendl; + + Context *ctx = create_context_callback< + EventPreprocessor, &EventPreprocessor::handle_refresh_image>(this); + m_local_image_ctx.state->refresh(ctx); +} + +template +void EventPreprocessor::handle_refresh_image(int r) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + derr << "error encountered during image refresh: " << cpp_strerror(r) + << dendl; + finish(r); + return; + } + + preprocess_event(); +} + +template +void EventPreprocessor::preprocess_event() { + dout(20) << dendl; + + m_snap_seqs = m_client_meta->snap_seqs; + m_snap_seqs_updated = prune_snap_map(&m_snap_seqs); + + int r = boost::apply_visitor(PreprocessEventVisitor(this), + m_event_entry->event); + if (r < 0) { + finish(r); + return; + } + + update_client(); +} + +template +int EventPreprocessor::preprocess_snap_rename( + librbd::journal::SnapRenameEvent &event) { + dout(20) << "remote_snap_id=" << event.snap_id << ", " + << "src_snap_name=" << event.src_snap_name << ", " + << "dest_snap_name=" << event.dst_snap_name << dendl; + + auto snap_seq_it = m_snap_seqs.find(event.snap_id); + if (snap_seq_it != m_snap_seqs.end()) { + dout(20) << "remapping remote snap id " << snap_seq_it->first << " " + << "to local snap id " << snap_seq_it->second << dendl; + event.snap_id = snap_seq_it->second; + return 0; + } + + auto snap_id_it = m_local_image_ctx.snap_ids.find({cls::rbd::UserSnapshotNamespace(), + event.src_snap_name}); + if (snap_id_it == m_local_image_ctx.snap_ids.end()) { + dout(20) << "cannot map remote snapshot '" << event.src_snap_name << "' " + << "to local snapshot" << dendl; + event.snap_id = CEPH_NOSNAP; + return -ENOENT; + } + + dout(20) << "mapping remote snap id " << event.snap_id << " " + << "to local snap id " << snap_id_it->second << dendl; + m_snap_seqs_updated = true; + m_snap_seqs[event.snap_id] = snap_id_it->second; + event.snap_id = snap_id_it->second; + return 0; +} + +template +void EventPreprocessor::update_client() { + if (!m_snap_seqs_updated) { + finish(0); + return; + } + + dout(20) << dendl; + librbd::journal::MirrorPeerClientMeta client_meta(*m_client_meta); + client_meta.snap_seqs = m_snap_seqs; + + librbd::journal::ClientData client_data(client_meta); + bufferlist data_bl; + encode(client_data, data_bl); + + Context *ctx = create_context_callback< + EventPreprocessor, &EventPreprocessor::handle_update_client>( + this); + m_remote_journaler.update_client(data_bl, ctx); +} + +template +void EventPreprocessor::handle_update_client(int r) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to update mirror peer journal client: " + << cpp_strerror(r) << dendl; + finish(r); + return; + } + + m_client_meta->snap_seqs = m_snap_seqs; + finish(0); +} + +template +bool EventPreprocessor::prune_snap_map(SnapSeqs *snap_seqs) { + bool pruned = false; + + std::shared_lock image_locker{m_local_image_ctx.image_lock}; + for (auto it = snap_seqs->begin(); it != snap_seqs->end(); ) { + auto current_it(it++); + if (m_local_image_ctx.snap_info.count(current_it->second) == 0) { + snap_seqs->erase(current_it); + pruned = true; + } + } + return pruned; +} + +template +void EventPreprocessor::finish(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = m_on_finish; + m_on_finish = nullptr; + m_event_entry = nullptr; + m_in_progress = false; + m_snap_seqs_updated = false; + m_work_queue->queue(on_finish, r); +} + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::image_replayer::journal::EventPreprocessor; diff --git a/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h b/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h new file mode 100644 index 000000000..12f70eb93 --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h @@ -0,0 +1,127 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_IMAGE_REPLAYER_EVENT_PREPROCESSOR_H +#define RBD_MIRROR_IMAGE_REPLAYER_EVENT_PREPROCESSOR_H + +#include "include/int_types.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" +#include +#include +#include + +struct Context; +namespace journal { class Journaler; } +namespace librbd { +class ImageCtx; +namespace asio { struct ContextWQ; } +} // namespace librbd + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +template +class EventPreprocessor { +public: + using Journaler = typename librbd::journal::TypeTraits::Journaler; + using EventEntry = librbd::journal::EventEntry; + using MirrorPeerClientMeta = librbd::journal::MirrorPeerClientMeta; + + static EventPreprocessor *create(ImageCtxT &local_image_ctx, + Journaler &remote_journaler, + const std::string &local_mirror_uuid, + MirrorPeerClientMeta *client_meta, + librbd::asio::ContextWQ *work_queue) { + return new EventPreprocessor(local_image_ctx, remote_journaler, + local_mirror_uuid, client_meta, work_queue); + } + + static void destroy(EventPreprocessor* processor) { + delete processor; + } + + EventPreprocessor(ImageCtxT &local_image_ctx, Journaler &remote_journaler, + const std::string &local_mirror_uuid, + MirrorPeerClientMeta *client_meta, + librbd::asio::ContextWQ *work_queue); + ~EventPreprocessor(); + + bool is_required(const EventEntry &event_entry); + void preprocess(EventEntry *event_entry, Context *on_finish); + +private: + /** + * @verbatim + * + * + * | + * v (skip if not required) + * REFRESH_IMAGE + * | + * v (skip if not required) + * PREPROCESS_EVENT + * | + * v (skip if not required) + * UPDATE_CLIENT + * + * @endverbatim + */ + + typedef std::map SnapSeqs; + + class PreprocessEventVisitor : public boost::static_visitor { + public: + EventPreprocessor *event_preprocessor; + + PreprocessEventVisitor(EventPreprocessor *event_preprocessor) + : event_preprocessor(event_preprocessor) { + } + + template + inline int operator()(T&) const { + return 0; + } + inline int operator()(librbd::journal::SnapRenameEvent &event) const { + return event_preprocessor->preprocess_snap_rename(event); + } + }; + + ImageCtxT &m_local_image_ctx; + Journaler &m_remote_journaler; + std::string m_local_mirror_uuid; + MirrorPeerClientMeta *m_client_meta; + librbd::asio::ContextWQ *m_work_queue; + + bool m_in_progress = false; + EventEntry *m_event_entry = nullptr; + Context *m_on_finish = nullptr; + + SnapSeqs m_snap_seqs; + bool m_snap_seqs_updated = false; + + bool prune_snap_map(SnapSeqs *snap_seqs); + + void refresh_image(); + void handle_refresh_image(int r); + + void preprocess_event(); + int preprocess_snap_rename(librbd::journal::SnapRenameEvent &event); + + void update_client(); + void handle_update_client(int r); + + void finish(int r); + +}; + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::image_replayer::journal::EventPreprocessor; + +#endif // RBD_MIRROR_IMAGE_REPLAYER_EVENT_PREPROCESSOR_H diff --git a/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.cc b/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.cc new file mode 100644 index 000000000..c8a96a4ad --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.cc @@ -0,0 +1,316 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "PrepareReplayRequest.h" +#include "common/debug.h" +#include "common/dout.h" +#include "common/errno.h" +#include "journal/Journaler.h" +#include "librbd/ImageCtx.h" +#include "librbd/Journal.h" +#include "librbd/Utils.h" +#include "tools/rbd_mirror/ProgressContext.h" +#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \ + << "PrepareReplayRequest: " << this << " " \ + << __func__ << ": " + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +using librbd::util::create_context_callback; + +template +void PrepareReplayRequest::send() { + *m_resync_requested = false; + *m_syncing = false; + + if (m_state_builder->local_image_id != + m_state_builder->remote_client_meta.image_id) { + // somehow our local image has a different image id than the image id + // registered in the remote image + derr << "split-brain detected: local_image_id=" + << m_state_builder->local_image_id << ", " + << "registered local_image_id=" + << m_state_builder->remote_client_meta.image_id << dendl; + finish(-EEXIST); + return; + } + + std::shared_lock image_locker(m_state_builder->local_image_ctx->image_lock); + if (m_state_builder->local_image_ctx->journal == nullptr) { + image_locker.unlock(); + + derr << "local image does not support journaling" << dendl; + finish(-EINVAL); + return; + } + + int r = m_state_builder->local_image_ctx->journal->is_resync_requested( + m_resync_requested); + if (r < 0) { + image_locker.unlock(); + + derr << "failed to check if a resync was requested" << dendl; + finish(r); + return; + } + + m_local_tag_tid = m_state_builder->local_image_ctx->journal->get_tag_tid(); + m_local_tag_data = m_state_builder->local_image_ctx->journal->get_tag_data(); + dout(10) << "local tag=" << m_local_tag_tid << ", " + << "local tag data=" << m_local_tag_data << dendl; + image_locker.unlock(); + + if (*m_resync_requested) { + finish(0); + return; + } else if (m_state_builder->remote_client_meta.state == + librbd::journal::MIRROR_PEER_STATE_SYNCING && + m_local_tag_data.mirror_uuid == + m_state_builder->remote_mirror_uuid) { + // if the initial sync hasn't completed, we cannot replay + *m_syncing = true; + finish(0); + return; + } + + update_client_state(); +} + +template +void PrepareReplayRequest::update_client_state() { + if (m_state_builder->remote_client_meta.state != + librbd::journal::MIRROR_PEER_STATE_SYNCING || + m_local_tag_data.mirror_uuid == m_state_builder->remote_mirror_uuid) { + get_remote_tag_class(); + return; + } + + // our local image is not primary, is flagged as syncing on the remote side, + // but is no longer tied to the remote -- this implies we were forced + // promoted and then demoted at some point + dout(15) << dendl; + update_progress("UPDATE_CLIENT_STATE"); + + auto client_meta = m_state_builder->remote_client_meta; + client_meta.state = librbd::journal::MIRROR_PEER_STATE_REPLAYING; + + librbd::journal::ClientData client_data(client_meta); + bufferlist data_bl; + encode(client_data, data_bl); + + auto ctx = create_context_callback< + PrepareReplayRequest, + &PrepareReplayRequest::handle_update_client_state>(this); + m_state_builder->remote_journaler->update_client(data_bl, ctx); +} + +template +void PrepareReplayRequest::handle_update_client_state(int r) { + dout(15) << "r=" << r << dendl; + if (r < 0) { + derr << "failed to update client: " << cpp_strerror(r) << dendl; + finish(r); + return; + } + + m_state_builder->remote_client_meta.state = + librbd::journal::MIRROR_PEER_STATE_REPLAYING; + get_remote_tag_class(); +} + +template +void PrepareReplayRequest::get_remote_tag_class() { + dout(10) << dendl; + update_progress("GET_REMOTE_TAG_CLASS"); + + auto ctx = create_context_callback< + PrepareReplayRequest, + &PrepareReplayRequest::handle_get_remote_tag_class>(this); + m_state_builder->remote_journaler->get_client( + librbd::Journal<>::IMAGE_CLIENT_ID, &m_client, ctx); +} + +template +void PrepareReplayRequest::handle_get_remote_tag_class(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to retrieve remote client: " << cpp_strerror(r) << dendl; + finish(r); + return; + } + + librbd::journal::ClientData client_data; + auto it = m_client.data.cbegin(); + try { + decode(client_data, it); + } catch (const buffer::error &err) { + derr << "failed to decode remote client meta data: " << err.what() + << dendl; + finish(-EBADMSG); + return; + } + + librbd::journal::ImageClientMeta *client_meta = + boost::get(&client_data.client_meta); + if (client_meta == nullptr) { + derr << "unknown remote client registration" << dendl; + finish(-EINVAL); + return; + } + + m_remote_tag_class = client_meta->tag_class; + dout(10) << "remote tag class=" << m_remote_tag_class << dendl; + + get_remote_tags(); +} + +template +void PrepareReplayRequest::get_remote_tags() { + dout(10) << dendl; + update_progress("GET_REMOTE_TAGS"); + + auto ctx = create_context_callback< + PrepareReplayRequest, + &PrepareReplayRequest::handle_get_remote_tags>(this); + m_state_builder->remote_journaler->get_tags(m_remote_tag_class, + &m_remote_tags, ctx); +} + +template +void PrepareReplayRequest::handle_get_remote_tags(int r) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to retrieve remote tags: " << cpp_strerror(r) << dendl; + finish(r); + return; + } + + // At this point, the local image was existing, non-primary, and replaying; + // and the remote image is primary. Attempt to link the local image's most + // recent tag to the remote image's tag chain. + bool remote_tag_data_valid = false; + librbd::journal::TagData remote_tag_data; + boost::optional remote_orphan_tag_tid = + boost::make_optional(false, 0U); + bool reconnect_orphan = false; + + // decode the remote tags + for (auto &remote_tag : m_remote_tags) { + if (m_local_tag_data.predecessor.commit_valid && + m_local_tag_data.predecessor.mirror_uuid == + m_state_builder->remote_mirror_uuid && + m_local_tag_data.predecessor.tag_tid > remote_tag.tid) { + dout(10) << "skipping processed predecessor remote tag " + << remote_tag.tid << dendl; + continue; + } + + try { + auto it = remote_tag.data.cbegin(); + decode(remote_tag_data, it); + remote_tag_data_valid = true; + } catch (const buffer::error &err) { + derr << "failed to decode remote tag " << remote_tag.tid << ": " + << err.what() << dendl; + finish(-EBADMSG); + return; + } + + dout(10) << "decoded remote tag " << remote_tag.tid << ": " + << remote_tag_data << dendl; + + if (!m_local_tag_data.predecessor.commit_valid) { + // newly synced local image (no predecessor) replays from the first tag + if (remote_tag_data.mirror_uuid != librbd::Journal<>::LOCAL_MIRROR_UUID) { + dout(10) << "skipping non-primary remote tag" << dendl; + continue; + } + + dout(10) << "using initial primary remote tag" << dendl; + break; + } + + if (m_local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) { + // demotion last available local epoch + + if (remote_tag_data.mirror_uuid == m_local_tag_data.mirror_uuid && + remote_tag_data.predecessor.commit_valid && + remote_tag_data.predecessor.tag_tid == + m_local_tag_data.predecessor.tag_tid) { + // demotion matches remote epoch + + if (remote_tag_data.predecessor.mirror_uuid == m_local_mirror_uuid && + m_local_tag_data.predecessor.mirror_uuid == + librbd::Journal<>::LOCAL_MIRROR_UUID) { + // local demoted and remote has matching event + dout(10) << "found matching local demotion tag" << dendl; + remote_orphan_tag_tid = remote_tag.tid; + continue; + } + + if (m_local_tag_data.predecessor.mirror_uuid == + m_state_builder->remote_mirror_uuid && + remote_tag_data.predecessor.mirror_uuid == + librbd::Journal<>::LOCAL_MIRROR_UUID) { + // remote demoted and local has matching event + dout(10) << "found matching remote demotion tag" << dendl; + remote_orphan_tag_tid = remote_tag.tid; + continue; + } + } + + if (remote_tag_data.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID && + remote_tag_data.predecessor.mirror_uuid == + librbd::Journal<>::ORPHAN_MIRROR_UUID && + remote_tag_data.predecessor.commit_valid && remote_orphan_tag_tid && + remote_tag_data.predecessor.tag_tid == *remote_orphan_tag_tid) { + // remote promotion tag chained to remote/local demotion tag + dout(10) << "found chained remote promotion tag" << dendl; + reconnect_orphan = true; + break; + } + + // promotion must follow demotion + remote_orphan_tag_tid = boost::none; + } + } + + if (remote_tag_data_valid && + m_local_tag_data.mirror_uuid == m_state_builder->remote_mirror_uuid) { + dout(10) << "local image is in clean replay state" << dendl; + } else if (reconnect_orphan) { + dout(10) << "remote image was demoted/promoted" << dendl; + } else { + derr << "split-brain detected -- skipping image replay" << dendl; + finish(-EEXIST); + return; + } + + finish(0); +} + +template +void PrepareReplayRequest::update_progress(const std::string &description) { + dout(10) << description << dendl; + + if (m_progress_ctx != nullptr) { + m_progress_ctx->update_progress(description); + } +} + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::image_replayer::journal::PrepareReplayRequest; diff --git a/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.h b/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.h new file mode 100644 index 000000000..2b6fb659b --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.h @@ -0,0 +1,115 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_PREPARE_REPLAY_REQUEST_H +#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_PREPARE_REPLAY_REQUEST_H + +#include "include/int_types.h" +#include "cls/journal/cls_journal_types.h" +#include "librbd/journal/Types.h" +#include "librbd/mirror/Types.h" +#include "tools/rbd_mirror/BaseRequest.h" +#include +#include + +struct Context; +namespace librbd { struct ImageCtx; } + +namespace rbd { +namespace mirror { + +class ProgressContext; + +namespace image_replayer { +namespace journal { + +template class StateBuilder; + +template +class PrepareReplayRequest : public BaseRequest { +public: + static PrepareReplayRequest* create( + const std::string& local_mirror_uuid, + ProgressContext* progress_ctx, + StateBuilder* state_builder, + bool* resync_requested, + bool* syncing, + Context* on_finish) { + return new PrepareReplayRequest( + local_mirror_uuid, progress_ctx, state_builder, resync_requested, + syncing, on_finish); + } + + PrepareReplayRequest( + const std::string& local_mirror_uuid, + ProgressContext* progress_ctx, + StateBuilder* state_builder, + bool* resync_requested, + bool* syncing, + Context* on_finish) + : BaseRequest(on_finish), + m_local_mirror_uuid(local_mirror_uuid), + m_progress_ctx(progress_ctx), + m_state_builder(state_builder), + m_resync_requested(resync_requested), + m_syncing(syncing) { + } + + void send() override; + +private: + /** + * @verbatim + * + * + * | + * v + * UPDATE_CLIENT_STATE + * | + * v + * GET_REMOTE_TAG_CLASS + * | + * v + * GET_REMOTE_TAGS + * | + * v + * + * + * @endverbatim + */ + typedef std::list Tags; + + std::string m_local_mirror_uuid; + ProgressContext* m_progress_ctx; + StateBuilder* m_state_builder; + bool* m_resync_requested; + bool* m_syncing; + + uint64_t m_local_tag_tid = 0; + librbd::journal::TagData m_local_tag_data; + + uint64_t m_remote_tag_class = 0; + Tags m_remote_tags; + cls::journal::Client m_client; + + void update_client_state(); + void handle_update_client_state(int r); + + void get_remote_tag_class(); + void handle_get_remote_tag_class(int r); + + void get_remote_tags(); + void handle_get_remote_tags(int r); + + void update_progress(const std::string& description); + +}; + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::image_replayer::journal::PrepareReplayRequest; + +#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_PREPARE_REPLAY_REQUEST_H diff --git a/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc b/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc new file mode 100644 index 000000000..eb99d5add --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.cc @@ -0,0 +1,284 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "ReplayStatusFormatter.h" +#include "common/debug.h" +#include "common/dout.h" +#include "common/errno.h" +#include "journal/Journaler.h" +#include "json_spirit/json_spirit.h" +#include "librbd/ImageCtx.h" +#include "librbd/Journal.h" +#include "librbd/Utils.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \ + << "ReplayStatusFormatter: " << this << " " \ + << __func__ << ": " + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +using librbd::util::unique_lock_name; + +namespace { + +double round_to_two_places(double value) { + return abs(round(value * 100) / 100); +} + +json_spirit::mObject to_json_object( + const cls::journal::ObjectPosition& position) { + json_spirit::mObject object; + if (position != cls::journal::ObjectPosition{}) { + object["object_number"] = position.object_number; + object["tag_tid"] = position.tag_tid; + object["entry_tid"] = position.entry_tid; + } + return object; +} + +} // anonymous namespace + +template +ReplayStatusFormatter::ReplayStatusFormatter(Journaler *journaler, + const std::string &mirror_uuid) + : m_journaler(journaler), + m_mirror_uuid(mirror_uuid), + m_lock(ceph::make_mutex(unique_lock_name("ReplayStatusFormatter::m_lock", this))) { +} + +template +void ReplayStatusFormatter::handle_entry_processed(uint32_t bytes) { + dout(20) << dendl; + + m_bytes_per_second(bytes); + m_entries_per_second(1); +} + +template +bool ReplayStatusFormatter::get_or_send_update(std::string *description, + Context *on_finish) { + dout(20) << dendl; + + bool in_progress = false; + { + std::lock_guard locker{m_lock}; + if (m_on_finish) { + in_progress = true; + } else { + m_on_finish = on_finish; + } + } + + if (in_progress) { + dout(10) << "previous request is still in progress, ignoring" << dendl; + on_finish->complete(-EAGAIN); + return false; + } + + m_master_position = cls::journal::ObjectPosition(); + m_mirror_position = cls::journal::ObjectPosition(); + + cls::journal::Client master_client, mirror_client; + int r; + + r = m_journaler->get_cached_client(librbd::Journal<>::IMAGE_CLIENT_ID, + &master_client); + if (r < 0) { + derr << "error retrieving registered master client: " + << cpp_strerror(r) << dendl; + } else { + r = m_journaler->get_cached_client(m_mirror_uuid, &mirror_client); + if (r < 0) { + derr << "error retrieving registered mirror client: " + << cpp_strerror(r) << dendl; + } + } + + if (!master_client.commit_position.object_positions.empty()) { + m_master_position = + *(master_client.commit_position.object_positions.begin()); + } + + if (!mirror_client.commit_position.object_positions.empty()) { + m_mirror_position = + *(mirror_client.commit_position.object_positions.begin()); + } + + if (!calculate_behind_master_or_send_update()) { + dout(20) << "need to update tag cache" << dendl; + return false; + } + + format(description); + + { + std::lock_guard locker{m_lock}; + ceph_assert(m_on_finish == on_finish); + m_on_finish = nullptr; + } + + on_finish->complete(-EEXIST); + return true; +} + +template +bool ReplayStatusFormatter::calculate_behind_master_or_send_update() { + dout(20) << "m_master_position=" << m_master_position + << ", m_mirror_position=" << m_mirror_position << dendl; + + m_entries_behind_master = 0; + + if (m_master_position == cls::journal::ObjectPosition() || + m_master_position.tag_tid < m_mirror_position.tag_tid) { + return true; + } + + cls::journal::ObjectPosition master = m_master_position; + uint64_t mirror_tag_tid = m_mirror_position.tag_tid; + + while (master.tag_tid > mirror_tag_tid) { + auto tag_it = m_tag_cache.find(master.tag_tid); + if (tag_it == m_tag_cache.end()) { + send_update_tag_cache(master.tag_tid, mirror_tag_tid); + return false; + } + librbd::journal::TagData &tag_data = tag_it->second; + m_entries_behind_master += master.entry_tid; + master = {0, tag_data.predecessor.tag_tid, tag_data.predecessor.entry_tid}; + } + if (master.tag_tid == mirror_tag_tid && + master.entry_tid > m_mirror_position.entry_tid) { + m_entries_behind_master += master.entry_tid - m_mirror_position.entry_tid; + } + + dout(20) << "clearing tags not needed any more (below mirror position)" + << dendl; + + uint64_t tag_tid = mirror_tag_tid; + size_t old_size = m_tag_cache.size(); + while (tag_tid != 0) { + auto tag_it = m_tag_cache.find(tag_tid); + if (tag_it == m_tag_cache.end()) { + break; + } + librbd::journal::TagData &tag_data = tag_it->second; + + dout(20) << "erasing tag " << tag_data << "for tag_tid " << tag_tid + << dendl; + + tag_tid = tag_data.predecessor.tag_tid; + m_tag_cache.erase(tag_it); + } + + dout(20) << old_size - m_tag_cache.size() << " entries cleared" << dendl; + + return true; +} + +template +void ReplayStatusFormatter::send_update_tag_cache(uint64_t master_tag_tid, + uint64_t mirror_tag_tid) { + if (master_tag_tid <= mirror_tag_tid || + m_tag_cache.find(master_tag_tid) != m_tag_cache.end()) { + Context *on_finish = nullptr; + { + std::lock_guard locker{m_lock}; + std::swap(m_on_finish, on_finish); + } + + ceph_assert(on_finish); + on_finish->complete(0); + return; + } + + dout(20) << "master_tag_tid=" << master_tag_tid << ", mirror_tag_tid=" + << mirror_tag_tid << dendl; + + auto ctx = new LambdaContext( + [this, master_tag_tid, mirror_tag_tid](int r) { + handle_update_tag_cache(master_tag_tid, mirror_tag_tid, r); + }); + m_journaler->get_tag(master_tag_tid, &m_tag, ctx); +} + +template +void ReplayStatusFormatter::handle_update_tag_cache(uint64_t master_tag_tid, + uint64_t mirror_tag_tid, + int r) { + librbd::journal::TagData tag_data; + + if (r < 0) { + derr << "error retrieving tag " << master_tag_tid << ": " << cpp_strerror(r) + << dendl; + } else { + dout(20) << "retrieved tag " << master_tag_tid << ": " << m_tag << dendl; + + auto it = m_tag.data.cbegin(); + try { + decode(tag_data, it); + } catch (const buffer::error &err) { + derr << "error decoding tag " << master_tag_tid << ": " << err.what() + << dendl; + } + } + + if (tag_data.predecessor.mirror_uuid != + librbd::Journal<>::LOCAL_MIRROR_UUID && + tag_data.predecessor.mirror_uuid != + librbd::Journal<>::ORPHAN_MIRROR_UUID) { + dout(20) << "hit remote image non-primary epoch" << dendl; + tag_data.predecessor = {}; + } + + dout(20) << "decoded tag " << master_tag_tid << ": " << tag_data << dendl; + + m_tag_cache[master_tag_tid] = tag_data; + send_update_tag_cache(tag_data.predecessor.tag_tid, mirror_tag_tid); +} + +template +void ReplayStatusFormatter::format(std::string *description) { + dout(20) << "m_master_position=" << m_master_position + << ", m_mirror_position=" << m_mirror_position + << ", m_entries_behind_master=" << m_entries_behind_master << dendl; + + json_spirit::mObject root_obj; + root_obj["primary_position"] = to_json_object(m_master_position); + root_obj["non_primary_position"] = to_json_object(m_mirror_position); + root_obj["entries_behind_primary"] = ( + m_entries_behind_master > 0 ? m_entries_behind_master : 0); + + m_bytes_per_second(0); + root_obj["bytes_per_second"] = round_to_two_places( + m_bytes_per_second.get_average()); + + m_entries_per_second(0); + auto entries_per_second = m_entries_per_second.get_average(); + root_obj["entries_per_second"] = round_to_two_places(entries_per_second); + + if (m_entries_behind_master > 0 && entries_per_second > 0) { + std::uint64_t seconds_until_synced = round_to_two_places( + m_entries_behind_master / entries_per_second); + if (seconds_until_synced >= std::numeric_limits::max()) { + seconds_until_synced = std::numeric_limits::max(); + } + + root_obj["seconds_until_synced"] = seconds_until_synced; + } + + *description = json_spirit::write( + root_obj, json_spirit::remove_trailing_zeros); +} + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::image_replayer::journal::ReplayStatusFormatter; diff --git a/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h b/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h new file mode 100644 index 000000000..5dbbfe10d --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h @@ -0,0 +1,70 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H +#define RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H + +#include "include/Context.h" +#include "common/ceph_mutex.h" +#include "cls/journal/cls_journal_types.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" +#include "tools/rbd_mirror/image_replayer/TimeRollingMean.h" + +namespace journal { class Journaler; } +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +template +class ReplayStatusFormatter { +public: + typedef typename librbd::journal::TypeTraits::Journaler Journaler; + + static ReplayStatusFormatter* create(Journaler *journaler, + const std::string &mirror_uuid) { + return new ReplayStatusFormatter(journaler, mirror_uuid); + } + + static void destroy(ReplayStatusFormatter* formatter) { + delete formatter; + } + + ReplayStatusFormatter(Journaler *journaler, const std::string &mirror_uuid); + + void handle_entry_processed(uint32_t bytes); + + bool get_or_send_update(std::string *description, Context *on_finish); + +private: + Journaler *m_journaler; + std::string m_mirror_uuid; + ceph::mutex m_lock; + Context *m_on_finish = nullptr; + cls::journal::ObjectPosition m_master_position; + cls::journal::ObjectPosition m_mirror_position; + int64_t m_entries_behind_master = 0; + cls::journal::Tag m_tag; + std::map m_tag_cache; + + TimeRollingMean m_bytes_per_second; + TimeRollingMean m_entries_per_second; + + bool calculate_behind_master_or_send_update(); + void send_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid); + void handle_update_tag_cache(uint64_t master_tag_tid, uint64_t mirror_tag_tid, + int r); + void format(std::string *description); +}; + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::image_replayer::journal::ReplayStatusFormatter; + +#endif // RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H diff --git a/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc b/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc new file mode 100644 index 000000000..3ce9104d2 --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/Replayer.cc @@ -0,0 +1,1303 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Replayer.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "librbd/Journal.h" +#include "librbd/Utils.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/journal/Replay.h" +#include "journal/Journaler.h" +#include "journal/JournalMetadataListener.h" +#include "journal/ReplayHandler.h" +#include "tools/rbd_mirror/Threads.h" +#include "tools/rbd_mirror/Types.h" +#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" +#include "tools/rbd_mirror/image_replayer/ReplayerListener.h" +#include "tools/rbd_mirror/image_replayer/Utils.h" +#include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h" +#include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h" +#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \ + << "Replayer: " << this << " " << __func__ << ": " + +extern PerfCounters *g_journal_perf_counters; + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +namespace { + +uint32_t calculate_replay_delay(const utime_t &event_time, + int mirroring_replay_delay) { + if (mirroring_replay_delay <= 0) { + return 0; + } + + utime_t now = ceph_clock_now(); + if (event_time + mirroring_replay_delay <= now) { + return 0; + } + + // ensure it is rounded up when converting to integer + return (event_time + mirroring_replay_delay - now) + 1; +} + +} // anonymous namespace + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; + +template +struct Replayer::C_ReplayCommitted : public Context { + Replayer* replayer; + ReplayEntry replay_entry; + uint64_t replay_bytes; + utime_t replay_start_time; + + C_ReplayCommitted(Replayer* replayer, ReplayEntry &&replay_entry, + uint64_t replay_bytes, const utime_t &replay_start_time) + : replayer(replayer), replay_entry(std::move(replay_entry)), + replay_bytes(replay_bytes), replay_start_time(replay_start_time) { + } + + void finish(int r) override { + replayer->handle_process_entry_safe(replay_entry, replay_bytes, + replay_start_time, r); + } +}; + +template +struct Replayer::RemoteJournalerListener + : public ::journal::JournalMetadataListener { + Replayer* replayer; + + RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {} + + void handle_update(::journal::JournalMetadata*) override { + auto ctx = new C_TrackedOp( + replayer->m_in_flight_op_tracker, + new LambdaContext([this](int r) { + replayer->handle_remote_journal_metadata_updated(); + })); + replayer->m_threads->work_queue->queue(ctx, 0); + } +}; + +template +struct Replayer::RemoteReplayHandler : public ::journal::ReplayHandler { + Replayer* replayer; + + RemoteReplayHandler(Replayer* replayer) : replayer(replayer) {} + ~RemoteReplayHandler() override {}; + + void handle_entries_available() override { + replayer->handle_replay_ready(); + } + + void handle_complete(int r) override { + std::string error; + if (r == -ENOMEM) { + error = "not enough memory in autotune cache"; + } else if (r < 0) { + error = "replay completed with error: " + cpp_strerror(r); + } + replayer->handle_replay_complete(r, error); + } +}; + +template +struct Replayer::LocalJournalListener + : public librbd::journal::Listener { + Replayer* replayer; + + LocalJournalListener(Replayer* replayer) : replayer(replayer) { + } + + void handle_close() override { + replayer->handle_replay_complete(0, ""); + } + + void handle_promoted() override { + replayer->handle_replay_complete(0, "force promoted"); + } + + void handle_resync() override { + replayer->handle_resync_image(); + } +}; + +template +Replayer::Replayer( + Threads* threads, + const std::string& local_mirror_uuid, + StateBuilder* state_builder, + ReplayerListener* replayer_listener) + : m_threads(threads), + m_local_mirror_uuid(local_mirror_uuid), + m_state_builder(state_builder), + m_replayer_listener(replayer_listener), + m_lock(ceph::make_mutex(librbd::util::unique_lock_name( + "rbd::mirror::image_replayer::journal::Replayer", this))) { + dout(10) << dendl; +} + +template +Replayer::~Replayer() { + dout(10) << dendl; + + { + std::unique_lock locker{m_lock}; + unregister_perf_counters(); + } + + ceph_assert(m_remote_listener == nullptr); + ceph_assert(m_local_journal_listener == nullptr); + ceph_assert(m_local_journal_replay == nullptr); + ceph_assert(m_remote_replay_handler == nullptr); + ceph_assert(m_event_preprocessor == nullptr); + ceph_assert(m_replay_status_formatter == nullptr); + ceph_assert(m_delayed_preprocess_task == nullptr); + ceph_assert(m_flush_local_replay_task == nullptr); + ceph_assert(m_state_builder->local_image_ctx == nullptr); +} + +template +void Replayer::init(Context* on_finish) { + dout(10) << dendl; + + { + auto local_image_ctx = m_state_builder->local_image_ctx; + std::shared_lock image_locker{local_image_ctx->image_lock}; + m_image_spec = util::compute_image_spec(local_image_ctx->md_ctx, + local_image_ctx->name); + } + + { + std::unique_lock locker{m_lock}; + register_perf_counters(); + } + + ceph_assert(m_on_init_shutdown == nullptr); + m_on_init_shutdown = on_finish; + + init_remote_journaler(); +} + +template +void Replayer::shut_down(Context* on_finish) { + dout(10) << dendl; + + std::unique_lock locker{m_lock}; + ceph_assert(m_on_init_shutdown == nullptr); + m_on_init_shutdown = on_finish; + + if (m_state == STATE_INIT) { + // raced with the last piece of the init state machine + return; + } else if (m_state == STATE_REPLAYING) { + m_state = STATE_COMPLETE; + } + + // if shutting down due to an error notification, we don't + // need to propagate the same error again + m_error_code = 0; + m_error_description = ""; + + cancel_delayed_preprocess_task(); + cancel_flush_local_replay_task(); + wait_for_flush(); +} + +template +void Replayer::flush(Context* on_finish) { + dout(10) << dendl; + + flush_local_replay(new C_TrackedOp(m_in_flight_op_tracker, on_finish)); +} + +template +bool Replayer::get_replay_status(std::string* description, + Context* on_finish) { + dout(10) << dendl; + + std::unique_lock locker{m_lock}; + if (m_replay_status_formatter == nullptr) { + derr << "replay not running" << dendl; + locker.unlock(); + + on_finish->complete(-EAGAIN); + return false; + } + + on_finish = new C_TrackedOp(m_in_flight_op_tracker, on_finish); + return m_replay_status_formatter->get_or_send_update(description, + on_finish); +} + +template +void Replayer::init_remote_journaler() { + dout(10) << dendl; + + Context *ctx = create_context_callback< + Replayer, &Replayer::handle_init_remote_journaler>(this); + m_state_builder->remote_journaler->init(ctx); +} + +template +void Replayer::handle_init_remote_journaler(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + if (r < 0) { + derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl; + handle_replay_complete(locker, r, "error initializing remote journal"); + close_local_image(); + return; + } + + // listen for metadata updates to check for disconnect events + ceph_assert(m_remote_listener == nullptr); + m_remote_listener = new RemoteJournalerListener(this); + m_state_builder->remote_journaler->add_listener(m_remote_listener); + + cls::journal::Client remote_client; + r = m_state_builder->remote_journaler->get_cached_client(m_local_mirror_uuid, + &remote_client); + if (r < 0) { + derr << "error retrieving remote journal client: " << cpp_strerror(r) + << dendl; + handle_replay_complete(locker, r, "error retrieving remote journal client"); + close_local_image(); + return; + } + + std::string error; + r = validate_remote_client_state(remote_client, + &m_state_builder->remote_client_meta, + &m_resync_requested, &error); + if (r < 0) { + handle_replay_complete(locker, r, error); + close_local_image(); + return; + } + + start_external_replay(locker); +} + +template +void Replayer::start_external_replay(std::unique_lock& locker) { + dout(10) << dendl; + + auto local_image_ctx = m_state_builder->local_image_ctx; + std::shared_lock local_image_locker{local_image_ctx->image_lock}; + + ceph_assert(m_local_journal == nullptr); + m_local_journal = local_image_ctx->journal; + if (m_local_journal == nullptr) { + local_image_locker.unlock(); + + derr << "local image journal closed" << dendl; + handle_replay_complete(locker, -EINVAL, "error accessing local journal"); + close_local_image(); + return; + } + + // safe to hold pointer to journal after external playback starts + Context *start_ctx = create_context_callback< + Replayer, &Replayer::handle_start_external_replay>(this); + m_local_journal->start_external_replay(&m_local_journal_replay, start_ctx); +} + +template +void Replayer::handle_start_external_replay(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + if (r < 0) { + ceph_assert(m_local_journal_replay == nullptr); + derr << "error starting external replay on local image " + << m_state_builder->local_image_ctx->id << ": " + << cpp_strerror(r) << dendl; + + handle_replay_complete(locker, r, "error starting replay on local image"); + close_local_image(); + return; + } + + if (!notify_init_complete(locker)) { + return; + } + + m_state = STATE_REPLAYING; + + // check for resync/promotion state after adding listener + if (!add_local_journal_listener(locker)) { + return; + } + + // start remote journal replay + m_event_preprocessor = EventPreprocessor::create( + *m_state_builder->local_image_ctx, *m_state_builder->remote_journaler, + m_local_mirror_uuid, &m_state_builder->remote_client_meta, + m_threads->work_queue); + m_replay_status_formatter = ReplayStatusFormatter::create( + m_state_builder->remote_journaler, m_local_mirror_uuid); + + auto cct = static_cast(m_state_builder->local_image_ctx->cct); + double poll_seconds = cct->_conf.get_val( + "rbd_mirror_journal_poll_age"); + m_remote_replay_handler = new RemoteReplayHandler(this); + m_state_builder->remote_journaler->start_live_replay(m_remote_replay_handler, + poll_seconds); + + notify_status_updated(); +} + +template +bool Replayer::add_local_journal_listener( + std::unique_lock& locker) { + dout(10) << dendl; + + // listen for promotion and resync requests against local journal + ceph_assert(m_local_journal_listener == nullptr); + m_local_journal_listener = new LocalJournalListener(this); + m_local_journal->add_listener(m_local_journal_listener); + + // verify that the local image wasn't force-promoted and that a resync hasn't + // been requested now that we are listening for events + if (m_local_journal->is_tag_owner()) { + dout(10) << "local image force-promoted" << dendl; + handle_replay_complete(locker, 0, "force promoted"); + return false; + } + + bool resync_requested = false; + int r = m_local_journal->is_resync_requested(&resync_requested); + if (r < 0) { + dout(10) << "failed to determine resync state: " << cpp_strerror(r) + << dendl; + handle_replay_complete(locker, r, "error parsing resync state"); + return false; + } else if (resync_requested) { + dout(10) << "local image resync requested" << dendl; + handle_replay_complete(locker, 0, "resync requested"); + return false; + } + + return true; +} + +template +bool Replayer::notify_init_complete(std::unique_lock& locker) { + dout(10) << dendl; + + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + ceph_assert(m_state == STATE_INIT); + + // notify that init has completed + Context *on_finish = nullptr; + std::swap(m_on_init_shutdown, on_finish); + + locker.unlock(); + on_finish->complete(0); + locker.lock(); + + if (m_on_init_shutdown != nullptr) { + // shut down requested after we notified init complete but before we + // grabbed the lock + close_local_image(); + return false; + } + + return true; +} + +template +void Replayer::wait_for_flush() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + // ensure that we don't have two concurrent local journal replay shut downs + dout(10) << dendl; + auto ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Replayer, &Replayer::handle_wait_for_flush>(this)); + m_flush_tracker.wait_for_ops(ctx); +} + +template +void Replayer::handle_wait_for_flush(int r) { + dout(10) << "r=" << r << dendl; + + shut_down_local_journal_replay(); +} + +template +void Replayer::shut_down_local_journal_replay() { + std::unique_lock locker{m_lock}; + + if (m_local_journal_replay == nullptr) { + wait_for_event_replay(); + return; + } + + // It's required to stop the local journal replay state machine prior to + // waiting for the events to complete. This is to ensure that IO is properly + // flushed (it might be batched), wait for any running ops to complete, and + // to cancel any ops waiting for their associated OnFinish events. + dout(10) << dendl; + auto ctx = create_context_callback< + Replayer, &Replayer::handle_shut_down_local_journal_replay>(this); + m_local_journal_replay->shut_down(true, ctx); +} + +template +void Replayer::handle_shut_down_local_journal_replay(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + if (r < 0) { + derr << "error shutting down journal replay: " << cpp_strerror(r) << dendl; + handle_replay_error(r, "failed to shut down local journal replay"); + } + + wait_for_event_replay(); +} + +template +void Replayer::wait_for_event_replay() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + dout(10) << dendl; + auto ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Replayer, &Replayer::handle_wait_for_event_replay>(this)); + m_event_replay_tracker.wait_for_ops(ctx); +} + +template +void Replayer::handle_wait_for_event_replay(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + close_local_image(); +} + +template +void Replayer::close_local_image() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + if (m_state_builder->local_image_ctx == nullptr) { + stop_remote_journaler_replay(); + return; + } + + dout(10) << dendl; + if (m_local_journal_listener != nullptr) { + // blocks if listener notification is in-progress + m_local_journal->remove_listener(m_local_journal_listener); + delete m_local_journal_listener; + m_local_journal_listener = nullptr; + } + + if (m_local_journal_replay != nullptr) { + m_local_journal->stop_external_replay(); + m_local_journal_replay = nullptr; + } + + if (m_event_preprocessor != nullptr) { + image_replayer::journal::EventPreprocessor::destroy( + m_event_preprocessor); + m_event_preprocessor = nullptr; + } + + m_local_journal.reset(); + + // NOTE: it's important to ensure that the local image is fully + // closed before attempting to close the remote journal in + // case the remote cluster is unreachable + ceph_assert(m_state_builder->local_image_ctx != nullptr); + auto ctx = create_context_callback< + Replayer, &Replayer::handle_close_local_image>(this); + auto request = image_replayer::CloseImageRequest::create( + &m_state_builder->local_image_ctx, ctx); + request->send(); +} + + +template +void Replayer::handle_close_local_image(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + if (r < 0) { + derr << "error closing local iamge: " << cpp_strerror(r) << dendl; + handle_replay_error(r, "failed to close local image"); + } + + ceph_assert(m_state_builder->local_image_ctx == nullptr); + stop_remote_journaler_replay(); +} + +template +void Replayer::stop_remote_journaler_replay() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + if (m_state_builder->remote_journaler == nullptr) { + wait_for_in_flight_ops(); + return; + } else if (m_remote_replay_handler == nullptr) { + wait_for_in_flight_ops(); + return; + } + + dout(10) << dendl; + auto ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Replayer, &Replayer::handle_stop_remote_journaler_replay>(this)); + m_state_builder->remote_journaler->stop_replay(ctx); +} + +template +void Replayer::handle_stop_remote_journaler_replay(int r) { + dout(10) << "r=" << r << dendl; + + std::unique_lock locker{m_lock}; + if (r < 0) { + derr << "failed to stop remote journaler replay : " << cpp_strerror(r) + << dendl; + handle_replay_error(r, "failed to stop remote journaler replay"); + } + + delete m_remote_replay_handler; + m_remote_replay_handler = nullptr; + + wait_for_in_flight_ops(); +} + +template +void Replayer::wait_for_in_flight_ops() { + dout(10) << dendl; + if (m_remote_listener != nullptr) { + m_state_builder->remote_journaler->remove_listener(m_remote_listener); + delete m_remote_listener; + m_remote_listener = nullptr; + } + + auto ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Replayer, &Replayer::handle_wait_for_in_flight_ops>(this)); + m_in_flight_op_tracker.wait_for_ops(ctx); +} + +template +void Replayer::handle_wait_for_in_flight_ops(int r) { + dout(10) << "r=" << r << dendl; + + ReplayStatusFormatter::destroy(m_replay_status_formatter); + m_replay_status_formatter = nullptr; + + Context* on_init_shutdown = nullptr; + { + std::unique_lock locker{m_lock}; + ceph_assert(m_on_init_shutdown != nullptr); + std::swap(m_on_init_shutdown, on_init_shutdown); + m_state = STATE_COMPLETE; + } + on_init_shutdown->complete(m_error_code); +} + +template +void Replayer::handle_remote_journal_metadata_updated() { + dout(20) << dendl; + + std::unique_lock locker{m_lock}; + if (m_state != STATE_REPLAYING) { + return; + } + + cls::journal::Client remote_client; + int r = m_state_builder->remote_journaler->get_cached_client( + m_local_mirror_uuid, &remote_client); + if (r < 0) { + derr << "failed to retrieve client: " << cpp_strerror(r) << dendl; + return; + } + + librbd::journal::MirrorPeerClientMeta remote_client_meta; + std::string error; + r = validate_remote_client_state(remote_client, &remote_client_meta, + &m_resync_requested, &error); + if (r < 0) { + dout(0) << "client flagged disconnected, stopping image replay" << dendl; + handle_replay_complete(locker, r, error); + } +} + +template +void Replayer::schedule_flush_local_replay_task() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + std::unique_lock timer_locker{m_threads->timer_lock}; + if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) { + return; + } + + dout(15) << dendl; + m_flush_local_replay_task = create_async_context_callback( + m_threads->work_queue, create_context_callback< + Replayer, &Replayer::handle_flush_local_replay_task>(this)); + m_threads->timer->add_event_after(30, m_flush_local_replay_task); +} + +template +void Replayer::cancel_flush_local_replay_task() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + std::unique_lock timer_locker{m_threads->timer_lock}; + if (m_flush_local_replay_task != nullptr) { + dout(10) << dendl; + m_threads->timer->cancel_event(m_flush_local_replay_task); + m_flush_local_replay_task = nullptr; + } +} + +template +void Replayer::handle_flush_local_replay_task(int) { + dout(15) << dendl; + + m_in_flight_op_tracker.start_op(); + auto on_finish = new LambdaContext([this](int) { + std::unique_lock locker{m_lock}; + + { + std::unique_lock timer_locker{m_threads->timer_lock}; + m_flush_local_replay_task = nullptr; + } + + notify_status_updated(); + m_in_flight_op_tracker.finish_op(); + }); + flush_local_replay(on_finish); +} + +template +void Replayer::flush_local_replay(Context* on_flush) { + std::unique_lock locker{m_lock}; + if (m_state != STATE_REPLAYING) { + locker.unlock(); + on_flush->complete(0); + return; + } else if (m_local_journal_replay == nullptr) { + // raced w/ a tag creation stop/start, which implies that + // the replay is flushed + locker.unlock(); + flush_commit_position(on_flush); + return; + } + + dout(15) << dendl; + auto ctx = new LambdaContext( + [this, on_flush](int r) { + handle_flush_local_replay(on_flush, r); + }); + m_local_journal_replay->flush(ctx); +} + +template +void Replayer::handle_flush_local_replay(Context* on_flush, int r) { + dout(15) << "r=" << r << dendl; + if (r < 0) { + derr << "error flushing local replay: " << cpp_strerror(r) << dendl; + on_flush->complete(r); + return; + } + + flush_commit_position(on_flush); +} + +template +void Replayer::flush_commit_position(Context* on_flush) { + std::unique_lock locker{m_lock}; + if (m_state != STATE_REPLAYING) { + locker.unlock(); + on_flush->complete(0); + return; + } + + dout(15) << dendl; + auto ctx = new LambdaContext( + [this, on_flush](int r) { + handle_flush_commit_position(on_flush, r); + }); + m_state_builder->remote_journaler->flush_commit_position(ctx); +} + +template +void Replayer::handle_flush_commit_position(Context* on_flush, int r) { + dout(15) << "r=" << r << dendl; + if (r < 0) { + derr << "error flushing remote journal commit position: " + << cpp_strerror(r) << dendl; + } + + on_flush->complete(r); +} + +template +void Replayer::handle_replay_error(int r, const std::string &error) { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + if (m_error_code == 0) { + m_error_code = r; + m_error_description = error; + } +} + +template +bool Replayer::is_replay_complete() const { + std::unique_lock locker{m_lock}; + return is_replay_complete(locker); +} + +template +bool Replayer::is_replay_complete( + const std::unique_lock&) const { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + return (m_state == STATE_COMPLETE); +} + +template +void Replayer::handle_replay_complete(int r, const std::string &error) { + std::unique_lock locker{m_lock}; + handle_replay_complete(locker, r, error); +} + +template +void Replayer::handle_replay_complete( + const std::unique_lock&, int r, const std::string &error) { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + dout(10) << "r=" << r << ", error=" << error << dendl; + if (r < 0) { + derr << "replay encountered an error: " << cpp_strerror(r) << dendl; + handle_replay_error(r, error); + } + + if (m_state != STATE_REPLAYING) { + return; + } + + m_state = STATE_COMPLETE; + notify_status_updated(); +} + +template +void Replayer::handle_replay_ready() { + std::unique_lock locker{m_lock}; + handle_replay_ready(locker); +} + +template +void Replayer::handle_replay_ready( + std::unique_lock& locker) { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + dout(20) << dendl; + if (is_replay_complete(locker)) { + return; + } + + if (!m_state_builder->remote_journaler->try_pop_front(&m_replay_entry, + &m_replay_tag_tid)) { + dout(20) << "no entries ready for replay" << dendl; + return; + } + + // can safely drop lock once the entry is tracked + m_event_replay_tracker.start_op(); + locker.unlock(); + + dout(20) << "entry tid=" << m_replay_entry.get_commit_tid() + << "tag_tid=" << m_replay_tag_tid << dendl; + if (!m_replay_tag_valid || m_replay_tag.tid != m_replay_tag_tid) { + // must allocate a new local journal tag prior to processing + replay_flush(); + return; + } + + preprocess_entry(); +} + +template +void Replayer::replay_flush() { + dout(10) << dendl; + m_flush_tracker.start_op(); + + // shut down the replay to flush all IO and ops and create a new + // replayer to handle the new tag epoch + auto ctx = create_context_callback< + Replayer, &Replayer::handle_replay_flush_shut_down>(this); + ceph_assert(m_local_journal_replay != nullptr); + m_local_journal_replay->shut_down(false, ctx); +} + +template +void Replayer::handle_replay_flush_shut_down(int r) { + std::unique_lock locker{m_lock}; + dout(10) << "r=" << r << dendl; + + ceph_assert(m_local_journal != nullptr); + ceph_assert(m_local_journal_listener != nullptr); + + // blocks if listener notification is in-progress + m_local_journal->remove_listener(m_local_journal_listener); + delete m_local_journal_listener; + m_local_journal_listener = nullptr; + + m_local_journal->stop_external_replay(); + m_local_journal_replay = nullptr; + m_local_journal.reset(); + + if (r < 0) { + locker.unlock(); + + handle_replay_flush(r); + return; + } + + // journal might have been closed now that we stopped external replay + auto local_image_ctx = m_state_builder->local_image_ctx; + std::shared_lock local_image_locker{local_image_ctx->image_lock}; + m_local_journal = local_image_ctx->journal; + if (m_local_journal == nullptr) { + local_image_locker.unlock(); + locker.unlock(); + + derr << "local image journal closed" << dendl; + handle_replay_flush(-EINVAL); + return; + } + + auto ctx = create_context_callback< + Replayer, &Replayer::handle_replay_flush>(this); + m_local_journal->start_external_replay(&m_local_journal_replay, ctx); +} + +template +void Replayer::handle_replay_flush(int r) { + std::unique_lock locker{m_lock}; + dout(10) << "r=" << r << dendl; + m_flush_tracker.finish_op(); + + if (r < 0) { + derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl; + handle_replay_complete(locker, r, "replay flush encountered an error"); + m_event_replay_tracker.finish_op(); + return; + } else if (is_replay_complete(locker)) { + m_event_replay_tracker.finish_op(); + return; + } + + // check for resync/promotion state after adding listener + if (!add_local_journal_listener(locker)) { + m_event_replay_tracker.finish_op(); + return; + } + locker.unlock(); + + get_remote_tag(); +} + +template +void Replayer::get_remote_tag() { + dout(15) << "tag_tid: " << m_replay_tag_tid << dendl; + + Context *ctx = create_context_callback< + Replayer, &Replayer::handle_get_remote_tag>(this); + m_state_builder->remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, + ctx); +} + +template +void Replayer::handle_get_remote_tag(int r) { + dout(15) << "r=" << r << dendl; + + if (r == 0) { + try { + auto it = m_replay_tag.data.cbegin(); + decode(m_replay_tag_data, it); + } catch (const buffer::error &err) { + r = -EBADMSG; + } + } + + if (r < 0) { + derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": " + << cpp_strerror(r) << dendl; + handle_replay_complete(r, "failed to retrieve remote tag"); + m_event_replay_tracker.finish_op(); + return; + } + + m_replay_tag_valid = true; + dout(15) << "decoded remote tag " << m_replay_tag_tid << ": " + << m_replay_tag_data << dendl; + + allocate_local_tag(); +} + +template +void Replayer::allocate_local_tag() { + dout(15) << dendl; + + std::string mirror_uuid = m_replay_tag_data.mirror_uuid; + if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) { + mirror_uuid = m_state_builder->remote_mirror_uuid; + } else if (mirror_uuid == m_local_mirror_uuid) { + mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID; + } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) { + // handle possible edge condition where daemon can failover and + // the local image has already been promoted/demoted + auto local_tag_data = m_local_journal->get_tag_data(); + if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID && + (local_tag_data.predecessor.commit_valid && + local_tag_data.predecessor.mirror_uuid == + librbd::Journal<>::LOCAL_MIRROR_UUID)) { + dout(15) << "skipping stale demotion event" << dendl; + handle_process_entry_safe(m_replay_entry, m_replay_bytes, + m_replay_start_time, 0); + handle_replay_ready(); + return; + } else { + dout(5) << "encountered image demotion: stopping" << dendl; + handle_replay_complete(0, ""); + } + } + + librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor); + if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) { + predecessor.mirror_uuid = m_state_builder->remote_mirror_uuid; + } else if (predecessor.mirror_uuid == m_local_mirror_uuid) { + predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID; + } + + dout(15) << "mirror_uuid=" << mirror_uuid << ", " + << "predecessor=" << predecessor << ", " + << "replay_tag_tid=" << m_replay_tag_tid << dendl; + Context *ctx = create_context_callback< + Replayer, &Replayer::handle_allocate_local_tag>(this); + m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx); +} + +template +void Replayer::handle_allocate_local_tag(int r) { + dout(15) << "r=" << r << ", " + << "tag_tid=" << m_local_journal->get_tag_tid() << dendl; + if (r < 0) { + derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl; + handle_replay_complete(r, "failed to allocate journal tag"); + m_event_replay_tracker.finish_op(); + return; + } + + preprocess_entry(); +} + +template +void Replayer::preprocess_entry() { + dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid() + << dendl; + + bufferlist data = m_replay_entry.get_data(); + auto it = data.cbegin(); + int r = m_local_journal_replay->decode(&it, &m_event_entry); + if (r < 0) { + derr << "failed to decode journal event" << dendl; + handle_replay_complete(r, "failed to decode journal event"); + m_event_replay_tracker.finish_op(); + return; + } + + m_replay_bytes = data.length(); + uint32_t delay = calculate_replay_delay( + m_event_entry.timestamp, + m_state_builder->local_image_ctx->mirroring_replay_delay); + if (delay == 0) { + handle_preprocess_entry_ready(0); + return; + } + + std::unique_lock locker{m_lock}; + if (is_replay_complete(locker)) { + // don't schedule a delayed replay task if a shut-down is in-progress + m_event_replay_tracker.finish_op(); + return; + } + + dout(20) << "delaying replay by " << delay << " sec" << dendl; + std::unique_lock timer_locker{m_threads->timer_lock}; + ceph_assert(m_delayed_preprocess_task == nullptr); + m_delayed_preprocess_task = create_context_callback< + Replayer, &Replayer::handle_delayed_preprocess_task>(this); + m_threads->timer->add_event_after(delay, m_delayed_preprocess_task); +} + +template +void Replayer::handle_delayed_preprocess_task(int r) { + dout(20) << "r=" << r << dendl; + + ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock)); + m_delayed_preprocess_task = nullptr; + + m_threads->work_queue->queue(create_context_callback< + Replayer, &Replayer::handle_preprocess_entry_ready>(this), 0); +} + +template +void Replayer::handle_preprocess_entry_ready(int r) { + dout(20) << "r=" << r << dendl; + ceph_assert(r == 0); + + m_replay_start_time = ceph_clock_now(); + if (!m_event_preprocessor->is_required(m_event_entry)) { + process_entry(); + return; + } + + Context *ctx = create_context_callback< + Replayer, &Replayer::handle_preprocess_entry_safe>(this); + m_event_preprocessor->preprocess(&m_event_entry, ctx); +} + +template +void Replayer::handle_preprocess_entry_safe(int r) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + if (r == -ECANCELED) { + handle_replay_complete(0, "lost exclusive lock"); + } else { + derr << "failed to preprocess journal event" << dendl; + handle_replay_complete(r, "failed to preprocess journal event"); + } + + m_event_replay_tracker.finish_op(); + return; + } + + process_entry(); +} + +template +void Replayer::process_entry() { + dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid() + << dendl; + + Context *on_ready = create_context_callback< + Replayer, &Replayer::handle_process_entry_ready>(this); + Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry), + m_replay_bytes, + m_replay_start_time); + + m_local_journal_replay->process(m_event_entry, on_ready, on_commit); +} + +template +void Replayer::handle_process_entry_ready(int r) { + std::unique_lock locker{m_lock}; + + dout(20) << dendl; + ceph_assert(r == 0); + + bool update_status = false; + { + auto local_image_ctx = m_state_builder->local_image_ctx; + std::shared_lock image_locker{local_image_ctx->image_lock}; + auto image_spec = util::compute_image_spec(local_image_ctx->md_ctx, + local_image_ctx->name); + if (m_image_spec != image_spec) { + m_image_spec = image_spec; + update_status = true; + } + } + + m_replay_status_formatter->handle_entry_processed(m_replay_bytes); + + if (update_status) { + unregister_perf_counters(); + register_perf_counters(); + notify_status_updated(); + } + + // attempt to process the next event + handle_replay_ready(locker); +} + +template +void Replayer::handle_process_entry_safe( + const ReplayEntry &replay_entry, uint64_t replay_bytes, + const utime_t &replay_start_time, int r) { + dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r + << dendl; + + if (r < 0) { + derr << "failed to commit journal event: " << cpp_strerror(r) << dendl; + handle_replay_complete(r, "failed to commit journal event"); + } else { + ceph_assert(m_state_builder->remote_journaler != nullptr); + m_state_builder->remote_journaler->committed(replay_entry); + } + + auto latency = ceph_clock_now() - replay_start_time; + if (g_journal_perf_counters) { + g_journal_perf_counters->inc(l_rbd_mirror_replay); + g_journal_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes); + g_journal_perf_counters->tinc(l_rbd_mirror_replay_latency, latency); + } + + auto ctx = new LambdaContext( + [this, replay_bytes, latency](int r) { + std::unique_lock locker{m_lock}; + schedule_flush_local_replay_task(); + + if (m_perf_counters) { + m_perf_counters->inc(l_rbd_mirror_replay); + m_perf_counters->inc(l_rbd_mirror_replay_bytes, replay_bytes); + m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency); + } + + m_event_replay_tracker.finish_op(); + }); + m_threads->work_queue->queue(ctx, 0); +} + +template +void Replayer::handle_resync_image() { + dout(10) << dendl; + + std::unique_lock locker{m_lock}; + m_resync_requested = true; + handle_replay_complete(locker, 0, "resync requested"); +} + +template +void Replayer::notify_status_updated() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + dout(10) << dendl; + + auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext( + [this](int) { + m_replayer_listener->handle_notification(); + })); + m_threads->work_queue->queue(ctx, 0); +} + +template +void Replayer::cancel_delayed_preprocess_task() { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + bool canceled_delayed_preprocess_task = false; + { + std::unique_lock timer_locker{m_threads->timer_lock}; + if (m_delayed_preprocess_task != nullptr) { + dout(10) << dendl; + canceled_delayed_preprocess_task = m_threads->timer->cancel_event( + m_delayed_preprocess_task); + ceph_assert(canceled_delayed_preprocess_task); + m_delayed_preprocess_task = nullptr; + } + } + + if (canceled_delayed_preprocess_task) { + // wake up sleeping replay + m_event_replay_tracker.finish_op(); + } +} + +template +int Replayer::validate_remote_client_state( + const cls::journal::Client& remote_client, + librbd::journal::MirrorPeerClientMeta* remote_client_meta, + bool* resync_requested, std::string* error) { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + if (!util::decode_client_meta(remote_client, remote_client_meta)) { + // require operator intervention since the data is corrupt + *error = "error retrieving remote journal client"; + return -EBADMSG; + } + + auto local_image_ctx = m_state_builder->local_image_ctx; + dout(5) << "image_id=" << local_image_ctx->id << ", " + << "remote_client_meta.image_id=" + << remote_client_meta->image_id << ", " + << "remote_client.state=" << remote_client.state << dendl; + if (remote_client_meta->image_id == local_image_ctx->id && + remote_client.state != cls::journal::CLIENT_STATE_CONNECTED) { + dout(5) << "client flagged disconnected, stopping image replay" << dendl; + if (local_image_ctx->config.template get_val( + "rbd_mirroring_resync_after_disconnect")) { + dout(10) << "disconnected: automatic resync" << dendl; + *resync_requested = true; + *error = "disconnected: automatic resync"; + return -ENOTCONN; + } else { + dout(10) << "disconnected" << dendl; + *error = "disconnected"; + return -ENOTCONN; + } + } + + return 0; +} + +template +void Replayer::register_perf_counters() { + dout(5) << dendl; + + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + ceph_assert(m_perf_counters == nullptr); + + auto cct = static_cast(m_state_builder->local_image_ctx->cct); + auto prio = cct->_conf.get_val("rbd_mirror_image_perf_stats_prio"); + PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_image_" + m_image_spec, + l_rbd_mirror_journal_first, l_rbd_mirror_journal_last); + plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio); + plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes", + "Replayed data", "rb", prio, unit_t(UNIT_BYTES)); + plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency", + "Replay latency", "rl", prio); + m_perf_counters = plb.create_perf_counters(); + g_ceph_context->get_perfcounters_collection()->add(m_perf_counters); +} + +template +void Replayer::unregister_perf_counters() { + dout(5) << dendl; + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + + PerfCounters *perf_counters = nullptr; + std::swap(perf_counters, m_perf_counters); + + if (perf_counters != nullptr) { + g_ceph_context->get_perfcounters_collection()->remove(perf_counters); + delete perf_counters; + } +} + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::image_replayer::journal::Replayer; diff --git a/src/tools/rbd_mirror/image_replayer/journal/Replayer.h b/src/tools/rbd_mirror/image_replayer/journal/Replayer.h new file mode 100644 index 000000000..6b1f36d9c --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/Replayer.h @@ -0,0 +1,323 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H +#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H + +#include "tools/rbd_mirror/image_replayer/Replayer.h" +#include "include/utime.h" +#include "common/AsyncOpTracker.h" +#include "common/ceph_mutex.h" +#include "common/RefCountedObj.h" +#include "cls/journal/cls_journal_types.h" +#include "journal/ReplayEntry.h" +#include "librbd/ImageCtx.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" +#include +#include + +namespace journal { class Journaler; } +namespace librbd { + +struct ImageCtx; +namespace journal { template class Replay; } + +} // namespace librbd + +namespace rbd { +namespace mirror { + +template struct Threads; + +namespace image_replayer { + +struct ReplayerListener; + +namespace journal { + +template class EventPreprocessor; +template class ReplayStatusFormatter; +template class StateBuilder; + +template +class Replayer : public image_replayer::Replayer { +public: + typedef typename librbd::journal::TypeTraits::Journaler Journaler; + + static Replayer* create( + Threads* threads, + const std::string& local_mirror_uuid, + StateBuilder* state_builder, + ReplayerListener* replayer_listener) { + return new Replayer(threads, local_mirror_uuid, state_builder, + replayer_listener); + } + + Replayer( + Threads* threads, + const std::string& local_mirror_uuid, + StateBuilder* state_builder, + ReplayerListener* replayer_listener); + ~Replayer(); + + void destroy() override { + delete this; + } + + void init(Context* on_finish) override; + void shut_down(Context* on_finish) override; + + void flush(Context* on_finish) override; + + bool get_replay_status(std::string* description, Context* on_finish) override; + + bool is_replaying() const override { + std::unique_lock locker{m_lock}; + return (m_state == STATE_REPLAYING); + } + + bool is_resync_requested() const override { + std::unique_lock locker(m_lock); + return m_resync_requested; + } + + int get_error_code() const override { + std::unique_lock locker(m_lock); + return m_error_code; + } + + std::string get_error_description() const override { + std::unique_lock locker(m_lock); + return m_error_description; + } + + std::string get_image_spec() const { + std::unique_lock locker(m_lock); + return m_image_spec; + } + +private: + /** + * @verbatim + * + * + * | + * v (error) + * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * * * + * | * + * v (error) * + * START_EXTERNAL_REPLAY * * * * * * * * * * * * * * * * * * * + * | * + * | /--------------------------------------------\ * + * | | | * + * v v (asok flush) | * + * REPLAYING -------------> LOCAL_REPLAY_FLUSH | * + * | \ | | * + * | | v | * + * | | FLUSH_COMMIT_POSITION | * + * | | | | * + * | | \--------------------/| * + * | | | * + * | | (entries available) | * + * | \-----------> REPLAY_READY | * + * | | | * + * | | (skip if not | * + * | v needed) (error) * + * | REPLAY_FLUSH * * * * * * * * * * + * | | | * * + * | | (skip if not | * * + * | v needed) (error) * * + * | GET_REMOTE_TAG * * * * * * * * * + * | | | * * + * | | (skip if not | * * + * | v needed) (error) * * + * | ALLOCATE_LOCAL_TAG * * * * * * * + * | | | * * + * | v (error) * * + * | PREPROCESS_ENTRY * * * * * * * * + * | | | * * + * | v (error) * * + * | PROCESS_ENTRY * * * * * * * * * * + * | | | * * + * | \---------------------/ * * + * v (shutdown) * * + * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * * + * | * + * v * + * WAIT_FOR_FLUSH * + * | * + * v * + * SHUT_DOWN_LOCAL_JOURNAL_REPLAY * + * | * + * v * + * WAIT_FOR_REPLAY * + * | * + * v * + * CLOSE_LOCAL_IMAGE < * * * * * * * * * * * * * * * * * * * * + * | + * v (skip if not started) + * STOP_REMOTE_JOURNALER_REPLAY + * | + * v + * WAIT_FOR_IN_FLIGHT_OPS + * | + * v + * + * + * @endverbatim + */ + + typedef typename librbd::journal::TypeTraits::ReplayEntry ReplayEntry; + + enum State { + STATE_INIT, + STATE_REPLAYING, + STATE_COMPLETE + }; + + struct C_ReplayCommitted; + struct RemoteJournalerListener; + struct RemoteReplayHandler; + struct LocalJournalListener; + + Threads* m_threads; + std::string m_local_mirror_uuid; + StateBuilder* m_state_builder; + ReplayerListener* m_replayer_listener; + + mutable ceph::mutex m_lock; + + std::string m_image_spec; + Context* m_on_init_shutdown = nullptr; + + State m_state = STATE_INIT; + int m_error_code = 0; + std::string m_error_description; + bool m_resync_requested = false; + + ceph::ref_t::type> + m_local_journal; + RemoteJournalerListener* m_remote_listener = nullptr; + + librbd::journal::Replay* m_local_journal_replay = nullptr; + EventPreprocessor* m_event_preprocessor = nullptr; + ReplayStatusFormatter* m_replay_status_formatter = nullptr; + RemoteReplayHandler* m_remote_replay_handler = nullptr; + LocalJournalListener* m_local_journal_listener = nullptr; + + PerfCounters *m_perf_counters = nullptr; + + ReplayEntry m_replay_entry; + uint64_t m_replay_bytes = 0; + utime_t m_replay_start_time; + bool m_replay_tag_valid = false; + uint64_t m_replay_tag_tid = 0; + cls::journal::Tag m_replay_tag; + librbd::journal::TagData m_replay_tag_data; + librbd::journal::EventEntry m_event_entry; + + AsyncOpTracker m_flush_tracker; + + AsyncOpTracker m_event_replay_tracker; + Context *m_delayed_preprocess_task = nullptr; + + AsyncOpTracker m_in_flight_op_tracker; + Context *m_flush_local_replay_task = nullptr; + + void handle_remote_journal_metadata_updated(); + + void schedule_flush_local_replay_task(); + void cancel_flush_local_replay_task(); + void handle_flush_local_replay_task(int r); + + void flush_local_replay(Context* on_flush); + void handle_flush_local_replay(Context* on_flush, int r); + + void flush_commit_position(Context* on_flush); + void handle_flush_commit_position(Context* on_flush, int r); + + void init_remote_journaler(); + void handle_init_remote_journaler(int r); + + void start_external_replay(std::unique_lock& locker); + void handle_start_external_replay(int r); + + bool add_local_journal_listener(std::unique_lock& locker); + + bool notify_init_complete(std::unique_lock& locker); + + void wait_for_flush(); + void handle_wait_for_flush(int r); + + void shut_down_local_journal_replay(); + void handle_shut_down_local_journal_replay(int r); + + void wait_for_event_replay(); + void handle_wait_for_event_replay(int r); + + void close_local_image(); + void handle_close_local_image(int r); + + void stop_remote_journaler_replay(); + void handle_stop_remote_journaler_replay(int r); + + void wait_for_in_flight_ops(); + void handle_wait_for_in_flight_ops(int r); + + void replay_flush(); + void handle_replay_flush_shut_down(int r); + void handle_replay_flush(int r); + + void get_remote_tag(); + void handle_get_remote_tag(int r); + + void allocate_local_tag(); + void handle_allocate_local_tag(int r); + + void handle_replay_error(int r, const std::string &error); + + bool is_replay_complete() const; + bool is_replay_complete(const std::unique_lock& locker) const; + + void handle_replay_complete(int r, const std::string &error_desc); + void handle_replay_complete(const std::unique_lock&, + int r, const std::string &error_desc); + void handle_replay_ready(); + void handle_replay_ready(std::unique_lock& locker); + + void preprocess_entry(); + void handle_delayed_preprocess_task(int r); + void handle_preprocess_entry_ready(int r); + void handle_preprocess_entry_safe(int r); + + void process_entry(); + void handle_process_entry_ready(int r); + void handle_process_entry_safe(const ReplayEntry& replay_entry, + uint64_t relay_bytes, + const utime_t &replay_start_time, int r); + + void handle_resync_image(); + + void notify_status_updated(); + + void cancel_delayed_preprocess_task(); + + int validate_remote_client_state( + const cls::journal::Client& remote_client, + librbd::journal::MirrorPeerClientMeta* remote_client_meta, + bool* resync_requested, std::string* error); + + void register_perf_counters(); + void unregister_perf_counters(); + +}; + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::image_replayer::journal::Replayer; + +#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H diff --git a/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.cc b/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.cc new file mode 100644 index 000000000..5f1fb0e2f --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.cc @@ -0,0 +1,149 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "StateBuilder.h" +#include "include/ceph_assert.h" +#include "include/Context.h" +#include "common/debug.h" +#include "common/errno.h" +#include "journal/Journaler.h" +#include "librbd/ImageCtx.h" +#include "librbd/Journal.h" +#include "tools/rbd_mirror/image_replayer/journal/CreateLocalImageRequest.h" +#include "tools/rbd_mirror/image_replayer/journal/PrepareReplayRequest.h" +#include "tools/rbd_mirror/image_replayer/journal/Replayer.h" +#include "tools/rbd_mirror/image_replayer/journal/SyncPointHandler.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \ + << "StateBuilder: " << this << " " \ + << __func__ << ": " + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +template +StateBuilder::StateBuilder(const std::string& global_image_id) + : image_replayer::StateBuilder(global_image_id) { +} + +template +StateBuilder::~StateBuilder() { + ceph_assert(remote_journaler == nullptr); +} + +template +void StateBuilder::close(Context* on_finish) { + dout(10) << dendl; + + // close the remote journaler after closing the local image + // in case we have lost contact w/ the remote cluster and + // will block + on_finish = new LambdaContext([this, on_finish](int) { + shut_down_remote_journaler(on_finish); + }); + on_finish = new LambdaContext([this, on_finish](int) { + this->close_local_image(on_finish); + }); + this->close_remote_image(on_finish); +} + +template +bool StateBuilder::is_disconnected() const { + return (remote_client_state == cls::journal::CLIENT_STATE_DISCONNECTED); +} + +template +bool StateBuilder::is_linked_impl() const { + ceph_assert(!this->remote_mirror_uuid.empty()); + return (local_primary_mirror_uuid == this->remote_mirror_uuid); +} + +template +cls::rbd::MirrorImageMode StateBuilder::get_mirror_image_mode() const { + return cls::rbd::MIRROR_IMAGE_MODE_JOURNAL; +} + +template +image_sync::SyncPointHandler* StateBuilder::create_sync_point_handler() { + dout(10) << dendl; + + this->m_sync_point_handler = SyncPointHandler::create(this); + return this->m_sync_point_handler; +} + +template +BaseRequest* StateBuilder::create_local_image_request( + Threads* threads, + librados::IoCtx& local_io_ctx, + const std::string& global_image_id, + PoolMetaCache* pool_meta_cache, + ProgressContext* progress_ctx, + Context* on_finish) { + return CreateLocalImageRequest::create( + threads, local_io_ctx, this->remote_image_ctx, this->global_image_id, + pool_meta_cache, progress_ctx, this, on_finish); +} + +template +BaseRequest* StateBuilder::create_prepare_replay_request( + const std::string& local_mirror_uuid, + ProgressContext* progress_ctx, + bool* resync_requested, + bool* syncing, + Context* on_finish) { + return PrepareReplayRequest::create( + local_mirror_uuid, progress_ctx, this, resync_requested, syncing, + on_finish); +} + +template +image_replayer::Replayer* StateBuilder::create_replayer( + Threads* threads, + InstanceWatcher* instance_watcher, + const std::string& local_mirror_uuid, + PoolMetaCache* pool_meta_cache, + ReplayerListener* replayer_listener) { + return Replayer::create( + threads, local_mirror_uuid, this, replayer_listener); +} + +template +void StateBuilder::shut_down_remote_journaler(Context* on_finish) { + if (remote_journaler == nullptr) { + on_finish->complete(0); + return; + } + + dout(10) << dendl; + auto ctx = new LambdaContext([this, on_finish](int r) { + handle_shut_down_remote_journaler(r, on_finish); + }); + remote_journaler->shut_down(ctx); +} + +template +void StateBuilder::handle_shut_down_remote_journaler(int r, + Context* on_finish) { + dout(10) << "r=" << r << dendl; + + if (r < 0) { + derr << "failed to shut down remote journaler: " << cpp_strerror(r) + << dendl; + } + + delete remote_journaler; + remote_journaler = nullptr; + on_finish->complete(r); +} + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::image_replayer::journal::StateBuilder; diff --git a/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.h b/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.h new file mode 100644 index 000000000..790d1390b --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/StateBuilder.h @@ -0,0 +1,94 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_STATE_BUILDER_H +#define CEPH_RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_STATE_BUILDER_H + +#include "tools/rbd_mirror/image_replayer/StateBuilder.h" +#include "cls/journal/cls_journal_types.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" +#include + +struct Context; + +namespace librbd { struct ImageCtx; } + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +template class SyncPointHandler; + +template +class StateBuilder : public image_replayer::StateBuilder { +public: + typedef librbd::journal::TypeTraits TypeTraits; + typedef typename TypeTraits::Journaler Journaler; + + static StateBuilder* create(const std::string& global_image_id) { + return new StateBuilder(global_image_id); + } + + StateBuilder(const std::string& global_image_id); + ~StateBuilder() override; + + void close(Context* on_finish) override; + + bool is_disconnected() const override; + + cls::rbd::MirrorImageMode get_mirror_image_mode() const override; + + image_sync::SyncPointHandler* create_sync_point_handler() override; + + bool replay_requires_remote_image() const override { + return false; + } + + BaseRequest* create_local_image_request( + Threads* threads, + librados::IoCtx& local_io_ctx, + const std::string& global_image_id, + PoolMetaCache* pool_meta_cache, + ProgressContext* progress_ctx, + Context* on_finish) override; + + BaseRequest* create_prepare_replay_request( + const std::string& local_mirror_uuid, + ProgressContext* progress_ctx, + bool* resync_requested, + bool* syncing, + Context* on_finish) override; + + image_replayer::Replayer* create_replayer( + Threads* threads, + InstanceWatcher* instance_watcher, + const std::string& local_mirror_uuid, + PoolMetaCache* pool_meta_cache, + ReplayerListener* replayer_listener) override; + + std::string local_primary_mirror_uuid; + + Journaler* remote_journaler = nullptr; + cls::journal::ClientState remote_client_state = + cls::journal::CLIENT_STATE_CONNECTED; + librbd::journal::MirrorPeerClientMeta remote_client_meta; + + SyncPointHandler* sync_point_handler = nullptr; + +private: + bool is_linked_impl() const override; + + void shut_down_remote_journaler(Context* on_finish); + void handle_shut_down_remote_journaler(int r, Context* on_finish); +}; + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::image_replayer::journal::StateBuilder; + +#endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_STATE_BUILDER_H diff --git a/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.cc b/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.cc new file mode 100644 index 000000000..66d13e555 --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.cc @@ -0,0 +1,109 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "SyncPointHandler.h" +#include "StateBuilder.h" +#include "include/ceph_assert.h" +#include "include/Context.h" +#include "common/debug.h" +#include "common/errno.h" +#include "journal/Journaler.h" +#include "librbd/ImageCtx.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \ + << "SyncPointHandler: " << this << " " \ + << __func__ << ": " + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +template +SyncPointHandler::SyncPointHandler(StateBuilder* state_builder) + : m_state_builder(state_builder), + m_client_meta_copy(state_builder->remote_client_meta) { +} + +template +typename SyncPointHandler::SyncPoints +SyncPointHandler::get_sync_points() const { + SyncPoints sync_points; + for (auto& sync_point : m_client_meta_copy.sync_points) { + sync_points.emplace_back( + sync_point.snap_namespace, + sync_point.snap_name, + sync_point.from_snap_name, + sync_point.object_number); + } + return sync_points; +} + +template +librbd::SnapSeqs SyncPointHandler::get_snap_seqs() const { + return m_client_meta_copy.snap_seqs; +} + +template +void SyncPointHandler::update_sync_points( + const librbd::SnapSeqs& snap_seqs, const SyncPoints& sync_points, + bool sync_complete, Context* on_finish) { + dout(10) << dendl; + + if (sync_complete && sync_points.empty()) { + m_client_meta_copy.state = librbd::journal::MIRROR_PEER_STATE_REPLAYING; + } + + m_client_meta_copy.snap_seqs = snap_seqs; + m_client_meta_copy.sync_points.clear(); + for (auto& sync_point : sync_points) { + m_client_meta_copy.sync_points.emplace_back( + sync_point.snap_namespace, + sync_point.snap_name, + sync_point.from_snap_name, + sync_point.object_number); + + if (sync_point.object_number) { + m_client_meta_copy.sync_object_count = std::max( + m_client_meta_copy.sync_object_count, *sync_point.object_number + 1); + } + } + + dout(20) << "client_meta=" << m_client_meta_copy << dendl; + bufferlist client_data_bl; + librbd::journal::ClientData client_data{m_client_meta_copy}; + encode(client_data, client_data_bl); + + auto ctx = new LambdaContext([this, on_finish](int r) { + handle_update_sync_points(r, on_finish); + }); + m_state_builder->remote_journaler->update_client(client_data_bl, ctx); +} + +template +void SyncPointHandler::handle_update_sync_points(int r, Context* on_finish) { + dout(10) << "r=" << r << dendl; + + if (r >= 0) { + m_state_builder->remote_client_meta.snap_seqs = + m_client_meta_copy.snap_seqs; + m_state_builder->remote_client_meta.sync_points = + m_client_meta_copy.sync_points; + } else { + derr << "failed to update remote journal client meta for image " + << m_state_builder->global_image_id << ": " << cpp_strerror(r) + << dendl; + } + + on_finish->complete(r); +} + +} // namespace journal +} // namespace image_sync +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::image_replayer::journal::SyncPointHandler; diff --git a/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.h b/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.h new file mode 100644 index 000000000..b4f492c19 --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/SyncPointHandler.h @@ -0,0 +1,55 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_SYNC_POINT_HANDLER_H +#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_SYNC_POINT_HANDLER_H + +#include "tools/rbd_mirror/image_sync/Types.h" +#include "librbd/journal/Types.h" + +struct Context; +namespace librbd { struct ImageCtx; } + +namespace rbd { +namespace mirror { +namespace image_replayer { +namespace journal { + +template class StateBuilder; + +template +class SyncPointHandler : public image_sync::SyncPointHandler { +public: + using SyncPoint = image_sync::SyncPoint; + using SyncPoints = image_sync::SyncPoints; + + static SyncPointHandler* create(StateBuilder* state_builder) { + return new SyncPointHandler(state_builder); + } + SyncPointHandler(StateBuilder* state_builder); + + SyncPoints get_sync_points() const override; + librbd::SnapSeqs get_snap_seqs() const override; + + void update_sync_points(const librbd::SnapSeqs& snap_seqs, + const SyncPoints& sync_points, + bool sync_complete, + Context* on_finish) override; + +private: + StateBuilder* m_state_builder; + + librbd::journal::MirrorPeerClientMeta m_client_meta_copy; + + void handle_update_sync_points(int r, Context* on_finish); + +}; + +} // namespace journal +} // namespace image_sync +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::image_replayer::journal::SyncPointHandler; + +#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_SYNC_POINT_HANDLER_H -- cgit v1.2.3