summaryrefslogtreecommitdiffstats
path: root/src/tools/rbd_mirror/PoolReplayer.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/tools/rbd_mirror/PoolReplayer.cc
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tools/rbd_mirror/PoolReplayer.cc')
-rw-r--r--src/tools/rbd_mirror/PoolReplayer.cc1133
1 files changed, 1133 insertions, 0 deletions
diff --git a/src/tools/rbd_mirror/PoolReplayer.cc b/src/tools/rbd_mirror/PoolReplayer.cc
new file mode 100644
index 00000000..35d32eb5
--- /dev/null
+++ b/src/tools/rbd_mirror/PoolReplayer.cc
@@ -0,0 +1,1133 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "PoolReplayer.h"
+#include <boost/bind.hpp>
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
+#include "common/ceph_argparse.h"
+#include "common/code_environment.h"
+#include "common/common_init.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "include/stringify.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "global/global_context.h"
+#include "librbd/internal.h"
+#include "librbd/Utils.h"
+#include "librbd/Watcher.h"
+#include "librbd/api/Config.h"
+#include "librbd/api/Mirror.h"
+#include "ImageMap.h"
+#include "InstanceReplayer.h"
+#include "InstanceWatcher.h"
+#include "LeaderWatcher.h"
+#include "ServiceDaemon.h"
+#include "Threads.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
+ << this << " " << __func__ << ": "
+
+using std::chrono::seconds;
+using std::map;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using librbd::cls_client::dir_get_name;
+using librbd::util::create_async_context_callback;
+
+namespace rbd {
+namespace mirror {
+
+using ::operator<<;
+
+namespace {
+
+const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id");
+const std::string SERVICE_DAEMON_LEADER_KEY("leader");
+const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
+const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
+
+const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
+ {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};
+
+template <typename I>
+class PoolReplayerAdminSocketCommand {
+public:
+ PoolReplayerAdminSocketCommand(PoolReplayer<I> *pool_replayer)
+ : pool_replayer(pool_replayer) {
+ }
+ virtual ~PoolReplayerAdminSocketCommand() {}
+ virtual bool call(Formatter *f, stringstream *ss) = 0;
+protected:
+ PoolReplayer<I> *pool_replayer;
+};
+
+template <typename I>
+class StatusCommand : public PoolReplayerAdminSocketCommand<I> {
+public:
+ explicit StatusCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ this->pool_replayer->print_status(f, ss);
+ return true;
+ }
+};
+
+template <typename I>
+class StartCommand : public PoolReplayerAdminSocketCommand<I> {
+public:
+ explicit StartCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ this->pool_replayer->start();
+ return true;
+ }
+};
+
+template <typename I>
+class StopCommand : public PoolReplayerAdminSocketCommand<I> {
+public:
+ explicit StopCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ this->pool_replayer->stop(true);
+ return true;
+ }
+};
+
+template <typename I>
+class RestartCommand : public PoolReplayerAdminSocketCommand<I> {
+public:
+ explicit RestartCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ this->pool_replayer->restart();
+ return true;
+ }
+};
+
+template <typename I>
+class FlushCommand : public PoolReplayerAdminSocketCommand<I> {
+public:
+ explicit FlushCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ this->pool_replayer->flush();
+ return true;
+ }
+};
+
+template <typename I>
+class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand<I> {
+public:
+ explicit LeaderReleaseCommand(PoolReplayer<I> *pool_replayer)
+ : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ this->pool_replayer->release_leader();
+ return true;
+ }
+};
+
+template <typename I>
+class PoolReplayerAdminSocketHook : public AdminSocketHook {
+public:
+ PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
+ PoolReplayer<I> *pool_replayer)
+ : admin_socket(cct->get_admin_socket()) {
+ std::string command;
+ int r;
+
+ command = "rbd mirror status " + name;
+ r = admin_socket->register_command(command, command, this,
+ "get status for rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new StatusCommand<I>(pool_replayer);
+ }
+
+ command = "rbd mirror start " + name;
+ r = admin_socket->register_command(command, command, this,
+ "start rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new StartCommand<I>(pool_replayer);
+ }
+
+ command = "rbd mirror stop " + name;
+ r = admin_socket->register_command(command, command, this,
+ "stop rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new StopCommand<I>(pool_replayer);
+ }
+
+ command = "rbd mirror restart " + name;
+ r = admin_socket->register_command(command, command, this,
+ "restart rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new RestartCommand<I>(pool_replayer);
+ }
+
+ command = "rbd mirror flush " + name;
+ r = admin_socket->register_command(command, command, this,
+ "flush rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new FlushCommand<I>(pool_replayer);
+ }
+
+ command = "rbd mirror leader release " + name;
+ r = admin_socket->register_command(command, command, this,
+ "release rbd mirror leader " + name);
+ if (r == 0) {
+ commands[command] = new LeaderReleaseCommand<I>(pool_replayer);
+ }
+ }
+
+ ~PoolReplayerAdminSocketHook() override {
+ for (auto i = commands.begin(); i != commands.end(); ++i) {
+ (void)admin_socket->unregister_command(i->first);
+ delete i->second;
+ }
+ }
+
+ bool call(std::string_view command, const cmdmap_t& cmdmap,
+ std::string_view format, bufferlist& out) override {
+ auto i = commands.find(command);
+ ceph_assert(i != commands.end());
+ Formatter *f = Formatter::create(format);
+ stringstream ss;
+ bool r = i->second->call(f, &ss);
+ delete f;
+ out.append(ss);
+ return r;
+ }
+
+private:
+ typedef std::map<std::string, PoolReplayerAdminSocketCommand<I>*,
+ std::less<>> Commands;
+
+ AdminSocket *admin_socket;
+ Commands commands;
+};
+
+} // anonymous namespace
+
+template <typename I>
+PoolReplayer<I>::PoolReplayer(Threads<I> *threads,
+ ServiceDaemon<I>* service_daemon,
+ int64_t local_pool_id, const PeerSpec &peer,
+ const std::vector<const char*> &args) :
+ m_threads(threads),
+ m_service_daemon(service_daemon),
+ m_local_pool_id(local_pool_id),
+ m_peer(peer),
+ m_args(args),
+ m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
+ m_local_pool_watcher_listener(this, true),
+ m_remote_pool_watcher_listener(this, false),
+ m_image_map_listener(this),
+ m_pool_replayer_thread(this),
+ m_leader_listener(this)
+{
+}
+
+template <typename I>
+PoolReplayer<I>::~PoolReplayer()
+{
+ delete m_asok_hook;
+ shut_down();
+}
+
+template <typename I>
+bool PoolReplayer<I>::is_blacklisted() const {
+ Mutex::Locker locker(m_lock);
+ return m_blacklisted;
+}
+
+template <typename I>
+bool PoolReplayer<I>::is_leader() const {
+ Mutex::Locker locker(m_lock);
+ return m_leader_watcher && m_leader_watcher->is_leader();
+}
+
+template <typename I>
+bool PoolReplayer<I>::is_running() const {
+ return m_pool_replayer_thread.is_started();
+}
+
+template <typename I>
+void PoolReplayer<I>::init()
+{
+ Mutex::Locker l(m_lock);
+
+ ceph_assert(!m_pool_replayer_thread.is_started());
+
+ // reset state
+ m_stopping = false;
+ m_blacklisted = false;
+
+ dout(10) << "replaying for " << m_peer << dendl;
+ int r = init_rados(g_ceph_context->_conf->cluster,
+ g_ceph_context->_conf->name.to_str(),
+ "", "", "local cluster", &m_local_rados, false);
+ if (r < 0) {
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+ "unable to connect to local cluster");
+ return;
+ }
+
+ r = init_rados(m_peer.cluster_name, m_peer.client_name,
+ m_peer.mon_host, m_peer.key,
+ std::string("remote peer ") + stringify(m_peer),
+ &m_remote_rados, true);
+ if (r < 0) {
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+ "unable to connect to remote cluster");
+ return;
+ }
+
+ r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
+ if (r < 0) {
+ derr << "error accessing local pool " << m_local_pool_id << ": "
+ << cpp_strerror(r) << dendl;
+ return;
+ }
+
+ auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
+ librbd::api::Config<I>::apply_pool_overrides(m_local_io_ctx, &cct->_conf);
+
+ std::string local_mirror_uuid;
+ r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
+ &local_mirror_uuid);
+ if (r < 0) {
+ derr << "failed to retrieve local mirror uuid from pool "
+ << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+ "unable to query local mirror uuid");
+ return;
+ }
+
+ r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
+ m_remote_io_ctx);
+ if (r < 0) {
+ derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
+ << ": " << cpp_strerror(r) << dendl;
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
+ "unable to access remote pool");
+ return;
+ }
+
+ dout(10) << "connected to " << m_peer << dendl;
+
+ m_instance_replayer.reset(InstanceReplayer<I>::create(
+ m_threads, m_service_daemon, m_local_rados, local_mirror_uuid,
+ m_local_pool_id));
+ m_instance_replayer->init();
+ m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
+
+ m_instance_watcher.reset(InstanceWatcher<I>::create(
+ m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
+ r = m_instance_watcher->init();
+ if (r < 0) {
+ derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+ "unable to initialize instance messenger object");
+ return;
+ }
+ m_service_daemon->add_or_update_attribute(
+ m_local_pool_id, SERVICE_DAEMON_INSTANCE_ID_KEY,
+ m_instance_watcher->get_instance_id());
+
+ m_leader_watcher.reset(LeaderWatcher<I>::create(m_threads, m_local_io_ctx,
+ &m_leader_listener));
+ r = m_leader_watcher->init();
+ if (r < 0) {
+ derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+ "unable to initialize leader messenger object");
+ return;
+ }
+
+ if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
+ m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
+ m_callout_id = service_daemon::CALLOUT_ID_NONE;
+ }
+
+ m_pool_replayer_thread.create("pool replayer");
+}
+
+template <typename I>
+void PoolReplayer<I>::shut_down() {
+ m_stopping = true;
+ {
+ Mutex::Locker l(m_lock);
+ m_cond.Signal();
+ }
+ if (m_pool_replayer_thread.is_started()) {
+ m_pool_replayer_thread.join();
+ }
+ if (m_leader_watcher) {
+ m_leader_watcher->shut_down();
+ }
+ if (m_instance_watcher) {
+ m_instance_watcher->shut_down();
+ }
+ if (m_instance_replayer) {
+ m_instance_replayer->shut_down();
+ }
+
+ m_leader_watcher.reset();
+ m_instance_watcher.reset();
+ m_instance_replayer.reset();
+
+ ceph_assert(!m_image_map);
+ ceph_assert(!m_image_deleter);
+ ceph_assert(!m_local_pool_watcher);
+ ceph_assert(!m_remote_pool_watcher);
+ m_local_rados.reset();
+ m_remote_rados.reset();
+}
+
+template <typename I>
+int PoolReplayer<I>::init_rados(const std::string &cluster_name,
+ const std::string &client_name,
+ const std::string &mon_host,
+ const std::string &key,
+ const std::string &description,
+ RadosRef *rados_ref,
+ bool strip_cluster_overrides) {
+ // NOTE: manually bootstrap a CephContext here instead of via
+ // the librados API to avoid mixing global singletons between
+ // the librados shared library and the daemon
+ // TODO: eliminate intermingling of global singletons within Ceph APIs
+ CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
+ if (client_name.empty() || !iparams.name.from_str(client_name)) {
+ derr << "error initializing cluster handle for " << description << dendl;
+ return -EINVAL;
+ }
+
+ CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+ cct->_conf->cluster = cluster_name;
+
+ // librados::Rados::conf_read_file
+ int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
+ if (r < 0 && r != -ENOENT) {
+ // do not treat this as fatal, it might still be able to connect
+ derr << "could not read ceph conf for " << description << ": "
+ << cpp_strerror(r) << dendl;
+ }
+
+ // preserve cluster-specific config settings before applying environment/cli
+ // overrides
+ std::map<std::string, std::string> config_values;
+ if (strip_cluster_overrides) {
+ // remote peer connections shouldn't apply cluster-specific
+ // configuration settings
+ for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
+ config_values[key] = cct->_conf.get_val<std::string>(key);
+ }
+ }
+
+ cct->_conf.parse_env(cct->get_module_type());
+
+ // librados::Rados::conf_parse_env
+ std::vector<const char*> args;
+ r = cct->_conf.parse_argv(args);
+ if (r < 0) {
+ derr << "could not parse environment for " << description << ":"
+ << cpp_strerror(r) << dendl;
+ cct->put();
+ return r;
+ }
+ cct->_conf.parse_env(cct->get_module_type());
+
+ if (!m_args.empty()) {
+ // librados::Rados::conf_parse_argv
+ args = m_args;
+ r = cct->_conf.parse_argv(args);
+ if (r < 0) {
+ derr << "could not parse command line args for " << description << ": "
+ << cpp_strerror(r) << dendl;
+ cct->put();
+ return r;
+ }
+ }
+
+ if (strip_cluster_overrides) {
+ // remote peer connections shouldn't apply cluster-specific
+ // configuration settings
+ for (auto& pair : config_values) {
+ auto value = cct->_conf.get_val<std::string>(pair.first);
+ if (pair.second != value) {
+ dout(0) << "reverting global config option override: "
+ << pair.first << ": " << value << " -> " << pair.second
+ << dendl;
+ cct->_conf.set_val_or_die(pair.first, pair.second);
+ }
+ }
+ }
+
+ if (!g_ceph_context->_conf->admin_socket.empty()) {
+ cct->_conf.set_val_or_die("admin_socket",
+ "$run_dir/$name.$pid.$cluster.$cctid.asok");
+ }
+
+ if (!mon_host.empty()) {
+ r = cct->_conf.set_val("mon_host", mon_host);
+ if (r < 0) {
+ derr << "failed to set mon_host config for " << description << ": "
+ << cpp_strerror(r) << dendl;
+ cct->put();
+ return r;
+ }
+ }
+
+ if (!key.empty()) {
+ r = cct->_conf.set_val("key", key);
+ if (r < 0) {
+ derr << "failed to set key config for " << description << ": "
+ << cpp_strerror(r) << dendl;
+ cct->put();
+ return r;
+ }
+ }
+
+ // disable unnecessary librbd cache
+ cct->_conf.set_val_or_die("rbd_cache", "false");
+ cct->_conf.apply_changes(nullptr);
+ cct->_conf.complain_about_parse_errors(cct);
+
+ rados_ref->reset(new librados::Rados());
+
+ r = (*rados_ref)->init_with_context(cct);
+ ceph_assert(r == 0);
+ cct->put();
+
+ r = (*rados_ref)->connect();
+ if (r < 0) {
+ derr << "error connecting to " << description << ": "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ return 0;
+}
+
+template <typename I>
+void PoolReplayer<I>::run()
+{
+ dout(20) << "enter" << dendl;
+
+ while (!m_stopping) {
+ std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
+ m_peer.cluster_name;
+ if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
+ m_asok_hook_name = asok_hook_name;
+ delete m_asok_hook;
+
+ m_asok_hook = new PoolReplayerAdminSocketHook<I>(g_ceph_context,
+ m_asok_hook_name, this);
+ }
+
+ Mutex::Locker locker(m_lock);
+ if (m_leader_watcher->is_blacklisted() ||
+ m_instance_replayer->is_blacklisted() ||
+ (m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
+ (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
+ m_blacklisted = true;
+ m_stopping = true;
+ break;
+ }
+
+ if (!m_stopping) {
+ m_cond.WaitInterval(m_lock, utime_t(1, 0));
+ }
+ }
+
+ m_instance_replayer->stop();
+}
+
+template <typename I>
+void PoolReplayer<I>::reopen_logs()
+{
+ Mutex::Locker l(m_lock);
+
+ if (m_local_rados) {
+ reinterpret_cast<CephContext *>(m_local_rados->cct())->reopen_logs();
+ }
+ if (m_remote_rados) {
+ reinterpret_cast<CephContext *>(m_remote_rados->cct())->reopen_logs();
+ }
+}
+
+template <typename I>
+void PoolReplayer<I>::print_status(Formatter *f, stringstream *ss)
+{
+ dout(20) << "enter" << dendl;
+
+ if (!f) {
+ return;
+ }
+
+ Mutex::Locker l(m_lock);
+
+ f->open_object_section("pool_replayer_status");
+ f->dump_stream("peer") << m_peer;
+ if (m_local_io_ctx.is_valid()) {
+ f->dump_string("pool", m_local_io_ctx.get_pool_name());
+ f->dump_stream("instance_id") << m_instance_watcher->get_instance_id();
+ }
+
+ std::string state("running");
+ if (m_manual_stop) {
+ state = "stopped (manual)";
+ } else if (m_stopping) {
+ state = "stopped";
+ }
+ f->dump_string("state", state);
+
+ std::string leader_instance_id;
+ m_leader_watcher->get_leader_instance_id(&leader_instance_id);
+ f->dump_string("leader_instance_id", leader_instance_id);
+
+ bool leader = m_leader_watcher->is_leader();
+ f->dump_bool("leader", leader);
+ if (leader) {
+ std::vector<std::string> instance_ids;
+ m_leader_watcher->list_instances(&instance_ids);
+ f->open_array_section("instances");
+ for (auto instance_id : instance_ids) {
+ f->dump_string("instance_id", instance_id);
+ }
+ f->close_section();
+ }
+
+ f->dump_string("local_cluster_admin_socket",
+ reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf.
+ get_val<std::string>("admin_socket"));
+ f->dump_string("remote_cluster_admin_socket",
+ reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf.
+ get_val<std::string>("admin_socket"));
+
+ f->open_object_section("sync_throttler");
+ m_instance_watcher->print_sync_status(f, ss);
+ f->close_section();
+
+ m_instance_replayer->print_status(f, ss);
+
+ if (m_image_deleter) {
+ f->open_object_section("image_deleter");
+ m_image_deleter->print_status(f, ss);
+ f->close_section();
+ }
+
+ f->close_section();
+ f->flush(*ss);
+}
+
+template <typename I>
+void PoolReplayer<I>::start()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping) {
+ return;
+ }
+
+ m_manual_stop = false;
+
+ if (m_instance_replayer) {
+ m_instance_replayer->start();
+ }
+}
+
+template <typename I>
+void PoolReplayer<I>::stop(bool manual)
+{
+ dout(20) << "enter: manual=" << manual << dendl;
+
+ Mutex::Locker l(m_lock);
+ if (!manual) {
+ m_stopping = true;
+ m_cond.Signal();
+ return;
+ } else if (m_stopping) {
+ return;
+ }
+
+ m_manual_stop = true;
+
+ if (m_instance_replayer) {
+ m_instance_replayer->stop();
+ }
+}
+
+template <typename I>
+void PoolReplayer<I>::restart()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping) {
+ return;
+ }
+
+ if (m_instance_replayer) {
+ m_instance_replayer->restart();
+ }
+}
+
+template <typename I>
+void PoolReplayer<I>::flush()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping || m_manual_stop) {
+ return;
+ }
+
+ if (m_instance_replayer) {
+ m_instance_replayer->flush();
+ }
+}
+
+template <typename I>
+void PoolReplayer<I>::release_leader()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping || !m_leader_watcher) {
+ return;
+ }
+
+ m_leader_watcher->release_leader();
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_update(const std::string &mirror_uuid,
+ ImageIds &&added_image_ids,
+ ImageIds &&removed_image_ids) {
+ if (m_stopping) {
+ return;
+ }
+
+ dout(10) << "mirror_uuid=" << mirror_uuid << ", "
+ << "added_count=" << added_image_ids.size() << ", "
+ << "removed_count=" << removed_image_ids.size() << dendl;
+ Mutex::Locker locker(m_lock);
+ if (!m_leader_watcher->is_leader()) {
+ return;
+ }
+
+ m_service_daemon->add_or_update_attribute(
+ m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
+ m_local_pool_watcher->get_image_count());
+ if (m_remote_pool_watcher) {
+ m_service_daemon->add_or_update_attribute(
+ m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
+ m_remote_pool_watcher->get_image_count());
+ }
+
+ std::set<std::string> added_global_image_ids;
+ for (auto& image_id : added_image_ids) {
+ added_global_image_ids.insert(image_id.global_id);
+ }
+
+ std::set<std::string> removed_global_image_ids;
+ for (auto& image_id : removed_image_ids) {
+ removed_global_image_ids.insert(image_id.global_id);
+ }
+
+ m_image_map->update_images(mirror_uuid,
+ std::move(added_global_image_ids),
+ std::move(removed_global_image_ids));
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_post_acquire_leader(Context *on_finish) {
+ dout(10) << dendl;
+
+ m_service_daemon->add_or_update_attribute(m_local_pool_id,
+ SERVICE_DAEMON_LEADER_KEY, true);
+ m_instance_watcher->handle_acquire_leader();
+ init_image_map(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_pre_release_leader(Context *on_finish) {
+ dout(10) << dendl;
+
+ m_service_daemon->remove_attribute(m_local_pool_id,
+ SERVICE_DAEMON_LEADER_KEY);
+ m_instance_watcher->handle_release_leader();
+ shut_down_image_deleter(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::init_image_map(Context *on_finish) {
+ dout(5) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_image_map);
+ m_image_map.reset(ImageMap<I>::create(m_local_io_ctx, m_threads,
+ m_instance_watcher->get_instance_id(),
+ m_image_map_listener));
+
+ auto ctx = new FunctionContext([this, on_finish](int r) {
+ handle_init_image_map(r, on_finish);
+ });
+ m_image_map->init(create_async_context_callback(
+ m_threads->work_queue, ctx));
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_init_image_map(int r, Context *on_finish) {
+ dout(5) << "r=" << r << dendl;
+ if (r < 0) {
+ derr << "failed to init image map: " << cpp_strerror(r) << dendl;
+ on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_image_map(on_finish);
+ return;
+ }
+
+ init_local_pool_watcher(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::init_local_pool_watcher(Context *on_finish) {
+ dout(10) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_local_pool_watcher);
+ m_local_pool_watcher.reset(PoolWatcher<I>::create(
+ m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
+
+ // ensure the initial set of local images is up-to-date
+ // after acquiring the leader role
+ auto ctx = new FunctionContext([this, on_finish](int r) {
+ handle_init_local_pool_watcher(r, on_finish);
+ });
+ m_local_pool_watcher->init(create_async_context_callback(
+ m_threads->work_queue, ctx));
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_init_local_pool_watcher(
+ int r, Context *on_finish) {
+ dout(10) << "r=" << r << dendl;
+ if (r < 0) {
+ derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
+ on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_pool_watchers(on_finish);
+ return;
+ }
+
+ init_remote_pool_watcher(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::init_remote_pool_watcher(Context *on_finish) {
+ dout(10) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_remote_pool_watcher);
+ m_remote_pool_watcher.reset(PoolWatcher<I>::create(
+ m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
+
+ auto ctx = new FunctionContext([this, on_finish](int r) {
+ handle_init_remote_pool_watcher(r, on_finish);
+ });
+ m_remote_pool_watcher->init(create_async_context_callback(
+ m_threads->work_queue, ctx));
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_init_remote_pool_watcher(
+ int r, Context *on_finish) {
+ dout(10) << "r=" << r << dendl;
+ if (r == -ENOENT) {
+ // Technically nothing to do since the other side doesn't
+ // have mirroring enabled. Eventually the remote pool watcher will
+ // detect images (if mirroring is enabled), so no point propagating
+ // an error which would just busy-spin the state machines.
+ dout(0) << "remote peer does not have mirroring configured" << dendl;
+ } else if (r < 0) {
+ derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl;
+ on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_pool_watchers(on_finish);
+ return;
+ }
+
+ init_image_deleter(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::init_image_deleter(Context *on_finish) {
+ dout(10) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_image_deleter);
+
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ handle_init_image_deleter(r, on_finish);
+ });
+ m_image_deleter.reset(ImageDeleter<I>::create(m_local_io_ctx, m_threads,
+ m_service_daemon));
+ m_image_deleter->init(create_async_context_callback(
+ m_threads->work_queue, on_finish));
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_init_image_deleter(int r, Context *on_finish) {
+ dout(10) << "r=" << r << dendl;
+ if (r < 0) {
+ derr << "failed to init image deleter: " << cpp_strerror(r) << dendl;
+ on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_image_deleter(on_finish);
+ return;
+ }
+
+ on_finish->complete(0);
+
+ Mutex::Locker locker(m_lock);
+ m_cond.Signal();
+}
+
+template <typename I>
+void PoolReplayer<I>::shut_down_image_deleter(Context* on_finish) {
+ dout(10) << dendl;
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_image_deleter) {
+ Context *ctx = new FunctionContext([this, on_finish](int r) {
+ handle_shut_down_image_deleter(r, on_finish);
+ });
+ ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+ m_image_deleter->shut_down(ctx);
+ return;
+ }
+ }
+ shut_down_pool_watchers(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_shut_down_image_deleter(
+ int r, Context* on_finish) {
+ dout(10) << "r=" << r << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_image_deleter);
+ m_image_deleter.reset();
+ }
+
+ shut_down_pool_watchers(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::shut_down_pool_watchers(Context *on_finish) {
+ dout(10) << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_local_pool_watcher) {
+ Context *ctx = new FunctionContext([this, on_finish](int r) {
+ handle_shut_down_pool_watchers(r, on_finish);
+ });
+ ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+ auto gather_ctx = new C_Gather(g_ceph_context, ctx);
+ m_local_pool_watcher->shut_down(gather_ctx->new_sub());
+ if (m_remote_pool_watcher) {
+ m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
+ }
+ gather_ctx->activate();
+ return;
+ }
+ }
+
+ on_finish->complete(0);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_shut_down_pool_watchers(
+ int r, Context *on_finish) {
+ dout(10) << "r=" << r << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_local_pool_watcher);
+ m_local_pool_watcher.reset();
+
+ if (m_remote_pool_watcher) {
+ m_remote_pool_watcher.reset();
+ }
+ }
+ wait_for_update_ops(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::wait_for_update_ops(Context *on_finish) {
+ dout(10) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ Context *ctx = new FunctionContext([this, on_finish](int r) {
+ handle_wait_for_update_ops(r, on_finish);
+ });
+ ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+ m_update_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_wait_for_update_ops(int r, Context *on_finish) {
+ dout(10) << "r=" << r << dendl;
+ ceph_assert(r == 0);
+
+ shut_down_image_map(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::shut_down_image_map(Context *on_finish) {
+ dout(5) << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_image_map) {
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ handle_shut_down_image_map(r, on_finish);
+ });
+ m_image_map->shut_down(create_async_context_callback(
+ m_threads->work_queue, on_finish));
+ return;
+ }
+ }
+
+ on_finish->complete(0);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_shut_down_image_map(int r, Context *on_finish) {
+ dout(5) << "r=" << r << dendl;
+ if (r < 0 && r != -EBLACKLISTED) {
+ derr << "failed to shut down image map: " << cpp_strerror(r) << dendl;
+ }
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_image_map);
+ m_image_map.reset();
+
+ m_instance_replayer->release_all(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_update_leader(
+ const std::string &leader_instance_id) {
+ dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
+
+ m_instance_watcher->handle_update_leader(leader_instance_id);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_acquire_image(const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ dout(5) << "global_image_id=" << global_image_id << ", "
+ << "instance_id=" << instance_id << dendl;
+
+ m_instance_watcher->notify_image_acquire(instance_id, global_image_id,
+ on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_release_image(const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ dout(5) << "global_image_id=" << global_image_id << ", "
+ << "instance_id=" << instance_id << dendl;
+
+ m_instance_watcher->notify_image_release(instance_id, global_image_id,
+ on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_remove_image(const std::string &mirror_uuid,
+ const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ ceph_assert(!mirror_uuid.empty());
+ dout(5) << "mirror_uuid=" << mirror_uuid << ", "
+ << "global_image_id=" << global_image_id << ", "
+ << "instance_id=" << instance_id << dendl;
+
+ m_instance_watcher->notify_peer_image_removed(instance_id, global_image_id,
+ mirror_uuid, on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_instances_added(const InstanceIds &instance_ids) {
+ dout(5) << "instance_ids=" << instance_ids << dendl;
+ Mutex::Locker locker(m_lock);
+ if (!m_leader_watcher->is_leader()) {
+ return;
+ }
+
+ ceph_assert(m_image_map);
+ m_image_map->update_instances_added(instance_ids);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_instances_removed(
+ const InstanceIds &instance_ids) {
+ dout(5) << "instance_ids=" << instance_ids << dendl;
+ Mutex::Locker locker(m_lock);
+ if (!m_leader_watcher->is_leader()) {
+ return;
+ }
+
+ ceph_assert(m_image_map);
+ m_image_map->update_instances_removed(instance_ids);
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::PoolReplayer<librbd::ImageCtx>;