diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/tools/cephfs_mirror | |
parent | Initial commit. (diff) | |
download | ceph-upstream/16.2.11+ds.tar.xz ceph-upstream/16.2.11+ds.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
25 files changed, 5373 insertions, 0 deletions
diff --git a/src/tools/cephfs_mirror/CMakeLists.txt b/src/tools/cephfs_mirror/CMakeLists.txt new file mode 100644 index 000000000..4b6dea7a1 --- /dev/null +++ b/src/tools/cephfs_mirror/CMakeLists.txt @@ -0,0 +1,30 @@ +set(cephfs_mirror_internal + ClusterWatcher.cc + Mirror.cc + FSMirror.cc + InstanceWatcher.cc + MirrorWatcher.cc + PeerReplayer.cc + ServiceDaemon.cc + Types.cc + Utils.cc + Watcher.cc + watcher/RewatchRequest.cc) + +add_executable(cephfs-mirror + main.cc) + +add_library(cephfs_mirror_internal STATIC + ${cephfs_mirror_internal}) + +target_link_libraries(cephfs-mirror + cephfs_mirror_internal + global + ceph-common + cls_cephfs_client + librados + mds + cephfs + ${ALLOC_LIBS}) + +install(TARGETS cephfs-mirror DESTINATION bin) diff --git a/src/tools/cephfs_mirror/ClusterWatcher.cc b/src/tools/cephfs_mirror/ClusterWatcher.cc new file mode 100644 index 000000000..b5f6f81d7 --- /dev/null +++ b/src/tools/cephfs_mirror/ClusterWatcher.cc @@ -0,0 +1,182 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <mutex> +#include <vector> + +#include "common/ceph_context.h" +#include "common/debug.h" +#include "common/errno.h" +#include "mon/MonClient.h" + +#include "ClusterWatcher.h" +#include "ServiceDaemon.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::ClusterWatcher " << __func__ + +namespace cephfs { +namespace mirror { + +ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon *service_daemon, + Listener &listener) + : Dispatcher(cct), + m_monc(monc), + m_service_daemon(service_daemon), + m_listener(listener) { +} + +ClusterWatcher::~ClusterWatcher() { +} + +bool ClusterWatcher::ms_can_fast_dispatch2(const cref_t<Message> &m) const { + return m->get_type() == CEPH_MSG_FS_MAP; +} + +void ClusterWatcher::ms_fast_dispatch2(const ref_t<Message> &m) { + bool handled = 
ms_dispatch2(m); + ceph_assert(handled); +} + +bool ClusterWatcher::ms_dispatch2(const ref_t<Message> &m) { + if (m->get_type() == CEPH_MSG_FS_MAP) { + if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) { + handle_fsmap(ref_cast<MFSMap>(m)); + } + return true; + } + + return false; +} + +int ClusterWatcher::init() { + dout(20) << dendl; + + bool sub = m_monc->sub_want("fsmap", 0, 0); + if (!sub) { + derr << ": failed subscribing to FSMap" << dendl; + return -1; + } + + m_monc->renew_subs(); + dout(10) << ": subscribed to FSMap" << dendl; + return 0; +} + +void ClusterWatcher::shutdown() { + dout(20) << dendl; + std::scoped_lock locker(m_lock); + m_stopping = true; + m_monc->sub_unwant("fsmap"); +} + +void ClusterWatcher::handle_fsmap(const cref_t<MFSMap> &m) { + dout(20) << dendl; + + auto fsmap = m->get_fsmap(); + auto filesystems = fsmap.get_filesystems(); + + std::vector<Filesystem> mirroring_enabled; + std::vector<Filesystem> mirroring_disabled; + std::map<Filesystem, Peers> peers_added; + std::map<Filesystem, Peers> peers_removed; + std::map<Filesystem, uint64_t> fs_metadata_pools; + { + std::scoped_lock locker(m_lock); + if (m_stopping) { + return; + } + + // deleted filesystems are considered mirroring disabled + for (auto it = m_filesystem_peers.begin(); it != m_filesystem_peers.end();) { + if (!fsmap.filesystem_exists(it->first.fscid)) { + mirroring_disabled.emplace_back(it->first); + it = m_filesystem_peers.erase(it); + continue; + } + ++it; + } + + for (auto &filesystem : filesystems) { + auto fs = Filesystem{filesystem->fscid, + std::string(filesystem->mds_map.get_fs_name())}; + auto pool_id = filesystem->mds_map.get_metadata_pool(); + auto &mirror_info = filesystem->mirror_info; + + if (!mirror_info.is_mirrored()) { + auto it = m_filesystem_peers.find(fs); + if (it != m_filesystem_peers.end()) { + mirroring_disabled.emplace_back(fs); + m_filesystem_peers.erase(it); + } + } else { + auto [fspeersit, enabled] = 
m_filesystem_peers.emplace(fs, Peers{}); + auto &peers = fspeersit->second; + + if (enabled) { + mirroring_enabled.emplace_back(fs); + fs_metadata_pools.emplace(fs, pool_id); + } + + // peers added + Peers added; + std::set_difference(mirror_info.peers.begin(), mirror_info.peers.end(), + peers.begin(), peers.end(), std::inserter(added, added.end())); + + // peers removed + Peers removed; + std::set_difference(peers.begin(), peers.end(), + mirror_info.peers.begin(), mirror_info.peers.end(), + std::inserter(removed, removed.end())); + + // update set + if (!added.empty()) { + peers_added.emplace(fs, added); + peers.insert(added.begin(), added.end()); + } + if (!removed.empty()) { + peers_removed.emplace(fs, removed); + for (auto &p : removed) { + peers.erase(p); + } + } + } + } + } + + dout(5) << ": mirroring enabled=" << mirroring_enabled << ", mirroring_disabled=" + << mirroring_disabled << dendl; + for (auto &fs : mirroring_enabled) { + m_service_daemon->add_filesystem(fs.fscid, fs.fs_name); + m_listener.handle_mirroring_enabled(FilesystemSpec(fs, fs_metadata_pools.at(fs))); + } + for (auto &fs : mirroring_disabled) { + m_service_daemon->remove_filesystem(fs.fscid); + m_listener.handle_mirroring_disabled(fs); + } + + dout(5) << ": peers added=" << peers_added << ", peers removed=" << peers_removed << dendl; + + for (auto &[fs, peers] : peers_added) { + for (auto &peer : peers) { + m_service_daemon->add_peer(fs.fscid, peer); + m_listener.handle_peers_added(fs, peer); + } + } + for (auto &[fs, peers] : peers_removed) { + for (auto &peer : peers) { + m_service_daemon->remove_peer(fs.fscid, peer); + m_listener.handle_peers_removed(fs, peer); + } + } + + std::scoped_lock locker(m_lock); + if (!m_stopping) { + m_monc->sub_got("fsmap", fsmap.get_epoch()); + } // else we have already done a sub_unwant() +} + +} // namespace mirror +} // namespace cephfs diff --git a/src/tools/cephfs_mirror/ClusterWatcher.h b/src/tools/cephfs_mirror/ClusterWatcher.h new file mode 100644 
index 000000000..a418898f5 --- /dev/null +++ b/src/tools/cephfs_mirror/ClusterWatcher.h @@ -0,0 +1,77 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_CLUSTER_WATCHER_H +#define CEPHFS_MIRROR_CLUSTER_WATCHER_H + +#include <map> + +#include "common/ceph_mutex.h" +#include "common/async/context_pool.h" +#include "messages/MFSMap.h" +#include "msg/Dispatcher.h" +#include "Types.h" + +class MonClient; + +namespace cephfs { +namespace mirror { + +class ServiceDaemon; + +// watch peer changes for filesystems via FSMap updates + +class ClusterWatcher : public Dispatcher { +public: + struct Listener { + virtual ~Listener() { + } + + virtual void handle_mirroring_enabled(const FilesystemSpec &spec) = 0; + virtual void handle_mirroring_disabled(const Filesystem &filesystem) = 0; + + virtual void handle_peers_added(const Filesystem &filesystem, const Peer &peer) = 0; + virtual void handle_peers_removed(const Filesystem &filesystem, const Peer &peer) = 0; + }; + + ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon *service_daemon, + Listener &listener); + ~ClusterWatcher(); + + bool ms_can_fast_dispatch_any() const override { + return true; + } + bool ms_can_fast_dispatch2(const cref_t<Message> &m) const override; + void ms_fast_dispatch2(const ref_t<Message> &m) override; + bool ms_dispatch2(const ref_t<Message> &m) override; + + void ms_handle_connect(Connection *c) override { + } + bool ms_handle_reset(Connection *c) override { + return false; + } + void ms_handle_remote_reset(Connection *c) override { + } + bool ms_handle_refused(Connection *c) override { + return false; + } + + int init(); + void shutdown(); + +private: + ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::cluster_watcher"); + MonClient *m_monc; + ServiceDaemon *m_service_daemon; + Listener &m_listener; + + bool m_stopping = false; + std::map<Filesystem, Peers> m_filesystem_peers; + + void handle_fsmap(const 
cref_t<MFSMap> &m); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_CLUSTER_WATCHER_H diff --git a/src/tools/cephfs_mirror/FSMirror.cc b/src/tools/cephfs_mirror/FSMirror.cc new file mode 100644 index 000000000..76dcc11f6 --- /dev/null +++ b/src/tools/cephfs_mirror/FSMirror.cc @@ -0,0 +1,441 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/admin_socket.h" +#include "common/ceph_argparse.h" +#include "common/ceph_context.h" +#include "common/common_init.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/WorkQueue.h" +#include "include/stringify.h" +#include "msg/Messenger.h" +#include "FSMirror.h" +#include "PeerReplayer.h" +#include "aio_utils.h" +#include "ServiceDaemon.h" +#include "Utils.h" + +#include "common/Cond.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::FSMirror " << __func__ + +namespace cephfs { +namespace mirror { + +namespace { + +const std::string SERVICE_DAEMON_DIR_COUNT_KEY("directory_count"); +const std::string SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed"); + +class MirrorAdminSocketCommand { +public: + virtual ~MirrorAdminSocketCommand() { + } + virtual int call(Formatter *f) = 0; +}; + +class StatusCommand : public MirrorAdminSocketCommand { +public: + explicit StatusCommand(FSMirror *fs_mirror) + : fs_mirror(fs_mirror) { + } + + int call(Formatter *f) override { + fs_mirror->mirror_status(f); + return 0; + } + +private: + FSMirror *fs_mirror; +}; + +} // anonymous namespace + +class MirrorAdminSocketHook : public AdminSocketHook { +public: + MirrorAdminSocketHook(CephContext *cct, const Filesystem &filesystem, FSMirror *fs_mirror) + : admin_socket(cct->get_admin_socket()) { + int r; + std::string cmd; + + // mirror status format is name@fscid + cmd = "fs mirror status " + 
stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid); + r = admin_socket->register_command( + cmd, this, "get filesystem mirror status"); + if (r == 0) { + commands[cmd] = new StatusCommand(fs_mirror); + } + } + + ~MirrorAdminSocketHook() override { + admin_socket->unregister_commands(this); + for (auto &[command, cmdptr] : commands) { + delete cmdptr; + } + } + + int call(std::string_view command, const cmdmap_t& cmdmap, + Formatter *f, std::ostream &errss, bufferlist &out) override { + auto p = commands.at(std::string(command)); + return p->call(f); + } + +private: + typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands; + + AdminSocket *admin_socket; + Commands commands; +}; + +FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id, + ServiceDaemon *service_daemon, std::vector<const char*> args, + ContextWQ *work_queue) + : m_cct(cct), + m_filesystem(filesystem), + m_pool_id(pool_id), + m_service_daemon(service_daemon), + m_args(args), + m_work_queue(work_queue), + m_snap_listener(this), + m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) { + m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, + (uint64_t)0); +} + +FSMirror::~FSMirror() { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + delete m_instance_watcher; + delete m_mirror_watcher; + } + // outside the lock so that in-progress commands can acquire + // lock and finish executing. 
+ delete m_asok_hook; +} + +int FSMirror::init_replayer(PeerReplayer *peer_replayer) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + return peer_replayer->init(); +} + +void FSMirror::shutdown_replayer(PeerReplayer *peer_replayer) { + peer_replayer->shutdown(); +} + +void FSMirror::cleanup() { + dout(20) << dendl; + ceph_unmount(m_mount); + ceph_release(m_mount); + m_ioctx.close(); + m_cluster.reset(); +} + +void FSMirror::reopen_logs() { + std::scoped_lock locker(m_lock); + + if (m_cluster) { + reinterpret_cast<CephContext *>(m_cluster->cct())->reopen_logs(); + } + for (auto &[peer, replayer] : m_peer_replayers) { + replayer->reopen_logs(); + } +} + +void FSMirror::init(Context *on_finish) { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + int r = connect(g_ceph_context->_conf->name.to_str(), + g_ceph_context->_conf->cluster, &m_cluster, "", "", m_args); + if (r < 0) { + m_init_failed = true; + on_finish->complete(r); + return; + } + + r = m_cluster->ioctx_create2(m_pool_id, m_ioctx); + if (r < 0) { + m_init_failed = true; + m_cluster.reset(); + derr << ": error accessing local pool (id=" << m_pool_id << "): " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + r = mount(m_cluster, m_filesystem, true, &m_mount); + if (r < 0) { + m_init_failed = true; + m_ioctx.close(); + m_cluster.reset(); + on_finish->complete(r); + return; + } + + m_addrs = m_cluster->get_addrs(); + dout(10) << ": rados addrs=" << m_addrs << dendl; + + init_instance_watcher(on_finish); +} + +void FSMirror::shutdown(Context *on_finish) { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + m_stopping = true; + if (m_on_init_finish != nullptr) { + dout(10) << ": delaying shutdown -- init in progress" << dendl; + m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) { + if (r < 0) { + on_finish->complete(0); + return; + } + m_on_shutdown_finish = on_finish; + shutdown_peer_replayers(); + }); + return; + } + + m_on_shutdown_finish = 
on_finish; + } + + shutdown_peer_replayers(); +} + +void FSMirror::shutdown_peer_replayers() { + dout(20) << dendl; + + for (auto &[peer, peer_replayer] : m_peer_replayers) { + dout(5) << ": shutting down replayer for peer=" << peer << dendl; + shutdown_replayer(peer_replayer.get()); + } + m_peer_replayers.clear(); + + shutdown_mirror_watcher(); +} + +void FSMirror::init_instance_watcher(Context *on_finish) { + dout(20) << dendl; + + m_on_init_finish = new LambdaContext([this, on_finish](int r) { + { + std::scoped_lock locker(m_lock); + if (r < 0) { + m_init_failed = true; + } + } + on_finish->complete(r); + if (m_on_shutdown_finish != nullptr) { + m_on_shutdown_finish->complete(r); + } + }); + + Context *ctx = new C_CallbackAdapter< + FSMirror, &FSMirror::handle_init_instance_watcher>(this); + m_instance_watcher = InstanceWatcher::create(m_ioctx, m_snap_listener, m_work_queue); + m_instance_watcher->init(ctx); +} + +void FSMirror::handle_init_instance_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_init_finish = nullptr; + { + std::scoped_lock locker(m_lock); + if (r < 0) { + std::swap(on_init_finish, m_on_init_finish); + } + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(r); + return; + } + + init_mirror_watcher(); +} + +void FSMirror::init_mirror_watcher() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + Context *ctx = new C_CallbackAdapter< + FSMirror, &FSMirror::handle_init_mirror_watcher>(this); + m_mirror_watcher = MirrorWatcher::create(m_ioctx, this, m_work_queue); + m_mirror_watcher->init(ctx); +} + +void FSMirror::handle_init_mirror_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_init_finish = nullptr; + { + std::scoped_lock locker(m_lock); + if (r == 0) { + std::swap(on_init_finish, m_on_init_finish); + } + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(r); + return; + } + + m_retval = r; // save errcode for init context callback + shutdown_instance_watcher(); +} 
+ +void FSMirror::shutdown_mirror_watcher() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + Context *ctx = new C_CallbackAdapter< + FSMirror, &FSMirror::handle_shutdown_mirror_watcher>(this); + m_mirror_watcher->shutdown(ctx); +} + +void FSMirror::handle_shutdown_mirror_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + shutdown_instance_watcher(); +} + +void FSMirror::shutdown_instance_watcher() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + Context *ctx = new C_CallbackAdapter< + FSMirror, &FSMirror::handle_shutdown_instance_watcher>(this); + m_instance_watcher->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, ctx)); +} + +void FSMirror::handle_shutdown_instance_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + cleanup(); + + Context *on_init_finish = nullptr; + Context *on_shutdown_finish = nullptr; + + { + std::scoped_lock locker(m_lock); + std::swap(on_init_finish, m_on_init_finish); + std::swap(on_shutdown_finish, m_on_shutdown_finish); + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(m_retval); + } + if (on_shutdown_finish != nullptr) { + on_shutdown_finish->complete(r); + } +} + +void FSMirror::handle_acquire_directory(string_view dir_path) { + dout(5) << ": dir_path=" << dir_path << dendl; + + { + std::scoped_lock locker(m_lock); + m_directories.emplace(dir_path); + m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, + m_directories.size()); + + for (auto &[peer, peer_replayer] : m_peer_replayers) { + dout(10) << ": peer=" << peer << dendl; + peer_replayer->add_directory(dir_path); + } + } +} + +void FSMirror::handle_release_directory(string_view dir_path) { + dout(5) << ": dir_path=" << dir_path << dendl; + + { + std::scoped_lock locker(m_lock); + auto it = m_directories.find(dir_path); + if (it != m_directories.end()) { + m_directories.erase(it); + m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, + 
m_directories.size()); + for (auto &[peer, peer_replayer] : m_peer_replayers) { + dout(10) << ": peer=" << peer << dendl; + peer_replayer->remove_directory(dir_path); + } + } + } +} + +void FSMirror::add_peer(const Peer &peer) { + dout(10) << ": peer=" << peer << dendl; + + std::scoped_lock locker(m_lock); + m_all_peers.emplace(peer); + if (m_peer_replayers.find(peer) != m_peer_replayers.end()) { + return; + } + + auto replayer = std::make_unique<PeerReplayer>( + m_cct, this, m_cluster, m_filesystem, peer, m_directories, m_mount, m_service_daemon); + int r = init_replayer(replayer.get()); + if (r < 0) { + m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, peer, + SERVICE_DAEMON_PEER_INIT_FAILED_KEY, + true); + return; + } + m_peer_replayers.emplace(peer, std::move(replayer)); + ceph_assert(m_peer_replayers.size() == 1); // support only a single peer +} + +void FSMirror::remove_peer(const Peer &peer) { + dout(10) << ": peer=" << peer << dendl; + + std::unique_ptr<PeerReplayer> replayer; + { + std::scoped_lock locker(m_lock); + m_all_peers.erase(peer); + auto it = m_peer_replayers.find(peer); + if (it != m_peer_replayers.end()) { + replayer = std::move(it->second); + m_peer_replayers.erase(it); + } + } + + if (replayer) { + dout(5) << ": shutting down replayers for peer=" << peer << dendl; + shutdown_replayer(replayer.get()); + } +} + +void FSMirror::mirror_status(Formatter *f) { + std::scoped_lock locker(m_lock); + f->open_object_section("status"); + if (m_init_failed) { + f->dump_string("state", "failed"); + } else if (is_blocklisted(locker)) { + f->dump_string("state", "blocklisted"); + } else { + // dump rados addr for blocklist test + f->dump_string("rados_inst", m_addrs); + f->open_object_section("peers"); + for ([[maybe_unused]] auto &[peer, peer_replayer] : m_peer_replayers) { + peer.dump(f); + } + f->close_section(); // peers + f->open_object_section("snap_dirs"); + f->dump_int("dir_count", m_directories.size()); + f->close_section(); // 
snap_dirs + } + f->close_section(); // status +} + + +} // namespace mirror +} // namespace cephfs diff --git a/src/tools/cephfs_mirror/FSMirror.h b/src/tools/cephfs_mirror/FSMirror.h new file mode 100644 index 000000000..bae5a38e1 --- /dev/null +++ b/src/tools/cephfs_mirror/FSMirror.h @@ -0,0 +1,158 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_FS_MIRROR_H +#define CEPHFS_MIRROR_FS_MIRROR_H + +#include "common/Formatter.h" +#include "common/Thread.h" +#include "mds/FSMap.h" +#include "Types.h" +#include "InstanceWatcher.h" +#include "MirrorWatcher.h" + +class ContextWQ; + +namespace cephfs { +namespace mirror { + +class MirrorAdminSocketHook; +class PeerReplayer; +class ServiceDaemon; + +// handle mirroring for a filesystem to a set of peers + +class FSMirror { +public: + FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id, + ServiceDaemon *service_daemon, std::vector<const char*> args, + ContextWQ *work_queue); + ~FSMirror(); + + void init(Context *on_finish); + void shutdown(Context *on_finish); + + void add_peer(const Peer &peer); + void remove_peer(const Peer &peer); + + bool is_stopping() { + std::scoped_lock locker(m_lock); + return m_stopping; + } + + bool is_init_failed() { + std::scoped_lock locker(m_lock); + return m_init_failed; + } + + bool is_failed() { + std::scoped_lock locker(m_lock); + return m_init_failed || + m_instance_watcher->is_failed() || + m_mirror_watcher->is_failed(); + } + + bool is_blocklisted() { + std::scoped_lock locker(m_lock); + return is_blocklisted(locker); + } + + Peers get_peers() { + std::scoped_lock locker(m_lock); + return m_all_peers; + } + + std::string get_instance_addr() { + std::scoped_lock locker(m_lock); + return m_addrs; + } + + // admin socket helpers + void mirror_status(Formatter *f); + + void reopen_logs(); + +private: + bool is_blocklisted(const std::scoped_lock<ceph::mutex> &locker) const { + bool 
blocklisted = false; + if (m_instance_watcher) { + blocklisted = m_instance_watcher->is_blocklisted(); + } + if (m_mirror_watcher) { + blocklisted |= m_mirror_watcher->is_blocklisted(); + } + + return blocklisted; + } + + struct SnapListener : public InstanceWatcher::Listener { + FSMirror *fs_mirror; + + SnapListener(FSMirror *fs_mirror) + : fs_mirror(fs_mirror) { + } + + void acquire_directory(string_view dir_path) override { + fs_mirror->handle_acquire_directory(dir_path); + } + + void release_directory(string_view dir_path) override { + fs_mirror->handle_release_directory(dir_path); + } + }; + + CephContext *m_cct; + Filesystem m_filesystem; + uint64_t m_pool_id; + ServiceDaemon *m_service_daemon; + std::vector<const char *> m_args; + ContextWQ *m_work_queue; + + ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::fs_mirror"); + SnapListener m_snap_listener; + std::set<std::string, std::less<>> m_directories; + Peers m_all_peers; + std::map<Peer, std::unique_ptr<PeerReplayer>> m_peer_replayers; + + RadosRef m_cluster; + std::string m_addrs; + librados::IoCtx m_ioctx; + InstanceWatcher *m_instance_watcher = nullptr; + MirrorWatcher *m_mirror_watcher = nullptr; + + int m_retval = 0; + bool m_stopping = false; + bool m_init_failed = false; + Context *m_on_init_finish = nullptr; + Context *m_on_shutdown_finish = nullptr; + + MirrorAdminSocketHook *m_asok_hook = nullptr; + + MountRef m_mount; + + int init_replayer(PeerReplayer *peer_replayer); + void shutdown_replayer(PeerReplayer *peer_replayer); + void cleanup(); + + void init_instance_watcher(Context *on_finish); + void handle_init_instance_watcher(int r); + + void init_mirror_watcher(); + void handle_init_mirror_watcher(int r); + + void shutdown_peer_replayers(); + + void shutdown_mirror_watcher(); + void handle_shutdown_mirror_watcher(int r); + + void shutdown_instance_watcher(); + void handle_shutdown_instance_watcher(int r); + + void handle_acquire_directory(string_view dir_path); + void 
handle_release_directory(string_view dir_path); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_FS_MIRROR_H diff --git a/src/tools/cephfs_mirror/InstanceWatcher.cc b/src/tools/cephfs_mirror/InstanceWatcher.cc new file mode 100644 index 000000000..9c357da31 --- /dev/null +++ b/src/tools/cephfs_mirror/InstanceWatcher.cc @@ -0,0 +1,251 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_context.h" +#include "common/ceph_json.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/WorkQueue.h" +#include "cls/cephfs/cls_cephfs_client.h" +#include "include/stringify.h" +#include "aio_utils.h" +#include "InstanceWatcher.h" +#include "Types.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::InstanceWatcher " << __func__ + +namespace cephfs { +namespace mirror { + +namespace { + +std::string instance_oid(const std::string &instance_id) { + return CEPHFS_MIRROR_OBJECT + "." 
+ instance_id; +} + +} // anonymous namespace + +InstanceWatcher::InstanceWatcher(librados::IoCtx &ioctx, + Listener &listener, ContextWQ *work_queue) + : Watcher(ioctx, instance_oid(stringify(ioctx.get_instance_id())), work_queue), + m_ioctx(ioctx), + m_listener(listener), + m_work_queue(work_queue), + m_lock(ceph::make_mutex("cephfs::mirror::instance_watcher")) { +} + +InstanceWatcher::~InstanceWatcher() { +} + +void InstanceWatcher::init(Context *on_finish) { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + ceph_assert(m_on_init_finish == nullptr); + m_on_init_finish = new LambdaContext([this, on_finish](int r) { + on_finish->complete(r); + if (m_on_shutdown_finish != nullptr) { + m_on_shutdown_finish->complete(0); + } + }); + } + + create_instance(); +} + +void InstanceWatcher::shutdown(Context *on_finish) { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + ceph_assert(m_on_shutdown_finish == nullptr); + if (m_on_init_finish != nullptr) { + dout(10) << ": delaying shutdown -- init in progress" << dendl; + m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) { + m_on_shutdown_finish = nullptr; + shutdown(on_finish); + }); + return; + } + + m_on_shutdown_finish = on_finish; + } + + unregister_watcher(); +} + +void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist& bl) { + dout(20) << dendl; + + std::string dir_path; + std::string mode; + try { + JSONDecoder jd(bl); + JSONDecoder::decode_json("dir_path", dir_path, &jd.parser, true); + JSONDecoder::decode_json("mode", mode, &jd.parser, true); + } catch (const JSONDecoder::err &e) { + derr << ": failed to decode notify json: " << e.what() << dendl; + } + + dout(20) << ": notifier_id=" << notifier_id << ", dir_path=" << dir_path + << ", mode=" << mode << dendl; + + if (mode == "acquire") { + m_listener.acquire_directory(dir_path); + } else if (mode == "release") { + m_listener.release_directory(dir_path); + } else { + derr 
<< ": unknown mode" << dendl; + } + + bufferlist outbl; + acknowledge_notify(notify_id, handle, outbl); +} + +void InstanceWatcher::handle_rewatch_complete(int r) { + dout(5) << ": r=" << r << dendl; + + if (r == -EBLOCKLISTED) { + dout(0) << ": client blocklisted" <<dendl; + std::scoped_lock locker(m_lock); + m_blocklisted = true; + } else if (r == -ENOENT) { + derr << ": mirroring object deleted" << dendl; + m_failed = true; + } else if (r < 0) { + derr << ": rewatch error: " << cpp_strerror(r) << dendl; + m_failed = true; + } +} + +void InstanceWatcher::create_instance() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + librados::ObjectWriteOperation op; + op.create(false); + + librados::AioCompletion *aio_comp = + librados::Rados::aio_create_completion( + this, &rados_callback<InstanceWatcher, &InstanceWatcher::handle_create_instance>); + int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); + ceph_assert(r == 0); + aio_comp->release(); +} + +void InstanceWatcher::handle_create_instance(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_init_finish = nullptr; + { + std::scoped_lock locker(m_lock); + if (r < 0) { + std::swap(on_init_finish, m_on_init_finish); + } + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(r); + return; + } + + register_watcher(); +} + +void InstanceWatcher::register_watcher() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + Context *on_finish = new C_CallbackAdapter< + InstanceWatcher, &InstanceWatcher::handle_register_watcher>(this); + register_watch(on_finish); +} + +void InstanceWatcher::handle_register_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_init_finish = nullptr; + { + std::scoped_lock locker(m_lock); + if (r == 0) { + std::swap(on_init_finish, m_on_init_finish); + } + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(r); + return; + } + + remove_instance(); +} + +void InstanceWatcher::unregister_watcher() { + dout(20) << dendl; + + 
std::scoped_lock locker(m_lock); + Context *on_finish = new C_CallbackAdapter< + InstanceWatcher, &InstanceWatcher::handle_unregister_watcher>(this); + unregister_watch(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish)); +} + +void InstanceWatcher::handle_unregister_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_shutdown_finish = nullptr; + { + std::scoped_lock locker(m_lock); + if (r < 0) { + std::swap(on_shutdown_finish, m_on_shutdown_finish); + } + } + + if (on_shutdown_finish != nullptr) { + on_shutdown_finish->complete(r); + return; + } + + remove_instance(); +} + +void InstanceWatcher::remove_instance() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + librados::ObjectWriteOperation op; + op.remove(); + + librados::AioCompletion *aio_comp = + librados::Rados::aio_create_completion( + this, &rados_callback<InstanceWatcher, &InstanceWatcher::handle_remove_instance>); + int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); + ceph_assert(r == 0); + aio_comp->release(); +} + +void InstanceWatcher::handle_remove_instance(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_init_finish = nullptr; + Context *on_shutdown_finish = nullptr; + { + std::scoped_lock locker(m_lock); + std::swap(on_init_finish, m_on_init_finish); + std::swap(on_shutdown_finish, m_on_shutdown_finish); + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(r); + } + if (on_shutdown_finish != nullptr) { + on_shutdown_finish->complete(r); + } +} + +} // namespace mirror +} // namespace cephfs diff --git a/src/tools/cephfs_mirror/InstanceWatcher.h b/src/tools/cephfs_mirror/InstanceWatcher.h new file mode 100644 index 000000000..06edf5da9 --- /dev/null +++ b/src/tools/cephfs_mirror/InstanceWatcher.h @@ -0,0 +1,85 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_INSTANCE_WATCHER_H +#define CEPHFS_MIRROR_INSTANCE_WATCHER_H + +#include <string_view> + +#include 
"common/ceph_mutex.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "Watcher.h" + +class ContextWQ; + +namespace cephfs { +namespace mirror { + +// watch directory update notifications via per daemon rados +// object and invoke listener callback. + +class InstanceWatcher : public Watcher { +public: + struct Listener { + virtual ~Listener() { + } + + virtual void acquire_directory(string_view dir_path) = 0; + virtual void release_directory(string_view dir_path) = 0; + }; + + static InstanceWatcher *create(librados::IoCtx &ioctx, + Listener &listener, ContextWQ *work_queue) { + return new InstanceWatcher(ioctx, listener, work_queue); + } + + InstanceWatcher(librados::IoCtx &ioctx, Listener &listener, ContextWQ *work_queue); + ~InstanceWatcher(); + + void init(Context *on_finish); + void shutdown(Context *on_finish); + + void handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist& bl) override; + void handle_rewatch_complete(int r) override; + + bool is_blocklisted() { + std::scoped_lock locker(m_lock); + return m_blocklisted; + } + + bool is_failed() { + std::scoped_lock locker(m_lock); + return m_failed; + } + +private: + librados::IoCtx &m_ioctx; + Listener &m_listener; + ContextWQ *m_work_queue; + + ceph::mutex m_lock; + Context *m_on_init_finish = nullptr; + Context *m_on_shutdown_finish = nullptr; + + bool m_blocklisted = false; + bool m_failed = false; + + void create_instance(); + void handle_create_instance(int r); + + void register_watcher(); + void handle_register_watcher(int r); + + void remove_instance(); + void handle_remove_instance(int r); + + void unregister_watcher(); + void handle_unregister_watcher(int r); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_INSTANCE_WATCHER_H diff --git a/src/tools/cephfs_mirror/Mirror.cc b/src/tools/cephfs_mirror/Mirror.cc new file mode 100644 index 000000000..890805764 --- /dev/null +++ b/src/tools/cephfs_mirror/Mirror.cc 
@@ -0,0 +1,602 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_argparse.h" +#include "common/ceph_context.h" +#include "common/common_init.h" +#include "common/Cond.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "common/WorkQueue.h" +#include "include/types.h" +#include "mon/MonClient.h" +#include "msg/Messenger.h" +#include "aio_utils.h" +#include "Mirror.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::Mirror " << __func__ + +namespace cephfs { +namespace mirror { + +namespace { + +const std::string SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY("mirroring_failed"); + +class SafeTimerSingleton : public CommonSafeTimer<ceph::mutex> { +public: + ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock"); + + explicit SafeTimerSingleton(CephContext *cct) + : SafeTimer(cct, timer_lock, true) { + init(); + } +}; + +class ThreadPoolSingleton : public ThreadPool { +public: + ContextWQ *work_queue = nullptr; + + explicit ThreadPoolSingleton(CephContext *cct) + : ThreadPool(cct, "Mirror::thread_pool", "tp_mirror", 1) { + work_queue = new ContextWQ("Mirror::work_queue", ceph::make_timespan(60), this); + + start(); + } +}; + +} // anonymous namespace + +struct Mirror::C_EnableMirroring : Context { + Mirror *mirror; + Filesystem filesystem; + uint64_t pool_id; + + C_EnableMirroring(Mirror *mirror, const Filesystem &filesystem, uint64_t pool_id) + : mirror(mirror), + filesystem(filesystem), + pool_id(pool_id) { + } + + void finish(int r) override { + enable_mirroring(); + } + + void enable_mirroring() { + Context *ctx = new C_CallbackAdapter<C_EnableMirroring, + &C_EnableMirroring::handle_enable_mirroring>(this); + mirror->enable_mirroring(filesystem, pool_id, ctx); + } + + void handle_enable_mirroring(int r) { + 
mirror->handle_enable_mirroring(filesystem, r); + delete this; + } + + // context needs to live post completion + void complete(int r) override { + finish(r); + } +}; + +struct Mirror::C_DisableMirroring : Context { + Mirror *mirror; + Filesystem filesystem; + + C_DisableMirroring(Mirror *mirror, const Filesystem &filesystem) + : mirror(mirror), + filesystem(filesystem) { + } + + void finish(int r) override { + disable_mirroring(); + } + + void disable_mirroring() { + Context *ctx = new C_CallbackAdapter<C_DisableMirroring, + &C_DisableMirroring::handle_disable_mirroring>(this); + mirror->disable_mirroring(filesystem, ctx); + } + + void handle_disable_mirroring(int r) { + mirror->handle_disable_mirroring(filesystem, r); + delete this; + } + + // context needs to live post completion + void complete(int r) override { + finish(r); + } +}; + +struct Mirror::C_PeerUpdate : Context { + Mirror *mirror; + Filesystem filesystem; + Peer peer; + bool remove = false; + + C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem, + const Peer &peer) + : mirror(mirror), + filesystem(filesystem), + peer(peer) { + } + C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem, + const Peer &peer, bool remove) + : mirror(mirror), + filesystem(filesystem), + peer(peer), + remove(remove) { + } + + void finish(int r) override { + if (remove) { + mirror->remove_peer(filesystem, peer); + } else { + mirror->add_peer(filesystem, peer); + } + } +}; + +struct Mirror::C_RestartMirroring : Context { + Mirror *mirror; + Filesystem filesystem; + uint64_t pool_id; + Peers peers; + + C_RestartMirroring(Mirror *mirror, const Filesystem &filesystem, + uint64_t pool_id, const Peers &peers) + : mirror(mirror), + filesystem(filesystem), + pool_id(pool_id), + peers(peers) { + } + + void finish(int r) override { + disable_mirroring(); + } + + void disable_mirroring() { + Context *ctx = new C_CallbackAdapter<C_RestartMirroring, + &C_RestartMirroring::handle_disable_mirroring>(this); + 
mirror->disable_mirroring(filesystem, ctx); + } + + void handle_disable_mirroring(int r) { + enable_mirroring(); + } + + void enable_mirroring() { + std::scoped_lock locker(mirror->m_lock); + Context *ctx = new C_CallbackAdapter<C_RestartMirroring, + &C_RestartMirroring::handle_enable_mirroring>(this); + mirror->enable_mirroring(filesystem, pool_id, ctx, true); + } + + void handle_enable_mirroring(int r) { + mirror->handle_enable_mirroring(filesystem, peers, r); + delete this; + } + + // context needs to live post completion + void complete(int r) override { + finish(r); + } +}; + +Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args, + MonClient *monc, Messenger *msgr) + : m_cct(cct), + m_args(args), + m_monc(monc), + m_msgr(msgr), + m_listener(this), + m_last_blocklist_check(ceph_clock_now()), + m_last_failure_check(ceph_clock_now()), + m_local(new librados::Rados()) { + auto thread_pool = &(cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( + "cephfs::mirror::thread_pool", false, cct)); + auto safe_timer = &(cct->lookup_or_create_singleton_object<SafeTimerSingleton>( + "cephfs::mirror::safe_timer", false, cct)); + m_thread_pool = thread_pool; + m_work_queue = thread_pool->work_queue; + m_timer = safe_timer; + m_timer_lock = &safe_timer->timer_lock; + std::scoped_lock timer_lock(*m_timer_lock); + schedule_mirror_update_task(); +} + +Mirror::~Mirror() { + dout(10) << dendl; + { + std::scoped_lock timer_lock(*m_timer_lock); + m_timer->shutdown(); + } + + m_work_queue->drain(); + delete m_work_queue; + { + std::scoped_lock locker(m_lock); + m_thread_pool->stop(); + } +} + +int Mirror::init_mon_client() { + dout(20) << dendl; + + m_monc->set_messenger(m_msgr); + m_msgr->add_dispatcher_head(m_monc); + m_monc->set_want_keys(CEPH_ENTITY_TYPE_MON); + + int r = m_monc->init(); + if (r < 0) { + derr << ": failed to init mon client: " << cpp_strerror(r) << dendl; + return r; + } + + r = 
m_monc->authenticate(std::chrono::duration<double>(m_cct->_conf.get_val<std::chrono::seconds>("client_mount_timeout")).count()); + if (r < 0) { + derr << ": failed to authenticate to monitor: " << cpp_strerror(r) << dendl; + return r; + } + + client_t me = m_monc->get_global_id(); + m_msgr->set_myname(entity_name_t::CLIENT(me.v)); + return 0; +} + +int Mirror::init(std::string &reason) { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + + int r = m_local->init_with_context(m_cct); + if (r < 0) { + derr << ": could not initialize rados handler" << dendl; + return r; + } + + r = m_local->connect(); + if (r < 0) { + derr << ": error connecting to local cluster" << dendl; + return r; + } + + m_service_daemon = std::make_unique<ServiceDaemon>(m_cct, m_local); + r = m_service_daemon->init(); + if (r < 0) { + derr << ": error registering service daemon: " << cpp_strerror(r) << dendl; + return r; + } + + r = init_mon_client(); + if (r < 0) { + return r; + } + + return 0; +} + +void Mirror::shutdown() { + dout(20) << dendl; + m_stopping = true; + m_cluster_watcher->shutdown(); + m_cond.notify_all(); +} + +void Mirror::reopen_logs() { + for (auto &[filesystem, mirror_action] : m_mirror_actions) { + mirror_action.fs_mirror->reopen_logs(); + } + g_ceph_context->reopen_logs(); +} + +void Mirror::handle_signal(int signum) { + dout(10) << ": signal=" << signum << dendl; + + std::scoped_lock locker(m_lock); + switch (signum) { + case SIGHUP: + reopen_logs(); + break; + case SIGINT: + case SIGTERM: + shutdown(); + break; + default: + ceph_abort_msgf("unexpected signal %d", signum); + } +} + +void Mirror::handle_enable_mirroring(const Filesystem &filesystem, + const Peers &peers, int r) { + dout(20) << ": filesystem=" << filesystem << ", peers=" << peers + << ", r=" << r << dendl; + + std::scoped_lock locker(m_lock); + auto &mirror_action = m_mirror_actions.at(filesystem); + ceph_assert(mirror_action.action_in_progress); + + mirror_action.action_in_progress = false; + 
m_cond.notify_all(); + if (r < 0) { + derr << ": failed to initialize FSMirror for filesystem=" << filesystem + << ": " << cpp_strerror(r) << dendl; + m_service_daemon->add_or_update_fs_attribute(filesystem.fscid, + SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY, + true); + return; + } + + for (auto &peer : peers) { + mirror_action.fs_mirror->add_peer(peer); + } + + dout(10) << ": Initialized FSMirror for filesystem=" << filesystem << dendl; +} + +void Mirror::handle_enable_mirroring(const Filesystem &filesystem, int r) { + dout(20) << ": filesystem=" << filesystem << ", r=" << r << dendl; + + std::scoped_lock locker(m_lock); + auto &mirror_action = m_mirror_actions.at(filesystem); + ceph_assert(mirror_action.action_in_progress); + + mirror_action.action_in_progress = false; + m_cond.notify_all(); + if (r < 0) { + derr << ": failed to initialize FSMirror for filesystem=" << filesystem + << ": " << cpp_strerror(r) << dendl; + m_service_daemon->add_or_update_fs_attribute(filesystem.fscid, + SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY, + true); + return; + } + + dout(10) << ": Initialized FSMirror for filesystem=" << filesystem << dendl; +} + +void Mirror::enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id, + Context *on_finish, bool is_restart) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + auto &mirror_action = m_mirror_actions.at(filesystem); + if (is_restart) { + mirror_action.fs_mirror.reset(); + } else { + ceph_assert(!mirror_action.action_in_progress); + } + + ceph_assert(!mirror_action.fs_mirror); + + dout(10) << ": starting FSMirror: filesystem=" << filesystem << dendl; + + mirror_action.action_in_progress = true; + mirror_action.fs_mirror = std::make_unique<FSMirror>(m_cct, filesystem, local_pool_id, + m_service_daemon.get(), m_args, m_work_queue); + mirror_action.fs_mirror->init(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish)); +} + +void Mirror::mirroring_enabled(const Filesystem &filesystem, uint64_t local_pool_id) { + dout(10) << ": 
filesystem=" << filesystem << ", pool_id=" << local_pool_id << dendl; + + std::scoped_lock locker(m_lock); + if (m_stopping) { + return; + } + + auto p = m_mirror_actions.emplace(filesystem, MirrorAction(local_pool_id)); + auto &mirror_action = p.first->second; + mirror_action.action_ctxs.push_back(new C_EnableMirroring(this, filesystem, local_pool_id)); +} + +void Mirror::handle_disable_mirroring(const Filesystem &filesystem, int r) { + dout(10) << ": filesystem=" << filesystem << ", r=" << r << dendl; + + std::scoped_lock locker(m_lock); + auto &mirror_action = m_mirror_actions.at(filesystem); + + if (!mirror_action.fs_mirror->is_init_failed()) { + ceph_assert(mirror_action.action_in_progress); + mirror_action.action_in_progress = false; + m_cond.notify_all(); + } + + if (!m_stopping) { + mirror_action.fs_mirror.reset(); + if (mirror_action.action_ctxs.empty()) { + dout(10) << ": no pending actions for filesystem=" << filesystem << dendl; + m_mirror_actions.erase(filesystem); + } + } +} + +void Mirror::disable_mirroring(const Filesystem &filesystem, Context *on_finish) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + auto &mirror_action = m_mirror_actions.at(filesystem); + ceph_assert(mirror_action.fs_mirror); + ceph_assert(!mirror_action.action_in_progress); + + if (mirror_action.fs_mirror->is_init_failed()) { + dout(10) << ": init failed for filesystem=" << filesystem << dendl; + m_work_queue->queue(on_finish, -EINVAL); + return; + } + + mirror_action.action_in_progress = true; + mirror_action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish)); +} + +void Mirror::mirroring_disabled(const Filesystem &filesystem) { + dout(10) << ": filesystem=" << filesystem << dendl; + + std::scoped_lock locker(m_lock); + if (m_stopping) { + dout(5) << "shutting down" << dendl; + return; + } + + auto &mirror_action = m_mirror_actions.at(filesystem); + mirror_action.action_ctxs.push_back(new C_DisableMirroring(this, filesystem)); +} + +void 
Mirror::add_peer(const Filesystem &filesystem, const Peer &peer) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + auto &mirror_action = m_mirror_actions.at(filesystem); + ceph_assert(mirror_action.fs_mirror); + ceph_assert(!mirror_action.action_in_progress); + + mirror_action.fs_mirror->add_peer(peer); +} + +void Mirror::peer_added(const Filesystem &filesystem, const Peer &peer) { + dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl; + + std::scoped_lock locker(m_lock); + if (m_stopping) { + dout(5) << "shutting down" << dendl; + return; + } + + auto &mirror_action = m_mirror_actions.at(filesystem); + mirror_action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer)); +} + +void Mirror::remove_peer(const Filesystem &filesystem, const Peer &peer) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + auto &mirror_action = m_mirror_actions.at(filesystem); + ceph_assert(mirror_action.fs_mirror); + ceph_assert(!mirror_action.action_in_progress); + + mirror_action.fs_mirror->remove_peer(peer); +} + +void Mirror::peer_removed(const Filesystem &filesystem, const Peer &peer) { + dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl; + + std::scoped_lock locker(m_lock); + if (m_stopping) { + dout(5) << "shutting down" << dendl; + return; + } + + auto &mirror_action = m_mirror_actions.at(filesystem); + mirror_action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer, true)); +} + +void Mirror::update_fs_mirrors() { + dout(20) << dendl; + + auto now = ceph_clock_now(); + double blocklist_interval = g_ceph_context->_conf.get_val<std::chrono::seconds> + ("cephfs_mirror_restart_mirror_on_blocklist_interval").count(); + bool check_blocklist = blocklist_interval > 0 && ((now - m_last_blocklist_check) >= blocklist_interval); + + double failed_interval = g_ceph_context->_conf.get_val<std::chrono::seconds> + ("cephfs_mirror_restart_mirror_on_failure_interval").count(); + bool check_failure = failed_interval > 0 && ((now - 
m_last_failure_check) >= failed_interval); + + { + std::scoped_lock locker(m_lock); + for (auto &[filesystem, mirror_action] : m_mirror_actions) { + auto failed = mirror_action.fs_mirror && mirror_action.fs_mirror->is_failed(); + auto blocklisted = mirror_action.fs_mirror && mirror_action.fs_mirror->is_blocklisted(); + + if (check_failure && !mirror_action.action_in_progress && failed) { + // about to restart failed mirror instance -- nothing + // should interfere + dout(5) << ": filesystem=" << filesystem << " failed mirroring -- restarting" << dendl; + auto peers = mirror_action.fs_mirror->get_peers(); + auto ctx = new C_RestartMirroring(this, filesystem, mirror_action.pool_id, peers); + ctx->complete(0); + } else if (check_blocklist && !mirror_action.action_in_progress && blocklisted) { + // about to restart blocklisted mirror instance -- nothing + // should interfere + dout(5) << ": filesystem=" << filesystem << " is blocklisted -- restarting" << dendl; + auto peers = mirror_action.fs_mirror->get_peers(); + auto ctx = new C_RestartMirroring(this, filesystem, mirror_action.pool_id, peers); + ctx->complete(0); + } + if (!failed && !blocklisted && !mirror_action.action_ctxs.empty() + && !mirror_action.action_in_progress) { + auto ctx = std::move(mirror_action.action_ctxs.front()); + mirror_action.action_ctxs.pop_front(); + ctx->complete(0); + } + } + + if (check_blocklist) { + m_last_blocklist_check = now; + } + if (check_failure) { + m_last_failure_check = now; + } + } + + schedule_mirror_update_task(); +} + +void Mirror::schedule_mirror_update_task() { + ceph_assert(m_timer_task == nullptr); + ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); + + m_timer_task = new LambdaContext([this](int _) { + m_timer_task = nullptr; + update_fs_mirrors(); + }); + double after = g_ceph_context->_conf.get_val<std::chrono::seconds> + ("cephfs_mirror_action_update_interval").count(); + dout(20) << ": scheduling fs mirror update (" << m_timer_task << ") after " + << after << " 
seconds" << dendl; + m_timer->add_event_after(after, m_timer_task); +} + +void Mirror::run() { + dout(20) << dendl; + + std::unique_lock locker(m_lock); + m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_service_daemon.get(), m_listener)); + m_msgr->add_dispatcher_tail(m_cluster_watcher.get()); + + m_cluster_watcher->init(); + m_cond.wait(locker, [this]{return m_stopping;}); + + locker.unlock(); + { + std::scoped_lock timer_lock(*m_timer_lock); + if (m_timer_task != nullptr) { + dout(10) << ": canceling timer task=" << m_timer_task << dendl; + m_timer->cancel_event(m_timer_task); + m_timer_task = nullptr; + } + } + locker.lock(); + + for (auto &[filesystem, mirror_action] : m_mirror_actions) { + dout(10) << ": trying to shutdown filesystem=" << filesystem << dendl; + // wait for in-progress action and shutdown + m_cond.wait(locker, [&mirror_action=mirror_action] + {return !mirror_action.action_in_progress;}); + if (mirror_action.fs_mirror && + !mirror_action.fs_mirror->is_stopping() && + !mirror_action.fs_mirror->is_init_failed()) { + C_SaferCond cond; + mirror_action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, &cond)); + int r = cond.wait(); + dout(10) << ": shutdown filesystem=" << filesystem << ", r=" << r << dendl; + } + + mirror_action.fs_mirror.reset(); + } +} + +} // namespace mirror +} // namespace cephfs + diff --git a/src/tools/cephfs_mirror/Mirror.h b/src/tools/cephfs_mirror/Mirror.h new file mode 100644 index 000000000..f0ffdd516 --- /dev/null +++ b/src/tools/cephfs_mirror/Mirror.h @@ -0,0 +1,140 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_H +#define CEPHFS_MIRROR_H + +#include <map> +#include <set> +#include <vector> + +#include "common/ceph_mutex.h" +#include "common/WorkQueue.h" +#include "mds/FSMap.h" +#include "ClusterWatcher.h" +#include "FSMirror.h" +#include "ServiceDaemon.h" +#include "Types.h" + +class Messenger; +class 
MonClient; +class ContextWQ; + +namespace cephfs { +namespace mirror { + +// this wraps up ClusterWatcher and FSMirrors to implement mirroring +// for ceph filesystems. + +class Mirror { +public: + Mirror(CephContext *cct, const std::vector<const char*> &args, + MonClient *monc, Messenger *msgr); + ~Mirror(); + + int init(std::string &reason); + void shutdown(); + void run(); + + void handle_signal(int signum); + +private: + static constexpr std::string_view MIRRORING_MODULE = "mirroring"; + + struct C_EnableMirroring; + struct C_DisableMirroring; + struct C_PeerUpdate; + struct C_RestartMirroring; + + struct ClusterListener : ClusterWatcher::Listener { + Mirror *mirror; + + ClusterListener(Mirror *mirror) + : mirror(mirror) { + } + + void handle_mirroring_enabled(const FilesystemSpec &spec) override { + mirror->mirroring_enabled(spec.filesystem, spec.pool_id); + } + + void handle_mirroring_disabled(const Filesystem &filesystem) override { + mirror->mirroring_disabled(filesystem); + } + + void handle_peers_added(const Filesystem &filesystem, const Peer &peer) override { + mirror->peer_added(filesystem, peer); + } + + void handle_peers_removed(const Filesystem &filesystem, const Peer &peer) override { + mirror->peer_removed(filesystem, peer); + } + }; + + struct MirrorAction { + MirrorAction(uint64_t pool_id) : + pool_id(pool_id) { + } + + uint64_t pool_id; // for restarting blocklisted mirror instance + bool action_in_progress = false; + std::list<Context *> action_ctxs; + std::unique_ptr<FSMirror> fs_mirror; + }; + + ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::Mirror"); + ceph::condition_variable m_cond; + + CephContext *m_cct; + std::vector<const char *> m_args; + MonClient *m_monc; + Messenger *m_msgr; + ClusterListener m_listener; + + ThreadPool *m_thread_pool = nullptr; + ContextWQ *m_work_queue = nullptr; + SafeTimer *m_timer = nullptr; + ceph::mutex *m_timer_lock = nullptr; + Context *m_timer_task = nullptr; + + bool m_stopping = false; + 
std::unique_ptr<ClusterWatcher> m_cluster_watcher; + std::map<Filesystem, MirrorAction> m_mirror_actions; + + utime_t m_last_blocklist_check; + utime_t m_last_failure_check; + + RadosRef m_local; + std::unique_ptr<ServiceDaemon> m_service_daemon; + + int init_mon_client(); + + // called via listener + void mirroring_enabled(const Filesystem &filesystem, uint64_t local_pool_id); + void mirroring_disabled(const Filesystem &filesystem); + void peer_added(const Filesystem &filesystem, const Peer &peer); + void peer_removed(const Filesystem &filesystem, const Peer &peer); + + // mirror enable callback + void enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id, + Context *on_finish, bool is_restart=false); + void handle_enable_mirroring(const Filesystem &filesystem, int r); + void handle_enable_mirroring(const Filesystem &filesystem, const Peers &peers, int r); + + // mirror disable callback + void disable_mirroring(const Filesystem &filesystem, Context *on_finish); + void handle_disable_mirroring(const Filesystem &filesystem, int r); + + // peer update callback + void add_peer(const Filesystem &filesystem, const Peer &peer); + void remove_peer(const Filesystem &filesystem, const Peer &peer); + + void schedule_mirror_update_task(); + void update_fs_mirrors(); + + void reopen_logs(); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_H diff --git a/src/tools/cephfs_mirror/MirrorWatcher.cc b/src/tools/cephfs_mirror/MirrorWatcher.cc new file mode 100644 index 000000000..26b88d077 --- /dev/null +++ b/src/tools/cephfs_mirror/MirrorWatcher.cc @@ -0,0 +1,148 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_context.h" +#include "common/ceph_json.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/WorkQueue.h" +#include "include/stringify.h" +#include "msg/Messenger.h" +#include "aio_utils.h" +#include "MirrorWatcher.h" +#include 
"FSMirror.h" +#include "Types.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::MirrorWatcher " << __func__ + +namespace cephfs { +namespace mirror { + +MirrorWatcher::MirrorWatcher(librados::IoCtx &ioctx, FSMirror *fs_mirror, + ContextWQ *work_queue) + : Watcher(ioctx, CEPHFS_MIRROR_OBJECT, work_queue), + m_ioctx(ioctx), + m_fs_mirror(fs_mirror), + m_work_queue(work_queue), + m_lock(ceph::make_mutex("cephfs::mirror::mirror_watcher")), + m_instance_id(stringify(m_ioctx.get_instance_id())) { +} + +MirrorWatcher::~MirrorWatcher() { +} + +void MirrorWatcher::init(Context *on_finish) { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + ceph_assert(m_on_init_finish == nullptr); + m_on_init_finish = new LambdaContext([this, on_finish](int r) { + on_finish->complete(r); + if (m_on_shutdown_finish != nullptr) { + m_on_shutdown_finish->complete(0); + } + }); + } + + register_watcher(); +} + +void MirrorWatcher::shutdown(Context *on_finish) { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + ceph_assert(m_on_shutdown_finish == nullptr); + if (m_on_init_finish != nullptr) { + dout(10) << ": delaying shutdown -- init in progress" << dendl; + m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) { + m_on_shutdown_finish = nullptr; + shutdown(on_finish); + }); + return; + } + + m_on_shutdown_finish = on_finish; + } + + unregister_watcher(); +} + +void MirrorWatcher::handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist& bl) { + dout(20) << dendl; + + JSONFormatter f; + f.open_object_section("info"); + encode_json("addr", m_fs_mirror->get_instance_addr(), &f); + f.close_section(); + + bufferlist outbl; + f.flush(outbl); + acknowledge_notify(notify_id, handle, outbl); +} + +void MirrorWatcher::handle_rewatch_complete(int r) { + dout(5) << ": r=" << r << dendl; + + if (r == -EBLOCKLISTED) { + dout(0) << ": 
client blocklisted" <<dendl; + std::scoped_lock locker(m_lock); + m_blocklisted = true; + } else if (r == -ENOENT) { + derr << ": mirroring object deleted" << dendl; + m_failed = true; + } else if (r < 0) { + derr << ": rewatch error: " << cpp_strerror(r) << dendl; + m_failed = true; + } +} + +void MirrorWatcher::register_watcher() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + Context *on_finish = new C_CallbackAdapter< + MirrorWatcher, &MirrorWatcher::handle_register_watcher>(this); + register_watch(on_finish); +} + +void MirrorWatcher::handle_register_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_init_finish = nullptr; + { + std::scoped_lock locker(m_lock); + std::swap(on_init_finish, m_on_init_finish); + } + + on_init_finish->complete(r); +} + +void MirrorWatcher::unregister_watcher() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + Context *on_finish = new C_CallbackAdapter< + MirrorWatcher, &MirrorWatcher::handle_unregister_watcher>(this); + unregister_watch(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish)); +} + +void MirrorWatcher::handle_unregister_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_shutdown_finish = nullptr; + { + std::scoped_lock locker(m_lock); + std::swap(on_shutdown_finish, m_on_shutdown_finish); + } + + on_shutdown_finish->complete(r); +} + +} // namespace mirror +} // namespace cephfs diff --git a/src/tools/cephfs_mirror/MirrorWatcher.h b/src/tools/cephfs_mirror/MirrorWatcher.h new file mode 100644 index 000000000..c4d4f4522 --- /dev/null +++ b/src/tools/cephfs_mirror/MirrorWatcher.h @@ -0,0 +1,79 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_MIRROR_WATCHER_H +#define CEPHFS_MIRROR_MIRROR_WATCHER_H + +#include <string_view> + +#include "common/ceph_mutex.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "Watcher.h" + +class ContextWQ; +class 
Messenger; + +namespace cephfs { +namespace mirror { + +class FSMirror; + +// watch for notifications via cephfs_mirror object (in metadata +// pool). this is used sending keepalived with keepalive payload +// being the rados instance address (used by the manager module +// to blocklist when needed). + +class MirrorWatcher : public Watcher { +public: + static MirrorWatcher *create(librados::IoCtx &ioctx, FSMirror *fs_mirror, + ContextWQ *work_queue) { + return new MirrorWatcher(ioctx, fs_mirror, work_queue); + } + + MirrorWatcher(librados::IoCtx &ioctx, FSMirror *fs_mirror, + ContextWQ *work_queue); + ~MirrorWatcher(); + + void init(Context *on_finish); + void shutdown(Context *on_finish); + + void handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist& bl) override; + void handle_rewatch_complete(int r) override; + + bool is_blocklisted() { + std::scoped_lock locker(m_lock); + return m_blocklisted; + } + + bool is_failed() { + std::scoped_lock locker(m_lock); + return m_failed; + } + +private: + librados::IoCtx &m_ioctx; + FSMirror *m_fs_mirror; + ContextWQ *m_work_queue; + + ceph::mutex m_lock; + std::string m_instance_id; + + Context *m_on_init_finish = nullptr; + Context *m_on_shutdown_finish = nullptr; + + bool m_blocklisted = false; + bool m_failed = false; + + void register_watcher(); + void handle_register_watcher(int r); + + void unregister_watcher(); + void handle_unregister_watcher(int r); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_MIRROR_WATCHER_H diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc new file mode 100644 index 000000000..aaf97b868 --- /dev/null +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -0,0 +1,1552 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <stack> +#include <fcntl.h> +#include <algorithm> +#include <sys/time.h> +#include <sys/file.h> +#include 
<boost/scope_exit.hpp> + +#include "common/admin_socket.h" +#include "common/ceph_context.h" +#include "common/debug.h" +#include "common/errno.h" +#include "FSMirror.h" +#include "PeerReplayer.h" +#include "Utils.h" + +#include "json_spirit/json_spirit.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::PeerReplayer(" \ + << m_peer.uuid << ") " << __func__ + +namespace cephfs { +namespace mirror { + +namespace { + +const std::string PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"; + +std::string snapshot_dir_path(CephContext *cct, const std::string &path) { + return path + "/" + cct->_conf->client_snapdir; +} + +std::string snapshot_path(const std::string &snap_dir, const std::string &snap_name) { + return snap_dir + "/" + snap_name; +} + +std::string snapshot_path(CephContext *cct, const std::string &path, const std::string &snap_name) { + return path + "/" + cct->_conf->client_snapdir + "/" + snap_name; +} + +std::string entry_path(const std::string &dir, const std::string &name) { + return dir + "/" + name; +} + +std::map<std::string, std::string> decode_snap_metadata(snap_metadata *snap_metadata, + size_t nr_snap_metadata) { + std::map<std::string, std::string> metadata; + for (size_t i = 0; i < nr_snap_metadata; ++i) { + metadata.emplace(snap_metadata[i].key, snap_metadata[i].value); + } + + return metadata; +} + +std::string peer_config_key(const std::string &fs_name, const std::string &uuid) { + return PEER_CONFIG_KEY_PREFIX + "/" + fs_name + "/" + uuid; +} + +class PeerAdminSocketCommand { +public: + virtual ~PeerAdminSocketCommand() { + } + virtual int call(Formatter *f) = 0; +}; + +class StatusCommand : public PeerAdminSocketCommand { +public: + explicit StatusCommand(PeerReplayer *peer_replayer) + : peer_replayer(peer_replayer) { + } + + int call(Formatter *f) override { + peer_replayer->peer_status(f); + return 0; + } + +private: + PeerReplayer 
*peer_replayer; +}; + +// helper to open a directory relative to a file descriptor +int opendirat(MountRef mnt, int dirfd, const std::string &relpath, int flags, + ceph_dir_result **dirp) { + int r = ceph_openat(mnt, dirfd, relpath.c_str(), flags, 0); + if (r < 0) { + return r; + } + + int fd = r; + r = ceph_fdopendir(mnt, fd, dirp); + ceph_close(mnt, fd); + return r; +} + +} // anonymous namespace + +class PeerReplayerAdminSocketHook : public AdminSocketHook { +public: + PeerReplayerAdminSocketHook(CephContext *cct, const Filesystem &filesystem, + const Peer &peer, PeerReplayer *peer_replayer) + : admin_socket(cct->get_admin_socket()) { + int r; + std::string cmd; + + // mirror peer status format is name@id uuid + cmd = "fs mirror peer status " + + stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid) + + " " + + stringify(peer.uuid); + r = admin_socket->register_command( + cmd, this, "get peer mirror status"); + if (r == 0) { + commands[cmd] = new StatusCommand(peer_replayer); + } + } + + ~PeerReplayerAdminSocketHook() override { + admin_socket->unregister_commands(this); + for (auto &[command, cmdptr] : commands) { + delete cmdptr; + } + } + + int call(std::string_view command, const cmdmap_t& cmdmap, + Formatter *f, std::ostream &errss, bufferlist &out) override { + auto p = commands.at(std::string(command)); + return p->call(f); + } + +private: + typedef std::map<std::string, PeerAdminSocketCommand*, std::less<>> Commands; + + AdminSocket *admin_socket; + Commands commands; +}; + +PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror, + RadosRef local_cluster, const Filesystem &filesystem, + const Peer &peer, const std::set<std::string, std::less<>> &directories, + MountRef mount, ServiceDaemon *service_daemon) + : m_cct(cct), + m_fs_mirror(fs_mirror), + m_local_cluster(local_cluster), + m_filesystem(filesystem), + m_peer(peer), + m_directories(directories.begin(), directories.end()), + m_local_mount(mount), + 
m_service_daemon(service_daemon), + m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)), + m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) { + // reset sync stats sent via service daemon + m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, + SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0); + m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, + SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY, (uint64_t)0); +} + +PeerReplayer::~PeerReplayer() { + delete m_asok_hook; +} + +int PeerReplayer::init() { + dout(20) << ": initial dir list=[" << m_directories << "]" << dendl; + for (auto &dir_root : m_directories) { + m_snap_sync_stats.emplace(dir_root, SnapSyncStat()); + } + + auto &remote_client = m_peer.remote.client_name; + auto &remote_cluster = m_peer.remote.cluster_name; + auto remote_filesystem = Filesystem{0, m_peer.remote.fs_name}; + + std::string key = peer_config_key(m_filesystem.fs_name, m_peer.uuid); + std::string cmd = + "{" + "\"prefix\": \"config-key get\", " + "\"key\": \"" + key + "\"" + "}"; + + bufferlist in_bl; + bufferlist out_bl; + + int r = m_local_cluster->mon_command(cmd, in_bl, &out_bl, nullptr); + dout(5) << ": mon command r=" << r << dendl; + if (r < 0 && r != -ENOENT) { + return r; + } + + std::string mon_host; + std::string cephx_key; + if (!r) { + json_spirit::mValue root; + if (!json_spirit::read(out_bl.to_str(), root)) { + derr << ": invalid config-key JSON" << dendl; + return -EBADMSG; + } + try { + auto &root_obj = root.get_obj(); + mon_host = root_obj.at("mon_host").get_str(); + cephx_key = root_obj.at("key").get_str(); + dout(0) << ": remote monitor host=" << mon_host << dendl; + } catch (std::runtime_error&) { + derr << ": unexpected JSON received" << dendl; + return -EBADMSG; + } + } + + r = connect(remote_client, remote_cluster, &m_remote_cluster, mon_host, cephx_key); + if (r < 0) { + derr << ": error connecting to remote cluster: " << 
cpp_strerror(r) + << dendl; + return r; + } + + r = mount(m_remote_cluster, remote_filesystem, false, &m_remote_mount); + if (r < 0) { + m_remote_cluster.reset(); + derr << ": error mounting remote filesystem=" << remote_filesystem << dendl; + return r; + } + + std::scoped_lock locker(m_lock); + auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>( + "cephfs_mirror_max_concurrent_directory_syncs"); + dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl; + + while (nr_replayers-- > 0) { + std::unique_ptr<SnapshotReplayerThread> replayer( + new SnapshotReplayerThread(this)); + std::string name("replayer-" + stringify(nr_replayers)); + replayer->create(name.c_str()); + m_replayers.push_back(std::move(replayer)); + } + + return 0; +} + +void PeerReplayer::shutdown() { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + ceph_assert(!m_stopping); + m_stopping = true; + m_cond.notify_all(); + } + + for (auto &replayer : m_replayers) { + replayer->join(); + } + m_replayers.clear(); + ceph_unmount(m_remote_mount); + ceph_release(m_remote_mount); + m_remote_mount = nullptr; + m_remote_cluster.reset(); +} + +void PeerReplayer::add_directory(string_view dir_root) { + dout(20) << ": dir_root=" << dir_root << dendl; + + std::scoped_lock locker(m_lock); + m_directories.emplace_back(dir_root); + m_snap_sync_stats.emplace(dir_root, SnapSyncStat()); + m_cond.notify_all(); +} + +void PeerReplayer::remove_directory(string_view dir_root) { + dout(20) << ": dir_root=" << dir_root << dendl; + auto _dir_root = std::string(dir_root); + + std::scoped_lock locker(m_lock); + auto it = std::find(m_directories.begin(), m_directories.end(), _dir_root); + if (it != m_directories.end()) { + m_directories.erase(it); + } + + auto it1 = m_registered.find(_dir_root); + if (it1 == m_registered.end()) { + m_snap_sync_stats.erase(_dir_root); + } else { + it1->second.canceled = true; + } + m_cond.notify_all(); +} + +boost::optional<std::string> 
PeerReplayer::pick_directory() { + dout(20) << dendl; + + auto now = clock::now(); + auto retry_timo = g_ceph_context->_conf.get_val<uint64_t>( + "cephfs_mirror_retry_failed_directories_interval"); + + boost::optional<std::string> candidate; + for (auto &dir_root : m_directories) { + auto &sync_stat = m_snap_sync_stats.at(dir_root); + if (sync_stat.failed) { + std::chrono::duration<double> d = now - *sync_stat.last_failed; + if (d.count() < retry_timo) { + continue; + } + } + if (!m_registered.count(dir_root)) { + candidate = dir_root; + break; + } + } + + std::rotate(m_directories.begin(), m_directories.begin() + 1, m_directories.end()); + return candidate; +} + +int PeerReplayer::register_directory(const std::string &dir_root, + SnapshotReplayerThread *replayer) { + dout(20) << ": dir_root=" << dir_root << dendl; + ceph_assert(m_registered.find(dir_root) == m_registered.end()); + + DirRegistry registry; + int r = try_lock_directory(dir_root, replayer, ®istry); + if (r < 0) { + return r; + } + + dout(5) << ": dir_root=" << dir_root << " registered with replayer=" + << replayer << dendl; + m_registered.emplace(dir_root, std::move(registry)); + return 0; +} + +void PeerReplayer::unregister_directory(const std::string &dir_root) { + dout(20) << ": dir_root=" << dir_root << dendl; + + auto it = m_registered.find(dir_root); + ceph_assert(it != m_registered.end()); + + unlock_directory(it->first, it->second); + m_registered.erase(it); + if (std::find(m_directories.begin(), m_directories.end(), dir_root) == m_directories.end()) { + m_snap_sync_stats.erase(dir_root); + } +} + +int PeerReplayer::try_lock_directory(const std::string &dir_root, + SnapshotReplayerThread *replayer, DirRegistry *registry) { + dout(20) << ": dir_root=" << dir_root << dendl; + + int r = ceph_open(m_remote_mount, dir_root.c_str(), O_RDONLY | O_DIRECTORY, 0); + if (r < 0 && r != -ENOENT) { + derr << ": failed to open remote dir_root=" << dir_root << ": " << cpp_strerror(r) + << dendl; + return r; + 
} + + if (r == -ENOENT) { + // we snap under dir_root, so mode does not matter much + r = ceph_mkdirs(m_remote_mount, dir_root.c_str(), 0755); + if (r < 0) { + derr << ": failed to create remote directory=" << dir_root << ": " << cpp_strerror(r) + << dendl; + return r; + } + + r = ceph_open(m_remote_mount, dir_root.c_str(), O_RDONLY | O_DIRECTORY, 0); + if (r < 0) { + derr << ": failed to open remote dir_root=" << dir_root << ": " << cpp_strerror(r) + << dendl; + return r; + } + } + + int fd = r; + r = ceph_flock(m_remote_mount, fd, LOCK_EX | LOCK_NB, (uint64_t)replayer->get_thread_id()); + if (r != 0) { + if (r == -EWOULDBLOCK) { + dout(5) << ": dir_root=" << dir_root << " is locked by cephfs-mirror, " + << "will retry again" << dendl; + } else { + derr << ": failed to lock dir_root=" << dir_root << ": " << cpp_strerror(r) + << dendl; + } + + if (ceph_close(m_remote_mount, fd) < 0) { + derr << ": failed to close (cleanup) remote dir_root=" << dir_root << ": " + << cpp_strerror(r) << dendl; + } + return r; + } + + dout(10) << ": dir_root=" << dir_root << " locked" << dendl; + + registry->fd = fd; + registry->replayer = replayer; + return 0; +} + +void PeerReplayer::unlock_directory(const std::string &dir_root, const DirRegistry ®istry) { + dout(20) << ": dir_root=" << dir_root << dendl; + + int r = ceph_flock(m_remote_mount, registry.fd, LOCK_UN, + (uint64_t)registry.replayer->get_thread_id()); + if (r < 0) { + derr << ": failed to unlock remote dir_root=" << dir_root << ": " << cpp_strerror(r) + << dendl; + return; + } + + r = ceph_close(m_remote_mount, registry.fd); + if (r < 0) { + derr << ": failed to close remote dir_root=" << dir_root << ": " << cpp_strerror(r) + << dendl; + } + + dout(10) << ": dir_root=" << dir_root << " unlocked" << dendl; +} + +int PeerReplayer::build_snap_map(const std::string &dir_root, + std::map<uint64_t, std::string> *snap_map, bool is_remote) { + auto snap_dir = snapshot_dir_path(m_cct, dir_root); + dout(20) << ": dir_root=" << 
dir_root << ", snap_dir=" << snap_dir + << ", is_remote=" << is_remote << dendl; + + auto lr_str = is_remote ? "remote" : "local"; + auto mnt = is_remote ? m_remote_mount : m_local_mount; + + ceph_dir_result *dirp = nullptr; + int r = ceph_opendir(mnt, snap_dir.c_str(), &dirp); + if (r < 0) { + if (is_remote && r == -ENOENT) { + return 0; + } + derr << ": failed to open " << lr_str << " snap directory=" << snap_dir + << ": " << cpp_strerror(r) << dendl; + return r; + } + + std::set<std::string> snaps; + auto entry = ceph_readdir(mnt, dirp); + while (entry != NULL) { + auto d_name = std::string(entry->d_name); + dout(20) << ": entry=" << d_name << dendl; + if (d_name != "." && d_name != ".." && d_name.rfind("_", 0) != 0) { + snaps.emplace(d_name); + } + + entry = ceph_readdir(mnt, dirp); + } + + int rv = 0; + for (auto &snap : snaps) { + snap_info info; + auto snap_path = snapshot_path(snap_dir, snap); + r = ceph_get_snap_info(mnt, snap_path.c_str(), &info); + if (r < 0) { + derr << ": failed to fetch " << lr_str << " snap info for snap_path=" << snap_path + << ": " << cpp_strerror(r) << dendl; + rv = r; + break; + } + + uint64_t snap_id; + if (is_remote) { + if (!info.nr_snap_metadata) { + derr << ": snap_path=" << snap_path << " has invalid metadata in remote snapshot" + << dendl; + rv = -EINVAL; + } else { + auto metadata = decode_snap_metadata(info.snap_metadata, info.nr_snap_metadata); + dout(20) << ": snap_path=" << snap_path << ", metadata=" << metadata << dendl; + auto it = metadata.find(PRIMARY_SNAP_ID_KEY); + if (it == metadata.end()) { + derr << ": snap_path=" << snap_path << " has missing \"" << PRIMARY_SNAP_ID_KEY + << "\" in metadata" << dendl; + rv = -EINVAL; + } else { + snap_id = std::stoull(it->second); + } + ceph_free_snap_info_buffer(&info); + } + } else { + snap_id = info.id; + } + + if (rv != 0) { + break; + } + snap_map->emplace(snap_id, snap); + } + + r = ceph_closedir(mnt, dirp); + if (r < 0) { + derr << ": failed to close " << lr_str << " 
snap directory=" << snap_dir + << ": " << cpp_strerror(r) << dendl; + } + + dout(10) << ": " << lr_str << " snap_map=" << *snap_map << dendl; + return rv; +} + +int PeerReplayer::propagate_snap_deletes(const std::string &dir_root, + const std::set<std::string> &snaps) { + dout(5) << ": dir_root=" << dir_root << ", deleted snapshots=" << snaps << dendl; + + for (auto &snap : snaps) { + dout(20) << ": deleting dir_root=" << dir_root << ", snapshot=" << snap + << dendl; + int r = ceph_rmsnap(m_remote_mount, dir_root.c_str(), snap.c_str()); + if (r < 0) { + derr << ": failed to delete remote snap dir_root=" << dir_root + << ", snapshot=" << snaps << ": " << cpp_strerror(r) << dendl; + return r; + } + inc_deleted_snap(dir_root); + } + + return 0; +} + +int PeerReplayer::propagate_snap_renames( + const std::string &dir_root, + const std::set<std::pair<std::string,std::string>> &snaps) { + dout(10) << ": dir_root=" << dir_root << ", renamed snapshots=" << snaps << dendl; + + for (auto &snapp : snaps) { + auto from = snapshot_path(m_cct, dir_root, snapp.first); + auto to = snapshot_path(m_cct, dir_root, snapp.second); + dout(20) << ": renaming dir_root=" << dir_root << ", snapshot from=" + << from << ", to=" << to << dendl; + int r = ceph_rename(m_remote_mount, from.c_str(), to.c_str()); + if (r < 0) { + derr << ": failed to rename remote snap dir_root=" << dir_root + << ", snapshot from =" << from << ", to=" << to << ": " + << cpp_strerror(r) << dendl; + return r; + } + inc_renamed_snap(dir_root); + } + + return 0; +} + +int PeerReplayer::remote_mkdir(const std::string &epath, const struct ceph_statx &stx, + const FHandles &fh) { + dout(10) << ": remote epath=" << epath << dendl; + + int r = ceph_mkdirat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_mode & ~S_IFDIR); + if (r < 0 && r != -EEXIST) { + derr << ": failed to create remote directory=" << epath << ": " << cpp_strerror(r) + << dendl; + return r; + } + + r = ceph_chownat(m_remote_mount, 
fh.r_fd_dir_root, epath.c_str(), stx.stx_uid, stx.stx_gid, + AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to chown remote directory=" << epath << ": " << cpp_strerror(r) + << dendl; + return r; + } + + r = ceph_chmodat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_mode & ~S_IFMT, + AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to chmod remote directory=" << epath << ": " << cpp_strerror(r) + << dendl; + return r; + } + + struct timespec times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec}, + {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec}}; + r = ceph_utimensat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), times, AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to change [am]time on remote directory=" << epath << ": " + << cpp_strerror(r) << dendl; + return r; + } + + return 0; +} + +#define NR_IOVECS 8 // # iovecs +#define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec +int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string &epath, + const struct ceph_statx &stx, const FHandles &fh) { + dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl; + int l_fd; + int r_fd; + void *ptr; + struct iovec iov[NR_IOVECS]; + + int r = ceph_openat(m_local_mount, fh.c_fd, epath.c_str(), O_RDONLY | O_NOFOLLOW, 0); + if (r < 0) { + derr << ": failed to open local file path=" << epath << ": " + << cpp_strerror(r) << dendl; + return r; + } + + l_fd = r; + r = ceph_openat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), + O_CREAT | O_TRUNC | O_WRONLY | O_NOFOLLOW, stx.stx_mode); + if (r < 0) { + derr << ": failed to create remote file path=" << epath << ": " + << cpp_strerror(r) << dendl; + goto close_local_fd; + } + + r_fd = r; + ptr = malloc(NR_IOVECS * IOVEC_SIZE); + if (!ptr) { + r = -ENOMEM; + derr << ": failed to allocate memory" << dendl; + goto close_remote_fd; + } + + while (true) { + if (should_backoff(dir_root, &r)) { + dout(0) << ": backing off r=" << r << dendl; + break; + } 
+ + for (int i = 0; i < NR_IOVECS; ++i) { + iov[i].iov_base = (char*)ptr + IOVEC_SIZE*i; + iov[i].iov_len = IOVEC_SIZE; + } + + r = ceph_preadv(m_local_mount, l_fd, iov, NR_IOVECS, -1); + if (r < 0) { + derr << ": failed to read local file path=" << epath << ": " + << cpp_strerror(r) << dendl; + break; + } + if (r == 0) { + break; + } + + int iovs = (int)(r / IOVEC_SIZE); + int t = r % IOVEC_SIZE; + if (t) { + iov[iovs].iov_len = t; + ++iovs; + } + + r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, -1); + if (r < 0) { + derr << ": failed to write remote file path=" << epath << ": " + << cpp_strerror(r) << dendl; + break; + } + } + + if (r == 0) { + r = ceph_fsync(m_remote_mount, r_fd, 0); + if (r < 0) { + derr << ": failed to sync data for file path=" << epath << ": " + << cpp_strerror(r) << dendl; + } + } + + free(ptr); + +close_remote_fd: + if (ceph_close(m_remote_mount, r_fd) < 0) { + derr << ": failed to close remote fd path=" << epath << ": " << cpp_strerror(r) + << dendl; + return -EINVAL; + } + +close_local_fd: + if (ceph_close(m_local_mount, l_fd) < 0) { + derr << ": failed to close local fd path=" << epath << ": " << cpp_strerror(r) + << dendl; + return -EINVAL; + } + + return r == 0 ? 
0 : r; +} + +int PeerReplayer::remote_file_op(const std::string &dir_root, const std::string &epath, + const struct ceph_statx &stx, const FHandles &fh, + bool need_data_sync, bool need_attr_sync) { + dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", need_data_sync=" << need_data_sync + << ", need_attr_sync=" << need_attr_sync << dendl; + + int r; + if (need_data_sync) { + if (S_ISREG(stx.stx_mode)) { + r = copy_to_remote(dir_root, epath, stx, fh); + if (r < 0) { + derr << ": failed to copy path=" << epath << ": " << cpp_strerror(r) << dendl; + return r; + } + } else if (S_ISLNK(stx.stx_mode)) { + // free the remote link before relinking + r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), 0); + if (r < 0 && r != -ENOENT) { + derr << ": failed to remove remote symlink=" << epath << dendl; + return r; + } + char *target = (char *)alloca(stx.stx_size+1); + r = ceph_readlinkat(m_local_mount, fh.c_fd, epath.c_str(), target, stx.stx_size); + if (r < 0) { + derr << ": failed to readlink local path=" << epath << ": " << cpp_strerror(r) + << dendl; + return r; + } + + target[stx.stx_size] = '\0'; + r = ceph_symlinkat(m_remote_mount, target, fh.r_fd_dir_root, epath.c_str()); + if (r < 0 && r != EEXIST) { + derr << ": failed to symlink remote path=" << epath << " to target=" << target + << ": " << cpp_strerror(r) << dendl; + return r; + } + } else { + dout(5) << ": skipping entry=" << epath << ": unsupported mode=" << stx.stx_mode + << dendl; + return 0; + } + } + + if (need_attr_sync) { + r = ceph_chownat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_uid, stx.stx_gid, + AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to chown remote directory=" << epath << ": " << cpp_strerror(r) + << dendl; + return r; + } + + r = ceph_chmodat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_mode & ~S_IFMT, + AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to chmod remote directory=" << epath << ": " << cpp_strerror(r) + 
<< dendl; + return r; + } + + struct timespec times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec}, + {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec}}; + r = ceph_utimensat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), times, AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to change [am]time on remote directory=" << epath << ": " + << cpp_strerror(r) << dendl; + return r; + } + } + + return 0; +} + +int PeerReplayer::cleanup_remote_dir(const std::string &dir_root, + const std::string &epath, const FHandles &fh) { + dout(20) << ": dir_root=" << dir_root << ", epath=" << epath + << dendl; + + struct ceph_statx tstx; + int r = ceph_statxat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), &tstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to stat remote directory=" << epath << ": " + << cpp_strerror(r) << dendl; + return r; + } + + ceph_dir_result *tdirp; + r = opendirat(m_remote_mount, fh.r_fd_dir_root, epath, AT_SYMLINK_NOFOLLOW, + &tdirp); + if (r < 0) { + derr << ": failed to open remote directory=" << epath << ": " + << cpp_strerror(r) << dendl; + return r; + } + + std::stack<SyncEntry> rm_stack; + rm_stack.emplace(SyncEntry(epath, tdirp, tstx)); + while (!rm_stack.empty()) { + if (should_backoff(dir_root, &r)) { + dout(0) << ": backing off r=" << r << dendl; + break; + } + + dout(20) << ": " << rm_stack.size() << " entries in stack" << dendl; + std::string e_name; + auto &entry = rm_stack.top(); + dout(20) << ": top of stack path=" << entry.epath << dendl; + if (entry.is_directory()) { + struct ceph_statx stx; + struct dirent de; + while (true) { + r = ceph_readdirplus_r(m_remote_mount, entry.dirp, &de, &stx, + CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL); + if (r < 0) { + derr << ": failed to read remote directory=" << entry.epath << dendl; + break; + } + if (r == 0) { + break; + } + 
+ auto d_name = std::string(de.d_name); + if (d_name != "." && d_name != "..") { + e_name = d_name; + break; + } + } + + if (r == 0) { + r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), AT_REMOVEDIR); + if (r < 0) { + derr << ": failed to remove remote directory=" << entry.epath << ": " + << cpp_strerror(r) << dendl; + break; + } + + dout(10) << ": done for remote directory=" << entry.epath << dendl; + if (ceph_closedir(m_remote_mount, entry.dirp) < 0) { + derr << ": failed to close remote directory=" << entry.epath << dendl; + } + rm_stack.pop(); + continue; + } + if (r < 0) { + break; + } + + auto epath = entry_path(entry.epath, e_name); + if (S_ISDIR(stx.stx_mode)) { + ceph_dir_result *dirp; + r = opendirat(m_remote_mount, fh.r_fd_dir_root, epath, AT_SYMLINK_NOFOLLOW, + &dirp); + if (r < 0) { + derr << ": failed to open remote directory=" << epath << ": " + << cpp_strerror(r) << dendl; + break; + } + rm_stack.emplace(SyncEntry(epath, dirp, stx)); + } else { + rm_stack.emplace(SyncEntry(epath, stx)); + } + } else { + r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), 0); + if (r < 0) { + derr << ": failed to remove remote directory=" << entry.epath << ": " + << cpp_strerror(r) << dendl; + break; + } + dout(10) << ": done for remote file=" << entry.epath << dendl; + rm_stack.pop(); + } + } + + while (!rm_stack.empty()) { + auto &entry = rm_stack.top(); + if (entry.is_directory()) { + dout(20) << ": closing remote directory=" << entry.epath << dendl; + if (ceph_closedir(m_remote_mount, entry.dirp) < 0) { + derr << ": failed to close remote directory=" << entry.epath << dendl; + } + } + + rm_stack.pop(); + } + + return r; +} + +int PeerReplayer::should_sync_entry(const std::string &epath, const struct ceph_statx &cstx, + const FHandles &fh, bool *need_data_sync, bool *need_attr_sync) { + dout(10) << ": epath=" << epath << dendl; + + *need_data_sync = false; + *need_attr_sync = false; + struct ceph_statx pstx; + int r = 
ceph_statxat(fh.p_mnt, fh.p_fd, epath.c_str(), &pstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_CTIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0 && r != -ENOENT && r != -ENOTDIR) { + derr << ": failed to stat prev entry= " << epath << ": " << cpp_strerror(r) + << dendl; + return r; + } + + if (r < 0) { + // inode does not exist in prev snapshot or file type has changed + // (file was S_IFREG earlier, S_IFDIR now). + dout(5) << ": entry=" << epath << ", r=" << r << dendl; + *need_data_sync = true; + *need_attr_sync = true; + return 0; + } + + dout(10) << ": local cur statx: mode=" << cstx.stx_mode << ", uid=" << cstx.stx_uid + << ", gid=" << cstx.stx_gid << ", size=" << cstx.stx_size << ", ctime=" + << cstx.stx_ctime << ", mtime=" << cstx.stx_mtime << dendl; + dout(10) << ": local prev statx: mode=" << pstx.stx_mode << ", uid=" << pstx.stx_uid + << ", gid=" << pstx.stx_gid << ", size=" << pstx.stx_size << ", ctime=" + << pstx.stx_ctime << ", mtime=" << pstx.stx_mtime << dendl; + if ((cstx.stx_mode & S_IFMT) != (pstx.stx_mode & S_IFMT)) { + dout(5) << ": entry=" << epath << " has mode mismatch" << dendl; + *need_data_sync = true; + *need_attr_sync = true; + } else { + *need_data_sync = (cstx.stx_size != pstx.stx_size) || (cstx.stx_mtime != pstx.stx_mtime); + *need_attr_sync = (cstx.stx_ctime != pstx.stx_ctime); + } + + return 0; +} + +int PeerReplayer::propagate_deleted_entries(const std::string &dir_root, + const std::string &epath, const FHandles &fh) { + dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl; + + ceph_dir_result *dirp; + int r = opendirat(fh.p_mnt, fh.p_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp); + if (r < 0) { + if (r == -ELOOP) { + dout(5) << ": epath=" << epath << " is a symbolic link -- mode sync" + << " done when traversing parent" << dendl; + return 0; + } + if (r == -ENOTDIR) { + dout(5) << ": epath=" << epath << " is not a directory -- mode sync" + << " 
done when traversing parent" << dendl; + return 0; + } + if (r == -ENOENT) { + dout(5) << ": epath=" << epath << " missing in previous-snap/remote dir-root" + << dendl; + } + return r; + } + + struct dirent *dire = (struct dirent *)alloca(512 * sizeof(struct dirent)); + while (true) { + if (should_backoff(dir_root, &r)) { + dout(0) << ": backing off r=" << r << dendl; + break; + } + + int len = ceph_getdents(fh.p_mnt, dirp, (char *)dire, 512); + if (len < 0) { + derr << ": failed to read directory entries: " << cpp_strerror(len) << dendl; + r = len; + // flip errno to signal that we got an err (possible the + // snapshot getting deleted in midst). + if (r == -ENOENT) { + r = -EINVAL; + } + break; + } + if (len == 0) { + dout(10) << ": reached EOD" << dendl; + break; + } + int nr = len / sizeof(struct dirent); + for (int i = 0; i < nr; ++i) { + if (should_backoff(dir_root, &r)) { + dout(0) << ": backing off r=" << r << dendl; + break; + } + std::string d_name = std::string(dire[i].d_name); + if (d_name == "." || d_name == "..") { + continue; + } + + struct ceph_statx pstx; + auto dpath = entry_path(epath, d_name); + r = ceph_statxat(fh.p_mnt, fh.p_fd, dpath.c_str(), &pstx, + CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to stat (prev) directory=" << dpath << ": " + << cpp_strerror(r) << dendl; + // flip errno to signal that we got an err (possible the + // snapshot getting deleted in midst). 
+ if (r == -ENOENT) { + r = -EINVAL; + } + return r; + } + + struct ceph_statx cstx; + r = ceph_statxat(m_local_mount, fh.c_fd, dpath.c_str(), &cstx, + CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0 && r != -ENOENT) { + derr << ": failed to stat local (cur) directory=" << dpath << ": " + << cpp_strerror(r) << dendl; + return r; + } + + bool purge_remote = true; + if (r == 0) { + // directory entry present in both snapshots -- check inode + // type + if ((pstx.stx_mode & S_IFMT) == (cstx.stx_mode & S_IFMT)) { + dout(5) << ": mode matches for entry=" << d_name << dendl; + purge_remote = false; + } else { + dout(5) << ": mode mismatch for entry=" << d_name << dendl; + } + } else { + dout(5) << ": entry=" << d_name << " missing in current snapshot" << dendl; + } + + if (purge_remote) { + dout(5) << ": purging remote entry=" << dpath << dendl; + if (S_ISDIR(pstx.stx_mode)) { + r = cleanup_remote_dir(dir_root, dpath, fh); + } else { + r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, dpath.c_str(), 0); + } + + if (r < 0 && r != -ENOENT) { + derr << ": failed to cleanup remote entry=" << d_name << ": " + << cpp_strerror(r) << dendl; + return r; + } + } + } + } + + ceph_closedir(fh.p_mnt, dirp); + return r; +} + +int PeerReplayer::open_dir(MountRef mnt, const std::string &dir_path, + boost::optional<uint64_t> snap_id) { + dout(20) << ": dir_path=" << dir_path << dendl; + if (snap_id) { + dout(20) << ": expected snapshot id=" << *snap_id << dendl; + } + + int fd = ceph_open(mnt, dir_path.c_str(), O_DIRECTORY | O_RDONLY, 0); + if (fd < 0) { + derr << ": cannot open dir_path=" << dir_path << ": " << cpp_strerror(fd) + << dendl; + return fd; + } + + if (!snap_id) { + return fd; + } + + snap_info info; + int r = ceph_get_snap_info(mnt, dir_path.c_str(), &info); + if (r < 0) { + derr << ": failed to fetch snap_info for path=" << dir_path + << ": " << cpp_strerror(r) << dendl; + ceph_close(mnt, fd); + return r; + } + + if (info.id != *snap_id) { + 
dout(5) << ": got mismatching snapshot id for path=" << dir_path << " (" << info.id + << " vs " << *snap_id << ") -- possible recreate" << dendl; + ceph_close(mnt, fd); + return -EINVAL; + } + + return fd; +} + +int PeerReplayer::pre_sync_check_and_open_handles( + const std::string &dir_root, + const Snapshot ¤t, boost::optional<Snapshot> prev, + FHandles *fh) { + dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl; + if (prev) { + dout(20) << ": prev=" << prev << dendl; + } + + auto cur_snap_path = snapshot_path(m_cct, dir_root, current.first); + auto fd = open_dir(m_local_mount, cur_snap_path, current.second); + if (fd < 0) { + return fd; + } + + // current snapshot file descriptor + fh->c_fd = fd; + + MountRef mnt; + if (prev) { + mnt = m_local_mount; + auto prev_snap_path = snapshot_path(m_cct, dir_root, (*prev).first); + fd = open_dir(mnt, prev_snap_path, (*prev).second); + } else { + mnt = m_remote_mount; + fd = open_dir(mnt, dir_root, boost::none); + } + + if (fd < 0) { + if (!prev || fd != -ENOENT) { + ceph_close(m_local_mount, fh->c_fd); + return fd; + } + + // ENOENT of previous snap + dout(5) << ": previous snapshot=" << *prev << " missing" << dendl; + mnt = m_remote_mount; + fd = open_dir(mnt, dir_root, boost::none); + if (fd < 0) { + ceph_close(m_local_mount, fh->c_fd); + return fd; + } + } + + // "previous" snapshot or dir_root file descriptor + fh->p_fd = fd; + fh->p_mnt = mnt; + + { + std::scoped_lock locker(m_lock); + auto it = m_registered.find(dir_root); + ceph_assert(it != m_registered.end()); + fh->r_fd_dir_root = it->second.fd; + } + + dout(5) << ": using " << ((fh->p_mnt == m_local_mount) ? "local (previous) snapshot" : "remote dir_root") + << " for incremental transfer" << dendl; + return 0; +} + +void PeerReplayer::post_sync_close_handles(const FHandles &fh) { + dout(20) << dendl; + + // @FHandles.r_fd_dir_root is closed in @unregister_directory since + // its used to acquire an exclusive lock on remote dir_root. 
+ ceph_close(m_local_mount, fh.c_fd); + ceph_close(fh.p_mnt, fh.p_fd); +} + +int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t, + boost::optional<Snapshot> prev) { + dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl; + if (prev) { + dout(20) << ": incremental sync check from prev=" << prev << dendl; + } + + FHandles fh; + int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh); + if (r < 0) { + dout(5) << ": cannot proceeed with sync: " << cpp_strerror(r) << dendl; + return r; + } + + BOOST_SCOPE_EXIT_ALL( (this)(&fh) ) { + post_sync_close_handles(fh); + }; + + // record that we are going to "dirty" the data under this + // directory root + auto snap_id_str{stringify(current.second)}; + r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root, "ceph.mirror.dirty_snap_id", + snap_id_str.c_str(), snap_id_str.size(), 0); + if (r < 0) { + derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root + << ": " << cpp_strerror(r) << dendl; + return r; + } + + struct ceph_statx tstx; + r = ceph_fstatx(m_local_mount, fh.c_fd, &tstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r) + << dendl; + return r; + } + + ceph_dir_result *tdirp; + r = ceph_fdopendir(m_local_mount, fh.c_fd, &tdirp); + if (r < 0) { + derr << ": failed to open local snap=" << current.first << ": " << cpp_strerror(r) + << dendl; + return r; + } + + std::stack<SyncEntry> sync_stack; + sync_stack.emplace(SyncEntry(".", tdirp, tstx)); + while (!sync_stack.empty()) { + if (should_backoff(dir_root, &r)) { + dout(0) << ": backing off r=" << r << dendl; + break; + } + + dout(20) << ": " << sync_stack.size() << " entries in stack" << dendl; + std::string e_name; + auto &entry = sync_stack.top(); + dout(20) << ": top of stack path=" 
<< entry.epath << dendl; + if (entry.is_directory()) { + // entry is a directory -- propagate deletes for missing entries + // (and changed inode types) to the remote filesystem. + if (!entry.needs_remote_sync()) { + r = propagate_deleted_entries(dir_root, entry.epath, fh); + if (r < 0 && r != -ENOENT) { + derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl; + break; + } + entry.set_remote_synced(); + } + + struct ceph_statx stx; + struct dirent de; + while (true) { + r = ceph_readdirplus_r(m_local_mount, entry.dirp, &de, &stx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL); + if (r < 0) { + derr << ": failed to local read directory=" << entry.epath << dendl; + break; + } + if (r == 0) { + break; + } + + auto d_name = std::string(de.d_name); + if (d_name != "." && d_name != "..") { + e_name = d_name; + break; + } + } + + if (r == 0) { + dout(10) << ": done for directory=" << entry.epath << dendl; + if (ceph_closedir(m_local_mount, entry.dirp) < 0) { + derr << ": failed to close local directory=" << entry.epath << dendl; + } + sync_stack.pop(); + continue; + } + if (r < 0) { + break; + } + + auto epath = entry_path(entry.epath, e_name); + if (S_ISDIR(stx.stx_mode)) { + r = remote_mkdir(epath, stx, fh); + if (r < 0) { + break; + } + ceph_dir_result *dirp; + r = opendirat(m_local_mount, fh.c_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp); + if (r < 0) { + derr << ": failed to open local directory=" << epath << ": " + << cpp_strerror(r) << dendl; + break; + } + sync_stack.emplace(SyncEntry(epath, dirp, stx)); + } else { + sync_stack.emplace(SyncEntry(epath, stx)); + } + } else { + bool need_data_sync = true; + bool need_attr_sync = true; + r = should_sync_entry(entry.epath, entry.stx, fh, + &need_data_sync, &need_attr_sync); + if (r < 0) { + break; + } + + dout(5) << ": entry=" << entry.epath << ", data_sync=" << need_data_sync + << ", 
attr_sync=" << need_attr_sync << dendl; + if (need_data_sync || need_attr_sync) { + r = remote_file_op(dir_root, entry.epath, entry.stx, fh, need_data_sync, + need_attr_sync); + if (r < 0) { + break; + } + } + dout(10) << ": done for epath=" << entry.epath << dendl; + sync_stack.pop(); + } + } + + while (!sync_stack.empty()) { + auto &entry = sync_stack.top(); + if (entry.is_directory()) { + dout(20) << ": closing local directory=" << entry.epath << dendl; + if (ceph_closedir(m_local_mount, entry.dirp) < 0) { + derr << ": failed to close local directory=" << entry.epath << dendl; + } + } + + sync_stack.pop(); + } + + return r; +} + +int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot ¤t, + boost::optional<Snapshot> prev) { + dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl; + if (prev) { + dout(20) << ": prev=" << prev << dendl; + } + + int r = ceph_getxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id", nullptr, 0); + if (r < 0 && r != -ENODATA) { + derr << ": failed to fetch primary_snap_id length from dir_root=" << dir_root + << ": " << cpp_strerror(r) << dendl; + return r; + } + + // no xattr, can't determine which snap the data belongs to! + if (r < 0) { + dout(5) << ": missing \"ceph.mirror.dirty_snap_id\" xattr on remote -- using" + << " incremental sync with remote scan" << dendl; + r = do_synchronize(dir_root, current, boost::none); + } else { + size_t xlen = r; + char *val = (char *)alloca(xlen+1); + r = ceph_getxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id", (void*)val, xlen); + if (r < 0) { + derr << ": failed to fetch \"dirty_snap_id\" for dir_root: " << dir_root + << ": " << cpp_strerror(r) << dendl; + return r; + } + + val[xlen] = '\0'; + uint64_t dirty_snap_id = atoll(val); + + dout(20) << ": dirty_snap_id: " << dirty_snap_id << " vs (" << current.second + << "," << (prev ? 
stringify((*prev).second) : "~") << ")" << dendl; + if (prev && (dirty_snap_id == (*prev).second || dirty_snap_id == current.second)) { + dout(5) << ": match -- using incremental sync with local scan" << dendl; + r = do_synchronize(dir_root, current, prev); + } else { + dout(5) << ": mismatch -- using incremental sync with remote scan" << dendl; + r = do_synchronize(dir_root, current, boost::none); + } + } + + // snap sync failed -- bail out! + if (r < 0) { + return r; + } + + auto cur_snap_id_str{stringify(current.second)}; + snap_metadata snap_meta[] = {{PRIMARY_SNAP_ID_KEY.c_str(), cur_snap_id_str.c_str()}}; + r = ceph_mksnap(m_remote_mount, dir_root.c_str(), current.first.c_str(), 0755, + snap_meta, sizeof(snap_meta)/sizeof(snap_metadata)); + if (r < 0) { + derr << ": failed to snap remote directory dir_root=" << dir_root + << ": " << cpp_strerror(r) << dendl; + } + + return r; +} + +int PeerReplayer::do_sync_snaps(const std::string &dir_root) { + dout(20) << ": dir_root=" << dir_root << dendl; + + std::map<uint64_t, std::string> local_snap_map; + std::map<uint64_t, std::string> remote_snap_map; + + int r = build_snap_map(dir_root, &local_snap_map); + if (r < 0) { + derr << ": failed to build local snap map" << dendl; + return r; + } + + r = build_snap_map(dir_root, &remote_snap_map, true); + if (r < 0) { + derr << ": failed to build remote snap map" << dendl; + return r; + } + + // infer deleted and renamed snapshots from local and remote + // snap maps + std::set<std::string> snaps_deleted; + std::set<std::pair<std::string,std::string>> snaps_renamed; + for (auto &[primary_snap_id, snap_name] : remote_snap_map) { + auto it = local_snap_map.find(primary_snap_id); + if (it == local_snap_map.end()) { + snaps_deleted.emplace(snap_name); + } else if (it->second != snap_name) { + snaps_renamed.emplace(std::make_pair(snap_name, it->second)); + } + } + + r = propagate_snap_deletes(dir_root, snaps_deleted); + if (r < 0) { + derr << ": failed to propgate deleted 
snapshots" << dendl; + return r; + } + + r = propagate_snap_renames(dir_root, snaps_renamed); + if (r < 0) { + derr << ": failed to propgate renamed snapshots" << dendl; + return r; + } + + // start mirroring snapshots from the last snap-id synchronized + uint64_t last_snap_id = 0; + std::string last_snap_name; + if (!remote_snap_map.empty()) { + auto last = remote_snap_map.rbegin(); + last_snap_id = last->first; + last_snap_name = last->second; + set_last_synced_snap(dir_root, last_snap_id, last_snap_name); + } + + dout(5) << ": last snap-id transferred=" << last_snap_id << dendl; + auto it = local_snap_map.upper_bound(last_snap_id); + if (it == local_snap_map.end()) { + dout(20) << ": nothing to synchronize" << dendl; + return 0; + } + + auto snaps_per_cycle = g_ceph_context->_conf.get_val<uint64_t>( + "cephfs_mirror_max_snapshot_sync_per_cycle"); + + dout(10) << ": synchronizing from snap-id=" << it->first << dendl; + for (; it != local_snap_map.end(); ++it) { + set_current_syncing_snap(dir_root, it->first, it->second); + auto start = clock::now(); + boost::optional<Snapshot> prev = boost::none; + if (last_snap_id != 0) { + prev = std::make_pair(last_snap_name, last_snap_id); + } + r = synchronize(dir_root, std::make_pair(it->second, it->first), prev); + if (r < 0) { + derr << ": failed to synchronize dir_root=" << dir_root + << ", snapshot=" << it->second << dendl; + clear_current_syncing_snap(dir_root); + return r; + } + std::chrono::duration<double> duration = clock::now() - start; + set_last_synced_stat(dir_root, it->first, it->second, duration.count()); + if (--snaps_per_cycle == 0) { + break; + } + + last_snap_name = it->second; + last_snap_id = it->first; + } + + return 0; +} + +void PeerReplayer::sync_snaps(const std::string &dir_root, + std::unique_lock<ceph::mutex> &locker) { + dout(20) << ": dir_root=" << dir_root << dendl; + locker.unlock(); + int r = do_sync_snaps(dir_root); + if (r < 0) { + derr << ": failed to sync snapshots for dir_root=" << 
dir_root << dendl; + } + locker.lock(); + if (r < 0) { + _inc_failed_count(dir_root); + } else { + _reset_failed_count(dir_root); + } +} + +void PeerReplayer::run(SnapshotReplayerThread *replayer) { + dout(10) << ": snapshot replayer=" << replayer << dendl; + + time last_directory_scan = clock::zero(); + auto scan_interval = g_ceph_context->_conf.get_val<uint64_t>( + "cephfs_mirror_directory_scan_interval"); + + std::unique_lock locker(m_lock); + while (true) { + // do not check if client is blocklisted under lock + m_cond.wait_for(locker, 1s, [this]{return is_stopping();}); + if (is_stopping()) { + dout(5) << ": exiting" << dendl; + break; + } + + locker.unlock(); + + if (m_fs_mirror->is_blocklisted()) { + dout(5) << ": exiting as client is blocklisted" << dendl; + break; + } + + locker.lock(); + + auto now = clock::now(); + std::chrono::duration<double> timo = now - last_directory_scan; + if (timo.count() >= scan_interval && m_directories.size()) { + dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl; + auto dir_root = pick_directory(); + if (dir_root) { + dout(5) << ": picked dir_root=" << *dir_root << dendl; + int r = register_directory(*dir_root, replayer); + if (r == 0) { + sync_snaps(*dir_root, locker); + unregister_directory(*dir_root); + } + } + + last_directory_scan = now; + } + } +} + +void PeerReplayer::peer_status(Formatter *f) { + std::scoped_lock locker(m_lock); + f->open_object_section("stats"); + for (auto &[dir_root, sync_stat] : m_snap_sync_stats) { + f->open_object_section(dir_root); + if (sync_stat.failed) { + f->dump_string("state", "failed"); + } else if (!sync_stat.current_syncing_snap) { + f->dump_string("state", "idle"); + } else { + f->dump_string("state", "syncing"); + f->open_object_section("current_sycning_snap"); + f->dump_unsigned("id", (*sync_stat.current_syncing_snap).first); + f->dump_string("name", (*sync_stat.current_syncing_snap).second); + f->close_section(); + } + if 
(sync_stat.last_synced_snap) { + f->open_object_section("last_synced_snap"); + f->dump_unsigned("id", (*sync_stat.last_synced_snap).first); + f->dump_string("name", (*sync_stat.last_synced_snap).second); + if (sync_stat.last_sync_duration) { + f->dump_float("sync_duration", *sync_stat.last_sync_duration); + f->dump_stream("sync_time_stamp") << sync_stat.last_synced; + } + f->close_section(); + } + f->dump_unsigned("snaps_synced", sync_stat.synced_snap_count); + f->dump_unsigned("snaps_deleted", sync_stat.deleted_snap_count); + f->dump_unsigned("snaps_renamed", sync_stat.renamed_snap_count); + f->close_section(); // dir_root + } + f->close_section(); // stats +} + +void PeerReplayer::reopen_logs() { + std::scoped_lock locker(m_lock); + + if (m_remote_cluster) { + reinterpret_cast<CephContext *>(m_remote_cluster->cct())->reopen_logs(); + } +} + +} // namespace mirror +} // namespace cephfs diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h new file mode 100644 index 000000000..886c95329 --- /dev/null +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -0,0 +1,319 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_PEER_REPLAYER_H +#define CEPHFS_MIRROR_PEER_REPLAYER_H + +#include "common/Formatter.h" +#include "common/Thread.h" +#include "mds/FSMap.h" +#include "ServiceDaemon.h" +#include "Types.h" + +namespace cephfs { +namespace mirror { + +class FSMirror; +class PeerReplayerAdminSocketHook; + +class PeerReplayer { +public: + PeerReplayer(CephContext *cct, FSMirror *fs_mirror, + RadosRef local_cluster, const Filesystem &filesystem, + const Peer &peer, const std::set<std::string, std::less<>> &directories, + MountRef mount, ServiceDaemon *service_daemon); + ~PeerReplayer(); + + // initialize replayer for a peer + int init(); + + // shutdown replayer for a peer + void shutdown(); + + // add a directory to mirror queue + void add_directory(string_view 
dir_root); + + // remove a directory from queue + void remove_directory(string_view dir_root); + + // admin socket helpers + void peer_status(Formatter *f); + + // reopen logs + void reopen_logs(); + +private: + inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id"; + + inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count"; + inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count"; + + using Snapshot = std::pair<std::string, uint64_t>; + + // file descriptor "triplet" for synchronizing a snapshot + // w/ an added MountRef for accessing "previous" snapshot. + struct FHandles { + // open file descriptor on the snap directory for snapshot + // currently being synchronized. Always use this fd with + // @m_local_mount. + int c_fd; + + // open file descriptor on the "previous" snapshot or on + // dir_root on remote filesystem (based on if the snapshot + // can be used for incremental transfer). Always use this + // fd with p_mnt which either points to @m_local_mount ( + // for local incremental comparison) or @m_remote_mount ( + // for remote incremental comparison). + int p_fd; + MountRef p_mnt; + + // open file descriptor on dir_root on remote filesystem. + // Always use this fd with @m_remote_mount. 
+ int r_fd_dir_root; + }; + + bool is_stopping() { + return m_stopping; + } + + struct Replayer; + class SnapshotReplayerThread : public Thread { + public: + SnapshotReplayerThread(PeerReplayer *peer_replayer) + : m_peer_replayer(peer_replayer) { + } + + void *entry() override { + m_peer_replayer->run(this); + return 0; + } + + private: + PeerReplayer *m_peer_replayer; + }; + + struct DirRegistry { + int fd; + bool canceled = false; + SnapshotReplayerThread *replayer; + }; + + struct SyncEntry { + std::string epath; + ceph_dir_result *dirp; // valid for directories + struct ceph_statx stx; + // set by incremental sync _after_ ensuring missing entries + // in the currently synced snapshot have been propagated to + // the remote filesystem. + bool remote_synced = false; + + SyncEntry(std::string_view path, + const struct ceph_statx &stx) + : epath(path), + stx(stx) { + } + SyncEntry(std::string_view path, + ceph_dir_result *dirp, + const struct ceph_statx &stx) + : epath(path), + dirp(dirp), + stx(stx) { + } + + bool is_directory() const { + return S_ISDIR(stx.stx_mode); + } + + bool needs_remote_sync() const { + return remote_synced; + } + void set_remote_synced() { + remote_synced = true; + } + }; + + using clock = ceph::coarse_mono_clock; + using time = ceph::coarse_mono_time; + + // stats sent to service daemon + struct ServiceDaemonStats { + uint64_t failed_dir_count = 0; + uint64_t recovered_dir_count = 0; + }; + + struct SnapSyncStat { + uint64_t nr_failures = 0; // number of consecutive failures + boost::optional<time> last_failed; // lat failed timestamp + bool failed = false; // hit upper cap for consecutive failures + boost::optional<std::pair<uint64_t, std::string>> last_synced_snap; + boost::optional<std::pair<uint64_t, std::string>> current_syncing_snap; + uint64_t synced_snap_count = 0; + uint64_t deleted_snap_count = 0; + uint64_t renamed_snap_count = 0; + time last_synced = clock::zero(); + boost::optional<double> last_sync_duration; + }; + + void 
_inc_failed_count(const std::string &dir_root) { + auto max_failures = g_ceph_context->_conf.get_val<uint64_t>( + "cephfs_mirror_max_consecutive_failures_per_directory"); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.last_failed = clock::now(); + if (++sync_stat.nr_failures >= max_failures && !sync_stat.failed) { + sync_stat.failed = true; + ++m_service_daemon_stats.failed_dir_count; + m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, + SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, + m_service_daemon_stats.failed_dir_count); + } + } + void _reset_failed_count(const std::string &dir_root) { + auto &sync_stat = m_snap_sync_stats.at(dir_root); + if (sync_stat.failed) { + ++m_service_daemon_stats.recovered_dir_count; + m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, + SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY, + m_service_daemon_stats.recovered_dir_count); + } + sync_stat.nr_failures = 0; + sync_stat.failed = false; + sync_stat.last_failed = boost::none; + } + + void _set_last_synced_snap(const std::string &dir_root, uint64_t snap_id, + const std::string &snap_name) { + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.last_synced_snap = std::make_pair(snap_id, snap_name); + sync_stat.current_syncing_snap = boost::none; + } + void set_last_synced_snap(const std::string &dir_root, uint64_t snap_id, + const std::string &snap_name) { + std::scoped_lock locker(m_lock); + _set_last_synced_snap(dir_root, snap_id, snap_name); + } + void set_current_syncing_snap(const std::string &dir_root, uint64_t snap_id, + const std::string &snap_name) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.current_syncing_snap = std::make_pair(snap_id, snap_name); + } + void clear_current_syncing_snap(const std::string &dir_root) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.current_syncing_snap = boost::none; + } + void 
inc_deleted_snap(const std::string &dir_root) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + ++sync_stat.deleted_snap_count; + } + void inc_renamed_snap(const std::string &dir_root) { + std::scoped_lock locker(m_lock); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + ++sync_stat.renamed_snap_count; + } + void set_last_synced_stat(const std::string &dir_root, uint64_t snap_id, + const std::string &snap_name, double duration) { + std::scoped_lock locker(m_lock); + _set_last_synced_snap(dir_root, snap_id, snap_name); + auto &sync_stat = m_snap_sync_stats.at(dir_root); + sync_stat.last_synced = clock::now(); + sync_stat.last_sync_duration = duration; + ++sync_stat.synced_snap_count; + } + + bool should_backoff(const std::string &dir_root, int *retval) { + if (m_fs_mirror->is_blocklisted()) { + *retval = -EBLOCKLISTED; + return true; + } + + std::scoped_lock locker(m_lock); + if (is_stopping()) { + // ceph defines EBLOCKLISTED to ESHUTDOWN (108). so use + // EINPROGRESS to identify shutdown. 
+ *retval = -EINPROGRESS; + return true; + } + auto &dr = m_registered.at(dir_root); + if (dr.canceled) { + *retval = -ECANCELED; + return true; + } + + *retval = 0; + return false; + } + + typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers; + + CephContext *m_cct; + FSMirror *m_fs_mirror; + RadosRef m_local_cluster; + Filesystem m_filesystem; + Peer m_peer; + // probably need to be encapsulated when supporting cancelations + std::map<std::string, DirRegistry> m_registered; + std::vector<std::string> m_directories; + std::map<std::string, SnapSyncStat> m_snap_sync_stats; + MountRef m_local_mount; + ServiceDaemon *m_service_daemon; + PeerReplayerAdminSocketHook *m_asok_hook = nullptr; + + ceph::mutex m_lock; + ceph::condition_variable m_cond; + RadosRef m_remote_cluster; + MountRef m_remote_mount; + bool m_stopping = false; + SnapshotReplayers m_replayers; + + ServiceDaemonStats m_service_daemon_stats; + + void run(SnapshotReplayerThread *replayer); + + boost::optional<std::string> pick_directory(); + int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer); + void unregister_directory(const std::string &dir_root); + int try_lock_directory(const std::string &dir_root, SnapshotReplayerThread *replayer, + DirRegistry *registry); + void unlock_directory(const std::string &dir_root, const DirRegistry ®istry); + void sync_snaps(const std::string &dir_root, std::unique_lock<ceph::mutex> &locker); + + + int build_snap_map(const std::string &dir_root, std::map<uint64_t, std::string> *snap_map, + bool is_remote=false); + + int propagate_snap_deletes(const std::string &dir_root, const std::set<std::string> &snaps); + int propagate_snap_renames(const std::string &dir_root, + const std::set<std::pair<std::string,std::string>> &snaps); + int propagate_deleted_entries(const std::string &dir_root, const std::string &epath, + const FHandles &fh); + int cleanup_remote_dir(const std::string &dir_root, const std::string &epath, 
+ const FHandles &fh); + + int should_sync_entry(const std::string &epath, const struct ceph_statx &cstx, + const FHandles &fh, bool *need_data_sync, bool *need_attr_sync); + + int open_dir(MountRef mnt, const std::string &dir_path, boost::optional<uint64_t> snap_id); + int pre_sync_check_and_open_handles(const std::string &dir_root, const Snapshot ¤t, + boost::optional<Snapshot> prev, FHandles *fh); + void post_sync_close_handles(const FHandles &fh); + + int do_synchronize(const std::string &dir_root, const Snapshot ¤t, + boost::optional<Snapshot> prev); + + int synchronize(const std::string &dir_root, const Snapshot ¤t, + boost::optional<Snapshot> prev); + int do_sync_snaps(const std::string &dir_root); + + int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh); + int remote_file_op(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx, + const FHandles &fh, bool need_data_sync, bool need_attr_sync); + int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx, + const FHandles &fh); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_PEER_REPLAYER_H diff --git a/src/tools/cephfs_mirror/ServiceDaemon.cc b/src/tools/cephfs_mirror/ServiceDaemon.cc new file mode 100644 index 000000000..f66dd46bf --- /dev/null +++ b/src/tools/cephfs_mirror/ServiceDaemon.cc @@ -0,0 +1,225 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/debug.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "include/stringify.h" +#include "ServiceDaemon.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::ServiceDaemon: " << this << " " \ + << __func__ + +namespace cephfs { +namespace mirror { + +namespace { + +struct AttributeDumpVisitor : public 
boost::static_visitor<void> { + ceph::Formatter *f; + std::string name; + + AttributeDumpVisitor(ceph::Formatter *f, std::string_view name) + : f(f), name(name) { + } + + void operator()(bool val) const { + f->dump_bool(name.c_str(), val); + } + void operator()(uint64_t val) const { + f->dump_unsigned(name.c_str(), val); + } + void operator()(const std::string &val) const { + f->dump_string(name.c_str(), val); + } +}; + +} // anonymous namespace + +ServiceDaemon::ServiceDaemon(CephContext *cct, RadosRef rados) + : m_cct(cct), + m_rados(rados), + m_timer(new SafeTimer(cct, m_timer_lock, true)) { + m_timer->init(); +} + +ServiceDaemon::~ServiceDaemon() { + dout(10) << dendl; + { + std::scoped_lock timer_lock(m_timer_lock); + if (m_timer_ctx != nullptr) { + dout(5) << ": canceling timer task=" << m_timer_ctx << dendl; + m_timer->cancel_event(m_timer_ctx); + } + m_timer->shutdown(); + } + + delete m_timer; +} + +int ServiceDaemon::init() { + dout(20) << dendl; + + std::string id = m_cct->_conf->name.get_id(); + if (id.find(CEPHFS_MIRROR_AUTH_ID_PREFIX) == 0) { + id = id.substr(CEPHFS_MIRROR_AUTH_ID_PREFIX.size()); + } + std::string instance_id = stringify(m_rados->get_instance_id()); + + std::map<std::string, std::string> service_metadata = {{"id", id}, + {"instance_id", instance_id}}; + int r = m_rados->service_daemon_register("cephfs-mirror", instance_id, + service_metadata); + if (r < 0) { + return r; + } + return 0; +} + +void ServiceDaemon::add_filesystem(fs_cluster_id_t fscid, std::string_view fs_name) { + dout(10) << ": fscid=" << fscid << ", fs_name=" << fs_name << dendl; + + { + std::scoped_lock locker(m_lock); + m_filesystems.emplace(fscid, Filesystem(fs_name)); + } + schedule_update_status(); +} + +void ServiceDaemon::remove_filesystem(fs_cluster_id_t fscid) { + dout(10) << ": fscid=" << fscid << dendl; + + { + std::scoped_lock locker(m_lock); + m_filesystems.erase(fscid); + } + schedule_update_status(); +} + +void ServiceDaemon::add_peer(fs_cluster_id_t 
fscid, const Peer &peer) { + dout(10) << ": peer=" << peer << dendl; + + { + std::scoped_lock locker(m_lock); + auto fs_it = m_filesystems.find(fscid); + if (fs_it == m_filesystems.end()) { + return; + } + fs_it->second.peer_attributes.emplace(peer, Attributes{}); + } + schedule_update_status(); +} + +void ServiceDaemon::remove_peer(fs_cluster_id_t fscid, const Peer &peer) { + dout(10) << ": peer=" << peer << dendl; + + { + std::scoped_lock locker(m_lock); + auto fs_it = m_filesystems.find(fscid); + if (fs_it == m_filesystems.end()) { + return; + } + fs_it->second.peer_attributes.erase(peer); + } + schedule_update_status(); +} + +void ServiceDaemon::add_or_update_fs_attribute(fs_cluster_id_t fscid, std::string_view key, + AttributeValue value) { + dout(10) << ": fscid=" << fscid << dendl; + + { + std::scoped_lock locker(m_lock); + auto fs_it = m_filesystems.find(fscid); + if (fs_it == m_filesystems.end()) { + return; + } + + fs_it->second.fs_attributes[std::string(key)] = value; + } + schedule_update_status(); +} + +void ServiceDaemon::add_or_update_peer_attribute(fs_cluster_id_t fscid, const Peer &peer, + std::string_view key, AttributeValue value) { + dout(10) << ": fscid=" << fscid << dendl; + + { + std::scoped_lock locker(m_lock); + auto fs_it = m_filesystems.find(fscid); + if (fs_it == m_filesystems.end()) { + return; + } + + auto peer_it = fs_it->second.peer_attributes.find(peer); + if (peer_it == fs_it->second.peer_attributes.end()) { + return; + } + + peer_it->second[std::string(key)] = value; + } + schedule_update_status(); +} + +void ServiceDaemon::schedule_update_status() { + dout(10) << dendl; + + std::scoped_lock timer_lock(m_timer_lock); + if (m_timer_ctx != nullptr) { + return; + } + + m_timer_ctx = new LambdaContext([this] { + m_timer_ctx = nullptr; + update_status(); + }); + m_timer->add_event_after(1, m_timer_ctx); +} + +void ServiceDaemon::update_status() { + dout(20) << ": " << m_filesystems.size() << " filesystem(s)" << dendl; + + 
ceph::JSONFormatter f; + { + std::scoped_lock locker(m_lock); + f.open_object_section("filesystems"); + for (auto &[fscid, filesystem] : m_filesystems) { + f.open_object_section(stringify(fscid).c_str()); + f.dump_string("name", filesystem.fs_name); + for (auto &[attr_name, attr_value] : filesystem.fs_attributes) { + AttributeDumpVisitor visitor(&f, attr_name); + boost::apply_visitor(visitor, attr_value); + } + f.open_object_section("peers"); + for (auto &[peer, attributes] : filesystem.peer_attributes) { + f.open_object_section(peer.uuid); + f.dump_object("remote", peer.remote); + f.open_object_section("stats"); + for (auto &[attr_name, attr_value] : attributes) { + AttributeDumpVisitor visitor(&f, attr_name); + boost::apply_visitor(visitor, attr_value); + } + f.close_section(); // stats + f.close_section(); // peer.uuid + } + f.close_section(); // peers + f.close_section(); // fscid + } + f.close_section(); // filesystems + } + + std::stringstream ss; + f.flush(ss); + + int r = m_rados->service_daemon_update_status({{"status_json", ss.str()}}); + if (r < 0) { + derr << ": failed to update service daemon status: " << cpp_strerror(r) + << dendl; + } +} + +} // namespace mirror +} // namespace cephfs diff --git a/src/tools/cephfs_mirror/ServiceDaemon.h b/src/tools/cephfs_mirror/ServiceDaemon.h new file mode 100644 index 000000000..83eee286d --- /dev/null +++ b/src/tools/cephfs_mirror/ServiceDaemon.h @@ -0,0 +1,62 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_SERVICE_DAEMON_H +#define CEPHFS_MIRROR_SERVICE_DAEMON_H + +#include "common/ceph_mutex.h" +#include "common/Timer.h" +#include "mds/FSMap.h" +#include "Types.h" + +namespace cephfs { +namespace mirror { + +class ServiceDaemon { +public: + ServiceDaemon(CephContext *cct, RadosRef rados); + ~ServiceDaemon(); + + int init(); + + void add_filesystem(fs_cluster_id_t fscid, std::string_view fs_name); + void 
remove_filesystem(fs_cluster_id_t fscid); + + void add_peer(fs_cluster_id_t fscid, const Peer &peer); + void remove_peer(fs_cluster_id_t fscid, const Peer &peer); + + void add_or_update_fs_attribute(fs_cluster_id_t fscid, std::string_view key, + AttributeValue value); + void add_or_update_peer_attribute(fs_cluster_id_t fscid, const Peer &peer, + std::string_view key, AttributeValue value); + +private: + struct Filesystem { + std::string fs_name; + Attributes fs_attributes; + std::map<Peer, Attributes> peer_attributes; + + Filesystem(std::string_view fs_name) + : fs_name(fs_name) { + } + }; + + const std::string CEPHFS_MIRROR_AUTH_ID_PREFIX = "cephfs-mirror."; + + CephContext *m_cct; + RadosRef m_rados; + SafeTimer *m_timer; + ceph::mutex m_timer_lock = ceph::make_mutex("cephfs::mirror::ServiceDaemon"); + + ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::service_daemon"); + Context *m_timer_ctx = nullptr; + std::map<fs_cluster_id_t, Filesystem> m_filesystems; + + void schedule_update_status(); + void update_status(); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_SERVICE_DAEMON_H diff --git a/src/tools/cephfs_mirror/Types.cc b/src/tools/cephfs_mirror/Types.cc new file mode 100644 index 000000000..0049f9d79 --- /dev/null +++ b/src/tools/cephfs_mirror/Types.cc @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Types.h" + +namespace cephfs { +namespace mirror { + +std::ostream& operator<<(std::ostream& out, const Filesystem &filesystem) { + out << "{fscid=" << filesystem.fscid << ", fs_name=" << filesystem.fs_name << "}"; + return out; +} + +std::ostream& operator<<(std::ostream& out, const FilesystemSpec &spec) { + out << "{filesystem=" << spec.filesystem << ", pool_id=" << spec.pool_id << "}"; + return out; +} + +} // namespace mirror +} // namespace cephfs + diff --git a/src/tools/cephfs_mirror/Types.h b/src/tools/cephfs_mirror/Types.h new file mode 
100644 index 000000000..016a8dc86 --- /dev/null +++ b/src/tools/cephfs_mirror/Types.h @@ -0,0 +1,87 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_TYPES_H +#define CEPHFS_MIRROR_TYPES_H + +#include <set> +#include <iostream> +#include <string_view> + +#include "include/rados/librados.hpp" +#include "include/cephfs/libcephfs.h" +#include "mds/mdstypes.h" + +namespace cephfs { +namespace mirror { + +static const std::string CEPHFS_MIRROR_OBJECT("cephfs_mirror"); + +typedef boost::variant<bool, uint64_t, std::string> AttributeValue; +typedef std::map<std::string, AttributeValue> Attributes; + +// distinct filesystem identifier +struct Filesystem { + fs_cluster_id_t fscid; + std::string fs_name; + + bool operator==(const Filesystem &rhs) const { + return (fscid == rhs.fscid && + fs_name == rhs.fs_name); + } + + bool operator!=(const Filesystem &rhs) const { + return !(*this == rhs); + } + + bool operator<(const Filesystem &rhs) const { + if (fscid != rhs.fscid) { + return fscid < rhs.fscid; + } + + return fs_name < rhs.fs_name; + } +}; + +// specification of a filesystem -- pool id the metadata pool id. 
+struct FilesystemSpec { + FilesystemSpec() = default; + FilesystemSpec(const Filesystem &filesystem, uint64_t pool_id) + : filesystem(filesystem), + pool_id(pool_id) { + } + FilesystemSpec(fs_cluster_id_t fscid, std::string_view fs_name, uint64_t pool_id) + : filesystem(Filesystem{fscid, std::string(fs_name)}), + pool_id(pool_id) { + } + + Filesystem filesystem; + uint64_t pool_id; + + bool operator==(const FilesystemSpec &rhs) const { + return (filesystem == rhs.filesystem && + pool_id == rhs.pool_id); + } + + bool operator<(const FilesystemSpec &rhs) const { + if (filesystem != rhs.filesystem) { + return filesystem < rhs.filesystem; + } + + return pool_id < rhs.pool_id; + } +}; + +std::ostream& operator<<(std::ostream& out, const Filesystem &filesystem); +std::ostream& operator<<(std::ostream& out, const FilesystemSpec &spec); + +typedef std::shared_ptr<librados::Rados> RadosRef; +typedef std::shared_ptr<librados::IoCtx> IoCtxRef; + +// not a shared_ptr since the type is incomplete +typedef ceph_mount_info *MountRef; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_TYPES_H diff --git a/src/tools/cephfs_mirror/Utils.cc b/src/tools/cephfs_mirror/Utils.cc new file mode 100644 index 000000000..1a8b8e0ac --- /dev/null +++ b/src/tools/cephfs_mirror/Utils.cc @@ -0,0 +1,166 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_argparse.h" +#include "common/ceph_context.h" +#include "common/common_init.h" +#include "common/debug.h" +#include "common/errno.h" + +#include "Utils.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::Utils " << __func__ + +namespace cephfs { +namespace mirror { + +int connect(std::string_view client_name, std::string_view cluster_name, + RadosRef *cluster, std::string_view mon_host, std::string_view cephx_key, + std::vector<const char *> 
args) { + dout(20) << ": connecting to cluster=" << cluster_name << ", client=" << client_name + << ", mon_host=" << mon_host << dendl; + + CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT); + if (client_name.empty() || !iparams.name.from_str(client_name)) { + derr << ": error initializing cluster handle for " << cluster_name << dendl; + return -EINVAL; + } + + CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, + CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); + if (mon_host.empty()) { + cct->_conf->cluster = cluster_name; + } + + int r = cct->_conf.parse_config_files(nullptr, nullptr, 0); + if (r < 0 && r != -ENOENT) { + derr << ": could not read ceph conf: " << ": " << cpp_strerror(r) << dendl; + return r; + } + + cct->_conf.parse_env(cct->get_module_type()); + + if (!args.empty()) { + r = cct->_conf.parse_argv(args); + if (r < 0) { + derr << ": could not parse command line args: " << cpp_strerror(r) << dendl; + cct->put(); + return r; + } + } + cct->_conf.parse_env(cct->get_module_type()); + + if (!mon_host.empty()) { + r = cct->_conf.set_val("mon_host", std::string(mon_host)); + if (r < 0) { + derr << "failed to set mon_host config: " << cpp_strerror(r) << dendl; + cct->put(); + return r; + } + } + if (!cephx_key.empty()) { + r = cct->_conf.set_val("key", std::string(cephx_key)); + if (r < 0) { + derr << "failed to set key config: " << cpp_strerror(r) << dendl; + cct->put(); + return r; + } + } + + dout(10) << ": using mon addr=" << cct->_conf.get_val<std::string>("mon_host") << dendl; + + cluster->reset(new librados::Rados()); + + r = (*cluster)->init_with_context(cct); + ceph_assert(r == 0); + cct->put(); + + r = (*cluster)->connect(); + if (r < 0) { + derr << ": error connecting to " << cluster_name << ": " << cpp_strerror(r) + << dendl; + return r; + } + + dout(10) << ": connected to cluster=" << cluster_name << " using client=" + << client_name << dendl; + + return 0; +} + +int mount(RadosRef cluster, const Filesystem &filesystem, bool 
cross_check_fscid, + MountRef *mount) { + dout(20) << ": filesystem=" << filesystem << dendl; + + ceph_mount_info *cmi; + int r = ceph_create_with_context(&cmi, reinterpret_cast<CephContext*>(cluster->cct())); + if (r < 0) { + derr << ": mount error: " << cpp_strerror(r) << dendl; + return r; + } + + r = ceph_conf_set(cmi, "client_mount_uid", "0"); + if (r < 0) { + derr << ": mount error: " << cpp_strerror(r) << dendl; + return r; + } + + r = ceph_conf_set(cmi, "client_mount_gid", "0"); + if (r < 0) { + derr << ": mount error: " << cpp_strerror(r) << dendl; + return r; + } + + // mount timeout applies for local and remote mounts. + auto mount_timeout = g_ceph_context->_conf.get_val<std::chrono::seconds> + ("cephfs_mirror_mount_timeout").count(); + r = ceph_set_mount_timeout(cmi, mount_timeout); + if (r < 0) { + derr << ": mount error: " << cpp_strerror(r) << dendl; + return r; + } + + r = ceph_init(cmi); + if (r < 0) { + derr << ": mount error: " << cpp_strerror(r) << dendl; + return r; + } + + r = ceph_select_filesystem(cmi, filesystem.fs_name.c_str()); + if (r < 0) { + derr << ": mount error: " << cpp_strerror(r) << dendl; + return r; + } + + r = ceph_mount(cmi, NULL); + if (r < 0) { + derr << ": mount error: " << cpp_strerror(r) << dendl; + return r; + } + + auto fs_id = ceph_get_fs_cid(cmi); + if (cross_check_fscid && fs_id != filesystem.fscid) { + // this can happen in the most remotest possibility when a + // filesystem is deleted and recreated with the same name. + // since all this is driven asynchronously, we were able to + // mount the recreated filesystem. so bubble up the error. + // cleanup will eventually happen since a mirror disable event + // would have been queued. + derr << ": filesystem-id mismatch " << fs_id << " vs " << filesystem.fscid + << dendl; + // ignore errors, we are shutting down anyway. 
+ ceph_unmount(cmi); + return -EINVAL; + } + + dout(10) << ": mounted filesystem=" << filesystem << dendl; + + *mount = cmi; + return 0; +} + +} // namespace mirror +} // namespace cephfs diff --git a/src/tools/cephfs_mirror/Utils.h b/src/tools/cephfs_mirror/Utils.h new file mode 100644 index 000000000..76b0c0726 --- /dev/null +++ b/src/tools/cephfs_mirror/Utils.h @@ -0,0 +1,22 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_UTILS_H +#define CEPHFS_MIRROR_UTILS_H + +#include "Types.h" + +namespace cephfs { +namespace mirror { + +int connect(std::string_view client_name, std::string_view cluster_name, + RadosRef *cluster, std::string_view mon_host={}, std::string_view cephx_key={}, + std::vector<const char *> args={}); + +int mount(RadosRef cluster, const Filesystem &filesystem, bool cross_check_fscid, + MountRef *mount); + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_UTILS_H diff --git a/src/tools/cephfs_mirror/Watcher.cc b/src/tools/cephfs_mirror/Watcher.cc new file mode 100644 index 000000000..1445fce5f --- /dev/null +++ b/src/tools/cephfs_mirror/Watcher.cc @@ -0,0 +1,285 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_context.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/WorkQueue.h" +#include "include/stringify.h" +#include "aio_utils.h" +#include "watcher/RewatchRequest.h" +#include "Watcher.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::Watcher " << __func__ + +using cephfs::mirror::watcher::RewatchRequest; + +namespace cephfs { +namespace mirror { + +namespace { + +struct C_UnwatchAndFlush : public Context { + librados::Rados rados; + Context *on_finish; + bool flushing = false; + int ret_val = 0; + + C_UnwatchAndFlush(librados::IoCtx 
&ioctx, Context *on_finish)
    : rados(ioctx), on_finish(on_finish) {
  }

  // Invoked twice: first for the aio_unwatch completion (flushing==false),
  // then again for the aio_watch_flush completion (flushing==true).
  void complete(int r) override {
    // remember the first error seen across the two phases
    if (ret_val == 0 && r < 0) {
      ret_val = r;
    }

    if (!flushing) {
      flushing = true;

      librados::AioCompletion *aio_comp =
        librados::Rados::aio_create_completion(
          this, &rados_callback<Context, &Context::complete>);
      r = rados.aio_watch_flush(aio_comp);

      ceph_assert(r == 0);
      aio_comp->release();
      return;
    }

    // ensure our reference to the RadosClient is released prior
    // to completing the callback to avoid racing an explicit
    // librados shutdown
    Context *ctx = on_finish;
    r = ret_val;
    delete this;

    ctx->complete(r);
  }

  // complete() is fully overridden above, so finish() is a no-op.
  void finish(int r) override {
  }
};

} // anonymous namespace

