summaryrefslogtreecommitdiffstats
path: root/src/tools/cephfs_mirror/ClusterWatcher.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/tools/cephfs_mirror/ClusterWatcher.cc182
1 files changed, 182 insertions, 0 deletions
diff --git a/src/tools/cephfs_mirror/ClusterWatcher.cc b/src/tools/cephfs_mirror/ClusterWatcher.cc
new file mode 100644
index 000000000..b5f6f81d7
--- /dev/null
+++ b/src/tools/cephfs_mirror/ClusterWatcher.cc
@@ -0,0 +1,182 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <mutex>
+#include <vector>
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "mon/MonClient.h"
+
+#include "ClusterWatcher.h"
+#include "ServiceDaemon.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::ClusterWatcher " << __func__
+
+namespace cephfs {
+namespace mirror {
+
+ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon *service_daemon,
+ Listener &listener)
+ : Dispatcher(cct),
+ m_monc(monc),
+ m_service_daemon(service_daemon),
+ m_listener(listener) {
+}
+
+ClusterWatcher::~ClusterWatcher() {
+}
+
+bool ClusterWatcher::ms_can_fast_dispatch2(const cref_t<Message> &m) const {
+ return m->get_type() == CEPH_MSG_FS_MAP;
+}
+
+void ClusterWatcher::ms_fast_dispatch2(const ref_t<Message> &m) {
+ bool handled = ms_dispatch2(m);
+ ceph_assert(handled);
+}
+
+bool ClusterWatcher::ms_dispatch2(const ref_t<Message> &m) {
+ if (m->get_type() == CEPH_MSG_FS_MAP) {
+ if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ handle_fsmap(ref_cast<MFSMap>(m));
+ }
+ return true;
+ }
+
+ return false;
+}
+
+int ClusterWatcher::init() {
+ dout(20) << dendl;
+
+ bool sub = m_monc->sub_want("fsmap", 0, 0);
+ if (!sub) {
+ derr << ": failed subscribing to FSMap" << dendl;
+ return -1;
+ }
+
+ m_monc->renew_subs();
+ dout(10) << ": subscribed to FSMap" << dendl;
+ return 0;
+}
+
+void ClusterWatcher::shutdown() {
+ dout(20) << dendl;
+ std::scoped_lock locker(m_lock);
+ m_stopping = true;
+ m_monc->sub_unwant("fsmap");
+}
+
+void ClusterWatcher::handle_fsmap(const cref_t<MFSMap> &m) {
+ dout(20) << dendl;
+
+ auto fsmap = m->get_fsmap();
+ auto filesystems = fsmap.get_filesystems();
+
+ std::vector<Filesystem> mirroring_enabled;
+ std::vector<Filesystem> mirroring_disabled;
+ std::map<Filesystem, Peers> peers_added;
+ std::map<Filesystem, Peers> peers_removed;
+ std::map<Filesystem, uint64_t> fs_metadata_pools;
+ {
+ std::scoped_lock locker(m_lock);
+ if (m_stopping) {
+ return;
+ }
+
+ // deleted filesystems are considered mirroring disabled
+ for (auto it = m_filesystem_peers.begin(); it != m_filesystem_peers.end();) {
+ if (!fsmap.filesystem_exists(it->first.fscid)) {
+ mirroring_disabled.emplace_back(it->first);
+ it = m_filesystem_peers.erase(it);
+ continue;
+ }
+ ++it;
+ }
+
+ for (auto &filesystem : filesystems) {
+ auto fs = Filesystem{filesystem->fscid,
+ std::string(filesystem->mds_map.get_fs_name())};
+ auto pool_id = filesystem->mds_map.get_metadata_pool();
+ auto &mirror_info = filesystem->mirror_info;
+
+ if (!mirror_info.is_mirrored()) {
+ auto it = m_filesystem_peers.find(fs);
+ if (it != m_filesystem_peers.end()) {
+ mirroring_disabled.emplace_back(fs);
+ m_filesystem_peers.erase(it);
+ }
+ } else {
+ auto [fspeersit, enabled] = m_filesystem_peers.emplace(fs, Peers{});
+ auto &peers = fspeersit->second;
+
+ if (enabled) {
+ mirroring_enabled.emplace_back(fs);
+ fs_metadata_pools.emplace(fs, pool_id);
+ }
+
+ // peers added
+ Peers added;
+ std::set_difference(mirror_info.peers.begin(), mirror_info.peers.end(),
+ peers.begin(), peers.end(), std::inserter(added, added.end()));
+
+ // peers removed
+ Peers removed;
+ std::set_difference(peers.begin(), peers.end(),
+ mirror_info.peers.begin(), mirror_info.peers.end(),
+ std::inserter(removed, removed.end()));
+
+ // update set
+ if (!added.empty()) {
+ peers_added.emplace(fs, added);
+ peers.insert(added.begin(), added.end());
+ }
+ if (!removed.empty()) {
+ peers_removed.emplace(fs, removed);
+ for (auto &p : removed) {
+ peers.erase(p);
+ }
+ }
+ }
+ }
+ }
+
+ dout(5) << ": mirroring enabled=" << mirroring_enabled << ", mirroring_disabled="
+ << mirroring_disabled << dendl;
+ for (auto &fs : mirroring_enabled) {
+ m_service_daemon->add_filesystem(fs.fscid, fs.fs_name);
+ m_listener.handle_mirroring_enabled(FilesystemSpec(fs, fs_metadata_pools.at(fs)));
+ }
+ for (auto &fs : mirroring_disabled) {
+ m_service_daemon->remove_filesystem(fs.fscid);
+ m_listener.handle_mirroring_disabled(fs);
+ }
+
+ dout(5) << ": peers added=" << peers_added << ", peers removed=" << peers_removed << dendl;
+
+ for (auto &[fs, peers] : peers_added) {
+ for (auto &peer : peers) {
+ m_service_daemon->add_peer(fs.fscid, peer);
+ m_listener.handle_peers_added(fs, peer);
+ }
+ }
+ for (auto &[fs, peers] : peers_removed) {
+ for (auto &peer : peers) {
+ m_service_daemon->remove_peer(fs.fscid, peer);
+ m_listener.handle_peers_removed(fs, peer);
+ }
+ }
+
+ std::scoped_lock locker(m_lock);
+ if (!m_stopping) {
+ m_monc->sub_got("fsmap", fsmap.get_epoch());
+ } // else we have already done a sub_unwant()
+}
+
+} // namespace mirror
+} // namespace cephfs