diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/osd/scheduler | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/osd/scheduler/OpScheduler.cc | 56 | ||||
-rw-r--r-- | src/osd/scheduler/OpScheduler.h | 147 | ||||
-rw-r--r-- | src/osd/scheduler/OpSchedulerItem.cc | 259 | ||||
-rw-r--r-- | src/osd/scheduler/OpSchedulerItem.h | 629 | ||||
-rw-r--r-- | src/osd/scheduler/mClockScheduler.cc | 514 | ||||
-rw-r--r-- | src/osd/scheduler/mClockScheduler.h | 204 |
6 files changed, 1809 insertions, 0 deletions
diff --git a/src/osd/scheduler/OpScheduler.cc b/src/osd/scheduler/OpScheduler.cc new file mode 100644 index 000000000..3ce6fdb55 --- /dev/null +++ b/src/osd/scheduler/OpScheduler.cc @@ -0,0 +1,56 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <ostream> + +#include "osd/scheduler/OpScheduler.h" + +#include "common/WeightedPriorityQueue.h" +#include "osd/scheduler/mClockScheduler.h" + +namespace ceph::osd::scheduler { + +OpSchedulerRef make_scheduler( + CephContext *cct, uint32_t num_shards, bool is_rotational) +{ + const std::string *type = &cct->_conf->osd_op_queue; + if (*type == "debug_random") { + static const std::string index_lookup[] = { "mclock_scheduler", + "wpq" }; + srand(time(NULL)); + unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0])); + type = &index_lookup[which]; + } + + if (*type == "wpq" ) { + // default is 'wpq' + return std::make_unique< + ClassedOpQueueScheduler<WeightedPriorityQueue<OpSchedulerItem, client>>>( + cct, + cct->_conf->osd_op_pq_max_tokens_per_priority, + cct->_conf->osd_op_pq_min_cost + ); + } else if (*type == "mclock_scheduler") { + return std::make_unique<mClockScheduler>(cct, num_shards, is_rotational); + } else { + ceph_assert("Invalid choice of wq" == 0); + } +} + +std::ostream &operator<<(std::ostream &lhs, const OpScheduler &rhs) { + rhs.print(lhs); + return lhs; +} + +} diff --git a/src/osd/scheduler/OpScheduler.h b/src/osd/scheduler/OpScheduler.h new file mode 100644 index 000000000..6e2bb5abd --- /dev/null +++ b/src/osd/scheduler/OpScheduler.h @@ -0,0 +1,147 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <ostream> +#include <variant> + +#include "common/ceph_context.h" +#include "osd/scheduler/OpSchedulerItem.h" + +namespace ceph::osd::scheduler { + +using client = uint64_t; +using WorkItem = std::variant<std::monostate, OpSchedulerItem, double>; + +/** + * Base interface for classes responsible for choosing + * op processing order in the OSD. + */ +class OpScheduler { +public: + // Enqueue op for scheduling + virtual void enqueue(OpSchedulerItem &&item) = 0; + + // Enqueue op for processing as though it were enqueued prior + // to other items already scheduled. + virtual void enqueue_front(OpSchedulerItem &&item) = 0; + + // Returns true iff there are no ops scheduled + virtual bool empty() const = 0; + + // Return next op to be processed + virtual WorkItem dequeue() = 0; + + // Dump formatted representation for the queue + virtual void dump(ceph::Formatter &f) const = 0; + + // Print human readable brief description with relevant parameters + virtual void print(std::ostream &out) const = 0; + + // Apply config changes to the scheduler (if any) + virtual void update_configuration() = 0; + + // Destructor + virtual ~OpScheduler() {}; +}; + +std::ostream &operator<<(std::ostream &lhs, const OpScheduler &); +using OpSchedulerRef = std::unique_ptr<OpScheduler>; + +OpSchedulerRef make_scheduler( + CephContext *cct, uint32_t num_shards, bool is_rotational); + +/** + * Implements OpScheduler in terms of OpQueue + * + * Templated on queue type to avoid dynamic dispatch, T should implement + * OpQueue<OpSchedulerItem, client>. This adapter is mainly responsible for + * the boilerplate priority cutoff/strict concept which is needed for + * OpQueue based implementations. + */ +template <typename T> +class ClassedOpQueueScheduler final : public OpScheduler { + unsigned cutoff; + T queue; + + static unsigned int get_io_prio_cut(CephContext *cct) { + if (cct->_conf->osd_op_queue_cut_off == "debug_random") { + srand(time(NULL)); + return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; + } else if (cct->_conf->osd_op_queue_cut_off == "high") { + return CEPH_MSG_PRIO_HIGH; + } else { + // default / catch-all is 'low' + return CEPH_MSG_PRIO_LOW; + } + } +public: + template <typename... Args> + ClassedOpQueueScheduler(CephContext *cct, Args&&... args) : + cutoff(get_io_prio_cut(cct)), + queue(std::forward<Args>(args)...) + {} + + void enqueue(OpSchedulerItem &&item) final { + unsigned priority = item.get_priority(); + unsigned cost = item.get_cost(); + + if (priority >= cutoff) + queue.enqueue_strict( + item.get_owner(), priority, std::move(item)); + else + queue.enqueue( + item.get_owner(), priority, cost, std::move(item)); + } + + void enqueue_front(OpSchedulerItem &&item) final { + unsigned priority = item.get_priority(); + unsigned cost = item.get_cost(); + if (priority >= cutoff) + queue.enqueue_strict_front( + item.get_owner(), + priority, std::move(item)); + else + queue.enqueue_front( + item.get_owner(), + priority, cost, std::move(item)); + } + + bool empty() const final { + return queue.empty(); + } + + WorkItem dequeue() final { + return queue.dequeue(); + } + + void dump(ceph::Formatter &f) const final { + return queue.dump(&f); + } + + void print(std::ostream &out) const final { + out << "ClassedOpQueueScheduler(queue="; + queue.print(out); + out << ", cutoff=" << cutoff << ")"; + } + + void update_configuration() final { + // no-op + } + + ~ClassedOpQueueScheduler() final {}; +}; + +} diff --git a/src/osd/scheduler/OpSchedulerItem.cc b/src/osd/scheduler/OpSchedulerItem.cc new file mode 100644 index 000000000..27db1dfa3 --- /dev/null +++ b/src/osd/scheduler/OpSchedulerItem.cc @@ -0,0 +1,259 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "osd/scheduler/OpSchedulerItem.h" +#include "osd/OSD.h" +#ifdef HAVE_JAEGER +#include "common/tracer.h" +#endif + +namespace ceph::osd::scheduler { + +void PGOpItem::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ +#ifdef HAVE_JAEGER + auto PGOpItem_span = jaeger_tracing::child_span("PGOpItem::run", op->osd_parent_span); +#endif + osd->dequeue_op(pg, op, handle); + pg->unlock(); +} + +void PGPeeringItem::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + osd->dequeue_peering_evt(sdata, pg.get(), evt, handle); +} + +void PGSnapTrim::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + pg->snap_trimmer(epoch_queued); + pg->unlock(); +} + +void PGScrub::run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) +{ + pg->scrub(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubAfterRepair::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->recovery_scrub(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubResched::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_scrub_resched(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubResourcesOK::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_resources_granted(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubDenied::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_resources_denied(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubPushesUpdate::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_pushes_update(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubAppliedUpdate::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_applied_update(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubUnblocked::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_unblocking(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubDigestUpdate::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_digest_update(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubGotLocalMap::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_local_map_ready(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubGotReplMaps::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_replmaps_ready(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubMapsCompared::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_maps_compared(epoch_queued, handle); + pg->unlock(); +} + +void PGRepScrub::run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) +{ + pg->replica_scrub(epoch_queued, activation_index, handle); + pg->unlock(); +} + +void PGRepScrubResched::run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->replica_scrub_resched(epoch_queued, activation_index, handle); + pg->unlock(); +} + +void PGScrubReplicaPushes::run([[maybe_unused]] OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_replica_pushes(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubScrubFinished::run([[maybe_unused]] OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_scrub_is_finished(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubGetNextChunk::run([[maybe_unused]] OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_get_next_chunk(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubChunkIsBusy::run([[maybe_unused]] OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_chunk_busy(epoch_queued, handle); + pg->unlock(); +} + +void PGScrubChunkIsFree::run([[maybe_unused]] OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) +{ + pg->scrub_send_chunk_free(epoch_queued, handle); + pg->unlock(); +} + +void PGRecovery::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + osd->do_recovery(pg.get(), epoch_queued, reserved_pushes, handle); + pg->unlock(); +} + +void PGRecoveryContext::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + c.release()->complete(handle); + pg->unlock(); +} + +void PGDelete::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + osd->dequeue_delete(sdata, pg.get(), epoch_queued, handle); +} + +void PGRecoveryMsg::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + osd->dequeue_op(pg, op, handle); + pg->unlock(); +} + +} diff --git a/src/osd/scheduler/OpSchedulerItem.h b/src/osd/scheduler/OpSchedulerItem.h new file mode 100644 index 000000000..7ba59838e --- /dev/null +++ b/src/osd/scheduler/OpSchedulerItem.h @@ -0,0 +1,629 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <ostream> + +#include "include/types.h" +#include "include/utime.h" +#include "osd/OpRequest.h" +#include "osd/PG.h" +#include "osd/PGPeeringEvent.h" +#include "messages/MOSDOp.h" + + +class OSD; +class OSDShard; + +namespace ceph::osd::scheduler { + +enum class op_scheduler_class : uint8_t { + background_recovery = 0, + background_best_effort, + immediate, + client, +}; + +class OpSchedulerItem { +public: + class OrderLocker { + public: + using Ref = std::unique_ptr<OrderLocker>; + virtual void lock() = 0; + virtual void unlock() = 0; + virtual ~OrderLocker() {} + }; + + // Abstraction for operations queueable in the op queue + class OpQueueable { + public: + enum class op_type_t { + client_op, + peering_event, + bg_snaptrim, + bg_recovery, + bg_scrub, + bg_pg_delete + }; + using Ref = std::unique_ptr<OpQueueable>; + + /// Items with the same queue token will end up in the same shard + virtual uint32_t get_queue_token() const = 0; + + /* Items will be dequeued and locked atomically w.r.t. other items with the + * same ordering token */ + virtual const spg_t& get_ordering_token() const = 0; + virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0; + virtual op_type_t get_op_type() const = 0; + virtual std::optional<OpRequestRef> maybe_get_op() const { + return std::nullopt; + } + + virtual uint64_t get_reserved_pushes() const { + return 0; + } + + virtual bool is_peering() const { + return false; + } + virtual bool peering_requires_pg() const { + ceph_abort(); + } + virtual const PGCreateInfo *creates_pg() const { + return nullptr; + } + + virtual std::ostream &print(std::ostream &rhs) const = 0; + + virtual void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) = 0; + virtual op_scheduler_class get_scheduler_class() const = 0; + + virtual ~OpQueueable() {} + friend std::ostream& operator<<(std::ostream& out, const OpQueueable& q) { + return q.print(out); + } + + }; + +private: + OpQueueable::Ref qitem; + int cost; + unsigned priority; + utime_t start_time; + uint64_t owner; ///< global id (e.g., client.XXX) + epoch_t map_epoch; ///< an epoch we expect the PG to exist in + +public: + OpSchedulerItem( + OpQueueable::Ref &&item, + int cost, + unsigned priority, + utime_t start_time, + uint64_t owner, + epoch_t e) + : qitem(std::move(item)), + cost(cost), + priority(priority), + start_time(start_time), + owner(owner), + map_epoch(e) + {} + OpSchedulerItem(OpSchedulerItem &&) = default; + OpSchedulerItem(const OpSchedulerItem &) = delete; + OpSchedulerItem &operator=(OpSchedulerItem &&) = default; + OpSchedulerItem &operator=(const OpSchedulerItem &) = delete; + + OrderLocker::Ref get_order_locker(PGRef pg) { + return qitem->get_order_locker(pg); + } + uint32_t get_queue_token() const { + return qitem->get_queue_token(); + } + const spg_t& get_ordering_token() const { + return qitem->get_ordering_token(); + } + using op_type_t = OpQueueable::op_type_t; + OpQueueable::op_type_t get_op_type() const { + return qitem->get_op_type(); + } + std::optional<OpRequestRef> maybe_get_op() const { + return qitem->maybe_get_op(); + } + uint64_t get_reserved_pushes() const { + return qitem->get_reserved_pushes(); + } + void run(OSD *osd, OSDShard *sdata,PGRef& pg, ThreadPool::TPHandle &handle) { + qitem->run(osd, sdata, pg, handle); + } + unsigned get_priority() const { return priority; } + int get_cost() const { return cost; } + utime_t get_start_time() const { return start_time; } + uint64_t get_owner() const { return owner; } + epoch_t get_map_epoch() const { return map_epoch; } + + bool is_peering() const { + return qitem->is_peering(); + } + + const PGCreateInfo *creates_pg() const { + return qitem->creates_pg(); + } + + bool peering_requires_pg() const { + return qitem->peering_requires_pg(); + } + + op_scheduler_class get_scheduler_class() const { + return qitem->get_scheduler_class(); + } + + friend std::ostream& operator<<(std::ostream& out, const OpSchedulerItem& item) { + out << "OpSchedulerItem(" + << item.get_ordering_token() << " " << *item.qitem + << " prio " << item.get_priority() + << " cost " << item.get_cost() + << " e" << item.get_map_epoch(); + if (item.get_reserved_pushes()) { + out << " reserved_pushes " << item.get_reserved_pushes(); + } + return out << ")"; + } +}; // class OpSchedulerItem + +/// Implements boilerplate for operations queued for the pg lock +class PGOpQueueable : public OpSchedulerItem::OpQueueable { + spg_t pgid; +protected: + const spg_t& get_pgid() const { + return pgid; + } +public: + explicit PGOpQueueable(spg_t pg) : pgid(pg) {} + uint32_t get_queue_token() const final { + return get_pgid().ps(); + } + + const spg_t& get_ordering_token() const final { + return get_pgid(); + } + + OpSchedulerItem::OrderLocker::Ref get_order_locker(PGRef pg) final { + class Locker : public OpSchedulerItem::OrderLocker { + PGRef pg; + public: + explicit Locker(PGRef pg) : pg(pg) {} + void lock() final { + pg->lock(); + } + void unlock() final { + pg->unlock(); + } + }; + return OpSchedulerItem::OrderLocker::Ref( + new Locker(pg)); + } +}; + +class PGOpItem : public PGOpQueueable { + OpRequestRef op; + + const MOSDOp *maybe_get_mosd_op() const { + auto req = op->get_req(); + if (req->get_type() == CEPH_MSG_OSD_OP) { + return op->get_req<MOSDOp>(); + } else { + return nullptr; + } + } + +public: + PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(std::move(op)) {} + op_type_t get_op_type() const final { + + return op_type_t::client_op; + } + + std::ostream &print(std::ostream &rhs) const final { + return rhs << "PGOpItem(op=" << *(op->get_req()) << ")"; + } + + std::optional<OpRequestRef> maybe_get_op() const final { + return op; + } + + op_scheduler_class get_scheduler_class() const final { + auto type = op->get_req()->get_type(); + if (type == CEPH_MSG_OSD_OP || + type == CEPH_MSG_OSD_BACKOFF) { + return op_scheduler_class::client; + } else { + return op_scheduler_class::immediate; + } + } + + void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; +}; + +class PGPeeringItem : public PGOpQueueable { + PGPeeringEventRef evt; +public: + PGPeeringItem(spg_t pg, PGPeeringEventRef e) : PGOpQueueable(pg), evt(e) {} + op_type_t get_op_type() const final { + return op_type_t::peering_event; + } + std::ostream &print(std::ostream &rhs) const final { + return rhs << "PGPeeringEvent(" << evt->get_desc() << ")"; + } + void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + bool is_peering() const override { + return true; + } + bool peering_requires_pg() const override { + return evt->requires_pg; + } + const PGCreateInfo *creates_pg() const override { + return evt->create_info.get(); + } + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::immediate; + } +}; + +class PGSnapTrim : public PGOpQueueable { + epoch_t epoch_queued; +public: + PGSnapTrim( + spg_t pg, + epoch_t epoch_queued) + : PGOpQueueable(pg), epoch_queued(epoch_queued) {} + op_type_t get_op_type() const final { + return op_type_t::bg_snaptrim; + } + std::ostream &print(std::ostream &rhs) const final { + return rhs << "PGSnapTrim(pgid=" << get_pgid() + << " epoch_queued=" << epoch_queued + << ")"; + } + void run( + OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::background_best_effort; + } +}; + +class PGScrub : public PGOpQueueable { + epoch_t epoch_queued; +public: + PGScrub( + spg_t pg, + epoch_t epoch_queued) + : PGOpQueueable(pg), epoch_queued(epoch_queued) {} + op_type_t get_op_type() const final { + return op_type_t::bg_scrub; + } + std::ostream &print(std::ostream &rhs) const final { + return rhs << "PGScrub(pgid=" << get_pgid() + << "epoch_queued=" << epoch_queued + << ")"; + } + void run( + OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::background_best_effort; + } +}; + +class PGScrubItem : public PGOpQueueable { + protected: + epoch_t epoch_queued; + Scrub::act_token_t activation_index; + std::string_view message_name; + PGScrubItem(spg_t pg, epoch_t epoch_queued, std::string_view derivative_name) + : PGOpQueueable{pg} + , epoch_queued{epoch_queued} + , activation_index{0} + , message_name{derivative_name} + {} + PGScrubItem(spg_t pg, + epoch_t epoch_queued, + Scrub::act_token_t op_index, + std::string_view derivative_name) + : PGOpQueueable{pg} + , epoch_queued{epoch_queued} + , activation_index{op_index} + , message_name{derivative_name} + {} + op_type_t get_op_type() const final { return op_type_t::bg_scrub; } + std::ostream& print(std::ostream& rhs) const final + { + return rhs << message_name << "(pgid=" << get_pgid() + << "epoch_queued=" << epoch_queued + << " scrub-token=" << activation_index << ")"; + } + void run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + ThreadPool::TPHandle& handle) override = 0; + op_scheduler_class get_scheduler_class() const final + { + return op_scheduler_class::background_best_effort; + } +}; + +class PGScrubResched : public PGScrubItem { + public: + PGScrubResched(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubResched"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +/** + * all replicas have granted our scrub resources request + */ +class PGScrubResourcesOK : public PGScrubItem { + public: + PGScrubResourcesOK(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubResourcesOK"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +/** + * scrub resources requests denied by replica(s) + */ +class PGScrubDenied : public PGScrubItem { + public: + PGScrubDenied(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubDenied"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +/** + * called when a repair process completes, to initiate scrubbing. No local/remote + * resources are allocated. + */ +class PGScrubAfterRepair : public PGScrubItem { + public: + PGScrubAfterRepair(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubAfterRepair"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGScrubPushesUpdate : public PGScrubItem { + public: + PGScrubPushesUpdate(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubPushesUpdate"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGScrubAppliedUpdate : public PGScrubItem { + public: + PGScrubAppliedUpdate(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubAppliedUpdate"} + {} + void run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + [[maybe_unused]] ThreadPool::TPHandle& handle) final; +}; + +class PGScrubUnblocked : public PGScrubItem { + public: + PGScrubUnblocked(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubUnblocked"} + {} + void run(OSD* osd, + OSDShard* sdata, + PGRef& pg, + [[maybe_unused]] ThreadPool::TPHandle& handle) final; +}; + +class PGScrubDigestUpdate : public PGScrubItem { + public: + PGScrubDigestUpdate(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubDigestUpdate"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGScrubGotLocalMap : public PGScrubItem { + public: + PGScrubGotLocalMap(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubGotLocalMap"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGScrubGotReplMaps : public PGScrubItem { + public: + PGScrubGotReplMaps(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubGotReplMaps"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGScrubMapsCompared : public PGScrubItem { + public: + PGScrubMapsCompared(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubMapsCompared"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGRepScrub : public PGScrubItem { + public: + PGRepScrub(spg_t pg, epoch_t epoch_queued, Scrub::act_token_t op_token) + : PGScrubItem{pg, epoch_queued, op_token, "PGRepScrub"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGRepScrubResched : public PGScrubItem { + public: + PGRepScrubResched(spg_t pg, epoch_t epoch_queued, Scrub::act_token_t op_token) + : PGScrubItem{pg, epoch_queued, op_token, "PGRepScrubResched"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGScrubReplicaPushes : public PGScrubItem { + public: + PGScrubReplicaPushes(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubReplicaPushes"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGScrubScrubFinished : public PGScrubItem { + public: + PGScrubScrubFinished(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubScrubFinished"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGScrubGetNextChunk : public PGScrubItem { + public: + PGScrubGetNextChunk(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubGetNextChunk"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGScrubChunkIsBusy : public PGScrubItem { + public: + PGScrubChunkIsBusy(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubChunkIsBusy"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGScrubChunkIsFree : public PGScrubItem { + public: + PGScrubChunkIsFree(spg_t pg, epoch_t epoch_queued) + : PGScrubItem{pg, epoch_queued, "PGScrubChunkIsFree"} + {} + void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final; +}; + +class PGRecovery : public PGOpQueueable { + epoch_t epoch_queued; + uint64_t reserved_pushes; +public: + PGRecovery( + spg_t pg, + epoch_t epoch_queued, + uint64_t reserved_pushes) + : PGOpQueueable(pg), + epoch_queued(epoch_queued), + reserved_pushes(reserved_pushes) {} + op_type_t get_op_type() const final { + return op_type_t::bg_recovery; + } + std::ostream &print(std::ostream &rhs) const final { + return rhs << "PGRecovery(pgid=" << get_pgid() + << " epoch_queued=" << epoch_queued + << " reserved_pushes=" << reserved_pushes + << ")"; + } + uint64_t get_reserved_pushes() const final { + return reserved_pushes; + } + void run( + OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::background_recovery; + } +}; + +class PGRecoveryContext : public PGOpQueueable { + std::unique_ptr<GenContext<ThreadPool::TPHandle&>> c; + epoch_t epoch; +public: + PGRecoveryContext(spg_t pgid, + GenContext<ThreadPool::TPHandle&> *c, epoch_t epoch) + : PGOpQueueable(pgid), + c(c), epoch(epoch) {} + op_type_t get_op_type() const final { + return op_type_t::bg_recovery; + } + std::ostream &print(std::ostream &rhs) const final { + return rhs << "PGRecoveryContext(pgid=" << get_pgid() + << " c=" << c.get() << " epoch=" << epoch + << ")"; + } + void run( + OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::background_recovery; + } +}; + +class PGDelete : public PGOpQueueable { + epoch_t epoch_queued; +public: + PGDelete( + spg_t pg, + epoch_t epoch_queued) + : PGOpQueueable(pg), + epoch_queued(epoch_queued) {} + op_type_t get_op_type() const final { + return op_type_t::bg_pg_delete; + } + std::ostream &print(std::ostream &rhs) const final { + return rhs << "PGDelete(" << get_pgid() + << " e" << epoch_queued + << ")"; + } + void run( + OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::background_best_effort; + } +}; + +class PGRecoveryMsg : public PGOpQueueable { + OpRequestRef op; + +public: + PGRecoveryMsg(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(std::move(op)) {} + op_type_t get_op_type() const final { + return op_type_t::bg_recovery; + } + + std::ostream &print(std::ostream &rhs) const final { + return rhs << "PGRecoveryMsg(op=" << *(op->get_req()) << ")"; + } + + std::optional<OpRequestRef> maybe_get_op() const final { + return op; + } + + op_scheduler_class get_scheduler_class() const final { + auto priority = op->get_req()->get_priority(); + if (priority >= CEPH_MSG_PRIO_HIGH) { + return op_scheduler_class::immediate; + } + return op_scheduler_class::background_recovery; + } + + void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; +}; + +} diff --git a/src/osd/scheduler/mClockScheduler.cc b/src/osd/scheduler/mClockScheduler.cc new file mode 100644 index 000000000..f2f0ffc3d --- /dev/null +++ b/src/osd/scheduler/mClockScheduler.cc @@ -0,0 +1,514 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include <memory> +#include <functional> + +#include "osd/scheduler/mClockScheduler.h" +#include "common/dout.h" + +namespace dmc = crimson::dmclock; +using namespace std::placeholders; + +#define dout_context cct +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix *_dout << "mClockScheduler: " + + +namespace ceph::osd::scheduler { + +mClockScheduler::mClockScheduler(CephContext *cct, + uint32_t num_shards, + bool is_rotational) + : cct(cct), + num_shards(num_shards), + is_rotational(is_rotational), + scheduler( + std::bind(&mClockScheduler::ClientRegistry::get_info, + &client_registry, + _1), + dmc::AtLimit::Wait, + cct->_conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout")) +{ + cct->_conf.add_observer(this); + ceph_assert(num_shards > 0); + set_max_osd_capacity(); + set_osd_mclock_cost_per_io(); + set_osd_mclock_cost_per_byte(); + set_mclock_profile(); + enable_mclock_profile_settings(); + client_registry.update_from_config(cct->_conf); +} + +void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf) +{ + default_external_client_info.update( + conf.get_val<uint64_t>("osd_mclock_scheduler_client_res"), + conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"), + conf.get_val<uint64_t>("osd_mclock_scheduler_client_lim")); + + internal_client_infos[ + static_cast<size_t>(op_scheduler_class::background_recovery)].update( + conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_res"), + conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"), + conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_lim")); + + internal_client_infos[ + static_cast<size_t>(op_scheduler_class::background_best_effort)].update( + conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_res"), + conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"), + conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_lim")); +} + +const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client( + const client_profile_id_t &client) const +{ + auto ret = external_client_infos.find(client); + if (ret == external_client_infos.end()) + return &default_external_client_info; + else + return &(ret->second); +} + +const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info( + const scheduler_id_t &id) const { + switch (id.class_id) { + case op_scheduler_class::immediate: + ceph_assert(0 == "Cannot schedule immediate"); + return (dmc::ClientInfo*)nullptr; + case op_scheduler_class::client: + return get_external_client(id.client_profile_id); + default: + ceph_assert(static_cast<size_t>(id.class_id) < internal_client_infos.size()); + return &internal_client_infos[static_cast<size_t>(id.class_id)]; + } +} + +void mClockScheduler::set_max_osd_capacity() +{ + if (is_rotational) { + max_osd_capacity = + cct->_conf.get_val<double>("osd_mclock_max_capacity_iops_hdd"); + } else { + max_osd_capacity = + cct->_conf.get_val<double>("osd_mclock_max_capacity_iops_ssd"); + } + // Set per op-shard iops limit + max_osd_capacity /= num_shards; + dout(1) << __func__ << " #op shards: " << num_shards + << std::fixed << std::setprecision(2) + << " max osd capacity(iops) per shard: " << max_osd_capacity + << dendl; +} + +void mClockScheduler::set_osd_mclock_cost_per_io() +{ + std::chrono::seconds sec(1); + if (cct->_conf.get_val<double>("osd_mclock_cost_per_io_usec")) { + osd_mclock_cost_per_io = + cct->_conf.get_val<double>("osd_mclock_cost_per_io_usec"); + } else { + if (is_rotational) { + osd_mclock_cost_per_io = + cct->_conf.get_val<double>("osd_mclock_cost_per_io_usec_hdd"); + // For HDDs, convert value to seconds + osd_mclock_cost_per_io /= std::chrono::microseconds(sec).count(); + } else { + // For SSDs, convert value to milliseconds + osd_mclock_cost_per_io = + cct->_conf.get_val<double>("osd_mclock_cost_per_io_usec_ssd"); + osd_mclock_cost_per_io /= std::chrono::milliseconds(sec).count(); + } + } + dout(1) << __func__ << " osd_mclock_cost_per_io: " + << std::fixed << std::setprecision(7) << osd_mclock_cost_per_io + << dendl; +} + +void mClockScheduler::set_osd_mclock_cost_per_byte() +{ + std::chrono::seconds sec(1); + if (cct->_conf.get_val<double>("osd_mclock_cost_per_byte_usec")) { + osd_mclock_cost_per_byte = + cct->_conf.get_val<double>("osd_mclock_cost_per_byte_usec"); + } else { + if (is_rotational) { + osd_mclock_cost_per_byte = + cct->_conf.get_val<double>("osd_mclock_cost_per_byte_usec_hdd"); + // For HDDs, convert value to seconds + osd_mclock_cost_per_byte /= std::chrono::microseconds(sec).count(); + } else { + osd_mclock_cost_per_byte = + cct->_conf.get_val<double>("osd_mclock_cost_per_byte_usec_ssd"); + // For SSDs, convert value to milliseconds + osd_mclock_cost_per_byte /= std::chrono::milliseconds(sec).count(); + } + } + dout(1) << __func__ << " osd_mclock_cost_per_byte: " + << std::fixed << std::setprecision(7) << osd_mclock_cost_per_byte + << dendl; +} + +void mClockScheduler::set_mclock_profile() +{ + mclock_profile = cct->_conf.get_val<std::string>("osd_mclock_profile"); + dout(1) << __func__ << " mclock profile: " << mclock_profile << dendl; +} + +std::string mClockScheduler::get_mclock_profile() +{ + return mclock_profile; +} + +void mClockScheduler::set_balanced_profile_allocations() +{ + // Client Allocation: + // reservation: 40% | weight: 1 | limit: 100% | + // Background Recovery Allocation: + // reservation: 40% | weight: 1 | limit: 150% | + // Background Best Effort Allocation: + // reservation: 20% | weight: 2 | limit: max | + + // Client + uint64_t client_res = static_cast<uint64_t>( + std::round(0.40 * max_osd_capacity)); + uint64_t client_lim = static_cast<uint64_t>( + std::round(max_osd_capacity)); + uint64_t client_wgt = default_min; + + // Background Recovery + uint64_t rec_res = static_cast<uint64_t>( + std::round(0.40 * max_osd_capacity)); + uint64_t rec_lim = static_cast<uint64_t>( + std::round(1.5 * max_osd_capacity)); + uint64_t rec_wgt = default_min; + + // Background Best Effort + uint64_t best_effort_res = static_cast<uint64_t>( + std::round(0.20 * max_osd_capacity)); + uint64_t best_effort_lim = default_max; + uint64_t best_effort_wgt = 2; + + // Set the allocations for the mclock clients + client_allocs[ + static_cast<size_t>(op_scheduler_class::client)].update( + client_res, + client_wgt, + client_lim); + client_allocs[ + static_cast<size_t>(op_scheduler_class::background_recovery)].update( + rec_res, + rec_wgt, + rec_lim); + client_allocs[ + static_cast<size_t>(op_scheduler_class::background_best_effort)].update( + best_effort_res, + best_effort_wgt, + best_effort_lim); +} + +void mClockScheduler::set_high_recovery_ops_profile_allocations() +{ + // Client Allocation: + // reservation: 30% | weight: 1 | limit: 80% | + // Background Recovery Allocation: + // reservation: 60% | weight: 2 | limit: 200% | + // Background Best Effort Allocation: + // reservation: 1 | weight: 2 | limit: max | + + // Client + uint64_t client_res = static_cast<uint64_t>( + std::round(0.30 * max_osd_capacity)); + uint64_t client_lim = static_cast<uint64_t>( + std::round(0.80 * max_osd_capacity)); + uint64_t client_wgt = default_min; + + // Background Recovery + uint64_t rec_res = static_cast<uint64_t>( + std::round(0.60 * max_osd_capacity)); + uint64_t rec_lim = static_cast<uint64_t>( + std::round(2.0 * max_osd_capacity)); + uint64_t rec_wgt = 2; + + // Background Best Effort + uint64_t best_effort_res = default_min; + uint64_t best_effort_lim = default_max; + uint64_t best_effort_wgt = 2; + + // Set the allocations for the mclock clients + client_allocs[ + static_cast<size_t>(op_scheduler_class::client)].update( + client_res, + client_wgt, + client_lim); + client_allocs[ + static_cast<size_t>(op_scheduler_class::background_recovery)].update( + rec_res, + rec_wgt, + rec_lim); + client_allocs[ + static_cast<size_t>(op_scheduler_class::background_best_effort)].update( + best_effort_res, + best_effort_wgt, + best_effort_lim); +} + +void mClockScheduler::set_high_client_ops_profile_allocations() +{ + // Client Allocation: + // reservation: 50% | weight: 2 | limit: max | + // Background Recovery Allocation: + // reservation: 25% | weight: 1 | limit: 100% | + // Background Best Effort Allocation: + // reservation: 25% | weight: 2 | limit: max | + + // Client + uint64_t client_res = static_cast<uint64_t>( + std::round(0.50 * max_osd_capacity)); + uint64_t client_wgt = 2; + uint64_t client_lim = default_max; + + // Background Recovery + uint64_t rec_res = static_cast<uint64_t>( + std::round(0.25 * max_osd_capacity)); + uint64_t rec_lim = static_cast<uint64_t>( + std::round(max_osd_capacity)); + uint64_t rec_wgt = default_min; + + // Background Best Effort + uint64_t best_effort_res = static_cast<uint64_t>( + std::round(0.25 * max_osd_capacity)); + uint64_t best_effort_lim = default_max; + uint64_t best_effort_wgt = 2; + + // Set the allocations for the mclock clients + client_allocs[ + static_cast<size_t>(op_scheduler_class::client)].update( + client_res, + client_wgt, + client_lim); + client_allocs[ + static_cast<size_t>(op_scheduler_class::background_recovery)].update( + rec_res, + rec_wgt, + rec_lim); + client_allocs[ + static_cast<size_t>(op_scheduler_class::background_best_effort)].update( + best_effort_res, + best_effort_wgt, + best_effort_lim); +} + +void mClockScheduler::enable_mclock_profile_settings() +{ + // Nothing to do for "custom" profile + if (mclock_profile == "custom") { + return; + } + + // Set mclock and ceph config options for the chosen profile + if (mclock_profile == "balanced") { + set_balanced_profile_allocations(); + } else if (mclock_profile == "high_recovery_ops") { + set_high_recovery_ops_profile_allocations(); + } else if (mclock_profile == "high_client_ops") { + set_high_client_ops_profile_allocations(); + } else { + ceph_assert("Invalid choice of mclock profile" == 0); + return; + } + + // Set the mclock config parameters + set_profile_config(); +} + +void mClockScheduler::set_profile_config() +{ + ClientAllocs client = client_allocs[ + static_cast<size_t>(op_scheduler_class::client)]; + ClientAllocs rec = client_allocs[ + static_cast<size_t>(op_scheduler_class::background_recovery)]; + ClientAllocs best_effort = client_allocs[ + static_cast<size_t>(op_scheduler_class::background_best_effort)]; + + // Set external client params + cct->_conf.set_val("osd_mclock_scheduler_client_res", + std::to_string(client.res)); + cct->_conf.set_val("osd_mclock_scheduler_client_wgt", + std::to_string(client.wgt)); + cct->_conf.set_val("osd_mclock_scheduler_client_lim", + std::to_string(client.lim)); + + // Set background recovery client params + cct->_conf.set_val("osd_mclock_scheduler_background_recovery_res", + std::to_string(rec.res)); + cct->_conf.set_val("osd_mclock_scheduler_background_recovery_wgt", + std::to_string(rec.wgt)); + cct->_conf.set_val("osd_mclock_scheduler_background_recovery_lim", + std::to_string(rec.lim)); + + // Set background best effort client params + cct->_conf.set_val("osd_mclock_scheduler_background_best_effort_res", + std::to_string(best_effort.res)); + cct->_conf.set_val("osd_mclock_scheduler_background_best_effort_wgt", + std::to_string(best_effort.wgt)); + cct->_conf.set_val("osd_mclock_scheduler_background_best_effort_lim", + std::to_string(best_effort.lim)); +} + +int mClockScheduler::calc_scaled_cost(int item_cost) +{ + // Calculate total scaled cost in secs + int scaled_cost = + std::round(osd_mclock_cost_per_io + (osd_mclock_cost_per_byte * item_cost)); + return std::max(scaled_cost, 1); +} + +void mClockScheduler::update_configuration() +{ + // Apply configuration change. The expectation is that + // at least one of the tracked mclock config option keys + // is modified before calling this method. + cct->_conf.apply_changes(nullptr); +} + +void mClockScheduler::dump(ceph::Formatter &f) const +{ +} + +void mClockScheduler::enqueue(OpSchedulerItem&& item) +{ + auto id = get_scheduler_id(item); + + // TODO: move this check into OpSchedulerItem, handle backwards compat + if (op_scheduler_class::immediate == id.class_id) { + immediate.push_front(std::move(item)); + } else { + int cost = calc_scaled_cost(item.get_cost()); + // Add item to scheduler queue + scheduler.add_request( + std::move(item), + id, + cost); + } +} + +void mClockScheduler::enqueue_front(OpSchedulerItem&& item) +{ + immediate.push_back(std::move(item)); + // TODO: item may not be immediate, update mclock machinery to permit + // putting the item back in the queue +} + +WorkItem mClockScheduler::dequeue() +{ + if (!immediate.empty()) { + WorkItem work_item{std::move(immediate.back())}; + immediate.pop_back(); + return work_item; + } else { + mclock_queue_t::PullReq result = scheduler.pull_request(); + if (result.is_future()) { + return result.getTime(); + } else if (result.is_none()) { + ceph_assert( + 0 == "Impossible, must have checked empty() first"); + return {}; + } else { + ceph_assert(result.is_retn()); + + auto &retn = result.get_retn(); + return std::move(*retn.request); + } + } +} + +const char** mClockScheduler::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + "osd_mclock_scheduler_client_res", + "osd_mclock_scheduler_client_wgt", + "osd_mclock_scheduler_client_lim", + "osd_mclock_scheduler_background_recovery_res", + "osd_mclock_scheduler_background_recovery_wgt", + "osd_mclock_scheduler_background_recovery_lim", + "osd_mclock_scheduler_background_best_effort_res", + "osd_mclock_scheduler_background_best_effort_wgt", + "osd_mclock_scheduler_background_best_effort_lim", + "osd_mclock_cost_per_io_usec", + "osd_mclock_cost_per_io_usec_hdd", + "osd_mclock_cost_per_io_usec_ssd", + "osd_mclock_cost_per_byte_usec", + "osd_mclock_cost_per_byte_usec_hdd", + "osd_mclock_cost_per_byte_usec_ssd", + "osd_mclock_max_capacity_iops_hdd", + "osd_mclock_max_capacity_iops_ssd", + "osd_mclock_profile", + NULL + }; + return KEYS; +} + +void mClockScheduler::handle_conf_change( + const ConfigProxy& conf, + const std::set<std::string> &changed) +{ + if (changed.count("osd_mclock_cost_per_io_usec") || + changed.count("osd_mclock_cost_per_io_usec_hdd") || + changed.count("osd_mclock_cost_per_io_usec_ssd")) { + set_osd_mclock_cost_per_io(); + } + if (changed.count("osd_mclock_cost_per_byte_usec") || + changed.count("osd_mclock_cost_per_byte_usec_hdd") || + changed.count("osd_mclock_cost_per_byte_usec_ssd")) { + set_osd_mclock_cost_per_byte(); + } + if (changed.count("osd_mclock_max_capacity_iops_hdd") || + changed.count("osd_mclock_max_capacity_iops_ssd")) { + set_max_osd_capacity(); + if (mclock_profile != "custom") { + enable_mclock_profile_settings(); + client_registry.update_from_config(conf); + } + } + if (changed.count("osd_mclock_profile")) { + set_mclock_profile(); + if (mclock_profile != "custom") { + enable_mclock_profile_settings(); + client_registry.update_from_config(conf); + } + } + if (changed.count("osd_mclock_scheduler_client_res") || + changed.count("osd_mclock_scheduler_client_wgt") || + changed.count("osd_mclock_scheduler_client_lim") || + changed.count("osd_mclock_scheduler_background_recovery_res") || + changed.count("osd_mclock_scheduler_background_recovery_wgt") || + changed.count("osd_mclock_scheduler_background_recovery_lim") || + changed.count("osd_mclock_scheduler_background_best_effort_res") || + changed.count("osd_mclock_scheduler_background_best_effort_wgt") || + changed.count("osd_mclock_scheduler_background_best_effort_lim")) { + if (mclock_profile == "custom") { + client_registry.update_from_config(conf); + } + } +} + +mClockScheduler::~mClockScheduler() +{ + cct->_conf.remove_observer(this); +} + +} diff --git a/src/osd/scheduler/mClockScheduler.h b/src/osd/scheduler/mClockScheduler.h new file mode 100644 index 000000000..32f3851ec --- /dev/null +++ b/src/osd/scheduler/mClockScheduler.h @@ -0,0 +1,204 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#pragma once + +#include <ostream> +#include <map> +#include <vector> + +#include "boost/variant.hpp" + +#include "dmclock/src/dmclock_server.h" + +#include "osd/scheduler/OpScheduler.h" +#include "common/config.h" +#include "include/cmp.h" +#include "common/ceph_context.h" +#include "common/mClockPriorityQueue.h" +#include "osd/scheduler/OpSchedulerItem.h" + + +namespace ceph::osd::scheduler { + +constexpr uint64_t default_min = 1; +constexpr uint64_t default_max = 999999; + +using client_id_t = uint64_t; +using profile_id_t = uint64_t; + +struct client_profile_id_t { + client_id_t client_id; + profile_id_t profile_id; +}; + +WRITE_EQ_OPERATORS_2(client_profile_id_t, client_id, profile_id) +WRITE_CMP_OPERATORS_2(client_profile_id_t, client_id, profile_id) + + +struct scheduler_id_t { + op_scheduler_class class_id; + client_profile_id_t client_profile_id; +}; + +WRITE_EQ_OPERATORS_2(scheduler_id_t, class_id, client_profile_id) +WRITE_CMP_OPERATORS_2(scheduler_id_t, class_id, client_profile_id) + +/** + * Scheduler implementation based on mclock. + * + * TODO: explain configs + */ +class mClockScheduler : public OpScheduler, md_config_obs_t { + + CephContext *cct; + const uint32_t num_shards; + bool is_rotational; + double max_osd_capacity; + double osd_mclock_cost_per_io; + double osd_mclock_cost_per_byte; + std::string mclock_profile = "high_client_ops"; + struct ClientAllocs { + uint64_t res; + uint64_t wgt; + uint64_t lim; + + ClientAllocs(uint64_t _res, uint64_t _wgt, uint64_t _lim) { + update(_res, _wgt, _lim); + } + + inline void update(uint64_t _res, uint64_t _wgt, uint64_t _lim) { + res = _res; + wgt = _wgt; + lim = _lim; + } + }; + std::array< + ClientAllocs, + static_cast<size_t>(op_scheduler_class::client) + 1 + > client_allocs = { + // Placeholder, get replaced with configured values + ClientAllocs(1, 1, 1), // background_recovery + ClientAllocs(1, 1, 1), // background_best_effort + ClientAllocs(1, 1, 1), // immediate (not used) + ClientAllocs(1, 1, 1) // client + }; + class ClientRegistry { + std::array< + crimson::dmclock::ClientInfo, + static_cast<size_t>(op_scheduler_class::immediate) + > internal_client_infos = { + // Placeholder, gets replaced with configured values + crimson::dmclock::ClientInfo(1, 1, 1), + crimson::dmclock::ClientInfo(1, 1, 1) + }; + + crimson::dmclock::ClientInfo default_external_client_info = {1, 1, 1}; + std::map<client_profile_id_t, + crimson::dmclock::ClientInfo> external_client_infos; + const crimson::dmclock::ClientInfo *get_external_client( + const client_profile_id_t &client) const; + public: + void update_from_config(const ConfigProxy &conf); + const crimson::dmclock::ClientInfo *get_info( + const scheduler_id_t &id) const; + } client_registry; + + using mclock_queue_t = crimson::dmclock::PullPriorityQueue< + scheduler_id_t, + OpSchedulerItem, + true, + true, + 2>; + mclock_queue_t scheduler; + std::list<OpSchedulerItem> immediate; + + static scheduler_id_t get_scheduler_id(const OpSchedulerItem &item) { + return scheduler_id_t{ + item.get_scheduler_class(), + client_profile_id_t{ + item.get_owner(), + 0 + } + }; + } + +public: + mClockScheduler(CephContext *cct, uint32_t num_shards, bool is_rotational); + ~mClockScheduler() override; + + // Set the max osd capacity in iops + void set_max_osd_capacity(); + + // Set the cost per io for the osd + void set_osd_mclock_cost_per_io(); + + // Set the cost per byte for the osd + void set_osd_mclock_cost_per_byte(); + + // Set the mclock profile type to enable + void set_mclock_profile(); + + // Get the active mclock profile + std::string get_mclock_profile(); + + // Set "balanced" profile allocations + void set_balanced_profile_allocations(); + + // Set "high_recovery_ops" profile allocations + void set_high_recovery_ops_profile_allocations(); + + // Set "high_client_ops" profile allocations + void set_high_client_ops_profile_allocations(); + + // Set the mclock related config params based on the profile + void enable_mclock_profile_settings(); + + // Set mclock config parameter based on allocations + void set_profile_config(); + + // Calculate scale cost per item + int calc_scaled_cost(int cost); + + // Enqueue op in the back of the regular queue + void enqueue(OpSchedulerItem &&item) final; + + // Enqueue the op in the front of the regular queue + void enqueue_front(OpSchedulerItem &&item) final; + + // Return an op to be dispatch + WorkItem dequeue() final; + + // Returns if the queue is empty + bool empty() const final { + return immediate.empty() && scheduler.empty(); + } + + // Formatted output of the queue + void dump(ceph::Formatter &f) const final; + + void print(std::ostream &ostream) const final { + ostream << "mClockScheduler"; + } + + // Update data associated with the modified mclock config key(s) + void update_configuration() final; + + const char** get_tracked_conf_keys() const final; + void handle_conf_change(const ConfigProxy& conf, + const std::set<std::string> &changed) final; +}; + +} |