summaryrefslogtreecommitdiffstats
path: root/src/tools/cephfs_mirror
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/tools/cephfs_mirror
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tools/cephfs_mirror')
-rw-r--r--src/tools/cephfs_mirror/CMakeLists.txt30
-rw-r--r--src/tools/cephfs_mirror/ClusterWatcher.cc182
-rw-r--r--src/tools/cephfs_mirror/ClusterWatcher.h77
-rw-r--r--src/tools/cephfs_mirror/FSMirror.cc444
-rw-r--r--src/tools/cephfs_mirror/FSMirror.h182
-rw-r--r--src/tools/cephfs_mirror/InstanceWatcher.cc256
-rw-r--r--src/tools/cephfs_mirror/InstanceWatcher.h98
-rw-r--r--src/tools/cephfs_mirror/Mirror.cc589
-rw-r--r--src/tools/cephfs_mirror/Mirror.h153
-rw-r--r--src/tools/cephfs_mirror/MirrorWatcher.cc151
-rw-r--r--src/tools/cephfs_mirror/MirrorWatcher.h92
-rw-r--r--src/tools/cephfs_mirror/PeerReplayer.cc1581
-rw-r--r--src/tools/cephfs_mirror/PeerReplayer.h320
-rw-r--r--src/tools/cephfs_mirror/ServiceDaemon.cc225
-rw-r--r--src/tools/cephfs_mirror/ServiceDaemon.h62
-rw-r--r--src/tools/cephfs_mirror/Types.cc21
-rw-r--r--src/tools/cephfs_mirror/Types.h87
-rw-r--r--src/tools/cephfs_mirror/Utils.cc166
-rw-r--r--src/tools/cephfs_mirror/Utils.h22
-rw-r--r--src/tools/cephfs_mirror/Watcher.cc285
-rw-r--r--src/tools/cephfs_mirror/Watcher.h102
-rw-r--r--src/tools/cephfs_mirror/aio_utils.h53
-rw-r--r--src/tools/cephfs_mirror/main.cc124
-rw-r--r--src/tools/cephfs_mirror/watcher/RewatchRequest.cc102
-rw-r--r--src/tools/cephfs_mirror/watcher/RewatchRequest.h60
25 files changed, 5464 insertions, 0 deletions
diff --git a/src/tools/cephfs_mirror/CMakeLists.txt b/src/tools/cephfs_mirror/CMakeLists.txt
new file mode 100644
index 000000000..4b6dea7a1
--- /dev/null
+++ b/src/tools/cephfs_mirror/CMakeLists.txt
@@ -0,0 +1,30 @@
+set(cephfs_mirror_internal # source list consumed by the add_library() call below
+ ClusterWatcher.cc
+ Mirror.cc
+ FSMirror.cc
+ InstanceWatcher.cc
+ MirrorWatcher.cc
+ PeerReplayer.cc
+ ServiceDaemon.cc
+ Types.cc
+ Utils.cc
+ Watcher.cc
+ watcher/RewatchRequest.cc)
+
+add_executable(cephfs-mirror # the executable itself is built from main.cc only
+ main.cc)
+
+add_library(cephfs_mirror_internal STATIC # static library target; shares its name with the source-list variable
+ ${cephfs_mirror_internal})
+
+target_link_libraries(cephfs-mirror # link the executable against the internal library and its dependencies
+ cephfs_mirror_internal
+ global
+ ceph-common
+ cls_cephfs_client
+ librados
+ mds
+ cephfs
+ ${ALLOC_LIBS})
+
+install(TARGETS cephfs-mirror DESTINATION bin) # only the executable is installed
diff --git a/src/tools/cephfs_mirror/ClusterWatcher.cc b/src/tools/cephfs_mirror/ClusterWatcher.cc
new file mode 100644
index 000000000..b5f6f81d7
--- /dev/null
+++ b/src/tools/cephfs_mirror/ClusterWatcher.cc
@@ -0,0 +1,182 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <mutex>
+#include <vector>
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "mon/MonClient.h"
+
+#include "ClusterWatcher.h"
+#include "ServiceDaemon.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::ClusterWatcher " << __func__
+
+namespace cephfs {
+namespace mirror {
+
+// Store the monitor client, the service daemon and the listener that
+// handle_fsmap() reports to; no subscription is made until init().
+ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc,
+                               ServiceDaemon *service_daemon,
+                               Listener &listener)
+  : Dispatcher(cct), m_monc(monc), m_service_daemon(service_daemon),
+    m_listener(listener) {}
+
+// Nothing to release explicitly: members clean up after themselves.
+ClusterWatcher::~ClusterWatcher() = default;
+
+// Only FSMap messages are eligible for fast dispatch.
+bool ClusterWatcher::ms_can_fast_dispatch2(const cref_t<Message> &m) const {
+  const auto msg_type = m->get_type();
+  return CEPH_MSG_FS_MAP == msg_type;
+}
+
+// Fast-dispatch path: reuse ms_dispatch2() and insist that it consumed the
+// message.
+void ClusterWatcher::ms_fast_dispatch2(const ref_t<Message> &m) {
+  const bool consumed = ms_dispatch2(m);
+  ceph_assert(consumed);
+}
+
+// Consume FSMap messages. The map is only processed when the sending peer is
+// a monitor, but an FSMap message is reported as handled either way.
+// Returns false for every other message type.
+bool ClusterWatcher::ms_dispatch2(const ref_t<Message> &m) {
+  if (m->get_type() != CEPH_MSG_FS_MAP) {
+    return false;
+  }
+
+  const bool from_mon =
+    m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON;
+  if (from_mon) {
+    handle_fsmap(ref_cast<MFSMap>(m));
+  }
+  return true;
+}
+
+// Subscribe to FSMap updates through the monitor client.
+// Returns 0 on success, -1 when sub_want() reports failure.
+int ClusterWatcher::init() {
+  dout(20) << dendl;
+
+  if (!m_monc->sub_want("fsmap", 0, 0)) {
+    derr << ": failed subscribing to FSMap" << dendl;
+    return -1;
+  }
+
+  m_monc->renew_subs();
+  dout(10) << ": subscribed to FSMap" << dendl;
+  return 0;
+}
+
+// Mark the watcher as stopping and drop the "fsmap" subscription.
+void ClusterWatcher::shutdown() {
+  dout(20) << dendl;
+
+  std::scoped_lock guard(m_lock);
+  // once set, handle_fsmap() bails out early and skips sub_got()
+  m_stopping = true;
+  m_monc->sub_unwant("fsmap");
+}
+
+// Process an FSMap update: diff it against the cached per-filesystem peer
+// sets (m_filesystem_peers), then tell the service daemon and the listener
+// which filesystems had mirroring enabled/disabled and which peers were
+// added/removed. The diff is computed under m_lock; the callbacks are invoked
+// after the lock has been released.
+void ClusterWatcher::handle_fsmap(const cref_t<MFSMap> &m) {
+  dout(20) << dendl;
+
+  // Bind by const reference rather than copying the whole map: `m` holds the
+  // message for the duration of this call and the map is only read here.
+  const auto &fsmap = m->get_fsmap();
+  const auto &filesystems = fsmap.get_filesystems();
+
+  std::vector<Filesystem> mirroring_enabled;
+  std::vector<Filesystem> mirroring_disabled;
+  std::map<Filesystem, Peers> peers_added;
+  std::map<Filesystem, Peers> peers_removed;
+  std::map<Filesystem, uint64_t> fs_metadata_pools;
+  {
+    std::scoped_lock locker(m_lock);
+    if (m_stopping) {
+      return;
+    }
+
+    // deleted filesystems are considered mirroring disabled
+    for (auto it = m_filesystem_peers.begin(); it != m_filesystem_peers.end();) {
+      if (!fsmap.filesystem_exists(it->first.fscid)) {
+        mirroring_disabled.emplace_back(it->first);
+        it = m_filesystem_peers.erase(it);
+        continue;
+      }
+      ++it;
+    }
+
+    for (const auto &filesystem : filesystems) {
+      auto fs = Filesystem{filesystem->fscid,
+                           std::string(filesystem->mds_map.get_fs_name())};
+      auto pool_id = filesystem->mds_map.get_metadata_pool();
+      auto &mirror_info = filesystem->mirror_info;
+
+      if (!mirror_info.is_mirrored()) {
+        // mirroring switched off for a filesystem we were tracking
+        auto it = m_filesystem_peers.find(fs);
+        if (it != m_filesystem_peers.end()) {
+          mirroring_disabled.emplace_back(fs);
+          m_filesystem_peers.erase(it);
+        }
+      } else {
+        // `enabled` is true only when the filesystem was not tracked before
+        auto [fspeersit, enabled] = m_filesystem_peers.emplace(fs, Peers{});
+        auto &peers = fspeersit->second;
+
+        if (enabled) {
+          mirroring_enabled.emplace_back(fs);
+          fs_metadata_pools.emplace(fs, pool_id);
+        }
+
+        // peers added
+        Peers added;
+        std::set_difference(mirror_info.peers.begin(), mirror_info.peers.end(),
+                            peers.begin(), peers.end(), std::inserter(added, added.end()));
+
+        // peers removed
+        Peers removed;
+        std::set_difference(peers.begin(), peers.end(),
+                            mirror_info.peers.begin(), mirror_info.peers.end(),
+                            std::inserter(removed, removed.end()));
+
+        // update set
+        if (!added.empty()) {
+          peers_added.emplace(fs, added);
+          peers.insert(added.begin(), added.end());
+        }
+        if (!removed.empty()) {
+          peers_removed.emplace(fs, removed);
+          for (auto &p : removed) {
+            peers.erase(p);
+          }
+        }
+      }
+    }
+  }
+
+  dout(5) << ": mirroring enabled=" << mirroring_enabled << ", mirroring_disabled="
+          << mirroring_disabled << dendl;
+  for (auto &fs : mirroring_enabled) {
+    m_service_daemon->add_filesystem(fs.fscid, fs.fs_name);
+    m_listener.handle_mirroring_enabled(FilesystemSpec(fs, fs_metadata_pools.at(fs)));
+  }
+  for (auto &fs : mirroring_disabled) {
+    m_service_daemon->remove_filesystem(fs.fscid);
+    m_listener.handle_mirroring_disabled(fs);
+  }
+
+  dout(5) << ": peers added=" << peers_added << ", peers removed=" << peers_removed << dendl;
+
+  for (auto &[fs, peers] : peers_added) {
+    for (auto &peer : peers) {
+      m_service_daemon->add_peer(fs.fscid, peer);
+      m_listener.handle_peers_added(fs, peer);
+    }
+  }
+  for (auto &[fs, peers] : peers_removed) {
+    for (auto &peer : peers) {
+      m_service_daemon->remove_peer(fs.fscid, peer);
+      m_listener.handle_peers_removed(fs, peer);
+    }
+  }
+
+  std::scoped_lock locker(m_lock);
+  if (!m_stopping) {
+    m_monc->sub_got("fsmap", fsmap.get_epoch());
+  } // else we have already done a sub_unwant()
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/ClusterWatcher.h b/src/tools/cephfs_mirror/ClusterWatcher.h
new file mode 100644
index 000000000..a418898f5
--- /dev/null
+++ b/src/tools/cephfs_mirror/ClusterWatcher.h
@@ -0,0 +1,77 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_CLUSTER_WATCHER_H
+#define CEPHFS_MIRROR_CLUSTER_WATCHER_H
+
+#include <map>
+
+#include "common/ceph_mutex.h"
+#include "common/async/context_pool.h"
+#include "messages/MFSMap.h"
+#include "msg/Dispatcher.h"
+#include "Types.h"
+
+class MonClient;
+
+namespace cephfs {
+namespace mirror {
+
+class ServiceDaemon;
+
+// watch peer changes for filesystems via FSMap updates
+
+// Dispatcher that receives FSMap messages and reports mirroring and peer
+// changes for filesystems to a Listener.
+class ClusterWatcher : public Dispatcher {
+public:
+  // Callback interface invoked from FSMap processing.
+  struct Listener {
+    virtual ~Listener() = default;
+
+    virtual void handle_mirroring_enabled(const FilesystemSpec &spec) = 0;
+    virtual void handle_mirroring_disabled(const Filesystem &filesystem) = 0;
+
+    virtual void handle_peers_added(const Filesystem &filesystem, const Peer &peer) = 0;
+    virtual void handle_peers_removed(const Filesystem &filesystem, const Peer &peer) = 0;
+  };
+
+  ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon *service_daemon,
+                 Listener &listener);
+  ~ClusterWatcher();
+
+  // Dispatcher interface: message handling
+  bool ms_can_fast_dispatch_any() const override { return true; }
+  bool ms_can_fast_dispatch2(const cref_t<Message> &m) const override;
+  void ms_fast_dispatch2(const ref_t<Message> &m) override;
+  bool ms_dispatch2(const ref_t<Message> &m) override;
+
+  // Dispatcher interface: connection events are all no-ops here
+  void ms_handle_connect(Connection *c) override {}
+  bool ms_handle_reset(Connection *c) override { return false; }
+  void ms_handle_remote_reset(Connection *c) override {}
+  bool ms_handle_refused(Connection *c) override { return false; }
+
+  // subscribe to / unsubscribe from FSMap updates
+  int init();
+  void shutdown();
+
+private:
+  ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::cluster_watcher");
+  MonClient *m_monc;
+  ServiceDaemon *m_service_daemon;
+  Listener &m_listener;
+
+  bool m_stopping = false;
+  // peers known per mirrored filesystem, as of the last processed FSMap
+  std::map<Filesystem, Peers> m_filesystem_peers;
+
+  void handle_fsmap(const cref_t<MFSMap> &m);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_CLUSTER_WATCHER_H
diff --git a/src/tools/cephfs_mirror/FSMirror.cc b/src/tools/cephfs_mirror/FSMirror.cc
new file mode 100644
index 000000000..7ea798e6b
--- /dev/null
+++ b/src/tools/cephfs_mirror/FSMirror.cc
@@ -0,0 +1,444 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/admin_socket.h"
+#include "common/ceph_argparse.h"
+#include "common/ceph_context.h"
+#include "common/common_init.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "include/stringify.h"
+#include "msg/Messenger.h"
+#include "FSMirror.h"
+#include "PeerReplayer.h"
+#include "aio_utils.h"
+#include "ServiceDaemon.h"
+#include "Utils.h"
+
+#include "common/Cond.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::FSMirror " << __func__
+
+using namespace std;
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+const std::string SERVICE_DAEMON_DIR_COUNT_KEY("directory_count");
+const std::string SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed");
+
+// Interface for a single admin socket command handled by
+// MirrorAdminSocketHook.
+class MirrorAdminSocketCommand {
+public:
+  virtual ~MirrorAdminSocketCommand() = default;
+
+  // run the command, writing its output through the formatter
+  virtual int call(Formatter *f) = 0;
+};
+
+// Admin socket command that dumps the mirror status of one FSMirror.
+class StatusCommand : public MirrorAdminSocketCommand {
+public:
+  explicit StatusCommand(FSMirror *fs_mirror) : fs_mirror(fs_mirror) {}
+
+  // always reports success; the status itself is written to `f`
+  int call(Formatter *f) override {
+    fs_mirror->mirror_status(f);
+    return 0;
+  }
+
+private:
+  FSMirror *fs_mirror;
+};
+
+} // anonymous namespace
+
+// Admin socket hook that registers "fs mirror status <fs_name>@<fscid>" for
+// one FSMirror and routes invocations to the matching command object. The
+// command objects are owned by this hook and freed in its destructor.
+class MirrorAdminSocketHook : public AdminSocketHook {
+public:
+  MirrorAdminSocketHook(CephContext *cct, const Filesystem &filesystem, FSMirror *fs_mirror)
+    : admin_socket(cct->get_admin_socket()) {
+    // mirror status format is name@fscid
+    const std::string cmd = "fs mirror status " + stringify(filesystem.fs_name) + "@" +
+                            stringify(filesystem.fscid);
+    int r = admin_socket->register_command(
+      cmd, this, "get filesystem mirror status");
+    if (r == 0) {
+      commands[cmd] = new StatusCommand(fs_mirror);
+    } else {
+      // keep going without the command, but do not fail silently
+      derr << ": failed to register admin socket command '" << cmd << "': "
+           << cpp_strerror(r) << dendl;
+    }
+  }
+
+  ~MirrorAdminSocketHook() override {
+    // unregister first so that no command can be dispatched to a freed object
+    admin_socket->unregister_commands(this);
+    for (auto &[command, cmdptr] : commands) {
+      delete cmdptr;
+    }
+  }
+
+  // Run the registered command named `command`.
+  // Returns the command's result, or -EINVAL (with a message in `errss`) when
+  // no such command was registered by this hook.
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+           const bufferlist&,
+           Formatter *f, std::ostream &errss, bufferlist &out) override {
+    // the map uses a transparent comparator, so the string_view is looked up
+    // directly without building a temporary std::string
+    auto it = commands.find(command);
+    if (it == commands.end()) {
+      errss << "unknown command: " << command;
+      return -EINVAL;
+    }
+    return it->second->call(f);
+  }
+
+private:
+  typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;
+
+  AdminSocket *admin_socket;
+  Commands commands;
+};
+
+// Record the construction parameters, register the admin socket hook and
+// publish an initial directory count of zero to the service daemon.
+// Connecting and mounting happen later, in init().
+FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
+                   ServiceDaemon *service_daemon, std::vector<const char*> args,
+                   ContextWQ *work_queue)
+  : m_cct(cct), m_filesystem(filesystem), m_pool_id(pool_id),
+    m_service_daemon(service_daemon), m_args(args), m_work_queue(work_queue),
+    m_snap_listener(this),
+    m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) {
+  m_service_daemon->add_or_update_fs_attribute(
+    m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, static_cast<uint64_t>(0));
+}
+
+FSMirror::~FSMirror() { // frees both watchers and the admin socket hook
+ dout(20) << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ delete m_instance_watcher; // either pointer may still be nullptr; delete on nullptr is a no-op
+ delete m_mirror_watcher;
+ }
+ // outside the lock so that in-progress commands can acquire
+ // lock and finish executing.
+ delete m_asok_hook;
+}
+
+// Initialize a peer replayer; the caller must hold m_lock.
+// Returns whatever PeerReplayer::init() returns.
+int FSMirror::init_replayer(PeerReplayer *peer_replayer) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  const int r = peer_replayer->init();
+  return r;
+}
+
+// Shut down a peer replayer. Unlike init_replayer(), this does not assert
+// that m_lock is held.
+void FSMirror::shutdown_replayer(PeerReplayer *peer_replayer) {
+  peer_replayer->shutdown();
+}
+
+// Release the resources acquired by init(), in reverse order: the
+// filesystem mount, the pool ioctx and finally the cluster handle.
+void FSMirror::cleanup() {
+  dout(20) << dendl;
+
+  // filesystem mount
+  ceph_unmount(m_mount);
+  ceph_release(m_mount);
+
+  // rados resources
+  m_ioctx.close();
+  m_cluster.reset();
+}
+
+// Reopen log files for the local cluster context (when connected) and for
+// every peer replayer.
+void FSMirror::reopen_logs() {
+  std::scoped_lock locker(m_lock);
+
+  if (m_cluster) {
+    auto *cluster_cct = reinterpret_cast<CephContext *>(m_cluster->cct());
+    cluster_cct->reopen_logs();
+  }
+  for (auto &entry : m_peer_replayers) {
+    entry.second->reopen_logs();
+  }
+}
+
+void FSMirror::init(Context *on_finish) { // connect, open an ioctx on m_pool_id, mount, then start the instance watcher
+ dout(20) << dendl;
+
+ std::scoped_lock locker(m_lock); // held until return, i.e. also across the on_finish->complete() calls below
+ int r = connect(g_ceph_context->_conf->name.to_str(),
+ g_ceph_context->_conf->cluster, &m_cluster, "", "", m_args);
+ if (r < 0) {
+ m_init_failed = true;
+ on_finish->complete(r);
+ return;
+ }
+
+ r = m_cluster->ioctx_create2(m_pool_id, m_ioctx);
+ if (r < 0) {
+ m_init_failed = true;
+ m_cluster.reset(); // undo connect()
+ derr << ": error accessing local pool (id=" << m_pool_id << "): "
+ << cpp_strerror(r) << dendl;
+ on_finish->complete(r);
+ return;
+ }
+
+ r = mount(m_cluster, m_filesystem, true, &m_mount);
+ if (r < 0) {
+ m_init_failed = true;
+ m_ioctx.close(); // undo ioctx_create2() and connect()
+ m_cluster.reset();
+ on_finish->complete(r);
+ return;
+ }
+
+ m_addrs = m_cluster->get_addrs();
+ dout(10) << ": rados addrs=" << m_addrs << dendl;
+
+ init_instance_watcher(on_finish); // on_finish is completed later through m_on_init_finish
+}
+
+void FSMirror::shutdown(Context *on_finish) { // stop replayers and watchers; deferred while init is still in progress
+ dout(20) << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ m_stopping = true;
+ if (m_on_init_finish != nullptr) { // non-null while the watcher init sequence has not completed
+ dout(10) << ": delaying shutdown -- init in progress" << dendl;
+ m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) { // completed by the init-finish lambda built in init_instance_watcher()
+ if (r < 0) { // init failed: report shutdown as done without running the shutdown sequence
+ on_finish->complete(0);
+ return;
+ }
+ m_on_shutdown_finish = on_finish; // NOTE(review): written without m_lock here -- confirm no concurrent reader
+ shutdown_peer_replayers();
+ });
+ return;
+ }
+
+ m_on_shutdown_finish = on_finish;
+ }
+
+ shutdown_peer_replayers(); // called after m_lock has been released
+}
+
+// First step of the shutdown sequence: stop and drop every peer replayer,
+// then continue with the mirror watcher.
+void FSMirror::shutdown_peer_replayers() {
+  dout(20) << dendl;
+
+  for (auto &[peer, replayer] : m_peer_replayers) {
+    dout(5) << ": shutting down replayer for peer=" << peer << dendl;
+    PeerReplayer *raw_replayer = replayer.get();
+    shutdown_replayer(raw_replayer);
+  }
+  m_peer_replayers.clear();
+
+  shutdown_mirror_watcher();
+}
+
+void FSMirror::init_instance_watcher(Context *on_finish) { // called from init(); sets up m_on_init_finish and starts the instance watcher
+ dout(20) << dendl;
+
+ m_on_init_finish = new LambdaContext([this, on_finish](int r) { // runs once the init sequence has succeeded or failed
+ {
+ std::scoped_lock locker(m_lock);
+ if (r < 0) {
+ m_init_failed = true;
+ }
+ }
+ on_finish->complete(r);
+ if (m_on_shutdown_finish != nullptr) { // a shutdown() arrived during init; run its deferred context now
+ m_on_shutdown_finish->complete(r);
+ }
+ });
+
+ Context *ctx = new C_CallbackAdapter<
+ FSMirror, &FSMirror::handle_init_instance_watcher>(this);
+ m_instance_watcher = InstanceWatcher::create(m_ioctx, m_snap_listener, m_work_queue);
+ m_instance_watcher->init(ctx); // completion is delivered to handle_init_instance_watcher()
+}
+
+void FSMirror::handle_init_instance_watcher(int r) { // completion of InstanceWatcher::init()
+ dout(20) << ": r=" << r << dendl;
+
+ Context *on_init_finish = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ if (r < 0) { // failure: take ownership of the init context so it is completed exactly once
+ std::swap(on_init_finish, m_on_init_finish);
+ }
+ }
+
+ if (on_init_finish != nullptr) {
+ on_init_finish->complete(r); // completed outside m_lock (the init lambda re-acquires it)
+ return;
+ }
+
+ init_mirror_watcher(); // success: continue with the mirror watcher
+}
+
+// Second step of the init sequence: create the mirror watcher and start it.
+// Completion is delivered to handle_init_mirror_watcher().
+void FSMirror::init_mirror_watcher() {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  m_mirror_watcher = MirrorWatcher::create(m_ioctx, this, m_work_queue);
+  Context *on_init = new C_CallbackAdapter<
+    FSMirror, &FSMirror::handle_init_mirror_watcher>(this);
+  m_mirror_watcher->init(on_init);
+}
+
+void FSMirror::handle_init_mirror_watcher(int r) { // completion of MirrorWatcher::init()
+ dout(20) << ": r=" << r << dendl;
+
+ Context *on_init_finish = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ if (r == 0) { // success: init is complete, take the init context
+ std::swap(on_init_finish, m_on_init_finish);
+ }
+ }
+
+ if (on_init_finish != nullptr) {
+ on_init_finish->complete(r);
+ return;
+ }
+
+ m_retval = r; // save errcode for init context callback
+ shutdown_instance_watcher(); // failure: undo the instance watcher; m_on_init_finish is completed in handle_shutdown_instance_watcher()
+}
+
+// Shutdown sequence: stop the mirror watcher. Completion is delivered to
+// handle_shutdown_mirror_watcher().
+void FSMirror::shutdown_mirror_watcher() {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  Context *on_shutdown = new C_CallbackAdapter<
+    FSMirror, &FSMirror::handle_shutdown_mirror_watcher>(this);
+  m_mirror_watcher->shutdown(on_shutdown);
+}
+
+// Mirror watcher is down; its result is only logged, and the shutdown
+// sequence continues with the instance watcher regardless.
+void FSMirror::handle_shutdown_mirror_watcher(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  shutdown_instance_watcher();
+}
+
+// Stop the instance watcher. The completion is routed through the work
+// queue (C_AsyncCallback) before reaching handle_shutdown_instance_watcher().
+void FSMirror::shutdown_instance_watcher() {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  Context *on_shutdown = new C_CallbackAdapter<
+    FSMirror, &FSMirror::handle_shutdown_instance_watcher>(this);
+  Context *queued = new C_AsyncCallback<ContextWQ>(m_work_queue, on_shutdown);
+  m_instance_watcher->shutdown(queued);
+}
+
+void FSMirror::handle_shutdown_instance_watcher(int r) { // last step of both the shutdown path and the failed-init path
+ dout(20) << ": r=" << r << dendl;
+
+ cleanup(); // unmount and release the rados resources
+
+ Context *on_init_finish = nullptr;
+ Context *on_shutdown_finish = nullptr;
+
+ {
+ std::scoped_lock locker(m_lock);
+ std::swap(on_init_finish, m_on_init_finish); // take both pending contexts; the members become nullptr
+ std::swap(on_shutdown_finish, m_on_shutdown_finish);
+ }
+
+ if (on_init_finish != nullptr) {
+ on_init_finish->complete(m_retval); // error code saved by handle_init_mirror_watcher()
+ }
+ if (on_shutdown_finish != nullptr) {
+ on_shutdown_finish->complete(r);
+ }
+}
+
+// A directory was assigned to this instance: remember it, publish the new
+// directory count and hand the directory to every peer replayer.
+void FSMirror::handle_acquire_directory(string_view dir_path) {
+  dout(5) << ": dir_path=" << dir_path << dendl;
+
+  std::scoped_lock locker(m_lock);
+  m_directories.emplace(dir_path);
+  m_service_daemon->add_or_update_fs_attribute(
+    m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, m_directories.size());
+
+  for (auto &[peer, replayer] : m_peer_replayers) {
+    dout(10) << ": peer=" << peer << dendl;
+    replayer->add_directory(dir_path);
+  }
+}
+
+// A directory was taken away from this instance. Unknown directories are
+// ignored; otherwise forget it, publish the new directory count and remove
+// it from every peer replayer.
+void FSMirror::handle_release_directory(string_view dir_path) {
+  dout(5) << ": dir_path=" << dir_path << dendl;
+
+  std::scoped_lock locker(m_lock);
+  auto dir_it = m_directories.find(dir_path);
+  if (dir_it == m_directories.end()) {
+    return;
+  }
+
+  m_directories.erase(dir_it);
+  m_service_daemon->add_or_update_fs_attribute(
+    m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, m_directories.size());
+  for (auto &[peer, replayer] : m_peer_replayers) {
+    dout(10) << ": peer=" << peer << dendl;
+    replayer->remove_directory(dir_path);
+  }
+}
+
+// Track a new peer and start a replayer for it. A peer that already has a
+// replayer is left untouched. When the replayer fails to initialize, the
+// failure is recorded as a peer attribute on the service daemon and no
+// replayer is kept (the peer stays in m_all_peers).
+void FSMirror::add_peer(const Peer &peer) {
+  dout(10) << ": peer=" << peer << dendl;
+
+  std::scoped_lock locker(m_lock);
+  m_all_peers.emplace(peer);
+  if (m_peer_replayers.count(peer) != 0) {
+    return;
+  }
+
+  auto peer_replayer = std::make_unique<PeerReplayer>(
+    m_cct, this, m_cluster, m_filesystem, peer, m_directories, m_mount, m_service_daemon);
+  if (int r = init_replayer(peer_replayer.get()); r < 0) {
+    m_service_daemon->add_or_update_peer_attribute(
+      m_filesystem.fscid, peer, SERVICE_DAEMON_PEER_INIT_FAILED_KEY, true);
+    return;
+  }
+
+  m_peer_replayers.emplace(peer, std::move(peer_replayer));
+  ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
+}
+
+// Forget a peer. Its replayer, if any, is detached under m_lock and shut
+// down after the lock has been released.
+void FSMirror::remove_peer(const Peer &peer) {
+  dout(10) << ": peer=" << peer << dendl;
+
+  std::unique_ptr<PeerReplayer> detached;
+  {
+    std::scoped_lock locker(m_lock);
+    m_all_peers.erase(peer);
+    if (auto it = m_peer_replayers.find(peer); it != m_peer_replayers.end()) {
+      detached = std::move(it->second);
+      m_peer_replayers.erase(it);
+    }
+  }
+
+  if (!detached) {
+    return;
+  }
+  dout(5) << ": shutting down replayers for peer=" << peer << dendl;
+  shutdown_replayer(detached.get());
+}
+
+// Admin socket helper: dump a "status" object. It contains only a "state"
+// string when init failed ("failed") or the instance is blocklisted
+// ("blocklisted"); otherwise the rados address, the peers and the directory
+// count.
+void FSMirror::mirror_status(Formatter *f) {
+  std::scoped_lock locker(m_lock);
+
+  // blocklist state is only queried when init did not fail
+  const char *state = nullptr;
+  if (m_init_failed) {
+    state = "failed";
+  } else if (is_blocklisted(locker)) {
+    state = "blocklisted";
+  }
+
+  f->open_object_section("status");
+  if (state != nullptr) {
+    f->dump_string("state", state);
+  } else {
+    // dump rados addr for blocklist test
+    f->dump_string("rados_inst", m_addrs);
+
+    f->open_object_section("peers");
+    for ([[maybe_unused]] auto &[peer, peer_replayer] : m_peer_replayers) {
+      peer.dump(f);
+    }
+    f->close_section(); // peers
+
+    f->open_object_section("snap_dirs");
+    f->dump_int("dir_count", m_directories.size());
+    f->close_section(); // snap_dirs
+  }
+  f->close_section(); // status
+}
+
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/FSMirror.h b/src/tools/cephfs_mirror/FSMirror.h
new file mode 100644
index 000000000..a9c1fab10
--- /dev/null
+++ b/src/tools/cephfs_mirror/FSMirror.h
@@ -0,0 +1,182 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_FS_MIRROR_H
+#define CEPHFS_MIRROR_FS_MIRROR_H
+
+#include "common/Formatter.h"
+#include "common/Thread.h"
+#include "mds/FSMap.h"
+#include "Types.h"
+#include "InstanceWatcher.h"
+#include "MirrorWatcher.h"
+
+class ContextWQ;
+
+namespace cephfs {
+namespace mirror {
+
+class MirrorAdminSocketHook;
+class PeerReplayer;
+class ServiceDaemon;
+
+// handle mirroring for a filesystem to a set of peers
+
+// Mirrors one filesystem to its peers. Owns the local cluster connection and
+// mount, the instance and mirror watchers, and one PeerReplayer per peer.
+// init() and shutdown() are asynchronous and complete the supplied Context.
+// The public accessors below take m_lock.
+class FSMirror {
+public:
+  FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
+           ServiceDaemon *service_daemon, std::vector<const char*> args,
+           ContextWQ *work_queue);
+  ~FSMirror();
+
+  void init(Context *on_finish);
+  void shutdown(Context *on_finish);
+
+  void add_peer(const Peer &peer);
+  void remove_peer(const Peer &peer);
+
+  bool is_stopping() {
+    std::scoped_lock locker(m_lock);
+    return m_stopping;
+  }
+
+  bool is_init_failed() {
+    std::scoped_lock locker(m_lock);
+    return m_init_failed;
+  }
+
+  // True when init failed or either watcher reports failure. The watchers
+  // are created during the asynchronous init sequence and start out as
+  // nullptr, so each one is checked before use.
+  bool is_failed() {
+    std::scoped_lock locker(m_lock);
+    return m_init_failed ||
+           (m_instance_watcher != nullptr && m_instance_watcher->is_failed()) ||
+           (m_mirror_watcher != nullptr && m_mirror_watcher->is_failed());
+  }
+
+  // Timestamp recorded by whichever watcher reports failure (the instance
+  // watcher takes precedence); a default utime_t when neither has failed.
+  utime_t get_failed_ts() {
+    std::scoped_lock locker(m_lock);
+    if (m_instance_watcher != nullptr && m_instance_watcher->is_failed()) {
+      return m_instance_watcher->get_failed_ts();
+    }
+    if (m_mirror_watcher != nullptr && m_mirror_watcher->is_failed()) {
+      return m_mirror_watcher->get_failed_ts();
+    }
+
+    return utime_t();
+  }
+
+  bool is_blocklisted() {
+    std::scoped_lock locker(m_lock);
+    return is_blocklisted(locker);
+  }
+
+  // Timestamp recorded by whichever watcher reports being blocklisted (the
+  // instance watcher takes precedence); a default utime_t when neither is.
+  utime_t get_blocklisted_ts() {
+    std::scoped_lock locker(m_lock);
+    if (m_instance_watcher != nullptr && m_instance_watcher->is_blocklisted()) {
+      return m_instance_watcher->get_blocklisted_ts();
+    }
+    if (m_mirror_watcher != nullptr && m_mirror_watcher->is_blocklisted()) {
+      return m_mirror_watcher->get_blocklisted_ts();
+    }
+
+    return utime_t();
+  }
+
+  // all peers passed to add_peer() and not yet removed, with or without a
+  // running replayer
+  Peers get_peers() {
+    std::scoped_lock locker(m_lock);
+    return m_all_peers;
+  }
+
+  std::string get_instance_addr() {
+    std::scoped_lock locker(m_lock);
+    return m_addrs;
+  }
+
+  // admin socket helpers
+  void mirror_status(Formatter *f);
+
+  void reopen_logs();
+
+private:
+  // the unused `locker` parameter makes callers prove they hold m_lock
+  bool is_blocklisted(const std::scoped_lock<ceph::mutex> &locker) const {
+    bool blocklisted = false;
+    if (m_instance_watcher) {
+      blocklisted = m_instance_watcher->is_blocklisted();
+    }
+    if (m_mirror_watcher) {
+      blocklisted |= m_mirror_watcher->is_blocklisted();
+    }
+
+    return blocklisted;
+  }
+
+  // forwards instance watcher directory notifications to the owning FSMirror
+  struct SnapListener : public InstanceWatcher::Listener {
+    FSMirror *fs_mirror;
+
+    explicit SnapListener(FSMirror *fs_mirror)
+      : fs_mirror(fs_mirror) {
+    }
+
+    void acquire_directory(std::string_view dir_path) override {
+      fs_mirror->handle_acquire_directory(dir_path);
+    }
+
+    void release_directory(std::string_view dir_path) override {
+      fs_mirror->handle_release_directory(dir_path);
+    }
+  };
+
+  CephContext *m_cct;
+  Filesystem m_filesystem;
+  uint64_t m_pool_id;
+  ServiceDaemon *m_service_daemon;
+  std::vector<const char *> m_args;
+  ContextWQ *m_work_queue;
+
+  ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::fs_mirror");
+  SnapListener m_snap_listener;
+  std::set<std::string, std::less<>> m_directories;
+  Peers m_all_peers;
+  std::map<Peer, std::unique_ptr<PeerReplayer>> m_peer_replayers;
+
+  RadosRef m_cluster;
+  std::string m_addrs;
+  librados::IoCtx m_ioctx;
+  InstanceWatcher *m_instance_watcher = nullptr;
+  MirrorWatcher *m_mirror_watcher = nullptr;
+
+  int m_retval = 0;
+  bool m_stopping = false;
+  bool m_init_failed = false;
+  Context *m_on_init_finish = nullptr;
+  Context *m_on_shutdown_finish = nullptr;
+
+  MirrorAdminSocketHook *m_asok_hook = nullptr;
+
+  MountRef m_mount;
+
+  int init_replayer(PeerReplayer *peer_replayer);
+  void shutdown_replayer(PeerReplayer *peer_replayer);
+  void cleanup();
+
+  void init_instance_watcher(Context *on_finish);
+  void handle_init_instance_watcher(int r);
+
+  void init_mirror_watcher();
+  void handle_init_mirror_watcher(int r);
+
+  void shutdown_peer_replayers();
+
+  void shutdown_mirror_watcher();
+  void handle_shutdown_mirror_watcher(int r);
+
+  void shutdown_instance_watcher();
+  void handle_shutdown_instance_watcher(int r);
+
+  void handle_acquire_directory(std::string_view dir_path);
+  void handle_release_directory(std::string_view dir_path);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_FS_MIRROR_H
diff --git a/src/tools/cephfs_mirror/InstanceWatcher.cc b/src/tools/cephfs_mirror/InstanceWatcher.cc
new file mode 100644
index 000000000..b6a51a141
--- /dev/null
+++ b/src/tools/cephfs_mirror/InstanceWatcher.cc
@@ -0,0 +1,256 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_context.h"
+#include "common/ceph_json.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "cls/cephfs/cls_cephfs_client.h"
+#include "include/stringify.h"
+#include "aio_utils.h"
+#include "InstanceWatcher.h"
+#include "Types.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::InstanceWatcher " << __func__
+
+using namespace std;
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+// Name of the per-instance object: CEPHFS_MIRROR_OBJECT, a dot, then the
+// instance id.
+std::string instance_oid(const std::string &instance_id) {
+  std::string oid = CEPHFS_MIRROR_OBJECT + ".";
+  oid += instance_id;
+  return oid;
+}
+
+} // anonymous namespace
+
+// The watched object is named after the ioctx's rados instance id (see
+// instance_oid()).
+InstanceWatcher::InstanceWatcher(librados::IoCtx &ioctx,
+                                 Listener &listener, ContextWQ *work_queue)
+  : Watcher(ioctx, instance_oid(stringify(ioctx.get_instance_id())), work_queue),
+    m_ioctx(ioctx), m_listener(listener), m_work_queue(work_queue),
+    m_lock(ceph::make_mutex("cephfs::mirror::instance_watcher")) {}
+
+// Nothing to release explicitly here.
+InstanceWatcher::~InstanceWatcher() = default;
+
+void InstanceWatcher::init(Context *on_finish) { // create the instance object, then register a watch on it
+ dout(20) << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ ceph_assert(m_on_init_finish == nullptr); // init() must not be called while another init is pending
+ m_on_init_finish = new LambdaContext([this, on_finish](int r) {
+ on_finish->complete(r);
+ if (m_on_shutdown_finish != nullptr) { // a shutdown() was requested while init was in progress
+ m_on_shutdown_finish->complete(0);
+ }
+ });
+ }
+
+ create_instance();
+}
+
+void InstanceWatcher::shutdown(Context *on_finish) { // unregister the watch, then remove the instance object
+ dout(20) << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ ceph_assert(m_on_shutdown_finish == nullptr);
+ if (m_on_init_finish != nullptr) { // init still pending: defer until the init lambda completes this context
+ dout(10) << ": delaying shutdown -- init in progress" << dendl;
+ m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
+ m_on_shutdown_finish = nullptr; // reset so the re-entered shutdown() passes its assert
+ shutdown(on_finish);
+ });
+ return;
+ }
+
+ m_on_shutdown_finish = on_finish;
+ }
+
+ unregister_watcher();
+}
+
+// Handle a notification on the instance object. The payload is JSON carrying
+// "dir_path" and "mode"; "acquire" and "release" are forwarded to the
+// listener, anything else is logged as an unknown mode. The notify is
+// acknowledged in every case.
+void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
+                                    uint64_t notifier_id, bufferlist& bl) {
+  dout(20) << dendl;
+
+  std::string mode;
+  std::string dir_path;
+  try {
+    JSONDecoder jd(bl);
+    JSONDecoder::decode_json("dir_path", dir_path, &jd.parser, true);
+    JSONDecoder::decode_json("mode", mode, &jd.parser, true);
+  } catch (const JSONDecoder::err &e) {
+    // only logged: processing continues with whatever was decoded so far
+    derr << ": failed to decode notify json: " << e.what() << dendl;
+  }
+
+  dout(20) << ": notifier_id=" << notifier_id << ", dir_path=" << dir_path
+           << ", mode=" << mode << dendl;
+
+  const bool is_acquire = (mode == "acquire");
+  const bool is_release = !is_acquire && (mode == "release");
+  if (is_acquire) {
+    m_listener.acquire_directory(dir_path);
+  } else if (is_release) {
+    m_listener.release_directory(dir_path);
+  } else {
+    derr << ": unknown mode" << dendl;
+  }
+
+  bufferlist ack_bl;
+  acknowledge_notify(notify_id, handle, ack_bl);
+}
+
+// Record the outcome of a rewatch attempt:
+//  -EBLOCKLISTED  -> mark the watcher blocklisted
+//  -ENOENT        -> the watched object is gone; mark the watcher failed
+//  other r < 0    -> mark the watcher failed
+// Each flag and its timestamp are updated under m_lock, because
+// is_blocklisted()/is_failed() and the *_ts getters read them under the same
+// lock.
+void InstanceWatcher::handle_rewatch_complete(int r) {
+  dout(5) << ": r=" << r << dendl;
+
+  if (r == -EBLOCKLISTED) {
+    dout(0) << ": client blocklisted" <<dendl;
+    std::scoped_lock locker(m_lock);
+    m_blocklisted = true;
+    m_blocklisted_ts = ceph_clock_now();
+  } else if (r == -ENOENT) {
+    derr << ": mirroring object deleted" << dendl;
+    std::scoped_lock locker(m_lock);
+    m_failed = true;
+    m_failed_ts = ceph_clock_now();
+  } else if (r < 0) {
+    derr << ": rewatch error: " << cpp_strerror(r) << dendl;
+    std::scoped_lock locker(m_lock);
+    m_failed = true;
+    m_failed_ts = ceph_clock_now();
+  }
+}
+
+// First step of init: asynchronously create the instance object (m_oid).
+// Completion is delivered to handle_create_instance().
+void InstanceWatcher::create_instance() {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  librados::ObjectWriteOperation create_op;
+  create_op.create(false);
+
+  auto *completion = librados::Rados::aio_create_completion(
+    this, &rados_callback<InstanceWatcher, &InstanceWatcher::handle_create_instance>);
+  const int r = m_ioctx.aio_operate(m_oid, completion, &create_op);
+  ceph_assert(r == 0);
+  completion->release();
+}
+
+void InstanceWatcher::handle_create_instance(int r) { // completion of create_instance()
+ dout(20) << ": r=" << r << dendl;
+
+ Context *on_init_finish = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ if (r < 0) { // creation failed: take the init context and finish init with the error
+ std::swap(on_init_finish, m_on_init_finish);
+ }
+ }
+
+ if (on_init_finish != nullptr) {
+ on_init_finish->complete(r);
+ return;
+ }
+
+ register_watcher(); // object exists: continue with the watch registration
+}
+
+// Second step of init: register the watch. Completion is delivered to
+// handle_register_watcher().
+void InstanceWatcher::register_watcher() {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  Context *on_registered = new C_CallbackAdapter<
+    InstanceWatcher, &InstanceWatcher::handle_register_watcher>(this);
+  register_watch(on_registered);
+}
+
+void InstanceWatcher::handle_register_watcher(int r) { // completion of register_watcher()
+ dout(20) << ": r=" << r << dendl;
+
+ Context *on_init_finish = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ if (r == 0) { // watch registered: init is complete
+ std::swap(on_init_finish, m_on_init_finish);
+ }
+ }
+
+ if (on_init_finish != nullptr) {
+ on_init_finish->complete(r);
+ return;
+ }
+
+ remove_instance(); // registration failed: remove the object created earlier; init completes in handle_remove_instance()
+}
+
+// First step of shutdown: unregister the watch. The completion is routed
+// through the work queue (C_AsyncCallback) before reaching
+// handle_unregister_watcher().
+void InstanceWatcher::unregister_watcher() {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  Context *on_unregistered = new C_CallbackAdapter<
+    InstanceWatcher, &InstanceWatcher::handle_unregister_watcher>(this);
+  Context *queued = new C_AsyncCallback<ContextWQ>(m_work_queue, on_unregistered);
+  unregister_watch(queued);
+}
+
+void InstanceWatcher::handle_unregister_watcher(int r) { // completion of unregister_watcher()
+ dout(20) << ": r=" << r << dendl;
+
+ Context *on_shutdown_finish = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ if (r < 0) { // unregister failed: finish shutdown with the error; the instance object is not removed
+ std::swap(on_shutdown_finish, m_on_shutdown_finish);
+ }
+ }
+
+ if (on_shutdown_finish != nullptr) {
+ on_shutdown_finish->complete(r);
+ return;
+ }
+
+ remove_instance(); // success: continue by removing the instance object
+}
+
+// Asynchronously remove the instance object (m_oid). Used both by shutdown
+// and by the failed-init path; completion is delivered to
+// handle_remove_instance().
+void InstanceWatcher::remove_instance() {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  librados::ObjectWriteOperation remove_op;
+  remove_op.remove();
+
+  auto *completion = librados::Rados::aio_create_completion(
+    this, &rados_callback<InstanceWatcher, &InstanceWatcher::handle_remove_instance>);
+  const int r = m_ioctx.aio_operate(m_oid, completion, &remove_op);
+  ceph_assert(r == 0);
+  completion->release();
+}
+
+void InstanceWatcher::handle_remove_instance(int r) { // completion of remove_instance(); finishes whichever of init/shutdown is pending
+ dout(20) << ": r=" << r << dendl;
+
+ Context *on_init_finish = nullptr;
+ Context *on_shutdown_finish = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ std::swap(on_init_finish, m_on_init_finish); // take both pending contexts; the members become nullptr
+ std::swap(on_shutdown_finish, m_on_shutdown_finish);
+ }
+
+ if (on_init_finish != nullptr) {
+ on_init_finish->complete(r); // NOTE(review): receives the removal result, not the earlier watch-registration error -- confirm intended
+ }
+ if (on_shutdown_finish != nullptr) {
+ on_shutdown_finish->complete(r);
+ }
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/InstanceWatcher.h b/src/tools/cephfs_mirror/InstanceWatcher.h
new file mode 100644
index 000000000..a07400096
--- /dev/null
+++ b/src/tools/cephfs_mirror/InstanceWatcher.h
@@ -0,0 +1,98 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_INSTANCE_WATCHER_H
+#define CEPHFS_MIRROR_INSTANCE_WATCHER_H
+
+#include <string_view>
+
+#include "common/ceph_mutex.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "Watcher.h"
+
+class ContextWQ;
+
+namespace cephfs {
+namespace mirror {
+
+// watch directory update notifications via per daemon rados
+// object and invoke listener callback.
+
+class InstanceWatcher : public Watcher {
+public:
+ // callback interface implemented by the consumer of directory
+ // acquire/release notifications.
+ struct Listener {
+ virtual ~Listener() {
+ }
+
+ virtual void acquire_directory(std::string_view dir_path) = 0;
+ virtual void release_directory(std::string_view dir_path) = 0;
+ };
+
+ // heap-allocating factory; the caller owns the returned pointer.
+ static InstanceWatcher *create(librados::IoCtx &ioctx,
+ Listener &listener, ContextWQ *work_queue) {
+ return new InstanceWatcher(ioctx, listener, work_queue);
+ }
+
+ InstanceWatcher(librados::IoCtx &ioctx, Listener &listener, ContextWQ *work_queue);
+ ~InstanceWatcher();
+
+ // asynchronous start/stop; on_finish is completed with the result.
+ void init(Context *on_finish);
+ void shutdown(Context *on_finish);
+
+ // Watcher overrides
+ void handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist& bl) override;
+ void handle_rewatch_complete(int r) override;
+
+ // state accessors -- each takes m_lock for the duration of the read.
+ bool is_blocklisted() {
+ std::scoped_lock locker(m_lock);
+ return m_blocklisted;
+ }
+
+ utime_t get_blocklisted_ts() {
+ std::scoped_lock locker(m_lock);
+ return m_blocklisted_ts;
+ }
+
+ bool is_failed() {
+ std::scoped_lock locker(m_lock);
+ return m_failed;
+ }
+
+ utime_t get_failed_ts() {
+ std::scoped_lock locker(m_lock);
+ return m_failed_ts;
+ }
+
+private:
+ // held by reference/pointer -- these must outlive the watcher.
+ librados::IoCtx &m_ioctx;
+ Listener &m_listener;
+ ContextWQ *m_work_queue;
+
+ // guards the pending contexts and the blocklisted/failed state below.
+ ceph::mutex m_lock;
+ Context *m_on_init_finish = nullptr;
+ Context *m_on_shutdown_finish = nullptr;
+
+ bool m_blocklisted = false;
+ bool m_failed = false;
+
+ // timestamps paired with m_blocklisted / m_failed
+ utime_t m_blocklisted_ts;
+ utime_t m_failed_ts;
+
+ // each step below has a handle_* completion counterpart.
+ void create_instance();
+ void handle_create_instance(int r);
+
+ void register_watcher();
+ void handle_register_watcher(int r);
+
+ void remove_instance();
+ void handle_remove_instance(int r);
+
+ void unregister_watcher();
+ void handle_unregister_watcher(int r);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_INSTANCE_WATCHER_H
diff --git a/src/tools/cephfs_mirror/Mirror.cc b/src/tools/cephfs_mirror/Mirror.cc
new file mode 100644
index 000000000..edf903b92
--- /dev/null
+++ b/src/tools/cephfs_mirror/Mirror.cc
@@ -0,0 +1,589 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_argparse.h"
+#include "common/ceph_context.h"
+#include "common/common_init.h"
+#include "common/Cond.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "include/types.h"
+#include "mon/MonClient.h"
+#include "msg/Messenger.h"
+#include "aio_utils.h"
+#include "Mirror.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::Mirror " << __func__
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+const std::string SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY("mirroring_failed");
+
+// timer (plus the mutex it uses) registered as a CephContext singleton
+// object by the Mirror constructor.
+class SafeTimerSingleton : public CommonSafeTimer<ceph::mutex> {
+public:
+ ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock");
+
+ // NOTE: timer_lock is handed to the base class by reference before the
+ // member itself has been initialized (bases are constructed first).
+ // NOTE(review): the meaning of the trailing `true` argument is defined
+ // by the timer base class, not in this file -- confirm there.
+ explicit SafeTimerSingleton(CephContext *cct)
+ : SafeTimer(cct, timer_lock, true) {
+ init();
+ }
+};
+
+// Thread pool plus the context work queue attached to it. The pool is
+// started as part of construction.
+class ThreadPoolSingleton : public ThreadPool {
+public:
+  ContextWQ *work_queue = nullptr;
+
+  explicit ThreadPoolSingleton(CephContext *cct)
+    : ThreadPool(cct, "Mirror::thread_pool", "tp_mirror", 1),
+      work_queue(new ContextWQ("Mirror::work_queue", ceph::make_timespan(60), this)) {
+    start();
+  }
+};
+
+} // anonymous namespace
+
+// Queued action that enables mirroring for a filesystem. The object stays
+// alive after complete() and frees itself once the enable result has been
+// forwarded to the Mirror.
+struct Mirror::C_EnableMirroring : Context {
+  Mirror *mirror;
+  Filesystem filesystem;
+  uint64_t pool_id;
+
+  C_EnableMirroring(Mirror *mirror, const Filesystem &filesystem, uint64_t pool_id)
+    : mirror(mirror), filesystem(filesystem), pool_id(pool_id) {
+  }
+
+  // context needs to live post completion
+  void complete(int r) override {
+    finish(r);
+  }
+
+  void finish(int r) override {
+    enable_mirroring();
+  }
+
+  void enable_mirroring() {
+    using EnableAdapter = C_CallbackAdapter<
+      C_EnableMirroring, &C_EnableMirroring::handle_enable_mirroring>;
+    Context *on_enabled = new EnableAdapter(this);
+    mirror->enable_mirroring(filesystem, pool_id, on_enabled);
+  }
+
+  void handle_enable_mirroring(int r) {
+    mirror->handle_enable_mirroring(filesystem, r);
+    // last step of this action -- release ourselves
+    delete this;
+  }
+};
+
+// Queued action that disables mirroring for a filesystem. The object stays
+// alive after complete() and frees itself once the disable result has been
+// forwarded to the Mirror.
+struct Mirror::C_DisableMirroring : Context {
+  Mirror *mirror;
+  Filesystem filesystem;
+
+  C_DisableMirroring(Mirror *mirror, const Filesystem &filesystem)
+    : mirror(mirror), filesystem(filesystem) {
+  }
+
+  // context needs to live post completion
+  void complete(int r) override {
+    finish(r);
+  }
+
+  void finish(int r) override {
+    disable_mirroring();
+  }
+
+  void disable_mirroring() {
+    using DisableAdapter = C_CallbackAdapter<
+      C_DisableMirroring, &C_DisableMirroring::handle_disable_mirroring>;
+    Context *on_disabled = new DisableAdapter(this);
+    mirror->disable_mirroring(filesystem, on_disabled);
+  }
+
+  void handle_disable_mirroring(int r) {
+    mirror->handle_disable_mirroring(filesystem, r);
+    // last step of this action -- release ourselves
+    delete this;
+  }
+};
+
+// Queued action that adds a peer to, or (when `remove` is set) removes a
+// peer from, a filesystem's mirror.
+struct Mirror::C_PeerUpdate : Context {
+  Mirror *mirror;
+  Filesystem filesystem;
+  Peer peer;
+  bool remove = false;
+
+  // peer addition
+  C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem,
+               const Peer &peer)
+    : C_PeerUpdate(mirror, filesystem, peer, false) {
+  }
+  // explicit add/remove selection
+  C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem,
+               const Peer &peer, bool remove)
+    : mirror(mirror),
+      filesystem(filesystem),
+      peer(peer),
+      remove(remove) {
+  }
+
+  void finish(int r) override {
+    if (!remove) {
+      mirror->add_peer(filesystem, peer);
+      return;
+    }
+    mirror->remove_peer(filesystem, peer);
+  }
+};
+
+// restart sequence for a filesystem mirror: disable -> enable (as a
+// restart) -> re-add the saved peers -> clear the restarting flag.
+// the context deletes itself at the end of the sequence.
+struct Mirror::C_RestartMirroring : Context {
+ Mirror *mirror;
+ Filesystem filesystem;
+ uint64_t pool_id;
+ Peers peers; // peers captured before the restart, re-added afterwards
+
+ C_RestartMirroring(Mirror *mirror, const Filesystem &filesystem,
+ uint64_t pool_id, const Peers &peers)
+ : mirror(mirror),
+ filesystem(filesystem),
+ pool_id(pool_id),
+ peers(peers) {
+ }
+
+ void finish(int r) override {
+ disable_mirroring();
+ }
+
+ // Mirror::disable_mirroring() asserts that Mirror::m_lock is held, so
+ // the caller of complete() is expected to hold it.
+ void disable_mirroring() {
+ Context *ctx = new C_CallbackAdapter<C_RestartMirroring,
+ &C_RestartMirroring::handle_disable_mirroring>(this);
+ mirror->disable_mirroring(filesystem, ctx);
+ }
+
+ // the disable result is not inspected -- enable is attempted regardless.
+ void handle_disable_mirroring(int r) {
+ enable_mirroring();
+ }
+
+ // takes Mirror::m_lock itself since this runs from a completion callback.
+ void enable_mirroring() {
+ std::scoped_lock locker(mirror->m_lock);
+ Context *ctx = new C_CallbackAdapter<C_RestartMirroring,
+ &C_RestartMirroring::handle_enable_mirroring>(this);
+ mirror->enable_mirroring(filesystem, pool_id, ctx, true);
+ }
+
+ void handle_enable_mirroring(int r) {
+ mirror->handle_enable_mirroring(filesystem, peers, r);
+ // NOTE(review): _unset_restarting() touches m_mirror_actions and is
+ // called here without m_lock held -- confirm this is intended.
+ mirror->_unset_restarting(filesystem);
+ delete this;
+ }
+
+ // context needs to live post completion
+ void complete(int r) override {
+ finish(r);
+ }
+};
+
+// Wire the mirror up to the per-CephContext thread pool and timer
+// singletons and schedule the first periodic update task.
+Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args,
+               MonClient *monc, Messenger *msgr)
+  : m_cct(cct),
+    m_args(args),
+    m_monc(monc),
+    m_msgr(msgr),
+    m_listener(this),
+    m_local(new librados::Rados()) {
+  auto &pool = cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+    "cephfs::mirror::thread_pool", false, cct);
+  auto &timer = cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
+    "cephfs::mirror::safe_timer", false, cct);
+
+  m_thread_pool = &pool;
+  m_work_queue = pool.work_queue;
+  m_timer = &timer;
+  m_timer_lock = &timer.timer_lock;
+
+  // schedule_mirror_update_task() asserts that the timer lock is held
+  std::scoped_lock timer_locker(*m_timer_lock);
+  schedule_mirror_update_task();
+}
+
+// teardown order: stop the timer, drain and free the work queue, then
+// stop the thread pool.
+Mirror::~Mirror() {
+ dout(10) << dendl;
+ {
+ // timer shutdown is performed with the timer lock held
+ std::scoped_lock timer_lock(*m_timer_lock);
+ m_timer->shutdown();
+ }
+
+ // let queued contexts finish before the queue is freed
+ m_work_queue->drain();
+ delete m_work_queue;
+ {
+ std::scoped_lock locker(m_lock);
+ m_thread_pool->stop();
+ }
+}
+
+// Attach the monitor client to the messenger, initialize it and
+// authenticate. Returns 0 on success or a negative errno.
+int Mirror::init_mon_client() {
+  dout(20) << dendl;
+
+  m_monc->set_messenger(m_msgr);
+  m_msgr->add_dispatcher_head(m_monc);
+  m_monc->set_want_keys(CEPH_ENTITY_TYPE_MON);
+
+  if (int r = m_monc->init(); r < 0) {
+    derr << ": failed to init mon client: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  // authentication timeout comes from the client_mount_timeout option
+  const auto mount_timeout =
+    m_cct->_conf.get_val<std::chrono::seconds>("client_mount_timeout");
+  const double timeout_secs = std::chrono::duration<double>(mount_timeout).count();
+  if (int r = m_monc->authenticate(timeout_secs); r < 0) {
+    derr << ": failed to authenticate to monitor: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  // name the messenger after the global id assigned by the monitors
+  const client_t me = m_monc->get_global_id();
+  m_msgr->set_myname(entity_name_t::CLIENT(me.v));
+  return 0;
+}
+
+// Connect to the local cluster, register the service daemon and bring up
+// the monitor client.
+//
+// @param reason  out: human readable description of the failure; only
+//                written when a negative value is returned.
+// @return 0 on success, negative errno on failure.
+int Mirror::init(std::string &reason) {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+
+  int r = m_local->init_with_context(m_cct);
+  if (r < 0) {
+    derr << ": could not initialize rados handler" << dendl;
+    reason = "could not initialize rados handler: " + cpp_strerror(r);
+    return r;
+  }
+
+  r = m_local->connect();
+  if (r < 0) {
+    derr << ": error connecting to local cluster" << dendl;
+    reason = "error connecting to local cluster: " + cpp_strerror(r);
+    return r;
+  }
+
+  m_service_daemon = std::make_unique<ServiceDaemon>(m_cct, m_local);
+  r = m_service_daemon->init();
+  if (r < 0) {
+    derr << ": error registering service daemon: " << cpp_strerror(r) << dendl;
+    reason = "error registering service daemon: " + cpp_strerror(r);
+    return r;
+  }
+
+  r = init_mon_client();
+  if (r < 0) {
+    // details were already logged by init_mon_client()
+    reason = "failed to initialize monitor client: " + cpp_strerror(r);
+    return r;
+  }
+
+  return 0;
+}
+
+// Flag the mirror as stopping and wake up run(). handle_signal() calls this
+// with m_lock held.
+void Mirror::shutdown() {
+  dout(20) << dendl;
+  m_stopping = true;
+  // the cluster watcher is only created by run(); a shutdown request that
+  // arrives earlier must not dereference a null pointer.
+  if (m_cluster_watcher) {
+    m_cluster_watcher->shutdown();
+  }
+  m_cond.notify_all();
+}
+
+// Reopen the log files of every active filesystem mirror, then those of the
+// global context. handle_signal() calls this with m_lock held.
+void Mirror::reopen_logs() {
+  for (auto &[filesystem, mirror_action] : m_mirror_actions) {
+    // an entry exists from the moment mirroring is requested, but fs_mirror
+    // is only set once the enable action has run (and is reset on disable).
+    if (mirror_action.fs_mirror) {
+      mirror_action.fs_mirror->reopen_logs();
+    }
+  }
+  g_ceph_context->reopen_logs();
+}
+
+// Signal entry point: SIGHUP reopens logs, SIGINT/SIGTERM request shutdown,
+// anything else aborts.
+void Mirror::handle_signal(int signum) {
+  dout(10) << ": signal=" << signum << dendl;
+
+  std::scoped_lock guard(m_lock);
+  if (signum == SIGHUP) {
+    reopen_logs();
+  } else if (signum == SIGINT || signum == SIGTERM) {
+    shutdown();
+  } else {
+    ceph_abort_msgf("unexpected signal %d", signum);
+  }
+}
+
+// Completion of an enable issued as part of a restart: clears the
+// in-progress flag and, on success, re-adds the given peers. On failure the
+// service daemon attribute for the filesystem is flagged.
+void Mirror::handle_enable_mirroring(const Filesystem &filesystem,
+                                     const Peers &peers, int r) {
+  dout(20) << ": filesystem=" << filesystem << ", peers=" << peers
+           << ", r=" << r << dendl;
+
+  std::scoped_lock guard(m_lock);
+  auto &action = m_mirror_actions.at(filesystem);
+  ceph_assert(action.action_in_progress);
+
+  action.action_in_progress = false;
+  m_cond.notify_all();
+
+  if (r >= 0) {
+    for (auto &peer : peers) {
+      action.fs_mirror->add_peer(peer);
+    }
+    dout(10) << ": Initialized FSMirror for filesystem=" << filesystem << dendl;
+    return;
+  }
+
+  derr << ": failed to initialize FSMirror for filesystem=" << filesystem
+       << ": " << cpp_strerror(r) << dendl;
+  m_service_daemon->add_or_update_fs_attribute(filesystem.fscid,
+                                               SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY,
+                                               true);
+}
+
+// Completion of a regular enable: clears the in-progress flag and, on
+// failure, flags the service daemon attribute for the filesystem.
+void Mirror::handle_enable_mirroring(const Filesystem &filesystem, int r) {
+  dout(20) << ": filesystem=" << filesystem << ", r=" << r << dendl;
+
+  std::scoped_lock guard(m_lock);
+  auto &action = m_mirror_actions.at(filesystem);
+  ceph_assert(action.action_in_progress);
+
+  action.action_in_progress = false;
+  m_cond.notify_all();
+
+  if (r >= 0) {
+    dout(10) << ": Initialized FSMirror for filesystem=" << filesystem << dendl;
+    return;
+  }
+
+  derr << ": failed to initialize FSMirror for filesystem=" << filesystem
+       << ": " << cpp_strerror(r) << dendl;
+  m_service_daemon->add_or_update_fs_attribute(filesystem.fscid,
+                                               SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY,
+                                               true);
+}
+
+// Create and asynchronously initialize the FSMirror for a filesystem.
+// Requires m_lock. For a restart the previous FSMirror instance is dropped
+// first; otherwise no action may be in progress.
+void Mirror::enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id,
+                              Context *on_finish, bool is_restart) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  auto &action = m_mirror_actions.at(filesystem);
+  if (!is_restart) {
+    ceph_assert(!action.action_in_progress);
+  } else {
+    action.fs_mirror.reset();
+  }
+
+  ceph_assert(!action.fs_mirror);
+
+  dout(10) << ": starting FSMirror: filesystem=" << filesystem << dendl;
+
+  action.action_in_progress = true;
+  action.fs_mirror = std::make_unique<FSMirror>(m_cct, filesystem, local_pool_id,
+                                                m_service_daemon.get(), m_args, m_work_queue);
+
+  Context *async_finish = new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish);
+  action.fs_mirror->init(async_finish);
+}
+
+// Listener callback: make sure an action entry exists for the filesystem
+// and queue an enable action on it. Ignored while stopping.
+void Mirror::mirroring_enabled(const Filesystem &filesystem, uint64_t local_pool_id) {
+  dout(10) << ": filesystem=" << filesystem << ", pool_id=" << local_pool_id << dendl;
+
+  std::scoped_lock guard(m_lock);
+  if (m_stopping) {
+    return;
+  }
+
+  // emplace leaves an already existing entry untouched
+  auto it = m_mirror_actions.emplace(filesystem, MirrorAction(local_pool_id)).first;
+  auto &action = it->second;
+  action.action_ctxs.push_back(new C_EnableMirroring(this, filesystem, local_pool_id));
+}
+
+// Completion of a disable action. Clears the in-progress flag (unless init
+// had failed, in which case it was never set by disable_mirroring()) and,
+// when not stopping, drops the FSMirror and erases the entry if no further
+// actions are queued.
+void Mirror::handle_disable_mirroring(const Filesystem &filesystem, int r) {
+  dout(10) << ": filesystem=" << filesystem << ", r=" << r << dendl;
+
+  std::scoped_lock guard(m_lock);
+  auto &action = m_mirror_actions.at(filesystem);
+
+  const bool init_failed = action.fs_mirror->is_init_failed();
+  if (!init_failed) {
+    ceph_assert(action.action_in_progress);
+    action.action_in_progress = false;
+    m_cond.notify_all();
+  }
+
+  if (m_stopping) {
+    return;
+  }
+
+  action.fs_mirror.reset();
+  if (!action.action_ctxs.empty()) {
+    return;
+  }
+
+  dout(10) << ": no pending actions for filesystem=" << filesystem << dendl;
+  m_mirror_actions.erase(filesystem);
+}
+
+// Begin shutting down the FSMirror of a filesystem. Requires m_lock. If the
+// FSMirror never initialized there is nothing to shut down, so on_finish is
+// queued with -EINVAL instead.
+void Mirror::disable_mirroring(const Filesystem &filesystem, Context *on_finish) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  auto &action = m_mirror_actions.at(filesystem);
+  ceph_assert(action.fs_mirror);
+  ceph_assert(!action.action_in_progress);
+
+  if (!action.fs_mirror->is_init_failed()) {
+    action.action_in_progress = true;
+    action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+    return;
+  }
+
+  dout(10) << ": init failed for filesystem=" << filesystem << dendl;
+  m_work_queue->queue(on_finish, -EINVAL);
+}
+
+// Listener callback: queue a disable action for the filesystem unless the
+// mirror is stopping.
+void Mirror::mirroring_disabled(const Filesystem &filesystem) {
+  dout(10) << ": filesystem=" << filesystem << dendl;
+
+  std::scoped_lock guard(m_lock);
+  if (!m_stopping) {
+    auto &action = m_mirror_actions.at(filesystem);
+    action.action_ctxs.push_back(new C_DisableMirroring(this, filesystem));
+    return;
+  }
+
+  dout(5) << "shutting down" << dendl;
+}
+
+// Forward a peer addition to the filesystem's FSMirror. Requires m_lock, an
+// existing FSMirror and no action in progress.
+void Mirror::add_peer(const Filesystem &filesystem, const Peer &peer) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  auto &action = m_mirror_actions.at(filesystem);
+  auto &fs_mirror = action.fs_mirror;
+  ceph_assert(fs_mirror);
+  ceph_assert(!action.action_in_progress);
+
+  fs_mirror->add_peer(peer);
+}
+
+// Listener callback: queue a peer-add action for the filesystem unless the
+// mirror is stopping.
+void Mirror::peer_added(const Filesystem &filesystem, const Peer &peer) {
+  dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl;
+
+  std::scoped_lock guard(m_lock);
+  if (!m_stopping) {
+    auto &action = m_mirror_actions.at(filesystem);
+    action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer));
+    return;
+  }
+
+  dout(5) << "shutting down" << dendl;
+}
+
+// Forward a peer removal to the filesystem's FSMirror. Requires m_lock, an
+// existing FSMirror and no action in progress.
+void Mirror::remove_peer(const Filesystem &filesystem, const Peer &peer) {
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+
+  auto &action = m_mirror_actions.at(filesystem);
+  auto &fs_mirror = action.fs_mirror;
+  ceph_assert(fs_mirror);
+  ceph_assert(!action.action_in_progress);
+
+  fs_mirror->remove_peer(peer);
+}
+
+// Listener callback: queue a peer-remove action for the filesystem unless
+// the mirror is stopping.
+void Mirror::peer_removed(const Filesystem &filesystem, const Peer &peer) {
+  dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl;
+
+  std::scoped_lock guard(m_lock);
+  if (!m_stopping) {
+    auto &action = m_mirror_actions.at(filesystem);
+    action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer, true));
+    return;
+  }
+
+  dout(5) << "shutting down" << dendl;
+}
+
+// Periodic timer task. For every filesystem:
+//  - if its mirror has been failed or blocklisted for longer than the
+//    configured restart interval (and nothing is in progress), start a
+//    restart sequence;
+//  - otherwise, if idle, run the next queued action.
+// Re-arms itself via schedule_mirror_update_task() before returning.
+void Mirror::update_fs_mirrors() {
+  dout(20) << dendl;
+
+  auto now = ceph_clock_now();
+  double blocklist_interval = g_ceph_context->_conf.get_val<std::chrono::seconds>
+    ("cephfs_mirror_restart_mirror_on_blocklist_interval").count();
+  double failed_interval = g_ceph_context->_conf.get_val<std::chrono::seconds>
+    ("cephfs_mirror_restart_mirror_on_failure_interval").count();
+
+  // true once more than `interval` seconds have elapsed since `since`.
+  // elapsed time is now - since; the operands must not be swapped, and a
+  // timestamp taken after `now` was sampled counts as "not yet elapsed".
+  auto interval_elapsed = [&now](const utime_t &since, double interval) {
+    return interval > 0 && now > since && (now - since) > interval;
+  };
+
+  {
+    std::scoped_lock locker(m_lock);
+    for (auto &[filesystem, mirror_action] : m_mirror_actions) {
+      auto failed_restart = mirror_action.fs_mirror && mirror_action.fs_mirror->is_failed() &&
+        interval_elapsed(mirror_action.fs_mirror->get_failed_ts(), failed_interval);
+      auto blocklisted_restart = mirror_action.fs_mirror && mirror_action.fs_mirror->is_blocklisted() &&
+        interval_elapsed(mirror_action.fs_mirror->get_blocklisted_ts(), blocklist_interval);
+
+      if (!mirror_action.action_in_progress && !_is_restarting(filesystem)) {
+        if (failed_restart || blocklisted_restart) {
+          dout(5) << ": filesystem=" << filesystem << " failed mirroring (failed: "
+                  << failed_restart << ", blocklisted: " << blocklisted_restart << dendl;
+          _set_restarting(filesystem);
+          // remember the peers so they can be re-added after the restart
+          auto peers = mirror_action.fs_mirror->get_peers();
+          auto ctx = new C_RestartMirroring(this, filesystem, mirror_action.pool_id, peers);
+          ctx->complete(0);
+        }
+      }
+
+      // queued actions are held back while a restart is due
+      if (!failed_restart && !blocklisted_restart && !mirror_action.action_ctxs.empty()
+          && !mirror_action.action_in_progress) {
+        auto ctx = std::move(mirror_action.action_ctxs.front());
+        mirror_action.action_ctxs.pop_front();
+        ctx->complete(0);
+      }
+    }
+  }
+
+  schedule_mirror_update_task();
+}
+
+// Arm the timer for the next update_fs_mirrors() run. Requires the timer
+// lock and that no task is currently scheduled.
+void Mirror::schedule_mirror_update_task() {
+  ceph_assert(m_timer_task == nullptr);
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+
+  auto run_update = [this](int) {
+    // the task is consumed once it fires
+    m_timer_task = nullptr;
+    update_fs_mirrors();
+  };
+  m_timer_task = new LambdaContext(run_update);
+
+  const double delay_secs = g_ceph_context->_conf.get_val<std::chrono::seconds>
+    ("cephfs_mirror_action_update_interval").count();
+  dout(20) << ": scheduling fs mirror update (" << m_timer_task << ") after "
+           << delay_secs << " seconds" << dendl;
+  m_timer->add_event_after(delay_secs, m_timer_task);
+}
+
+// main loop: start the cluster watcher, block until shutdown is requested,
+// then cancel the update timer task and shut down every filesystem mirror.
+void Mirror::run() {
+ dout(20) << dendl;
+
+ std::unique_lock locker(m_lock);
+ m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_service_daemon.get(), m_listener));
+ m_msgr->add_dispatcher_tail(m_cluster_watcher.get());
+
+ m_cluster_watcher->init();
+ // sleep until shutdown() sets m_stopping and notifies
+ m_cond.wait(locker, [this]{return m_stopping;});
+
+ // m_lock is released while the timer lock is held
+ locker.unlock();
+ {
+ std::scoped_lock timer_lock(*m_timer_lock);
+ if (m_timer_task != nullptr) {
+ dout(10) << ": canceling timer task=" << m_timer_task << dendl;
+ m_timer->cancel_event(m_timer_task);
+ m_timer_task = nullptr;
+ }
+ }
+ locker.lock();
+
+ for (auto &[filesystem, mirror_action] : m_mirror_actions) {
+ dout(10) << ": trying to shutdown filesystem=" << filesystem << dendl;
+ // wait for in-progress action and shutdown
+ m_cond.wait(locker, [&mirror_action=mirror_action]
+ {return !mirror_action.action_in_progress;});
+ // skip mirrors that are already stopping or never initialized
+ if (mirror_action.fs_mirror &&
+ !mirror_action.fs_mirror->is_stopping() &&
+ !mirror_action.fs_mirror->is_init_failed()) {
+ C_SaferCond cond;
+ mirror_action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, &cond));
+ // NOTE: this blocks with m_lock still held
+ int r = cond.wait();
+ dout(10) << ": shutdown filesystem=" << filesystem << ", r=" << r << dendl;
+ }
+
+ mirror_action.fs_mirror.reset();
+ }
+}
+
+} // namespace mirror
+} // namespace cephfs
+
diff --git a/src/tools/cephfs_mirror/Mirror.h b/src/tools/cephfs_mirror/Mirror.h
new file mode 100644
index 000000000..2081b5b53
--- /dev/null
+++ b/src/tools/cephfs_mirror/Mirror.h
@@ -0,0 +1,153 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_H
+#define CEPHFS_MIRROR_H
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include "common/ceph_mutex.h"
+#include "common/WorkQueue.h"
+#include "mds/FSMap.h"
+#include "ClusterWatcher.h"
+#include "FSMirror.h"
+#include "ServiceDaemon.h"
+#include "Types.h"
+
+class Messenger;
+class MonClient;
+class ContextWQ;
+
+namespace cephfs {
+namespace mirror {
+
+// this wraps up ClusterWatcher and FSMirrors to implement mirroring
+// for ceph filesystems.
+
+class Mirror {
+public:
+ Mirror(CephContext *cct, const std::vector<const char*> &args,
+ MonClient *monc, Messenger *msgr);
+ ~Mirror();
+
+ // returns 0 on success, negative errno on failure.
+ int init(std::string &reason);
+ // flag the mirror as stopping and wake up run().
+ void shutdown();
+ // blocks until shutdown is requested, then tears down all mirrors.
+ void run();
+
+ // SIGHUP reopens logs; SIGINT/SIGTERM trigger shutdown().
+ void handle_signal(int signum);
+
+private:
+ static constexpr std::string_view MIRRORING_MODULE = "mirroring";
+
+ // queued per-filesystem actions (defined in Mirror.cc)
+ struct C_EnableMirroring;
+ struct C_DisableMirroring;
+ struct C_PeerUpdate;
+ struct C_RestartMirroring;
+
+ // adapter forwarding ClusterWatcher notifications to the Mirror.
+ struct ClusterListener : ClusterWatcher::Listener {
+ Mirror *mirror;
+
+ ClusterListener(Mirror *mirror)
+ : mirror(mirror) {
+ }
+
+ void handle_mirroring_enabled(const FilesystemSpec &spec) override {
+ mirror->mirroring_enabled(spec.filesystem, spec.pool_id);
+ }
+
+ void handle_mirroring_disabled(const Filesystem &filesystem) override {
+ mirror->mirroring_disabled(filesystem);
+ }
+
+ void handle_peers_added(const Filesystem &filesystem, const Peer &peer) override {
+ mirror->peer_added(filesystem, peer);
+ }
+
+ void handle_peers_removed(const Filesystem &filesystem, const Peer &peer) override {
+ mirror->peer_removed(filesystem, peer);
+ }
+ };
+
+ // per-filesystem state: the mirror instance plus its pending actions.
+ struct MirrorAction {
+ MirrorAction(uint64_t pool_id) :
+ pool_id(pool_id) {
+ }
+
+ uint64_t pool_id; // for restarting blocklisted mirror instance
+ bool action_in_progress = false; // an enable/disable is in flight
+ bool restarting = false; // a restart sequence is running
+ std::list<Context *> action_ctxs; // queued actions, run in FIFO order
+ std::unique_ptr<FSMirror> fs_mirror; // null until enabled / after disable
+ };
+
+ ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::Mirror");
+ ceph::condition_variable m_cond;
+
+ CephContext *m_cct;
+ std::vector<const char *> m_args;
+ MonClient *m_monc;
+ Messenger *m_msgr;
+ ClusterListener m_listener;
+
+ // set up by the constructor from CephContext singleton objects
+ ThreadPool *m_thread_pool = nullptr;
+ ContextWQ *m_work_queue = nullptr;
+ SafeTimer *m_timer = nullptr;
+ ceph::mutex *m_timer_lock = nullptr;
+ Context *m_timer_task = nullptr; // currently scheduled update task
+
+ bool m_stopping = false;
+ std::unique_ptr<ClusterWatcher> m_cluster_watcher; // created in run()
+ std::map<Filesystem, MirrorAction> m_mirror_actions;
+
+ RadosRef m_local;
+ std::unique_ptr<ServiceDaemon> m_service_daemon;
+
+ int init_mon_client();
+
+ // called via listener
+ void mirroring_enabled(const Filesystem &filesystem, uint64_t local_pool_id);
+ void mirroring_disabled(const Filesystem &filesystem);
+ void peer_added(const Filesystem &filesystem, const Peer &peer);
+ void peer_removed(const Filesystem &filesystem, const Peer &peer);
+
+ // mirror enable callback
+ void enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id,
+ Context *on_finish, bool is_restart=false);
+ void handle_enable_mirroring(const Filesystem &filesystem, int r);
+ void handle_enable_mirroring(const Filesystem &filesystem, const Peers &peers, int r);
+
+ // mirror disable callback
+ void disable_mirroring(const Filesystem &filesystem, Context *on_finish);
+ void handle_disable_mirroring(const Filesystem &filesystem, int r);
+
+ // peer update callback
+ void add_peer(const Filesystem &filesystem, const Peer &peer);
+ void remove_peer(const Filesystem &filesystem, const Peer &peer);
+
+ // periodic timer task and its (re)scheduling
+ void schedule_mirror_update_task();
+ void update_fs_mirrors();
+
+ void reopen_logs();
+
+ // restart flag helpers; these take no lock themselves.
+ void _set_restarting(const Filesystem &filesystem) {
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ mirror_action.restarting = true;
+ }
+
+ void _unset_restarting(const Filesystem &filesystem) {
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ mirror_action.restarting = false;
+ }
+
+ bool _is_restarting(const Filesystem &filesystem) {
+ auto &mirror_action = m_mirror_actions.at(filesystem);
+ return mirror_action.restarting;
+ }
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_H
diff --git a/src/tools/cephfs_mirror/MirrorWatcher.cc b/src/tools/cephfs_mirror/MirrorWatcher.cc
new file mode 100644
index 000000000..b3770d103
--- /dev/null
+++ b/src/tools/cephfs_mirror/MirrorWatcher.cc
@@ -0,0 +1,151 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_context.h"
+#include "common/ceph_json.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "include/stringify.h"
+#include "msg/Messenger.h"
+#include "aio_utils.h"
+#include "MirrorWatcher.h"
+#include "FSMirror.h"
+#include "Types.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::MirrorWatcher " << __func__
+
+namespace cephfs {
+namespace mirror {
+
+// watches the CEPHFS_MIRROR_OBJECT object through the given io context.
+// ioctx is stored by reference and fs_mirror/work_queue as raw pointers,
+// so all three must outlive the watcher.
+MirrorWatcher::MirrorWatcher(librados::IoCtx &ioctx, FSMirror *fs_mirror,
+ ContextWQ *work_queue)
+ : Watcher(ioctx, CEPHFS_MIRROR_OBJECT, work_queue),
+ m_ioctx(ioctx),
+ m_fs_mirror(fs_mirror),
+ m_work_queue(work_queue),
+ m_lock(ceph::make_mutex("cephfs::mirror::mirror_watcher")),
+ m_instance_id(stringify(m_ioctx.get_instance_id())) {
+}
+
+// Nothing to release explicitly.
+MirrorWatcher::~MirrorWatcher() = default;
+
+// start watching; on_finish is completed with the watch registration
+// result. if shutdown() was requested while init was still in flight, the
+// deferred shutdown is kicked off right after on_finish has run.
+void MirrorWatcher::init(Context *on_finish) {
+ dout(20) << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ ceph_assert(m_on_init_finish == nullptr);
+ m_on_init_finish = new LambdaContext([this, on_finish](int r) {
+ on_finish->complete(r);
+ // set by shutdown() when it had to wait for init
+ // NOTE(review): read here without m_lock -- confirm this cannot
+ // race with a concurrent shutdown().
+ if (m_on_shutdown_finish != nullptr) {
+ m_on_shutdown_finish->complete(0);
+ }
+ });
+ }
+
+ register_watcher();
+}
+
+// stop watching; on_finish is completed with the unwatch result. when init
+// has not finished yet the shutdown is parked and re-issued by the init
+// completion.
+void MirrorWatcher::shutdown(Context *on_finish) {
+ dout(20) << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ ceph_assert(m_on_shutdown_finish == nullptr);
+ if (m_on_init_finish != nullptr) {
+ dout(10) << ": delaying shutdown -- init in progress" << dendl;
+ // placeholder context: clears itself and retries the shutdown
+ m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
+ m_on_shutdown_finish = nullptr;
+ shutdown(on_finish);
+ });
+ return;
+ }
+
+ m_on_shutdown_finish = on_finish;
+ }
+
+ unregister_watcher();
+}
+
+// Answer a notification with a JSON "info" object carrying the instance
+// address reported by the owning FSMirror.
+void MirrorWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
+                                  uint64_t notifier_id, bufferlist& bl) {
+  dout(20) << dendl;
+
+  JSONFormatter formatter;
+  formatter.open_object_section("info");
+  encode_json("addr", m_fs_mirror->get_instance_addr(), &formatter);
+  formatter.close_section();
+
+  bufferlist reply;
+  formatter.flush(reply);
+  acknowledge_notify(notify_id, handle, reply);
+}
+
+// Record the outcome of a rewatch attempt:
+//  -EBLOCKLISTED -> mark the watcher blocklisted
+//  -ENOENT       -> the watched object is gone, mark failed
+//  other r < 0   -> mark failed
+// All state updates are done under m_lock, matching the locked accessors
+// (is_blocklisted(), is_failed(), get_*_ts()) that read the same fields.
+void MirrorWatcher::handle_rewatch_complete(int r) {
+  dout(5) << ": r=" << r << dendl;
+
+  if (r == -EBLOCKLISTED) {
+    dout(0) << ": client blocklisted" <<dendl;
+    std::scoped_lock locker(m_lock);
+    m_blocklisted = true;
+    m_blocklisted_ts = ceph_clock_now();
+  } else if (r == -ENOENT) {
+    derr << ": mirroring object deleted" << dendl;
+    std::scoped_lock locker(m_lock);
+    m_failed = true;
+    m_failed_ts = ceph_clock_now();
+  } else if (r < 0) {
+    derr << ": rewatch error: " << cpp_strerror(r) << dendl;
+    std::scoped_lock locker(m_lock);
+    m_failed = true;
+    m_failed_ts = ceph_clock_now();
+  }
+}
+
+// Register the watch; the result is delivered to handle_register_watcher().
+void MirrorWatcher::register_watcher() {
+  dout(20) << dendl;
+
+  std::scoped_lock guard(m_lock);
+  using RegisterAdapter = C_CallbackAdapter<
+    MirrorWatcher, &MirrorWatcher::handle_register_watcher>;
+  Context *adapter = new RegisterAdapter(this);
+  register_watch(adapter);
+}
+
+// Completion of the watch registration: hand the result to the pending
+// init context.
+void MirrorWatcher::handle_register_watcher(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  Context *init_ctx = nullptr;
+  {
+    // detach under the lock, complete outside of it
+    std::scoped_lock guard(m_lock);
+    std::swap(init_ctx, m_on_init_finish);
+  }
+
+  init_ctx->complete(r);
+}
+
+// Tear down the watch. The completion is routed to
+// handle_unregister_watcher() via a C_AsyncCallback bound to m_work_queue.
+void MirrorWatcher::unregister_watcher() {
+  dout(20) << dendl;
+
+  std::scoped_lock guard(m_lock);
+  using UnregisterAdapter = C_CallbackAdapter<
+    MirrorWatcher, &MirrorWatcher::handle_unregister_watcher>;
+  Context *adapter = new UnregisterAdapter(this);
+  Context *async_ctx = new C_AsyncCallback<ContextWQ>(m_work_queue, adapter);
+  unregister_watch(async_ctx);
+}
+
+// Completion of the unwatch request: hand the result to the pending
+// shutdown context.
+void MirrorWatcher::handle_unregister_watcher(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  Context *shutdown_ctx = nullptr;
+  {
+    // detach under the lock, complete outside of it
+    std::scoped_lock guard(m_lock);
+    std::swap(shutdown_ctx, m_on_shutdown_finish);
+  }
+
+  shutdown_ctx->complete(r);
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/MirrorWatcher.h b/src/tools/cephfs_mirror/MirrorWatcher.h
new file mode 100644
index 000000000..54e185b95
--- /dev/null
+++ b/src/tools/cephfs_mirror/MirrorWatcher.h
@@ -0,0 +1,92 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_MIRROR_WATCHER_H
+#define CEPHFS_MIRROR_MIRROR_WATCHER_H
+
+#include <string_view>
+
+#include "common/ceph_mutex.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "Watcher.h"
+
+class ContextWQ;
+class Messenger;
+
+namespace cephfs {
+namespace mirror {
+
+class FSMirror;
+
+// watch for notifications via cephfs_mirror object (in metadata
+// pool). this is used for sending keepalives, with the keepalive
+// payload being the rados instance address (used by the manager
+// module to blocklist when needed).
+
+class MirrorWatcher : public Watcher {
+public:
+ // heap-allocating factory; the caller owns the returned pointer.
+ static MirrorWatcher *create(librados::IoCtx &ioctx, FSMirror *fs_mirror,
+ ContextWQ *work_queue) {
+ return new MirrorWatcher(ioctx, fs_mirror, work_queue);
+ }
+
+ MirrorWatcher(librados::IoCtx &ioctx, FSMirror *fs_mirror,
+ ContextWQ *work_queue);
+ ~MirrorWatcher();
+
+ // asynchronous start/stop; on_finish is completed with the result.
+ void init(Context *on_finish);
+ void shutdown(Context *on_finish);
+
+ // Watcher overrides
+ void handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist& bl) override;
+ void handle_rewatch_complete(int r) override;
+
+ // state accessors -- each takes m_lock for the duration of the read.
+ bool is_blocklisted() {
+ std::scoped_lock locker(m_lock);
+ return m_blocklisted;
+ }
+
+ utime_t get_blocklisted_ts() {
+ std::scoped_lock locker(m_lock);
+ return m_blocklisted_ts;
+ }
+
+ bool is_failed() {
+ std::scoped_lock locker(m_lock);
+ return m_failed;
+ }
+
+ utime_t get_failed_ts() {
+ std::scoped_lock locker(m_lock);
+ return m_failed_ts;
+ }
+
+private:
+ // held by reference/pointer -- these must outlive the watcher.
+ librados::IoCtx &m_ioctx;
+ FSMirror *m_fs_mirror;
+ ContextWQ *m_work_queue;
+
+ ceph::mutex m_lock;
+ std::string m_instance_id; // stringified rados instance id of m_ioctx
+
+ // pending init/shutdown completions
+ Context *m_on_init_finish = nullptr;
+ Context *m_on_shutdown_finish = nullptr;
+
+ bool m_blocklisted = false;
+ bool m_failed = false;
+
+ // timestamps paired with m_blocklisted / m_failed
+ utime_t m_blocklisted_ts;
+ utime_t m_failed_ts;
+
+ void register_watcher();
+ void handle_register_watcher(int r);
+
+ void unregister_watcher();
+ void handle_unregister_watcher(int r);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_MIRROR_WATCHER_H
diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc
new file mode 100644
index 000000000..bd47046bb
--- /dev/null
+++ b/src/tools/cephfs_mirror/PeerReplayer.cc
@@ -0,0 +1,1581 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <stack>
+#include <fcntl.h>
+#include <algorithm>
+#include <sys/time.h>
+#include <sys/file.h>
+#include <boost/scope_exit.hpp>
+
+#include "common/admin_socket.h"
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "FSMirror.h"
+#include "PeerReplayer.h"
+#include "Utils.h"
+
+#include "json_spirit/json_spirit.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::PeerReplayer(" \
+ << m_peer.uuid << ") " << __func__
+
+using namespace std;
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+const std::string PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer";
+
+// Return the snapshot directory under @path, i.e. @path joined with
+// the configured client_snapdir name.
+std::string snapshot_dir_path(CephContext *cct, const std::string &path) {
+  std::string snap_dir(path);
+  snap_dir += "/";
+  snap_dir += cct->_conf->client_snapdir;
+  return snap_dir;
+}
+
+// Join an already-resolved snapshot directory with a snapshot name.
+std::string snapshot_path(const std::string &snap_dir, const std::string &snap_name) {
+  std::string full_path;
+  full_path.reserve(snap_dir.size() + 1 + snap_name.size());
+  full_path.append(snap_dir).append("/").append(snap_name);
+  return full_path;
+}
+
+// Build "<path>/<client_snapdir>/<snap_name>" for a snapshot of @path.
+std::string snapshot_path(CephContext *cct, const std::string &path, const std::string &snap_name) {
+  std::string full_path(path);
+  full_path += "/";
+  full_path += cct->_conf->client_snapdir;
+  full_path += "/";
+  full_path += snap_name;
+  return full_path;
+}
+
+// Join a directory path and an entry name with a single '/'.
+std::string entry_path(const std::string &dir, const std::string &name) {
+  std::string joined(dir);
+  joined += "/";
+  joined += name;
+  return joined;
+}
+
+// Copy an array of @nr_snap_metadata key/value records into a map.
+// When a key repeats, the first occurrence is kept (emplace semantics).
+std::map<std::string, std::string> decode_snap_metadata(snap_metadata *snap_metadata,
+                                                        size_t nr_snap_metadata) {
+  std::map<std::string, std::string> decoded;
+  auto *last = snap_metadata + nr_snap_metadata;
+  for (auto *md = snap_metadata; md != last; ++md) {
+    decoded.emplace(md->key, md->value);
+  }
+
+  return decoded;
+}
+
+// config-key name for a peer: "<PEER_CONFIG_KEY_PREFIX>/<fs_name>/<uuid>"
+std::string peer_config_key(const std::string &fs_name, const std::string &uuid) {
+  std::string key(PEER_CONFIG_KEY_PREFIX);
+  key += "/";
+  key += fs_name;
+  key += "/";
+  key += uuid;
+  return key;
+}
+
+// Interface for a single admin socket command served on behalf of a
+// peer replayer; call() emits its output through the formatter.
+class PeerAdminSocketCommand {
+public:
+  virtual ~PeerAdminSocketCommand() = default;
+
+  virtual int call(Formatter *f) = 0;
+};
+
+// Admin socket command that dumps the peer status of a PeerReplayer.
+class StatusCommand : public PeerAdminSocketCommand {
+public:
+  explicit StatusCommand(PeerReplayer *peer_replayer)
+    : m_replayer(peer_replayer) {
+  }
+
+  // always succeeds; the status is written via the formatter
+  int call(Formatter *f) override {
+    m_replayer->peer_status(f);
+    return 0;
+  }
+
+private:
+  PeerReplayer *m_replayer;  // not owned
+};
+
+// Open @relpath (relative to @dirfd) as a directory stream.
+// The intermediate file descriptor is closed before returning,
+// whether or not ceph_fdopendir() succeeded. Returns the negative
+// error from ceph_openat(), otherwise the result of ceph_fdopendir().
+int opendirat(MountRef mnt, int dirfd, const std::string &relpath, int flags,
+              ceph_dir_result **dirp) {
+  const int fd = ceph_openat(mnt, dirfd, relpath.c_str(), flags, 0);
+  if (fd < 0) {
+    return fd;
+  }
+
+  const int rc = ceph_fdopendir(mnt, fd, dirp);
+  ceph_close(mnt, fd);
+  return rc;
+}
+
+} // anonymous namespace
+
+// Admin socket hook that registers the per-peer "fs mirror peer status"
+// command and dispatches incoming calls to the matching command object.
+class PeerReplayerAdminSocketHook : public AdminSocketHook {
+public:
+  PeerReplayerAdminSocketHook(CephContext *cct, const Filesystem &filesystem,
+                              const Peer &peer, PeerReplayer *peer_replayer)
+    : admin_socket(cct->get_admin_socket()) {
+    // mirror peer status format is name@id uuid
+    std::string status_cmd("fs mirror peer status ");
+    status_cmd += stringify(filesystem.fs_name);
+    status_cmd += "@";
+    status_cmd += stringify(filesystem.fscid);
+    status_cmd += " ";
+    status_cmd += stringify(peer.uuid);
+
+    const int rc = admin_socket->register_command(
+      status_cmd, this, "get peer mirror status");
+    // only track the command when registration succeeded
+    if (rc == 0) {
+      commands[status_cmd] = new StatusCommand(peer_replayer);
+    }
+  }
+
+  ~PeerReplayerAdminSocketHook() override {
+    admin_socket->unregister_commands(this);
+    // the command objects are owned by this hook
+    for (auto &registered : commands) {
+      delete registered.second;
+    }
+  }
+
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+           const bufferlist&,
+           Formatter *f, std::ostream &errss, bufferlist &out) override {
+    // at() throws if the command was never registered with this hook
+    PeerAdminSocketCommand *handler = commands.at(std::string(command));
+    return handler->call(f);
+  }
+
+private:
+  using Commands = std::map<std::string, PeerAdminSocketCommand*, std::less<>>;
+
+  AdminSocket *admin_socket;
+  Commands commands;
+};
+
+// Construct a replayer for @peer of @filesystem. The set of directories
+// is copied into m_directories, an admin socket hook is created for this
+// peer, and the failed/recovered directory counters published via the
+// service daemon are reset to zero.
+PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
+ RadosRef local_cluster, const Filesystem &filesystem,
+ const Peer &peer, const std::set<std::string, std::less<>> &directories,
+ MountRef mount, ServiceDaemon *service_daemon)
+ : m_cct(cct),
+ m_fs_mirror(fs_mirror),
+ m_local_cluster(local_cluster),
+ m_filesystem(filesystem),
+ m_peer(peer),
+ m_directories(directories.begin(), directories.end()),
+ m_local_mount(mount),
+ m_service_daemon(service_daemon),
+ m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)),
+ m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) {
+ // reset sync stats sent via service daemon
+ m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+ SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0);
+ m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+ SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY, (uint64_t)0);
+}
+
+// The admin socket hook is allocated in the constructor and owned by
+// this object; deleting it also unregisters its commands.
+PeerReplayer::~PeerReplayer() {
+ delete m_asok_hook;
+}
+
+// Initialize the replayer:
+//  - seed per-directory sync stats,
+//  - fetch optional peer bootstrap details (mon_host, key) from the
+//    monitor config-key store,
+//  - connect to and mount the remote filesystem,
+//  - spawn the configured number of snapshot replayer threads.
+// Returns 0 on success or a negative error code.
+int PeerReplayer::init() {
+  dout(20) << ": initial dir list=[" << m_directories << "]" << dendl;
+  for (auto &dir_root : m_directories) {
+    m_snap_sync_stats.emplace(dir_root, SnapSyncStat());
+  }
+
+  auto &remote_client = m_peer.remote.client_name;
+  auto &remote_cluster = m_peer.remote.cluster_name;
+  auto remote_filesystem = Filesystem{0, m_peer.remote.fs_name};
+
+  std::string key = peer_config_key(m_filesystem.fs_name, m_peer.uuid);
+  std::string cmd =
+    "{"
+      "\"prefix\": \"config-key get\", "
+      "\"key\": \"" + key + "\""
+    "}";
+
+  bufferlist in_bl;
+  bufferlist out_bl;
+
+  // a missing key (-ENOENT) is not an error: the connection details are
+  // then left empty for connect() to resolve
+  int r = m_local_cluster->mon_command(cmd, in_bl, &out_bl, nullptr);
+  dout(5) << ": mon command r=" << r << dendl;
+  if (r < 0 && r != -ENOENT) {
+    return r;
+  }
+
+  std::string mon_host;
+  std::string cephx_key;
+  if (!r) {
+    json_spirit::mValue root;
+    if (!json_spirit::read(out_bl.to_str(), root)) {
+      derr << ": invalid config-key JSON" << dendl;
+      return -EBADMSG;
+    }
+    try {
+      auto &root_obj = root.get_obj();
+      mon_host = root_obj.at("mon_host").get_str();
+      cephx_key = root_obj.at("key").get_str();
+      dout(0) << ": remote monitor host=" << mon_host << dendl;
+    } catch (const std::exception&) {
+      // covers both a type mismatch reported by json_spirit and a missing
+      // key: std::map::at() throws std::out_of_range, which is a
+      // std::logic_error and would slip past a std::runtime_error handler
+      derr << ": unexpected JSON received" << dendl;
+      return -EBADMSG;
+    }
+  }
+
+  r = connect(remote_client, remote_cluster, &m_remote_cluster, mon_host, cephx_key);
+  if (r < 0) {
+    derr << ": error connecting to remote cluster: " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  r = mount(m_remote_cluster, remote_filesystem, false, &m_remote_mount);
+  if (r < 0) {
+    m_remote_cluster.reset();
+    derr << ": error mounting remote filesystem=" << remote_filesystem << dendl;
+    return r;
+  }
+
+  std::scoped_lock locker(m_lock);
+  auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
+    "cephfs_mirror_max_concurrent_directory_syncs");
+  dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl;
+
+  while (nr_replayers-- > 0) {
+    std::unique_ptr<SnapshotReplayerThread> replayer(
+      new SnapshotReplayerThread(this));
+    std::string name("replayer-" + stringify(nr_replayers));
+    replayer->create(name.c_str());
+    m_replayers.push_back(std::move(replayer));
+  }
+
+  return 0;
+}
+
+// Stop all replayer threads and tear down the remote mount and
+// cluster handle. Must be called at most once (asserted).
+void PeerReplayer::shutdown() {
+  dout(20) << dendl;
+
+  // flag the stop request under the lock and wake up every waiter
+  {
+    std::scoped_lock locker(m_lock);
+    ceph_assert(!m_stopping);
+    m_stopping = true;
+    m_cond.notify_all();
+  }
+
+  // wait for the replayer threads before releasing what they use
+  for (auto it = m_replayers.begin(); it != m_replayers.end(); ++it) {
+    (*it)->join();
+  }
+  m_replayers.clear();
+
+  ceph_unmount(m_remote_mount);
+  ceph_release(m_remote_mount);
+  m_remote_mount = nullptr;
+  m_remote_cluster.reset();
+}
+
+// Start tracking @dir_root for synchronization and wake up waiters.
+void PeerReplayer::add_directory(string_view dir_root) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
+
+  std::scoped_lock locker(m_lock);
+  m_directories.emplace_back(dir_root);
+  // emplace keeps existing stats if the directory is already known
+  m_snap_sync_stats.emplace(dir_root, SnapSyncStat());
+  m_cond.notify_all();
+}
+
+// Stop tracking @dir_root. If the directory is currently registered,
+// it is only marked canceled and its stats are kept; otherwise the
+// stats are dropped right away.
+void PeerReplayer::remove_directory(string_view dir_root) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
+  const std::string root(dir_root);
+
+  std::scoped_lock locker(m_lock);
+  auto dir_it = std::find(m_directories.begin(), m_directories.end(), root);
+  if (dir_it != m_directories.end()) {
+    m_directories.erase(dir_it);
+  }
+
+  auto reg_it = m_registered.find(root);
+  if (reg_it != m_registered.end()) {
+    reg_it->second.canceled = true;
+  } else {
+    m_snap_sync_stats.erase(root);
+  }
+  m_cond.notify_all();
+}
+
+// Pick the next directory to synchronize.
+// Scans m_directories in order and selects the first directory that is
+// not registered and is not inside its failure retry window
+// (cephfs_mirror_retry_failed_directories_interval). The directory list
+// is then rotated by one so that subsequent calls start at a different
+// entry. Returns boost::none when nothing is eligible.
+boost::optional<std::string> PeerReplayer::pick_directory() {
+  dout(20) << dendl;
+
+  boost::optional<std::string> candidate;
+  // nothing to pick from -- also avoids forming begin() + 1 on an empty
+  // vector for the rotate below, which is undefined behaviour
+  if (m_directories.empty()) {
+    return candidate;
+  }
+
+  auto now = clock::now();
+  auto retry_timo = g_ceph_context->_conf.get_val<uint64_t>(
+    "cephfs_mirror_retry_failed_directories_interval");
+
+  for (auto &dir_root : m_directories) {
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    if (sync_stat.failed) {
+      // skip directories that failed too recently
+      std::chrono::duration<double> d = now - *sync_stat.last_failed;
+      if (d.count() < retry_timo) {
+        continue;
+      }
+    }
+    if (!m_registered.count(dir_root)) {
+      candidate = dir_root;
+      break;
+    }
+  }
+
+  std::rotate(m_directories.begin(), m_directories.begin() + 1, m_directories.end());
+  return candidate;
+}
+
+// Lock the remote @dir_root on behalf of @replayer and record it in
+// m_registered. The directory must not be registered already
+// (asserted). Returns 0 on success or the error from locking.
+int PeerReplayer::register_directory(const std::string &dir_root,
+                                     SnapshotReplayerThread *replayer) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
+  ceph_assert(m_registered.find(dir_root) == m_registered.end());
+
+  DirRegistry registry;
+  const int rc = try_lock_directory(dir_root, replayer, &registry);
+  if (rc >= 0) {
+    dout(5) << ": dir_root=" << dir_root << " registered with replayer="
+            << replayer << dendl;
+    m_registered.emplace(dir_root, std::move(registry));
+    return 0;
+  }
+
+  return rc;
+}
+
+// Unlock and forget a registered @dir_root (must be registered --
+// asserted). Sync stats are dropped too when the directory is no
+// longer part of the tracked directory list.
+void PeerReplayer::unregister_directory(const std::string &dir_root) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
+
+  auto reg_it = m_registered.find(dir_root);
+  ceph_assert(reg_it != m_registered.end());
+
+  unlock_directory(reg_it->first, reg_it->second);
+  m_registered.erase(reg_it);
+
+  const bool still_tracked =
+    std::find(m_directories.begin(), m_directories.end(), dir_root) != m_directories.end();
+  if (!still_tracked) {
+    m_snap_sync_stats.erase(dir_root);
+  }
+}
+
+// Open the remote @dir_root (creating it when missing) and try to take
+// a non-blocking exclusive flock on it, owned by @replayer's thread id.
+// On success the open fd and the replayer are stored in @registry and 0
+// is returned. On failure the fd is closed and the negative error code
+// is returned (-EWOULDBLOCK when the lock is already held).
+int PeerReplayer::try_lock_directory(const std::string &dir_root,
+                                     SnapshotReplayerThread *replayer, DirRegistry *registry) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
+
+  int r = ceph_open(m_remote_mount, dir_root.c_str(), O_RDONLY | O_DIRECTORY, 0);
+  if (r < 0 && r != -ENOENT) {
+    derr << ": failed to open remote dir_root=" << dir_root << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  if (r == -ENOENT) {
+    // we snap under dir_root, so mode does not matter much
+    r = ceph_mkdirs(m_remote_mount, dir_root.c_str(), 0755);
+    if (r < 0) {
+      derr << ": failed to create remote directory=" << dir_root << ": " << cpp_strerror(r)
+           << dendl;
+      return r;
+    }
+
+    r = ceph_open(m_remote_mount, dir_root.c_str(), O_RDONLY | O_DIRECTORY, 0);
+    if (r < 0) {
+      derr << ": failed to open remote dir_root=" << dir_root << ": " << cpp_strerror(r)
+           << dendl;
+      return r;
+    }
+  }
+
+  int fd = r;
+  r = ceph_flock(m_remote_mount, fd, LOCK_EX | LOCK_NB, (uint64_t)replayer->get_thread_id());
+  if (r != 0) {
+    if (r == -EWOULDBLOCK) {
+      dout(5) << ": dir_root=" << dir_root << " is locked by cephfs-mirror, "
+              << "will retry again" << dendl;
+    } else {
+      derr << ": failed to lock dir_root=" << dir_root << ": " << cpp_strerror(r)
+           << dendl;
+    }
+
+    // log the close failure with its own error code; the caller still
+    // gets the flock error
+    int close_r = ceph_close(m_remote_mount, fd);
+    if (close_r < 0) {
+      derr << ": failed to close (cleanup) remote dir_root=" << dir_root << ": "
+           << cpp_strerror(close_r) << dendl;
+    }
+    return r;
+  }
+
+  dout(10) << ": dir_root=" << dir_root << " locked" << dendl;
+
+  registry->fd = fd;
+  registry->replayer = replayer;
+  return 0;
+}
+
+// Release the flock taken by try_lock_directory() and close the
+// directory fd held in @registry. Failures are logged only. The fd is
+// closed even when the explicit unlock fails, so that the descriptor is
+// not leaked.
+void PeerReplayer::unlock_directory(const std::string &dir_root, const DirRegistry &registry) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
+
+  int unlock_r = ceph_flock(m_remote_mount, registry.fd, LOCK_UN,
+                            (uint64_t)registry.replayer->get_thread_id());
+  if (unlock_r < 0) {
+    derr << ": failed to unlock remote dir_root=" << dir_root << ": " << cpp_strerror(unlock_r)
+         << dendl;
+  }
+
+  int close_r = ceph_close(m_remote_mount, registry.fd);
+  if (close_r < 0) {
+    derr << ": failed to close remote dir_root=" << dir_root << ": " << cpp_strerror(close_r)
+         << dendl;
+  }
+
+  if (unlock_r == 0) {
+    dout(10) << ": dir_root=" << dir_root << " unlocked" << dendl;
+  }
+}
+
+// Build a map of snapshot-id -> snapshot-name for @dir_root.
+// For the local filesystem the key is the snapshot's own id. For the
+// remote filesystem the key is the primary snapshot id stored in the
+// snapshot metadata under PRIMARY_SNAP_ID_KEY. Entries whose name starts
+// with "_" are skipped. A missing remote snapshot directory yields an
+// empty map and success.
+// Returns 0 on success or a negative error code; -EINVAL is returned
+// when a remote snapshot lacks usable metadata.
+int PeerReplayer::build_snap_map(const std::string &dir_root,
+                                 std::map<uint64_t, std::string> *snap_map, bool is_remote) {
+  auto snap_dir = snapshot_dir_path(m_cct, dir_root);
+  dout(20) << ": dir_root=" << dir_root << ", snap_dir=" << snap_dir
+           << ", is_remote=" << is_remote << dendl;
+
+  auto lr_str = is_remote ? "remote" : "local";
+  auto mnt = is_remote ? m_remote_mount : m_local_mount;
+
+  ceph_dir_result *dirp = nullptr;
+  int r = ceph_opendir(mnt, snap_dir.c_str(), &dirp);
+  if (r < 0) {
+    if (is_remote && r == -ENOENT) {
+      return 0;
+    }
+    derr << ": failed to open " << lr_str << " snap directory=" << snap_dir
+         << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  // collect snapshot names first, then resolve their ids
+  std::set<std::string> snaps;
+  auto entry = ceph_readdir(mnt, dirp);
+  while (entry != NULL) {
+    auto d_name = std::string(entry->d_name);
+    dout(20) << ": entry=" << d_name << dendl;
+    if (d_name != "." && d_name != ".." && d_name.rfind("_", 0) != 0) {
+      snaps.emplace(d_name);
+    }
+
+    entry = ceph_readdir(mnt, dirp);
+  }
+
+  int rv = 0;
+  for (auto &snap : snaps) {
+    snap_info info;
+    auto snap_path = snapshot_path(snap_dir, snap);
+    r = ceph_get_snap_info(mnt, snap_path.c_str(), &info);
+    if (r < 0) {
+      derr << ": failed to fetch " << lr_str << " snap info for snap_path=" << snap_path
+           << ": " << cpp_strerror(r) << dendl;
+      rv = r;
+      break;
+    }
+
+    uint64_t snap_id = 0;
+    if (is_remote) {
+      if (!info.nr_snap_metadata) {
+        derr << ": snap_path=" << snap_path << " has invalid metadata in remote snapshot"
+             << dendl;
+        rv = -EINVAL;
+      } else {
+        auto metadata = decode_snap_metadata(info.snap_metadata, info.nr_snap_metadata);
+        dout(20) << ": snap_path=" << snap_path << ", metadata=" << metadata << dendl;
+        auto it = metadata.find(PRIMARY_SNAP_ID_KEY);
+        if (it == metadata.end()) {
+          derr << ": snap_path=" << snap_path << " has missing \"" << PRIMARY_SNAP_ID_KEY
+               << "\" in metadata" << dendl;
+          rv = -EINVAL;
+        } else {
+          // std::stoull throws on non-numeric or out-of-range input; treat
+          // that as invalid metadata rather than letting it propagate
+          try {
+            snap_id = std::stoull(it->second);
+          } catch (const std::exception &) {
+            derr << ": snap_path=" << snap_path << " has malformed \"" << PRIMARY_SNAP_ID_KEY
+                 << "\" in metadata" << dendl;
+            rv = -EINVAL;
+          }
+        }
+      }
+    } else {
+      snap_id = info.id;
+    }
+
+    // release the metadata buffer for local and remote snapshots alike;
+    // the decoded map above holds its own copies of the strings
+    if (info.nr_snap_metadata) {
+      ceph_free_snap_info_buffer(&info);
+    }
+
+    if (rv != 0) {
+      break;
+    }
+    snap_map->emplace(snap_id, snap);
+  }
+
+  r = ceph_closedir(mnt, dirp);
+  if (r < 0) {
+    derr << ": failed to close " << lr_str << " snap directory=" << snap_dir
+         << ": " << cpp_strerror(r) << dendl;
+  }
+
+  dout(10) << ": " << lr_str << " snap_map=" << *snap_map << dendl;
+  return rv;
+}
+
+// Delete each snapshot named in @snaps from the remote @dir_root.
+// Stops at the first failure and returns its error code; the deleted
+// snapshot counter is bumped for every successful removal.
+int PeerReplayer::propagate_snap_deletes(const std::string &dir_root,
+                                         const std::set<std::string> &snaps) {
+  dout(5) << ": dir_root=" << dir_root << ", deleted snapshots=" << snaps << dendl;
+
+  for (auto &snap : snaps) {
+    dout(20) << ": deleting dir_root=" << dir_root << ", snapshot=" << snap
+             << dendl;
+    int r = ceph_rmsnap(m_remote_mount, dir_root.c_str(), snap.c_str());
+    if (r < 0) {
+      // name the snapshot that actually failed, not the whole set
+      derr << ": failed to delete remote snap dir_root=" << dir_root
+           << ", snapshot=" << snap << ": " << cpp_strerror(r) << dendl;
+      return r;
+    }
+    inc_deleted_snap(dir_root);
+  }
+
+  return 0;
+}
+
+// Apply each (old name, new name) snapshot rename under @dir_root on
+// the remote filesystem. Stops at the first failure and returns its
+// error code; the renamed snapshot counter is bumped per success.
+int PeerReplayer::propagate_snap_renames(
+    const std::string &dir_root,
+    const std::set<std::pair<std::string,std::string>> &snaps) {
+  dout(10) << ": dir_root=" << dir_root << ", renamed snapshots=" << snaps << dendl;
+
+  for (auto it = snaps.begin(); it != snaps.end(); ++it) {
+    const std::string from = snapshot_path(m_cct, dir_root, it->first);
+    const std::string to = snapshot_path(m_cct, dir_root, it->second);
+    dout(20) << ": renaming dir_root=" << dir_root << ", snapshot from="
+             << from << ", to=" << to << dendl;
+
+    const int rc = ceph_rename(m_remote_mount, from.c_str(), to.c_str());
+    if (rc < 0) {
+      derr << ": failed to rename remote snap dir_root=" << dir_root
+           << ", snapshot from =" << from << ", to=" << to << ": "
+           << cpp_strerror(rc) << dendl;
+      return rc;
+    }
+    inc_renamed_snap(dir_root);
+  }
+
+  return 0;
+}
+
+// Create directory @epath (relative to the remote dir_root handle) and
+// apply ownership, mode and atime/mtime from @stx. An already existing
+// directory is accepted. Returns 0 or the first negative error code.
+int PeerReplayer::remote_mkdir(const std::string &epath, const struct ceph_statx &stx,
+                               const FHandles &fh) {
+  dout(10) << ": remote epath=" << epath << dendl;
+
+  const char *rel_path = epath.c_str();
+
+  int rc = ceph_mkdirat(m_remote_mount, fh.r_fd_dir_root, rel_path, stx.stx_mode & ~S_IFDIR);
+  if (rc < 0 && rc != -EEXIST) {
+    derr << ": failed to create remote directory=" << epath << ": " << cpp_strerror(rc)
+         << dendl;
+    return rc;
+  }
+
+  rc = ceph_chownat(m_remote_mount, fh.r_fd_dir_root, rel_path, stx.stx_uid, stx.stx_gid,
+                    AT_SYMLINK_NOFOLLOW);
+  if (rc < 0) {
+    derr << ": failed to chown remote directory=" << epath << ": " << cpp_strerror(rc)
+         << dendl;
+    return rc;
+  }
+
+  rc = ceph_chmodat(m_remote_mount, fh.r_fd_dir_root, rel_path, stx.stx_mode & ~S_IFMT,
+                    AT_SYMLINK_NOFOLLOW);
+  if (rc < 0) {
+    derr << ": failed to chmod remote directory=" << epath << ": " << cpp_strerror(rc)
+         << dendl;
+    return rc;
+  }
+
+  // times[0] is atime, times[1] is mtime
+  struct timespec times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec},
+                             {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec}};
+  rc = ceph_utimensat(m_remote_mount, fh.r_fd_dir_root, rel_path, times, AT_SYMLINK_NOFOLLOW);
+  if (rc < 0) {
+    derr << ": failed to change [am]time on remote directory=" << epath << ": "
+         << cpp_strerror(rc) << dendl;
+    return rc;
+  }
+
+  return 0;
+}
+
+#define NR_IOVECS 8 // # iovecs
+#define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec
+// Copy the regular file @epath from the current local snapshot
+// (fh.c_fd) to the remote dir_root (fh.r_fd_dir_root).
+// The remote file is created or truncated, data is streamed through
+// NR_IOVECS buffers of IOVEC_SIZE bytes each and then fsync'ed. The
+// copy aborts early when should_backoff() asks for it.
+// Both descriptors are always closed. Returns 0 on success or a
+// negative error code: the first copy error, or -EINVAL when the copy
+// itself succeeded but closing a descriptor failed.
+int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string &epath,
+                                 const struct ceph_statx &stx, const FHandles &fh) {
+  dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl;
+  int l_fd;
+  int r_fd;
+  int close_r;
+  void *ptr;
+  struct iovec iov[NR_IOVECS];
+
+  int r = ceph_openat(m_local_mount, fh.c_fd, epath.c_str(), O_RDONLY | O_NOFOLLOW, 0);
+  if (r < 0) {
+    derr << ": failed to open local file path=" << epath << ": "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  l_fd = r;
+  r = ceph_openat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(),
+                  O_CREAT | O_TRUNC | O_WRONLY | O_NOFOLLOW, stx.stx_mode);
+  if (r < 0) {
+    derr << ": failed to create remote file path=" << epath << ": "
+         << cpp_strerror(r) << dendl;
+    goto close_local_fd;
+  }
+
+  r_fd = r;
+  ptr = malloc(NR_IOVECS * IOVEC_SIZE);
+  if (!ptr) {
+    r = -ENOMEM;
+    derr << ": failed to allocate memory" << dendl;
+    goto close_remote_fd;
+  }
+
+  while (true) {
+    if (should_backoff(dir_root, &r)) {
+      dout(0) << ": backing off r=" << r << dendl;
+      break;
+    }
+
+    // reset the iovecs -- the last one used may have been shortened
+    for (int i = 0; i < NR_IOVECS; ++i) {
+      iov[i].iov_base = (char*)ptr + IOVEC_SIZE*i;
+      iov[i].iov_len = IOVEC_SIZE;
+    }
+
+    r = ceph_preadv(m_local_mount, l_fd, iov, NR_IOVECS, -1);
+    if (r < 0) {
+      derr << ": failed to read local file path=" << epath << ": "
+           << cpp_strerror(r) << dendl;
+      break;
+    }
+    if (r == 0) {
+      break;
+    }
+
+    // write back exactly what was read: full iovecs plus a partial one
+    int iovs = (int)(r / IOVEC_SIZE);
+    int t = r % IOVEC_SIZE;
+    if (t) {
+      iov[iovs].iov_len = t;
+      ++iovs;
+    }
+
+    r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, -1);
+    if (r < 0) {
+      derr << ": failed to write remote file path=" << epath << ": "
+           << cpp_strerror(r) << dendl;
+      break;
+    }
+  }
+
+  if (r == 0) {
+    r = ceph_fsync(m_remote_mount, r_fd, 0);
+    if (r < 0) {
+      derr << ": failed to sync data for file path=" << epath << ": "
+           << cpp_strerror(r) << dendl;
+    }
+  }
+
+  free(ptr);
+
+close_remote_fd:
+  close_r = ceph_close(m_remote_mount, r_fd);
+  if (close_r < 0) {
+    derr << ": failed to close remote fd path=" << epath << ": " << cpp_strerror(close_r)
+         << dendl;
+    // keep an earlier error if there is one; fall through so that the
+    // local descriptor still gets closed
+    if (r >= 0) {
+      r = -EINVAL;
+    }
+  }
+
+close_local_fd:
+  close_r = ceph_close(m_local_mount, l_fd);
+  if (close_r < 0) {
+    derr << ": failed to close local fd path=" << epath << ": " << cpp_strerror(close_r)
+         << dendl;
+    if (r >= 0) {
+      r = -EINVAL;
+    }
+  }
+
+  return r;
+}
+
+// Synchronize a non-directory entry @epath to the remote filesystem.
+//  - need_data_sync: regular files are copied, symlinks are recreated;
+//    any other file type is skipped (returns 0 without attribute sync).
+//  - need_attr_sync: ownership, mode and atime/mtime are applied from
+//    @stx.
+// Returns 0 on success or the first negative error code.
+int PeerReplayer::remote_file_op(const std::string &dir_root, const std::string &epath,
+                                 const struct ceph_statx &stx, const FHandles &fh,
+                                 bool need_data_sync, bool need_attr_sync) {
+  dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", need_data_sync=" << need_data_sync
+           << ", need_attr_sync=" << need_attr_sync << dendl;
+
+  int r;
+  if (need_data_sync) {
+    if (S_ISREG(stx.stx_mode)) {
+      r = copy_to_remote(dir_root, epath, stx, fh);
+      if (r < 0) {
+        derr << ": failed to copy path=" << epath << ": " << cpp_strerror(r) << dendl;
+        return r;
+      }
+    } else if (S_ISLNK(stx.stx_mode)) {
+      // free the remote link before relinking
+      r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), 0);
+      if (r < 0 && r != -ENOENT) {
+        derr << ": failed to remove remote symlink=" << epath << dendl;
+        return r;
+      }
+      char *target = (char *)alloca(stx.stx_size+1);
+      r = ceph_readlinkat(m_local_mount, fh.c_fd, epath.c_str(), target, stx.stx_size);
+      if (r < 0) {
+        derr << ": failed to readlink local path=" << epath << ": " << cpp_strerror(r)
+             << dendl;
+        return r;
+      }
+
+      // terminate at the number of bytes actually read (never more than
+      // stx_size, the buffer length handed to readlinkat)
+      target[r] = '\0';
+      r = ceph_symlinkat(m_remote_mount, target, fh.r_fd_dir_root, epath.c_str());
+      // error codes are negative, so an existing link shows up as -EEXIST
+      if (r < 0 && r != -EEXIST) {
+        derr << ": failed to symlink remote path=" << epath << " to target=" << target
+             << ": " << cpp_strerror(r) << dendl;
+        return r;
+      }
+    } else {
+      dout(5) << ": skipping entry=" << epath << ": unsupported mode=" << stx.stx_mode
+              << dendl;
+      return 0;
+    }
+  }
+
+  if (need_attr_sync) {
+    r = ceph_chownat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_uid, stx.stx_gid,
+                     AT_SYMLINK_NOFOLLOW);
+    if (r < 0) {
+      derr << ": failed to chown remote directory=" << epath << ": " << cpp_strerror(r)
+           << dendl;
+      return r;
+    }
+
+    r = ceph_chmodat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_mode & ~S_IFMT,
+                     AT_SYMLINK_NOFOLLOW);
+    if (r < 0) {
+      derr << ": failed to chmod remote directory=" << epath << ": " << cpp_strerror(r)
+           << dendl;
+      return r;
+    }
+
+    struct timespec times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec},
+                               {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec}};
+    r = ceph_utimensat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), times, AT_SYMLINK_NOFOLLOW);
+    if (r < 0) {
+      derr << ": failed to change [am]time on remote directory=" << epath << ": "
+           << cpp_strerror(r) << dendl;
+      return r;
+    }
+  }
+
+  return 0;
+}
+
+// Recursively remove the remote directory @epath (relative to the
+// remote dir_root handle) without recursion: an explicit stack holds
+// the directories being drained and the files pending removal.
+// Returns 0 once everything is removed, or a negative error code (also
+// when should_backoff() requests an abort). Directory streams still on
+// the stack are closed before returning.
+int PeerReplayer::cleanup_remote_dir(const std::string &dir_root,
+ const std::string &epath, const FHandles &fh) {
+ dout(20) << ": dir_root=" << dir_root << ", epath=" << epath
+ << dendl;
+
+ struct ceph_statx tstx;
+ int r = ceph_statxat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), &tstx,
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+ if (r < 0) {
+ derr << ": failed to stat remote directory=" << epath << ": "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ ceph_dir_result *tdirp;
+ r = opendirat(m_remote_mount, fh.r_fd_dir_root, epath, AT_SYMLINK_NOFOLLOW,
+ &tdirp);
+ if (r < 0) {
+ derr << ": failed to open remote directory=" << epath << ": "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ // seed the stack with the top-level directory
+ std::stack<SyncEntry> rm_stack;
+ rm_stack.emplace(SyncEntry(epath, tdirp, tstx));
+ while (!rm_stack.empty()) {
+ if (should_backoff(dir_root, &r)) {
+ dout(0) << ": backing off r=" << r << dendl;
+ break;
+ }
+
+ dout(20) << ": " << rm_stack.size() << " entries in stack" << dendl;
+ std::string e_name;
+ auto &entry = rm_stack.top();
+ dout(20) << ": top of stack path=" << entry.epath << dendl;
+ if (entry.is_directory()) {
+ struct ceph_statx stx;
+ struct dirent de;
+ // fetch the next entry that is neither "." nor ".."; r == 0 means
+ // the directory stream is exhausted
+ while (true) {
+ r = ceph_readdirplus_r(m_remote_mount, entry.dirp, &de, &stx,
+ CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
+ if (r < 0) {
+ derr << ": failed to read remote directory=" << entry.epath << dendl;
+ break;
+ }
+ if (r == 0) {
+ break;
+ }
+
+ auto d_name = std::string(de.d_name);
+ if (d_name != "." && d_name != "..") {
+ e_name = d_name;
+ break;
+ }
+ }
+
+ if (r == 0) {
+ // nothing left to read: remove the directory itself
+ r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), AT_REMOVEDIR);
+ if (r < 0) {
+ derr << ": failed to remove remote directory=" << entry.epath << ": "
+ << cpp_strerror(r) << dendl;
+ break;
+ }
+
+ dout(10) << ": done for remote directory=" << entry.epath << dendl;
+ if (ceph_closedir(m_remote_mount, entry.dirp) < 0) {
+ derr << ": failed to close remote directory=" << entry.epath << dendl;
+ }
+ rm_stack.pop();
+ continue;
+ }
+ if (r < 0) {
+ break;
+ }
+
+ // push the child: directories carry an open stream, everything
+ // else is removed when it reaches the top of the stack
+ auto epath = entry_path(entry.epath, e_name);
+ if (S_ISDIR(stx.stx_mode)) {
+ ceph_dir_result *dirp;
+ r = opendirat(m_remote_mount, fh.r_fd_dir_root, epath, AT_SYMLINK_NOFOLLOW,
+ &dirp);
+ if (r < 0) {
+ derr << ": failed to open remote directory=" << epath << ": "
+ << cpp_strerror(r) << dendl;
+ break;
+ }
+ rm_stack.emplace(SyncEntry(epath, dirp, stx));
+ } else {
+ rm_stack.emplace(SyncEntry(epath, stx));
+ }
+ } else {
+ // non-directory entry: unlink it
+ r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), 0);
+ if (r < 0) {
+ derr << ": failed to remove remote directory=" << entry.epath << ": "
+ << cpp_strerror(r) << dendl;
+ break;
+ }
+ dout(10) << ": done for remote file=" << entry.epath << dendl;
+ rm_stack.pop();
+ }
+ }
+
+ // on early exit, close the directory streams still on the stack
+ while (!rm_stack.empty()) {
+ auto &entry = rm_stack.top();
+ if (entry.is_directory()) {
+ dout(20) << ": closing remote directory=" << entry.epath << dendl;
+ if (ceph_closedir(m_remote_mount, entry.dirp) < 0) {
+ derr << ": failed to close remote directory=" << entry.epath << dendl;
+ }
+ }
+
+ rm_stack.pop();
+ }
+
+ return r;
+}
+
+// Decide whether @epath needs its data and/or attributes synchronized
+// by comparing the current statx (@cstx) with the one found through the
+// "previous" handle (fh.p_mnt / fh.p_fd).
+//  - missing in previous, or a path component is not a directory:
+//    sync both data and attributes
+//  - file type changed: sync both
+//  - otherwise: data when size or mtime differ, attributes when ctime
+//    differs
+// Returns 0 with the two output flags set, or a negative error code
+// when the previous entry cannot be stat'ed.
+int PeerReplayer::should_sync_entry(const std::string &epath, const struct ceph_statx &cstx,
+                                    const FHandles &fh, bool *need_data_sync, bool *need_attr_sync) {
+  dout(10) << ": epath=" << epath << dendl;
+
+  *need_data_sync = false;
+  *need_attr_sync = false;
+
+  struct ceph_statx pstx;
+  int r = ceph_statxat(fh.p_mnt, fh.p_fd, epath.c_str(), &pstx,
+                       CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                       CEPH_STATX_SIZE | CEPH_STATX_CTIME | CEPH_STATX_MTIME,
+                       AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+  if (r < 0) {
+    const bool absent_or_retyped = (r == -ENOENT || r == -ENOTDIR);
+    if (!absent_or_retyped) {
+      derr << ": failed to stat prev entry= " << epath << ": " << cpp_strerror(r)
+           << dendl;
+      return r;
+    }
+
+    // inode does not exist in prev snapshot or file type has changed
+    // (file was S_IFREG earlier, S_IFDIR now).
+    dout(5) << ": entry=" << epath << ", r=" << r << dendl;
+    *need_data_sync = true;
+    *need_attr_sync = true;
+    return 0;
+  }
+
+  dout(10) << ": local cur statx: mode=" << cstx.stx_mode << ", uid=" << cstx.stx_uid
+           << ", gid=" << cstx.stx_gid << ", size=" << cstx.stx_size << ", ctime="
+           << cstx.stx_ctime << ", mtime=" << cstx.stx_mtime << dendl;
+  dout(10) << ": local prev statx: mode=" << pstx.stx_mode << ", uid=" << pstx.stx_uid
+           << ", gid=" << pstx.stx_gid << ", size=" << pstx.stx_size << ", ctime="
+           << pstx.stx_ctime << ", mtime=" << pstx.stx_mtime << dendl;
+
+  const bool type_changed = (cstx.stx_mode & S_IFMT) != (pstx.stx_mode & S_IFMT);
+  if (type_changed) {
+    dout(5) << ": entry=" << epath << " has mode mismatch" << dendl;
+    *need_data_sync = true;
+    *need_attr_sync = true;
+    return 0;
+  }
+
+  *need_data_sync = (cstx.stx_size != pstx.stx_size) || (cstx.stx_mtime != pstx.stx_mtime);
+  *need_attr_sync = (cstx.stx_ctime != pstx.stx_ctime);
+  return 0;
+}
+
+// Purge from the remote filesystem the entries of directory @epath that
+// exist in the "previous" view (fh.p_mnt / fh.p_fd) but are missing --
+// or have a different file type -- in the current local snapshot
+// (fh.c_fd).
+// Returns 0 when @epath is a symlink or not a directory in the previous
+// view (handled while traversing the parent), otherwise 0 on success or
+// a negative error code. -ENOENT while reading the previous view is
+// reported as -EINVAL. The directory stream is closed on every path out
+// of the scan loop.
+int PeerReplayer::propagate_deleted_entries(const std::string &dir_root,
+                                            const std::string &epath, const FHandles &fh) {
+  dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl;
+
+  ceph_dir_result *dirp;
+  int r = opendirat(fh.p_mnt, fh.p_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp);
+  if (r < 0) {
+    if (r == -ELOOP) {
+      dout(5) << ": epath=" << epath << " is a symbolic link -- mode sync"
+              << " done when traversing parent" << dendl;
+      return 0;
+    }
+    if (r == -ENOTDIR) {
+      dout(5) << ": epath=" << epath << " is not a directory -- mode sync"
+              << " done when traversing parent" << dendl;
+      return 0;
+    }
+    if (r == -ENOENT) {
+      dout(5) << ": epath=" << epath << " missing in previous-snap/remote dir-root"
+              << dendl;
+    }
+    return r;
+  }
+
+  // ceph_getdents() takes the buffer length in bytes, so hand it the
+  // size of the whole dirent array rather than the element count
+  const int dire_buflen = (int)(512 * sizeof(struct dirent));
+  struct dirent *dire = (struct dirent *)alloca(dire_buflen);
+  bool failed = false;  // set when an entry fails; r holds the error
+  while (!failed) {
+    if (should_backoff(dir_root, &r)) {
+      dout(0) << ": backing off r=" << r << dendl;
+      break;
+    }
+
+    int len = ceph_getdents(fh.p_mnt, dirp, (char *)dire, dire_buflen);
+    if (len < 0) {
+      derr << ": failed to read directory entries: " << cpp_strerror(len) << dendl;
+      r = len;
+      // flip errno to signal that we got an err (possible the
+      // snapshot getting deleted in midst).
+      if (r == -ENOENT) {
+        r = -EINVAL;
+      }
+      break;
+    }
+    if (len == 0) {
+      dout(10) << ": reached EOD" << dendl;
+      break;
+    }
+    int nr = len / sizeof(struct dirent);
+    for (int i = 0; i < nr; ++i) {
+      if (should_backoff(dir_root, &r)) {
+        dout(0) << ": backing off r=" << r << dendl;
+        break;
+      }
+      std::string d_name = std::string(dire[i].d_name);
+      if (d_name == "." || d_name == "..") {
+        continue;
+      }
+
+      struct ceph_statx pstx;
+      auto dpath = entry_path(epath, d_name);
+      r = ceph_statxat(fh.p_mnt, fh.p_fd, dpath.c_str(), &pstx,
+                       CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+      if (r < 0) {
+        derr << ": failed to stat (prev) directory=" << dpath << ": "
+             << cpp_strerror(r) << dendl;
+        // flip errno to signal that we got an err (possible the
+        // snapshot getting deleted in midst).
+        if (r == -ENOENT) {
+          r = -EINVAL;
+        }
+        failed = true;
+        break;
+      }
+
+      struct ceph_statx cstx;
+      r = ceph_statxat(m_local_mount, fh.c_fd, dpath.c_str(), &cstx,
+                       CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+      if (r < 0 && r != -ENOENT) {
+        derr << ": failed to stat local (cur) directory=" << dpath << ": "
+             << cpp_strerror(r) << dendl;
+        failed = true;
+        break;
+      }
+
+      bool purge_remote = true;
+      if (r == 0) {
+        // directory entry present in both snapshots -- check inode
+        // type
+        if ((pstx.stx_mode & S_IFMT) == (cstx.stx_mode & S_IFMT)) {
+          dout(5) << ": mode matches for entry=" << d_name << dendl;
+          purge_remote = false;
+        } else {
+          dout(5) << ": mode mismatch for entry=" << d_name << dendl;
+        }
+      } else {
+        dout(5) << ": entry=" << d_name << " missing in current snapshot" << dendl;
+      }
+
+      if (purge_remote) {
+        dout(5) << ": purging remote entry=" << dpath << dendl;
+        if (S_ISDIR(pstx.stx_mode)) {
+          r = cleanup_remote_dir(dir_root, dpath, fh);
+        } else {
+          r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, dpath.c_str(), 0);
+        }
+
+        // an entry already gone on the remote side is fine
+        if (r < 0 && r != -ENOENT) {
+          derr << ": failed to cleanup remote entry=" << d_name << ": "
+               << cpp_strerror(r) << dendl;
+          failed = true;
+          break;
+        }
+      }
+    }
+  }
+
+  ceph_closedir(fh.p_mnt, dirp);
+  return r;
+}
+
+// Open directory @dir_path on @mnt and return its file descriptor.
+// When @snap_id is given, the path must be a snapshot whose id matches;
+// otherwise the descriptor is closed and -EINVAL is returned (the
+// snapshot may have been recreated). Returns a non-negative fd on
+// success or a negative error code.
+int PeerReplayer::open_dir(MountRef mnt, const std::string &dir_path,
+                           boost::optional<uint64_t> snap_id) {
+  dout(20) << ": dir_path=" << dir_path << dendl;
+  if (snap_id) {
+    dout(20) << ": expected snapshot id=" << *snap_id << dendl;
+  }
+
+  int fd = ceph_open(mnt, dir_path.c_str(), O_DIRECTORY | O_RDONLY, 0);
+  if (fd < 0) {
+    derr << ": cannot open dir_path=" << dir_path << ": " << cpp_strerror(fd)
+         << dendl;
+    return fd;
+  }
+
+  if (!snap_id) {
+    return fd;
+  }
+
+  snap_info info;
+  int r = ceph_get_snap_info(mnt, dir_path.c_str(), &info);
+  if (r < 0) {
+    derr << ": failed to fetch snap_info for path=" << dir_path
+         << ": " << cpp_strerror(r) << dendl;
+    ceph_close(mnt, fd);
+    return r;
+  }
+
+  // only the id is needed here: release any metadata buffer that came
+  // along with the snap info so that it is not leaked
+  const uint64_t found_id = info.id;
+  if (info.nr_snap_metadata) {
+    ceph_free_snap_info_buffer(&info);
+  }
+
+  if (found_id != *snap_id) {
+    dout(5) << ": got mismatching snapshot id for path=" << dir_path << " (" << found_id
+            << " vs " << *snap_id << ") -- possible recreate" << dendl;
+    ceph_close(mnt, fd);
+    return -EINVAL;
+  }
+
+  return fd;
+}
+
+// Open the handles needed to synchronize snapshot @current of
+// @dir_root and store them in @fh:
+//  - c_fd: the current local snapshot (snapshot id verified),
+//  - p_fd/p_mnt: the previous local snapshot when @prev is given and
+//    still exists, otherwise the remote dir_root,
+//  - r_fd_dir_root: the fd recorded for the registered remote dir_root.
+// Returns 0 on success or a negative error code; on failure no handle
+// opened here is left open.
+int PeerReplayer::pre_sync_check_and_open_handles(
+ const std::string &dir_root,
+ const Snapshot &current, boost::optional<Snapshot> prev,
+ FHandles *fh) {
+ dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
+ if (prev) {
+ dout(20) << ": prev=" << prev << dendl;
+ }
+
+ auto cur_snap_path = snapshot_path(m_cct, dir_root, current.first);
+ auto fd = open_dir(m_local_mount, cur_snap_path, current.second);
+ if (fd < 0) {
+ return fd;
+ }
+
+ // current snapshot file descriptor
+ fh->c_fd = fd;
+
+ MountRef mnt;
+ if (prev) {
+ mnt = m_local_mount;
+ auto prev_snap_path = snapshot_path(m_cct, dir_root, (*prev).first);
+ fd = open_dir(mnt, prev_snap_path, (*prev).second);
+ } else {
+ mnt = m_remote_mount;
+ fd = open_dir(mnt, dir_root, boost::none);
+ }
+
+ if (fd < 0) {
+ // only a vanished previous snapshot (-ENOENT) falls back to the
+ // remote dir_root; anything else is fatal for this sync
+ if (!prev || fd != -ENOENT) {
+ ceph_close(m_local_mount, fh->c_fd);
+ return fd;
+ }
+
+ // ENOENT of previous snap
+ dout(5) << ": previous snapshot=" << *prev << " missing" << dendl;
+ mnt = m_remote_mount;
+ fd = open_dir(mnt, dir_root, boost::none);
+ if (fd < 0) {
+ ceph_close(m_local_mount, fh->c_fd);
+ return fd;
+ }
+ }
+
+ // "previous" snapshot or dir_root file descriptor
+ fh->p_fd = fd;
+ fh->p_mnt = mnt;
+
+ // the remote dir_root fd was opened when the directory was registered
+ {
+ std::scoped_lock locker(m_lock);
+ auto it = m_registered.find(dir_root);
+ ceph_assert(it != m_registered.end());
+ fh->r_fd_dir_root = it->second.fd;
+ }
+
+ dout(5) << ": using " << ((fh->p_mnt == m_local_mount) ? "local (previous) snapshot" : "remote dir_root")
+ << " for incremental transfer" << dendl;
+ return 0;
+}
+
+// Apply the mode of @path as found on the local filesystem to the same
+// path on the remote filesystem. Returns 0 on success or the negative
+// error code of the failing stat/chmod call.
+int PeerReplayer::sync_perms(const std::string& path) {
+  struct ceph_statx local_stx;
+
+  // only the mode is requested from the local side
+  if (int r = ceph_statx(m_local_mount, path.c_str(), &local_stx, CEPH_STATX_MODE,
+                         AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+      r < 0) {
+    derr << ": failed to fetch stat for local path: "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  if (int r = ceph_chmod(m_remote_mount, path.c_str(), local_stx.stx_mode);
+      r < 0) {
+    derr << ": failed to set mode for remote path: "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  return 0;
+}
+
+// Close the descriptors of @fh that were opened for a single snapshot
+// synchronization: the current snapshot (local mount) and the
+// "previous" snapshot/dir_root (on whichever mount it was opened).
+void PeerReplayer::post_sync_close_handles(const FHandles &fh) {
+  dout(20) << dendl;
+
+  // FHandles::r_fd_dir_root is deliberately left open here: it holds
+  // the exclusive lock on the remote dir_root and is released by
+  // unregister_directory().
+  const int current_snap_fd = fh.c_fd;
+  ceph_close(m_local_mount, current_snap_fd);
+
+  const int previous_fd = fh.p_fd;
+  ceph_close(fh.p_mnt, previous_fd);
+}
+
+// Transfer snapshot @current of @dir_root to the remote filesystem.
+//
+// @prev, when set, names the snapshot to compare against for an
+// incremental transfer; the handles actually used are decided by
+// pre_sync_check_and_open_handles().
+//
+// The local snapshot is walked depth-first with an explicit stack of
+// SyncEntry items: a directory stays on the stack while its entries
+// are read one at a time, a non-directory entry is synchronized and
+// popped right away. The walk stops early when should_backoff()
+// reports blocklisting, shutdown or cancellation.
+//
+// Returns 0 on success or a negative error code.
+int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &current,
+                                 boost::optional<Snapshot> prev) {
+  dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
+  if (prev) {
+    dout(20) << ": incremental sync check from prev=" << prev << dendl;
+  }
+
+  FHandles fh;
+  int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
+  if (r < 0) {
+    dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  // release the per-sync handles on every exit path below
+  BOOST_SCOPE_EXIT_ALL( (this)(&fh) ) {
+    post_sync_close_handles(fh);
+  };
+
+  // record that we are going to "dirty" the data under this
+  // directory root
+  auto snap_id_str{stringify(current.second)};
+  r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root, "ceph.mirror.dirty_snap_id",
+                     snap_id_str.c_str(), snap_id_str.size(), 0);
+  if (r < 0) {
+    derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
+         << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  struct ceph_statx tstx;
+  r = ceph_fstatx(m_local_mount, fh.c_fd, &tstx,
+                  CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                  CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                  AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
+  if (r < 0) {
+    derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  ceph_dir_result *tdirp;
+  r = ceph_fdopendir(m_local_mount, fh.c_fd, &tdirp);
+  if (r < 0) {
+    derr << ": failed to open local snap=" << current.first << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  std::stack<SyncEntry> sync_stack;
+  // the walk starts at the snapshot root
+  sync_stack.emplace(".", tdirp, tstx);
+  while (!sync_stack.empty()) {
+    if (should_backoff(dir_root, &r)) {
+      dout(0) << ": backing off r=" << r << dendl;
+      break;
+    }
+
+    dout(20) << ": " << sync_stack.size() << " entries in stack" << dendl;
+    std::string e_name;
+    auto &entry = sync_stack.top();
+    dout(20) << ": top of stack path=" << entry.epath << dendl;
+    if (entry.is_directory()) {
+      // entry is a directory -- propagate deletes for missing entries
+      // (and changed inode types) to the remote filesystem. this is
+      // done once per directory: the flag tested here is set right
+      // after the first pass.
+      if (!entry.needs_remote_sync()) {
+        r = propagate_deleted_entries(dir_root, entry.epath, fh);
+        if (r < 0 && r != -ENOENT) {
+          derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl;
+          break;
+        }
+        entry.set_remote_synced();
+      }
+
+      // fetch the next entry of this directory, skipping "." and "..".
+      // r == 0 signals end of directory, r < 0 an error.
+      struct ceph_statx stx;
+      struct dirent de;
+      while (true) {
+        r = ceph_readdirplus_r(m_local_mount, entry.dirp, &de, &stx,
+                               CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                               CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                               AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
+        if (r < 0) {
+          derr << ": failed to read local directory=" << entry.epath << ": "
+               << cpp_strerror(r) << dendl;
+          break;
+        }
+        if (r == 0) {
+          break;
+        }
+
+        auto d_name = std::string(de.d_name);
+        if (d_name != "." && d_name != "..") {
+          e_name = d_name;
+          break;
+        }
+      }
+
+      if (r == 0) {
+        // directory exhausted -- close it and resume with its parent
+        dout(10) << ": done for directory=" << entry.epath << dendl;
+        if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
+          derr << ": failed to close local directory=" << entry.epath << dendl;
+        }
+        sync_stack.pop();
+        continue;
+      }
+      if (r < 0) {
+        break;
+      }
+
+      auto epath = entry_path(entry.epath, e_name);
+      if (S_ISDIR(stx.stx_mode)) {
+        r = remote_mkdir(epath, stx, fh);
+        if (r < 0) {
+          break;
+        }
+        ceph_dir_result *dirp;
+        r = opendirat(m_local_mount, fh.c_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp);
+        if (r < 0) {
+          derr << ": failed to open local directory=" << epath << ": "
+               << cpp_strerror(r) << dendl;
+          break;
+        }
+        // descend: the sub-directory is processed before the rest of
+        // the current directory
+        sync_stack.emplace(epath, dirp, stx);
+      } else {
+        sync_stack.emplace(epath, stx);
+      }
+    } else {
+      bool need_data_sync = true;
+      bool need_attr_sync = true;
+      r = should_sync_entry(entry.epath, entry.stx, fh,
+                            &need_data_sync, &need_attr_sync);
+      if (r < 0) {
+        break;
+      }
+
+      dout(5) << ": entry=" << entry.epath << ", data_sync=" << need_data_sync
+              << ", attr_sync=" << need_attr_sync << dendl;
+      if (need_data_sync || need_attr_sync) {
+        r = remote_file_op(dir_root, entry.epath, entry.stx, fh, need_data_sync,
+                           need_attr_sync);
+        if (r < 0) {
+          break;
+        }
+      }
+      dout(10) << ": done for epath=" << entry.epath << dendl;
+      sync_stack.pop();
+    }
+  }
+
+  // after an early exit directories may still be open -- close them
+  while (!sync_stack.empty()) {
+    auto &entry = sync_stack.top();
+    if (entry.is_directory()) {
+      dout(20) << ": closing local directory=" << entry.epath << dendl;
+      if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
+        derr << ": failed to close local directory=" << entry.epath << dendl;
+      }
+    }
+
+    sync_stack.pop();
+  }
+
+  return r;
+}
+
+// Synchronize snapshot @current of @dir_root to the remote filesystem
+// and, on success, create the matching snapshot on the remote side.
+//
+// The "ceph.mirror.dirty_snap_id" xattr on the remote dir_root records
+// the snapshot id written by do_synchronize() before it starts
+// transferring data. @prev is used for the incremental comparison
+// only when that id equals the id of @prev or of @current; in every
+// other case (including a missing xattr) the sync runs without a
+// previous snapshot.
+//
+// Returns 0 on success or a negative error code.
+int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &current,
+                              boost::optional<Snapshot> prev) {
+  dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
+  if (prev) {
+    dout(20) << ": prev=" << prev << dendl;
+  }
+
+  constexpr const char *dirty_snap_id_xattr = "ceph.mirror.dirty_snap_id";
+
+  // first call only probes the length of the value
+  int r = ceph_getxattr(m_remote_mount, dir_root.c_str(), dirty_snap_id_xattr, nullptr, 0);
+  if (r < 0 && r != -ENODATA) {
+    derr << ": failed to fetch \"ceph.mirror.dirty_snap_id\" length from dir_root=" << dir_root
+         << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  // no xattr, can't determine which snap the data belongs to!
+  if (r < 0) {
+    dout(5) << ": missing \"ceph.mirror.dirty_snap_id\" xattr on remote -- using"
+            << " incremental sync with remote scan" << dendl;
+    r = do_synchronize(dir_root, current, boost::none);
+  } else {
+    // heap-backed buffer: the length comes from the remote filesystem
+    // and should not size a stack allocation.
+    std::string val(static_cast<size_t>(r), '\0');
+    r = ceph_getxattr(m_remote_mount, dir_root.c_str(), dirty_snap_id_xattr,
+                      val.data(), val.size());
+    if (r < 0) {
+      derr << ": failed to fetch \"dirty_snap_id\" for dir_root: " << dir_root
+           << ": " << cpp_strerror(r) << dendl;
+      return r;
+    }
+
+    // the value may have shrunk between the two calls -- only parse
+    // the bytes that were actually returned.
+    if (static_cast<size_t>(r) < val.size()) {
+      val.resize(static_cast<size_t>(r));
+    }
+    // snapshot ids are unsigned 64-bit values
+    uint64_t dirty_snap_id = strtoull(val.c_str(), nullptr, 10);
+
+    dout(20) << ": dirty_snap_id: " << dirty_snap_id << " vs (" << current.second
+             << "," << (prev ? stringify((*prev).second) : "~") << ")" << dendl;
+    if (prev && (dirty_snap_id == (*prev).second || dirty_snap_id == current.second)) {
+      dout(5) << ": match -- using incremental sync with local scan" << dendl;
+      r = do_synchronize(dir_root, current, prev);
+    } else {
+      dout(5) << ": mismatch -- using incremental sync with remote scan" << dendl;
+      r = do_synchronize(dir_root, current, boost::none);
+    }
+  }
+
+  // snap sync failed -- bail out!
+  if (r < 0) {
+    return r;
+  }
+
+  // tag the remote snapshot with the id of the local snapshot it was
+  // created from
+  auto cur_snap_id_str{stringify(current.second)};
+  snap_metadata snap_meta[] = {{PRIMARY_SNAP_ID_KEY.c_str(), cur_snap_id_str.c_str()}};
+  r = ceph_mksnap(m_remote_mount, dir_root.c_str(), current.first.c_str(), 0755,
+                  snap_meta, sizeof(snap_meta)/sizeof(snap_metadata));
+  if (r < 0) {
+    derr << ": failed to snap remote directory dir_root=" << dir_root
+         << ": " << cpp_strerror(r) << dendl;
+  }
+
+  return r;
+}
+
+// Bring the snapshots of @dir_root on the remote filesystem in line
+// with the local ones:
+//   1. build the local and remote snapshot maps (snap id -> name; the
+//      remote map is looked up in the local map by its key),
+//   2. propagate snapshot deletes and renames to the remote,
+//   3. synchronize local snapshots with an id greater than the highest
+//      id found in the remote map, oldest first, stopping after
+//      "cephfs_mirror_max_snapshot_sync_per_cycle" snapshots.
+//
+// Returns 0 on success (including "nothing to do") or a negative error
+// code.
+int PeerReplayer::do_sync_snaps(const std::string &dir_root) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
+
+  std::map<uint64_t, std::string> local_snap_map;
+  std::map<uint64_t, std::string> remote_snap_map;
+
+  int r = build_snap_map(dir_root, &local_snap_map);
+  if (r < 0) {
+    derr << ": failed to build local snap map" << dendl;
+    return r;
+  }
+
+  r = build_snap_map(dir_root, &remote_snap_map, true);
+  if (r < 0) {
+    derr << ": failed to build remote snap map" << dendl;
+    return r;
+  }
+
+  // infer deleted and renamed snapshots from local and remote
+  // snap maps
+  std::set<std::string> snaps_deleted;
+  std::set<std::pair<std::string,std::string>> snaps_renamed;
+  for (const auto &[primary_snap_id, snap_name] : remote_snap_map) {
+    auto it = local_snap_map.find(primary_snap_id);
+    if (it == local_snap_map.end()) {
+      // gone locally
+      snaps_deleted.emplace(snap_name);
+    } else if (it->second != snap_name) {
+      // same id, different name: (remote name, local name)
+      snaps_renamed.emplace(snap_name, it->second);
+    }
+  }
+
+  r = propagate_snap_deletes(dir_root, snaps_deleted);
+  if (r < 0) {
+    derr << ": failed to propagate deleted snapshots" << dendl;
+    return r;
+  }
+
+  r = propagate_snap_renames(dir_root, snaps_renamed);
+  if (r < 0) {
+    derr << ": failed to propagate renamed snapshots" << dendl;
+    return r;
+  }
+
+  // start mirroring snapshots from the last snap-id synchronized
+  uint64_t last_snap_id = 0;
+  std::string last_snap_name;
+  if (!remote_snap_map.empty()) {
+    auto last = remote_snap_map.rbegin();
+    last_snap_id = last->first;
+    last_snap_name = last->second;
+    set_last_synced_snap(dir_root, last_snap_id, last_snap_name);
+  }
+
+  dout(5) << ": last snap-id transferred=" << last_snap_id << dendl;
+  auto it = local_snap_map.upper_bound(last_snap_id);
+  if (it == local_snap_map.end()) {
+    dout(20) << ": nothing to synchronize" << dendl;
+    return 0;
+  }
+
+  // NOTE(review): a configured value of 0 would wrap around in the
+  // pre-decrement below and effectively lift the cap -- confirm the
+  // option enforces a minimum of 1.
+  auto snaps_per_cycle = g_ceph_context->_conf.get_val<uint64_t>(
+    "cephfs_mirror_max_snapshot_sync_per_cycle");
+
+  dout(10) << ": synchronizing from snap-id=" << it->first << dendl;
+  for (; it != local_snap_map.end(); ++it) {
+    set_current_syncing_snap(dir_root, it->first, it->second);
+    auto start = clock::now();
+    // the snapshot transferred last (if any) serves as the base for
+    // the incremental transfer
+    boost::optional<Snapshot> prev = boost::none;
+    if (last_snap_id != 0) {
+      prev = std::make_pair(last_snap_name, last_snap_id);
+    }
+    r = synchronize(dir_root, std::make_pair(it->second, it->first), prev);
+    if (r < 0) {
+      derr << ": failed to synchronize dir_root=" << dir_root
+           << ", snapshot=" << it->second << dendl;
+      clear_current_syncing_snap(dir_root);
+      return r;
+    }
+    std::chrono::duration<double> duration = clock::now() - start;
+    set_last_synced_stat(dir_root, it->first, it->second, duration.count());
+    if (--snaps_per_cycle == 0) {
+      break;
+    }
+
+    last_snap_name = it->second;
+    last_snap_id = it->first;
+  }
+
+  return 0;
+}
+
+// Run do_sync_snaps() for @dir_root with @locker released, then
+// update the failure accounting of the directory with the lock held
+// again. @locker is expected to be locked on entry and is locked on
+// return.
+void PeerReplayer::sync_snaps(const std::string &dir_root,
+                              std::unique_lock<ceph::mutex> &locker) {
+  dout(20) << ": dir_root=" << dir_root << dendl;
+
+  locker.unlock();
+  const int result = do_sync_snaps(dir_root);
+  const bool sync_failed = (result < 0);
+  if (sync_failed) {
+    derr << ": failed to sync snapshots for dir_root=" << dir_root << dendl;
+  }
+  locker.lock();
+
+  if (!sync_failed) {
+    _reset_failed_count(dir_root);
+  } else {
+    _inc_failed_count(dir_root);
+  }
+}
+
+// Main loop of a snapshot replayer thread (called from
+// SnapshotReplayerThread::entry()). Roughly once a second it checks
+// for shutdown and blocklisting and, when at least
+// "cephfs_mirror_directory_scan_interval" has elapsed since the last
+// scan and directories are configured, picks a directory, registers
+// it, syncs its mode and snapshots and unregisters it again.
+void PeerReplayer::run(SnapshotReplayerThread *replayer) {
+ dout(10) << ": snapshot replayer=" << replayer << dendl;
+
+ time last_directory_scan = clock::zero();
+ auto scan_interval = g_ceph_context->_conf.get_val<uint64_t>(
+ "cephfs_mirror_directory_scan_interval");
+
+ std::unique_lock locker(m_lock);
+ while (true) {
+ // do not check if client is blocklisted under lock
+ // (wakes up after 1s at the latest, earlier once stopping)
+ m_cond.wait_for(locker, 1s, [this]{return is_stopping();});
+ if (is_stopping()) {
+ dout(5) << ": exiting" << dendl;
+ break;
+ }
+
+ locker.unlock();
+
+ // note: this break leaves the loop with the lock released
+ if (m_fs_mirror->is_blocklisted()) {
+ dout(5) << ": exiting as client is blocklisted" << dendl;
+ break;
+ }
+
+ locker.lock();
+
+ auto now = clock::now();
+ std::chrono::duration<double> timo = now - last_directory_scan;
+ if (timo.count() >= scan_interval && m_directories.size()) {
+ dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl;
+ auto dir_root = pick_directory();
+ if (dir_root) {
+ dout(5) << ": picked dir_root=" << *dir_root << dendl;
+ int r = register_directory(*dir_root, replayer);
+ if (r == 0) {
+ r = sync_perms(*dir_root);
+ if (r < 0) {
+ _inc_failed_count(*dir_root);
+ } else {
+ // sync_snaps() releases the lock while snapshots are being
+ // transferred and holds it again when it returns
+ sync_snaps(*dir_root, locker);
+ }
+ unregister_directory(*dir_root);
+ }
+ }
+
+ // the scan timestamp advances even when no directory was picked
+ last_directory_scan = now;
+ }
+ }
+}
+
+// Dump the per-directory snapshot synchronization statistics into @f:
+// a "stats" object with one section per dir_root holding its state
+// ("failed", "idle" or "syncing"), the snapshot being synchronized
+// (while syncing), the last synchronized snapshot (if any) and the
+// synced/deleted/renamed counters.
+void PeerReplayer::peer_status(Formatter *f) {
+  std::scoped_lock locker(m_lock);
+
+  f->open_object_section("stats");
+  for (auto &[dir_root, sync_stat] : m_snap_sync_stats) {
+    const auto &syncing = sync_stat.current_syncing_snap;
+    const auto &last_synced = sync_stat.last_synced_snap;
+
+    f->open_object_section(dir_root);
+
+    // "failed" takes precedence over an in-progress sync
+    if (sync_stat.failed) {
+      f->dump_string("state", "failed");
+    } else if (syncing) {
+      f->dump_string("state", "syncing");
+      f->open_object_section("current_sycning_snap");
+      f->dump_unsigned("id", syncing->first);
+      f->dump_string("name", syncing->second);
+      f->close_section();
+    } else {
+      f->dump_string("state", "idle");
+    }
+
+    if (last_synced) {
+      f->open_object_section("last_synced_snap");
+      f->dump_unsigned("id", last_synced->first);
+      f->dump_string("name", last_synced->second);
+      // duration and time stamp are only known for snapshots that
+      // were synchronized with a recorded duration
+      if (sync_stat.last_sync_duration) {
+        f->dump_float("sync_duration", *sync_stat.last_sync_duration);
+        f->dump_stream("sync_time_stamp") << sync_stat.last_synced;
+      }
+      f->close_section();
+    }
+
+    f->dump_unsigned("snaps_synced", sync_stat.synced_snap_count);
+    f->dump_unsigned("snaps_deleted", sync_stat.deleted_snap_count);
+    f->dump_unsigned("snaps_renamed", sync_stat.renamed_snap_count);
+    f->close_section(); // dir_root
+  }
+  f->close_section(); // stats
+}
+
+// Reopen the log files of the remote cluster's context; does nothing
+// when no remote cluster handle is set.
+void PeerReplayer::reopen_logs() {
+  std::scoped_lock locker(m_lock);
+
+  if (!m_remote_cluster) {
+    return;
+  }
+
+  auto *remote_cct = reinterpret_cast<CephContext *>(m_remote_cluster->cct());
+  remote_cct->reopen_logs();
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h
new file mode 100644
index 000000000..0511d154a
--- /dev/null
+++ b/src/tools/cephfs_mirror/PeerReplayer.h
@@ -0,0 +1,320 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_PEER_REPLAYER_H
+#define CEPHFS_MIRROR_PEER_REPLAYER_H
+
+#include "common/Formatter.h"
+#include "common/Thread.h"
+#include "mds/FSMap.h"
+#include "ServiceDaemon.h"
+#include "Types.h"
+
+namespace cephfs {
+namespace mirror {
+
+class FSMirror;
+class PeerReplayerAdminSocketHook;
+
+// Replays snapshots of a set of directories of the local filesystem
+// to a single peer (remote filesystem). Replayer threads run
+// PeerReplayer::run(); per-directory statistics are kept in
+// m_snap_sync_stats and exposed through peer_status().
+class PeerReplayer {
+public:
+  PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
+               RadosRef local_cluster, const Filesystem &filesystem,
+               const Peer &peer, const std::set<std::string, std::less<>> &directories,
+               MountRef mount, ServiceDaemon *service_daemon);
+  ~PeerReplayer();
+
+  // initialize replayer for a peer
+  int init();
+
+  // shutdown replayer for a peer
+  void shutdown();
+
+  // add a directory to mirror queue
+  void add_directory(std::string_view dir_root);
+
+  // remove a directory from queue
+  void remove_directory(std::string_view dir_root);
+
+  // admin socket helpers
+  void peer_status(Formatter *f);
+
+  // reopen logs
+  void reopen_logs();
+
+private:
+  // metadata key set on remote snapshots; its value is the id of the
+  // local snapshot the remote snapshot was created from
+  inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id";
+
+  // attribute keys reported to the service daemon
+  inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count";
+  inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count";
+
+  // (snapshot name, snapshot id) -- note that SnapSyncStat stores its
+  // pairs the other way around, as (id, name)
+  using Snapshot = std::pair<std::string, uint64_t>;
+
+  // file descriptor "triplet" for synchronizing a snapshot
+  // w/ an added MountRef for accessing "previous" snapshot.
+  // descriptors start out as -1 (not open).
+  struct FHandles {
+    // open file descriptor on the snap directory for snapshot
+    // currently being synchronized. Always use this fd with
+    // @m_local_mount.
+    int c_fd = -1;
+
+    // open file descriptor on the "previous" snapshot or on
+    // dir_root on remote filesystem (based on if the snapshot
+    // can be used for incremental transfer). Always use this
+    // fd with p_mnt which either points to @m_local_mount (
+    // for local incremental comparison) or @m_remote_mount (
+    // for remote incremental comparison).
+    int p_fd = -1;
+    MountRef p_mnt;
+
+    // open file descriptor on dir_root on remote filesystem.
+    // Always use this fd with @m_remote_mount.
+    int r_fd_dir_root = -1;
+  };
+
+  // callers in this file invoke this with m_lock held
+  bool is_stopping() {
+    return m_stopping;
+  }
+
+  struct Replayer;
+  // thread whose entry point runs PeerReplayer::run()
+  class SnapshotReplayerThread : public Thread {
+  public:
+    SnapshotReplayerThread(PeerReplayer *peer_replayer)
+      : m_peer_replayer(peer_replayer) {
+    }
+
+    void *entry() override {
+      m_peer_replayer->run(this);
+      return nullptr;
+    }
+
+  private:
+    PeerReplayer *m_peer_replayer;
+  };
+
+  // bookkeeping for a directory a replayer thread is working on
+  struct DirRegistry {
+    // descriptor on the remote dir_root (handed out to syncs as
+    // FHandles::r_fd_dir_root); -1 until set
+    int fd = -1;
+    // set to make should_backoff() report -ECANCELED for the directory
+    bool canceled = false;
+    SnapshotReplayerThread *replayer = nullptr;
+  };
+
+  // an entry of the directory walk done by do_synchronize()
+  struct SyncEntry {
+    std::string epath;
+    ceph_dir_result *dirp = nullptr; // valid for directories
+    struct ceph_statx stx;
+    // set by incremental sync _after_ ensuring missing entries
+    // in the currently synced snapshot have been propagated to
+    // the remote filesystem.
+    bool remote_synced = false;
+
+    // non-directory entry: no directory handle
+    SyncEntry(std::string_view path,
+              const struct ceph_statx &stx)
+      : epath(path),
+        stx(stx) {
+    }
+    // directory entry with its open directory handle
+    SyncEntry(std::string_view path,
+              ceph_dir_result *dirp,
+              const struct ceph_statx &stx)
+      : epath(path),
+        dirp(dirp),
+        stx(stx) {
+    }
+
+    bool is_directory() const {
+      return S_ISDIR(stx.stx_mode);
+    }
+
+    // NOTE: despite the name this returns true once the entry has
+    // been marked via set_remote_synced(); do_synchronize() tests
+    // the negation to decide whether deletes still need propagating.
+    bool needs_remote_sync() const {
+      return remote_synced;
+    }
+    void set_remote_synced() {
+      remote_synced = true;
+    }
+  };
+
+  using clock = ceph::coarse_mono_clock;
+  using time = ceph::coarse_mono_time;
+
+  // stats sent to service daemon
+  struct ServiceDaemonStats {
+    uint64_t failed_dir_count = 0;
+    uint64_t recovered_dir_count = 0;
+  };
+
+  // per-directory synchronization statistics
+  struct SnapSyncStat {
+    uint64_t nr_failures = 0; // number of consecutive failures
+    boost::optional<time> last_failed; // last failed timestamp
+    bool failed = false; // hit upper cap for consecutive failures
+    // (snap id, snap name)
+    boost::optional<std::pair<uint64_t, std::string>> last_synced_snap;
+    boost::optional<std::pair<uint64_t, std::string>> current_syncing_snap;
+    uint64_t synced_snap_count = 0;
+    uint64_t deleted_snap_count = 0;
+    uint64_t renamed_snap_count = 0;
+    time last_synced = clock::zero();
+    boost::optional<double> last_sync_duration;
+  };
+
+  // helpers prefixed with an underscore do not take m_lock themselves.
+
+  // record a failed sync for @dir_root. once the number of consecutive
+  // failures reaches the configured cap the directory is flagged as
+  // failed and the failure count sent to the service daemon is bumped.
+  void _inc_failed_count(const std::string &dir_root) {
+    auto max_failures = g_ceph_context->_conf.get_val<uint64_t>(
+      "cephfs_mirror_max_consecutive_failures_per_directory");
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    sync_stat.last_failed = clock::now();
+    if (++sync_stat.nr_failures >= max_failures && !sync_stat.failed) {
+      sync_stat.failed = true;
+      ++m_service_daemon_stats.failed_dir_count;
+      m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+                                                     SERVICE_DAEMON_FAILED_DIR_COUNT_KEY,
+                                                     m_service_daemon_stats.failed_dir_count);
+    }
+  }
+  // clear the failure state of @dir_root; a directory that was flagged
+  // as failed is counted as recovered.
+  void _reset_failed_count(const std::string &dir_root) {
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    if (sync_stat.failed) {
+      ++m_service_daemon_stats.recovered_dir_count;
+      m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+                                                     SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY,
+                                                     m_service_daemon_stats.recovered_dir_count);
+    }
+    sync_stat.nr_failures = 0;
+    sync_stat.failed = false;
+    sync_stat.last_failed = boost::none;
+  }
+
+  // record the last synchronized snapshot; this also clears the
+  // "currently syncing" snapshot.
+  void _set_last_synced_snap(const std::string &dir_root, uint64_t snap_id,
+                             const std::string &snap_name) {
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    sync_stat.last_synced_snap = std::make_pair(snap_id, snap_name);
+    sync_stat.current_syncing_snap = boost::none;
+  }
+  void set_last_synced_snap(const std::string &dir_root, uint64_t snap_id,
+                            const std::string &snap_name) {
+    std::scoped_lock locker(m_lock);
+    _set_last_synced_snap(dir_root, snap_id, snap_name);
+  }
+  void set_current_syncing_snap(const std::string &dir_root, uint64_t snap_id,
+                                const std::string &snap_name) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    sync_stat.current_syncing_snap = std::make_pair(snap_id, snap_name);
+  }
+  void clear_current_syncing_snap(const std::string &dir_root) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    sync_stat.current_syncing_snap = boost::none;
+  }
+  void inc_deleted_snap(const std::string &dir_root) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    ++sync_stat.deleted_snap_count;
+  }
+  void inc_renamed_snap(const std::string &dir_root) {
+    std::scoped_lock locker(m_lock);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    ++sync_stat.renamed_snap_count;
+  }
+  // record a completed snapshot sync together with its duration
+  // (seconds, as passed by the caller) and completion time.
+  void set_last_synced_stat(const std::string &dir_root, uint64_t snap_id,
+                            const std::string &snap_name, double duration) {
+    std::scoped_lock locker(m_lock);
+    _set_last_synced_snap(dir_root, snap_id, snap_name);
+    auto &sync_stat = m_snap_sync_stats.at(dir_root);
+    sync_stat.last_synced = clock::now();
+    sync_stat.last_sync_duration = duration;
+    ++sync_stat.synced_snap_count;
+  }
+
+  // check whether an ongoing sync of @dir_root should stop. returns
+  // true with @retval set to -EBLOCKLISTED (client blocklisted),
+  // -EINPROGRESS (shutting down) or -ECANCELED (directory canceled);
+  // returns false with @retval set to 0 otherwise.
+  bool should_backoff(const std::string &dir_root, int *retval) {
+    if (m_fs_mirror->is_blocklisted()) {
+      *retval = -EBLOCKLISTED;
+      return true;
+    }
+
+    std::scoped_lock locker(m_lock);
+    if (is_stopping()) {
+      // ceph defines EBLOCKLISTED to ESHUTDOWN (108). so use
+      // EINPROGRESS to identify shutdown.
+      *retval = -EINPROGRESS;
+      return true;
+    }
+    auto &dr = m_registered.at(dir_root);
+    if (dr.canceled) {
+      *retval = -ECANCELED;
+      return true;
+    }
+
+    *retval = 0;
+    return false;
+  }
+
+  using SnapshotReplayers = std::vector<std::unique_ptr<SnapshotReplayerThread>>;
+
+  CephContext *m_cct;
+  FSMirror *m_fs_mirror;
+  RadosRef m_local_cluster;
+  Filesystem m_filesystem;
+  Peer m_peer;
+  // probably need to be encapsulated when supporting cancelations
+  std::map<std::string, DirRegistry> m_registered;
+  std::vector<std::string> m_directories;
+  std::map<std::string, SnapSyncStat> m_snap_sync_stats;
+  MountRef m_local_mount;
+  ServiceDaemon *m_service_daemon;
+  PeerReplayerAdminSocketHook *m_asok_hook = nullptr;
+
+  ceph::mutex m_lock;
+  ceph::condition_variable m_cond;
+  RadosRef m_remote_cluster;
+  MountRef m_remote_mount;
+  bool m_stopping = false;
+  SnapshotReplayers m_replayers;
+
+  ServiceDaemonStats m_service_daemon_stats;
+
+  void run(SnapshotReplayerThread *replayer);
+
+  boost::optional<std::string> pick_directory();
+  int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);
+  void unregister_directory(const std::string &dir_root);
+  int try_lock_directory(const std::string &dir_root, SnapshotReplayerThread *replayer,
+                         DirRegistry *registry);
+  void unlock_directory(const std::string &dir_root, const DirRegistry &registry);
+  void sync_snaps(const std::string &dir_root, std::unique_lock<ceph::mutex> &locker);
+
+
+  int build_snap_map(const std::string &dir_root, std::map<uint64_t, std::string> *snap_map,
+                     bool is_remote=false);
+
+  int propagate_snap_deletes(const std::string &dir_root, const std::set<std::string> &snaps);
+  int propagate_snap_renames(const std::string &dir_root,
+                             const std::set<std::pair<std::string,std::string>> &snaps);
+  int propagate_deleted_entries(const std::string &dir_root, const std::string &epath,
+                                const FHandles &fh);
+  int cleanup_remote_dir(const std::string &dir_root, const std::string &epath,
+                         const FHandles &fh);
+
+  int should_sync_entry(const std::string &epath, const struct ceph_statx &cstx,
+                        const FHandles &fh, bool *need_data_sync, bool *need_attr_sync);
+
+  int open_dir(MountRef mnt, const std::string &dir_path, boost::optional<uint64_t> snap_id);
+  int pre_sync_check_and_open_handles(const std::string &dir_root, const Snapshot &current,
+                                      boost::optional<Snapshot> prev, FHandles *fh);
+  void post_sync_close_handles(const FHandles &fh);
+
+  int do_synchronize(const std::string &dir_root, const Snapshot &current,
+                     boost::optional<Snapshot> prev);
+
+  int synchronize(const std::string &dir_root, const Snapshot &current,
+                  boost::optional<Snapshot> prev);
+  int do_sync_snaps(const std::string &dir_root);
+
+  int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh);
+  int remote_file_op(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
+                     const FHandles &fh, bool need_data_sync, bool need_attr_sync);
+  int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
+                     const FHandles &fh);
+  int sync_perms(const std::string& path);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_PEER_REPLAYER_H
diff --git a/src/tools/cephfs_mirror/ServiceDaemon.cc b/src/tools/cephfs_mirror/ServiceDaemon.cc
new file mode 100644
index 000000000..f66dd46bf
--- /dev/null
+++ b/src/tools/cephfs_mirror/ServiceDaemon.cc
@@ -0,0 +1,225 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "include/stringify.h"
+#include "ServiceDaemon.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::ServiceDaemon: " << this << " " \
+ << __func__
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+// Visitor (applied with boost::apply_visitor) that dumps an attribute
+// value under @name using the Formatter call matching the type of the
+// value: bool, uint64_t or string.
+struct AttributeDumpVisitor : public boost::static_visitor<void> {
+  ceph::Formatter *f;
+  std::string name;
+
+  AttributeDumpVisitor(ceph::Formatter *formatter, std::string_view attr_name)
+    : f(formatter), name(attr_name) {
+  }
+
+  void operator()(bool val) const {
+    const char *key = name.c_str();
+    f->dump_bool(key, val);
+  }
+  void operator()(uint64_t val) const {
+    const char *key = name.c_str();
+    f->dump_unsigned(key, val);
+  }
+  void operator()(const std::string &val) const {
+    const char *key = name.c_str();
+    f->dump_string(key, val);
+  }
+};
+
+} // anonymous namespace
+
+ServiceDaemon::ServiceDaemon(CephContext *cct, RadosRef rados)
+ : m_cct(cct),
+ m_rados(rados),
+ m_timer(new SafeTimer(cct, m_timer_lock, true)) {
+ m_timer->init();
+}
+
+// Cancel a pending status update (if any), shut the timer down under
+// its lock and release it.
+ServiceDaemon::~ServiceDaemon() {
+  dout(10) << dendl;
+
+  {
+    std::scoped_lock timer_lock(m_timer_lock);
+    if (m_timer_ctx) {
+      dout(5) << ": canceling timer task=" << m_timer_ctx << dendl;
+      m_timer->cancel_event(m_timer_ctx);
+    }
+    m_timer->shutdown();
+  }
+
+  // allocated in the constructor
+  delete m_timer;
+}
+
+// Register this daemon as a "cephfs-mirror" service daemon, named by
+// the rados instance id. The metadata carries the daemon id (the
+// configured entity id with a leading "cephfs-mirror." stripped) and
+// the instance id. Returns 0 on success or a negative error code.
+int ServiceDaemon::init() {
+  dout(20) << dendl;
+
+  std::string id = m_cct->_conf->name.get_id();
+  const std::string &prefix = CEPHFS_MIRROR_AUTH_ID_PREFIX;
+  // strip the prefix only when the id starts with it
+  if (id.compare(0, prefix.size(), prefix) == 0) {
+    id.erase(0, prefix.size());
+  }
+
+  std::string instance_id = stringify(m_rados->get_instance_id());
+
+  std::map<std::string, std::string> service_metadata;
+  service_metadata.emplace("id", id);
+  service_metadata.emplace("instance_id", instance_id);
+
+  int r = m_rados->service_daemon_register("cephfs-mirror", instance_id,
+                                           service_metadata);
+  return (r < 0) ? r : 0;
+}
+
+// Start tracking filesystem @fscid (named @fs_name) and schedule a
+// status update. An already tracked @fscid is left untouched.
+void ServiceDaemon::add_filesystem(fs_cluster_id_t fscid, std::string_view fs_name) {
+  dout(10) << ": fscid=" << fscid << ", fs_name=" << fs_name << dendl;
+
+  std::unique_lock locker(m_lock);
+  m_filesystems.emplace(fscid, Filesystem(fs_name));
+  locker.unlock();
+
+  // scheduled without holding m_lock
+  schedule_update_status();
+}
+
+// Stop tracking filesystem @fscid (together with its peers and
+// attributes) and schedule a status update.
+void ServiceDaemon::remove_filesystem(fs_cluster_id_t fscid) {
+  dout(10) << ": fscid=" << fscid << dendl;
+
+  std::unique_lock locker(m_lock);
+  m_filesystems.erase(fscid);
+  locker.unlock();
+
+  // scheduled without holding m_lock
+  schedule_update_status();
+}
+
+// Start tracking @peer (with no attributes) for filesystem @fscid and
+// schedule a status update. Does nothing for an unknown @fscid.
+void ServiceDaemon::add_peer(fs_cluster_id_t fscid, const Peer &peer) {
+  dout(10) << ": peer=" << peer << dendl;
+
+  std::unique_lock locker(m_lock);
+  const auto fs_it = m_filesystems.find(fscid);
+  if (fs_it == m_filesystems.end()) {
+    // unknown filesystem -- nothing changed, no update needed
+    return;
+  }
+  fs_it->second.peer_attributes.emplace(peer, Attributes{});
+  locker.unlock();
+
+  schedule_update_status();
+}
+
+// Stop tracking @peer of filesystem @fscid and schedule a status
+// update. Does nothing for an unknown @fscid.
+void ServiceDaemon::remove_peer(fs_cluster_id_t fscid, const Peer &peer) {
+  dout(10) << ": peer=" << peer << dendl;
+
+  std::unique_lock locker(m_lock);
+  const auto fs_it = m_filesystems.find(fscid);
+  if (fs_it == m_filesystems.end()) {
+    // unknown filesystem -- nothing changed, no update needed
+    return;
+  }
+  fs_it->second.peer_attributes.erase(peer);
+  locker.unlock();
+
+  schedule_update_status();
+}
+
+// Set attribute @key of filesystem @fscid to @value (adding it when
+// missing) and schedule a status update. Does nothing for an unknown
+// @fscid.
+void ServiceDaemon::add_or_update_fs_attribute(fs_cluster_id_t fscid, std::string_view key,
+                                               AttributeValue value) {
+  dout(10) << ": fscid=" << fscid << dendl;
+
+  std::unique_lock locker(m_lock);
+  const auto fs_it = m_filesystems.find(fscid);
+  if (fs_it == m_filesystems.end()) {
+    return;
+  }
+
+  auto &fs_attributes = fs_it->second.fs_attributes;
+  fs_attributes[std::string(key)] = value;
+  locker.unlock();
+
+  schedule_update_status();
+}
+
+// Set attribute @key of @peer of filesystem @fscid to @value (adding
+// it when missing) and schedule a status update. Does nothing when
+// either the filesystem or the peer is not tracked.
+void ServiceDaemon::add_or_update_peer_attribute(fs_cluster_id_t fscid, const Peer &peer,
+                                                 std::string_view key, AttributeValue value) {
+  dout(10) << ": fscid=" << fscid << dendl;
+
+  std::unique_lock locker(m_lock);
+  const auto fs_it = m_filesystems.find(fscid);
+  if (fs_it == m_filesystems.end()) {
+    return;
+  }
+
+  auto &peers = fs_it->second.peer_attributes;
+  const auto peer_it = peers.find(peer);
+  if (peer_it == peers.end()) {
+    return;
+  }
+
+  peer_it->second[std::string(key)] = value;
+  locker.unlock();
+
+  schedule_update_status();
+}
+
+// Arrange for update_status() to be called by the timer, unless a
+// call is already pending. The pending marker (m_timer_ctx) is
+// cleared by the callback itself right before update_status() runs.
+void ServiceDaemon::schedule_update_status() {
+  dout(10) << dendl;
+
+  std::scoped_lock timer_lock(m_timer_lock);
+  if (m_timer_ctx == nullptr) {
+    m_timer_ctx = new LambdaContext([this] {
+        m_timer_ctx = nullptr;
+        update_status();
+      });
+    m_timer->add_event_after(1, m_timer_ctx);
+  }
+}
+
+// Serialize the tracked filesystems, their attributes, their peers
+// and the per-peer attributes ("stats") to JSON and publish the
+// result as the "status_json" entry of the service daemon status.
+void ServiceDaemon::update_status() {
+  ceph::JSONFormatter f;
+  {
+    std::scoped_lock locker(m_lock);
+    // m_filesystems is modified under m_lock by the add_*/remove_*
+    // calls, so it is only inspected (even for logging) with the lock
+    // held.
+    dout(20) << ": " << m_filesystems.size() << " filesystem(s)" << dendl;
+
+    f.open_object_section("filesystems");
+    for (auto &[fscid, filesystem] : m_filesystems) {
+      f.open_object_section(stringify(fscid).c_str());
+      f.dump_string("name", filesystem.fs_name);
+      for (auto &[attr_name, attr_value] : filesystem.fs_attributes) {
+        AttributeDumpVisitor visitor(&f, attr_name);
+        boost::apply_visitor(visitor, attr_value);
+      }
+      f.open_object_section("peers");
+      for (auto &[peer, attributes] : filesystem.peer_attributes) {
+        f.open_object_section(peer.uuid);
+        f.dump_object("remote", peer.remote);
+        f.open_object_section("stats");
+        for (auto &[attr_name, attr_value] : attributes) {
+          AttributeDumpVisitor visitor(&f, attr_name);
+          boost::apply_visitor(visitor, attr_value);
+        }
+        f.close_section(); // stats
+        f.close_section(); // peer.uuid
+      }
+      f.close_section(); // peers
+      f.close_section(); // fscid
+    }
+    f.close_section(); // filesystems
+  }
+
+  // the cluster call is made without holding m_lock
+  std::stringstream ss;
+  f.flush(ss);
+
+  int r = m_rados->service_daemon_update_status({{"status_json", ss.str()}});
+  if (r < 0) {
+    derr << ": failed to update service daemon status: " << cpp_strerror(r)
+         << dendl;
+  }
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/ServiceDaemon.h b/src/tools/cephfs_mirror/ServiceDaemon.h
new file mode 100644
index 000000000..83eee286d
--- /dev/null
+++ b/src/tools/cephfs_mirror/ServiceDaemon.h
@@ -0,0 +1,62 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_SERVICE_DAEMON_H
+#define CEPHFS_MIRROR_SERVICE_DAEMON_H
+
+#include "common/ceph_mutex.h"
+#include "common/Timer.h"
+#include "mds/FSMap.h"
+#include "Types.h"
+
+namespace cephfs {
+namespace mirror {
+
+// Tracks per-filesystem and per-peer attributes and publishes them as the
+// service daemon status (see update_status()). Publishing is deferred via a
+// timer event armed by schedule_update_status().
+class ServiceDaemon {
+public:
+ ServiceDaemon(CephContext *cct, RadosRef rados);
+ ~ServiceDaemon();
+
+ int init();
+
+ // register/unregister a filesystem, keyed by its cluster id
+ void add_filesystem(fs_cluster_id_t fscid, std::string_view fs_name);
+ void remove_filesystem(fs_cluster_id_t fscid);
+
+ // register/unregister a peer of a filesystem
+ void add_peer(fs_cluster_id_t fscid, const Peer &peer);
+ void remove_peer(fs_cluster_id_t fscid, const Peer &peer);
+
+ // set (or overwrite) a named attribute on a filesystem or on one of its
+ // peers
+ void add_or_update_fs_attribute(fs_cluster_id_t fscid, std::string_view key,
+ AttributeValue value);
+ void add_or_update_peer_attribute(fs_cluster_id_t fscid, const Peer &peer,
+ std::string_view key, AttributeValue value);
+
+private:
+ // per-filesystem state: display name, filesystem level attributes and
+ // the attributes of each peer
+ struct Filesystem {
+ std::string fs_name;
+ Attributes fs_attributes;
+ std::map<Peer, Attributes> peer_attributes;
+
+ Filesystem(std::string_view fs_name)
+ : fs_name(fs_name) {
+ }
+ };
+
+ const std::string CEPHFS_MIRROR_AUTH_ID_PREFIX = "cephfs-mirror.";
+
+ CephContext *m_cct;
+ RadosRef m_rados;
+ SafeTimer *m_timer;
+ // taken by schedule_update_status() while inspecting/arming m_timer_ctx
+ ceph::mutex m_timer_lock = ceph::make_mutex("cephfs::mirror::ServiceDaemon");
+
+ // taken by update_status() while walking m_filesystems
+ ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::service_daemon");
+ // pending timer event, nullptr when no status update is scheduled
+ Context *m_timer_ctx = nullptr;
+ std::map<fs_cluster_id_t, Filesystem> m_filesystems;
+
+ void schedule_update_status();
+ void update_status();
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_SERVICE_DAEMON_H
diff --git a/src/tools/cephfs_mirror/Types.cc b/src/tools/cephfs_mirror/Types.cc
new file mode 100644
index 000000000..0049f9d79
--- /dev/null
+++ b/src/tools/cephfs_mirror/Types.cc
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Types.h"
+
+namespace cephfs {
+namespace mirror {
+
+// Stream a filesystem as "{fscid=<id>, fs_name=<name>}".
+std::ostream& operator<<(std::ostream& out, const Filesystem &filesystem) {
+  return out << "{fscid=" << filesystem.fscid
+             << ", fs_name=" << filesystem.fs_name
+             << "}";
+}
+
+// Stream a filesystem spec as "{filesystem=<filesystem>, pool_id=<id>}".
+std::ostream& operator<<(std::ostream& out, const FilesystemSpec &spec) {
+  return out << "{filesystem=" << spec.filesystem
+             << ", pool_id=" << spec.pool_id
+             << "}";
+}
+
+} // namespace mirror
+} // namespace cephfs
+
diff --git a/src/tools/cephfs_mirror/Types.h b/src/tools/cephfs_mirror/Types.h
new file mode 100644
index 000000000..016a8dc86
--- /dev/null
+++ b/src/tools/cephfs_mirror/Types.h
@@ -0,0 +1,87 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_TYPES_H
+#define CEPHFS_MIRROR_TYPES_H
+
+#include <set>
+#include <iostream>
+#include <string_view>
+
+#include "include/rados/librados.hpp"
+#include "include/cephfs/libcephfs.h"
+#include "mds/mdstypes.h"
+
+namespace cephfs {
+namespace mirror {
+
+static const std::string CEPHFS_MIRROR_OBJECT("cephfs_mirror");
+
+typedef boost::variant<bool, uint64_t, std::string> AttributeValue;
+typedef std::map<std::string, AttributeValue> Attributes;
+
+// distinct filesystem identifier -- a (cluster id, name) pair
+struct Filesystem {
+  fs_cluster_id_t fscid;
+  std::string fs_name;
+
+  // equal only when both the id and the name match
+  bool operator==(const Filesystem &rhs) const {
+    return fscid == rhs.fscid && fs_name == rhs.fs_name;
+  }
+
+  bool operator!=(const Filesystem &rhs) const {
+    return !operator==(rhs);
+  }
+
+  // order by id first, ties are broken on the name
+  bool operator<(const Filesystem &rhs) const {
+    if (fscid == rhs.fscid) {
+      return fs_name < rhs.fs_name;
+    }
+
+    return fscid < rhs.fscid;
+  }
+};
+
+// specification of a filesystem -- the filesystem identifier together with
+// a pool id (the id of the filesystem's metadata pool).
+struct FilesystemSpec {
+  FilesystemSpec() = default;
+  FilesystemSpec(const Filesystem &filesystem, uint64_t pool_id)
+    : filesystem(filesystem),
+      pool_id(pool_id) {
+  }
+  FilesystemSpec(fs_cluster_id_t fscid, std::string_view fs_name, uint64_t pool_id)
+    : filesystem(Filesystem{fscid, std::string(fs_name)}),
+      pool_id(pool_id) {
+  }
+
+  Filesystem filesystem;
+  // zero-initialized so that a default constructed spec never carries an
+  // indeterminate pool id into comparisons or log output.
+  uint64_t pool_id = 0;
+
+  // equal only when both the filesystem and the pool id match
+  bool operator==(const FilesystemSpec &rhs) const {
+    return (filesystem == rhs.filesystem &&
+            pool_id == rhs.pool_id);
+  }
+
+  // order by filesystem first, ties are broken on the pool id
+  bool operator<(const FilesystemSpec &rhs) const {
+    if (filesystem != rhs.filesystem) {
+      return filesystem < rhs.filesystem;
+    }
+
+    return pool_id < rhs.pool_id;
+  }
+};
+
+std::ostream& operator<<(std::ostream& out, const Filesystem &filesystem);
+std::ostream& operator<<(std::ostream& out, const FilesystemSpec &spec);
+
+typedef std::shared_ptr<librados::Rados> RadosRef;
+typedef std::shared_ptr<librados::IoCtx> IoCtxRef;
+
+// not a shared_ptr since the type is incomplete
+typedef ceph_mount_info *MountRef;
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_TYPES_H
diff --git a/src/tools/cephfs_mirror/Utils.cc b/src/tools/cephfs_mirror/Utils.cc
new file mode 100644
index 000000000..1a8b8e0ac
--- /dev/null
+++ b/src/tools/cephfs_mirror/Utils.cc
@@ -0,0 +1,166 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_argparse.h"
+#include "common/ceph_context.h"
+#include "common/common_init.h"
+#include "common/debug.h"
+#include "common/errno.h"
+
+#include "Utils.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::Utils " << __func__
+
+namespace cephfs {
+namespace mirror {
+
+// Create a librados handle for the given cluster and connect to it.
+//
+// client_name  -- entity name to connect as; must be non-empty and parseable
+// cluster_name -- cluster name; only applied to the configuration when no
+//                 explicit mon_host is given
+// cluster      -- out: reset to the newly created Rados handle
+// mon_host     -- optional monitor address overriding the configuration
+// cephx_key    -- optional cephx key overriding the configuration
+// args         -- optional command line style configuration overrides
+//
+// Returns 0 on success, -EINVAL for an unusable client name or the negative
+// error code of the failing configuration/connect step. The temporary
+// CephContext reference is dropped on every exit path.
+int connect(std::string_view client_name, std::string_view cluster_name,
+            RadosRef *cluster, std::string_view mon_host, std::string_view cephx_key,
+            std::vector<const char *> args) {
+  dout(20) << ": connecting to cluster=" << cluster_name << ", client=" << client_name
+           << ", mon_host=" << mon_host << dendl;
+
+  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
+  if (client_name.empty() || !iparams.name.from_str(client_name)) {
+    derr << ": error initializing cluster handle for " << cluster_name << dendl;
+    return -EINVAL;
+  }
+
+  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
+                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+  if (mon_host.empty()) {
+    cct->_conf->cluster = cluster_name;
+  }
+
+  // a missing configuration file (-ENOENT) is tolerated
+  int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
+  if (r < 0 && r != -ENOENT) {
+    derr << ": could not read ceph conf: " << cpp_strerror(r) << dendl;
+    // drop our reference -- the context is not handed to librados yet
+    cct->put();
+    return r;
+  }
+
+  cct->_conf.parse_env(cct->get_module_type());
+
+  if (!args.empty()) {
+    r = cct->_conf.parse_argv(args);
+    if (r < 0) {
+      derr << ": could not parse command line args: " << cpp_strerror(r) << dendl;
+      cct->put();
+      return r;
+    }
+  }
+  // environment is parsed again after the command line arguments
+  cct->_conf.parse_env(cct->get_module_type());
+
+  if (!mon_host.empty()) {
+    r = cct->_conf.set_val("mon_host", std::string(mon_host));
+    if (r < 0) {
+      derr << ": failed to set mon_host config: " << cpp_strerror(r) << dendl;
+      cct->put();
+      return r;
+    }
+  }
+  if (!cephx_key.empty()) {
+    r = cct->_conf.set_val("key", std::string(cephx_key));
+    if (r < 0) {
+      derr << ": failed to set key config: " << cpp_strerror(r) << dendl;
+      cct->put();
+      return r;
+    }
+  }
+
+  dout(10) << ": using mon addr=" << cct->_conf.get_val<std::string>("mon_host") << dendl;
+
+  cluster->reset(new librados::Rados());
+
+  // drop the local reference once the context is handed to the rados handle
+  r = (*cluster)->init_with_context(cct);
+  ceph_assert(r == 0);
+  cct->put();
+
+  r = (*cluster)->connect();
+  if (r < 0) {
+    derr << ": error connecting to " << cluster_name << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  dout(10) << ": connected to cluster=" << cluster_name << " using client="
+           << client_name << dendl;
+
+  return 0;
+}
+
+// Create a libcephfs mount handle that shares the context of the given
+// cluster handle and mount the named filesystem as uid/gid 0.
+//
+// cluster           -- connected rados handle whose context is reused
+// filesystem        -- filesystem to mount (selected by name)
+// cross_check_fscid -- when set, verify that the mounted filesystem id
+//                      matches filesystem.fscid
+// mount             -- out: the mount handle, assigned on success only
+//
+// Returns 0 on success, the negative error code of the failing libcephfs
+// call, or -EINVAL on a filesystem id mismatch. On failure the mount handle
+// is released rather than leaked.
+int mount(RadosRef cluster, const Filesystem &filesystem, bool cross_check_fscid,
+          MountRef *mount) {
+  dout(20) << ": filesystem=" << filesystem << dendl;
+
+  ceph_mount_info *cmi;
+  int r = ceph_create_with_context(&cmi, reinterpret_cast<CephContext*>(cluster->cct()));
+  if (r < 0) {
+    derr << ": mount error: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  // run the setup steps in order, stopping at the first failure so that
+  // there is a single place to log the error and release the handle.
+  r = [cmi, &filesystem]() -> int {
+    int rc = ceph_conf_set(cmi, "client_mount_uid", "0");
+    if (rc < 0) {
+      return rc;
+    }
+
+    rc = ceph_conf_set(cmi, "client_mount_gid", "0");
+    if (rc < 0) {
+      return rc;
+    }
+
+    // mount timeout applies for local and remote mounts.
+    auto mount_timeout = g_ceph_context->_conf.get_val<std::chrono::seconds>
+      ("cephfs_mirror_mount_timeout").count();
+    rc = ceph_set_mount_timeout(cmi, mount_timeout);
+    if (rc < 0) {
+      return rc;
+    }
+
+    rc = ceph_init(cmi);
+    if (rc < 0) {
+      return rc;
+    }
+
+    rc = ceph_select_filesystem(cmi, filesystem.fs_name.c_str());
+    if (rc < 0) {
+      return rc;
+    }
+
+    return ceph_mount(cmi, nullptr);
+  }();
+  if (r < 0) {
+    derr << ": mount error: " << cpp_strerror(r) << dendl;
+    // the handle is not mounted at this point -- free it
+    ceph_release(cmi);
+    return r;
+  }
+
+  auto fs_id = ceph_get_fs_cid(cmi);
+  if (cross_check_fscid && fs_id != filesystem.fscid) {
+    // this can happen in the most remotest possibility when a
+    // filesystem is deleted and recreated with the same name.
+    // since all this is driven asynchronously, we were able to
+    // mount the recreated filesystem. so bubble up the error.
+    // cleanup will eventually happen since a mirror disable event
+    // would have been queued.
+    derr << ": filesystem-id mismatch " << fs_id << " vs " << filesystem.fscid
+         << dendl;
+    // ignore errors, we are shutting down anyway.
+    ceph_unmount(cmi);
+    ceph_release(cmi);
+    return -EINVAL;
+  }
+
+  dout(10) << ": mounted filesystem=" << filesystem << dendl;
+
+  *mount = cmi;
+  return 0;
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/Utils.h b/src/tools/cephfs_mirror/Utils.h
new file mode 100644
index 000000000..76b0c0726
--- /dev/null
+++ b/src/tools/cephfs_mirror/Utils.h
@@ -0,0 +1,22 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_UTILS_H
+#define CEPHFS_MIRROR_UTILS_H
+
+#include "Types.h"
+
+namespace cephfs {
+namespace mirror {
+
+int connect(std::string_view client_name, std::string_view cluster_name,
+ RadosRef *cluster, std::string_view mon_host={}, std::string_view cephx_key={},
+ std::vector<const char *> args={});
+
+int mount(RadosRef cluster, const Filesystem &filesystem, bool cross_check_fscid,
+ MountRef *mount);
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_UTILS_H
diff --git a/src/tools/cephfs_mirror/Watcher.cc b/src/tools/cephfs_mirror/Watcher.cc
new file mode 100644
index 000000000..1445fce5f
--- /dev/null
+++ b/src/tools/cephfs_mirror/Watcher.cc
@@ -0,0 +1,285 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "include/stringify.h"
+#include "aio_utils.h"
+#include "watcher/RewatchRequest.h"
+#include "Watcher.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::Watcher " << __func__
+
+using cephfs::mirror::watcher::RewatchRequest;
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+// Two-phase completion used when unwatching: the first complete() call
+// (unwatch finished) issues an asynchronous watch flush with this same
+// object as the callback, the second call (flush finished) destroys this
+// object and completes on_finish with the first error seen, if any.
+struct C_UnwatchAndFlush : public Context {
+  librados::Rados rados;
+  Context *on_finish;
+  bool flushing = false;
+  int ret_val = 0;
+
+  C_UnwatchAndFlush(librados::IoCtx &ioctx, Context *on_finish)
+    : rados(ioctx), on_finish(on_finish) {
+  }
+
+  void complete(int r) override {
+    // latch the first error across both phases
+    if (r < 0 && ret_val == 0) {
+      ret_val = r;
+    }
+
+    if (flushing) {
+      // ensure our reference to the RadosClient is released prior
+      // to completing the callback to avoid racing an explicit
+      // librados shutdown
+      Context *ctx = on_finish;
+      const int result = ret_val;
+      delete this;
+
+      ctx->complete(result);
+      return;
+    }
+
+    // first phase done -- flush outstanding watch callbacks
+    flushing = true;
+
+    librados::AioCompletion *flush_comp =
+      librados::Rados::aio_create_completion(
+        this, &rados_callback<Context, &Context::complete>);
+    const int flush_r = rados.aio_watch_flush(flush_comp);
+
+    ceph_assert(flush_r == 0);
+    flush_comp->release();
+  }
+
+  // unused -- complete() is overridden and drives both phases
+  void finish(int r) override {
+  }
+};
+
+} // anonymous namespace
+
+// Bind the watcher to an object (oid) in the given io context. No watch is
+// established here -- the watcher starts out in STATE_IDLE and callers use
+// register_watch() to set up the watch. The work queue is stored for use
+// when deferring rewatch work.
+Watcher::Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue)
+ : m_oid(oid),
+ m_ioctx(ioctx),
+ m_work_queue(work_queue),
+ m_lock(ceph::make_shared_mutex("cephfs::mirror::snap_watcher")),
+ m_state(STATE_IDLE),
+ m_watch_ctx(*this) {
+}
+
+// Nothing to tear down explicitly -- members clean up after themselves.
+Watcher::~Watcher() = default;
+
+// Start an asynchronous watch on m_oid. The state moves to
+// STATE_REGISTERING and handle_register_watch() is invoked with the result,
+// which in turn completes on_finish.
+void Watcher::register_watch(Context *on_finish) {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  m_state = STATE_REGISTERING;
+
+  // wrap the caller's context so that the result is routed through
+  // handle_register_watch() first.
+  Context *register_ctx = new C_RegisterWatch(this, on_finish);
+  librados::AioCompletion *aio_comp = librados::Rados::aio_create_completion(
+    register_ctx, &rados_callback<Context, &Context::complete>);
+
+  const int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
+  ceph_assert(r == 0);
+  aio_comp->release();
+}
+
+// Completion of the watch started by register_watch().
+//
+// r         -- result of the asynchronous watch
+// on_finish -- caller's context, always completed with r
+//
+// After completing on_finish, either a delayed unregister request (queued
+// while the registration was in flight) is resumed, or -- when a watch
+// error was flagged during registration -- the watch is re-established via
+// rewatch().
+void Watcher::handle_register_watch(int r, Context *on_finish) {
+  dout(20) << ": r=" << r << dendl;
+
+  bool watch_error = false;
+  Context *unregister_watch_ctx = nullptr;
+  {
+    std::scoped_lock locker(m_lock);
+    ceph_assert(m_state == STATE_REGISTERING);
+
+    m_state = STATE_IDLE;
+    if (r < 0) {
+      derr << ": failed to register watch: " << cpp_strerror(r) << dendl;
+      m_watch_handle = 0;
+    }
+
+    if (m_unregister_watch_ctx != nullptr) {
+      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+    } else if (r == 0 && m_watch_error) {
+      derr << ": re-registering after watch error" << dendl;
+      // rewatch() (invoked below) asserts STATE_REWATCHING -- entering it
+      // in any other state would trip that assertion.
+      m_state = STATE_REWATCHING;
+      watch_error = true;
+    } else {
+      m_watch_blocklisted = (r == -EBLOCKLISTED);
+    }
+  }
+
+  // callbacks are invoked without holding m_lock
+  on_finish->complete(r);
+  if (unregister_watch_ctx != nullptr) {
+    unregister_watch_ctx->complete(0);
+  } else if (watch_error) {
+    rewatch();
+  }
+}
+
+// Tear down the watch and complete on_finish once done.
+//
+// - a register/rewatch in flight: the request is parked in
+//   m_unregister_watch_ctx and retried when that operation winds down
+// - a registered watch: unwatch asynchronously, followed by a watch flush
+// - otherwise: nothing to do, on_finish is completed right away with 0
+void Watcher::unregister_watch(Context *on_finish) {
+  dout(20) << dendl;
+
+  {
+    std::scoped_lock locker(m_lock);
+    if (m_state != STATE_IDLE) {
+      dout(10) << ": delaying unregister -- watch register in progress" << dendl;
+      ceph_assert(m_unregister_watch_ctx == nullptr);
+      m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) {
+        unregister_watch(on_finish);
+      });
+      return;
+    }
+
+    if (is_registered()) {
+      // watch is registered -- unwatch
+      auto *unwatch_ctx = new C_UnwatchAndFlush(m_ioctx, on_finish);
+      librados::AioCompletion *aio_comp =
+        librados::Rados::aio_create_completion(
+          unwatch_ctx, &rados_callback<Context, &Context::complete>);
+      const int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
+      ceph_assert(r == 0);
+      aio_comp->release();
+
+      // the handle is gone from our point of view
+      m_watch_handle = 0;
+      m_watch_blocklisted = false;
+      return;
+    }
+  }
+
+  // not registered -- complete without holding the lock
+  on_finish->complete(0);
+}
+
+void Watcher::handle_error(uint64_t handle, int err) {
+ derr << ": handle=" << handle << ": " << cpp_strerror(err) << dendl;
+
+ std::scoped_lock locker(m_lock);
+ m_watch_error = true;
+
+ if (is_registered()) {
+ m_state = STATE_REWATCHING;
+ if (err == -EBLOCKLISTED) {
+ m_watch_blocklisted = true;
+ }
+ m_work_queue->queue(new LambdaContext([this] {
+ rewatch();
+ }), 0);
+ }
+}
+
+// Re-establish the watch. Must be entered in STATE_REWATCHING. When an
+// unregister request is pending, the rewatch is abandoned: the state goes
+// back to idle and the pending request is resumed. Otherwise a
+// RewatchRequest is sent whose result is delivered to handle_rewatch().
+void Watcher::rewatch() {
+  dout(20) << dendl;
+
+  Context *pending_unregister = nullptr;
+  {
+    std::unique_lock locker(m_lock);
+    ceph_assert(m_state == STATE_REWATCHING);
+
+    if (m_unregister_watch_ctx == nullptr) {
+      m_watch_error = false;
+      Context *on_rewatch =
+        new C_CallbackAdapter<Watcher, &Watcher::handle_rewatch>(this);
+      // the request is sent with m_lock held exclusively
+      RewatchRequest *req = RewatchRequest::create(m_ioctx, m_oid, m_lock,
+                                                   &m_watch_ctx, &m_watch_handle,
+                                                   on_rewatch);
+      req->send();
+      return;
+    }
+
+    m_state = STATE_IDLE;
+    std::swap(pending_unregister, m_unregister_watch_ctx);
+  }
+
+  pending_unregister->complete(0);
+}
+
+// Completion of a RewatchRequest issued by rewatch().
+//
+// Outcomes, in order of precedence:
+// - an unregister request is pending: go idle and resume it
+// - -EBLOCKLISTED / -ENOENT: recorded/logged, the result is forwarded to
+//   handle_rewatch_callback() via the work queue
+// - any other error, or a watch error flagged in the meantime: retry the
+//   rewatch right away
+// - success: forwarded to handle_rewatch_callback() via the work queue
+void Watcher::handle_rewatch(int r) {
+ dout(20) << ": r=" << r << dendl;
+
+ bool watch_error = false;
+ Context *unregister_watch_ctx = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ ceph_assert(m_state == STATE_REWATCHING);
+
+ // cleared up-front, raised again below only for -EBLOCKLISTED
+ m_watch_blocklisted = false;
+ if (m_unregister_watch_ctx != nullptr) {
+ dout(10) << ": skipping rewatch -- unregistering" << dendl;
+ m_state = STATE_IDLE;
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else if (r == -EBLOCKLISTED) {
+ m_watch_blocklisted = true;
+ derr << ": client blocklisted" << dendl;
+ } else if (r == -ENOENT) {
+ dout(5) << ": object " << m_oid << " does not exist" << dendl;
+ } else if (r < 0) {
+ derr << ": failed to rewatch: " << cpp_strerror(r) << dendl;
+ watch_error = true;
+ } else if (m_watch_error) {
+ derr << ": re-registering watch after error" << dendl;
+ watch_error = true;
+ }
+ }
+
+ // callbacks and retries run without holding m_lock
+ if (unregister_watch_ctx != nullptr) {
+ unregister_watch_ctx->complete(0);
+ return;
+ } else if (watch_error) {
+ rewatch();
+ return;
+ }
+
+ // state stays STATE_REWATCHING until handle_rewatch_callback() runs
+ Context *ctx = new C_CallbackAdapter<Watcher, &Watcher::handle_rewatch_callback>(this);
+ m_work_queue->queue(ctx, r);
+}
+
+// Final stage of a rewatch, run from the work queue. The subclass hook
+// handle_rewatch_complete() is invoked first, then the state machine is
+// settled: resume a pending unregister, retry the rewatch on a (non
+// blocklisted, non ENOENT) error or newly flagged watch error, or go idle.
+void Watcher::handle_rewatch_callback(int r) {
+  dout(10) << ": r=" << r << dendl;
+  handle_rewatch_complete(r);
+
+  bool retry = false;
+  Context *pending_unregister = nullptr;
+  {
+    std::scoped_lock locker(m_lock);
+    ceph_assert(m_state == STATE_REWATCHING);
+
+    // these two errors are not retried
+    const bool terminal_error = (r == -EBLOCKLISTED || r == -ENOENT);
+    if (m_unregister_watch_ctx != nullptr) {
+      m_state = STATE_IDLE;
+      std::swap(pending_unregister, m_unregister_watch_ctx);
+    } else if (!terminal_error && (r < 0 || m_watch_error)) {
+      // stay in STATE_REWATCHING for the retry below
+      retry = true;
+    } else {
+      m_state = STATE_IDLE;
+    }
+  }
+
+  if (pending_unregister != nullptr) {
+    pending_unregister->complete(0);
+    return;
+  }
+
+  if (retry) {
+    rewatch();
+  }
+}
+
+// Acknowledge a notification received on m_oid, passing bl along with the
+// acknowledgement.
+void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl) {
+ m_ioctx.notify_ack(m_oid, notify_id, handle, bl);
+}
+
+// librados notify callback -- logs the notification and forwards it
+// unchanged to the owning watcher.
+void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist& bl) {
+ dout(20) << ": notify_id=" << notify_id << ", handle=" << handle
+ << ", notifier_id=" << notifier_id << dendl;
+ watcher.handle_notify(notify_id, handle, notifier_id, bl);
+}
+
+// librados watch error callback -- forwards the error to the owning
+// watcher.
+void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
+ dout(20) << dendl;
+ watcher.handle_error(handle, err);
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/Watcher.h b/src/tools/cephfs_mirror/Watcher.h
new file mode 100644
index 000000000..9e7c54eeb
--- /dev/null
+++ b/src/tools/cephfs_mirror/Watcher.h
@@ -0,0 +1,102 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_WATCHER_H
+#define CEPHFS_MIRROR_WATCHER_H
+
+#include <string_view>
+
+#include "common/ceph_mutex.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+
+class ContextWQ;
+
+namespace cephfs {
+namespace mirror {
+
+// generic watcher class -- establish watch on a given rados object
+// and invoke handle_notify() when notified. On notify error, try
+// to re-establish the watch. Errors during rewatch are notified via
+// handle_rewatch_complete().
+
+class Watcher {
+public:
+  // ioctx and work_queue are referenced, not owned
+  Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue);
+  virtual ~Watcher();
+
+  // asynchronously establish / tear down the watch; on_finish is completed
+  // with the result
+  void register_watch(Context *on_finish);
+  void unregister_watch(Context *on_finish);
+
+protected:
+  std::string m_oid;
+
+  void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl);
+
+  // idle with a valid watch handle
+  bool is_registered() const {
+    return m_state == STATE_IDLE && m_watch_handle != 0;
+  }
+  // idle without a watch handle
+  bool is_unregistered() const {
+    return m_state == STATE_IDLE && m_watch_handle == 0;
+  }
+
+  // hook invoked with the result of a rewatch; no-op by default
+  virtual void handle_rewatch_complete(int r) { }
+
+private:
+  enum State {
+    STATE_IDLE,
+    STATE_REGISTERING,
+    STATE_REWATCHING
+  };
+
+  // librados watch context -- forwards callbacks to the owning watcher
+  struct WatchCtx : public librados::WatchCtx2 {
+    Watcher &watcher;
+
+    WatchCtx(Watcher &parent) : watcher(parent) {}
+
+    void handle_notify(uint64_t notify_id,
+                       uint64_t handle,
+                       uint64_t notifier_id,
+                       bufferlist& bl) override;
+    void handle_error(uint64_t handle, int err) override;
+  };
+
+  // routes the result of a watch registration through
+  // handle_register_watch() along with the caller's context
+  struct C_RegisterWatch : public Context {
+    Watcher *watcher;
+    Context *on_finish;
+
+    C_RegisterWatch(Watcher *watcher, Context *on_finish)
+      : watcher(watcher),
+        on_finish(on_finish) {
+    }
+
+    void finish(int r) override {
+      watcher->handle_register_watch(r, on_finish);
+    }
+  };
+
+  librados::IoCtx &m_ioctx;
+  ContextWQ *m_work_queue;
+
+  mutable ceph::shared_mutex m_lock;
+  State m_state;
+  bool m_watch_error = false;
+  bool m_watch_blocklisted = false;
+  // 0 means "no watch" -- initialized so that is_registered() and
+  // is_unregistered() are well defined before the first register_watch().
+  uint64_t m_watch_handle = 0;
+  WatchCtx m_watch_ctx;
+  // unregister request parked while a register/rewatch is in flight
+  Context *m_unregister_watch_ctx = nullptr;
+
+  virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+                             uint64_t notifier_id, bufferlist& bl) = 0;
+  void handle_error(uint64_t handle, int err);
+
+  void rewatch();
+  void handle_rewatch(int r);
+  void handle_rewatch_callback(int r);
+  void handle_register_watch(int r, Context *on_finish);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_WATCHER_H
diff --git a/src/tools/cephfs_mirror/aio_utils.h b/src/tools/cephfs_mirror/aio_utils.h
new file mode 100644
index 000000000..43f356381
--- /dev/null
+++ b/src/tools/cephfs_mirror/aio_utils.h
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_AIO_UTILS_H
+#define CEPHFS_MIRROR_AIO_UTILS_H
+
+#include "include/rados/librados.hpp"
+
+namespace cephfs {
+namespace mirror {
+
+// librados completion callback trampoline: treats arg as a T* and invokes
+// the member function MF on it with the completion's return value.
+template <typename T, void(T::*MF)(int)>
+void rados_callback(rados_completion_t c, void *arg) {
+  T *target = static_cast<T*>(arg);
+  const int result = rados_aio_get_return_value(c);
+  (target->*MF)(result);
+}
+
+// Context that, when finished, calls the member function MF on the wrapped
+// object with the completion result. The object is not owned.
+template <typename T, void (T::*MF)(int)>
+class C_CallbackAdapter : public Context {
+public:
+  C_CallbackAdapter(T *obj)
+    : m_target(obj) {
+  }
+
+protected:
+  void finish(int r) override {
+    (m_target->*MF)(r);
+  }
+
+private:
+  T *m_target;
+};
+
+// Context that defers another context: when finished it queues on_finish
+// (with the same result) on the given work queue instead of completing it
+// inline.
+template <typename WQ>
+struct C_AsyncCallback : public Context {
+ WQ *op_work_queue;
+ Context *on_finish;
+
+ C_AsyncCallback(WQ *op_work_queue, Context *on_finish)
+ : op_work_queue(op_work_queue), on_finish(on_finish) {
+ }
+ // deletes on_finish if it was never handed to the work queue
+ ~C_AsyncCallback() override {
+ delete on_finish;
+ }
+ void finish(int r) override {
+ op_work_queue->queue(on_finish, r);
+ // handed over to the work queue -- must not be deleted by the destructor
+ on_finish = nullptr;
+ }
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_AIO_UTILS_H
diff --git a/src/tools/cephfs_mirror/main.cc b/src/tools/cephfs_mirror/main.cc
new file mode 100644
index 000000000..4abb895a0
--- /dev/null
+++ b/src/tools/cephfs_mirror/main.cc
@@ -0,0 +1,124 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_argparse.h"
+#include "common/config.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/async/context_pool.h"
+#include "common/Preforker.h"
+#include "global/global_init.h"
+#include "global/signal_handler.h"
+#include "mon/MonClient.h"
+#include "msg/Messenger.h"
+#include "Mirror.h"
+
+#include <vector>
+
+using namespace std;
+
+// Print the command line help for cephfs-mirror followed by the generic
+// server options.
+void usage() {
+  std::cout << "usage: cephfs-mirror [options...]" << std::endl;
+  std::cout << "options:\n"
+            << " --mon-host monaddress[:port] connect to specified monitor\n"
+            << " --keyring=<path> path to keyring for local cluster\n"
+            << " --log-file=<logfile> file to log debug output\n"
+            << " --debug-cephfs-mirror=<log-level>/<memory-level> set cephfs-mirror debug level\n";
+  generic_server_usage();
+}
+
+cephfs::mirror::Mirror *mirror = nullptr;
+
+// Signal handler registered in main() -- relays the signal to the global
+// mirror instance, if one has been created.
+static void handle_signal(int signum) {
+  if (mirror == nullptr) {
+    return;
+  }
+
+  mirror->handle_signal(signum);
+}
+
+// cephfs-mirror entry point: parse arguments, initialize the global
+// context, optionally fork/daemonize, install signal handlers, create the
+// messenger and monitor client, then run the Mirror instance until it
+// returns. Cleanup is done via the goto labels near the end.
+int main(int argc, const char **argv) {
+ auto args = argv_to_vec(argc, argv);
+ if (args.empty()) {
+ cerr << argv[0] << ": -h or --help for usage" << std::endl;
+ ::exit(1);
+ }
+
+ if (ceph_argparse_need_usage(args)) {
+ usage();
+ ::exit(0);
+ }
+
+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_DAEMON,
+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+
+ // the parent process waits for the child and exits with its status
+ Preforker forker;
+ if (global_init_prefork(g_ceph_context) >= 0) {
+ std::string err;
+ int r = forker.prefork(err);
+ if (r < 0) {
+ cerr << err << std::endl;
+ return r;
+ }
+ if (forker.is_parent()) {
+ g_ceph_context->_log->start();
+ if (forker.parent_wait(err) != 0) {
+ return -ENXIO;
+ }
+ return 0;
+ }
+ global_init_postfork_start(g_ceph_context);
+ }
+
+ common_init_finish(g_ceph_context);
+
+ bool daemonize = g_conf().get_val<bool>("daemonize");
+ if (daemonize) {
+ global_init_postfork_finish(g_ceph_context);
+ forker.daemonize();
+ }
+
+ // all three signals are relayed to the mirror instance by handle_signal()
+ init_async_signal_handler();
+ register_async_signal_handler(SIGHUP, handle_signal);
+ register_async_signal_handler_oneshot(SIGINT, handle_signal);
+ register_async_signal_handler_oneshot(SIGTERM, handle_signal);
+
+ // a second copy of the arguments, handed to the Mirror instance below
+ auto cmd_args = argv_to_vec(argc, argv);
+
+ Messenger *msgr = Messenger::create_client_messenger(g_ceph_context, "client");
+ msgr->set_default_policy(Messenger::Policy::lossy_client(0));
+
+ std::string reason;
+ ceph::async::io_context_pool ctxpool(1);
+ MonClient monc(MonClient(g_ceph_context, ctxpool));
+ int r = monc.build_initial_monmap();
+ if (r < 0) {
+ cerr << "failed to generate initial monmap" << std::endl;
+ goto cleanup_messenger;
+ }
+
+ msgr->start();
+
+ mirror = new cephfs::mirror::Mirror(g_ceph_context, cmd_args, &monc, msgr);
+ r = mirror->init(reason);
+ if (r < 0) {
+ std::cerr << "failed to initialize cephfs-mirror: " << reason << std::endl;
+ // NOTE(review): this path skips the "delete mirror" below, so the
+ // instance is not destroyed when init fails -- confirm this is intended.
+ goto cleanup;
+ }
+
+ mirror->run();
+ // NOTE(review): the global pointer is not reset after the delete while
+ // the signal handlers are still registered -- confirm a late signal
+ // cannot reach handle_signal() here.
+ delete mirror;
+
+cleanup:
+ monc.shutdown();
+cleanup_messenger:
+ msgr->shutdown();
+ msgr->wait();
+ delete msgr;
+
+ unregister_async_signal_handler(SIGHUP, handle_signal);
+ unregister_async_signal_handler(SIGINT, handle_signal);
+ unregister_async_signal_handler(SIGTERM, handle_signal);
+ shutdown_async_signal_handler();
+
+ // r is the result of the last failing (or final) step above
+ return forker.signal_exit(r);
+}
diff --git a/src/tools/cephfs_mirror/watcher/RewatchRequest.cc b/src/tools/cephfs_mirror/watcher/RewatchRequest.cc
new file mode 100644
index 000000000..3070e6f8b
--- /dev/null
+++ b/src/tools/cephfs_mirror/watcher/RewatchRequest.cc
@@ -0,0 +1,102 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_mutex.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "include/Context.h"
+#include "tools/cephfs_mirror/aio_utils.h"
+#include "RewatchRequest.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::watcher:RewatchRequest " << __func__
+
+namespace cephfs {
+namespace mirror {
+namespace watcher {
+
+// Capture everything needed to redo a watch: the io context and object id,
+// the watcher's lock, its watch context, a pointer to the watcher's watch
+// handle (updated by this request) and the completion to invoke when done.
+RewatchRequest::RewatchRequest(librados::IoCtx &ioctx, const std::string &oid,
+ ceph::shared_mutex &watch_lock,
+ librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish)
+ : m_ioctx(ioctx), m_oid(oid), m_lock(watch_lock),
+ m_watch_ctx(watch_ctx), m_watch_handle(watch_handle),
+ m_on_finish(on_finish) {
+}
+
+// Start the request: unwatch first (if there is a handle), then watch
+// again. unwatch() asserts that the watch lock is held exclusively.
+void RewatchRequest::send() {
+ unwatch();
+}
+
+// Drop the existing watch, if any. Requires the watch lock to be held
+// exclusively. Without a current handle the request proceeds straight to
+// rewatch(); otherwise the handle is zeroed and an asynchronous unwatch is
+// issued whose result arrives in handle_unwatch().
+void RewatchRequest::unwatch() {
+  ceph_assert(ceph_mutex_is_wlocked(m_lock));
+  if (*m_watch_handle == 0) {
+    rewatch();
+    return;
+  }
+
+  dout(10) << dendl;
+
+  // take the handle away from the watcher before unwatching it
+  const uint64_t stale_handle = *m_watch_handle;
+  *m_watch_handle = 0;
+
+  librados::AioCompletion *aio_comp = librados::Rados::aio_create_completion(
+    this, &rados_callback<RewatchRequest, &RewatchRequest::handle_unwatch>);
+  const int r = m_ioctx.aio_unwatch(stale_handle, aio_comp);
+  ceph_assert(r == 0);
+  aio_comp->release();
+}
+
+// Result of the unwatch. A blocklisted client ends the request with that
+// error; any other failure is only logged and the watch is re-established
+// regardless.
+void RewatchRequest::handle_unwatch(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  if (r == -EBLOCKLISTED) {
+    derr << ": client blocklisted" << dendl;
+    finish(r);
+    return;
+  }
+
+  if (r < 0) {
+    derr << ": failed to unwatch: " << cpp_strerror(r) << dendl;
+  }
+
+  rewatch();
+}
+
+// Issue an asynchronous watch on the object. The new handle is stored in
+// m_rewatch_handle; the result arrives in handle_rewatch().
+void RewatchRequest::rewatch() {
+  dout(20) << dendl;
+
+  librados::AioCompletion *watch_comp = librados::Rados::aio_create_completion(
+    this, &rados_callback<RewatchRequest, &RewatchRequest::handle_rewatch>);
+
+  const int r = m_ioctx.aio_watch(m_oid, watch_comp, &m_rewatch_handle, m_watch_ctx);
+  ceph_assert(r == 0);
+  watch_comp->release();
+}
+
+// Result of the watch. On failure the new handle is reset to 0; either way
+// the watcher's handle is updated under the watch lock before the request
+// finishes with r.
+void RewatchRequest::handle_rewatch(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  const bool failed = (r < 0);
+  if (failed) {
+    derr << ": failed to watch object: " << cpp_strerror(r) << dendl;
+    m_rewatch_handle = 0;
+  }
+
+  {
+    // publish the (possibly zeroed) handle to the watcher
+    std::unique_lock locker(m_lock);
+    *m_watch_handle = m_rewatch_handle;
+  }
+
+  finish(r);
+}
+
+// Complete the caller's context with r and destroy this request. No member
+// may be touched after this returns.
+void RewatchRequest::finish(int r) {
+ dout(20) << ": r=" << r << dendl;
+ m_on_finish->complete(r);
+ delete this;
+}
+
+} // namespace watcher
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/watcher/RewatchRequest.h b/src/tools/cephfs_mirror/watcher/RewatchRequest.h
new file mode 100644
index 000000000..453fcb219
--- /dev/null
+++ b/src/tools/cephfs_mirror/watcher/RewatchRequest.h
@@ -0,0 +1,60 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H
+#define CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H
+
+#include "common/ceph_mutex.h"
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+
+struct Context;
+
+namespace cephfs {
+namespace mirror {
+namespace watcher {
+
+// Rewatch an existing watch -- the watch can be in an operational
+// or error state.
+
+// Self-deleting request object: send() unwatches the current handle (if
+// any), watches the object again, stores the new handle through
+// watch_handle and completes on_finish with the result.
+class RewatchRequest {
+public:
+
+ // heap-allocate a request; it deletes itself when it finishes
+ static RewatchRequest *create(librados::IoCtx &ioctx, const std::string &oid,
+ ceph::shared_mutex &watch_lock,
+ librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish) {
+ return new RewatchRequest(ioctx, oid, watch_lock, watch_ctx, watch_handle,
+ on_finish);
+ }
+
+ RewatchRequest(librados::IoCtx &ioctx, const std::string &oid,
+ ceph::shared_mutex &watch_lock, librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish);
+
+ // start the request; the watch lock must be held exclusively
+ void send();
+
+private:
+ librados::IoCtx& m_ioctx;
+ std::string m_oid;
+ // the watcher's lock, guards *m_watch_handle
+ ceph::shared_mutex &m_lock;
+ librados::WatchCtx2 *m_watch_ctx;
+ // the watcher's handle -- zeroed on unwatch, updated after the rewatch
+ uint64_t *m_watch_handle;
+ Context *m_on_finish;
+
+ // handle of the new watch until it is published to *m_watch_handle
+ uint64_t m_rewatch_handle = 0;
+
+ void unwatch();
+ void handle_unwatch(int r);
+
+ void rewatch();
+ void handle_rewatch(int r);
+
+ void finish(int r);
+};
+
+} // namespace watcher
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H