Watcher::Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue)
  : m_oid(oid),
    m_ioctx(ioctx),
    m_work_queue(work_queue),
    m_lock(ceph::make_shared_mutex("cephfs::mirror::snap_watcher")),
    m_state(STATE_IDLE),
    m_watch_ctx(*this) {
}

Watcher::~Watcher() {
}

// Establish the watch on m_oid; on_finish fires (via C_RegisterWatch ->
// handle_register_watch) once the aio_watch completes.
void Watcher::register_watch(Context *on_finish) {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
  m_state = STATE_REGISTERING;

  on_finish = new C_RegisterWatch(this, on_finish);
  librados::AioCompletion *aio_comp =
    librados::Rados::aio_create_completion(on_finish, &rados_callback<Context, &Context::complete>);
  int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
  ceph_assert(r == 0);
  aio_comp->release();
}

// Completion for register_watch().  Decides, under the lock, whether to
// run a pending unregister, re-register after a watch error, or settle
// into IDLE; callbacks are invoked only after the lock is dropped.
void Watcher::handle_register_watch(int r, Context *on_finish) {
  dout(20) << ": r=" << r << dendl;

  bool watch_error = false;
  Context *unregister_watch_ctx = nullptr;
  {
    std::scoped_lock locker(m_lock);
    ceph_assert(m_state == STATE_REGISTERING);

    m_state = STATE_IDLE;
    if (r < 0) {
      derr << ": failed to register watch: " << cpp_strerror(r) << dendl;
      m_watch_handle = 0;
    }

    if (m_unregister_watch_ctx != nullptr) {
      // an unregister arrived while we were registering -- it wins
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else if (r == 0 && m_watch_error) {
      // a notify error raced with a successful registration; redo it
      derr << ": re-registering after watch error" << dendl;
      m_state = STATE_REGISTERING;
      watch_error = true;
    } else {
      m_watch_blocklisted = (r == -EBLOCKLISTED);
    }
  }

  // complete callbacks outside the lock to avoid re-entrant deadlock
  on_finish->complete(r);
  if (unregister_watch_ctx != nullptr) {
    unregister_watch_ctx->complete(0);
  } else if (watch_error) {
    rewatch();
  }
}

// Tear down the watch.  If a register/rewatch is in flight, the request is
// parked in m_unregister_watch_ctx and replayed when that finishes.
void Watcher::unregister_watch(Context *on_finish) {
  dout(20) << dendl;

  {
    std::scoped_lock locker(m_lock);
    if (m_state != STATE_IDLE) {
      dout(10) << ": delaying unregister -- watch register in progress" << dendl;
      ceph_assert(m_unregister_watch_ctx == nullptr);
      m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) {
        unregister_watch(on_finish);
      });
      return;
    } else if (is_registered()) {
      // watch is registered -- unwatch
      librados::AioCompletion *aio_comp =
        librados::Rados::aio_create_completion(new C_UnwatchAndFlush(m_ioctx, on_finish),
                                               &rados_callback<Context, &Context::complete>);
      int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
      ceph_assert(r == 0);
      aio_comp->release();
      m_watch_handle = 0;
      m_watch_blocklisted = false;
      return;
    }
  }

  // nothing registered -- complete immediately (outside the lock)
  on_finish->complete(0);
}

