// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include "common/Cond.h"
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/ImageCtx.h"
#include "librbd/api/Config.h"
#include "librbd/api/Namespace.h"
#include "PoolMetaCache.h"
#include "RemotePoolPoller.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "

namespace rbd {
namespace mirror {

using ::operator<<;

namespace {

const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id");
const std::string SERVICE_DAEMON_LEADER_KEY("leader");

const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
  {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};

template <typename I>
class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer<I> *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual int call(Formatter *f) = 0;
protected:
  PoolReplayer<I> *pool_replayer;
};

template <typename I>
class StatusCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StatusCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->print_status(f);
    return 0;
  }
};

template <typename I>
class StartCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StartCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->start();
    return 0;
  }
};

template <typename I>
class StopCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StopCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->stop(true);
    return 0;
  }
};

template <typename I>
class RestartCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit RestartCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->restart();
    return 0;
  }
};

template <typename I>
class FlushCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit FlushCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->flush();
    return 0;
  }
};

template <typename I>
class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit LeaderReleaseCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->release_leader();
    return 0;
  }
};
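// The hook below maps each registered admin socket command string to one of
// the command objects above and dispatches on lookup. The commands are
// registered per pool/peer; as an illustrative example (the socket path and
// names are assumptions that depend on the local configuration), a status
// query might look like:
//
//   ceph --admin-daemon /var/run/ceph/ceph-client.rbd-mirror.a.asok \
//       rbd mirror status <pool> <peer cluster>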
template <typename I>
class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer<I> *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand<I>(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand<I>(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand<I>(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand<I>(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand<I>(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand<I>(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    (void)admin_socket->unregister_commands(this);
    for (auto i = commands.begin(); i != commands.end(); ++i) {
      delete i->second;
    }
  }

  int call(std::string_view command, const cmdmap_t& cmdmap,
           Formatter *f, std::ostream& ss, bufferlist& out) override {
    auto i = commands.find(command);
    ceph_assert(i != commands.end());
    return i->second->call(f);
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand<I>*,
                   std::less<>> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

} // anonymous namespace

template <typename I>
struct PoolReplayer<I>::RemotePoolPollerListener
  : public remote_pool_poller::Listener {
  PoolReplayer<I>* m_pool_replayer;

  RemotePoolPollerListener(PoolReplayer<I>* pool_replayer)
    : m_pool_replayer(pool_replayer) {
  }

  void handle_updated(const RemotePoolMeta& remote_pool_meta) override {
    m_pool_replayer->handle_remote_pool_meta_updated(remote_pool_meta);
  }
};

template <typename I>
PoolReplayer<I>::PoolReplayer(
    Threads<I> *threads, ServiceDaemon<I> *service_daemon,
    journal::CacheManagerHandler *cache_manager_handler,
    PoolMetaCache* pool_meta_cache, int64_t local_pool_id,
    const PeerSpec &peer, const std::vector<const char*> &args)
  : m_threads(threads),
    m_service_daemon(service_daemon),
    m_cache_manager_handler(cache_manager_handler),
    m_pool_meta_cache(pool_meta_cache),
    m_local_pool_id(local_pool_id),
    m_peer(peer),
    m_args(args),
    m_lock(ceph::make_mutex("rbd::mirror::PoolReplayer " + stringify(peer))),
    m_pool_replayer_thread(this),
    m_leader_listener(this) {
}

template <typename I>
PoolReplayer<I>::~PoolReplayer() {
  shut_down();

  ceph_assert(m_asok_hook == nullptr);
}

template <typename I>
bool PoolReplayer<I>::is_blocklisted() const {
  std::lock_guard locker{m_lock};
  return m_blocklisted;
}

template <typename I>
bool PoolReplayer<I>::is_leader() const {
  std::lock_guard locker{m_lock};
  return m_leader_watcher && m_leader_watcher->is_leader();
}

template <typename I>
bool PoolReplayer<I>::is_running() const {
  return m_pool_replayer_thread.is_started() && !m_stopping;
}
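// init() bootstraps the pool replayer in order: connect to the local and
// remote clusters, open the pool on both sides, look up the local pool's
// mirror uuid, start polling the remote pool for peer metadata, create the
// default (pool-level) namespace replayer, and finally start the leader
// watcher plus the background pool replayer thread. Failures are surfaced
// through service daemon callouts rather than aborting the daemon.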
template <typename I>
void PoolReplayer<I>::init(const std::string& site_name) {
  std::lock_guard locker{m_lock};

  ceph_assert(!m_pool_replayer_thread.is_started());

  // reset state
  m_stopping = false;
  m_blocklisted = false;
  m_site_name = site_name;

  dout(10) << "replaying for " << m_peer << dendl;
  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(), "", "",
                     "local cluster", &m_local_rados, false);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to local cluster");
    return;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 m_peer.mon_host, m_peer.key,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados, true);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to remote cluster");
    return;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return;
  }

  auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
  librbd::api::Config<I>::apply_pool_overrides(m_local_io_ctx, &cct->_conf);

  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &m_local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to query local mirror uuid");
    return;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
      "unable to access remote pool");
    return;
  }

  dout(10) << "connected to " << m_peer << dendl;

  m_image_sync_throttler.reset(
      Throttler<I>::create(cct, "rbd_mirror_concurrent_image_syncs"));

  m_image_deletion_throttler.reset(
      Throttler<I>::create(cct, "rbd_mirror_concurrent_image_deletions"));

  m_remote_pool_poller_listener.reset(new RemotePoolPollerListener(this));
  m_remote_pool_poller.reset(RemotePoolPoller<I>::create(
    m_threads, m_remote_io_ctx, m_site_name, m_local_mirror_uuid,
    *m_remote_pool_poller_listener));

  C_SaferCond on_pool_poller_init;
  m_remote_pool_poller->init(&on_pool_poller_init);
  r = on_pool_poller_init.wait();
  if (r < 0) {
    derr << "failed to initialize remote pool poller: " << cpp_strerror(r)
         << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize remote pool poller");
    m_remote_pool_poller.reset();
    return;
  }
  ceph_assert(!m_remote_pool_meta.mirror_uuid.empty());
  m_pool_meta_cache->set_remote_pool_meta(
    m_remote_io_ctx.get_id(), m_remote_pool_meta);
  m_pool_meta_cache->set_local_pool_meta(
    m_local_io_ctx.get_id(), {m_local_mirror_uuid});

  m_default_namespace_replayer.reset(NamespaceReplayer<I>::create(
      "", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
      m_remote_pool_meta, m_threads, m_image_sync_throttler.get(),
      m_image_deletion_throttler.get(), m_service_daemon,
      m_cache_manager_handler, m_pool_meta_cache));

  C_SaferCond on_init;
  m_default_namespace_replayer->init(&on_init);
  r = on_init.wait();
  if (r < 0) {
    derr << "error initializing default namespace replayer: "
         << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize default namespace replayer");
    m_default_namespace_replayer.reset();
    return;
  }

  m_leader_watcher.reset(LeaderWatcher<I>::create(m_threads, m_local_io_ctx,
                                                  &m_leader_listener));
  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize leader messenger object");
    m_leader_watcher.reset();
    return;
  }

  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
    m_callout_id = service_daemon::CALLOUT_ID_NONE;
  }

  m_service_daemon->add_or_update_attribute(
    m_local_io_ctx.get_id(), SERVICE_DAEMON_INSTANCE_ID_KEY,
    stringify(m_local_io_ctx.get_instance_id()));

  m_pool_replayer_thread.create("pool replayer");
}
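// shut_down() tears the stack down in roughly the reverse order of init():
// stop the replayer thread, then the leader watcher, the default namespace
// replayer, and the remote pool poller (dropping the cached pool metadata),
// before finally releasing the cluster connections.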
template <typename I>
void PoolReplayer<I>::shut_down() {
  {
    std::lock_guard l{m_lock};
    m_stopping = true;
    m_cond.notify_all();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }

  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
  }
  m_leader_watcher.reset();

  if (m_default_namespace_replayer) {
    C_SaferCond on_shut_down;
    m_default_namespace_replayer->shut_down(&on_shut_down);
    on_shut_down.wait();
  }
  m_default_namespace_replayer.reset();

  if (m_remote_pool_poller) {
    C_SaferCond ctx;
    m_remote_pool_poller->shut_down(&ctx);
    ctx.wait();

    m_pool_meta_cache->remove_remote_pool_meta(m_remote_io_ctx.get_id());
    m_pool_meta_cache->remove_local_pool_meta(m_local_io_ctx.get_id());
  }
  m_remote_pool_poller.reset();
  m_remote_pool_poller_listener.reset();

  m_image_sync_throttler.reset();
  m_image_deletion_throttler.reset();

  m_local_rados.reset();
  m_remote_rados.reset();
}

template <typename I>
int PoolReplayer<I>::init_rados(const std::string &cluster_name,
                                const std::string &client_name,
                                const std::string &mon_host,
                                const std::string &key,
                                const std::string &description,
                                RadosRef *rados_ref,
                                bool strip_cluster_overrides) {
  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
  if (r < 0 && r != -ENOENT) {
    // do not treat this as fatal, it might still be able to connect
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
  }

  // preserve cluster-specific config settings before applying environment/cli
  // overrides
  std::map<std::string, std::string> config_values;
  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
      config_values[key] = cct->_conf.get_val<std::string>(key);
    }
  }

  cct->_conf.parse_env(cct->get_module_type());

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  r = cct->_conf.parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }
  cct->_conf.parse_env(cct->get_module_type());

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf.parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& pair : config_values) {
      auto value = cct->_conf.get_val<std::string>(pair.first);
      if (pair.second != value) {
        dout(0) << "reverting global config option override: "
                << pair.first << ": " << value << " -> " << pair.second
                << dendl;
        cct->_conf.set_val_or_die(pair.first, pair.second);
      }
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf.set_val_or_die("admin_socket",
                              "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  if (!mon_host.empty()) {
    r = cct->_conf.set_val("mon_host", mon_host);
    if (r < 0) {
      derr << "failed to set mon_host config for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (!key.empty()) {
    r = cct->_conf.set_val("key", key);
    if (r < 0) {
      derr << "failed to set key config for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  // disable unnecessary librbd cache
  cct->_conf.set_val_or_die("rbd_cache", "false");
  cct->_conf.apply_changes(nullptr);
  cct->_conf.complain_about_parse_error(cct);

  rados_ref->reset(new librados::Rados());

  r = (*rados_ref)->init_with_context(cct);
  ceph_assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}
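// run() is the body of the pool replayer thread: it (re)registers the admin
// socket hook whenever the pool/peer name changes, reconciles the set of
// namespace replayers, checks for blocklisting, and then sleeps until the
// next refresh interval or until it is woken up to stop.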
template <typename I>
void PoolReplayer<I>::run() {
  dout(20) << dendl;

  while (true) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook<I>(g_ceph_context,
                                                       m_asok_hook_name, this);
    }

    with_namespace_replayers([this]() { update_namespace_replayers(); });

    std::unique_lock locker{m_lock};

    if (m_leader_watcher->is_blocklisted() ||
        m_default_namespace_replayer->is_blocklisted()) {
      m_blocklisted = true;
      m_stopping = true;
    }

    for (auto &it : m_namespace_replayers) {
      if (it.second->is_blocklisted()) {
        m_blocklisted = true;
        m_stopping = true;
        break;
      }
    }

    if (m_stopping) {
      break;
    }

    auto seconds = g_ceph_context->_conf.get_val<uint64_t>(
        "rbd_mirror_pool_replayers_refresh_interval");
    m_cond.wait_for(locker, ceph::make_timespan(seconds));
  }

  // shut down namespace replayers
  with_namespace_replayers([this]() { update_namespace_replayers(); });

  delete m_asok_hook;
  m_asok_hook = nullptr;
}
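// update_namespace_replayers() reconciles the running namespace replayers
// against the set of namespaces that currently have mirroring enabled:
// replayers whose namespace left that set are shut down and deleted, and a
// new replayer is created and initialized for every newly mirroring
// namespace. If this instance is the leader, newly created replayers also
// acquire the leader role and learn the current instance list; otherwise
// they are told who the current leader is.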
template <typename I>
void PoolReplayer<I>::update_namespace_replayers() {
  dout(20) << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));

  std::set<std::string> mirroring_namespaces;
  if (!m_stopping) {
    int r = list_mirroring_namespaces(&mirroring_namespaces);
    if (r < 0) {
      return;
    }
  }

  auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
  C_SaferCond cond;
  auto gather_ctx = new C_Gather(cct, &cond);
  for (auto it = m_namespace_replayers.begin();
       it != m_namespace_replayers.end(); ) {
    auto iter = mirroring_namespaces.find(it->first);
    if (iter == mirroring_namespaces.end()) {
      auto namespace_replayer = it->second;
      auto on_shut_down = new LambdaContext(
        [namespace_replayer, ctx=gather_ctx->new_sub()](int r) {
          delete namespace_replayer;
          ctx->complete(r);
        });
      m_service_daemon->remove_namespace(m_local_pool_id, it->first);
      namespace_replayer->shut_down(on_shut_down);
      it = m_namespace_replayers.erase(it);
    } else {
      mirroring_namespaces.erase(iter);
      it++;
    }
  }

  for (auto &name : mirroring_namespaces) {
    auto namespace_replayer = NamespaceReplayer<I>::create(
        name, m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid,
        m_peer.uuid, m_remote_pool_meta, m_threads,
        m_image_sync_throttler.get(), m_image_deletion_throttler.get(),
        m_service_daemon, m_cache_manager_handler, m_pool_meta_cache);
    auto on_init = new LambdaContext(
        [this, namespace_replayer, name, &mirroring_namespaces,
         ctx=gather_ctx->new_sub()](int r) {
          std::lock_guard locker{m_lock};
          if (r < 0) {
            derr << "failed to initialize namespace replayer for namespace "
                 << name << ": " << cpp_strerror(r) << dendl;
            delete namespace_replayer;
            mirroring_namespaces.erase(name);
          } else {
            m_namespace_replayers[name] = namespace_replayer;
            m_service_daemon->add_namespace(m_local_pool_id, name);
          }
          ctx->complete(r);
        });
    namespace_replayer->init(on_init);
  }

  gather_ctx->activate();

  m_lock.unlock();
  cond.wait();
  m_lock.lock();

  if (m_leader) {
    C_SaferCond acquire_cond;
    auto acquire_gather_ctx = new C_Gather(cct, &acquire_cond);

    for (auto &name : mirroring_namespaces) {
      namespace_replayer_acquire_leader(name, acquire_gather_ctx->new_sub());
    }
    acquire_gather_ctx->activate();

    m_lock.unlock();
    acquire_cond.wait();
    m_lock.lock();

    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);

    for (auto &name : mirroring_namespaces) {
      auto it = m_namespace_replayers.find(name);
      if (it == m_namespace_replayers.end()) {
        // acquiring leader for this namespace replayer failed
        continue;
      }
      it->second->handle_instances_added(instance_ids);
    }
  } else {
    std::string leader_instance_id;
    if (m_leader_watcher->get_leader_instance_id(&leader_instance_id)) {
      for (auto &name : mirroring_namespaces) {
        m_namespace_replayers[name]->handle_update_leader(leader_instance_id);
      }
    }
  }
}

template <typename I>
int PoolReplayer<I>::list_mirroring_namespaces(
    std::set<std::string> *namespaces) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  std::vector<std::string> names;
  int r = librbd::api::Namespace<I>::list(m_local_io_ctx, &names);
  if (r < 0) {
    derr << "failed to list namespaces: " << cpp_strerror(r) << dendl;
    return r;
  }

  for (auto &name : names) {
    cls::rbd::MirrorMode mirror_mode = cls::rbd::MIRROR_MODE_DISABLED;
    int r = librbd::cls_client::mirror_mode_get(&m_local_io_ctx, &mirror_mode);
    if (r < 0 && r != -ENOENT) {
      derr << "failed to get namespace mirror mode: " << cpp_strerror(r)
           << dendl;
      if (m_namespace_replayers.count(name) == 0) {
        continue;
      }
    } else if (mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
      dout(10) << "mirroring is disabled for namespace " << name << dendl;
      continue;
    }

    namespaces->insert(name);
  }

  return 0;
}

template <typename I>
void PoolReplayer<I>::reopen_logs() {
  std::lock_guard locker{m_lock};

  if (m_local_rados) {
    reinterpret_cast<CephContext *>(m_local_rados->cct())->reopen_logs();
  }
  if (m_remote_rados) {
    reinterpret_cast<CephContext *>(m_remote_rados->cct())->reopen_logs();
  }
}

template <typename I>
void PoolReplayer<I>::namespace_replayer_acquire_leader(
    const std::string &name, Context *on_finish) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  auto it = m_namespace_replayers.find(name);
  ceph_assert(it != m_namespace_replayers.end());

  on_finish = new LambdaContext(
      [this, name, on_finish](int r) {
        if (r < 0) {
          derr << "failed to handle acquire leader for namespace: "
               << name << ": " << cpp_strerror(r) << dendl;

          // remove the namespace replayer -- update_namespace_replayers will
          // retry to create it and acquire leader.

          std::lock_guard locker{m_lock};

          auto namespace_replayer = m_namespace_replayers[name];
          m_namespace_replayers.erase(name);
          auto on_shut_down = new LambdaContext(
              [namespace_replayer, on_finish](int r) {
                delete namespace_replayer;
                on_finish->complete(r);
              });
          m_service_daemon->remove_namespace(m_local_pool_id, name);
          namespace_replayer->shut_down(on_shut_down);
          return;
        }

        on_finish->complete(0);
      });

  it->second->handle_acquire_leader(on_finish);
}
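// print_status() feeds the "rbd mirror status" admin socket command. An
// abbreviated, illustrative sketch of the emitted JSON (the values are
// assumptions; the exact fields depend on the replayer's current state):
//
//   {
//     "pool_replayer_status": {
//       "peer": "uuid: ... cluster: ... client: ...",
//       "pool": "rbd",
//       "instance_id": "4123",
//       "state": "running",
//       "leader_instance_id": "4123",
//       "leader": true,
//       "instances": [],
//       "local_cluster_admin_socket": "...",
//       "remote_cluster_admin_socket": "...",
//       ...
//     }
//   }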
template <typename I>
void PoolReplayer<I>::print_status(Formatter *f) {
  dout(20) << dendl;

  assert(f);

  std::lock_guard l{m_lock};

  f->open_object_section("pool_replayer_status");
  f->dump_stream("peer") << m_peer;
  if (m_local_io_ctx.is_valid()) {
    f->dump_string("pool", m_local_io_ctx.get_pool_name());
    f->dump_stream("instance_id") << m_local_io_ctx.get_instance_id();
  }

  std::string state("running");
  if (m_manual_stop) {
    state = "stopped (manual)";
  } else if (m_stopping) {
    state = "stopped";
  } else if (!is_running()) {
    state = "error";
  }
  f->dump_string("state", state);

  if (m_leader_watcher) {
    std::string leader_instance_id;
    m_leader_watcher->get_leader_instance_id(&leader_instance_id);
    f->dump_string("leader_instance_id", leader_instance_id);

    bool leader = m_leader_watcher->is_leader();
    f->dump_bool("leader", leader);
    if (leader) {
      std::vector<std::string> instance_ids;
      m_leader_watcher->list_instances(&instance_ids);
      f->open_array_section("instances");
      for (auto instance_id : instance_ids) {
        f->dump_string("instance_id", instance_id);
      }
      f->close_section(); // instances
    }
  }

  if (m_local_rados) {
    auto cct = reinterpret_cast<CephContext *>(m_local_rados->cct());
    f->dump_string("local_cluster_admin_socket",
                   cct->_conf.get_val<std::string>("admin_socket"));
  }
  if (m_remote_rados) {
    auto cct = reinterpret_cast<CephContext *>(m_remote_rados->cct());
    f->dump_string("remote_cluster_admin_socket",
                   cct->_conf.get_val<std::string>("admin_socket"));
  }

  if (m_image_sync_throttler) {
    f->open_object_section("sync_throttler");
    m_image_sync_throttler->print_status(f);
    f->close_section(); // sync_throttler
  }

  if (m_image_deletion_throttler) {
    f->open_object_section("deletion_throttler");
    m_image_deletion_throttler->print_status(f);
    f->close_section(); // deletion_throttler
  }

  if (m_default_namespace_replayer) {
    m_default_namespace_replayer->print_status(f);
  }

  f->open_array_section("namespaces");
  for (auto &it : m_namespace_replayers) {
    f->open_object_section("namespace");
    f->dump_string("name", it.first);
    it.second->print_status(f);
    f->close_section(); // namespace
  }
  f->close_section(); // namespaces

  f->close_section(); // pool_replayer_status
}

template <typename I>
void PoolReplayer<I>::start() {
  dout(20) << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = false;

  if (m_default_namespace_replayer) {
    m_default_namespace_replayer->start();
  }
  for (auto &it : m_namespace_replayers) {
    it.second->start();
  }
}

template <typename I>
void PoolReplayer<I>::stop(bool manual) {
  dout(20) << "enter: manual=" << manual << dendl;

  std::lock_guard l{m_lock};
  if (!manual) {
    m_stopping = true;
    m_cond.notify_all();
    return;
  } else if (m_stopping) {
    return;
  }

  m_manual_stop = true;

  if (m_default_namespace_replayer) {
    m_default_namespace_replayer->stop();
  }
  for (auto &it : m_namespace_replayers) {
    it.second->stop();
  }
}

template <typename I>
void PoolReplayer<I>::restart() {
  dout(20) << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  if (m_default_namespace_replayer) {
    m_default_namespace_replayer->restart();
  }
  for (auto &it : m_namespace_replayers) {
    it.second->restart();
  }
}

template <typename I>
void PoolReplayer<I>::flush() {
  dout(20) << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping || m_manual_stop) {
    return;
  }

  if (m_default_namespace_replayer) {
    m_default_namespace_replayer->flush();
  }
  for (auto &it : m_namespace_replayers) {
    it.second->flush();
  }
}
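// release_leader() asks the leader watcher to voluntarily drop the leader
// role; the handle_* methods below implement the LeaderWatcher listener
// contract. handle_post_acquire_leader()/handle_pre_release_leader()
// propagate leader transitions to the default and per-namespace replayers
// (inside with_namespace_replayers, so the namespace set stays stable),
// while handle_update_leader() and handle_instances_added()/removed() fan
// the corresponding notifications out to every replayer.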
template <typename I>
void PoolReplayer<I>::release_leader() {
  dout(20) << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}

template <typename I>
void PoolReplayer<I>::handle_post_acquire_leader(Context *on_finish) {
  dout(20) << dendl;

  with_namespace_replayers(
      [this](Context *on_finish) {
        dout(10) << "handle_post_acquire_leader" << dendl;

        ceph_assert(ceph_mutex_is_locked(m_lock));

        m_service_daemon->add_or_update_attribute(m_local_pool_id,
                                                  SERVICE_DAEMON_LEADER_KEY,
                                                  true);
        auto ctx = new LambdaContext(
            [this, on_finish](int r) {
              if (r == 0) {
                std::lock_guard locker{m_lock};
                m_leader = true;
              }
              on_finish->complete(r);
            });

        auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
        auto gather_ctx = new C_Gather(cct, ctx);

        m_default_namespace_replayer->handle_acquire_leader(
            gather_ctx->new_sub());

        for (auto &it : m_namespace_replayers) {
          namespace_replayer_acquire_leader(it.first, gather_ctx->new_sub());
        }

        gather_ctx->activate();
      }, on_finish);
}

template <typename I>
void PoolReplayer<I>::handle_pre_release_leader(Context *on_finish) {
  dout(20) << dendl;

  with_namespace_replayers(
      [this](Context *on_finish) {
        dout(10) << "handle_pre_release_leader" << dendl;

        ceph_assert(ceph_mutex_is_locked(m_lock));

        m_leader = false;
        m_service_daemon->remove_attribute(m_local_pool_id,
                                           SERVICE_DAEMON_LEADER_KEY);

        auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
        auto gather_ctx = new C_Gather(cct, on_finish);

        m_default_namespace_replayer->handle_release_leader(
            gather_ctx->new_sub());

        for (auto &it : m_namespace_replayers) {
          it.second->handle_release_leader(gather_ctx->new_sub());
        }

        gather_ctx->activate();
      }, on_finish);
}

template <typename I>
void PoolReplayer<I>::handle_update_leader(
    const std::string &leader_instance_id) {
  dout(10) << "leader_instance_id=" << leader_instance_id << dendl;

  std::lock_guard locker{m_lock};

  m_default_namespace_replayer->handle_update_leader(leader_instance_id);

  for (auto &it : m_namespace_replayers) {
    it.second->handle_update_leader(leader_instance_id);
  }
}

template <typename I>
void PoolReplayer<I>::handle_instances_added(
    const std::vector<std::string> &instance_ids) {
  dout(5) << "instance_ids=" << instance_ids << dendl;

  std::lock_guard locker{m_lock};
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  m_default_namespace_replayer->handle_instances_added(instance_ids);

  for (auto &it : m_namespace_replayers) {
    it.second->handle_instances_added(instance_ids);
  }
}

template <typename I>
void PoolReplayer<I>::handle_instances_removed(
    const std::vector<std::string> &instance_ids) {
  dout(5) << "instance_ids=" << instance_ids << dendl;

  std::lock_guard locker{m_lock};
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  m_default_namespace_replayer->handle_instances_removed(instance_ids);

  for (auto &it : m_namespace_replayers) {
    it.second->handle_instances_removed(instance_ids);
  }
}

template <typename I>
void PoolReplayer<I>::handle_remote_pool_meta_updated(
    const RemotePoolMeta& remote_pool_meta) {
  dout(5) << "remote_pool_meta=" << remote_pool_meta << dendl;

  if (!m_default_namespace_replayer) {
    m_remote_pool_meta = remote_pool_meta;
    return;
  }

  // the replayers were built against the old metadata, so flag the pool
  // replayer to stop rather than attempting an in-place update
  derr << "remote pool metadata updated unexpectedly" << dendl;
  std::unique_lock locker{m_lock};
  m_stopping = true;
  m_cond.notify_all();
}

} // namespace mirror
} // namespace rbd

template class rbd::mirror::PoolReplayer<librbd::ImageCtx>;