summaryrefslogtreecommitdiffstats
path: root/src/include/Context.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/include/Context.h535
1 files changed, 535 insertions, 0 deletions
diff --git a/src/include/Context.h b/src/include/Context.h
new file mode 100644
index 000000000..bef85ca5b
--- /dev/null
+++ b/src/include/Context.h
@@ -0,0 +1,535 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef CEPH_CONTEXT_H
+#define CEPH_CONTEXT_H
+
+#include "common/dout.h"
+
+#include <functional>
+#include <list>
+#include <memory>
+#include <set>
+
+#include <boost/function.hpp>
+#include <boost/system/error_code.hpp>
+
+#include "common/error_code.h"
+
+#include "include/ceph_assert.h"
+#include "common/ceph_mutex.h"
+
+#define mydout(cct, v) lgeneric_subdout(cct, context, v)
+
+/*
+ * GenContext - abstract callback class
+ */
+template <typename T>
+class GenContext {
+ GenContext(const GenContext& other);
+ const GenContext& operator=(const GenContext& other);
+
+ protected:
+ virtual void finish(T t) = 0;
+
+ public:
+ GenContext() {}
+ virtual ~GenContext() {} // we want a virtual destructor!!!
+
+ template <typename C>
+ void complete(C &&t) {
+ finish(std::forward<C>(t));
+ delete this;
+ }
+
+ template <typename C>
+ void operator()(C &&t) noexcept {
+ complete(std::forward<C>(t));
+ }
+
+ template<typename U = T>
+ auto operator()() noexcept
+ -> typename std::enable_if<std::is_default_constructible<U>::value,
+ void>::type {
+ complete(T{});
+ }
+
+
+ std::reference_wrapper<GenContext> func() {
+ return std::ref(*this);
+ }
+};
+
+template <typename T>
+using GenContextURef = std::unique_ptr<GenContext<T> >;
+
+/*
+ * Context - abstract callback class
+ */
+class Finisher;
+class Context {
+ Context(const Context& other);
+ const Context& operator=(const Context& other);
+
+ protected:
+ virtual void finish(int r) = 0;
+
+ // variant of finish that is safe to call "synchronously." override should
+ // return true.
+ virtual bool sync_finish(int r) {
+ return false;
+ }
+
+ public:
+ Context() {}
+ virtual ~Context() {} // we want a virtual destructor!!!
+ virtual void complete(int r) {
+ finish(r);
+ delete this;
+ }
+ virtual bool sync_complete(int r) {
+ if (sync_finish(r)) {
+ delete this;
+ return true;
+ }
+ return false;
+ }
+ void complete(boost::system::error_code ec) {
+ complete(ceph::from_error_code(ec));
+ }
+ void operator()(boost::system::error_code ec) noexcept {
+ complete(ec);
+ }
+
+ void operator()() noexcept {
+ complete({});
+ }
+
+ std::reference_wrapper<Context> func() {
+ return std::ref(*this);
+ }
+};
+
+/**
+ * Simple context holding a single object
+ */
+template<class T>
+class ContainerContext : public Context {
+ T obj;
+public:
+ ContainerContext(T &obj) : obj(obj) {}
+ void finish(int r) override {}
+};
+template <typename T>
+ContainerContext<T> *make_container_context(T &&t) {
+ return new ContainerContext<T>(std::forward<T>(t));
+}
+
+template <class T>
+struct Wrapper : public Context {
+ Context *to_run;
+ T val;
+ Wrapper(Context *to_run, T val) : to_run(to_run), val(val) {}
+ void finish(int r) override {
+ if (to_run)
+ to_run->complete(r);
+ }
+};
+struct RunOnDelete {
+ Context *to_run;
+ RunOnDelete(Context *to_run) : to_run(to_run) {}
+ ~RunOnDelete() {
+ if (to_run)
+ to_run->complete(0);
+ }
+};
+typedef std::shared_ptr<RunOnDelete> RunOnDeleteRef;
+
+template <typename T>
+class LambdaContext : public Context {
+public:
+ LambdaContext(T &&t) : t(std::forward<T>(t)) {}
+ void finish(int r) override {
+ if constexpr (std::is_invocable_v<T, int>)
+ t(r);
+ else
+ t();
+ }
+private:
+ T t;
+};
+
+template <typename T>
+LambdaContext<T> *make_lambda_context(T &&t) {
+ return new LambdaContext<T>(std::move(t));
+}
+
+template <typename F, typename T>
+struct LambdaGenContext : GenContext<T> {
+ F f;
+ LambdaGenContext(F &&f) : f(std::forward<F>(f)) {}
+ void finish(T t) override {
+ f(std::forward<T>(t));
+ }
+};
+template <typename T, typename F>
+GenContextURef<T> make_gen_lambda_context(F &&f) {
+ return GenContextURef<T>(new LambdaGenContext<F, T>(std::move(f)));
+}
+
+/*
+ * finish and destroy a list of Contexts
+ */
+template<class C>
+inline void finish_contexts(CephContext *cct, C& finished, int result = 0)
+{
+ if (finished.empty())
+ return;
+
+ C ls;
+ ls.swap(finished); // swap out of place to avoid weird loops
+
+ if (cct)
+ mydout(cct,10) << ls.size() << " contexts to finish with " << result << dendl;
+ for (Context* c : ls) {
+ if (cct)
+ mydout(cct,10) << "---- " << c << dendl;
+ c->complete(result);
+ }
+}
+
+class C_NoopContext : public Context {
+public:
+ void finish(int r) override { }
+};
+
+
+struct C_Lock : public Context {
+ ceph::mutex *lock;
+ Context *fin;
+ C_Lock(ceph::mutex *l, Context *c) : lock(l), fin(c) {}
+ ~C_Lock() override {
+ delete fin;
+ }
+ void finish(int r) override {
+ if (fin) {
+ std::lock_guard l{*lock};
+ fin->complete(r);
+ fin = NULL;
+ }
+ }
+};
+
+/*
+ * C_Contexts - set of Contexts
+ *
+ * ContextType must be an ancestor class of ContextInstanceType, or the same class.
+ * ContextInstanceType must be default-constructable.
+ */
+template <class ContextType, class ContextInstanceType, class Container = std::list<ContextType *>>
+class C_ContextsBase : public ContextInstanceType {
+public:
+ CephContext *cct;
+ Container contexts;
+
+ C_ContextsBase(CephContext *cct_)
+ : cct(cct_)
+ {
+ }
+ ~C_ContextsBase() override {
+ for (auto c : contexts) {
+ delete c;
+ }
+ }
+ void add(ContextType* c) {
+ contexts.push_back(c);
+ }
+ void take(Container& ls) {
+ Container c;
+ c.swap(ls);
+ if constexpr (std::is_same_v<Container, std::list<ContextType *>>) {
+ contexts.splice(contexts.end(), c);
+ } else {
+ contexts.insert(contexts.end(), c.begin(), c.end());
+ }
+ }
+ void complete(int r) override {
+ // Neuter any ContextInstanceType custom complete(), because although
+ // I want to look like it, I don't actually want to run its code.
+ Context::complete(r);
+ }
+ void finish(int r) override {
+ finish_contexts(cct, contexts, r);
+ }
+ bool empty() { return contexts.empty(); }
+
+ template<class C>
+ static ContextType *list_to_context(C& cs) {
+ if (cs.size() == 0) {
+ return 0;
+ } else if (cs.size() == 1) {
+ ContextType *c = cs.front();
+ cs.clear();
+ return c;
+ } else {
+ C_ContextsBase<ContextType, ContextInstanceType> *c(new C_ContextsBase<ContextType, ContextInstanceType>(0));
+ c->take(cs);
+ return c;
+ }
+ }
+};
+
+typedef C_ContextsBase<Context, Context> C_Contexts;
+
+/*
+ * C_Gather
+ *
+ * ContextType must be an ancestor class of ContextInstanceType, or the same class.
+ * ContextInstanceType must be default-constructable.
+ *
+ * BUG:? only reports error from last sub to have an error return
+ */
+template <class ContextType, class ContextInstanceType>
+class C_GatherBase {
+private:
+ CephContext *cct;
+ int result = 0;
+ ContextType *onfinish;
+#ifdef DEBUG_GATHER
+ std::set<ContextType*> waitfor;
+#endif
+ int sub_created_count = 0;
+ int sub_existing_count = 0;
+ mutable ceph::recursive_mutex lock =
+ ceph::make_recursive_mutex("C_GatherBase::lock"); // disable lockdep
+ bool activated = false;
+
+ void sub_finish(ContextType* sub, int r) {
+ lock.lock();
+#ifdef DEBUG_GATHER
+ ceph_assert(waitfor.count(sub));
+ waitfor.erase(sub);
+#endif
+ --sub_existing_count;
+ mydout(cct,10) << "C_GatherBase " << this << ".sub_finish(r=" << r << ") " << sub
+#ifdef DEBUG_GATHER
+ << " (remaining " << waitfor << ")"
+#endif
+ << dendl;
+ if (r < 0 && result == 0)
+ result = r;
+ if ((activated == false) || (sub_existing_count != 0)) {
+ lock.unlock();
+ return;
+ }
+ lock.unlock();
+ delete_me();
+ }
+
+ void delete_me() {
+ if (onfinish) {
+ onfinish->complete(result);
+ onfinish = 0;
+ }
+ delete this;
+ }
+
+ class C_GatherSub : public ContextInstanceType {
+ C_GatherBase *gather;
+ public:
+ C_GatherSub(C_GatherBase *g) : gather(g) {}
+ void complete(int r) override {
+ // Cancel any customized complete() functionality
+ // from the Context subclass we're templated for,
+ // we only want to hit that in onfinish, not at each
+ // sub finish. e.g. MDSInternalContext.
+ Context::complete(r);
+ }
+ void finish(int r) override {
+ gather->sub_finish(this, r);
+ gather = 0;
+ }
+ ~C_GatherSub() override {
+ if (gather)
+ gather->sub_finish(this, 0);
+ }
+ };
+
+public:
+ C_GatherBase(CephContext *cct_, ContextType *onfinish_)
+ : cct(cct_), onfinish(onfinish_)
+ {
+ mydout(cct,10) << "C_GatherBase " << this << ".new" << dendl;
+ }
+ ~C_GatherBase() {
+ mydout(cct,10) << "C_GatherBase " << this << ".delete" << dendl;
+ }
+ void set_finisher(ContextType *onfinish_) {
+ std::lock_guard l{lock};
+ ceph_assert(!onfinish);
+ onfinish = onfinish_;
+ }
+ void activate() {
+ lock.lock();
+ ceph_assert(activated == false);
+ activated = true;
+ if (sub_existing_count != 0) {
+ lock.unlock();
+ return;
+ }
+ lock.unlock();
+ delete_me();
+ }
+ ContextType *new_sub() {
+ std::lock_guard l{lock};
+ ceph_assert(activated == false);
+ sub_created_count++;
+ sub_existing_count++;
+ ContextType *s = new C_GatherSub(this);
+#ifdef DEBUG_GATHER
+ waitfor.insert(s);
+#endif
+ mydout(cct,10) << "C_GatherBase " << this << ".new_sub is " << sub_created_count << " " << s << dendl;
+ return s;
+ }
+
+ inline int get_sub_existing_count() const {
+ std::lock_guard l{lock};
+ return sub_existing_count;
+ }
+
+ inline int get_sub_created_count() const {
+ std::lock_guard l{lock};
+ return sub_created_count;
+ }
+};
+
+/*
+ * The C_GatherBuilder remembers each C_Context created by
+ * C_GatherBuilder.new_sub() in a C_Gather. When a C_Context created
+ * by new_sub() is complete(), C_Gather forgets about it. When
+ * C_GatherBuilder notices that there are no C_Context left in
+ * C_Gather, it calls complete() on the C_Context provided as the
+ * second argument of the constructor (finisher).
+ *
+ * How to use C_GatherBuilder:
+ *
+ * 1. Create a C_GatherBuilder on the stack
+ * 2. Call gather_bld.new_sub() as many times as you want to create new subs
+ * It is safe to call this 0 times, or 100, or anything in between.
+ * 3. If you didn't supply a finisher in the C_GatherBuilder constructor,
+ * set one with gather_bld.set_finisher(my_finisher)
+ * 4. Call gather_bld.activate()
+ *
+ * Example:
+ *
+ * C_SaferCond all_done;
+ * C_GatherBuilder gb(g_ceph_context, all_done);
+ * j.submit_entry(1, first, 0, gb.new_sub()); // add a C_Context to C_Gather
+ * j.submit_entry(2, first, 0, gb.new_sub()); // add a C_Context to C_Gather
+ * gb.activate(); // consume C_Context as soon as they complete()
+ * all_done.wait(); // all_done is complete() after all new_sub() are complete()
+ *
+ * The finisher may be called at any point after step 4, including immediately
+ * from the activate() function.
+ * The finisher will never be called before activate().
+ *
+ * Note: Currently, subs must be manually freed by the caller (for some reason.)
+ */
+template <class ContextType, class GatherType>
+class C_GatherBuilderBase
+{
+public:
+ C_GatherBuilderBase(CephContext *cct_)
+ : cct(cct_), c_gather(NULL), finisher(NULL), activated(false)
+ {
+ }
+ C_GatherBuilderBase(CephContext *cct_, ContextType *finisher_)
+ : cct(cct_), c_gather(NULL), finisher(finisher_), activated(false)
+ {
+ }
+ ~C_GatherBuilderBase() {
+ if (c_gather) {
+ ceph_assert(activated); // Don't forget to activate your C_Gather!
+ }
+ else {
+ delete finisher;
+ }
+ }
+ ContextType *new_sub() {
+ if (!c_gather) {
+ c_gather = new GatherType(cct, finisher);
+ }
+ return c_gather->new_sub();
+ }
+ void activate() {
+ if (!c_gather)
+ return;
+ ceph_assert(finisher != NULL);
+ activated = true;
+ c_gather->activate();
+ }
+ void set_finisher(ContextType *finisher_) {
+ finisher = finisher_;
+ if (c_gather)
+ c_gather->set_finisher(finisher);
+ }
+ GatherType *get() const {
+ return c_gather;
+ }
+ bool has_subs() const {
+ return (c_gather != NULL);
+ }
+ int num_subs_created() {
+ ceph_assert(!activated);
+ if (c_gather == NULL)
+ return 0;
+ return c_gather->get_sub_created_count();
+ }
+ int num_subs_remaining() {
+ ceph_assert(!activated);
+ if (c_gather == NULL)
+ return 0;
+ return c_gather->get_sub_existing_count();
+ }
+
+private:
+ CephContext *cct;
+ GatherType *c_gather;
+ ContextType *finisher;
+ bool activated;
+};
+
+typedef C_GatherBase<Context, Context> C_Gather;
+typedef C_GatherBuilderBase<Context, C_Gather > C_GatherBuilder;
+
+template <class ContextType>
+class ContextFactory {
+public:
+ virtual ~ContextFactory() {}
+ virtual ContextType *build() = 0;
+};
+
+inline auto lambdafy(Context *c) {
+ return [fin = std::unique_ptr<Context>(c)]
+ (boost::system::error_code ec) mutable {
+ fin.release()->complete(ceph::from_error_code(ec));
+ };
+}
+
+
+#undef mydout
+
+#endif