// librados error callback (via WatchCtx) -- kick off a rewatch on the
// work queue rather than from the notification thread.
void Watcher::handle_error(uint64_t handle, int err) {
  derr << ": handle=" << handle << ": " << cpp_strerror(err) << dendl;

  std::scoped_lock locker(m_lock);
  m_watch_error = true;

  if (is_registered()) {
    m_state = STATE_REWATCHING;
    if (err == -EBLOCKLISTED) {
      m_watch_blocklisted = true;
    }
    m_work_queue->queue(new LambdaContext([this] {
      rewatch();
    }), 0);
  }
}

// Issue a RewatchRequest unless an unregister was queued in the meantime,
// in which case the unregister callback is run instead.
void Watcher::rewatch() {
  dout(20) << dendl;

  Context *unregister_watch_ctx = nullptr;
  {
    std::unique_lock locker(m_lock);
    ceph_assert(m_state == STATE_REWATCHING);

    if (m_unregister_watch_ctx != nullptr) {
      m_state = STATE_IDLE;
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else {
      m_watch_error = false;
      Context *ctx = new C_CallbackAdapter<Watcher, &Watcher::handle_rewatch>(this);
      auto req = RewatchRequest::create(m_ioctx, m_oid, m_lock,
                                        &m_watch_ctx, &m_watch_handle, ctx);
      req->send();
      return;
    }
  }

  unregister_watch_ctx->complete(0);
}

// RewatchRequest completion: classify the result under the lock, then
// either run a pending unregister, retry the rewatch, or hand the result
// to handle_rewatch_callback() on the work queue.
void Watcher::handle_rewatch(int r) {
  dout(20) << ": r=" << r << dendl;

  bool watch_error = false;
  Context *unregister_watch_ctx = nullptr;
  {
    std::scoped_lock locker(m_lock);
    ceph_assert(m_state == STATE_REWATCHING);

    m_watch_blocklisted = false;
    if (m_unregister_watch_ctx != nullptr) {
      dout(10) << ": skipping rewatch -- unregistering" << dendl;
      m_state = STATE_IDLE;
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else if (r == -EBLOCKLISTED) {
      m_watch_blocklisted = true;
      derr << ": client blocklisted" << dendl;
    } else if (r == -ENOENT) {
      dout(5) << ": object " << m_oid << " does not exist" << dendl;
    } else if (r < 0) {
      derr << ": failed to rewatch: " << cpp_strerror(r) << dendl;
      watch_error = true;
    } else if (m_watch_error) {
      // another notify error raced with this successful rewatch
      derr << ": re-registering watch after error" << dendl;
      watch_error = true;
    }
  }

  if (unregister_watch_ctx != nullptr) {
    unregister_watch_ctx->complete(0);
    return;
  } else if (watch_error) {
    rewatch();
    return;
  }

  Context *ctx = new C_CallbackAdapter<Watcher, &Watcher::handle_rewatch_callback>(this);
  m_work_queue->queue(ctx, r);
}

// Runs on the work queue after handle_rewatch(); notifies the subclass via
// handle_rewatch_complete() and settles the state machine back to IDLE
// (or retries / unregisters as needed).
void Watcher::handle_rewatch_callback(int r) {
  dout(10) << ": r=" << r << dendl;
  handle_rewatch_complete(r);

  bool watch_error = false;
  Context *unregister_watch_ctx = nullptr;
  {
    std::scoped_lock locker(m_lock);
    ceph_assert(m_state == STATE_REWATCHING);

    if (m_unregister_watch_ctx != nullptr) {
      m_state = STATE_IDLE;
      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
    } else if (r == -EBLOCKLISTED || r == -ENOENT) {
      // terminal conditions -- stop retrying
      m_state = STATE_IDLE;
    } else if (r < 0 || m_watch_error) {
      watch_error = true;
    } else {
      m_state = STATE_IDLE;
    }
  }

  if (unregister_watch_ctx != nullptr) {
    unregister_watch_ctx->complete(0);
  } else if (watch_error) {
    rewatch();
  }
}

void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl) {
  m_ioctx.notify_ack(m_oid, notify_id, handle, bl);
}

// Trampolines from librados' WatchCtx2 interface into the owning Watcher.
void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
                                      uint64_t notifier_id, bufferlist& bl) {
  dout(20) << ": notify_id=" << notify_id << ", handle=" << handle
           << ", notifier_id=" << notifier_id << dendl;
  watcher.handle_notify(notify_id, handle, notifier_id, bl);
}

