summaryrefslogtreecommitdiffstats
path: root/src/common/Finisher.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/common/Finisher.h
parentInitial commit. (diff)
downloadceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.tar.xz
ceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/common/Finisher.h')
-rw-r--r--src/common/Finisher.h239
1 files changed, 239 insertions, 0 deletions
diff --git a/src/common/Finisher.h b/src/common/Finisher.h
new file mode 100644
index 000000000..f1060b0e4
--- /dev/null
+++ b/src/common/Finisher.h
@@ -0,0 +1,239 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_FINISHER_H
+#define CEPH_FINISHER_H
+
+#include "include/Context.h"
+#include "include/common_fwd.h"
+#include "common/Thread.h"
+#include "common/ceph_mutex.h"
+#include "common/perf_counters.h"
+#include "common/Cond.h"
+
+
+/// Finisher queue length performance counter ID.
+enum {
+ l_finisher_first = 997082,
+ l_finisher_queue_len,
+ l_finisher_complete_lat,
+ l_finisher_last
+};
+
+/** @brief Asynchronous cleanup class.
+ * Finisher asynchronously completes Contexts, which are simple classes
+ * representing callbacks, in a dedicated worker thread. Enqueuing
+ * contexts to complete is thread-safe.
+ */
+class Finisher {
+ CephContext *cct;
+ ceph::mutex finisher_lock; ///< Protects access to queues and finisher_running.
+ ceph::condition_variable finisher_cond; ///< Signaled when there is something to process.
+ ceph::condition_variable finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
+ bool finisher_stop; ///< Set when the finisher should stop.
+ bool finisher_running; ///< True when the finisher is currently executing contexts.
+ bool finisher_empty_wait; ///< True mean someone wait finisher empty.
+
+ /// Queue for contexts for which complete(0) will be called.
+ std::vector<std::pair<Context*,int>> finisher_queue;
+ std::vector<std::pair<Context*,int>> in_progress_queue;
+
+ std::string thread_name;
+
+ /// Performance counter for the finisher's queue length.
+ /// Only active for named finishers.
+ PerfCounters *logger;
+
+ void *finisher_thread_entry();
+
+ struct FinisherThread : public Thread {
+ Finisher *fin;
+ explicit FinisherThread(Finisher *f) : fin(f) {}
+ void* entry() override { return fin->finisher_thread_entry(); }
+ } finisher_thread;
+
+ public:
+ /// Add a context to complete, optionally specifying a parameter for the complete function.
+ void queue(Context *c, int r = 0) {
+ std::unique_lock ul(finisher_lock);
+ bool was_empty = finisher_queue.empty();
+ finisher_queue.push_back(std::make_pair(c, r));
+ if (was_empty) {
+ finisher_cond.notify_one();
+ }
+ if (logger)
+ logger->inc(l_finisher_queue_len);
+ }
+
+ void queue(std::list<Context*>& ls) {
+ {
+ std::unique_lock ul(finisher_lock);
+ if (finisher_queue.empty()) {
+ finisher_cond.notify_all();
+ }
+ for (auto i : ls) {
+ finisher_queue.push_back(std::make_pair(i, 0));
+ }
+ if (logger)
+ logger->inc(l_finisher_queue_len, ls.size());
+ }
+ ls.clear();
+ }
+ void queue(std::deque<Context*>& ls) {
+ {
+ std::unique_lock ul(finisher_lock);
+ if (finisher_queue.empty()) {
+ finisher_cond.notify_all();
+ }
+ for (auto i : ls) {
+ finisher_queue.push_back(std::make_pair(i, 0));
+ }
+ if (logger)
+ logger->inc(l_finisher_queue_len, ls.size());
+ }
+ ls.clear();
+ }
+ void queue(std::vector<Context*>& ls) {
+ {
+ std::unique_lock ul(finisher_lock);
+ if (finisher_queue.empty()) {
+ finisher_cond.notify_all();
+ }
+ for (auto i : ls) {
+ finisher_queue.push_back(std::make_pair(i, 0));
+ }
+ if (logger)
+ logger->inc(l_finisher_queue_len, ls.size());
+ }
+ ls.clear();
+ }
+
+ /// Start the worker thread.
+ void start();
+
+ /** @brief Stop the worker thread.
+ *
+ * Does not wait until all outstanding contexts are completed.
+ * To ensure that everything finishes, you should first shut down
+ * all sources that can add contexts to this finisher and call
+ * wait_for_empty() before calling stop(). */
+ void stop();
+
+ /** @brief Blocks until the finisher has nothing left to process.
+ * This function will also return when a concurrent call to stop()
+ * finishes, but this class should never be used in this way. */
+ void wait_for_empty();
+
+ /// Construct an anonymous Finisher.
+ /// Anonymous finishers do not log their queue length.
+ explicit Finisher(CephContext *cct_) :
+ cct(cct_), finisher_lock(ceph::make_mutex("Finisher::finisher_lock")),
+ finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
+ thread_name("fn_anonymous"), logger(0),
+ finisher_thread(this) {}
+
+ /// Construct a named Finisher that logs its queue length.
+ Finisher(CephContext *cct_, std::string name, std::string tn) :
+ cct(cct_), finisher_lock(ceph::make_mutex("Finisher::" + name)),
+ finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
+ thread_name(tn), logger(0),
+ finisher_thread(this) {
+ PerfCountersBuilder b(cct, std::string("finisher-") + name,
+ l_finisher_first, l_finisher_last);
+ b.add_u64(l_finisher_queue_len, "queue_len");
+ b.add_time_avg(l_finisher_complete_lat, "complete_latency");
+ logger = b.create_perf_counters();
+ cct->get_perfcounters_collection()->add(logger);
+ logger->set(l_finisher_queue_len, 0);
+ logger->set(l_finisher_complete_lat, 0);
+ }
+
+ ~Finisher() {
+ if (logger && cct) {
+ cct->get_perfcounters_collection()->remove(logger);
+ delete logger;
+ }
+ }
+};
+
+/// Context that is completed asynchronously on the supplied finisher.
+class C_OnFinisher : public Context {
+ Context *con;
+ Finisher *fin;
+public:
+ C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) {
+ ceph_assert(fin != NULL);
+ ceph_assert(con != NULL);
+ }
+
+ ~C_OnFinisher() override {
+ if (con != nullptr) {
+ delete con;
+ con = nullptr;
+ }
+ }
+
+ void finish(int r) override {
+ fin->queue(con, r);
+ con = nullptr;
+ }
+};
+
+class ContextQueue {
+ std::list<Context *> q;
+ std::mutex q_mutex;
+ ceph::mutex& mutex;
+ ceph::condition_variable& cond;
+ std::atomic_bool q_empty = true;
+public:
+ ContextQueue(ceph::mutex& mut,
+ ceph::condition_variable& con)
+ : mutex(mut), cond(con) {}
+
+ void queue(std::list<Context *>& ls) {
+ bool was_empty = false;
+ {
+ std::scoped_lock l(q_mutex);
+ if (q.empty()) {
+ q.swap(ls);
+ was_empty = true;
+ } else {
+ q.insert(q.end(), ls.begin(), ls.end());
+ }
+ q_empty = q.empty();
+ }
+
+ if (was_empty) {
+ std::scoped_lock l{mutex};
+ cond.notify_all();
+ }
+
+ ls.clear();
+ }
+
+ void move_to(std::list<Context *>& ls) {
+ ls.clear();
+ std::scoped_lock l(q_mutex);
+ if (!q.empty()) {
+ q.swap(ls);
+ }
+ q_empty = true;
+ }
+
+ bool empty() {
+ return q_empty;
+ }
+};
+
+#endif