Diffstat (limited to 'src/librados/RadosClient.cc')
-rw-r--r--  src/librados/RadosClient.cc | 1180
1 file changed, 1180 insertions, 0 deletions
diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc
new file mode 100644
index 000000000..04ea14f31
--- /dev/null
+++ b/src/librados/RadosClient.cc
@@ -0,0 +1,1180 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <pthread.h>
+#include <errno.h>
+
+#include "common/ceph_context.h"
+#include "common/config.h"
+#include "common/common_init.h"
+#include "common/ceph_json.h"
+#include "common/errno.h"
+#include "common/ceph_json.h"
+#include "common/async/blocked_completion.h"
+#include "include/buffer.h"
+#include "include/stringify.h"
+#include "include/util.h"
+
+#include "msg/Messenger.h"
+
+// needed for static_cast
+#include "messages/MLog.h"
+
+#include "AioCompletionImpl.h"
+#include "IoCtxImpl.h"
+#include "PoolAsyncCompletionImpl.h"
+#include "RadosClient.h"
+
+#include "include/ceph_assert.h"
+#include "common/EventTrace.h"
+
+#define dout_subsys ceph_subsys_rados
+#undef dout_prefix
+#define dout_prefix *_dout << "librados: "
+
+namespace bc = boost::container;
+namespace bs = boost::system;
+namespace ca = ceph::async;
+namespace cb = ceph::buffer;
+
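+// Lifetime note: the CephContext is reference-counted.  The Dispatcher
+// base class takes a ref via cct_->get() below, and cct_deleter drops
+// it again when this RadosClient is destroyed.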
+librados::RadosClient::RadosClient(CephContext *cct_)
+ : Dispatcher(cct_->get()),
+ cct_deleter{cct, [](CephContext *p) {p->put();}}
+{
+ auto& conf = cct->_conf;
+ conf.add_observer(this);
+ rados_mon_op_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+}
+
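+// Resolve a pool name to its id.  -ENOENT from the cached osdmap may
+// just mean the map is stale (e.g. the pool was created after we
+// fetched it), so grab the latest map once and retry before giving up.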
+int64_t librados::RadosClient::lookup_pool(const char *name)
+{
+ int r = wait_for_osdmap();
+ if (r < 0) {
+ return r;
+ }
+
+ int64_t ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
+ name);
+ if (-ENOENT == ret) {
+ // Make sure we have the latest map
+ int r = wait_for_latest_osdmap();
+ if (r < 0)
+ return r;
+ ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
+ name);
+ }
+
+ return ret;
+}
+
+bool librados::RadosClient::pool_requires_alignment(int64_t pool_id)
+{
+ bool requires;
+ int r = pool_requires_alignment2(pool_id, &requires);
+ if (r < 0) {
+    // Default to false on error; this is somewhat problematic since
+    // we don't actually know the answer yet.
+ return false;
+ }
+
+ return requires;
+}
+
+// a safer version of pool_requires_alignment
+int librados::RadosClient::pool_requires_alignment2(int64_t pool_id,
+ bool *requires)
+{
+ if (!requires)
+ return -EINVAL;
+
+ int r = wait_for_osdmap();
+ if (r < 0) {
+ return r;
+ }
+
+ return objecter->with_osdmap([requires, pool_id](const OSDMap& o) {
+ if (!o.have_pg_pool(pool_id)) {
+ return -ENOENT;
+ }
+ *requires = o.get_pg_pool(pool_id)->requires_aligned_append();
+ return 0;
+ });
+}
+
+uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id)
+{
+ uint64_t alignment;
+ int r = pool_required_alignment2(pool_id, &alignment);
+ if (r < 0) {
+ return 0;
+ }
+
+ return alignment;
+}
+
+// a safer version of pool_required_alignment
+int librados::RadosClient::pool_required_alignment2(int64_t pool_id,
+ uint64_t *alignment)
+{
+ if (!alignment)
+ return -EINVAL;
+
+ int r = wait_for_osdmap();
+ if (r < 0) {
+ return r;
+ }
+
+ return objecter->with_osdmap([alignment, pool_id](const OSDMap &o) {
+ if (!o.have_pg_pool(pool_id)) {
+ return -ENOENT;
+ }
+ *alignment = o.get_pg_pool(pool_id)->required_alignment();
+ return 0;
+ });
+}
+
+int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s, bool wait_latest_map)
+{
+ int r = wait_for_osdmap();
+ if (r < 0)
+ return r;
+ retry:
+ objecter->with_osdmap([&](const OSDMap& o) {
+ if (!o.have_pg_pool(pool_id)) {
+ r = -ENOENT;
+ } else {
+ r = 0;
+ *s = o.get_pool_name(pool_id);
+ }
+ });
+ if (r == -ENOENT && wait_latest_map) {
+ r = wait_for_latest_osdmap();
+ if (r < 0)
+ return r;
+ wait_latest_map = false;
+ goto retry;
+ }
+
+ return r;
+}
+
+int librados::RadosClient::get_fsid(std::string *s)
+{
+ if (!s)
+ return -EINVAL;
+ std::lock_guard l(lock);
+ ostringstream oss;
+ oss << monclient.get_fsid();
+ *s = oss.str();
+ return 0;
+}
+
+int librados::RadosClient::ping_monitor(const string mon_id, string *result)
+{
+ int err = 0;
+ /* If we haven't yet connected, we have no way of telling whether we
+ * already built monc's initial monmap. If we are in CONNECTED state,
+ * then it is safe to assume that we went through connect(), which does
+ * build a monmap.
+ */
+ if (state != CONNECTED) {
+ ldout(cct, 10) << __func__ << " build monmap" << dendl;
+ err = monclient.build_initial_monmap();
+ }
+ if (err < 0) {
+ return err;
+ }
+
+ err = monclient.ping_monitor(mon_id, result);
+ return err;
+}
+
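+// connect() is normally reached through the public librados API
+// rather than called directly.  A minimal caller sketch (illustrative,
+// not part of this file; error handling omitted):
+//
+//   rados_t cluster;
+//   rados_create(&cluster, "admin");      // constructs a RadosClient
+//   rados_conf_read_file(cluster, NULL);  // load default ceph.conf
+//   rados_connect(cluster);               // lands in connect() below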
+int librados::RadosClient::connect()
+{
+ int err;
+
+ // already connected?
+ if (state == CONNECTING)
+ return -EINPROGRESS;
+ if (state == CONNECTED)
+ return -EISCONN;
+ state = CONNECTING;
+
+ if (!cct->_log->is_started()) {
+ cct->_log->start();
+ }
+
+ {
+ MonClient mc_bootstrap(cct, poolctx);
+ err = mc_bootstrap.get_monmap_and_config();
+ if (err < 0)
+ return err;
+ }
+
+ common_init_finish(cct);
+
+ poolctx.start(cct->_conf.get_val<std::uint64_t>("librados_thread_count"));
+
+ // get monmap
+ err = monclient.build_initial_monmap();
+ if (err < 0)
+ goto out;
+
+ err = -ENOMEM;
+ messenger = Messenger::create_client_messenger(cct, "radosclient");
+ if (!messenger)
+ goto out;
+
+ // require OSDREPLYMUX feature. this means we will fail to talk to
+ // old servers. this is necessary because otherwise we won't know
+ // how to decompose the reply data into its constituent pieces.
+ messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
+
+ ldout(cct, 1) << "starting msgr at " << messenger->get_myaddrs() << dendl;
+
+ ldout(cct, 1) << "starting objecter" << dendl;
+
+ objecter = new (std::nothrow) Objecter(cct, messenger, &monclient, poolctx);
+ if (!objecter)
+ goto out;
+ objecter->set_balanced_budget();
+
+ monclient.set_messenger(messenger);
+ mgrclient.set_messenger(messenger);
+
+ objecter->init();
+ messenger->add_dispatcher_head(&mgrclient);
+ messenger->add_dispatcher_tail(objecter);
+ messenger->add_dispatcher_tail(this);
+
+ messenger->start();
+
+ ldout(cct, 1) << "setting wanted keys" << dendl;
+ monclient.set_want_keys(
+ CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
+ ldout(cct, 1) << "calling monclient init" << dendl;
+ err = monclient.init();
+ if (err) {
+ ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
+ shutdown();
+ goto out;
+ }
+
+ err = monclient.authenticate(std::chrono::duration<double>(conf.get_val<std::chrono::seconds>("client_mount_timeout")).count());
+ if (err) {
+ ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
+ shutdown();
+ goto out;
+ }
+ messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
+
+ // Detect older cluster, put mgrclient into compatible mode
+ mgrclient.set_mgr_optional(
+ !get_required_monitor_features().contains_all(
+ ceph::features::mon::FEATURE_LUMINOUS));
+
+ // MgrClient needs this (it doesn't have MonClient reference itself)
+ monclient.sub_want("mgrmap", 0, 0);
+ monclient.renew_subs();
+
+ if (service_daemon) {
+ ldout(cct, 10) << __func__ << " registering as " << service_name << "."
+ << daemon_name << dendl;
+ mgrclient.service_daemon_register(service_name, daemon_name,
+ daemon_metadata);
+ }
+ mgrclient.init();
+
+ objecter->set_client_incarnation(0);
+ objecter->start();
+ lock.lock();
+
+ state = CONNECTED;
+ instance_id = monclient.get_global_id();
+
+ lock.unlock();
+
+ ldout(cct, 1) << "init done" << dendl;
+ err = 0;
+
+ out:
+ if (err) {
+ state = DISCONNECTED;
+
+ if (objecter) {
+ delete objecter;
+ objecter = NULL;
+ }
+ if (messenger) {
+ delete messenger;
+ messenger = NULL;
+ }
+ }
+
+ return err;
+}
+
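+// Teardown order matters: flush watch/notify callbacks while the
+// objecter is still usable, then stop the objecter, the mgr and mon
+// clients, the messenger, and finally the librados thread pool.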
+void librados::RadosClient::shutdown()
+{
+ std::unique_lock l{lock};
+ if (state == DISCONNECTED) {
+ return;
+ }
+
+ bool need_objecter = false;
+ if (objecter && objecter->initialized) {
+ need_objecter = true;
+ }
+
+ if (state == CONNECTED) {
+ if (need_objecter) {
+ // make sure watch callbacks are flushed
+ watch_flush();
+ }
+ }
+ state = DISCONNECTED;
+ instance_id = 0;
+ l.unlock();
+ if (need_objecter) {
+ objecter->shutdown();
+ }
+ mgrclient.shutdown();
+
+ monclient.shutdown();
+ if (messenger) {
+ messenger->shutdown();
+ messenger->wait();
+ }
+ poolctx.stop();
+ ldout(cct, 1) << "shutdown" << dendl;
+}
+
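+// Block until all watch/notify callbacks queued so far have been
+// delivered.  shutdown() relies on this so user callbacks never fire
+// after teardown begins.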
+int librados::RadosClient::watch_flush()
+{
+ ldout(cct, 10) << __func__ << " enter" << dendl;
+ objecter->linger_callback_flush(ca::use_blocked);
+
+ ldout(cct, 10) << __func__ << " exit" << dendl;
+ return 0;
+}
+
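+// Completion functor for async_watch_flush().  The constructor takes a
+// ref on the AioCompletionImpl; operator() marks the completion done,
+// defers any user callbacks onto the finisher strand, and drops the
+// ref via put_unlock().  The move operations just copy the raw
+// pointers; only the instance that is eventually invoked releases the
+// ref.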
+struct CB_aio_watch_flush_Complete {
+ librados::RadosClient *client;
+ librados::AioCompletionImpl *c;
+
+ CB_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c)
+ : client(_client), c(_c) {
+ c->get();
+ }
+
+ CB_aio_watch_flush_Complete(const CB_aio_watch_flush_Complete&) = delete;
+ CB_aio_watch_flush_Complete operator =(const CB_aio_watch_flush_Complete&) = delete;
+ CB_aio_watch_flush_Complete(CB_aio_watch_flush_Complete&& rhs) {
+ client = rhs.client;
+ c = rhs.c;
+ }
+ CB_aio_watch_flush_Complete& operator =(CB_aio_watch_flush_Complete&& rhs) {
+ client = rhs.client;
+ c = rhs.c;
+ return *this;
+ }
+
+ void operator()() {
+ c->lock.lock();
+ c->rval = 0;
+ c->complete = true;
+ c->cond.notify_all();
+
+ if (c->callback_complete ||
+ c->callback_safe) {
+ boost::asio::defer(client->finish_strand, librados::CB_AioComplete(c));
+ }
+ c->put_unlock();
+ }
+};
+
+int librados::RadosClient::async_watch_flush(AioCompletionImpl *c)
+{
+ ldout(cct, 10) << __func__ << " enter" << dendl;
+ objecter->linger_callback_flush(CB_aio_watch_flush_Complete(this, c));
+ ldout(cct, 10) << __func__ << " exit" << dendl;
+ return 0;
+}
+
+uint64_t librados::RadosClient::get_instance_id()
+{
+ return instance_id;
+}
+
+int librados::RadosClient::get_min_compatible_osd(int8_t* require_osd_release)
+{
+ int r = wait_for_osdmap();
+ if (r < 0) {
+ return r;
+ }
+
+ objecter->with_osdmap(
+ [require_osd_release](const OSDMap& o) {
+ *require_osd_release = to_integer<int8_t>(o.require_osd_release);
+ });
+ return 0;
+}
+
+int librados::RadosClient::get_min_compatible_client(int8_t* min_compat_client,
+ int8_t* require_min_compat_client)
+{
+ int r = wait_for_osdmap();
+ if (r < 0) {
+ return r;
+ }
+
+ objecter->with_osdmap(
+ [min_compat_client, require_min_compat_client](const OSDMap& o) {
+ *min_compat_client = to_integer<int8_t>(o.get_min_compat_client());
+ *require_min_compat_client =
+ to_integer<int8_t>(o.get_require_min_compat_client());
+ });
+ return 0;
+}
+
+librados::RadosClient::~RadosClient()
+{
+ cct->_conf.remove_observer(this);
+ if (messenger)
+ delete messenger;
+ if (objecter)
+ delete objecter;
+ cct = NULL;
+}
+
+int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
+{
+ int64_t poolid = lookup_pool(name);
+ if (poolid < 0) {
+ return (int)poolid;
+ }
+
+ *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);
+ return 0;
+}
+
+int librados::RadosClient::create_ioctx(int64_t pool_id, IoCtxImpl **io)
+{
+ std::string pool_name;
+ int r = pool_get_name(pool_id, &pool_name, true);
+ if (r < 0)
+ return r;
+ *io = new librados::IoCtxImpl(this, objecter, pool_id, CEPH_NOSNAP);
+ return 0;
+}
+
+bool librados::RadosClient::ms_dispatch(Message *m)
+{
+ bool ret;
+
+ std::lock_guard l(lock);
+ if (state == DISCONNECTED) {
+ ldout(cct, 10) << "disconnected, discarding " << *m << dendl;
+ m->put();
+ ret = true;
+ } else {
+ ret = _dispatch(m);
+ }
+ return ret;
+}
+
+void librados::RadosClient::ms_handle_connect(Connection *con)
+{
+}
+
+bool librados::RadosClient::ms_handle_reset(Connection *con)
+{
+ return false;
+}
+
+void librados::RadosClient::ms_handle_remote_reset(Connection *con)
+{
+}
+
+bool librados::RadosClient::ms_handle_refused(Connection *con)
+{
+ return false;
+}
+
+bool librados::RadosClient::_dispatch(Message *m)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ switch (m->get_type()) {
+ // OSD
+ case CEPH_MSG_OSD_MAP:
+ cond.notify_all();
+ m->put();
+ break;
+
+ case CEPH_MSG_MDS_MAP:
+ m->put();
+ break;
+
+ case MSG_LOG:
+ handle_log(static_cast<MLog *>(m));
+ break;
+
+ default:
+ return false;
+ }
+
+ return true;
+}
+
+
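+// Block until we have any osdmap at all (epoch 0 means none yet).  The
+// wait is bounded by rados_mon_op_timeout when that is nonzero, and
+// cond is notified from _dispatch() when a CEPH_MSG_OSD_MAP arrives.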
+int librados::RadosClient::wait_for_osdmap()
+{
+ ceph_assert(ceph_mutex_is_not_locked_by_me(lock));
+
+ if (state != CONNECTED) {
+ return -ENOTCONN;
+ }
+
+ bool need_map = false;
+ objecter->with_osdmap([&](const OSDMap& o) {
+ if (o.get_epoch() == 0) {
+ need_map = true;
+ }
+ });
+
+ if (need_map) {
+ std::unique_lock l(lock);
+
+ ceph::timespan timeout = rados_mon_op_timeout;
+ if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
+ ldout(cct, 10) << __func__ << " waiting" << dendl;
+ while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
+ if (timeout == timeout.zero()) {
+ cond.wait(l);
+ } else {
+ if (cond.wait_for(l, timeout) == std::cv_status::timeout) {
+ lderr(cct) << "timed out waiting for first osdmap from monitors"
+ << dendl;
+ return -ETIMEDOUT;
+ }
+ }
+ }
+ ldout(cct, 10) << __func__ << " done waiting" << dendl;
+ }
+  }
+  return 0;
+}
+
+
+int librados::RadosClient::wait_for_latest_osdmap()
+{
+ bs::error_code ec;
+ objecter->wait_for_latest_osdmap(ca::use_blocked[ec]);
+ return ceph::from_error_code(ec);
+}
+
+int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v)
+{
+ int r = wait_for_osdmap();
+ if (r < 0)
+ return r;
+
+ objecter->with_osdmap([&](const OSDMap& o) {
+    for (const auto& p : o.get_pools())
+ v.push_back(std::make_pair(p.first, o.get_pool_name(p.first)));
+ });
+ return 0;
+}
+
+int librados::RadosClient::get_pool_stats(std::list<string>& pools,
+ map<string,::pool_stat_t> *result,
+ bool *pper_pool)
+{
+ bs::error_code ec;
+
+ std::vector<std::string> v(pools.begin(), pools.end());
+
+ auto [res, per_pool] = objecter->get_pool_stats(v, ca::use_blocked[ec]);
+ if (ec)
+ return ceph::from_error_code(ec);
+
+  if (pper_pool)
+    *pper_pool = per_pool;
+ if (result)
+ result->insert(res.begin(), res.end());
+
+ return 0;
+}
+
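+// "selfmanaged" in the librados API is the same mode that pg_pool_t
+// calls "unmanaged": snapshot ids are allocated and deleted by the
+// client rather than through pool-level (mksnap) snapshots.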
+bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
+ const std::string& pool)
+{
+ bool ret = false;
+ objecter->with_osdmap([&](const OSDMap& osdmap) {
+ int64_t poolid = osdmap.lookup_pg_pool_name(pool);
+ if (poolid >= 0)
+ ret = osdmap.get_pg_pool(poolid)->is_unmanaged_snaps_mode();
+ });
+ return ret;
+}
+
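+// Synchronous wrapper around the async objecter call: C_SafeCond sets
+// 'done' and signals 'cond' under 'mylock' once the stats arrive.
+// pool_create() and pool_delete() below use the same pattern.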
+int librados::RadosClient::get_fs_stats(ceph_statfs& stats)
+{
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::get_fs_stats::mylock");
+ ceph::condition_variable cond;
+  bool done = false;
+ int ret = 0;
+ {
+ std::lock_guard l{mylock};
+ objecter->get_fs_stats(stats, boost::optional<int64_t> (),
+ new C_SafeCond(mylock, cond, &done, &ret));
+ }
+ {
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
+ }
+ return ret;
+}
+
+void librados::RadosClient::get() {
+ std::lock_guard l(lock);
+ ceph_assert(refcnt > 0);
+ refcnt++;
+}
+
+bool librados::RadosClient::put() {
+ std::lock_guard l(lock);
+ ceph_assert(refcnt > 0);
+ refcnt--;
+ return (refcnt == 0);
+}
+
+int librados::RadosClient::pool_create(string& name,
+ int16_t crush_rule)
+{
+ if (!name.length())
+ return -EINVAL;
+
+ int r = wait_for_osdmap();
+ if (r < 0) {
+ return r;
+ }
+
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::pool_create::mylock");
+ int reply;
+ ceph::condition_variable cond;
+  bool done = false;
+ Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
+ objecter->create_pool(name, onfinish, crush_rule);
+
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
+ return reply;
+}
+
+int librados::RadosClient::pool_create_async(string& name,
+ PoolAsyncCompletionImpl *c,
+ int16_t crush_rule)
+{
+ int r = wait_for_osdmap();
+ if (r < 0)
+ return r;
+
+ Context *onfinish = make_lambda_context(CB_PoolAsync_Safe(c));
+ objecter->create_pool(name, onfinish, crush_rule);
+ return r;
+}
+
+int librados::RadosClient::pool_get_base_tier(int64_t pool_id, int64_t* base_tier)
+{
+ int r = wait_for_osdmap();
+ if (r < 0) {
+ return r;
+ }
+
+ objecter->with_osdmap([&](const OSDMap& o) {
+ const pg_pool_t* pool = o.get_pg_pool(pool_id);
+ if (pool) {
+ if (pool->tier_of < 0) {
+ *base_tier = pool_id;
+ } else {
+ *base_tier = pool->tier_of;
+ }
+ r = 0;
+ } else {
+ r = -ENOENT;
+ }
+ });
+ return r;
+}
+
+int librados::RadosClient::pool_delete(const char *name)
+{
+ int r = wait_for_osdmap();
+ if (r < 0) {
+ return r;
+ }
+
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::pool_delete::mylock");
+ ceph::condition_variable cond;
+  bool done = false;
+ int ret;
+ Context *onfinish = new C_SafeCond(mylock, cond, &done, &ret);
+ objecter->delete_pool(name, onfinish);
+
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
+ return ret;
+}
+
+int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c)
+{
+ int r = wait_for_osdmap();
+ if (r < 0)
+ return r;
+
+ Context *onfinish = make_lambda_context(CB_PoolAsync_Safe(c));
+ objecter->delete_pool(name, onfinish);
+ return r;
+}
+
+void librados::RadosClient::blocklist_self(bool set) {
+ std::lock_guard l(lock);
+ objecter->blocklist_self(set);
+}
+
+std::string librados::RadosClient::get_addrs() const {
+ CachedStackStringStream cos;
+ *cos << messenger->get_myaddrs();
+ return std::string(cos->strv());
+}
+
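+// The mon command built below looks like this, for an illustrative
+// client "1.2.3.4:0/123" and a 60 second expiry:
+//
+//   {"prefix": "osd blocklist", "blocklistop": "add",
+//    "addr": "1.2.3.4:0/123", "expire": 60.0}
+//
+// Older monitors that predate the blocklist rename only understand the
+// legacy "osd blacklist" spelling, hence the -EINVAL fallback below.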
+int librados::RadosClient::blocklist_add(const string& client_address,
+ uint32_t expire_seconds)
+{
+ entity_addr_t addr;
+ if (!addr.parse(client_address.c_str(), 0)) {
+ lderr(cct) << "unable to parse address " << client_address << dendl;
+ return -EINVAL;
+ }
+
+ std::stringstream cmd;
+ cmd << "{"
+ << "\"prefix\": \"osd blocklist\", "
+ << "\"blocklistop\": \"add\", "
+ << "\"addr\": \"" << client_address << "\"";
+ if (expire_seconds != 0) {
+ cmd << ", \"expire\": " << expire_seconds << ".0";
+ }
+ cmd << "}";
+
+ std::vector<std::string> cmds;
+ cmds.push_back(cmd.str());
+ bufferlist inbl;
+ int r = mon_command(cmds, inbl, NULL, NULL);
+ if (r == -EINVAL) {
+ // try legacy blacklist command
+ std::stringstream cmd;
+ cmd << "{"
+ << "\"prefix\": \"osd blacklist\", "
+ << "\"blacklistop\": \"add\", "
+ << "\"addr\": \"" << client_address << "\"";
+ if (expire_seconds != 0) {
+ cmd << ", \"expire\": " << expire_seconds << ".0";
+ }
+ cmd << "}";
+ cmds.clear();
+ cmds.push_back(cmd.str());
+ r = mon_command(cmds, inbl, NULL, NULL);
+ }
+ if (r < 0) {
+ return r;
+ }
+
+ // ensure we have the latest osd map epoch before proceeding
+ r = wait_for_latest_osdmap();
+ return r;
+}
+
+int librados::RadosClient::mon_command(const vector<string>& cmd,
+ const bufferlist &inbl,
+ bufferlist *outbl, string *outs)
+{
+ C_SaferCond ctx;
+ mon_command_async(cmd, inbl, outbl, outs, &ctx);
+ return ctx.wait();
+}
+
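+// The on_finish Context rides inside the lambda in a unique_ptr, so it
+// is still freed if the completion is destroyed without being invoked;
+// on the normal path it is release()d and completed with the mapped
+// error code.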
+void librados::RadosClient::mon_command_async(const vector<string>& cmd,
+ const bufferlist &inbl,
+ bufferlist *outbl, string *outs,
+ Context *on_finish)
+{
+ std::lock_guard l{lock};
+ monclient.start_mon_command(cmd, inbl,
+ [outs, outbl,
+ on_finish = std::unique_ptr<Context>(on_finish)]
+ (bs::error_code e,
+ std::string&& s,
+ ceph::bufferlist&& b) mutable {
+ if (outs)
+ *outs = std::move(s);
+ if (outbl)
+ *outbl = std::move(b);
+ if (on_finish)
+ on_finish.release()->complete(
+ ceph::from_error_code(e));
+ });
+}
+
+int librados::RadosClient::mgr_command(const vector<string>& cmd,
+ const bufferlist &inbl,
+ bufferlist *outbl, string *outs)
+{
+ std::lock_guard l(lock);
+
+ C_SaferCond cond;
+ int r = mgrclient.start_command(cmd, inbl, outbl, outs, &cond);
+ if (r < 0)
+ return r;
+
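+  // drop the client lock while blocking on the reply so message
+  // dispatch can make progress; retake it before the lock_guard above
+  // releases it at scope exit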
+ lock.unlock();
+ if (rados_mon_op_timeout.count() > 0) {
+ r = cond.wait_for(rados_mon_op_timeout);
+ } else {
+ r = cond.wait();
+ }
+ lock.lock();
+
+ return r;
+}
+
+int librados::RadosClient::mgr_command(
+ const string& name,
+ const vector<string>& cmd,
+ const bufferlist &inbl,
+ bufferlist *outbl, string *outs)
+{
+ std::lock_guard l(lock);
+
+ C_SaferCond cond;
+ int r = mgrclient.start_tell_command(name, cmd, inbl, outbl, outs, &cond);
+ if (r < 0)
+ return r;
+
+ lock.unlock();
+ if (rados_mon_op_timeout.count() > 0) {
+ r = cond.wait_for(rados_mon_op_timeout);
+ } else {
+ r = cond.wait();
+ }
+ lock.lock();
+
+ return r;
+}
+
+
+int librados::RadosClient::mon_command(int rank, const vector<string>& cmd,
+ const bufferlist &inbl,
+ bufferlist *outbl, string *outs)
+{
+ bs::error_code ec;
+ auto&& [s, bl] = monclient.start_mon_command(rank, cmd, inbl,
+ ca::use_blocked[ec]);
+ if (outs)
+ *outs = std::move(s);
+ if (outbl)
+ *outbl = std::move(bl);
+
+ return ceph::from_error_code(ec);
+}
+
+int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
+ const bufferlist &inbl,
+ bufferlist *outbl, string *outs)
+{
+ bs::error_code ec;
+ auto&& [s, bl] = monclient.start_mon_command(name, cmd, inbl,
+ ca::use_blocked[ec]);
+ if (outs)
+ *outs = std::move(s);
+ if (outbl)
+ *outbl = std::move(bl);
+
+ return ceph::from_error_code(ec);
+}
+
+int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
+ const bufferlist& inbl,
+ bufferlist *poutbl, string *prs)
+{
+ ceph_tid_t tid;
+
+ if (osd < 0)
+ return -EINVAL;
+
+ // XXX do anything with tid?
+ bs::error_code ec;
+ auto [s, bl] = objecter->osd_command(osd, std::move(cmd), cb::list(inbl),
+ &tid, ca::use_blocked[ec]);
+ if (poutbl)
+ *poutbl = std::move(bl);
+ if (prs)
+ *prs = std::move(s);
+ return ceph::from_error_code(ec);
+}
+
+int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
+ const bufferlist& inbl,
+ bufferlist *poutbl, string *prs)
+{
+ ceph_tid_t tid;
+ bs::error_code ec;
+ auto [s, bl] = objecter->pg_command(pgid, std::move(cmd), inbl, &tid,
+ ca::use_blocked[ec]);
+ if (poutbl)
+ *poutbl = std::move(bl);
+ if (prs)
+ *prs = std::move(s);
+ return ceph::from_error_code(ec);
+}
+
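+// (Un)register a cluster log watch.  The level maps onto a monitor
+// subscription name ("info" -> "log-info", and so on); passing NULL
+// for both callbacks cancels the subscription.  Matching entries are
+// delivered via handle_log() below.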
+int librados::RadosClient::monitor_log(const string& level,
+ rados_log_callback_t cb,
+ rados_log_callback2_t cb2,
+ void *arg)
+{
+ std::lock_guard l(lock);
+
+ if (state != CONNECTED) {
+ return -ENOTCONN;
+ }
+
+ if (cb == NULL && cb2 == NULL) {
+ // stop watch
+ ldout(cct, 10) << __func__ << " removing cb " << (void*)log_cb
+ << " " << (void*)log_cb2 << dendl;
+ monclient.sub_unwant(log_watch);
+ log_watch.clear();
+ log_cb = NULL;
+ log_cb2 = NULL;
+ log_cb_arg = NULL;
+ return 0;
+ }
+
+ string watch_level;
+ if (level == "debug") {
+ watch_level = "log-debug";
+ } else if (level == "info") {
+ watch_level = "log-info";
+ } else if (level == "warn" || level == "warning") {
+ watch_level = "log-warn";
+ } else if (level == "err" || level == "error") {
+ watch_level = "log-error";
+ } else if (level == "sec") {
+ watch_level = "log-sec";
+ } else {
+ ldout(cct, 10) << __func__ << " invalid level " << level << dendl;
+ return -EINVAL;
+ }
+
+ if (log_cb || log_cb2)
+ monclient.sub_unwant(log_watch);
+
+ // (re)start watch
+ ldout(cct, 10) << __func__ << " add cb " << (void*)cb << " " << (void*)cb2
+ << " level " << level << dendl;
+ monclient.sub_want(watch_level, 0, 0);
+
+ monclient.renew_subs();
+ log_cb = cb;
+ log_cb2 = cb2;
+ log_cb_arg = arg;
+ log_watch = watch_level;
+ return 0;
+}
+
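+// Deliver incoming MLog entries to the registered callback(s), then
+// ack the newest version back to the monitor with sub_got() so already
+// seen entries are not resent.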
+void librados::RadosClient::handle_log(MLog *m)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 10) << __func__ << " version " << m->version << dendl;
+
+ if (log_last_version < m->version) {
+ log_last_version = m->version;
+
+ if (log_cb || log_cb2) {
+      for (const auto& e : m->entries) {
+ ostringstream ss;
+ ss << e.stamp << " " << e.name << " " << e.prio << " " << e.msg;
+ string line = ss.str();
+ string who = stringify(e.rank) + " " + stringify(e.addrs);
+ string name = stringify(e.name);
+ string level = stringify(e.prio);
+ struct timespec stamp;
+ e.stamp.to_timespec(&stamp);
+
+ ldout(cct, 20) << __func__ << " delivering " << ss.str() << dendl;
+ if (log_cb)
+ log_cb(log_cb_arg, line.c_str(), who.c_str(),
+ stamp.tv_sec, stamp.tv_nsec,
+ e.seq, level.c_str(), e.msg.c_str());
+ if (log_cb2)
+ log_cb2(log_cb_arg, line.c_str(),
+ e.channel.c_str(),
+ who.c_str(), name.c_str(),
+ stamp.tv_sec, stamp.tv_nsec,
+ e.seq, level.c_str(), e.msg.c_str());
+ }
+ }
+
+ monclient.sub_got(log_watch, log_last_version);
+ }
+
+ m->put();
+}
+
+int librados::RadosClient::service_daemon_register(
+ const std::string& service, ///< service name (e.g., 'rgw')
+ const std::string& name, ///< daemon name (e.g., 'gwfoo')
+ const std::map<std::string,std::string>& metadata)
+{
+ if (service_daemon) {
+ return -EEXIST;
+ }
+ if (service == "osd" ||
+ service == "mds" ||
+ service == "client" ||
+ service == "mon" ||
+ service == "mgr") {
+ // normal ceph entity types are not allowed!
+ return -EINVAL;
+ }
+ if (service.empty() || name.empty()) {
+ return -EINVAL;
+ }
+
+ collect_sys_info(&daemon_metadata, cct);
+
+ ldout(cct,10) << __func__ << " " << service << "." << name << dendl;
+ service_daemon = true;
+ service_name = service;
+ daemon_name = name;
+ daemon_metadata.insert(metadata.begin(), metadata.end());
+
+ if (state == DISCONNECTED) {
+ return 0;
+ }
+ if (state == CONNECTING) {
+ return -EBUSY;
+ }
+ mgrclient.service_daemon_register(service_name, daemon_name,
+ daemon_metadata);
+ return 0;
+}
+
+int librados::RadosClient::service_daemon_update_status(
+ std::map<std::string,std::string>&& status)
+{
+ if (state != CONNECTED) {
+ return -ENOTCONN;
+ }
+ return mgrclient.service_daemon_update_status(std::move(status));
+}
+
+mon_feature_t librados::RadosClient::get_required_monitor_features() const
+{
+ return monclient.with_monmap([](const MonMap &monmap) {
+ return monmap.get_required_features(); } );
+}
+
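+// "pg ls" output comes in two shapes, both handled below: a bare JSON
+// array of pg stat objects, or a wrapper object whose "pg_stats"
+// member holds that array, e.g. (abridged):
+//
+//   {"pg_stats": [{"pgid": "1.0", ...}, ...]}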
+int librados::RadosClient::get_inconsistent_pgs(int64_t pool_id,
+ std::vector<std::string>* pgs)
+{
+ vector<string> cmd = {
+ "{\"prefix\": \"pg ls\","
+ "\"pool\": " + std::to_string(pool_id) + ","
+ "\"states\": [\"inconsistent\"],"
+ "\"format\": \"json\"}"
+ };
+ bufferlist inbl, outbl;
+ string outstring;
+ if (auto ret = mgr_command(cmd, inbl, &outbl, &outstring); ret) {
+ return ret;
+ }
+ if (!outbl.length()) {
+ // no pg returned
+ return 0;
+ }
+ JSONParser parser;
+ if (!parser.parse(outbl.c_str(), outbl.length())) {
+ return -EINVAL;
+ }
+ vector<string> v;
+ if (!parser.is_array()) {
+ JSONObj *pgstat_obj = parser.find_obj("pg_stats");
+ if (!pgstat_obj)
+ return 0;
+ auto s = pgstat_obj->get_data();
+ JSONParser pg_stats;
+ if (!pg_stats.parse(s.c_str(), s.length())) {
+ return -EINVAL;
+ }
+ v = pg_stats.get_array_elements();
+ } else {
+ v = parser.get_array_elements();
+ }
+  for (const auto& i : v) {
+ JSONParser pg_json;
+ if (!pg_json.parse(i.c_str(), i.length())) {
+ return -EINVAL;
+ }
+ string pgid;
+ JSONDecoder::decode_json("pgid", pgid, &pg_json);
+ pgs->emplace_back(std::move(pgid));
+ }
+ return 0;
+}
+
+const char** librados::RadosClient::get_tracked_conf_keys() const
+{
+ static const char *config_keys[] = {
+ "librados_thread_count",
+ "rados_mon_op_timeout",
+ nullptr
+ };
+ return config_keys;
+}
+
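+// Config observer hook; only called for keys listed in
+// get_tracked_conf_keys() above.  A new librados_thread_count restarts
+// the io_context pool at the new size.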
+void librados::RadosClient::handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed)
+{
+ if (changed.count("librados_thread_count")) {
+ poolctx.stop();
+ poolctx.start(conf.get_val<std::uint64_t>("librados_thread_count"));
+ }
+ if (changed.count("rados_mon_op_timeout")) {
+ rados_mon_op_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+ }
+}