From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/crimson/osd/ops_executer.h | 283 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 283 insertions(+) create mode 100644 src/crimson/osd/ops_executer.h (limited to 'src/crimson/osd/ops_executer.h') diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h new file mode 100644 index 000000000..42fcf61b8 --- /dev/null +++ b/src/crimson/osd/ops_executer.h @@ -0,0 +1,283 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/dout.h" +#include "crimson/net/Fwd.h" +#include "os/Transaction.h" +#include "osd/osd_types.h" +#include "crimson/osd/object_context.h" + +#include "crimson/common/errorator.h" +#include "crimson/common/type_helpers.h" +#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/shard_services.h" +#include "crimson/osd/osdmap_gate.h" + +#include "crimson/osd/pg_backend.h" +#include "crimson/osd/exceptions.h" + +#include "messages/MOSDOp.h" + +class PG; +class PGLSFilter; +class OSDOp; + +namespace crimson::osd { + +// PgOpsExecuter -- a class for executing ops targeting a certain object. +class OpsExecuter { + using call_errorator = crimson::errorator< + crimson::stateful_ec, + crimson::ct_error::enoent, + crimson::ct_error::invarg, + crimson::ct_error::permission_denied, + crimson::ct_error::operation_not_supported, + crimson::ct_error::input_output_error, + crimson::ct_error::value_too_large>; + using read_errorator = PGBackend::read_errorator; + using write_ertr = PGBackend::write_ertr; + using get_attr_errorator = PGBackend::get_attr_errorator; + using watch_errorator = crimson::errorator< + crimson::ct_error::enoent, + crimson::ct_error::invarg, + crimson::ct_error::not_connected, + crimson::ct_error::timed_out>; + +public: + // because OpsExecuter is pretty heavy-weight object we want to ensure + // it's not copied nor even moved by accident. Performance is the sole + // reason for prohibiting that. + OpsExecuter(OpsExecuter&&) = delete; + OpsExecuter(const OpsExecuter&) = delete; + + using osd_op_errorator = crimson::compound_errorator_t< + call_errorator, + read_errorator, + write_ertr, + get_attr_errorator, + watch_errorator, + PGBackend::stat_errorator>; + +private: + // an operation can be divided into two stages: main and effect-exposing + // one. The former is performed immediately on call to `do_osd_op()` while + // the later on `submit_changes()` – after successfully processing main + // stages of all involved operations. When any stage fails, none of all + // scheduled effect-exposing stages will be executed. + // when operation requires this division, some variant of `with_effect()` + // should be used. + struct effect_t { + virtual osd_op_errorator::future<> execute() = 0; + virtual ~effect_t() = default; + }; + + ObjectContextRef obc; + const OpInfo& op_info; + const pg_pool_t& pool_info; // for the sake of the ObjClass API + PGBackend& backend; + const MOSDOp& msg; + std::optional osd_op_params; + bool user_modify = false; + ceph::os::Transaction txn; + + size_t num_read = 0; ///< count read ops + size_t num_write = 0; ///< count update ops + + // this gizmo could be wrapped in std::optional for the sake of lazy + // initialization. we don't need it for ops that doesn't have effect + // TODO: verify the init overhead of chunked_fifo + seastar::chunked_fifo> op_effects; + + template + auto with_effect_on_obc( + Context&& ctx, + MainFunc&& main_func, + EffectFunc&& effect_func); + + call_errorator::future<> do_op_call(class OSDOp& osd_op); + watch_errorator::future<> do_op_watch( + class OSDOp& osd_op, + class ObjectState& os, + ceph::os::Transaction& txn); + watch_errorator::future<> do_op_watch_subop_watch( + class OSDOp& osd_op, + class ObjectState& os, + ceph::os::Transaction& txn); + watch_errorator::future<> do_op_watch_subop_reconnect( + class OSDOp& osd_op, + class ObjectState& os, + ceph::os::Transaction& txn); + watch_errorator::future<> do_op_watch_subop_unwatch( + class OSDOp& osd_op, + class ObjectState& os, + ceph::os::Transaction& txn); + watch_errorator::future<> do_op_watch_subop_ping( + class OSDOp& osd_op, + class ObjectState& os, + ceph::os::Transaction& txn); + watch_errorator::future<> do_op_notify( + class OSDOp& osd_op, + const class ObjectState& os); + watch_errorator::future<> do_op_notify_ack( + class OSDOp& osd_op, + const class ObjectState& os); + + hobject_t &get_target() const { + return obc->obs.oi.soid; + } + + template + auto do_const_op(Func&& f) { + // TODO: pass backend as read-only + return std::forward(f)(backend, std::as_const(obc->obs)); + } + + template + auto do_read_op(Func&& f) { + ++num_read; + // TODO: pass backend as read-only + return do_const_op(std::forward(f)); + } + + template + auto do_write_op(Func&& f, bool um) { + ++num_write; + if (!osd_op_params) { + osd_op_params.emplace(); + } + user_modify = um; + return std::forward(f)(backend, obc->obs, txn); + } + + decltype(auto) dont_do_legacy_op() { + return crimson::ct_error::operation_not_supported::make(); + } + +public: + OpsExecuter(ObjectContextRef obc, + const OpInfo& op_info, + const pg_pool_t& pool_info, + PGBackend& backend, + const MOSDOp& msg) + : obc(std::move(obc)), + op_info(op_info), + pool_info(pool_info), + backend(backend), + msg(msg) { + } + + osd_op_errorator::future<> execute_op(class OSDOp& osd_op); + + template + osd_op_errorator::future<> flush_changes(Func&& func, MutFunc&& mut_func) &&; + + const auto& get_message() const { + return msg; + } + + size_t get_processed_rw_ops_num() const { + return num_read + num_write; + } + + uint32_t get_pool_stripe_width() const { + return pool_info.get_stripe_width(); + } + + bool has_seen_write() const { + return num_write > 0; + } +}; + +template +auto OpsExecuter::with_effect_on_obc( + Context&& ctx, + MainFunc&& main_func, + EffectFunc&& effect_func) +{ + using context_t = std::decay_t; + // the language offers implicit conversion to pointer-to-function for + // lambda only when it's closureless. We enforce this restriction due + // the fact that `flush_changes()` std::moves many executer's parts. + using allowed_effect_func_t = + seastar::future<> (*)(context_t&&, ObjectContextRef); + static_assert(std::is_convertible_v, + "with_effect function is not allowed to capture"); + struct task_t final : effect_t { + context_t ctx; + EffectFunc effect_func; + ObjectContextRef obc; + + task_t(Context&& ctx, EffectFunc&& effect_func, ObjectContextRef obc) + : ctx(std::move(ctx)), + effect_func(std::move(effect_func)), + obc(std::move(obc)) { + } + osd_op_errorator::future<> execute() final { + return std::move(effect_func)(std::move(ctx), std::move(obc)); + } + }; + auto task = + std::make_unique(std::move(ctx), std::move(effect_func), obc); + auto& ctx_ref = task->ctx; + op_effects.emplace_back(std::move(task)); + return std::forward(main_func)(ctx_ref); +} + +template +OpsExecuter::osd_op_errorator::future<> OpsExecuter::flush_changes( + Func&& func, + MutFunc&& mut_func) && +{ + const bool want_mutate = !txn.empty(); + // osd_op_params are instantiated by every wr-like operation. + assert(osd_op_params || !want_mutate); + assert(obc); + if (__builtin_expect(op_effects.empty(), true)) { + return want_mutate ? std::forward(mut_func)(std::move(txn), + std::move(obc), + std::move(*osd_op_params), + user_modify) + : std::forward(func)(std::move(obc)); + } else { + return (want_mutate ? std::forward(mut_func)(std::move(txn), + std::move(obc), + std::move(*osd_op_params), + user_modify) + : std::forward(func)(std::move(obc)) + ).safe_then([this] { + // let's do the cleaning of `op_effects` in destructor + return crimson::do_for_each(op_effects, [] (auto& op_effect) { + return op_effect->execute(); + }); + }); + } +} + +// PgOpsExecuter -- a class for executing ops targeting a certain PG. +class PgOpsExecuter { +public: + PgOpsExecuter(const PG& pg, const MOSDOp& msg) + : pg(pg), nspace(msg.get_hobj().nspace) { + } + + seastar::future<> execute_op(class OSDOp& osd_op); + +private: + const PG& pg; + const std::string& nspace; +}; + +} // namespace crimson::osd -- cgit v1.2.3