Diffstat (limited to 'src/crimson/osd/shard_services.h')
-rw-r--r-- | src/crimson/osd/shard_services.h | 589
1 file changed, 589 insertions, 0 deletions
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
new file mode 100644
index 000000000..9b7553e7b
--- /dev/null
+++ b/src/crimson/osd/shard_services.h
@@ -0,0 +1,589 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <memory>
+
+#include <boost/intrusive_ptr.hpp>
+#include <seastar/core/future.hh>
+
+#include "include/common_fwd.h"
+#include "osd_operation.h"
+#include "msg/MessageRef.h"
+#include "crimson/common/exception.h"
+#include "crimson/common/shared_lru.h"
+#include "crimson/os/futurized_collection.h"
+#include "osd/PeeringState.h"
+#include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_meta.h"
+#include "crimson/osd/object_context.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/osd/state.h"
+#include "common/AsyncReserver.h"
+
+namespace crimson::net {
+  class Messenger;
+}
+
+namespace crimson::mgr {
+  class Client;
+}
+
+namespace crimson::mon {
+  class Client;
+}
+
+namespace crimson::os {
+  class FuturizedStore;
+}
+
+class OSDMap;
+class PeeringCtx;
+class BufferedRecoveryMessages;
+
+namespace crimson::osd {
+
+class PGShardManager;
+
+/**
+ * PerShardState
+ *
+ * Per-shard state holding instances local to each shard.
+ */
+class PerShardState {
+  friend class ShardServices;
+  friend class PGShardManager;
+  friend class OSD;
+  using cached_map_t = OSDMapService::cached_map_t;
+  using local_cached_map_t = OSDMapService::local_cached_map_t;
+
+  const core_id_t core = seastar::this_shard_id();
+#define assert_core() ceph_assert(seastar::this_shard_id() == core);
+
+  const int whoami;
+  crimson::os::FuturizedStore::Shard &store;
+  crimson::common::CephContext cct;
+
+  OSDState &osd_state;
+  OSD_OSDMapGate osdmap_gate;
+
+  PerfCounters *perf = nullptr;
+  PerfCounters *recoverystate_perf = nullptr;
+
+  // Op Management
+  OSDOperationRegistry registry;
+  OperationThrottler throttler;
+
+  seastar::future<> dump_ops_in_flight(Formatter *f) const;
+
+  epoch_t up_epoch = 0;
+  OSDMapService::cached_map_t osdmap;
+  const auto &get_osdmap() const {
+    assert_core();
+    return osdmap;
+  }
+  void update_map(OSDMapService::cached_map_t new_osdmap) {
+    assert_core();
+    osdmap = std::move(new_osdmap);
+  }
+  void set_up_epoch(epoch_t epoch) {
+    assert_core();
+    up_epoch = epoch;
+  }
+
+  // prevent creating new osd operations when the system is shutting
+  // down; this is necessary because a new operation may be created
+  // after all ongoing operations have been interrupted, and then
+  // create and wait on a new and may-never-resolve future, in which
+  // case the shutdown may never succeed.
+  bool stopping = false;
+  seastar::future<> stop_registry() {
+    assert_core();
+    crimson::get_logger(ceph_subsys_osd).info("PerShardState::{}", __func__);
+    stopping = true;
+    return registry.stop();
+  }
+
+  // PGMap state
+  PGMap pg_map;
+
+  seastar::future<> stop_pgs();
+  std::map<pg_t, pg_stat_t> get_pg_stats() const;
+  seastar::future<> broadcast_map_to_pgs(
+    ShardServices &shard_services,
+    epoch_t epoch);
+
+  Ref<PG> get_pg(spg_t pgid);
+  template <typename F>
+  void for_each_pg(F &&f) const {
+    assert_core();
+    for (auto &pg : pg_map.get_pgs()) {
+      std::invoke(f, pg.first, pg.second);
+    }
+  }
+
+  template <typename T, typename... Args>
+  auto start_operation(Args&&... args) {
+    assert_core();
+    if (__builtin_expect(stopping, false)) {
+      throw crimson::common::system_shutdown_exception();
+    }
+    auto op = registry.create_operation<T>(std::forward<Args>(args)...);
+    crimson::get_logger(ceph_subsys_osd).info(
+      "PerShardState::{}, {}", __func__, *op);
+    auto fut = seastar::yield().then([op] {
+      return op->start().finally([op /* by copy */] {
+        // ensure the op's lifetime is appropriate. It is not enough to
+        // guarantee it's alive at the scheduling stages (i.e. `then()`
+        // calling) but also during the actual execution (i.e. when passed
+        // lambdas are actually run).
+      });
+    });
+    return std::make_pair(std::move(op), std::move(fut));
+  }
+
+  template <typename InterruptorT, typename T, typename... Args>
+  auto start_operation_may_interrupt(Args&&... args) {
+    assert_core();
+    if (__builtin_expect(stopping, false)) {
+      throw crimson::common::system_shutdown_exception();
+    }
+    auto op = registry.create_operation<T>(std::forward<Args>(args)...);
+    crimson::get_logger(ceph_subsys_osd).info(
+      "PerShardState::{}, {}", __func__, *op);
+    auto fut = InterruptorT::make_interruptible(
+      seastar::yield()
+    ).then_interruptible([op] {
+      return op->start().finally([op /* by copy */] {
+        // ensure the op's lifetime is appropriate. It is not enough to
+        // guarantee it's alive at the scheduling stages (i.e. `then()`
+        // calling) but also during the actual execution (i.e. when passed
+        // lambdas are actually run).
+      });
+    });
+    return std::make_pair(std::move(op), std::move(fut));
+  }
+
+  // tids for ops I issue, prefixed with core id to ensure uniqueness
+  ceph_tid_t next_tid;
+  ceph_tid_t get_tid() {
+    assert_core();
+    return next_tid++;
+  }
+
+  HeartbeatStampsRef get_hb_stamps(int peer);
+  std::map<int, HeartbeatStampsRef> heartbeat_stamps;
+
+  // Time state
+  const ceph::mono_time startup_time;
+  ceph::signedspan get_mnow() const {
+    assert_core();
+    return ceph::mono_clock::now() - startup_time;
+  }
+
+public:
+  PerShardState(
+    int whoami,
+    ceph::mono_time startup_time,
+    PerfCounters *perf,
+    PerfCounters *recoverystate_perf,
+    crimson::os::FuturizedStore &store,
+    OSDState& osd_state);
+};
+
+/**
+ * OSDSingletonState
+ *
+ * OSD-wide singleton holding instances that need to be accessible
+ * from all PGs.
+ */
+class OSDSingletonState : public md_config_obs_t {
+  friend class ShardServices;
+  friend class PGShardManager;
+  friend class OSD;
+  using cached_map_t = OSDMapService::cached_map_t;
+  using local_cached_map_t = OSDMapService::local_cached_map_t;
+
+public:
+  OSDSingletonState(
+    int whoami,
+    crimson::net::Messenger &cluster_msgr,
+    crimson::net::Messenger &public_msgr,
+    crimson::mon::Client &monc,
+    crimson::mgr::Client &mgrc);
+
+private:
+  const int whoami;
+
+  crimson::common::CephContext cct;
+  PerfCounters *perf = nullptr;
+  PerfCounters *recoverystate_perf = nullptr;
+
+  SharedLRU<epoch_t, OSDMap> osdmaps;
+  SimpleLRU<epoch_t, bufferlist, false> map_bl_cache;
+
+  cached_map_t osdmap;
+  cached_map_t &get_osdmap() { return osdmap; }
+  void update_map(cached_map_t new_osdmap) {
+    osdmap = std::move(new_osdmap);
+  }
+
+  crimson::net::Messenger &cluster_msgr;
+  crimson::net::Messenger &public_msgr;
+
+  seastar::future<> send_to_osd(int peer, MessageURef m, epoch_t from_epoch);
+
+  crimson::mon::Client &monc;
+  seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
+
+  crimson::mgr::Client &mgrc;
+
+  std::unique_ptr<OSDMeta> meta_coll;
+  template <typename... Args>
+  void init_meta_coll(Args&&... args) {
+    meta_coll = std::make_unique<OSDMeta>(std::forward<Args>(args)...);
+  }
+  OSDMeta &get_meta_coll() {
+    assert(meta_coll);
+    return *meta_coll;
+  }
+
+  OSDSuperblock superblock;
+  void set_superblock(OSDSuperblock _superblock) {
+    superblock = std::move(_superblock);
+  }
+
+  seastar::future<> send_incremental_map(
+    crimson::net::Connection &conn,
+    epoch_t first);
+
+  seastar::future<> send_incremental_map_to_osd(int osd, epoch_t first);
+
+  auto get_pool_info(int64_t poolid) {
+    return get_meta_coll().load_final_pool_info(poolid);
+  }
+
+  // global pg temp state
+  struct pg_temp_t {
+    std::vector<int> acting;
+    bool forced = false;
+  };
+  std::map<pg_t, pg_temp_t> pg_temp_wanted;
+  std::map<pg_t, pg_temp_t> pg_temp_pending;
+  friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
+
+  void queue_want_pg_temp(pg_t pgid, const std::vector<int>& want,
+                          bool forced = false);
+  void remove_want_pg_temp(pg_t pgid);
+  void requeue_pg_temp();
+  seastar::future<> send_pg_temp();
+
+  std::set<pg_t> pg_created;
+  seastar::future<> send_pg_created(pg_t pgid);
+  seastar::future<> send_pg_created();
+  void prune_pg_created();
+
+  struct DirectFinisher {
+    void queue(Context *c) {
+      c->complete(0);
+    }
+  } finisher;
+  AsyncReserver<spg_t, DirectFinisher> local_reserver;
+  AsyncReserver<spg_t, DirectFinisher> remote_reserver;
+  AsyncReserver<spg_t, DirectFinisher> snap_reserver;
+
+  epoch_t up_thru_wanted = 0;
+  seastar::future<> send_alive(epoch_t want);
+
+  const char** get_tracked_conf_keys() const final;
+  void handle_conf_change(
+    const ConfigProxy& conf,
+    const std::set<std::string> &changed) final;
+
+  seastar::future<local_cached_map_t> get_local_map(epoch_t e);
+  seastar::future<std::unique_ptr<OSDMap>> load_map(epoch_t e);
+  seastar::future<bufferlist> load_map_bl(epoch_t e);
+  seastar::future<std::map<epoch_t, bufferlist>>
+  load_map_bls(epoch_t first, epoch_t last);
+  void store_map_bl(ceph::os::Transaction& t,
+                    epoch_t e, bufferlist&& bl);
+  seastar::future<> store_maps(ceph::os::Transaction& t,
+                               epoch_t start, Ref<MOSDMap> m);
+};
+
+/**
+ * Represents services available to each PG
+ */
+class ShardServices : public OSDMapService {
+  friend class PGShardManager;
+  friend class OSD;
+  using cached_map_t = OSDMapService::cached_map_t;
+  using local_cached_map_t = OSDMapService::local_cached_map_t;
+
+  PerShardState local_state;
+  seastar::sharded<OSDSingletonState> &osd_singleton_state;
+  PGShardMapping& pg_to_shard_mapping;
+
+  template <typename F, typename... Args>
+  auto with_singleton(F &&f, Args&&... args) {
+    return osd_singleton_state.invoke_on(
+      PRIMARY_CORE,
+      std::forward<F>(f),
+      std::forward<Args>(args)...
+    );
+  }
+
+#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)       \
+  template <typename... Args>                               \
+  auto FROM_METHOD(Args&&... args) const {                  \
+    return TARGET.TO_METHOD(std::forward<Args>(args)...);   \
+  }
+
+#define FORWARD(FROM_METHOD, TO_METHOD, TARGET)             \
+  template <typename... Args>                               \
+  auto FROM_METHOD(Args&&... args) {                        \
+    return TARGET.TO_METHOD(std::forward<Args>(args)...);   \
+  }
+
+#define FORWARD_TO_LOCAL(METHOD) FORWARD(METHOD, METHOD, local_state)
+#define FORWARD_TO_LOCAL_CONST(METHOD) FORWARD_CONST(       \
+    METHOD, METHOD, local_state)                            \
+
+#define FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, TARGET)     \
+  template <typename... Args>                               \
+  auto METHOD(Args&&... args) {                             \
+    return with_singleton(                                  \
+      [](auto &local_state, auto&&... args) {               \
+        return local_state.TARGET(                          \
+          std::forward<decltype(args)>(args)...);           \
+      }, std::forward<Args>(args)...);                      \
+  }
+#define FORWARD_TO_OSD_SINGLETON(METHOD) \
+  FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, METHOD)
+
+public:
+  template <typename... PSSArgs>
+  ShardServices(
+    seastar::sharded<OSDSingletonState> &osd_singleton_state,
+    PGShardMapping& pg_to_shard_mapping,
+    PSSArgs&&... args)
+    : local_state(std::forward<PSSArgs>(args)...),
+      osd_singleton_state(osd_singleton_state),
+      pg_to_shard_mapping(pg_to_shard_mapping) {}
+
+  FORWARD_TO_OSD_SINGLETON(send_to_osd)
+
+  crimson::os::FuturizedStore::Shard &get_store() {
+    return local_state.store;
+  }
+
+  auto remove_pg(spg_t pgid) {
+    local_state.pg_map.remove_pg(pgid);
+    return pg_to_shard_mapping.remove_pg(pgid);
+  }
+
+  crimson::common::CephContext *get_cct() {
+    return &(local_state.cct);
+  }
+
+  template <typename T, typename... Args>
+  auto start_operation(Args&&... args) {
+    return local_state.start_operation<T>(std::forward<Args>(args)...);
+  }
+
+  template <typename InterruptorT, typename T, typename... Args>
+  auto start_operation_may_interrupt(Args&&... args) {
+    return local_state.start_operation_may_interrupt<
+      InterruptorT, T>(std::forward<Args>(args)...);
+  }
+
+  auto &get_registry() { return local_state.registry; }
+
+  // Loggers
+  PerfCounters &get_recoverystate_perf_logger() {
+    return *local_state.recoverystate_perf;
+  }
+  PerfCounters &get_perf_logger() {
+    return *local_state.perf;
+  }
+
+  // Diagnostics
+  FORWARD_TO_LOCAL_CONST(dump_ops_in_flight);
+
+  // Local PG Management
+  seastar::future<Ref<PG>> make_pg(
+    cached_map_t create_map,
+    spg_t pgid,
+    bool do_create);
+  seastar::future<Ref<PG>> handle_pg_create_info(
+    std::unique_ptr<PGCreateInfo> info);
+
+  using get_or_create_pg_ertr = PGMap::wait_for_pg_ertr;
+  using get_or_create_pg_ret = get_or_create_pg_ertr::future<Ref<PG>>;
+  get_or_create_pg_ret get_or_create_pg(
+    PGMap::PGCreationBlockingEvent::TriggerI&&,
+    spg_t pgid,
+    std::unique_ptr<PGCreateInfo> info);
+
+  using wait_for_pg_ertr = PGMap::wait_for_pg_ertr;
+  using wait_for_pg_ret = wait_for_pg_ertr::future<Ref<PG>>;
+  wait_for_pg_ret wait_for_pg(
+    PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
+  seastar::future<Ref<PG>> load_pg(spg_t pgid);
+
+  /// Dispatch and reset ctx transaction
+  seastar::future<> dispatch_context_transaction(
+    crimson::os::CollectionRef col, PeeringCtx &ctx);
+
+  /// Dispatch and reset ctx messages
+  seastar::future<> dispatch_context_messages(
+    BufferedRecoveryMessages &&ctx);
+
+  /// Dispatch ctx and dispose of context
+  seastar::future<> dispatch_context(
+    crimson::os::CollectionRef col,
+    PeeringCtx &&ctx);
+
+  /// Dispatch ctx and dispose of ctx, transaction must be empty
+  seastar::future<> dispatch_context(
+    PeeringCtx &&ctx) {
+    return dispatch_context({}, std::move(ctx));
+  }
+
+  /// Return per-core tid
+  ceph_tid_t get_tid() { return local_state.get_tid(); }
+
+  /// Return core-local pg count
+  unsigned get_num_local_pgs() const {
+    return local_state.pg_map.get_pg_count();
+  }
+
+  // OSDMapService
+  cached_map_t get_map() const final { return local_state.get_osdmap(); }
+  epoch_t get_up_epoch() const final { return local_state.up_epoch; }
+  seastar::future<cached_map_t> get_map(epoch_t e) final {
+    return with_singleton(
+      [](auto &sstate, epoch_t e) {
+        return sstate.get_local_map(
+          e
+        ).then([](auto lmap) {
+          return seastar::foreign_ptr<local_cached_map_t>(lmap);
+        });
+      }, e).then([](auto fmap) {
+        return make_local_shared_foreign(std::move(fmap));
+      });
+  }
+
+  FORWARD_TO_OSD_SINGLETON(get_pool_info)
+  FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
+
+  FORWARD_TO_OSD_SINGLETON(send_incremental_map)
+  FORWARD_TO_OSD_SINGLETON(send_incremental_map_to_osd)
+
+  FORWARD_TO_OSD_SINGLETON(osdmap_subscribe)
+  FORWARD_TO_OSD_SINGLETON(queue_want_pg_temp)
+  FORWARD_TO_OSD_SINGLETON(remove_want_pg_temp)
+  FORWARD_TO_OSD_SINGLETON(requeue_pg_temp)
+  FORWARD_TO_OSD_SINGLETON(send_pg_created)
+  FORWARD_TO_OSD_SINGLETON(send_alive)
+  FORWARD_TO_OSD_SINGLETON(send_pg_temp)
+  FORWARD_TO_LOCAL_CONST(get_mnow)
+  FORWARD_TO_LOCAL(get_hb_stamps)
+
+  FORWARD(pg_created, pg_created, local_state.pg_map)
+
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    local_update_priority,
+    local_reserver.update_priority)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    local_cancel_reservation,
+    local_reserver.cancel_reservation)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    local_dump_reservations,
+    local_reserver.dump)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    remote_cancel_reservation,
+    remote_reserver.cancel_reservation)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    remote_dump_reservations,
+    remote_reserver.dump)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    snap_cancel_reservation,
+    snap_reserver.cancel_reservation)
+  FORWARD_TO_OSD_SINGLETON_TARGET(
+    snap_dump_reservations,
+    snap_reserver.dump)
+
+  Context *invoke_context_on_core(core_id_t core, Context *c) {
+    if (!c) return nullptr;
+    return new LambdaContext([core, c](int code) {
+      std::ignore = seastar::smp::submit_to(
+        core,
+        [c, code] {
+          c->complete(code);
+        });
+    });
+  }
+  seastar::future<> local_request_reservation(
+    spg_t item,
+    Context *on_reserved,
+    unsigned prio,
+    Context *on_preempt) {
+    return with_singleton(
+      [item, prio](OSDSingletonState &singleton,
+                   Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
+        return singleton.local_reserver.request_reservation(
+          item,
+          wrapped_on_reserved,
+          prio,
+          wrapped_on_preempt);
+      },
+      invoke_context_on_core(seastar::this_shard_id(), on_reserved),
+      invoke_context_on_core(seastar::this_shard_id(), on_preempt));
+  }
+  seastar::future<> remote_request_reservation(
+    spg_t item,
+    Context *on_reserved,
+    unsigned prio,
+    Context *on_preempt) {
+    return with_singleton(
+      [item, prio](OSDSingletonState &singleton,
+                   Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
+        return singleton.remote_reserver.request_reservation(
+          item,
+          wrapped_on_reserved,
+          prio,
+          wrapped_on_preempt);
+      },
+      invoke_context_on_core(seastar::this_shard_id(), on_reserved),
+      invoke_context_on_core(seastar::this_shard_id(), on_preempt));
+  }
+  seastar::future<> snap_request_reservation(
+    spg_t item,
+    Context *on_reserved,
+    unsigned prio) {
+    return with_singleton(
+      [item, prio](OSDSingletonState &singleton,
+                   Context *wrapped_on_reserved) {
+        return singleton.snap_reserver.request_reservation(
+          item,
+          wrapped_on_reserved,
+          prio);
+      },
+      invoke_context_on_core(seastar::this_shard_id(), on_reserved));
+  }
+
+#undef FORWARD_CONST
+#undef FORWARD
+#undef FORWARD_TO_OSD_SINGLETON
+#undef FORWARD_TO_LOCAL
+#undef FORWARD_TO_LOCAL_CONST
+};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::OSDSingletonState::pg_temp_t> : fmt::ostream_formatter {};
+#endif
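
The pattern worth noting in this header: ShardServices owns shard-local state (PerShardState) directly, and reaches the one OSD-wide instance (OSDSingletonState) by hopping to PRIMARY_CORE through seastar::sharded<>::invoke_on; the FORWARD_TO_OSD_SINGLETON* macros are thin wrappers around that hop. Below is a minimal standalone sketch of the same pattern, not part of this commit: SingletonDemo and its counter are hypothetical stand-ins for OSDSingletonState, while the Seastar calls (start_single, invoke_on, smp::invoke_on_all) are the real APIs the header relies on.

#include <iostream>

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>

// Hypothetical stand-in for OSDSingletonState: mutable state that must
// only ever be touched on one core.
struct SingletonDemo {
  int counter = 0;
  seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

constexpr seastar::shard_id PRIMARY_CORE = 0;

int main(int argc, char** argv) {
  seastar::app_template app;
  seastar::sharded<SingletonDemo> singleton;
  return app.run(argc, argv, [&singleton] {
    // start_single() instantiates the service on shard 0 only,
    // the way crimson pins OSDSingletonState to one core.
    return singleton.start_single().then([&singleton] {
      // Every shard forwards to the primary core, mirroring
      // ShardServices::with_singleton -> invoke_on(PRIMARY_CORE, ...).
      return seastar::smp::invoke_on_all([&singleton] {
        return singleton.invoke_on(PRIMARY_CORE, [](SingletonDemo& s) {
          ++s.counter;  // runs on PRIMARY_CORE, so no locking is needed
        });
      });
    }).then([&singleton] {
      return singleton.invoke_on(PRIMARY_CORE, [](SingletonDemo& s) {
        return s.counter;
      });
    }).then([](int count) {
      // One increment per reactor thread.
      std::cout << "counter = " << count << "\n";
    }).finally([&singleton] {
      return singleton.stop();
    });
  });
}

Because every mutation of the singleton runs on a single reactor thread, no locks are required; callers pay only the cross-core message hop. That cost is why cheap, frequently touched state (tids, the osdmap cache, the PG map) stays in PerShardState, and why invoke_context_on_core above bounces Context completions back to the shard that created them.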