// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H #define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H #include "tools/rbd_mirror/image_replayer/Replayer.h" #include "include/utime.h" #include "common/AsyncOpTracker.h" #include "common/ceph_mutex.h" #include "common/RefCountedObj.h" #include "cls/journal/cls_journal_types.h" #include "journal/ReplayEntry.h" #include "librbd/ImageCtx.h" #include "librbd/journal/Types.h" #include "librbd/journal/TypeTraits.h" #include #include namespace journal { class Journaler; } namespace librbd { struct ImageCtx; namespace journal { template class Replay; } } // namespace librbd namespace rbd { namespace mirror { template struct Threads; namespace image_replayer { struct ReplayerListener; namespace journal { template class EventPreprocessor; template class ReplayStatusFormatter; template class StateBuilder; template class Replayer : public image_replayer::Replayer { public: typedef typename librbd::journal::TypeTraits::Journaler Journaler; static Replayer* create( Threads* threads, const std::string& local_mirror_uuid, StateBuilder* state_builder, ReplayerListener* replayer_listener) { return new Replayer(threads, local_mirror_uuid, state_builder, replayer_listener); } Replayer( Threads* threads, const std::string& local_mirror_uuid, StateBuilder* state_builder, ReplayerListener* replayer_listener); ~Replayer(); void destroy() override { delete this; } void init(Context* on_finish) override; void shut_down(Context* on_finish) override; void flush(Context* on_finish) override; bool get_replay_status(std::string* description, Context* on_finish) override; bool is_replaying() const override { std::unique_lock locker{m_lock}; return (m_state == STATE_REPLAYING); } bool is_resync_requested() const override { std::unique_lock locker(m_lock); return m_resync_requested; } int get_error_code() const override { std::unique_lock locker(m_lock); return m_error_code; } std::string get_error_description() const override { std::unique_lock locker(m_lock); return m_error_description; } std::string get_image_spec() const { std::unique_lock locker(m_lock); return m_image_spec; } private: /** * @verbatim * * * | * v (error) * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * * * * | * * v (error) * * START_EXTERNAL_REPLAY * * * * * * * * * * * * * * * * * * * * | * * | /--------------------------------------------\ * * | | | * * v v (asok flush) | * * REPLAYING -------------> LOCAL_REPLAY_FLUSH | * * | \ | | * * | | v | * * | | FLUSH_COMMIT_POSITION | * * | | | | * * | | \--------------------/| * * | | | * * | | (entries available) | * * | \-----------> REPLAY_READY | * * | | | * * | | (skip if not | * * | v needed) (error) * * | REPLAY_FLUSH * * * * * * * * * * * | | | * * * | | (skip if not | * * * | v needed) (error) * * * | GET_REMOTE_TAG * * * * * * * * * * | | | * * * | | (skip if not | * * * | v needed) (error) * * * | ALLOCATE_LOCAL_TAG * * * * * * * * | | | * * * | v (error) * * * | PREPROCESS_ENTRY * * * * * * * * * | | | * * * | v (error) * * * | PROCESS_ENTRY * * * * * * * * * * * | | | * * * | \---------------------/ * * * v (shutdown) * * * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * * * | * * v * * WAIT_FOR_FLUSH * * | * * v * * SHUT_DOWN_LOCAL_JOURNAL_REPLAY * * | * * v * * WAIT_FOR_REPLAY * * | * * v * * CLOSE_LOCAL_IMAGE < * * * * * * * * * * * * * * * * * * * * * | * v (skip if not started) * STOP_REMOTE_JOURNALER_REPLAY * | * v * WAIT_FOR_IN_FLIGHT_OPS * | * v * * * @endverbatim */ typedef typename librbd::journal::TypeTraits::ReplayEntry ReplayEntry; enum State { STATE_INIT, STATE_REPLAYING, STATE_COMPLETE }; struct C_ReplayCommitted; struct RemoteJournalerListener; struct RemoteReplayHandler; struct LocalJournalListener; Threads* m_threads; std::string m_local_mirror_uuid; StateBuilder* m_state_builder; ReplayerListener* m_replayer_listener; mutable ceph::mutex m_lock; std::string m_image_spec; Context* m_on_init_shutdown = nullptr; State m_state = STATE_INIT; int m_error_code = 0; std::string m_error_description; bool m_resync_requested = false; ceph::ref_t::type> m_local_journal; RemoteJournalerListener* m_remote_listener = nullptr; librbd::journal::Replay* m_local_journal_replay = nullptr; EventPreprocessor* m_event_preprocessor = nullptr; ReplayStatusFormatter* m_replay_status_formatter = nullptr; RemoteReplayHandler* m_remote_replay_handler = nullptr; LocalJournalListener* m_local_journal_listener = nullptr; PerfCounters *m_perf_counters = nullptr; ReplayEntry m_replay_entry; uint64_t m_replay_bytes = 0; utime_t m_replay_start_time; bool m_replay_tag_valid = false; uint64_t m_replay_tag_tid = 0; cls::journal::Tag m_replay_tag; librbd::journal::TagData m_replay_tag_data; librbd::journal::EventEntry m_event_entry; AsyncOpTracker m_flush_tracker; AsyncOpTracker m_event_replay_tracker; Context *m_delayed_preprocess_task = nullptr; AsyncOpTracker m_in_flight_op_tracker; Context *m_flush_local_replay_task = nullptr; void handle_remote_journal_metadata_updated(); void schedule_flush_local_replay_task(); void cancel_flush_local_replay_task(); void handle_flush_local_replay_task(int r); void flush_local_replay(Context* on_flush); void handle_flush_local_replay(Context* on_flush, int r); void flush_commit_position(Context* on_flush); void handle_flush_commit_position(Context* on_flush, int r); void init_remote_journaler(); void handle_init_remote_journaler(int r); void start_external_replay(std::unique_lock& locker); void handle_start_external_replay(int r); bool add_local_journal_listener(std::unique_lock& locker); bool notify_init_complete(std::unique_lock& locker); void wait_for_flush(); void handle_wait_for_flush(int r); void shut_down_local_journal_replay(); void handle_shut_down_local_journal_replay(int r); void wait_for_event_replay(); void handle_wait_for_event_replay(int r); void close_local_image(); void handle_close_local_image(int r); void stop_remote_journaler_replay(); void handle_stop_remote_journaler_replay(int r); void wait_for_in_flight_ops(); void handle_wait_for_in_flight_ops(int r); void replay_flush(); void handle_replay_flush_shut_down(int r); void handle_replay_flush(int r); void get_remote_tag(); void handle_get_remote_tag(int r); void allocate_local_tag(); void handle_allocate_local_tag(int r); void handle_replay_error(int r, const std::string &error); bool is_replay_complete() const; bool is_replay_complete(const std::unique_lock& locker) const; void handle_replay_complete(int r, const std::string &error_desc); void handle_replay_complete(const std::unique_lock&, int r, const std::string &error_desc); void handle_replay_ready(); void handle_replay_ready(std::unique_lock& locker); void preprocess_entry(); void handle_delayed_preprocess_task(int r); void handle_preprocess_entry_ready(int r); void handle_preprocess_entry_safe(int r); void process_entry(); void handle_process_entry_ready(int r); void handle_process_entry_safe(const ReplayEntry& replay_entry, uint64_t relay_bytes, const utime_t &replay_start_time, int r); void handle_resync_image(); void notify_status_updated(); void cancel_delayed_preprocess_task(); int validate_remote_client_state( const cls::journal::Client& remote_client, librbd::journal::MirrorPeerClientMeta* remote_client_meta, bool* resync_requested, std::string* error); void register_perf_counters(); void unregister_perf_counters(); }; } // namespace journal } // namespace image_replayer } // namespace mirror } // namespace rbd extern template class rbd::mirror::image_replayer::journal::Replayer; #endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H