void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
  dout(20) << dendl;
  watcher.handle_error(handle, err);
}

} // namespace mirror
} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/Watcher.h b/src/tools/cephfs_mirror/Watcher.h new file mode 100644 index 000000000..9e7c54eeb --- /dev/null +++ b/src/tools/cephfs_mirror/Watcher.h @@ -0,0 +1,102 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPHFS_MIRROR_WATCHER_H
#define CEPHFS_MIRROR_WATCHER_H

#include <string_view>

#include "common/ceph_mutex.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"

class ContextWQ;

namespace cephfs {
namespace mirror {

// generic watcher class -- establish watch on a given rados object
// and invoke handle_notify() when notified. On notify error, try
// to re-establish the watch. Errors during rewatch are notified via
// handle_rewatch_complete().
+ +class Watcher { +public: + Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue); + virtual ~Watcher(); + + void register_watch(Context *on_finish); + void unregister_watch(Context *on_finish); + +protected: + std::string m_oid; + + void acknowledge_notify(uint64_t notify_if, uint64_t handle, bufferlist &bl); + + bool is_registered() const { + return m_state == STATE_IDLE && m_watch_handle != 0; + } + bool is_unregistered() const { + return m_state == STATE_IDLE && m_watch_handle == 0; + } + + virtual void handle_rewatch_complete(int r) { } + +private: + enum State { + STATE_IDLE, + STATE_REGISTERING, + STATE_REWATCHING + }; + + struct WatchCtx : public librados::WatchCtx2 { + Watcher &watcher; + + WatchCtx(Watcher &parent) : watcher(parent) {} + + void handle_notify(uint64_t notify_id, + uint64_t handle, + uint64_t notifier_id, + bufferlist& bl) override; + void handle_error(uint64_t handle, int err) override; + }; + + struct C_RegisterWatch : public Context { + Watcher *watcher; + Context *on_finish; + + C_RegisterWatch(Watcher *watcher, Context *on_finish) + : watcher(watcher), + on_finish(on_finish) { + } + + void finish(int r) override { + watcher->handle_register_watch(r, on_finish); + } + }; + + librados::IoCtx &m_ioctx; + ContextWQ *m_work_queue; + + mutable ceph::shared_mutex m_lock; + State m_state; + bool m_watch_error = false; + bool m_watch_blocklisted = false; + uint64_t m_watch_handle; + WatchCtx m_watch_ctx; + Context *m_unregister_watch_ctx = nullptr; + + virtual void handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist& bl) = 0; + void handle_error(uint64_t handle, int err); + + void rewatch(); + void handle_rewatch(int r); + void handle_rewatch_callback(int r); + void handle_register_watch(int r, Context *on_finish); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_WATCHER_H diff --git a/src/tools/cephfs_mirror/aio_utils.h 
b/src/tools/cephfs_mirror/aio_utils.h new file mode 100644 index 000000000..43f356381 --- /dev/null +++ b/src/tools/cephfs_mirror/aio_utils.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_AIO_UTILS_H +#define CEPHFS_MIRROR_AIO_UTILS_H + +#include "include/rados/librados.hpp" + +namespace cephfs { +namespace mirror { + +template <typename T, void(T::*MF)(int)> +void rados_callback(rados_completion_t c, void *arg) { + T *obj = reinterpret_cast<T*>(arg); + int r = rados_aio_get_return_value(c); + (obj->*MF)(r); +} + +template <typename T, void (T::*MF)(int)> +class C_CallbackAdapter : public Context { + T *obj; +public: + C_CallbackAdapter(T *obj) + : obj(obj) { + } + +protected: + void finish(int r) override { + (obj->*MF)(r); + } +}; + +template <typename WQ> +struct C_AsyncCallback : public Context { + WQ *op_work_queue; + Context *on_finish; + + C_AsyncCallback(WQ *op_work_queue, Context *on_finish) + : op_work_queue(op_work_queue), on_finish(on_finish) { + } + ~C_AsyncCallback() override { + delete on_finish; + } + void finish(int r) override { + op_work_queue->queue(on_finish, r); + on_finish = nullptr; + } +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_AIO_UTILS_H diff --git a/src/tools/cephfs_mirror/main.cc b/src/tools/cephfs_mirror/main.cc new file mode 100644 index 000000000..efaa89c35 --- /dev/null +++ b/src/tools/cephfs_mirror/main.cc @@ -0,0 +1,124 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_argparse.h" +#include "common/config.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/async/context_pool.h" +#include "common/Preforker.h" +#include "global/global_init.h" +#include "global/signal_handler.h" +#include "mon/MonClient.h" +#include "msg/Messenger.h" +#include "Mirror.h" + +#include <vector> + +void usage() { + std::cout 
<< "usage: cephfs-mirror [options...]" << std::endl; + std::cout << "options:\n"; + std::cout << " --mon-host monaddress[:port] connect to specified monitor\n"; + std::cout << " --keyring=<path> path to keyring for local cluster\n"; + std::cout << " --log-file=<logfile> file to log debug output\n"; + std::cout << " --debug-cephfs-mirror=<log-level>/<memory-level> set cephfs-mirror debug level\n"; + generic_server_usage(); +} + +cephfs::mirror::Mirror *mirror = nullptr; + +static void handle_signal(int signum) { + if (mirror) { + mirror->handle_signal(signum); + } +} + +int main(int argc, const char **argv) { + std::vector<const char*> args; + argv_to_vec(argc, argv, args); + if (args.empty()) { + cerr << argv[0] << ": -h or --help for usage" << std::endl; + ::exit(1); + } + + if (ceph_argparse_need_usage(args)) { + usage(); + ::exit(0); + } + + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_DAEMON, + CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); + + Preforker forker; + if (global_init_prefork(g_ceph_context) >= 0) { + std::string err; + int r = forker.prefork(err); + if (r < 0) { + cerr << err << std::endl; + return r; + } + if (forker.is_parent()) { + g_ceph_context->_log->start(); + if (forker.parent_wait(err) != 0) { + return -ENXIO; + } + return 0; + } + global_init_postfork_start(g_ceph_context); + } + + common_init_finish(g_ceph_context); + + bool daemonize = g_conf().get_val<bool>("daemonize"); + if (daemonize) { + global_init_postfork_finish(g_ceph_context); + forker.daemonize(); + } + + init_async_signal_handler(); + register_async_signal_handler(SIGHUP, handle_signal); + register_async_signal_handler_oneshot(SIGINT, handle_signal); + register_async_signal_handler_oneshot(SIGTERM, handle_signal); + + std::vector<const char*> cmd_args; + argv_to_vec(argc, argv, cmd_args); + + Messenger *msgr = Messenger::create_client_messenger(g_ceph_context, "client"); + msgr->set_default_policy(Messenger::Policy::lossy_client(0)); + + 
std::string reason; + ceph::async::io_context_pool ctxpool(1); + MonClient monc(MonClient(g_ceph_context, ctxpool)); + int r = monc.build_initial_monmap(); + if (r < 0) { + cerr << "failed to generate initial monmap" << std::endl; + goto cleanup_messenger; + } + + msgr->start(); + + mirror = new cephfs::mirror::Mirror(g_ceph_context, cmd_args, &monc, msgr); + r = mirror->init(reason); + if (r < 0) { + std::cerr << "failed to initialize cephfs-mirror: " << reason << std::endl; + goto cleanup; + } + + mirror->run(); + delete mirror; + +cleanup: + monc.shutdown(); +cleanup_messenger: + msgr->shutdown(); + msgr->wait(); + delete msgr; + + unregister_async_signal_handler(SIGHUP, handle_signal); + unregister_async_signal_handler(SIGINT, handle_signal); + unregister_async_signal_handler(SIGTERM, handle_signal); + shutdown_async_signal_handler(); + + return forker.signal_exit(r); +} diff --git a/src/tools/cephfs_mirror/watcher/RewatchRequest.cc b/src/tools/cephfs_mirror/watcher/RewatchRequest.cc new file mode 100644 index 000000000..3070e6f8b --- /dev/null +++ b/src/tools/cephfs_mirror/watcher/RewatchRequest.cc @@ -0,0 +1,102 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_mutex.h" +#include "common/debug.h" +#include "common/errno.h" +#include "include/Context.h" +#include "tools/cephfs_mirror/aio_utils.h" +#include "RewatchRequest.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::watcher:RewatchRequest " << __func__ + +namespace cephfs { +namespace mirror { +namespace watcher { + +RewatchRequest::RewatchRequest(librados::IoCtx &ioctx, const std::string &oid, + ceph::shared_mutex &watch_lock, + librados::WatchCtx2 *watch_ctx, + uint64_t *watch_handle, Context *on_finish) + : m_ioctx(ioctx), m_oid(oid), m_lock(watch_lock), + m_watch_ctx(watch_ctx), m_watch_handle(watch_handle), + 
    m_on_finish(on_finish) {
}

