// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include "crimson/common/operation.h"
#include "crimson/osd/pg_interval_interrupt_condition.h"
#include "crimson/osd/scheduler/scheduler.h"
#include "osd/osd_types.h"

namespace crimson::os::seastore {
  template <class OpT>
  class OperationProxyT;
}

namespace crimson::osd {

/// Ordering stages for a class of operations ordered by PG.
struct ConnectionPipeline {
  struct AwaitActive : OrderedExclusivePhaseT<AwaitActive> {
    static constexpr auto type_name =
      "ConnectionPipeline::await_active";
  } await_active;

  struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
    static constexpr auto type_name =
      "ConnectionPipeline::await_map";
  } await_map;

  struct GetPG : OrderedExclusivePhaseT<GetPG> {
    static constexpr auto type_name =
      "ConnectionPipeline::get_pg";
  } get_pg;
};

enum class OperationTypeCode {
  client_request = 0,
  peering_event,
  pg_advance_map,
  pg_creation,
  replicated_request,
  background_recovery,
  background_recovery_sub,
  internal_client_request,
  historic_client_request,
  logmissing_request,
  logmissing_request_reply,
  snaptrim_event,
  snaptrimobj_subevent,
  last_op
};

static constexpr const char* const OP_NAMES[] = {
  "client_request",
  "peering_event",
  "pg_advance_map",
  "pg_creation",
  "replicated_request",
  "background_recovery",
  "background_recovery_sub",
  "internal_client_request",
  "historic_client_request",
  "logmissing_request",
  "logmissing_request_reply",
  "snaptrim_event",
  "snaptrimobj_subevent",
};

// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
static_assert(
  (sizeof(OP_NAMES)/sizeof(OP_NAMES[0])) ==
  static_cast<int>(OperationTypeCode::last_op));

struct InterruptibleOperation : Operation {
  template <typename ValuesT = void>
  using interruptible_future =
    ::crimson::interruptible::interruptible_future<
      ::crimson::osd::IOInterruptCondition, ValuesT>;
  using interruptor =
    ::crimson::interruptible::interruptor<
      ::crimson::osd::IOInterruptCondition>;
};

template <typename T>
struct OperationT : InterruptibleOperation {
  static constexpr const char *type_name = OP_NAMES[static_cast<int>(T::type)];
  using IRef = boost::intrusive_ptr<T>;
  using ICRef = boost::intrusive_ptr<const T>;

  unsigned get_type() const final {
    return static_cast<unsigned>(T::type);
  }

  const char *get_type_name() const final {
    return T::type_name;
  }

  virtual ~OperationT() = default;

private:
  virtual void dump_detail(ceph::Formatter *f) const = 0;
};
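// Illustrative sketch only; the example code below is hypothetical and not
// part of the crimson API. TrackableOperationT relies on a CRTP contract:
// the concrete operation type T is expected to expose a `tracking_events`
// member (usually a std::tuple of event types) so that get_event<EventT>()
// can locate each event via std::get. A stripped-down, standalone analogue
// of that pattern, assuming nothing beyond the standard library:
//
//   #include <tuple>
//   #include <utility>
//
//   template <class T>
//   struct tracked_base {
//     template <class EventT, class... Args>
//     void track_event(Args&&... args) {
//       // look the event up by type in the derived class' tuple
//       std::get<EventT>(static_cast<T*>(this)->tracking_events)
//         .trigger(std::forward<Args>(args)...);
//     }
//   };
//
//   struct start_event { void trigger() { /* e.g. record a timestamp */ } };
//   struct done_event  { void trigger() { /* e.g. record a timestamp */ } };
//
//   struct my_op : tracked_base<my_op> {
//     std::tuple<start_event, done_event> tracking_events;
//   };
//
//   // usage: my_op op; op.track_event<start_event>();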
template <class T>
class TrackableOperationT : public OperationT<T> {
  T* that() {
    return static_cast<T*>(this);
  }
  const T* that() const {
    return static_cast<const T*>(this);
  }

protected:
  template <class EventT>
  decltype(auto) get_event() {
    // all derived classes are supposed to define the list of tracking
    // events accessible via `std::get`. This will usually boil down
    // to an instance of `std::tuple`.
    return std::get<EventT>(that()->tracking_events);
  }

  template <class EventT>
  decltype(auto) get_event() const {
    return std::get<EventT>(that()->tracking_events);
  }

  using OperationT<T>::OperationT;

  struct StartEvent : TimeEvent<StartEvent> {};
  struct CompletionEvent : TimeEvent<CompletionEvent> {};

  template <class EventT, class... Args>
  void track_event(Args&&... args) {
    // the idea is to have a visitor-like interface that allows
    // double dispatch (backend, blocker type)
    get_event<EventT>().trigger(*that(), std::forward<Args>(args)...);
  }

  template <class BlockingEventT, class InterruptorT=void, class F>
  auto with_blocking_event(F&& f) {
    auto ret = std::forward<F>(f)(typename BlockingEventT::template Trigger<T>{
      get_event<BlockingEventT>(), *that()
    });
    if constexpr (std::is_same_v<InterruptorT, void>) {
      return ret;
    } else {
      using ret_t = decltype(ret);
      return typename InterruptorT::template futurize_t<ret_t>{std::move(ret)};
    }
  }

public:
  static constexpr bool is_trackable = true;
};

template <class T>
class PhasedOperationT : public TrackableOperationT<T> {
  using base_t = TrackableOperationT<T>;

  T* that() {
    return static_cast<T*>(this);
  }
  const T* that() const {
    return static_cast<const T*>(this);
  }

protected:
  using TrackableOperationT<T>::TrackableOperationT;

  template <class InterruptorT=void, class StageT>
  auto enter_stage(StageT& stage) {
    return this->template with_blocking_event<typename StageT::BlockingEvent,
                                              InterruptorT>(
      [&stage, this] (auto&& trigger) {
        // delegate storing the pipeline handle to let child classes match
        // the lifetime of the pipeline with e.g. ConnectedSocket (important
        // for ConnectionPipeline).
        return that()->get_handle().template enter<T>(stage, std::move(trigger));
      });
  }

  template <class OpT>
  friend class crimson::os::seastore::OperationProxyT;

  // PGShardManager::start_pg_operation needs access to enter_stage; we can
  // make this more sophisticated later on
  friend class PGShardManager;
};

/**
 * Maintains a set of lists of all active ops.
 */
struct OSDOperationRegistry : OperationRegistryT<
  static_cast<size_t>(OperationTypeCode::last_op)
> {
  OSDOperationRegistry();

  void do_stop() override;

  void put_historic(const class ClientRequest& op);

  size_t dump_historic_client_requests(ceph::Formatter* f) const;
  size_t dump_slowest_historic_client_requests(ceph::Formatter* f) const;

private:
  op_list::const_iterator last_of_recents;
  size_t num_recent_ops = 0;
  size_t num_slow_ops = 0;
};

/**
 * Throttles the set of currently running operations.
 *
 * Very primitive currently: assumes all ops are equally
 * expensive and simply limits the number that can be
 * concurrently active.
 */
class OperationThrottler : public BlockerT<OperationThrottler>,
                           private md_config_obs_t {
  friend BlockerT<OperationThrottler>;
  static constexpr const char* type_name = "OperationThrottler";

  template <typename OperationT, typename F>
  auto with_throttle(
    OperationT* op,
    crimson::osd::scheduler::params_t params,
    F &&f) {
    if (!max_in_progress) return f();
    return acquire_throttle(params)
      .then(std::forward<F>(f))
      .then([this](auto x) {
        release_throttle();
        return x;
      });
  }

  template <typename OperationT, typename F>
  seastar::future<> with_throttle_while(
    OperationT* op,
    crimson::osd::scheduler::params_t params,
    F &&f) {
    return with_throttle(op, params, f).then([this, params, op, f](bool cont) {
      return cont ? with_throttle_while(op, params, f) : seastar::now();
    });
  }

public:
  OperationThrottler(ConfigProxy &conf);

  const char** get_tracked_conf_keys() const final;
  void handle_conf_change(const ConfigProxy& conf,
                          const std::set<std::string> &changed) final;
  void update_from_config(const ConfigProxy &conf);

  template <class OpT, class... Args>
  seastar::future<> with_throttle_while(
    BlockingEvent::Trigger<OpT>&& trigger,
    Args&&... args) {
    return trigger.maybe_record_blocking(
      with_throttle_while(std::forward<Args>(args)...), *this);
  }

private:
  void dump_detail(Formatter *f) const final;

  crimson::osd::scheduler::SchedulerRef scheduler;

  uint64_t max_in_progress = 0;
  uint64_t in_progress = 0;
  uint64_t pending = 0;

  void wake();

  seastar::future<> acquire_throttle(
    crimson::osd::scheduler::params_t params);

  void release_throttle();
};

}
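// Illustrative sketch only; the example code below is hypothetical and not
// part of this header. OperationThrottler::with_throttle_while() above keeps
// re-invoking the supplied callable under the throttle until its future
// resolves to false. The same loop can be written iteratively with
// seastar::repeat instead of recursion; this sketch assumes <functional>,
// <seastar/core/future.hh> and <seastar/core/loop.hh> are available:
//
//   seastar::future<> run_while(std::function<seastar::future<bool>()> step) {
//     return seastar::repeat([step = std::move(step)] {
//       // run one throttled step, then decide whether to loop again
//       return step().then([](bool cont) {
//         return cont ? seastar::stop_iteration::no
//                     : seastar::stop_iteration::yes;
//       });
//     });
//   }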