summaryrefslogtreecommitdiffstats
path: root/src/tools/rbd_mirror/ClusterWatcher.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/tools/rbd_mirror/ClusterWatcher.cc
parentInitial commit. (diff)
downloadceph-upstream/16.2.11+ds.tar.xz
ceph-upstream/16.2.11+ds.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tools/rbd_mirror/ClusterWatcher.cc')
-rw-r--r--src/tools/rbd_mirror/ClusterWatcher.cc251
1 files changed, 251 insertions, 0 deletions
diff --git a/src/tools/rbd_mirror/ClusterWatcher.cc b/src/tools/rbd_mirror/ClusterWatcher.cc
new file mode 100644
index 000000000..2ae1306be
--- /dev/null
+++ b/src/tools/rbd_mirror/ClusterWatcher.cc
@@ -0,0 +1,251 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ClusterWatcher.h"
+#include "include/stringify.h"
+#include "common/ceph_json.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/internal.h"
+#include "librbd/api/Mirror.h"
+#include "tools/rbd_mirror/ServiceDaemon.h"
+#include "json_spirit/json_spirit.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::ClusterWatcher:" << this << " " \
+ << __func__ << ": "
+
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using librados::Rados;
+using librados::IoCtx;
+
+namespace rbd {
+namespace mirror {
+
+ClusterWatcher::ClusterWatcher(RadosRef cluster, ceph::mutex &lock,
+ ServiceDaemon<librbd::ImageCtx>* service_daemon)
+ : m_cluster(cluster), m_lock(lock), m_service_daemon(service_daemon)
+{
+}
+
+const ClusterWatcher::PoolPeers& ClusterWatcher::get_pool_peers() const
+{
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ return m_pool_peers;
+}
+
+std::string ClusterWatcher::get_site_name() const {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ return m_site_name;
+}
+
+void ClusterWatcher::refresh_pools()
+{
+ dout(20) << "enter" << dendl;
+
+ PoolPeers pool_peers;
+ read_pool_peers(&pool_peers);
+
+ std::string site_name;
+ int r = read_site_name(&site_name);
+
+ std::lock_guard l{m_lock};
+ m_pool_peers = pool_peers;
+
+ if (r >= 0) {
+ m_site_name = site_name;
+ }
+
+ // TODO: perhaps use a workqueue instead, once we get notifications
+ // about config changes for existing pools
+}
+
+void ClusterWatcher::read_pool_peers(PoolPeers *pool_peers)
+{
+ int r = m_cluster->wait_for_latest_osdmap();
+ if (r < 0) {
+ derr << "error waiting for OSD map: " << cpp_strerror(r) << dendl;
+ return;
+ }
+
+ list<pair<int64_t, string> > pools;
+ r = m_cluster->pool_list2(pools);
+ if (r < 0) {
+ derr << "error listing pools: " << cpp_strerror(r) << dendl;
+ return;
+ }
+
+ std::set<int64_t> service_pool_ids;
+ for (auto& kv : pools) {
+ int64_t pool_id = kv.first;
+ auto& pool_name = kv.second;
+ int64_t base_tier;
+ r = m_cluster->pool_get_base_tier(pool_id, &base_tier);
+ if (r == -ENOENT) {
+ dout(10) << "pool " << pool_name << " no longer exists" << dendl;
+ continue;
+ } else if (r < 0) {
+ derr << "Error retrieving base tier for pool " << pool_name << dendl;
+ continue;
+ }
+ if (pool_id != base_tier) {
+ // pool is a cache; skip it
+ continue;
+ }
+
+ IoCtx ioctx;
+ r = m_cluster->ioctx_create2(pool_id, ioctx);
+ if (r == -ENOENT) {
+ dout(10) << "pool " << pool_id << " no longer exists" << dendl;
+ continue;
+ } else if (r < 0) {
+ derr << "Error accessing pool " << pool_name << cpp_strerror(r) << dendl;
+ continue;
+ }
+
+ cls::rbd::MirrorMode mirror_mode_internal;
+ r = librbd::cls_client::mirror_mode_get(&ioctx, &mirror_mode_internal);
+ if (r == 0 && mirror_mode_internal == cls::rbd::MIRROR_MODE_DISABLED) {
+ dout(10) << "mirroring is disabled for pool " << pool_name << dendl;
+ continue;
+ }
+
+ service_pool_ids.insert(pool_id);
+ if (m_service_pools.find(pool_id) == m_service_pools.end()) {
+ m_service_pools[pool_id] = {};
+ m_service_daemon->add_pool(pool_id, pool_name);
+ }
+
+ if (r == -EPERM) {
+ dout(10) << "access denied querying pool " << pool_name << dendl;
+ m_service_pools[pool_id] = m_service_daemon->add_or_update_callout(
+ pool_id, m_service_pools[pool_id],
+ service_daemon::CALLOUT_LEVEL_WARNING, "access denied");
+ continue;
+ } else if (r < 0) {
+ derr << "could not tell whether mirroring was enabled for " << pool_name
+ << " : " << cpp_strerror(r) << dendl;
+ m_service_pools[pool_id] = m_service_daemon->add_or_update_callout(
+ pool_id, m_service_pools[pool_id],
+ service_daemon::CALLOUT_LEVEL_WARNING, "mirroring mode query failed");
+ continue;
+ }
+
+ vector<librbd::mirror_peer_site_t> configs;
+ r = librbd::api::Mirror<>::peer_site_list(ioctx, &configs);
+ if (r < 0) {
+ derr << "error reading mirroring config for pool " << pool_name
+ << cpp_strerror(r) << dendl;
+ m_service_pools[pool_id] = m_service_daemon->add_or_update_callout(
+ pool_id, m_service_pools[pool_id],
+ service_daemon::CALLOUT_LEVEL_ERROR, "mirroring peer list failed");
+ continue;
+ }
+
+ std::vector<PeerSpec> peers;
+ peers.reserve(configs.size());
+ for (auto& peer : configs) {
+ if (peer.direction != RBD_MIRROR_PEER_DIRECTION_TX) {
+ peers.push_back(peer);
+ }
+ }
+
+ for (auto& peer : peers) {
+ r = resolve_peer_site_config_keys(pool_id, pool_name, &peer);
+ if (r < 0) {
+ break;
+ }
+ }
+
+ if (m_service_pools[pool_id] != service_daemon::CALLOUT_ID_NONE) {
+ m_service_daemon->remove_callout(pool_id, m_service_pools[pool_id]);
+ m_service_pools[pool_id] = service_daemon::CALLOUT_ID_NONE;
+ }
+
+ pool_peers->emplace(pool_id, Peers{peers.begin(), peers.end()});
+ }
+
+ for (auto it = m_service_pools.begin(); it != m_service_pools.end(); ) {
+ auto current_it(it++);
+ if (service_pool_ids.find(current_it->first) == service_pool_ids.end()) {
+ m_service_daemon->remove_pool(current_it->first);
+ m_service_pools.erase(current_it->first);
+ }
+ }
+}
+
+int ClusterWatcher::read_site_name(std::string* site_name) {
+ dout(10) << dendl;
+
+ librbd::RBD rbd;
+ return rbd.mirror_site_name_get(*m_cluster, site_name);
+}
+
+int ClusterWatcher::resolve_peer_site_config_keys(int64_t pool_id,
+ const std::string& pool_name,
+ PeerSpec* peer) {
+ dout(10) << "retrieving config-key: pool_id=" << pool_id << ", "
+ << "pool_name=" << pool_name << ", "
+ << "peer_uuid=" << peer->uuid << dendl;
+
+ std::string cmd =
+ "{"
+ "\"prefix\": \"config-key get\", "
+ "\"key\": \"" RBD_MIRROR_PEER_CONFIG_KEY_PREFIX + stringify(pool_id) +
+ "/" + peer->uuid + "\""
+ "}";
+
+ bufferlist in_bl;
+ bufferlist out_bl;
+ int r = m_cluster->mon_command(cmd, in_bl, &out_bl, nullptr);
+ if (r == -ENOENT || out_bl.length() == 0) {
+ return 0;
+ } else if (r < 0) {
+ derr << "error reading mirroring peer config for pool " << pool_name << ": "
+ << cpp_strerror(r) << dendl;
+ m_service_pools[pool_id] = m_service_daemon->add_or_update_callout(
+ pool_id, m_service_pools[pool_id],
+ service_daemon::CALLOUT_LEVEL_WARNING,
+ "mirroring peer config-key query failed");
+ return r;
+ }
+
+ bool json_valid = false;
+ json_spirit::mValue json_root;
+ if(json_spirit::read(out_bl.to_str(), json_root)) {
+ try {
+ auto& json_obj = json_root.get_obj();
+ if (json_obj.count("mon_host")) {
+ peer->mon_host = json_obj["mon_host"].get_str();
+ }
+ if (json_obj.count("key")) {
+ peer->key = json_obj["key"].get_str();
+ }
+ json_valid = true;
+ } catch (std::runtime_error&) {
+ }
+ }
+
+ if (!json_valid) {
+ derr << "error parsing mirroring peer config for pool " << pool_name << ", "
+ << "peer " << peer->uuid << dendl;
+ m_service_pools[pool_id] = m_service_daemon->add_or_update_callout(
+ pool_id, m_service_pools[pool_id],
+ service_daemon::CALLOUT_LEVEL_WARNING,
+ "mirroring peer config-key decode failed");
+ }
+
+ return 0;
+}
+
+} // namespace mirror
+} // namespace rbd