diff options
Diffstat (limited to '')
-rw-r--r-- | src/common/CommandTable.h | 39 | ||||
-rw-r--r-- | src/common/OpQueue.h | 4 | ||||
-rw-r--r-- | src/common/PrioritizedQueue.h | 6 | ||||
-rw-r--r-- | src/common/WeightedPriorityQueue.h | 6 | ||||
-rw-r--r-- | src/common/bit_vector.hpp | 18 | ||||
-rw-r--r-- | src/common/ceph_mutex.h | 5 | ||||
-rw-r--r-- | src/common/ceph_time.h | 3 | ||||
-rw-r--r-- | src/common/config_obs_mgr.h | 39 | ||||
-rw-r--r-- | src/common/config_proxy.h | 192 | ||||
-rw-r--r-- | src/common/mClockPriorityQueue.h | 369 | ||||
-rw-r--r-- | src/common/options/cephfs-mirror.yaml.in | 13 | ||||
-rw-r--r-- | src/common/options/global.yaml.in | 2 | ||||
-rw-r--r-- | src/common/options/mds-client.yaml.in | 2 | ||||
-rw-r--r-- | src/common/options/mds.yaml.in | 39 | ||||
-rw-r--r-- | src/common/options/osd.yaml.in | 16 | ||||
-rw-r--r-- | src/common/options/rgw.yaml.in | 8 | ||||
-rw-r--r-- | src/common/weighted_shuffle.h | 2 |
17 files changed, 230 insertions, 533 deletions
diff --git a/src/common/CommandTable.h b/src/common/CommandTable.h index 53218d653..e777d72dc 100644 --- a/src/common/CommandTable.h +++ b/src/common/CommandTable.h @@ -23,6 +23,8 @@ class CommandOp public: ConnectionRef con; ceph_tid_t tid; + // multi_target_id == 0 means single target command + ceph_tid_t multi_target_id; std::vector<std::string> cmd; ceph::buffer::list inbl; @@ -48,9 +50,11 @@ class CommandOp } } - CommandOp(const ceph_tid_t t) : tid(t), on_finish(nullptr), + CommandOp(const ceph_tid_t t) : tid(t), multi_target_id(0), on_finish(nullptr), outbl(nullptr), outs(nullptr) {} CommandOp() : tid(0), on_finish(nullptr), outbl(nullptr), outs(nullptr) {} + CommandOp(const ceph_tid_t t, const ceph_tid_t multi_id) : tid(t), multi_target_id(multi_id), + on_finish(nullptr), outbl(nullptr), outs(nullptr) {} }; /** @@ -62,23 +66,38 @@ class CommandTable { protected: ceph_tid_t last_tid; + ceph_tid_t last_multi_target_id; + std::map<ceph_tid_t, T> commands; + std::map<ceph_tid_t, std::set<ceph_tid_t> > multi_targets; public: CommandTable() - : last_tid(0) + : last_tid(0), last_multi_target_id(0) {} ~CommandTable() { ceph_assert(commands.empty()); + for (const auto& pair : multi_targets) { + ceph_assert(pair.second.empty()); + } + } + + ceph_tid_t get_new_multi_target_id() + { + return ++last_multi_target_id; } - T& start_command() + T& start_command(ceph_tid_t multi_id=0) { ceph_tid_t tid = last_tid++; - commands.insert(std::make_pair(tid, T(tid)) ); + commands.insert(std::make_pair(tid, T(tid, multi_id)) ); + + if (multi_id != 0) { + multi_targets[multi_id].insert(tid); + } return commands.at(tid); } @@ -93,6 +112,11 @@ public: return commands.count(tid) > 0; } + std::size_t count_multi_commands(ceph_tid_t multi_id) + { + return multi_targets[multi_id].size(); + } + T& get_command(ceph_tid_t tid) { return commands.at(tid); @@ -100,11 +124,18 @@ public: void erase(ceph_tid_t tid) { + ceph_tid_t multi_id = commands.at(tid).multi_target_id; commands.erase(tid); + multi_targets[multi_id].erase(tid); + + if(count_multi_commands(multi_id) == 0) { + multi_targets.erase(multi_id); + } } void clear() { commands.clear(); + multi_targets.clear(); } }; diff --git a/src/common/OpQueue.h b/src/common/OpQueue.h index 0204f4b44..07104b21f 100644 --- a/src/common/OpQueue.h +++ b/src/common/OpQueue.h @@ -16,6 +16,7 @@ #define OP_QUEUE_H #include "include/msgr.h" +#include "osd/osd_types.h" #include <list> #include <functional> @@ -66,6 +67,9 @@ public: // Human readable brief description of queue and relevant parameters virtual void print(std::ostream &f) const = 0; + // Get the type of OpQueue implementation + virtual op_queue_type_t get_type() const = 0; + // Don't leak resources on destruction virtual ~OpQueue() {}; }; diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h index 9adf21aaf..0c006795e 100644 --- a/src/common/PrioritizedQueue.h +++ b/src/common/PrioritizedQueue.h @@ -345,7 +345,11 @@ public: } void print(std::ostream &ostream) const final { - ostream << "PrioritizedQueue"; + ostream << get_op_queue_type_name(get_type()); + } + + op_queue_type_t get_type() const final { + return op_queue_type_t::PrioritizedQueue; } }; diff --git a/src/common/WeightedPriorityQueue.h b/src/common/WeightedPriorityQueue.h index cf34709b9..c8d92b5e0 100644 --- a/src/common/WeightedPriorityQueue.h +++ b/src/common/WeightedPriorityQueue.h @@ -346,7 +346,11 @@ class WeightedPriorityQueue : public OpQueue <T, K> } void print(std::ostream &ostream) const final { - ostream << "WeightedPriorityQueue"; + ostream << get_op_queue_type_name(get_type()); + } + + op_queue_type_t get_type() const final { + return op_queue_type_t::WeightedPriorityQueue; } }; diff --git a/src/common/bit_vector.hpp b/src/common/bit_vector.hpp index 9ce3e8b1e..961d9a019 100644 --- a/src/common/bit_vector.hpp +++ b/src/common/bit_vector.hpp @@ -83,7 +83,7 @@ public: }; public: - template <typename BitVectorT, typename DataIterator> + template <typename BitVectorT, typename DataIteratorT, typename ReferenceT> class IteratorImpl { private: friend class BitVector; @@ -94,7 +94,7 @@ public: // cached derived values uint64_t m_index = 0; uint64_t m_shift = 0; - DataIterator m_data_iterator; + DataIteratorT m_data_iterator; IteratorImpl(BitVectorT *bit_vector, uint64_t offset) : m_bit_vector(bit_vector), @@ -129,7 +129,7 @@ public: inline IteratorImpl operator++(int) { IteratorImpl iterator_impl(*this); - ++iterator_impl; + ++*this; return iterator_impl; } inline IteratorImpl operator+(uint64_t offset) { @@ -145,17 +145,15 @@ public: return (m_offset != rhs.m_offset || m_bit_vector != rhs.m_bit_vector); } - inline ConstReference operator*() const { - return ConstReference(m_data_iterator, m_shift); - } - inline Reference operator*() { - return Reference(m_data_iterator, m_shift); + inline ReferenceT operator*() const { + return ReferenceT(m_data_iterator, m_shift); } }; typedef IteratorImpl<const BitVector, - bufferlist::const_iterator> ConstIterator; - typedef IteratorImpl<BitVector, bufferlist::iterator> Iterator; + bufferlist::const_iterator, + ConstReference> ConstIterator; + typedef IteratorImpl<BitVector, bufferlist::iterator, Reference> Iterator; static const uint32_t BLOCK_SIZE; static const uint8_t BIT_COUNT = _bit_count; diff --git a/src/common/ceph_mutex.h b/src/common/ceph_mutex.h index 8d87e605b..5f7bfbed5 100644 --- a/src/common/ceph_mutex.h +++ b/src/common/ceph_mutex.h @@ -83,6 +83,7 @@ namespace ceph { return {}; } + static constexpr bool mutex_debugging = false; #define ceph_mutex_is_locked(m) true #define ceph_mutex_is_locked_by_me(m) true } @@ -130,6 +131,8 @@ namespace ceph { return {std::forward<Args>(args)...}; } + static constexpr bool mutex_debugging = true; + // debug methods #define ceph_mutex_is_locked(m) ((m).is_locked()) #define ceph_mutex_is_not_locked(m) (!(m).is_locked()) @@ -183,6 +186,8 @@ namespace ceph { return {}; } + static constexpr bool mutex_debugging = false; + // debug methods. Note that these can blindly return true // because any code that does anything other than assert these // are true is broken. diff --git a/src/common/ceph_time.h b/src/common/ceph_time.h index 292fa91ac..fc96a80a7 100644 --- a/src/common/ceph_time.h +++ b/src/common/ceph_time.h @@ -528,6 +528,9 @@ struct converts_to_timespec<Clock, std::void_t<decltype( template <typename Clock> constexpr bool converts_to_timespec_v = converts_to_timespec<Clock>::value; +template <typename Clock> +concept clock_with_timespec = converts_to_timespec_v<Clock>; + template<typename Rep, typename T> static Rep to_seconds(T t) { return std::chrono::duration_cast< diff --git a/src/common/config_obs_mgr.h b/src/common/config_obs_mgr.h index 06b3cf934..759930df9 100644 --- a/src/common/config_obs_mgr.h +++ b/src/common/config_obs_mgr.h @@ -14,13 +14,11 @@ class ConfigValues; // the changes of settings at runtime. template<class ConfigObs> class ObserverMgr : public ConfigTracker { - // Maps configuration options to the observer listening for them. - using obs_map_t = std::multimap<std::string, ConfigObs*>; - obs_map_t observers; - public: - typedef std::map<ConfigObs*, std::set<std::string>> rev_obs_map; - typedef std::function<void(ConfigObs*, const std::string&)> config_gather_cb; + using config_obs_ptr = std::shared_ptr<ConfigObs*>; + using config_obs_wptr = std::weak_ptr<ConfigObs*>; + typedef std::map<config_obs_ptr, std::set<std::string>> rev_obs_map; + typedef std::function<void(config_obs_ptr, const std::string&)> config_gather_cb; // Adds a new observer to this configuration. You can do this at any time, // but it will only receive notifications for the changes that happen after @@ -37,15 +35,18 @@ public: // you need to delete it yourself. // This function will assert if you try to delete an observer that isn't // there. - void remove_observer(ConfigObs* observer); + config_obs_wptr remove_observer(ConfigObs* observer); // invoke callback for every observers tracking keys void for_each_observer(config_gather_cb callback); // invoke callback for observers keys tracking the provided change set - template<class ConfigProxyT> - void for_each_change(const std::set<std::string>& changes, - ConfigProxyT& proxy, + void for_each_change(const std::map<std::string,bool>& changes, config_gather_cb callback, std::ostream *oss); bool is_tracking(const std::string& name) const override; + +private: + // Maps configuration options to the observer listening for them. + using obs_map_t = std::multimap<std::string, config_obs_ptr>; + obs_map_t observers; }; // we could put the implementations in a .cc file, and only instantiate the @@ -60,17 +61,20 @@ template<class ConfigObs> void ObserverMgr<ConfigObs>::add_observer(ConfigObs* observer) { const char **keys = observer->get_tracked_conf_keys(); + auto ptr = std::make_shared<ConfigObs*>(observer); for (const char ** k = keys; *k; ++k) { - observers.emplace(*k, observer); + observers.emplace(*k, ptr); } } template<class ConfigObs> -void ObserverMgr<ConfigObs>::remove_observer(ConfigObs* observer) +typename ObserverMgr<ConfigObs>::config_obs_wptr ObserverMgr<ConfigObs>::remove_observer(ConfigObs* observer) { [[maybe_unused]] bool found_obs = false; + config_obs_ptr ptr; for (auto o = observers.begin(); o != observers.end(); ) { - if (o->second == observer) { + if (*o->second == observer) { + ptr = std::move(o->second); observers.erase(o++); found_obs = true; } else { @@ -78,6 +82,7 @@ void ObserverMgr<ConfigObs>::remove_observer(ConfigObs* observer) } } ceph_assert(found_obs); + return config_obs_wptr(ptr); } template<class ConfigObs> @@ -89,17 +94,15 @@ void ObserverMgr<ConfigObs>::for_each_observer(config_gather_cb callback) } template<class ConfigObs> -template<class ConfigProxyT> -void ObserverMgr<ConfigObs>::for_each_change(const std::set<std::string>& changes, - ConfigProxyT& proxy, +void ObserverMgr<ConfigObs>::for_each_change(const std::map<std::string,bool>& changes, config_gather_cb callback, std::ostream *oss) { // create the reverse observer mapping, mapping observers to the set of // changed keys that they'll get. std::string val; - for (auto& key : changes) { + for (auto& [key, present] : changes) { auto [first, last] = observers.equal_range(key); - if ((oss) && !proxy.get_val(key, &val)) { + if ((oss) && present) { (*oss) << key << " = '" << val << "' "; if (first == last) { (*oss) << "(not observed, change may require restart) "; diff --git a/src/common/config_proxy.h b/src/common/config_proxy.h index 02c670f60..b9b47d9ce 100644 --- a/src/common/config_proxy.h +++ b/src/common/config_proxy.h @@ -18,91 +18,51 @@ class ConfigProxy { */ ConfigValues values; using md_config_obs_t = ceph::md_config_obs_impl<ConfigProxy>; - ObserverMgr<md_config_obs_t> obs_mgr; + using ObsMgr = ObserverMgr<md_config_obs_t>; + ObsMgr obs_mgr; md_config_t config; /** A lock that protects the md_config_t internals. It is * recursive, for simplicity. * It is best if this lock comes first in the lock hierarchy. We will * hold this lock when calling configuration observers. */ - mutable ceph::recursive_mutex lock = - ceph::make_recursive_mutex("ConfigProxy::lock"); + mutable ceph::mutex lock = ceph::make_mutex("ConfigProxy::lock"); + ceph::condition_variable cond; - class CallGate { - private: - uint32_t call_count = 0; - ceph::mutex lock; - ceph::condition_variable cond; - public: - CallGate() - : lock(ceph::make_mutex("call::gate::lock")) { - } + using rev_obs_map_t = ObsMgr::rev_obs_map; - void enter() { - std::lock_guard<ceph::mutex> locker(lock); - ++call_count; + void _call_observers(rev_obs_map_t& rev_obs) { + ceph_assert(!ceph::mutex_debugging || !ceph_mutex_is_locked_by_me(lock)); + for (auto& [obs, keys] : rev_obs) { + (*obs)->handle_conf_change(*this, keys); } - void leave() { - std::lock_guard<ceph::mutex> locker(lock); - ceph_assert(call_count > 0); - if (--call_count == 0) { - cond.notify_all(); - } + rev_obs.clear(); // drop shared_ptrs + { + std::lock_guard l{lock}; + cond.notify_all(); } - void close() { - std::unique_lock<ceph::mutex> locker(lock); - while (call_count != 0) { - cond.wait(locker); - } - } - }; - - void call_gate_enter(md_config_obs_t *obs) { - auto p = obs_call_gate.find(obs); - ceph_assert(p != obs_call_gate.end()); - p->second->enter(); - } - void call_gate_leave(md_config_obs_t *obs) { - auto p = obs_call_gate.find(obs); - ceph_assert(p != obs_call_gate.end()); - p->second->leave(); } - void call_gate_close(md_config_obs_t *obs) { - auto p = obs_call_gate.find(obs); - ceph_assert(p != obs_call_gate.end()); - p->second->close(); - } - - using rev_obs_map_t = ObserverMgr<md_config_obs_t>::rev_obs_map; - typedef std::unique_ptr<CallGate> CallGateRef; - - std::map<md_config_obs_t*, CallGateRef> obs_call_gate; - - void call_observers(std::unique_lock<ceph::recursive_mutex>& locker, - rev_obs_map_t& rev_obs) { - // observers are notified outside of lock - locker.unlock(); - for (auto& [obs, keys] : rev_obs) { - obs->handle_conf_change(*this, keys); - } - locker.lock(); - - for (auto& rev_ob : rev_obs) { - call_gate_leave(rev_ob.first); + void _gather_changes(std::set<std::string> &changes, + rev_obs_map_t *rev_obs, std::ostream* oss) { + ceph_assert(ceph_mutex_is_locked_by_me(lock)); + std::map<std::string,bool> changes_present; + for (auto& change : changes) { + std::string dummy; + changes_present[change] = (0 == config.get_val(values, change, &dummy)); } + obs_mgr.for_each_change( + changes_present, + [this, rev_obs](auto obs, const std::string &key) { + _map_observer_changes(obs, key, rev_obs); + }, oss); + changes.clear(); } - void map_observer_changes(md_config_obs_t *obs, const std::string &key, + void _map_observer_changes(ObsMgr::config_obs_ptr obs, const std::string& key, rev_obs_map_t *rev_obs) { - ceph_assert(ceph_mutex_is_locked(lock)); + ceph_assert(ceph_mutex_is_locked_by_me(lock)); auto [it, new_entry] = rev_obs->emplace(obs, std::set<std::string>{}); it->second.emplace(key); - if (new_entry) { - // this needs to be done under lock as once this lock is - // dropped (before calling observers) a remove_observer() - // can sneak in and cause havoc. - call_gate_enter(obs); - } } public: @@ -150,12 +110,15 @@ public: std::forward<Args>(args)...); } void config_options(ceph::Formatter *f) const { + std::lock_guard l{lock}; config.config_options(f); } const decltype(md_config_t::schema)& get_schema() const { + std::lock_guard l{lock}; return config.schema; } const Option* get_schema(const std::string_view key) const { + std::lock_guard l{lock}; auto found = config.schema.find(key); if (found == config.schema.end()) { return nullptr; @@ -164,6 +127,7 @@ public: } } const Option *find_option(const std::string& name) const { + std::lock_guard l{lock}; return config.find_option(name); } void diff(ceph::Formatter *f, const std::string& name = {}) const { @@ -186,6 +150,7 @@ public: sections, key, out, emeta); } unsigned get_osd_pool_default_min_size(uint8_t size) const { + std::lock_guard l{lock}; return config.get_osd_pool_default_min_size(values, size); } void early_expand_meta(std::string &val, @@ -195,39 +160,46 @@ public: } // for those want to reexpand special meta, e.g, $pid void finalize_reexpand_meta() { - std::unique_lock locker(lock); rev_obs_map_t rev_obs; - if (config.finalize_reexpand_meta(values, obs_mgr)) { - _gather_changes(values.changed, &rev_obs, nullptr); + { + std::lock_guard locker(lock); + if (config.finalize_reexpand_meta(values, obs_mgr)) { + _gather_changes(values.changed, &rev_obs, nullptr); + } } - call_observers(locker, rev_obs); + _call_observers(rev_obs); } void add_observer(md_config_obs_t* obs) { std::lock_guard l(lock); obs_mgr.add_observer(obs); - obs_call_gate.emplace(obs, std::make_unique<CallGate>()); + cond.notify_all(); } void remove_observer(md_config_obs_t* obs) { - std::lock_guard l(lock); - call_gate_close(obs); - obs_call_gate.erase(obs); - obs_mgr.remove_observer(obs); + std::unique_lock l(lock); + auto wptr = obs_mgr.remove_observer(obs); + while (!wptr.expired()) { + cond.wait(l); + } } void call_all_observers() { - std::unique_lock locker(lock); rev_obs_map_t rev_obs; - obs_mgr.for_each_observer( - [this, &rev_obs](md_config_obs_t *obs, const std::string &key) { - map_observer_changes(obs, key, &rev_obs); - }); + { + std::lock_guard locker(lock); + obs_mgr.for_each_observer( + [this, &rev_obs](auto obs, const std::string& key) { + _map_observer_changes(obs, key, &rev_obs); + }); + } - call_observers(locker, rev_obs); + _call_observers(rev_obs); } void set_safe_to_start_threads() { + std::lock_guard l(lock); config.set_safe_to_start_threads(); } void _clear_safe_to_start_threads() { + std::lock_guard l(lock); config._clear_safe_to_start_threads(); } void show_config(std::ostream& out) { @@ -248,25 +220,18 @@ public: } // Expand all metavariables. Make any pending observer callbacks. void apply_changes(std::ostream* oss) { - std::unique_lock locker(lock); rev_obs_map_t rev_obs; - // apply changes until the cluster name is assigned - if (!values.cluster.empty()) { - // meta expands could have modified anything. Copy it all out again. - _gather_changes(values.changed, &rev_obs, oss); + { + std::lock_guard locker(lock); + // apply changes until the cluster name is assigned + if (!values.cluster.empty()) { + // meta expands could have modified anything. Copy it all out again. + _gather_changes(values.changed, &rev_obs, oss); + } } - call_observers(locker, rev_obs); - } - void _gather_changes(std::set<std::string> &changes, - rev_obs_map_t *rev_obs, std::ostream* oss) { - obs_mgr.for_each_change( - changes, *this, - [this, rev_obs](md_config_obs_t *obs, const std::string &key) { - map_observer_changes(obs, key, rev_obs); - }, oss); - changes.clear(); + _call_observers(rev_obs); } int set_val(const std::string_view key, const std::string& s, std::stringstream* err_ss=nullptr) { @@ -284,23 +249,27 @@ public: int set_mon_vals(CephContext *cct, const std::map<std::string,std::string,std::less<>>& kv, md_config_t::config_callback config_cb) { - std::unique_lock locker(lock); - int ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb); - + int ret; rev_obs_map_t rev_obs; - _gather_changes(values.changed, &rev_obs, nullptr); - call_observers(locker, rev_obs); + { + std::lock_guard locker(lock); + ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb); + _gather_changes(values.changed, &rev_obs, nullptr); + } + + _call_observers(rev_obs); return ret; } int injectargs(const std::string &s, std::ostream *oss) { - std::unique_lock locker(lock); - int ret = config.injectargs(values, obs_mgr, s, oss); - + int ret; rev_obs_map_t rev_obs; - _gather_changes(values.changed, &rev_obs, oss); - - call_observers(locker, rev_obs); + { + std::lock_guard locker(lock); + ret = config.injectargs(values, obs_mgr, s, oss); + _gather_changes(values.changed, &rev_obs, oss); + } + _call_observers(rev_obs); return ret; } void parse_env(unsigned entity_type, @@ -319,12 +288,15 @@ public: conf_files, warnings, flags); } bool has_parse_error() const { + std::lock_guard l(lock); return !config.parse_error.empty(); } std::string get_parse_error() { + std::lock_guard l(lock); return config.parse_error; } void complain_about_parse_error(CephContext *cct) { + std::lock_guard l(lock); return config.complain_about_parse_error(cct); } void do_argv_commands() const { @@ -342,9 +314,11 @@ public: config.get_defaults_bl(values, bl); } const std::string& get_conf_path() const { + std::lock_guard l(lock); return config.get_conf_path(); } std::optional<std::string> get_val_default(std::string_view key) { + std::lock_guard l(lock); return config.get_val_default(key); } }; diff --git a/src/common/mClockPriorityQueue.h b/src/common/mClockPriorityQueue.h deleted file mode 100644 index c1f9f3c25..000000000 --- a/src/common/mClockPriorityQueue.h +++ /dev/null @@ -1,369 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 Red Hat Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#pragma once - - -#include <functional> -#include <map> -#include <list> -#include <cmath> - -#include "common/Formatter.h" -#include "common/OpQueue.h" - -#include "dmclock/src/dmclock_server.h" - -// the following is done to unclobber _ASSERT_H so it returns to the -// way ceph likes it -#include "include/ceph_assert.h" - - -namespace ceph { - - namespace dmc = crimson::dmclock; - - template <typename T, typename K> - class mClockQueue : public OpQueue <T, K> { - - using priority_t = unsigned; - using cost_t = unsigned; - - typedef std::list<std::pair<cost_t, T> > ListPairs; - - static void filter_list_pairs(ListPairs *l, - std::function<bool (T&&)> f) { - for (typename ListPairs::iterator i = l->end(); - i != l->begin(); - /* no inc */ - ) { - auto next = i; - --next; - if (f(std::move(next->second))) { - l->erase(next); - } else { - i = next; - } - } - } - - struct SubQueue { - private: - typedef std::map<K, ListPairs> Classes; - // client-class to ordered queue - Classes q; - - unsigned tokens, max_tokens; - - typename Classes::iterator cur; - - public: - - SubQueue(const SubQueue &other) - : q(other.q), - tokens(other.tokens), - max_tokens(other.max_tokens), - cur(q.begin()) {} - - SubQueue() - : tokens(0), - max_tokens(0), - cur(q.begin()) {} - - void set_max_tokens(unsigned mt) { - max_tokens = mt; - } - - unsigned get_max_tokens() const { - return max_tokens; - } - - unsigned num_tokens() const { - return tokens; - } - - void put_tokens(unsigned t) { - tokens += t; - if (tokens > max_tokens) { - tokens = max_tokens; - } - } - - void take_tokens(unsigned t) { - if (tokens > t) { - tokens -= t; - } else { - tokens = 0; - } - } - - void enqueue(K cl, cost_t cost, T&& item) { - q[cl].emplace_back(cost, std::move(item)); - if (cur == q.end()) - cur = q.begin(); - } - - void enqueue_front(K cl, cost_t cost, T&& item) { - q[cl].emplace_front(cost, std::move(item)); - if (cur == q.end()) - cur = q.begin(); - } - - const std::pair<cost_t, T>& front() const { - ceph_assert(!(q.empty())); - ceph_assert(cur != q.end()); - return cur->second.front(); - } - - std::pair<cost_t, T>& front() { - ceph_assert(!(q.empty())); - ceph_assert(cur != q.end()); - return cur->second.front(); - } - - void pop_front() { - ceph_assert(!(q.empty())); - ceph_assert(cur != q.end()); - cur->second.pop_front(); - if (cur->second.empty()) { - auto i = cur; - ++cur; - q.erase(i); - } else { - ++cur; - } - if (cur == q.end()) { - cur = q.begin(); - } - } - - unsigned get_size_slow() const { - unsigned count = 0; - for (const auto& cls : q) { - count += cls.second.size(); - } - return count; - } - - bool empty() const { - return q.empty(); - } - - void remove_by_filter(std::function<bool (T&&)> f) { - for (typename Classes::iterator i = q.begin(); - i != q.end(); - /* no-inc */) { - filter_list_pairs(&(i->second), f); - if (i->second.empty()) { - if (cur == i) { - ++cur; - } - i = q.erase(i); - } else { - ++i; - } - } - if (cur == q.end()) cur = q.begin(); - } - - void remove_by_class(K k, std::list<T> *out) { - typename Classes::iterator i = q.find(k); - if (i == q.end()) { - return; - } - if (i == cur) { - ++cur; - } - if (out) { - for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) { - out->push_front(std::move(j->second)); - } - } - q.erase(i); - if (cur == q.end()) cur = q.begin(); - } - - void dump(ceph::Formatter *f) const { - f->dump_int("size", get_size_slow()); - f->dump_int("num_keys", q.size()); - } - }; - - using SubQueues = std::map<priority_t, SubQueue>; - - SubQueues high_queue; - - using Queue = dmc::PullPriorityQueue<K,T,false>; - Queue queue; - - // when enqueue_front is called, rather than try to re-calc tags - // to put in mClock priority queue, we'll just keep a separate - // list from which we dequeue items first, and only when it's - // empty do we use queue. - std::list<std::pair<K,T>> queue_front; - - public: - - mClockQueue( - const typename Queue::ClientInfoFunc& info_func, - double anticipation_timeout = 0.0) : - queue(info_func, dmc::AtLimit::Allow, anticipation_timeout) - { - // empty - } - - unsigned get_size_slow() const { - unsigned total = 0; - total += queue_front.size(); - total += queue.request_count(); - for (auto i = high_queue.cbegin(); i != high_queue.cend(); ++i) { - ceph_assert(i->second.get_size_slow()); - total += i->second.get_size_slow(); - } - return total; - } - - // be sure to do things in reverse priority order and push_front - // to the list so items end up on list in front-to-back priority - // order - void remove_by_filter(std::function<bool (T&&)> filter_accum) { - queue.remove_by_req_filter([&] (std::unique_ptr<T>&& r) { - return filter_accum(std::move(*r)); - }, true); - - for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) { - if (filter_accum(std::move(i->second))) { - i = decltype(i){ queue_front.erase(std::next(i).base()) }; - } else { - ++i; - } - } - - for (typename SubQueues::iterator i = high_queue.begin(); - i != high_queue.end(); - /* no-inc */ ) { - i->second.remove_by_filter(filter_accum); - if (i->second.empty()) { - i = high_queue.erase(i); - } else { - ++i; - } - } - } - - void remove_by_class(K k, std::list<T> *out = nullptr) override final { - if (out) { - queue.remove_by_client(k, - true, - [&out] (std::unique_ptr<T>&& t) { - out->push_front(std::move(*t)); - }); - } else { - queue.remove_by_client(k, true); - } - - for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) { - if (k == i->first) { - if (nullptr != out) out->push_front(std::move(i->second)); - i = decltype(i){ queue_front.erase(std::next(i).base()) }; - } else { - ++i; - } - } - - for (auto i = high_queue.begin(); i != high_queue.end(); /* no-inc */) { - i->second.remove_by_class(k, out); - if (i->second.empty()) { - i = high_queue.erase(i); - } else { - ++i; - } - } - } - - void enqueue_strict(K cl, unsigned priority, T&& item) override final { - high_queue[priority].enqueue(cl, 1, std::move(item)); - } - - void enqueue_strict_front(K cl, unsigned priority, T&& item) override final { - high_queue[priority].enqueue_front(cl, 1, std::move(item)); - } - - void enqueue(K cl, unsigned priority, unsigned cost, T&& item) override final { - // priority is ignored - queue.add_request(std::move(item), cl, cost); - } - - void enqueue_front(K cl, - unsigned priority, - unsigned cost, - T&& item) override final { - queue_front.emplace_front(std::pair<K,T>(cl, std::move(item))); - } - - bool empty() const override final { - return queue.empty() && high_queue.empty() && queue_front.empty(); - } - - T dequeue() override final { - ceph_assert(!empty()); - - if (!high_queue.empty()) { - T ret = std::move(high_queue.rbegin()->second.front().second); - high_queue.rbegin()->second.pop_front(); - if (high_queue.rbegin()->second.empty()) { - high_queue.erase(high_queue.rbegin()->first); - } - return ret; - } - - if (!queue_front.empty()) { - T ret = std::move(queue_front.front().second); - queue_front.pop_front(); - return ret; - } - - auto pr = queue.pull_request(); - ceph_assert(pr.is_retn()); - auto& retn = pr.get_retn(); - return std::move(*(retn.request)); - } - - void dump(ceph::Formatter *f) const override final { - f->open_array_section("high_queues"); - for (typename SubQueues::const_iterator p = high_queue.begin(); - p != high_queue.end(); - ++p) { - f->open_object_section("subqueue"); - f->dump_int("priority", p->first); - p->second.dump(f); - f->close_section(); - } - f->close_section(); - - f->open_object_section("queue_front"); - f->dump_int("size", queue_front.size()); - f->close_section(); - - f->open_object_section("queue"); - f->dump_int("size", queue.request_count()); - f->close_section(); - } // dump - - void print(std::ostream &os) const final { - os << "mClockPriorityQueue"; - } - }; - -} // namespace ceph diff --git a/src/common/options/cephfs-mirror.yaml.in b/src/common/options/cephfs-mirror.yaml.in index 78f86dfb1..f82616187 100644 --- a/src/common/options/cephfs-mirror.yaml.in +++ b/src/common/options/cephfs-mirror.yaml.in @@ -91,4 +91,15 @@ options: default: 10 services: - cephfs-mirror - min: 0
\ No newline at end of file + min: 0 +- name: cephfs_mirror_perf_stats_prio + type: int + level: advanced + desc: Priority level for mirror daemon replication perf counters + long_desc: The daemon will send perf counter data to the manager daemon if the priority + is not lower than mgr_stats_threshold. + default: 5 + services: + - cephfs-mirror + min: 0 + max: 11 diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index fa426a115..f0eaedf5a 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -4964,7 +4964,7 @@ options: type: str level: advanced desc: Full set of rocksdb settings to override - default: compression=kNoCompression,max_write_buffer_number=64,min_write_buffer_number_to_merge=6,compaction_style=kCompactionStyleLevel,write_buffer_size=16777216,max_background_jobs=4,level0_file_num_compaction_trigger=8,max_bytes_for_level_base=1073741824,max_bytes_for_level_multiplier=8,compaction_readahead_size=2MB,max_total_wal_size=1073741824,writable_file_max_buffer_size=0 + default: compression=kLZ4Compression,max_write_buffer_number=64,min_write_buffer_number_to_merge=6,compaction_style=kCompactionStyleLevel,write_buffer_size=16777216,max_background_jobs=4,level0_file_num_compaction_trigger=8,max_bytes_for_level_base=1073741824,max_bytes_for_level_multiplier=8,compaction_readahead_size=2MB,max_total_wal_size=1073741824,writable_file_max_buffer_size=0 with_legacy: true - name: bluestore_rocksdb_options_annex type: str diff --git a/src/common/options/mds-client.yaml.in b/src/common/options/mds-client.yaml.in index 4e599d4cf..1f7600dee 100644 --- a/src/common/options/mds-client.yaml.in +++ b/src/common/options/mds-client.yaml.in @@ -315,6 +315,8 @@ options: default: true services: - mds_client + flags: + - startup with_legacy: true - name: client_force_lazyio type: bool diff --git a/src/common/options/mds.yaml.in b/src/common/options/mds.yaml.in index 6eb0702fc..6234b96cd 100644 --- a/src/common/options/mds.yaml.in +++ b/src/common/options/mds.yaml.in @@ -65,15 +65,6 @@ options: - mds flags: - runtime -# max xattr kv pairs size for each dir/file -- name: mds_max_xattr_pairs_size - type: size - level: advanced - desc: maximum aggregate size of extended attributes on a file - default: 64_K - services: - - mds - with_legacy: true - name: mds_cache_trim_interval type: secs level: advanced @@ -1175,6 +1166,24 @@ options: services: - mds with_legacy: true +# Max number of slow ops to track +- name: mds_op_history_slow_op_size + type: uint + level: advanced + desc: maximum size for list of historical slow operations + default: 20 + services: + - mds + with_legacy: true +# Track the op if over this threshold +- name: mds_op_history_slow_op_threshold + type: uint + level: advanced + desc: track the op if over this threshold + default: 10 + services: + - mds + with_legacy: true # how many seconds old makes an op complaint-worthy - name: mds_op_complaint_time type: float @@ -1533,4 +1542,16 @@ options: services: - mds flags: + - runtime +- name: defer_client_eviction_on_laggy_osds + type: bool + level: advanced + desc: Do not evict client if any osd is laggy + long_desc: Laggy OSD(s) can make clients laggy or unresponsive, this can + lead to their eviction, this option once enabled can help defer client + eviction. + default: false + services: + - mds + flags: - runtime diff --git a/src/common/options/osd.yaml.in b/src/common/options/osd.yaml.in index 7291ce11d..a9596cec1 100644 --- a/src/common/options/osd.yaml.in +++ b/src/common/options/osd.yaml.in @@ -182,7 +182,7 @@ options: desc: Maximum concurrent scrubs on a single OSD fmt_desc: The maximum number of simultaneous scrub operations for a Ceph OSD Daemon. - default: 1 + default: 3 with_legacy: true - name: osd_scrub_during_recovery type: bool @@ -895,13 +895,13 @@ options: desc: Do not store full-object checksums if the backend (bluestore) does its own checksums. Only usable with all BlueStore OSDs. default: false -# PrioritzedQueue (prio), Weighted Priority Queue (wpq ; default), -# mclock_opclass, mclock_client, or debug_random. "mclock_opclass" -# and "mclock_client" are based on the mClock/dmClock algorithm -# (Gulati, et al. 2010). "mclock_opclass" prioritizes based on the -# class the operation belongs to. "mclock_client" does the same but -# also works to ienforce fairness between clients. "debug_random" -# chooses among all four with equal probability. +# Weighted Priority Queue (wpq), mClock Scheduler (mclock_scheduler: default) +# or debug_random. "mclock_scheduler" is based on the mClock/dmClock +# algorithm (Gulati, et al. 2010). "mclock_scheduler" prioritizes based on +# the class the operation belongs to. "wpq" dequeues ops based on their +# priorities. "debug_random" chooses among the two with equal probability. +# Note: PrioritzedQueue (prio) implementation is not used for scheduling ops +# within OSDs and is therefore not listed. - name: osd_op_queue type: str level: advanced diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 241632a22..9c7f91f9e 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -233,7 +233,7 @@ options: long_desc: The lifecycle maintenance thread is responsible for lifecycle related maintenance work. The thread itself can be disabled, but in order for lifecycle to work correctly, at least one RGW in each zone needs to have this thread running. - Havingthe thread enabled on multiple RGW processes within the same zone can spread + Having the thread enabled on multiple RGW processes within the same zone can spread some of the maintenance work between them. default: true services: @@ -359,7 +359,11 @@ options: type: str level: advanced desc: Lifecycle allowed work time - long_desc: Local time window in which the lifecycle maintenance thread can work. + long_desc: Local time window in which the lifecycle maintenance thread can work. It expects + 24-hour time notation. For example, "00:00-23:59" means starting at midnight lifecycle + is allowed to run for the whole day (24 hours). When lifecycle completes, it waits for the + next maintenance window. In this example, if it completes at 01:00, it will resume processing + 23 hours later at the following midnight. default: 00:00-06:00 services: - rgw diff --git a/src/common/weighted_shuffle.h b/src/common/weighted_shuffle.h index 10def0a01..dd8f22da0 100644 --- a/src/common/weighted_shuffle.h +++ b/src/common/weighted_shuffle.h @@ -14,6 +14,8 @@ void weighted_shuffle(RandomIt first, RandomIt last, { if (first == last) { return; + } else if (std::accumulate(weight_first, weight_last, 0) == 0) { + return; } else { std::discrete_distribution d{weight_first, weight_last}; if (auto n = d(g); n > 0) { |