// Entry point: tear down the old watch (if any) and then re-establish it.
// The object deletes itself in finish().
void RewatchRequest::send() {
  unwatch();
}

void RewatchRequest::unwatch() {
  // caller (Watcher::rewatch) holds the write lock for the whole request
  ceph_assert(ceph_mutex_is_wlocked(m_lock));
  if (*m_watch_handle == 0) {
    // nothing registered -- go straight to establishing the new watch
    rewatch();
    return;
  }

  dout(10) << dendl;

  // clear the caller-visible handle before issuing the async unwatch
  uint64_t watch_handle = 0;
  std::swap(*m_watch_handle, watch_handle);

  librados::AioCompletion *aio_comp =
    librados::Rados::aio_create_completion(
      this, &rados_callback<RewatchRequest, &RewatchRequest::handle_unwatch>);
  int r = m_ioctx.aio_unwatch(watch_handle, aio_comp);
  ceph_assert(r == 0);
  aio_comp->release();
}

void RewatchRequest::handle_unwatch(int r) {
  dout(20) << ": r=" << r << dendl;

  if (r == -EBLOCKLISTED) {
    // blocklisted clients cannot re-establish a watch -- bail out
    derr << ": client blocklisted" << dendl;
    finish(r);
    return;
  } else if (r < 0) {
    // other unwatch errors are logged but do not stop the rewatch
    derr << ": failed to unwatch: " << cpp_strerror(r) << dendl;
  }

  rewatch();
}

