summaryrefslogtreecommitdiffstats
path: root/src/osd
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-23 16:45:17 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-23 16:45:44 +0000
commit17d6a993fc17d533460c5f40f3908c708e057c18 (patch)
tree1a3bd93e0ecd74fa02f93a528fe2f87e5314c4b5 /src/osd
parentReleasing progress-linux version 18.2.2-0progress7.99u1. (diff)
downloadceph-17d6a993fc17d533460c5f40f3908c708e057c18.tar.xz
ceph-17d6a993fc17d533460c5f40f3908c708e057c18.zip
Merging upstream version 18.2.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/osd')
-rw-r--r--src/osd/OSD.cc259
-rw-r--r--src/osd/OSD.h16
-rw-r--r--src/osd/OSDMap.cc16
-rw-r--r--src/osd/OSDMap.h1
-rw-r--r--src/osd/OpRequest.h1
-rw-r--r--src/osd/PG.cc10
-rw-r--r--src/osd/PG.h13
-rw-r--r--src/osd/PrimaryLogPG.cc14
-rw-r--r--src/osd/osd_op_util.cc19
-rw-r--r--src/osd/osd_op_util.h2
-rw-r--r--src/osd/osd_types.cc28
-rw-r--r--src/osd/osd_types.h16
-rw-r--r--src/osd/scheduler/OpScheduler.cc21
-rw-r--r--src/osd/scheduler/OpScheduler.h26
-rw-r--r--src/osd/scheduler/mClockScheduler.cc2
-rw-r--r--src/osd/scheduler/mClockScheduler.h28
16 files changed, 359 insertions, 113 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index c61e7d332..515eb6042 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -1703,7 +1703,7 @@ void OSDService::queue_recovery_context(
epoch_t e = get_osdmap_epoch();
uint64_t cost_for_queue = [this, cost] {
- if (cct->_conf->osd_op_queue == "mclock_scheduler") {
+ if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) {
return cost;
} else {
/* We retain this legacy behavior for WeightedPriorityQueue. It seems to
@@ -1726,14 +1726,32 @@ void OSDService::queue_recovery_context(
e));
}
-void OSDService::queue_for_snap_trim(PG *pg)
+void OSDService::queue_for_snap_trim(PG *pg, uint64_t cost_per_object)
{
dout(10) << "queueing " << *pg << " for snaptrim" << dendl;
+ uint64_t cost_for_queue = [this, cost_per_object] {
+ if (cct->_conf->osd_op_queue == "mclock_scheduler") {
+ /* The cost calculation is valid for most snap trim iterations except
+ * for the following cases:
+ * 1) The penultimate iteration which may return 1 object to trim, in
+ * which case the cost will be off by a factor equivalent to the
+ * average object size, and,
+ * 2) The final iteration which returns -ENOENT and performs clean-ups.
+ */
+ return cost_per_object * cct->_conf->osd_pg_max_concurrent_snap_trims;
+ } else {
+ /* We retain this legacy behavior for WeightedPriorityQueue.
+ * This branch should be removed after Squid.
+ */
+ return cct->_conf->osd_snap_trim_cost;
+ }
+ }();
+
enqueue_back(
OpSchedulerItem(
unique_ptr<OpSchedulerItem::OpQueueable>(
new PGSnapTrim(pg->get_pgid(), pg->get_osdmap_epoch())),
- cct->_conf->osd_snap_trim_cost,
+ cost_for_queue,
cct->_conf->osd_snap_trim_priority,
ceph_clock_now(),
0,
@@ -1771,7 +1789,7 @@ int64_t OSDService::get_scrub_cost()
{
int64_t cost_for_queue = cct->_conf->osd_scrub_cost;
- if (cct->_conf->osd_op_queue == "mclock_scheduler") {
+ if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) {
cost_for_queue = cct->_conf->osd_scrub_event_cost *
cct->_conf->osd_shallow_scrub_chunk_max;
}
@@ -2049,7 +2067,7 @@ void OSDService::_queue_for_recovery(
ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock));
uint64_t cost_for_queue = [this, &reserved_pushes, &p] {
- if (cct->_conf->osd_op_queue == "mclock_scheduler") {
+ if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) {
return p.cost_per_object * reserved_pushes;
} else {
/* We retain this legacy behavior for WeightedPriorityQueue. It seems to
@@ -2092,6 +2110,22 @@ int heap(CephContext& cct,
} // namespace ceph::osd_cmds
+void OSD::write_superblock(CephContext* cct, OSDSuperblock& sb, ObjectStore::Transaction& t)
+{
+ dout(10) << "write_superblock " << sb << dendl;
+
+ //hack: at minimum it's using the baseline feature set
+ if (!sb.compat_features.incompat.contains(CEPH_OSD_FEATURE_INCOMPAT_BASE))
+ sb.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
+
+ bufferlist bl;
+ encode(sb, bl);
+ t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
+ std::map<std::string, ceph::buffer::list> attrs;
+ attrs.emplace(OSD_SUPERBLOCK_OMAP_KEY, bl);
+ t.omap_setkeys(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, attrs);
+}
+
int OSD::mkfs(CephContext *cct,
std::unique_ptr<ObjectStore> store,
uuid_d fsid,
@@ -2153,15 +2187,11 @@ int OSD::mkfs(CephContext *cct,
sb.osd_fsid = store->get_fsid();
sb.whoami = whoami;
sb.compat_features = get_osd_initial_compat_set();
-
- bufferlist bl;
- encode(sb, bl);
-
ObjectStore::CollectionHandle ch = store->create_new_collection(
coll_t::meta());
ObjectStore::Transaction t;
t.create_collection(coll_t::meta(), 0);
- t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
+ write_superblock(cct, sb, t);
ret = store->queue_transaction(ch, std::move(t));
if (ret) {
derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_GOBJECT: "
@@ -2385,13 +2415,55 @@ OSD::OSD(CephContext *cct_,
trace_endpoint.copy_name(ss.str());
#endif
+ // Determine scheduler type for this OSD
+ auto get_op_queue_type = [this, &conf = cct->_conf]() {
+ op_queue_type_t queue_type;
+ if (auto type = conf.get_val<std::string>("osd_op_queue");
+ type != "debug_random") {
+ if (auto qt = get_op_queue_type_by_name(type); qt.has_value()) {
+ queue_type = *qt;
+ } else {
+ // This should never happen
+ dout(0) << "Invalid value passed for 'osd_op_queue': " << type << dendl;
+ ceph_assert(0 == "Unsupported op queue type");
+ }
+ } else {
+ static const std::vector<op_queue_type_t> index_lookup = {
+ op_queue_type_t::mClockScheduler,
+ op_queue_type_t::WeightedPriorityQueue
+ };
+ std::mt19937 random_gen(std::random_device{}());
+ auto which = random_gen() % index_lookup.size();
+ queue_type = index_lookup[which];
+ }
+ return queue_type;
+ };
+ op_queue_type_t op_queue = get_op_queue_type();
+
+ // Determine op queue cutoff
+ auto get_op_queue_cut_off = [&conf = cct->_conf]() {
+ if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
+ std::random_device rd;
+ std::mt19937 random_gen(rd());
+ return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+ } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
+ return CEPH_MSG_PRIO_HIGH;
+ } else {
+ // default / catch-all is 'low'
+ return CEPH_MSG_PRIO_LOW;
+ }
+ };
+ unsigned op_queue_cut_off = get_op_queue_cut_off();
+
// initialize shards
num_shards = get_num_op_shards();
for (uint32_t i = 0; i < num_shards; i++) {
OSDShard *one_shard = new OSDShard(
i,
cct,
- this);
+ this,
+ op_queue,
+ op_queue_cut_off);
shards.push_back(one_shard);
}
}
@@ -3121,6 +3193,19 @@ will start to track new ops received afterwards.";
scrub_purged_snaps();
}
+ else if (prefix == "reset_purged_snaps_last") {
+ lock_guard l(osd_lock);
+ superblock.purged_snaps_last = 0;
+ ObjectStore::Transaction t;
+ dout(10) << __func__ << " updating superblock" << dendl;
+ write_superblock(cct, superblock, t);
+ ret = store->queue_transaction(service.meta_ch, std::move(t), nullptr);
+ if (ret < 0) {
+ ss << "Error writing superblock: " << cpp_strerror(ret);
+ goto out;
+ }
+ }
+
else if (prefix == "dump_osd_network") {
lock_guard l(osd_lock);
int64_t value = 0;
@@ -3762,7 +3847,7 @@ int OSD::init()
}
ObjectStore::Transaction t;
- write_superblock(t);
+ write_superblock(cct, superblock, t);
r = store->queue_transaction(service.meta_ch, std::move(t));
if (r < 0)
goto out;
@@ -4300,6 +4385,11 @@ void OSD::final_init()
"Scrub purged_snaps vs snapmapper index");
ceph_assert(r == 0);
r = admin_socket->register_command(
+ "reset_purged_snaps_last",
+ asok_hook,
+ "Reset the superblock's purged_snaps_last");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
"scrubdebug " \
"name=pgid,type=CephPgid " \
"name=cmd,type=CephChoices,strings=block|unblock|set|unset " \
@@ -4572,7 +4662,7 @@ int OSD::shutdown()
superblock.mounted = service.get_boot_epoch();
superblock.clean_thru = get_osdmap_epoch();
ObjectStore::Transaction t;
- write_superblock(t);
+ write_superblock(cct, superblock, t);
int r = store->queue_transaction(service.meta_ch, std::move(t));
if (r) {
derr << "OSD::shutdown: error writing superblock: "
@@ -4769,31 +4859,81 @@ int OSD::update_crush_device_class()
}
}
-void OSD::write_superblock(ObjectStore::Transaction& t)
-{
- dout(10) << "write_superblock " << superblock << dendl;
-
- //hack: at minimum it's using the baseline feature set
- if (!superblock.compat_features.incompat.contains(CEPH_OSD_FEATURE_INCOMPAT_BASE))
- superblock.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
-
- bufferlist bl;
- encode(superblock, bl);
- t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
-}
int OSD::read_superblock()
{
+ // Read superblock from both object data and omap metadata
+ // for better robustness.
+ // Use the most recent superblock replica if obtained versions
+ // mismatch.
bufferlist bl;
- int r = store->read(service.meta_ch, OSD_SUPERBLOCK_GOBJECT, 0, 0, bl);
- if (r < 0)
- return r;
- auto p = bl.cbegin();
- decode(superblock, p);
+ set<string> keys;
+ keys.insert(OSD_SUPERBLOCK_OMAP_KEY);
+ map<string, bufferlist> vals;
+ OSDSuperblock super_omap;
+ OSDSuperblock super_disk;
+ int r_omap = store->omap_get_values(
+ service.meta_ch, OSD_SUPERBLOCK_GOBJECT, keys, &vals);
+ if (r_omap >= 0 && vals.size() > 0) {
+ try {
+ auto p = vals.begin()->second.cbegin();
+ decode(super_omap, p);
+ } catch(...) {
+ derr << __func__ << " omap replica is corrupted."
+ << dendl;
+ r_omap = -EFAULT;
+ }
+ } else {
+ derr << __func__ << " omap replica is missing."
+ << dendl;
+ r_omap = -ENOENT;
+ }
+ int r_disk = store->read(service.meta_ch, OSD_SUPERBLOCK_GOBJECT, 0, 0, bl);
+ if (r_disk >= 0) {
+ try {
+ auto p = bl.cbegin();
+ decode(super_disk, p);
+ } catch(...) {
+ derr << __func__ << " disk replica is corrupted."
+ << dendl;
+ r_disk = -EFAULT;
+ }
+ } else {
+ derr << __func__ << " disk replica is missing."
+ << dendl;
+ r_disk = -ENOENT;
+ }
- dout(10) << "read_superblock " << superblock << dendl;
+ if (r_omap >= 0 && r_disk < 0) {
+ std::swap(superblock, super_omap);
+ dout(1) << __func__ << " got omap replica but failed to get disk one."
+ << dendl;
+ } else if (r_omap < 0 && r_disk >= 0) {
+ std::swap(superblock, super_disk);
+ dout(1) << __func__ << " got disk replica but failed to get omap one."
+ << dendl;
+ } else if (r_omap < 0 && r_disk < 0) {
+ // error to be logged by the caller
+ return -ENOENT;
+ } else {
+ std::swap(superblock, super_omap); // let omap be the primary source
+ if (superblock.current_epoch != super_disk.current_epoch) {
+ derr << __func__ << " got mismatching superblocks, omap:"
+ << superblock << " vs. disk:" << super_disk
+ << dendl;
+ if (superblock.current_epoch < super_disk.current_epoch) {
+ std::swap(superblock, super_disk);
+ dout(0) << __func__ << " using disk superblock"
+ << dendl;
+ } else {
+ dout(0) << __func__ << " using omap superblock"
+ << dendl;
+ }
+ }
+ }
+ dout(10) << "read_superblock " << superblock << dendl;
return 0;
}
@@ -6695,7 +6835,7 @@ void OSD::handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *m)
m->purged_snaps);
}
superblock.purged_snaps_last = m->last;
- write_superblock(t);
+ write_superblock(cct, superblock, t);
store->queue_transaction(
service.meta_ch,
std::move(t));
@@ -7179,7 +7319,7 @@ void OSD::scrub_purged_snaps()
dout(10) << __func__ << " done queueing pgs, updating superblock" << dendl;
ObjectStore::Transaction t;
superblock.last_purged_snaps_scrub = ceph_clock_now();
- write_superblock(t);
+ write_superblock(cct, superblock, t);
int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr);
ceph_assert(tr == 0);
if (is_active()) {
@@ -7892,7 +8032,7 @@ void OSD::trim_maps(epoch_t oldest, int nreceived, bool skip_maps)
num++;
if (num >= cct->_conf->osd_target_transaction_size && num >= nreceived) {
service.publish_superblock(superblock);
- write_superblock(t);
+ write_superblock(cct, superblock, t);
int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr);
ceph_assert(tr == 0);
num = 0;
@@ -7908,7 +8048,7 @@ void OSD::trim_maps(epoch_t oldest, int nreceived, bool skip_maps)
}
if (num > 0) {
service.publish_superblock(superblock);
- write_superblock(t);
+ write_superblock(cct, superblock, t);
int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr);
ceph_assert(tr == 0);
}
@@ -8220,7 +8360,19 @@ void OSD::handle_osd_map(MOSDMap *m)
{
bufferlist bl;
::encode(pg_num_history, bl);
- t.write(coll_t::meta(), make_pg_num_history_oid(), 0, bl.length(), bl);
+ auto oid = make_pg_num_history_oid();
+ t.truncate(coll_t::meta(), oid, 0); // we don't need bytes left if new data
+ // block is shorter than the previous
+ // one. And better to trim them, e.g.
+ // this allows to avoid csum eroors
+ // when issuing overwrite
+ // (which happens to be partial)
+ // and original data is corrupted.
+ // Another side effect is that the
+ // superblock is not permanently
+ // anchored to a fixed disk location
+ // any more.
+ t.write(coll_t::meta(), oid, 0, bl.length(), bl);
dout(20) << __func__ << " pg_num_history " << pg_num_history << dendl;
}
@@ -8240,7 +8392,7 @@ void OSD::handle_osd_map(MOSDMap *m)
}
// superblock and commit
- write_superblock(t);
+ write_superblock(cct, superblock, t);
t.register_on_commit(new C_OnMapCommit(this, start, last, m));
store->queue_transaction(
service.meta_ch,
@@ -8558,7 +8710,7 @@ void OSD::check_osdmap_features()
dout(0) << __func__ << " enabling on-disk ERASURE CODES compat feature" << dendl;
superblock.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SHARDS);
ObjectStore::Transaction t;
- write_superblock(t);
+ write_superblock(cct, superblock, t);
int err = store->queue_transaction(service.meta_ch, std::move(t), NULL);
ceph_assert(err == 0);
}
@@ -9888,7 +10040,7 @@ void OSD::maybe_override_max_osd_capacity_for_qos()
// If the scheduler enabled is mclock, override the default
// osd capacity with the value obtained from running the
// osd bench test. This is later used to setup mclock.
- if ((cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler") &&
+ if ((op_queue_type_t::mClockScheduler == osd_op_queue_type()) &&
(cct->_conf.get_val<bool>("osd_mclock_skip_benchmark") == false) &&
(!unsupported_objstore_for_qos())) {
std::string max_capacity_iops_config;
@@ -9988,7 +10140,7 @@ bool OSD::maybe_override_options_for_qos(const std::set<std::string> *changed)
{
// Override options only if the scheduler enabled is mclock and the
// underlying objectstore is supported by mclock
- if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" &&
+ if (op_queue_type_t::mClockScheduler == osd_op_queue_type() &&
!unsupported_objstore_for_qos()) {
static const std::map<std::string, uint64_t> recovery_qos_defaults {
{"osd_recovery_max_active", 0},
@@ -10090,9 +10242,8 @@ void OSD::maybe_override_sleep_options_for_qos()
{
// Override options only if the scheduler enabled is mclock and the
// underlying objectstore is supported by mclock
- if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" &&
+ if (op_queue_type_t::mClockScheduler == osd_op_queue_type() &&
!unsupported_objstore_for_qos()) {
-
// Override the various sleep settings
// Disable recovery sleep
cct->_conf.set_val("osd_recovery_sleep", std::to_string(0));
@@ -10121,7 +10272,7 @@ void OSD::maybe_override_cost_for_qos()
{
// If the scheduler enabled is mclock, override the default PG deletion cost
// so that mclock can meet the QoS goals.
- if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" &&
+ if (op_queue_type_t::mClockScheduler == osd_op_queue_type() &&
!unsupported_objstore_for_qos()) {
uint64_t pg_delete_cost = 15728640;
cct->_conf.set_val("osd_pg_delete_cost", std::to_string(pg_delete_cost));
@@ -10195,6 +10346,16 @@ bool OSD::unsupported_objstore_for_qos()
store->get_type()) != unsupported_objstores.end();
}
+op_queue_type_t OSD::osd_op_queue_type() const
+{
+ /**
+ * All OSD shards employ the same scheduler type. Therefore, return
+ * the scheduler type set on the OSD shard with lowest id(0).
+ */
+ ceph_assert(shards.size());
+ return shards[0]->get_op_queue_type();
+}
+
void OSD::update_log_config()
{
auto parsed_options = clog->parse_client_options(cct);
@@ -10695,17 +10856,17 @@ void OSDShard::update_scheduler_config()
scheduler->update_configuration();
}
-std::string OSDShard::get_scheduler_type()
+op_queue_type_t OSDShard::get_op_queue_type() const
{
- std::ostringstream scheduler_type;
- scheduler_type << *scheduler;
- return scheduler_type.str();
+ return scheduler->get_type();
}
OSDShard::OSDShard(
int id,
CephContext *cct,
- OSD *osd)
+ OSD *osd,
+ op_queue_type_t osd_op_queue,
+ unsigned osd_op_queue_cut_off)
: shard_id(id),
cct(cct),
osd(osd),
@@ -10717,7 +10878,7 @@ OSDShard::OSDShard(
shard_lock{make_mutex(shard_lock_name)},
scheduler(ceph::osd::scheduler::make_scheduler(
cct, osd->whoami, osd->num_shards, id, osd->store->is_rotational(),
- osd->store->get_type(), osd->monc)),
+ osd->store->get_type(), osd_op_queue, osd_op_queue_cut_off, osd->monc)),
context_queue(sdata_wait_lock, sdata_cond)
{
dout(0) << "using op scheduler " << *scheduler << dendl;
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 00fab7ec8..d25879001 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -511,7 +511,7 @@ public:
GenContext<ThreadPool::TPHandle&> *c,
uint64_t cost,
int priority);
- void queue_for_snap_trim(PG *pg);
+ void queue_for_snap_trim(PG *pg, uint64_t cost);
void queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority);
void queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority);
@@ -1050,12 +1050,14 @@ struct OSDShard {
void register_and_wake_split_child(PG *pg);
void unprime_split_children(spg_t parent, unsigned old_pg_num);
void update_scheduler_config();
- std::string get_scheduler_type();
+ op_queue_type_t get_op_queue_type() const;
OSDShard(
int id,
CephContext *cct,
- OSD *osd);
+ OSD *osd,
+ op_queue_type_t osd_op_queue,
+ unsigned osd_op_queue_cut_off);
};
class OSD : public Dispatcher,
@@ -1235,8 +1237,9 @@ private:
// -- superblock --
OSDSuperblock superblock;
- void write_superblock();
- void write_superblock(ObjectStore::Transaction& t);
+ static void write_superblock(CephContext* cct,
+ OSDSuperblock& sb,
+ ObjectStore::Transaction& t);
int read_superblock();
void clear_temp_objects();
@@ -2055,6 +2058,9 @@ public:
OSDService service;
friend class OSDService;
+ /// op queue type set for the OSD
+ op_queue_type_t osd_op_queue_type() const;
+
private:
void set_perf_queries(const ConfigPayload &config_payload);
MetricPayload get_perf_reports();
diff --git a/src/osd/OSDMap.cc b/src/osd/OSDMap.cc
index 11f9a87d7..5fb73084d 100644
--- a/src/osd/OSDMap.cc
+++ b/src/osd/OSDMap.cc
@@ -933,6 +933,10 @@ void OSDMap::Incremental::decode(ceph::buffer::list::const_iterator& bl)
decode(new_last_up_change, bl);
decode(new_last_in_change, bl);
}
+ if (struct_v >= 9) {
+ decode(new_pg_upmap_primary, bl);
+ decode(old_pg_upmap_primary, bl);
+ }
DECODE_FINISH(bl); // client-usable data
}
@@ -1871,6 +1875,18 @@ uint64_t OSDMap::get_up_osd_features() const
return cached_up_osd_features;
}
+bool OSDMap::any_osd_laggy() const
+{
+ for (int osd = 0; osd < max_osd; ++osd) {
+ if (!is_up(osd)) { continue; }
+ const auto &xi = get_xinfo(osd);
+ if (xi.laggy_probability || xi.laggy_interval) {
+ return true;
+ }
+ }
+ return false;
+}
+
void OSDMap::dedup(const OSDMap *o, OSDMap *n)
{
using ceph::encode;
diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h
index 3a3e6155e..510da1116 100644
--- a/src/osd/OSDMap.h
+++ b/src/osd/OSDMap.h
@@ -655,6 +655,7 @@ private:
public:
bool have_crc() const { return crc_defined; }
uint32_t get_crc() const { return crc; }
+ bool any_osd_laggy() const;
std::shared_ptr<CrushWrapper> crush; // hierarchical map
bool stretch_mode_enabled; // we are in stretch mode, requiring multiple sites
diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h
index 1a608b583..832c79353 100644
--- a/src/osd/OpRequest.h
+++ b/src/osd/OpRequest.h
@@ -35,6 +35,7 @@ public:
bool op_info_needs_init() const { return op_info.get_flags() == 0; }
bool check_rmw(int flag) const { return op_info.check_rmw(flag); }
bool may_read() const { return op_info.may_read(); }
+ bool may_read_data() const { return op_info.may_read_data(); }
bool may_write() const { return op_info.may_write(); }
bool may_cache() const { return op_info.may_cache(); }
bool rwordered_forced() const { return op_info.rwordered_forced(); }
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index fa49038ed..245aa8d00 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -420,15 +420,7 @@ void PG::queue_recovery()
dout(10) << "queue_recovery -- queuing" << dendl;
recovery_queued = true;
// Let cost per object be the average object size
- auto num_bytes = static_cast<uint64_t>(
- std::max<int64_t>(
- 0, // ensure bytes is non-negative
- info.stats.stats.sum.num_bytes));
- auto num_objects = static_cast<uint64_t>(
- std::max<int64_t>(
- 1, // ensure objects is non-negative and non-zero
- info.stats.stats.sum.num_objects));
- uint64_t cost_per_object = std::max<uint64_t>(num_bytes / num_objects, 1);
+ uint64_t cost_per_object = get_average_object_size();
osd->queue_for_recovery(
this, cost_per_object, recovery_state.get_recovery_op_priority()
);
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 88c893b35..2f8b59061 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -1019,6 +1019,19 @@ public:
return num_bytes;
}
+ uint64_t get_average_object_size() {
+ ceph_assert(ceph_mutex_is_locked_by_me(_lock));
+ auto num_bytes = static_cast<uint64_t>(
+ std::max<int64_t>(
+ 0, // ensure bytes is non-negative
+ info.stats.stats.sum.num_bytes));
+ auto num_objects = static_cast<uint64_t>(
+ std::max<int64_t>(
+ 1, // ensure objects is non-negative and non-zero
+ info.stats.stats.sum.num_objects));
+ return std::max<uint64_t>(num_bytes / num_objects, 1);
+ }
+
protected:
/*
diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc
index 243e127eb..49178dc40 100644
--- a/src/osd/PrimaryLogPG.cc
+++ b/src/osd/PrimaryLogPG.cc
@@ -1667,7 +1667,11 @@ bool PrimaryLogPG::get_rw_locks(bool write_ordered, OpContext *ctx)
* to get the second.
*/
if (write_ordered && ctx->op->may_read()) {
- ctx->lock_type = RWState::RWEXCL;
+ if (ctx->op->may_read_data()) {
+ ctx->lock_type = RWState::RWEXCL;
+ } else {
+ ctx->lock_type = RWState::RWWRITE;
+ }
} else if (write_ordered) {
ctx->lock_type = RWState::RWWRITE;
} else {
@@ -5953,8 +5957,8 @@ int PrimaryLogPG::do_sparse_read(OpContext *ctx, OSDOp& osd_op) {
encode(m, osd_op.outdata); // re-encode since it might be modified
::encode_destructively(data_bl, osd_op.outdata);
- dout(10) << " sparse_read got " << r << " bytes from object "
- << soid << dendl;
+ dout(10) << " sparse_read got " << m.size() << " extents and " << r
+ << " bytes from object " << soid << dendl;
}
ctx->delta_stats.num_rd_kb += shift_round_up(op.extent.length, 10);
@@ -15627,8 +15631,10 @@ PrimaryLogPG::AwaitAsyncWork::AwaitAsyncWork(my_context ctx)
NamedState(nullptr, "Trimming/AwaitAsyncWork")
{
auto *pg = context< SnapTrimmer >().pg;
+ // Determine cost in terms of the average object size
+ uint64_t cost_per_object = pg->get_average_object_size();
context< SnapTrimmer >().log_enter(state_name);
- context< SnapTrimmer >().pg->osd->queue_for_snap_trim(pg);
+ context< SnapTrimmer >().pg->osd->queue_for_snap_trim(pg, cost_per_object);
pg->state_set(PG_STATE_SNAPTRIM);
pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
pg->publish_stats_to_osd();
diff --git a/src/osd/osd_op_util.cc b/src/osd/osd_op_util.cc
index a33e2f110..d400a94db 100644
--- a/src/osd/osd_op_util.cc
+++ b/src/osd/osd_op_util.cc
@@ -16,6 +16,7 @@ bool OpInfo::check_rmw(int flag) const {
ceph_assert(rmw_flags != 0);
return rmw_flags & flag;
}
+// Returns true if op performs a read (including of the object_info).
bool OpInfo::may_read() const {
return need_read_cap() || check_rmw(CEPH_OSD_RMW_FLAG_CLASS_READ);
}
@@ -51,6 +52,16 @@ bool OpInfo::need_skip_promote() const {
bool OpInfo::allows_returnvec() const {
return check_rmw(CEPH_OSD_RMW_FLAG_RETURNVEC);
}
+/**
+ * may_read_data()
+ *
+ * Returns true if op reads information other than the object_info. Requires that the
+ * osd flush any prior writes prior to servicing this op. Includes any information not
+ * cached by the osd in the object_info or snapset.
+ */
+bool OpInfo::may_read_data() const {
+ return check_rmw(CEPH_OSD_RMW_FLAG_READ_DATA);
+}
void OpInfo::set_rmw_flags(int flags) {
rmw_flags |= flags;
@@ -67,6 +78,7 @@ void OpInfo::set_skip_handle_cache() { set_rmw_flags(CEPH_OSD_RMW_FLAG_SKIP_HAND
void OpInfo::set_skip_promote() { set_rmw_flags(CEPH_OSD_RMW_FLAG_SKIP_PROMOTE); }
void OpInfo::set_force_rwordered() { set_rmw_flags(CEPH_OSD_RMW_FLAG_RWORDERED); }
void OpInfo::set_returnvec() { set_rmw_flags(CEPH_OSD_RMW_FLAG_RETURNVEC); }
+void OpInfo::set_read_data() { set_rmw_flags(CEPH_OSD_RMW_FLAG_READ_DATA); }
int OpInfo::set_from_op(
@@ -108,8 +120,12 @@ int OpInfo::set_from_op(
if (ceph_osd_op_mode_modify(iter->op.op))
set_write();
}
- if (ceph_osd_op_mode_read(iter->op.op))
+ if (ceph_osd_op_mode_read(iter->op.op)) {
set_read();
+ if (iter->op.op != CEPH_OSD_OP_STAT) {
+ set_read_data();
+ }
+ }
// set READ flag if there are src_oids
if (iter->soid.oid.name.length())
@@ -202,6 +218,7 @@ int OpInfo::set_from_op(
// watch state (and may return early if the watch exists) or, in
// the case of ping, is simply a read op.
set_read();
+ set_read_data();
// fall through
case CEPH_OSD_OP_NOTIFY:
case CEPH_OSD_OP_NOTIFY_ACK:
diff --git a/src/osd/osd_op_util.h b/src/osd/osd_op_util.h
index 300fe40cc..fcd06c74b 100644
--- a/src/osd/osd_op_util.h
+++ b/src/osd/osd_op_util.h
@@ -47,6 +47,7 @@ public:
bool check_rmw(int flag) const ;
bool may_read() const;
+ bool may_read_data() const;
bool may_write() const;
bool may_cache() const;
bool rwordered_forced() const;
@@ -70,6 +71,7 @@ public:
void set_skip_promote();
void set_force_rwordered();
void set_returnvec();
+ void set_read_data();
int set_from_op(
const MOSDOp *m,
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index 14694de19..841a44b32 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -7371,3 +7371,31 @@ bool PGLSPlainFilter::filter(const hobject_t& obj,
{
return xattr_data.contents_equal(val.c_str(), val.size());
}
+
+std::string_view get_op_queue_type_name(const op_queue_type_t &q)
+{
+ switch (q) {
+ case op_queue_type_t::WeightedPriorityQueue:
+ return "wpq";
+ case op_queue_type_t::mClockScheduler:
+ return "mclock_scheduler";
+ case op_queue_type_t::PrioritizedQueue:
+ return "PrioritizedQueue";
+ default:
+ return "unknown";
+ }
+}
+
+std::optional<op_queue_type_t> get_op_queue_type_by_name(
+ const std::string_view &s)
+{
+ if (s == "wpq") {
+ return op_queue_type_t::WeightedPriorityQueue;
+ } else if (s == "mclock_scheduler") {
+ return op_queue_type_t::mClockScheduler;
+ } else if (s == "PrioritizedQueue") {
+ return op_queue_type_t::PrioritizedQueue;
+ } else {
+ return std::nullopt;
+ }
+}
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index afed5fa83..5a50a32e7 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -367,12 +367,14 @@ enum {
CEPH_OSD_RMW_FLAG_SKIP_PROMOTE = (1 << 9),
CEPH_OSD_RMW_FLAG_RWORDERED = (1 << 10),
CEPH_OSD_RMW_FLAG_RETURNVEC = (1 << 11),
+ CEPH_OSD_RMW_FLAG_READ_DATA = (1 << 12),
};
// pg stuff
#define OSD_SUPERBLOCK_GOBJECT ghobject_t(hobject_t(sobject_t(object_t("osd_superblock"), 0)))
+#define OSD_SUPERBLOCK_OMAP_KEY "osd_superblock"
// placement seed (a hash value)
typedef uint32_t ps_t;
@@ -6636,4 +6638,18 @@ using missing_map_t = std::map<hobject_t,
std::pair<std::optional<uint32_t>,
std::optional<uint32_t>>>;
+/**
+ * op_queue_type_t
+ *
+ * Supported op queue types
+ */
+enum class op_queue_type_t : uint8_t {
+ WeightedPriorityQueue = 0,
+ mClockScheduler,
+ PrioritizedQueue
+};
+std::string_view get_op_queue_type_name(const op_queue_type_t &q);
+std::optional<op_queue_type_t> get_op_queue_type_by_name(
+ const std::string_view &s);
+
#endif
diff --git a/src/osd/scheduler/OpScheduler.cc b/src/osd/scheduler/OpScheduler.cc
index cb5ef13b6..12e5bdb6c 100644
--- a/src/osd/scheduler/OpScheduler.cc
+++ b/src/osd/scheduler/OpScheduler.cc
@@ -23,30 +23,25 @@ namespace ceph::osd::scheduler {
OpSchedulerRef make_scheduler(
CephContext *cct, int whoami, uint32_t num_shards, int shard_id,
- bool is_rotational, std::string_view osd_objectstore, MonClient *monc)
+ bool is_rotational, std::string_view osd_objectstore,
+ op_queue_type_t osd_scheduler, unsigned op_queue_cut_off, MonClient *monc)
{
- const std::string *type = &cct->_conf->osd_op_queue;
- if (*type == "debug_random") {
- static const std::string index_lookup[] = { "mclock_scheduler",
- "wpq" };
- srand(time(NULL));
- unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
- type = &index_lookup[which];
- }
-
// Force the use of 'wpq' scheduler for filestore OSDs.
// The 'mclock_scheduler' is not supported for filestore OSDs.
- if (*type == "wpq" || osd_objectstore == "filestore") {
+ if (op_queue_type_t::WeightedPriorityQueue == osd_scheduler ||
+ osd_objectstore == "filestore") {
return std::make_unique<
ClassedOpQueueScheduler<WeightedPriorityQueue<OpSchedulerItem, client>>>(
cct,
+ op_queue_cut_off,
cct->_conf->osd_op_pq_max_tokens_per_priority,
cct->_conf->osd_op_pq_min_cost
);
- } else if (*type == "mclock_scheduler") {
+ } else if (op_queue_type_t::mClockScheduler == osd_scheduler) {
// default is 'mclock_scheduler'
return std::make_unique<
- mClockScheduler>(cct, whoami, num_shards, shard_id, is_rotational, monc);
+ mClockScheduler>(cct, whoami, num_shards, shard_id, is_rotational,
+ op_queue_cut_off, monc);
} else {
ceph_assert("Invalid choice of wq" == 0);
}
diff --git a/src/osd/scheduler/OpScheduler.h b/src/osd/scheduler/OpScheduler.h
index 1575bcae4..570a2a162 100644
--- a/src/osd/scheduler/OpScheduler.h
+++ b/src/osd/scheduler/OpScheduler.h
@@ -18,6 +18,7 @@
#include <variant>
#include "common/ceph_context.h"
+#include "common/OpQueue.h"
#include "mon/MonClient.h"
#include "osd/scheduler/OpSchedulerItem.h"
@@ -54,6 +55,9 @@ public:
// Apply config changes to the scheduler (if any)
virtual void update_configuration() = 0;
+ // Get the scheduler type set for the queue
+ virtual op_queue_type_t get_type() const = 0;
+
// Destructor
virtual ~OpScheduler() {};
};
@@ -63,7 +67,8 @@ using OpSchedulerRef = std::unique_ptr<OpScheduler>;
OpSchedulerRef make_scheduler(
CephContext *cct, int whoami, uint32_t num_shards, int shard_id,
- bool is_rotational, std::string_view osd_objectstore, MonClient *monc);
+ bool is_rotational, std::string_view osd_objectstore,
+ op_queue_type_t osd_scheduler, unsigned op_queue_cut_off, MonClient *monc);
/**
* Implements OpScheduler in terms of OpQueue
@@ -78,21 +83,10 @@ class ClassedOpQueueScheduler final : public OpScheduler {
unsigned cutoff;
T queue;
- static unsigned int get_io_prio_cut(CephContext *cct) {
- if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
- srand(time(NULL));
- return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
- } else if (cct->_conf->osd_op_queue_cut_off == "high") {
- return CEPH_MSG_PRIO_HIGH;
- } else {
- // default / catch-all is 'low'
- return CEPH_MSG_PRIO_LOW;
- }
- }
public:
template <typename... Args>
- ClassedOpQueueScheduler(CephContext *cct, Args&&... args) :
- cutoff(get_io_prio_cut(cct)),
+ ClassedOpQueueScheduler(CephContext *cct, unsigned prio_cut, Args&&... args) :
+ cutoff(prio_cut),
queue(std::forward<Args>(args)...)
{}
@@ -143,6 +137,10 @@ public:
// no-op
}
+ op_queue_type_t get_type() const final {
+ return queue.get_type();
+ }
+
~ClassedOpQueueScheduler() final {};
};
diff --git a/src/osd/scheduler/mClockScheduler.cc b/src/osd/scheduler/mClockScheduler.cc
index 0ea519655..f72683d52 100644
--- a/src/osd/scheduler/mClockScheduler.cc
+++ b/src/osd/scheduler/mClockScheduler.cc
@@ -35,12 +35,14 @@ mClockScheduler::mClockScheduler(CephContext *cct,
uint32_t num_shards,
int shard_id,
bool is_rotational,
+ unsigned cutoff_priority,
MonClient *monc)
: cct(cct),
whoami(whoami),
num_shards(num_shards),
shard_id(shard_id),
is_rotational(is_rotational),
+ cutoff_priority(cutoff_priority),
monc(monc),
scheduler(
std::bind(&mClockScheduler::ClientRegistry::get_info,
diff --git a/src/osd/scheduler/mClockScheduler.h b/src/osd/scheduler/mClockScheduler.h
index f708b1d7a..16e7f911f 100644
--- a/src/osd/scheduler/mClockScheduler.h
+++ b/src/osd/scheduler/mClockScheduler.h
@@ -27,7 +27,6 @@
#include "osd/scheduler/OpScheduler.h"
#include "common/config.h"
#include "common/ceph_context.h"
-#include "common/mClockPriorityQueue.h"
#include "osd/scheduler/OpSchedulerItem.h"
@@ -97,6 +96,7 @@ class mClockScheduler : public OpScheduler, md_config_obs_t {
const uint32_t num_shards;
const int shard_id;
const bool is_rotational;
+ const unsigned cutoff_priority;
MonClient *monc;
/**
@@ -199,21 +199,6 @@ class mClockScheduler : public OpScheduler, md_config_obs_t {
};
}
- static unsigned int get_io_prio_cut(CephContext *cct) {
- if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
- std::random_device rd;
- std::mt19937 random_gen(rd());
- return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
- } else if (cct->_conf->osd_op_queue_cut_off == "high") {
- return CEPH_MSG_PRIO_HIGH;
- } else {
- // default / catch-all is 'low'
- return CEPH_MSG_PRIO_LOW;
- }
- }
-
- unsigned cutoff_priority = get_io_prio_cut(cct);
-
/**
* set_osd_capacity_params_from_config
*
@@ -233,7 +218,8 @@ class mClockScheduler : public OpScheduler, md_config_obs_t {
public:
mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards,
- int shard_id, bool is_rotational, MonClient *monc);
+ int shard_id, bool is_rotational, unsigned cutoff_priority,
+ MonClient *monc);
~mClockScheduler() override;
/// Calculate scaled cost per item
@@ -260,12 +246,18 @@ public:
void dump(ceph::Formatter &f) const final;
void print(std::ostream &ostream) const final {
- ostream << "mClockScheduler";
+ ostream << get_op_queue_type_name(get_type());
+ ostream << ", cutoff=" << cutoff_priority;
}
// Update data associated with the modified mclock config key(s)
void update_configuration() final;
+ // Return the scheduler type
+ op_queue_type_t get_type() const final {
+ return op_queue_type_t::mClockScheduler;
+ }
+
const char** get_tracked_conf_keys() const final;
void handle_conf_change(const ConfigProxy& conf,
const std::set<std::string> &changed) final;