diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/common/Finisher.h | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/common/Finisher.h')
-rw-r--r-- | src/common/Finisher.h | 239 |
1 files changed, 239 insertions, 0 deletions
diff --git a/src/common/Finisher.h b/src/common/Finisher.h new file mode 100644 index 000000000..f1060b0e4 --- /dev/null +++ b/src/common/Finisher.h @@ -0,0 +1,239 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_FINISHER_H +#define CEPH_FINISHER_H + +#include "include/Context.h" +#include "include/common_fwd.h" +#include "common/Thread.h" +#include "common/ceph_mutex.h" +#include "common/perf_counters.h" +#include "common/Cond.h" + + +/// Finisher queue length performance counter ID. +enum { + l_finisher_first = 997082, + l_finisher_queue_len, + l_finisher_complete_lat, + l_finisher_last +}; + +/** @brief Asynchronous cleanup class. + * Finisher asynchronously completes Contexts, which are simple classes + * representing callbacks, in a dedicated worker thread. Enqueuing + * contexts to complete is thread-safe. + */ +class Finisher { + CephContext *cct; + ceph::mutex finisher_lock; ///< Protects access to queues and finisher_running. + ceph::condition_variable finisher_cond; ///< Signaled when there is something to process. + ceph::condition_variable finisher_empty_cond; ///< Signaled when the finisher has nothing more to process. + bool finisher_stop; ///< Set when the finisher should stop. + bool finisher_running; ///< True when the finisher is currently executing contexts. + bool finisher_empty_wait; ///< True mean someone wait finisher empty. + + /// Queue for contexts for which complete(0) will be called. + std::vector<std::pair<Context*,int>> finisher_queue; + std::vector<std::pair<Context*,int>> in_progress_queue; + + std::string thread_name; + + /// Performance counter for the finisher's queue length. + /// Only active for named finishers. + PerfCounters *logger; + + void *finisher_thread_entry(); + + struct FinisherThread : public Thread { + Finisher *fin; + explicit FinisherThread(Finisher *f) : fin(f) {} + void* entry() override { return fin->finisher_thread_entry(); } + } finisher_thread; + + public: + /// Add a context to complete, optionally specifying a parameter for the complete function. + void queue(Context *c, int r = 0) { + std::unique_lock ul(finisher_lock); + bool was_empty = finisher_queue.empty(); + finisher_queue.push_back(std::make_pair(c, r)); + if (was_empty) { + finisher_cond.notify_one(); + } + if (logger) + logger->inc(l_finisher_queue_len); + } + + void queue(std::list<Context*>& ls) { + { + std::unique_lock ul(finisher_lock); + if (finisher_queue.empty()) { + finisher_cond.notify_all(); + } + for (auto i : ls) { + finisher_queue.push_back(std::make_pair(i, 0)); + } + if (logger) + logger->inc(l_finisher_queue_len, ls.size()); + } + ls.clear(); + } + void queue(std::deque<Context*>& ls) { + { + std::unique_lock ul(finisher_lock); + if (finisher_queue.empty()) { + finisher_cond.notify_all(); + } + for (auto i : ls) { + finisher_queue.push_back(std::make_pair(i, 0)); + } + if (logger) + logger->inc(l_finisher_queue_len, ls.size()); + } + ls.clear(); + } + void queue(std::vector<Context*>& ls) { + { + std::unique_lock ul(finisher_lock); + if (finisher_queue.empty()) { + finisher_cond.notify_all(); + } + for (auto i : ls) { + finisher_queue.push_back(std::make_pair(i, 0)); + } + if (logger) + logger->inc(l_finisher_queue_len, ls.size()); + } + ls.clear(); + } + + /// Start the worker thread. + void start(); + + /** @brief Stop the worker thread. + * + * Does not wait until all outstanding contexts are completed. + * To ensure that everything finishes, you should first shut down + * all sources that can add contexts to this finisher and call + * wait_for_empty() before calling stop(). */ + void stop(); + + /** @brief Blocks until the finisher has nothing left to process. + * This function will also return when a concurrent call to stop() + * finishes, but this class should never be used in this way. */ + void wait_for_empty(); + + /// Construct an anonymous Finisher. + /// Anonymous finishers do not log their queue length. + explicit Finisher(CephContext *cct_) : + cct(cct_), finisher_lock(ceph::make_mutex("Finisher::finisher_lock")), + finisher_stop(false), finisher_running(false), finisher_empty_wait(false), + thread_name("fn_anonymous"), logger(0), + finisher_thread(this) {} + + /// Construct a named Finisher that logs its queue length. + Finisher(CephContext *cct_, std::string name, std::string tn) : + cct(cct_), finisher_lock(ceph::make_mutex("Finisher::" + name)), + finisher_stop(false), finisher_running(false), finisher_empty_wait(false), + thread_name(tn), logger(0), + finisher_thread(this) { + PerfCountersBuilder b(cct, std::string("finisher-") + name, + l_finisher_first, l_finisher_last); + b.add_u64(l_finisher_queue_len, "queue_len"); + b.add_time_avg(l_finisher_complete_lat, "complete_latency"); + logger = b.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + logger->set(l_finisher_queue_len, 0); + logger->set(l_finisher_complete_lat, 0); + } + + ~Finisher() { + if (logger && cct) { + cct->get_perfcounters_collection()->remove(logger); + delete logger; + } + } +}; + +/// Context that is completed asynchronously on the supplied finisher. +class C_OnFinisher : public Context { + Context *con; + Finisher *fin; +public: + C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) { + ceph_assert(fin != NULL); + ceph_assert(con != NULL); + } + + ~C_OnFinisher() override { + if (con != nullptr) { + delete con; + con = nullptr; + } + } + + void finish(int r) override { + fin->queue(con, r); + con = nullptr; + } +}; + +class ContextQueue { + std::list<Context *> q; + std::mutex q_mutex; + ceph::mutex& mutex; + ceph::condition_variable& cond; + std::atomic_bool q_empty = true; +public: + ContextQueue(ceph::mutex& mut, + ceph::condition_variable& con) + : mutex(mut), cond(con) {} + + void queue(std::list<Context *>& ls) { + bool was_empty = false; + { + std::scoped_lock l(q_mutex); + if (q.empty()) { + q.swap(ls); + was_empty = true; + } else { + q.insert(q.end(), ls.begin(), ls.end()); + } + q_empty = q.empty(); + } + + if (was_empty) { + std::scoped_lock l{mutex}; + cond.notify_all(); + } + + ls.clear(); + } + + void move_to(std::list<Context *>& ls) { + ls.clear(); + std::scoped_lock l(q_mutex); + if (!q.empty()) { + q.swap(ls); + } + q_empty = true; + } + + bool empty() { + return q_empty; + } +}; + +#endif |