void RewatchRequest::rewatch() {
  dout(20) << dendl;

  librados::AioCompletion *aio_comp =
    librados::Rados::aio_create_completion(
      this, &rados_callback<RewatchRequest, &RewatchRequest::handle_rewatch>);
  int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_rewatch_handle, m_watch_ctx);
  ceph_assert(r == 0);
  aio_comp->release();
}

void RewatchRequest::handle_rewatch(int r) {
  dout(20) << ": r=" << r << dendl;

  if (r < 0) {
    derr << ": failed to watch object: " << cpp_strerror(r) << dendl;
    m_rewatch_handle = 0;
  }

  {
    // publish the new (or zeroed) handle back to the owning Watcher
    std::unique_lock locker(m_lock);
    *m_watch_handle = m_rewatch_handle;
  }

  finish(r);
}

void RewatchRequest::finish(int r) {
  dout(20) << ": r=" << r << dendl;
  // complete the caller's context, then self-destruct -- no member may be
  // touched after delete
  m_on_finish->complete(r);
  delete this;
}

} // namespace watcher
} // namespace mirror
} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/watcher/RewatchRequest.h b/src/tools/cephfs_mirror/watcher/RewatchRequest.h new file mode 100644 index 000000000..453fcb219 --- /dev/null +++ b/src/tools/cephfs_mirror/watcher/RewatchRequest.h @@ -0,0 +1,60 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H
#define CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H

