summaryrefslogtreecommitdiffstats
path: root/src/crimson/common/smp_helpers.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/crimson/common/smp_helpers.h92
1 files changed, 92 insertions, 0 deletions
diff --git a/src/crimson/common/smp_helpers.h b/src/crimson/common/smp_helpers.h
new file mode 100644
index 000000000..c2b7bd964
--- /dev/null
+++ b/src/crimson/common/smp_helpers.h
@@ -0,0 +1,92 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <limits>
+
+#include <seastar/core/smp.hh>
+
+#include "crimson/common/errorator.h"
+#include "crimson/common/utility.h"
+
+namespace crimson {
+
+using core_id_t = seastar::shard_id;
+static constexpr core_id_t NULL_CORE = std::numeric_limits<core_id_t>::max();
+
+auto submit_to(core_id_t core, auto &&f) {
+ using ret_type = decltype(f());
+ if constexpr (is_errorated_future_v<ret_type>) {
+ auto ret = seastar::smp::submit_to(
+ core,
+ [f=std::move(f)]() mutable {
+ return f().to_base();
+ });
+ return ret_type(std::move(ret));
+ } else {
+ return seastar::smp::submit_to(core, std::move(f));
+ }
+}
+
+template <typename Obj, typename Method, typename... Args>
+auto proxy_method_on_core(
+ core_id_t core, Obj &obj, Method method, Args&&... args) {
+ return crimson::submit_to(
+ core,
+ [&obj, method,
+ arg_tuple=std::make_tuple(std::forward<Args>(args)...)]() mutable {
+ return apply_method_to_tuple(obj, method, std::move(arg_tuple));
+ });
+}
+
+/**
+ * reactor_map_seq
+ *
+ * Invokes f on each reactor sequentially, Caller may assume that
+ * f will not be invoked concurrently on multiple cores.
+ */
+template <typename F>
+auto reactor_map_seq(F &&f) {
+ using ret_type = decltype(f());
+ if constexpr (is_errorated_future_v<ret_type>) {
+ auto ret = crimson::do_for_each(
+ seastar::smp::all_cpus().begin(),
+ seastar::smp::all_cpus().end(),
+ [f=std::move(f)](auto core) mutable {
+ return seastar::smp::submit_to(
+ core,
+ [&f] {
+ return std::invoke(f);
+ });
+ });
+ return ret_type(ret);
+ } else {
+ return seastar::do_for_each(
+ seastar::smp::all_cpus().begin(),
+ seastar::smp::all_cpus().end(),
+ [f=std::move(f)](auto core) mutable {
+ return seastar::smp::submit_to(
+ core,
+ [&f] {
+ return std::invoke(f);
+ });
+ });
+ }
+}
+
+/**
+ * sharded_map_seq
+ *
+ * Invokes f on each shard of t sequentially. Caller may assume that
+ * f will not be invoked concurrently on multiple cores.
+ */
+template <typename T, typename F>
+auto sharded_map_seq(T &t, F &&f) {
+ return reactor_map_seq(
+ [&t, f=std::forward<F>(f)]() mutable {
+ return std::invoke(f, t.local());
+ });
+}
+
+}