diff options
Diffstat (limited to 'src/crimson/osd/scheduler')
-rw-r--r-- | src/crimson/osd/scheduler/mclock_scheduler.cc | 165 | ||||
-rw-r--r-- | src/crimson/osd/scheduler/mclock_scheduler.h | 130 | ||||
-rw-r--r-- | src/crimson/osd/scheduler/scheduler.cc | 181 | ||||
-rw-r--r-- | src/crimson/osd/scheduler/scheduler.h | 82 |
4 files changed, 558 insertions, 0 deletions
diff --git a/src/crimson/osd/scheduler/mclock_scheduler.cc b/src/crimson/osd/scheduler/mclock_scheduler.cc new file mode 100644 index 000000000..195ea8dd8 --- /dev/null +++ b/src/crimson/osd/scheduler/mclock_scheduler.cc @@ -0,0 +1,165 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include <memory> +#include <functional> + +#include "crimson/osd/scheduler/mclock_scheduler.h" +#include "common/dout.h" + +namespace dmc = crimson::dmclock; +using namespace std::placeholders; + +#define dout_context cct +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix *_dout + + +namespace crimson::osd::scheduler { + +mClockScheduler::mClockScheduler(ConfigProxy &conf) : + scheduler( + std::bind(&mClockScheduler::ClientRegistry::get_info, + &client_registry, + _1), + dmc::AtLimit::Allow, + conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout")) +{ + conf.add_observer(this); + client_registry.update_from_config(conf); +} + +void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf) +{ + default_external_client_info.update( + conf.get_val<uint64_t>("osd_mclock_scheduler_client_res"), + conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"), + conf.get_val<uint64_t>("osd_mclock_scheduler_client_lim")); + + internal_client_infos[ + static_cast<size_t>(scheduler_class_t::background_recovery)].update( + conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_res"), + conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"), + conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_lim")); + + internal_client_infos[ + static_cast<size_t>(scheduler_class_t::background_best_effort)].update( + conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_res"), + conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"), + conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_lim")); +} + +const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client( + const client_profile_id_t &client) const +{ + auto ret = external_client_infos.find(client); + if (ret == external_client_infos.end()) + return &default_external_client_info; + else + return &(ret->second); +} + +const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info( + const scheduler_id_t &id) const { + switch (id.class_id) { + case scheduler_class_t::immediate: + ceph_assert(0 == "Cannot schedule immediate"); + return (dmc::ClientInfo*)nullptr; + case scheduler_class_t::repop: + case scheduler_class_t::client: + return get_external_client(id.client_profile_id); + default: + ceph_assert(static_cast<size_t>(id.class_id) < internal_client_infos.size()); + return &internal_client_infos[static_cast<size_t>(id.class_id)]; + } +} + +void mClockScheduler::dump(ceph::Formatter &f) const +{ +} + +void mClockScheduler::enqueue(item_t&& item) +{ + auto id = get_scheduler_id(item); + auto cost = item.params.cost; + + if (scheduler_class_t::immediate == item.params.klass) { + immediate.push_front(std::move(item)); + } else { + scheduler.add_request( + std::move(item), + id, + cost); + } +} + +void mClockScheduler::enqueue_front(item_t&& item) +{ + immediate.push_back(std::move(item)); + // TODO: item may not be immediate, update mclock machinery to permit + // putting the item back in the queue +} + +item_t mClockScheduler::dequeue() +{ + if (!immediate.empty()) { + auto ret = std::move(immediate.back()); + immediate.pop_back(); + return ret; + } else { + mclock_queue_t::PullReq result = scheduler.pull_request(); + if (result.is_future()) { + ceph_assert( + 0 == "Not implemented, user would have to be able to be woken up"); + return std::move(*(item_t*)nullptr); + } else if (result.is_none()) { + ceph_assert( + 0 == "Impossible, must have checked empty() first"); + return std::move(*(item_t*)nullptr); + } else { + ceph_assert(result.is_retn()); + + auto &retn = result.get_retn(); + return std::move(*retn.request); + } + } +} + +const char** mClockScheduler::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + "osd_mclock_scheduler_client_res", + "osd_mclock_scheduler_client_wgt", + "osd_mclock_scheduler_client_lim", + "osd_mclock_scheduler_background_recovery_res", + "osd_mclock_scheduler_background_recovery_wgt", + "osd_mclock_scheduler_background_recovery_lim", + "osd_mclock_scheduler_background_best_effort_res", + "osd_mclock_scheduler_background_best_effort_wgt", + "osd_mclock_scheduler_background_best_effort_lim", + NULL + }; + return KEYS; +} + +void mClockScheduler::handle_conf_change( + const ConfigProxy& conf, + const std::set<std::string> &changed) +{ + client_registry.update_from_config(conf); +} + +} diff --git a/src/crimson/osd/scheduler/mclock_scheduler.h b/src/crimson/osd/scheduler/mclock_scheduler.h new file mode 100644 index 000000000..c3edbe729 --- /dev/null +++ b/src/crimson/osd/scheduler/mclock_scheduler.h @@ -0,0 +1,130 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#pragma once + +#include <ostream> +#include <map> +#include <vector> + +#include "boost/variant.hpp" + +#include "dmclock/src/dmclock_server.h" + +#include "crimson/osd/scheduler/scheduler.h" +#include "common/config.h" +#include "include/cmp.h" +#include "common/ceph_context.h" + + +namespace crimson::osd::scheduler { + +using client_id_t = uint64_t; +using profile_id_t = uint64_t; + +struct client_profile_id_t { + client_id_t client_id; + profile_id_t profile_id; +}; + +WRITE_EQ_OPERATORS_2(client_profile_id_t, client_id, profile_id) +WRITE_CMP_OPERATORS_2(client_profile_id_t, client_id, profile_id) + + +struct scheduler_id_t { + scheduler_class_t class_id; + client_profile_id_t client_profile_id; +}; + +WRITE_EQ_OPERATORS_2(scheduler_id_t, class_id, client_profile_id) +WRITE_CMP_OPERATORS_2(scheduler_id_t, class_id, client_profile_id) + +/** + * Scheduler implementation based on mclock. + * + * TODO: explain configs + */ +class mClockScheduler : public Scheduler, md_config_obs_t { + + class ClientRegistry { + std::array< + crimson::dmclock::ClientInfo, + static_cast<size_t>(scheduler_class_t::client) + > internal_client_infos = { + // Placeholder, gets replaced with configured values + crimson::dmclock::ClientInfo(1, 1, 1), + crimson::dmclock::ClientInfo(1, 1, 1) + }; + + crimson::dmclock::ClientInfo default_external_client_info = {1, 1, 1}; + std::map<client_profile_id_t, + crimson::dmclock::ClientInfo> external_client_infos; + const crimson::dmclock::ClientInfo *get_external_client( + const client_profile_id_t &client) const; + public: + void update_from_config(const ConfigProxy &conf); + const crimson::dmclock::ClientInfo *get_info( + const scheduler_id_t &id) const; + } client_registry; + + using mclock_queue_t = crimson::dmclock::PullPriorityQueue< + scheduler_id_t, + item_t, + true, + true, + 2>; + mclock_queue_t scheduler; + std::list<item_t> immediate; + + static scheduler_id_t get_scheduler_id(const item_t &item) { + return scheduler_id_t{ + item.params.klass, + client_profile_id_t{ + item.params.owner, + 0 + } + }; + } + +public: + mClockScheduler(ConfigProxy &conf); + + // Enqueue op in the back of the regular queue + void enqueue(item_t &&item) final; + + // Enqueue the op in the front of the regular queue + void enqueue_front(item_t &&item) final; + + // Return an op to be dispatch + item_t dequeue() final; + + // Returns if the queue is empty + bool empty() const final { + return immediate.empty() && scheduler.empty(); + } + + // Formatted output of the queue + void dump(ceph::Formatter &f) const final; + + void print(std::ostream &ostream) const final { + ostream << "mClockScheduler"; + } + + const char** get_tracked_conf_keys() const final; + void handle_conf_change(const ConfigProxy& conf, + const std::set<std::string> &changed) final; +}; + +} diff --git a/src/crimson/osd/scheduler/scheduler.cc b/src/crimson/osd/scheduler/scheduler.cc new file mode 100644 index 000000000..c85cb388e --- /dev/null +++ b/src/crimson/osd/scheduler/scheduler.cc @@ -0,0 +1,181 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <ostream> + +#include <seastar/core/print.hh> + +#include "crimson/osd/scheduler/scheduler.h" +#include "crimson/osd/scheduler/mclock_scheduler.h" +#include "common/WeightedPriorityQueue.h" + +namespace crimson::osd::scheduler { + +std::ostream &operator<<(std::ostream &lhs, const scheduler_class_t &c) +{ + switch (c) { + case scheduler_class_t::background_best_effort: + return lhs << "background_best_effort"; + case scheduler_class_t::background_recovery: + return lhs << "background_recovery"; + case scheduler_class_t::client: + return lhs << "client"; + case scheduler_class_t::repop: + return lhs << "repop"; + case scheduler_class_t::immediate: + return lhs << "immediate"; + default: + return lhs; + } +} + +/** + * Implements Scheduler in terms of OpQueue + * + * Templated on queue type to avoid dynamic dispatch, T should implement + * OpQueue<Scheduleritem_t, client_t>. This adapter is mainly responsible for + * the boilerplate priority cutoff/strict concept which is needed for + * OpQueue based implementations. + */ +template <typename T> +class ClassedOpQueueScheduler final : public Scheduler { + const scheduler_class_t cutoff; + T queue; + + using priority_t = uint64_t; + std::array< + priority_t, + static_cast<size_t>(scheduler_class_t::immediate) + > priority_map = { + // Placeholder, gets replaced with configured values + 0, 0, 0 + }; + + static scheduler_class_t get_io_prio_cut(ConfigProxy &conf) { + if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") { + srand(time(NULL)); + return (rand() % 2 < 1) ? + scheduler_class_t::repop : scheduler_class_t::immediate; + } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") { + return scheduler_class_t::immediate; + } else { + return scheduler_class_t::repop; + } + } + + bool use_strict(scheduler_class_t kl) const { + return static_cast<uint8_t>(kl) >= static_cast<uint8_t>(cutoff); + } + + priority_t get_priority(scheduler_class_t kl) const { + ceph_assert(static_cast<size_t>(kl) < + static_cast<size_t>(scheduler_class_t::immediate)); + return priority_map[static_cast<size_t>(kl)]; + } + +public: + template <typename... Args> + ClassedOpQueueScheduler(ConfigProxy &conf, Args&&... args) : + cutoff(get_io_prio_cut(conf)), + queue(std::forward<Args>(args)...) + { + priority_map[ + static_cast<size_t>(scheduler_class_t::background_best_effort) + ] = conf.get_val<uint64_t>("osd_scrub_priority"); + priority_map[ + static_cast<size_t>(scheduler_class_t::background_recovery) + ] = conf.get_val<uint64_t>("osd_recovery_op_priority"); + priority_map[ + static_cast<size_t>(scheduler_class_t::client) + ] = conf.get_val<uint64_t>("osd_client_op_priority"); + priority_map[ + static_cast<size_t>(scheduler_class_t::repop) + ] = conf.get_val<uint64_t>("osd_client_op_priority"); + } + + void enqueue(item_t &&item) final { + if (use_strict(item.params.klass)) + queue.enqueue_strict( + item.params.owner, get_priority(item.params.klass), std::move(item)); + else + queue.enqueue( + item.params.owner, get_priority(item.params.klass), + item.params.cost, std::move(item)); + } + + void enqueue_front(item_t &&item) final { + if (use_strict(item.params.klass)) + queue.enqueue_strict_front( + item.params.owner, get_priority(item.params.klass), std::move(item)); + else + queue.enqueue_front( + item.params.owner, get_priority(item.params.klass), + item.params.cost, std::move(item)); + } + + bool empty() const final { + return queue.empty(); + } + + item_t dequeue() final { + return queue.dequeue(); + } + + void dump(ceph::Formatter &f) const final { + return queue.dump(&f); + } + + void print(std::ostream &out) const final { + out << "ClassedOpQueueScheduler(queue="; + queue.print(out); + out << ", cutoff=" << cutoff << ")"; + } + + ~ClassedOpQueueScheduler() final {}; +}; + +SchedulerRef make_scheduler(ConfigProxy &conf) +{ + const std::string _type = conf.get_val<std::string>("osd_op_queue"); + const std::string *type = &_type; + if (*type == "debug_random") { + static const std::string index_lookup[] = { "mclock_scheduler", + "wpq" }; + srand(time(NULL)); + unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0])); + type = &index_lookup[which]; + } + + if (*type == "wpq" ) { + // default is 'wpq' + return std::make_unique< + ClassedOpQueueScheduler<WeightedPriorityQueue<item_t, client_t>>>( + conf, + conf.get_val<uint64_t>("osd_op_pq_max_tokens_per_priority"), + conf->osd_op_pq_min_cost + ); + } else if (*type == "mclock_scheduler") { + return std::make_unique<mClockScheduler>(conf); + } else { + ceph_assert("Invalid choice of wq" == 0); + return std::unique_ptr<mClockScheduler>(); + } +} + +std::ostream &operator<<(std::ostream &lhs, const Scheduler &rhs) { + rhs.print(lhs); + return lhs; +} + +} diff --git a/src/crimson/osd/scheduler/scheduler.h b/src/crimson/osd/scheduler/scheduler.h new file mode 100644 index 000000000..a014991ab --- /dev/null +++ b/src/crimson/osd/scheduler/scheduler.h @@ -0,0 +1,82 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/future.hh> +#include <ostream> + +#include "crimson/common/config_proxy.h" + +namespace crimson::osd::scheduler { + +enum class scheduler_class_t : uint8_t { + background_best_effort = 0, + background_recovery, + client, + repop, + immediate, +}; + +std::ostream &operator<<(std::ostream &, const scheduler_class_t &); + +using client_t = uint64_t; +using cost_t = uint64_t; + +struct params_t { + cost_t cost = 1; + client_t owner; + scheduler_class_t klass; +}; + +struct item_t { + params_t params; + seastar::promise<> wake; +}; + +/** + * Base interface for classes responsible for choosing + * op processing order in the OSD. + */ +class Scheduler { +public: + // Enqueue op for scheduling + virtual void enqueue(item_t &&item) = 0; + + // Enqueue op for processing as though it were enqueued prior + // to other items already scheduled. + virtual void enqueue_front(item_t &&item) = 0; + + // Returns true iff there are no ops scheduled + virtual bool empty() const = 0; + + // Return next op to be processed + virtual item_t dequeue() = 0; + + // Dump formatted representation for the queue + virtual void dump(ceph::Formatter &f) const = 0; + + // Print human readable brief description with relevant parameters + virtual void print(std::ostream &out) const = 0; + + // Destructor + virtual ~Scheduler() {}; +}; + +std::ostream &operator<<(std::ostream &lhs, const Scheduler &); +using SchedulerRef = std::unique_ptr<Scheduler>; + +SchedulerRef make_scheduler(ConfigProxy &); + +} |