diff options
Diffstat (limited to '')
-rw-r--r-- | src/osd/OSD.cc | 259 |
1 files changed, 210 insertions, 49 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index c61e7d332..515eb6042 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1703,7 +1703,7 @@ void OSDService::queue_recovery_context( epoch_t e = get_osdmap_epoch(); uint64_t cost_for_queue = [this, cost] { - if (cct->_conf->osd_op_queue == "mclock_scheduler") { + if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) { return cost; } else { /* We retain this legacy behavior for WeightedPriorityQueue. It seems to @@ -1726,14 +1726,32 @@ void OSDService::queue_recovery_context( e)); } -void OSDService::queue_for_snap_trim(PG *pg) +void OSDService::queue_for_snap_trim(PG *pg, uint64_t cost_per_object) { dout(10) << "queueing " << *pg << " for snaptrim" << dendl; + uint64_t cost_for_queue = [this, cost_per_object] { + if (cct->_conf->osd_op_queue == "mclock_scheduler") { + /* The cost calculation is valid for most snap trim iterations except + * for the following cases: + * 1) The penultimate iteration which may return 1 object to trim, in + * which case the cost will be off by a factor equivalent to the + * average object size, and, + * 2) The final iteration which returns -ENOENT and performs clean-ups. + */ + return cost_per_object * cct->_conf->osd_pg_max_concurrent_snap_trims; + } else { + /* We retain this legacy behavior for WeightedPriorityQueue. + * This branch should be removed after Squid. + */ + return cct->_conf->osd_snap_trim_cost; + } + }(); + enqueue_back( OpSchedulerItem( unique_ptr<OpSchedulerItem::OpQueueable>( new PGSnapTrim(pg->get_pgid(), pg->get_osdmap_epoch())), - cct->_conf->osd_snap_trim_cost, + cost_for_queue, cct->_conf->osd_snap_trim_priority, ceph_clock_now(), 0, @@ -1771,7 +1789,7 @@ int64_t OSDService::get_scrub_cost() { int64_t cost_for_queue = cct->_conf->osd_scrub_cost; - if (cct->_conf->osd_op_queue == "mclock_scheduler") { + if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) { cost_for_queue = cct->_conf->osd_scrub_event_cost * cct->_conf->osd_shallow_scrub_chunk_max; } @@ -2049,7 +2067,7 @@ void OSDService::_queue_for_recovery( ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock)); uint64_t cost_for_queue = [this, &reserved_pushes, &p] { - if (cct->_conf->osd_op_queue == "mclock_scheduler") { + if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) { return p.cost_per_object * reserved_pushes; } else { /* We retain this legacy behavior for WeightedPriorityQueue. It seems to @@ -2092,6 +2110,22 @@ int heap(CephContext& cct, } // namespace ceph::osd_cmds +void OSD::write_superblock(CephContext* cct, OSDSuperblock& sb, ObjectStore::Transaction& t) +{ + dout(10) << "write_superblock " << sb << dendl; + + //hack: at minimum it's using the baseline feature set + if (!sb.compat_features.incompat.contains(CEPH_OSD_FEATURE_INCOMPAT_BASE)) + sb.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE); + + bufferlist bl; + encode(sb, bl); + t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl); + std::map<std::string, ceph::buffer::list> attrs; + attrs.emplace(OSD_SUPERBLOCK_OMAP_KEY, bl); + t.omap_setkeys(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, attrs); +} + int OSD::mkfs(CephContext *cct, std::unique_ptr<ObjectStore> store, uuid_d fsid, @@ -2153,15 +2187,11 @@ int OSD::mkfs(CephContext *cct, sb.osd_fsid = store->get_fsid(); sb.whoami = whoami; sb.compat_features = get_osd_initial_compat_set(); - - bufferlist bl; - encode(sb, bl); - ObjectStore::CollectionHandle ch = store->create_new_collection( coll_t::meta()); ObjectStore::Transaction t; t.create_collection(coll_t::meta(), 0); - t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl); + write_superblock(cct, sb, t); ret = store->queue_transaction(ch, std::move(t)); if (ret) { derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_GOBJECT: " @@ -2385,13 +2415,55 @@ OSD::OSD(CephContext *cct_, trace_endpoint.copy_name(ss.str()); #endif + // Determine scheduler type for this OSD + auto get_op_queue_type = [this, &conf = cct->_conf]() { + op_queue_type_t queue_type; + if (auto type = conf.get_val<std::string>("osd_op_queue"); + type != "debug_random") { + if (auto qt = get_op_queue_type_by_name(type); qt.has_value()) { + queue_type = *qt; + } else { + // This should never happen + dout(0) << "Invalid value passed for 'osd_op_queue': " << type << dendl; + ceph_assert(0 == "Unsupported op queue type"); + } + } else { + static const std::vector<op_queue_type_t> index_lookup = { + op_queue_type_t::mClockScheduler, + op_queue_type_t::WeightedPriorityQueue + }; + std::mt19937 random_gen(std::random_device{}()); + auto which = random_gen() % index_lookup.size(); + queue_type = index_lookup[which]; + } + return queue_type; + }; + op_queue_type_t op_queue = get_op_queue_type(); + + // Determine op queue cutoff + auto get_op_queue_cut_off = [&conf = cct->_conf]() { + if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") { + std::random_device rd; + std::mt19937 random_gen(rd()); + return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; + } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") { + return CEPH_MSG_PRIO_HIGH; + } else { + // default / catch-all is 'low' + return CEPH_MSG_PRIO_LOW; + } + }; + unsigned op_queue_cut_off = get_op_queue_cut_off(); + // initialize shards num_shards = get_num_op_shards(); for (uint32_t i = 0; i < num_shards; i++) { OSDShard *one_shard = new OSDShard( i, cct, - this); + this, + op_queue, + op_queue_cut_off); shards.push_back(one_shard); } } @@ -3121,6 +3193,19 @@ will start to track new ops received afterwards."; scrub_purged_snaps(); } + else if (prefix == "reset_purged_snaps_last") { + lock_guard l(osd_lock); + superblock.purged_snaps_last = 0; + ObjectStore::Transaction t; + dout(10) << __func__ << " updating superblock" << dendl; + write_superblock(cct, superblock, t); + ret = store->queue_transaction(service.meta_ch, std::move(t), nullptr); + if (ret < 0) { + ss << "Error writing superblock: " << cpp_strerror(ret); + goto out; + } + } + else if (prefix == "dump_osd_network") { lock_guard l(osd_lock); int64_t value = 0; @@ -3762,7 +3847,7 @@ int OSD::init() } ObjectStore::Transaction t; - write_superblock(t); + write_superblock(cct, superblock, t); r = store->queue_transaction(service.meta_ch, std::move(t)); if (r < 0) goto out; @@ -4300,6 +4385,11 @@ void OSD::final_init() "Scrub purged_snaps vs snapmapper index"); ceph_assert(r == 0); r = admin_socket->register_command( + "reset_purged_snaps_last", + asok_hook, + "Reset the superblock's purged_snaps_last"); + ceph_assert(r == 0); + r = admin_socket->register_command( "scrubdebug " \ "name=pgid,type=CephPgid " \ "name=cmd,type=CephChoices,strings=block|unblock|set|unset " \ @@ -4572,7 +4662,7 @@ int OSD::shutdown() superblock.mounted = service.get_boot_epoch(); superblock.clean_thru = get_osdmap_epoch(); ObjectStore::Transaction t; - write_superblock(t); + write_superblock(cct, superblock, t); int r = store->queue_transaction(service.meta_ch, std::move(t)); if (r) { derr << "OSD::shutdown: error writing superblock: " @@ -4769,31 +4859,81 @@ int OSD::update_crush_device_class() } } -void OSD::write_superblock(ObjectStore::Transaction& t) -{ - dout(10) << "write_superblock " << superblock << dendl; - - //hack: at minimum it's using the baseline feature set - if (!superblock.compat_features.incompat.contains(CEPH_OSD_FEATURE_INCOMPAT_BASE)) - superblock.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE); - - bufferlist bl; - encode(superblock, bl); - t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl); -} int OSD::read_superblock() { + // Read superblock from both object data and omap metadata + // for better robustness. + // Use the most recent superblock replica if obtained versions + // mismatch. bufferlist bl; - int r = store->read(service.meta_ch, OSD_SUPERBLOCK_GOBJECT, 0, 0, bl); - if (r < 0) - return r; - auto p = bl.cbegin(); - decode(superblock, p); + set<string> keys; + keys.insert(OSD_SUPERBLOCK_OMAP_KEY); + map<string, bufferlist> vals; + OSDSuperblock super_omap; + OSDSuperblock super_disk; + int r_omap = store->omap_get_values( + service.meta_ch, OSD_SUPERBLOCK_GOBJECT, keys, &vals); + if (r_omap >= 0 && vals.size() > 0) { + try { + auto p = vals.begin()->second.cbegin(); + decode(super_omap, p); + } catch(...) { + derr << __func__ << " omap replica is corrupted." + << dendl; + r_omap = -EFAULT; + } + } else { + derr << __func__ << " omap replica is missing." + << dendl; + r_omap = -ENOENT; + } + int r_disk = store->read(service.meta_ch, OSD_SUPERBLOCK_GOBJECT, 0, 0, bl); + if (r_disk >= 0) { + try { + auto p = bl.cbegin(); + decode(super_disk, p); + } catch(...) { + derr << __func__ << " disk replica is corrupted." + << dendl; + r_disk = -EFAULT; + } + } else { + derr << __func__ << " disk replica is missing." + << dendl; + r_disk = -ENOENT; + } - dout(10) << "read_superblock " << superblock << dendl; + if (r_omap >= 0 && r_disk < 0) { + std::swap(superblock, super_omap); + dout(1) << __func__ << " got omap replica but failed to get disk one." + << dendl; + } else if (r_omap < 0 && r_disk >= 0) { + std::swap(superblock, super_disk); + dout(1) << __func__ << " got disk replica but failed to get omap one." + << dendl; + } else if (r_omap < 0 && r_disk < 0) { + // error to be logged by the caller + return -ENOENT; + } else { + std::swap(superblock, super_omap); // let omap be the primary source + if (superblock.current_epoch != super_disk.current_epoch) { + derr << __func__ << " got mismatching superblocks, omap:" + << superblock << " vs. disk:" << super_disk + << dendl; + if (superblock.current_epoch < super_disk.current_epoch) { + std::swap(superblock, super_disk); + dout(0) << __func__ << " using disk superblock" + << dendl; + } else { + dout(0) << __func__ << " using omap superblock" + << dendl; + } + } + } + dout(10) << "read_superblock " << superblock << dendl; return 0; } @@ -6695,7 +6835,7 @@ void OSD::handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *m) m->purged_snaps); } superblock.purged_snaps_last = m->last; - write_superblock(t); + write_superblock(cct, superblock, t); store->queue_transaction( service.meta_ch, std::move(t)); @@ -7179,7 +7319,7 @@ void OSD::scrub_purged_snaps() dout(10) << __func__ << " done queueing pgs, updating superblock" << dendl; ObjectStore::Transaction t; superblock.last_purged_snaps_scrub = ceph_clock_now(); - write_superblock(t); + write_superblock(cct, superblock, t); int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr); ceph_assert(tr == 0); if (is_active()) { @@ -7892,7 +8032,7 @@ void OSD::trim_maps(epoch_t oldest, int nreceived, bool skip_maps) num++; if (num >= cct->_conf->osd_target_transaction_size && num >= nreceived) { service.publish_superblock(superblock); - write_superblock(t); + write_superblock(cct, superblock, t); int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr); ceph_assert(tr == 0); num = 0; @@ -7908,7 +8048,7 @@ void OSD::trim_maps(epoch_t oldest, int nreceived, bool skip_maps) } if (num > 0) { service.publish_superblock(superblock); - write_superblock(t); + write_superblock(cct, superblock, t); int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr); ceph_assert(tr == 0); } @@ -8220,7 +8360,19 @@ void OSD::handle_osd_map(MOSDMap *m) { bufferlist bl; ::encode(pg_num_history, bl); - t.write(coll_t::meta(), make_pg_num_history_oid(), 0, bl.length(), bl); + auto oid = make_pg_num_history_oid(); + t.truncate(coll_t::meta(), oid, 0); // we don't need bytes left if new data + // block is shorter than the previous + // one. And better to trim them, e.g. + // this allows to avoid csum eroors + // when issuing overwrite + // (which happens to be partial) + // and original data is corrupted. + // Another side effect is that the + // superblock is not permanently + // anchored to a fixed disk location + // any more. + t.write(coll_t::meta(), oid, 0, bl.length(), bl); dout(20) << __func__ << " pg_num_history " << pg_num_history << dendl; } @@ -8240,7 +8392,7 @@ void OSD::handle_osd_map(MOSDMap *m) } // superblock and commit - write_superblock(t); + write_superblock(cct, superblock, t); t.register_on_commit(new C_OnMapCommit(this, start, last, m)); store->queue_transaction( service.meta_ch, @@ -8558,7 +8710,7 @@ void OSD::check_osdmap_features() dout(0) << __func__ << " enabling on-disk ERASURE CODES compat feature" << dendl; superblock.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SHARDS); ObjectStore::Transaction t; - write_superblock(t); + write_superblock(cct, superblock, t); int err = store->queue_transaction(service.meta_ch, std::move(t), NULL); ceph_assert(err == 0); } @@ -9888,7 +10040,7 @@ void OSD::maybe_override_max_osd_capacity_for_qos() // If the scheduler enabled is mclock, override the default // osd capacity with the value obtained from running the // osd bench test. This is later used to setup mclock. - if ((cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler") && + if ((op_queue_type_t::mClockScheduler == osd_op_queue_type()) && (cct->_conf.get_val<bool>("osd_mclock_skip_benchmark") == false) && (!unsupported_objstore_for_qos())) { std::string max_capacity_iops_config; @@ -9988,7 +10140,7 @@ bool OSD::maybe_override_options_for_qos(const std::set<std::string> *changed) { // Override options only if the scheduler enabled is mclock and the // underlying objectstore is supported by mclock - if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" && + if (op_queue_type_t::mClockScheduler == osd_op_queue_type() && !unsupported_objstore_for_qos()) { static const std::map<std::string, uint64_t> recovery_qos_defaults { {"osd_recovery_max_active", 0}, @@ -10090,9 +10242,8 @@ void OSD::maybe_override_sleep_options_for_qos() { // Override options only if the scheduler enabled is mclock and the // underlying objectstore is supported by mclock - if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" && + if (op_queue_type_t::mClockScheduler == osd_op_queue_type() && !unsupported_objstore_for_qos()) { - // Override the various sleep settings // Disable recovery sleep cct->_conf.set_val("osd_recovery_sleep", std::to_string(0)); @@ -10121,7 +10272,7 @@ void OSD::maybe_override_cost_for_qos() { // If the scheduler enabled is mclock, override the default PG deletion cost // so that mclock can meet the QoS goals. - if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" && + if (op_queue_type_t::mClockScheduler == osd_op_queue_type() && !unsupported_objstore_for_qos()) { uint64_t pg_delete_cost = 15728640; cct->_conf.set_val("osd_pg_delete_cost", std::to_string(pg_delete_cost)); @@ -10195,6 +10346,16 @@ bool OSD::unsupported_objstore_for_qos() store->get_type()) != unsupported_objstores.end(); } +op_queue_type_t OSD::osd_op_queue_type() const +{ + /** + * All OSD shards employ the same scheduler type. Therefore, return + * the scheduler type set on the OSD shard with lowest id(0). + */ + ceph_assert(shards.size()); + return shards[0]->get_op_queue_type(); +} + void OSD::update_log_config() { auto parsed_options = clog->parse_client_options(cct); @@ -10695,17 +10856,17 @@ void OSDShard::update_scheduler_config() scheduler->update_configuration(); } -std::string OSDShard::get_scheduler_type() +op_queue_type_t OSDShard::get_op_queue_type() const { - std::ostringstream scheduler_type; - scheduler_type << *scheduler; - return scheduler_type.str(); + return scheduler->get_type(); } OSDShard::OSDShard( int id, CephContext *cct, - OSD *osd) + OSD *osd, + op_queue_type_t osd_op_queue, + unsigned osd_op_queue_cut_off) : shard_id(id), cct(cct), osd(osd), @@ -10717,7 +10878,7 @@ OSDShard::OSDShard( shard_lock{make_mutex(shard_lock_name)}, scheduler(ceph::osd::scheduler::make_scheduler( cct, osd->whoami, osd->num_shards, id, osd->store->is_rotational(), - osd->store->get_type(), osd->monc)), + osd->store->get_type(), osd_op_queue, osd_op_queue_cut_off, osd->monc)), context_queue(sdata_wait_lock, sdata_cond) { dout(0) << "using op scheduler " << *scheduler << dendl; |