From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/crimson/osd/shard_services.h | 215 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 src/crimson/osd/shard_services.h (limited to 'src/crimson/osd/shard_services.h') diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h new file mode 100644 index 000000000..2957639c6 --- /dev/null +++ b/src/crimson/osd/shard_services.h @@ -0,0 +1,215 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "include/common_fwd.h" +#include "osd_operation.h" +#include "msg/MessageRef.h" +#include "crimson/common/exception.h" +#include "crimson/os/futurized_collection.h" +#include "osd/PeeringState.h" +#include "crimson/osd/osdmap_service.h" +#include "crimson/osd/object_context.h" +#include "common/AsyncReserver.h" + +namespace crimson::net { + class Messenger; +} + +namespace crimson::mgr { + class Client; +} + +namespace crimson::mon { + class Client; +} + +namespace crimson::os { + class FuturizedStore; +} + +class OSDMap; +class PeeringCtx; +class BufferedRecoveryMessages; + +namespace crimson::osd { + +/** + * Represents services available to each PG + */ +class ShardServices : public md_config_obs_t { + using cached_map_t = boost::local_shared_ptr; + OSDMapService &osdmap_service; + const int whoami; + crimson::net::Messenger &cluster_msgr; + crimson::net::Messenger &public_msgr; + crimson::mon::Client &monc; + crimson::mgr::Client &mgrc; + crimson::os::FuturizedStore &store; + + crimson::common::CephContext cct; + + PerfCounters *perf = nullptr; + PerfCounters *recoverystate_perf = nullptr; + + const char** get_tracked_conf_keys() const final; + void handle_conf_change(const ConfigProxy& conf, + const std::set &changed) final; +public: + ShardServices( + OSDMapService &osdmap_service, + const int whoami, + crimson::net::Messenger &cluster_msgr, + crimson::net::Messenger &public_msgr, + crimson::mon::Client &monc, + crimson::mgr::Client &mgrc, + crimson::os::FuturizedStore &store); + + seastar::future<> send_to_osd( + int peer, + MessageRef m, + epoch_t from_epoch); + + crimson::os::FuturizedStore &get_store() { + return store; + } + + crimson::common::CephContext *get_cct() { + return &cct; + } + + // OSDMapService + const OSDMapService &get_osdmap_service() const { + return osdmap_service; + } + + // Op Management + OperationRegistry registry; + OperationThrottler throttler; + + template + auto start_operation(Args&&... args) { + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + auto op = registry.create_operation(std::forward(args)...); + return std::make_pair(op, op->start()); + } + + seastar::future<> stop() { + stopping = true; + return registry.stop(); + } + + // Loggers + PerfCounters &get_recoverystate_perf_logger() { + return *recoverystate_perf; + } + PerfCounters &get_perf_logger() { + return *perf; + } + + /// Dispatch and reset ctx transaction + seastar::future<> dispatch_context_transaction( + crimson::os::CollectionRef col, PeeringCtx &ctx); + + /// Dispatch and reset ctx messages + seastar::future<> dispatch_context_messages( + BufferedRecoveryMessages &&ctx); + + /// Dispatch ctx and dispose of context + seastar::future<> dispatch_context( + crimson::os::CollectionRef col, + PeeringCtx &&ctx); + + /// Dispatch ctx and dispose of ctx, transaction must be empty + seastar::future<> dispatch_context( + PeeringCtx &&ctx) { + return dispatch_context({}, std::move(ctx)); + } + + // PG Temp State +private: + // TODO: hook into map processing and some kind of heartbeat/peering + // message processing + struct pg_temp_t { + std::vector acting; + bool forced = false; + }; + map pg_temp_wanted; + map pg_temp_pending; + friend std::ostream& operator<<(std::ostream&, const pg_temp_t&); +public: + void queue_want_pg_temp(pg_t pgid, const vector& want, + bool forced = false); + void remove_want_pg_temp(pg_t pgid); + void requeue_pg_temp(); + seastar::future<> send_pg_temp(); + + // Shard-local OSDMap +private: + cached_map_t osdmap; +public: + void update_map(cached_map_t new_osdmap); + cached_map_t &get_osdmap(); + + // PG Created State +private: + set pg_created; +public: + seastar::future<> send_pg_created(pg_t pgid); + seastar::future<> send_pg_created(); + void prune_pg_created(); + + unsigned get_pg_num() const { + return num_pgs; + } + void inc_pg_num() { + ++num_pgs; + } + void dec_pg_num() { + --num_pgs; + } + + seastar::future<> osdmap_subscribe(version_t epoch, bool force_request); + + // Time state + ceph::mono_time startup_time = ceph::mono_clock::now(); + ceph::signedspan get_mnow() const { + return ceph::mono_clock::now() - startup_time; + } + HeartbeatStampsRef get_hb_stamps(int peer); + std::map heartbeat_stamps; + + crimson::osd::ObjectContextRegistry obc_registry; + + // Async Reservers +private: + unsigned num_pgs = 0; + + struct DirectFinisher { + void queue(Context *c) { + c->complete(0); + } + } finisher; + // prevent creating new osd operations when system is shutting down, + // this is necessary because there are chances that a new operation + // is created, after the interruption of all ongoing operations, and + // creats and waits on a new and may-never-resolve future, in which + // case the shutdown may never succeed. + bool stopping = false; +public: + AsyncReserver local_reserver; + AsyncReserver remote_reserver; + +private: + epoch_t up_thru_wanted = 0; +public: + seastar::future<> send_alive(epoch_t want); +}; + +} -- cgit v1.2.3