summaryrefslogtreecommitdiffstats
path: root/src/librbd/TaskFinisher.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/librbd/TaskFinisher.h
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/librbd/TaskFinisher.h159
1 files changed, 159 insertions, 0 deletions
diff --git a/src/librbd/TaskFinisher.h b/src/librbd/TaskFinisher.h
new file mode 100644
index 00000000..410b8ee8
--- /dev/null
+++ b/src/librbd/TaskFinisher.h
@@ -0,0 +1,159 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef LIBRBD_TASK_FINISHER_H
+#define LIBRBD_TASK_FINISHER_H
+
+#include "include/Context.h"
+#include "common/ceph_context.h"
+#include "common/Finisher.h"
+#include "common/Mutex.h"
+#include "common/Timer.h"
+#include <map>
+#include <utility>
+
+class CephContext;
+
+namespace librbd {
+
+struct TaskFinisherSingleton {
+ Mutex m_lock;
+ SafeTimer *m_safe_timer;
+ Finisher *m_finisher;
+
+ explicit TaskFinisherSingleton(CephContext *cct)
+ : m_lock("librbd::TaskFinisher::m_lock") {
+ m_safe_timer = new SafeTimer(cct, m_lock, false);
+ m_safe_timer->init();
+ m_finisher = new Finisher(cct, "librbd::TaskFinisher::m_finisher", "taskfin_librbd");
+ m_finisher->start();
+ }
+ virtual ~TaskFinisherSingleton() {
+ {
+ Mutex::Locker l(m_lock);
+ m_safe_timer->shutdown();
+ delete m_safe_timer;
+ }
+ m_finisher->wait_for_empty();
+ m_finisher->stop();
+ delete m_finisher;
+ }
+};
+
+
+template <typename Task>
+class TaskFinisher {
+public:
+ TaskFinisher(CephContext &cct) : m_cct(cct) {
+ auto& singleton =
+ cct.lookup_or_create_singleton_object<TaskFinisherSingleton>(
+ "librbd::TaskFinisher::m_safe_timer", false, &cct);
+ m_lock = &singleton.m_lock;
+ m_safe_timer = singleton.m_safe_timer;
+ m_finisher = singleton.m_finisher;
+ }
+
+ void cancel(const Task& task) {
+ Mutex::Locker l(*m_lock);
+ typename TaskContexts::iterator it = m_task_contexts.find(task);
+ if (it != m_task_contexts.end()) {
+ delete it->second.first;
+ m_safe_timer->cancel_event(it->second.second);
+ m_task_contexts.erase(it);
+ }
+ }
+
+ void cancel_all(Context *comp) {
+ {
+ Mutex::Locker l(*m_lock);
+ for (typename TaskContexts::iterator it = m_task_contexts.begin();
+ it != m_task_contexts.end(); ++it) {
+ delete it->second.first;
+ m_safe_timer->cancel_event(it->second.second);
+ }
+ m_task_contexts.clear();
+ }
+ m_finisher->queue(comp);
+ }
+
+ bool add_event_after(const Task& task, double seconds, Context *ctx) {
+ Mutex::Locker l(*m_lock);
+ if (m_task_contexts.count(task) != 0) {
+ // task already scheduled on finisher or timer
+ delete ctx;
+ return false;
+ }
+ C_Task *timer_ctx = new C_Task(this, task);
+ m_task_contexts[task] = std::make_pair(ctx, timer_ctx);
+
+ m_safe_timer->add_event_after(seconds, timer_ctx);
+ return true;
+ }
+
+ void queue(Context *ctx) {
+ m_finisher->queue(ctx);
+ }
+
+ bool queue(const Task& task, Context *ctx) {
+ Mutex::Locker l(*m_lock);
+ typename TaskContexts::iterator it = m_task_contexts.find(task);
+ if (it != m_task_contexts.end()) {
+ if (it->second.second != NULL) {
+ ceph_assert(m_safe_timer->cancel_event(it->second.second));
+ delete it->second.first;
+ } else {
+ // task already scheduled on the finisher
+ delete ctx;
+ return false;
+ }
+ }
+ m_task_contexts[task] = std::make_pair(ctx, reinterpret_cast<Context *>(0));
+
+ m_finisher->queue(new C_Task(this, task));
+ return true;
+ }
+
+private:
+ class C_Task : public Context {
+ public:
+ C_Task(TaskFinisher *task_finisher, const Task& task)
+ : m_task_finisher(task_finisher), m_task(task)
+ {
+ }
+ protected:
+ void finish(int r) override {
+ m_task_finisher->complete(m_task);
+ }
+ private:
+ TaskFinisher *m_task_finisher;
+ Task m_task;
+ };
+
+ CephContext &m_cct;
+
+ Mutex *m_lock;
+ Finisher *m_finisher;
+ SafeTimer *m_safe_timer;
+
+ typedef std::map<Task, std::pair<Context *, Context *> > TaskContexts;
+ TaskContexts m_task_contexts;
+
+ void complete(const Task& task) {
+ Context *ctx = NULL;
+ {
+ Mutex::Locker l(*m_lock);
+ typename TaskContexts::iterator it = m_task_contexts.find(task);
+ if (it != m_task_contexts.end()) {
+ ctx = it->second.first;
+ m_task_contexts.erase(it);
+ }
+ }
+
+ if (ctx != NULL) {
+ ctx->complete(0);
+ }
+ }
+};
+
+} // namespace librbd
+
+#endif // LIBRBD_TASK_FINISHER