summaryrefslogtreecommitdiffstats
path: root/src/mon/OSDMonitor.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/mon/OSDMonitor.h
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/mon/OSDMonitor.h')
-rw-r--r--src/mon/OSDMonitor.h874
1 files changed, 874 insertions, 0 deletions
diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h
new file mode 100644
index 000000000..e7701a639
--- /dev/null
+++ b/src/mon/OSDMonitor.h
@@ -0,0 +1,874 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Copyright (C) 2013,2014 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+/* Object Store Device (OSD) Monitor
+ */
+
+#ifndef CEPH_OSDMONITOR_H
+#define CEPH_OSDMONITOR_H
+
+#include <map>
+#include <set>
+#include <utility>
+
+#include "include/types.h"
+#include "include/encoding.h"
+#include "common/simple_cache.hpp"
+#include "common/PriorityCache.h"
+#include "msg/Messenger.h"
+
+#include "osd/OSDMap.h"
+#include "osd/OSDMapMapping.h"
+
+#include "CreatingPGs.h"
+#include "PaxosService.h"
+
+#include "erasure-code/ErasureCodeInterface.h"
+#include "mon/MonOpRequest.h"
+#include <boost/functional/hash.hpp>
+
+class Monitor;
+class PGMap;
+struct MonSession;
+class MOSDMap;
+
+
+/// information about a particular peer's failure reports for one osd
+struct failure_reporter_t {
+ utime_t failed_since; ///< when they think it failed
+ MonOpRequestRef op; ///< failure op request
+
+ failure_reporter_t() {}
+ failure_reporter_t(utime_t s, MonOpRequestRef op)
+ : failed_since(s), op(op) {}
+ ~failure_reporter_t() { }
+};
+
+/// information about all failure reports for one osd
+struct failure_info_t {
+ std::map<int, failure_reporter_t> reporters; ///< reporter -> failed_since etc
+ utime_t max_failed_since; ///< most recent failed_since
+
+ failure_info_t() {}
+
+ utime_t get_failed_since() {
+ if (max_failed_since == utime_t() && !reporters.empty()) {
+ // the old max must have canceled; recalculate.
+ for (auto p = reporters.begin(); p != reporters.end(); ++p)
+ if (p->second.failed_since > max_failed_since)
+ max_failed_since = p->second.failed_since;
+ }
+ return max_failed_since;
+ }
+
+ // set the message for the latest report.
+ void add_report(int who, utime_t failed_since, MonOpRequestRef op) {
+ [[maybe_unused]] auto [it, new_reporter] =
+ reporters.insert_or_assign(who, failure_reporter_t{failed_since, op});
+ if (new_reporter) {
+ if (max_failed_since != utime_t() && max_failed_since < failed_since) {
+ max_failed_since = failed_since;
+ }
+ }
+ }
+
+ void take_report_messages(std::list<MonOpRequestRef>& ls) {
+ for (auto p = reporters.begin(); p != reporters.end(); ++p) {
+ if (p->second.op) {
+ ls.push_back(p->second.op);
+ p->second.op.reset();
+ }
+ }
+ }
+
+ void cancel_report(int who) {
+ reporters.erase(who);
+ max_failed_since = utime_t();
+ }
+};
+
+
+class LastEpochClean {
+ struct Lec {
+ std::vector<epoch_t> epoch_by_pg;
+ ps_t next_missing = 0;
+ epoch_t floor = std::numeric_limits<epoch_t>::max();
+ void report(unsigned pg_num, ps_t pg, epoch_t last_epoch_clean);
+ };
+ std::map<uint64_t, Lec> report_by_pool;
+public:
+ void report(unsigned pg_num, const pg_t& pg, epoch_t last_epoch_clean);
+ void remove_pool(uint64_t pool);
+ epoch_t get_lower_bound(const OSDMap& latest) const;
+
+ void dump(Formatter *f) const;
+};
+
+
+struct osdmap_manifest_t {
+ // all the maps we have pinned -- i.e., won't be removed unless
+ // they are inside a trim interval.
+ std::set<version_t> pinned;
+
+ osdmap_manifest_t() {}
+
+ version_t get_last_pinned() const
+ {
+ auto it = pinned.crbegin();
+ if (it == pinned.crend()) {
+ return 0;
+ }
+ return *it;
+ }
+
+ version_t get_first_pinned() const
+ {
+ auto it = pinned.cbegin();
+ if (it == pinned.cend()) {
+ return 0;
+ }
+ return *it;
+ }
+
+ bool is_pinned(version_t v) const
+ {
+ return pinned.find(v) != pinned.end();
+ }
+
+ void pin(version_t v)
+ {
+ pinned.insert(v);
+ }
+
+ version_t get_lower_closest_pinned(version_t v) const {
+ auto p = pinned.lower_bound(v);
+ if (p == pinned.cend()) {
+ return 0;
+ } else if (*p > v) {
+ if (p == pinned.cbegin()) {
+ return 0;
+ }
+ --p;
+ }
+ return *p;
+ }
+
+ void encode(ceph::buffer::list& bl) const
+ {
+ ENCODE_START(1, 1, bl);
+ encode(pinned, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl)
+ {
+ DECODE_START(1, bl);
+ decode(pinned, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list& bl) {
+ auto p = bl.cbegin();
+ decode(p);
+ }
+
+ void dump(ceph::Formatter *f) {
+ f->dump_unsigned("first_pinned", get_first_pinned());
+ f->dump_unsigned("last_pinned", get_last_pinned());
+ f->open_array_section("pinned_maps");
+ for (auto& i : pinned) {
+ f->dump_unsigned("epoch", i);
+ }
+ f->close_section();
+ }
+};
+WRITE_CLASS_ENCODER(osdmap_manifest_t);
+
+class OSDMonitor : public PaxosService,
+ public md_config_obs_t {
+ CephContext *cct;
+
+public:
+ OSDMap osdmap;
+
+ // config observer
+ const char** get_tracked_conf_keys() const override;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed) override;
+ // [leader]
+ OSDMap::Incremental pending_inc;
+ std::map<int, ceph::buffer::list> pending_metadata;
+ std::set<int> pending_metadata_rm;
+ std::map<int, failure_info_t> failure_info;
+ std::map<int,utime_t> down_pending_out; // osd down -> out
+ bool priority_convert = false;
+ std::map<int64_t,std::set<snapid_t>> pending_pseudo_purged_snaps;
+ std::shared_ptr<PriorityCache::PriCache> rocksdb_binned_kv_cache = nullptr;
+ std::shared_ptr<PriorityCache::Manager> pcm = nullptr;
+ ceph::mutex balancer_lock = ceph::make_mutex("OSDMonitor::balancer_lock");
+
+ std::map<int,double> osd_weight;
+
+ using osdmap_key_t = std::pair<version_t, uint64_t>;
+ using osdmap_cache_t = SimpleLRU<osdmap_key_t,
+ ceph::buffer::list,
+ std::less<osdmap_key_t>,
+ boost::hash<osdmap_key_t>>;
+ osdmap_cache_t inc_osd_cache;
+ osdmap_cache_t full_osd_cache;
+
+ bool has_osdmap_manifest;
+ osdmap_manifest_t osdmap_manifest;
+
+ bool check_failures(utime_t now);
+ bool check_failure(utime_t now, int target_osd, failure_info_t& fi);
+ utime_t get_grace_time(utime_t now, int target_osd, failure_info_t& fi) const;
+ bool is_failure_stale(utime_t now, failure_info_t& fi) const;
+ void force_failure(int target_osd, int by);
+
+ bool _have_pending_crush();
+ CrushWrapper &_get_stable_crush();
+ void _get_pending_crush(CrushWrapper& newcrush);
+
+ enum FastReadType {
+ FAST_READ_OFF,
+ FAST_READ_ON,
+ FAST_READ_DEFAULT
+ };
+
+ struct CleanUpmapJob : public ParallelPGMapper::Job {
+ CephContext *cct;
+ const OSDMap& osdmap;
+ OSDMap::Incremental& pending_inc;
+ // lock to protect pending_inc form changing
+ // when checking is done
+ ceph::mutex pending_inc_lock =
+ ceph::make_mutex("CleanUpmapJob::pending_inc_lock");
+
+ CleanUpmapJob(CephContext *cct, const OSDMap& om, OSDMap::Incremental& pi)
+ : ParallelPGMapper::Job(&om),
+ cct(cct),
+ osdmap(om),
+ pending_inc(pi) {}
+
+ void process(const std::vector<pg_t>& to_check) override {
+ std::vector<pg_t> to_cancel;
+ std::map<pg_t, mempool::osdmap::vector<std::pair<int,int>>> to_remap;
+ osdmap.check_pg_upmaps(cct, to_check, &to_cancel, &to_remap);
+ // don't bother taking lock if nothing changes
+ if (!to_cancel.empty() || !to_remap.empty()) {
+ std::lock_guard l(pending_inc_lock);
+ osdmap.clean_pg_upmaps(cct, &pending_inc, to_cancel, to_remap);
+ }
+ }
+
+ void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) override {}
+ void complete() override {}
+ }; // public as this will need to be accessible from TestTestOSDMap.cc
+
+ // svc
+public:
+ void create_initial() override;
+ void get_store_prefixes(std::set<std::string>& s) const override;
+
+private:
+ void update_from_paxos(bool *need_bootstrap) override;
+ void create_pending() override; // prepare a new pending
+ void encode_pending(MonitorDBStore::TransactionRef t) override;
+ void on_active() override;
+ void on_restart() override;
+ void on_shutdown() override;
+
+ /* osdmap full map prune */
+ void load_osdmap_manifest();
+ bool should_prune() const;
+ void _prune_update_trimmed(
+ MonitorDBStore::TransactionRef tx,
+ version_t first);
+ void prune_init(osdmap_manifest_t& manifest);
+ bool _prune_sanitize_options() const;
+ bool is_prune_enabled() const;
+ bool is_prune_supported() const;
+ bool do_prune(MonitorDBStore::TransactionRef tx);
+
+ // Priority cache control
+ uint32_t mon_osd_cache_size = 0; ///< Number of cached OSDMaps
+ uint64_t rocksdb_cache_size = 0; ///< Cache for kv Db
+ double cache_kv_ratio = 0; ///< Cache ratio dedicated to kv
+ double cache_inc_ratio = 0; ///< Cache ratio dedicated to inc
+ double cache_full_ratio = 0; ///< Cache ratio dedicated to full
+ uint64_t mon_memory_base = 0; ///< Mon base memory for cache autotuning
+ double mon_memory_fragmentation = 0; ///< Expected memory fragmentation
+ uint64_t mon_memory_target = 0; ///< Mon target memory for cache autotuning
+ uint64_t mon_memory_min = 0; ///< Min memory to cache osdmaps
+ bool mon_memory_autotune = false; ///< Cache auto tune setting
+ int register_cache_with_pcm();
+ int _set_cache_sizes();
+ int _set_cache_ratios();
+ void _set_new_cache_sizes();
+ void _set_cache_autotuning();
+ int _update_mon_cache_settings();
+
+ friend struct OSDMemCache;
+ friend struct IncCache;
+ friend struct FullCache;
+
+ /**
+ * we haven't delegated full version stashing to paxosservice for some time
+ * now, making this function useless in current context.
+ */
+ void encode_full(MonitorDBStore::TransactionRef t) override { }
+ /**
+ * do not let paxosservice periodically stash full osdmaps, or we will break our
+ * locally-managed full maps. (update_from_paxos loads the latest and writes them
+ * out going forward from there, but if we just synced that may mean we skip some.)
+ */
+ bool should_stash_full() override {
+ return false;
+ }
+
+ /**
+ * hook into trim to include the oldest full map in the trim transaction
+ *
+ * This ensures that anyone post-sync will have enough to rebuild their
+ * full osdmaps.
+ */
+ void encode_trim_extra(MonitorDBStore::TransactionRef tx, version_t first) override;
+
+ void update_msgr_features();
+ /**
+ * check if the cluster supports the features required by the
+ * given crush map. Outputs the daemons which don't support it
+ * to the stringstream.
+ *
+ * @returns true if the map is passable, false otherwise
+ */
+ bool validate_crush_against_features(const CrushWrapper *newcrush,
+ std::stringstream &ss);
+ void check_osdmap_subs();
+ void share_map_with_random_osd();
+
+ ceph::mutex prime_pg_temp_lock =
+ ceph::make_mutex("OSDMonitor::prime_pg_temp_lock");
+ struct PrimeTempJob : public ParallelPGMapper::Job {
+ OSDMonitor *osdmon;
+ PrimeTempJob(const OSDMap& om, OSDMonitor *m)
+ : ParallelPGMapper::Job(&om), osdmon(m) {}
+ void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
+ for (unsigned ps = ps_begin; ps < ps_end; ++ps) {
+ pg_t pgid(ps, pool);
+ osdmon->prime_pg_temp(*osdmap, pgid);
+ }
+ }
+ void process(const std::vector<pg_t>& pgs) override {}
+ void complete() override {}
+ };
+ void maybe_prime_pg_temp();
+ void prime_pg_temp(const OSDMap& next, pg_t pgid);
+
+ ParallelPGMapper mapper; ///< for background pg work
+ OSDMapMapping mapping; ///< pg <-> osd mappings
+ std::unique_ptr<ParallelPGMapper::Job> mapping_job; ///< background mapping job
+ void start_mapping();
+
+ void update_logger();
+
+ void handle_query(PaxosServiceMessage *m);
+ bool preprocess_query(MonOpRequestRef op) override; // true if processed.
+ bool prepare_update(MonOpRequestRef op) override;
+ bool should_propose(double &delay) override;
+
+ version_t get_trim_to() const override;
+
+ bool can_mark_down(int o);
+ bool can_mark_up(int o);
+ bool can_mark_out(int o);
+ bool can_mark_in(int o);
+
+ // ...
+ MOSDMap *build_latest_full(uint64_t features);
+ MOSDMap *build_incremental(epoch_t first, epoch_t last, uint64_t features);
+ void send_full(MonOpRequestRef op);
+ void send_incremental(MonOpRequestRef op, epoch_t first);
+public:
+ /**
+ * Make sure the existing (up) OSDs support the given features
+ * @return 0 on success, or an error code if any OSDs re missing features.
+ * @param ss Filled in with ane explanation of failure, if any
+ */
+ int check_cluster_features(uint64_t features, std::stringstream &ss);
+ // @param req an optional op request, if the osdmaps are replies to it. so
+ // @c Monitor::send_reply() can mark_event with it.
+ void send_incremental(epoch_t first, MonSession *session, bool onetime,
+ MonOpRequestRef req = MonOpRequestRef());
+
+private:
+ void print_utilization(std::ostream &out, ceph::Formatter *f, bool tree) const;
+
+ bool check_source(MonOpRequestRef op, uuid_d fsid);
+
+ bool preprocess_get_osdmap(MonOpRequestRef op);
+
+ bool preprocess_mark_me_down(MonOpRequestRef op);
+
+ friend class C_AckMarkedDown;
+ bool preprocess_failure(MonOpRequestRef op);
+ bool prepare_failure(MonOpRequestRef op);
+ bool prepare_mark_me_down(MonOpRequestRef op);
+ void process_failures();
+ void take_all_failures(std::list<MonOpRequestRef>& ls);
+
+ bool preprocess_mark_me_dead(MonOpRequestRef op);
+ bool prepare_mark_me_dead(MonOpRequestRef op);
+
+ bool preprocess_full(MonOpRequestRef op);
+ bool prepare_full(MonOpRequestRef op);
+
+ bool preprocess_boot(MonOpRequestRef op);
+ bool prepare_boot(MonOpRequestRef op);
+ void _booted(MonOpRequestRef op, bool logit);
+
+ void update_up_thru(int from, epoch_t up_thru);
+ bool preprocess_alive(MonOpRequestRef op);
+ bool prepare_alive(MonOpRequestRef op);
+ void _reply_map(MonOpRequestRef op, epoch_t e);
+
+ bool preprocess_pgtemp(MonOpRequestRef op);
+ bool prepare_pgtemp(MonOpRequestRef op);
+
+ bool preprocess_pg_created(MonOpRequestRef op);
+ bool prepare_pg_created(MonOpRequestRef op);
+
+ bool preprocess_pg_ready_to_merge(MonOpRequestRef op);
+ bool prepare_pg_ready_to_merge(MonOpRequestRef op);
+
+ int _check_remove_pool(int64_t pool_id, const pg_pool_t &pool, std::ostream *ss);
+ bool _check_become_tier(
+ int64_t tier_pool_id, const pg_pool_t *tier_pool,
+ int64_t base_pool_id, const pg_pool_t *base_pool,
+ int *err, std::ostream *ss) const;
+ bool _check_remove_tier(
+ int64_t base_pool_id, const pg_pool_t *base_pool, const pg_pool_t *tier_pool,
+ int *err, std::ostream *ss) const;
+
+ int _prepare_remove_pool(int64_t pool, std::ostream *ss, bool no_fake);
+ int _prepare_rename_pool(int64_t pool, std::string newname);
+
+ bool enforce_pool_op_caps(MonOpRequestRef op);
+ bool preprocess_pool_op (MonOpRequestRef op);
+ bool preprocess_pool_op_create (MonOpRequestRef op);
+ bool prepare_pool_op (MonOpRequestRef op);
+ bool prepare_pool_op_create (MonOpRequestRef op);
+ bool prepare_pool_op_delete(MonOpRequestRef op);
+ int crush_rename_bucket(const std::string& srcname,
+ const std::string& dstname,
+ std::ostream *ss);
+ void check_legacy_ec_plugin(const std::string& plugin,
+ const std::string& profile) const;
+ int normalize_profile(const std::string& profilename,
+ ceph::ErasureCodeProfile &profile,
+ bool force,
+ std::ostream *ss);
+ int crush_rule_create_erasure(const std::string &name,
+ const std::string &profile,
+ int *rule,
+ std::ostream *ss);
+ int get_crush_rule(const std::string &rule_name,
+ int *crush_rule,
+ std::ostream *ss);
+ int get_erasure_code(const std::string &erasure_code_profile,
+ ceph::ErasureCodeInterfaceRef *erasure_code,
+ std::ostream *ss) const;
+ int prepare_pool_crush_rule(const unsigned pool_type,
+ const std::string &erasure_code_profile,
+ const std::string &rule_name,
+ int *crush_rule,
+ std::ostream *ss);
+ bool erasure_code_profile_in_use(
+ const mempool::osdmap::map<int64_t, pg_pool_t> &pools,
+ const std::string &profile,
+ std::ostream *ss);
+ int parse_erasure_code_profile(const std::vector<std::string> &erasure_code_profile,
+ std::map<std::string,std::string> *erasure_code_profile_map,
+ std::ostream *ss);
+ int prepare_pool_size(const unsigned pool_type,
+ const std::string &erasure_code_profile,
+ uint8_t repl_size,
+ unsigned *size, unsigned *min_size,
+ std::ostream *ss);
+ int prepare_pool_stripe_width(const unsigned pool_type,
+ const std::string &erasure_code_profile,
+ unsigned *stripe_width,
+ std::ostream *ss);
+ int check_pg_num(int64_t pool, int pg_num, int size, std::ostream* ss);
+ int prepare_new_pool(std::string& name,
+ int crush_rule,
+ const std::string &crush_rule_name,
+ unsigned pg_num, unsigned pgp_num,
+ unsigned pg_num_min,
+ unsigned pg_num_max,
+ uint64_t repl_size,
+ const uint64_t target_size_bytes,
+ const float target_size_ratio,
+ const std::string &erasure_code_profile,
+ const unsigned pool_type,
+ const uint64_t expected_num_objects,
+ FastReadType fast_read,
+ const std::string& pg_autoscale_mode,
+ bool bulk,
+ std::ostream *ss);
+ int prepare_new_pool(MonOpRequestRef op);
+
+ void set_pool_flags(int64_t pool_id, uint64_t flags);
+ void clear_pool_flags(int64_t pool_id, uint64_t flags);
+ bool update_pools_status();
+
+ bool _is_removed_snap(int64_t pool_id, snapid_t snapid);
+ bool _is_pending_removed_snap(int64_t pool_id, snapid_t snapid);
+
+ std::string make_purged_snap_epoch_key(epoch_t epoch);
+ std::string make_purged_snap_key(int64_t pool, snapid_t snap);
+ std::string make_purged_snap_key_value(int64_t pool, snapid_t snap, snapid_t num,
+ epoch_t epoch, ceph::buffer::list *v);
+
+ bool try_prune_purged_snaps();
+ int lookup_purged_snap(int64_t pool, snapid_t snap,
+ snapid_t *begin, snapid_t *end);
+
+ void insert_purged_snap_update(
+ int64_t pool,
+ snapid_t start, snapid_t end,
+ epoch_t epoch,
+ MonitorDBStore::TransactionRef t);
+
+ bool prepare_set_flag(MonOpRequestRef op, int flag);
+ bool prepare_unset_flag(MonOpRequestRef op, int flag);
+
+ void _pool_op_reply(MonOpRequestRef op,
+ int ret, epoch_t epoch, ceph::buffer::list *blp=NULL);
+
+ struct C_Booted : public C_MonOp {
+ OSDMonitor *cmon;
+ bool logit;
+ C_Booted(OSDMonitor *cm, MonOpRequestRef op_, bool l=true) :
+ C_MonOp(op_), cmon(cm), logit(l) {}
+ void _finish(int r) override {
+ if (r >= 0)
+ cmon->_booted(op, logit);
+ else if (r == -ECANCELED)
+ return;
+ else if (r == -EAGAIN)
+ cmon->dispatch(op);
+ else
+ ceph_abort_msg("bad C_Booted return value");
+ }
+ };
+
+ struct C_ReplyMap : public C_MonOp {
+ OSDMonitor *osdmon;
+ epoch_t e;
+ C_ReplyMap(OSDMonitor *o, MonOpRequestRef op_, epoch_t ee)
+ : C_MonOp(op_), osdmon(o), e(ee) {}
+ void _finish(int r) override {
+ if (r >= 0)
+ osdmon->_reply_map(op, e);
+ else if (r == -ECANCELED)
+ return;
+ else if (r == -EAGAIN)
+ osdmon->dispatch(op);
+ else
+ ceph_abort_msg("bad C_ReplyMap return value");
+ }
+ };
+ struct C_PoolOp : public C_MonOp {
+ OSDMonitor *osdmon;
+ int replyCode;
+ int epoch;
+ ceph::buffer::list reply_data;
+ C_PoolOp(OSDMonitor * osd, MonOpRequestRef op_, int rc, int e, ceph::buffer::list *rd=NULL) :
+ C_MonOp(op_), osdmon(osd), replyCode(rc), epoch(e) {
+ if (rd)
+ reply_data = *rd;
+ }
+ void _finish(int r) override {
+ if (r >= 0)
+ osdmon->_pool_op_reply(op, replyCode, epoch, &reply_data);
+ else if (r == -ECANCELED)
+ return;
+ else if (r == -EAGAIN)
+ osdmon->dispatch(op);
+ else
+ ceph_abort_msg("bad C_PoolOp return value");
+ }
+ };
+
+ bool preprocess_remove_snaps(MonOpRequestRef op);
+ bool prepare_remove_snaps(MonOpRequestRef op);
+
+ bool preprocess_get_purged_snaps(MonOpRequestRef op);
+
+ int load_metadata(int osd, std::map<std::string, std::string>& m,
+ std::ostream *err);
+ void count_metadata(const std::string& field, ceph::Formatter *f);
+
+ void reencode_incremental_map(ceph::buffer::list& bl, uint64_t features);
+ void reencode_full_map(ceph::buffer::list& bl, uint64_t features);
+public:
+ void count_metadata(const std::string& field, std::map<std::string,int> *out);
+ void get_versions(std::map<std::string, std::list<std::string>> &versions);
+protected:
+ int get_osd_objectstore_type(int osd, std::string *type);
+ bool is_pool_currently_all_bluestore(int64_t pool_id, const pg_pool_t &pool,
+ std::ostream *err);
+
+ // when we last received PG stats from each osd and the osd's osd_beacon_report_interval
+ std::map<int, std::pair<utime_t, int>> last_osd_report;
+ // TODO: use last_osd_report to store the osd report epochs, once we don't
+ // need to upgrade from pre-luminous releases.
+ std::map<int,epoch_t> osd_epochs;
+ LastEpochClean last_epoch_clean;
+ bool preprocess_beacon(MonOpRequestRef op);
+ bool prepare_beacon(MonOpRequestRef op);
+ epoch_t get_min_last_epoch_clean() const;
+
+ friend class C_UpdateCreatingPGs;
+ std::map<int, std::map<epoch_t, std::set<spg_t>>> creating_pgs_by_osd_epoch;
+ std::vector<pg_t> pending_created_pgs;
+ // the epoch when the pg mapping was calculated
+ epoch_t creating_pgs_epoch = 0;
+ creating_pgs_t creating_pgs;
+ mutable std::mutex creating_pgs_lock;
+
+ creating_pgs_t update_pending_pgs(const OSDMap::Incremental& inc,
+ const OSDMap& nextmap);
+ unsigned scan_for_creating_pgs(
+ const mempool::osdmap::map<int64_t,pg_pool_t>& pools,
+ const mempool::osdmap::set<int64_t>& removed_pools,
+ utime_t modified,
+ creating_pgs_t* creating_pgs) const;
+ std::pair<int32_t, pg_t> get_parent_pg(pg_t pgid) const;
+ void update_creating_pgs();
+ void check_pg_creates_subs();
+ epoch_t send_pg_creates(int osd, Connection *con, epoch_t next) const;
+
+ int32_t _allocate_osd_id(int32_t* existing_id);
+
+ int get_grace_interval_threshold();
+ bool grace_interval_threshold_exceeded(int last_failed);
+ void set_default_laggy_params(int target_osd);
+
+public:
+ OSDMonitor(CephContext *cct, Monitor &mn, Paxos &p, const std::string& service_name);
+
+ void tick() override; // check state, take actions
+
+ bool preprocess_command(MonOpRequestRef op);
+ bool prepare_command(MonOpRequestRef op);
+ bool prepare_command_impl(MonOpRequestRef op, const cmdmap_t& cmdmap);
+
+ int validate_osd_create(
+ const int32_t id,
+ const uuid_d& uuid,
+ const bool check_osd_exists,
+ int32_t* existing_id,
+ std::stringstream& ss);
+ int prepare_command_osd_create(
+ const int32_t id,
+ const uuid_d& uuid,
+ int32_t* existing_id,
+ std::stringstream& ss);
+ void do_osd_create(const int32_t id, const uuid_d& uuid,
+ const std::string& device_class,
+ int32_t* new_id);
+ int prepare_command_osd_purge(int32_t id, std::stringstream& ss);
+ int prepare_command_osd_destroy(int32_t id, std::stringstream& ss);
+ int _prepare_command_osd_crush_remove(
+ CrushWrapper &newcrush,
+ int32_t id,
+ int32_t ancestor,
+ bool has_ancestor,
+ bool unlink_only);
+ void do_osd_crush_remove(CrushWrapper& newcrush);
+ int prepare_command_osd_crush_remove(
+ CrushWrapper &newcrush,
+ int32_t id,
+ int32_t ancestor,
+ bool has_ancestor,
+ bool unlink_only);
+ int prepare_command_osd_remove(int32_t id);
+ int prepare_command_osd_new(
+ MonOpRequestRef op,
+ const cmdmap_t& cmdmap,
+ const std::map<std::string,std::string>& secrets,
+ std::stringstream &ss,
+ ceph::Formatter *f);
+
+ int prepare_command_pool_set(const cmdmap_t& cmdmap,
+ std::stringstream& ss);
+
+ int prepare_command_pool_application(const std::string &prefix,
+ const cmdmap_t& cmdmap,
+ std::stringstream& ss);
+ int preprocess_command_pool_application(const std::string &prefix,
+ const cmdmap_t& cmdmap,
+ std::stringstream& ss,
+ bool *modified);
+ int _command_pool_application(const std::string &prefix,
+ const cmdmap_t& cmdmap,
+ std::stringstream& ss,
+ bool *modified,
+ bool preparing);
+
+ bool handle_osd_timeouts(const utime_t &now,
+ std::map<int, std::pair<utime_t, int>> &last_osd_report);
+
+ void send_latest(MonOpRequestRef op, epoch_t start=0);
+ void send_latest_now_nodelete(MonOpRequestRef op, epoch_t start=0) {
+ op->mark_osdmon_event(__func__);
+ send_incremental(op, start);
+ }
+
+ int get_version(version_t ver, ceph::buffer::list& bl) override;
+ int get_version(version_t ver, uint64_t feature, ceph::buffer::list& bl);
+
+ int get_version_full(version_t ver, uint64_t feature, ceph::buffer::list& bl);
+ int get_version_full(version_t ver, ceph::buffer::list& bl) override;
+ int get_inc(version_t ver, OSDMap::Incremental& inc);
+ int get_full_from_pinned_map(version_t ver, ceph::buffer::list& bl);
+
+ epoch_t blocklist(const entity_addrvec_t& av, utime_t until);
+ epoch_t blocklist(entity_addr_t a, utime_t until);
+
+ void dump_info(ceph::Formatter *f);
+ int dump_osd_metadata(int osd, ceph::Formatter *f, std::ostream *err);
+ void print_nodes(ceph::Formatter *f);
+
+ void check_osdmap_sub(Subscription *sub);
+ void check_pg_creates_sub(Subscription *sub);
+
+ void do_application_enable(int64_t pool_id, const std::string &app_name,
+ const std::string &app_key="",
+ const std::string &app_value="",
+ bool force=false);
+ void do_set_pool_opt(int64_t pool_id, pool_opts_t::key_t opt,
+ pool_opts_t::value_t);
+
+ void add_flag(int flag) {
+ if (!(osdmap.flags & flag)) {
+ if (pending_inc.new_flags < 0)
+ pending_inc.new_flags = osdmap.flags;
+ pending_inc.new_flags |= flag;
+ }
+ }
+
+ void remove_flag(int flag) {
+ if(osdmap.flags & flag) {
+ if (pending_inc.new_flags < 0)
+ pending_inc.new_flags = osdmap.flags;
+ pending_inc.new_flags &= ~flag;
+ }
+ }
+ void convert_pool_priorities(void);
+ /**
+ * Find the pools which are requested to be put into stretch mode,
+ * validate that they are allowed to be in stretch mode (eg, are replicated)
+ * and place copies of them in the pools set.
+ * This does not make any changes to the pools or state; it's just
+ * a safety-check-and-collect function.
+ */
+ void try_enable_stretch_mode_pools(stringstream& ss, bool *okay,
+ int *errcode,
+ set<pg_pool_t*>* pools, const string& new_crush_rule);
+ /**
+ * Check validity of inputs and OSD/CRUSH state to
+ * engage stretch mode. Designed to be used with
+ * MonmapMonitor::try_enable_stretch_mode() where we call both twice,
+ * first with commit=false to validate.
+ * @param ss: a stringstream to write errors into
+ * @param okay: Filled to true if okay, false if validation fails
+ * @param errcode: filled with -errno if there's a problem
+ * @param commit: true if we should commit the change, false if just testing
+ * @param dividing_bucket: the bucket type (eg 'dc') that divides the cluster
+ * @param bucket_count: The number of buckets required in peering.
+ * Currently must be 2.
+ * @param pools: The pg_pool_ts which are being set to stretch mode (obtained
+ * from try_enable_stretch_mode_pools()).
+ * @param new_crush_rule: The crush rule to set the pools to.
+ */
+ void try_enable_stretch_mode(stringstream& ss, bool *okay,
+ int *errcode, bool commit,
+ const string& dividing_bucket,
+ uint32_t bucket_count,
+ const set<pg_pool_t*>& pools,
+ const string& new_crush_rule);
+ /**
+ * Check the input dead_buckets mapping (buckets->dead monitors) to see
+ * if the OSDs are also down. If so, fill in really_down_buckets and
+ * really_down_mons and return true; else return false.
+ */
+ bool check_for_dead_crush_zones(const map<string,set<string>>& dead_buckets,
+ set<int> *really_down_buckets,
+ set<string> *really_down_mons);
+ /**
+ * Set degraded mode in the OSDMap, adding the given dead buckets to the dead set
+ * and using the live_zones (should presently be size 1)
+ */
+ void trigger_degraded_stretch_mode(const set<int>& dead_buckets,
+ const set<string>& live_zones);
+ /**
+ * This is just to maintain stretch_recovery_triggered; below
+ */
+ void set_degraded_stretch_mode();
+ /**
+ * Set recovery stretch mode in the OSDMap, resetting pool size back to normal
+ */
+ void trigger_recovery_stretch_mode();
+ /**
+ * This is just to maintain stretch_recovery_triggered; below
+ */
+ void set_recovery_stretch_mode();
+ /**
+ * This is just to maintain stretch_recovery_triggered; below
+ */
+ void set_healthy_stretch_mode();
+ /**
+ * Tells the OSD there's a new pg digest, in case it's interested.
+ * (It's interested when in recovering stretch mode.)
+ */
+ void notify_new_pg_digest();
+ /**
+ * Check if we can exit recovery stretch mode and go back to normal.
+ * @param force If true, we will force the exit through once it is legal,
+ * without regard to the reported PG status.
+ */
+ void try_end_recovery_stretch_mode(bool force);
+ /**
+ * Sets the osdmap and pg_pool_t values back to healthy stretch mode status.
+ */
+ void trigger_healthy_stretch_mode();
+ /**
+ * Obtain the crush rule being used for stretch pools.
+ * Note that right now this is heuristic and simply selects the
+ * most-used rule on replicated stretch pools.
+ * @return the crush rule ID, or a negative errno
+ */
+ int get_replicated_stretch_crush_rule();
+private:
+ utime_t stretch_recovery_triggered; // what time we committed a switch to recovery mode
+};
+
+#endif