diff options
Diffstat (limited to '')
-rw-r--r-- | src/common/config_proxy.h | 192 |
1 files changed, 83 insertions, 109 deletions
diff --git a/src/common/config_proxy.h b/src/common/config_proxy.h index 02c670f60..b9b47d9ce 100644 --- a/src/common/config_proxy.h +++ b/src/common/config_proxy.h @@ -18,91 +18,51 @@ class ConfigProxy { */ ConfigValues values; using md_config_obs_t = ceph::md_config_obs_impl<ConfigProxy>; - ObserverMgr<md_config_obs_t> obs_mgr; + using ObsMgr = ObserverMgr<md_config_obs_t>; + ObsMgr obs_mgr; md_config_t config; /** A lock that protects the md_config_t internals. It is * recursive, for simplicity. * It is best if this lock comes first in the lock hierarchy. We will * hold this lock when calling configuration observers. */ - mutable ceph::recursive_mutex lock = - ceph::make_recursive_mutex("ConfigProxy::lock"); + mutable ceph::mutex lock = ceph::make_mutex("ConfigProxy::lock"); + ceph::condition_variable cond; - class CallGate { - private: - uint32_t call_count = 0; - ceph::mutex lock; - ceph::condition_variable cond; - public: - CallGate() - : lock(ceph::make_mutex("call::gate::lock")) { - } + using rev_obs_map_t = ObsMgr::rev_obs_map; - void enter() { - std::lock_guard<ceph::mutex> locker(lock); - ++call_count; + void _call_observers(rev_obs_map_t& rev_obs) { + ceph_assert(!ceph::mutex_debugging || !ceph_mutex_is_locked_by_me(lock)); + for (auto& [obs, keys] : rev_obs) { + (*obs)->handle_conf_change(*this, keys); } - void leave() { - std::lock_guard<ceph::mutex> locker(lock); - ceph_assert(call_count > 0); - if (--call_count == 0) { - cond.notify_all(); - } + rev_obs.clear(); // drop shared_ptrs + { + std::lock_guard l{lock}; + cond.notify_all(); } - void close() { - std::unique_lock<ceph::mutex> locker(lock); - while (call_count != 0) { - cond.wait(locker); - } - } - }; - - void call_gate_enter(md_config_obs_t *obs) { - auto p = obs_call_gate.find(obs); - ceph_assert(p != obs_call_gate.end()); - p->second->enter(); - } - void call_gate_leave(md_config_obs_t *obs) { - auto p = obs_call_gate.find(obs); - ceph_assert(p != obs_call_gate.end()); - p->second->leave(); } - void call_gate_close(md_config_obs_t *obs) { - auto p = obs_call_gate.find(obs); - ceph_assert(p != obs_call_gate.end()); - p->second->close(); - } - - using rev_obs_map_t = ObserverMgr<md_config_obs_t>::rev_obs_map; - typedef std::unique_ptr<CallGate> CallGateRef; - - std::map<md_config_obs_t*, CallGateRef> obs_call_gate; - - void call_observers(std::unique_lock<ceph::recursive_mutex>& locker, - rev_obs_map_t& rev_obs) { - // observers are notified outside of lock - locker.unlock(); - for (auto& [obs, keys] : rev_obs) { - obs->handle_conf_change(*this, keys); - } - locker.lock(); - - for (auto& rev_ob : rev_obs) { - call_gate_leave(rev_ob.first); + void _gather_changes(std::set<std::string> &changes, + rev_obs_map_t *rev_obs, std::ostream* oss) { + ceph_assert(ceph_mutex_is_locked_by_me(lock)); + std::map<std::string,bool> changes_present; + for (auto& change : changes) { + std::string dummy; + changes_present[change] = (0 == config.get_val(values, change, &dummy)); } + obs_mgr.for_each_change( + changes_present, + [this, rev_obs](auto obs, const std::string &key) { + _map_observer_changes(obs, key, rev_obs); + }, oss); + changes.clear(); } - void map_observer_changes(md_config_obs_t *obs, const std::string &key, + void _map_observer_changes(ObsMgr::config_obs_ptr obs, const std::string& key, rev_obs_map_t *rev_obs) { - ceph_assert(ceph_mutex_is_locked(lock)); + ceph_assert(ceph_mutex_is_locked_by_me(lock)); auto [it, new_entry] = rev_obs->emplace(obs, std::set<std::string>{}); it->second.emplace(key); - if (new_entry) { - // this needs to be done under lock as once this lock is - // dropped (before calling observers) a remove_observer() - // can sneak in and cause havoc. - call_gate_enter(obs); - } } public: @@ -150,12 +110,15 @@ public: std::forward<Args>(args)...); } void config_options(ceph::Formatter *f) const { + std::lock_guard l{lock}; config.config_options(f); } const decltype(md_config_t::schema)& get_schema() const { + std::lock_guard l{lock}; return config.schema; } const Option* get_schema(const std::string_view key) const { + std::lock_guard l{lock}; auto found = config.schema.find(key); if (found == config.schema.end()) { return nullptr; @@ -164,6 +127,7 @@ public: } } const Option *find_option(const std::string& name) const { + std::lock_guard l{lock}; return config.find_option(name); } void diff(ceph::Formatter *f, const std::string& name = {}) const { @@ -186,6 +150,7 @@ public: sections, key, out, emeta); } unsigned get_osd_pool_default_min_size(uint8_t size) const { + std::lock_guard l{lock}; return config.get_osd_pool_default_min_size(values, size); } void early_expand_meta(std::string &val, @@ -195,39 +160,46 @@ public: } // for those want to reexpand special meta, e.g, $pid void finalize_reexpand_meta() { - std::unique_lock locker(lock); rev_obs_map_t rev_obs; - if (config.finalize_reexpand_meta(values, obs_mgr)) { - _gather_changes(values.changed, &rev_obs, nullptr); + { + std::lock_guard locker(lock); + if (config.finalize_reexpand_meta(values, obs_mgr)) { + _gather_changes(values.changed, &rev_obs, nullptr); + } } - call_observers(locker, rev_obs); + _call_observers(rev_obs); } void add_observer(md_config_obs_t* obs) { std::lock_guard l(lock); obs_mgr.add_observer(obs); - obs_call_gate.emplace(obs, std::make_unique<CallGate>()); + cond.notify_all(); } void remove_observer(md_config_obs_t* obs) { - std::lock_guard l(lock); - call_gate_close(obs); - obs_call_gate.erase(obs); - obs_mgr.remove_observer(obs); + std::unique_lock l(lock); + auto wptr = obs_mgr.remove_observer(obs); + while (!wptr.expired()) { + cond.wait(l); + } } void call_all_observers() { - std::unique_lock locker(lock); rev_obs_map_t rev_obs; - obs_mgr.for_each_observer( - [this, &rev_obs](md_config_obs_t *obs, const std::string &key) { - map_observer_changes(obs, key, &rev_obs); - }); + { + std::lock_guard locker(lock); + obs_mgr.for_each_observer( + [this, &rev_obs](auto obs, const std::string& key) { + _map_observer_changes(obs, key, &rev_obs); + }); + } - call_observers(locker, rev_obs); + _call_observers(rev_obs); } void set_safe_to_start_threads() { + std::lock_guard l(lock); config.set_safe_to_start_threads(); } void _clear_safe_to_start_threads() { + std::lock_guard l(lock); config._clear_safe_to_start_threads(); } void show_config(std::ostream& out) { @@ -248,25 +220,18 @@ public: } // Expand all metavariables. Make any pending observer callbacks. void apply_changes(std::ostream* oss) { - std::unique_lock locker(lock); rev_obs_map_t rev_obs; - // apply changes until the cluster name is assigned - if (!values.cluster.empty()) { - // meta expands could have modified anything. Copy it all out again. - _gather_changes(values.changed, &rev_obs, oss); + { + std::lock_guard locker(lock); + // apply changes until the cluster name is assigned + if (!values.cluster.empty()) { + // meta expands could have modified anything. Copy it all out again. + _gather_changes(values.changed, &rev_obs, oss); + } } - call_observers(locker, rev_obs); - } - void _gather_changes(std::set<std::string> &changes, - rev_obs_map_t *rev_obs, std::ostream* oss) { - obs_mgr.for_each_change( - changes, *this, - [this, rev_obs](md_config_obs_t *obs, const std::string &key) { - map_observer_changes(obs, key, rev_obs); - }, oss); - changes.clear(); + _call_observers(rev_obs); } int set_val(const std::string_view key, const std::string& s, std::stringstream* err_ss=nullptr) { @@ -284,23 +249,27 @@ public: int set_mon_vals(CephContext *cct, const std::map<std::string,std::string,std::less<>>& kv, md_config_t::config_callback config_cb) { - std::unique_lock locker(lock); - int ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb); - + int ret; rev_obs_map_t rev_obs; - _gather_changes(values.changed, &rev_obs, nullptr); - call_observers(locker, rev_obs); + { + std::lock_guard locker(lock); + ret = config.set_mon_vals(cct, values, obs_mgr, kv, config_cb); + _gather_changes(values.changed, &rev_obs, nullptr); + } + + _call_observers(rev_obs); return ret; } int injectargs(const std::string &s, std::ostream *oss) { - std::unique_lock locker(lock); - int ret = config.injectargs(values, obs_mgr, s, oss); - + int ret; rev_obs_map_t rev_obs; - _gather_changes(values.changed, &rev_obs, oss); - - call_observers(locker, rev_obs); + { + std::lock_guard locker(lock); + ret = config.injectargs(values, obs_mgr, s, oss); + _gather_changes(values.changed, &rev_obs, oss); + } + _call_observers(rev_obs); return ret; } void parse_env(unsigned entity_type, @@ -319,12 +288,15 @@ public: conf_files, warnings, flags); } bool has_parse_error() const { + std::lock_guard l(lock); return !config.parse_error.empty(); } std::string get_parse_error() { + std::lock_guard l(lock); return config.parse_error; } void complain_about_parse_error(CephContext *cct) { + std::lock_guard l(lock); return config.complain_about_parse_error(cct); } void do_argv_commands() const { @@ -342,9 +314,11 @@ public: config.get_defaults_bl(values, bl); } const std::string& get_conf_path() const { + std::lock_guard l(lock); return config.get_conf_path(); } std::optional<std::string> get_val_default(std::string_view key) { + std::lock_guard l(lock); return config.get_val_default(key); } }; |