path: root/src/osd/PrimaryLogPG.h
diff options
Diffstat (limited to '')
1 files changed, 1969 insertions, 0 deletions
diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h
new file mode 100644
index 000000000..68cdec24e
--- /dev/null
+++ b/src/osd/PrimaryLogPG.h
@@ -0,0 +1,1969 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <>
+ * Copyright (C) 2013 Cloudwatt <>
+ *
+ * Author: Loic Dachary <>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include <boost/tuple/tuple.hpp>
+#include "include/ceph_assert.h"
+#include "DynamicPerfStats.h"
+#include "OSD.h"
+#include "PG.h"
+#include "Watch.h"
+#include "TierAgentState.h"
+#include "messages/MOSDOpReply.h"
+#include "common/Checksummer.h"
+#include "common/sharedptr_registry.hpp"
+#include "common/shared_cache.hpp"
+#include "ReplicatedBackend.h"
+#include "PGTransaction.h"
+#include "cls/cas/cls_cas_ops.h"
+class CopyFromCallback;
+class PromoteCallback;
+struct RefCountCallback;
+class PrimaryLogPG;
+class PGLSFilter;
+class HitSet;
+struct TierAgentState;
+class OSDService;
+void intrusive_ptr_add_ref(PrimaryLogPG *pg);
+void intrusive_ptr_release(PrimaryLogPG *pg);
+uint64_t get_with_id(PrimaryLogPG *pg);
+void put_with_id(PrimaryLogPG *pg, uint64_t id);
+ typedef TrackedIntPtr<PrimaryLogPG> PrimaryLogPGRef;
+ typedef boost::intrusive_ptr<PrimaryLogPG> PrimaryLogPGRef;
+struct inconsistent_snapset_wrapper;
+class PrimaryLogPG : public PG, public PGBackend::Listener {
+ friend class OSD;
+ friend class Watch;
+ friend class PrimaryLogScrub;
+ /*
+ * state associated with a copy operation
+ */
+ struct OpContext;
+ class CopyCallback;
+ /**
+ * CopyResults stores the object metadata of interest to a copy initiator.
+ */
+ struct CopyResults {
+ ceph::real_time mtime; ///< the copy source's mtime
+ uint64_t object_size; ///< the copied object's size
+ bool started_temp_obj; ///< true if the callback needs to delete temp object
+ hobject_t temp_oid; ///< temp object (if any)
+ /**
+ * Function to fill in transaction; if non-empty the callback
+ * must execute it before any other accesses to the object
+ * (in order to complete the copy).
+ */
+ std::function<void(PGTransaction *)> fill_in_final_tx;
+ version_t user_version; ///< The copy source's user version
+ bool should_requeue; ///< op should be requeued on cancel
+ std::vector<snapid_t> snaps; ///< src's snaps (if clone)
+ snapid_t snap_seq; ///< src's snap_seq (if head)
+ librados::snap_set_t snapset; ///< src snapset (if head)
+ bool mirror_snapset;
+ bool has_omap;
+ uint32_t flags; // object_copy_data_t::FLAG_*
+ uint32_t source_data_digest, source_omap_digest;
+ uint32_t data_digest, omap_digest;
+ mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
+ mempool::osd_pglog::map<uint32_t, int> reqid_return_codes; // std::map reqids by index to error code
+ std::map<std::string, ceph::buffer::list> attrs; // xattrs
+ uint64_t truncate_seq;
+ uint64_t truncate_size;
+ bool is_data_digest() {
+ return flags & object_copy_data_t::FLAG_DATA_DIGEST;
+ }
+ bool is_omap_digest() {
+ return flags & object_copy_data_t::FLAG_OMAP_DIGEST;
+ }
+ CopyResults()
+ : object_size(0), started_temp_obj(false),
+ user_version(0),
+ should_requeue(false), mirror_snapset(false),
+ has_omap(false),
+ flags(0),
+ source_data_digest(-1), source_omap_digest(-1),
+ data_digest(-1), omap_digest(-1),
+ truncate_seq(0), truncate_size(0)
+ {}
+ };
+ struct CopyOp;
+ typedef std::shared_ptr<CopyOp> CopyOpRef;
+ struct CopyOp {
+ CopyCallback *cb;
+ ObjectContextRef obc;
+ hobject_t src;
+ object_locator_t oloc;
+ unsigned flags;
+ bool mirror_snapset;
+ CopyResults results;
+ ceph_tid_t objecter_tid;
+ ceph_tid_t objecter_tid2;
+ object_copy_cursor_t cursor;
+ std::map<std::string,ceph::buffer::list> attrs;
+ ceph::buffer::list data;
+ ceph::buffer::list omap_header;
+ ceph::buffer::list omap_data;
+ int rval;
+ object_copy_cursor_t temp_cursor;
+ /*
+ * For CopyOp the process is:
+ * step1: read the data(attr/omap/data) from the source object
+ * step2: handle those data(w/ those data create a new object)
+ * src_obj_fadvise_flags used in step1;
+ * dest_obj_fadvise_flags used in step2
+ */
+ unsigned src_obj_fadvise_flags;
+ unsigned dest_obj_fadvise_flags;
+ std::map<uint64_t, CopyOpRef> chunk_cops;
+ int num_chunk;
+ bool failed;
+ uint64_t start_offset = 0;
+ uint64_t last_offset = 0;
+ std::vector<OSDOp> chunk_ops;
+ CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s,
+ object_locator_t l,
+ version_t v,
+ unsigned f,
+ bool ms,
+ unsigned src_obj_fadvise_flags,
+ unsigned dest_obj_fadvise_flags)
+ : cb(cb_), obc(_obc), src(s), oloc(l), flags(f),
+ mirror_snapset(ms),
+ objecter_tid(0),
+ objecter_tid2(0),
+ rval(-1),
+ src_obj_fadvise_flags(src_obj_fadvise_flags),
+ dest_obj_fadvise_flags(dest_obj_fadvise_flags),
+ num_chunk(0),
+ failed(false)
+ {
+ results.user_version = v;
+ results.mirror_snapset = mirror_snapset;
+ }
+ };
+ /**
+ * The CopyCallback class defines an interface for completions to the
+ * copy_start code. Users of the copy infrastructure must implement
+ * one and give an instance of the class to start_copy.
+ *
+ * The implementer is responsible for making sure that the CopyCallback
+ * can associate itself with the correct copy operation.
+ */
+ typedef boost::tuple<int, CopyResults*> CopyCallbackResults;
+ friend class CopyFromCallback;
+ friend struct CopyFromFinisher;
+ friend class PromoteCallback;
+ friend struct PromoteFinisher;
+ struct ProxyReadOp {
+ OpRequestRef op;
+ hobject_t soid;
+ ceph_tid_t objecter_tid;
+ std::vector<OSDOp> &ops;
+ version_t user_version;
+ int data_offset;
+ bool canceled; ///< true if canceled
+ ProxyReadOp(OpRequestRef _op, hobject_t oid, std::vector<OSDOp>& _ops)
+ : op(_op), soid(oid),
+ objecter_tid(0), ops(_ops),
+ user_version(0), data_offset(0),
+ canceled(false) { }
+ };
+ typedef std::shared_ptr<ProxyReadOp> ProxyReadOpRef;
+ struct ProxyWriteOp {
+ OpContext *ctx;
+ OpRequestRef op;
+ hobject_t soid;
+ ceph_tid_t objecter_tid;
+ std::vector<OSDOp> &ops;
+ version_t user_version;
+ bool sent_reply;
+ utime_t mtime;
+ bool canceled;
+ osd_reqid_t reqid;
+ ProxyWriteOp(OpRequestRef _op, hobject_t oid, std::vector<OSDOp>& _ops, osd_reqid_t _reqid)
+ : ctx(NULL), op(_op), soid(oid),
+ objecter_tid(0), ops(_ops),
+ user_version(0), sent_reply(false),
+ canceled(false),
+ reqid(_reqid) { }
+ };
+ typedef std::shared_ptr<ProxyWriteOp> ProxyWriteOpRef;
+ struct FlushOp {
+ ObjectContextRef obc; ///< obc we are flushing
+ OpRequestRef op; ///< initiating op
+ std::list<OpRequestRef> dup_ops; ///< bandwagon jumpers
+ version_t flushed_version; ///< user version we are flushing
+ ceph_tid_t objecter_tid; ///< copy-from request tid
+ int rval; ///< copy-from result
+ bool blocking; ///< whether we are blocking updates
+ bool removal; ///< we are removing the backend object
+ std::optional<std::function<void()>> on_flush; ///< callback, may be null
+ // for chunked object
+ std::map<uint64_t, int> io_results;
+ std::map<uint64_t, ceph_tid_t> io_tids;
+ uint64_t chunks;
+ FlushOp()
+ : flushed_version(0), objecter_tid(0), rval(0),
+ blocking(false), removal(false), chunks(0) {}
+ ~FlushOp() { ceph_assert(!on_flush); }
+ };
+ typedef std::shared_ptr<FlushOp> FlushOpRef;
+ friend struct RefCountCallback;
+ struct ManifestOp {
+ RefCountCallback *cb;
+ ceph_tid_t objecter_tid;
+ OpRequestRef op;
+ std::map<uint64_t, int> results;
+ std::map<uint64_t, ceph_tid_t> tids;
+ std::map<hobject_t, pair<uint64_t, uint64_t>> chunks;
+ uint64_t num_chunks = 0;
+ object_manifest_t new_manifest;
+ ManifestOp(RefCountCallback* cb)
+ : cb(cb), objecter_tid(0) {}
+ };
+ typedef std::shared_ptr<ManifestOp> ManifestOpRef;
+ std::map<hobject_t, ManifestOpRef> manifest_ops;
+ boost::scoped_ptr<PGBackend> pgbackend;
+ PGBackend *get_pgbackend() override {
+ return pgbackend.get();
+ }
+ const PGBackend *get_pgbackend() const override {
+ return pgbackend.get();
+ }
+ /// Listener methods
+ DoutPrefixProvider *get_dpp() override {
+ return this;
+ }
+ void on_local_recover(
+ const hobject_t &oid,
+ const ObjectRecoveryInfo &recovery_info,
+ ObjectContextRef obc,
+ bool is_delete,
+ ObjectStore::Transaction *t
+ ) override;
+ void on_peer_recover(
+ pg_shard_t peer,
+ const hobject_t &oid,
+ const ObjectRecoveryInfo &recovery_info
+ ) override {
+ recovery_state.on_peer_recover(peer, oid, recovery_info.version);
+ }
+ void begin_peer_recover(
+ pg_shard_t peer,
+ const hobject_t oid) override {
+ recovery_state.begin_peer_recover(peer, oid);
+ }
+ void on_global_recover(
+ const hobject_t &oid,
+ const object_stat_sum_t &stat_diff,
+ bool is_delete) override;
+ void on_failed_pull(
+ const std::set<pg_shard_t> &from,
+ const hobject_t &soid,
+ const eversion_t &version) override;
+ void cancel_pull(const hobject_t &soid) override;
+ void apply_stats(
+ const hobject_t &soid,
+ const object_stat_sum_t &delta_stats) override;
+ bool primary_error(const hobject_t& soid, eversion_t v);
+ void remove_missing_object(const hobject_t &oid,
+ eversion_t v,
+ Context *on_complete) override;
+ template<class T> class BlessedGenContext;
+ template<class T> class UnlockedBlessedGenContext;
+ class BlessedContext;
+ Context *bless_context(Context *c) override;
+ GenContext<ThreadPool::TPHandle&> *bless_gencontext(
+ GenContext<ThreadPool::TPHandle&> *c) override;
+ GenContext<ThreadPool::TPHandle&> *bless_unlocked_gencontext(
+ GenContext<ThreadPool::TPHandle&> *c) override;
+ void send_message(int to_osd, Message *m) override {
+ osd->send_message_osd_cluster(to_osd, m, get_osdmap_epoch());
+ }
+ void queue_transaction(ObjectStore::Transaction&& t,
+ OpRequestRef op) override {
+ osd->store->queue_transaction(ch, std::move(t), op);
+ }
+ void queue_transactions(std::vector<ObjectStore::Transaction>& tls,
+ OpRequestRef op) override {
+ osd->store->queue_transactions(ch, tls, op, NULL);
+ }
+ epoch_t get_interval_start_epoch() const override {
+ return info.history.same_interval_since;
+ }
+ epoch_t get_last_peering_reset_epoch() const override {
+ return get_last_peering_reset();
+ }
+ const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const override {
+ return get_acting_recovery_backfill();
+ }
+ const std::set<pg_shard_t> &get_acting_shards() const override {
+ return recovery_state.get_actingset();
+ }
+ const std::set<pg_shard_t> &get_backfill_shards() const override {
+ return get_backfill_targets();
+ }
+ std::ostream& gen_dbg_prefix(std::ostream& out) const override {
+ return gen_prefix(out);
+ }
+ const HobjToShardSetMapping& get_missing_loc_shards() const override
+ {
+ return recovery_state.get_missing_loc().get_missing_locs();
+ }
+ const std::map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
+ return recovery_state.get_peer_missing();
+ }
+ using PGBackend::Listener::get_shard_missing;
+ const std::map<pg_shard_t, pg_info_t> &get_shard_info() const override {
+ return recovery_state.get_peer_info();
+ }
+ using PGBackend::Listener::get_shard_info;
+ const pg_missing_tracker_t &get_local_missing() const override {
+ return recovery_state.get_pg_log().get_missing();
+ }
+ const PGLog &get_log() const override {
+ return recovery_state.get_pg_log();
+ }
+ void add_local_next_event(const pg_log_entry_t& e) override {
+ recovery_state.add_local_next_event(e);
+ }
+ bool pgb_is_primary() const override {
+ return is_primary();
+ }
+ const OSDMapRef& pgb_get_osdmap() const override final {
+ return get_osdmap();
+ }
+ epoch_t pgb_get_osdmap_epoch() const override final {
+ return get_osdmap_epoch();
+ }
+ const pg_info_t &get_info() const override {
+ return info;
+ }
+ const pg_pool_t &get_pool() const override {
+ return;
+ }
+ ObjectContextRef get_obc(
+ const hobject_t &hoid,
+ const std::map<std::string, ceph::buffer::list> &attrs) override {
+ return get_object_context(hoid, true, &attrs);
+ }
+ bool try_lock_for_read(
+ const hobject_t &hoid,
+ ObcLockManager &manager) override {
+ if (is_missing_object(hoid))
+ return false;
+ auto obc = get_object_context(hoid, false, nullptr);
+ if (!obc)
+ return false;
+ return manager.try_get_read_lock(hoid, obc);
+ }
+ void release_locks(ObcLockManager &manager) override {
+ release_object_locks(manager);
+ }
+ bool pg_is_repair() override {
+ return is_repair();
+ }
+ void inc_osd_stat_repaired() override {
+ osd->inc_osd_stat_repaired();
+ }
+ bool pg_is_remote_backfilling() override {
+ return is_remote_backfilling();
+ }
+ void pg_add_local_num_bytes(int64_t num_bytes) override {
+ add_local_num_bytes(num_bytes);
+ }
+ void pg_sub_local_num_bytes(int64_t num_bytes) override {
+ sub_local_num_bytes(num_bytes);
+ }
+ void pg_add_num_bytes(int64_t num_bytes) override {
+ add_num_bytes(num_bytes);
+ }
+ void pg_sub_num_bytes(int64_t num_bytes) override {
+ sub_num_bytes(num_bytes);
+ }
+ void pgb_set_object_snap_mapping(
+ const hobject_t &soid,
+ const std::set<snapid_t> &snaps,
+ ObjectStore::Transaction *t) override {
+ return update_object_snap_mapping(t, soid, snaps);
+ }
+ void pgb_clear_object_snap_mapping(
+ const hobject_t &soid,
+ ObjectStore::Transaction *t) override {
+ return clear_object_snap_mapping(t, soid);
+ }
+ void log_operation(
+ std::vector<pg_log_entry_t>&& logv,
+ const std::optional<pg_hit_set_history_t> &hset_history,
+ const eversion_t &trim_to,
+ const eversion_t &roll_forward_to,
+ const eversion_t &min_last_complete_ondisk,
+ bool transaction_applied,
+ ObjectStore::Transaction &t,
+ bool async = false) override {
+ if (is_primary()) {
+ ceph_assert(trim_to <= recovery_state.get_last_update_ondisk());
+ }
+ if (hset_history) {
+ recovery_state.update_hset(*hset_history);
+ }
+ if (transaction_applied) {
+ update_snap_map(logv, t);
+ }
+ auto last = logv.rbegin();
+ if (is_primary() && last != logv.rend()) {
+ projected_log.skip_can_rollback_to_to_head();
+ projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
+ }
+ if (!is_primary() && !is_ec_pg()) {
+ replica_clear_repop_obc(logv, t);
+ }
+ recovery_state.append_log(
+ std::move(logv), trim_to, roll_forward_to, min_last_complete_ondisk,
+ t, transaction_applied, async);
+ }
+ void replica_clear_repop_obc(
+ const std::vector<pg_log_entry_t> &logv,
+ ObjectStore::Transaction &t);
+ void op_applied(const eversion_t &applied_version) override;
+ bool should_send_op(
+ pg_shard_t peer,
+ const hobject_t &hoid) override;
+ bool pg_is_undersized() const override {
+ return is_undersized();
+ }
+ bool pg_is_repair() const override {
+ return is_repair();
+ }
+ void update_peer_last_complete_ondisk(
+ pg_shard_t fromosd,
+ eversion_t lcod) override {
+ recovery_state.update_peer_last_complete_ondisk(fromosd, lcod);
+ }
+ void update_last_complete_ondisk(
+ eversion_t lcod) override {
+ recovery_state.update_last_complete_ondisk(lcod);
+ }
+ void update_stats(
+ const pg_stat_t &stat) override {
+ recovery_state.update_stats(
+ [&stat](auto &history, auto &stats) {
+ stats = stat;
+ return false;
+ });
+ }
+ void schedule_recovery_work(
+ GenContext<ThreadPool::TPHandle&> *c) override;
+ pg_shard_t whoami_shard() const override {
+ return pg_whoami;
+ }
+ spg_t primary_spg_t() const override {
+ return spg_t(info.pgid.pgid, get_primary().shard);
+ }
+ pg_shard_t primary_shard() const override {
+ return get_primary();
+ }
+ uint64_t min_peer_features() const override {
+ return recovery_state.get_min_peer_features();
+ }
+ uint64_t min_upacting_features() const override {
+ return recovery_state.get_min_upacting_features();
+ }
+ void send_message_osd_cluster(
+ int peer, Message *m, epoch_t from_epoch) override {
+ osd->send_message_osd_cluster(peer, m, from_epoch);
+ }
+ void send_message_osd_cluster(
+ std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch) override {
+ osd->send_message_osd_cluster(messages, from_epoch);
+ }
+ void send_message_osd_cluster(
+ MessageRef m, Connection *con) override {
+ osd->send_message_osd_cluster(std::move(m), con);
+ }
+ void send_message_osd_cluster(
+ Message *m, const ConnectionRef& con) override {
+ osd->send_message_osd_cluster(m, con);
+ }
+ ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) override;
+ entity_name_t get_cluster_msgr_name() override {
+ return osd->get_cluster_msgr_name();
+ }
+ PerfCounters *get_logger() override;
+ ceph_tid_t get_tid() override { return osd->get_tid(); }
+ OstreamTemp clog_error() override { return osd->clog->error(); }
+ OstreamTemp clog_warn() override { return osd->clog->warn(); }
+ /**
+ * a scrub-map arrived from a replica
+ */
+ void do_replica_scrub_map(OpRequestRef op);
+ struct watch_disconnect_t {
+ uint64_t cookie;
+ entity_name_t name;
+ bool send_disconnect;
+ watch_disconnect_t(uint64_t c, entity_name_t n, bool sd)
+ : cookie(c), name(n), send_disconnect(sd) {}
+ };
+ void complete_disconnect_watches(
+ ObjectContextRef obc,
+ const std::list<watch_disconnect_t> &to_disconnect);
+ struct OpFinisher {
+ virtual ~OpFinisher() {
+ }
+ virtual int execute() = 0;
+ };
+ /*
+ * Capture all object state associated with an in-progress read or write.
+ */
+ struct OpContext {
+ OpRequestRef op;
+ osd_reqid_t reqid;
+ std::vector<OSDOp> *ops;
+ const ObjectState *obs; // Old objectstate
+ const SnapSet *snapset; // Old snapset
+ ObjectState new_obs; // resulting ObjectState
+ SnapSet new_snapset; // resulting SnapSet (in case of a write)
+ //pg_stat_t new_stats; // resulting Stats
+ object_stat_sum_t delta_stats;
+ bool modify; // (force) modification (even if op_t is empty)
+ bool user_modify; // user-visible modification
+ bool undirty; // user explicitly un-dirtying this object
+ bool cache_operation; ///< true if this is a cache eviction
+ bool ignore_cache; ///< true if IGNORE_CACHE flag is std::set
+ bool ignore_log_op_stats; // don't log op stats
+ bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection
+ ObjectCleanRegions clean_regions;
+ // side effects
+ std::list<std::pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
+ std::list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
+ std::list<notify_info_t> notifies;
+ struct NotifyAck {
+ std::optional<uint64_t> watch_cookie;
+ uint64_t notify_id;
+ ceph::buffer::list reply_bl;
+ explicit NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
+ NotifyAck(uint64_t notify_id, uint64_t cookie, ceph::buffer::list& rbl)
+ : watch_cookie(cookie), notify_id(notify_id) {
+ reply_bl = std::move(rbl);
+ }
+ };
+ std::list<NotifyAck> notify_acks;
+ uint64_t bytes_written, bytes_read;
+ utime_t mtime;
+ SnapContext snapc; // writer snap context
+ eversion_t at_version; // pg's current version pointer
+ version_t user_at_version; // pg's current user version pointer
+ /// index of the current subop - only valid inside of do_osd_ops()
+ int current_osd_subop_num;
+ /// total number of subops processed in this context for cls_cxx_subop_version()
+ int processed_subop_count = 0;
+ PGTransactionUPtr op_t;
+ std::vector<pg_log_entry_t> log;
+ std::optional<pg_hit_set_history_t> updated_hset_history;
+ interval_set<uint64_t> modified_ranges;
+ ObjectContextRef obc;
+ ObjectContextRef clone_obc; // if we created a clone
+ ObjectContextRef head_obc; // if we also update snapset (see trim_object)
+ // FIXME: we may want to kill this msgr hint off at some point!
+ std::optional<int> data_off = std::nullopt;
+ MOSDOpReply *reply;
+ PrimaryLogPG *pg;
+ int num_read; ///< count read ops
+ int num_write; ///< count update ops
+ mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > extra_reqids;
+ mempool::osd_pglog::map<uint32_t, int> extra_reqid_return_codes;
+ hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
+ std::list<std::function<void()>> on_applied;
+ std::list<std::function<void()>> on_committed;
+ std::list<std::function<void()>> on_finish;
+ std::list<std::function<void()>> on_success;
+ template <typename F>
+ void register_on_finish(F &&f) {
+ on_finish.emplace_back(std::forward<F>(f));
+ }
+ template <typename F>
+ void register_on_success(F &&f) {
+ on_success.emplace_back(std::forward<F>(f));
+ }
+ template <typename F>
+ void register_on_applied(F &&f) {
+ on_applied.emplace_back(std::forward<F>(f));
+ }
+ template <typename F>
+ void register_on_commit(F &&f) {
+ on_committed.emplace_back(std::forward<F>(f));
+ }
+ bool sent_reply = false;
+ // pending async reads <off, len, op_flags> -> <outbl, outr>
+ std::list<std::pair<boost::tuple<uint64_t, uint64_t, unsigned>,
+ std::pair<ceph::buffer::list*, Context*> > > pending_async_reads;
+ int inflightreads;
+ friend struct OnReadComplete;
+ void start_async_reads(PrimaryLogPG *pg);
+ void finish_read(PrimaryLogPG *pg);
+ bool async_reads_complete() {
+ return inflightreads == 0;
+ }
+ RWState::State lock_type;
+ ObcLockManager lock_manager;
+ std::map<int, std::unique_ptr<OpFinisher>> op_finishers;
+ OpContext(const OpContext& other);
+ const OpContext& operator=(const OpContext& other);
+ OpContext(OpRequestRef _op, osd_reqid_t _reqid, std::vector<OSDOp>* _ops,
+ ObjectContextRef& obc,
+ PrimaryLogPG *_pg) :
+ op(_op), reqid(_reqid), ops(_ops),
+ obs(&obc->obs),
+ snapset(0),
+ new_obs(obs->oi, obs->exists),
+ modify(false), user_modify(false), undirty(false), cache_operation(false),
+ ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
+ bytes_written(0), bytes_read(0), user_at_version(0),
+ current_osd_subop_num(0),
+ obc(obc),
+ reply(NULL), pg(_pg),
+ num_read(0),
+ num_write(0),
+ sent_reply(false),
+ inflightreads(0),
+ lock_type(RWState::RWNONE) {
+ if (obc->ssc) {
+ new_snapset = obc->ssc->snapset;
+ snapset = &obc->ssc->snapset;
+ }
+ }
+ OpContext(OpRequestRef _op, osd_reqid_t _reqid,
+ std::vector<OSDOp>* _ops, PrimaryLogPG *_pg) :
+ op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
+ modify(false), user_modify(false), undirty(false), cache_operation(false),
+ ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
+ bytes_written(0), bytes_read(0), user_at_version(0),
+ current_osd_subop_num(0),
+ reply(NULL), pg(_pg),
+ num_read(0),
+ num_write(0),
+ inflightreads(0),
+ lock_type(RWState::RWNONE) {}
+ void reset_obs(ObjectContextRef obc) {
+ new_obs = ObjectState(obc->obs.oi, obc->obs.exists);
+ if (obc->ssc) {
+ new_snapset = obc->ssc->snapset;
+ snapset = &obc->ssc->snapset;
+ }
+ }
+ ~OpContext() {
+ ceph_assert(!op_t);
+ if (reply)
+ reply->put();
+ for (std::list<std::pair<boost::tuple<uint64_t, uint64_t, unsigned>,
+ std::pair<ceph::buffer::list*, Context*> > >::iterator i =
+ pending_async_reads.begin();
+ i != pending_async_reads.end();
+ pending_async_reads.erase(i++)) {
+ delete i->second.second;
+ }
+ }
+ uint64_t get_features() {
+ if (op && op->get_req()) {
+ return op->get_req()->get_connection()->get_features();
+ }
+ return -1ull;
+ }
+ };
+ using OpContextUPtr = std::unique_ptr<OpContext>;
+ friend struct OpContext;
+ /*
+ * State on the PG primary associated with the replicated mutation
+ */
+ class RepGather {
+ public:
+ hobject_t hoid;
+ OpRequestRef op;
+ xlist<RepGather*>::item queue_item;
+ int nref;
+ eversion_t v;
+ int r = 0;
+ ceph_tid_t rep_tid;
+ bool rep_aborted;
+ bool all_committed;
+ utime_t start;
+ eversion_t pg_local_last_complete;
+ ObcLockManager lock_manager;
+ std::list<std::function<void()>> on_committed;
+ std::list<std::function<void()>> on_success;
+ std::list<std::function<void()>> on_finish;
+ RepGather(
+ OpContext *c, ceph_tid_t rt,
+ eversion_t lc) :
+ hoid(c->obc->obs.oi.soid),
+ op(c->op),
+ queue_item(this),
+ nref(1),
+ rep_tid(rt),
+ rep_aborted(false),
+ all_committed(false),
+ pg_local_last_complete(lc),
+ lock_manager(std::move(c->lock_manager)),
+ on_committed(std::move(c->on_committed)),
+ on_success(std::move(c->on_success)),
+ on_finish(std::move(c->on_finish)) {}
+ RepGather(
+ ObcLockManager &&manager,
+ OpRequestRef &&o,
+ std::optional<std::function<void(void)> > &&on_complete,
+ ceph_tid_t rt,
+ eversion_t lc,
+ int r) :
+ op(o),
+ queue_item(this),
+ nref(1),
+ r(r),
+ rep_tid(rt),
+ rep_aborted(false),
+ all_committed(false),
+ pg_local_last_complete(lc),
+ lock_manager(std::move(manager)) {
+ if (on_complete) {
+ on_success.push_back(std::move(*on_complete));
+ }
+ }
+ RepGather *get() {
+ nref++;
+ return this;
+ }
+ void put() {
+ ceph_assert(nref > 0);
+ if (--nref == 0) {
+ delete this;
+ //generic_dout(0) << "deleting " << this << dendl;
+ }
+ }
+ };
+ /**
+ * Grabs locks for OpContext, should be cleaned up in close_op_ctx
+ *
+ * @param ctx [in,out] ctx to get locks for
+ * @return true on success, false if we are queued
+ */
+ bool get_rw_locks(bool write_ordered, OpContext *ctx) {
+ /* If head_obc, !obc->obs->exists and we will always take the
+ * snapdir lock *before* the head lock. Since all callers will do
+ * this (read or write) if we get the first we will be guaranteed
+ * to get the second.
+ */
+ if (write_ordered && ctx->op->may_read()) {
+ ctx->lock_type = RWState::RWEXCL;
+ } else if (write_ordered) {
+ ctx->lock_type = RWState::RWWRITE;
+ } else {
+ ceph_assert(ctx->op->may_read());
+ ctx->lock_type = RWState::RWREAD;
+ }
+ if (ctx->head_obc) {
+ ceph_assert(!ctx->obc->obs.exists);
+ if (!ctx->lock_manager.get_lock_type(
+ ctx->lock_type,
+ ctx->head_obc->obs.oi.soid,
+ ctx->head_obc,
+ ctx->op)) {
+ ctx->lock_type = RWState::RWNONE;
+ return false;
+ }
+ }
+ if (ctx->lock_manager.get_lock_type(
+ ctx->lock_type,
+ ctx->obc->obs.oi.soid,
+ ctx->obc,
+ ctx->op)) {
+ return true;
+ } else {
+ ceph_assert(!ctx->head_obc);
+ ctx->lock_type = RWState::RWNONE;
+ return false;
+ }
+ }
+ /**
+ * Cleans up OpContext
+ *
+ * @param ctx [in] ctx to clean up
+ */
+ void close_op_ctx(OpContext *ctx);
+ /**
+ * Releases locks
+ *
+ * @param manager [in] manager with locks to release
+ *
+ * (moved to .cc due to scrubber access)
+ */
+ void release_object_locks(ObcLockManager &lock_manager);
+ // replica ops
+ // [primary|tail]
+ xlist<RepGather*> repop_queue;
+ friend class C_OSD_RepopCommit;
+ void repop_all_committed(RepGather *repop);
+ void eval_repop(RepGather*);
+ void issue_repop(RepGather *repop, OpContext *ctx);
+ RepGather *new_repop(
+ OpContext *ctx,
+ ObjectContextRef obc,
+ ceph_tid_t rep_tid);
+ boost::intrusive_ptr<RepGather> new_repop(
+ eversion_t version,
+ int r,
+ ObcLockManager &&manager,
+ OpRequestRef &&op,
+ std::optional<std::function<void(void)> > &&on_complete);
+ void remove_repop(RepGather *repop);
+ OpContextUPtr simple_opc_create(ObjectContextRef obc);
+ void simple_opc_submit(OpContextUPtr ctx);
+ /**
+ * Merge entries atomically into all acting_recovery_backfill osds
+ * adjusting missing and recovery state as necessary.
+ *
+ * Also used to store error log entries for dup detection.
+ */
+ void submit_log_entries(
+ const mempool::osd_pglog::list<pg_log_entry_t> &entries,
+ ObcLockManager &&manager,
+ std::optional<std::function<void(void)> > &&on_complete,
+ OpRequestRef op = OpRequestRef(),
+ int r = 0);
+ struct LogUpdateCtx {
+ boost::intrusive_ptr<RepGather> repop;
+ std::set<pg_shard_t> waiting_on;
+ };
+ void cancel_log_updates();
+ std::map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;
+ // hot/cold tracking
+ HitSetRef hit_set; ///< currently accumulating HitSet
+ utime_t hit_set_start_stamp; ///< time the current HitSet started recording
+ void hit_set_clear(); ///< discard any HitSet state
+ void hit_set_setup(); ///< initialize HitSet state
+ void hit_set_create(); ///< create a new HitSet
+ void hit_set_persist(); ///< persist hit info
+ bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
+ void hit_set_trim(OpContextUPtr &ctx, unsigned max); ///< discard old HitSets
+ void hit_set_in_memory_trim(uint32_t max_in_memory); ///< discard old in memory HitSets
+ void hit_set_remove_all();
+ hobject_t get_hit_set_current_object(utime_t stamp);
+ hobject_t get_hit_set_archive_object(utime_t start,
+ utime_t end,
+ bool using_gmt);
+ // agent
+ boost::scoped_ptr<TierAgentState> agent_state;
+ void agent_setup(); ///< initialize agent state
+ bool agent_work(int max) override ///< entry point to do some agent work
+ {
+ return agent_work(max, max);
+ }
+ bool agent_work(int max, int agent_flush_quota) override;
+ bool agent_maybe_flush(ObjectContextRef& obc); ///< maybe flush
+ bool agent_maybe_evict(ObjectContextRef& obc, bool after_flush); ///< maybe evict
+ void agent_load_hit_sets(); ///< load HitSets, if needed
+ /// estimate object atime and temperature
+ ///
+ /// @param oid [in] object name
+ /// @param temperature [out] relative temperature (# consider both access time and frequency)
+ void agent_estimate_temp(const hobject_t& oid, int *temperature);
+ /// stop the agent
+ void agent_stop() override;
+ void agent_delay() override;
+ /// clear agent state
+ void agent_clear() override;
+ /// choose (new) agent mode(s), returns true if op is requeued
+ bool agent_choose_mode(bool restart = false, OpRequestRef op = OpRequestRef());
+ void agent_choose_mode_restart() override;
+ /// true if we can send an ondisk/commit for v
+ bool already_complete(eversion_t v);
+ // projected object info
+ SharedLRU<hobject_t, ObjectContext> object_contexts;
+ // std::map from oid.snapdir() to SnapSetContext *
+ std::map<hobject_t, SnapSetContext*> snapset_contexts;
+ ceph::mutex snapset_contexts_lock =
+ ceph::make_mutex("PrimaryLogPG::snapset_contexts_lock");
+ // debug order that client ops are applied
+ std::map<hobject_t, std::map<client_t, ceph_tid_t>> debug_op_order;
+ void populate_obc_watchers(ObjectContextRef obc);
+ void check_blocklisted_obc_watchers(ObjectContextRef obc);
+ void check_blocklisted_watchers() override;
+ void get_watchers(std::list<obj_watch_item_t> *ls) override;
+ void get_obc_watchers(ObjectContextRef obc, std::list<obj_watch_item_t> &pg_watchers);
+ void handle_watch_timeout(WatchRef watch);
+ ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc);
+ ObjectContextRef get_object_context(
+ const hobject_t& soid,
+ bool can_create,
+ const std::map<std::string, ceph::buffer::list> *attrs = 0
+ );
+ void context_registry_on_change();
+ void object_context_destructor_callback(ObjectContext *obc);
+ class C_PG_ObjectContext;
+ int find_object_context(const hobject_t& oid,
+ ObjectContextRef *pobc,
+ bool can_create,
+ bool map_snapid_to_clone=false,
+ hobject_t *missing_oid=NULL);
+ void add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *stat);
+ void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc);
+ SnapSetContext *get_snapset_context(
+ const hobject_t& oid,
+ bool can_create,
+ const std::map<std::string, ceph::buffer::list> *attrs = 0,
+ bool oid_existed = true //indicate this oid whether exsited in backend
+ );
+ void register_snapset_context(SnapSetContext *ssc) {
+ std::lock_guard l(snapset_contexts_lock);
+ _register_snapset_context(ssc);
+ }
+ void _register_snapset_context(SnapSetContext *ssc) {
+ ceph_assert(ceph_mutex_is_locked(snapset_contexts_lock));
+ if (!ssc->registered) {
+ ceph_assert(snapset_contexts.count(ssc->oid) == 0);
+ ssc->registered = true;
+ snapset_contexts[ssc->oid] = ssc;
+ }
+ }
+ void put_snapset_context(SnapSetContext *ssc);
+ std::map<hobject_t, ObjectContextRef> recovering;
+ /*
+ * Backfill
+ *
+ * peer_info[backfill_target].last_backfill == info.last_backfill on the peer.
+ *
+ * objects prior to peer_info[backfill_target].last_backfill
+ * - are on the peer
+ * - are included in the peer stats
+ *
+ * objects \in (last_backfill, last_backfill_started]
+ * - are on the peer or are in backfills_in_flight
+ * - are not included in pg stats (yet)
+ * - have their stats in pending_backfill_updates on the primary
+ */
+ std::set<hobject_t> backfills_in_flight;
+ std::map<hobject_t, pg_stat_t> pending_backfill_updates;
+ void dump_recovery_info(ceph::Formatter *f) const override {
+ f->open_array_section("waiting_on_backfill");
+ for (std::set<pg_shard_t>::const_iterator p = waiting_on_backfill.begin();
+ p != waiting_on_backfill.end(); ++p)
+ f->dump_stream("osd") << *p;
+ f->close_section();
+ f->dump_stream("last_backfill_started") << last_backfill_started;
+ {
+ f->open_object_section("backfill_info");
+ backfill_info.dump(f);
+ f->close_section();
+ }
+ {
+ f->open_array_section("peer_backfill_info");
+ for (std::map<pg_shard_t, BackfillInterval>::const_iterator pbi =
+ peer_backfill_info.begin();
+ pbi != peer_backfill_info.end(); ++pbi) {
+ f->dump_stream("osd") << pbi->first;
+ f->open_object_section("BackfillInterval");
+ pbi->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ {
+ f->open_array_section("backfills_in_flight");
+ for (std::set<hobject_t>::const_iterator i = backfills_in_flight.begin();
+ i != backfills_in_flight.end();
+ ++i) {
+ f->dump_stream("object") << *i;
+ }
+ f->close_section();
+ }
+ {
+ f->open_array_section("recovering");
+ for (std::map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
+ i != recovering.end();
+ ++i) {
+ f->dump_stream("object") << i->first;
+ }
+ f->close_section();
+ }
+ {
+ f->open_object_section("pg_backend");
+ pgbackend->dump_recovery_info(f);
+ f->close_section();
+ }
+ }
+ /// last backfill operation started
+ hobject_t last_backfill_started;
+ bool new_backfill;
+ int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
+ PGBackend::RecoveryHandle *h,
+ bool *work_started);
+ int prep_object_replica_deletes(const hobject_t& soid, eversion_t v,
+ PGBackend::RecoveryHandle *h,
+ bool *work_started);
+ void finish_degraded_object(const hobject_t oid);
+ // Cancels/resets pulls from peer
+ void check_recovery_sources(const OSDMapRef& map) override ;
+ int recover_missing(
+ const hobject_t& oid,
+ eversion_t v,
+ int priority,
+ PGBackend::RecoveryHandle *h);
+ // low level ops
+ void _make_clone(
+ OpContext *ctx,
+ PGTransaction* t,
+ ObjectContextRef obc,
+ const hobject_t& head, const hobject_t& coid,
+ object_info_t *poi);
+ void execute_ctx(OpContext *ctx);
+ void finish_ctx(OpContext *ctx, int log_op_type, int result=0);
+ void reply_ctx(OpContext *ctx, int err);
+ void make_writeable(OpContext *ctx);
+ void log_op_stats(const OpRequest& op, uint64_t inb, uint64_t outb);
+ void write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi,
+ interval_set<uint64_t>& modified, uint64_t offset,
+ uint64_t length, bool write_full=false);
+ inline void truncate_update_size_and_usage(
+ object_stat_sum_t& delta_stats,
+ object_info_t& oi,
+ uint64_t truncate_size);
+ enum class cache_result_t {
+ };
+ cache_result_t maybe_handle_cache_detail(OpRequestRef op,
+ bool write_ordered,
+ ObjectContextRef obc, int r,
+ hobject_t missing_oid,
+ bool must_promote,
+ bool in_hit_set,
+ ObjectContextRef *promote_obc);
+ cache_result_t maybe_handle_manifest_detail(OpRequestRef op,
+ bool write_ordered,
+ ObjectContextRef obc);
+ bool maybe_handle_manifest(OpRequestRef op,
+ bool write_ordered,
+ ObjectContextRef obc) {
+ return cache_result_t::NOOP != maybe_handle_manifest_detail(
+ op,
+ write_ordered,
+ obc);
+ }
+ /**
+ * This helper function is called from do_op if the ObjectContext lookup fails.
+ * @returns true if the caching code is handling the Op, false otherwise.
+ */
+ bool maybe_handle_cache(OpRequestRef op,
+ bool write_ordered,
+ ObjectContextRef obc, int r,
+ const hobject_t& missing_oid,
+ bool must_promote,
+ bool in_hit_set = false) {
+ return cache_result_t::NOOP != maybe_handle_cache_detail(
+ op,
+ write_ordered,
+ obc,
+ r,
+ missing_oid,
+ must_promote,
+ in_hit_set,
+ nullptr);
+ }
+ /**
+ * This helper function checks if a promotion is needed.
+ */
+ bool maybe_promote(ObjectContextRef obc,
+ const hobject_t& missing_oid,
+ const object_locator_t& oloc,
+ bool in_hit_set,
+ uint32_t recency,
+ OpRequestRef promote_op,
+ ObjectContextRef *promote_obc = nullptr);
+ /**
+ * This helper function tells the client to redirect their request elsewhere.
+ */
+ void do_cache_redirect(OpRequestRef op);
+ /**
+ * This function attempts to start a promote. Either it succeeds,
+ * or places op on a wait std::list. If op is null, failure means that
+ * this is a noop. If a future user wants to be able to distinguish
+ * these cases, a return value should be added.
+ */
+ void promote_object(
+ ObjectContextRef obc, ///< [optional] obc
+ const hobject_t& missing_object, ///< oid (if !obc)
+ const object_locator_t& oloc, ///< locator for obc|oid
+ OpRequestRef op, ///< [optional] client op
+ ObjectContextRef *promote_obc = nullptr ///< [optional] new obc for object
+ );
+ int prepare_transaction(OpContext *ctx);
+ std::list<std::pair<OpRequestRef, OpContext*> > in_progress_async_reads;
+ void complete_read_ctx(int result, OpContext *ctx);
+ // pg on-disk content
+ void check_local() override;
+ void _clear_recovery_state() override;
+ bool start_recovery_ops(
+ uint64_t max,
+ ThreadPool::TPHandle &handle, uint64_t *started) override;
+ uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
+ uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle,
+ bool *recovery_started);
+ hobject_t earliest_peer_backfill() const;
+ bool all_peer_done() const;
+ /**
+ * @param work_started will be std::set to true if recover_backfill got anywhere
+ * @returns the number of operations started
+ */
+ uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
+ bool *work_started);
+ /**
+ * scan a (hash) range of objects in the current pg
+ *
+ * @min return at least this many items, unless we are done
+ * @max return no more than this many items
+ * @bi.begin first item should be >= this value
+ * @bi [out] resulting std::map of objects to eversion_t's
+ */
+ void scan_range(
+ int min, int max, BackfillInterval *bi,
+ ThreadPool::TPHandle &handle
+ );
+ /// Update a hash range to reflect changes since the last scan
+ void update_range(
+ BackfillInterval *bi, ///< [in,out] interval to update
+ ThreadPool::TPHandle &handle ///< [in] tp handle
+ );
+ int prep_backfill_object_push(
+ hobject_t oid, eversion_t v, ObjectContextRef obc,
+ std::vector<pg_shard_t> peers,
+ PGBackend::RecoveryHandle *h);
+ void send_remove_op(const hobject_t& oid, eversion_t v, pg_shard_t peer);
+ class C_OSD_AppliedRecoveredObject;
+ class C_OSD_CommittedPushedObject;
+ class C_OSD_AppliedRecoveredObjectReplica;
+ void _applied_recovered_object(ObjectContextRef obc);
+ void _applied_recovered_object_replica();
+ void _committed_pushed_object(epoch_t epoch, eversion_t lc);
+ void recover_got(hobject_t oid, eversion_t v);
+ // -- copyfrom --
+ std::map<hobject_t, CopyOpRef> copy_ops;
+ int do_copy_get(OpContext *ctx, ceph::buffer::list::const_iterator& bp, OSDOp& op,
+ ObjectContextRef& obc);
+ int finish_copy_get();
+ void fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
+ OSDOp& osd_op);
+ /**
+ * To copy an object, call start_copy.
+ *
+ * @param cb: The CopyCallback to be activated when the copy is complete
+ * @param obc: The ObjectContext we are copying into
+ * @param src: The source object
+ * @param oloc: the source object locator
+ * @param version: the version of the source object to copy (0 for any)
+ */
+ void start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
+ object_locator_t oloc, version_t version, unsigned flags,
+ bool mirror_snapset, unsigned src_obj_fadvise_flags,
+ unsigned dest_obj_fadvise_flags);
+ void process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r);
+ void _write_copy_chunk(CopyOpRef cop, PGTransaction *t);
+ uint64_t get_copy_chunk_size() const {
+ uint64_t size = cct->_conf->osd_copyfrom_max_chunk;
+ if ( {
+ uint64_t alignment =;
+ if (size % alignment) {
+ size += alignment - (size % alignment);
+ }
+ }
+ return size;
+ }
+ void _copy_some(ObjectContextRef obc, CopyOpRef cop);
+ void finish_copyfrom(CopyFromCallback *cb);
+ void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
+ void cancel_copy(CopyOpRef cop, bool requeue, std::vector<ceph_tid_t> *tids);
+ void cancel_copy_ops(bool requeue, std::vector<ceph_tid_t> *tids);
+ friend struct C_Copyfrom;
+ // -- flush --
+ std::map<hobject_t, FlushOpRef> flush_ops;
+ /// start_flush takes ownership of on_flush iff ret == -EINPROGRESS
+ int start_flush(
+ OpRequestRef op, ObjectContextRef obc,
+ bool blocking, hobject_t *pmissing,
+ std::optional<std::function<void()>> &&on_flush);
+ void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
+ int try_flush_mark_clean(FlushOpRef fop);
+ void cancel_flush(FlushOpRef fop, bool requeue, std::vector<ceph_tid_t> *tids);
+ void cancel_flush_ops(bool requeue, std::vector<ceph_tid_t> *tids);
+ /// @return false if clone is has been evicted
+ bool is_present_clone(hobject_t coid);
+ friend struct C_Flush;
+ // -- scrub --
+ bool _range_available_for_scrub(
+ const hobject_t &begin, const hobject_t &end) override;
+ void _split_into(pg_t child_pgid, PG *child,
+ unsigned split_bits) override;
+ void apply_and_flush_repops(bool requeue);
+ int do_xattr_cmp_u64(int op, __u64 v1, ceph::buffer::list& xattr);
+ int do_xattr_cmp_str(int op, std::string& v1s, ceph::buffer::list& xattr);
+ // -- checksum --
+ int do_checksum(OpContext *ctx, OSDOp& osd_op, ceph::buffer::list::const_iterator *bl_it);
+ int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type,
+ ceph::buffer::list::const_iterator *init_value_bl_it,
+ const ceph::buffer::list &read_bl);
+ friend struct C_ChecksumRead;
+ int do_extent_cmp(OpContext *ctx, OSDOp& osd_op);
+ int finish_extent_cmp(OSDOp& osd_op, const ceph::buffer::list &read_bl);
+ friend struct C_ExtentCmpRead;
+ int do_read(OpContext *ctx, OSDOp& osd_op);
+ int do_sparse_read(OpContext *ctx, OSDOp& osd_op);
+ int do_writesame(OpContext *ctx, OSDOp& osd_op);
+ bool pgls_filter(const PGLSFilter& filter, const hobject_t& sobj);
+ std::pair<int, std::unique_ptr<const PGLSFilter>> get_pgls_filter(
+ ceph::buffer::list::const_iterator& iter);
+ std::map<hobject_t, std::list<OpRequestRef>> in_progress_proxy_ops;
+ void kick_proxy_ops_blocked(hobject_t& soid);
+ void cancel_proxy_ops(bool requeue, std::vector<ceph_tid_t> *tids);
+ // -- proxyread --
+ std::map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
+ void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
+ void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
+ void cancel_proxy_read(ProxyReadOpRef prdop, std::vector<ceph_tid_t> *tids);
+ friend struct C_ProxyRead;
+ // -- proxywrite --
+ std::map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;
+ void do_proxy_write(OpRequestRef op, ObjectContextRef obc = NULL);
+ void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
+ void cancel_proxy_write(ProxyWriteOpRef pwop, std::vector<ceph_tid_t> *tids);
+ friend struct C_ProxyWrite_Commit;
+ // -- chunkop --
+ enum class refcount_t {
+ };
+ void do_proxy_chunked_op(OpRequestRef op, const hobject_t& missing_oid,
+ ObjectContextRef obc, bool write_ordered);
+ void do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc, int op_index,
+ uint64_t chunk_index, uint64_t req_offset, uint64_t req_length,
+ uint64_t req_total_len, bool write_ordered);
+ bool can_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc);
+ void _copy_some_manifest(ObjectContextRef obc, CopyOpRef cop, uint64_t start_offset);
+ void process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset);
+ void finish_promote_manifest(int r, CopyResults *results, ObjectContextRef obc);
+ void cancel_and_requeue_proxy_ops(hobject_t oid);
+ void cancel_manifest_ops(bool requeue, vector<ceph_tid_t> *tids);
+ ceph_tid_t refcount_manifest(hobject_t src_soid, hobject_t tgt_soid, refcount_t type,
+ Context *cb, std::optional<bufferlist> chunk);
+ void dec_all_refcount_manifest(const object_info_t& oi, OpContext* ctx);
+ void dec_refcount(const hobject_t& soid, const object_ref_delta_t& refs);
+ void dec_refcount_by_dirty(OpContext* ctx);
+ ObjectContextRef get_prev_clone_obc(ObjectContextRef obc);
+ bool recover_adjacent_clones(ObjectContextRef obc, OpRequestRef op);
+ void get_adjacent_clones(ObjectContextRef src_obc,
+ ObjectContextRef& _l, ObjectContextRef& _g);
+ bool inc_refcount_by_set(OpContext* ctx, object_manifest_t& tgt,
+ OSDOp& osd_op);
+ int do_cdc(const object_info_t& oi, std::map<uint64_t, chunk_info_t>& chunk_map,
+ std::map<uint64_t, bufferlist>& chunks);
+ int start_dedup(OpRequestRef op, ObjectContextRef obc);
+ std::pair<int, hobject_t> get_fpoid_from_chunk(const hobject_t soid, bufferlist& chunk);
+ int finish_set_dedup(hobject_t oid, int r, ceph_tid_t tid, uint64_t offset);
+ friend struct C_ProxyChunkRead;
+ friend class PromoteManifestCallback;
+ friend struct C_CopyChunk;
+ friend struct RefCountCallback;
+ friend struct C_SetDedupChunks;
+ PrimaryLogPG(OSDService *o, OSDMapRef curmap,
+ const PGPool &_pool,
+ const std::map<std::string,std::string>& ec_profile,
+ spg_t p);
+ ~PrimaryLogPG() override;
+ void do_command(
+ const std::string_view& prefix,
+ const cmdmap_t& cmdmap,
+ const ceph::buffer::list& idata,
+ std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish) override;
+ void clear_cache() override;
+ int get_cache_obj_count() override {
+ return object_contexts.get_count();
+ }
+ unsigned get_pg_shard() const {
+ return info.pgid.hash_to_shard(osd->get_num_shards());
+ }
+ void do_request(
+ OpRequestRef& op,
+ ThreadPool::TPHandle &handle) override;
+ void do_op(OpRequestRef& op);
+ void record_write_error(OpRequestRef op, const hobject_t &soid,
+ MOSDOpReply *orig_reply, int r,
+ OpContext *ctx_for_op_returns=nullptr);
+ void do_pg_op(OpRequestRef op);
+ void do_scan(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle);
+ void do_backfill(OpRequestRef op);
+ void do_backfill_remove(OpRequestRef op);
+ void handle_backoff(OpRequestRef& op);
+ int trim_object(bool first, const hobject_t &coid, snapid_t snap_to_trim,
+ OpContextUPtr *ctxp);
+ void snap_trimmer(epoch_t e) override;
+ void kick_snap_trim() override;
+ void snap_trimmer_scrub_complete() override;
+ int do_osd_ops(OpContext *ctx, std::vector<OSDOp>& ops);
+ int _get_tmap(OpContext *ctx, ceph::buffer::list *header, ceph::buffer::list *vals);
+ int do_tmap2omap(OpContext *ctx, unsigned flags);
+ int do_tmapup(OpContext *ctx, ceph::buffer::list::const_iterator& bp, OSDOp& osd_op);
+ int do_tmapup_slow(OpContext *ctx, ceph::buffer::list::const_iterator& bp, OSDOp& osd_op, ceph::buffer::list& bl);
+ void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
+ int do_scrub_ls(const MOSDOp *op, OSDOp *osd_op);
+ bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;
+ uint64_t temp_seq; ///< last id for naming temp objects
+ /// generate a new temp object name
+ hobject_t generate_temp_object(const hobject_t& target);
+ /// generate a new temp object name (for recovery)
+ hobject_t get_temp_recovery_object(const hobject_t& target,
+ eversion_t version) override;
+ int get_recovery_op_priority() const {
+ int64_t pri = 0;
+, &pri);
+ return pri > 0 ? pri : cct->_conf->osd_recovery_op_priority;
+ }
+ coll_t get_coll() {
+ return coll;
+ }
+ void split_colls(
+ spg_t child,
+ int split_bits,
+ int seed,
+ const pg_pool_t *pool,
+ ObjectStore::Transaction &t) override {
+ coll_t target = coll_t(child);
+ create_pg_collection(t, child, split_bits);
+ t.split_collection(
+ coll,
+ split_bits,
+ seed,
+ target);
+ init_pg_ondisk(t, child, pool);
+ }
+ struct DoSnapWork : boost::statechart::event< DoSnapWork > {
+ DoSnapWork() : boost::statechart::event < DoSnapWork >() {}
+ };
+ struct KickTrim : boost::statechart::event< KickTrim > {
+ KickTrim() : boost::statechart::event < KickTrim >() {}
+ };
+ struct RepopsComplete : boost::statechart::event< RepopsComplete > {
+ RepopsComplete() : boost::statechart::event < RepopsComplete >() {}
+ };
+ struct ScrubComplete : boost::statechart::event< ScrubComplete > {
+ ScrubComplete() : boost::statechart::event < ScrubComplete >() {}
+ };
+ struct TrimWriteUnblocked : boost::statechart::event< TrimWriteUnblocked > {
+ TrimWriteUnblocked() : boost::statechart::event < TrimWriteUnblocked >() {}
+ };
+ struct Reset : boost::statechart::event< Reset > {
+ Reset() : boost::statechart::event< Reset >() {}
+ };
+ struct SnapTrimReserved : boost::statechart::event< SnapTrimReserved > {
+ SnapTrimReserved() : boost::statechart::event< SnapTrimReserved >() {}
+ };
+ struct SnapTrimTimerReady : boost::statechart::event< SnapTrimTimerReady > {
+ SnapTrimTimerReady() : boost::statechart::event< SnapTrimTimerReady >() {}
+ };
+ struct NotTrimming;
+ struct SnapTrimmer : public boost::statechart::state_machine< SnapTrimmer, NotTrimming > {
+ PrimaryLogPG *pg;
+ explicit SnapTrimmer(PrimaryLogPG *pg) : pg(pg) {}
+ void log_enter(const char *state_name);
+ void log_exit(const char *state_name, utime_t duration);
+ bool permit_trim();
+ bool can_trim() {
+ return
+ permit_trim() &&
+ !pg->get_osdmap()->test_flag(CEPH_OSDMAP_NOSNAPTRIM);
+ }
+ } snap_trimmer_machine;
+ struct WaitReservation;
+ struct Trimming : boost::statechart::state< Trimming, SnapTrimmer, WaitReservation >, NamedState {
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< KickTrim >,
+ boost::statechart::transition< Reset, NotTrimming >
+ > reactions;
+ std::set<hobject_t> in_flight;
+ snapid_t snap_to_trim;
+ explicit Trimming(my_context ctx)
+ : my_base(ctx),
+ NamedState(nullptr, "Trimming") {
+ context< SnapTrimmer >().log_enter(state_name);
+ ceph_assert(context< SnapTrimmer >().permit_trim());
+ ceph_assert(in_flight.empty());
+ }
+ void exit() {
+ context< SnapTrimmer >().log_exit(state_name, enter_time);
+ auto *pg = context< SnapTrimmer >().pg;
+ pg->osd->snap_reserver.cancel_reservation(pg->get_pgid());
+ pg->state_clear(PG_STATE_SNAPTRIM);
+ pg->publish_stats_to_osd();
+ }
+ boost::statechart::result react(const KickTrim&) {
+ return discard_event();
+ }
+ };
+ /* SnapTrimmerStates */
+ struct WaitTrimTimer : boost::statechart::state< WaitTrimTimer, Trimming >, NamedState {
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< SnapTrimTimerReady >
+ > reactions;
+ Context *wakeup = nullptr;
+ explicit WaitTrimTimer(my_context ctx)
+ : my_base(ctx),
+ NamedState(nullptr, "Trimming/WaitTrimTimer") {
+ context< SnapTrimmer >().log_enter(state_name);
+ ceph_assert(context<Trimming>().in_flight.empty());
+ struct OnTimer : Context {
+ PrimaryLogPGRef pg;
+ epoch_t epoch;
+ OnTimer(PrimaryLogPGRef pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
+ void finish(int) override {
+ pg->lock();
+ if (!pg->pg_has_reset_since(epoch))
+ pg->snap_trimmer_machine.process_event(SnapTrimTimerReady());
+ pg->unlock();
+ }
+ };
+ auto *pg = context< SnapTrimmer >().pg;
+ float osd_snap_trim_sleep = pg->osd->osd->get_osd_snap_trim_sleep();
+ if (osd_snap_trim_sleep > 0) {
+ std::lock_guard l(pg->osd->sleep_lock);
+ wakeup = pg->osd->sleep_timer.add_event_after(
+ osd_snap_trim_sleep,
+ new OnTimer{pg, pg->get_osdmap_epoch()});
+ } else {
+ post_event(SnapTrimTimerReady());
+ }
+ }
+ void exit() {
+ context< SnapTrimmer >().log_exit(state_name, enter_time);
+ auto *pg = context< SnapTrimmer >().pg;
+ if (wakeup) {
+ std::lock_guard l(pg->osd->sleep_lock);
+ pg->osd->sleep_timer.cancel_event(wakeup);
+ wakeup = nullptr;
+ }
+ }
+ boost::statechart::result react(const SnapTrimTimerReady &) {
+ wakeup = nullptr;
+ if (!context< SnapTrimmer >().can_trim()) {
+ post_event(KickTrim());
+ return transit< NotTrimming >();
+ } else {
+ return transit< AwaitAsyncWork >();
+ }
+ }
+ };
+ struct WaitRWLock : boost::statechart::state< WaitRWLock, Trimming >, NamedState {
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< TrimWriteUnblocked >
+ > reactions;
+ explicit WaitRWLock(my_context ctx)
+ : my_base(ctx),
+ NamedState(nullptr, "Trimming/WaitRWLock") {
+ context< SnapTrimmer >().log_enter(state_name);
+ ceph_assert(context<Trimming>().in_flight.empty());
+ }
+ void exit() {
+ context< SnapTrimmer >().log_exit(state_name, enter_time);
+ }
+ boost::statechart::result react(const TrimWriteUnblocked&) {
+ if (!context< SnapTrimmer >().can_trim()) {
+ post_event(KickTrim());
+ return transit< NotTrimming >();
+ } else {
+ return transit< AwaitAsyncWork >();
+ }
+ }
+ };
+ struct WaitRepops : boost::statechart::state< WaitRepops, Trimming >, NamedState {
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< RepopsComplete >
+ > reactions;
+ explicit WaitRepops(my_context ctx)
+ : my_base(ctx),
+ NamedState(nullptr, "Trimming/WaitRepops") {
+ context< SnapTrimmer >().log_enter(state_name);
+ ceph_assert(!context<Trimming>().in_flight.empty());
+ }
+ void exit() {
+ context< SnapTrimmer >().log_exit(state_name, enter_time);
+ }
+ boost::statechart::result react(const RepopsComplete&) {
+ if (!context< SnapTrimmer >().can_trim()) {
+ post_event(KickTrim());
+ return transit< NotTrimming >();
+ } else {
+ return transit< WaitTrimTimer >();
+ }
+ }
+ };
+ struct AwaitAsyncWork : boost::statechart::state< AwaitAsyncWork, Trimming >, NamedState {
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< DoSnapWork >
+ > reactions;
+ explicit AwaitAsyncWork(my_context ctx);
+ void exit() {
+ context< SnapTrimmer >().log_exit(state_name, enter_time);
+ }
+ boost::statechart::result react(const DoSnapWork&);
+ };
+ struct WaitReservation : boost::statechart::state< WaitReservation, Trimming >, NamedState {
+ /* WaitReservation is a sub-state of trimming simply so that exiting Trimming
+ * always cancels the reservation */
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< SnapTrimReserved >
+ > reactions;
+ struct ReservationCB : public Context {
+ PrimaryLogPGRef pg;
+ bool canceled;
+ explicit ReservationCB(PrimaryLogPG *pg) : pg(pg), canceled(false) {}
+ void finish(int) override {
+ pg->lock();
+ if (!canceled)
+ pg->snap_trimmer_machine.process_event(SnapTrimReserved());
+ pg->unlock();
+ }
+ void cancel() {
+ ceph_assert(pg->is_locked());
+ ceph_assert(!canceled);
+ canceled = true;
+ }
+ };
+ ReservationCB *pending = nullptr;
+ explicit WaitReservation(my_context ctx)
+ : my_base(ctx),
+ NamedState(nullptr, "Trimming/WaitReservation") {
+ context< SnapTrimmer >().log_enter(state_name);
+ ceph_assert(context<Trimming>().in_flight.empty());
+ auto *pg = context< SnapTrimmer >().pg;
+ pending = new ReservationCB(pg);
+ pg->osd->snap_reserver.request_reservation(
+ pg->get_pgid(),
+ pending,
+ 0);
+ pg->state_set(PG_STATE_SNAPTRIM_WAIT);
+ pg->publish_stats_to_osd();
+ }
+ boost::statechart::result react(const SnapTrimReserved&);
+ void exit() {
+ context< SnapTrimmer >().log_exit(state_name, enter_time);
+ if (pending)
+ pending->cancel();
+ pending = nullptr;
+ auto *pg = context< SnapTrimmer >().pg;
+ pg->state_clear(PG_STATE_SNAPTRIM_WAIT);
+ pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
+ pg->publish_stats_to_osd();
+ }
+ };
+ struct WaitScrub : boost::statechart::state< WaitScrub, SnapTrimmer >, NamedState {
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< ScrubComplete >,
+ boost::statechart::custom_reaction< KickTrim >,
+ boost::statechart::transition< Reset, NotTrimming >
+ > reactions;
+ explicit WaitScrub(my_context ctx)
+ : my_base(ctx),
+ NamedState(nullptr, "Trimming/WaitScrub") {
+ context< SnapTrimmer >().log_enter(state_name);
+ }
+ void exit() {
+ context< SnapTrimmer >().log_exit(state_name, enter_time);
+ }
+ boost::statechart::result react(const ScrubComplete&) {
+ post_event(KickTrim());
+ return transit< NotTrimming >();
+ }
+ boost::statechart::result react(const KickTrim&) {
+ return discard_event();
+ }
+ };
+ struct NotTrimming : boost::statechart::state< NotTrimming, SnapTrimmer >, NamedState {
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< KickTrim >,
+ boost::statechart::transition< Reset, NotTrimming >
+ > reactions;
+ explicit NotTrimming(my_context ctx);
+ void exit();
+ boost::statechart::result react(const KickTrim&);
+ };
+ int _verify_no_head_clones(const hobject_t& soid,
+ const SnapSet& ss);
+ // return true if we're creating a local object, false for a
+ // whiteout or no change.
+ void maybe_create_new_object(OpContext *ctx, bool ignore_transaction=false);
+ int _delete_oid(OpContext *ctx, bool no_whiteout, bool try_no_whiteout);
+ int _rollback_to(OpContext *ctx, ceph_osd_op& op);
+ bool is_missing_object(const hobject_t& oid) const;
+ bool is_unreadable_object(const hobject_t &oid) const {
+ return is_missing_object(oid) ||
+ !recovery_state.get_missing_loc().readable_with_acting(
+ oid, get_actingset());
+ }
+ void maybe_kick_recovery(const hobject_t &soid);
+ void wait_for_unreadable_object(const hobject_t& oid, OpRequestRef op);
+ int get_manifest_ref_count(ObjectContextRef obc, std::string& fp_oid, OpRequestRef op);
+ bool check_laggy(OpRequestRef& op);
+ bool check_laggy_requeue(OpRequestRef& op);
+ void recheck_readable() override;
+ bool is_backfill_target(pg_shard_t osd) const {
+ return recovery_state.is_backfill_target(osd);
+ }
+ const std::set<pg_shard_t> &get_backfill_targets() const {
+ return recovery_state.get_backfill_targets();
+ }
+ bool is_async_recovery_target(pg_shard_t peer) const {
+ return recovery_state.is_async_recovery_target(peer);
+ }
+ const std::set<pg_shard_t> &get_async_recovery_targets() const {
+ return recovery_state.get_async_recovery_targets();
+ }
+ bool is_degraded_or_backfilling_object(const hobject_t& oid);
+ bool is_degraded_on_async_recovery_target(const hobject_t& soid);
+ void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
+ void block_write_on_full_cache(
+ const hobject_t& oid, OpRequestRef op);
+ void block_for_clean(
+ const hobject_t& oid, OpRequestRef op);
+ void block_write_on_snap_rollback(
+ const hobject_t& oid, ObjectContextRef obc, OpRequestRef op);
+ void block_write_on_degraded_snap(const hobject_t& oid, OpRequestRef op);
+ bool maybe_await_blocked_head(const hobject_t &soid, OpRequestRef op);
+ void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
+ void kick_object_context_blocked(ObjectContextRef obc);
+ void maybe_force_recovery();
+ void mark_all_unfound_lost(
+ int what,
+ std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish);
+ eversion_t pick_newest_available(const hobject_t& oid);
+ void do_update_log_missing(
+ OpRequestRef &op);
+ void do_update_log_missing_reply(
+ OpRequestRef &op);
+ void plpg_on_role_change() override;
+ void plpg_on_pool_change() override;
+ void clear_async_reads();
+ void on_change(ObjectStore::Transaction &t) override;
+ void on_activate_complete() override;
+ void on_flushed() override;
+ void on_removal(ObjectStore::Transaction &t) override;
+ void on_shutdown() override;
+ bool check_failsafe_full() override;
+ bool maybe_preempt_replica_scrub(const hobject_t& oid) override;
+ int rep_repair_primary_object(const hobject_t& soid, OpContext *ctx);
+ // attr cache handling
+ void setattr_maybe_cache(
+ ObjectContextRef obc,
+ PGTransaction *t,
+ const std::string &key,
+ ceph::buffer::list &val);
+ void setattrs_maybe_cache(
+ ObjectContextRef obc,
+ PGTransaction *t,
+ std::map<std::string, ceph::buffer::list> &attrs);
+ void rmattr_maybe_cache(
+ ObjectContextRef obc,
+ PGTransaction *t,
+ const std::string &key);
+ int getattr_maybe_cache(
+ ObjectContextRef obc,
+ const std::string &key,
+ ceph::buffer::list *val);
+ int getattrs_maybe_cache(
+ ObjectContextRef obc,
+ std::map<std::string, ceph::buffer::list> *out);
+ void set_dynamic_perf_stats_queries(
+ const std::list<OSDPerfMetricQuery> &queries) override;
+ void get_dynamic_perf_stats(DynamicPerfStats *stats) override;
+ DynamicPerfStats m_dynamic_perf_stats;
+inline ostream& operator<<(ostream& out, const PrimaryLogPG::RepGather& repop)
+ out << "repgather(" << &repop
+ << " " << repop.v
+ << " rep_tid=" << repop.rep_tid
+ << " committed?=" << repop.all_committed
+ << " r=" << repop.r
+ << ")";
+ return out;
+inline ostream& operator<<(ostream& out,
+ const PrimaryLogPG::ProxyWriteOpRef& pwop)
+ out << "proxywrite(" << &pwop
+ << " " << pwop->user_version
+ << " pwop_tid=" << pwop->objecter_tid;
+ if (pwop->ctx->op)
+ out << " op=" << *(pwop->ctx->op->get_req());
+ out << ")";
+ return out;
+void intrusive_ptr_add_ref(PrimaryLogPG::RepGather *repop);
+void intrusive_ptr_release(PrimaryLogPG::RepGather *repop);