diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/osd/osd_operations/client_request.h | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/osd/osd_operations/client_request.h')
-rw-r--r-- | src/crimson/osd/osd_operations/client_request.h | 281 |
1 files changed, 281 insertions, 0 deletions
diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h new file mode 100644 index 000000000..b2dce1e87 --- /dev/null +++ b/src/crimson/osd/osd_operations/client_request.h @@ -0,0 +1,281 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/future.hh> + +#include <boost/intrusive/list.hpp> +#include <boost/intrusive_ptr.hpp> + +#include "osd/osd_op_util.h" +#include "crimson/net/Connection.h" +#include "crimson/osd/object_context.h" +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/osd_operations/client_request_common.h" +#include "crimson/osd/osd_operations/common/pg_pipeline.h" +#include "crimson/osd/pg_activation_blocker.h" +#include "crimson/osd/pg_map.h" +#include "crimson/common/type_helpers.h" +#include "crimson/common/utility.h" +#include "messages/MOSDOp.h" + +namespace crimson::osd { +class PG; +class OSD; +class ShardServices; + +class ClientRequest final : public PhasedOperationT<ClientRequest>, + private CommonClientRequest { + // Initially set to primary core, updated to pg core after move, + // used by put_historic + ShardServices *put_historic_shard_services = nullptr; + + crimson::net::ConnectionRef conn; + // must be after conn due to ConnectionPipeline's life-time + Ref<MOSDOp> m; + OpInfo op_info; + seastar::promise<> on_complete; + unsigned instance_id = 0; + +public: + class PGPipeline : public CommonPGPipeline { + public: + struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> { + static constexpr auto type_name = "ClientRequest::PGPipeline::await_map"; + } await_map; + struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> { + static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop"; + } wait_repop; + struct SendReply : OrderedExclusivePhaseT<SendReply> { + static constexpr auto type_name = "ClientRequest::PGPipeline::send_reply"; + } send_reply; + friend class ClientRequest; + friend class LttngBackend; + friend class HistoricBackend; + friend class ReqRequest; + friend class LogMissingRequest; + friend class LogMissingRequestReply; + }; + + /** + * instance_handle_t + * + * Client request is, at present, the only Operation which can be requeued. + * This is, mostly, fine. However, reusing the PipelineHandle or + * BlockingEvent structures before proving that the prior instance has stopped + * can create hangs or crashes due to violations of the BlockerT and + * PipelineHandle invariants. + * + * To solve this, we create an instance_handle_t which contains the events + * for the portion of execution that can be rerun as well as the + * PipelineHandle. ClientRequest::with_pg_int grabs a reference to the current + * instance_handle_t and releases its PipelineHandle in the finally block. + * On requeue, we create a new instance_handle_t with a fresh PipelineHandle + * and events tuple and use it and use it for the next invocation of + * with_pg_int. + */ + std::tuple< + StartEvent, + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, + ConnectionPipeline::GetPG::BlockingEvent, + PGMap::PGCreationBlockingEvent, + CompletionEvent + > tracking_events; + + class instance_handle_t : public boost::intrusive_ref_counter< + instance_handle_t, boost::thread_unsafe_counter> { + public: + // intrusive_ptr because seastar::lw_shared_ptr includes a cpu debug check + // that we will fail since the core on which we allocate the request may not + // be the core on which we perform with_pg_int. This is harmless, since we + // don't leave any references on the source core, so we just bypass it by using + // intrusive_ptr instead. + using ref_t = boost::intrusive_ptr<instance_handle_t>; + PipelineHandle handle; + + std::tuple< + PGPipeline::AwaitMap::BlockingEvent, + PG_OSDMapGate::OSDMapBlocker::BlockingEvent, + PGPipeline::WaitForActive::BlockingEvent, + PGActivationBlocker::BlockingEvent, + PGPipeline::RecoverMissing::BlockingEvent, + PGPipeline::GetOBC::BlockingEvent, + PGPipeline::Process::BlockingEvent, + PGPipeline::WaitRepop::BlockingEvent, + PGPipeline::SendReply::BlockingEvent, + CompletionEvent + > pg_tracking_events; + + template <typename BlockingEventT, typename InterruptorT=void, typename F> + auto with_blocking_event(F &&f, ClientRequest &op) { + auto ret = std::forward<F>(f)( + typename BlockingEventT::template Trigger<ClientRequest>{ + std::get<BlockingEventT>(pg_tracking_events), op + }); + if constexpr (std::is_same_v<InterruptorT, void>) { + return ret; + } else { + using ret_t = decltype(ret); + return typename InterruptorT::template futurize_t<ret_t>{std::move(ret)}; + } + } + + template <typename InterruptorT=void, typename StageT> + auto enter_stage(StageT &stage, ClientRequest &op) { + return this->template with_blocking_event< + typename StageT::BlockingEvent, + InterruptorT>( + [&stage, this](auto &&trigger) { + return handle.template enter<ClientRequest>( + stage, std::move(trigger)); + }, op); + } + + template < + typename InterruptorT=void, typename BlockingObj, typename Method, + typename... Args> + auto enter_blocker( + ClientRequest &op, BlockingObj &obj, Method method, Args&&... args) { + return this->template with_blocking_event< + typename BlockingObj::Blocker::BlockingEvent, + InterruptorT>( + [&obj, method, + args=std::forward_as_tuple(std::move(args)...)](auto &&trigger) mutable { + return apply_method_to_tuple( + obj, method, + std::tuple_cat( + std::forward_as_tuple(std::move(trigger)), + std::move(args)) + ); + }, op); + } + }; + instance_handle_t::ref_t instance_handle; + void reset_instance_handle() { + instance_handle = new instance_handle_t; + } + auto get_instance_handle() { return instance_handle; } + + using ordering_hook_t = boost::intrusive::list_member_hook<>; + ordering_hook_t ordering_hook; + class Orderer { + using list_t = boost::intrusive::list< + ClientRequest, + boost::intrusive::member_hook< + ClientRequest, + typename ClientRequest::ordering_hook_t, + &ClientRequest::ordering_hook> + >; + list_t list; + + public: + void add_request(ClientRequest &request) { + assert(!request.ordering_hook.is_linked()); + intrusive_ptr_add_ref(&request); + list.push_back(request); + } + void remove_request(ClientRequest &request) { + assert(request.ordering_hook.is_linked()); + list.erase(list_t::s_iterator_to(request)); + intrusive_ptr_release(&request); + } + void requeue(ShardServices &shard_services, Ref<PG> pg); + void clear_and_cancel(); + }; + void complete_request(); + + static constexpr OperationTypeCode type = OperationTypeCode::client_request; + + ClientRequest( + ShardServices &shard_services, + crimson::net::ConnectionRef, Ref<MOSDOp> &&m); + ~ClientRequest(); + + void print(std::ostream &) const final; + void dump_detail(Formatter *f) const final; + + static constexpr bool can_create() { return false; } + spg_t get_pgid() const { + return m->get_spg(); + } + PipelineHandle &get_handle() { return instance_handle->handle; } + epoch_t get_epoch() const { return m->get_min_epoch(); } + + ConnectionPipeline &get_connection_pipeline(); + seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + + seastar::future<> with_pg_int( + ShardServices &shard_services, Ref<PG> pg); + +public: + seastar::future<> with_pg( + ShardServices &shard_services, Ref<PG> pgref); + +private: + template <typename FuncT> + interruptible_future<> with_sequencer(FuncT&& func); + auto reply_op_error(const Ref<PG>& pg, int err); + + interruptible_future<> do_process( + instance_handle_t &ihref, + Ref<PG>& pg, + crimson::osd::ObjectContextRef obc); + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition> process_pg_op( + Ref<PG> &pg); + ::crimson::interruptible::interruptible_future< + ::crimson::osd::IOInterruptCondition> process_op( + instance_handle_t &ihref, + Ref<PG> &pg); + bool is_pg_op() const; + + PGPipeline &client_pp(PG &pg); + + template <typename Errorator> + using interruptible_errorator = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + Errorator>; + + bool is_misdirected(const PG& pg) const; + + const SnapContext get_snapc( + Ref<PG>& pg, + crimson::osd::ObjectContextRef obc) const; + +public: + + friend class LttngBackend; + friend class HistoricBackend; + + auto get_started() const { + return get_event<StartEvent>().get_timestamp(); + }; + + auto get_completed() const { + return get_event<CompletionEvent>().get_timestamp(); + }; + + void put_historic() const; +}; + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::osd::ClientRequest> : fmt::ostream_formatter {}; +#endif |