#include "common/ceph_mutex.h"
#include "include/int_types.h"
#include "include/rados/librados.hpp"

struct Context;

namespace cephfs {
namespace mirror {
namespace watcher {

// Rewatch an existing watch -- the watch can be in an operational
// or error state.

class RewatchRequest {
public:

  static RewatchRequest *create(librados::IoCtx &ioctx, const std::string &oid,
                                ceph::shared_mutex &watch_lock,
                                librados::WatchCtx2 *watch_ctx,
                                uint64_t *watch_handle, Context *on_finish) {
    return new RewatchRequest(ioctx, oid, watch_lock, watch_ctx, watch_handle,
                              on_finish);
  }

  RewatchRequest(librados::IoCtx &ioctx, const std::string &oid,
                 ceph::shared_mutex &watch_lock, librados::WatchCtx2 *watch_ctx,
                 uint64_t *watch_handle, Context *on_finish);

  // asynchronous; the request deletes itself after completing on_finish
  void send();

private:
  librados::IoCtx& m_ioctx;
  std::string m_oid;
  ceph::shared_mutex &m_lock;
  librados::WatchCtx2 *m_watch_ctx;
  uint64_t *m_watch_handle;
  Context *m_on_finish;

  uint64_t m_rewatch_handle = 0;

  void unwatch();
  void handle_unwatch(int r);

  void rewatch();
  void handle_rewatch(int r);

  void finish(int r);
};

} // namespace watcher
} // namespace mirror
} // namespace cephfs

#endif // CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H