summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/ops_executer.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/ops_executer.cc')
-rw-r--r--src/crimson/osd/ops_executer.cc980
1 files changed, 980 insertions, 0 deletions
diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc
new file mode 100644
index 000000000..6b6614e93
--- /dev/null
+++ b/src/crimson/osd/ops_executer.cc
@@ -0,0 +1,980 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ops_executer.h"
+
+#include <boost/range/adaptor/filtered.hpp>
+#include <boost/range/adaptor/map.hpp>
+#include <boost/range/adaptor/transformed.hpp>
+#include <boost/range/algorithm_ext/push_back.hpp>
+#include <boost/range/algorithm/max_element.hpp>
+#include <boost/range/numeric.hpp>
+
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+
+#include <seastar/core/thread.hh>
+
+#include "crimson/osd/exceptions.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/watch.h"
+#include "osd/ClassHandler.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+OpsExecuter::call_errorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
+{
+ std::string cname, mname;
+ ceph::bufferlist indata;
+ try {
+ auto bp = std::begin(osd_op.indata);
+ bp.copy(osd_op.op.cls.class_len, cname);
+ bp.copy(osd_op.op.cls.method_len, mname);
+ bp.copy(osd_op.op.cls.indata_len, indata);
+ } catch (buffer::error&) {
+ logger().warn("call unable to decode class + method + indata");
+ return crimson::ct_error::invarg::make();
+ }
+
+ // NOTE: opening a class can actually result in dlopen(), and thus
+ // blocking the entire reactor. Thankfully to ClassHandler's cache
+ // this is supposed to be extremely infrequent.
+ ClassHandler::ClassData* cls;
+ int r = ClassHandler::get_instance().open_class(cname, &cls);
+ if (r) {
+ logger().warn("class {} open got {}", cname, cpp_strerror(r));
+ if (r == -ENOENT) {
+ return crimson::ct_error::operation_not_supported::make();
+ } else if (r == -EPERM) {
+ // propagate permission errors
+ return crimson::ct_error::permission_denied::make();
+ }
+ return crimson::ct_error::input_output_error::make();
+ }
+
+ ClassHandler::ClassMethod* method = cls->get_method(mname);
+ if (!method) {
+ logger().warn("call method {}.{} does not exist", cname, mname);
+ return crimson::ct_error::operation_not_supported::make();
+ }
+
+ const auto flags = method->get_flags();
+ if (!obc->obs.exists && (flags & CLS_METHOD_WR) == 0) {
+ return crimson::ct_error::enoent::make();
+ }
+
+#if 0
+ if (flags & CLS_METHOD_WR) {
+ ctx->user_modify = true;
+ }
+#endif
+
+ logger().debug("calling method {}.{}, num_read={}, num_write={}",
+ cname, mname, num_read, num_write);
+ const auto prev_rd = num_read;
+ const auto prev_wr = num_write;
+ return seastar::async(
+ [this, method, indata=std::move(indata)]() mutable {
+ ceph::bufferlist outdata;
+ auto cls_context = reinterpret_cast<cls_method_context_t>(this);
+ const auto ret = method->exec(cls_context, indata, outdata);
+ return std::make_pair(ret, std::move(outdata));
+ }
+ ).then(
+ [this, prev_rd, prev_wr, &osd_op, flags]
+ (auto outcome) -> call_errorator::future<> {
+ auto& [ret, outdata] = outcome;
+ osd_op.rval = ret;
+
+ logger().debug("do_op_call: method returned ret={}, outdata.length()={}"
+ " while num_read={}, num_write={}",
+ ret, outdata.length(), num_read, num_write);
+ if (num_read > prev_rd && !(flags & CLS_METHOD_RD)) {
+ logger().error("method tried to read object but is not marked RD");
+ osd_op.rval = -EIO;
+ return crimson::ct_error::input_output_error::make();
+ }
+ if (num_write > prev_wr && !(flags & CLS_METHOD_WR)) {
+ logger().error("method tried to update object but is not marked WR");
+ osd_op.rval = -EIO;
+ return crimson::ct_error::input_output_error::make();
+ }
+ // ceph-osd has this implemented in `PrimaryLogPG::execute_ctx`,
+ // grep for `ignore_out_data`.
+ using crimson::common::local_conf;
+ if (op_info.allows_returnvec() &&
+ op_info.may_write() &&
+ ret >= 0 &&
+ outdata.length() > local_conf()->osd_max_write_op_reply_len) {
+ // the justification of this limit it to not inflate the pg log.
+ // that's the reason why we don't worry about pure reads.
+ logger().error("outdata overflow due to .length()={}, limit={}",
+ outdata.length(),
+ local_conf()->osd_max_write_op_reply_len);
+ osd_op.rval = -EOVERFLOW;
+ return crimson::ct_error::value_too_large::make();
+ }
+ // for write calls we never return data expect errors or RETURNVEC.
+ // please refer cls/cls_hello.cc to details.
+ if (!op_info.may_write() || op_info.allows_returnvec() || ret < 0) {
+ osd_op.op.extent.length = outdata.length();
+ osd_op.outdata.claim_append(outdata);
+ }
+ if (ret < 0) {
+ return crimson::stateful_ec{
+ std::error_code(-ret, std::generic_category()) };
+ } else {
+ return seastar::now();
+ }
+ }
+ );
+}
+
+static watch_info_t create_watch_info(const OSDOp& osd_op,
+ const MOSDOp& msg)
+{
+ using crimson::common::local_conf;
+ const uint32_t timeout =
+ osd_op.op.watch.timeout == 0 ? local_conf()->osd_client_watch_timeout
+ : osd_op.op.watch.timeout;
+ return {
+ osd_op.op.watch.cookie,
+ timeout,
+ msg.get_connection()->get_peer_addr()
+ };
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn)
+{
+ struct connect_ctx_t {
+ ObjectContext::watch_key_t key;
+ crimson::net::ConnectionRef conn;
+ watch_info_t info;
+
+ connect_ctx_t(const OSDOp& osd_op, const MOSDOp& msg)
+ : key(osd_op.op.watch.cookie, msg.get_reqid().name),
+ conn(msg.get_connection()),
+ info(create_watch_info(osd_op, msg)) {
+ }
+ };
+ return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() },
+ [&] (auto& ctx) {
+ const auto& entity = ctx.key.second;
+ auto [it, emplaced] =
+ os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
+ if (emplaced) {
+ logger().info("registered new watch {} by {}", it->second, entity);
+ txn.nop();
+ } else {
+ logger().info("found existing watch {} by {}", it->second, entity);
+ }
+ return seastar::now();
+ },
+ [] (auto&& ctx, ObjectContextRef obc) {
+ auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
+ if (emplaced) {
+ const auto& [cookie, entity] = ctx.key;
+ it->second = crimson::osd::Watch::create(obc, ctx.info, entity);
+ logger().info("op_effect: added new watcher: {}", ctx.key);
+ } else {
+ logger().info("op_effect: found existing watcher: {}", ctx.key);
+ }
+ return it->second->connect(std::move(ctx.conn), true /* will_ping */);
+ });
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn)
+{
+ const entity_name_t& entity = get_message().get_reqid().name;
+ const auto& cookie = osd_op.op.watch.cookie;
+ if (!os.oi.watchers.count(std::make_pair(cookie, entity))) {
+ return crimson::ct_error::not_connected::make();
+ } else {
+ logger().info("found existing watch by {}", entity);
+ return do_op_watch_subop_watch(osd_op, os, txn);
+ }
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_unwatch(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn)
+{
+ logger().info("{}", __func__);
+
+ struct disconnect_ctx_t {
+ ObjectContext::watch_key_t key;
+ bool send_disconnect{ false };
+
+ disconnect_ctx_t(const OSDOp& osd_op, const MOSDOp& msg)
+ : key(osd_op.op.watch.cookie, msg.get_reqid().name) {
+ }
+ };
+ return with_effect_on_obc(disconnect_ctx_t{ osd_op, get_message() },
+ [&] (auto& ctx) {
+ const auto& entity = ctx.key.second;
+ if (auto nh = os.oi.watchers.extract(ctx.key); !nh.empty()) {
+ logger().info("removed watch {} by {}", nh.mapped(), entity);
+ txn.nop();
+ } else {
+ logger().info("can't remove: no watch by {}", entity);
+ }
+ return seastar::now();
+ },
+ [] (auto&& ctx, ObjectContextRef obc) {
+ if (auto nh = obc->watchers.extract(ctx.key); !nh.empty()) {
+ return seastar::do_with(std::move(nh.mapped()),
+ [ctx](auto&& watcher) {
+ logger().info("op_effect: disconnect watcher {}", ctx.key);
+ return watcher->remove(ctx.send_disconnect);
+ });
+ } else {
+ logger().info("op_effect: disconnect failed to find watcher {}", ctx.key);
+ return seastar::now();
+ }
+ });
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_ping(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn)
+{
+ const entity_name_t& entity = get_message().get_reqid().name;
+ const auto& cookie = osd_op.op.watch.cookie;
+ const auto key = std::make_pair(cookie, entity);
+
+ // Note: WATCH with PING doesn't cause may_write() to return true,
+ // so if there is nothing else in the transaction, this is going
+ // to run do_osd_op_effects, but not write out a log entry */
+ if (!os.oi.watchers.count(key)) {
+ return crimson::ct_error::not_connected::make();
+ }
+ auto it = obc->watchers.find(key);
+ if (it == std::end(obc->watchers) || !it->second->is_connected()) {
+ return crimson::ct_error::timed_out::make();
+ }
+ logger().info("found existing watch by {}", entity);
+ it->second->got_ping(ceph_clock_now());
+ return seastar::now();
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn)
+{
+ logger().debug("{}", __func__);
+ if (!os.exists) {
+ return crimson::ct_error::enoent::make();
+ }
+ switch (osd_op.op.watch.op) {
+ case CEPH_OSD_WATCH_OP_WATCH:
+ return do_op_watch_subop_watch(osd_op, os, txn);
+ case CEPH_OSD_WATCH_OP_RECONNECT:
+ return do_op_watch_subop_reconnect(osd_op, os, txn);
+ case CEPH_OSD_WATCH_OP_PING:
+ return do_op_watch_subop_ping(osd_op, os, txn);
+ case CEPH_OSD_WATCH_OP_UNWATCH:
+ return do_op_watch_subop_unwatch(osd_op, os, txn);
+ case CEPH_OSD_WATCH_OP_LEGACY_WATCH:
+ logger().warn("ignoring CEPH_OSD_WATCH_OP_LEGACY_WATCH");
+ return crimson::ct_error::invarg::make();
+ }
+ logger().warn("unrecognized WATCH subop: {}", osd_op.op.watch.op);
+ return crimson::ct_error::invarg::make();
+}
+
+static uint64_t get_next_notify_id(epoch_t e)
+{
+ // FIXME
+ static std::uint64_t next_notify_id = 0;
+ return (((uint64_t)e) << 32) | ((uint64_t)(next_notify_id++));
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify(
+ OSDOp& osd_op,
+ const ObjectState& os)
+{
+ logger().debug("{}, msg epoch: {}", __func__, get_message().get_map_epoch());
+
+ if (!os.exists) {
+ return crimson::ct_error::enoent::make();
+ }
+ struct notify_ctx_t {
+ crimson::net::ConnectionRef conn;
+ notify_info_t ninfo;
+ const uint64_t client_gid;
+ const epoch_t epoch;
+
+ notify_ctx_t(const MOSDOp& msg)
+ : conn(msg.get_connection()),
+ client_gid(msg.get_reqid().name.num()),
+ epoch(msg.get_map_epoch()) {
+ }
+ };
+ return with_effect_on_obc(notify_ctx_t{ get_message() },
+ [&] (auto& ctx) {
+ try {
+ auto bp = osd_op.indata.cbegin();
+ uint32_t ver; // obsolete
+ ceph::decode(ver, bp);
+ ceph::decode(ctx.ninfo.timeout, bp);
+ ceph::decode(ctx.ninfo.bl, bp);
+ } catch (const buffer::error&) {
+ ctx.ninfo.timeout = 0;
+ }
+ if (!ctx.ninfo.timeout) {
+ using crimson::common::local_conf;
+ ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
+ }
+ ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
+ ctx.ninfo.cookie = osd_op.op.notify.cookie;
+ // return our unique notify id to the client
+ ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
+ return seastar::now();
+ },
+ [] (auto&& ctx, ObjectContextRef obc) {
+ auto alive_watchers = obc->watchers | boost::adaptors::map_values
+ | boost::adaptors::filtered(
+ [] (const auto& w) {
+ // FIXME: filter as for the `is_ping` in `Watch::start_notify`
+ return w->is_alive();
+ });
+ return crimson::osd::Notify::create_n_propagate(
+ std::begin(alive_watchers),
+ std::end(alive_watchers),
+ std::move(ctx.conn),
+ ctx.ninfo,
+ ctx.client_gid,
+ obc->obs.oi.user_version);
+ });
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify_ack(
+ OSDOp& osd_op,
+ const ObjectState& os)
+{
+ logger().debug("{}", __func__);
+
+ struct notifyack_ctx_t {
+ const entity_name_t entity;
+ uint64_t watch_cookie;
+ uint64_t notify_id;
+ ceph::bufferlist reply_bl;
+
+ notifyack_ctx_t(const MOSDOp& msg) : entity(msg.get_reqid().name) {
+ }
+ };
+ return with_effect_on_obc(notifyack_ctx_t{ get_message() },
+ [&] (auto& ctx) -> watch_errorator::future<> {
+ try {
+ auto bp = osd_op.indata.cbegin();
+ ceph::decode(ctx.notify_id, bp);
+ ceph::decode(ctx.watch_cookie, bp);
+ if (!bp.end()) {
+ ceph::decode(ctx.reply_bl, bp);
+ }
+ } catch (const buffer::error&) {
+ // here we behave differently than ceph-osd. For historical reasons,
+ // it falls back to using `osd_op.op.watch.cookie` as `ctx.notify_id`.
+ // crimson just returns EINVAL if the data cannot be decoded.
+ return crimson::ct_error::invarg::make();
+ }
+ return watch_errorator::now();
+ },
+ [] (auto&& ctx, ObjectContextRef obc) {
+ logger().info("notify_ack watch_cookie={}, notify_id={}",
+ ctx.watch_cookie, ctx.notify_id);
+ return seastar::do_for_each(obc->watchers,
+ [ctx=std::move(ctx)] (auto& kv) {
+ const auto& [key, watchp] = kv;
+ static_assert(
+ std::is_same_v<std::decay_t<decltype(watchp)>,
+ seastar::shared_ptr<crimson::osd::Watch>>);
+ auto& [cookie, entity] = key;
+ if (ctx.entity != entity) {
+ logger().debug("skipping watch {}; entity name {} != {}",
+ key, entity, ctx.entity);
+ return seastar::now();
+ }
+ if (ctx.watch_cookie != cookie) {
+ logger().debug("skipping watch {}; cookie {} != {}",
+ key, ctx.watch_cookie, cookie);
+ return seastar::now();
+ }
+ logger().info("acking notify on watch {}", key);
+ return watchp->notify_ack(ctx.notify_id, ctx.reply_bl);
+ });
+ });
+}
+
+OpsExecuter::osd_op_errorator::future<>
+OpsExecuter::execute_op(OSDOp& osd_op)
+{
+ // TODO: dispatch via call table?
+ // TODO: we might want to find a way to unify both input and output
+ // of each op.
+ logger().debug(
+ "handling op {} on object {}",
+ ceph_osd_op_name(osd_op.op.op),
+ get_target());
+ switch (const ceph_osd_op& op = osd_op.op; op.op) {
+ case CEPH_OSD_OP_SYNC_READ:
+ [[fallthrough]];
+ case CEPH_OSD_OP_READ:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.read(os, osd_op);
+ });
+ case CEPH_OSD_OP_SPARSE_READ:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.sparse_read(os, osd_op);
+ });
+ case CEPH_OSD_OP_CHECKSUM:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.checksum(os, osd_op);
+ });
+ case CEPH_OSD_OP_CMPEXT:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.cmp_ext(os, osd_op);
+ });
+ case CEPH_OSD_OP_GETXATTR:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.getxattr(os, osd_op);
+ });
+ case CEPH_OSD_OP_GETXATTRS:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.get_xattrs(os, osd_op);
+ });
+ case CEPH_OSD_OP_RMXATTR:
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.rm_xattr(os, osd_op, txn);
+ }, true);
+ case CEPH_OSD_OP_CREATE:
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.create(os, osd_op, txn);
+ }, true);
+ case CEPH_OSD_OP_WRITE:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.write(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_WRITESAME:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.write_same(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_WRITEFULL:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.writefull(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_APPEND:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.append(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_TRUNCATE:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ // FIXME: rework needed. Move this out to do_write_op(), introduce
+ // do_write_op_no_user_modify()...
+ return backend.truncate(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_ZERO:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.zero(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_SETALLOCHINT:
+ return osd_op_errorator::now();
+ case CEPH_OSD_OP_SETXATTR:
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.setxattr(os, osd_op, txn);
+ }, true);
+ case CEPH_OSD_OP_DELETE:
+ return do_write_op([] (auto& backend, auto& os, auto& txn) {
+ return backend.remove(os, txn);
+ }, true);
+ case CEPH_OSD_OP_CALL:
+ return this->do_op_call(osd_op);
+ case CEPH_OSD_OP_STAT:
+ // note: stat does not require RD
+ return do_const_op([&osd_op] (/* const */auto& backend, const auto& os) {
+ return backend.stat(os, osd_op);
+ });
+ case CEPH_OSD_OP_TMAPUP:
+ // TODO: there was an effort to kill TMAP in ceph-osd. According to
+ // @dzafman this isn't possible yet. Maybe it could be accomplished
+ // before crimson's readiness and we'd luckily don't need to carry.
+ return dont_do_legacy_op();
+
+ // OMAP
+ case CEPH_OSD_OP_OMAPGETKEYS:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_keys(os, osd_op);
+ });
+ case CEPH_OSD_OP_OMAPGETVALS:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_vals(os, osd_op);
+ });
+ case CEPH_OSD_OP_OMAPGETHEADER:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_header(os, osd_op);
+ });
+ case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
+ return do_read_op([&osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_vals_by_keys(os, osd_op);
+ });
+ case CEPH_OSD_OP_OMAPSETVALS:
+#if 0
+ if (!pg.get_pool().info.supports_omap()) {
+ return crimson::ct_error::operation_not_supported::make();
+ }
+#endif
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.omap_set_vals(os, osd_op, txn, *osd_op_params);
+ }, true);
+ case CEPH_OSD_OP_OMAPSETHEADER:
+#if 0
+ if (!pg.get_pool().info.supports_omap()) {
+ return crimson::ct_error::operation_not_supported::make();
+ }
+#endif
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.omap_set_header(os, osd_op, txn);
+ }, true);
+ case CEPH_OSD_OP_OMAPRMKEYRANGE:
+#if 0
+ if (!pg.get_pool().info.supports_omap()) {
+ return crimson::ct_error::operation_not_supported::make();
+ }
+#endif
+ return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.omap_remove_range(os, osd_op, txn);
+ }, true);
+ case CEPH_OSD_OP_OMAPCLEAR:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.omap_clear(os, osd_op, txn, *osd_op_params);
+ }, true);
+
+ // watch/notify
+ case CEPH_OSD_OP_WATCH:
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_op_watch(osd_op, os, txn);
+ }, false);
+ case CEPH_OSD_OP_NOTIFY:
+ return do_read_op([this, &osd_op] (auto&, const auto& os) {
+ return do_op_notify(osd_op, os);
+ });
+ case CEPH_OSD_OP_NOTIFY_ACK:
+ return do_read_op([this, &osd_op] (auto&, const auto& os) {
+ return do_op_notify_ack(osd_op, os);
+ });
+
+ default:
+ logger().warn("unknown op {}", ceph_osd_op_name(op.op));
+ throw std::runtime_error(
+ fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
+ }
+}
+
+static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
+ const std::string& type,
+ bufferlist::const_iterator& iter)
+{
+ // storing non-const PGLSFilter for the sake of ::init()
+ std::unique_ptr<PGLSFilter> filter;
+ if (type.compare("plain") == 0) {
+ filter = std::make_unique<PGLSPlainFilter>();
+ } else {
+ std::size_t dot = type.find(".");
+ if (dot == type.npos || dot == 0 || dot == type.size() - 1) {
+ throw crimson::osd::invalid_argument{};
+ }
+
+ const std::string class_name = type.substr(0, dot);
+ const std::string filter_name = type.substr(dot + 1);
+ ClassHandler::ClassData *cls = nullptr;
+ int r = ClassHandler::get_instance().open_class(class_name, &cls);
+ if (r != 0) {
+ logger().warn("can't open class {}: {}", class_name, cpp_strerror(r));
+ if (r == -EPERM) {
+ // propogate permission error
+ throw crimson::osd::permission_denied{};
+ } else {
+ throw crimson::osd::invalid_argument{};
+ }
+ } else {
+ ceph_assert(cls);
+ }
+
+ ClassHandler::ClassFilter * const class_filter = cls->get_filter(filter_name);
+ if (class_filter == nullptr) {
+ logger().warn("can't find filter {} in class {}", filter_name, class_name);
+ throw crimson::osd::invalid_argument{};
+ }
+
+ filter.reset(class_filter->fn());
+ if (!filter) {
+ // Object classes are obliged to return us something, but let's
+ // give an error rather than asserting out.
+ logger().warn("buggy class {} failed to construct filter {}",
+ class_name, filter_name);
+ throw crimson::osd::invalid_argument{};
+ }
+ }
+
+ ceph_assert(filter);
+ int r = filter->init(iter);
+ if (r < 0) {
+ logger().warn("error initializing filter {}: {}", type, cpp_strerror(r));
+ throw crimson::osd::invalid_argument{};
+ }
+
+ // successfully constructed and initialized, return it.
+ return filter;
+}
+
+static seastar::future<hobject_t> pgls_filter(
+ const PGLSFilter& filter,
+ const PGBackend& backend,
+ const hobject_t& sobj)
+{
+ if (const auto xattr = filter.get_xattr(); !xattr.empty()) {
+ logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
+ xattr, sobj);
+ return backend.getxattr(sobj, xattr).safe_then(
+ [&filter, sobj] (ceph::bufferptr bp) {
+ logger().debug("pgls_filter: got xvalue for obj={}", sobj);
+
+ ceph::bufferlist val;
+ val.push_back(std::move(bp));
+ const bool filtered = filter.filter(sobj, val);
+ return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
+ }, PGBackend::get_attr_errorator::all_same_way([&filter, sobj] {
+ logger().debug("pgls_filter: got error for obj={}", sobj);
+
+ if (filter.reject_empty_xattr()) {
+ return seastar::make_ready_future<hobject_t>(hobject_t{});
+ }
+ ceph::bufferlist val;
+ const bool filtered = filter.filter(sobj, val);
+ return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
+ }));
+ } else {
+ ceph::bufferlist empty_lvalue_bl;
+ const bool filtered = filter.filter(sobj, empty_lvalue_bl);
+ return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
+ }
+}
+
+static seastar::future<ceph::bufferlist> do_pgnls_common(
+ const hobject_t& pg_start,
+ const hobject_t& pg_end,
+ const PGBackend& backend,
+ const hobject_t& lower_bound,
+ const std::string& nspace,
+ const uint64_t limit,
+ const PGLSFilter* const filter)
+{
+ if (!(lower_bound.is_min() ||
+ lower_bound.is_max() ||
+ (lower_bound >= pg_start && lower_bound < pg_end))) {
+ // this should only happen with a buggy client.
+ throw std::invalid_argument("outside of PG bounds");
+ }
+
+ return backend.list_objects(lower_bound, limit).then(
+ [&backend, filter, nspace](auto&& ret) {
+ auto& [objects, next] = ret;
+ auto in_my_namespace = [&nspace](const hobject_t& obj) {
+ using crimson::common::local_conf;
+ if (obj.get_namespace() == local_conf()->osd_hit_set_namespace) {
+ return false;
+ } else if (nspace == librados::all_nspaces) {
+ return true;
+ } else {
+ return obj.get_namespace() == nspace;
+ }
+ };
+ auto to_pglsed = [&backend, filter] (const hobject_t& obj) {
+ // this transformation looks costly. However, I don't have any
+ // reason to think PGLS* operations are critical for, let's say,
+ // general performance.
+ //
+ // from tchaikov: "another way is to use seastar::map_reduce(),
+ // to 1) save the effort to filter the already filtered objects
+ // 2) avoid the space to keep the tuple<bool, object> even if
+ // the object is filtered out".
+ if (filter) {
+ return pgls_filter(*filter, backend, obj);
+ } else {
+ return seastar::make_ready_future<hobject_t>(obj);
+ }
+ };
+
+ auto range = objects | boost::adaptors::filtered(in_my_namespace)
+ | boost::adaptors::transformed(to_pglsed);
+ logger().debug("do_pgnls_common: finishing the 1st stage of pgls");
+ return seastar::when_all_succeed(std::begin(range),
+ std::end(range)).then(
+ [next=std::move(next)] (auto items) mutable {
+ // the sole purpose of this chaining is to pass `next` to 2nd
+ // stage altogether with items
+ logger().debug("do_pgnls_common: 1st done");
+ return seastar::make_ready_future<
+ std::tuple<std::vector<hobject_t>, hobject_t>>(
+ std::make_tuple(std::move(items), std::move(next)));
+ });
+ }).then(
+ [pg_end] (auto&& ret) {
+ auto& [items, next] = ret;
+ auto is_matched = [] (const auto& obj) {
+ return !obj.is_min();
+ };
+ auto to_entry = [] (const auto& obj) {
+ return librados::ListObjectImpl{
+ obj.get_namespace(), obj.oid.name, obj.get_key()
+ };
+ };
+
+ pg_nls_response_t response;
+ boost::push_back(response.entries, items | boost::adaptors::filtered(is_matched)
+ | boost::adaptors::transformed(to_entry));
+ response.handle = next.is_max() ? pg_end : next;
+ ceph::bufferlist out;
+ encode(response, out);
+ logger().debug("{}: response.entries.size()=",
+ __func__, response.entries.size());
+ return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
+ });
+}
+
+static seastar::future<> do_pgnls(
+ const PG& pg,
+ const std::string& nspace,
+ OSDOp& osd_op)
+{
+ hobject_t lower_bound;
+ try {
+ ceph::decode(lower_bound, osd_op.indata);
+ } catch (const buffer::error&) {
+ throw std::invalid_argument("unable to decode PGNLS handle");
+ }
+ const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
+ const auto pg_end = \
+ pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+ return do_pgnls_common(pg_start,
+ pg_end,
+ pg.get_backend(),
+ lower_bound,
+ nspace,
+ osd_op.op.pgls.count,
+ nullptr /* no filter */)
+ .then([&osd_op](bufferlist bl) {
+ osd_op.outdata = std::move(bl);
+ return seastar::now();
+ });
+}
+
+static seastar::future<> do_pgnls_filtered(
+ const PG& pg,
+ const std::string& nspace,
+ OSDOp& osd_op)
+{
+ std::string cname, mname, type;
+ auto bp = osd_op.indata.cbegin();
+ try {
+ ceph::decode(cname, bp);
+ ceph::decode(mname, bp);
+ ceph::decode(type, bp);
+ } catch (const buffer::error&) {
+ throw crimson::osd::invalid_argument{};
+ }
+
+ auto filter = get_pgls_filter(type, bp);
+
+ hobject_t lower_bound;
+ try {
+ lower_bound.decode(bp);
+ } catch (const buffer::error&) {
+ throw std::invalid_argument("unable to decode PGNLS_FILTER description");
+ }
+
+ logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
+ __func__, cname, mname, type, lower_bound,
+ static_cast<const void*>(filter.get()));
+ return seastar::do_with(std::move(filter),
+ [&, lower_bound=std::move(lower_bound)](auto&& filter) {
+ const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
+ const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+ return do_pgnls_common(pg_start,
+ pg_end,
+ pg.get_backend(),
+ lower_bound,
+ nspace,
+ osd_op.op.pgls.count,
+ filter.get())
+ .then([&osd_op](bufferlist bl) {
+ osd_op.outdata = std::move(bl);
+ return seastar::now();
+ });
+ });
+}
+
+static seastar::future<ceph::bufferlist> do_pgls_common(
+ const hobject_t& pg_start,
+ const hobject_t& pg_end,
+ const PGBackend& backend,
+ const hobject_t& lower_bound,
+ const std::string& nspace,
+ const uint64_t limit,
+ const PGLSFilter* const filter)
+{
+ if (!(lower_bound.is_min() ||
+ lower_bound.is_max() ||
+ (lower_bound >= pg_start && lower_bound < pg_end))) {
+ // this should only happen with a buggy client.
+ throw std::invalid_argument("outside of PG bounds");
+ }
+
+ using entries_t = decltype(pg_ls_response_t::entries);
+ return backend.list_objects(lower_bound, limit).then(
+ [&backend, filter, nspace](auto&& ret) {
+ auto& [objects, next] = ret;
+ return seastar::when_all(
+ seastar::map_reduce(std::move(objects),
+ [&backend, filter, nspace](const hobject_t& obj) {
+ if (obj.get_namespace() == nspace) {
+ if (filter) {
+ return pgls_filter(*filter, backend, obj);
+ } else {
+ return seastar::make_ready_future<hobject_t>(obj);
+ }
+ } else {
+ return seastar::make_ready_future<hobject_t>(hobject_t{});
+ }
+ },
+ entries_t{},
+ [](entries_t entries, hobject_t obj) {
+ if (!obj.is_min()) {
+ entries.emplace_back(obj.oid, obj.get_key());
+ }
+ return entries;
+ }),
+ seastar::make_ready_future<hobject_t>(next));
+ }).then([pg_end](auto&& ret) {
+ auto entries = std::move(std::get<0>(ret).get0());
+ auto next = std::move(std::get<1>(ret).get0());
+ pg_ls_response_t response;
+ response.handle = next.is_max() ? pg_end : next;
+ response.entries = std::move(entries);
+ ceph::bufferlist out;
+ encode(response, out);
+ logger().debug("{}: response.entries.size()=",
+ __func__, response.entries.size());
+ return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
+ });
+}
+
+static seastar::future<> do_pgls(
+ const PG& pg,
+ const std::string& nspace,
+ OSDOp& osd_op)
+{
+ hobject_t lower_bound;
+ auto bp = osd_op.indata.cbegin();
+ try {
+ lower_bound.decode(bp);
+ } catch (const buffer::error&) {
+ throw std::invalid_argument{"unable to decode PGLS handle"};
+ }
+ const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
+ const auto pg_end =
+ pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+ return do_pgls_common(pg_start,
+ pg_end,
+ pg.get_backend(),
+ lower_bound,
+ nspace,
+ osd_op.op.pgls.count,
+ nullptr /* no filter */)
+ .then([&osd_op](bufferlist bl) {
+ osd_op.outdata = std::move(bl);
+ return seastar::now();
+ });
+}
+
+static seastar::future<> do_pgls_filtered(
+ const PG& pg,
+ const std::string& nspace,
+ OSDOp& osd_op)
+{
+ std::string cname, mname, type;
+ auto bp = osd_op.indata.cbegin();
+ try {
+ ceph::decode(cname, bp);
+ ceph::decode(mname, bp);
+ ceph::decode(type, bp);
+ } catch (const buffer::error&) {
+ throw crimson::osd::invalid_argument{};
+ }
+
+ auto filter = get_pgls_filter(type, bp);
+
+ hobject_t lower_bound;
+ try {
+ lower_bound.decode(bp);
+ } catch (const buffer::error&) {
+ throw std::invalid_argument("unable to decode PGLS_FILTER description");
+ }
+
+ logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
+ __func__, cname, mname, type, lower_bound,
+ static_cast<const void*>(filter.get()));
+ return seastar::do_with(std::move(filter),
+ [&, lower_bound=std::move(lower_bound)](auto&& filter) {
+ const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
+ const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+ return do_pgls_common(pg_start,
+ pg_end,
+ pg.get_backend(),
+ lower_bound,
+ nspace,
+ osd_op.op.pgls.count,
+ filter.get())
+ .then([&osd_op](bufferlist bl) {
+ osd_op.outdata = std::move(bl);
+ return seastar::now();
+ });
+ });
+}
+
+seastar::future<>
+PgOpsExecuter::execute_op(OSDOp& osd_op)
+{
+ logger().warn("handling op {}", ceph_osd_op_name(osd_op.op.op));
+ switch (const ceph_osd_op& op = osd_op.op; op.op) {
+ case CEPH_OSD_OP_PGLS:
+ return do_pgls(pg, nspace, osd_op);
+ case CEPH_OSD_OP_PGLS_FILTER:
+ return do_pgls_filtered(pg, nspace, osd_op);
+ case CEPH_OSD_OP_PGNLS:
+ return do_pgnls(pg, nspace, osd_op);
+ case CEPH_OSD_OP_PGNLS_FILTER:
+ return do_pgnls_filtered(pg, nspace, osd_op);
+ default:
+ logger().warn("unknown op {}", ceph_osd_op_name(op.op));
+ throw std::runtime_error(
+ fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
+ }
+}
+
+} // namespace crimson::osd