summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/scheduler
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/scheduler')
-rw-r--r--src/crimson/osd/scheduler/mclock_scheduler.cc165
-rw-r--r--src/crimson/osd/scheduler/mclock_scheduler.h130
-rw-r--r--src/crimson/osd/scheduler/scheduler.cc181
-rw-r--r--src/crimson/osd/scheduler/scheduler.h82
4 files changed, 558 insertions, 0 deletions
diff --git a/src/crimson/osd/scheduler/mclock_scheduler.cc b/src/crimson/osd/scheduler/mclock_scheduler.cc
new file mode 100644
index 000000000..195ea8dd8
--- /dev/null
+++ b/src/crimson/osd/scheduler/mclock_scheduler.cc
@@ -0,0 +1,165 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <memory>
+#include <functional>
+
+#include "crimson/osd/scheduler/mclock_scheduler.h"
+#include "common/dout.h"
+
+namespace dmc = crimson::dmclock;
+using namespace std::placeholders;
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix *_dout
+
+
+namespace crimson::osd::scheduler {
+
+mClockScheduler::mClockScheduler(ConfigProxy &conf) :
+ scheduler(
+ std::bind(&mClockScheduler::ClientRegistry::get_info,
+ &client_registry,
+ _1),
+ dmc::AtLimit::Allow,
+ conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
+{
+ conf.add_observer(this);
+ client_registry.update_from_config(conf);
+}
+
+void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf)
+{
+ default_external_client_info.update(
+ conf.get_val<uint64_t>("osd_mclock_scheduler_client_res"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_client_lim"));
+
+ internal_client_infos[
+ static_cast<size_t>(scheduler_class_t::background_recovery)].update(
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_res"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_lim"));
+
+ internal_client_infos[
+ static_cast<size_t>(scheduler_class_t::background_best_effort)].update(
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_res"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"),
+ conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_lim"));
+}
+
+const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client(
+ const client_profile_id_t &client) const
+{
+ auto ret = external_client_infos.find(client);
+ if (ret == external_client_infos.end())
+ return &default_external_client_info;
+ else
+ return &(ret->second);
+}
+
+const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info(
+ const scheduler_id_t &id) const {
+ switch (id.class_id) {
+ case scheduler_class_t::immediate:
+ ceph_assert(0 == "Cannot schedule immediate");
+ return (dmc::ClientInfo*)nullptr;
+ case scheduler_class_t::repop:
+ case scheduler_class_t::client:
+ return get_external_client(id.client_profile_id);
+ default:
+ ceph_assert(static_cast<size_t>(id.class_id) < internal_client_infos.size());
+ return &internal_client_infos[static_cast<size_t>(id.class_id)];
+ }
+}
+
+void mClockScheduler::dump(ceph::Formatter &f) const
+{
+}
+
+void mClockScheduler::enqueue(item_t&& item)
+{
+ auto id = get_scheduler_id(item);
+ auto cost = item.params.cost;
+
+ if (scheduler_class_t::immediate == item.params.klass) {
+ immediate.push_front(std::move(item));
+ } else {
+ scheduler.add_request(
+ std::move(item),
+ id,
+ cost);
+ }
+}
+
+void mClockScheduler::enqueue_front(item_t&& item)
+{
+ immediate.push_back(std::move(item));
+ // TODO: item may not be immediate, update mclock machinery to permit
+ // putting the item back in the queue
+}
+
+item_t mClockScheduler::dequeue()
+{
+ if (!immediate.empty()) {
+ auto ret = std::move(immediate.back());
+ immediate.pop_back();
+ return ret;
+ } else {
+ mclock_queue_t::PullReq result = scheduler.pull_request();
+ if (result.is_future()) {
+ ceph_assert(
+ 0 == "Not implemented, user would have to be able to be woken up");
+ return std::move(*(item_t*)nullptr);
+ } else if (result.is_none()) {
+ ceph_assert(
+ 0 == "Impossible, must have checked empty() first");
+ return std::move(*(item_t*)nullptr);
+ } else {
+ ceph_assert(result.is_retn());
+
+ auto &retn = result.get_retn();
+ return std::move(*retn.request);
+ }
+ }
+}
+
+const char** mClockScheduler::get_tracked_conf_keys() const
+{
+ static const char* KEYS[] = {
+ "osd_mclock_scheduler_client_res",
+ "osd_mclock_scheduler_client_wgt",
+ "osd_mclock_scheduler_client_lim",
+ "osd_mclock_scheduler_background_recovery_res",
+ "osd_mclock_scheduler_background_recovery_wgt",
+ "osd_mclock_scheduler_background_recovery_lim",
+ "osd_mclock_scheduler_background_best_effort_res",
+ "osd_mclock_scheduler_background_best_effort_wgt",
+ "osd_mclock_scheduler_background_best_effort_lim",
+ NULL
+ };
+ return KEYS;
+}
+
+void mClockScheduler::handle_conf_change(
+ const ConfigProxy& conf,
+ const std::set<std::string> &changed)
+{
+ client_registry.update_from_config(conf);
+}
+
+}
diff --git a/src/crimson/osd/scheduler/mclock_scheduler.h b/src/crimson/osd/scheduler/mclock_scheduler.h
new file mode 100644
index 000000000..c3edbe729
--- /dev/null
+++ b/src/crimson/osd/scheduler/mclock_scheduler.h
@@ -0,0 +1,130 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include <ostream>
+#include <map>
+#include <vector>
+
+#include "boost/variant.hpp"
+
+#include "dmclock/src/dmclock_server.h"
+
+#include "crimson/osd/scheduler/scheduler.h"
+#include "common/config.h"
+#include "include/cmp.h"
+#include "common/ceph_context.h"
+
+
+namespace crimson::osd::scheduler {
+
+using client_id_t = uint64_t;
+using profile_id_t = uint64_t;
+
+struct client_profile_id_t {
+ client_id_t client_id;
+ profile_id_t profile_id;
+};
+
+WRITE_EQ_OPERATORS_2(client_profile_id_t, client_id, profile_id)
+WRITE_CMP_OPERATORS_2(client_profile_id_t, client_id, profile_id)
+
+
+struct scheduler_id_t {
+ scheduler_class_t class_id;
+ client_profile_id_t client_profile_id;
+};
+
+WRITE_EQ_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
+WRITE_CMP_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
+
+/**
+ * Scheduler implementation based on mclock.
+ *
+ * TODO: explain configs
+ */
+class mClockScheduler : public Scheduler, md_config_obs_t {
+
+ class ClientRegistry {
+ std::array<
+ crimson::dmclock::ClientInfo,
+ static_cast<size_t>(scheduler_class_t::client)
+ > internal_client_infos = {
+ // Placeholder, gets replaced with configured values
+ crimson::dmclock::ClientInfo(1, 1, 1),
+ crimson::dmclock::ClientInfo(1, 1, 1)
+ };
+
+ crimson::dmclock::ClientInfo default_external_client_info = {1, 1, 1};
+ std::map<client_profile_id_t,
+ crimson::dmclock::ClientInfo> external_client_infos;
+ const crimson::dmclock::ClientInfo *get_external_client(
+ const client_profile_id_t &client) const;
+ public:
+ void update_from_config(const ConfigProxy &conf);
+ const crimson::dmclock::ClientInfo *get_info(
+ const scheduler_id_t &id) const;
+ } client_registry;
+
+ using mclock_queue_t = crimson::dmclock::PullPriorityQueue<
+ scheduler_id_t,
+ item_t,
+ true,
+ true,
+ 2>;
+ mclock_queue_t scheduler;
+ std::list<item_t> immediate;
+
+ static scheduler_id_t get_scheduler_id(const item_t &item) {
+ return scheduler_id_t{
+ item.params.klass,
+ client_profile_id_t{
+ item.params.owner,
+ 0
+ }
+ };
+ }
+
+public:
+ mClockScheduler(ConfigProxy &conf);
+
+ // Enqueue op in the back of the regular queue
+ void enqueue(item_t &&item) final;
+
+ // Enqueue the op in the front of the regular queue
+ void enqueue_front(item_t &&item) final;
+
+ // Return an op to be dispatch
+ item_t dequeue() final;
+
+ // Returns if the queue is empty
+ bool empty() const final {
+ return immediate.empty() && scheduler.empty();
+ }
+
+ // Formatted output of the queue
+ void dump(ceph::Formatter &f) const final;
+
+ void print(std::ostream &ostream) const final {
+ ostream << "mClockScheduler";
+ }
+
+ const char** get_tracked_conf_keys() const final;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed) final;
+};
+
+}
diff --git a/src/crimson/osd/scheduler/scheduler.cc b/src/crimson/osd/scheduler/scheduler.cc
new file mode 100644
index 000000000..c85cb388e
--- /dev/null
+++ b/src/crimson/osd/scheduler/scheduler.cc
@@ -0,0 +1,181 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <ostream>
+
+#include <seastar/core/print.hh>
+
+#include "crimson/osd/scheduler/scheduler.h"
+#include "crimson/osd/scheduler/mclock_scheduler.h"
+#include "common/WeightedPriorityQueue.h"
+
+namespace crimson::osd::scheduler {
+
+std::ostream &operator<<(std::ostream &lhs, const scheduler_class_t &c)
+{
+ switch (c) {
+ case scheduler_class_t::background_best_effort:
+ return lhs << "background_best_effort";
+ case scheduler_class_t::background_recovery:
+ return lhs << "background_recovery";
+ case scheduler_class_t::client:
+ return lhs << "client";
+ case scheduler_class_t::repop:
+ return lhs << "repop";
+ case scheduler_class_t::immediate:
+ return lhs << "immediate";
+ default:
+ return lhs;
+ }
+}
+
+/**
+ * Implements Scheduler in terms of OpQueue
+ *
+ * Templated on queue type to avoid dynamic dispatch, T should implement
+ * OpQueue<Scheduleritem_t, client_t>. This adapter is mainly responsible for
+ * the boilerplate priority cutoff/strict concept which is needed for
+ * OpQueue based implementations.
+ */
+template <typename T>
+class ClassedOpQueueScheduler final : public Scheduler {
+ const scheduler_class_t cutoff;
+ T queue;
+
+ using priority_t = uint64_t;
+ std::array<
+ priority_t,
+ static_cast<size_t>(scheduler_class_t::immediate)
+ > priority_map = {
+ // Placeholder, gets replaced with configured values
+ 0, 0, 0
+ };
+
+ static scheduler_class_t get_io_prio_cut(ConfigProxy &conf) {
+ if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
+ srand(time(NULL));
+ return (rand() % 2 < 1) ?
+ scheduler_class_t::repop : scheduler_class_t::immediate;
+ } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
+ return scheduler_class_t::immediate;
+ } else {
+ return scheduler_class_t::repop;
+ }
+ }
+
+ bool use_strict(scheduler_class_t kl) const {
+ return static_cast<uint8_t>(kl) >= static_cast<uint8_t>(cutoff);
+ }
+
+ priority_t get_priority(scheduler_class_t kl) const {
+ ceph_assert(static_cast<size_t>(kl) <
+ static_cast<size_t>(scheduler_class_t::immediate));
+ return priority_map[static_cast<size_t>(kl)];
+ }
+
+public:
+ template <typename... Args>
+ ClassedOpQueueScheduler(ConfigProxy &conf, Args&&... args) :
+ cutoff(get_io_prio_cut(conf)),
+ queue(std::forward<Args>(args)...)
+ {
+ priority_map[
+ static_cast<size_t>(scheduler_class_t::background_best_effort)
+ ] = conf.get_val<uint64_t>("osd_scrub_priority");
+ priority_map[
+ static_cast<size_t>(scheduler_class_t::background_recovery)
+ ] = conf.get_val<uint64_t>("osd_recovery_op_priority");
+ priority_map[
+ static_cast<size_t>(scheduler_class_t::client)
+ ] = conf.get_val<uint64_t>("osd_client_op_priority");
+ priority_map[
+ static_cast<size_t>(scheduler_class_t::repop)
+ ] = conf.get_val<uint64_t>("osd_client_op_priority");
+ }
+
+ void enqueue(item_t &&item) final {
+ if (use_strict(item.params.klass))
+ queue.enqueue_strict(
+ item.params.owner, get_priority(item.params.klass), std::move(item));
+ else
+ queue.enqueue(
+ item.params.owner, get_priority(item.params.klass),
+ item.params.cost, std::move(item));
+ }
+
+ void enqueue_front(item_t &&item) final {
+ if (use_strict(item.params.klass))
+ queue.enqueue_strict_front(
+ item.params.owner, get_priority(item.params.klass), std::move(item));
+ else
+ queue.enqueue_front(
+ item.params.owner, get_priority(item.params.klass),
+ item.params.cost, std::move(item));
+ }
+
+ bool empty() const final {
+ return queue.empty();
+ }
+
+ item_t dequeue() final {
+ return queue.dequeue();
+ }
+
+ void dump(ceph::Formatter &f) const final {
+ return queue.dump(&f);
+ }
+
+ void print(std::ostream &out) const final {
+ out << "ClassedOpQueueScheduler(queue=";
+ queue.print(out);
+ out << ", cutoff=" << cutoff << ")";
+ }
+
+ ~ClassedOpQueueScheduler() final {};
+};
+
+SchedulerRef make_scheduler(ConfigProxy &conf)
+{
+ const std::string _type = conf.get_val<std::string>("osd_op_queue");
+ const std::string *type = &_type;
+ if (*type == "debug_random") {
+ static const std::string index_lookup[] = { "mclock_scheduler",
+ "wpq" };
+ srand(time(NULL));
+ unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
+ type = &index_lookup[which];
+ }
+
+ if (*type == "wpq" ) {
+ // default is 'wpq'
+ return std::make_unique<
+ ClassedOpQueueScheduler<WeightedPriorityQueue<item_t, client_t>>>(
+ conf,
+ conf.get_val<uint64_t>("osd_op_pq_max_tokens_per_priority"),
+ conf->osd_op_pq_min_cost
+ );
+ } else if (*type == "mclock_scheduler") {
+ return std::make_unique<mClockScheduler>(conf);
+ } else {
+ ceph_assert("Invalid choice of wq" == 0);
+ return std::unique_ptr<mClockScheduler>();
+ }
+}
+
+std::ostream &operator<<(std::ostream &lhs, const Scheduler &rhs) {
+ rhs.print(lhs);
+ return lhs;
+}
+
+}
diff --git a/src/crimson/osd/scheduler/scheduler.h b/src/crimson/osd/scheduler/scheduler.h
new file mode 100644
index 000000000..a014991ab
--- /dev/null
+++ b/src/crimson/osd/scheduler/scheduler.h
@@ -0,0 +1,82 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <ostream>
+
+#include "crimson/common/config_proxy.h"
+
+namespace crimson::osd::scheduler {
+
+enum class scheduler_class_t : uint8_t {
+ background_best_effort = 0,
+ background_recovery,
+ client,
+ repop,
+ immediate,
+};
+
+std::ostream &operator<<(std::ostream &, const scheduler_class_t &);
+
+using client_t = uint64_t;
+using cost_t = uint64_t;
+
+struct params_t {
+ cost_t cost = 1;
+ client_t owner;
+ scheduler_class_t klass;
+};
+
+struct item_t {
+ params_t params;
+ seastar::promise<> wake;
+};
+
+/**
+ * Base interface for classes responsible for choosing
+ * op processing order in the OSD.
+ */
+class Scheduler {
+public:
+ // Enqueue op for scheduling
+ virtual void enqueue(item_t &&item) = 0;
+
+ // Enqueue op for processing as though it were enqueued prior
+ // to other items already scheduled.
+ virtual void enqueue_front(item_t &&item) = 0;
+
+ // Returns true iff there are no ops scheduled
+ virtual bool empty() const = 0;
+
+ // Return next op to be processed
+ virtual item_t dequeue() = 0;
+
+ // Dump formatted representation for the queue
+ virtual void dump(ceph::Formatter &f) const = 0;
+
+ // Print human readable brief description with relevant parameters
+ virtual void print(std::ostream &out) const = 0;
+
+ // Destructor
+ virtual ~Scheduler() {};
+};
+
+std::ostream &operator<<(std::ostream &lhs, const Scheduler &);
+using SchedulerRef = std::unique_ptr<Scheduler>;
+
+SchedulerRef make_scheduler(ConfigProxy &);
+
+}