summaryrefslogtreecommitdiffstats
path: root/src/osd/OSD.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/osd/OSD.cc')
-rw-r--r--src/osd/OSD.cc259
1 files changed, 210 insertions, 49 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index c61e7d332..515eb6042 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -1703,7 +1703,7 @@ void OSDService::queue_recovery_context(
epoch_t e = get_osdmap_epoch();
uint64_t cost_for_queue = [this, cost] {
- if (cct->_conf->osd_op_queue == "mclock_scheduler") {
+ if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) {
return cost;
} else {
/* We retain this legacy behavior for WeightedPriorityQueue. It seems to
@@ -1726,14 +1726,32 @@ void OSDService::queue_recovery_context(
e));
}
-void OSDService::queue_for_snap_trim(PG *pg)
+void OSDService::queue_for_snap_trim(PG *pg, uint64_t cost_per_object)
{
dout(10) << "queueing " << *pg << " for snaptrim" << dendl;
+ uint64_t cost_for_queue = [this, cost_per_object] {
+ if (cct->_conf->osd_op_queue == "mclock_scheduler") {
+ /* The cost calculation is valid for most snap trim iterations except
+ * for the following cases:
+ * 1) The penultimate iteration which may return 1 object to trim, in
+ * which case the cost will be off by a factor equivalent to the
+ * average object size, and,
+ * 2) The final iteration which returns -ENOENT and performs clean-ups.
+ */
+ return cost_per_object * cct->_conf->osd_pg_max_concurrent_snap_trims;
+ } else {
+ /* We retain this legacy behavior for WeightedPriorityQueue.
+ * This branch should be removed after Squid.
+ */
+ return cct->_conf->osd_snap_trim_cost;
+ }
+ }();
+
enqueue_back(
OpSchedulerItem(
unique_ptr<OpSchedulerItem::OpQueueable>(
new PGSnapTrim(pg->get_pgid(), pg->get_osdmap_epoch())),
- cct->_conf->osd_snap_trim_cost,
+ cost_for_queue,
cct->_conf->osd_snap_trim_priority,
ceph_clock_now(),
0,
@@ -1771,7 +1789,7 @@ int64_t OSDService::get_scrub_cost()
{
int64_t cost_for_queue = cct->_conf->osd_scrub_cost;
- if (cct->_conf->osd_op_queue == "mclock_scheduler") {
+ if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) {
cost_for_queue = cct->_conf->osd_scrub_event_cost *
cct->_conf->osd_shallow_scrub_chunk_max;
}
@@ -2049,7 +2067,7 @@ void OSDService::_queue_for_recovery(
ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock));
uint64_t cost_for_queue = [this, &reserved_pushes, &p] {
- if (cct->_conf->osd_op_queue == "mclock_scheduler") {
+ if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) {
return p.cost_per_object * reserved_pushes;
} else {
/* We retain this legacy behavior for WeightedPriorityQueue. It seems to
@@ -2092,6 +2110,22 @@ int heap(CephContext& cct,
} // namespace ceph::osd_cmds
+void OSD::write_superblock(CephContext* cct, OSDSuperblock& sb, ObjectStore::Transaction& t)
+{
+ dout(10) << "write_superblock " << sb << dendl;
+
+ //hack: at minimum it's using the baseline feature set
+ if (!sb.compat_features.incompat.contains(CEPH_OSD_FEATURE_INCOMPAT_BASE))
+ sb.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
+
+ bufferlist bl;
+ encode(sb, bl);
+ t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
+ std::map<std::string, ceph::buffer::list> attrs;
+ attrs.emplace(OSD_SUPERBLOCK_OMAP_KEY, bl);
+ t.omap_setkeys(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, attrs);
+}
+
int OSD::mkfs(CephContext *cct,
std::unique_ptr<ObjectStore> store,
uuid_d fsid,
@@ -2153,15 +2187,11 @@ int OSD::mkfs(CephContext *cct,
sb.osd_fsid = store->get_fsid();
sb.whoami = whoami;
sb.compat_features = get_osd_initial_compat_set();
-
- bufferlist bl;
- encode(sb, bl);
-
ObjectStore::CollectionHandle ch = store->create_new_collection(
coll_t::meta());
ObjectStore::Transaction t;
t.create_collection(coll_t::meta(), 0);
- t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
+ write_superblock(cct, sb, t);
ret = store->queue_transaction(ch, std::move(t));
if (ret) {
derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_GOBJECT: "
@@ -2385,13 +2415,55 @@ OSD::OSD(CephContext *cct_,
trace_endpoint.copy_name(ss.str());
#endif
+ // Determine scheduler type for this OSD
+ auto get_op_queue_type = [this, &conf = cct->_conf]() {
+ op_queue_type_t queue_type;
+ if (auto type = conf.get_val<std::string>("osd_op_queue");
+ type != "debug_random") {
+ if (auto qt = get_op_queue_type_by_name(type); qt.has_value()) {
+ queue_type = *qt;
+ } else {
+ // This should never happen
+ dout(0) << "Invalid value passed for 'osd_op_queue': " << type << dendl;
+ ceph_assert(0 == "Unsupported op queue type");
+ }
+ } else {
+ static const std::vector<op_queue_type_t> index_lookup = {
+ op_queue_type_t::mClockScheduler,
+ op_queue_type_t::WeightedPriorityQueue
+ };
+ std::mt19937 random_gen(std::random_device{}());
+ auto which = random_gen() % index_lookup.size();
+ queue_type = index_lookup[which];
+ }
+ return queue_type;
+ };
+ op_queue_type_t op_queue = get_op_queue_type();
+
+ // Determine op queue cutoff
+ auto get_op_queue_cut_off = [&conf = cct->_conf]() {
+ if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
+ std::random_device rd;
+ std::mt19937 random_gen(rd());
+ return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+ } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
+ return CEPH_MSG_PRIO_HIGH;
+ } else {
+ // default / catch-all is 'low'
+ return CEPH_MSG_PRIO_LOW;
+ }
+ };
+ unsigned op_queue_cut_off = get_op_queue_cut_off();
+
// initialize shards
num_shards = get_num_op_shards();
for (uint32_t i = 0; i < num_shards; i++) {
OSDShard *one_shard = new OSDShard(
i,
cct,
- this);
+ this,
+ op_queue,
+ op_queue_cut_off);
shards.push_back(one_shard);
}
}
@@ -3121,6 +3193,19 @@ will start to track new ops received afterwards.";
scrub_purged_snaps();
}
+ else if (prefix == "reset_purged_snaps_last") {
+ lock_guard l(osd_lock);
+ superblock.purged_snaps_last = 0;
+ ObjectStore::Transaction t;
+ dout(10) << __func__ << " updating superblock" << dendl;
+ write_superblock(cct, superblock, t);
+ ret = store->queue_transaction(service.meta_ch, std::move(t), nullptr);
+ if (ret < 0) {
+ ss << "Error writing superblock: " << cpp_strerror(ret);
+ goto out;
+ }
+ }
+
else if (prefix == "dump_osd_network") {
lock_guard l(osd_lock);
int64_t value = 0;
@@ -3762,7 +3847,7 @@ int OSD::init()
}
ObjectStore::Transaction t;
- write_superblock(t);
+ write_superblock(cct, superblock, t);
r = store->queue_transaction(service.meta_ch, std::move(t));
if (r < 0)
goto out;
@@ -4300,6 +4385,11 @@ void OSD::final_init()
"Scrub purged_snaps vs snapmapper index");
ceph_assert(r == 0);
r = admin_socket->register_command(
+ "reset_purged_snaps_last",
+ asok_hook,
+ "Reset the superblock's purged_snaps_last");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
"scrubdebug " \
"name=pgid,type=CephPgid " \
"name=cmd,type=CephChoices,strings=block|unblock|set|unset " \
@@ -4572,7 +4662,7 @@ int OSD::shutdown()
superblock.mounted = service.get_boot_epoch();
superblock.clean_thru = get_osdmap_epoch();
ObjectStore::Transaction t;
- write_superblock(t);
+ write_superblock(cct, superblock, t);
int r = store->queue_transaction(service.meta_ch, std::move(t));
if (r) {
derr << "OSD::shutdown: error writing superblock: "
@@ -4769,31 +4859,81 @@ int OSD::update_crush_device_class()
}
}
-void OSD::write_superblock(ObjectStore::Transaction& t)
-{
- dout(10) << "write_superblock " << superblock << dendl;
-
- //hack: at minimum it's using the baseline feature set
- if (!superblock.compat_features.incompat.contains(CEPH_OSD_FEATURE_INCOMPAT_BASE))
- superblock.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
-
- bufferlist bl;
- encode(superblock, bl);
- t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
-}
int OSD::read_superblock()
{
+ // Read superblock from both object data and omap metadata
+ // for better robustness.
+  // Use the most recent superblock replica if the obtained versions
+  // mismatch.
bufferlist bl;
- int r = store->read(service.meta_ch, OSD_SUPERBLOCK_GOBJECT, 0, 0, bl);
- if (r < 0)
- return r;
- auto p = bl.cbegin();
- decode(superblock, p);
+ set<string> keys;
+ keys.insert(OSD_SUPERBLOCK_OMAP_KEY);
+ map<string, bufferlist> vals;
+ OSDSuperblock super_omap;
+ OSDSuperblock super_disk;
+ int r_omap = store->omap_get_values(
+ service.meta_ch, OSD_SUPERBLOCK_GOBJECT, keys, &vals);
+ if (r_omap >= 0 && vals.size() > 0) {
+ try {
+ auto p = vals.begin()->second.cbegin();
+ decode(super_omap, p);
+ } catch(...) {
+ derr << __func__ << " omap replica is corrupted."
+ << dendl;
+ r_omap = -EFAULT;
+ }
+ } else {
+ derr << __func__ << " omap replica is missing."
+ << dendl;
+ r_omap = -ENOENT;
+ }
+ int r_disk = store->read(service.meta_ch, OSD_SUPERBLOCK_GOBJECT, 0, 0, bl);
+ if (r_disk >= 0) {
+ try {
+ auto p = bl.cbegin();
+ decode(super_disk, p);
+ } catch(...) {
+ derr << __func__ << " disk replica is corrupted."
+ << dendl;
+ r_disk = -EFAULT;
+ }
+ } else {
+ derr << __func__ << " disk replica is missing."
+ << dendl;
+ r_disk = -ENOENT;
+ }
- dout(10) << "read_superblock " << superblock << dendl;
+ if (r_omap >= 0 && r_disk < 0) {
+ std::swap(superblock, super_omap);
+ dout(1) << __func__ << " got omap replica but failed to get disk one."
+ << dendl;
+ } else if (r_omap < 0 && r_disk >= 0) {
+ std::swap(superblock, super_disk);
+ dout(1) << __func__ << " got disk replica but failed to get omap one."
+ << dendl;
+ } else if (r_omap < 0 && r_disk < 0) {
+ // error to be logged by the caller
+ return -ENOENT;
+ } else {
+ std::swap(superblock, super_omap); // let omap be the primary source
+ if (superblock.current_epoch != super_disk.current_epoch) {
+ derr << __func__ << " got mismatching superblocks, omap:"
+ << superblock << " vs. disk:" << super_disk
+ << dendl;
+ if (superblock.current_epoch < super_disk.current_epoch) {
+ std::swap(superblock, super_disk);
+ dout(0) << __func__ << " using disk superblock"
+ << dendl;
+ } else {
+ dout(0) << __func__ << " using omap superblock"
+ << dendl;
+ }
+ }
+ }
+ dout(10) << "read_superblock " << superblock << dendl;
return 0;
}
@@ -6695,7 +6835,7 @@ void OSD::handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *m)
m->purged_snaps);
}
superblock.purged_snaps_last = m->last;
- write_superblock(t);
+ write_superblock(cct, superblock, t);
store->queue_transaction(
service.meta_ch,
std::move(t));
@@ -7179,7 +7319,7 @@ void OSD::scrub_purged_snaps()
dout(10) << __func__ << " done queueing pgs, updating superblock" << dendl;
ObjectStore::Transaction t;
superblock.last_purged_snaps_scrub = ceph_clock_now();
- write_superblock(t);
+ write_superblock(cct, superblock, t);
int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr);
ceph_assert(tr == 0);
if (is_active()) {
@@ -7892,7 +8032,7 @@ void OSD::trim_maps(epoch_t oldest, int nreceived, bool skip_maps)
num++;
if (num >= cct->_conf->osd_target_transaction_size && num >= nreceived) {
service.publish_superblock(superblock);
- write_superblock(t);
+ write_superblock(cct, superblock, t);
int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr);
ceph_assert(tr == 0);
num = 0;
@@ -7908,7 +8048,7 @@ void OSD::trim_maps(epoch_t oldest, int nreceived, bool skip_maps)
}
if (num > 0) {
service.publish_superblock(superblock);
- write_superblock(t);
+ write_superblock(cct, superblock, t);
int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr);
ceph_assert(tr == 0);
}
@@ -8220,7 +8360,19 @@ void OSD::handle_osd_map(MOSDMap *m)
{
bufferlist bl;
::encode(pg_num_history, bl);
- t.write(coll_t::meta(), make_pg_num_history_oid(), 0, bl.length(), bl);
+ auto oid = make_pg_num_history_oid();
+ t.truncate(coll_t::meta(), oid, 0); // we don't need bytes left if new data
+ // block is shorter than the previous
+ // one. And better to trim them, e.g.
+                                      // this allows us to avoid csum errors
+ // when issuing overwrite
+ // (which happens to be partial)
+ // and original data is corrupted.
+ // Another side effect is that the
+ // superblock is not permanently
+ // anchored to a fixed disk location
+ // any more.
+ t.write(coll_t::meta(), oid, 0, bl.length(), bl);
dout(20) << __func__ << " pg_num_history " << pg_num_history << dendl;
}
@@ -8240,7 +8392,7 @@ void OSD::handle_osd_map(MOSDMap *m)
}
// superblock and commit
- write_superblock(t);
+ write_superblock(cct, superblock, t);
t.register_on_commit(new C_OnMapCommit(this, start, last, m));
store->queue_transaction(
service.meta_ch,
@@ -8558,7 +8710,7 @@ void OSD::check_osdmap_features()
dout(0) << __func__ << " enabling on-disk ERASURE CODES compat feature" << dendl;
superblock.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SHARDS);
ObjectStore::Transaction t;
- write_superblock(t);
+ write_superblock(cct, superblock, t);
int err = store->queue_transaction(service.meta_ch, std::move(t), NULL);
ceph_assert(err == 0);
}
@@ -9888,7 +10040,7 @@ void OSD::maybe_override_max_osd_capacity_for_qos()
// If the scheduler enabled is mclock, override the default
// osd capacity with the value obtained from running the
// osd bench test. This is later used to setup mclock.
- if ((cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler") &&
+ if ((op_queue_type_t::mClockScheduler == osd_op_queue_type()) &&
(cct->_conf.get_val<bool>("osd_mclock_skip_benchmark") == false) &&
(!unsupported_objstore_for_qos())) {
std::string max_capacity_iops_config;
@@ -9988,7 +10140,7 @@ bool OSD::maybe_override_options_for_qos(const std::set<std::string> *changed)
{
// Override options only if the scheduler enabled is mclock and the
// underlying objectstore is supported by mclock
- if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" &&
+ if (op_queue_type_t::mClockScheduler == osd_op_queue_type() &&
!unsupported_objstore_for_qos()) {
static const std::map<std::string, uint64_t> recovery_qos_defaults {
{"osd_recovery_max_active", 0},
@@ -10090,9 +10242,8 @@ void OSD::maybe_override_sleep_options_for_qos()
{
// Override options only if the scheduler enabled is mclock and the
// underlying objectstore is supported by mclock
- if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" &&
+ if (op_queue_type_t::mClockScheduler == osd_op_queue_type() &&
!unsupported_objstore_for_qos()) {
-
// Override the various sleep settings
// Disable recovery sleep
cct->_conf.set_val("osd_recovery_sleep", std::to_string(0));
@@ -10121,7 +10272,7 @@ void OSD::maybe_override_cost_for_qos()
{
// If the scheduler enabled is mclock, override the default PG deletion cost
// so that mclock can meet the QoS goals.
- if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" &&
+ if (op_queue_type_t::mClockScheduler == osd_op_queue_type() &&
!unsupported_objstore_for_qos()) {
uint64_t pg_delete_cost = 15728640;
cct->_conf.set_val("osd_pg_delete_cost", std::to_string(pg_delete_cost));
@@ -10195,6 +10346,16 @@ bool OSD::unsupported_objstore_for_qos()
store->get_type()) != unsupported_objstores.end();
}
+op_queue_type_t OSD::osd_op_queue_type() const
+{
+ /**
+ * All OSD shards employ the same scheduler type. Therefore, return
+   * the scheduler type set on the OSD shard with the lowest id (0).
+ */
+ ceph_assert(shards.size());
+ return shards[0]->get_op_queue_type();
+}
+
void OSD::update_log_config()
{
auto parsed_options = clog->parse_client_options(cct);
@@ -10695,17 +10856,17 @@ void OSDShard::update_scheduler_config()
scheduler->update_configuration();
}
-std::string OSDShard::get_scheduler_type()
+op_queue_type_t OSDShard::get_op_queue_type() const
{
- std::ostringstream scheduler_type;
- scheduler_type << *scheduler;
- return scheduler_type.str();
+ return scheduler->get_type();
}
OSDShard::OSDShard(
int id,
CephContext *cct,
- OSD *osd)
+ OSD *osd,
+ op_queue_type_t osd_op_queue,
+ unsigned osd_op_queue_cut_off)
: shard_id(id),
cct(cct),
osd(osd),
@@ -10717,7 +10878,7 @@ OSDShard::OSDShard(
shard_lock{make_mutex(shard_lock_name)},
scheduler(ceph::osd::scheduler::make_scheduler(
cct, osd->whoami, osd->num_shards, id, osd->store->is_rotational(),
- osd->store->get_type(), osd->monc)),
+ osd->store->get_type(), osd_op_queue, osd_op_queue_cut_off, osd->monc)),
context_queue(sdata_wait_lock, sdata_cond)
{
dout(0) << "using op scheduler " << *scheduler << dendl;