path: root/src/osd/scheduler
author     Daniel Baumann <daniel.baumann@progress-linux.org>    2024-04-07 18:45:59 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>    2024-04-07 18:45:59 +0000
commit     19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree       42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/osd/scheduler
parent     Initial commit. (diff)
Adding upstream version 16.2.11+ds. (upstream/16.2.11+ds, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/osd/scheduler')
-rw-r--r--  src/osd/scheduler/OpScheduler.cc       56
-rw-r--r--  src/osd/scheduler/OpScheduler.h       147
-rw-r--r--  src/osd/scheduler/OpSchedulerItem.cc   259
-rw-r--r--  src/osd/scheduler/OpSchedulerItem.h    629
-rw-r--r--  src/osd/scheduler/mClockScheduler.cc   514
-rw-r--r--  src/osd/scheduler/mClockScheduler.h    204
6 files changed, 1809 insertions, 0 deletions
diff --git a/src/osd/scheduler/OpScheduler.cc b/src/osd/scheduler/OpScheduler.cc
new file mode 100644
index 000000000..3ce6fdb55
--- /dev/null
+++ b/src/osd/scheduler/OpScheduler.cc
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <ostream>
+
+#include "osd/scheduler/OpScheduler.h"
+
+#include "common/WeightedPriorityQueue.h"
+#include "osd/scheduler/mClockScheduler.h"
+
+namespace ceph::osd::scheduler {
+
+OpSchedulerRef make_scheduler(
+ CephContext *cct, uint32_t num_shards, bool is_rotational)
+{
+ const std::string *type = &cct->_conf->osd_op_queue;
+ if (*type == "debug_random") {
+ static const std::string index_lookup[] = { "mclock_scheduler",
+ "wpq" };
+ srand(time(NULL));
+ unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
+ type = &index_lookup[which];
+ }
+
+ if (*type == "wpq" ) {
+ // default is 'wpq'
+ return std::make_unique<
+ ClassedOpQueueScheduler<WeightedPriorityQueue<OpSchedulerItem, client>>>(
+ cct,
+ cct->_conf->osd_op_pq_max_tokens_per_priority,
+ cct->_conf->osd_op_pq_min_cost
+ );
+ } else if (*type == "mclock_scheduler") {
+ return std::make_unique<mClockScheduler>(cct, num_shards, is_rotational);
+ } else {
+ ceph_assert("Invalid choice of wq" == 0);
+ }
+}
+
+std::ostream &operator<<(std::ostream &lhs, const OpScheduler &rhs) {
+ rhs.print(lhs);
+ return lhs;
+}
+
+}
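
The factory above chooses the queue implementation from osd_op_queue: "wpq" yields the WeightedPriorityQueue adapter, "mclock_scheduler" yields mClockScheduler, "debug_random" picks one of the two at random, and any other value trips the ceph_assert. A minimal usage sketch follows; it is not part of this patch and the calling context is assumed.

// Hypothetical caller, assuming a valid CephContext is available;
// build_shard_scheduler() is not part of the patch.
#include "osd/scheduler/OpScheduler.h"

namespace ceph::osd::scheduler {

OpSchedulerRef build_shard_scheduler(CephContext *cct,
                                     uint32_t num_shards,
                                     bool is_rotational)
{
  // Returns the WPQ adapter for "wpq", mClockScheduler for
  // "mclock_scheduler"; any other osd_op_queue value aborts in the factory.
  return make_scheduler(cct, num_shards, is_rotational);
}

} // namespace ceph::osd::scheduler
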
diff --git a/src/osd/scheduler/OpScheduler.h b/src/osd/scheduler/OpScheduler.h
new file mode 100644
index 000000000..6e2bb5abd
--- /dev/null
+++ b/src/osd/scheduler/OpScheduler.h
@@ -0,0 +1,147 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <ostream>
+#include <variant>
+
+#include "common/ceph_context.h"
+#include "osd/scheduler/OpSchedulerItem.h"
+
+namespace ceph::osd::scheduler {
+
+using client = uint64_t;
+using WorkItem = std::variant<std::monostate, OpSchedulerItem, double>;
+
+/**
+ * Base interface for classes responsible for choosing
+ * op processing order in the OSD.
+ */
+class OpScheduler {
+public:
+ // Enqueue op for scheduling
+ virtual void enqueue(OpSchedulerItem &&item) = 0;
+
+ // Enqueue op for processing as though it were enqueued prior
+ // to other items already scheduled.
+ virtual void enqueue_front(OpSchedulerItem &&item) = 0;
+
+ // Returns true iff there are no ops scheduled
+ virtual bool empty() const = 0;
+
+ // Return next op to be processed
+ virtual WorkItem dequeue() = 0;
+
+ // Dump formatted representation for the queue
+ virtual void dump(ceph::Formatter &f) const = 0;
+
+ // Print human readable brief description with relevant parameters
+ virtual void print(std::ostream &out) const = 0;
+
+ // Apply config changes to the scheduler (if any)
+ virtual void update_configuration() = 0;
+
+ // Destructor
+ virtual ~OpScheduler() {};
+};
+
+std::ostream &operator<<(std::ostream &lhs, const OpScheduler &);
+using OpSchedulerRef = std::unique_ptr<OpScheduler>;
+
+OpSchedulerRef make_scheduler(
+ CephContext *cct, uint32_t num_shards, bool is_rotational);
+
+/**
+ * Implements OpScheduler in terms of OpQueue
+ *
+ * Templated on queue type to avoid dynamic dispatch; T should implement
+ * OpQueue<OpSchedulerItem, client>. This adapter is mainly responsible for
+ * the boilerplate priority cutoff/strict-queue handling needed by
+ * OpQueue-based implementations.
+ */
+template <typename T>
+class ClassedOpQueueScheduler final : public OpScheduler {
+ unsigned cutoff;
+ T queue;
+
+ static unsigned int get_io_prio_cut(CephContext *cct) {
+ if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
+ srand(time(NULL));
+ return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+ } else if (cct->_conf->osd_op_queue_cut_off == "high") {
+ return CEPH_MSG_PRIO_HIGH;
+ } else {
+ // default / catch-all is 'low'
+ return CEPH_MSG_PRIO_LOW;
+ }
+ }
+public:
+ template <typename... Args>
+ ClassedOpQueueScheduler(CephContext *cct, Args&&... args) :
+ cutoff(get_io_prio_cut(cct)),
+ queue(std::forward<Args>(args)...)
+ {}
+
+ void enqueue(OpSchedulerItem &&item) final {
+ unsigned priority = item.get_priority();
+ unsigned cost = item.get_cost();
+
+ if (priority >= cutoff)
+ queue.enqueue_strict(
+ item.get_owner(), priority, std::move(item));
+ else
+ queue.enqueue(
+ item.get_owner(), priority, cost, std::move(item));
+ }
+
+ void enqueue_front(OpSchedulerItem &&item) final {
+ unsigned priority = item.get_priority();
+ unsigned cost = item.get_cost();
+ if (priority >= cutoff)
+ queue.enqueue_strict_front(
+ item.get_owner(),
+ priority, std::move(item));
+ else
+ queue.enqueue_front(
+ item.get_owner(),
+ priority, cost, std::move(item));
+ }
+
+ bool empty() const final {
+ return queue.empty();
+ }
+
+ WorkItem dequeue() final {
+ return queue.dequeue();
+ }
+
+ void dump(ceph::Formatter &f) const final {
+ return queue.dump(&f);
+ }
+
+ void print(std::ostream &out) const final {
+ out << "ClassedOpQueueScheduler(queue=";
+ queue.print(out);
+ out << ", cutoff=" << cutoff << ")";
+ }
+
+ void update_configuration() final {
+ // no-op
+ }
+
+ ~ClassedOpQueueScheduler() final {};
+};
+
+}
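
WorkItem is a std::variant: std::monostate for "nothing", an OpSchedulerItem when an op is ready, or a double when the scheduler can only offer a future dispatch time. A hedged sketch of draining the queue follows; try_dequeue_one() is a hypothetical helper and the actual op hand-off is left to the surrounding worker code.

// Sketch only: the worker context that would execute the item is assumed.
#include <variant>

#include "osd/scheduler/OpScheduler.h"

namespace ceph::osd::scheduler {

// Returns true if a run-ready item was pulled; false if the queue was empty
// or only offered a future dispatch time.
bool try_dequeue_one(OpScheduler &sched)
{
  if (sched.empty()) {
    return false;
  }
  WorkItem wi = sched.dequeue();
  if (auto *item = std::get_if<OpSchedulerItem>(&wi)) {
    // hand *item to the op worker here (needs OSD/OSDShard/PG context)
    (void)item;
    return true;
  } else if (auto *when = std::get_if<double>(&wi)) {
    // mClock returned a future time: nothing is dispatchable yet
    (void)when;
    return false;
  }
  return false;  // std::monostate: nothing to do
}

} // namespace ceph::osd::scheduler
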
diff --git a/src/osd/scheduler/OpSchedulerItem.cc b/src/osd/scheduler/OpSchedulerItem.cc
new file mode 100644
index 000000000..27db1dfa3
--- /dev/null
+++ b/src/osd/scheduler/OpSchedulerItem.cc
@@ -0,0 +1,259 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "osd/scheduler/OpSchedulerItem.h"
+#include "osd/OSD.h"
+#ifdef HAVE_JAEGER
+#include "common/tracer.h"
+#endif
+
+namespace ceph::osd::scheduler {
+
+void PGOpItem::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+#ifdef HAVE_JAEGER
+ auto PGOpItem_span = jaeger_tracing::child_span("PGOpItem::run", op->osd_parent_span);
+#endif
+ osd->dequeue_op(pg, op, handle);
+ pg->unlock();
+}
+
+void PGPeeringItem::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ osd->dequeue_peering_evt(sdata, pg.get(), evt, handle);
+}
+
+void PGSnapTrim::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ pg->snap_trimmer(epoch_queued);
+ pg->unlock();
+}
+
+void PGScrub::run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle)
+{
+ pg->scrub(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubAfterRepair::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->recovery_scrub(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubResched::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_scrub_resched(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubResourcesOK::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_resources_granted(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubDenied::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_resources_denied(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubPushesUpdate::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_pushes_update(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubAppliedUpdate::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_applied_update(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubUnblocked::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_unblocking(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubDigestUpdate::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_digest_update(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubGotLocalMap::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_local_map_ready(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubGotReplMaps::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_replmaps_ready(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubMapsCompared::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_maps_compared(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGRepScrub::run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle)
+{
+ pg->replica_scrub(epoch_queued, activation_index, handle);
+ pg->unlock();
+}
+
+void PGRepScrubResched::run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->replica_scrub_resched(epoch_queued, activation_index, handle);
+ pg->unlock();
+}
+
+void PGScrubReplicaPushes::run([[maybe_unused]] OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_replica_pushes(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubScrubFinished::run([[maybe_unused]] OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_scrub_is_finished(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubGetNextChunk::run([[maybe_unused]] OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_get_next_chunk(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubChunkIsBusy::run([[maybe_unused]] OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_chunk_busy(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGScrubChunkIsFree::run([[maybe_unused]] OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle)
+{
+ pg->scrub_send_chunk_free(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGRecovery::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ osd->do_recovery(pg.get(), epoch_queued, reserved_pushes, handle);
+ pg->unlock();
+}
+
+void PGRecoveryContext::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ c.release()->complete(handle);
+ pg->unlock();
+}
+
+void PGDelete::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ osd->dequeue_delete(sdata, pg.get(), epoch_queued, handle);
+}
+
+void PGRecoveryMsg::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ osd->dequeue_op(pg, op, handle);
+ pg->unlock();
+}
+
+}
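
All run() bodies above share one convention: the dequeue path locks the PG before calling run(), the item does its work against the PG or OSD, and the item releases the PG lock before returning (PGDelete is the exception, since dequeue_delete manages the lock itself). A sketch of that convention follows; run_item() and its context are purely hypothetical.

// Sketch only: restates the locking convention followed by the items above.
#include "osd/scheduler/OpSchedulerItem.h"
#include "osd/OSD.h"

namespace ceph::osd::scheduler {

void run_item(OpSchedulerItem &&item,
              OSD *osd,
              OSDShard *sdata,
              PGRef &pg,
              ThreadPool::TPHandle &handle)
{
  pg->lock();                        // the dequeue path takes the PG lock ...
  item.run(osd, sdata, pg, handle);  // ... run() works against the PG/OSD ...
  // ... and each item (except PGDelete) calls pg->unlock() before returning,
  // so the lock is not released again here.
}

} // namespace ceph::osd::scheduler
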
diff --git a/src/osd/scheduler/OpSchedulerItem.h b/src/osd/scheduler/OpSchedulerItem.h
new file mode 100644
index 000000000..7ba59838e
--- /dev/null
+++ b/src/osd/scheduler/OpSchedulerItem.h
@@ -0,0 +1,629 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <ostream>
+
+#include "include/types.h"
+#include "include/utime.h"
+#include "osd/OpRequest.h"
+#include "osd/PG.h"
+#include "osd/PGPeeringEvent.h"
+#include "messages/MOSDOp.h"
+
+
+class OSD;
+class OSDShard;
+
+namespace ceph::osd::scheduler {
+
+enum class op_scheduler_class : uint8_t {
+ background_recovery = 0,
+ background_best_effort,
+ immediate,
+ client,
+};
+
+class OpSchedulerItem {
+public:
+ class OrderLocker {
+ public:
+ using Ref = std::unique_ptr<OrderLocker>;
+ virtual void lock() = 0;
+ virtual void unlock() = 0;
+ virtual ~OrderLocker() {}
+ };
+
+ // Abstraction for operations queueable in the op queue
+ class OpQueueable {
+ public:
+ enum class op_type_t {
+ client_op,
+ peering_event,
+ bg_snaptrim,
+ bg_recovery,
+ bg_scrub,
+ bg_pg_delete
+ };
+ using Ref = std::unique_ptr<OpQueueable>;
+
+ /// Items with the same queue token will end up in the same shard
+ virtual uint32_t get_queue_token() const = 0;
+
+ /* Items will be dequeued and locked atomically w.r.t. other items with the
+ * same ordering token */
+ virtual const spg_t& get_ordering_token() const = 0;
+ virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0;
+ virtual op_type_t get_op_type() const = 0;
+ virtual std::optional<OpRequestRef> maybe_get_op() const {
+ return std::nullopt;
+ }
+
+ virtual uint64_t get_reserved_pushes() const {
+ return 0;
+ }
+
+ virtual bool is_peering() const {
+ return false;
+ }
+ virtual bool peering_requires_pg() const {
+ ceph_abort();
+ }
+ virtual const PGCreateInfo *creates_pg() const {
+ return nullptr;
+ }
+
+ virtual std::ostream &print(std::ostream &rhs) const = 0;
+
+ virtual void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) = 0;
+ virtual op_scheduler_class get_scheduler_class() const = 0;
+
+ virtual ~OpQueueable() {}
+ friend std::ostream& operator<<(std::ostream& out, const OpQueueable& q) {
+ return q.print(out);
+ }
+
+ };
+
+private:
+ OpQueueable::Ref qitem;
+ int cost;
+ unsigned priority;
+ utime_t start_time;
+ uint64_t owner; ///< global id (e.g., client.XXX)
+ epoch_t map_epoch; ///< an epoch we expect the PG to exist in
+
+public:
+ OpSchedulerItem(
+ OpQueueable::Ref &&item,
+ int cost,
+ unsigned priority,
+ utime_t start_time,
+ uint64_t owner,
+ epoch_t e)
+ : qitem(std::move(item)),
+ cost(cost),
+ priority(priority),
+ start_time(start_time),
+ owner(owner),
+ map_epoch(e)
+ {}
+ OpSchedulerItem(OpSchedulerItem &&) = default;
+ OpSchedulerItem(const OpSchedulerItem &) = delete;
+ OpSchedulerItem &operator=(OpSchedulerItem &&) = default;
+ OpSchedulerItem &operator=(const OpSchedulerItem &) = delete;
+
+ OrderLocker::Ref get_order_locker(PGRef pg) {
+ return qitem->get_order_locker(pg);
+ }
+ uint32_t get_queue_token() const {
+ return qitem->get_queue_token();
+ }
+ const spg_t& get_ordering_token() const {
+ return qitem->get_ordering_token();
+ }
+ using op_type_t = OpQueueable::op_type_t;
+ OpQueueable::op_type_t get_op_type() const {
+ return qitem->get_op_type();
+ }
+ std::optional<OpRequestRef> maybe_get_op() const {
+ return qitem->maybe_get_op();
+ }
+ uint64_t get_reserved_pushes() const {
+ return qitem->get_reserved_pushes();
+ }
+ void run(OSD *osd, OSDShard *sdata,PGRef& pg, ThreadPool::TPHandle &handle) {
+ qitem->run(osd, sdata, pg, handle);
+ }
+ unsigned get_priority() const { return priority; }
+ int get_cost() const { return cost; }
+ utime_t get_start_time() const { return start_time; }
+ uint64_t get_owner() const { return owner; }
+ epoch_t get_map_epoch() const { return map_epoch; }
+
+ bool is_peering() const {
+ return qitem->is_peering();
+ }
+
+ const PGCreateInfo *creates_pg() const {
+ return qitem->creates_pg();
+ }
+
+ bool peering_requires_pg() const {
+ return qitem->peering_requires_pg();
+ }
+
+ op_scheduler_class get_scheduler_class() const {
+ return qitem->get_scheduler_class();
+ }
+
+ friend std::ostream& operator<<(std::ostream& out, const OpSchedulerItem& item) {
+ out << "OpSchedulerItem("
+ << item.get_ordering_token() << " " << *item.qitem
+ << " prio " << item.get_priority()
+ << " cost " << item.get_cost()
+ << " e" << item.get_map_epoch();
+ if (item.get_reserved_pushes()) {
+ out << " reserved_pushes " << item.get_reserved_pushes();
+ }
+ return out << ")";
+ }
+}; // class OpSchedulerItem
+
+/// Implements boilerplate for operations queued for the pg lock
+class PGOpQueueable : public OpSchedulerItem::OpQueueable {
+ spg_t pgid;
+protected:
+ const spg_t& get_pgid() const {
+ return pgid;
+ }
+public:
+ explicit PGOpQueueable(spg_t pg) : pgid(pg) {}
+ uint32_t get_queue_token() const final {
+ return get_pgid().ps();
+ }
+
+ const spg_t& get_ordering_token() const final {
+ return get_pgid();
+ }
+
+ OpSchedulerItem::OrderLocker::Ref get_order_locker(PGRef pg) final {
+ class Locker : public OpSchedulerItem::OrderLocker {
+ PGRef pg;
+ public:
+ explicit Locker(PGRef pg) : pg(pg) {}
+ void lock() final {
+ pg->lock();
+ }
+ void unlock() final {
+ pg->unlock();
+ }
+ };
+ return OpSchedulerItem::OrderLocker::Ref(
+ new Locker(pg));
+ }
+};
+
+class PGOpItem : public PGOpQueueable {
+ OpRequestRef op;
+
+ const MOSDOp *maybe_get_mosd_op() const {
+ auto req = op->get_req();
+ if (req->get_type() == CEPH_MSG_OSD_OP) {
+ return op->get_req<MOSDOp>();
+ } else {
+ return nullptr;
+ }
+ }
+
+public:
+ PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(std::move(op)) {}
+ op_type_t get_op_type() const final {
+
+ return op_type_t::client_op;
+ }
+
+ std::ostream &print(std::ostream &rhs) const final {
+ return rhs << "PGOpItem(op=" << *(op->get_req()) << ")";
+ }
+
+ std::optional<OpRequestRef> maybe_get_op() const final {
+ return op;
+ }
+
+ op_scheduler_class get_scheduler_class() const final {
+ auto type = op->get_req()->get_type();
+ if (type == CEPH_MSG_OSD_OP ||
+ type == CEPH_MSG_OSD_BACKOFF) {
+ return op_scheduler_class::client;
+ } else {
+ return op_scheduler_class::immediate;
+ }
+ }
+
+ void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+};
+
+class PGPeeringItem : public PGOpQueueable {
+ PGPeeringEventRef evt;
+public:
+ PGPeeringItem(spg_t pg, PGPeeringEventRef e) : PGOpQueueable(pg), evt(e) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::peering_event;
+ }
+ std::ostream &print(std::ostream &rhs) const final {
+ return rhs << "PGPeeringEvent(" << evt->get_desc() << ")";
+ }
+ void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ bool is_peering() const override {
+ return true;
+ }
+ bool peering_requires_pg() const override {
+ return evt->requires_pg;
+ }
+ const PGCreateInfo *creates_pg() const override {
+ return evt->create_info.get();
+ }
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::immediate;
+ }
+};
+
+class PGSnapTrim : public PGOpQueueable {
+ epoch_t epoch_queued;
+public:
+ PGSnapTrim(
+ spg_t pg,
+ epoch_t epoch_queued)
+ : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::bg_snaptrim;
+ }
+ std::ostream &print(std::ostream &rhs) const final {
+ return rhs << "PGSnapTrim(pgid=" << get_pgid()
+ << " epoch_queued=" << epoch_queued
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::background_best_effort;
+ }
+};
+
+class PGScrub : public PGOpQueueable {
+ epoch_t epoch_queued;
+public:
+ PGScrub(
+ spg_t pg,
+ epoch_t epoch_queued)
+ : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::bg_scrub;
+ }
+ std::ostream &print(std::ostream &rhs) const final {
+ return rhs << "PGScrub(pgid=" << get_pgid()
+ << "epoch_queued=" << epoch_queued
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::background_best_effort;
+ }
+};
+
+class PGScrubItem : public PGOpQueueable {
+ protected:
+ epoch_t epoch_queued;
+ Scrub::act_token_t activation_index;
+ std::string_view message_name;
+ PGScrubItem(spg_t pg, epoch_t epoch_queued, std::string_view derivative_name)
+ : PGOpQueueable{pg}
+ , epoch_queued{epoch_queued}
+ , activation_index{0}
+ , message_name{derivative_name}
+ {}
+ PGScrubItem(spg_t pg,
+ epoch_t epoch_queued,
+ Scrub::act_token_t op_index,
+ std::string_view derivative_name)
+ : PGOpQueueable{pg}
+ , epoch_queued{epoch_queued}
+ , activation_index{op_index}
+ , message_name{derivative_name}
+ {}
+ op_type_t get_op_type() const final { return op_type_t::bg_scrub; }
+ std::ostream& print(std::ostream& rhs) const final
+ {
+ return rhs << message_name << "(pgid=" << get_pgid()
+ << "epoch_queued=" << epoch_queued
+ << " scrub-token=" << activation_index << ")";
+ }
+ void run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle& handle) override = 0;
+ op_scheduler_class get_scheduler_class() const final
+ {
+ return op_scheduler_class::background_best_effort;
+ }
+};
+
+class PGScrubResched : public PGScrubItem {
+ public:
+ PGScrubResched(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubResched"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+/**
+ * all replicas have granted our scrub resources request
+ */
+class PGScrubResourcesOK : public PGScrubItem {
+ public:
+ PGScrubResourcesOK(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubResourcesOK"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+/**
+ * scrub resources requests denied by replica(s)
+ */
+class PGScrubDenied : public PGScrubItem {
+ public:
+ PGScrubDenied(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubDenied"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+/**
+ * called when a repair process completes, to initiate scrubbing. No local/remote
+ * resources are allocated.
+ */
+class PGScrubAfterRepair : public PGScrubItem {
+ public:
+ PGScrubAfterRepair(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubAfterRepair"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubPushesUpdate : public PGScrubItem {
+ public:
+ PGScrubPushesUpdate(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubPushesUpdate"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubAppliedUpdate : public PGScrubItem {
+ public:
+ PGScrubAppliedUpdate(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubAppliedUpdate"}
+ {}
+ void run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ [[maybe_unused]] ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubUnblocked : public PGScrubItem {
+ public:
+ PGScrubUnblocked(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubUnblocked"}
+ {}
+ void run(OSD* osd,
+ OSDShard* sdata,
+ PGRef& pg,
+ [[maybe_unused]] ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubDigestUpdate : public PGScrubItem {
+ public:
+ PGScrubDigestUpdate(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubDigestUpdate"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubGotLocalMap : public PGScrubItem {
+ public:
+ PGScrubGotLocalMap(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubGotLocalMap"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubGotReplMaps : public PGScrubItem {
+ public:
+ PGScrubGotReplMaps(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubGotReplMaps"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubMapsCompared : public PGScrubItem {
+ public:
+ PGScrubMapsCompared(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubMapsCompared"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGRepScrub : public PGScrubItem {
+ public:
+ PGRepScrub(spg_t pg, epoch_t epoch_queued, Scrub::act_token_t op_token)
+ : PGScrubItem{pg, epoch_queued, op_token, "PGRepScrub"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGRepScrubResched : public PGScrubItem {
+ public:
+ PGRepScrubResched(spg_t pg, epoch_t epoch_queued, Scrub::act_token_t op_token)
+ : PGScrubItem{pg, epoch_queued, op_token, "PGRepScrubResched"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubReplicaPushes : public PGScrubItem {
+ public:
+ PGScrubReplicaPushes(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubReplicaPushes"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubScrubFinished : public PGScrubItem {
+ public:
+ PGScrubScrubFinished(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubScrubFinished"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubGetNextChunk : public PGScrubItem {
+ public:
+ PGScrubGetNextChunk(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubGetNextChunk"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubChunkIsBusy : public PGScrubItem {
+ public:
+ PGScrubChunkIsBusy(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubChunkIsBusy"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGScrubChunkIsFree : public PGScrubItem {
+ public:
+ PGScrubChunkIsFree(spg_t pg, epoch_t epoch_queued)
+ : PGScrubItem{pg, epoch_queued, "PGScrubChunkIsFree"}
+ {}
+ void run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle) final;
+};
+
+class PGRecovery : public PGOpQueueable {
+ epoch_t epoch_queued;
+ uint64_t reserved_pushes;
+public:
+ PGRecovery(
+ spg_t pg,
+ epoch_t epoch_queued,
+ uint64_t reserved_pushes)
+ : PGOpQueueable(pg),
+ epoch_queued(epoch_queued),
+ reserved_pushes(reserved_pushes) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::bg_recovery;
+ }
+ std::ostream &print(std::ostream &rhs) const final {
+ return rhs << "PGRecovery(pgid=" << get_pgid()
+ << " epoch_queued=" << epoch_queued
+ << " reserved_pushes=" << reserved_pushes
+ << ")";
+ }
+ uint64_t get_reserved_pushes() const final {
+ return reserved_pushes;
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::background_recovery;
+ }
+};
+
+class PGRecoveryContext : public PGOpQueueable {
+ std::unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
+ epoch_t epoch;
+public:
+ PGRecoveryContext(spg_t pgid,
+ GenContext<ThreadPool::TPHandle&> *c, epoch_t epoch)
+ : PGOpQueueable(pgid),
+ c(c), epoch(epoch) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::bg_recovery;
+ }
+ std::ostream &print(std::ostream &rhs) const final {
+ return rhs << "PGRecoveryContext(pgid=" << get_pgid()
+ << " c=" << c.get() << " epoch=" << epoch
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::background_recovery;
+ }
+};
+
+class PGDelete : public PGOpQueueable {
+ epoch_t epoch_queued;
+public:
+ PGDelete(
+ spg_t pg,
+ epoch_t epoch_queued)
+ : PGOpQueueable(pg),
+ epoch_queued(epoch_queued) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::bg_pg_delete;
+ }
+ std::ostream &print(std::ostream &rhs) const final {
+ return rhs << "PGDelete(" << get_pgid()
+ << " e" << epoch_queued
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::background_best_effort;
+ }
+};
+
+class PGRecoveryMsg : public PGOpQueueable {
+ OpRequestRef op;
+
+public:
+ PGRecoveryMsg(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(std::move(op)) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::bg_recovery;
+ }
+
+ std::ostream &print(std::ostream &rhs) const final {
+ return rhs << "PGRecoveryMsg(op=" << *(op->get_req()) << ")";
+ }
+
+ std::optional<OpRequestRef> maybe_get_op() const final {
+ return op;
+ }
+
+ op_scheduler_class get_scheduler_class() const final {
+ auto priority = op->get_req()->get_priority();
+ if (priority >= CEPH_MSG_PRIO_HIGH) {
+ return op_scheduler_class::immediate;
+ }
+ return op_scheduler_class::background_recovery;
+ }
+
+ void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+};
+
+}
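
For illustration, a hedged sketch of wrapping one of the queueables above into an OpSchedulerItem follows; make_snaptrim_item() is hypothetical, and the cost, priority, and owner values are made up rather than derived from the op and configuration as the OSD would do.

// Sketch only: numeric values are illustrative, not defaults from this patch.
#include <memory>

#include "osd/scheduler/OpSchedulerItem.h"

namespace ceph::osd::scheduler {

OpSchedulerItem make_snaptrim_item(spg_t pgid, epoch_t epoch, utime_t now)
{
  return OpSchedulerItem(
    std::make_unique<PGSnapTrim>(pgid, epoch),  // queueable payload
    4096,    // cost (illustrative)
    5,       // priority (illustrative)
    now,     // start_time
    0,       // owner: global id, placeholder here
    epoch);  // map epoch the PG is expected to exist in
}

} // namespace ceph::osd::scheduler
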
diff --git a/src/osd/scheduler/mClockScheduler.cc b/src/osd/scheduler/mClockScheduler.cc
new file mode 100644
index 000000000..f2f0ffc3d
--- /dev/null
+++ b/src/osd/scheduler/mClockScheduler.cc
@@ -0,0 +1,514 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <memory>
+#include <functional>
+
+#include "osd/scheduler/mClockScheduler.h"
+#include "common/dout.h"
+
+namespace dmc = crimson::dmclock;
+using namespace std::placeholders;
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix *_dout << "mClockScheduler: "
+
+
+namespace ceph::osd::scheduler {
+
+mClockScheduler::mClockScheduler(CephContext *cct,
+ uint32_t num_shards,
+ bool is_rotational)
+ : cct(cct),
+ num_shards(num_shards),
+ is_rotational(is_rotational),
+ scheduler(
+ std::bind(&mClockScheduler::ClientRegistry::get_info,
+ &client_registry,
+ _1),
+ dmc::AtLimit::Wait,
+ cct->_conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
+{
+ cct->_conf.add_observer(this);
+ ceph_assert(num_shards > 0);
+ set_max_osd_capacity();
+ set_osd_mclock_cost_per_io();
+ set_osd_mclock_cost_per_byte();
+ set_mclock_profile();
+ enable_mclock_profile_settings();
+ client_registry.update_from_config(cct->_conf);
+}
+
+void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf)
+{
+ default_external_client_info.update(
+ conf.get_val<uint64_t>("osd_mclock_scheduler_client_res"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_client_lim"));
+
+ internal_client_infos[
+ static_cast<size_t>(op_scheduler_class::background_recovery)].update(
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_res"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_lim"));
+
+ internal_client_infos[
+ static_cast<size_t>(op_scheduler_class::background_best_effort)].update(
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_res"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_lim"));
+}
+
+const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client(
+ const client_profile_id_t &client) const
+{
+ auto ret = external_client_infos.find(client);
+ if (ret == external_client_infos.end())
+ return &default_external_client_info;
+ else
+ return &(ret->second);
+}
+
+const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info(
+ const scheduler_id_t &id) const {
+ switch (id.class_id) {
+ case op_scheduler_class::immediate:
+ ceph_assert(0 == "Cannot schedule immediate");
+ return (dmc::ClientInfo*)nullptr;
+ case op_scheduler_class::client:
+ return get_external_client(id.client_profile_id);
+ default:
+ ceph_assert(static_cast<size_t>(id.class_id) < internal_client_infos.size());
+ return &internal_client_infos[static_cast<size_t>(id.class_id)];
+ }
+}
+
+void mClockScheduler::set_max_osd_capacity()
+{
+ if (is_rotational) {
+ max_osd_capacity =
+ cct->_conf.get_val<double>("osd_mclock_max_capacity_iops_hdd");
+ } else {
+ max_osd_capacity =
+ cct->_conf.get_val<double>("osd_mclock_max_capacity_iops_ssd");
+ }
+ // Set per op-shard iops limit
+ max_osd_capacity /= num_shards;
+ dout(1) << __func__ << " #op shards: " << num_shards
+ << std::fixed << std::setprecision(2)
+ << " max osd capacity(iops) per shard: " << max_osd_capacity
+ << dendl;
+}
+
+void mClockScheduler::set_osd_mclock_cost_per_io()
+{
+ std::chrono::seconds sec(1);
+ if (cct->_conf.get_val<double>("osd_mclock_cost_per_io_usec")) {
+ osd_mclock_cost_per_io =
+ cct->_conf.get_val<double>("osd_mclock_cost_per_io_usec");
+ } else {
+ if (is_rotational) {
+ osd_mclock_cost_per_io =
+ cct->_conf.get_val<double>("osd_mclock_cost_per_io_usec_hdd");
+ // For HDDs, convert value to seconds
+ osd_mclock_cost_per_io /= std::chrono::microseconds(sec).count();
+ } else {
+ // For SSDs, convert value to milliseconds
+ osd_mclock_cost_per_io =
+ cct->_conf.get_val<double>("osd_mclock_cost_per_io_usec_ssd");
+ osd_mclock_cost_per_io /= std::chrono::milliseconds(sec).count();
+ }
+ }
+ dout(1) << __func__ << " osd_mclock_cost_per_io: "
+ << std::fixed << std::setprecision(7) << osd_mclock_cost_per_io
+ << dendl;
+}
+
+void mClockScheduler::set_osd_mclock_cost_per_byte()
+{
+ std::chrono::seconds sec(1);
+ if (cct->_conf.get_val<double>("osd_mclock_cost_per_byte_usec")) {
+ osd_mclock_cost_per_byte =
+ cct->_conf.get_val<double>("osd_mclock_cost_per_byte_usec");
+ } else {
+ if (is_rotational) {
+ osd_mclock_cost_per_byte =
+ cct->_conf.get_val<double>("osd_mclock_cost_per_byte_usec_hdd");
+ // For HDDs, convert value to seconds
+ osd_mclock_cost_per_byte /= std::chrono::microseconds(sec).count();
+ } else {
+ osd_mclock_cost_per_byte =
+ cct->_conf.get_val<double>("osd_mclock_cost_per_byte_usec_ssd");
+ // For SSDs, convert value to milliseconds
+ osd_mclock_cost_per_byte /= std::chrono::milliseconds(sec).count();
+ }
+ }
+ dout(1) << __func__ << " osd_mclock_cost_per_byte: "
+ << std::fixed << std::setprecision(7) << osd_mclock_cost_per_byte
+ << dendl;
+}
+
+void mClockScheduler::set_mclock_profile()
+{
+ mclock_profile = cct->_conf.get_val<std::string>("osd_mclock_profile");
+ dout(1) << __func__ << " mclock profile: " << mclock_profile << dendl;
+}
+
+std::string mClockScheduler::get_mclock_profile()
+{
+ return mclock_profile;
+}
+
+void mClockScheduler::set_balanced_profile_allocations()
+{
+ // Client Allocation:
+ // reservation: 40% | weight: 1 | limit: 100% |
+ // Background Recovery Allocation:
+ // reservation: 40% | weight: 1 | limit: 150% |
+ // Background Best Effort Allocation:
+ // reservation: 20% | weight: 2 | limit: max |
+
+ // Client
+ uint64_t client_res = static_cast<uint64_t>(
+ std::round(0.40 * max_osd_capacity));
+ uint64_t client_lim = static_cast<uint64_t>(
+ std::round(max_osd_capacity));
+ uint64_t client_wgt = default_min;
+
+ // Background Recovery
+ uint64_t rec_res = static_cast<uint64_t>(
+ std::round(0.40 * max_osd_capacity));
+ uint64_t rec_lim = static_cast<uint64_t>(
+ std::round(1.5 * max_osd_capacity));
+ uint64_t rec_wgt = default_min;
+
+ // Background Best Effort
+ uint64_t best_effort_res = static_cast<uint64_t>(
+ std::round(0.20 * max_osd_capacity));
+ uint64_t best_effort_lim = default_max;
+ uint64_t best_effort_wgt = 2;
+
+ // Set the allocations for the mclock clients
+ client_allocs[
+ static_cast<size_t>(op_scheduler_class::client)].update(
+ client_res,
+ client_wgt,
+ client_lim);
+ client_allocs[
+ static_cast<size_t>(op_scheduler_class::background_recovery)].update(
+ rec_res,
+ rec_wgt,
+ rec_lim);
+ client_allocs[
+ static_cast<size_t>(op_scheduler_class::background_best_effort)].update(
+ best_effort_res,
+ best_effort_wgt,
+ best_effort_lim);
+}
+
+void mClockScheduler::set_high_recovery_ops_profile_allocations()
+{
+ // Client Allocation:
+ // reservation: 30% | weight: 1 | limit: 80% |
+ // Background Recovery Allocation:
+ // reservation: 60% | weight: 2 | limit: 200% |
+ // Background Best Effort Allocation:
+ // reservation: 1 | weight: 2 | limit: max |
+
+ // Client
+ uint64_t client_res = static_cast<uint64_t>(
+ std::round(0.30 * max_osd_capacity));
+ uint64_t client_lim = static_cast<uint64_t>(
+ std::round(0.80 * max_osd_capacity));
+ uint64_t client_wgt = default_min;
+
+ // Background Recovery
+ uint64_t rec_res = static_cast<uint64_t>(
+ std::round(0.60 * max_osd_capacity));
+ uint64_t rec_lim = static_cast<uint64_t>(
+ std::round(2.0 * max_osd_capacity));
+ uint64_t rec_wgt = 2;
+
+ // Background Best Effort
+ uint64_t best_effort_res = default_min;
+ uint64_t best_effort_lim = default_max;
+ uint64_t best_effort_wgt = 2;
+
+ // Set the allocations for the mclock clients
+ client_allocs[
+ static_cast<size_t>(op_scheduler_class::client)].update(
+ client_res,
+ client_wgt,
+ client_lim);
+ client_allocs[
+ static_cast<size_t>(op_scheduler_class::background_recovery)].update(
+ rec_res,
+ rec_wgt,
+ rec_lim);
+ client_allocs[
+ static_cast<size_t>(op_scheduler_class::background_best_effort)].update(
+ best_effort_res,
+ best_effort_wgt,
+ best_effort_lim);
+}
+
+void mClockScheduler::set_high_client_ops_profile_allocations()
+{
+ // Client Allocation:
+ // reservation: 50% | weight: 2 | limit: max |
+ // Background Recovery Allocation:
+ // reservation: 25% | weight: 1 | limit: 100% |
+ // Background Best Effort Allocation:
+ // reservation: 25% | weight: 2 | limit: max |
+
+ // Client
+ uint64_t client_res = static_cast<uint64_t>(
+ std::round(0.50 * max_osd_capacity));
+ uint64_t client_wgt = 2;
+ uint64_t client_lim = default_max;
+
+ // Background Recovery
+ uint64_t rec_res = static_cast<uint64_t>(
+ std::round(0.25 * max_osd_capacity));
+ uint64_t rec_lim = static_cast<uint64_t>(
+ std::round(max_osd_capacity));
+ uint64_t rec_wgt = default_min;
+
+ // Background Best Effort
+ uint64_t best_effort_res = static_cast<uint64_t>(
+ std::round(0.25 * max_osd_capacity));
+ uint64_t best_effort_lim = default_max;
+ uint64_t best_effort_wgt = 2;
+
+ // Set the allocations for the mclock clients
+ client_allocs[
+ static_cast<size_t>(op_scheduler_class::client)].update(
+ client_res,
+ client_wgt,
+ client_lim);
+ client_allocs[
+ static_cast<size_t>(op_scheduler_class::background_recovery)].update(
+ rec_res,
+ rec_wgt,
+ rec_lim);
+ client_allocs[
+ static_cast<size_t>(op_scheduler_class::background_best_effort)].update(
+ best_effort_res,
+ best_effort_wgt,
+ best_effort_lim);
+}
+
+void mClockScheduler::enable_mclock_profile_settings()
+{
+ // Nothing to do for "custom" profile
+ if (mclock_profile == "custom") {
+ return;
+ }
+
+ // Set mclock and ceph config options for the chosen profile
+ if (mclock_profile == "balanced") {
+ set_balanced_profile_allocations();
+ } else if (mclock_profile == "high_recovery_ops") {
+ set_high_recovery_ops_profile_allocations();
+ } else if (mclock_profile == "high_client_ops") {
+ set_high_client_ops_profile_allocations();
+ } else {
+ ceph_assert("Invalid choice of mclock profile" == 0);
+ return;
+ }
+
+ // Set the mclock config parameters
+ set_profile_config();
+}
+
+void mClockScheduler::set_profile_config()
+{
+ ClientAllocs client = client_allocs[
+ static_cast<size_t>(op_scheduler_class::client)];
+ ClientAllocs rec = client_allocs[
+ static_cast<size_t>(op_scheduler_class::background_recovery)];
+ ClientAllocs best_effort = client_allocs[
+ static_cast<size_t>(op_scheduler_class::background_best_effort)];
+
+ // Set external client params
+ cct->_conf.set_val("osd_mclock_scheduler_client_res",
+ std::to_string(client.res));
+ cct->_conf.set_val("osd_mclock_scheduler_client_wgt",
+ std::to_string(client.wgt));
+ cct->_conf.set_val("osd_mclock_scheduler_client_lim",
+ std::to_string(client.lim));
+
+ // Set background recovery client params
+ cct->_conf.set_val("osd_mclock_scheduler_background_recovery_res",
+ std::to_string(rec.res));
+ cct->_conf.set_val("osd_mclock_scheduler_background_recovery_wgt",
+ std::to_string(rec.wgt));
+ cct->_conf.set_val("osd_mclock_scheduler_background_recovery_lim",
+ std::to_string(rec.lim));
+
+ // Set background best effort client params
+ cct->_conf.set_val("osd_mclock_scheduler_background_best_effort_res",
+ std::to_string(best_effort.res));
+ cct->_conf.set_val("osd_mclock_scheduler_background_best_effort_wgt",
+ std::to_string(best_effort.wgt));
+ cct->_conf.set_val("osd_mclock_scheduler_background_best_effort_lim",
+ std::to_string(best_effort.lim));
+}
+
+int mClockScheduler::calc_scaled_cost(int item_cost)
+{
+ // Calculate total scaled cost in secs
+ int scaled_cost =
+ std::round(osd_mclock_cost_per_io + (osd_mclock_cost_per_byte * item_cost));
+ return std::max(scaled_cost, 1);
+}
+
+void mClockScheduler::update_configuration()
+{
+ // Apply configuration change. The expectation is that
+ // at least one of the tracked mclock config option keys
+ // is modified before calling this method.
+ cct->_conf.apply_changes(nullptr);
+}
+
+void mClockScheduler::dump(ceph::Formatter &f) const
+{
+}
+
+void mClockScheduler::enqueue(OpSchedulerItem&& item)
+{
+ auto id = get_scheduler_id(item);
+
+ // TODO: move this check into OpSchedulerItem, handle backwards compat
+ if (op_scheduler_class::immediate == id.class_id) {
+ immediate.push_front(std::move(item));
+ } else {
+ int cost = calc_scaled_cost(item.get_cost());
+ // Add item to scheduler queue
+ scheduler.add_request(
+ std::move(item),
+ id,
+ cost);
+ }
+}
+
+void mClockScheduler::enqueue_front(OpSchedulerItem&& item)
+{
+ immediate.push_back(std::move(item));
+ // TODO: item may not be immediate, update mclock machinery to permit
+ // putting the item back in the queue
+}
+
+WorkItem mClockScheduler::dequeue()
+{
+ if (!immediate.empty()) {
+ WorkItem work_item{std::move(immediate.back())};
+ immediate.pop_back();
+ return work_item;
+ } else {
+ mclock_queue_t::PullReq result = scheduler.pull_request();
+ if (result.is_future()) {
+ return result.getTime();
+ } else if (result.is_none()) {
+ ceph_assert(
+ 0 == "Impossible, must have checked empty() first");
+ return {};
+ } else {
+ ceph_assert(result.is_retn());
+
+ auto &retn = result.get_retn();
+ return std::move(*retn.request);
+ }
+ }
+}
+
+const char** mClockScheduler::get_tracked_conf_keys() const
+{
+ static const char* KEYS[] = {
+ "osd_mclock_scheduler_client_res",
+ "osd_mclock_scheduler_client_wgt",
+ "osd_mclock_scheduler_client_lim",
+ "osd_mclock_scheduler_background_recovery_res",
+ "osd_mclock_scheduler_background_recovery_wgt",
+ "osd_mclock_scheduler_background_recovery_lim",
+ "osd_mclock_scheduler_background_best_effort_res",
+ "osd_mclock_scheduler_background_best_effort_wgt",
+ "osd_mclock_scheduler_background_best_effort_lim",
+ "osd_mclock_cost_per_io_usec",
+ "osd_mclock_cost_per_io_usec_hdd",
+ "osd_mclock_cost_per_io_usec_ssd",
+ "osd_mclock_cost_per_byte_usec",
+ "osd_mclock_cost_per_byte_usec_hdd",
+ "osd_mclock_cost_per_byte_usec_ssd",
+ "osd_mclock_max_capacity_iops_hdd",
+ "osd_mclock_max_capacity_iops_ssd",
+ "osd_mclock_profile",
+ NULL
+ };
+ return KEYS;
+}
+
+void mClockScheduler::handle_conf_change(
+ const ConfigProxy& conf,
+ const std::set<std::string> &changed)
+{
+ if (changed.count("osd_mclock_cost_per_io_usec") ||
+ changed.count("osd_mclock_cost_per_io_usec_hdd") ||
+ changed.count("osd_mclock_cost_per_io_usec_ssd")) {
+ set_osd_mclock_cost_per_io();
+ }
+ if (changed.count("osd_mclock_cost_per_byte_usec") ||
+ changed.count("osd_mclock_cost_per_byte_usec_hdd") ||
+ changed.count("osd_mclock_cost_per_byte_usec_ssd")) {
+ set_osd_mclock_cost_per_byte();
+ }
+ if (changed.count("osd_mclock_max_capacity_iops_hdd") ||
+ changed.count("osd_mclock_max_capacity_iops_ssd")) {
+ set_max_osd_capacity();
+ if (mclock_profile != "custom") {
+ enable_mclock_profile_settings();
+ client_registry.update_from_config(conf);
+ }
+ }
+ if (changed.count("osd_mclock_profile")) {
+ set_mclock_profile();
+ if (mclock_profile != "custom") {
+ enable_mclock_profile_settings();
+ client_registry.update_from_config(conf);
+ }
+ }
+ if (changed.count("osd_mclock_scheduler_client_res") ||
+ changed.count("osd_mclock_scheduler_client_wgt") ||
+ changed.count("osd_mclock_scheduler_client_lim") ||
+ changed.count("osd_mclock_scheduler_background_recovery_res") ||
+ changed.count("osd_mclock_scheduler_background_recovery_wgt") ||
+ changed.count("osd_mclock_scheduler_background_recovery_lim") ||
+ changed.count("osd_mclock_scheduler_background_best_effort_res") ||
+ changed.count("osd_mclock_scheduler_background_best_effort_wgt") ||
+ changed.count("osd_mclock_scheduler_background_best_effort_lim")) {
+ if (mclock_profile == "custom") {
+ client_registry.update_from_config(conf);
+ }
+ }
+}
+
+mClockScheduler::~mClockScheduler()
+{
+ cct->_conf.remove_observer(this);
+}
+
+}
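
calc_scaled_cost() above is a single line of arithmetic; a standalone worked example may make the scaling concrete. The per-IO and per-byte figures below are assumptions for illustration, not the shipped defaults.

// Standalone arithmetic sketch of the cost scaling; values are hypothetical.
#include <algorithm>
#include <cmath>
#include <iostream>

int main()
{
  const double osd_mclock_cost_per_io   = 0.025;   // seconds per IO (assumed)
  const double osd_mclock_cost_per_byte = 5.0e-6;  // seconds per byte (assumed)
  const int item_cost = 4 * 1024 * 1024;           // a 4 MiB request

  int scaled = static_cast<int>(
    std::round(osd_mclock_cost_per_io + osd_mclock_cost_per_byte * item_cost));
  scaled = std::max(scaled, 1);  // never below 1, matching calc_scaled_cost()

  std::cout << "scaled cost: " << scaled << "\n";  // ~21 with these numbers
  return 0;
}
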
diff --git a/src/osd/scheduler/mClockScheduler.h b/src/osd/scheduler/mClockScheduler.h
new file mode 100644
index 000000000..32f3851ec
--- /dev/null
+++ b/src/osd/scheduler/mClockScheduler.h
@@ -0,0 +1,204 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include <ostream>
+#include <map>
+#include <vector>
+
+#include "boost/variant.hpp"
+
+#include "dmclock/src/dmclock_server.h"
+
+#include "osd/scheduler/OpScheduler.h"
+#include "common/config.h"
+#include "include/cmp.h"
+#include "common/ceph_context.h"
+#include "common/mClockPriorityQueue.h"
+#include "osd/scheduler/OpSchedulerItem.h"
+
+
+namespace ceph::osd::scheduler {
+
+constexpr uint64_t default_min = 1;
+constexpr uint64_t default_max = 999999;
+
+using client_id_t = uint64_t;
+using profile_id_t = uint64_t;
+
+struct client_profile_id_t {
+ client_id_t client_id;
+ profile_id_t profile_id;
+};
+
+WRITE_EQ_OPERATORS_2(client_profile_id_t, client_id, profile_id)
+WRITE_CMP_OPERATORS_2(client_profile_id_t, client_id, profile_id)
+
+
+struct scheduler_id_t {
+ op_scheduler_class class_id;
+ client_profile_id_t client_profile_id;
+};
+
+WRITE_EQ_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
+WRITE_CMP_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
+
+/**
+ * Scheduler implementation based on mclock.
+ *
+ * TODO: explain configs
+ */
+class mClockScheduler : public OpScheduler, md_config_obs_t {
+
+ CephContext *cct;
+ const uint32_t num_shards;
+ bool is_rotational;
+ double max_osd_capacity;
+ double osd_mclock_cost_per_io;
+ double osd_mclock_cost_per_byte;
+ std::string mclock_profile = "high_client_ops";
+ struct ClientAllocs {
+ uint64_t res;
+ uint64_t wgt;
+ uint64_t lim;
+
+ ClientAllocs(uint64_t _res, uint64_t _wgt, uint64_t _lim) {
+ update(_res, _wgt, _lim);
+ }
+
+ inline void update(uint64_t _res, uint64_t _wgt, uint64_t _lim) {
+ res = _res;
+ wgt = _wgt;
+ lim = _lim;
+ }
+ };
+ std::array<
+ ClientAllocs,
+ static_cast<size_t>(op_scheduler_class::client) + 1
+ > client_allocs = {
+ // Placeholder, gets replaced with configured values
+ ClientAllocs(1, 1, 1), // background_recovery
+ ClientAllocs(1, 1, 1), // background_best_effort
+ ClientAllocs(1, 1, 1), // immediate (not used)
+ ClientAllocs(1, 1, 1) // client
+ };
+ class ClientRegistry {
+ std::array<
+ crimson::dmclock::ClientInfo,
+ static_cast<size_t>(op_scheduler_class::immediate)
+ > internal_client_infos = {
+ // Placeholder, gets replaced with configured values
+ crimson::dmclock::ClientInfo(1, 1, 1),
+ crimson::dmclock::ClientInfo(1, 1, 1)
+ };
+
+ crimson::dmclock::ClientInfo default_external_client_info = {1, 1, 1};
+ std::map<client_profile_id_t,
+ crimson::dmclock::ClientInfo> external_client_infos;
+ const crimson::dmclock::ClientInfo *get_external_client(
+ const client_profile_id_t &client) const;
+ public:
+ void update_from_config(const ConfigProxy &conf);
+ const crimson::dmclock::ClientInfo *get_info(
+ const scheduler_id_t &id) const;
+ } client_registry;
+
+ using mclock_queue_t = crimson::dmclock::PullPriorityQueue<
+ scheduler_id_t,
+ OpSchedulerItem,
+ true,
+ true,
+ 2>;
+ mclock_queue_t scheduler;
+ std::list<OpSchedulerItem> immediate;
+
+ static scheduler_id_t get_scheduler_id(const OpSchedulerItem &item) {
+ return scheduler_id_t{
+ item.get_scheduler_class(),
+ client_profile_id_t{
+ item.get_owner(),
+ 0
+ }
+ };
+ }
+
+public:
+ mClockScheduler(CephContext *cct, uint32_t num_shards, bool is_rotational);
+ ~mClockScheduler() override;
+
+ // Set the max osd capacity in iops
+ void set_max_osd_capacity();
+
+ // Set the cost per io for the osd
+ void set_osd_mclock_cost_per_io();
+
+ // Set the cost per byte for the osd
+ void set_osd_mclock_cost_per_byte();
+
+ // Set the mclock profile type to enable
+ void set_mclock_profile();
+
+ // Get the active mclock profile
+ std::string get_mclock_profile();
+
+ // Set "balanced" profile allocations
+ void set_balanced_profile_allocations();
+
+ // Set "high_recovery_ops" profile allocations
+ void set_high_recovery_ops_profile_allocations();
+
+ // Set "high_client_ops" profile allocations
+ void set_high_client_ops_profile_allocations();
+
+ // Set the mclock related config params based on the profile
+ void enable_mclock_profile_settings();
+
+ // Set mclock config parameter based on allocations
+ void set_profile_config();
+
+ // Calculate the scaled cost per item
+ int calc_scaled_cost(int cost);
+
+ // Enqueue op in the back of the regular queue
+ void enqueue(OpSchedulerItem &&item) final;
+
+ // Enqueue the op in the front of the regular queue
+ void enqueue_front(OpSchedulerItem &&item) final;
+
+ // Return an op to be dispatched
+ WorkItem dequeue() final;
+
+ // Returns true if the queue is empty
+ bool empty() const final {
+ return immediate.empty() && scheduler.empty();
+ }
+
+ // Formatted output of the queue
+ void dump(ceph::Formatter &f) const final;
+
+ void print(std::ostream &ostream) const final {
+ ostream << "mClockScheduler";
+ }
+
+ // Update data associated with the modified mclock config key(s)
+ void update_configuration() final;
+
+ const char** get_tracked_conf_keys() const final;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed) final;
+};
+
+}
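
The profile setters declared above express reservations and limits as fractions of max_osd_capacity, which is already divided per op shard. A standalone sketch of the "high_client_ops" arithmetic follows, assuming a hypothetical per-shard capacity of 315 IOPS.

// Standalone sketch of the "high_client_ops" allocation math; the capacity
// figure is an assumption for illustration.
#include <cmath>
#include <cstdint>
#include <iostream>

int main()
{
  constexpr uint64_t default_min = 1;
  constexpr uint64_t default_max = 999999;
  const double max_osd_capacity = 315.0;  // IOPS per shard (assumed)

  // Client: reservation 50%, weight 2, limit max
  uint64_t client_res = std::llround(0.50 * max_osd_capacity);  // 158
  uint64_t client_wgt = 2;
  uint64_t client_lim = default_max;

  // Background recovery: reservation 25%, weight 1, limit 100%
  uint64_t rec_res = std::llround(0.25 * max_osd_capacity);     // 79
  uint64_t rec_wgt = default_min;
  uint64_t rec_lim = std::llround(max_osd_capacity);            // 315

  // Background best effort: reservation 25%, weight 2, limit max
  uint64_t be_res = std::llround(0.25 * max_osd_capacity);      // 79
  uint64_t be_wgt = 2;
  uint64_t be_lim = default_max;

  std::cout << "client      res/wgt/lim: " << client_res << "/" << client_wgt
            << "/" << client_lim << "\n"
            << "recovery    res/wgt/lim: " << rec_res << "/" << rec_wgt
            << "/" << rec_lim << "\n"
            << "best-effort res/wgt/lim: " << be_res << "/" << be_wgt
            << "/" << be_lim << "\n";
  return 0;
}
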