// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "Replayer.h" #include "common/debug.h" #include "common/errno.h" #include "common/perf_counters.h" #include "common/perf_counters_key.h" #include "common/Timer.h" #include "librbd/Journal.h" #include "librbd/Utils.h" #include "librbd/asio/ContextWQ.h" #include "librbd/journal/Replay.h" #include "journal/Journaler.h" #include "journal/JournalMetadataListener.h" #include "journal/ReplayHandler.h" #include "tools/rbd_mirror/Threads.h" #include "tools/rbd_mirror/Types.h" #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" #include "tools/rbd_mirror/image_replayer/ReplayerListener.h" #include "tools/rbd_mirror/image_replayer/Utils.h" #include "tools/rbd_mirror/image_replayer/journal/EventPreprocessor.h" #include "tools/rbd_mirror/image_replayer/journal/ReplayStatusFormatter.h" #include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror #undef dout_prefix #define dout_prefix *_dout << "rbd::mirror::image_replayer::journal::" \ << "Replayer: " << this << " " << __func__ << ": " extern PerfCounters *g_journal_perf_counters; namespace rbd { namespace mirror { namespace image_replayer { namespace journal { namespace { uint32_t calculate_replay_delay(const utime_t &event_time, int mirroring_replay_delay) { if (mirroring_replay_delay <= 0) { return 0; } utime_t now = ceph_clock_now(); if (event_time + mirroring_replay_delay <= now) { return 0; } // ensure it is rounded up when converting to integer return (event_time + mirroring_replay_delay - now) + 1; } } // anonymous namespace using librbd::util::create_async_context_callback; using librbd::util::create_context_callback; template struct Replayer::C_ReplayCommitted : public Context { Replayer* replayer; ReplayEntry replay_entry; uint64_t replay_bytes; utime_t replay_start_time; C_ReplayCommitted(Replayer* replayer, ReplayEntry &&replay_entry, uint64_t replay_bytes, const utime_t &replay_start_time) : replayer(replayer), replay_entry(std::move(replay_entry)), replay_bytes(replay_bytes), replay_start_time(replay_start_time) { } void finish(int r) override { replayer->handle_process_entry_safe(replay_entry, replay_bytes, replay_start_time, r); } }; template struct Replayer::RemoteJournalerListener : public ::journal::JournalMetadataListener { Replayer* replayer; RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {} void handle_update(::journal::JournalMetadata*) override { auto ctx = new C_TrackedOp( replayer->m_in_flight_op_tracker, new LambdaContext([this](int r) { replayer->handle_remote_journal_metadata_updated(); })); replayer->m_threads->work_queue->queue(ctx, 0); } }; template struct Replayer::RemoteReplayHandler : public ::journal::ReplayHandler { Replayer* replayer; RemoteReplayHandler(Replayer* replayer) : replayer(replayer) {} ~RemoteReplayHandler() override {}; void handle_entries_available() override { replayer->handle_replay_ready(); } void handle_complete(int r) override { std::string error; if (r == -ENOMEM) { error = "not enough memory in autotune cache"; } else if (r < 0) { error = "replay completed with error: " + cpp_strerror(r); } replayer->handle_replay_complete(r, error); } }; template struct Replayer::LocalJournalListener : public librbd::journal::Listener { Replayer* replayer; LocalJournalListener(Replayer* replayer) : replayer(replayer) { } void handle_close() override { replayer->handle_replay_complete(0, ""); } void handle_promoted() override { replayer->handle_replay_complete(0, "force promoted"); } void handle_resync() override { replayer->handle_resync_image(); } }; template Replayer::Replayer( Threads* threads, const std::string& local_mirror_uuid, StateBuilder* state_builder, ReplayerListener* replayer_listener) : m_threads(threads), m_local_mirror_uuid(local_mirror_uuid), m_state_builder(state_builder), m_replayer_listener(replayer_listener), m_lock(ceph::make_mutex(librbd::util::unique_lock_name( "rbd::mirror::image_replayer::journal::Replayer", this))) { dout(10) << dendl; } template Replayer::~Replayer() { dout(10) << dendl; { std::unique_lock locker{m_lock}; unregister_perf_counters(); } ceph_assert(m_remote_listener == nullptr); ceph_assert(m_local_journal_listener == nullptr); ceph_assert(m_local_journal_replay == nullptr); ceph_assert(m_remote_replay_handler == nullptr); ceph_assert(m_event_preprocessor == nullptr); ceph_assert(m_replay_status_formatter == nullptr); ceph_assert(m_delayed_preprocess_task == nullptr); ceph_assert(m_flush_local_replay_task == nullptr); ceph_assert(m_state_builder->local_image_ctx == nullptr); } template void Replayer::init(Context* on_finish) { dout(10) << dendl; { auto local_image_ctx = m_state_builder->local_image_ctx; std::shared_lock image_locker{local_image_ctx->image_lock}; m_image_spec = util::compute_image_spec(local_image_ctx->md_ctx, local_image_ctx->name); } { std::unique_lock locker{m_lock}; register_perf_counters(); } ceph_assert(m_on_init_shutdown == nullptr); m_on_init_shutdown = on_finish; init_remote_journaler(); } template void Replayer::shut_down(Context* on_finish) { dout(10) << dendl; std::unique_lock locker{m_lock}; ceph_assert(m_on_init_shutdown == nullptr); m_on_init_shutdown = on_finish; if (m_state == STATE_INIT) { // raced with the last piece of the init state machine return; } else if (m_state == STATE_REPLAYING) { m_state = STATE_COMPLETE; } // if shutting down due to an error notification, we don't // need to propagate the same error again m_error_code = 0; m_error_description = ""; cancel_delayed_preprocess_task(); cancel_flush_local_replay_task(); wait_for_flush(); } template void Replayer::flush(Context* on_finish) { dout(10) << dendl; flush_local_replay(new C_TrackedOp(m_in_flight_op_tracker, on_finish)); } template bool Replayer::get_replay_status(std::string* description, Context* on_finish) { dout(10) << dendl; std::unique_lock locker{m_lock}; if (m_replay_status_formatter == nullptr) { derr << "replay not running" << dendl; locker.unlock(); on_finish->complete(-EAGAIN); return false; } on_finish = new C_TrackedOp(m_in_flight_op_tracker, on_finish); return m_replay_status_formatter->get_or_send_update(description, on_finish); } template void Replayer::init_remote_journaler() { dout(10) << dendl; Context *ctx = create_context_callback< Replayer, &Replayer::handle_init_remote_journaler>(this); m_state_builder->remote_journaler->init(ctx); } template void Replayer::handle_init_remote_journaler(int r) { dout(10) << "r=" << r << dendl; std::unique_lock locker{m_lock}; if (r < 0) { derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl; handle_replay_complete(locker, r, "error initializing remote journal"); close_local_image(); return; } // listen for metadata updates to check for disconnect events ceph_assert(m_remote_listener == nullptr); m_remote_listener = new RemoteJournalerListener(this); m_state_builder->remote_journaler->add_listener(m_remote_listener); cls::journal::Client remote_client; r = m_state_builder->remote_journaler->get_cached_client(m_local_mirror_uuid, &remote_client); if (r < 0) { derr << "error retrieving remote journal client: " << cpp_strerror(r) << dendl; handle_replay_complete(locker, r, "error retrieving remote journal client"); close_local_image(); return; } std::string error; r = validate_remote_client_state(remote_client, &m_state_builder->remote_client_meta, &m_resync_requested, &error); if (r < 0) { handle_replay_complete(locker, r, error); close_local_image(); return; } start_external_replay(locker); } template void Replayer::start_external_replay(std::unique_lock& locker) { dout(10) << dendl; auto local_image_ctx = m_state_builder->local_image_ctx; std::shared_lock local_image_locker{local_image_ctx->image_lock}; ceph_assert(m_local_journal == nullptr); m_local_journal = local_image_ctx->journal; if (m_local_journal == nullptr) { local_image_locker.unlock(); derr << "local image journal closed" << dendl; handle_replay_complete(locker, -EINVAL, "error accessing local journal"); close_local_image(); return; } // safe to hold pointer to journal after external playback starts Context *start_ctx = create_context_callback< Replayer, &Replayer::handle_start_external_replay>(this); m_local_journal->start_external_replay(&m_local_journal_replay, start_ctx); } template void Replayer::handle_start_external_replay(int r) { dout(10) << "r=" << r << dendl; std::unique_lock locker{m_lock}; if (r < 0) { ceph_assert(m_local_journal_replay == nullptr); derr << "error starting external replay on local image " << m_state_builder->local_image_ctx->id << ": " << cpp_strerror(r) << dendl; handle_replay_complete(locker, r, "error starting replay on local image"); close_local_image(); return; } if (!notify_init_complete(locker)) { return; } m_state = STATE_REPLAYING; // check for resync/promotion state after adding listener if (!add_local_journal_listener(locker)) { return; } // start remote journal replay m_event_preprocessor = EventPreprocessor::create( *m_state_builder->local_image_ctx, *m_state_builder->remote_journaler, m_local_mirror_uuid, &m_state_builder->remote_client_meta, m_threads->work_queue); m_replay_status_formatter = ReplayStatusFormatter::create( m_state_builder->remote_journaler, m_local_mirror_uuid); auto cct = static_cast(m_state_builder->local_image_ctx->cct); double poll_seconds = cct->_conf.get_val( "rbd_mirror_journal_poll_age"); m_remote_replay_handler = new RemoteReplayHandler(this); m_state_builder->remote_journaler->start_live_replay(m_remote_replay_handler, poll_seconds); notify_status_updated(); } template bool Replayer::add_local_journal_listener( std::unique_lock& locker) { dout(10) << dendl; // listen for promotion and resync requests against local journal ceph_assert(m_local_journal_listener == nullptr); m_local_journal_listener = new LocalJournalListener(this); m_local_journal->add_listener(m_local_journal_listener); // verify that the local image wasn't force-promoted and that a resync hasn't // been requested now that we are listening for events if (m_local_journal->is_tag_owner()) { dout(10) << "local image force-promoted" << dendl; handle_replay_complete(locker, 0, "force promoted"); return false; } bool resync_requested = false; int r = m_local_journal->is_resync_requested(&resync_requested); if (r < 0) { dout(10) << "failed to determine resync state: " << cpp_strerror(r) << dendl; handle_replay_complete(locker, r, "error parsing resync state"); return false; } else if (resync_requested) { dout(10) << "local image resync requested" << dendl; handle_replay_complete(locker, 0, "resync requested"); return false; } return true; } template bool Replayer::notify_init_complete(std::unique_lock& locker) { dout(10) << dendl; ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); ceph_assert(m_state == STATE_INIT); // notify that init has completed Context *on_finish = nullptr; std::swap(m_on_init_shutdown, on_finish); locker.unlock(); on_finish->complete(0); locker.lock(); if (m_on_init_shutdown != nullptr) { // shut down requested after we notified init complete but before we // grabbed the lock close_local_image(); return false; } return true; } template void Replayer::wait_for_flush() { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); // ensure that we don't have two concurrent local journal replay shut downs dout(10) << dendl; auto ctx = create_async_context_callback( m_threads->work_queue, create_context_callback< Replayer, &Replayer::handle_wait_for_flush>(this)); m_flush_tracker.wait_for_ops(ctx); } template void Replayer::handle_wait_for_flush(int r) { dout(10) << "r=" << r << dendl; shut_down_local_journal_replay(); } template void Replayer::shut_down_local_journal_replay() { std::unique_lock locker{m_lock}; if (m_local_journal_replay == nullptr) { wait_for_event_replay(); return; } // It's required to stop the local journal replay state machine prior to // waiting for the events to complete. This is to ensure that IO is properly // flushed (it might be batched), wait for any running ops to complete, and // to cancel any ops waiting for their associated OnFinish events. dout(10) << dendl; auto ctx = create_context_callback< Replayer, &Replayer::handle_shut_down_local_journal_replay>(this); m_local_journal_replay->shut_down(true, ctx); } template void Replayer::handle_shut_down_local_journal_replay(int r) { dout(10) << "r=" << r << dendl; std::unique_lock locker{m_lock}; if (r < 0) { derr << "error shutting down journal replay: " << cpp_strerror(r) << dendl; handle_replay_error(r, "failed to shut down local journal replay"); } wait_for_event_replay(); } template void Replayer::wait_for_event_replay() { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); dout(10) << dendl; auto ctx = create_async_context_callback( m_threads->work_queue, create_context_callback< Replayer, &Replayer::handle_wait_for_event_replay>(this)); m_event_replay_tracker.wait_for_ops(ctx); } template void Replayer::handle_wait_for_event_replay(int r) { dout(10) << "r=" << r << dendl; std::unique_lock locker{m_lock}; close_local_image(); } template void Replayer::close_local_image() { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); if (m_state_builder->local_image_ctx == nullptr) { stop_remote_journaler_replay(); return; } dout(10) << dendl; if (m_local_journal_listener != nullptr) { // blocks if listener notification is in-progress m_local_journal->remove_listener(m_local_journal_listener); delete m_local_journal_listener; m_local_journal_listener = nullptr; } if (m_local_journal_replay != nullptr) { m_local_journal->stop_external_replay(); m_local_journal_replay = nullptr; } if (m_event_preprocessor != nullptr) { image_replayer::journal::EventPreprocessor::destroy( m_event_preprocessor); m_event_preprocessor = nullptr; } m_local_journal.reset(); // NOTE: it's important to ensure that the local image is fully // closed before attempting to close the remote journal in // case the remote cluster is unreachable ceph_assert(m_state_builder->local_image_ctx != nullptr); auto ctx = create_context_callback< Replayer, &Replayer::handle_close_local_image>(this); auto request = image_replayer::CloseImageRequest::create( &m_state_builder->local_image_ctx, ctx); request->send(); } template void Replayer::handle_close_local_image(int r) { dout(10) << "r=" << r << dendl; std::unique_lock locker{m_lock}; if (r < 0) { derr << "error closing local iamge: " << cpp_strerror(r) << dendl; handle_replay_error(r, "failed to close local image"); } ceph_assert(m_state_builder->local_image_ctx == nullptr); stop_remote_journaler_replay(); } template void Replayer::stop_remote_journaler_replay() { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); if (m_state_builder->remote_journaler == nullptr) { wait_for_in_flight_ops(); return; } else if (m_remote_replay_handler == nullptr) { wait_for_in_flight_ops(); return; } dout(10) << dendl; auto ctx = create_async_context_callback( m_threads->work_queue, create_context_callback< Replayer, &Replayer::handle_stop_remote_journaler_replay>(this)); m_state_builder->remote_journaler->stop_replay(ctx); } template void Replayer::handle_stop_remote_journaler_replay(int r) { dout(10) << "r=" << r << dendl; std::unique_lock locker{m_lock}; if (r < 0) { derr << "failed to stop remote journaler replay : " << cpp_strerror(r) << dendl; handle_replay_error(r, "failed to stop remote journaler replay"); } delete m_remote_replay_handler; m_remote_replay_handler = nullptr; wait_for_in_flight_ops(); } template void Replayer::wait_for_in_flight_ops() { dout(10) << dendl; if (m_remote_listener != nullptr) { m_state_builder->remote_journaler->remove_listener(m_remote_listener); delete m_remote_listener; m_remote_listener = nullptr; } auto ctx = create_async_context_callback( m_threads->work_queue, create_context_callback< Replayer, &Replayer::handle_wait_for_in_flight_ops>(this)); m_in_flight_op_tracker.wait_for_ops(ctx); } template void Replayer::handle_wait_for_in_flight_ops(int r) { dout(10) << "r=" << r << dendl; ReplayStatusFormatter::destroy(m_replay_status_formatter); m_replay_status_formatter = nullptr; Context* on_init_shutdown = nullptr; { std::unique_lock locker{m_lock}; ceph_assert(m_on_init_shutdown != nullptr); std::swap(m_on_init_shutdown, on_init_shutdown); m_state = STATE_COMPLETE; } on_init_shutdown->complete(m_error_code); } template void Replayer::handle_remote_journal_metadata_updated() { dout(20) << dendl; std::unique_lock locker{m_lock}; if (m_state != STATE_REPLAYING) { return; } cls::journal::Client remote_client; int r = m_state_builder->remote_journaler->get_cached_client( m_local_mirror_uuid, &remote_client); if (r < 0) { derr << "failed to retrieve client: " << cpp_strerror(r) << dendl; return; } librbd::journal::MirrorPeerClientMeta remote_client_meta; std::string error; r = validate_remote_client_state(remote_client, &remote_client_meta, &m_resync_requested, &error); if (r < 0) { dout(0) << "client flagged disconnected, stopping image replay" << dendl; handle_replay_complete(locker, r, error); } } template void Replayer::schedule_flush_local_replay_task() { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); std::unique_lock timer_locker{m_threads->timer_lock}; if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) { return; } dout(15) << dendl; m_flush_local_replay_task = create_async_context_callback( m_threads->work_queue, create_context_callback< Replayer, &Replayer::handle_flush_local_replay_task>(this)); m_threads->timer->add_event_after(30, m_flush_local_replay_task); } template void Replayer::cancel_flush_local_replay_task() { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); std::unique_lock timer_locker{m_threads->timer_lock}; if (m_flush_local_replay_task != nullptr) { dout(10) << dendl; m_threads->timer->cancel_event(m_flush_local_replay_task); m_flush_local_replay_task = nullptr; } } template void Replayer::handle_flush_local_replay_task(int) { dout(15) << dendl; m_in_flight_op_tracker.start_op(); auto on_finish = new LambdaContext([this](int) { std::unique_lock locker{m_lock}; { std::unique_lock timer_locker{m_threads->timer_lock}; m_flush_local_replay_task = nullptr; } notify_status_updated(); m_in_flight_op_tracker.finish_op(); }); flush_local_replay(on_finish); } template void Replayer::flush_local_replay(Context* on_flush) { std::unique_lock locker{m_lock}; if (m_state != STATE_REPLAYING) { locker.unlock(); on_flush->complete(0); return; } else if (m_local_journal_replay == nullptr) { // raced w/ a tag creation stop/start, which implies that // the replay is flushed locker.unlock(); flush_commit_position(on_flush); return; } dout(15) << dendl; auto ctx = new LambdaContext( [this, on_flush](int r) { handle_flush_local_replay(on_flush, r); }); m_local_journal_replay->flush(ctx); } template void Replayer::handle_flush_local_replay(Context* on_flush, int r) { dout(15) << "r=" << r << dendl; if (r < 0) { derr << "error flushing local replay: " << cpp_strerror(r) << dendl; on_flush->complete(r); return; } flush_commit_position(on_flush); } template void Replayer::flush_commit_position(Context* on_flush) { std::unique_lock locker{m_lock}; if (m_state != STATE_REPLAYING) { locker.unlock(); on_flush->complete(0); return; } dout(15) << dendl; auto ctx = new LambdaContext( [this, on_flush](int r) { handle_flush_commit_position(on_flush, r); }); m_state_builder->remote_journaler->flush_commit_position(ctx); } template void Replayer::handle_flush_commit_position(Context* on_flush, int r) { dout(15) << "r=" << r << dendl; if (r < 0) { derr << "error flushing remote journal commit position: " << cpp_strerror(r) << dendl; } on_flush->complete(r); } template void Replayer::handle_replay_error(int r, const std::string &error) { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); if (m_error_code == 0) { m_error_code = r; m_error_description = error; } } template bool Replayer::is_replay_complete() const { std::unique_lock locker{m_lock}; return is_replay_complete(locker); } template bool Replayer::is_replay_complete( const std::unique_lock&) const { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); return (m_state == STATE_COMPLETE); } template void Replayer::handle_replay_complete(int r, const std::string &error) { std::unique_lock locker{m_lock}; handle_replay_complete(locker, r, error); } template void Replayer::handle_replay_complete( const std::unique_lock&, int r, const std::string &error) { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); dout(10) << "r=" << r << ", error=" << error << dendl; if (r < 0) { derr << "replay encountered an error: " << cpp_strerror(r) << dendl; handle_replay_error(r, error); } if (m_state != STATE_REPLAYING) { return; } m_state = STATE_COMPLETE; notify_status_updated(); } template void Replayer::handle_replay_ready() { std::unique_lock locker{m_lock}; handle_replay_ready(locker); } template void Replayer::handle_replay_ready( std::unique_lock& locker) { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); dout(20) << dendl; if (is_replay_complete(locker)) { return; } if (!m_state_builder->remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) { dout(20) << "no entries ready for replay" << dendl; return; } // can safely drop lock once the entry is tracked m_event_replay_tracker.start_op(); locker.unlock(); dout(20) << "entry tid=" << m_replay_entry.get_commit_tid() << "tag_tid=" << m_replay_tag_tid << dendl; if (!m_replay_tag_valid || m_replay_tag.tid != m_replay_tag_tid) { // must allocate a new local journal tag prior to processing replay_flush(); return; } preprocess_entry(); } template void Replayer::replay_flush() { dout(10) << dendl; m_flush_tracker.start_op(); // shut down the replay to flush all IO and ops and create a new // replayer to handle the new tag epoch auto ctx = create_context_callback< Replayer, &Replayer::handle_replay_flush_shut_down>(this); ceph_assert(m_local_journal_replay != nullptr); m_local_journal_replay->shut_down(false, ctx); } template void Replayer::handle_replay_flush_shut_down(int r) { std::unique_lock locker{m_lock}; dout(10) << "r=" << r << dendl; ceph_assert(m_local_journal != nullptr); ceph_assert(m_local_journal_listener != nullptr); // blocks if listener notification is in-progress m_local_journal->remove_listener(m_local_journal_listener); delete m_local_journal_listener; m_local_journal_listener = nullptr; m_local_journal->stop_external_replay(); m_local_journal_replay = nullptr; m_local_journal.reset(); if (r < 0) { locker.unlock(); handle_replay_flush(r); return; } // journal might have been closed now that we stopped external replay auto local_image_ctx = m_state_builder->local_image_ctx; std::shared_lock local_image_locker{local_image_ctx->image_lock}; m_local_journal = local_image_ctx->journal; if (m_local_journal == nullptr) { local_image_locker.unlock(); locker.unlock(); derr << "local image journal closed" << dendl; handle_replay_flush(-EINVAL); return; } auto ctx = create_context_callback< Replayer, &Replayer::handle_replay_flush>(this); m_local_journal->start_external_replay(&m_local_journal_replay, ctx); } template void Replayer::handle_replay_flush(int r) { std::unique_lock locker{m_lock}; dout(10) << "r=" << r << dendl; m_flush_tracker.finish_op(); if (r < 0) { derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl; handle_replay_complete(locker, r, "replay flush encountered an error"); m_event_replay_tracker.finish_op(); return; } else if (is_replay_complete(locker)) { m_event_replay_tracker.finish_op(); return; } // check for resync/promotion state after adding listener if (!add_local_journal_listener(locker)) { m_event_replay_tracker.finish_op(); return; } locker.unlock(); get_remote_tag(); } template void Replayer::get_remote_tag() { dout(15) << "tag_tid: " << m_replay_tag_tid << dendl; Context *ctx = create_context_callback< Replayer, &Replayer::handle_get_remote_tag>(this); m_state_builder->remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx); } template void Replayer::handle_get_remote_tag(int r) { dout(15) << "r=" << r << dendl; if (r == 0) { try { auto it = m_replay_tag.data.cbegin(); decode(m_replay_tag_data, it); } catch (const buffer::error &err) { r = -EBADMSG; } } if (r < 0) { derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": " << cpp_strerror(r) << dendl; handle_replay_complete(r, "failed to retrieve remote tag"); m_event_replay_tracker.finish_op(); return; } m_replay_tag_valid = true; dout(15) << "decoded remote tag " << m_replay_tag_tid << ": " << m_replay_tag_data << dendl; allocate_local_tag(); } template void Replayer::allocate_local_tag() { dout(15) << dendl; std::string mirror_uuid = m_replay_tag_data.mirror_uuid; if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) { mirror_uuid = m_state_builder->remote_mirror_uuid; } else if (mirror_uuid == m_local_mirror_uuid) { mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID; } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) { // handle possible edge condition where daemon can failover and // the local image has already been promoted/demoted auto local_tag_data = m_local_journal->get_tag_data(); if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID && (local_tag_data.predecessor.commit_valid && local_tag_data.predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID)) { dout(15) << "skipping stale demotion event" << dendl; handle_process_entry_safe(m_replay_entry, m_replay_bytes, m_replay_start_time, 0); handle_replay_ready(); return; } else { dout(5) << "encountered image demotion: stopping" << dendl; handle_replay_complete(0, ""); } } librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor); if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) { predecessor.mirror_uuid = m_state_builder->remote_mirror_uuid; } else if (predecessor.mirror_uuid == m_local_mirror_uuid) { predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID; } dout(15) << "mirror_uuid=" << mirror_uuid << ", " << "predecessor=" << predecessor << ", " << "replay_tag_tid=" << m_replay_tag_tid << dendl; Context *ctx = create_context_callback< Replayer, &Replayer::handle_allocate_local_tag>(this); m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx); } template void Replayer::handle_allocate_local_tag(int r) { dout(15) << "r=" << r << ", " << "tag_tid=" << m_local_journal->get_tag_tid() << dendl; if (r < 0) { derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl; handle_replay_complete(r, "failed to allocate journal tag"); m_event_replay_tracker.finish_op(); return; } preprocess_entry(); } template void Replayer::preprocess_entry() { dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid() << dendl; bufferlist data = m_replay_entry.get_data(); auto it = data.cbegin(); int r = m_local_journal_replay->decode(&it, &m_event_entry); if (r < 0) { derr << "failed to decode journal event" << dendl; handle_replay_complete(r, "failed to decode journal event"); m_event_replay_tracker.finish_op(); return; } m_replay_bytes = data.length(); uint32_t delay = calculate_replay_delay( m_event_entry.timestamp, m_state_builder->local_image_ctx->mirroring_replay_delay); if (delay == 0) { handle_preprocess_entry_ready(0); return; } std::unique_lock locker{m_lock}; if (is_replay_complete(locker)) { // don't schedule a delayed replay task if a shut-down is in-progress m_event_replay_tracker.finish_op(); return; } dout(20) << "delaying replay by " << delay << " sec" << dendl; std::unique_lock timer_locker{m_threads->timer_lock}; ceph_assert(m_delayed_preprocess_task == nullptr); m_delayed_preprocess_task = create_context_callback< Replayer, &Replayer::handle_delayed_preprocess_task>(this); m_threads->timer->add_event_after(delay, m_delayed_preprocess_task); } template void Replayer::handle_delayed_preprocess_task(int r) { dout(20) << "r=" << r << dendl; ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock)); m_delayed_preprocess_task = nullptr; m_threads->work_queue->queue(create_context_callback< Replayer, &Replayer::handle_preprocess_entry_ready>(this), 0); } template void Replayer::handle_preprocess_entry_ready(int r) { dout(20) << "r=" << r << dendl; ceph_assert(r == 0); m_replay_start_time = ceph_clock_now(); if (!m_event_preprocessor->is_required(m_event_entry)) { process_entry(); return; } Context *ctx = create_context_callback< Replayer, &Replayer::handle_preprocess_entry_safe>(this); m_event_preprocessor->preprocess(&m_event_entry, ctx); } template void Replayer::handle_preprocess_entry_safe(int r) { dout(20) << "r=" << r << dendl; if (r < 0) { if (r == -ECANCELED) { handle_replay_complete(0, "lost exclusive lock"); } else { derr << "failed to preprocess journal event" << dendl; handle_replay_complete(r, "failed to preprocess journal event"); } m_event_replay_tracker.finish_op(); return; } process_entry(); } template void Replayer::process_entry() { dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid() << dendl; Context *on_ready = create_context_callback< Replayer, &Replayer::handle_process_entry_ready>(this); Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry), m_replay_bytes, m_replay_start_time); m_local_journal_replay->process(m_event_entry, on_ready, on_commit); } template void Replayer::handle_process_entry_ready(int r) { std::unique_lock locker{m_lock}; dout(20) << dendl; ceph_assert(r == 0); bool update_status = false; { auto local_image_ctx = m_state_builder->local_image_ctx; std::shared_lock image_locker{local_image_ctx->image_lock}; auto image_spec = util::compute_image_spec(local_image_ctx->md_ctx, local_image_ctx->name); if (m_image_spec != image_spec) { m_image_spec = image_spec; update_status = true; } } m_replay_status_formatter->handle_entry_processed(m_replay_bytes); if (update_status) { unregister_perf_counters(); register_perf_counters(); notify_status_updated(); } // attempt to process the next event handle_replay_ready(locker); } template void Replayer::handle_process_entry_safe( const ReplayEntry &replay_entry, uint64_t replay_bytes, const utime_t &replay_start_time, int r) { dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r << dendl; if (r < 0) { derr << "failed to commit journal event: " << cpp_strerror(r) << dendl; handle_replay_complete(r, "failed to commit journal event"); } else { ceph_assert(m_state_builder->remote_journaler != nullptr); m_state_builder->remote_journaler->committed(replay_entry); } auto latency = ceph_clock_now() - replay_start_time; if (g_journal_perf_counters) { g_journal_perf_counters->inc(l_rbd_mirror_journal_entries); g_journal_perf_counters->inc(l_rbd_mirror_journal_replay_bytes, replay_bytes); g_journal_perf_counters->tinc(l_rbd_mirror_journal_replay_latency, latency); } auto ctx = new LambdaContext( [this, replay_bytes, latency](int r) { std::unique_lock locker{m_lock}; schedule_flush_local_replay_task(); if (m_perf_counters) { m_perf_counters->inc(l_rbd_mirror_journal_entries); m_perf_counters->inc(l_rbd_mirror_journal_replay_bytes, replay_bytes); m_perf_counters->tinc(l_rbd_mirror_journal_replay_latency, latency); } m_event_replay_tracker.finish_op(); }); m_threads->work_queue->queue(ctx, 0); } template void Replayer::handle_resync_image() { dout(10) << dendl; std::unique_lock locker{m_lock}; m_resync_requested = true; handle_replay_complete(locker, 0, "resync requested"); } template void Replayer::notify_status_updated() { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); dout(10) << dendl; auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext( [this](int) { m_replayer_listener->handle_notification(); })); m_threads->work_queue->queue(ctx, 0); } template void Replayer::cancel_delayed_preprocess_task() { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); bool canceled_delayed_preprocess_task = false; { std::unique_lock timer_locker{m_threads->timer_lock}; if (m_delayed_preprocess_task != nullptr) { dout(10) << dendl; canceled_delayed_preprocess_task = m_threads->timer->cancel_event( m_delayed_preprocess_task); ceph_assert(canceled_delayed_preprocess_task); m_delayed_preprocess_task = nullptr; } } if (canceled_delayed_preprocess_task) { // wake up sleeping replay m_event_replay_tracker.finish_op(); } } template int Replayer::validate_remote_client_state( const cls::journal::Client& remote_client, librbd::journal::MirrorPeerClientMeta* remote_client_meta, bool* resync_requested, std::string* error) { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); if (!util::decode_client_meta(remote_client, remote_client_meta)) { // require operator intervention since the data is corrupt *error = "error retrieving remote journal client"; return -EBADMSG; } auto local_image_ctx = m_state_builder->local_image_ctx; dout(5) << "image_id=" << local_image_ctx->id << ", " << "remote_client_meta.image_id=" << remote_client_meta->image_id << ", " << "remote_client.state=" << remote_client.state << dendl; if (remote_client_meta->image_id == local_image_ctx->id && remote_client.state != cls::journal::CLIENT_STATE_CONNECTED) { dout(5) << "client flagged disconnected, stopping image replay" << dendl; if (local_image_ctx->config.template get_val( "rbd_mirroring_resync_after_disconnect")) { dout(10) << "disconnected: automatic resync" << dendl; *resync_requested = true; *error = "disconnected: automatic resync"; return -ENOTCONN; } else { dout(10) << "disconnected" << dendl; *error = "disconnected"; return -ENOTCONN; } } return 0; } template void Replayer::register_perf_counters() { dout(5) << dendl; ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); ceph_assert(m_perf_counters == nullptr); auto cct = static_cast(m_state_builder->local_image_ctx->cct); auto prio = cct->_conf.get_val("rbd_mirror_image_perf_stats_prio"); auto local_image_ctx = m_state_builder->local_image_ctx; std::string labels = ceph::perf_counters::key_create( "rbd_mirror_journal_image", {{"pool", local_image_ctx->md_ctx.get_pool_name()}, {"namespace", local_image_ctx->md_ctx.get_namespace()}, {"image", local_image_ctx->name}}); PerfCountersBuilder plb(g_ceph_context, labels, l_rbd_mirror_journal_first, l_rbd_mirror_journal_last); plb.add_u64_counter(l_rbd_mirror_journal_entries, "entries", "Number of entries replayed", nullptr, prio); plb.add_u64_counter(l_rbd_mirror_journal_replay_bytes, "replay_bytes", "Total bytes replayed", nullptr, prio, unit_t(UNIT_BYTES)); plb.add_time_avg(l_rbd_mirror_journal_replay_latency, "replay_latency", "Replay latency", nullptr, prio); m_perf_counters = plb.create_perf_counters(); g_ceph_context->get_perfcounters_collection()->add(m_perf_counters); } template void Replayer::unregister_perf_counters() { dout(5) << dendl; ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); PerfCounters *perf_counters = nullptr; std::swap(perf_counters, m_perf_counters); if (perf_counters != nullptr) { g_ceph_context->get_perfcounters_collection()->remove(perf_counters); delete perf_counters; } } } // namespace journal } // namespace image_replayer } // namespace mirror } // namespace rbd template class rbd::mirror::image_replayer::journal::Replayer;