diff options
Diffstat (limited to 'src/osd')
-rw-r--r-- | src/osd/OSD.cc | 259 | ||||
-rw-r--r-- | src/osd/OSD.h | 16 | ||||
-rw-r--r-- | src/osd/OSDMap.cc | 16 | ||||
-rw-r--r-- | src/osd/OSDMap.h | 1 | ||||
-rw-r--r-- | src/osd/OpRequest.h | 1 | ||||
-rw-r--r-- | src/osd/PG.cc | 10 | ||||
-rw-r--r-- | src/osd/PG.h | 13 | ||||
-rw-r--r-- | src/osd/PrimaryLogPG.cc | 14 | ||||
-rw-r--r-- | src/osd/osd_op_util.cc | 19 | ||||
-rw-r--r-- | src/osd/osd_op_util.h | 2 | ||||
-rw-r--r-- | src/osd/osd_types.cc | 28 | ||||
-rw-r--r-- | src/osd/osd_types.h | 16 | ||||
-rw-r--r-- | src/osd/scheduler/OpScheduler.cc | 21 | ||||
-rw-r--r-- | src/osd/scheduler/OpScheduler.h | 26 | ||||
-rw-r--r-- | src/osd/scheduler/mClockScheduler.cc | 2 | ||||
-rw-r--r-- | src/osd/scheduler/mClockScheduler.h | 28 |
16 files changed, 359 insertions, 113 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index c61e7d332..515eb6042 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1703,7 +1703,7 @@ void OSDService::queue_recovery_context( epoch_t e = get_osdmap_epoch(); uint64_t cost_for_queue = [this, cost] { - if (cct->_conf->osd_op_queue == "mclock_scheduler") { + if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) { return cost; } else { /* We retain this legacy behavior for WeightedPriorityQueue. It seems to @@ -1726,14 +1726,32 @@ void OSDService::queue_recovery_context( e)); } -void OSDService::queue_for_snap_trim(PG *pg) +void OSDService::queue_for_snap_trim(PG *pg, uint64_t cost_per_object) { dout(10) << "queueing " << *pg << " for snaptrim" << dendl; + uint64_t cost_for_queue = [this, cost_per_object] { + if (cct->_conf->osd_op_queue == "mclock_scheduler") { + /* The cost calculation is valid for most snap trim iterations except + * for the following cases: + * 1) The penultimate iteration which may return 1 object to trim, in + * which case the cost will be off by a factor equivalent to the + * average object size, and, + * 2) The final iteration which returns -ENOENT and performs clean-ups. + */ + return cost_per_object * cct->_conf->osd_pg_max_concurrent_snap_trims; + } else { + /* We retain this legacy behavior for WeightedPriorityQueue. + * This branch should be removed after Squid. + */ + return cct->_conf->osd_snap_trim_cost; + } + }(); + enqueue_back( OpSchedulerItem( unique_ptr<OpSchedulerItem::OpQueueable>( new PGSnapTrim(pg->get_pgid(), pg->get_osdmap_epoch())), - cct->_conf->osd_snap_trim_cost, + cost_for_queue, cct->_conf->osd_snap_trim_priority, ceph_clock_now(), 0, @@ -1771,7 +1789,7 @@ int64_t OSDService::get_scrub_cost() { int64_t cost_for_queue = cct->_conf->osd_scrub_cost; - if (cct->_conf->osd_op_queue == "mclock_scheduler") { + if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) { cost_for_queue = cct->_conf->osd_scrub_event_cost * cct->_conf->osd_shallow_scrub_chunk_max; } @@ -2049,7 +2067,7 @@ void OSDService::_queue_for_recovery( ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock)); uint64_t cost_for_queue = [this, &reserved_pushes, &p] { - if (cct->_conf->osd_op_queue == "mclock_scheduler") { + if (op_queue_type_t::mClockScheduler == osd->osd_op_queue_type()) { return p.cost_per_object * reserved_pushes; } else { /* We retain this legacy behavior for WeightedPriorityQueue. It seems to @@ -2092,6 +2110,22 @@ int heap(CephContext& cct, } // namespace ceph::osd_cmds +void OSD::write_superblock(CephContext* cct, OSDSuperblock& sb, ObjectStore::Transaction& t) +{ + dout(10) << "write_superblock " << sb << dendl; + + //hack: at minimum it's using the baseline feature set + if (!sb.compat_features.incompat.contains(CEPH_OSD_FEATURE_INCOMPAT_BASE)) + sb.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE); + + bufferlist bl; + encode(sb, bl); + t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl); + std::map<std::string, ceph::buffer::list> attrs; + attrs.emplace(OSD_SUPERBLOCK_OMAP_KEY, bl); + t.omap_setkeys(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, attrs); +} + int OSD::mkfs(CephContext *cct, std::unique_ptr<ObjectStore> store, uuid_d fsid, @@ -2153,15 +2187,11 @@ int OSD::mkfs(CephContext *cct, sb.osd_fsid = store->get_fsid(); sb.whoami = whoami; sb.compat_features = get_osd_initial_compat_set(); - - bufferlist bl; - encode(sb, bl); - ObjectStore::CollectionHandle ch = store->create_new_collection( coll_t::meta()); ObjectStore::Transaction t; t.create_collection(coll_t::meta(), 0); - t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl); + write_superblock(cct, sb, t); ret = store->queue_transaction(ch, std::move(t)); if (ret) { derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_GOBJECT: " @@ -2385,13 +2415,55 @@ OSD::OSD(CephContext *cct_, trace_endpoint.copy_name(ss.str()); #endif + // Determine scheduler type for this OSD + auto get_op_queue_type = [this, &conf = cct->_conf]() { + op_queue_type_t queue_type; + if (auto type = conf.get_val<std::string>("osd_op_queue"); + type != "debug_random") { + if (auto qt = get_op_queue_type_by_name(type); qt.has_value()) { + queue_type = *qt; + } else { + // This should never happen + dout(0) << "Invalid value passed for 'osd_op_queue': " << type << dendl; + ceph_assert(0 == "Unsupported op queue type"); + } + } else { + static const std::vector<op_queue_type_t> index_lookup = { + op_queue_type_t::mClockScheduler, + op_queue_type_t::WeightedPriorityQueue + }; + std::mt19937 random_gen(std::random_device{}()); + auto which = random_gen() % index_lookup.size(); + queue_type = index_lookup[which]; + } + return queue_type; + }; + op_queue_type_t op_queue = get_op_queue_type(); + + // Determine op queue cutoff + auto get_op_queue_cut_off = [&conf = cct->_conf]() { + if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") { + std::random_device rd; + std::mt19937 random_gen(rd()); + return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; + } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") { + return CEPH_MSG_PRIO_HIGH; + } else { + // default / catch-all is 'low' + return CEPH_MSG_PRIO_LOW; + } + }; + unsigned op_queue_cut_off = get_op_queue_cut_off(); + // initialize shards num_shards = get_num_op_shards(); for (uint32_t i = 0; i < num_shards; i++) { OSDShard *one_shard = new OSDShard( i, cct, - this); + this, + op_queue, + op_queue_cut_off); shards.push_back(one_shard); } } @@ -3121,6 +3193,19 @@ will start to track new ops received afterwards."; scrub_purged_snaps(); } + else if (prefix == "reset_purged_snaps_last") { + lock_guard l(osd_lock); + superblock.purged_snaps_last = 0; + ObjectStore::Transaction t; + dout(10) << __func__ << " updating superblock" << dendl; + write_superblock(cct, superblock, t); + ret = store->queue_transaction(service.meta_ch, std::move(t), nullptr); + if (ret < 0) { + ss << "Error writing superblock: " << cpp_strerror(ret); + goto out; + } + } + else if (prefix == "dump_osd_network") { lock_guard l(osd_lock); int64_t value = 0; @@ -3762,7 +3847,7 @@ int OSD::init() } ObjectStore::Transaction t; - write_superblock(t); + write_superblock(cct, superblock, t); r = store->queue_transaction(service.meta_ch, std::move(t)); if (r < 0) goto out; @@ -4300,6 +4385,11 @@ void OSD::final_init() "Scrub purged_snaps vs snapmapper index"); ceph_assert(r == 0); r = admin_socket->register_command( + "reset_purged_snaps_last", + asok_hook, + "Reset the superblock's purged_snaps_last"); + ceph_assert(r == 0); + r = admin_socket->register_command( "scrubdebug " \ "name=pgid,type=CephPgid " \ "name=cmd,type=CephChoices,strings=block|unblock|set|unset " \ @@ -4572,7 +4662,7 @@ int OSD::shutdown() superblock.mounted = service.get_boot_epoch(); superblock.clean_thru = get_osdmap_epoch(); ObjectStore::Transaction t; - write_superblock(t); + write_superblock(cct, superblock, t); int r = store->queue_transaction(service.meta_ch, std::move(t)); if (r) { derr << "OSD::shutdown: error writing superblock: " @@ -4769,31 +4859,81 @@ int OSD::update_crush_device_class() } } -void OSD::write_superblock(ObjectStore::Transaction& t) -{ - dout(10) << "write_superblock " << superblock << dendl; - - //hack: at minimum it's using the baseline feature set - if (!superblock.compat_features.incompat.contains(CEPH_OSD_FEATURE_INCOMPAT_BASE)) - superblock.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE); - - bufferlist bl; - encode(superblock, bl); - t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl); -} int OSD::read_superblock() { + // Read superblock from both object data and omap metadata + // for better robustness. + // Use the most recent superblock replica if obtained versions + // mismatch. bufferlist bl; - int r = store->read(service.meta_ch, OSD_SUPERBLOCK_GOBJECT, 0, 0, bl); - if (r < 0) - return r; - auto p = bl.cbegin(); - decode(superblock, p); + set<string> keys; + keys.insert(OSD_SUPERBLOCK_OMAP_KEY); + map<string, bufferlist> vals; + OSDSuperblock super_omap; + OSDSuperblock super_disk; + int r_omap = store->omap_get_values( + service.meta_ch, OSD_SUPERBLOCK_GOBJECT, keys, &vals); + if (r_omap >= 0 && vals.size() > 0) { + try { + auto p = vals.begin()->second.cbegin(); + decode(super_omap, p); + } catch(...) { + derr << __func__ << " omap replica is corrupted." + << dendl; + r_omap = -EFAULT; + } + } else { + derr << __func__ << " omap replica is missing." + << dendl; + r_omap = -ENOENT; + } + int r_disk = store->read(service.meta_ch, OSD_SUPERBLOCK_GOBJECT, 0, 0, bl); + if (r_disk >= 0) { + try { + auto p = bl.cbegin(); + decode(super_disk, p); + } catch(...) { + derr << __func__ << " disk replica is corrupted." + << dendl; + r_disk = -EFAULT; + } + } else { + derr << __func__ << " disk replica is missing." + << dendl; + r_disk = -ENOENT; + } - dout(10) << "read_superblock " << superblock << dendl; + if (r_omap >= 0 && r_disk < 0) { + std::swap(superblock, super_omap); + dout(1) << __func__ << " got omap replica but failed to get disk one." + << dendl; + } else if (r_omap < 0 && r_disk >= 0) { + std::swap(superblock, super_disk); + dout(1) << __func__ << " got disk replica but failed to get omap one." + << dendl; + } else if (r_omap < 0 && r_disk < 0) { + // error to be logged by the caller + return -ENOENT; + } else { + std::swap(superblock, super_omap); // let omap be the primary source + if (superblock.current_epoch != super_disk.current_epoch) { + derr << __func__ << " got mismatching superblocks, omap:" + << superblock << " vs. disk:" << super_disk + << dendl; + if (superblock.current_epoch < super_disk.current_epoch) { + std::swap(superblock, super_disk); + dout(0) << __func__ << " using disk superblock" + << dendl; + } else { + dout(0) << __func__ << " using omap superblock" + << dendl; + } + } + } + dout(10) << "read_superblock " << superblock << dendl; return 0; } @@ -6695,7 +6835,7 @@ void OSD::handle_get_purged_snaps_reply(MMonGetPurgedSnapsReply *m) m->purged_snaps); } superblock.purged_snaps_last = m->last; - write_superblock(t); + write_superblock(cct, superblock, t); store->queue_transaction( service.meta_ch, std::move(t)); @@ -7179,7 +7319,7 @@ void OSD::scrub_purged_snaps() dout(10) << __func__ << " done queueing pgs, updating superblock" << dendl; ObjectStore::Transaction t; superblock.last_purged_snaps_scrub = ceph_clock_now(); - write_superblock(t); + write_superblock(cct, superblock, t); int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr); ceph_assert(tr == 0); if (is_active()) { @@ -7892,7 +8032,7 @@ void OSD::trim_maps(epoch_t oldest, int nreceived, bool skip_maps) num++; if (num >= cct->_conf->osd_target_transaction_size && num >= nreceived) { service.publish_superblock(superblock); - write_superblock(t); + write_superblock(cct, superblock, t); int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr); ceph_assert(tr == 0); num = 0; @@ -7908,7 +8048,7 @@ void OSD::trim_maps(epoch_t oldest, int nreceived, bool skip_maps) } if (num > 0) { service.publish_superblock(superblock); - write_superblock(t); + write_superblock(cct, superblock, t); int tr = store->queue_transaction(service.meta_ch, std::move(t), nullptr); ceph_assert(tr == 0); } @@ -8220,7 +8360,19 @@ void OSD::handle_osd_map(MOSDMap *m) { bufferlist bl; ::encode(pg_num_history, bl); - t.write(coll_t::meta(), make_pg_num_history_oid(), 0, bl.length(), bl); + auto oid = make_pg_num_history_oid(); + t.truncate(coll_t::meta(), oid, 0); // we don't need bytes left if new data + // block is shorter than the previous + // one. And better to trim them, e.g. + // this allows to avoid csum eroors + // when issuing overwrite + // (which happens to be partial) + // and original data is corrupted. + // Another side effect is that the + // superblock is not permanently + // anchored to a fixed disk location + // any more. + t.write(coll_t::meta(), oid, 0, bl.length(), bl); dout(20) << __func__ << " pg_num_history " << pg_num_history << dendl; } @@ -8240,7 +8392,7 @@ void OSD::handle_osd_map(MOSDMap *m) } // superblock and commit - write_superblock(t); + write_superblock(cct, superblock, t); t.register_on_commit(new C_OnMapCommit(this, start, last, m)); store->queue_transaction( service.meta_ch, @@ -8558,7 +8710,7 @@ void OSD::check_osdmap_features() dout(0) << __func__ << " enabling on-disk ERASURE CODES compat feature" << dendl; superblock.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SHARDS); ObjectStore::Transaction t; - write_superblock(t); + write_superblock(cct, superblock, t); int err = store->queue_transaction(service.meta_ch, std::move(t), NULL); ceph_assert(err == 0); } @@ -9888,7 +10040,7 @@ void OSD::maybe_override_max_osd_capacity_for_qos() // If the scheduler enabled is mclock, override the default // osd capacity with the value obtained from running the // osd bench test. This is later used to setup mclock. - if ((cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler") && + if ((op_queue_type_t::mClockScheduler == osd_op_queue_type()) && (cct->_conf.get_val<bool>("osd_mclock_skip_benchmark") == false) && (!unsupported_objstore_for_qos())) { std::string max_capacity_iops_config; @@ -9988,7 +10140,7 @@ bool OSD::maybe_override_options_for_qos(const std::set<std::string> *changed) { // Override options only if the scheduler enabled is mclock and the // underlying objectstore is supported by mclock - if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" && + if (op_queue_type_t::mClockScheduler == osd_op_queue_type() && !unsupported_objstore_for_qos()) { static const std::map<std::string, uint64_t> recovery_qos_defaults { {"osd_recovery_max_active", 0}, @@ -10090,9 +10242,8 @@ void OSD::maybe_override_sleep_options_for_qos() { // Override options only if the scheduler enabled is mclock and the // underlying objectstore is supported by mclock - if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" && + if (op_queue_type_t::mClockScheduler == osd_op_queue_type() && !unsupported_objstore_for_qos()) { - // Override the various sleep settings // Disable recovery sleep cct->_conf.set_val("osd_recovery_sleep", std::to_string(0)); @@ -10121,7 +10272,7 @@ void OSD::maybe_override_cost_for_qos() { // If the scheduler enabled is mclock, override the default PG deletion cost // so that mclock can meet the QoS goals. - if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" && + if (op_queue_type_t::mClockScheduler == osd_op_queue_type() && !unsupported_objstore_for_qos()) { uint64_t pg_delete_cost = 15728640; cct->_conf.set_val("osd_pg_delete_cost", std::to_string(pg_delete_cost)); @@ -10195,6 +10346,16 @@ bool OSD::unsupported_objstore_for_qos() store->get_type()) != unsupported_objstores.end(); } +op_queue_type_t OSD::osd_op_queue_type() const +{ + /** + * All OSD shards employ the same scheduler type. Therefore, return + * the scheduler type set on the OSD shard with lowest id(0). + */ + ceph_assert(shards.size()); + return shards[0]->get_op_queue_type(); +} + void OSD::update_log_config() { auto parsed_options = clog->parse_client_options(cct); @@ -10695,17 +10856,17 @@ void OSDShard::update_scheduler_config() scheduler->update_configuration(); } -std::string OSDShard::get_scheduler_type() +op_queue_type_t OSDShard::get_op_queue_type() const { - std::ostringstream scheduler_type; - scheduler_type << *scheduler; - return scheduler_type.str(); + return scheduler->get_type(); } OSDShard::OSDShard( int id, CephContext *cct, - OSD *osd) + OSD *osd, + op_queue_type_t osd_op_queue, + unsigned osd_op_queue_cut_off) : shard_id(id), cct(cct), osd(osd), @@ -10717,7 +10878,7 @@ OSDShard::OSDShard( shard_lock{make_mutex(shard_lock_name)}, scheduler(ceph::osd::scheduler::make_scheduler( cct, osd->whoami, osd->num_shards, id, osd->store->is_rotational(), - osd->store->get_type(), osd->monc)), + osd->store->get_type(), osd_op_queue, osd_op_queue_cut_off, osd->monc)), context_queue(sdata_wait_lock, sdata_cond) { dout(0) << "using op scheduler " << *scheduler << dendl; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 00fab7ec8..d25879001 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -511,7 +511,7 @@ public: GenContext<ThreadPool::TPHandle&> *c, uint64_t cost, int priority); - void queue_for_snap_trim(PG *pg); + void queue_for_snap_trim(PG *pg, uint64_t cost); void queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority); void queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_priority); @@ -1050,12 +1050,14 @@ struct OSDShard { void register_and_wake_split_child(PG *pg); void unprime_split_children(spg_t parent, unsigned old_pg_num); void update_scheduler_config(); - std::string get_scheduler_type(); + op_queue_type_t get_op_queue_type() const; OSDShard( int id, CephContext *cct, - OSD *osd); + OSD *osd, + op_queue_type_t osd_op_queue, + unsigned osd_op_queue_cut_off); }; class OSD : public Dispatcher, @@ -1235,8 +1237,9 @@ private: // -- superblock -- OSDSuperblock superblock; - void write_superblock(); - void write_superblock(ObjectStore::Transaction& t); + static void write_superblock(CephContext* cct, + OSDSuperblock& sb, + ObjectStore::Transaction& t); int read_superblock(); void clear_temp_objects(); @@ -2055,6 +2058,9 @@ public: OSDService service; friend class OSDService; + /// op queue type set for the OSD + op_queue_type_t osd_op_queue_type() const; + private: void set_perf_queries(const ConfigPayload &config_payload); MetricPayload get_perf_reports(); diff --git a/src/osd/OSDMap.cc b/src/osd/OSDMap.cc index 11f9a87d7..5fb73084d 100644 --- a/src/osd/OSDMap.cc +++ b/src/osd/OSDMap.cc @@ -933,6 +933,10 @@ void OSDMap::Incremental::decode(ceph::buffer::list::const_iterator& bl) decode(new_last_up_change, bl); decode(new_last_in_change, bl); } + if (struct_v >= 9) { + decode(new_pg_upmap_primary, bl); + decode(old_pg_upmap_primary, bl); + } DECODE_FINISH(bl); // client-usable data } @@ -1871,6 +1875,18 @@ uint64_t OSDMap::get_up_osd_features() const return cached_up_osd_features; } +bool OSDMap::any_osd_laggy() const +{ + for (int osd = 0; osd < max_osd; ++osd) { + if (!is_up(osd)) { continue; } + const auto &xi = get_xinfo(osd); + if (xi.laggy_probability || xi.laggy_interval) { + return true; + } + } + return false; +} + void OSDMap::dedup(const OSDMap *o, OSDMap *n) { using ceph::encode; diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h index 3a3e6155e..510da1116 100644 --- a/src/osd/OSDMap.h +++ b/src/osd/OSDMap.h @@ -655,6 +655,7 @@ private: public: bool have_crc() const { return crc_defined; } uint32_t get_crc() const { return crc; } + bool any_osd_laggy() const; std::shared_ptr<CrushWrapper> crush; // hierarchical map bool stretch_mode_enabled; // we are in stretch mode, requiring multiple sites diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index 1a608b583..832c79353 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -35,6 +35,7 @@ public: bool op_info_needs_init() const { return op_info.get_flags() == 0; } bool check_rmw(int flag) const { return op_info.check_rmw(flag); } bool may_read() const { return op_info.may_read(); } + bool may_read_data() const { return op_info.may_read_data(); } bool may_write() const { return op_info.may_write(); } bool may_cache() const { return op_info.may_cache(); } bool rwordered_forced() const { return op_info.rwordered_forced(); } diff --git a/src/osd/PG.cc b/src/osd/PG.cc index fa49038ed..245aa8d00 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -420,15 +420,7 @@ void PG::queue_recovery() dout(10) << "queue_recovery -- queuing" << dendl; recovery_queued = true; // Let cost per object be the average object size - auto num_bytes = static_cast<uint64_t>( - std::max<int64_t>( - 0, // ensure bytes is non-negative - info.stats.stats.sum.num_bytes)); - auto num_objects = static_cast<uint64_t>( - std::max<int64_t>( - 1, // ensure objects is non-negative and non-zero - info.stats.stats.sum.num_objects)); - uint64_t cost_per_object = std::max<uint64_t>(num_bytes / num_objects, 1); + uint64_t cost_per_object = get_average_object_size(); osd->queue_for_recovery( this, cost_per_object, recovery_state.get_recovery_op_priority() ); diff --git a/src/osd/PG.h b/src/osd/PG.h index 88c893b35..2f8b59061 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -1019,6 +1019,19 @@ public: return num_bytes; } + uint64_t get_average_object_size() { + ceph_assert(ceph_mutex_is_locked_by_me(_lock)); + auto num_bytes = static_cast<uint64_t>( + std::max<int64_t>( + 0, // ensure bytes is non-negative + info.stats.stats.sum.num_bytes)); + auto num_objects = static_cast<uint64_t>( + std::max<int64_t>( + 1, // ensure objects is non-negative and non-zero + info.stats.stats.sum.num_objects)); + return std::max<uint64_t>(num_bytes / num_objects, 1); + } + protected: /* diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 243e127eb..49178dc40 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -1667,7 +1667,11 @@ bool PrimaryLogPG::get_rw_locks(bool write_ordered, OpContext *ctx) * to get the second. */ if (write_ordered && ctx->op->may_read()) { - ctx->lock_type = RWState::RWEXCL; + if (ctx->op->may_read_data()) { + ctx->lock_type = RWState::RWEXCL; + } else { + ctx->lock_type = RWState::RWWRITE; + } } else if (write_ordered) { ctx->lock_type = RWState::RWWRITE; } else { @@ -5953,8 +5957,8 @@ int PrimaryLogPG::do_sparse_read(OpContext *ctx, OSDOp& osd_op) { encode(m, osd_op.outdata); // re-encode since it might be modified ::encode_destructively(data_bl, osd_op.outdata); - dout(10) << " sparse_read got " << r << " bytes from object " - << soid << dendl; + dout(10) << " sparse_read got " << m.size() << " extents and " << r + << " bytes from object " << soid << dendl; } ctx->delta_stats.num_rd_kb += shift_round_up(op.extent.length, 10); @@ -15627,8 +15631,10 @@ PrimaryLogPG::AwaitAsyncWork::AwaitAsyncWork(my_context ctx) NamedState(nullptr, "Trimming/AwaitAsyncWork") { auto *pg = context< SnapTrimmer >().pg; + // Determine cost in terms of the average object size + uint64_t cost_per_object = pg->get_average_object_size(); context< SnapTrimmer >().log_enter(state_name); - context< SnapTrimmer >().pg->osd->queue_for_snap_trim(pg); + context< SnapTrimmer >().pg->osd->queue_for_snap_trim(pg, cost_per_object); pg->state_set(PG_STATE_SNAPTRIM); pg->state_clear(PG_STATE_SNAPTRIM_ERROR); pg->publish_stats_to_osd(); diff --git a/src/osd/osd_op_util.cc b/src/osd/osd_op_util.cc index a33e2f110..d400a94db 100644 --- a/src/osd/osd_op_util.cc +++ b/src/osd/osd_op_util.cc @@ -16,6 +16,7 @@ bool OpInfo::check_rmw(int flag) const { ceph_assert(rmw_flags != 0); return rmw_flags & flag; } +// Returns true if op performs a read (including of the object_info). bool OpInfo::may_read() const { return need_read_cap() || check_rmw(CEPH_OSD_RMW_FLAG_CLASS_READ); } @@ -51,6 +52,16 @@ bool OpInfo::need_skip_promote() const { bool OpInfo::allows_returnvec() const { return check_rmw(CEPH_OSD_RMW_FLAG_RETURNVEC); } +/** + * may_read_data() + * + * Returns true if op reads information other than the object_info. Requires that the + * osd flush any prior writes prior to servicing this op. Includes any information not + * cached by the osd in the object_info or snapset. + */ +bool OpInfo::may_read_data() const { + return check_rmw(CEPH_OSD_RMW_FLAG_READ_DATA); +} void OpInfo::set_rmw_flags(int flags) { rmw_flags |= flags; @@ -67,6 +78,7 @@ void OpInfo::set_skip_handle_cache() { set_rmw_flags(CEPH_OSD_RMW_FLAG_SKIP_HAND void OpInfo::set_skip_promote() { set_rmw_flags(CEPH_OSD_RMW_FLAG_SKIP_PROMOTE); } void OpInfo::set_force_rwordered() { set_rmw_flags(CEPH_OSD_RMW_FLAG_RWORDERED); } void OpInfo::set_returnvec() { set_rmw_flags(CEPH_OSD_RMW_FLAG_RETURNVEC); } +void OpInfo::set_read_data() { set_rmw_flags(CEPH_OSD_RMW_FLAG_READ_DATA); } int OpInfo::set_from_op( @@ -108,8 +120,12 @@ int OpInfo::set_from_op( if (ceph_osd_op_mode_modify(iter->op.op)) set_write(); } - if (ceph_osd_op_mode_read(iter->op.op)) + if (ceph_osd_op_mode_read(iter->op.op)) { set_read(); + if (iter->op.op != CEPH_OSD_OP_STAT) { + set_read_data(); + } + } // set READ flag if there are src_oids if (iter->soid.oid.name.length()) @@ -202,6 +218,7 @@ int OpInfo::set_from_op( // watch state (and may return early if the watch exists) or, in // the case of ping, is simply a read op. set_read(); + set_read_data(); // fall through case CEPH_OSD_OP_NOTIFY: case CEPH_OSD_OP_NOTIFY_ACK: diff --git a/src/osd/osd_op_util.h b/src/osd/osd_op_util.h index 300fe40cc..fcd06c74b 100644 --- a/src/osd/osd_op_util.h +++ b/src/osd/osd_op_util.h @@ -47,6 +47,7 @@ public: bool check_rmw(int flag) const ; bool may_read() const; + bool may_read_data() const; bool may_write() const; bool may_cache() const; bool rwordered_forced() const; @@ -70,6 +71,7 @@ public: void set_skip_promote(); void set_force_rwordered(); void set_returnvec(); + void set_read_data(); int set_from_op( const MOSDOp *m, diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 14694de19..841a44b32 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -7371,3 +7371,31 @@ bool PGLSPlainFilter::filter(const hobject_t& obj, { return xattr_data.contents_equal(val.c_str(), val.size()); } + +std::string_view get_op_queue_type_name(const op_queue_type_t &q) +{ + switch (q) { + case op_queue_type_t::WeightedPriorityQueue: + return "wpq"; + case op_queue_type_t::mClockScheduler: + return "mclock_scheduler"; + case op_queue_type_t::PrioritizedQueue: + return "PrioritizedQueue"; + default: + return "unknown"; + } +} + +std::optional<op_queue_type_t> get_op_queue_type_by_name( + const std::string_view &s) +{ + if (s == "wpq") { + return op_queue_type_t::WeightedPriorityQueue; + } else if (s == "mclock_scheduler") { + return op_queue_type_t::mClockScheduler; + } else if (s == "PrioritizedQueue") { + return op_queue_type_t::PrioritizedQueue; + } else { + return std::nullopt; + } +} diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index afed5fa83..5a50a32e7 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -367,12 +367,14 @@ enum { CEPH_OSD_RMW_FLAG_SKIP_PROMOTE = (1 << 9), CEPH_OSD_RMW_FLAG_RWORDERED = (1 << 10), CEPH_OSD_RMW_FLAG_RETURNVEC = (1 << 11), + CEPH_OSD_RMW_FLAG_READ_DATA = (1 << 12), }; // pg stuff #define OSD_SUPERBLOCK_GOBJECT ghobject_t(hobject_t(sobject_t(object_t("osd_superblock"), 0))) +#define OSD_SUPERBLOCK_OMAP_KEY "osd_superblock" // placement seed (a hash value) typedef uint32_t ps_t; @@ -6636,4 +6638,18 @@ using missing_map_t = std::map<hobject_t, std::pair<std::optional<uint32_t>, std::optional<uint32_t>>>; +/** + * op_queue_type_t + * + * Supported op queue types + */ +enum class op_queue_type_t : uint8_t { + WeightedPriorityQueue = 0, + mClockScheduler, + PrioritizedQueue +}; +std::string_view get_op_queue_type_name(const op_queue_type_t &q); +std::optional<op_queue_type_t> get_op_queue_type_by_name( + const std::string_view &s); + #endif diff --git a/src/osd/scheduler/OpScheduler.cc b/src/osd/scheduler/OpScheduler.cc index cb5ef13b6..12e5bdb6c 100644 --- a/src/osd/scheduler/OpScheduler.cc +++ b/src/osd/scheduler/OpScheduler.cc @@ -23,30 +23,25 @@ namespace ceph::osd::scheduler { OpSchedulerRef make_scheduler( CephContext *cct, int whoami, uint32_t num_shards, int shard_id, - bool is_rotational, std::string_view osd_objectstore, MonClient *monc) + bool is_rotational, std::string_view osd_objectstore, + op_queue_type_t osd_scheduler, unsigned op_queue_cut_off, MonClient *monc) { - const std::string *type = &cct->_conf->osd_op_queue; - if (*type == "debug_random") { - static const std::string index_lookup[] = { "mclock_scheduler", - "wpq" }; - srand(time(NULL)); - unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0])); - type = &index_lookup[which]; - } - // Force the use of 'wpq' scheduler for filestore OSDs. // The 'mclock_scheduler' is not supported for filestore OSDs. - if (*type == "wpq" || osd_objectstore == "filestore") { + if (op_queue_type_t::WeightedPriorityQueue == osd_scheduler || + osd_objectstore == "filestore") { return std::make_unique< ClassedOpQueueScheduler<WeightedPriorityQueue<OpSchedulerItem, client>>>( cct, + op_queue_cut_off, cct->_conf->osd_op_pq_max_tokens_per_priority, cct->_conf->osd_op_pq_min_cost ); - } else if (*type == "mclock_scheduler") { + } else if (op_queue_type_t::mClockScheduler == osd_scheduler) { // default is 'mclock_scheduler' return std::make_unique< - mClockScheduler>(cct, whoami, num_shards, shard_id, is_rotational, monc); + mClockScheduler>(cct, whoami, num_shards, shard_id, is_rotational, + op_queue_cut_off, monc); } else { ceph_assert("Invalid choice of wq" == 0); } diff --git a/src/osd/scheduler/OpScheduler.h b/src/osd/scheduler/OpScheduler.h index 1575bcae4..570a2a162 100644 --- a/src/osd/scheduler/OpScheduler.h +++ b/src/osd/scheduler/OpScheduler.h @@ -18,6 +18,7 @@ #include <variant> #include "common/ceph_context.h" +#include "common/OpQueue.h" #include "mon/MonClient.h" #include "osd/scheduler/OpSchedulerItem.h" @@ -54,6 +55,9 @@ public: // Apply config changes to the scheduler (if any) virtual void update_configuration() = 0; + // Get the scheduler type set for the queue + virtual op_queue_type_t get_type() const = 0; + // Destructor virtual ~OpScheduler() {}; }; @@ -63,7 +67,8 @@ using OpSchedulerRef = std::unique_ptr<OpScheduler>; OpSchedulerRef make_scheduler( CephContext *cct, int whoami, uint32_t num_shards, int shard_id, - bool is_rotational, std::string_view osd_objectstore, MonClient *monc); + bool is_rotational, std::string_view osd_objectstore, + op_queue_type_t osd_scheduler, unsigned op_queue_cut_off, MonClient *monc); /** * Implements OpScheduler in terms of OpQueue @@ -78,21 +83,10 @@ class ClassedOpQueueScheduler final : public OpScheduler { unsigned cutoff; T queue; - static unsigned int get_io_prio_cut(CephContext *cct) { - if (cct->_conf->osd_op_queue_cut_off == "debug_random") { - srand(time(NULL)); - return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; - } else if (cct->_conf->osd_op_queue_cut_off == "high") { - return CEPH_MSG_PRIO_HIGH; - } else { - // default / catch-all is 'low' - return CEPH_MSG_PRIO_LOW; - } - } public: template <typename... Args> - ClassedOpQueueScheduler(CephContext *cct, Args&&... args) : - cutoff(get_io_prio_cut(cct)), + ClassedOpQueueScheduler(CephContext *cct, unsigned prio_cut, Args&&... args) : + cutoff(prio_cut), queue(std::forward<Args>(args)...) {} @@ -143,6 +137,10 @@ public: // no-op } + op_queue_type_t get_type() const final { + return queue.get_type(); + } + ~ClassedOpQueueScheduler() final {}; }; diff --git a/src/osd/scheduler/mClockScheduler.cc b/src/osd/scheduler/mClockScheduler.cc index 0ea519655..f72683d52 100644 --- a/src/osd/scheduler/mClockScheduler.cc +++ b/src/osd/scheduler/mClockScheduler.cc @@ -35,12 +35,14 @@ mClockScheduler::mClockScheduler(CephContext *cct, uint32_t num_shards, int shard_id, bool is_rotational, + unsigned cutoff_priority, MonClient *monc) : cct(cct), whoami(whoami), num_shards(num_shards), shard_id(shard_id), is_rotational(is_rotational), + cutoff_priority(cutoff_priority), monc(monc), scheduler( std::bind(&mClockScheduler::ClientRegistry::get_info, diff --git a/src/osd/scheduler/mClockScheduler.h b/src/osd/scheduler/mClockScheduler.h index f708b1d7a..16e7f911f 100644 --- a/src/osd/scheduler/mClockScheduler.h +++ b/src/osd/scheduler/mClockScheduler.h @@ -27,7 +27,6 @@ #include "osd/scheduler/OpScheduler.h" #include "common/config.h" #include "common/ceph_context.h" -#include "common/mClockPriorityQueue.h" #include "osd/scheduler/OpSchedulerItem.h" @@ -97,6 +96,7 @@ class mClockScheduler : public OpScheduler, md_config_obs_t { const uint32_t num_shards; const int shard_id; const bool is_rotational; + const unsigned cutoff_priority; MonClient *monc; /** @@ -199,21 +199,6 @@ class mClockScheduler : public OpScheduler, md_config_obs_t { }; } - static unsigned int get_io_prio_cut(CephContext *cct) { - if (cct->_conf->osd_op_queue_cut_off == "debug_random") { - std::random_device rd; - std::mt19937 random_gen(rd()); - return (random_gen() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; - } else if (cct->_conf->osd_op_queue_cut_off == "high") { - return CEPH_MSG_PRIO_HIGH; - } else { - // default / catch-all is 'low' - return CEPH_MSG_PRIO_LOW; - } - } - - unsigned cutoff_priority = get_io_prio_cut(cct); - /** * set_osd_capacity_params_from_config * @@ -233,7 +218,8 @@ class mClockScheduler : public OpScheduler, md_config_obs_t { public: mClockScheduler(CephContext *cct, int whoami, uint32_t num_shards, - int shard_id, bool is_rotational, MonClient *monc); + int shard_id, bool is_rotational, unsigned cutoff_priority, + MonClient *monc); ~mClockScheduler() override; /// Calculate scaled cost per item @@ -260,12 +246,18 @@ public: void dump(ceph::Formatter &f) const final; void print(std::ostream &ostream) const final { - ostream << "mClockScheduler"; + ostream << get_op_queue_type_name(get_type()); + ostream << ", cutoff=" << cutoff_priority; } // Update data associated with the modified mclock config key(s) void update_configuration() final; + // Return the scheduler type + op_queue_type_t get_type() const final { + return op_queue_type_t::mClockScheduler; + } + const char** get_tracked_conf_keys() const final; void handle_conf_change(const ConfigProxy& conf, const std::set<std::string> &changed) final; |