summaryrefslogtreecommitdiffstats
path: root/src/tools/rbd_mirror/ImageReplayer.cc
diff options
context:
space:
mode:
author Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-07 18:45:59 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-07 18:45:59 +0000
commit 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/tools/rbd_mirror/ImageReplayer.cc
parent Initial commit. (diff)
downloadceph-upstream/16.2.11+ds.tar.xz
ceph-upstream/16.2.11+ds.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tools/rbd_mirror/ImageReplayer.cc')
-rw-r--r--src/tools/rbd_mirror/ImageReplayer.cc1190
1 files changed, 1190 insertions, 0 deletions
diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc
new file mode 100644
index 000000000..ee22b8d34
--- /dev/null
+++ b/src/tools/rbd_mirror/ImageReplayer.cc
@@ -0,0 +1,1190 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/compat.h"
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "include/stringify.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "common/Timer.h"
+#include "global/global_context.h"
+#include "journal/Journaler.h"
+#include "librbd/ExclusiveLock.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "librbd/Journal.h"
+#include "librbd/Operations.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/ContextWQ.h"
+#include "ImageDeleter.h"
+#include "ImageReplayer.h"
+#include "MirrorStatusUpdater.h"
+#include "Threads.h"
+#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
+#include "tools/rbd_mirror/image_replayer/ReplayerListener.h"
+#include "tools/rbd_mirror/image_replayer/StateBuilder.h"
+#include "tools/rbd_mirror/image_replayer/Utils.h"
+#include "tools/rbd_mirror/image_replayer/journal/Replayer.h"
+#include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h"
+#include <map>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
+ << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+using librbd::util::create_context_callback;
+
+template <typename I>
+std::ostream &operator<<(std::ostream &os,
+ const typename ImageReplayer<I>::State &state);
+
+namespace {
+
+// Abstract base for one admin-socket command bound to an ImageReplayer.
+// Concrete commands implement call(); the hook that stores them reads
+// `desc` when registering and sets `registered` once that succeeds.
+template <typename I>
+class ImageReplayerAdminSocketCommand {
+public:
+  ImageReplayerAdminSocketCommand(const std::string &desc,
+                                  ImageReplayer<I> *replayer)
+    : desc(desc),
+      replayer(replayer) {}
+
+  virtual ~ImageReplayerAdminSocketCommand() = default;
+
+  // Executes the command; `f` is the formatter supplied by the caller.
+  virtual int call(Formatter *f) = 0;
+
+  std::string desc;            // help text handed to register_command()
+  ImageReplayer<I> *replayer;  // replayer the command acts on
+  bool registered = false;     // flipped by the hook after registration
+};
+
+// Admin-socket command: dump the replayer status through the formatter.
+template <typename I>
+class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
+  using Base = ImageReplayerAdminSocketCommand<I>;
+
+public:
+  explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
+    : Base(desc, replayer) {}
+
+  int call(Formatter *f) override {
+    auto *target = this->replayer;
+    target->print_status(f);
+    return 0;
+  }
+};
+
+// Admin-socket command: manually start the replayer (no completion context).
+template <typename I>
+class StartCommand : public ImageReplayerAdminSocketCommand<I> {
+  using Base = ImageReplayerAdminSocketCommand<I>;
+
+public:
+  explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
+    : Base(desc, replayer) {}
+
+  int call(Formatter *f) override {
+    auto *target = this->replayer;
+    target->start(nullptr, true);  // manual=true
+    return 0;
+  }
+};
+
+// Admin-socket command: manually stop the replayer (no completion context).
+template <typename I>
+class StopCommand : public ImageReplayerAdminSocketCommand<I> {
+  using Base = ImageReplayerAdminSocketCommand<I>;
+
+public:
+  explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
+    : Base(desc, replayer) {}
+
+  int call(Formatter *f) override {
+    auto *target = this->replayer;
+    target->stop(nullptr, true);  // manual=true
+    return 0;
+  }
+};
+
+// Admin-socket command: stop the replayer and start it again.
+template <typename I>
+class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
+  using Base = ImageReplayerAdminSocketCommand<I>;
+
+public:
+  explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
+    : Base(desc, replayer) {}
+
+  int call(Formatter *f) override {
+    auto *target = this->replayer;
+    target->restart();
+    return 0;
+  }
+};
+
+// Admin-socket command: flush the replayer.
+template <typename I>
+class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
+  using Base = ImageReplayerAdminSocketCommand<I>;
+
+public:
+  explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
+    : Base(desc, replayer) {}
+
+  int call(Formatter *f) override {
+    auto *target = this->replayer;
+    target->flush();
+    return 0;
+  }
+};
+
+// Admin-socket hook exposing the flush / restart / start / status / stop
+// commands of one ImageReplayer under the names "rbd mirror <verb> <name>".
+//
+// The hook allocates its command objects in the constructor and deletes them
+// in the destructor. Because it owns raw pointers, copying is disabled: a
+// copy would delete every command twice.
+template <typename I>
+class ImageReplayerAdminSocketHook : public AdminSocketHook {
+public:
+  ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
+                               ImageReplayer<I> *replayer)
+    : admin_socket(cct->get_admin_socket()),
+      commands{{"rbd mirror flush " + name,
+                new FlushCommand<I>("flush rbd mirror " + name, replayer)},
+               {"rbd mirror restart " + name,
+                new RestartCommand<I>("restart rbd mirror " + name, replayer)},
+               {"rbd mirror start " + name,
+                new StartCommand<I>("start rbd mirror " + name, replayer)},
+               {"rbd mirror status " + name,
+                new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
+               {"rbd mirror stop " + name,
+                new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
+  }
+
+  ImageReplayerAdminSocketHook(const ImageReplayerAdminSocketHook&) = delete;
+  ImageReplayerAdminSocketHook& operator=(
+    const ImageReplayerAdminSocketHook&) = delete;
+
+  // Registers every command with the admin socket.
+  // Returns 0 on success, or the first negative result of register_command();
+  // commands registered before the failure stay registered until the hook is
+  // destroyed.
+  int register_commands() {
+    for (auto &it : commands) {
+      int r = admin_socket->register_command(it.first, this,
+                                             it.second->desc);
+      if (r < 0) {
+        return r;
+      }
+      it.second->registered = true;
+    }
+    return 0;
+  }
+
+  // Unregisters this hook from the admin socket and frees the commands.
+  ~ImageReplayerAdminSocketHook() override {
+    admin_socket->unregister_commands(this);
+    for (auto &it : commands) {
+      delete it.second;
+    }
+    commands.clear();
+  }
+
+  // Dispatches `command` to the matching command object. The command must be
+  // one of the names built in the constructor (asserted).
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+           Formatter *f,
+           std::ostream& errss,
+           bufferlist& out) override {
+    auto i = commands.find(command);
+    ceph_assert(i != commands.end());
+    return i->second->call(f);
+  }
+
+private:
+  // std::less<> enables lookup by std::string_view without a temporary string
+  typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
+                   std::less<>> Commands;
+
+  AdminSocket *admin_socket;
+  Commands commands;
+};
+
+} // anonymous namespace
+
+// Records a "bootstrapping, <description>" state description on the owning
+// replayer and, when `flush` is set, requests a non-forced status update.
+template <typename I>
+void ImageReplayer<I>::BootstrapProgressContext::update_progress(
+  const std::string &description, bool flush)
+{
+  replayer->set_state_description(0, "bootstrapping, " + description);
+  if (!flush) {
+    return;
+  }
+  replayer->update_mirror_image_status(false, boost::none);
+}
+
+// Listener handed to the replayer created in start_replay(); it forwards
+// every notification to ImageReplayer::handle_replayer_notification().
+template <typename I>
+struct ImageReplayer<I>::ReplayerListener
+  : public image_replayer::ReplayerListener {
+  ImageReplayer<I>* image_replayer;
+
+  // explicit: an ImageReplayer pointer must not convert to a listener
+  // implicitly.
+  explicit ReplayerListener(ImageReplayer<I>* image_replayer)
+    : image_replayer(image_replayer) {
+  }
+
+  void handle_notification() override {
+    image_replayer->handle_replayer_notification();
+  }
+};
+
+// Stores the collaborators, names the mutex after the local pool id and the
+// global image id, and registers the admin-socket commands.
+template <typename I>
+ImageReplayer<I>::ImageReplayer(
+    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
+    const std::string &global_image_id, Threads<I> *threads,
+    InstanceWatcher<I> *instance_watcher,
+    MirrorStatusUpdater<I>* local_status_updater,
+    journal::CacheManagerHandler *cache_manager_handler,
+    PoolMetaCache* pool_meta_cache)
+  : m_local_io_ctx(local_io_ctx),
+    m_local_mirror_uuid(local_mirror_uuid),
+    m_global_image_id(global_image_id),
+    m_threads(threads),
+    m_instance_watcher(instance_watcher),
+    m_local_status_updater(local_status_updater),
+    m_cache_manager_handler(cache_manager_handler),
+    m_pool_meta_cache(pool_meta_cache),
+    m_local_image_name(global_image_id),
+    m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
+                            stringify(local_io_ctx.get_id()) + " " +
+                            global_image_id)),
+    m_progress_cxt(this),
+    m_replayer_listener(new ReplayerListener(this))
+{
+  // The image name is not known yet, so the asok commands are first
+  // registered under a temporary spec derived from the global image id.
+  // They are re-registered under the real image name once it becomes known
+  // on start.
+  m_image_spec = image_replayer::util::compute_image_spec(local_io_ctx,
+                                                          global_image_id);
+  register_admin_socket_hook();
+}
+
+// Drops the admin-socket hook, verifies that nothing from a start/stop cycle
+// is still outstanding, and releases the replayer listener.
+template <typename I>
+ImageReplayer<I>::~ImageReplayer()
+{
+  unregister_admin_socket_hook();
+
+  // the replayer must have been fully stopped before destruction
+  ceph_assert(m_state_builder == nullptr);
+  ceph_assert(m_on_start_finish == nullptr);
+  ceph_assert(m_on_stop_contexts.empty());
+  ceph_assert(m_bootstrap_request == nullptr);
+  ceph_assert(m_update_status_task == nullptr);
+
+  delete m_replayer_listener;
+}
+
+// Maps the last recorded mirror image status state to a health state:
+// no recorded state -> OK, SYNCING or UNKNOWN -> WARNING, anything else ->
+// ERROR.
+template <typename I>
+image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
+  std::lock_guard locker{m_lock};
+
+  if (!m_mirror_image_status_state) {
+    return image_replayer::HEALTH_STATE_OK;
+  }
+
+  const auto &status_state = *m_mirror_image_status_state;
+  const bool warning =
+    (status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
+     status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
+  return warning ? image_replayer::HEALTH_STATE_WARNING
+                 : image_replayer::HEALTH_STATE_ERROR;
+}
+
+// Adds `peer` to the peer collection unless it is already present.
+template <typename I>
+void ImageReplayer<I>::add_peer(const Peer<I>& peer) {
+  dout(10) << "peer=" << peer << dendl;
+
+  std::lock_guard locker{m_lock};
+  if (m_peers.find(peer) != m_peers.end()) {
+    // already known
+    return;
+  }
+  m_peers.insert(peer);
+}
+
+// Stores the result code and description that later status updates report.
+template <typename I>
+void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
+  dout(10) << "r=" << r << ", desc=" << desc << dendl;
+
+  std::lock_guard locker{m_lock};
+  m_state_desc = desc;
+  m_last_r = r;
+}
+
+// Moves a stopped replayer to STATE_STARTING and kicks off bootstrap().
+//
+// The request is rejected, completing `on_finish` (if any) with the error:
+//   -EINVAL    the replayer is not stopped
+//   -EPERM     it was stopped manually and this start is not manual
+//   -ECANCELED this is the start half of a restart that was canceled
+// On acceptance `on_finish` is stored in m_on_start_finish.
+template <typename I>
+void ImageReplayer<I>::start(Context *on_finish, bool manual, bool restart)
+{
+  dout(10) << "on_finish=" << on_finish << dendl;
+
+  int r = 0;
+  {
+    std::lock_guard locker{m_lock};
+    if (!is_stopped_()) {
+      derr << "already running" << dendl;
+      r = -EINVAL;
+    } else if (m_manual_stop && !manual) {
+      dout(5) << "stopped manually, ignoring start without manual flag"
+              << dendl;
+      r = -EPERM;
+    } else if (restart && !m_restart_requested) {
+      dout(10) << "canceled restart" << dendl;
+      r = -ECANCELED;
+    } else {
+      // accepted: reset the per-run bookkeeping
+      m_state = STATE_STARTING;
+      m_last_r = 0;
+      m_state_desc.clear();
+      m_manual_stop = false;
+      m_delete_requested = false;
+      m_restart_requested = false;
+      m_status_removed = false;
+
+      if (on_finish != nullptr) {
+        ceph_assert(m_on_start_finish == nullptr);
+        m_on_start_finish = on_finish;
+      }
+      ceph_assert(m_on_stop_contexts.empty());
+    }
+  }
+
+  if (r >= 0) {
+    bootstrap();
+    return;
+  }
+
+  // rejected: report the error straight to the caller
+  if (on_finish) {
+    on_finish->complete(r);
+  }
+}
+
+// Selects the first known peer and sends a BootstrapRequest against it.
+// Fails the start with -ENOENT when no peer is known, and aborts if a stop
+// was requested in the meantime. handle_bootstrap() receives the result.
+template <typename I>
+void ImageReplayer<I>::bootstrap() {
+  dout(10) << dendl;
+
+  std::unique_lock locker{m_lock};
+  const bool no_peers = m_peers.empty();
+  if (no_peers) {
+    locker.unlock();
+
+    dout(5) << "no peer clusters" << dendl;
+    on_start_fail(-ENOENT, "no peer clusters");
+    return;
+  }
+
+  // TODO need to support multiple remote images
+  ceph_assert(!m_peers.empty());
+  m_remote_image_peer = *m_peers.begin();
+
+  if (on_start_interrupted(m_lock)) {
+    return;
+  }
+
+  ceph_assert(m_state_builder == nullptr);
+  auto on_bootstrap = create_context_callback<
+    ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
+  auto bootstrap_req = image_replayer::BootstrapRequest<I>::create(
+    m_threads, m_local_io_ctx, m_remote_image_peer.io_ctx, m_instance_watcher,
+    m_global_image_id, m_local_mirror_uuid,
+    m_remote_image_peer.remote_pool_meta, m_cache_manager_handler,
+    m_pool_meta_cache, &m_progress_cxt, &m_state_builder, &m_resync_requested,
+    on_bootstrap);
+
+  // reference held through m_bootstrap_request; dropped in handle_bootstrap()
+  bootstrap_req->get();
+  m_bootstrap_request = bootstrap_req;
+  locker.unlock();
+
+  update_mirror_image_status(false, boost::none);
+  bootstrap_req->send();
+}
+
+// Completion of the bootstrap request: releases the request reference, then
+// either fails the start with a reason derived from `r` or continues with
+// start_replay().
+template <typename I>
+void ImageReplayer<I>::handle_bootstrap(int r) {
+  dout(10) << "r=" << r << dendl;
+  {
+    std::lock_guard locker{m_lock};
+    m_bootstrap_request->put();
+    m_bootstrap_request = nullptr;
+  }
+
+  if (on_start_interrupted()) {
+    return;
+  }
+
+  if (r == -ENOMSG) {
+    dout(5) << "local image is primary" << dendl;
+    on_start_fail(0, "local image is primary");
+    return;
+  }
+  if (r == -EREMOTEIO) {
+    dout(5) << "remote image is not primary" << dendl;
+    on_start_fail(-EREMOTEIO, "remote image is not primary");
+    return;
+  }
+  if (r == -EEXIST) {
+    on_start_fail(r, "split-brain detected");
+    return;
+  }
+  if (r == -ENOLINK) {
+    // picked up by handle_shut_down() to schedule the local image deletion
+    m_delete_requested = true;
+    on_start_fail(0, "remote image no longer exists");
+    return;
+  }
+  if (r == -ERESTART) {
+    on_start_fail(r, "image in transient state, try again");
+    return;
+  }
+  if (r < 0) {
+    on_start_fail(r, "error bootstrapping replay");
+    return;
+  }
+  if (m_resync_requested) {
+    on_start_fail(0, "resync requested");
+    return;
+  }
+
+  start_replay();
+}
+
+// Creates the replayer through the state builder and initializes it;
+// handle_start_replay() receives the init result.
+template <typename I>
+void ImageReplayer<I>::start_replay() {
+  dout(10) << dendl;
+
+  std::unique_lock locker{m_lock};
+  ceph_assert(m_replayer == nullptr);
+  m_replayer = m_state_builder->create_replayer(
+    m_threads, m_instance_watcher, m_local_mirror_uuid, m_pool_meta_cache,
+    m_replayer_listener);
+
+  auto on_init = create_context_callback<
+    ImageReplayer<I>, &ImageReplayer<I>::handle_start_replay>(this);
+  m_replayer->init(on_init);
+}
+
+// Completion of the replayer init. On failure the replayer is destroyed and
+// the start fails; on success the state moves to STATE_REPLAYING, the
+// periodic status update is scheduled and the start context is completed.
+template <typename I>
+void ImageReplayer<I>::handle_start_replay(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  if (on_start_interrupted()) {
+    return;
+  }
+
+  if (r < 0) {
+    std::string error_description = m_replayer->get_error_description();
+    const bool resync =
+      (r == -ENOTCONN && m_replayer->is_resync_requested());
+    if (resync) {
+      std::unique_lock locker{m_lock};
+      m_resync_requested = true;
+    }
+
+    // shut down not required if init failed
+    m_replayer->destroy();
+    m_replayer = nullptr;
+
+    derr << "error starting replay: " << cpp_strerror(r) << dendl;
+    on_start_fail(r, error_description);
+    return;
+  }
+
+  Context *on_finish = nullptr;
+  {
+    std::unique_lock locker{m_lock};
+    ceph_assert(m_state == STATE_STARTING);
+    m_state = STATE_REPLAYING;
+    std::swap(m_on_start_finish, on_finish);
+
+    // scheduling requires both m_lock and the timer lock to be held
+    std::unique_lock timer_locker{m_threads->timer_lock};
+    schedule_update_mirror_image_replay_status();
+  }
+
+  update_mirror_image_status(true, boost::none);
+
+  // a stop may have been requested while the replayer was initializing
+  const bool interrupted = on_replay_interrupted();
+  if (!interrupted) {
+    dout(10) << "start succeeded" << dendl;
+  }
+  if (on_finish == nullptr) {
+    return;
+  }
+  if (!interrupted) {
+    dout(10) << "on finish complete, r=" << r << dendl;
+  }
+  on_finish->complete(r);
+}
+
+// Queues the failure path of a start on the work queue: the state moves from
+// STATE_STARTING to STATE_STOPPING, (r, desc) is recorded as the state
+// description, a status update is requested and shut_down(r) is started.
+template <typename I>
+void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
+{
+  dout(10) << "r=" << r << ", desc=" << desc << dendl;
+  Context *fail_ctx = new LambdaContext([this, r, desc](int) {
+      {
+        std::lock_guard locker{m_lock};
+        ceph_assert(m_state == STATE_STARTING);
+        m_state = STATE_STOPPING;
+
+        // these results are logged as a cancellation, not as an error
+        const bool benign = (r >= 0 || r == -ECANCELED || r == -EREMOTEIO ||
+                             r == -ENOENT);
+        if (!benign) {
+          derr << "start failed: " << cpp_strerror(r) << dendl;
+        } else {
+          dout(10) << "start canceled" << dendl;
+        }
+      }
+
+      set_state_description(r, desc);
+      update_mirror_image_status(false, boost::none);
+      shut_down(r);
+    });
+  m_threads->work_queue->queue(fail_ctx, 0);
+}
+
+// Convenience overload: takes m_lock and delegates to the locked variant.
+template <typename I>
+bool ImageReplayer<I>::on_start_interrupted() {
+  std::lock_guard locker{m_lock};
+  const bool interrupted = on_start_interrupted(m_lock);
+  return interrupted;
+}
+
+// Must be called with m_lock held while in STATE_STARTING. If a stop was
+// requested, fails the start with -ECANCELED and returns true; otherwise
+// returns false.
+template <typename I>
+bool ImageReplayer<I>::on_start_interrupted(ceph::mutex& lock) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(m_state == STATE_STARTING);
+  if (m_stop_requested) {
+    on_start_fail(-ECANCELED, "");
+    return true;
+  }
+  return false;
+}
+
+// Requests a stop.
+//
+//  - running, STATE_STARTING: cancels the in-flight bootstrap request (if any)
+//  - running, otherwise:      interrupts the replay
+//  - already stopped:         completes `on_finish` with -EINVAL
+//  - stop already in flight:  `on_finish` joins the pending stop contexts
+//
+// `manual` marks the stop as manual; a stop without `restart` cancels a
+// pending restart request when the replayer is not running.
+template <typename I>
+void ImageReplayer<I>::stop(Context *on_finish, bool manual, bool restart)
+{
+  dout(10) << "on_finish=" << on_finish << ", manual=" << manual
+           << ", restart=" << restart << dendl;
+
+  image_replayer::BootstrapRequest<I> *pending_bootstrap = nullptr;
+  bool interrupt_replay = false;
+  bool already_stopped = false;
+  {
+    std::lock_guard locker{m_lock};
+
+    if (is_running_()) {
+      if (m_state == STATE_STARTING) {
+        dout(10) << "canceling start" << dendl;
+        if (m_bootstrap_request != nullptr) {
+          // keep the request alive until it is canceled below
+          pending_bootstrap = m_bootstrap_request;
+          pending_bootstrap->get();
+        }
+      } else {
+        dout(10) << "interrupting replay" << dendl;
+        interrupt_replay = true;
+      }
+
+      ceph_assert(m_on_stop_contexts.empty());
+      if (on_finish != nullptr) {
+        m_on_stop_contexts.push_back(on_finish);
+      }
+      m_stop_requested = true;
+      m_manual_stop = manual;
+    } else {
+      if (manual && !m_manual_stop) {
+        dout(10) << "marking manual" << dendl;
+        m_manual_stop = true;
+      }
+      if (!restart && m_restart_requested) {
+        dout(10) << "canceling restart" << dendl;
+        m_restart_requested = false;
+      }
+      if (is_stopped_()) {
+        dout(10) << "already stopped" << dendl;
+        already_stopped = true;
+      } else {
+        dout(10) << "joining in-flight stop" << dendl;
+        if (on_finish != nullptr) {
+          m_on_stop_contexts.push_back(on_finish);
+        }
+      }
+    }
+  }
+
+  if (already_stopped) {
+    if (on_finish) {
+      on_finish->complete(-EINVAL);
+    }
+    return;
+  }
+
+  // avoid holding lock since bootstrap request will update status
+  if (pending_bootstrap != nullptr) {
+    dout(10) << "canceling bootstrap" << dendl;
+    pending_bootstrap->cancel();
+    pending_bootstrap->put();
+  }
+
+  if (interrupt_replay) {
+    on_stop_journal_replay();
+  }
+}
+
+// Moves a replaying replayer to STATE_STOPPING, records (r, desc) as the
+// state description, forces a status update and starts the shut down.
+// Does nothing unless the state is STATE_REPLAYING.
+template <typename I>
+void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
+{
+  dout(10) << dendl;
+
+  {
+    std::lock_guard locker{m_lock};
+    const bool replaying = (m_state == STATE_REPLAYING);
+    if (!replaying) {
+      // might be invoked multiple times while stopping
+      return;
+    }
+
+    m_state = STATE_STOPPING;
+    m_stop_requested = true;
+  }
+
+  cancel_update_mirror_image_replay_status();
+  set_state_description(r, desc);
+  update_mirror_image_status(true, boost::none);
+  shut_down(0);
+}
+
+// Flags a restart, then stops the replayer and starts it again (manual,
+// restart=true) once the stop completes. `on_finish` is handed to start().
+template <typename I>
+void ImageReplayer<I>::restart(Context *on_finish)
+{
+  {
+    std::lock_guard locker{m_lock};
+    m_restart_requested = true;
+  }
+
+  // the stop result is ignored on purpose: the start is attempted anyway
+  Context *start_after_stop = new LambdaContext(
+    [this, on_finish](int /*r*/) {
+      start(on_finish, true, true);
+    });
+  stop(start_after_stop, false, true);
+}
+
+// Synchronously flushes the replayer when in STATE_REPLAYING and, on
+// success, requests a status update. Does nothing in any other state.
+template <typename I>
+void ImageReplayer<I>::flush()
+{
+  C_SaferCond flush_ctx;
+
+  {
+    std::unique_lock locker{m_lock};
+    if (m_state != STATE_REPLAYING) {
+      return;
+    }
+
+    dout(10) << dendl;
+    ceph_assert(m_replayer != nullptr);
+    m_replayer->flush(&flush_ctx);
+  }
+
+  // wait outside of m_lock
+  if (flush_ctx.wait() < 0) {
+    return;
+  }
+  update_mirror_image_status(false, boost::none);
+}
+
+// Returns whether a stop has been requested; if so, also triggers the replay
+// stop sequence.
+template <typename I>
+bool ImageReplayer<I>::on_replay_interrupted()
+{
+  bool stop_requested;
+  {
+    std::lock_guard locker{m_lock};
+    stop_requested = m_stop_requested;
+  }
+
+  if (!stop_requested) {
+    return stop_requested;
+  }
+
+  on_stop_journal_replay();
+  return stop_requested;
+}
+
+// Dumps an "image_replayer" section holding the image spec and the current
+// state name.
+template <typename I>
+void ImageReplayer<I>::print_status(Formatter *f)
+{
+  dout(10) << dendl;
+
+  std::lock_guard locker{m_lock};
+
+  f->open_object_section("image_replayer");
+  {
+    f->dump_string("name", m_image_spec);
+    f->dump_string("state", to_string(m_state));
+  }
+  f->close_section();
+}
+
+// Arms the timer task that refreshes the replay status after 10 (the unit
+// is whatever add_event_after() takes). Requires m_lock and the timer lock
+// to be held by the caller; does nothing unless in STATE_REPLAYING.
+template <typename I>
+void ImageReplayer<I>::schedule_update_mirror_image_replay_status() {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
+  if (m_state != STATE_REPLAYING) {
+    return;
+  }
+
+  dout(10) << dendl;
+
+  // periodically update the replaying status even if nothing changes
+  // so that we can adjust our performance stats
+  ceph_assert(m_update_status_task == nullptr);
+  auto task = create_context_callback<
+    ImageReplayer<I>,
+    &ImageReplayer<I>::handle_update_mirror_image_replay_status>(this);
+  m_update_status_task = task;
+  m_threads->timer->add_event_after(10, m_update_status_task);
+}
+
+// Timer callback (timer lock held): clears the task pointer and defers the
+// status refresh plus the re-arming of the timer to the work queue. The
+// deferred work is tracked as an in-flight op.
+template <typename I>
+void ImageReplayer<I>::handle_update_mirror_image_replay_status(int r) {
+  dout(10) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
+
+  ceph_assert(m_update_status_task != nullptr);
+  m_update_status_task = nullptr;
+
+  auto refresh = new LambdaContext([this](int) {
+      update_mirror_image_status(false, boost::none);
+
+      // lock order: m_lock first, then the timer lock
+      std::unique_lock locker{m_lock};
+      std::unique_lock timer_locker{m_threads->timer_lock};
+
+      schedule_update_mirror_image_replay_status();
+      m_in_flight_op_tracker.finish_op();
+    });
+
+  m_in_flight_op_tracker.start_op();
+  m_threads->work_queue->queue(refresh, 0);
+}
+
+// Cancels the pending periodic status task, if any. The task pointer is only
+// cleared when the timer reports that the event was actually canceled.
+template <typename I>
+void ImageReplayer<I>::cancel_update_mirror_image_replay_status() {
+  std::unique_lock timer_locker{m_threads->timer_lock};
+  if (m_update_status_task == nullptr) {
+    return;
+  }
+
+  dout(10) << dendl;
+
+  const bool canceled = m_threads->timer->cancel_event(m_update_status_task);
+  if (canceled) {
+    m_update_status_task = nullptr;
+  }
+}
+
+// Queues a status update on the work queue. A non-forced update is dropped
+// while the replayer is neither stopped nor running. The queued work is
+// tracked as an in-flight op.
+template <typename I>
+void ImageReplayer<I>::update_mirror_image_status(
+    bool force, const OptionalState &opt_state) {
+  dout(15) << "force=" << force << ", "
+           << "state=" << opt_state << dendl;
+
+  {
+    std::lock_guard locker{m_lock};
+    if (!(force || is_stopped_() || is_running_())) {
+      dout(15) << "shut down in-progress: ignoring update" << dendl;
+      return;
+    }
+  }
+
+  m_in_flight_op_tracker.start_op();
+  auto update_ctx = new LambdaContext(
+    [this, force, opt_state](int) {
+      set_mirror_image_status_update(force, opt_state);
+    });
+  m_threads->work_queue->queue(update_ctx, 0);
+}
+
+template <typename I> /*
+ * Builds the mirror image site status for the current (or overridden)
+ * replayer state and hands it to the local and, if present, the remote
+ * status updater.
+ *
+ * This is the queued half of update_mirror_image_status(), which calls
+ * start_op() on m_in_flight_op_tracker before queueing it. The matching
+ * finish_op() is issued at the end of this function, or by the
+ * replay-status callback when that callback completes with -EAGAIN.
+ *
+ * force     - forwarded unchanged to set_mirror_image_status()
+ * opt_state - when set, used instead of m_state to select the status
+ */
+void ImageReplayer<I>::set_mirror_image_status_update(
+ bool force, const OptionalState &opt_state) {
+ dout(15) << "force=" << force << ", "
+ << "state=" << opt_state << dendl;
+
+ reregister_admin_socket_hook();  // picks up a changed image name, if any
+
+ State state;
+ std::string state_desc;
+ int last_r;
+ bool stopping_replay;
+
+ auto mirror_image_status_state = boost::make_optional(
+ false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);  // starts out empty
+ image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
+ {
+ std::lock_guard locker{m_lock};  // snapshot the members read below
+ state = m_state;
+ state_desc = m_state_desc;
+ mirror_image_status_state = m_mirror_image_status_state;
+ last_r = m_last_r;
+ stopping_replay = (m_replayer != nullptr);
+
+ if (m_bootstrap_request != nullptr) {
+ bootstrap_request = m_bootstrap_request;
+ bootstrap_request->get();  // keep it alive for the query outside the lock
+ }
+ }
+
+ bool syncing = false;
+ if (bootstrap_request != nullptr) {
+ syncing = bootstrap_request->is_syncing();
+ bootstrap_request->put();
+ bootstrap_request = nullptr;
+ }
+
+ if (opt_state) {
+ state = *opt_state;  // caller-supplied state overrides the snapshot
+ }
+
+ cls::rbd::MirrorImageSiteStatus status;
+ status.up = true;
+ switch (state) {
+ case STATE_STARTING:
+ if (syncing) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
+ status.description = state_desc.empty() ? "syncing" : state_desc;
+ mirror_image_status_state = status.state;
+ } else {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
+ status.description = "starting replay";
+ }
+ break;
+ case STATE_REPLAYING:
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
+ {
+ std::string desc;
+ auto on_req_finish = new LambdaContext(
+ [this, force](int r) {
+ dout(15) << "replay status ready: r=" << r << dendl;
+ if (r >= 0) {
+ set_mirror_image_status_update(force, boost::none);  // retry; that run finishes the op
+ } else if (r == -EAGAIN) {
+ m_in_flight_op_tracker.finish_op();  // no retry: close the tracked op here
+ }
+ });
+
+ ceph_assert(m_replayer != nullptr);
+ if (!m_replayer->get_replay_status(&desc, on_req_finish)) {
+ dout(15) << "waiting for replay status" << dendl;
+ return;  // op stays in flight; on_req_finish continues from here
+ }
+
+ status.description = "replaying, " + desc;
+ mirror_image_status_state = boost::make_optional(
+ false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);  // reset to empty
+ }
+ break;
+ case STATE_STOPPING:
+ if (stopping_replay) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
+ status.description = state_desc.empty() ? "stopping replay" : state_desc;
+ break;
+ }
+ // FALLTHROUGH
+ case STATE_STOPPED:
+ if (last_r == -EREMOTEIO) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
+ status.description = state_desc;
+ mirror_image_status_state = status.state;
+ } else if (last_r < 0 && last_r != -ECANCELED) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
+ status.description = state_desc;
+ mirror_image_status_state = status.state;
+ } else {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
+ status.description = state_desc.empty() ? "stopped" : state_desc;
+ mirror_image_status_state = boost::none;
+ }
+ break;
+ default:
+ ceph_assert(!"invalid state");
+ }
+
+ {
+ std::lock_guard locker{m_lock};
+ m_mirror_image_status_state = mirror_image_status_state;  // read by get_health_state()
+ }
+
+ // prevent the status from ping-ponging when failed replays are restarted
+ if (mirror_image_status_state &&
+ *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
+ status.state = *mirror_image_status_state;
+ }
+
+ dout(15) << "status=" << status << dendl;
+ m_local_status_updater->set_mirror_image_status(m_global_image_id, status,
+ force);
+ if (m_remote_image_peer.mirror_status_updater != nullptr) {
+ m_remote_image_peer.mirror_status_updater->set_mirror_image_status(
+ m_global_image_id, status, force);
+ }
+
+ m_in_flight_op_tracker.finish_op();  // pairs with start_op() in update_mirror_image_status()
+}
+
+// Starts the asynchronous shut down once no operation is in flight (it
+// re-enters itself until then). The steps run in this order on the work
+// queue: shut down the replayer, destroy it, close the state builder, then
+// force a STOPPED status update and call handle_shut_down(r).
+template <typename I>
+void ImageReplayer<I>::shut_down(int r) {
+  dout(10) << "r=" << r << dendl;
+
+  {
+    std::lock_guard locker{m_lock};
+    ceph_assert(m_state == STATE_STOPPING);
+  }
+
+  if (!m_in_flight_op_tracker.empty()) {
+    dout(15) << "waiting for in-flight operations to complete" << dendl;
+    m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
+        shut_down(r);
+      }));
+    return;
+  }
+
+  // chain the shut down sequence (reverse order): `chain` always points at
+  // the first step to execute
+  Context *chain = new LambdaContext(
+    [this, r](int) {
+      update_mirror_image_status(true, STATE_STOPPED);
+      handle_shut_down(r);
+    });
+
+  // destruct the state builder
+  if (m_state_builder != nullptr) {
+    Context *after_close = chain;
+    chain = new LambdaContext([this, after_close](int) {
+        m_state_builder->close(after_close);
+      });
+  }
+
+  // close the replayer
+  if (m_replayer != nullptr) {
+    Context *after_destroy = chain;
+    Context *destroy_replayer = new LambdaContext(
+      [this, after_destroy](int) {
+        m_replayer->destroy();
+        m_replayer = nullptr;
+        after_destroy->complete(0);
+      });
+    chain = new LambdaContext([this, destroy_replayer](int) {
+        m_replayer->shut_down(destroy_replayer);
+      });
+  }
+
+  m_threads->work_queue->queue(chain, 0);
+}
+
+template <typename I> /*
+ * Final stage of the shut down. The function re-enters itself through
+ * queued contexts until every step below has been completed:
+ *   1. if a deletion or resync was requested, move the local image to
+ *      the trash first;
+ *   2. wait for in-flight operations to drain;
+ *   3. remove the mirror image status (once, guarded by m_status_removed);
+ *   4. destroy the state builder, switch to STATE_STOPPED and complete the
+ *      pending start/stop contexts.
+ *
+ * r - result passed to the start context if one is pending; stop contexts
+ *     then receive 0, otherwise they receive r.
+ */
+void ImageReplayer<I>::handle_shut_down(int r) {
+ bool resync_requested = false;
+ bool delete_requested = false;
+ bool unregister_asok_hook = false;
+ {
+ std::lock_guard locker{m_lock};
+
+ if (m_delete_requested && m_state_builder != nullptr &&
+ !m_state_builder->local_image_id.empty()) {
+ ceph_assert(m_state_builder->remote_image_id.empty());
+ dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
+ unregister_asok_hook = true;
+ std::swap(delete_requested, m_delete_requested);  // consume the request: re-entry sees false
+ m_delete_in_progress = true;
+ }
+
+ std::swap(resync_requested, m_resync_requested);  // consume the request: re-entry sees false
+ if (!delete_requested && !resync_requested && m_last_r == -ENOENT &&
+ ((m_state_builder == nullptr) ||
+ (m_state_builder->local_image_id.empty() &&
+ m_state_builder->remote_image_id.empty()))) {
+ dout(0) << "mirror image no longer exists" << dendl;
+ unregister_asok_hook = true;
+ m_finished = true;
+ }
+ }
+
+ if (unregister_asok_hook) {
+ unregister_admin_socket_hook();  // done after m_lock is released; it takes m_lock itself
+ }
+
+ if (delete_requested || resync_requested) {
+ dout(5) << "moving image to trash" << dendl;
+ auto ctx = new LambdaContext([this, r](int) {
+ handle_shut_down(r);  // trash_move result is ignored; continue with the original r
+ });
+ ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id,
+ resync_requested, m_threads->work_queue, ctx);
+ return;
+ }
+
+ if (!m_in_flight_op_tracker.empty()) {
+ dout(15) << "waiting for in-flight operations to complete" << dendl;
+ m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
+ handle_shut_down(r);
+ }));
+ return;
+ }
+
+ if (!m_status_removed) {
+ auto ctx = new LambdaContext([this, r](int) {
+ m_status_removed = true;  // makes the re-entry skip this step
+ handle_shut_down(r);
+ });
+ remove_image_status(m_delete_in_progress, ctx);  // force removal when a deletion is in progress
+ return;
+ }
+
+ if (m_state_builder != nullptr) {
+ m_state_builder->destroy();
+ m_state_builder = nullptr;
+ }
+
+ dout(10) << "stop complete" << dendl;
+ Context *on_start = nullptr;
+ Contexts on_stop_contexts;
+ {
+ std::lock_guard locker{m_lock};
+ std::swap(on_start, m_on_start_finish);  // take ownership of the pending contexts
+ on_stop_contexts = std::move(m_on_stop_contexts);
+ m_stop_requested = false;
+ ceph_assert(m_state == STATE_STOPPING);
+ m_state = STATE_STOPPED;
+ }
+
+ if (on_start != nullptr) {
+ dout(10) << "on start finish complete, r=" << r << dendl;
+ on_start->complete(r);
+ r = 0;  // the start context consumed the result; stop contexts get 0
+ }
+ for (auto ctx : on_stop_contexts) {
+ dout(10) << "on stop finish " << ctx << " complete, r=" << r << dendl;
+ ctx->complete(r);
+ }
+}
+
+// Reacts to a replayer notification while in STATE_REPLAYING: tracks a rename
+// of the local image, stops the replay on a resync request or when the
+// replayer is no longer replaying, and otherwise requests a status update.
+template <typename I>
+void ImageReplayer<I>::handle_replayer_notification() {
+  dout(10) << dendl;
+
+  std::unique_lock locker{m_lock};
+  if (m_state != STATE_REPLAYING) {
+    // might be attempting to shut down
+    return;
+  }
+
+  {
+    // detect a rename of the local image
+    ceph_assert(m_state_builder != nullptr &&
+                m_state_builder->local_image_ctx != nullptr);
+    auto &local_image_ctx = m_state_builder->local_image_ctx;
+    std::shared_lock image_locker{local_image_ctx->image_lock};
+    if (m_local_image_name != local_image_ctx->name) {
+      // will re-register with new name after next status update
+      dout(10) << "image renamed" << dendl;
+      m_local_image_name = local_image_ctx->name;
+    }
+  }
+
+  // replayer cannot be shut down while notification is in-flight
+  ceph_assert(m_replayer != nullptr);
+  locker.unlock();
+
+  if (m_replayer->is_resync_requested()) {
+    dout(10) << "resync requested" << dendl;
+    m_resync_requested = true;
+    on_stop_journal_replay(0, "resync requested");
+    return;
+  }
+
+  if (m_replayer->is_replaying()) {
+    update_mirror_image_status(false, {});
+    return;
+  }
+
+  auto error_code = m_replayer->get_error_code();
+  auto error_description = m_replayer->get_error_description();
+  dout(10) << "replay interrupted: "
+           << "r=" << error_code << ", "
+           << "error=" << error_description << dendl;
+  on_stop_journal_replay(error_code, error_description);
+}
+
+// Returns the display name of `state`; values outside the four known states
+// are rendered as "Unknown(<value>)".
+template <typename I>
+std::string ImageReplayer<I>::to_string(const State state) {
+  switch (state) {
+  case ImageReplayer<I>::STATE_STARTING:  return "Starting";
+  case ImageReplayer<I>::STATE_REPLAYING: return "Replaying";
+  case ImageReplayer<I>::STATE_STOPPING:  return "Stopping";
+  case ImageReplayer<I>::STATE_STOPPED:   return "Stopped";
+  default:
+    return "Unknown(" + stringify(state) + ")";
+  }
+}
+
+// Creates the admin-socket hook for the current image spec and registers its
+// commands. Does nothing when a hook is already installed. If registration
+// fails the error is logged and the new hook is discarded, leaving
+// m_asok_hook unset.
+template <typename I>
+void ImageReplayer<I>::register_admin_socket_hook() {
+  ImageReplayerAdminSocketHook<I> *asok_hook = nullptr;
+  {
+    std::lock_guard locker{m_lock};
+    if (m_asok_hook != nullptr) {
+      return;
+    }
+
+    asok_hook = new ImageReplayerAdminSocketHook<I>(
+      g_ceph_context, m_image_spec, this);
+    int r = asok_hook->register_commands();
+    if (r == 0) {
+      // only report success once registration has actually succeeded
+      dout(15) << "registered asok hook: " << m_image_spec << dendl;
+      m_asok_hook = asok_hook;
+      return;
+    }
+    derr << "error registering admin socket commands: " << cpp_strerror(r)
+         << dendl;
+  }
+
+  // the failed hook is destroyed only after m_lock has been released
+  delete asok_hook;
+}
+
+// Detaches the installed admin-socket hook (if any) under m_lock and destroys
+// it after the lock has been released.
+template <typename I>
+void ImageReplayer<I>::unregister_admin_socket_hook() {
+  dout(15) << dendl;
+
+  AdminSocketHook *detached_hook = nullptr;
+  {
+    std::lock_guard locker{m_lock};
+    std::swap(detached_hook, m_asok_hook);
+  }
+  delete detached_hook;
+}
+
+// Recomputes the image spec from the current local image name and, when it
+// changed (or no hook is installed), re-registers the admin-socket commands
+// under the new spec. While stopping or stopped only the spec is updated.
+template <typename I>
+void ImageReplayer<I>::reregister_admin_socket_hook() {
+  std::unique_lock locker{m_lock};
+  if (m_state == STATE_STARTING && m_bootstrap_request != nullptr) {
+    m_local_image_name = m_bootstrap_request->get_local_image_name();
+  }
+
+  auto new_image_spec = image_replayer::util::compute_image_spec(
+    m_local_io_ctx, m_local_image_name);
+  const bool up_to_date =
+    (m_asok_hook != nullptr && m_image_spec == new_image_spec);
+  if (up_to_date) {
+    return;
+  }
+
+  dout(15) << "old_image_spec=" << m_image_spec << ", "
+           << "new_image_spec=" << new_image_spec << dendl;
+  m_image_spec = new_image_spec;
+
+  if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
+    // no need to re-register if stopping
+    return;
+  }
+  locker.unlock();
+
+  // both helpers take m_lock themselves
+  unregister_admin_socket_hook();
+  register_admin_socket_hook();
+}
+
+// Removes the local mirror image status, if one exists, and then continues
+// with remove_image_status_remote(). `force` selects
+// remove_mirror_image_status() over remove_refresh_mirror_image_status().
+template <typename I>
+void ImageReplayer<I>::remove_image_status(bool force, Context *on_finish)
+{
+  auto on_local_removed = new LambdaContext([this, force, on_finish](int) {
+      remove_image_status_remote(force, on_finish);
+    });
+
+  if (!m_local_status_updater->exists(m_global_image_id)) {
+    // nothing to remove locally: go straight to the remote side
+    on_local_removed->complete(0);
+    return;
+  }
+
+  dout(15) << "removing local mirror image status" << dendl;
+  if (force) {
+    m_local_status_updater->remove_mirror_image_status(
+      m_global_image_id, true, on_local_removed);
+  } else {
+    m_local_status_updater->remove_refresh_mirror_image_status(
+      m_global_image_id, on_local_removed);
+  }
+}
+
+// Removes the remote mirror image status when a remote status updater is set
+// and holds a status for this image; otherwise completes `on_finish` (if
+// any) with 0 right away.
+template <typename I>
+void ImageReplayer<I>::remove_image_status_remote(bool force, Context *on_finish)
+{
+  auto &remote_updater = m_remote_image_peer.mirror_status_updater;
+  const bool has_remote_status =
+    (remote_updater != nullptr && remote_updater->exists(m_global_image_id));
+
+  if (!has_remote_status) {
+    if (on_finish) {
+      on_finish->complete(0);
+    }
+    return;
+  }
+
+  dout(15) << "removing remote mirror image status" << dendl;
+  if (force) {
+    remote_updater->remove_mirror_image_status(
+      m_global_image_id, true, on_finish);
+  } else {
+    remote_updater->remove_refresh_mirror_image_status(
+      m_global_image_id, on_finish);
+  }
+}
+
+// Streams "ImageReplayer: <address> [<local pool id>/<global image id>]".
+template <typename I>
+std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
+{
+  return os << "ImageReplayer: " << &replayer
+            << " [" << replayer.get_local_pool_id()
+            << "/" << replayer.get_global_image_id() << "]";
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;