From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/tools/cephfs_mirror/FSMirror.cc | 441 ++++++++++++++++++++++++++++++++++++ 1 file changed, 441 insertions(+) create mode 100644 src/tools/cephfs_mirror/FSMirror.cc (limited to 'src/tools/cephfs_mirror/FSMirror.cc') diff --git a/src/tools/cephfs_mirror/FSMirror.cc b/src/tools/cephfs_mirror/FSMirror.cc new file mode 100644 index 000000000..76dcc11f6 --- /dev/null +++ b/src/tools/cephfs_mirror/FSMirror.cc @@ -0,0 +1,441 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/admin_socket.h" +#include "common/ceph_argparse.h" +#include "common/ceph_context.h" +#include "common/common_init.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/WorkQueue.h" +#include "include/stringify.h" +#include "msg/Messenger.h" +#include "FSMirror.h" +#include "PeerReplayer.h" +#include "aio_utils.h" +#include "ServiceDaemon.h" +#include "Utils.h" + +#include "common/Cond.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::FSMirror " << __func__ + +namespace cephfs { +namespace mirror { + +namespace { + +const std::string SERVICE_DAEMON_DIR_COUNT_KEY("directory_count"); +const std::string SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed"); + +class MirrorAdminSocketCommand { +public: + virtual ~MirrorAdminSocketCommand() { + } + virtual int call(Formatter *f) = 0; +}; + +class StatusCommand : public MirrorAdminSocketCommand { +public: + explicit StatusCommand(FSMirror *fs_mirror) + : fs_mirror(fs_mirror) { + } + + int call(Formatter *f) override { + fs_mirror->mirror_status(f); + return 0; + } + +private: + FSMirror *fs_mirror; +}; + +} // anonymous namespace + +class MirrorAdminSocketHook : public AdminSocketHook { +public: + MirrorAdminSocketHook(CephContext *cct, const Filesystem &filesystem, FSMirror *fs_mirror) + : admin_socket(cct->get_admin_socket()) { + int r; + std::string cmd; + + // mirror status format is name@fscid + cmd = "fs mirror status " + stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid); + r = admin_socket->register_command( + cmd, this, "get filesystem mirror status"); + if (r == 0) { + commands[cmd] = new StatusCommand(fs_mirror); + } + } + + ~MirrorAdminSocketHook() override { + admin_socket->unregister_commands(this); + for (auto &[command, cmdptr] : commands) { + delete cmdptr; + } + } + + int call(std::string_view command, const cmdmap_t& cmdmap, + Formatter *f, std::ostream &errss, bufferlist &out) override { + auto p = commands.at(std::string(command)); + return p->call(f); + } + +private: + typedef std::map> Commands; + + AdminSocket *admin_socket; + Commands commands; +}; + +FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id, + ServiceDaemon *service_daemon, std::vector args, + ContextWQ *work_queue) + : m_cct(cct), + m_filesystem(filesystem), + m_pool_id(pool_id), + m_service_daemon(service_daemon), + m_args(args), + m_work_queue(work_queue), + m_snap_listener(this), + m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) { + m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, + (uint64_t)0); +} + +FSMirror::~FSMirror() { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + delete m_instance_watcher; + delete m_mirror_watcher; + } + // outside the lock so that in-progress commands can acquire + // lock and finish executing. + delete m_asok_hook; +} + +int FSMirror::init_replayer(PeerReplayer *peer_replayer) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + return peer_replayer->init(); +} + +void FSMirror::shutdown_replayer(PeerReplayer *peer_replayer) { + peer_replayer->shutdown(); +} + +void FSMirror::cleanup() { + dout(20) << dendl; + ceph_unmount(m_mount); + ceph_release(m_mount); + m_ioctx.close(); + m_cluster.reset(); +} + +void FSMirror::reopen_logs() { + std::scoped_lock locker(m_lock); + + if (m_cluster) { + reinterpret_cast(m_cluster->cct())->reopen_logs(); + } + for (auto &[peer, replayer] : m_peer_replayers) { + replayer->reopen_logs(); + } +} + +void FSMirror::init(Context *on_finish) { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + int r = connect(g_ceph_context->_conf->name.to_str(), + g_ceph_context->_conf->cluster, &m_cluster, "", "", m_args); + if (r < 0) { + m_init_failed = true; + on_finish->complete(r); + return; + } + + r = m_cluster->ioctx_create2(m_pool_id, m_ioctx); + if (r < 0) { + m_init_failed = true; + m_cluster.reset(); + derr << ": error accessing local pool (id=" << m_pool_id << "): " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + r = mount(m_cluster, m_filesystem, true, &m_mount); + if (r < 0) { + m_init_failed = true; + m_ioctx.close(); + m_cluster.reset(); + on_finish->complete(r); + return; + } + + m_addrs = m_cluster->get_addrs(); + dout(10) << ": rados addrs=" << m_addrs << dendl; + + init_instance_watcher(on_finish); +} + +void FSMirror::shutdown(Context *on_finish) { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + m_stopping = true; + if (m_on_init_finish != nullptr) { + dout(10) << ": delaying shutdown -- init in progress" << dendl; + m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) { + if (r < 0) { + on_finish->complete(0); + return; + } + m_on_shutdown_finish = on_finish; + shutdown_peer_replayers(); + }); + return; + } + + m_on_shutdown_finish = on_finish; + } + + shutdown_peer_replayers(); +} + +void FSMirror::shutdown_peer_replayers() { + dout(20) << dendl; + + for (auto &[peer, peer_replayer] : m_peer_replayers) { + dout(5) << ": shutting down replayer for peer=" << peer << dendl; + shutdown_replayer(peer_replayer.get()); + } + m_peer_replayers.clear(); + + shutdown_mirror_watcher(); +} + +void FSMirror::init_instance_watcher(Context *on_finish) { + dout(20) << dendl; + + m_on_init_finish = new LambdaContext([this, on_finish](int r) { + { + std::scoped_lock locker(m_lock); + if (r < 0) { + m_init_failed = true; + } + } + on_finish->complete(r); + if (m_on_shutdown_finish != nullptr) { + m_on_shutdown_finish->complete(r); + } + }); + + Context *ctx = new C_CallbackAdapter< + FSMirror, &FSMirror::handle_init_instance_watcher>(this); + m_instance_watcher = InstanceWatcher::create(m_ioctx, m_snap_listener, m_work_queue); + m_instance_watcher->init(ctx); +} + +void FSMirror::handle_init_instance_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_init_finish = nullptr; + { + std::scoped_lock locker(m_lock); + if (r < 0) { + std::swap(on_init_finish, m_on_init_finish); + } + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(r); + return; + } + + init_mirror_watcher(); +} + +void FSMirror::init_mirror_watcher() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + Context *ctx = new C_CallbackAdapter< + FSMirror, &FSMirror::handle_init_mirror_watcher>(this); + m_mirror_watcher = MirrorWatcher::create(m_ioctx, this, m_work_queue); + m_mirror_watcher->init(ctx); +} + +void FSMirror::handle_init_mirror_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_init_finish = nullptr; + { + std::scoped_lock locker(m_lock); + if (r == 0) { + std::swap(on_init_finish, m_on_init_finish); + } + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(r); + return; + } + + m_retval = r; // save errcode for init context callback + shutdown_instance_watcher(); +} + +void FSMirror::shutdown_mirror_watcher() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + Context *ctx = new C_CallbackAdapter< + FSMirror, &FSMirror::handle_shutdown_mirror_watcher>(this); + m_mirror_watcher->shutdown(ctx); +} + +void FSMirror::handle_shutdown_mirror_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + shutdown_instance_watcher(); +} + +void FSMirror::shutdown_instance_watcher() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + Context *ctx = new C_CallbackAdapter< + FSMirror, &FSMirror::handle_shutdown_instance_watcher>(this); + m_instance_watcher->shutdown(new C_AsyncCallback(m_work_queue, ctx)); +} + +void FSMirror::handle_shutdown_instance_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + cleanup(); + + Context *on_init_finish = nullptr; + Context *on_shutdown_finish = nullptr; + + { + std::scoped_lock locker(m_lock); + std::swap(on_init_finish, m_on_init_finish); + std::swap(on_shutdown_finish, m_on_shutdown_finish); + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(m_retval); + } + if (on_shutdown_finish != nullptr) { + on_shutdown_finish->complete(r); + } +} + +void FSMirror::handle_acquire_directory(string_view dir_path) { + dout(5) << ": dir_path=" << dir_path << dendl; + + { + std::scoped_lock locker(m_lock); + m_directories.emplace(dir_path); + m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, + m_directories.size()); + + for (auto &[peer, peer_replayer] : m_peer_replayers) { + dout(10) << ": peer=" << peer << dendl; + peer_replayer->add_directory(dir_path); + } + } +} + +void FSMirror::handle_release_directory(string_view dir_path) { + dout(5) << ": dir_path=" << dir_path << dendl; + + { + std::scoped_lock locker(m_lock); + auto it = m_directories.find(dir_path); + if (it != m_directories.end()) { + m_directories.erase(it); + m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, + m_directories.size()); + for (auto &[peer, peer_replayer] : m_peer_replayers) { + dout(10) << ": peer=" << peer << dendl; + peer_replayer->remove_directory(dir_path); + } + } + } +} + +void FSMirror::add_peer(const Peer &peer) { + dout(10) << ": peer=" << peer << dendl; + + std::scoped_lock locker(m_lock); + m_all_peers.emplace(peer); + if (m_peer_replayers.find(peer) != m_peer_replayers.end()) { + return; + } + + auto replayer = std::make_unique( + m_cct, this, m_cluster, m_filesystem, peer, m_directories, m_mount, m_service_daemon); + int r = init_replayer(replayer.get()); + if (r < 0) { + m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, peer, + SERVICE_DAEMON_PEER_INIT_FAILED_KEY, + true); + return; + } + m_peer_replayers.emplace(peer, std::move(replayer)); + ceph_assert(m_peer_replayers.size() == 1); // support only a single peer +} + +void FSMirror::remove_peer(const Peer &peer) { + dout(10) << ": peer=" << peer << dendl; + + std::unique_ptr replayer; + { + std::scoped_lock locker(m_lock); + m_all_peers.erase(peer); + auto it = m_peer_replayers.find(peer); + if (it != m_peer_replayers.end()) { + replayer = std::move(it->second); + m_peer_replayers.erase(it); + } + } + + if (replayer) { + dout(5) << ": shutting down replayers for peer=" << peer << dendl; + shutdown_replayer(replayer.get()); + } +} + +void FSMirror::mirror_status(Formatter *f) { + std::scoped_lock locker(m_lock); + f->open_object_section("status"); + if (m_init_failed) { + f->dump_string("state", "failed"); + } else if (is_blocklisted(locker)) { + f->dump_string("state", "blocklisted"); + } else { + // dump rados addr for blocklist test + f->dump_string("rados_inst", m_addrs); + f->open_object_section("peers"); + for ([[maybe_unused]] auto &[peer, peer_replayer] : m_peer_replayers) { + peer.dump(f); + } + f->close_section(); // peers + f->open_object_section("snap_dirs"); + f->dump_int("dir_count", m_directories.size()); + f->close_section(); // snap_dirs + } + f->close_section(); // status +} + + +} // namespace mirror +} // namespace cephfs -- cgit v1.2.3