diff options
Diffstat (limited to 'src/tools/rbd_mirror/image_replayer/journal/Replayer.h')
-rw-r--r-- | src/tools/rbd_mirror/image_replayer/journal/Replayer.h | 323 |
1 files changed, 323 insertions, 0 deletions
diff --git a/src/tools/rbd_mirror/image_replayer/journal/Replayer.h b/src/tools/rbd_mirror/image_replayer/journal/Replayer.h new file mode 100644 index 000000000..6b1f36d9c --- /dev/null +++ b/src/tools/rbd_mirror/image_replayer/journal/Replayer.h @@ -0,0 +1,323 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H +#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H + +#include "tools/rbd_mirror/image_replayer/Replayer.h" +#include "include/utime.h" +#include "common/AsyncOpTracker.h" +#include "common/ceph_mutex.h" +#include "common/RefCountedObj.h" +#include "cls/journal/cls_journal_types.h" +#include "journal/ReplayEntry.h" +#include "librbd/ImageCtx.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" +#include <string> +#include <type_traits> + +namespace journal { class Journaler; } +namespace librbd { + +struct ImageCtx; +namespace journal { template <typename I> class Replay; } + +} // namespace librbd + +namespace rbd { +namespace mirror { + +template <typename> struct Threads; + +namespace image_replayer { + +struct ReplayerListener; + +namespace journal { + +template <typename> class EventPreprocessor; +template <typename> class ReplayStatusFormatter; +template <typename> class StateBuilder; + +template <typename ImageCtxT> +class Replayer : public image_replayer::Replayer { +public: + typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler; + + static Replayer* create( + Threads<ImageCtxT>* threads, + const std::string& local_mirror_uuid, + StateBuilder<ImageCtxT>* state_builder, + ReplayerListener* replayer_listener) { + return new Replayer(threads, local_mirror_uuid, state_builder, + replayer_listener); + } + + Replayer( + Threads<ImageCtxT>* threads, + const std::string& local_mirror_uuid, + StateBuilder<ImageCtxT>* state_builder, + ReplayerListener* replayer_listener); + ~Replayer(); + + void destroy() override { + delete this; + } + + void init(Context* on_finish) override; + void shut_down(Context* on_finish) override; + + void flush(Context* on_finish) override; + + bool get_replay_status(std::string* description, Context* on_finish) override; + + bool is_replaying() const override { + std::unique_lock locker{m_lock}; + return (m_state == STATE_REPLAYING); + } + + bool is_resync_requested() const override { + std::unique_lock locker(m_lock); + return m_resync_requested; + } + + int get_error_code() const override { + std::unique_lock locker(m_lock); + return m_error_code; + } + + std::string get_error_description() const override { + std::unique_lock locker(m_lock); + return m_error_description; + } + + std::string get_image_spec() const { + std::unique_lock locker(m_lock); + return m_image_spec; + } + +private: + /** + * @verbatim + * + * <init> + * | + * v (error) + * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * * * + * | * + * v (error) * + * START_EXTERNAL_REPLAY * * * * * * * * * * * * * * * * * * * + * | * + * | /--------------------------------------------\ * + * | | | * + * v v (asok flush) | * + * REPLAYING -------------> LOCAL_REPLAY_FLUSH | * + * | \ | | * + * | | v | * + * | | FLUSH_COMMIT_POSITION | * + * | | | | * + * | | \--------------------/| * + * | | | * + * | | (entries available) | * + * | \-----------> REPLAY_READY | * + * | | | * + * | | (skip if not | * + * | v needed) (error) * + * | REPLAY_FLUSH * * * * * * * * * * + * | | | * * + * | | (skip if not | * * + * | v needed) (error) * * + * | GET_REMOTE_TAG * * * * * * * * * + * | | | * * + * | | (skip if not | * * + * | v needed) (error) * * + * | ALLOCATE_LOCAL_TAG * * * * * * * + * | | | * * + * | v (error) * * + * | PREPROCESS_ENTRY * * * * * * * * + * | | | * * + * | v (error) * * + * | PROCESS_ENTRY * * * * * * * * * * + * | | | * * + * | \---------------------/ * * + * v (shutdown) * * + * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * * + * | * + * v * + * WAIT_FOR_FLUSH * + * | * + * v * + * SHUT_DOWN_LOCAL_JOURNAL_REPLAY * + * | * + * v * + * WAIT_FOR_REPLAY * + * | * + * v * + * CLOSE_LOCAL_IMAGE < * * * * * * * * * * * * * * * * * * * * + * | + * v (skip if not started) + * STOP_REMOTE_JOURNALER_REPLAY + * | + * v + * WAIT_FOR_IN_FLIGHT_OPS + * | + * v + * <shutdown> + * + * @endverbatim + */ + + typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry; + + enum State { + STATE_INIT, + STATE_REPLAYING, + STATE_COMPLETE + }; + + struct C_ReplayCommitted; + struct RemoteJournalerListener; + struct RemoteReplayHandler; + struct LocalJournalListener; + + Threads<ImageCtxT>* m_threads; + std::string m_local_mirror_uuid; + StateBuilder<ImageCtxT>* m_state_builder; + ReplayerListener* m_replayer_listener; + + mutable ceph::mutex m_lock; + + std::string m_image_spec; + Context* m_on_init_shutdown = nullptr; + + State m_state = STATE_INIT; + int m_error_code = 0; + std::string m_error_description; + bool m_resync_requested = false; + + ceph::ref_t<typename std::remove_pointer<decltype(ImageCtxT::journal)>::type> + m_local_journal; + RemoteJournalerListener* m_remote_listener = nullptr; + + librbd::journal::Replay<ImageCtxT>* m_local_journal_replay = nullptr; + EventPreprocessor<ImageCtxT>* m_event_preprocessor = nullptr; + ReplayStatusFormatter<ImageCtxT>* m_replay_status_formatter = nullptr; + RemoteReplayHandler* m_remote_replay_handler = nullptr; + LocalJournalListener* m_local_journal_listener = nullptr; + + PerfCounters *m_perf_counters = nullptr; + + ReplayEntry m_replay_entry; + uint64_t m_replay_bytes = 0; + utime_t m_replay_start_time; + bool m_replay_tag_valid = false; + uint64_t m_replay_tag_tid = 0; + cls::journal::Tag m_replay_tag; + librbd::journal::TagData m_replay_tag_data; + librbd::journal::EventEntry m_event_entry; + + AsyncOpTracker m_flush_tracker; + + AsyncOpTracker m_event_replay_tracker; + Context *m_delayed_preprocess_task = nullptr; + + AsyncOpTracker m_in_flight_op_tracker; + Context *m_flush_local_replay_task = nullptr; + + void handle_remote_journal_metadata_updated(); + + void schedule_flush_local_replay_task(); + void cancel_flush_local_replay_task(); + void handle_flush_local_replay_task(int r); + + void flush_local_replay(Context* on_flush); + void handle_flush_local_replay(Context* on_flush, int r); + + void flush_commit_position(Context* on_flush); + void handle_flush_commit_position(Context* on_flush, int r); + + void init_remote_journaler(); + void handle_init_remote_journaler(int r); + + void start_external_replay(std::unique_lock<ceph::mutex>& locker); + void handle_start_external_replay(int r); + + bool add_local_journal_listener(std::unique_lock<ceph::mutex>& locker); + + bool notify_init_complete(std::unique_lock<ceph::mutex>& locker); + + void wait_for_flush(); + void handle_wait_for_flush(int r); + + void shut_down_local_journal_replay(); + void handle_shut_down_local_journal_replay(int r); + + void wait_for_event_replay(); + void handle_wait_for_event_replay(int r); + + void close_local_image(); + void handle_close_local_image(int r); + + void stop_remote_journaler_replay(); + void handle_stop_remote_journaler_replay(int r); + + void wait_for_in_flight_ops(); + void handle_wait_for_in_flight_ops(int r); + + void replay_flush(); + void handle_replay_flush_shut_down(int r); + void handle_replay_flush(int r); + + void get_remote_tag(); + void handle_get_remote_tag(int r); + + void allocate_local_tag(); + void handle_allocate_local_tag(int r); + + void handle_replay_error(int r, const std::string &error); + + bool is_replay_complete() const; + bool is_replay_complete(const std::unique_lock<ceph::mutex>& locker) const; + + void handle_replay_complete(int r, const std::string &error_desc); + void handle_replay_complete(const std::unique_lock<ceph::mutex>&, + int r, const std::string &error_desc); + void handle_replay_ready(); + void handle_replay_ready(std::unique_lock<ceph::mutex>& locker); + + void preprocess_entry(); + void handle_delayed_preprocess_task(int r); + void handle_preprocess_entry_ready(int r); + void handle_preprocess_entry_safe(int r); + + void process_entry(); + void handle_process_entry_ready(int r); + void handle_process_entry_safe(const ReplayEntry& replay_entry, + uint64_t relay_bytes, + const utime_t &replay_start_time, int r); + + void handle_resync_image(); + + void notify_status_updated(); + + void cancel_delayed_preprocess_task(); + + int validate_remote_client_state( + const cls::journal::Client& remote_client, + librbd::journal::MirrorPeerClientMeta* remote_client_meta, + bool* resync_requested, std::string* error); + + void register_perf_counters(); + void unregister_perf_counters(); + +}; + +} // namespace journal +} // namespace image_replayer +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>; + +#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H |