Diffstat (limited to 'src/crimson/osd/pg.h')
-rw-r--r--  src/crimson/osd/pg.h  704
1 file changed, 704 insertions, 0 deletions
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
new file mode 100644
index 000000000..34676ee7a
--- /dev/null
+++ b/src/crimson/osd/pg.h
@@ -0,0 +1,704 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <boost/smart_ptr/local_shared_ptr.hpp>
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/sleep.hh>
+
+#include "common/dout.h"
+#include "crimson/net/Fwd.h"
+#include "messages/MOSDRepOpReply.h"
+#include "messages/MOSDOpReply.h"
+#include "os/Transaction.h"
+#include "osd/osd_types.h"
+#include "crimson/osd/object_context.h"
+#include "osd/PeeringState.h"
+
+#include "crimson/common/type_helpers.h"
+#include "crimson/os/futurized_collection.h"
+#include "crimson/osd/backfill_state.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
+#include "crimson/osd/shard_services.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/pg_recovery.h"
+#include "crimson/osd/pg_recovery_listener.h"
+#include "crimson/osd/recovery_backend.h"
+
+class MQuery;
+class OSDMap;
+class PGBackend;
+class PGPeeringEvent;
+class osd_op_params_t;
+
+namespace recovery {
+ class Context;
+}
+
+namespace crimson::net {
+ class Messenger;
+}
+
+namespace crimson::os {
+ class FuturizedStore;
+}
+
+namespace crimson::osd {
+class ClientRequest;
+class OpsExecuter;
+
+class PG : public boost::intrusive_ref_counter<
+ PG,
+ boost::thread_unsafe_counter>,
+ public PGRecoveryListener,
+ PeeringState::PeeringListener,
+ DoutPrefixProvider
+{
+  using ec_profile_t = std::map<std::string, std::string>;
+ using cached_map_t = boost::local_shared_ptr<const OSDMap>;
+
+ ClientRequest::PGPipeline client_request_pg_pipeline;
+ PeeringEvent::PGPipeline peering_request_pg_pipeline;
+ RepRequest::PGPipeline replicated_request_pg_pipeline;
+
+ spg_t pgid;
+ pg_shard_t pg_whoami;
+ crimson::os::CollectionRef coll_ref;
+ ghobject_t pgmeta_oid;
+
+ seastar::timer<seastar::lowres_clock> check_readable_timer;
+ seastar::timer<seastar::lowres_clock> renew_lease_timer;
+
+public:
+ PG(spg_t pgid,
+ pg_shard_t pg_shard,
+ crimson::os::CollectionRef coll_ref,
+ pg_pool_t&& pool,
+ std::string&& name,
+ cached_map_t osdmap,
+ ShardServices &shard_services,
+ ec_profile_t profile);
+
+ ~PG();
+
+ const pg_shard_t& get_pg_whoami() const final {
+ return pg_whoami;
+ }
+
+ const spg_t& get_pgid() const final {
+ return pgid;
+ }
+
+ PGBackend& get_backend() {
+ return *backend;
+ }
+ const PGBackend& get_backend() const {
+ return *backend;
+ }
+ // EpochSource
+ epoch_t get_osdmap_epoch() const final {
+ return peering_state.get_osdmap_epoch();
+ }
+
+ eversion_t get_pg_trim_to() const {
+ return peering_state.get_pg_trim_to();
+ }
+
+ eversion_t get_min_last_complete_ondisk() const {
+ return peering_state.get_min_last_complete_ondisk();
+ }
+
+ const pg_info_t& get_info() const final {
+ return peering_state.get_info();
+ }
+
+ // DoutPrefixProvider
+ std::ostream& gen_prefix(std::ostream& out) const final {
+ return out << *this;
+ }
+ crimson::common::CephContext *get_cct() const final {
+ return shard_services.get_cct();
+ }
+ unsigned get_subsys() const final {
+ return ceph_subsys_osd;
+ }
+
+ crimson::os::CollectionRef get_collection_ref() {
+ return coll_ref;
+ }
+
+ // PeeringListener
+ void prepare_write(
+ pg_info_t &info,
+ pg_info_t &last_written_info,
+ PastIntervals &past_intervals,
+ PGLog &pglog,
+ bool dirty_info,
+ bool dirty_big_info,
+ bool need_write_epoch,
+ ceph::os::Transaction &t) final;
+
+ void on_info_history_change() final {
+ // Not needed yet -- mainly for scrub scheduling
+ }
+
+ void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;
+
+ uint64_t get_snap_trimq_size() const final {
+ return 0;
+ }
+
+ void send_cluster_message(
+ int osd, MessageRef m,
+ epoch_t epoch, bool share_map_update=false) final {
+ (void)shard_services.send_to_osd(osd, m, epoch);
+ }
+
+ void send_pg_created(pg_t pgid) final {
+ (void)shard_services.send_pg_created(pgid);
+ }
+
+ bool try_flush_or_schedule_async() final;
+
+ void start_flush_on_transaction(
+ ceph::os::Transaction &t) final {
+ t.register_on_commit(
+ new LambdaContext([this](int r){
+ peering_state.complete_flush();
+ }));
+ }
+
+ void on_flushed() final {
+ // will be needed for unblocking IO operations/peering
+ }
+
+ template <typename T>
+ void start_peering_event_operation(T &&evt, float delay = 0) {
+ (void) shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ shard_services,
+ pg_whoami,
+ pgid,
+ delay,
+ std::forward<T>(evt));
+ }
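+
+  // Usage sketch (illustrative, not part of the upstream interface):
+  // any boost::statechart event understood by PeeringState can be
+  // queued through this helper, e.g. from a timer callback:
+  //
+  //   renew_lease_timer.set_callback([this] {
+  //     start_peering_event_operation(RenewLease{});  // hypothetical event type
+  //   });
+  //
+  // The event is wrapped in a LocalPeeringEvent operation and run via
+  // shard_services, optionally after `delay` seconds.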
+
+ void schedule_event_after(
+ PGPeeringEventRef event,
+ float delay) final {
+ start_peering_event_operation(std::move(*event), delay);
+ }
+ std::vector<pg_shard_t> get_replica_recovery_order() const final {
+ return peering_state.get_replica_recovery_order();
+ }
+ void request_local_background_io_reservation(
+ unsigned priority,
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) final {
+ shard_services.local_reserver.request_reservation(
+ pgid,
+ on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
+ start_peering_event_operation(std::move(*on_grant));
+ }) : nullptr,
+ priority,
+ on_preempt ? make_lambda_context(
+ [this, on_preempt=std::move(on_preempt)] (int) {
+ start_peering_event_operation(std::move(*on_preempt));
+ }) : nullptr);
+ }
+
+ void update_local_background_io_priority(
+ unsigned priority) final {
+ shard_services.local_reserver.update_priority(
+ pgid,
+ priority);
+ }
+
+ void cancel_local_background_io_reservation() final {
+ shard_services.local_reserver.cancel_reservation(
+ pgid);
+ }
+
+ void request_remote_recovery_reservation(
+ unsigned priority,
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) final {
+ shard_services.remote_reserver.request_reservation(
+ pgid,
+ on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
+ start_peering_event_operation(std::move(*on_grant));
+ }) : nullptr,
+ priority,
+ on_preempt ? make_lambda_context(
+ [this, on_preempt=std::move(on_preempt)] (int) {
+ start_peering_event_operation(std::move(*on_preempt));
+ }) : nullptr);
+ }
+
+ void cancel_remote_recovery_reservation() final {
+ shard_services.remote_reserver.cancel_reservation(
+ pgid);
+ }
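+
+  // Note (illustrative): both reservers operate on Context* callbacks,
+  // so grant/preempt notifications are adapted via make_lambda_context
+  // and re-enter the PG as local peering events. A hypothetical caller
+  // (event type assumed from classic OSD peering):
+  //
+  //   request_remote_recovery_reservation(
+  //     prio,
+  //     std::make_unique<PGPeeringEvent>(
+  //       epoch, epoch, RemoteBackfillReserved{}),
+  //     nullptr);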
+
+ void schedule_event_on_commit(
+ ceph::os::Transaction &t,
+ PGPeeringEventRef on_commit) final {
+ t.register_on_commit(
+ make_lambda_context(
+ [this, on_commit=std::move(on_commit)](int) {
+ start_peering_event_operation(std::move(*on_commit));
+ }));
+ }
+
+  void update_heartbeat_peers(std::set<int> peers) final {
+    // Not needed yet
+  }
+  void set_probe_targets(const std::set<pg_shard_t> &probe_set) final {
+    // Not needed yet
+  }
+ void clear_probe_targets() final {
+ // Not needed yet
+ }
+ void queue_want_pg_temp(const std::vector<int> &wanted) final {
+ shard_services.queue_want_pg_temp(pgid.pgid, wanted);
+ }
+ void clear_want_pg_temp() final {
+ shard_services.remove_want_pg_temp(pgid.pgid);
+ }
+ void publish_stats_to_osd() final {
+ if (!is_primary())
+ return;
+
+ (void) peering_state.prepare_stats_for_publish(
+ false,
+ pg_stat_t(),
+ object_stat_collection_t());
+ }
+ void clear_publish_stats() final {
+ // Not needed yet
+ }
+ void check_recovery_sources(const OSDMapRef& newmap) final {
+ // Not needed yet
+ }
+ void check_blocklisted_watchers() final {
+ // Not needed yet
+ }
+ void clear_primary_state() final {
+ // Not needed yet
+ }
+
+ void queue_check_readable(epoch_t last_peering_reset,
+ ceph::timespan delay) final;
+ void recheck_readable() final;
+
+ unsigned get_target_pg_log_entries() const final;
+
+ void on_pool_change() final {
+ // Not needed yet
+ }
+ void on_role_change() final {
+ // Not needed yet
+ }
+ void on_change(ceph::os::Transaction &t) final;
+ void on_activate(interval_set<snapid_t> to_trim) final;
+ void on_activate_complete() final;
+ void on_new_interval() final {
+ // Not needed yet
+ }
+ Context *on_clean() final {
+ // Not needed yet (will be needed for IO unblocking)
+ return nullptr;
+ }
+ void on_activate_committed() final {
+ // Not needed yet (will be needed for IO unblocking)
+ }
+ void on_active_exit() final {
+ // Not needed yet
+ }
+
+ void on_removal(ceph::os::Transaction &t) final {
+ // TODO
+ }
+ std::pair<ghobject_t, bool>
+ do_delete_work(ceph::os::Transaction &t, ghobject_t _next) final;
+
+ // merge/split not ready
+ void clear_ready_to_merge() final {}
+ void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {}
+ void set_not_ready_to_merge_source(pg_t pgid) final {}
+ void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {}
+ void set_ready_to_merge_source(eversion_t lu) final {}
+
+ void on_active_actmap() final {
+ // Not needed yet
+ }
+ void on_active_advmap(const OSDMapRef &osdmap) final {
+ // Not needed yet
+ }
+ epoch_t oldest_stored_osdmap() final {
+ // TODO
+ return 0;
+ }
+
+ void on_backfill_reserved() final {
+ recovery_handler->on_backfill_reserved();
+ }
+ void on_backfill_canceled() final {
+ ceph_assert(0 == "Not implemented");
+ }
+
+ void on_recovery_reserved() final {
+ recovery_handler->start_pglogbased_recovery();
+ }
+
+ bool try_reserve_recovery_space(
+ int64_t primary_num_bytes, int64_t local_num_bytes) final {
+ // TODO
+ return true;
+ }
+ void unreserve_recovery_space() final {}
+
+ struct PGLogEntryHandler : public PGLog::LogEntryHandler {
+ PG *pg;
+ ceph::os::Transaction *t;
+ PGLogEntryHandler(PG *pg, ceph::os::Transaction *t) : pg(pg), t(t) {}
+
+ // LogEntryHandler
+ void remove(const hobject_t &hoid) override {
+ // TODO
+ }
+ void try_stash(const hobject_t &hoid, version_t v) override {
+ // TODO
+ }
+ void rollback(const pg_log_entry_t &entry) override {
+ // TODO
+ }
+ void rollforward(const pg_log_entry_t &entry) override {
+ // TODO
+ }
+ void trim(const pg_log_entry_t &entry) override {
+ // TODO
+ }
+ };
+ PGLog::LogEntryHandlerRef get_log_handler(
+ ceph::os::Transaction &t) final {
+ return std::make_unique<PG::PGLogEntryHandler>(this, &t);
+ }
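+
+  // Illustrative only: PeeringState/PGLog drive this handler when
+  // trimming or rolling back log entries, roughly:
+  //
+  //   auto h = get_log_handler(t);
+  //   h->trim(entry);   // currently a TODO no-op
+  //
+  // so log maintenance has no on-disk side effects in crimson yet.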
+
+ void rebuild_missing_set_with_deletes(PGLog &pglog) final {
+ ceph_assert(0 == "Impossible for crimson");
+ }
+
+ PerfCounters &get_peering_perf() final {
+ return shard_services.get_recoverystate_perf_logger();
+ }
+ PerfCounters &get_perf_logger() final {
+ return shard_services.get_perf_logger();
+ }
+
+ void log_state_enter(const char *state) final;
+ void log_state_exit(
+ const char *state_name, utime_t enter_time,
+ uint64_t events, utime_t event_dur) final;
+
+ void dump_recovery_info(Formatter *f) const final {
+ }
+
+  OstreamTemp get_clog_info() final {
+    // stub for now; needs to be wired up to monc before it can reach
+    // the cluster log
+    return OstreamTemp(CLOG_INFO, nullptr);
+  }
+  OstreamTemp get_clog_debug() final {
+    // stub for now; needs to be wired up to monc before it can reach
+    // the cluster log
+    return OstreamTemp(CLOG_DEBUG, nullptr);
+  }
+  OstreamTemp get_clog_error() final {
+    // stub for now; needs to be wired up to monc before it can reach
+    // the cluster log
+    return OstreamTemp(CLOG_ERROR, nullptr);
+  }
+
+ ceph::signedspan get_mnow() final;
+ HeartbeatStampsRef get_hb_stamps(int peer) final;
+ void schedule_renew_lease(epoch_t plr, ceph::timespan delay) final;
+
+ // Utility
+ bool is_primary() const final {
+ return peering_state.is_primary();
+ }
+ bool is_nonprimary() const {
+ return peering_state.is_nonprimary();
+ }
+ bool is_peered() const final {
+ return peering_state.is_peered();
+ }
+ bool is_recovering() const final {
+ return peering_state.is_recovering();
+ }
+ bool is_backfilling() const final {
+ return peering_state.is_backfilling();
+ }
+ pg_stat_t get_stats() {
+ auto stats = peering_state.prepare_stats_for_publish(
+ false,
+ pg_stat_t(),
+ object_stat_collection_t());
+ ceph_assert(stats);
+ return *stats;
+ }
+ bool get_need_up_thru() const {
+ return peering_state.get_need_up_thru();
+ }
+ epoch_t get_same_interval_since() const {
+ return get_info().history.same_interval_since;
+ }
+
+ const auto& get_pool() const {
+ return peering_state.get_pool();
+ }
+ pg_shard_t get_primary() const {
+ return peering_state.get_primary();
+ }
+
+ /// initialize created PG
+ void init(
+ int role,
+ const std::vector<int>& up,
+ int up_primary,
+ const std::vector<int>& acting,
+ int acting_primary,
+ const pg_history_t& history,
+ const PastIntervals& pim,
+ bool backfill,
+ ceph::os::Transaction &t);
+
+ seastar::future<> read_state(crimson::os::FuturizedStore* store);
+
+ void do_peering_event(
+ PGPeeringEvent& evt, PeeringCtx &rctx);
+
+ void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
+ void handle_activate_map(PeeringCtx &rctx);
+ void handle_initialize(PeeringCtx &rctx);
+
+ static hobject_t get_oid(const MOSDOp &m);
+ static RWState::State get_lock_type(const OpInfo &op_info);
+ static std::optional<hobject_t> resolve_oid(
+ const SnapSet &snapset,
+ const hobject_t &oid);
+
+ using load_obc_ertr = crimson::errorator<
+ crimson::ct_error::object_corrupted>;
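+
+  // Note (illustrative): an errorated future either resolves with a
+  // value or carries ct_error::object_corrupted; callers typically
+  // unwrap it with safe_then(), e.g.:
+  //
+  //   load_head_obc(obc).safe_then([](ObjectContextRef obc) {
+  //     // use the loaded head object context
+  //   });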
+
+ load_obc_ertr::future<crimson::osd::ObjectContextRef>
+ load_head_obc(ObjectContextRef obc);
+
+ load_obc_ertr::future<>
+ reload_obc(crimson::osd::ObjectContext& obc) const;
+
+public:
+ using with_obc_func_t =
+ std::function<load_obc_ertr::future<> (ObjectContextRef)>;
+
+ template<RWState::State State>
+ load_obc_ertr::future<> with_head_obc(hobject_t oid, with_obc_func_t&& func);
+
+ load_obc_ertr::future<> with_locked_obc(
+ Ref<MOSDOp> &m,
+ const OpInfo &op_info,
+ Operation *op,
+ with_obc_func_t&& f);
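+
+  // Usage sketch (illustrative): callers hand in a continuation that
+  // runs while the object context is held under the lock type derived
+  // from op_info; the lock drops when the returned future resolves:
+  //
+  //   return with_locked_obc(m, op_info, op,
+  //     [this](ObjectContextRef obc) -> load_obc_ertr::future<> {
+  //       // read or mutate *obc here
+  //       return load_obc_ertr::now();
+  //     });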
+
+ seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
+ void handle_rep_op_reply(crimson::net::ConnectionRef conn,
+ const MOSDRepOpReply& m);
+
+ void print(std::ostream& os) const;
+ void dump_primary(Formatter*);
+
+private:
+ template<RWState::State State>
+ load_obc_ertr::future<> with_clone_obc(hobject_t oid, with_obc_func_t&& func);
+
+ load_obc_ertr::future<ObjectContextRef> get_locked_obc(
+ Operation *op,
+ const hobject_t &oid,
+ RWState::State type);
+
+ void do_peering_event(
+ const boost::statechart::event_base &evt,
+ PeeringCtx &rctx);
+ osd_op_params_t&& fill_op_params_bump_pg_version(
+ osd_op_params_t&& osd_op_p,
+ Ref<MOSDOp> m,
+ const bool user_modify);
+ seastar::future<Ref<MOSDOpReply>> handle_failed_op(
+ const std::error_code& e,
+ ObjectContextRef obc,
+ const OpsExecuter& ox,
+ const MOSDOp& m) const;
+ seastar::future<Ref<MOSDOpReply>> do_osd_ops(
+ Ref<MOSDOp> m,
+ ObjectContextRef obc,
+ const OpInfo &op_info);
+ seastar::future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
+ seastar::future<> submit_transaction(const OpInfo& op_info,
+ const std::vector<OSDOp>& ops,
+ ObjectContextRef&& obc,
+ ceph::os::Transaction&& txn,
+ const osd_op_params_t& oop);
+
+private:
+ OSDMapGate osdmap_gate;
+ ShardServices &shard_services;
+
+ cached_map_t osdmap;
+
+public:
+ cached_map_t get_osdmap() { return osdmap; }
+ eversion_t next_version() {
+ return eversion_t(get_osdmap_epoch(),
+ ++projected_last_update.version);
+ }
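+  // Worked example for next_version(): at map epoch 42 with
+  // projected_last_update at version 7, it returns eversion_t(42, 8),
+  // bumping the projected version so each mutation in an interval
+  // gets a strictly increasing eversion.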
+ ShardServices& get_shard_services() final {
+ return shard_services;
+ }
+ seastar::future<> stop();
+
+private:
+ std::unique_ptr<PGBackend> backend;
+ std::unique_ptr<RecoveryBackend> recovery_backend;
+ std::unique_ptr<PGRecovery> recovery_handler;
+
+ PeeringState peering_state;
+ eversion_t projected_last_update;
+public:
+ RecoveryBackend* get_recovery_backend() final {
+ return recovery_backend.get();
+ }
+ PGRecovery* get_recovery_handler() final {
+ return recovery_handler.get();
+ }
+ PeeringState& get_peering_state() final {
+ return peering_state;
+ }
+ bool has_reset_since(epoch_t epoch) const final {
+ return peering_state.pg_has_reset_since(epoch);
+ }
+
+ const pg_missing_tracker_t& get_local_missing() const {
+ return peering_state.get_pg_log().get_missing();
+ }
+ epoch_t get_last_peering_reset() const final {
+ return peering_state.get_last_peering_reset();
+ }
+  const std::set<pg_shard_t> &get_acting_recovery_backfill() const {
+    return peering_state.get_acting_recovery_backfill();
+  }
+ bool is_backfill_target(pg_shard_t osd) const {
+ return peering_state.is_backfill_target(osd);
+ }
+  void begin_peer_recover(pg_shard_t peer, const hobject_t &oid) {
+    peering_state.begin_peer_recover(peer, oid);
+  }
+ uint64_t min_peer_features() const {
+ return peering_state.get_min_peer_features();
+ }
+  const std::map<hobject_t, std::set<pg_shard_t>>&
+  get_missing_loc_shards() const {
+    return peering_state.get_missing_loc().get_missing_locs();
+  }
+  const std::map<pg_shard_t, pg_missing_t> &get_shard_missing() const {
+    return peering_state.get_peer_missing();
+  }
+ const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const {
+ if (shard == pg_whoami)
+ return &get_local_missing();
+ else {
+ auto it = peering_state.get_peer_missing().find(shard);
+ if (it == peering_state.get_peer_missing().end())
+ return nullptr;
+ else
+ return &it->second;
+ }
+ }
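+  // e.g. get_shard_missing(pg_whoami) above is equivalent to
+  // &get_local_missing(), while an unknown shard yields nullptr.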
+ int get_recovery_op_priority() const {
+ int64_t pri = 0;
+ get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
+ return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
+ }
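+  // e.g. get_recovery_op_priority() returns 10 for a pool that sets
+  // the recovery_op_priority option to 10; without an override (pri
+  // stays 0) the global osd_recovery_op_priority config value is used.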
+ seastar::future<> mark_unfound_lost(int) {
+ // TODO: see PrimaryLogPG::mark_all_unfound_lost()
+ return seastar::now();
+ }
+
+private:
+ // instead of seastar::gate, we use a boolean flag to indicate
+ // whether the system is shutting down, as we don't need to track
+ // continuations here.
+ bool stopping = false;
+
+ class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
+ PG *pg;
+
+ const spg_t pgid;
+ seastar::shared_promise<> p;
+
+ protected:
+ void dump_detail(Formatter *f) const;
+
+ public:
+ static constexpr const char *type_name = "WaitForActiveBlocker";
+
+ WaitForActiveBlocker(PG *pg) : pg(pg) {}
+ void on_active();
+ blocking_future<> wait();
+ seastar::future<> stop();
+ } wait_for_active_blocker;
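+
+  // Usage sketch (illustrative): friend operations such as
+  // ClientRequest park here until the PG becomes active, roughly:
+  //
+  //   with_blocking_future(pg.wait_for_active_blocker.wait())
+  //     .then([] { /* start processing the request */ });
+  //
+  // on_active() fulfils the shared promise and releases every waiter.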
+
+ friend std::ostream& operator<<(std::ostream&, const PG& pg);
+ friend class ClientRequest;
+ friend class PGAdvanceMap;
+ friend class PeeringEvent;
+ friend class RepRequest;
+ friend class BackfillRecovery;
+ friend struct PGFacade;
+private:
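+  // stub: reports success without actually searching peers for unfound
+  // objects (cf. the classic OSD's discover_all_missing path)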
+ seastar::future<bool> find_unfound() {
+ return seastar::make_ready_future<bool>(true);
+ }
+
+ template <typename MsgType>
+ bool can_discard_replica_op(const MsgType& m) const;
+ bool can_discard_op(const MOSDOp& m) const;
+ bool is_missing_object(const hobject_t& soid) const {
+ return peering_state.get_pg_log().get_missing().get_items().count(soid);
+ }
+  bool is_unreadable_object(const hobject_t &oid,
+                            eversion_t* v = nullptr) const final {
+ return is_missing_object(oid) ||
+ !peering_state.get_missing_loc().readable_with_acting(
+ oid, get_actingset(), v);
+ }
+ bool is_degraded_or_backfilling_object(const hobject_t& soid) const;
+  const std::set<pg_shard_t> &get_actingset() const {
+    return peering_state.get_actingset();
+  }
+
+private:
+ BackfillRecovery::BackfillRecoveryPipeline backfill_pipeline;
+};
+
+std::ostream& operator<<(std::ostream&, const PG& pg);
+
+}