// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #pragma once #include #include #include #include #include #include #include #include #include #include #include #include "include/ceph_assert.h" #include "crimson/osd/scheduler/scheduler.h" namespace ceph { class Formatter; } namespace crimson::osd { enum class OperationTypeCode { client_request = 0, peering_event, compound_peering_request, pg_advance_map, pg_creation, replicated_request, background_recovery, background_recovery_sub, last_op }; static constexpr const char* const OP_NAMES[] = { "client_request", "peering_event", "compound_peering_request", "pg_advance_map", "pg_creation", "replicated_request", "background_recovery", "background_recovery_sub", }; // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry: static_assert( (sizeof(OP_NAMES)/sizeof(OP_NAMES[0])) == static_cast(OperationTypeCode::last_op)); class OperationRegistry; using registry_hook_t = boost::intrusive::list_member_hook< boost::intrusive::link_mode>; class Operation; class Blocker; /** * Provides an abstraction for registering and unregistering a blocker * for the duration of a future becoming available. */ template class blocking_future_detail { friend class Operation; friend class Blocker; Blocker *blocker; Fut fut; blocking_future_detail(Blocker *b, Fut &&f) : blocker(b), fut(std::move(f)) {} template friend blocking_future_detail> make_ready_blocking_future(U&& args); template friend blocking_future_detail> make_exception_blocking_future(Exception&& e); template friend blocking_future_detail> join_blocking_futures(U &&u); template friend class blocking_future_detail; public: template auto then(F &&f) && { using result = decltype(std::declval().then(f)); return blocking_future_detail>( blocker, std::move(fut).then(std::forward(f))); } }; template using blocking_future = blocking_future_detail>; template blocking_future_detail> make_ready_blocking_future(U&& args) { return blocking_future( nullptr, seastar::make_ready_future(std::forward(args))); } template blocking_future_detail> make_exception_blocking_future(Exception&& e) { return blocking_future( nullptr, seastar::make_exception_future(e)); } /** * Provides an interface for dumping diagnostic information about * why a particular op is not making progress. */ class Blocker { public: template blocking_future make_blocking_future(seastar::future &&f) { return blocking_future(this, std::move(f)); } void dump(ceph::Formatter *f) const; virtual ~Blocker() = default; private: virtual void dump_detail(ceph::Formatter *f) const = 0; virtual const char *get_type_name() const = 0; }; template class BlockerT : public Blocker { public: virtual ~BlockerT() = default; private: const char *get_type_name() const final { return T::type_name; } }; class AggregateBlocker : public BlockerT { vector parent_blockers; public: AggregateBlocker(vector &&parent_blockers) : parent_blockers(std::move(parent_blockers)) {} static constexpr const char *type_name = "AggregateBlocker"; private: void dump_detail(ceph::Formatter *f) const final; }; template blocking_future<> join_blocking_futures(T &&t) { vector blockers; blockers.reserve(t.size()); for (auto &&bf: t) { blockers.push_back(bf.blocker); bf.blocker = nullptr; } auto agg = std::make_unique(std::move(blockers)); return agg->make_blocking_future( seastar::parallel_for_each( std::forward(t), [](auto &&bf) { return std::move(bf.fut); }).then([agg=std::move(agg)] { return seastar::make_ready_future<>(); })); } /** * Common base for all crimson-osd operations. Mainly provides * an interface for registering ops in flight and dumping * diagnostic information. */ class Operation : public boost::intrusive_ref_counter< Operation, boost::thread_unsafe_counter> { public: uint64_t get_id() const { return id; } virtual OperationTypeCode get_type() const = 0; virtual const char *get_type_name() const = 0; virtual void print(std::ostream &) const = 0; template seastar::future with_blocking_future(blocking_future &&f) { if (f.fut.available()) { return std::move(f.fut); } assert(f.blocker); add_blocker(f.blocker); return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) { clear_blocker(blocker); return std::move(arg); }); } void dump(ceph::Formatter *f); void dump_brief(ceph::Formatter *f); virtual ~Operation() = default; private: virtual void dump_detail(ceph::Formatter *f) const = 0; private: registry_hook_t registry_hook; std::vector blockers; uint64_t id = 0; void set_id(uint64_t in_id) { id = in_id; } void add_blocker(Blocker *b) { blockers.push_back(b); } void clear_blocker(Blocker *b) { auto iter = std::find(blockers.begin(), blockers.end(), b); if (iter != blockers.end()) { blockers.erase(iter); } } friend class OperationRegistry; }; using OperationRef = boost::intrusive_ptr; std::ostream &operator<<(std::ostream &, const Operation &op); template class OperationT : public Operation { public: static constexpr const char *type_name = OP_NAMES[static_cast(T::type)]; using IRef = boost::intrusive_ptr; OperationTypeCode get_type() const final { return T::type; } const char *get_type_name() const final { return T::type_name; } virtual ~OperationT() = default; private: virtual void dump_detail(ceph::Formatter *f) const = 0; }; /** * Maintains a set of lists of all active ops. */ class OperationRegistry { friend class Operation; using op_list_member_option = boost::intrusive::member_hook< Operation, registry_hook_t, &Operation::registry_hook >; using op_list = boost::intrusive::list< Operation, op_list_member_option, boost::intrusive::constant_time_size>; std::array< op_list, static_cast(OperationTypeCode::last_op) > registries; std::array< uint64_t, static_cast(OperationTypeCode::last_op) > op_id_counters = {}; seastar::timer shutdown_timer; seastar::promise<> shutdown; public: template typename T::IRef create_operation(Args&&... args) { typename T::IRef op = new T(std::forward(args)...); registries[static_cast(T::type)].push_back(*op); op->set_id(op_id_counters[static_cast(T::type)]++); return op; } seastar::future<> stop() { shutdown_timer.set_callback([this] { if (std::all_of(registries.begin(), registries.end(), [](auto& opl) { return opl.empty(); })) { shutdown.set_value(); shutdown_timer.cancel(); } }); shutdown_timer.arm_periodic(std::chrono::milliseconds(100/*TODO: use option instead*/)); return shutdown.get_future(); } }; /** * Throttles set of currently running operations * * Very primitive currently, assumes all ops are equally * expensive and simply limits the number that can be * concurrently active. */ class OperationThrottler : public Blocker, private md_config_obs_t { public: OperationThrottler(ConfigProxy &conf); const char** get_tracked_conf_keys() const final; void handle_conf_change(const ConfigProxy& conf, const std::set &changed) final; void update_from_config(const ConfigProxy &conf); template auto with_throttle( OperationRef op, crimson::osd::scheduler::params_t params, F &&f) { if (!max_in_progress) return f(); auto fut = acquire_throttle(params); return op->with_blocking_future(std::move(fut)) .then(std::forward(f)) .then([this](auto x) { release_throttle(); return x; }); } template seastar::future<> with_throttle_while( OperationRef op, crimson::osd::scheduler::params_t params, F &&f) { return with_throttle(op, params, f).then([this, params, op, f](bool cont) { if (cont) return with_throttle_while(op, params, f); else return seastar::make_ready_future<>(); }); } private: void dump_detail(Formatter *f) const final; const char *get_type_name() const final { return "OperationThrottler"; } private: crimson::osd::scheduler::SchedulerRef scheduler; uint64_t max_in_progress = 0; uint64_t in_progress = 0; uint64_t pending = 0; void wake(); blocking_future<> acquire_throttle( crimson::osd::scheduler::params_t params); void release_throttle(); }; /** * Ensures that at most one op may consider itself in the phase at a time. * Ops will see enter() unblock in the order in which they tried to enter * the phase. entering (though not necessarily waiting for the future to * resolve) a new phase prior to exiting the previous one will ensure that * the op ordering is preserved. */ class OrderedPipelinePhase : public Blocker { private: void dump_detail(ceph::Formatter *f) const final; const char *get_type_name() const final { return name; } public: /** * Used to encapsulate pipeline residency state. */ class Handle { OrderedPipelinePhase *phase = nullptr; public: Handle() = default; Handle(const Handle&) = delete; Handle(Handle&&) = delete; Handle &operator=(const Handle&) = delete; Handle &operator=(Handle&&) = delete; /** * Returns a future which unblocks when the handle has entered the passed * OrderedPipelinePhase. If already in a phase, enter will also release * that phase after placing itself in the queue for the next one to preserve * ordering. */ blocking_future<> enter(OrderedPipelinePhase &phase); /** * Releases the current phase if there is one. Called in ~Handle(). */ void exit(); ~Handle(); }; OrderedPipelinePhase(const char *name) : name(name) {} private: const char * name; seastar::shared_mutex mutex; }; }