// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #pragma once #include #include #include #include "crimson/osd/shard_services.h" #include "crimson/osd/pg_map.h" namespace crimson::os { class FuturizedStore; } namespace crimson::osd { /** * PGShardManager * * Manages all state required to partition PGs over seastar reactors * as well as state required to route messages to pgs. Mediates access to * shared resources required by PGs (objectstore, messenger, monclient, * etc) */ class PGShardManager { seastar::sharded &osd_singleton_state; seastar::sharded &shard_services; seastar::sharded &pg_to_shard_mapping; #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \ template \ auto FROM_METHOD(Args&&... args) const { \ return TARGET.TO_METHOD(std::forward(args)...); \ } #define FORWARD(FROM_METHOD, TO_METHOD, TARGET) \ template \ auto FROM_METHOD(Args&&... args) { \ return TARGET.TO_METHOD(std::forward(args)...); \ } #define FORWARD_TO_OSD_SINGLETON(METHOD) \ FORWARD(METHOD, METHOD, get_osd_singleton_state()) public: using cached_map_t = OSDMapService::cached_map_t; using local_cached_map_t = OSDMapService::local_cached_map_t; PGShardManager( seastar::sharded &osd_singleton_state, seastar::sharded &shard_services, seastar::sharded &pg_to_shard_mapping) : osd_singleton_state(osd_singleton_state), shard_services(shard_services), pg_to_shard_mapping(pg_to_shard_mapping) {} auto &get_osd_singleton_state() { ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); return osd_singleton_state.local(); } auto &get_osd_singleton_state() const { ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); return osd_singleton_state.local(); } auto &get_shard_services() { return shard_services.local(); } auto &get_shard_services() const { return shard_services.local(); } auto &get_local_state() { return get_shard_services().local_state; } auto &get_local_state() const { return get_shard_services().local_state; } auto &get_pg_to_shard_mapping() { return pg_to_shard_mapping.local(); } auto &get_pg_to_shard_mapping() const { return pg_to_shard_mapping.local(); } seastar::future<> update_map(local_cached_map_t &&map) { get_osd_singleton_state().update_map( make_local_shared_foreign(local_cached_map_t(map)) ); /* We need each core to get its own foreign_ptr. * foreign_ptr can't be cheaply copied, so we make one for each core * up front. */ return seastar::do_with( std::vector>(), [this, map](auto &fmaps) { fmaps.resize(seastar::smp::count); for (auto &i: fmaps) { i = seastar::foreign_ptr(map); } return shard_services.invoke_on_all( [&fmaps](auto &local) mutable { local.local_state.update_map( make_local_shared_foreign( std::move(fmaps[seastar::this_shard_id()]) )); }); }); } seastar::future<> stop_registries() { return shard_services.invoke_on_all([](auto &local) { return local.local_state.stop_registry(); }); } FORWARD_TO_OSD_SINGLETON(send_pg_created) // osd state forwards FORWARD(is_active, is_active, get_shard_services().local_state.osd_state) FORWARD(is_preboot, is_preboot, get_shard_services().local_state.osd_state) FORWARD(is_booting, is_booting, get_shard_services().local_state.osd_state) FORWARD(is_stopping, is_stopping, get_shard_services().local_state.osd_state) FORWARD(is_prestop, is_prestop, get_shard_services().local_state.osd_state) FORWARD(is_initializing, is_initializing, get_shard_services().local_state.osd_state) FORWARD(set_prestop, set_prestop, get_shard_services().local_state.osd_state) FORWARD(set_preboot, set_preboot, get_shard_services().local_state.osd_state) FORWARD(set_booting, set_booting, get_shard_services().local_state.osd_state) FORWARD(set_stopping, set_stopping, get_shard_services().local_state.osd_state) FORWARD(set_active, set_active, get_shard_services().local_state.osd_state) FORWARD(when_active, when_active, get_shard_services().local_state.osd_state) FORWARD_CONST(get_osd_state_string, to_string, get_shard_services().local_state.osd_state) FORWARD(got_map, got_map, get_shard_services().local_state.osdmap_gate) FORWARD(wait_for_map, wait_for_map, get_shard_services().local_state.osdmap_gate) // Metacoll FORWARD_TO_OSD_SINGLETON(init_meta_coll) FORWARD_TO_OSD_SINGLETON(get_meta_coll) FORWARD_TO_OSD_SINGLETON(set_superblock) // Core OSDMap methods FORWARD_TO_OSD_SINGLETON(get_local_map) FORWARD_TO_OSD_SINGLETON(load_map_bl) FORWARD_TO_OSD_SINGLETON(load_map_bls) FORWARD_TO_OSD_SINGLETON(store_maps) seastar::future<> set_up_epoch(epoch_t e); template auto with_remote_shard_state(core_id_t core, F &&f) { return shard_services.invoke_on( core, [f=std::move(f)](auto &target_shard_services) mutable { return std::invoke( std::move(f), target_shard_services.local_state, target_shard_services); }); } template auto with_remote_shard_state_and_op( core_id_t core, typename T::IRef &&op, F &&f) { if (seastar::this_shard_id() == core) { auto &target_shard_services = shard_services.local(); return std::invoke( std::move(f), target_shard_services.local_state, target_shard_services, std::move(op)); } return op->prepare_remote_submission( ).then([op=std::move(op), f=std::move(f), this, core ](auto f_conn) mutable { return shard_services.invoke_on( core, [f=std::move(f), op=std::move(op), f_conn=std::move(f_conn) ](auto &target_shard_services) mutable { op->finish_remote_submission(std::move(f_conn)); return std::invoke( std::move(f), target_shard_services.local_state, target_shard_services, std::move(op)); }); }); } /// Runs opref on the appropriate core, creating the pg as necessary. template seastar::future<> run_with_pg_maybe_create( typename T::IRef op ) { ceph_assert(op->use_count() == 1); auto &logger = crimson::get_logger(ceph_subsys_osd); static_assert(T::can_create()); logger.debug("{}: can_create", *op); get_local_state().registry.remove_from_registry(*op); return get_pg_to_shard_mapping().maybe_create_pg( op->get_pgid() ).then([this, op = std::move(op)](auto core) mutable { return this->template with_remote_shard_state_and_op( core, std::move(op), [](PerShardState &per_shard_state, ShardServices &shard_services, typename T::IRef op) { per_shard_state.registry.add_to_registry(*op); auto &logger = crimson::get_logger(ceph_subsys_osd); auto &opref = *op; return opref.template with_blocking_event< PGMap::PGCreationBlockingEvent >([&shard_services, &opref]( auto &&trigger) { return shard_services.get_or_create_pg( std::move(trigger), opref.get_pgid(), std::move(opref.get_create_info()) ); }).safe_then([&logger, &shard_services, &opref](Ref pgref) { logger.debug("{}: have_pg", opref); return opref.with_pg(shard_services, pgref); }).handle_error( crimson::ct_error::ecanceled::handle([&logger, &opref](auto) { logger.debug("{}: pg creation canceled, dropping", opref); return seastar::now(); }) ).then([op=std::move(op)] {}); }); }); } /// Runs opref on the appropriate core, waiting for pg as necessary template seastar::future<> run_with_pg_maybe_wait( typename T::IRef op ) { ceph_assert(op->use_count() == 1); auto &logger = crimson::get_logger(ceph_subsys_osd); static_assert(!T::can_create()); logger.debug("{}: !can_create", *op); get_local_state().registry.remove_from_registry(*op); return get_pg_to_shard_mapping().maybe_create_pg( op->get_pgid() ).then([this, op = std::move(op)](auto core) mutable { return this->template with_remote_shard_state_and_op( core, std::move(op), [](PerShardState &per_shard_state, ShardServices &shard_services, typename T::IRef op) { per_shard_state.registry.add_to_registry(*op); auto &logger = crimson::get_logger(ceph_subsys_osd); auto &opref = *op; return opref.template with_blocking_event< PGMap::PGCreationBlockingEvent >([&shard_services, &opref]( auto &&trigger) { return shard_services.wait_for_pg( std::move(trigger), opref.get_pgid()); }).safe_then([&logger, &shard_services, &opref](Ref pgref) { logger.debug("{}: have_pg", opref); return opref.with_pg(shard_services, pgref); }).handle_error( crimson::ct_error::ecanceled::handle([&logger, &opref](auto) { logger.debug("{}: pg creation canceled, dropping", opref); return seastar::now(); }) ).then([op=std::move(op)] {}); }); }); } seastar::future<> load_pgs(crimson::os::FuturizedStore& store); seastar::future<> stop_pgs(); seastar::future> get_pg_stats() const; /** * invoke_method_on_each_shard_seq * * Invokes shard_services method on each shard sequentially. */ template seastar::future<> invoke_on_each_shard_seq( F &&f) const { return sharded_map_seq( shard_services, [f=std::forward(f)](const ShardServices &shard_services) mutable { return std::invoke( f, shard_services); }); } /** * for_each_pg * * Invokes f on each pg sequentially. Caller may rely on f not being * invoked concurrently on multiple cores. */ template seastar::future<> for_each_pg(F &&f) const { return invoke_on_each_shard_seq( [f=std::move(f)](const auto &local_service) mutable { for (auto &pg: local_service.local_state.pg_map.get_pgs()) { std::apply(f, pg); } return seastar::now(); }); } /** * for_each_pgid * * Syncronously invokes f on each pgid */ template void for_each_pgid(F &&f) const { return get_pg_to_shard_mapping().for_each_pgid( std::forward(f)); } auto get_num_pgs() const { return get_pg_to_shard_mapping().get_num_pgs(); } seastar::future<> broadcast_map_to_pgs(epoch_t epoch); template auto with_pg(spg_t pgid, F &&f) { core_id_t core = get_pg_to_shard_mapping().get_pg_mapping(pgid); return with_remote_shard_state( core, [pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable { return std::invoke( std::move(f), local_state.pg_map.get_pg(pgid)); }); } template auto start_pg_operation(Args&&... args) { auto op = get_local_state().registry.create_operation( std::forward(args)...); auto &logger = crimson::get_logger(ceph_subsys_osd); logger.debug("{}: starting {}", *op, __func__); auto &opref = *op; auto id = op->get_id(); if constexpr (T::is_trackable) { op->template track_event(); } auto fut = opref.template enter_stage<>( opref.get_connection_pipeline().await_active ).then([this, &opref, &logger] { logger.debug("{}: start_pg_operation in await_active stage", opref); return get_shard_services().local_state.osd_state.when_active(); }).then([&logger, &opref] { logger.debug("{}: start_pg_operation active, entering await_map", opref); return opref.template enter_stage<>( opref.get_connection_pipeline().await_map); }).then([this, &logger, &opref] { logger.debug("{}: start_pg_operation await_map stage", opref); using OSDMapBlockingEvent = OSD_OSDMapGate::OSDMapBlocker::BlockingEvent; return opref.template with_blocking_event( [this, &opref](auto &&trigger) { std::ignore = this; return get_shard_services().local_state.osdmap_gate.wait_for_map( std::move(trigger), opref.get_epoch(), &get_shard_services()); }); }).then([&logger, &opref](auto epoch) { logger.debug("{}: got map {}, entering get_pg", opref, epoch); return opref.template enter_stage<>( opref.get_connection_pipeline().get_pg); }).then([this, &logger, &opref, op=std::move(op)]() mutable { logger.debug("{}: in get_pg core {}", opref, seastar::this_shard_id()); logger.debug("{}: in get_pg", opref); if constexpr (T::can_create()) { logger.debug("{}: can_create", opref); return run_with_pg_maybe_create(std::move(op)); } else { logger.debug("{}: !can_create", opref); return run_with_pg_maybe_wait(std::move(op)); } }); return std::make_pair(id, std::move(fut)); } #undef FORWARD #undef FORWARD_CONST #undef FORWARD_TO_OSD_SINGLETON }; }