summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/ops_executer.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/osd/ops_executer.h
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/osd/ops_executer.h')
-rw-r--r--src/crimson/osd/ops_executer.h629
1 files changed, 629 insertions, 0 deletions
diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h
new file mode 100644
index 000000000..1230b1c5a
--- /dev/null
+++ b/src/crimson/osd/ops_executer.h
@@ -0,0 +1,629 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <fmt/os.h>
+#include <seastar/core/chunked_fifo.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/shared_ptr.hh>
+
+#include "common/dout.h"
+#include "common/map_cacher.hpp"
+#include "common/static_ptr.h"
+#include "messages/MOSDOp.h"
+#include "os/Transaction.h"
+#include "osd/osd_types.h"
+
+#include "crimson/common/errorator.h"
+#include "crimson/common/interruptible_future.h"
+#include "crimson/common/type_helpers.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/pg_backend.h"
+#include "crimson/osd/pg_interval_interrupt_condition.h"
+#include "crimson/osd/shard_services.h"
+
+struct ObjectState;
+struct OSDOp;
+class OSDriver;
+class SnapMapper;
+
+namespace crimson::osd {
+class PG;
+
+// OpsExecuter -- a class for executing ops targeting a certain object.
+class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
+ friend class SnapTrimObjSubEvent;
+
+ using call_errorator = crimson::errorator<
+ crimson::stateful_ec,
+ crimson::ct_error::enoent,
+ crimson::ct_error::eexist,
+ crimson::ct_error::enospc,
+ crimson::ct_error::edquot,
+ crimson::ct_error::cmp_fail,
+ crimson::ct_error::eagain,
+ crimson::ct_error::invarg,
+ crimson::ct_error::erange,
+ crimson::ct_error::ecanceled,
+ crimson::ct_error::enametoolong,
+ crimson::ct_error::permission_denied,
+ crimson::ct_error::operation_not_supported,
+ crimson::ct_error::input_output_error,
+ crimson::ct_error::value_too_large,
+ crimson::ct_error::file_too_large>;
+ using read_errorator = PGBackend::read_errorator;
+ using write_ertr = PGBackend::write_ertr;
+ using get_attr_errorator = PGBackend::get_attr_errorator;
+ using watch_errorator = crimson::errorator<
+ crimson::ct_error::enoent,
+ crimson::ct_error::invarg,
+ crimson::ct_error::not_connected,
+ crimson::ct_error::timed_out>;
+
+ using call_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, call_errorator>;
+ using read_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, read_errorator>;
+ using write_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, write_ertr>;
+ using get_attr_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, get_attr_errorator>;
+ using watch_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, watch_errorator>;
+
+ template <typename Errorator, typename T = void>
+ using interruptible_errorated_future =
+ ::crimson::interruptible::interruptible_errorated_future<
+ IOInterruptCondition, Errorator, T>;
+ using interruptor =
+ ::crimson::interruptible::interruptor<IOInterruptCondition>;
+ template <typename T = void>
+ using interruptible_future =
+ ::crimson::interruptible::interruptible_future<
+ IOInterruptCondition, T>;
+
+public:
+ // ExecutableMessage -- an interface class to allow using OpsExecuter
+ // with other message types than just the `MOSDOp`. The type erasure
+ // happens in the ctor of `OpsExecuter`.
+ struct ExecutableMessage {
+ virtual osd_reqid_t get_reqid() const = 0;
+ virtual utime_t get_mtime() const = 0;
+ virtual epoch_t get_map_epoch() const = 0;
+ virtual entity_inst_t get_orig_source_inst() const = 0;
+ virtual uint64_t get_features() const = 0;
+ virtual bool has_flag(uint32_t flag) const = 0;
+ virtual entity_name_t get_source() const = 0;
+ };
+
+ template <class ImplT>
+ class ExecutableMessagePimpl final : ExecutableMessage {
+ const ImplT* pimpl;
+ // In crimson, conn is independently maintained outside Message.
+ const crimson::net::ConnectionRef conn;
+ public:
+ ExecutableMessagePimpl(const ImplT* pimpl,
+ const crimson::net::ConnectionRef conn)
+ : pimpl(pimpl), conn(conn) {
+ }
+
+ osd_reqid_t get_reqid() const final {
+ return pimpl->get_reqid();
+ }
+ bool has_flag(uint32_t flag) const final {
+ return pimpl->has_flag(flag);
+ }
+ utime_t get_mtime() const final {
+ return pimpl->get_mtime();
+ };
+ epoch_t get_map_epoch() const final {
+ return pimpl->get_map_epoch();
+ }
+ entity_inst_t get_orig_source_inst() const final {
+ // We can't get the origin source address from the message
+ // since (In Crimson) the connection is maintained
+ // outside of the Message.
+ return entity_inst_t(get_source(), conn->get_peer_addr());
+ }
+ entity_name_t get_source() const final {
+ return pimpl->get_source();
+ }
+ uint64_t get_features() const final {
+ return pimpl->get_features();
+ }
+ };
+
+ // because OpsExecuter is pretty heavy-weight object we want to ensure
+ // it's not copied nor even moved by accident. Performance is the sole
+ // reason for prohibiting that.
+ OpsExecuter(OpsExecuter&&) = delete;
+ OpsExecuter(const OpsExecuter&) = delete;
+
+ using osd_op_errorator = crimson::compound_errorator_t<
+ call_errorator,
+ read_errorator,
+ write_ertr,
+ get_attr_errorator,
+ watch_errorator,
+ PGBackend::stat_errorator>;
+ using osd_op_ierrorator =
+ ::crimson::interruptible::interruptible_errorator<
+ IOInterruptCondition, osd_op_errorator>;
+
+ object_stat_sum_t delta_stats;
+private:
+ // an operation can be divided into two stages: main and effect-exposing
+ // one. The former is performed immediately on call to `do_osd_op()` while
+ // the later on `submit_changes()` – after successfully processing main
+ // stages of all involved operations. When any stage fails, none of all
+ // scheduled effect-exposing stages will be executed.
+ // when operation requires this division, some variant of `with_effect()`
+ // should be used.
+ struct effect_t {
+ // an effect can affect PG, i.e. create a watch timeout
+ virtual osd_op_errorator::future<> execute(Ref<PG> pg) = 0;
+ virtual ~effect_t() = default;
+ };
+
+ Ref<PG> pg; // for the sake of object class
+ ObjectContextRef obc;
+ const OpInfo& op_info;
+ using abstracted_msg_t =
+ ceph::static_ptr<ExecutableMessage,
+ sizeof(ExecutableMessagePimpl<void>)>;
+ abstracted_msg_t msg;
+ crimson::net::ConnectionRef conn;
+ std::optional<osd_op_params_t> osd_op_params;
+ bool user_modify = false;
+ ceph::os::Transaction txn;
+
+ size_t num_read = 0; ///< count read ops
+ size_t num_write = 0; ///< count update ops
+
+ SnapContext snapc; // writer snap context
+ struct CloningContext {
+ SnapSet new_snapset;
+ pg_log_entry_t log_entry;
+
+ void apply_to(
+ std::vector<pg_log_entry_t>& log_entries,
+ ObjectContext& processed_obc) &&;
+ };
+ std::unique_ptr<CloningContext> cloning_ctx;
+
+
+ /**
+ * execute_clone
+ *
+ * If snapc contains a snap which occurred logically after the last write
+ * seen by this object (see OpsExecutor::should_clone()), we first need
+ * make a clone of the object at its current state. execute_clone primes
+ * txn with that clone operation and returns an
+ * OpsExecutor::CloningContext which will allow us to fill in the corresponding
+ * metadata and log_entries once the operations have been processed.
+ *
+ * Note that this strategy differs from classic, which instead performs this
+ * work at the end and reorders the transaction. See
+ * PrimaryLogPG::make_writeable
+ *
+ * @param snapc [in] snapc for this operation (from the client if from the
+ * client, from the pool otherwise)
+ * @param initial_obs [in] objectstate for the object at operation start
+ * @param initial_snapset [in] snapset for the object at operation start
+ * @param backend [in,out] interface for generating mutations
+ * @param txn [out] transaction for the operation
+ */
+ std::unique_ptr<CloningContext> execute_clone(
+ const SnapContext& snapc,
+ const ObjectState& initial_obs,
+ const SnapSet& initial_snapset,
+ PGBackend& backend,
+ ceph::os::Transaction& txn);
+
+
+ /**
+ * should_clone
+ *
+ * Predicate returning whether a user write with snap context snapc
+ * contains a snap which occurred prior to the most recent write
+ * on the object reflected in initial_obc.
+ *
+ * @param initial_obc [in] obc for object to be mutated
+ * @param snapc [in] snapc for this operation (from the client if from the
+ * client, from the pool otherwise)
+ */
+ static bool should_clone(
+ const ObjectContext& initial_obc,
+ const SnapContext& snapc) {
+ // clone?
+ return initial_obc.obs.exists // both nominally and...
+ && !initial_obc.obs.oi.is_whiteout() // ... logically exists
+ && snapc.snaps.size() // there are snaps
+ && snapc.snaps[0] > initial_obc.ssc->snapset.seq; // existing obj is old
+ }
+
+ interruptible_future<std::vector<pg_log_entry_t>> flush_clone_metadata(
+ std::vector<pg_log_entry_t>&& log_entries,
+ SnapMapper& snap_mapper,
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn);
+
+ static interruptible_future<> snap_map_remove(
+ const hobject_t& soid,
+ SnapMapper& snap_mapper,
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn);
+ static interruptible_future<> snap_map_modify(
+ const hobject_t& soid,
+ const std::set<snapid_t>& snaps,
+ SnapMapper& snap_mapper,
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn);
+ static interruptible_future<> snap_map_clone(
+ const hobject_t& soid,
+ const std::set<snapid_t>& snaps,
+ SnapMapper& snap_mapper,
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn);
+
+ // this gizmo could be wrapped in std::optional for the sake of lazy
+ // initialization. we don't need it for ops that doesn't have effect
+ // TODO: verify the init overhead of chunked_fifo
+ seastar::chunked_fifo<std::unique_ptr<effect_t>> op_effects;
+
+ template <class Context, class MainFunc, class EffectFunc>
+ auto with_effect_on_obc(
+ Context&& ctx,
+ MainFunc&& main_func,
+ EffectFunc&& effect_func);
+
+ call_ierrorator::future<> do_op_call(OSDOp& osd_op);
+ watch_ierrorator::future<> do_op_watch(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn);
+ watch_ierrorator::future<> do_op_watch_subop_watch(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn);
+ watch_ierrorator::future<> do_op_watch_subop_reconnect(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn);
+ watch_ierrorator::future<> do_op_watch_subop_unwatch(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn);
+ watch_ierrorator::future<> do_op_watch_subop_ping(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn);
+ watch_ierrorator::future<> do_op_list_watchers(
+ OSDOp& osd_op,
+ const ObjectState& os);
+ watch_ierrorator::future<> do_op_notify(
+ OSDOp& osd_op,
+ const ObjectState& os);
+ watch_ierrorator::future<> do_op_notify_ack(
+ OSDOp& osd_op,
+ const ObjectState& os);
+ call_errorator::future<> do_assert_ver(
+ OSDOp& osd_op,
+ const ObjectState& os);
+
+ using list_snaps_ertr = read_errorator::extend<
+ crimson::ct_error::invarg>;
+ using list_snaps_iertr = ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ list_snaps_ertr>;
+ list_snaps_iertr::future<> do_list_snaps(
+ OSDOp& osd_op,
+ const ObjectState& os,
+ const SnapSet& ss);
+
+ template <class Func>
+ auto do_const_op(Func&& f);
+
+ template <class Func>
+ auto do_read_op(Func&& f) {
+ ++num_read;
+ // TODO: pass backend as read-only
+ return do_const_op(std::forward<Func>(f));
+ }
+
+ template <class Func>
+ auto do_snapset_op(Func&& f) {
+ ++num_read;
+ return std::invoke(
+ std::forward<Func>(f),
+ std::as_const(obc->obs),
+ std::as_const(obc->ssc->snapset));
+ }
+
+ enum class modified_by {
+ user,
+ sys,
+ };
+
+ template <class Func>
+ auto do_write_op(Func&& f, modified_by m = modified_by::user);
+
+ decltype(auto) dont_do_legacy_op() {
+ return crimson::ct_error::operation_not_supported::make();
+ }
+
+ interruptible_errorated_future<osd_op_errorator>
+ do_execute_op(OSDOp& osd_op);
+
+ OpsExecuter(Ref<PG> pg,
+ ObjectContextRef obc,
+ const OpInfo& op_info,
+ abstracted_msg_t&& msg,
+ crimson::net::ConnectionRef conn,
+ const SnapContext& snapc);
+
+public:
+ template <class MsgT>
+ OpsExecuter(Ref<PG> pg,
+ ObjectContextRef obc,
+ const OpInfo& op_info,
+ const MsgT& msg,
+ crimson::net::ConnectionRef conn,
+ const SnapContext& snapc)
+ : OpsExecuter(
+ std::move(pg),
+ std::move(obc),
+ op_info,
+ abstracted_msg_t{
+ std::in_place_type_t<ExecutableMessagePimpl<MsgT>>{},
+ &msg,
+ conn},
+ conn,
+ snapc) {
+ }
+
+ template <class Func>
+ struct RollbackHelper;
+
+ template <class Func>
+ RollbackHelper<Func> create_rollbacker(Func&& func);
+
+ interruptible_errorated_future<osd_op_errorator>
+ execute_op(OSDOp& osd_op);
+
+ using rep_op_fut_tuple =
+ std::tuple<interruptible_future<>, osd_op_ierrorator::future<>>;
+ using rep_op_fut_t =
+ interruptible_future<rep_op_fut_tuple>;
+ template <typename MutFunc>
+ rep_op_fut_t flush_changes_n_do_ops_effects(
+ const std::vector<OSDOp>& ops,
+ SnapMapper& snap_mapper,
+ OSDriver& osdriver,
+ MutFunc&& mut_func) &&;
+ std::vector<pg_log_entry_t> prepare_transaction(
+ const std::vector<OSDOp>& ops);
+ void fill_op_params_bump_pg_version();
+
+ ObjectContextRef get_obc() const {
+ return obc;
+ }
+
+ const object_info_t &get_object_info() const {
+ return obc->obs.oi;
+ }
+ const hobject_t &get_target() const {
+ return get_object_info().soid;
+ }
+
+ const auto& get_message() const {
+ return *msg;
+ }
+
+ size_t get_processed_rw_ops_num() const {
+ return num_read + num_write;
+ }
+
+ uint32_t get_pool_stripe_width() const;
+
+ bool has_seen_write() const {
+ return num_write > 0;
+ }
+
+ object_stat_sum_t& get_stats(){
+ return delta_stats;
+ }
+
+ version_t get_last_user_version() const;
+
+ std::pair<object_info_t, ObjectContextRef> prepare_clone(
+ const hobject_t& coid);
+
+ void apply_stats();
+};
+
+template <class Context, class MainFunc, class EffectFunc>
+auto OpsExecuter::with_effect_on_obc(
+ Context&& ctx,
+ MainFunc&& main_func,
+ EffectFunc&& effect_func)
+{
+ using context_t = std::decay_t<Context>;
+ // the language offers implicit conversion to pointer-to-function for
+ // lambda only when it's closureless. We enforce this restriction due
+ // the fact that `flush_changes()` std::moves many executer's parts.
+ using allowed_effect_func_t =
+ seastar::future<> (*)(context_t&&, ObjectContextRef, Ref<PG>);
+ static_assert(std::is_convertible_v<EffectFunc, allowed_effect_func_t>,
+ "with_effect function is not allowed to capture");
+ struct task_t final : effect_t {
+ context_t ctx;
+ EffectFunc effect_func;
+ ObjectContextRef obc;
+
+ task_t(Context&& ctx, EffectFunc&& effect_func, ObjectContextRef obc)
+ : ctx(std::move(ctx)),
+ effect_func(std::move(effect_func)),
+ obc(std::move(obc)) {
+ }
+ osd_op_errorator::future<> execute(Ref<PG> pg) final {
+ return std::move(effect_func)(std::move(ctx),
+ std::move(obc),
+ std::move(pg));
+ }
+ };
+ auto task =
+ std::make_unique<task_t>(std::move(ctx), std::move(effect_func), obc);
+ auto& ctx_ref = task->ctx;
+ op_effects.emplace_back(std::move(task));
+ return std::forward<MainFunc>(main_func)(ctx_ref);
+}
+
+template <typename MutFunc>
+OpsExecuter::rep_op_fut_t
+OpsExecuter::flush_changes_n_do_ops_effects(
+ const std::vector<OSDOp>& ops,
+ SnapMapper& snap_mapper,
+ OSDriver& osdriver,
+ MutFunc&& mut_func) &&
+{
+ const bool want_mutate = !txn.empty();
+ // osd_op_params are instantiated by every wr-like operation.
+ assert(osd_op_params || !want_mutate);
+ assert(obc);
+ rep_op_fut_t maybe_mutated =
+ interruptor::make_ready_future<rep_op_fut_tuple>(
+ seastar::now(),
+ interruptor::make_interruptible(osd_op_errorator::now()));
+ if (cloning_ctx) {
+ ceph_assert(want_mutate);
+ }
+ if (want_mutate) {
+ if (user_modify) {
+ osd_op_params->user_at_version = osd_op_params->at_version.version;
+ }
+ maybe_mutated = flush_clone_metadata(
+ prepare_transaction(ops),
+ snap_mapper,
+ osdriver,
+ txn
+ ).then_interruptible([mut_func=std::move(mut_func),
+ this](auto&& log_entries) mutable {
+ auto [submitted, all_completed] =
+ std::forward<MutFunc>(mut_func)(std::move(txn),
+ std::move(obc),
+ std::move(*osd_op_params),
+ std::move(log_entries));
+ return interruptor::make_ready_future<rep_op_fut_tuple>(
+ std::move(submitted),
+ osd_op_ierrorator::future<>(std::move(all_completed)));
+ });
+ }
+ apply_stats();
+
+ if (__builtin_expect(op_effects.empty(), true)) {
+ return maybe_mutated;
+ } else {
+ return maybe_mutated.then_unpack_interruptible(
+ // need extra ref pg due to apply_stats() which can be executed after
+ // informing snap mapper
+ [this, pg=this->pg](auto&& submitted, auto&& all_completed) mutable {
+ return interruptor::make_ready_future<rep_op_fut_tuple>(
+ std::move(submitted),
+ all_completed.safe_then_interruptible([this, pg=std::move(pg)] {
+ // let's do the cleaning of `op_effects` in destructor
+ return interruptor::do_for_each(op_effects,
+ [pg=std::move(pg)](auto& op_effect) {
+ return op_effect->execute(pg);
+ });
+ }));
+ });
+ }
+}
+
+template <class Func>
+struct OpsExecuter::RollbackHelper {
+ interruptible_future<> rollback_obc_if_modified(const std::error_code& e);
+ ObjectContextRef get_obc() const {
+ assert(ox);
+ return ox->obc;
+ }
+ seastar::lw_shared_ptr<OpsExecuter> ox;
+ Func func;
+};
+
+template <class Func>
+inline OpsExecuter::RollbackHelper<Func>
+OpsExecuter::create_rollbacker(Func&& func) {
+ return {shared_from_this(), std::forward<Func>(func)};
+}
+
+
+template <class Func>
+OpsExecuter::interruptible_future<>
+OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
+ const std::error_code& e)
+{
+ // Oops, an operation had failed. do_osd_ops() altogether with
+ // OpsExecuter already dropped the ObjectStore::Transaction if
+ // there was any. However, this is not enough to completely
+ // rollback as we gave OpsExecuter the very single copy of `obc`
+ // we maintain and we did it for both reading and writing.
+ // Now all modifications must be reverted.
+ //
+ // Let's just reload from the store. Evicting from the shared
+ // LRU would be tricky as next MOSDOp (the one at `get_obc`
+ // phase) could actually already finished the lookup. Fortunately,
+ // this is supposed to live on cold paths, so performance is not
+ // a concern -- simplicity wins.
+ //
+ // The conditional's purpose is to efficiently handle hot errors
+ // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or
+ // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients
+ // typically append them before any write. If OpsExecuter hasn't
+ // seen any modifying operation, `obc` is supposed to be kept
+ // unchanged.
+ assert(ox);
+ const auto need_rollback = ox->has_seen_write();
+ crimson::get_logger(ceph_subsys_osd).debug(
+ "{}: object {} got error {}, need_rollback={}",
+ __func__,
+ ox->obc->get_oid(),
+ e,
+ need_rollback);
+ return need_rollback ? func(*ox->obc) : interruptor::now();
+}
+
+// PgOpsExecuter -- a class for executing ops targeting a certain PG.
+class PgOpsExecuter {
+ template <typename T = void>
+ using interruptible_future =
+ ::crimson::interruptible::interruptible_future<
+ IOInterruptCondition, T>;
+
+public:
+ PgOpsExecuter(const PG& pg, const MOSDOp& msg)
+ : pg(pg), nspace(msg.get_hobj().nspace) {
+ }
+
+ interruptible_future<> execute_op(OSDOp& osd_op);
+
+private:
+ const PG& pg;
+ const std::string& nspace;
+};
+
+} // namespace crimson::osd