summaryrefslogtreecommitdiffstats
path: root/src/crimson/common
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/common
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/common')
-rw-r--r--src/crimson/common/assert.cc81
-rw-r--r--src/crimson/common/auth_handler.h17
-rw-r--r--src/crimson/common/buffer_io.cc57
-rw-r--r--src/crimson/common/buffer_io.h21
-rw-r--r--src/crimson/common/condition_variable.h43
-rw-r--r--src/crimson/common/config_proxy.cc93
-rw-r--r--src/crimson/common/config_proxy.h222
-rw-r--r--src/crimson/common/errorator-loop.h91
-rw-r--r--src/crimson/common/errorator.h1358
-rw-r--r--src/crimson/common/exception.h54
-rw-r--r--src/crimson/common/fatal_signal.cc172
-rw-r--r--src/crimson/common/fatal_signal.h21
-rw-r--r--src/crimson/common/fixed_kv_node_layout.h730
-rw-r--r--src/crimson/common/formatter.cc40
-rw-r--r--src/crimson/common/formatter.h13
-rw-r--r--src/crimson/common/gated.h55
-rw-r--r--src/crimson/common/interruptible_future.h1600
-rw-r--r--src/crimson/common/layout.h737
-rw-r--r--src/crimson/common/local_shared_foreign_ptr.h245
-rw-r--r--src/crimson/common/log.cc21
-rw-r--r--src/crimson/common/log.h88
-rw-r--r--src/crimson/common/logclient.cc364
-rw-r--r--src/crimson/common/logclient.h232
-rw-r--r--src/crimson/common/operation.cc75
-rw-r--r--src/crimson/common/operation.h776
-rw-r--r--src/crimson/common/perf_counters_collection.cc41
-rw-r--r--src/crimson/common/perf_counters_collection.h49
-rw-r--r--src/crimson/common/shared_lru.h180
-rw-r--r--src/crimson/common/simple_lru.h141
-rw-r--r--src/crimson/common/smp_helpers.h92
-rw-r--r--src/crimson/common/throttle.cc64
-rw-r--r--src/crimson/common/throttle.h43
-rw-r--r--src/crimson/common/tmap_helpers.cc131
-rw-r--r--src/crimson/common/tmap_helpers.h40
-rw-r--r--src/crimson/common/tri_mutex.cc225
-rw-r--r--src/crimson/common/tri_mutex.h156
-rw-r--r--src/crimson/common/type_helpers.h8
-rw-r--r--src/crimson/common/utility.h38
38 files changed, 8414 insertions, 0 deletions
diff --git a/src/crimson/common/assert.cc b/src/crimson/common/assert.cc
new file mode 100644
index 000000000..07610c33f
--- /dev/null
+++ b/src/crimson/common/assert.cc
@@ -0,0 +1,81 @@
+#include <cstdarg>
+#include <iostream>
+
+#include <seastar/util/backtrace.hh>
+#include <seastar/core/reactor.hh>
+
+#include "include/ceph_assert.h"
+
+#include "crimson/common/log.h"
+
+namespace ceph {
+ [[gnu::cold]] void __ceph_assert_fail(const ceph::assert_data &ctx)
+ {
+ __ceph_assert_fail(ctx.assertion, ctx.file, ctx.line, ctx.function);
+ }
+
+ [[gnu::cold]] void __ceph_assert_fail(const char* assertion,
+ const char* file, int line,
+ const char* func)
+ {
+ seastar::logger& logger = crimson::get_logger(0);
+ logger.error("{}:{} : In function '{}', ceph_assert(%s)\n"
+ "{}",
+ file, line, func, assertion,
+ seastar::current_backtrace());
+ std::cout << std::flush;
+ abort();
+ }
+ [[gnu::cold]] void __ceph_assertf_fail(const char *assertion,
+ const char *file, int line,
+ const char *func, const char* msg,
+ ...)
+ {
+ char buf[8096];
+ va_list args;
+ va_start(args, msg);
+ std::vsnprintf(buf, sizeof(buf), msg, args);
+ va_end(args);
+
+ seastar::logger& logger = crimson::get_logger(0);
+ logger.error("{}:{} : In function '{}', ceph_assert(%s)\n"
+ "{}\n{}\n",
+ file, line, func, assertion,
+ buf,
+ seastar::current_backtrace());
+ std::cout << std::flush;
+ abort();
+ }
+
+ [[gnu::cold]] void __ceph_abort(const char* file, int line,
+ const char* func, const std::string& msg)
+ {
+ seastar::logger& logger = crimson::get_logger(0);
+ logger.error("{}:{} : In function '{}', abort(%s)\n"
+ "{}",
+ file, line, func, msg,
+ seastar::current_backtrace());
+ std::cout << std::flush;
+ abort();
+ }
+
+ [[gnu::cold]] void __ceph_abortf(const char* file, int line,
+ const char* func, const char* fmt,
+ ...)
+ {
+ char buf[8096];
+ va_list args;
+ va_start(args, fmt);
+ std::vsnprintf(buf, sizeof(buf), fmt, args);
+ va_end(args);
+
+ seastar::logger& logger = crimson::get_logger(0);
+ logger.error("{}:{} : In function '{}', abort()\n"
+ "{}\n{}\n",
+ file, line, func,
+ buf,
+ seastar::current_backtrace());
+ std::cout << std::flush;
+ abort();
+ }
+}
diff --git a/src/crimson/common/auth_handler.h b/src/crimson/common/auth_handler.h
new file mode 100644
index 000000000..d4140b6a2
--- /dev/null
+++ b/src/crimson/common/auth_handler.h
@@ -0,0 +1,17 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+class EntityName;
+class AuthCapsInfo;
+
+namespace crimson::common {
+class AuthHandler {
+public:
+ // the peer just got authorized
+ virtual void handle_authentication(const EntityName& name,
+ const AuthCapsInfo& caps) = 0;
+ virtual ~AuthHandler() = default;
+};
+}
diff --git a/src/crimson/common/buffer_io.cc b/src/crimson/common/buffer_io.cc
new file mode 100644
index 000000000..86edf7a6f
--- /dev/null
+++ b/src/crimson/common/buffer_io.cc
@@ -0,0 +1,57 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "buffer_io.h"
+
+#include <seastar/core/reactor.hh>
+#include <seastar/core/fstream.hh>
+#include <seastar/core/do_with.hh>
+
+#include "include/buffer.h"
+
+namespace crimson {
+
+seastar::future<> write_file(ceph::buffer::list&& bl,
+ seastar::sstring fn,
+ seastar::file_permissions permissions)
+{
+ const auto flags = (seastar::open_flags::wo |
+ seastar::open_flags::create |
+ seastar::open_flags::truncate);
+ seastar::file_open_options foo;
+ foo.create_permissions = permissions;
+ return seastar::open_file_dma(fn, flags, foo).then(
+ [bl=std::move(bl)](seastar::file f) {
+ return seastar::make_file_output_stream(f).then(
+ [bl=std::move(bl), f=std::move(f)](seastar::output_stream<char> out) {
+ return seastar::do_with(std::move(out),
+ std::move(f),
+ std::move(bl),
+ [](seastar::output_stream<char>& out,
+ seastar::file& f,
+ ceph::buffer::list& bl) {
+ return seastar::do_for_each(bl.buffers(), [&out](auto& buf) {
+ return out.write(buf.c_str(), buf.length());
+ }).then([&out] {
+ return out.close();
+ });
+ });
+ });
+ });
+}
+
+seastar::future<seastar::temporary_buffer<char>>
+read_file(const seastar::sstring fn)
+{
+ return seastar::open_file_dma(fn, seastar::open_flags::ro).then(
+ [] (seastar::file f) {
+ return f.size().then([f = std::move(f)](size_t s) {
+ return seastar::do_with(seastar::make_file_input_stream(f),
+ [s](seastar::input_stream<char>& in) {
+ return in.read_exactly(s);
+ });
+ });
+ });
+}
+
+}
diff --git a/src/crimson/common/buffer_io.h b/src/crimson/common/buffer_io.h
new file mode 100644
index 000000000..c5ece4a6f
--- /dev/null
+++ b/src/crimson/common/buffer_io.h
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <seastar/core/file-types.hh>
+
+#include "include/buffer_fwd.h"
+
+namespace crimson {
+ seastar::future<> write_file(ceph::buffer::list&& bl,
+ seastar::sstring fn,
+ seastar::file_permissions= // 0644
+ (seastar::file_permissions::user_read |
+ seastar::file_permissions::user_write |
+ seastar::file_permissions::group_read |
+ seastar::file_permissions::others_read));
+ seastar::future<seastar::temporary_buffer<char>>
+ read_file(const seastar::sstring fn);
+}
diff --git a/src/crimson/common/condition_variable.h b/src/crimson/common/condition_variable.h
new file mode 100644
index 000000000..19267f38a
--- /dev/null
+++ b/src/crimson/common/condition_variable.h
@@ -0,0 +1,43 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <seastar/core/condition-variable.hh>
+#include <seastar/core/loop.hh>
+
+#include "crimson/common/interruptible_future.h"
+
+namespace crimson {
+
+class condition_variable : public seastar::condition_variable {
+public:
+ template <typename Pred, typename Func>
+ auto wait(
+ Pred&& pred,
+ Func&& action) noexcept {
+ using func_result_t = std::invoke_result_t<Func>;
+ using intr_errorator_t = typename func_result_t::interrupt_errorator_type;
+ using intr_cond_t = typename func_result_t::interrupt_cond_type;
+ using interruptor = crimson::interruptible::interruptor<intr_cond_t>;
+ return interruptor::repeat(
+ [this, pred=std::forward<Pred>(pred),
+ action=std::forward<Func>(action)]()
+ -> typename intr_errorator_t::template future<seastar::stop_iteration> {
+ if (!pred()) {
+ return seastar::condition_variable::wait().then([] {
+ return seastar::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::no);
+ });
+ } else {
+ return action().si_then([] {
+ return seastar::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::yes);
+ });
+ }
+ });
+ }
+};
+
+} // namespace crimson
diff --git a/src/crimson/common/config_proxy.cc b/src/crimson/common/config_proxy.cc
new file mode 100644
index 000000000..88d4679d5
--- /dev/null
+++ b/src/crimson/common/config_proxy.cc
@@ -0,0 +1,93 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "config_proxy.h"
+
+#include <filesystem>
+
+#include "crimson/common/buffer_io.h"
+
+namespace crimson::common {
+
+ConfigProxy::ConfigProxy(const EntityName& name, std::string_view cluster)
+{
+ if (seastar::this_shard_id() != 0) {
+ return;
+ }
+ // set the initial value on CPU#0
+ values.reset(seastar::make_lw_shared<ConfigValues>());
+ values.get()->name = name;
+ values.get()->cluster = cluster;
+ // and the only copy of md_config_impl<> is allocated on CPU#0
+ local_config.reset(new md_config_t{*values, obs_mgr, true});
+ if (name.is_mds()) {
+ local_config->set_val_default(*values, obs_mgr,
+ "keyring", "$mds_data/keyring");
+ } else if (name.is_osd()) {
+ local_config->set_val_default(*values, obs_mgr,
+ "keyring", "$osd_data/keyring");
+ }
+}
+
+seastar::future<> ConfigProxy::start()
+{
+ // populate values and config to all other shards
+ if (!values) {
+ return seastar::make_ready_future<>();
+ }
+ return container().invoke_on_others([this](auto& proxy) {
+ return values.copy().then([config=local_config.get(),
+ &proxy](auto foreign_values) {
+ proxy.values.reset();
+ proxy.values = std::move(foreign_values);
+ proxy.remote_config = config;
+ return seastar::make_ready_future<>();
+ });
+ });
+}
+
+void ConfigProxy::show_config(ceph::Formatter* f) const {
+ get_config().show_config(*values, f);
+}
+
+seastar::future<> ConfigProxy::parse_config_files(const std::string& conf_files)
+{
+ auto conffile_paths =
+ get_config().get_conffile_paths(*values,
+ conf_files.empty() ? nullptr : conf_files.c_str(),
+ &std::cerr,
+ CODE_ENVIRONMENT_DAEMON);
+ return seastar::do_with(std::move(conffile_paths), [this] (auto& paths) {
+ return seastar::repeat([path=paths.begin(), e=paths.end(), this]() mutable {
+ if (path == e) {
+ // tried all conffile, none of them works
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ }
+ return crimson::read_file(*path++).then([this](auto&& buf) {
+ return do_change([buf=std::move(buf), this](ConfigValues& values) {
+ if (get_config().parse_buffer(values, obs_mgr,
+ buf.get(), buf.size(),
+ &std::cerr) == 0) {
+ get_config().update_legacy_vals(values);
+ } else {
+ throw std::invalid_argument("parse error");
+ }
+ }).then([] {
+ // this one works!
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ });
+ }).handle_exception_type([] (const std::filesystem::filesystem_error&) {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ }).handle_exception_type([] (const std::invalid_argument&) {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ });
+ });
+}
+
+ConfigProxy::ShardedConfig ConfigProxy::sharded_conf;
+}
diff --git a/src/crimson/common/config_proxy.h b/src/crimson/common/config_proxy.h
new file mode 100644
index 000000000..4c0e65507
--- /dev/null
+++ b/src/crimson/common/config_proxy.h
@@ -0,0 +1,222 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sharded.hh>
+#include "common/config.h"
+#include "common/config_obs.h"
+#include "common/config_obs_mgr.h"
+#include "common/errno.h"
+
+namespace ceph {
+class Formatter;
+}
+
+namespace crimson::common {
+
+// a facade for managing config. each shard has its own copy of ConfigProxy.
+//
+// In seastar-osd, there could be multiple instances of @c ConfigValues in a
+// single process, as we are using a variant of read-copy-update mechinary to
+// update the settings at runtime.
+class ConfigProxy : public seastar::peering_sharded_service<ConfigProxy>
+{
+ using LocalConfigValues = seastar::lw_shared_ptr<ConfigValues>;
+ seastar::foreign_ptr<LocalConfigValues> values;
+
+ md_config_t* remote_config = nullptr;
+ std::unique_ptr<md_config_t> local_config;
+
+ using ConfigObserver = ceph::md_config_obs_impl<ConfigProxy>;
+ ObserverMgr<ConfigObserver> obs_mgr;
+
+ const md_config_t& get_config() const {
+ return remote_config ? *remote_config : * local_config;
+ }
+ md_config_t& get_config() {
+ return remote_config ? *remote_config : * local_config;
+ }
+
+ // apply changes to all shards
+ // @param func a functor which accepts @c "ConfigValues&"
+ template<typename Func>
+ seastar::future<> do_change(Func&& func) {
+ return container().invoke_on(values.get_owner_shard(),
+ [func = std::move(func)](ConfigProxy& owner) {
+ // apply the changes to a copy of the values
+ auto new_values = seastar::make_lw_shared(*owner.values);
+ new_values->changed.clear();
+ func(*new_values);
+
+ // always apply the new settings synchronously on the owner shard, to
+ // avoid racings with other do_change() calls in parallel.
+ ObserverMgr<ConfigObserver>::rev_obs_map rev_obs;
+ owner.values.reset(new_values);
+ owner.obs_mgr.for_each_change(owner.values->changed, owner,
+ [&rev_obs](ConfigObserver *obs,
+ const std::string &key) {
+ rev_obs[obs].insert(key);
+ }, nullptr);
+ for (auto& [obs, keys] : rev_obs) {
+ obs->handle_conf_change(owner, keys);
+ }
+
+ return seastar::parallel_for_each(boost::irange(1u, seastar::smp::count),
+ [&owner, new_values] (auto cpu) {
+ return owner.container().invoke_on(cpu,
+ [foreign_values = seastar::make_foreign(new_values)](ConfigProxy& proxy) mutable {
+ proxy.values.reset();
+ proxy.values = std::move(foreign_values);
+
+ ObserverMgr<ConfigObserver>::rev_obs_map rev_obs;
+ proxy.obs_mgr.for_each_change(proxy.values->changed, proxy,
+ [&rev_obs](ConfigObserver *obs, const std::string& key) {
+ rev_obs[obs].insert(key);
+ }, nullptr);
+ for (auto& obs_keys : rev_obs) {
+ obs_keys.first->handle_conf_change(proxy, obs_keys.second);
+ }
+ });
+ }).finally([new_values] {
+ new_values->changed.clear();
+ });
+ });
+ }
+public:
+ ConfigProxy(const EntityName& name, std::string_view cluster);
+ const ConfigValues* operator->() const noexcept {
+ return values.get();
+ }
+ const ConfigValues get_config_values() {
+ return *values.get();
+ }
+ ConfigValues* operator->() noexcept {
+ return values.get();
+ }
+
+ void get_config_bl(uint64_t have_version,
+ ceph::buffer::list *bl,
+ uint64_t *got_version) {
+ get_config().get_config_bl(get_config_values(), have_version,
+ bl, got_version);
+ }
+ void get_defaults_bl(ceph::buffer::list *bl) {
+ get_config().get_defaults_bl(get_config_values(), bl);
+ }
+ seastar::future<> start();
+ // required by sharded<>
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+ void add_observer(ConfigObserver* obs) {
+ obs_mgr.add_observer(obs);
+ }
+ void remove_observer(ConfigObserver* obs) {
+ obs_mgr.remove_observer(obs);
+ }
+ seastar::future<> rm_val(const std::string& key) {
+ return do_change([key, this](ConfigValues& values) {
+ auto ret = get_config().rm_val(values, key);
+ if (ret < 0) {
+ throw std::invalid_argument(cpp_strerror(ret));
+ }
+ });
+ }
+ seastar::future<> set_val(const std::string& key,
+ const std::string& val) {
+ return do_change([key, val, this](ConfigValues& values) {
+ std::stringstream err;
+ auto ret = get_config().set_val(values, obs_mgr, key, val, &err);
+ if (ret < 0) {
+ throw std::invalid_argument(err.str());
+ }
+ });
+ }
+ int get_val(std::string_view key, std::string *val) const {
+ return get_config().get_val(*values, key, val);
+ }
+ template<typename T>
+ const T get_val(std::string_view key) const {
+ return get_config().template get_val<T>(*values, key);
+ }
+
+ int get_all_sections(std::vector<std::string>& sections) const {
+ return get_config().get_all_sections(sections);
+ }
+
+ int get_val_from_conf_file(const std::vector<std::string>& sections,
+ const std::string& key, std::string& out,
+ bool expand_meta) const {
+ return get_config().get_val_from_conf_file(*values, sections, key,
+ out, expand_meta);
+ }
+
+ unsigned get_osd_pool_default_min_size(uint8_t size) const {
+ return get_config().get_osd_pool_default_min_size(*values, size);
+ }
+
+ seastar::future<>
+ set_mon_vals(const std::map<std::string,std::string,std::less<>>& kv) {
+ return do_change([kv, this](ConfigValues& values) {
+ get_config().set_mon_vals(nullptr, values, obs_mgr, kv, nullptr);
+ });
+ }
+
+ seastar::future<> inject_args(const std::string& s) {
+ return do_change([s, this](ConfigValues& values) {
+ std::stringstream err;
+ if (get_config().injectargs(values, obs_mgr, s, &err)) {
+ throw std::invalid_argument(err.str());
+ }
+ });
+ }
+ void show_config(ceph::Formatter* f) const;
+
+ seastar::future<> parse_argv(std::vector<const char*>& argv) {
+ // we could pass whatever is unparsed to seastar, but seastar::app_template
+ // is used for driving the seastar application, and
+ // crimson::common::ConfigProxy is not available until seastar engine is up
+ // and running, so we have to feed the command line args to app_template
+ // first, then pass them to ConfigProxy.
+ return do_change([&argv, this](ConfigValues& values) {
+ get_config().parse_argv(values,
+ obs_mgr,
+ argv,
+ CONF_CMDLINE);
+ });
+ }
+
+ seastar::future<> parse_env() {
+ return do_change([this](ConfigValues& values) {
+ get_config().parse_env(CEPH_ENTITY_TYPE_OSD,
+ values,
+ obs_mgr);
+ });
+ }
+
+ seastar::future<> parse_config_files(const std::string& conf_files);
+
+ using ShardedConfig = seastar::sharded<ConfigProxy>;
+
+private:
+ static ShardedConfig sharded_conf;
+ friend ConfigProxy& local_conf();
+ friend ShardedConfig& sharded_conf();
+};
+
+inline ConfigProxy& local_conf() {
+ return ConfigProxy::sharded_conf.local();
+}
+
+inline ConfigProxy::ShardedConfig& sharded_conf() {
+ return ConfigProxy::sharded_conf;
+}
+
+template<typename T>
+const T get_conf(const std::string& key) {
+ return local_conf().template get_val<T>(key);
+}
+
+}
diff --git a/src/crimson/common/errorator-loop.h b/src/crimson/common/errorator-loop.h
new file mode 100644
index 000000000..bb3b7fb15
--- /dev/null
+++ b/src/crimson/common/errorator-loop.h
@@ -0,0 +1,91 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+
+#include "crimson/common/errorator.h"
+
+
+namespace crimson {
+template <class... AllowedErrors>
+class parallel_for_each_state final : private seastar::continuation_base<> {
+ using future_t = typename errorator<AllowedErrors...>::template future<>;
+ std::vector<future_t> _incomplete;
+ seastar::promise<> _result;
+ std::exception_ptr _ex;
+private:
+ void wait_for_one() noexcept {
+ while (!_incomplete.empty() && _incomplete.back().available()) {
+ if (_incomplete.back().failed()) {
+ _ex = _incomplete.back().get_exception();
+ }
+ _incomplete.pop_back();
+ }
+ if (!_incomplete.empty()) {
+ seastar::internal::set_callback(std::move(_incomplete.back()),
+ static_cast<continuation_base<>*>(this));
+ _incomplete.pop_back();
+ return;
+ }
+ if (__builtin_expect(bool(_ex), false)) {
+ _result.set_exception(std::move(_ex));
+ } else {
+ _result.set_value();
+ }
+ delete this;
+ }
+ virtual void run_and_dispose() noexcept override {
+ if (_state.failed()) {
+ _ex = std::move(_state).get_exception();
+ }
+ _state = {};
+ wait_for_one();
+ }
+ task* waiting_task() noexcept override { return _result.waiting_task(); }
+public:
+ parallel_for_each_state(size_t n) {
+ _incomplete.reserve(n);
+ }
+ void add_future(future_t&& f) {
+ _incomplete.push_back(std::move(f));
+ }
+ future_t get_future() {
+ auto ret = _result.get_future();
+ wait_for_one();
+ return ret;
+ }
+};
+
+template <typename Iterator, typename Func, typename... AllowedErrors>
+static inline typename errorator<AllowedErrors...>::template future<>
+parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept {
+ parallel_for_each_state<AllowedErrors...>* s = nullptr;
+ // Process all elements, giving each future the following treatment:
+ // - available, not failed: do nothing
+ // - available, failed: collect exception in ex
+ // - not available: collect in s (allocating it if needed)
+ for (;first != last; ++first) {
+ auto f = seastar::futurize_invoke(std::forward<Func>(func), *first);
+ if (!f.available() || f.failed()) {
+ if (!s) {
+ using itraits = std::iterator_traits<Iterator>;
+ auto n = (seastar::internal::iterator_range_estimate_vector_capacity(
+ first, last, typename itraits::iterator_category()) + 1);
+ s = new parallel_for_each_state<AllowedErrors...>(n);
+ }
+ s->add_future(std::move(f));
+ }
+ }
+ // If any futures were not available, hand off to parallel_for_each_state::start().
+ // Otherwise we can return a result immediately.
+ if (s) {
+ // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
+ // so this isn't a leak
+ return s->get_future();
+ }
+ return seastar::make_ready_future<>();
+}
+
+} // namespace crimson
diff --git a/src/crimson/common/errorator.h b/src/crimson/common/errorator.h
new file mode 100644
index 000000000..c5d63d5b9
--- /dev/null
+++ b/src/crimson/common/errorator.h
@@ -0,0 +1,1358 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include <exception>
+#include <system_error>
+
+#include <seastar/core/future-util.hh>
+
+#include "crimson/common/utility.h"
+#include "include/ceph_assert.h"
+
+namespace crimson::interruptible {
+
+template <typename, typename>
+class parallel_for_each_state;
+
+template <typename, typename>
+class interruptible_future_detail;
+
+}
+
+namespace crimson {
+
+// crimson::do_for_each_state is the mirror of seastar::do_for_each_state with FutureT
+template <typename Iterator, typename AsyncAction, typename FutureT>
+class do_for_each_state final : public seastar::continuation_base<> {
+ Iterator _begin;
+ Iterator _end;
+ AsyncAction _action;
+ seastar::promise<> _pr;
+
+public:
+ do_for_each_state(Iterator begin, Iterator end, AsyncAction action,
+ FutureT&& first_unavailable)
+ : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
+ seastar::internal::set_callback(std::move(first_unavailable), this);
+ }
+ virtual void run_and_dispose() noexcept override {
+ std::unique_ptr<do_for_each_state> zis(this);
+ if (_state.failed()) {
+ _pr.set_urgent_state(std::move(_state));
+ return;
+ }
+ while (_begin != _end) {
+ auto f = seastar::futurize_invoke(_action, *_begin);
+ ++_begin;
+ if (f.failed()) {
+ f._forward_to(std::move(_pr));
+ return;
+ }
+ if (!f.available() || seastar::need_preempt()) {
+ _state = {};
+ seastar::internal::set_callback(std::move(f), this);
+ zis.release();
+ return;
+ }
+ }
+ _pr.set_value();
+ }
+ task* waiting_task() noexcept override {
+ return _pr.waiting_task();
+ }
+ FutureT get_future() {
+ return _pr.get_future();
+ }
+};
+
+template<typename Iterator, typename AsyncAction,
+ typename FutureT = std::invoke_result_t<AsyncAction, typename Iterator::reference>>
+inline FutureT do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
+ while (begin != end) {
+ auto f = seastar::futurize_invoke(action, *begin);
+ ++begin;
+ if (f.failed()) {
+ return f;
+ }
+ if (!f.available() || seastar::need_preempt()) {
+ // s will be freed by run_and_dispose()
+ auto* s = new crimson::do_for_each_state<Iterator, AsyncAction, FutureT>{
+ std::move(begin), std::move(end), std::move(action), std::move(f)};
+ return s->get_future();
+ }
+ }
+ return seastar::make_ready_future<>();
+}
+
+template<typename Iterator, typename AsyncAction>
+inline auto do_for_each(Iterator begin, Iterator end, AsyncAction action) {
+ return ::crimson::do_for_each_impl(begin, end, std::move(action));
+}
+
+template<typename Container, typename AsyncAction>
+inline auto do_for_each(Container& c, AsyncAction action) {
+ return ::crimson::do_for_each(std::begin(c), std::end(c), std::move(action));
+}
+
+template<typename AsyncAction>
+inline auto repeat(AsyncAction action) {
+ using errorator_t =
+ typename ::seastar::futurize_t<std::invoke_result_t<AsyncAction>>::errorator_type;
+
+ while (true) {
+ auto f = ::seastar::futurize_invoke(action);
+ if (f.failed()) {
+ return errorator_t::template make_exception_future2<>(
+ f.get_exception()
+ );
+ } else if (f.available()) {
+ if (auto done = f.get0()) {
+ return errorator_t::template make_ready_future<>();
+ }
+ } else {
+ return std::move(f)._then(
+ [action = std::move(action)] (auto stop) mutable {
+ if (stop == seastar::stop_iteration::yes) {
+ return errorator_t::template make_ready_future<>();
+ }
+ return ::crimson::repeat(
+ std::move(action));
+ });
+ }
+ }
+}
+
+// define the interface between error types and errorator
+template <class ConcreteErrorT>
+class error_t {
+ static constexpr const std::type_info& get_exception_ptr_type_info() {
+ return ConcreteErrorT::exception_ptr_type_info();
+ }
+
+ decltype(auto) static from_exception_ptr(std::exception_ptr ep) {
+ return ConcreteErrorT::from_exception_ptr(std::move(ep));
+ }
+
+ template <class... AllowedErrorsT>
+ friend struct errorator;
+
+ template <class ErrorVisitorT, class FuturatorT>
+ friend class maybe_handle_error_t;
+
+protected:
+ std::exception_ptr to_exception_ptr() const {
+ const auto* concrete_error = static_cast<const ConcreteErrorT*>(this);
+ return concrete_error->to_exception_ptr();
+ }
+
+public:
+ template <class Func>
+ static decltype(auto) handle(Func&& func) {
+ return ConcreteErrorT::handle(std::forward<Func>(func));
+ }
+};
+
+// unthrowable_wrapper ensures compilation failure when somebody
+// would like to `throw make_error<...>)()` instead of returning.
+// returning allows for the compile-time verification of future's
+// AllowedErrorsV and also avoid the burden of throwing.
+template <class ErrorT, ErrorT ErrorV>
+struct unthrowable_wrapper : error_t<unthrowable_wrapper<ErrorT, ErrorV>> {
+ unthrowable_wrapper(const unthrowable_wrapper&) = delete;
+ [[nodiscard]] static const auto& make() {
+ static constexpr unthrowable_wrapper instance{};
+ return instance;
+ }
+
+ static auto exception_ptr() {
+ return make().to_exception_ptr();
+ }
+
+ template<class Func>
+ static auto handle(Func&& func) {
+ return [
+ func = std::forward<Func>(func)
+ ] (const unthrowable_wrapper& raw_error) mutable -> decltype(auto) {
+ if constexpr (std::is_invocable_v<Func, ErrorT, decltype(raw_error)>) {
+ // check whether the handler wants to take the raw error object which
+ // would be the case if it wants conditionally handle-or-pass-further.
+ return std::invoke(std::forward<Func>(func),
+ ErrorV,
+ std::move(raw_error));
+ } else if constexpr (std::is_invocable_v<Func, ErrorT>) {
+ return std::invoke(std::forward<Func>(func), ErrorV);
+ } else {
+ return std::invoke(std::forward<Func>(func));
+ }
+ };
+ }
+
+ struct pass_further {
+ decltype(auto) operator()(const unthrowable_wrapper& e) {
+ return e;
+ }
+ };
+
+ struct discard {
+ decltype(auto) operator()(const unthrowable_wrapper&) {
+ }
+ };
+
+
+private:
+ // can be used only to initialize the `instance` member
+ explicit unthrowable_wrapper() = default;
+
+ // implement the errorable interface
+ struct throwable_carrier{};
+ static std::exception_ptr carrier_instance;
+
+ static constexpr const std::type_info& exception_ptr_type_info() {
+ return typeid(throwable_carrier);
+ }
+ auto to_exception_ptr() const {
+ // error codes don't need to instantiate `std::exception_ptr` each
+ // time as the code is actually a part of the type itself.
+ // `std::make_exception_ptr()` on modern enough GCCs is quite cheap
+ // (see the Gleb Natapov's patch eradicating throw/catch there),
+ // but using one instance per type boils down the overhead to just
+ // ref-counting.
+ return carrier_instance;
+ }
+ static const auto& from_exception_ptr(std::exception_ptr) {
+ return make();
+ }
+
+ friend class error_t<unthrowable_wrapper<ErrorT, ErrorV>>;
+};
+
+template <class ErrorT, ErrorT ErrorV>
+std::exception_ptr unthrowable_wrapper<ErrorT, ErrorV>::carrier_instance = \
+ std::make_exception_ptr<
+ unthrowable_wrapper<ErrorT, ErrorV>::throwable_carrier>({});
+
+
+template <class ErrorT>
+struct stateful_error_t : error_t<stateful_error_t<ErrorT>> {
+ template <class... Args>
+ explicit stateful_error_t(Args&&... args)
+ : ep(std::make_exception_ptr<ErrorT>(std::forward<Args>(args)...)) {
+ }
+
+ template<class Func>
+ static auto handle(Func&& func) {
+ return [
+ func = std::forward<Func>(func)
+ ] (stateful_error_t<ErrorT>&& e) mutable -> decltype(auto) {
+ if constexpr (std::is_invocable_v<Func>) {
+ return std::invoke(std::forward<Func>(func));
+ }
+ try {
+ std::rethrow_exception(e.ep);
+ } catch (const ErrorT& obj) {
+ if constexpr (std::is_invocable_v<Func, decltype(obj), decltype(e)>) {
+ return std::invoke(std::forward<Func>(func), obj, e);
+ } else if constexpr (std::is_invocable_v<Func, decltype(obj)>) {
+ return std::invoke(std::forward<Func>(func), obj);
+ }
+ }
+ ceph_abort_msg("exception type mismatch -- impossible!");
+ };
+ }
+
+private:
+ std::exception_ptr ep;
+
+ explicit stateful_error_t(std::exception_ptr ep) : ep(std::move(ep)) {}
+
+ static constexpr const std::type_info& exception_ptr_type_info() {
+ return typeid(ErrorT);
+ }
+ auto to_exception_ptr() const {
+ return ep;
+ }
+ static stateful_error_t<ErrorT> from_exception_ptr(std::exception_ptr ep) {
+ return stateful_error_t<ErrorT>(std::move(ep));
+ }
+
+ friend class error_t<stateful_error_t<ErrorT>>;
+};
+
+namespace _impl {
+ template <class T> struct always_false : std::false_type {};
+};
+
+template <class ErrorVisitorT, class FuturatorT>
+class maybe_handle_error_t {
+ const std::type_info& type_info;
+ typename FuturatorT::type result;
+ ErrorVisitorT errfunc;
+
+public:
+ maybe_handle_error_t(ErrorVisitorT&& errfunc, std::exception_ptr ep)
+ : type_info(*ep.__cxa_exception_type()),
+ result(FuturatorT::make_exception_future(std::move(ep))),
+ errfunc(std::forward<ErrorVisitorT>(errfunc)) {
+ }
+
+ template <class ErrorT>
+ void handle() {
+ static_assert(std::is_invocable<ErrorVisitorT, ErrorT>::value,
+ "provided Error Visitor is not exhaustive");
+ // In C++ throwing an exception isn't the sole way to signal
+ // error with it. This approach nicely fits cold, infrequent cases
+ // but when applied to a hot one, it will likely hurt performance.
+ //
+ // Alternative approach is to create `std::exception_ptr` on our
+ // own and place it in the future via `make_exception_future()`.
+ // When it comes to handling, the pointer can be interrogated for
+ // pointee's type with `__cxa_exception_type()` instead of costly
+ // re-throwing (via `std::rethrow_exception()`) and matching with
+ // `catch`. The limitation here is lack of support for hierarchies
+ // of exceptions. The code below checks for exact match only while
+ // `catch` would allow to match against a base class as well.
+ // However, this shouldn't be a big issue for `errorator` as Error
+ // Visitors are already checked for exhaustiveness at compile-time.
+ //
+ // NOTE: `__cxa_exception_type()` is an extension of the language.
+ // It should be available both in GCC and Clang but a fallback
+ // (based on `std::rethrow_exception()` and `catch`) can be made
+ // to handle other platforms if necessary.
+ if (type_info == ErrorT::error_t::get_exception_ptr_type_info()) {
+ // set `state::invalid` in internals of `seastar::future` to not
+ // call `report_failed_future()` during `operator=()`.
+ [[maybe_unused]] auto&& ep = std::move(result).get_exception();
+
+ using return_t = std::invoke_result_t<ErrorVisitorT, ErrorT>;
+ if constexpr (std::is_assignable_v<decltype(result), return_t>) {
+ result = std::invoke(std::forward<ErrorVisitorT>(errfunc),
+ ErrorT::error_t::from_exception_ptr(std::move(ep)));
+ } else if constexpr (std::is_same_v<return_t, void>) {
+ // void denotes explicit discarding
+ // execute for the sake a side effects. Typically this boils down
+ // to throwing an exception by the handler.
+ std::invoke(std::forward<ErrorVisitorT>(errfunc),
+ ErrorT::error_t::from_exception_ptr(std::move(ep)));
+ } else if constexpr (seastar::Future<decltype(result)>) {
+ // result is seastar::future but return_t is e.g. int. If so,
+ // the else clause cannot be used as seastar::future lacks
+ // errorator_type member.
+ result = seastar::make_ready_future<return_t>(
+ std::invoke(std::forward<ErrorVisitorT>(errfunc),
+ ErrorT::error_t::from_exception_ptr(std::move(ep))));
+ } else {
+ result = FuturatorT::type::errorator_type::template make_ready_future<return_t>(
+ std::invoke(std::forward<ErrorVisitorT>(errfunc),
+ ErrorT::error_t::from_exception_ptr(std::move(ep))));
+ }
+ }
+ }
+
+ auto get_result() && {
+ return std::move(result);
+ }
+};
+
+template <class FuncHead, class... FuncTail>
+static constexpr auto composer(FuncHead&& head, FuncTail&&... tail) {
+ return [
+ head = std::forward<FuncHead>(head),
+ // perfect forwarding in lambda's closure isn't available in C++17
+ // using tuple as workaround; see: https://stackoverflow.com/a/49902823
+ tail = std::make_tuple(std::forward<FuncTail>(tail)...)
+ ] (auto&&... args) mutable -> decltype(auto) {
+ if constexpr (std::is_invocable_v<FuncHead, decltype(args)...>) {
+ return std::invoke(std::forward<FuncHead>(head),
+ std::forward<decltype(args)>(args)...);
+ } else if constexpr (sizeof...(FuncTail) > 0) {
+ using next_composer_t = decltype(composer<FuncTail...>);
+ auto&& next = std::apply<next_composer_t>(composer<FuncTail...>,
+ std::move(tail));
+ return std::invoke(std::move(next),
+ std::forward<decltype(args)>(args)...);
+ } else {
+ static_assert(
+ std::is_invocable_v<FuncHead, decltype(args)...> ||
+ (sizeof...(FuncTail) > 0),
+ "composition is not exhaustive");
+ }
+ };
+}
+
+template <class ValueT>
+struct errorated_future_marker{};
+
+template <class... AllowedErrors>
+class parallel_for_each_state;
+
+template <class T>
+static inline constexpr bool is_error_v = std::is_base_of_v<error_t<T>, T>;
+
+template <typename... AllowedErrors>
+struct errorator;
+
+template <typename Iterator, typename Func, typename... AllowedErrors>
+static inline typename errorator<AllowedErrors...>::template future<>
+parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept;
+
+template <class... AllowedErrors>
+struct errorator {
+
+ static_assert((... && is_error_v<AllowedErrors>),
+ "errorator expects presence of ::is_error in all error types");
+
+ template <class ErrorT>
+ struct contains_once {
+ static constexpr bool value =
+ (0 + ... + std::is_same_v<ErrorT, AllowedErrors>) == 1;
+ };
+ template <class... Errors>
+ struct contains_once<errorator<Errors...>> {
+ static constexpr bool value = (... && contains_once<Errors>::value);
+ };
+ template <class T>
+ static constexpr bool contains_once_v = contains_once<T>::value;
+
+ static_assert((... && contains_once_v<AllowedErrors>),
+ "no error type in errorator can be duplicated");
+
+ struct ready_future_marker{};
+ struct exception_future_marker{};
+
+private:
+ // see the comment for `using future = _future` below.
+ template <class>
+ class [[nodiscard]] _future {};
+ template <class ValueT>
+ class [[nodiscard]] _future<::crimson::errorated_future_marker<ValueT>>
+ : private seastar::future<ValueT> {
+ using base_t = seastar::future<ValueT>;
+ // we need the friendship for the sake of `get_exception() &&` when
+ // `safe_then()` is going to return an errorated future as a result of
+ // chaining. In contrast to `seastar::future`, errorator<T...>::future`
+ // has this member private.
+ template <class ErrorVisitor, class Futurator>
+ friend class maybe_handle_error_t;
+
+ // any `seastar::futurize` specialization must be able to access the base.
+ // see : `satisfy_with_result_of()` far below.
+ template <typename>
+ friend struct seastar::futurize;
+
+ template <typename T1, typename T2, typename... More>
+ friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more);
+
+ template <class, class = std::void_t<>>
+ struct get_errorator {
+ // generic template for non-errorated things (plain types and
+ // vanilla seastar::future as well).
+ using type = errorator<>;
+ };
+ template <class FutureT>
+ struct get_errorator<FutureT,
+ std::void_t<typename FutureT::errorator_type>> {
+ using type = typename FutureT::errorator_type;
+ };
+ template <class T>
+ using get_errorator_t = typename get_errorator<T>::type;
+
+ template <class ValueFuncErroratorT, class... ErrorVisitorRetsT>
+ struct make_errorator {
+ // NOP. The generic template.
+ };
+ template <class... ValueFuncAllowedErrors,
+ class ErrorVisitorRetsHeadT,
+ class... ErrorVisitorRetsTailT>
+ struct make_errorator<errorator<ValueFuncAllowedErrors...>,
+ ErrorVisitorRetsHeadT,
+ ErrorVisitorRetsTailT...> {
+ private:
+ using step_errorator = errorator<ValueFuncAllowedErrors...>;
+ // add ErrorVisitorRetsHeadT only if 1) it's an error type and
+ // 2) isn't already included in the errorator's error set.
+ // It's enough to negate contains_once_v as any errorator<...>
+ // type is already guaranteed to be free of duplications.
+ using _next_errorator = std::conditional_t<
+ is_error_v<ErrorVisitorRetsHeadT> &&
+ !step_errorator::template contains_once_v<ErrorVisitorRetsHeadT>,
+ typename step_errorator::template extend<ErrorVisitorRetsHeadT>,
+ step_errorator>;
+ using maybe_head_ertr = get_errorator_t<ErrorVisitorRetsHeadT>;
+ using next_errorator =
+ typename _next_errorator::template extend_ertr<maybe_head_ertr>;
+
+ public:
+ using type = typename make_errorator<next_errorator,
+ ErrorVisitorRetsTailT...>::type;
+ };
+ // finish the recursion
+ template <class... ValueFuncAllowedErrors>
+ struct make_errorator<errorator<ValueFuncAllowedErrors...>> {
+ using type = ::crimson::errorator<ValueFuncAllowedErrors...>;
+ };
+ template <class... Args>
+ using make_errorator_t = typename make_errorator<Args...>::type;
+
+ using base_t::base_t;
+
+ template <class Futurator, class Future, class ErrorVisitor>
+ [[gnu::noinline]]
+ static auto _safe_then_handle_errors(Future&& future,
+ ErrorVisitor&& errfunc) {
+ maybe_handle_error_t<ErrorVisitor, Futurator> maybe_handle_error(
+ std::forward<ErrorVisitor>(errfunc),
+ std::move(future).get_exception()
+ );
+ (maybe_handle_error.template handle<AllowedErrors>() , ...);
+ return std::move(maybe_handle_error).get_result();
+ }
+
+ protected:
+ using base_t::get_exception;
+ public:
+ using errorator_type = ::crimson::errorator<AllowedErrors...>;
+ using promise_type = seastar::promise<ValueT>;
+
+ using base_t::available;
+ using base_t::failed;
+ // need this because of the legacy in PG::do_osd_ops().
+ using base_t::handle_exception_type;
+
+ [[gnu::always_inline]]
+ _future(base_t&& base)
+ : base_t(std::move(base)) {
+ }
+
+ base_t to_base() && {
+ return std::move(*this);
+ }
+
+ template <class... A>
+ [[gnu::always_inline]]
+ _future(ready_future_marker, A&&... a)
+ : base_t(::seastar::make_ready_future<ValueT>(std::forward<A>(a)...)) {
+ }
+ [[gnu::always_inline]]
+ _future(exception_future_marker, ::seastar::future_state_base&& state) noexcept
+ : base_t(::seastar::futurize<base_t>::make_exception_future(std::move(state))) {
+ }
+ [[gnu::always_inline]]
+ _future(exception_future_marker, std::exception_ptr&& ep) noexcept
+ : base_t(::seastar::futurize<base_t>::make_exception_future(std::move(ep))) {
+ }
+
+ template <template <class...> class ErroratedFuture,
+ class = std::void_t<
+ typename ErroratedFuture<
+ ::crimson::errorated_future_marker<ValueT>>::errorator_type>>
+ operator ErroratedFuture<errorated_future_marker<ValueT>> () && {
+ using dest_errorator_t = \
+ typename ErroratedFuture<
+ ::crimson::errorated_future_marker<ValueT>>::errorator_type;
+ static_assert(dest_errorator_t::template contains_once_v<errorator_type>,
+ "conversion is possible to more-or-eq errorated future!");
+ return static_cast<base_t&&>(*this);
+ }
+
+ // initialize future as failed without throwing. `make_exception_future()`
+ // internally uses `std::make_exception_ptr()`. cppreference.com shouldn't
+ // be misinterpreted when it says:
+ //
+ // "This is done as if executing the following code:
+ // try {
+ // throw e;
+ // } catch(...) {
+ // return std::current_exception();
+ // }",
+ //
+ // the "as if" is absolutely crucial because modern GCCs employ optimized
+ // path for it. See:
+ // * https://gcc.gnu.org/git/?p=gcc.git;a=commit;h=cce8e59224e18858749a2324bce583bcfd160d6c,
+ // * https://gcc.gnu.org/ml/gcc-patches/2016-08/msg00373.html.
+ //
+ // This behavior, combined with `__cxa_exception_type()` for inspecting
+ // exception's type, allows for throw/catch-free handling of stateless
+ // exceptions (which is fine for error codes). Stateful jumbos would be
+ // actually a bit harder as `_M_get()` is private, and thus rethrowing is
+ // necessary to get to the state inside. However, it's not unthinkable to
+ // see another extension bringing operator*() to the exception pointer...
+ //
+ // TODO: we don't really need to `make_exception_ptr` each time. It still
+ // allocates memory underneath while can be replaced with single instance
+ // per type created on start-up.
+ template <class ErrorT,
+ class DecayedT = std::decay_t<ErrorT>,
+ bool IsError = is_error_v<DecayedT>,
+ class = std::enable_if_t<IsError>>
+ _future(ErrorT&& e)
+ : base_t(
+ seastar::make_exception_future<ValueT>(
+ errorator_type::make_exception_ptr(e))) {
+ static_assert(errorator_type::contains_once_v<DecayedT>,
+ "ErrorT is not enlisted in errorator");
+ }
+
+ template <class ValueFuncT, class ErrorVisitorT>
+ auto safe_then(ValueFuncT&& valfunc, ErrorVisitorT&& errfunc) {
+ static_assert((... && std::is_invocable_v<ErrorVisitorT,
+ AllowedErrors>),
+ "provided Error Visitor is not exhaustive");
+ static_assert(std::is_void_v<ValueT> ? std::is_invocable_v<ValueFuncT>
+ : std::is_invocable_v<ValueFuncT, ValueT>,
+ "Value Func is not invocable with future's value");
+ using value_func_result_t =
+ typename std::conditional_t<std::is_void_v<ValueT>,
+ std::invoke_result<ValueFuncT>,
+ std::invoke_result<ValueFuncT, ValueT>>::type;
+ // recognize whether there can be any error coming from the Value
+ // Function.
+ using value_func_errorator_t = get_errorator_t<value_func_result_t>;
+ // mutate the Value Function's errorator to harvest errors coming
+ // from the Error Visitor. Yes, it's perfectly fine to fail error
+ // handling at one step and delegate even broader set of issues
+ // to next continuation.
+ using return_errorator_t = make_errorator_t<
+ value_func_errorator_t,
+ std::decay_t<std::invoke_result_t<ErrorVisitorT, AllowedErrors>>...>;
+ // OK, now we know about all errors next continuation must take
+ // care about. If Visitor handled everything and the Value Func
+ // doesn't return any, we'll finish with errorator<>::future
+ // which is just vanilla seastar::future – that's it, next cont
+ // finally could use `.then()`!
+ using futurator_t = \
+ typename return_errorator_t::template futurize<value_func_result_t>;
+ // `seastar::futurize`, used internally by `then_wrapped()`, would
+ // wrap any non-`seastar::future` type coming from Value Func into
+ // `seastar::future`. As we really don't want to end with things
+ // like `seastar::future<errorator::future<...>>`, we need either:
+ // * convert the errorated future into plain in the lambda below
+ // and back here or
+ // * specialize the `seastar::futurize<T>` to get proper kind of
+ // future directly from `::then_wrapped()`.
+ // As C++17 doesn't guarantee copy elision when non-same types are
+ // involved while examination of assemblies from GCC 8.1 confirmed
+ // extra copying, switch to the second approach has been made.
+ return this->then_wrapped(
+ [ valfunc = std::forward<ValueFuncT>(valfunc),
+ errfunc = std::forward<ErrorVisitorT>(errfunc)
+ ] (auto&& future) mutable noexcept {
+ if (__builtin_expect(future.failed(), false)) {
+ return _safe_then_handle_errors<futurator_t>(
+ std::move(future), std::forward<ErrorVisitorT>(errfunc));
+ } else {
+ // NOTE: using `seastar::future::get()` here is a bit bloaty
+ // as the method rechecks availability of future's value and,
+ // if it's unavailable, does the `::do_wait()` path (yes, it
+ // targets `seastar::thread`). Actually this is dead code as
+ // `then_wrapped()` executes the lambda only when the future
+ // is available (which means: failed or ready). However, GCC
+ // hasn't optimized it out:
+ //
+ // if (__builtin_expect(future.failed(), false)) {
+ // ea25: 48 83 bd c8 fe ff ff cmpq $0x2,-0x138(%rbp)
+ // ea2c: 02
+ // ea2d: 0f 87 f0 05 00 00 ja f023 <ceph::osd::
+ // ...
+ // /// If get() is called in a \ref seastar::thread context,
+ // /// then it need not be available; instead, the thread will
+ // /// be paused until the future becomes available.
+ // [[gnu::always_inline]]
+ // std::tuple<T...> get() {
+ // if (!_state.available()) {
+ // ea3a: 0f 85 1b 05 00 00 jne ef5b <ceph::osd::
+ // }
+ // ...
+ //
+ // I don't perceive this as huge issue. Though, it cannot be
+ // claimed errorator has 0 overhead on hot path. The perfect
+ // solution here would be mark the `::get_available_state()`
+ // as `protected` and use dedicated `get_value()` exactly as
+ // `::then()` already does.
+ return futurator_t::invoke(std::forward<ValueFuncT>(valfunc),
+ std::move(future).get());
+ }
+ });
+ }
+
+ /**
+ * unsafe_thread_get
+ *
+ * Only valid within a seastar_thread. Ignores errorator protections
+ * and throws any contained exceptions.
+ *
+ * Should really only be used within test code
+ * (see test/crimson/gtest_seastar.h).
+ */
+ auto &&unsafe_get() {
+ return seastar::future<ValueT>::get();
+ }
+ auto unsafe_get0() {
+ return seastar::future<ValueT>::get0();
+ }
+
+ template <class FuncT>
+ _future finally(FuncT &&func) {
+ return this->then_wrapped(
+ [func = std::forward<FuncT>(func)](auto &&result) mutable noexcept {
+ if constexpr (seastar::InvokeReturnsAnyFuture<FuncT>) {
+ return ::seastar::futurize_invoke(std::forward<FuncT>(func)).then_wrapped(
+ [result = std::move(result)](auto&& f_res) mutable {
+ // TODO: f_res.failed()
+ (void)f_res.discard_result();
+ return std::move(result);
+ });
+ } else {
+ try {
+ func();
+ } catch (...) {
+ // TODO: rethrow
+ }
+ return std::move(result);
+ }
+ });
+ }
+
+ _future<::crimson::errorated_future_marker<void>>
+ discard_result() noexcept {
+ return safe_then([](auto&&) {});
+ }
+
+ // taking ErrorFuncOne and ErrorFuncTwo separately from ErrorFuncTail
+ // to avoid SFINAE
+ template <class ValueFunc,
+ class ErrorFuncHead,
+ class... ErrorFuncTail>
+ auto safe_then(ValueFunc&& value_func,
+ ErrorFuncHead&& error_func_head,
+ ErrorFuncTail&&... error_func_tail) {
+ static_assert(sizeof...(ErrorFuncTail) > 0);
+ return safe_then(
+ std::forward<ValueFunc>(value_func),
+ composer(std::forward<ErrorFuncHead>(error_func_head),
+ std::forward<ErrorFuncTail>(error_func_tail)...));
+ }
+
+ template <class ValueFunc>
+ auto safe_then(ValueFunc&& value_func) {
+ return safe_then(std::forward<ValueFunc>(value_func),
+ errorator_type::pass_further{});
+ }
+
+ template <class ValueFunc,
+ class... ErrorFuncs>
+ auto safe_then_unpack(ValueFunc&& value_func,
+ ErrorFuncs&&... error_funcs) {
+ return safe_then(
+ [value_func=std::move(value_func)] (ValueT&& tuple) mutable {
+ assert_moveable(value_func);
+ return std::apply(std::move(value_func), std::move(tuple));
+ },
+ std::forward<ErrorFuncs>(error_funcs)...
+ );
+ }
+
+ template <class Func>
+ void then(Func&&) = delete;
+
+ template <class ErrorVisitorT>
+ auto handle_error(ErrorVisitorT&& errfunc) {
+ static_assert((... && std::is_invocable_v<ErrorVisitorT,
+ AllowedErrors>),
+ "provided Error Visitor is not exhaustive");
+ using return_errorator_t = make_errorator_t<
+ errorator<>,
+ std::decay_t<std::invoke_result_t<ErrorVisitorT, AllowedErrors>>...>;
+ using futurator_t = \
+ typename return_errorator_t::template futurize<::seastar::future<ValueT>>;
+ return this->then_wrapped(
+ [ errfunc = std::forward<ErrorVisitorT>(errfunc)
+ ] (auto&& future) mutable noexcept {
+ if (__builtin_expect(future.failed(), false)) {
+ return _safe_then_handle_errors<futurator_t>(
+ std::move(future), std::forward<ErrorVisitorT>(errfunc));
+ } else {
+ return typename futurator_t::type{ std::move(future) };
+ }
+ });
+ }
+
+ template <class ErrorFuncHead,
+ class... ErrorFuncTail>
+ auto handle_error(ErrorFuncHead&& error_func_head,
+ ErrorFuncTail&&... error_func_tail) {
+ static_assert(sizeof...(ErrorFuncTail) > 0);
+ return this->handle_error(
+ composer(std::forward<ErrorFuncHead>(error_func_head),
+ std::forward<ErrorFuncTail>(error_func_tail)...));
+ }
+
+ private:
+ // for ::crimson::do_for_each
+ template <class Func>
+ auto _then(Func&& func) {
+ return base_t::then(std::forward<Func>(func));
+ }
+ template <class T>
+ auto _forward_to(T&& pr) {
+ return base_t::forward_to(std::forward<T>(pr));
+ }
+ template<typename Iterator, typename AsyncAction>
+ friend inline auto ::crimson::do_for_each(Iterator begin,
+ Iterator end,
+ AsyncAction action);
+
+ template <typename Iterator, typename AsyncAction, typename FutureT>
+ friend class ::crimson::do_for_each_state;
+
+ template<typename AsyncAction>
+ friend inline auto ::crimson::repeat(AsyncAction action);
+
+ template <typename Result>
+ friend class ::seastar::future;
+
+ // let seastar::do_with_impl to up-cast us to seastar::future.
+ template<typename T, typename F>
+ friend inline auto ::seastar::internal::do_with_impl(T&& rvalue, F&& f);
+ template<typename T1, typename T2, typename T3_or_F, typename... More>
+ friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more);
+ template<typename, typename>
+ friend class ::crimson::interruptible::interruptible_future_detail;
+ friend class ::crimson::parallel_for_each_state<AllowedErrors...>;
+ template <typename IC, typename FT>
+ friend class ::crimson::interruptible::parallel_for_each_state;
+ };
+
+ class Enabler {};
+
+ template <typename T>
+ using EnableIf = typename std::enable_if<contains_once_v<std::decay_t<T>>, Enabler>::type;
+
+ template <typename ErrorFunc>
+ struct all_same_way_t {
+ ErrorFunc func;
+ all_same_way_t(ErrorFunc &&error_func)
+ : func(std::forward<ErrorFunc>(error_func)) {}
+
+ template <typename ErrorT, EnableIf<ErrorT>...>
+ decltype(auto) operator()(ErrorT&& e) {
+ using decayed_t = std::decay_t<decltype(e)>;
+ auto&& handler =
+ decayed_t::error_t::handle(std::forward<ErrorFunc>(func));
+ static_assert(std::is_invocable_v<decltype(handler), ErrorT>);
+ return std::invoke(std::move(handler), std::forward<ErrorT>(e));
+ }
+ };
+
+public:
+ // HACK: `errorated_future_marker` and `_future` is just a hack to
+ // specialize `seastar::futurize` for category of class templates:
+ // `future<...>` from distinct errorators. Such tricks are usually
+ // performed basing on SFINAE and `std::void_t` to check existence
+ // of a trait/member (`future<...>::errorator_type` in our case).
+ // Unfortunately, this technique can't be applied as the `futurize`
+ // lacks the optional parameter. The problem looks awfully similar
+ // to following SO item: https://stackoverflow.com/a/38860413.
+ template <class ValueT=void>
+ using future = _future<::crimson::errorated_future_marker<ValueT>>;
+
+ // the visitor that forwards handling of all errors to next continuation
+ struct pass_further {
+ template <class ErrorT, EnableIf<ErrorT>...>
+ decltype(auto) operator()(ErrorT&& e) {
+ static_assert(contains_once_v<std::decay_t<ErrorT>>,
+ "passing further disallowed ErrorT");
+ return std::forward<ErrorT>(e);
+ }
+ };
+
+ struct discard_all {
+ template <class ErrorT, EnableIf<ErrorT>...>
+ void operator()(ErrorT&&) {
+ static_assert(contains_once_v<std::decay_t<ErrorT>>,
+ "discarding disallowed ErrorT");
+ }
+ };
+
+ template <typename T>
+ static future<T> make_errorator_future(seastar::future<T>&& fut) {
+ return std::move(fut);
+ }
+
+ // assert_all{ "TODO" };
+ class assert_all {
+ const char* const msg = nullptr;
+ public:
+ template <std::size_t N>
+ assert_all(const char (&msg)[N])
+ : msg(msg) {
+ }
+ assert_all() = default;
+
+ template <class ErrorT, EnableIf<ErrorT>...>
+ void operator()(ErrorT&&) {
+ static_assert(contains_once_v<std::decay_t<ErrorT>>,
+ "discarding disallowed ErrorT");
+ if (msg) {
+ ceph_abort_msg(msg);
+ } else {
+ ceph_abort();
+ }
+ }
+ };
+
+ template <class ErrorFunc>
+ static decltype(auto) all_same_way(ErrorFunc&& error_func) {
+ return all_same_way_t<ErrorFunc>{std::forward<ErrorFunc>(error_func)};
+ };
+
+ // get a new errorator by extending current one with new errors
+ template <class... NewAllowedErrorsT>
+ using extend = errorator<AllowedErrors..., NewAllowedErrorsT...>;
+
+ // get a new errorator by summing and deduplicating error set of
+ // the errorator `unify<>` is applied on with another errorator
+ // provided as template parameter.
+ template <class OtherErroratorT>
+ struct unify {
+ // 1st: generic NOP template
+ };
+ template <class OtherAllowedErrorsHead,
+ class... OtherAllowedErrorsTail>
+ struct unify<errorator<OtherAllowedErrorsHead,
+ OtherAllowedErrorsTail...>> {
+ private:
+ // 2nd: specialization for errorators with non-empty error set.
+ //
+ // split error set of other errorator, passed as template param,
+ // into head and tail. Mix error set of this errorator with head
+ // of the other one only if it isn't already present in the set.
+ using step_errorator = std::conditional_t<
+ contains_once_v<OtherAllowedErrorsHead> == false,
+ errorator<AllowedErrors..., OtherAllowedErrorsHead>,
+ errorator<AllowedErrors...>>;
+ using rest_errorator = errorator<OtherAllowedErrorsTail...>;
+
+ public:
+ using type = typename step_errorator::template unify<rest_errorator>::type;
+ };
+ template <class... EmptyPack>
+ struct unify<errorator<EmptyPack...>> {
+ // 3rd: recursion finisher
+ static_assert(sizeof...(EmptyPack) == 0);
+ using type = errorator<AllowedErrors...>;
+ };
+
+ // get a new errorator by extending current one with another errorator
+ template <class E>
+ using extend_ertr = typename unify<E>::type;
+
+ template <typename T=void, typename... A>
+ static future<T> make_ready_future(A&&... value) {
+ return future<T>(ready_future_marker(), std::forward<A>(value)...);
+ }
+
+ template <typename T=void>
+ static
+ future<T> make_exception_future2(std::exception_ptr&& ex) noexcept {
+ return future<T>(exception_future_marker(), std::move(ex));
+ }
+ template <typename T=void>
+ static
+ future<T> make_exception_future2(seastar::future_state_base&& state) noexcept {
+ return future<T>(exception_future_marker(), std::move(state));
+ }
+ template <typename T=void, typename Exception>
+ static
+ future<T> make_exception_future2(Exception&& ex) noexcept {
+ return make_exception_future2<T>(std::make_exception_ptr(std::forward<Exception>(ex)));
+ }
+
+ static auto now() {
+ return make_ready_future<>();
+ }
+
+ template <typename Container, typename Func>
+ static inline auto parallel_for_each(Container&& container, Func&& func) noexcept {
+ return crimson::parallel_for_each<decltype(std::begin(container)), Func, AllowedErrors...>(
+ std::begin(container),
+ std::end(container),
+ std::forward<Func>(func));
+ }
+
+ template <typename Iterator, typename Func>
+ static inline errorator<AllowedErrors...>::future<>
+ parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept {
+ return crimson::parallel_for_each<Iterator, Func, AllowedErrors...>(
+ first,
+ last,
+ std::forward<Func>(func));
+ }
+private:
+ template <class T>
+ class futurize {
+ using vanilla_futurize = seastar::futurize<T>;
+
+ // explicit specializations for nested type is not allowed unless both
+ // the member template and the enclosing template are specialized. see
+ // section temp.expl.spec, N4659
+ template <class Stored, int Dummy = 0>
+ struct stored_to_future {
+ using type = future<Stored>;
+ };
+ template <int Dummy>
+ struct stored_to_future <seastar::internal::monostate, Dummy> {
+ using type = future<>;
+ };
+
+ public:
+ using type =
+ typename stored_to_future<typename vanilla_futurize::value_type>::type;
+
+ template <class Func, class... Args>
+ static type invoke(Func&& func, Args&&... args) {
+ try {
+ return vanilla_futurize::invoke(std::forward<Func>(func),
+ std::forward<Args>(args)...);
+ } catch (...) {
+ return make_exception_future(std::current_exception());
+ }
+ }
+
+ template <class Func>
+ static type invoke(Func&& func, seastar::internal::monostate) {
+ try {
+ return vanilla_futurize::invoke(std::forward<Func>(func));
+ } catch (...) {
+ return make_exception_future(std::current_exception());
+ }
+ }
+
+ template <typename Arg>
+ static type make_exception_future(Arg&& arg) {
+ return vanilla_futurize::make_exception_future(std::forward<Arg>(arg));
+ }
+ };
+ template <template <class...> class ErroratedFutureT,
+ class ValueT>
+ class futurize<ErroratedFutureT<::crimson::errorated_future_marker<ValueT>>> {
+ public:
+ using type = ::crimson::errorator<AllowedErrors...>::future<ValueT>;
+
+ template <class Func, class... Args>
+ static type invoke(Func&& func, Args&&... args) {
+ try {
+ return ::seastar::futurize_invoke(std::forward<Func>(func),
+ std::forward<Args>(args)...);
+ } catch (...) {
+ return make_exception_future(std::current_exception());
+ }
+ }
+
+ template <class Func>
+ static type invoke(Func&& func, seastar::internal::monostate) {
+ try {
+ return ::seastar::futurize_invoke(std::forward<Func>(func));
+ } catch (...) {
+ return make_exception_future(std::current_exception());
+ }
+ }
+
+ template <typename Arg>
+ static type make_exception_future(Arg&& arg) {
+ return ::crimson::errorator<AllowedErrors...>::make_exception_future2<ValueT>(std::forward<Arg>(arg));
+ }
+ };
+
+ template <typename InterruptCond, typename FutureType>
+ class futurize<
+ ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, FutureType>> {
+ public:
+ using type = ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, typename futurize<FutureType>::type>;
+
+ template <typename Func, typename... Args>
+ static type invoke(Func&& func, Args&&... args) {
+ try {
+ return ::seastar::futurize_invoke(std::forward<Func>(func),
+ std::forward<Args>(args)...);
+ } catch(...) {
+ return seastar::futurize<
+ ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, FutureType>>::make_exception_future(
+ std::current_exception());
+ }
+ }
+ template <typename Func>
+ static type invoke(Func&& func, seastar::internal::monostate) {
+ try {
+ return ::seastar::futurize_invoke(std::forward<Func>(func));
+ } catch(...) {
+ return seastar::futurize<
+ ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, FutureType>>::make_exception_future(
+ std::current_exception());
+ }
+ }
+ template <typename Arg>
+ static type make_exception_future(Arg&& arg) {
+ return ::seastar::futurize<
+ ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, FutureType>>::make_exception_future(
+ std::forward<Arg>(arg));
+ }
+ };
+
+ template <class ErrorT>
+ static std::exception_ptr make_exception_ptr(ErrorT&& e) {
+ // calling via interface class due to encapsulation and friend relations.
+ return e.error_t<std::decay_t<ErrorT>>::to_exception_ptr();
+ }
+
+ // needed because of:
+ // * return_errorator_t::template futurize<...> in `safe_then()`,
+ // * conversion to `std::exception_ptr` in `future::future(ErrorT&&)`.
+ // the friendship with all errorators is an idea from Kefu to fix build
+ // issues on GCC 9. This version likely fixes some access violation bug
+ // we were exploiting before.
+ template <class...>
+ friend class errorator;
+ template<typename, typename>
+ friend class ::crimson::interruptible::interruptible_future_detail;
+}; // class errorator, generic template
+
+// no errors? errorator<>::future is plain seastar::future then!
+template <>
+class errorator<> {
+public:
+ template <class ValueT=void>
+ using future = ::seastar::futurize_t<ValueT>;
+
+ template <class T>
+ using futurize = ::seastar::futurize<T>;
+
+ // get a new errorator by extending current one with errors
+ template <class... NewAllowedErrors>
+ using extend = errorator<NewAllowedErrors...>;
+
+ // get a new errorator by extending current one with another errorator
+ template <class E>
+ using extend_ertr = E;
+
+ // errorator with empty error set never contains any error
+ template <class T>
+ static constexpr bool contains_once_v = false;
+}; // class errorator, <> specialization
+
+
+template <class ErroratorOne,
+ class ErroratorTwo,
+ class... FurtherErrators>
+struct compound_errorator {
+private:
+ // generic template. Empty `FurtherErrators` are handled by
+ // the specialization below.
+ static_assert(sizeof...(FurtherErrators) > 0);
+ using step =
+ typename compound_errorator<ErroratorOne, ErroratorTwo>::type;
+
+public:
+ using type =
+ typename compound_errorator<step, FurtherErrators...>::type;
+};
+template <class ErroratorOne,
+ class ErroratorTwo>
+struct compound_errorator<ErroratorOne, ErroratorTwo> {
+ // specialization for empty `FurtherErrators` arg pack
+ using type =
+ typename ErroratorOne::template unify<ErroratorTwo>::type;
+};
+template <class... Args>
+using compound_errorator_t = typename compound_errorator<Args...>::type;
+
+// this is conjunction of two nasty features: C++14's variable template
+// and inline global variable of C++17. The latter is crucial to ensure
+// the variable will get the same address across all translation units.
+template <int ErrorV>
+inline std::error_code ec = std::error_code(ErrorV, std::generic_category());
+
+template <int ErrorV>
+using ct_error_code = unthrowable_wrapper<const std::error_code&, ec<ErrorV>>;
+
+namespace ct_error {
+ using enoent = ct_error_code<static_cast<int>(std::errc::no_such_file_or_directory)>;
+ using enodata = ct_error_code<static_cast<int>(std::errc::no_message_available)>;
+ using invarg = ct_error_code<static_cast<int>(std::errc::invalid_argument)>;
+ using input_output_error = ct_error_code<static_cast<int>(std::errc::io_error)>;
+ using object_corrupted = ct_error_code<static_cast<int>(std::errc::illegal_byte_sequence)>;
+ using permission_denied = ct_error_code<static_cast<int>(std::errc::permission_denied)>;
+ using operation_not_supported =
+ ct_error_code<static_cast<int>(std::errc::operation_not_supported)>;
+ using not_connected = ct_error_code<static_cast<int>(std::errc::not_connected)>;
+ using timed_out = ct_error_code<static_cast<int>(std::errc::timed_out)>;
+ using erange =
+ ct_error_code<static_cast<int>(std::errc::result_out_of_range)>;
+ using ebadf =
+ ct_error_code<static_cast<int>(std::errc::bad_file_descriptor)>;
+ using enospc =
+ ct_error_code<static_cast<int>(std::errc::no_space_on_device)>;
+ using value_too_large = ct_error_code<static_cast<int>(std::errc::value_too_large)>;
+ using eagain =
+ ct_error_code<static_cast<int>(std::errc::resource_unavailable_try_again)>;
+ using file_too_large =
+ ct_error_code<static_cast<int>(std::errc::file_too_large)>;
+ using address_in_use = ct_error_code<static_cast<int>(std::errc::address_in_use)>;
+ using address_not_available = ct_error_code<static_cast<int>(std::errc::address_not_available)>;
+ using ecanceled = ct_error_code<static_cast<int>(std::errc::operation_canceled)>;
+ using einprogress = ct_error_code<static_cast<int>(std::errc::operation_in_progress)>;
+ using enametoolong = ct_error_code<static_cast<int>(std::errc::filename_too_long)>;
+ using eexist = ct_error_code<static_cast<int>(std::errc::file_exists)>;
+ using edquot = ct_error_code<int(122)>;
+ constexpr int cmp_fail_error_value = 4095;
+ using cmp_fail = ct_error_code<int(cmp_fail_error_value)>;
+
+ struct pass_further_all {
+ template <class ErrorT>
+ decltype(auto) operator()(ErrorT&& e) {
+ return std::forward<ErrorT>(e);
+ }
+ };
+
+ struct discard_all {
+ template <class ErrorT>
+ void operator()(ErrorT&&) {
+ }
+ };
+
+ class assert_all {
+ const char* const msg = nullptr;
+ public:
+ template <std::size_t N>
+ assert_all(const char (&msg)[N])
+ : msg(msg) {
+ }
+ assert_all() = default;
+
+ template <class ErrorT>
+ void operator()(ErrorT&&) {
+ if (msg) {
+ ceph_abort(msg);
+ } else {
+ ceph_abort();
+ }
+ }
+ };
+
+ template <class ErrorFunc>
+ static decltype(auto) all_same_way(ErrorFunc&& error_func) {
+ return [
+ error_func = std::forward<ErrorFunc>(error_func)
+ ] (auto&& e) mutable -> decltype(auto) {
+ using decayed_t = std::decay_t<decltype(e)>;
+ auto&& handler =
+ decayed_t::error_t::handle(std::forward<ErrorFunc>(error_func));
+ return std::invoke(std::move(handler), std::forward<decltype(e)>(e));
+ };
+ };
+}
+
+using stateful_errc = stateful_error_t<std::errc>;
+using stateful_errint = stateful_error_t<int>;
+using stateful_ec = stateful_error_t<std::error_code>;
+
+template <typename F>
+struct is_errorated_future {
+ static constexpr bool value = false;
+};
+template <template <class...> class ErroratedFutureT,
+ class ValueT>
+struct is_errorated_future<
+ ErroratedFutureT<::crimson::errorated_future_marker<ValueT>>
+ > {
+ static constexpr bool value = true;
+};
+template <typename T>
+constexpr bool is_errorated_future_v = is_errorated_future<T>::value;
+
+} // namespace crimson
+
+
+// open the `seastar` namespace to specialize `futurize`. This is not
+// pretty for sure. I just hope it's not worse than e.g. specializing
+// `hash` in the `std` namespace. The justification is copy avoidance
+// in `future<...>::safe_then()`. See the comments there for details.
+namespace seastar {
+
+// Container is a placeholder for errorator::_future<> template
+template <template <class> class Container,
+ class Value>
+struct futurize<Container<::crimson::errorated_future_marker<Value>>> {
+ using errorator_type = typename Container<
+ ::crimson::errorated_future_marker<Value>>::errorator_type;
+
+ using type = typename errorator_type::template future<Value>;
+ using value_type = seastar::internal::future_stored_type_t<Value>;
+
+ template<typename Func, typename... FuncArgs>
+ [[gnu::always_inline]]
+ static type apply(Func&& func, std::tuple<FuncArgs...>&& args) noexcept {
+ try {
+ return std::apply(
+ std::forward<Func>(func),
+ std::forward<std::tuple<FuncArgs...>>(args));
+ } catch (...) {
+ return make_exception_future(std::current_exception());
+ }
+ }
+
+ template<typename Func, typename... FuncArgs>
+ [[gnu::always_inline]]
+ static inline type invoke(Func&& func, FuncArgs&&... args) noexcept {
+ try {
+ return func(std::forward<FuncArgs>(args)...);
+ } catch (...) {
+ return make_exception_future(std::current_exception());
+ }
+ }
+
+ template <class Func>
+ [[gnu::always_inline]]
+ static type invoke(Func&& func, seastar::internal::monostate) noexcept {
+ try {
+ return func();
+ } catch (...) {
+ return make_exception_future(std::current_exception());
+ }
+ }
+
+ template <typename Arg>
+ [[gnu::always_inline]]
+ static type make_exception_future(Arg&& arg) {
+ return errorator_type::template make_exception_future2<Value>(std::forward<Arg>(arg));
+ }
+
+private:
+ template<typename PromiseT, typename Func>
+ static void satisfy_with_result_of(PromiseT&& pr, Func&& func) {
+ // this may use the protected variant of `seastar::future::forward_to()`
+ // because:
+ // 1. `seastar::future` established a friendship with with all
+ // specializations of `seastar::futurize`, including this
+ // one (we're in the `seastar` namespace!) WHILE
+ // 2. any errorated future declares now the friendship with any
+ // `seastar::futurize<...>`.
+ func().forward_to(std::move(pr));
+ }
+ template <typename U>
+ friend class future;
+};
+
+template <template <class> class Container,
+ class Value>
+struct continuation_base_from_future<Container<::crimson::errorated_future_marker<Value>>> {
+ using type = continuation_base<Value>;
+};
+
+} // namespace seastar
diff --git a/src/crimson/common/exception.h b/src/crimson/common/exception.h
new file mode 100644
index 000000000..682fef69b
--- /dev/null
+++ b/src/crimson/common/exception.h
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <exception>
+#include <seastar/core/future.hh>
+#include <seastar/core/future-util.hh>
+
+#include "crimson/common/log.h"
+#include "crimson/common/interruptible_future.h"
+
+namespace crimson::common {
+
+class interruption : public std::exception
+{};
+
+class system_shutdown_exception final : public interruption{
+public:
+ const char* what() const noexcept final {
+ return "system shutting down";
+ }
+};
+
+class actingset_changed final : public interruption {
+public:
+ actingset_changed(bool sp) : still_primary(sp) {}
+ const char* what() const noexcept final {
+ return "acting set changed";
+ }
+ bool is_primary() const {
+ return still_primary;
+ }
+private:
+ const bool still_primary;
+};
+
+template<typename Func, typename... Args>
+inline seastar::future<> handle_system_shutdown(Func&& func, Args&&... args)
+{
+ return seastar::futurize_invoke(std::forward<Func>(func),
+ std::forward<Args>(args)...)
+ .handle_exception([](std::exception_ptr eptr) {
+ if (*eptr.__cxa_exception_type() ==
+ typeid(crimson::common::system_shutdown_exception)) {
+ crimson::get_logger(ceph_subsys_osd).debug(
+ "operation skipped, system shutdown");
+ return seastar::now();
+ }
+ std::rethrow_exception(eptr);
+ });
+}
+
+}
diff --git a/src/crimson/common/fatal_signal.cc b/src/crimson/common/fatal_signal.cc
new file mode 100644
index 000000000..f2983769d
--- /dev/null
+++ b/src/crimson/common/fatal_signal.cc
@@ -0,0 +1,172 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "fatal_signal.h"
+
+#include <csignal>
+#include <iostream>
+#include <string_view>
+
+#define BOOST_STACKTRACE_USE_ADDR2LINE
+#include <boost/stacktrace.hpp>
+#include <seastar/core/reactor.hh>
+
+#include "common/safe_io.h"
+#include "include/scope_guard.h"
+
+FatalSignal::FatalSignal()
+{
+ install_oneshot_signals_handler<SIGSEGV,
+ SIGABRT,
+ SIGBUS,
+ SIGILL,
+ SIGFPE,
+ SIGXCPU,
+ SIGXFSZ,
+ SIGSYS>();
+}
+
+template <int... SigNums>
+void FatalSignal::install_oneshot_signals_handler()
+{
+ (install_oneshot_signal_handler<SigNums>() , ...);
+}
+
+static void reraise_fatal(const int signum)
+{
+ // use default handler to dump core
+ ::signal(signum, SIG_DFL);
+
+ // normally, we won't get here. if we do, something is very weird.
+ if (::raise(signum)) {
+ std::cerr << "reraise_fatal: failed to re-raise signal " << signum
+ << std::endl;
+ } else {
+ std::cerr << "reraise_fatal: default handler for signal " << signum
+ << " didn't terminate the process?" << std::endl;
+ }
+ std::cerr << std::flush;
+ ::_exit(1);
+}
+
+[[gnu::noinline]] void FatalSignal::signal_entry(
+ const int signum,
+ siginfo_t* const info,
+ void*)
+{
+ if (static std::atomic_bool handled{false}; handled.exchange(true)) {
+ return;
+ }
+ assert(info);
+ FatalSignal::signaled(signum, *info);
+ reraise_fatal(signum);
+}
+
+template <int SigNum>
+void FatalSignal::install_oneshot_signal_handler()
+{
+ struct sigaction sa;
+ // it's a bad idea to use a lambda here. On GCC there are `operator()`
+ // and `_FUN()`. Controlling their inlineability is hard (impossible?).
+ sa.sa_sigaction = signal_entry;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_flags = SA_SIGINFO | SA_RESTART | SA_NODEFER;
+ if constexpr (SigNum == SIGSEGV) {
+ sa.sa_flags |= SA_ONSTACK;
+ }
+ [[maybe_unused]] auto r = ::sigaction(SigNum, &sa, nullptr);
+ assert(r == 0);
+}
+
+
+[[gnu::noinline]] static void print_backtrace(std::string_view cause) {
+ std::cerr << cause;
+ if (seastar::engine_is_ready()) {
+ std::cerr << " on shard " << seastar::this_shard_id();
+ }
+ // nobody wants to see things like `FatalSignal::signaled()` or
+ // `print_backtrace()` in our backtraces. `+ 1` is for the extra
+ // frame created by kernel (signal trampoline, it will take care
+ // about e.g. sigreturn(2) calling; see the man page).
+ constexpr std::size_t FRAMES_TO_SKIP = 3 + 1;
+ std::cerr << ".\nBacktrace:\n";
+ std::cerr << boost::stacktrace::stacktrace(
+ FRAMES_TO_SKIP,
+ static_cast<std::size_t>(-1)/* max depth same as the default one */);
+ std::cerr << std::flush;
+ // TODO: dump crash related meta data to $crash_dir
+ // see handle_fatal_signal()
+}
+
+static void print_segv_info(const siginfo_t& siginfo)
+{
+ std::cerr \
+ << "Dump of siginfo:" << std::endl
+ << " si_signo: " << siginfo.si_signo << std::endl
+ << " si_errno: " << siginfo.si_errno << std::endl
+ << " si_code: " << siginfo.si_code << std::endl
+ << " si_pid: " << siginfo.si_pid << std::endl
+ << " si_uid: " << siginfo.si_uid << std::endl
+ << " si_status: " << siginfo.si_status << std::endl
+ << " si_utime: " << siginfo.si_utime << std::endl
+ << " si_stime: " << siginfo.si_stime << std::endl
+ << " si_int: " << siginfo.si_int << std::endl
+ << " si_ptr: " << siginfo.si_ptr << std::endl
+ << " si_overrun: " << siginfo.si_overrun << std::endl
+ << " si_timerid: " << siginfo.si_timerid << std::endl
+ << " si_addr: " << siginfo.si_addr << std::endl
+ << " si_band: " << siginfo.si_band << std::endl
+ << " si_fd: " << siginfo.si_fd << std::endl
+ << " si_addr_lsb: " << siginfo.si_addr_lsb << std::endl
+ << " si_lower: " << siginfo.si_lower << std::endl
+ << " si_upper: " << siginfo.si_upper << std::endl
+ << " si_pkey: " << siginfo.si_pkey << std::endl
+ << " si_call_addr: " << siginfo.si_call_addr << std::endl
+ << " si_syscall: " << siginfo.si_syscall << std::endl
+ << " si_arch: " << siginfo.si_arch << std::endl;
+ std::cerr << std::flush;
+}
+
+static void print_proc_maps()
+{
+ const int fd = ::open("/proc/self/maps", O_RDONLY);
+ if (fd < 0) {
+ std::cerr << "can't open /proc/self/maps. procfs not mounted?" << std::endl;
+ return;
+ }
+ const auto fd_guard = make_scope_guard([fd] {
+ ::close(fd);
+ });
+ std::cerr << "Content of /proc/self/maps:" << std::endl;
+ while (true) {
+ char chunk[4096] = {0, };
+ const ssize_t r = safe_read(fd, chunk, sizeof(chunk) - 1);
+ if (r < 0) {
+ std::cerr << "error while reading /proc/self/maps: " << r << std::endl;
+ return;
+ } else {
+ std::cerr << chunk << std::flush;
+ if (r < static_cast<ssize_t>(sizeof(chunk) - 1)) {
+ return; // eof
+ }
+ }
+ }
+}
+
+[[gnu::noinline]] void FatalSignal::signaled(const int signum,
+ const siginfo_t& siginfo)
+{
+ switch (signum) {
+ case SIGSEGV:
+ print_backtrace("Segmentation fault");
+ print_segv_info(siginfo);
+ break;
+ case SIGABRT:
+ print_backtrace("Aborting");
+ break;
+ default:
+ print_backtrace(fmt::format("Signal {}", signum));
+ break;
+ }
+ print_proc_maps();
+}
diff --git a/src/crimson/common/fatal_signal.h b/src/crimson/common/fatal_signal.h
new file mode 100644
index 000000000..626017c93
--- /dev/null
+++ b/src/crimson/common/fatal_signal.h
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <csignal>
+
+class FatalSignal {
+public:
+ FatalSignal();
+
+private:
+ static void signal_entry(int signum, siginfo_t* siginfo, void* p);
+ static void signaled(int signum, const siginfo_t& siginfo);
+
+ template <int... SigNums>
+ void install_oneshot_signals_handler();
+
+ template <int SigNum>
+ void install_oneshot_signal_handler();
+};
diff --git a/src/crimson/common/fixed_kv_node_layout.h b/src/crimson/common/fixed_kv_node_layout.h
new file mode 100644
index 000000000..676563594
--- /dev/null
+++ b/src/crimson/common/fixed_kv_node_layout.h
@@ -0,0 +1,730 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <algorithm>
+#include <iostream>
+
+#include <boost/iterator/counting_iterator.hpp>
+
+#include "include/byteorder.h"
+
+#include "crimson/common/layout.h"
+
+namespace crimson::common {
+
+template <typename T, bool is_const>
+struct maybe_const_t {
+};
+template<typename T>
+struct maybe_const_t<T, true> {
+ using type = const T*;
+};
+template<typename T>
+struct maybe_const_t<T, false> {
+ using type = T*;
+};
+
+
+/**
+ * FixedKVNodeLayout
+ *
+ * Reusable implementation of a fixed size block mapping
+ * K -> V with internal representations KINT and VINT.
+ *
+ * Uses absl::container_internal::Layout for the actual memory layout.
+ *
+ * The primary interface exposed is centered on the iterator
+ * and related methods.
+ *
+ * Also included are helpers for doing splits and merges as for a btree.
+ */
+template <
+ size_t CAPACITY,
+ typename Meta,
+ typename MetaInt,
+ typename K,
+ typename KINT,
+ typename V,
+ typename VINT,
+ bool VALIDATE_INVARIANTS=true>
+class FixedKVNodeLayout {
+ char *buf = nullptr;
+
+ using L = absl::container_internal::Layout<ceph_le32, MetaInt, KINT, VINT>;
+ static constexpr L layout{1, 1, CAPACITY, CAPACITY};
+
+public:
+ template <bool is_const>
+ struct iter_t {
+ friend class FixedKVNodeLayout;
+ using parent_t = typename maybe_const_t<FixedKVNodeLayout, is_const>::type;
+
+ parent_t node;
+ uint16_t offset = 0;
+
+ iter_t() = default;
+ iter_t(
+ parent_t parent,
+ uint16_t offset) : node(parent), offset(offset) {}
+
+ iter_t(const iter_t &) noexcept = default;
+ iter_t(iter_t &&) noexcept = default;
+ template<bool is_const_ = is_const>
+ iter_t(const iter_t<false>& it, std::enable_if_t<is_const_, int> = 0)
+ : iter_t{it.node, it.offset}
+ {}
+ iter_t &operator=(const iter_t &) = default;
+ iter_t &operator=(iter_t &&) = default;
+
+ // Work nicely with for loops without requiring a nested type.
+ using reference = iter_t&;
+ iter_t &operator*() { return *this; }
+ iter_t *operator->() { return this; }
+
+ iter_t operator++(int) {
+ auto ret = *this;
+ ++offset;
+ return ret;
+ }
+
+ iter_t &operator++() {
+ ++offset;
+ return *this;
+ }
+
+ iter_t operator--(int) {
+ assert(offset > 0);
+ auto ret = *this;
+ --offset;
+ return ret;
+ }
+
+ iter_t &operator--() {
+ assert(offset > 0);
+ --offset;
+ return *this;
+ }
+
+ uint16_t operator-(const iter_t &rhs) const {
+ assert(rhs.node == node);
+ return offset - rhs.offset;
+ }
+
+ iter_t operator+(uint16_t off) const {
+ return iter_t(
+ node,
+ offset + off);
+ }
+ iter_t operator-(uint16_t off) const {
+ return iter_t(
+ node,
+ offset - off);
+ }
+
+ friend bool operator==(const iter_t &lhs, const iter_t &rhs) {
+ assert(lhs.node == rhs.node);
+ return lhs.offset == rhs.offset;
+ }
+
+ friend bool operator!=(const iter_t &lhs, const iter_t &rhs) {
+ return !(lhs == rhs);
+ }
+
+ friend bool operator==(const iter_t<is_const> &lhs, const iter_t<!is_const> &rhs) {
+ assert(lhs.node == rhs.node);
+ return lhs.offset == rhs.offset;
+ }
+ friend bool operator!=(const iter_t<is_const> &lhs, const iter_t<!is_const> &rhs) {
+ return !(lhs == rhs);
+ }
+ K get_key() const {
+ return K(node->get_key_ptr()[offset]);
+ }
+
+ K get_next_key_or_max() const {
+ auto next = *this + 1;
+ if (next == node->end())
+ return std::numeric_limits<K>::max();
+ else
+ return next->get_key();
+ }
+
+ void set_val(V val) const {
+ static_assert(!is_const);
+ node->get_val_ptr()[offset] = VINT(val);
+ }
+
+ V get_val() const {
+ return V(node->get_val_ptr()[offset]);
+ };
+
+ bool contains(K addr) const {
+ return (get_key() <= addr) && (get_next_key_or_max() > addr);
+ }
+
+ uint16_t get_offset() const {
+ return offset;
+ }
+
+ private:
+ void set_key(K _lb) const {
+ static_assert(!is_const);
+ KINT lb;
+ lb = _lb;
+ node->get_key_ptr()[offset] = lb;
+ }
+
+ typename maybe_const_t<char, is_const>::type get_key_ptr() const {
+ return reinterpret_cast<
+ typename maybe_const_t<char, is_const>::type>(
+ node->get_key_ptr() + offset);
+ }
+
+ typename maybe_const_t<char, is_const>::type get_val_ptr() const {
+ return reinterpret_cast<
+ typename maybe_const_t<char, is_const>::type>(
+ node->get_val_ptr() + offset);
+ }
+ };
+ using const_iterator = iter_t<true>;
+ using iterator = iter_t<false>;
+
+ struct delta_t {
+ enum class op_t : uint8_t {
+ INSERT,
+ REMOVE,
+ UPDATE,
+ } op;
+ KINT key;
+ VINT val;
+
+ void replay(FixedKVNodeLayout &l) {
+ switch (op) {
+ case op_t::INSERT: {
+ l.insert(l.lower_bound(key), key, val);
+ break;
+ }
+ case op_t::REMOVE: {
+ auto iter = l.find(key);
+ assert(iter != l.end());
+ l.remove(iter);
+ break;
+ }
+ case op_t::UPDATE: {
+ auto iter = l.find(key);
+ assert(iter != l.end());
+ l.update(iter, val);
+ break;
+ }
+ default:
+ assert(0 == "Impossible");
+ }
+ }
+
+ bool operator==(const delta_t &rhs) const {
+ return op == rhs.op &&
+ key == rhs.key &&
+ val == rhs.val;
+ }
+ };
+
+public:
+ class delta_buffer_t {
+ std::vector<delta_t> buffer;
+ public:
+ bool empty() const {
+ return buffer.empty();
+ }
+ void insert(
+ const K &key,
+ const V &val) {
+ KINT k;
+ k = key;
+ buffer.push_back(
+ delta_t{
+ delta_t::op_t::INSERT,
+ k,
+ VINT(val)
+ });
+ }
+ void update(
+ const K &key,
+ const V &val) {
+ KINT k;
+ k = key;
+ buffer.push_back(
+ delta_t{
+ delta_t::op_t::UPDATE,
+ k,
+ VINT(val)
+ });
+ }
+ void remove(const K &key) {
+ KINT k;
+ k = key;
+ buffer.push_back(
+ delta_t{
+ delta_t::op_t::REMOVE,
+ k,
+ VINT()
+ });
+ }
+ void replay(FixedKVNodeLayout &node) {
+ for (auto &i: buffer) {
+ i.replay(node);
+ }
+ }
+ size_t get_bytes() const {
+ return buffer.size() * sizeof(delta_t);
+ }
+ void copy_out(char *out, size_t len) {
+ assert(len == get_bytes());
+ ::memcpy(out, reinterpret_cast<const void *>(buffer.data()), get_bytes());
+ buffer.clear();
+ }
+ void copy_in(const char *out, size_t len) {
+ assert(empty());
+ assert(len % sizeof(delta_t) == 0);
+ buffer = std::vector(
+ reinterpret_cast<const delta_t*>(out),
+ reinterpret_cast<const delta_t*>(out + len));
+ }
+ bool operator==(const delta_buffer_t &rhs) const {
+ return buffer == rhs.buffer;
+ }
+ };
+
+ void journal_insert(
+ const_iterator _iter,
+ const K &key,
+ const V &val,
+ delta_buffer_t *recorder) {
+ auto iter = iterator(this, _iter.offset);
+ if (recorder) {
+ recorder->insert(
+ key,
+ val);
+ }
+ insert(iter, key, val);
+ }
+
+ void journal_update(
+ const_iterator _iter,
+ const V &val,
+ delta_buffer_t *recorder) {
+ auto iter = iterator(this, _iter.offset);
+ if (recorder) {
+ recorder->update(iter->get_key(), val);
+ }
+ update(iter, val);
+ }
+
+ void journal_replace(
+ const_iterator _iter,
+ const K &key,
+ const V &val,
+ delta_buffer_t *recorder) {
+ auto iter = iterator(this, _iter.offset);
+ if (recorder) {
+ recorder->remove(iter->get_key());
+ recorder->insert(key, val);
+ }
+ replace(iter, key, val);
+ }
+
+
+ void journal_remove(
+ const_iterator _iter,
+ delta_buffer_t *recorder) {
+ auto iter = iterator(this, _iter.offset);
+ if (recorder) {
+ recorder->remove(iter->get_key());
+ }
+ remove(iter);
+ }
+
+
+ FixedKVNodeLayout(char *buf) :
+ buf(buf) {}
+
+ virtual ~FixedKVNodeLayout() = default;
+
+ const_iterator begin() const {
+ return const_iterator(
+ this,
+ 0);
+ }
+
+ const_iterator end() const {
+ return const_iterator(
+ this,
+ get_size());
+ }
+
+ iterator begin() {
+ return iterator(
+ this,
+ 0);
+ }
+
+ iterator end() {
+ return iterator(
+ this,
+ get_size());
+ }
+
+ const_iterator iter_idx(uint16_t off) const {
+ return const_iterator(
+ this,
+ off);
+ }
+
+ const_iterator find(K l) const {
+ auto ret = begin();
+ for (; ret != end(); ++ret) {
+ if (ret->get_key() == l)
+ break;
+ }
+ return ret;
+ }
+ iterator find(K l) {
+ const auto &tref = *this;
+ return iterator(this, tref.find(l).offset);
+ }
+
+ const_iterator lower_bound(K l) const {
+ auto it = std::lower_bound(boost::make_counting_iterator<uint16_t>(0),
+ boost::make_counting_iterator<uint16_t>(get_size()),
+ l,
+ [this](uint16_t i, K key) {
+ const_iterator iter(this, i);
+ return iter->get_key() < key;
+ });
+ return const_iterator(this, *it);
+ }
+
+ iterator lower_bound(K l) {
+ const auto &tref = *this;
+ return iterator(this, tref.lower_bound(l).offset);
+ }
+
+ const_iterator upper_bound(K l) const {
+ auto it = std::upper_bound(boost::make_counting_iterator<uint16_t>(0),
+ boost::make_counting_iterator<uint16_t>(get_size()),
+ l,
+ [this](K key, uint16_t i) {
+ const_iterator iter(this, i);
+ return key < iter->get_key();
+ });
+ return const_iterator(this, *it);
+ }
+
+ iterator upper_bound(K l) {
+ const auto &tref = *this;
+ return iterator(this, tref.upper_bound(l).offset);
+ }
+
+ const_iterator get_split_pivot() const {
+ return iter_idx(get_size() / 2);
+ }
+
+ uint16_t get_size() const {
+ return *layout.template Pointer<0>(buf);
+ }
+
+ /**
+ * set_size
+ *
+ * Set size representation to match size
+ */
+ void set_size(uint16_t size) {
+ *layout.template Pointer<0>(buf) = size;
+ }
+
+ /**
+ * get_meta/set_meta
+ *
+ * Enables stashing a templated type within the layout.
+ * Cannot be modified after initial write as it is not represented
+ * in delta_t
+ */
+ Meta get_meta() const {
+ MetaInt &metaint = *layout.template Pointer<1>(buf);
+ return Meta(metaint);
+ }
+ void set_meta(const Meta &meta) {
+ *layout.template Pointer<1>(buf) = MetaInt(meta);
+ }
+
+ constexpr static size_t get_capacity() {
+ return CAPACITY;
+ }
+
+ bool operator==(const FixedKVNodeLayout &rhs) const {
+ if (get_size() != rhs.get_size()) {
+ return false;
+ }
+
+ auto iter = begin();
+ auto iter2 = rhs.begin();
+ while (iter != end()) {
+ if (iter->get_key() != iter2->get_key() ||
+ iter->get_val() != iter2->get_val()) {
+ return false;
+ }
+ iter++;
+ iter2++;
+ }
+ return true;
+ }
+
+ /**
+ * split_into
+ *
+ * Takes *this and splits its contents into left and right.
+ */
+ K split_into(
+ FixedKVNodeLayout &left,
+ FixedKVNodeLayout &right) const {
+ auto piviter = get_split_pivot();
+
+ left.copy_from_foreign(left.begin(), begin(), piviter);
+ left.set_size(piviter - begin());
+
+ right.copy_from_foreign(right.begin(), piviter, end());
+ right.set_size(end() - piviter);
+
+ auto [lmeta, rmeta] = get_meta().split_into(piviter->get_key());
+ left.set_meta(lmeta);
+ right.set_meta(rmeta);
+
+ return piviter->get_key();
+ }
+
+ /**
+ * merge_from
+ *
+ * Takes two nodes and copies their contents into *this.
+ *
+ * precondition: left.size() + right.size() < CAPACITY
+ */
+ void merge_from(
+ const FixedKVNodeLayout &left,
+ const FixedKVNodeLayout &right)
+ {
+ copy_from_foreign(
+ end(),
+ left.begin(),
+ left.end());
+ set_size(left.get_size());
+ copy_from_foreign(
+ end(),
+ right.begin(),
+ right.end());
+ set_size(left.get_size() + right.get_size());
+ set_meta(Meta::merge_from(left.get_meta(), right.get_meta()));
+ }
+
+ /**
+ * balance_into_new_nodes
+ *
+ * Takes the contents of left and right and copies them into
+ * replacement_left and replacement_right such that in the
+ * event that the number of elements is odd the extra goes to
+ * the left side iff prefer_left.
+ */
+ static K balance_into_new_nodes(
+ const FixedKVNodeLayout &left,
+ const FixedKVNodeLayout &right,
+ bool prefer_left,
+ FixedKVNodeLayout &replacement_left,
+ FixedKVNodeLayout &replacement_right)
+ {
+ auto total = left.get_size() + right.get_size();
+ auto pivot_idx = (left.get_size() + right.get_size()) / 2;
+ if (total % 2 && prefer_left) {
+ pivot_idx++;
+ }
+ auto replacement_pivot = pivot_idx >= left.get_size() ?
+ right.iter_idx(pivot_idx - left.get_size())->get_key() :
+ left.iter_idx(pivot_idx)->get_key();
+
+ if (pivot_idx < left.get_size()) {
+ replacement_left.copy_from_foreign(
+ replacement_left.end(),
+ left.begin(),
+ left.iter_idx(pivot_idx));
+ replacement_left.set_size(pivot_idx);
+
+ replacement_right.copy_from_foreign(
+ replacement_right.end(),
+ left.iter_idx(pivot_idx),
+ left.end());
+
+ replacement_right.set_size(left.get_size() - pivot_idx);
+ replacement_right.copy_from_foreign(
+ replacement_right.end(),
+ right.begin(),
+ right.end());
+ replacement_right.set_size(total - pivot_idx);
+ } else {
+ replacement_left.copy_from_foreign(
+ replacement_left.end(),
+ left.begin(),
+ left.end());
+ replacement_left.set_size(left.get_size());
+
+ replacement_left.copy_from_foreign(
+ replacement_left.end(),
+ right.begin(),
+ right.iter_idx(pivot_idx - left.get_size()));
+ replacement_left.set_size(pivot_idx);
+
+ replacement_right.copy_from_foreign(
+ replacement_right.end(),
+ right.iter_idx(pivot_idx - left.get_size()),
+ right.end());
+ replacement_right.set_size(total - pivot_idx);
+ }
+
+ auto [lmeta, rmeta] = Meta::rebalance(
+ left.get_meta(), right.get_meta(), replacement_pivot);
+ replacement_left.set_meta(lmeta);
+ replacement_right.set_meta(rmeta);
+ return replacement_pivot;
+ }
+
+private:
+ void insert(
+ iterator iter,
+ const K &key,
+ const V &val) {
+ if (VALIDATE_INVARIANTS) {
+ if (iter != begin()) {
+ assert((iter - 1)->get_key() < key);
+ }
+ if (iter != end()) {
+ assert(iter->get_key() > key);
+ }
+ assert(get_size() < CAPACITY);
+ }
+ copy_from_local(iter + 1, iter, end());
+ iter->set_key(key);
+ iter->set_val(val);
+ set_size(get_size() + 1);
+ }
+
+ void update(
+ iterator iter,
+ V val) {
+ assert(iter != end());
+ iter->set_val(val);
+ }
+
+ void replace(
+ iterator iter,
+ const K &key,
+ const V &val) {
+ assert(iter != end());
+ if (VALIDATE_INVARIANTS) {
+ if (iter != begin()) {
+ assert((iter - 1)->get_key() < key);
+ }
+ if ((iter + 1) != end()) {
+ assert((iter + 1)->get_key() > key);
+ }
+ }
+ iter->set_key(key);
+ iter->set_val(val);
+ }
+
+ void remove(iterator iter) {
+ assert(iter != end());
+ copy_from_local(iter, iter + 1, end());
+ set_size(get_size() - 1);
+ }
+
+ /**
+ * get_key_ptr
+ *
+ * Get pointer to start of key array
+ */
+ KINT *get_key_ptr() {
+ return layout.template Pointer<2>(buf);
+ }
+ const KINT *get_key_ptr() const {
+ return layout.template Pointer<2>(buf);
+ }
+
+ /**
+ * get_val_ptr
+ *
+ * Get pointer to start of val array
+ */
+ VINT *get_val_ptr() {
+ return layout.template Pointer<3>(buf);
+ }
+ const VINT *get_val_ptr() const {
+ return layout.template Pointer<3>(buf);
+ }
+
+ /**
+ * node_resolve/unresolve_vals
+ *
+ * If the representation for values depends in some way on the
+ * node in which they are located, users may implement
+ * resolve/unresolve to enable copy_from_foreign to handle that
+ * transition.
+ */
+ virtual void node_resolve_vals(iterator from, iterator to) const {}
+ virtual void node_unresolve_vals(iterator from, iterator to) const {}
+
+ /**
+ * copy_from_foreign
+ *
+ * Copies entries from [from_src, to_src) to tgt.
+ *
+ * tgt and from_src must be from different nodes.
+ * from_src and to_src must be from the same node.
+ */
+ static void copy_from_foreign(
+ iterator tgt,
+ const_iterator from_src,
+ const_iterator to_src) {
+ assert(tgt->node != from_src->node);
+ assert(to_src->node == from_src->node);
+ memcpy(
+ tgt->get_val_ptr(), from_src->get_val_ptr(),
+ to_src->get_val_ptr() - from_src->get_val_ptr());
+ memcpy(
+ tgt->get_key_ptr(), from_src->get_key_ptr(),
+ to_src->get_key_ptr() - from_src->get_key_ptr());
+ from_src->node->node_resolve_vals(tgt, tgt + (to_src - from_src));
+ tgt->node->node_unresolve_vals(tgt, tgt + (to_src - from_src));
+ }
+
+ /**
+ * copy_from_local
+ *
+ * Copies entries from [from_src, to_src) to tgt.
+ *
+ * tgt, from_src, and to_src must be from the same node.
+ */
+ static void copy_from_local(
+ iterator tgt,
+ iterator from_src,
+ iterator to_src) {
+ assert(tgt->node == from_src->node);
+ assert(to_src->node == from_src->node);
+ memmove(
+ tgt->get_val_ptr(), from_src->get_val_ptr(),
+ to_src->get_val_ptr() - from_src->get_val_ptr());
+ memmove(
+ tgt->get_key_ptr(), from_src->get_key_ptr(),
+ to_src->get_key_ptr() - from_src->get_key_ptr());
+ }
+};
+
+}
diff --git a/src/crimson/common/formatter.cc b/src/crimson/common/formatter.cc
new file mode 100644
index 000000000..ab371ddbf
--- /dev/null
+++ b/src/crimson/common/formatter.cc
@@ -0,0 +1,40 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "formatter.h"
+
+#include <fmt/format.h>
+#if FMT_VERSION >= 60000
+#include <fmt/chrono.h>
+#else
+#include <fmt/time.h>
+#endif
+
+
+template <>
+struct fmt::formatter<seastar::lowres_system_clock::time_point> {
+ // ignore the format string
+ template <typename ParseContext>
+ constexpr auto parse(ParseContext &ctx) { return ctx.begin(); }
+
+ template <typename FormatContext>
+ auto format(const seastar::lowres_system_clock::time_point& t,
+ FormatContext& ctx) {
+ std::time_t tt = std::chrono::duration_cast<std::chrono::seconds>(
+ t.time_since_epoch()).count();
+ auto milliseconds = (t.time_since_epoch() %
+ std::chrono::seconds(1)).count();
+ return fmt::format_to(ctx.out(), "{:%Y-%m-%d %H:%M:%S} {:03d}",
+ fmt::localtime(tt), milliseconds);
+ }
+};
+
+namespace std {
+
+ostream& operator<<(ostream& out,
+ const seastar::lowres_system_clock::time_point& t)
+{
+ return out << fmt::format("{}", t);
+}
+
+}
diff --git a/src/crimson/common/formatter.h b/src/crimson/common/formatter.h
new file mode 100644
index 000000000..9b7be428a
--- /dev/null
+++ b/src/crimson/common/formatter.h
@@ -0,0 +1,13 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/lowres_clock.hh>
+
+#include "common/ceph_time.h"
+
+namespace std {
+
+ostream& operator<<(ostream& out,
+ const seastar::lowres_system_clock::time_point& t);
+
+}
diff --git a/src/crimson/common/gated.h b/src/crimson/common/gated.h
new file mode 100644
index 000000000..559a889a3
--- /dev/null
+++ b/src/crimson/common/gated.h
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/gate.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/future-util.hh>
+
+#include "crimson/common/exception.h"
+#include "crimson/common/log.h"
+#include "include/ceph_assert.h"
+
+namespace crimson::common {
+
+class Gated {
+ public:
+ static seastar::logger& gated_logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+ template <typename Func, typename T>
+ inline void dispatch_in_background(const char* what, T& who, Func&& func) {
+ (void) dispatch(what, who, func);
+ }
+ template <typename Func, typename T>
+ inline seastar::future<> dispatch(const char* what, T& who, Func&& func) {
+ return seastar::with_gate(pending_dispatch, std::forward<Func>(func)
+ ).handle_exception([what, &who] (std::exception_ptr eptr) {
+ if (*eptr.__cxa_exception_type() == typeid(system_shutdown_exception)) {
+ gated_logger().debug(
+ "{}, {} skipped, system shutdown", who, what);
+ return;
+ }
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception& e) {
+ gated_logger().error(
+ "{} dispatch() {} caught exception: {}", who, what, e.what());
+ }
+ assert(*eptr.__cxa_exception_type()
+ == typeid(seastar::gate_closed_exception));
+ });
+ }
+
+ seastar::future<> close() {
+ return pending_dispatch.close();
+ }
+ bool is_closed() const {
+ return pending_dispatch.is_closed();
+ }
+ private:
+ seastar::gate pending_dispatch;
+};
+
+}// namespace crimson::common
diff --git a/src/crimson/common/interruptible_future.h b/src/crimson/common/interruptible_future.h
new file mode 100644
index 000000000..c0e2c346c
--- /dev/null
+++ b/src/crimson/common/interruptible_future.h
@@ -0,0 +1,1600 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future-util.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/when_all.hh>
+#include <seastar/core/thread.hh>
+
+#include "crimson/common/log.h"
+#include "crimson/common/errorator.h"
+#ifndef NDEBUG
+#define INTR_FUT_DEBUG(FMT_MSG, ...) crimson::get_logger(ceph_subsys_).trace(FMT_MSG, ##__VA_ARGS__)
+#else
+#define INTR_FUT_DEBUG(FMT_MSG, ...)
+#endif
+
+// The interrupt condition generally works this way:
+//
+// 1. It is created by call_with_interruption_impl method, and is recorded in the thread
+// local global variable "::crimson::interruptible::interrupt_cond".
+// 2. Any continuation that's created within the execution of the continuation
+// that calls the call_with_interruption_impl method will capture the "interrupt_cond";
+// and when they starts to run, they will put that capture interruption condition
+// into "::crimson::interruptible::interrupt_cond" so that further continuations
+// created can also capture the interruption condition;
+// 3. At the end of the continuation run, the global "interrupt_cond" will be cleared
+// to prevent other continuations that are not supposed to be interrupted wrongly
+// capture an interruption condition.
+// With this approach, continuations capture the interrupt condition at their creation,
+// restore the interrupt conditions at the beginning of their execution and clear those
+// interrupt conditions at the end of their execution. So the global "interrupt_cond"
+// only hold valid interrupt conditions when the corresponding continuations are actually
+// running after which it gets cleared. Since continuations can't be executed simultaneously,
+// different continuation chains won't be able to interfere with each other.
+//
+// The global "interrupt_cond" can work as a signal about whether the continuation
+// is supposed to be interrupted, the reason that the global "interrupt_cond"
+// exists is that there may be this scenario:
+//
+// Say there's some method PG::func1(), in which the continuations created may
+// or may not be supposed to be interrupted in different situations. If we don't
+// have a global signal, we have to add an extra parameter to every method like
+// PG::func1() to indicate whether the current run should create to-be-interrupted
+// continuations or not.
+//
+// interruptor::with_interruption() and helpers can be used by users to wrap a future in
+// the interruption machinery.
+
+namespace crimson::os::seastore {
+ class TransactionConflictCondition;
+}
+
+// GCC tries to instantiate
+// seastar::lw_shared_ptr<crimson::os::seastore::TransactionConflictCondition>.
+// but we *may* not have the definition of TransactionConflictCondition at this moment,
+// a full specialization for lw_shared_ptr_accessors helps to bypass the default
+// lw_shared_ptr_accessors implementation, where std::is_base_of<.., T> is used.
+namespace seastar::internal {
+ template<>
+ struct lw_shared_ptr_accessors<::crimson::os::seastore::TransactionConflictCondition, void>
+ : lw_shared_ptr_accessors_no_esft<::crimson::os::seastore::TransactionConflictCondition>
+ {};
+}
+
+SEASTAR_CONCEPT(
+namespace crimson::interruptible {
+ template<typename InterruptCond, typename FutureType>
+ class interruptible_future_detail;
+}
+namespace seastar::impl {
+ template <typename InterruptCond, typename FutureType, typename... Rest>
+ struct is_tuple_of_futures<std::tuple<crimson::interruptible::interruptible_future_detail<InterruptCond, FutureType>, Rest...>>
+ : is_tuple_of_futures<std::tuple<Rest...>> {};
+}
+)
+
+namespace crimson::interruptible {
+
+struct ready_future_marker {};
+struct exception_future_marker {};
+
+template <typename InterruptCond>
+class interruptible_future_builder;
+
+template <typename InterruptCond>
+struct interruptor;
+
+template <typename InterruptCond>
+using InterruptCondRef = seastar::lw_shared_ptr<InterruptCond>;
+
+template <typename InterruptCond>
+struct interrupt_cond_t {
+ InterruptCondRef<InterruptCond> interrupt_cond;
+ uint64_t ref_count = 0;
+ void set(
+ InterruptCondRef<InterruptCond>& ic) {
+ INTR_FUT_DEBUG(
+ "{}: going to set interrupt_cond: {}, ic: {}",
+ __func__,
+ (void*)interrupt_cond.get(),
+ (void*)ic.get());
+ if (!interrupt_cond) {
+ interrupt_cond = ic;
+ }
+ assert(interrupt_cond.get() == ic.get());
+ ref_count++;
+ INTR_FUT_DEBUG(
+ "{}: interrupt_cond: {}, ref_count: {}",
+ __func__,
+ (void*)interrupt_cond.get(),
+ ref_count);
+ }
+ void reset() {
+ if (--ref_count == 0) {
+ INTR_FUT_DEBUG(
+ "{}: clearing interrupt_cond: {},{}",
+ __func__,
+ (void*)interrupt_cond.get(),
+ typeid(InterruptCond).name());
+ interrupt_cond.release();
+ } else {
+ INTR_FUT_DEBUG(
+ "{}: end without clearing interrupt_cond: {},{}, ref_count: {}",
+ __func__,
+ (void*)interrupt_cond.get(),
+ typeid(InterruptCond).name(),
+ ref_count);
+ }
+ }
+};
+
+template <typename InterruptCond>
+thread_local interrupt_cond_t<InterruptCond> interrupt_cond;
+
+extern template thread_local interrupt_cond_t<crimson::os::seastore::TransactionConflictCondition>
+interrupt_cond<crimson::os::seastore::TransactionConflictCondition>;
+
+template <typename InterruptCond, typename FutureType>
+class [[nodiscard]] interruptible_future_detail {};
+
+template <typename FutureType>
+struct is_interruptible_future : public std::false_type {};
+
+template <typename InterruptCond, typename FutureType>
+struct is_interruptible_future<
+ interruptible_future_detail<
+ InterruptCond,
+ FutureType>>
+ : public std::true_type {};
+template <typename FutureType>
+concept IsInterruptibleFuture = is_interruptible_future<FutureType>::value;
+template <typename Func, typename... Args>
+concept InvokeReturnsInterruptibleFuture =
+ IsInterruptibleFuture<std::invoke_result_t<Func, Args...>>;
+
+namespace internal {
+
+template <typename InterruptCond, typename Func, typename... Args>
+auto call_with_interruption_impl(
+ InterruptCondRef<InterruptCond> interrupt_condition,
+ Func&& func, Args&&... args)
+{
+ using futurator_t = seastar::futurize<std::invoke_result_t<Func, Args...>>;
+ // there might be a case like this:
+ // with_interruption([] {
+ // interruptor::do_for_each([] {
+ // ...
+ // return interruptible_errorated_future();
+ // }).safe_then_interruptible([] {
+ // ...
+ // });
+ // })
+ // In this case, as crimson::do_for_each would directly do futurize_invoke
+ // for "call_with_interruption", we have to make sure this invocation would
+ // not errorly release ::crimson::interruptible::interrupt_cond<InterruptCond>
+
+ // If there exists an interrupt condition, which means "Func" may not be
+ // permitted to run as a result of the interruption, test it. If it does
+ // need to be interrupted, return an interruption; otherwise, restore the
+ // global "interrupt_cond" with the interruption condition, and go ahead
+ // executing the Func.
+ assert(interrupt_condition);
+ auto fut = interrupt_condition->template may_interrupt<
+ typename futurator_t::type>();
+ INTR_FUT_DEBUG(
+ "call_with_interruption_impl: may_interrupt: {}, "
+ "local interrupt_condition: {}, "
+ "global interrupt_cond: {},{}",
+ (bool)fut,
+ (void*)interrupt_condition.get(),
+ (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
+ typeid(InterruptCond).name());
+ if (fut) {
+ return std::move(*fut);
+ }
+ interrupt_cond<InterruptCond>.set(interrupt_condition);
+
+ auto fut2 = seastar::futurize_invoke(
+ std::forward<Func>(func),
+ std::forward<Args>(args)...);
+ // Clear the global "interrupt_cond" to prevent it from interfering other
+ // continuation chains.
+ interrupt_cond<InterruptCond>.reset();
+ return fut2;
+}
+
+}
+
+template <typename InterruptCond, typename Func, seastar::Future Ret>
+requires (!InterruptCond::template is_interruption_v<Ret>)
+auto call_with_interruption(
+ InterruptCondRef<InterruptCond> interrupt_condition,
+ Func&& func, Ret&& fut)
+{
+ using Result = std::invoke_result_t<Func, Ret>;
+ // if "T" is already an interrupt exception, return it directly;
+ // otherwise, upper layer application may encounter errors executing
+ // the "Func" body.
+ if (fut.failed()) {
+ std::exception_ptr eptr = fut.get_exception();
+ if (interrupt_condition->is_interruption(eptr)) {
+ return seastar::futurize<Result>::make_exception_future(std::move(eptr));
+ }
+ return internal::call_with_interruption_impl(
+ interrupt_condition,
+ std::forward<Func>(func),
+ seastar::futurize<Ret>::make_exception_future(
+ std::move(eptr)));
+ }
+ return internal::call_with_interruption_impl(
+ interrupt_condition,
+ std::forward<Func>(func),
+ std::move(fut));
+}
+
+template <typename InterruptCond, typename Func, typename T>
+requires (InterruptCond::template is_interruption_v<T>)
+auto call_with_interruption(
+ InterruptCondRef<InterruptCond> interrupt_condition,
+ Func&& func, T&& arg)
+{
+ using Result = std::invoke_result_t<Func, T>;
+ // if "T" is already an interrupt exception, return it directly;
+ // otherwise, upper layer application may encounter errors executing
+ // the "Func" body.
+ return seastar::futurize<Result>::make_exception_future(
+ std::get<0>(std::tuple(std::forward<T>(arg))));
+}
+
+template <typename InterruptCond, typename Func, typename T>
+requires (!InterruptCond::template is_interruption_v<T>) && (!seastar::Future<T>)
+auto call_with_interruption(
+ InterruptCondRef<InterruptCond> interrupt_condition,
+ Func&& func, T&& arg)
+{
+ return internal::call_with_interruption_impl(
+ interrupt_condition,
+ std::forward<Func>(func),
+ std::forward<T>(arg));
+}
+
+template <typename InterruptCond, typename Func,
+ typename Result = std::invoke_result_t<Func>>
+auto call_with_interruption(
+ InterruptCondRef<InterruptCond> interrupt_condition,
+ Func&& func)
+{
+ return internal::call_with_interruption_impl(
+ interrupt_condition,
+ std::forward<Func>(func));
+}
+
+template <typename InterruptCond, typename Func, typename... T,
+ typename Result = std::invoke_result_t<Func, T...>>
+Result non_futurized_call_with_interruption(
+ InterruptCondRef<InterruptCond> interrupt_condition,
+ Func&& func, T&&... args)
+{
+ assert(interrupt_condition);
+ auto fut = interrupt_condition->template may_interrupt<seastar::future<>>();
+ INTR_FUT_DEBUG(
+ "non_futurized_call_with_interruption may_interrupt: {}, "
+ "interrupt_condition: {}, interrupt_cond: {},{}",
+ (bool)fut,
+ (void*)interrupt_condition.get(),
+ (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
+ typeid(InterruptCond).name());
+ if (fut) {
+ std::rethrow_exception(fut->get_exception());
+ }
+ interrupt_cond<InterruptCond>.set(interrupt_condition);
+ try {
+ if constexpr (std::is_void_v<Result>) {
+ std::invoke(std::forward<Func>(func), std::forward<T>(args)...);
+
+ // Clear the global "interrupt_cond" to prevent it from interfering other
+ // continuation chains.
+ interrupt_cond<InterruptCond>.reset();
+ return;
+ } else {
+ auto&& err = std::invoke(std::forward<Func>(func), std::forward<T>(args)...);
+ interrupt_cond<InterruptCond>.reset();
+ return std::forward<Result>(err);
+ }
+ } catch (std::exception& e) {
+ // Clear the global "interrupt_cond" to prevent it from interfering other
+ // continuation chains.
+ interrupt_cond<InterruptCond>.reset();
+ throw e;
+ }
+}
+
+template <typename InterruptCond, typename Errorator>
+struct interruptible_errorator;
+
+template <typename T>
+struct parallel_for_each_ret {
+ static_assert(seastar::Future<T>);
+ using type = seastar::future<>;
+};
+
+template <template <typename...> typename ErroratedFuture, typename T>
+struct parallel_for_each_ret<
+ ErroratedFuture<
+ ::crimson::errorated_future_marker<T>>> {
+ using type = ErroratedFuture<::crimson::errorated_future_marker<void>>;
+};
+
+template <typename InterruptCond, typename FutureType>
+class parallel_for_each_state final : private seastar::continuation_base<> {
+ using elem_ret_t = std::conditional_t<
+ IsInterruptibleFuture<FutureType>,
+ typename FutureType::core_type,
+ FutureType>;
+ using future_t = interruptible_future_detail<
+ InterruptCond,
+ typename parallel_for_each_ret<elem_ret_t>::type>;
+ std::vector<future_t> _incomplete;
+ seastar::promise<> _result;
+ std::exception_ptr _ex;
+private:
+ void wait_for_one() noexcept {
+ while (!_incomplete.empty() && _incomplete.back().available()) {
+ if (_incomplete.back().failed()) {
+ _ex = _incomplete.back().get_exception();
+ }
+ _incomplete.pop_back();
+ }
+ if (!_incomplete.empty()) {
+ seastar::internal::set_callback(std::move(_incomplete.back()),
+ static_cast<continuation_base<>*>(this));
+ _incomplete.pop_back();
+ return;
+ }
+ if (__builtin_expect(bool(_ex), false)) {
+ _result.set_exception(std::move(_ex));
+ } else {
+ _result.set_value();
+ }
+ delete this;
+ }
+ virtual void run_and_dispose() noexcept override {
+ if (_state.failed()) {
+ _ex = std::move(_state).get_exception();
+ }
+ _state = {};
+ wait_for_one();
+ }
+ task* waiting_task() noexcept override { return _result.waiting_task(); }
+public:
+ parallel_for_each_state(size_t n) {
+ _incomplete.reserve(n);
+ }
+ void add_future(future_t&& f) {
+ _incomplete.push_back(std::move(f));
+ }
+ future_t get_future() {
+ auto ret = _result.get_future();
+ wait_for_one();
+ return ret;
+ }
+ static future_t now() {
+ return seastar::now();
+ }
+};
+
+template <typename InterruptCond, typename T>
+class [[nodiscard]] interruptible_future_detail<InterruptCond, seastar::future<T>>
+ : private seastar::future<T> {
+public:
+ using core_type = seastar::future<T>;
+ template <typename U>
+ using interrupt_futurize_t =
+ typename interruptor<InterruptCond>::template futurize_t<U>;
+ using core_type::get0;
+ using core_type::core_type;
+ using core_type::get_exception;
+ using core_type::ignore_ready_future;
+
+ [[gnu::always_inline]]
+ interruptible_future_detail(seastar::future<T>&& base)
+ : core_type(std::move(base))
+ {}
+
+ using value_type = typename seastar::future<T>::value_type;
+ using tuple_type = typename seastar::future<T>::tuple_type;
+
+ [[gnu::always_inline]]
+ value_type&& get() {
+ if (core_type::available()) {
+ return core_type::get();
+ } else {
+ // destined to wait!
+ auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
+ INTR_FUT_DEBUG(
+ "interruptible_future_detail::get() waiting, interrupt_cond: {},{}",
+ (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
+ typeid(InterruptCond).name());
+ interrupt_cond<InterruptCond>.reset();
+ auto&& value = core_type::get();
+ interrupt_cond<InterruptCond>.set(interruption_condition);
+ INTR_FUT_DEBUG(
+ "interruptible_future_detail::get() got, interrupt_cond: {},{}",
+ (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
+ typeid(InterruptCond).name());
+ return std::move(value);
+ }
+ }
+
+ using core_type::available;
+ using core_type::failed;
+
+ template <typename Func,
+ typename Result = interrupt_futurize_t<
+ std::invoke_result_t<Func, seastar::future<T>>>>
+ [[gnu::always_inline]]
+ Result then_wrapped_interruptible(Func&& func) {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ return core_type::then_wrapped(
+ [func=std::move(func),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (auto&& fut) mutable {
+ return call_with_interruption(
+ std::move(interrupt_condition),
+ std::forward<Func>(func),
+ std::move(fut));
+ });
+ }
+
+ template <typename Func>
+ [[gnu::always_inline]]
+ auto then_interruptible(Func&& func) {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ if constexpr (std::is_void_v<T>) {
+ auto fut = core_type::then(
+ [func=std::move(func),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ () mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(func));
+ });
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ } else {
+ auto fut = core_type::then(
+ [func=std::move(func),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (T&& arg) mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(func),
+ std::forward<T>(arg));
+ });
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ }
+ }
+
+ template <typename Func>
+ [[gnu::always_inline]]
+ auto then_unpack_interruptible(Func&& func) {
+ return then_interruptible([func=std::forward<Func>(func)](T&& tuple) mutable {
+ return std::apply(std::forward<Func>(func), std::move(tuple));
+ });
+ }
+
+ template <typename Func,
+ typename Result =interrupt_futurize_t<
+ std::result_of_t<Func(std::exception_ptr)>>>
+ [[gnu::always_inline]]
+ Result handle_exception_interruptible(Func&& func) {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ return core_type::then_wrapped(
+ [func=std::forward<Func>(func),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (auto&& fut) mutable {
+ if (!fut.failed()) {
+ return seastar::make_ready_future<T>(fut.get());
+ } else {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(func),
+ fut.get_exception());
+ }
+ });
+ }
+
+ template <bool may_interrupt = true, typename Func,
+ typename Result = interrupt_futurize_t<
+ std::result_of_t<Func()>>>
+ [[gnu::always_inline]]
+ Result finally_interruptible(Func&& func) {
+ if constexpr (may_interrupt) {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ return core_type::then_wrapped(
+ [func=std::forward<Func>(func),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (auto&& fut) mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(func));
+ });
+ } else {
+ return core_type::finally(std::forward<Func>(func));
+ }
+ }
+
+ template <typename Func,
+ typename Result = interrupt_futurize_t<
+ std::result_of_t<Func(
+ typename seastar::function_traits<Func>::template arg<0>::type)>>>
+ [[gnu::always_inline]]
+ Result handle_exception_type_interruptible(Func&& func) {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ using trait = seastar::function_traits<Func>;
+ static_assert(trait::arity == 1, "func can take only one parameter");
+ using ex_type = typename trait::template arg<0>::type;
+ return core_type::then_wrapped(
+ [func=std::forward<Func>(func),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (auto&& fut) mutable -> Result {
+ if (!fut.failed()) {
+ return seastar::make_ready_future<T>(fut.get());
+ } else {
+ try {
+ std::rethrow_exception(fut.get_exception());
+ } catch (ex_type& ex) {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(func), ex);
+ }
+ }
+ });
+ }
+
+
+ using my_type = interruptible_future_detail<InterruptCond, seastar::future<T>>;
+
+ template <typename Func>
+ [[gnu::always_inline]]
+ my_type finally(Func&& func) {
+ return core_type::finally(std::forward<Func>(func));
+ }
+private:
+ template <typename Func>
+ [[gnu::always_inline]]
+ auto handle_interruption(Func&& func) {
+ return core_type::then_wrapped(
+ [func=std::move(func)](auto&& fut) mutable {
+ if (fut.failed()) {
+ std::exception_ptr ex = fut.get_exception();
+ if (InterruptCond::is_interruption(ex)) {
+ return seastar::futurize_invoke(std::move(func), std::move(ex));
+ } else {
+ return seastar::make_exception_future<T>(std::move(ex));
+ }
+ } else {
+ return seastar::make_ready_future<T>(fut.get());
+ }
+ });
+ }
+
+ seastar::future<T> to_future() {
+ return static_cast<core_type&&>(std::move(*this));
+ }
+ // this is only supposed to be invoked by seastar functions
+ template <typename Func,
+ typename Result = interrupt_futurize_t<
+ std::result_of_t<Func(seastar::future<T>)>>>
+ [[gnu::always_inline]]
+ Result then_wrapped(Func&& func) {
+ return core_type::then_wrapped(
+ [func=std::move(func),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (auto&& fut) mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::forward<Func>(func),
+ std::move(fut));
+ });
+ }
+ friend interruptor<InterruptCond>;
+ friend class interruptible_future_builder<InterruptCond>;
+ template <typename U>
+ friend struct ::seastar::futurize;
+ template <typename>
+ friend class ::seastar::future;
+ template <typename HeldState, typename Future>
+ friend class seastar::internal::do_with_state;
+ template<typename TX, typename F>
+ friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f);
+ template<typename T1, typename T2, typename T3_or_F, typename... More>
+ friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more);
+ template <typename T1, typename T2, typename... More>
+ friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more);
+ template <typename, typename>
+ friend class ::crimson::maybe_handle_error_t;
+ template <typename>
+ friend class ::seastar::internal::extract_values_from_futures_vector;
+ template <typename, typename>
+ friend class interruptible_future_detail;
+ template <typename ResolvedVectorTransform, typename Future>
+ friend inline typename ResolvedVectorTransform::future_type
+ seastar::internal::complete_when_all(
+ std::vector<Future>&& futures,
+ typename std::vector<Future>::iterator pos) noexcept;
+ template <typename>
+ friend class ::seastar::internal::when_all_state_component;
+ template <typename Lock, typename Func>
+ friend inline auto seastar::with_lock(Lock& lock, Func&& f);
+ template <typename IC, typename FT>
+ friend class parallel_for_each_state;
+};
+
+template <typename InterruptCond, typename Errorator>
+struct interruptible_errorator {
+ using base_ertr = Errorator;
+ using intr_cond_t = InterruptCond;
+
+ template <typename ValueT = void>
+ using future = interruptible_future_detail<InterruptCond,
+ typename Errorator::template future<ValueT>>;
+
+ template <class... NewAllowedErrorsT>
+ using extend = interruptible_errorator<
+ InterruptCond,
+ typename Errorator::template extend<NewAllowedErrorsT...>>;
+
+ template <class Ertr>
+ using extend_ertr = interruptible_errorator<
+ InterruptCond,
+ typename Errorator::template extend_ertr<Ertr>>;
+
+ template <typename ValueT = void, typename... A>
+ static interruptible_future_detail<
+ InterruptCond,
+ typename Errorator::template future<ValueT>>
+ make_ready_future(A&&... value) {
+ return interruptible_future_detail<
+ InterruptCond, typename Errorator::template future<ValueT>>(
+ Errorator::template make_ready_future<ValueT>(
+ std::forward<A>(value)...));
+ }
+ static interruptible_future_detail<
+ InterruptCond,
+ typename Errorator::template future<>> now() {
+ return interruptible_future_detail<
+ InterruptCond, typename Errorator::template future<>>(
+ Errorator::now());
+ }
+
+ using pass_further = typename Errorator::pass_further;
+};
+
+template <typename InterruptCond,
+ template <typename...> typename ErroratedFuture,
+ typename T>
+class [[nodiscard]] interruptible_future_detail<
+ InterruptCond,
+ ErroratedFuture<::crimson::errorated_future_marker<T>>>
+ : private ErroratedFuture<::crimson::errorated_future_marker<T>>
+{
+public:
+ using core_type = ErroratedFuture<crimson::errorated_future_marker<T>>;
+ using errorator_type = typename core_type::errorator_type;
+ using interrupt_errorator_type =
+ interruptible_errorator<InterruptCond, errorator_type>;
+ using interrupt_cond_type = InterruptCond;
+
+ template <typename U>
+ using interrupt_futurize_t =
+ typename interruptor<InterruptCond>::template futurize_t<U>;
+
+ using core_type::available;
+ using core_type::failed;
+ using core_type::core_type;
+ using core_type::get_exception;
+
+ using value_type = typename core_type::value_type;
+
+ interruptible_future_detail(seastar::future<T>&& fut)
+ : core_type(std::move(fut))
+ {}
+
+ template <template <typename...> typename ErroratedFuture2,
+ typename... U>
+ [[gnu::always_inline]]
+ interruptible_future_detail(
+ ErroratedFuture2<::crimson::errorated_future_marker<U...>>&& fut)
+ : core_type(std::move(fut)) {}
+
+ template <template <typename...> typename ErroratedFuture2,
+ typename... U>
+ [[gnu::always_inline]]
+ interruptible_future_detail(
+ interruptible_future_detail<InterruptCond,
+ ErroratedFuture2<::crimson::errorated_future_marker<U...>>>&& fut)
+ : core_type(static_cast<typename std::decay_t<decltype(fut)>::core_type&&>(fut)) {
+ using src_errorator_t = \
+ typename ErroratedFuture2<
+ ::crimson::errorated_future_marker<U...>>::errorator_type;
+ static_assert(core_type::errorator_type::template contains_once_v<
+ src_errorator_t>,
+ "conversion is only possible from less-or-eq errorated future!");
+ }
+
+ [[gnu::always_inline]]
+ interruptible_future_detail(
+ interruptible_future_detail<InterruptCond, seastar::future<T>>&& fut)
+ : core_type(static_cast<seastar::future<T>&&>(fut)) {}
+
+ template <class... A>
+ [[gnu::always_inline]]
+ interruptible_future_detail(ready_future_marker, A&&... a)
+ : core_type(::seastar::make_ready_future<typename core_type::value_type>(
+ std::forward<A>(a)...)) {
+ }
+ [[gnu::always_inline]]
+ interruptible_future_detail(exception_future_marker, ::seastar::future_state_base&& state) noexcept
+ : core_type(::seastar::futurize<core_type>::make_exception_future(std::move(state))) {
+ }
+ [[gnu::always_inline]]
+ interruptible_future_detail(exception_future_marker, std::exception_ptr&& ep) noexcept
+ : core_type(::seastar::futurize<core_type>::make_exception_future(std::move(ep))) {
+ }
+
+ template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT,
+ std::enable_if_t<!interruptible, int> = 0>
+ [[gnu::always_inline]]
+ auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) {
+ auto fut = core_type::safe_then(
+ std::forward<ValueInterruptCondT>(valfunc),
+ std::forward<ErrorVisitorT>(errfunc));
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ }
+
+ template <typename... Args>
+ auto si_then(Args&&... args) {
+ return safe_then_interruptible(std::forward<Args>(args)...);
+ }
+
+
+ template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT,
+ typename U = T, std::enable_if_t<!std::is_void_v<U> && interruptible, int> = 0>
+ [[gnu::always_inline]]
+ auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ auto fut = core_type::safe_then(
+ [func=std::move(valfunc),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (T&& args) mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(func),
+ std::forward<T>(args));
+ }, [func=std::move(errfunc),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (auto&& err) mutable -> decltype(auto) {
+ constexpr bool return_void = std::is_void_v<
+ std::invoke_result_t<ErrorVisitorT,
+ std::decay_t<decltype(err)>>>;
+ constexpr bool return_err = ::crimson::is_error_v<
+ std::decay_t<std::invoke_result_t<ErrorVisitorT,
+ std::decay_t<decltype(err)>>>>;
+ if constexpr (return_err || return_void) {
+ return non_futurized_call_with_interruption(
+ interrupt_condition,
+ std::move(func),
+ std::move(err));
+ } else {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(func),
+ std::move(err));
+ }
+ });
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ }
+
+ template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT,
+ typename U = T, std::enable_if_t<std::is_void_v<U> && interruptible, int> = 0>
+ [[gnu::always_inline]]
+ auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ auto fut = core_type::safe_then(
+ [func=std::move(valfunc),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ () mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(func));
+ }, [func=std::move(errfunc),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (auto&& err) mutable -> decltype(auto) {
+ constexpr bool return_void = std::is_void_v<
+ std::invoke_result_t<ErrorVisitorT,
+ std::decay_t<decltype(err)>>>;
+ constexpr bool return_err = ::crimson::is_error_v<
+ std::decay_t<std::invoke_result_t<ErrorVisitorT,
+ std::decay_t<decltype(err)>>>>;
+ if constexpr (return_err || return_void) {
+ return non_futurized_call_with_interruption(
+ interrupt_condition,
+ std::move(func),
+ std::move(err));
+ } else {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(func),
+ std::move(err));
+ }
+ });
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ }
+
+ template <bool interruptible = true, typename ValueInterruptCondT,
+ typename U = T, std::enable_if_t<std::is_void_v<T> && interruptible, int> = 0>
+ [[gnu::always_inline]]
+ auto safe_then_interruptible(ValueInterruptCondT&& valfunc) {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ auto fut = core_type::safe_then(
+ [func=std::move(valfunc),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ () mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(func));
+ });
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ }
+
+ template <typename ValFuncT, typename ErrorFuncT>
+ [[gnu::always_inline]]
+ auto safe_then_unpack_interruptible(ValFuncT&& func, ErrorFuncT&& errfunc) {
+ return safe_then_interruptible([func=std::forward<ValFuncT>(func)](T&& tuple) mutable {
+ return std::apply(std::forward<ValFuncT>(func), std::move(tuple));
+ }, std::forward<ErrorFuncT>(errfunc));
+ }
+
+ template <typename ValFuncT>
+ [[gnu::always_inline]]
+ auto safe_then_unpack_interruptible(ValFuncT&& func) {
+ return safe_then_interruptible([func=std::forward<ValFuncT>(func)](T&& tuple) mutable {
+ return std::apply(std::forward<ValFuncT>(func), std::move(tuple));
+ });
+ }
+
+ template <bool interruptible = true, typename ValueInterruptCondT,
+ typename U = T, std::enable_if_t<!std::is_void_v<T> && interruptible, int> = 0>
+ [[gnu::always_inline]]
+ auto safe_then_interruptible(ValueInterruptCondT&& valfunc) {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ auto fut = core_type::safe_then(
+ [func=std::move(valfunc),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (T&& arg) mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(func),
+ std::forward<T>(arg));
+ });
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ }
+
+ template <bool interruptible = true, typename ValueInterruptCondT,
+ std::enable_if_t<!interruptible, int> = 0>
+ [[gnu::always_inline]]
+ auto safe_then_interruptible(ValueInterruptCondT&& valfunc) {
+ auto fut = core_type::safe_then(std::forward<ValueInterruptCondT>(valfunc));
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ }
+
+ template <typename ValueInterruptCondT,
+ typename ErrorVisitorHeadT,
+ typename... ErrorVisitorTailT>
+ [[gnu::always_inline]]
+ auto safe_then_interruptible(ValueInterruptCondT&& valfunc,
+ ErrorVisitorHeadT&& err_func_head,
+ ErrorVisitorTailT&&... err_func_tail) {
+ return safe_then_interruptible(
+ std::forward<ValueInterruptCondT>(valfunc),
+ ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head),
+ std::forward<ErrorVisitorTailT>(err_func_tail)...));
+ }
+
+ template <typename ValueInterruptCondT,
+ typename ErrorVisitorHeadT,
+ typename... ErrorVisitorTailT>
+ [[gnu::always_inline]]
+ auto safe_then_interruptible_tuple(ValueInterruptCondT&& valfunc,
+ ErrorVisitorHeadT&& err_func_head,
+ ErrorVisitorTailT&&... err_func_tail) {
+ return safe_then_interruptible(
+ std::forward<ValueInterruptCondT>(valfunc),
+ ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head),
+ std::forward<ErrorVisitorTailT>(err_func_tail)...));
+ }
+
+ template <typename ValFuncT,
+ typename ErrorVisitorHeadT,
+ typename... ErrorVisitorTailT>
+ [[gnu::always_inline]]
+ auto safe_then_unpack_interruptible_tuple(
+ ValFuncT&& valfunc,
+ ErrorVisitorHeadT&& err_func_head,
+ ErrorVisitorTailT&&... err_func_tail) {
+ return safe_then_interruptible_tuple(
+ [valfunc=std::forward<ValFuncT>(valfunc)](T&& tuple) mutable {
+ return std::apply(std::forward<ValFuncT>(valfunc), std::move(tuple));
+ },
+ ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head),
+ std::forward<ErrorVisitorTailT>(err_func_tail)...));
+ }
+
+ template <bool interruptible = true, typename ErrorFunc>
+ auto handle_error_interruptible(ErrorFunc&& errfunc) {
+ if constexpr (interruptible) {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ auto fut = core_type::handle_error(
+ [errfunc=std::move(errfunc),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (auto&& err) mutable -> decltype(auto) {
+ constexpr bool return_void = std::is_void_v<
+ std::invoke_result_t<ErrorFunc,
+ std::decay_t<decltype(err)>>>;
+ constexpr bool return_err = ::crimson::is_error_v<
+ std::decay_t<std::invoke_result_t<ErrorFunc,
+ std::decay_t<decltype(err)>>>>;
+ if constexpr (return_err || return_void) {
+ return non_futurized_call_with_interruption(
+ interrupt_condition,
+ std::move(errfunc),
+ std::move(err));
+ } else {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(errfunc),
+ std::move(err));
+ }
+ });
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ } else {
+ return core_type::handle_error(std::forward<ErrorFunc>(errfunc));
+ }
+ }
+
+ template <typename ErrorFuncHead,
+ typename... ErrorFuncTail>
+ auto handle_error_interruptible(ErrorFuncHead&& error_func_head,
+ ErrorFuncTail&&... error_func_tail) {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ static_assert(sizeof...(ErrorFuncTail) > 0);
+ return this->handle_error_interruptible(
+ ::crimson::composer(
+ std::forward<ErrorFuncHead>(error_func_head),
+ std::forward<ErrorFuncTail>(error_func_tail)...));
+ }
+
+ template <typename Func>
+ [[gnu::always_inline]]
+ auto finally(Func&& func) {
+ auto fut = core_type::finally(std::forward<Func>(func));
+ return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
+ }
+
+private:
+ using core_type::_then;
+ template <typename Func>
+ [[gnu::always_inline]]
+ auto handle_interruption(Func&& func) {
+ // see errorator.h safe_then definition
+ using func_result_t =
+ typename std::invoke_result<Func, std::exception_ptr>::type;
+ using func_ertr_t =
+ typename core_type::template get_errorator_t<func_result_t>;
+ using this_ertr_t = typename core_type::errorator_type;
+ using ret_ertr_t = typename this_ertr_t::template extend_ertr<func_ertr_t>;
+ using futurator_t = typename ret_ertr_t::template futurize<func_result_t>;
+ return core_type::then_wrapped(
+ [func=std::move(func),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (auto&& fut) mutable
+ -> typename futurator_t::type {
+ if (fut.failed()) {
+ std::exception_ptr ex = fut.get_exception();
+ if (InterruptCond::is_interruption(ex)) {
+ return futurator_t::invoke(std::move(func), std::move(ex));
+ } else {
+ return futurator_t::make_exception_future(std::move(ex));
+ }
+ } else {
+ return std::move(fut);
+ }
+ });
+ }
+
+ ErroratedFuture<::crimson::errorated_future_marker<T>>
+ to_future() {
+ return static_cast<core_type&&>(std::move(*this));
+ }
+
+ friend class interruptor<InterruptCond>;
+ friend class interruptible_future_builder<InterruptCond>;
+ template <typename U>
+ friend struct ::seastar::futurize;
+ template <typename>
+ friend class ::seastar::future;
+ template<typename TX, typename F>
+ friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f);
+ template<typename T1, typename T2, typename T3_or_F, typename... More>
+ friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more);
+ template <typename T1, typename T2, typename... More>
+ friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more);
+ template <typename HeldState, typename Future>
+ friend class seastar::internal::do_with_state;
+ template <typename, typename>
+ friend class ::crimson::maybe_handle_error_t;
+ template <typename, typename>
+ friend class interruptible_future_detail;
+ template <typename Lock, typename Func>
+ friend inline auto seastar::with_lock(Lock& lock, Func&& f);
+ template <typename IC, typename FT>
+ friend class parallel_for_each_state;
+};
+
+template <typename InterruptCond, typename T = void>
+using interruptible_future =
+ interruptible_future_detail<InterruptCond, seastar::future<T>>;
+
+template <typename InterruptCond, typename Errorator, typename T = void>
+using interruptible_errorated_future =
+ interruptible_future_detail<
+ InterruptCond,
+ typename Errorator::template future<T>>;
+
+template <typename InterruptCond>
+struct interruptor
+{
+public:
+ using condition = InterruptCond;
+
+ template <typename FutureType>
+ [[gnu::always_inline]]
+ static interruptible_future_detail<InterruptCond, FutureType>
+ make_interruptible(FutureType&& fut) {
+ return interruptible_future_detail<InterruptCond, FutureType>(std::move(fut));
+ }
+
+ [[gnu::always_inline]]
+ static interruptible_future_detail<InterruptCond, seastar::future<>> now() {
+ return interruptible_future_detail<
+ InterruptCond,
+ seastar::future<>>(seastar::now());
+ }
+
+ template <typename ValueT = void, typename... A>
+ [[gnu::always_inline]]
+ static interruptible_future_detail<InterruptCond, seastar::future<ValueT>>
+ make_ready_future(A&&... value) {
+ return interruptible_future_detail<InterruptCond, seastar::future<ValueT>>(
+ seastar::make_ready_future<ValueT>(std::forward<A>(value)...));
+ }
+
+ template <typename T>
+ struct futurize {
+ using type = interruptible_future_detail<
+ InterruptCond, typename seastar::futurize<T>::type>;
+ };
+
+ template <typename FutureType>
+ struct futurize<interruptible_future_detail<InterruptCond, FutureType>> {
+ using type = interruptible_future_detail<InterruptCond, FutureType>;
+ };
+
+ template <typename T>
+ using futurize_t = typename futurize<T>::type;
+
+ template <typename Container, typename AsyncAction>
+ [[gnu::always_inline]]
+ static auto do_for_each(Container& c, AsyncAction&& action) {
+ return do_for_each(std::begin(c), std::end(c),
+ std::forward<AsyncAction>(action));
+ }
+
+ template <typename OpFunc, typename OnInterrupt,
+ typename... Params>
+ static inline auto with_interruption_cond(
+ OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCond &&cond, Params&&... params) {
+ INTR_FUT_DEBUG(
+ "with_interruption_cond: interrupt_cond: {}",
+ (void*)interrupt_cond<InterruptCond>.interrupt_cond.get());
+ return internal::call_with_interruption_impl(
+ seastar::make_lw_shared<InterruptCond>(std::move(cond)),
+ std::forward<OpFunc>(opfunc),
+ std::forward<Params>(params)...
+ ).template handle_interruption(std::move(efunc));
+ }
+
+ template <typename OpFunc, typename OnInterrupt,
+ typename... InterruptCondParams>
+ static inline auto with_interruption(
+ OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCondParams&&... params) {
+ return with_interruption_cond(
+ std::forward<OpFunc>(opfunc),
+ std::forward<OnInterrupt>(efunc),
+ InterruptCond(std::forward<InterruptCondParams>(params)...));
+ }
+
+ template <typename Error,
+ typename Func,
+ typename... Params>
+ static inline auto with_interruption_to_error(
+ Func &&f, InterruptCond &&cond, Params&&... params) {
+ using func_result_t = std::invoke_result_t<Func, Params...>;
+ using func_ertr_t =
+ typename seastar::template futurize<
+ func_result_t>::core_type::errorator_type;
+ using with_trans_ertr =
+ typename func_ertr_t::template extend_ertr<errorator<Error>>;
+
+ using value_type = typename func_result_t::value_type;
+ using ftype = typename std::conditional_t<
+ std::is_same_v<value_type, seastar::internal::monostate>,
+ typename with_trans_ertr::template future<>,
+ typename with_trans_ertr::template future<value_type>>;
+
+ return with_interruption_cond(
+ std::forward<Func>(f),
+ [](auto e) -> ftype {
+ return Error::make();
+ },
+ std::forward<InterruptCond>(cond),
+ std::forward<Params>(params)...);
+ }
+
+ template <typename Func>
+ [[gnu::always_inline]]
+ static auto wrap_function(Func&& func) {
+ return [func=std::forward<Func>(func),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::forward<Func>(func));
+ };
+ }
+
+ template <typename Iterator,
+ InvokeReturnsInterruptibleFuture<typename Iterator::reference> AsyncAction>
+ [[gnu::always_inline]]
+ static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) {
+ using Result = std::invoke_result_t<AsyncAction, typename Iterator::reference>;
+ if constexpr (seastar::Future<typename Result::core_type>) {
+ return make_interruptible(
+ ::seastar::do_for_each(begin, end,
+ [action=std::move(action),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (typename Iterator::reference x) mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(action),
+ std::forward<decltype(*begin)>(x)).to_future();
+ })
+ );
+ } else {
+ return make_interruptible(
+ ::crimson::do_for_each(begin, end,
+ [action=std::move(action),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (typename Iterator::reference x) mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(action),
+ std::forward<decltype(*begin)>(x)).to_future();
+ })
+ );
+ }
+ }
+
+ template <typename Iterator, typename AsyncAction>
+ requires (!InvokeReturnsInterruptibleFuture<AsyncAction, typename Iterator::reference>)
+ [[gnu::always_inline]]
+ static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) {
+ if constexpr (seastar::InvokeReturnsAnyFuture<AsyncAction, typename Iterator::reference>) {
+ return make_interruptible(
+ ::seastar::do_for_each(begin, end,
+ [action=std::move(action),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (typename Iterator::reference x) mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(action),
+ std::forward<decltype(*begin)>(x));
+ })
+ );
+ } else {
+ return make_interruptible(
+ ::crimson::do_for_each(begin, end,
+ [action=std::move(action),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (typename Iterator::reference x) mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(action),
+ std::forward<decltype(*begin)>(x));
+ })
+ );
+ }
+ }
+
+ template <InvokeReturnsInterruptibleFuture AsyncAction>
+ [[gnu::always_inline]]
+ static auto repeat(AsyncAction&& action) {
+ using Result = std::invoke_result_t<AsyncAction>;
+ if constexpr (seastar::Future<typename Result::core_type>) {
+ return make_interruptible(
+ ::seastar::repeat(
+ [action=std::move(action),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(action)).to_future();
+ })
+ );
+ } else {
+ return make_interruptible(
+ ::crimson::repeat(
+ [action=std::move(action),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(action)).to_future();
+ })
+ );
+ }
+ }
+ template <typename AsyncAction>
+ requires (!InvokeReturnsInterruptibleFuture<AsyncAction>)
+ [[gnu::always_inline]]
+ static auto repeat(AsyncAction&& action) {
+ if constexpr (seastar::InvokeReturnsAnyFuture<AsyncAction>) {
+ return make_interruptible(
+ ::seastar::repeat(
+ [action=std::move(action),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(action));
+ })
+ );
+ } else {
+ return make_interruptible(
+ ::crimson::repeat(
+ [action=std::move(action),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] {
+ return call_with_interruption(
+ interrupt_condition,
+ std::move(action));
+ })
+ );
+ }
+ }
+
+ template <typename Iterator, typename Func>
+ static inline auto parallel_for_each(
+ Iterator begin,
+ Iterator end,
+ Func&& func
+ ) noexcept {
+ using ResultType = std::invoke_result_t<Func, typename Iterator::reference>;
+ parallel_for_each_state<InterruptCond, ResultType>* s = nullptr;
+ auto decorated_func =
+ [func=std::forward<Func>(func),
+ interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+ (decltype(*Iterator())&& x) mutable {
+ return call_with_interruption(
+ interrupt_condition,
+ std::forward<Func>(func),
+ std::forward<decltype(*begin)>(x));
+ };
+ // Process all elements, giving each future the following treatment:
+ // - available, not failed: do nothing
+ // - available, failed: collect exception in ex
+ // - not available: collect in s (allocating it if needed)
+ while (begin != end) {
+ auto f = seastar::futurize_invoke(decorated_func, *begin++);
+ if (!f.available() || f.failed()) {
+ if (!s) {
+ using itraits = std::iterator_traits<Iterator>;
+ auto n = (seastar::internal::iterator_range_estimate_vector_capacity(
+ begin, end, typename itraits::iterator_category()) + 1);
+ s = new parallel_for_each_state<InterruptCond, ResultType>(n);
+ }
+ s->add_future(std::move(f));
+ }
+ }
+ // If any futures were not available, hand off to parallel_for_each_state::start().
+ // Otherwise we can return a result immediately.
+ if (s) {
+ // s->get_future() takes ownership of s (and chains it to one of the futures it contains)
+ // so this isn't a leak
+ return s->get_future();
+ }
+ return parallel_for_each_state<InterruptCond, ResultType>::now();
+ }
+
+ template <typename Container, typename Func>
+ static inline auto parallel_for_each(Container& container, Func&& func) noexcept {
+ return parallel_for_each(
+ std::begin(container),
+ std::end(container),
+ std::forward<Func>(func));
+ }
+
+ template <typename Iterator, typename Mapper, typename Initial, typename Reduce>
+ static inline interruptible_future<InterruptCond, Initial> map_reduce(
+ Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce&& reduce) {
+ struct state {
+ Initial result;
+ Reduce reduce;
+ };
+ auto s = seastar::make_lw_shared(state{std::move(initial), std::move(reduce)});
+ interruptible_future<InterruptCond> ret = seastar::make_ready_future<>();
+ while (begin != end) {
+ ret = seastar::futurize_invoke(mapper, *begin++).then_wrapped_interruptible(
+ [s = s.get(), ret = std::move(ret)] (auto f) mutable {
+ try {
+ s->result = s->reduce(std::move(s->result), std::move(f.get0()));
+ return std::move(ret);
+ } catch (...) {
+ return std::move(ret).then_wrapped_interruptible([ex = std::current_exception()] (auto f) {
+ f.ignore_ready_future();
+ return seastar::make_exception_future<>(ex);
+ });
+ }
+ });
+ }
+ return ret.then_interruptible([s] {
+ return seastar::make_ready_future<Initial>(std::move(s->result));
+ });
+ }
+ template <typename Range, typename Mapper, typename Initial, typename Reduce>
+ static inline interruptible_future<InterruptCond, Initial> map_reduce(
+ Range&& range, Mapper&& mapper, Initial initial, Reduce&& reduce) {
+ return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper),
+ std::move(initial), std::move(reduce));
+ }
+
+ template<typename Fut>
+ requires seastar::Future<Fut> || IsInterruptibleFuture<Fut>
+ static auto futurize_invoke_if_func(Fut&& fut) noexcept {
+ return std::forward<Fut>(fut);
+ }
+
+ template<typename Func>
+ requires (!seastar::Future<Func>) && (!IsInterruptibleFuture<Func>)
+ static auto futurize_invoke_if_func(Func&& func) noexcept {
+ return seastar::futurize_invoke(std::forward<Func>(func));
+ }
+
+ template <typename... FutOrFuncs>
+ static inline auto when_all(FutOrFuncs&&... fut_or_funcs) noexcept {
+ return ::seastar::internal::when_all_impl(
+ futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
+ }
+
+ template <typename... FutOrFuncs>
+ static inline auto when_all_succeed(FutOrFuncs&&... fut_or_funcs) noexcept {
+ return ::seastar::internal::when_all_succeed_impl(
+ futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
+ }
+
+ template <typename Func,
+ typename Result = futurize_t<std::invoke_result_t<Func>>>
+ static inline Result async(Func&& func) {
+ auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
+ INTR_FUT_DEBUG(
+ "interruptible_future_detail::async() yielding out, "
+ "interrupt_cond {},{} cleared",
+ (void*)interruption_condition.get(),
+ typeid(InterruptCond).name());
+ interrupt_cond<InterruptCond>.reset();
+ auto ret = seastar::async([func=std::forward<Func>(func),
+ interruption_condition] () mutable {
+ return non_futurized_call_with_interruption(
+ interruption_condition, std::forward<Func>(func));
+ });
+ interrupt_cond<InterruptCond>.set(interruption_condition);
+ INTR_FUT_DEBUG(
+ "interruptible_future_detail::async() yield back, interrupt_cond: {},{}",
+ (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
+ typeid(InterruptCond).name());
+ return ret;
+ }
+
+ template <class FutureT>
+ static decltype(auto) green_get(FutureT&& fut) {
+ if (fut.available()) {
+ return fut.get();
+ } else {
+ // destined to wait!
+ auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
+ INTR_FUT_DEBUG(
+ "green_get() waiting, interrupt_cond: {},{}",
+ (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
+ typeid(InterruptCond).name());
+ interrupt_cond<InterruptCond>.reset();
+ auto&& value = fut.get();
+ interrupt_cond<InterruptCond>.set(interruption_condition);
+ INTR_FUT_DEBUG(
+ "green_get() got, interrupt_cond: {},{}",
+ (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
+ typeid(InterruptCond).name());
+ return std::move(value);
+ }
+ }
+
+ static void yield() {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
+ INTR_FUT_DEBUG(
+ "interruptible_future_detail::yield() yielding out, "
+ "interrupt_cond {},{} cleared",
+ (void*)interruption_condition.get(),
+ typeid(InterruptCond).name());
+ interrupt_cond<InterruptCond>.reset();
+ seastar::thread::yield();
+ interrupt_cond<InterruptCond>.set(interruption_condition);
+ INTR_FUT_DEBUG(
+ "interruptible_future_detail::yield() yield back, interrupt_cond: {},{}",
+ (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
+ typeid(InterruptCond).name());
+ }
+
+ static void maybe_yield() {
+ ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+ if (seastar::thread::should_yield()) {
+ auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
+ INTR_FUT_DEBUG(
+ "interruptible_future_detail::may_yield() yielding out, "
+ "interrupt_cond {},{} cleared",
+ (void*)interruption_condition.get(),
+ typeid(InterruptCond).name());
+ interrupt_cond<InterruptCond>.reset();
+ seastar::thread::yield();
+ interrupt_cond<InterruptCond>.set(interruption_condition);
+ INTR_FUT_DEBUG(
+ "interruptible_future_detail::may_yield() yield back, interrupt_cond: {},{}",
+ (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
+ typeid(InterruptCond).name());
+ }
+ }
+};
+
+} // namespace crimson::interruptible
+
+namespace seastar {
+
+template <typename InterruptCond, typename... T>
+struct futurize<::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, seastar::future<T...>>> {
+ using type = ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond, seastar::future<T...>>;
+
+ using value_type = typename type::value_type;
+ using tuple_type = typename type::tuple_type;
+
+ static type from_tuple(tuple_type&& value) {
+ return type(ready_future_marker(), std::move(value));
+ }
+ static type from_tuple(const tuple_type& value) {
+ return type(ready_future_marker(), value);
+ }
+ static type from_tuple(value_type&& value) {
+ return type(ready_future_marker(), std::move(value));
+ }
+ static type from_tuple(const value_type& value) {
+ return type(ready_future_marker(), value);
+ }
+
+ template <typename Func, typename... FuncArgs>
+ [[gnu::always_inline]]
+ static inline type invoke(Func&& func, FuncArgs&&... args) noexcept {
+ try {
+ return func(std::forward<FuncArgs>(args)...);
+ } catch (...) {
+ return make_exception_future(std::current_exception());
+ }
+ }
+
+ template <typename Func>
+ [[gnu::always_inline]]
+ static type invoke(Func&& func, seastar::internal::monostate) noexcept {
+ try {
+ return ::seastar::futurize_invoke(std::forward<Func>(func));
+ } catch (...) {
+ return make_exception_future(std::current_exception());
+ }
+ }
+
+ template <typename Arg>
+ static inline type make_exception_future(Arg&& arg) noexcept {
+ return seastar::make_exception_future<T...>(std::forward<Arg>(arg));
+ }
+
+ static inline type make_exception_future(future_state_base&& state) noexcept {
+ return seastar::internal::make_exception_future<T...>(std::move(state));
+ }
+
+ template<typename PromiseT, typename Func>
+ static void satisfy_with_result_of(PromiseT&& pr, Func&& func) {
+ func().forward_to(std::move(pr));
+ }
+};
+
+template <typename InterruptCond,
+ template <typename...> typename ErroratedFuture,
+ typename... T>
+struct futurize<
+ ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond,
+ ErroratedFuture<::crimson::errorated_future_marker<T...>>
+ >
+> {
+ using type = ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond,
+ ErroratedFuture<::crimson::errorated_future_marker<T...>>>;
+ using core_type = ErroratedFuture<
+ ::crimson::errorated_future_marker<T...>>;
+ using errorator_type =
+ ::crimson::interruptible::interruptible_errorator<
+ InterruptCond,
+ typename ErroratedFuture<
+ ::crimson::errorated_future_marker<T...>>::errorator_type>;
+
+ template<typename Func, typename... FuncArgs>
+ static inline type invoke(Func&& func, FuncArgs&&... args) noexcept {
+ try {
+ return func(std::forward<FuncArgs>(args)...);
+ } catch (...) {
+ return make_exception_future(std::current_exception());
+ }
+ }
+
+ template <typename Func>
+ [[gnu::always_inline]]
+ static type invoke(Func&& func, seastar::internal::monostate) noexcept {
+ try {
+ return ::seastar::futurize_invoke(std::forward<Func>(func));
+ } catch (...) {
+ return make_exception_future(std::current_exception());
+ }
+ }
+
+ template <typename Arg>
+ static inline type make_exception_future(Arg&& arg) noexcept {
+ return core_type::errorator_type::template make_exception_future2<T...>(
+ std::forward<Arg>(arg));
+ }
+
+ template<typename PromiseT, typename Func>
+ static void satisfy_with_result_of(PromiseT&& pr, Func&& func) {
+ func().forward_to(std::move(pr));
+ }
+
+};
+
+template <typename InterruptCond, typename FutureType>
+struct continuation_base_from_future<
+ ::crimson::interruptible::interruptible_future_detail<InterruptCond, FutureType>> {
+ using type = typename seastar::continuation_base_from_future<FutureType>::type;
+};
+
+template <typename InterruptCond, typename FutureType>
+struct is_future<
+ ::crimson::interruptible::interruptible_future_detail<
+ InterruptCond,
+ FutureType>>
+ : std::true_type {};
+} // namespace seastar
diff --git a/src/crimson/common/layout.h b/src/crimson/common/layout.h
new file mode 100644
index 000000000..9d54ecd1d
--- /dev/null
+++ b/src/crimson/common/layout.h
@@ -0,0 +1,737 @@
+// Copyright 2018 The Abseil Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// MOTIVATION AND TUTORIAL
+//
+// If you want to put in a single heap allocation N doubles followed by M ints,
+// it's easy if N and M are known at compile time.
+//
+// struct S {
+// double a[N];
+// int b[M];
+// };
+//
+// S* p = new S;
+//
+// But what if N and M are known only in run time? Class template Layout to the
+// rescue! It's a portable generalization of the technique known as struct hack.
+//
+// // This object will tell us everything we need to know about the memory
+// // layout of double[N] followed by int[M]. It's structurally identical to
+// // size_t[2] that stores N and M. It's very cheap to create.
+// const Layout<double, int> layout(N, M);
+//
+// // Allocate enough memory for both arrays. `AllocSize()` tells us how much
+// // memory is needed. We are free to use any allocation function we want as
+// // long as it returns aligned memory.
+// std::unique_ptr<unsigned char[]> p(new unsigned char[layout.AllocSize()]);
+//
+// // Obtain the pointer to the array of doubles.
+// // Equivalent to `reinterpret_cast<double*>(p.get())`.
+// //
+// // We could have written layout.Pointer<0>(p) instead. If all the types are
+// // unique you can use either form, but if some types are repeated you must
+// // use the index form.
+// double* a = layout.Pointer<double>(p.get());
+//
+// // Obtain the pointer to the array of ints.
+// // Equivalent to `reinterpret_cast<int*>(p.get() + N * 8)`.
+// int* b = layout.Pointer<int>(p);
+//
+// If we are unable to specify sizes of all fields, we can pass as many sizes as
+// we can to `Partial()`. In return, it'll allow us to access the fields whose
+// locations and sizes can be computed from the provided information.
+// `Partial()` comes in handy when the array sizes are embedded into the
+// allocation.
+//
+// // size_t[1] containing N, size_t[1] containing M, double[N], int[M].
+// using L = Layout<size_t, size_t, double, int>;
+//
+// unsigned char* Allocate(size_t n, size_t m) {
+// const L layout(1, 1, n, m);
+// unsigned char* p = new unsigned char[layout.AllocSize()];
+// *layout.Pointer<0>(p) = n;
+// *layout.Pointer<1>(p) = m;
+// return p;
+// }
+//
+// void Use(unsigned char* p) {
+// // First, extract N and M.
+// // Specify that the first array has only one element. Using `prefix` we
+// // can access the first two arrays but not more.
+// constexpr auto prefix = L::Partial(1);
+// size_t n = *prefix.Pointer<0>(p);
+// size_t m = *prefix.Pointer<1>(p);
+//
+// // Now we can get pointers to the payload.
+// const L layout(1, 1, n, m);
+// double* a = layout.Pointer<double>(p);
+// int* b = layout.Pointer<int>(p);
+// }
+//
+// The layout we used above combines fixed-size with dynamically-sized fields.
+// This is quite common. Layout is optimized for this use case and generates
+// optimal code. All computations that can be performed at compile time are
+// indeed performed at compile time.
+//
+// Efficiency tip: The order of fields matters. In `Layout<T1, ..., TN>` try to
+// ensure that `alignof(T1) >= ... >= alignof(TN)`. This way you'll have no
+// padding in between arrays.
+//
+// You can manually override the alignment of an array by wrapping the type in
+// `Aligned<T, N>`. `Layout<..., Aligned<T, N>, ...>` has exactly the same API
+// and behavior as `Layout<..., T, ...>` except that the first element of the
+// array of `T` is aligned to `N` (the rest of the elements follow without
+// padding). `N` cannot be less than `alignof(T)`.
+//
+// `AllocSize()` and `Pointer()` are the most basic methods for dealing with
+// memory layouts. Check out the reference or code below to discover more.
+//
+// EXAMPLE
+//
+// // Immutable move-only string with sizeof equal to sizeof(void*). The
+// // string size and the characters are kept in the same heap allocation.
+// class CompactString {
+// public:
+// CompactString(const char* s = "") {
+// const size_t size = strlen(s);
+// // size_t[1] followed by char[size + 1].
+// const L layout(1, size + 1);
+// p_.reset(new unsigned char[layout.AllocSize()]);
+// // If running under ASAN, mark the padding bytes, if any, to catch
+// // memory errors.
+// layout.PoisonPadding(p_.get());
+// // Store the size in the allocation.
+// *layout.Pointer<size_t>(p_.get()) = size;
+// // Store the characters in the allocation.
+// memcpy(layout.Pointer<char>(p_.get()), s, size + 1);
+// }
+//
+// size_t size() const {
+// // Equivalent to reinterpret_cast<size_t&>(*p).
+// return *L::Partial().Pointer<size_t>(p_.get());
+// }
+//
+// const char* c_str() const {
+// // Equivalent to reinterpret_cast<char*>(p.get() + sizeof(size_t)).
+// // The argument in Partial(1) specifies that we have size_t[1] in front
+// // of the characters.
+// return L::Partial(1).Pointer<char>(p_.get());
+// }
+//
+// private:
+// // Our heap allocation contains a size_t followed by an array of chars.
+// using L = Layout<size_t, char>;
+// std::unique_ptr<unsigned char[]> p_;
+// };
+//
+// int main() {
+// CompactString s = "hello";
+// assert(s.size() == 5);
+// assert(strcmp(s.c_str(), "hello") == 0);
+// }
+//
+// DOCUMENTATION
+//
+// The interface exported by this file consists of:
+// - class `Layout<>` and its public members.
+// - The public members of class `internal_layout::LayoutImpl<>`. That class
+// isn't intended to be used directly, and its name and template parameter
+// list are internal implementation details, but the class itself provides
+// most of the functionality in this file. See comments on its members for
+// detailed documentation.
+//
+// `Layout<T1,... Tn>::Partial(count1,..., countm)` (where `m` <= `n`) returns a
+// `LayoutImpl<>` object. `Layout<T1,..., Tn> layout(count1,..., countn)`
+// creates a `Layout` object, which exposes the same functionality by inheriting
+// from `LayoutImpl<>`.
+
+#ifndef ABSL_CONTAINER_INTERNAL_LAYOUT_H_
+#define ABSL_CONTAINER_INTERNAL_LAYOUT_H_
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <ostream>
+#include <string>
+#include <tuple>
+#include <type_traits>
+#include <typeinfo>
+#include <utility>
+
+#ifdef ADDRESS_SANITIZER
+#include <sanitizer/asan_interface.h>
+#endif
+
+// for C++20 std::span
+#include <boost/beast/core/span.hpp>
+#include <fmt/format.h>
+
+#if defined(__GXX_RTTI)
+#define ABSL_INTERNAL_HAS_CXA_DEMANGLE
+#endif
+
+#ifdef ABSL_INTERNAL_HAS_CXA_DEMANGLE
+#include <cxxabi.h>
+#endif
+
+namespace absl {
+namespace container_internal {
+
+// A type wrapper that instructs `Layout` to use the specific alignment for the
+// array. `Layout<..., Aligned<T, N>, ...>` has exactly the same API
+// and behavior as `Layout<..., T, ...>` except that the first element of the
+// array of `T` is aligned to `N` (the rest of the elements follow without
+// padding).
+//
+// Requires: `N >= alignof(T)` and `N` is a power of 2.
+template <class T, size_t N>
+struct Aligned;
+
+namespace internal_layout {
+
+template <class T>
+struct NotAligned {};
+
+template <class T, size_t N>
+struct NotAligned<const Aligned<T, N>> {
+ static_assert(sizeof(T) == 0, "Aligned<T, N> cannot be const-qualified");
+};
+
+template <size_t>
+using IntToSize = size_t;
+
+template <class>
+using TypeToSize = size_t;
+
+template <class T>
+struct Type : NotAligned<T> {
+ using type = T;
+};
+
+template <class T, size_t N>
+struct Type<Aligned<T, N>> {
+ using type = T;
+};
+
+template <class T>
+struct SizeOf : NotAligned<T>, std::integral_constant<size_t, sizeof(T)> {};
+
+template <class T, size_t N>
+struct SizeOf<Aligned<T, N>> : std::integral_constant<size_t, sizeof(T)> {};
+
+// Note: workaround for https://gcc.gnu.org/PR88115
+template <class T>
+struct AlignOf : NotAligned<T> {
+ static constexpr size_t value = alignof(T);
+};
+
+template <class T, size_t N>
+struct AlignOf<Aligned<T, N>> {
+ static_assert(N % alignof(T) == 0,
+ "Custom alignment can't be lower than the type's alignment");
+ static constexpr size_t value = N;
+};
+
+// Does `Ts...` contain `T`?
+template <class T, class... Ts>
+using Contains = std::disjunction<std::is_same<T, Ts>...>;
+
+template <class From, class To>
+using CopyConst =
+ typename std::conditional_t<std::is_const_v<From>, const To, To>;
+
+// Note: We're not qualifying this with absl:: because it doesn't compile under
+// MSVC.
+template <class T>
+using SliceType = boost::beast::span<T>;
+
+// This namespace contains no types. It prevents functions defined in it from
+// being found by ADL.
+namespace adl_barrier {
+
+template <class Needle, class... Ts>
+constexpr size_t Find(Needle, Needle, Ts...) {
+ static_assert(!Contains<Needle, Ts...>(), "Duplicate element type");
+ return 0;
+}
+
+template <class Needle, class T, class... Ts>
+constexpr size_t Find(Needle, T, Ts...) {
+ return adl_barrier::Find(Needle(), Ts()...) + 1;
+}
+
+constexpr bool IsPow2(size_t n) { return !(n & (n - 1)); }
+
+// Returns `q * m` for the smallest `q` such that `q * m >= n`.
+// Requires: `m` is a power of two. It's enforced by IsLegalElementType below.
+constexpr size_t Align(size_t n, size_t m) { return (n + m - 1) & ~(m - 1); }
+
+constexpr size_t Min(size_t a, size_t b) { return b < a ? b : a; }
+
+constexpr size_t Max(size_t a) { return a; }
+
+template <class... Ts>
+constexpr size_t Max(size_t a, size_t b, Ts... rest) {
+ return adl_barrier::Max(b < a ? a : b, rest...);
+}
+
+template <class T>
+std::string TypeName() {
+ std::string out;
+ int status = 0;
+ char* demangled = nullptr;
+#ifdef ABSL_INTERNAL_HAS_CXA_DEMANGLE
+ demangled = abi::__cxa_demangle(typeid(T).name(), nullptr, nullptr, &status);
+#endif
+ if (status == 0 && demangled != nullptr) { // Demangling succeeded.
+ out = fmt::format("<{}>", demangled);
+ free(demangled);
+ } else {
+#if defined(__GXX_RTTI) || defined(_CPPRTTI)
+ out = fmt::format("<{}>", typeid(T).name());
+#endif
+ }
+ return out;
+}
+
+} // namespace adl_barrier
+
+template <bool C>
+using EnableIf = typename std::enable_if_t<C, int>;
+
+// Can `T` be a template argument of `Layout`?
+template <class T>
+using IsLegalElementType = std::integral_constant<
+ bool, !std::is_reference_v<T> && !std::is_volatile_v<T> &&
+ !std::is_reference_v<typename Type<T>::type> &&
+ !std::is_volatile_v<typename Type<T>::type> &&
+ adl_barrier::IsPow2(AlignOf<T>::value)>;
+
+template <class Elements, class SizeSeq, class OffsetSeq>
+class LayoutImpl;
+
+// Public base class of `Layout` and the result type of `Layout::Partial()`.
+//
+// `Elements...` contains all template arguments of `Layout` that created this
+// instance.
+//
+// `SizeSeq...` is `[0, NumSizes)` where `NumSizes` is the number of arguments
+// passed to `Layout::Partial()` or `Layout::Layout()`.
+//
+// `OffsetSeq...` is `[0, NumOffsets)` where `NumOffsets` is
+// `Min(sizeof...(Elements), NumSizes + 1)` (the number of arrays for which we
+// can compute offsets).
+template <class... Elements, size_t... SizeSeq, size_t... OffsetSeq>
+class LayoutImpl<std::tuple<Elements...>, std::index_sequence<SizeSeq...>,
+ std::index_sequence<OffsetSeq...>> {
+ private:
+ static_assert(sizeof...(Elements) > 0, "At least one field is required");
+ static_assert(std::conjunction_v<IsLegalElementType<Elements>...>,
+ "Invalid element type (see IsLegalElementType)");
+
+ enum {
+ NumTypes = sizeof...(Elements),
+ NumSizes = sizeof...(SizeSeq),
+ NumOffsets = sizeof...(OffsetSeq),
+ };
+
+ // These are guaranteed by `Layout`.
+ static_assert(NumOffsets == adl_barrier::Min(NumTypes, NumSizes + 1),
+ "Internal error");
+ static_assert(NumTypes > 0, "Internal error");
+
+ // Returns the index of `T` in `Elements...`. Results in a compilation error
+ // if `Elements...` doesn't contain exactly one instance of `T`.
+ template <class T>
+ static constexpr size_t ElementIndex() {
+ static_assert(Contains<Type<T>, Type<typename Type<Elements>::type>...>(),
+ "Type not found");
+ return adl_barrier::Find(Type<T>(),
+ Type<typename Type<Elements>::type>()...);
+ }
+
+ template <size_t N>
+ using ElementAlignment =
+ AlignOf<typename std::tuple_element<N, std::tuple<Elements...>>::type>;
+
+ public:
+ // Element types of all arrays packed in a tuple.
+ using ElementTypes = std::tuple<typename Type<Elements>::type...>;
+
+ // Element type of the Nth array.
+ template <size_t N>
+ using ElementType = typename std::tuple_element<N, ElementTypes>::type;
+
+ constexpr explicit LayoutImpl(IntToSize<SizeSeq>... sizes)
+ : size_{sizes...} {}
+
+ // Alignment of the layout, equal to the strictest alignment of all elements.
+ // All pointers passed to the methods of layout must be aligned to this value.
+ static constexpr size_t Alignment() {
+ return adl_barrier::Max(AlignOf<Elements>::value...);
+ }
+
+ // Offset in bytes of the Nth array.
+ //
+ // // int[3], 4 bytes of padding, double[4].
+ // Layout<int, double> x(3, 4);
+ // assert(x.Offset<0>() == 0); // The ints starts from 0.
+ // assert(x.Offset<1>() == 16); // The doubles starts from 16.
+ //
+ // Requires: `N <= NumSizes && N < sizeof...(Ts)`.
+ template <size_t N, EnableIf<N == 0> = 0>
+ constexpr size_t Offset() const {
+ return 0;
+ }
+
+ template <size_t N, EnableIf<N != 0> = 0>
+ constexpr size_t Offset() const {
+ static_assert(N < NumOffsets, "Index out of bounds");
+ return adl_barrier::Align(
+ Offset<N - 1>() + SizeOf<ElementType<N - 1>>() * size_[N - 1],
+ ElementAlignment<N>::value);
+ }
+
+ // Offset in bytes of the array with the specified element type. There must
+ // be exactly one such array and its zero-based index must be at most
+ // `NumSizes`.
+ //
+ // // int[3], 4 bytes of padding, double[4].
+ // Layout<int, double> x(3, 4);
+ // assert(x.Offset<int>() == 0); // The ints starts from 0.
+ // assert(x.Offset<double>() == 16); // The doubles starts from 16.
+ template <class T>
+ constexpr size_t Offset() const {
+ return Offset<ElementIndex<T>()>();
+ }
+
+ // Offsets in bytes of all arrays for which the offsets are known.
+ constexpr std::array<size_t, NumOffsets> Offsets() const {
+ return {{Offset<OffsetSeq>()...}};
+ }
+
+ // The number of elements in the Nth array. This is the Nth argument of
+ // `Layout::Partial()` or `Layout::Layout()` (zero-based).
+ //
+ // // int[3], 4 bytes of padding, double[4].
+ // Layout<int, double> x(3, 4);
+ // assert(x.Size<0>() == 3);
+ // assert(x.Size<1>() == 4);
+ //
+ // Requires: `N < NumSizes`.
+ template <size_t N>
+ constexpr size_t Size() const {
+ static_assert(N < NumSizes, "Index out of bounds");
+ return size_[N];
+ }
+
+ // The number of elements in the array with the specified element type.
+ // There must be exactly one such array and its zero-based index must be
+ // at most `NumSizes`.
+ //
+ // // int[3], 4 bytes of padding, double[4].
+ // Layout<int, double> x(3, 4);
+ // assert(x.Size<int>() == 3);
+ // assert(x.Size<double>() == 4);
+ template <class T>
+ constexpr size_t Size() const {
+ return Size<ElementIndex<T>()>();
+ }
+
+ // The number of elements of all arrays for which they are known.
+ constexpr std::array<size_t, NumSizes> Sizes() const {
+ return {{Size<SizeSeq>()...}};
+ }
+
+ // Pointer to the beginning of the Nth array.
+ //
+ // `Char` must be `[const] [signed|unsigned] char`.
+ //
+ // // int[3], 4 bytes of padding, double[4].
+ // Layout<int, double> x(3, 4);
+ // unsigned char* p = new unsigned char[x.AllocSize()];
+ // int* ints = x.Pointer<0>(p);
+ // double* doubles = x.Pointer<1>(p);
+ //
+ // Requires: `N <= NumSizes && N < sizeof...(Ts)`.
+ // Requires: `p` is aligned to `Alignment()`.
+ template <size_t N, class Char>
+ CopyConst<Char, ElementType<N>>* Pointer(Char* p) const {
+ using C = typename std::remove_const<Char>::type;
+ static_assert(
+ std::is_same<C, char>() || std::is_same<C, unsigned char>() ||
+ std::is_same<C, signed char>(),
+ "The argument must be a pointer to [const] [signed|unsigned] char");
+ constexpr size_t alignment = Alignment();
+ (void)alignment;
+ assert(reinterpret_cast<uintptr_t>(p) % alignment == 0);
+ return reinterpret_cast<CopyConst<Char, ElementType<N>>*>(p + Offset<N>());
+ }
+
+ // Pointer to the beginning of the array with the specified element type.
+ // There must be exactly one such array and its zero-based index must be at
+ // most `NumSizes`.
+ //
+ // `Char` must be `[const] [signed|unsigned] char`.
+ //
+ // // int[3], 4 bytes of padding, double[4].
+ // Layout<int, double> x(3, 4);
+ // unsigned char* p = new unsigned char[x.AllocSize()];
+ // int* ints = x.Pointer<int>(p);
+ // double* doubles = x.Pointer<double>(p);
+ //
+ // Requires: `p` is aligned to `Alignment()`.
+ template <class T, class Char>
+ CopyConst<Char, T>* Pointer(Char* p) const {
+ return Pointer<ElementIndex<T>()>(p);
+ }
+
+ // Pointers to all arrays for which pointers are known.
+ //
+ // `Char` must be `[const] [signed|unsigned] char`.
+ //
+ // // int[3], 4 bytes of padding, double[4].
+ // Layout<int, double> x(3, 4);
+ // unsigned char* p = new unsigned char[x.AllocSize()];
+ //
+ // int* ints;
+ // double* doubles;
+ // std::tie(ints, doubles) = x.Pointers(p);
+ //
+ // Requires: `p` is aligned to `Alignment()`.
+ //
+ // Note: We're not using ElementType alias here because it does not compile
+ // under MSVC.
+ template <class Char>
+ std::tuple<CopyConst<
+ Char, typename std::tuple_element<OffsetSeq, ElementTypes>::type>*...>
+ Pointers(Char* p) const {
+ return std::tuple<CopyConst<Char, ElementType<OffsetSeq>>*...>(
+ Pointer<OffsetSeq>(p)...);
+ }
+
+ // The Nth array.
+ //
+ // `Char` must be `[const] [signed|unsigned] char`.
+ //
+ // // int[3], 4 bytes of padding, double[4].
+ // Layout<int, double> x(3, 4);
+ // unsigned char* p = new unsigned char[x.AllocSize()];
+ // Span<int> ints = x.Slice<0>(p);
+ // Span<double> doubles = x.Slice<1>(p);
+ //
+ // Requires: `N < NumSizes`.
+ // Requires: `p` is aligned to `Alignment()`.
+ template <size_t N, class Char>
+ SliceType<CopyConst<Char, ElementType<N>>> Slice(Char* p) const {
+ return SliceType<CopyConst<Char, ElementType<N>>>(Pointer<N>(p), Size<N>());
+ }
+
+ // The array with the specified element type. There must be exactly one
+ // such array and its zero-based index must be less than `NumSizes`.
+ //
+ // `Char` must be `[const] [signed|unsigned] char`.
+ //
+ // // int[3], 4 bytes of padding, double[4].
+ // Layout<int, double> x(3, 4);
+ // unsigned char* p = new unsigned char[x.AllocSize()];
+ // Span<int> ints = x.Slice<int>(p);
+ // Span<double> doubles = x.Slice<double>(p);
+ //
+ // Requires: `p` is aligned to `Alignment()`.
+ template <class T, class Char>
+ SliceType<CopyConst<Char, T>> Slice(Char* p) const {
+ return Slice<ElementIndex<T>()>(p);
+ }
+
+ // All arrays with known sizes.
+ //
+ // `Char` must be `[const] [signed|unsigned] char`.
+ //
+ // // int[3], 4 bytes of padding, double[4].
+ // Layout<int, double> x(3, 4);
+ // unsigned char* p = new unsigned char[x.AllocSize()];
+ //
+ // Span<int> ints;
+ // Span<double> doubles;
+ // std::tie(ints, doubles) = x.Slices(p);
+ //
+ // Requires: `p` is aligned to `Alignment()`.
+ //
+ // Note: We're not using ElementType alias here because it does not compile
+ // under MSVC.
+ template <class Char>
+ std::tuple<SliceType<CopyConst<
+ Char, typename std::tuple_element<SizeSeq, ElementTypes>::type>>...>
+ Slices(Char* p) const {
+ // Workaround for https://gcc.gnu.org/bugzilla/show_bug.cgi?id=63875 (fixed
+ // in 6.1).
+ (void)p;
+ return std::tuple<SliceType<CopyConst<Char, ElementType<SizeSeq>>>...>(
+ Slice<SizeSeq>(p)...);
+ }
+
+ // The size of the allocation that fits all arrays.
+ //
+ // // int[3], 4 bytes of padding, double[4].
+ // Layout<int, double> x(3, 4);
+ // unsigned char* p = new unsigned char[x.AllocSize()]; // 48 bytes
+ //
+ // Requires: `NumSizes == sizeof...(Ts)`.
+ constexpr size_t AllocSize() const {
+ static_assert(NumTypes == NumSizes, "You must specify sizes of all fields");
+ return Offset<NumTypes - 1>() +
+ SizeOf<ElementType<NumTypes - 1>>() * size_[NumTypes - 1];
+ }
+
+ // If built with --config=asan, poisons padding bytes (if any) in the
+ // allocation. The pointer must point to a memory block at least
+ // `AllocSize()` bytes in length.
+ //
+ // `Char` must be `[const] [signed|unsigned] char`.
+ //
+ // Requires: `p` is aligned to `Alignment()`.
+ template <class Char, size_t N = NumOffsets - 1, EnableIf<N == 0> = 0>
+ void PoisonPadding(const Char* p) const {
+ Pointer<0>(p); // verify the requirements on `Char` and `p`
+ }
+
+ template <class Char, size_t N = NumOffsets - 1, EnableIf<N != 0> = 0>
+ void PoisonPadding(const Char* p) const {
+ static_assert(N < NumOffsets, "Index out of bounds");
+ (void)p;
+#ifdef ADDRESS_SANITIZER
+ PoisonPadding<Char, N - 1>(p);
+ // The `if` is an optimization. It doesn't affect the observable behaviour.
+ if (ElementAlignment<N - 1>::value % ElementAlignment<N>::value) {
+ size_t start =
+ Offset<N - 1>() + SizeOf<ElementType<N - 1>>() * size_[N - 1];
+ ASAN_POISON_MEMORY_REGION(p + start, Offset<N>() - start);
+ }
+#endif
+ }
+
+ // Human-readable description of the memory layout. Useful for debugging.
+ // Slow.
+ //
+ // // char[5], 3 bytes of padding, int[3], 4 bytes of padding, followed
+ // // by an unknown number of doubles.
+ // auto x = Layout<char, int, double>::Partial(5, 3);
+ // assert(x.DebugString() ==
+ // "@0<char>(1)[5]; @8<int>(4)[3]; @24<double>(8)");
+ //
+ // Each field is in the following format: @offset<type>(sizeof)[size] (<type>
+ // may be missing depending on the target platform). For example,
+ // @8<int>(4)[3] means that at offset 8 we have an array of ints, where each
+ // int is 4 bytes, and we have 3 of those ints. The size of the last field may
+ // be missing (as in the example above). Only fields with known offsets are
+ // described. Type names may differ across platforms: one compiler might
+ // produce "unsigned*" where another produces "unsigned int *".
+ std::string DebugString() const {
+ const auto offsets = Offsets();
+ const size_t sizes[] = {SizeOf<ElementType<OffsetSeq>>()...};
+ const std::string types[] = {
+ adl_barrier::TypeName<ElementType<OffsetSeq>>()...};
+ std::string res = fmt::format("@0{}({})", types[0], sizes[0]);
+ for (size_t i = 0; i != NumOffsets - 1; ++i) {
+ res += fmt::format("[{}]; @({})", size_[i], offsets[i + 1], types[i + 1], sizes[i + 1]);
+ }
+ // NumSizes is a constant that may be zero. Some compilers cannot see that
+ // inside the if statement "size_[NumSizes - 1]" must be valid.
+ int last = static_cast<int>(NumSizes) - 1;
+ if (NumTypes == NumSizes && last >= 0) {
+ res += fmt::format("[{}]", size_[last]);
+ }
+ return res;
+ }
+
+ private:
+ // Arguments of `Layout::Partial()` or `Layout::Layout()`.
+ size_t size_[NumSizes > 0 ? NumSizes : 1];
+};
+
+template <size_t NumSizes, class... Ts>
+using LayoutType = LayoutImpl<
+ std::tuple<Ts...>, std::make_index_sequence<NumSizes>,
+ std::make_index_sequence<adl_barrier::Min(sizeof...(Ts), NumSizes + 1)>>;
+
+} // namespace internal_layout
+
+// Descriptor of arrays of various types and sizes laid out in memory one after
+// another. See the top of the file for documentation.
+//
+// Check out the public API of internal_layout::LayoutImpl above. The type is
+// internal to the library but its methods are public, and they are inherited
+// by `Layout`.
+template <class... Ts>
+class Layout : public internal_layout::LayoutType<sizeof...(Ts), Ts...> {
+ public:
+ static_assert(sizeof...(Ts) > 0, "At least one field is required");
+ static_assert(
+ std::conjunction_v<internal_layout::IsLegalElementType<Ts>...>,
+ "Invalid element type (see IsLegalElementType)");
+
+ // The result type of `Partial()` with `NumSizes` arguments.
+ template <size_t NumSizes>
+ using PartialType = internal_layout::LayoutType<NumSizes, Ts...>;
+
+ // `Layout` knows the element types of the arrays we want to lay out in
+ // memory but not the number of elements in each array.
+ // `Partial(size1, ..., sizeN)` allows us to specify the latter. The
+ // resulting immutable object can be used to obtain pointers to the
+ // individual arrays.
+ //
+ // It's allowed to pass fewer array sizes than the number of arrays. E.g.,
+ // if all you need is to the offset of the second array, you only need to
+ // pass one argument -- the number of elements in the first array.
+ //
+ // // int[3] followed by 4 bytes of padding and an unknown number of
+ // // doubles.
+ // auto x = Layout<int, double>::Partial(3);
+ // // doubles start at byte 16.
+ // assert(x.Offset<1>() == 16);
+ //
+ // If you know the number of elements in all arrays, you can still call
+ // `Partial()` but it's more convenient to use the constructor of `Layout`.
+ //
+ // Layout<int, double> x(3, 5);
+ //
+ // Note: The sizes of the arrays must be specified in number of elements,
+ // not in bytes.
+ //
+ // Requires: `sizeof...(Sizes) <= sizeof...(Ts)`.
+ // Requires: all arguments are convertible to `size_t`.
+ template <class... Sizes>
+ static constexpr PartialType<sizeof...(Sizes)> Partial(Sizes&&... sizes) {
+ static_assert(sizeof...(Sizes) <= sizeof...(Ts));
+ return PartialType<sizeof...(Sizes)>(std::forward<Sizes>(sizes)...);
+ }
+
+ // Creates a layout with the sizes of all arrays specified. If you know
+ // only the sizes of the first N arrays (where N can be zero), you can use
+ // `Partial()` defined above. The constructor is essentially equivalent to
+ // calling `Partial()` and passing in all array sizes; the constructor is
+ // provided as a convenient abbreviation.
+ //
+ // Note: The sizes of the arrays must be specified in number of elements,
+ // not in bytes.
+ constexpr explicit Layout(internal_layout::TypeToSize<Ts>... sizes)
+ : internal_layout::LayoutType<sizeof...(Ts), Ts...>(sizes...) {}
+};
+
+} // namespace container_internal
+} // namespace absl
+
+#endif // ABSL_CONTAINER_INTERNAL_LAYOUT_H_
diff --git a/src/crimson/common/local_shared_foreign_ptr.h b/src/crimson/common/local_shared_foreign_ptr.h
new file mode 100644
index 000000000..c4bd1099a
--- /dev/null
+++ b/src/crimson/common/local_shared_foreign_ptr.h
@@ -0,0 +1,245 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+
+#include <seastar/core/smp.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/sharded.hh>
+
+namespace crimson {
+
+/**
+ * local_shared_foreign_ptr
+ *
+ * See seastar/include/seastar/core/sharded.hh:foreign_ptr
+ *
+ * seastar::foreign_ptr wraps a smart ptr by proxying the copy() and destructor
+ * operations back to the original core. This works well except that copy()
+ * requires a cross-core call. We need a smart_ptr which allows cross-core
+ * caching of (for example) OSDMaps, but we want to avoid the overhead inherent
+ * in incrementing the source smart_ptr on every copy. Thus,
+ * local_shared_foreign_ptr maintains a core-local foreign_ptr back to the
+ * original core instance with core-local ref counting.
+ */
+template <typename PtrType>
+class local_shared_foreign_ptr {
+ using element_type = typename std::pointer_traits<PtrType>::element_type;
+ using pointer = element_type*;
+
+ seastar::lw_shared_ptr<seastar::foreign_ptr<PtrType>> ptr;
+
+ /// Wraps a pointer object and remembers the current core.
+ local_shared_foreign_ptr(seastar::foreign_ptr<PtrType> &&fptr)
+ : ptr(fptr ? seastar::make_lw_shared(std::move(fptr)) : nullptr) {
+ assert(!ptr || (ptr && *ptr));
+ }
+
+ template <typename T>
+ friend local_shared_foreign_ptr<T> make_local_shared_foreign(
+ seastar::foreign_ptr<T> &&);
+
+public:
+ /// Constructs a null local_shared_foreign_ptr<>.
+ local_shared_foreign_ptr() = default;
+
+ /// Constructs a null local_shared_foreign_ptr<>.
+ local_shared_foreign_ptr(std::nullptr_t) : local_shared_foreign_ptr() {}
+
+ /// Moves a local_shared_foreign_ptr<> to another object.
+ local_shared_foreign_ptr(local_shared_foreign_ptr&& other) = default;
+
+ /// Copies a local_shared_foreign_ptr<>
+ local_shared_foreign_ptr(const local_shared_foreign_ptr &other) = default;
+
+ /// Releases reference to ptr eventually releasing the contained foreign_ptr
+ ~local_shared_foreign_ptr() = default;
+
+ /// Creates a copy of this foreign ptr. Only works if the stored ptr is copyable.
+ seastar::future<seastar::foreign_ptr<PtrType>> get_foreign() const noexcept {
+ assert(!ptr || (ptr && *ptr));
+ return ptr ? ptr->copy() :
+ seastar::make_ready_future<seastar::foreign_ptr<PtrType>>(nullptr);
+ }
+
+ /// Accesses the wrapped object.
+ element_type& operator*() const noexcept {
+ assert(ptr && *ptr);
+ return **ptr;
+ }
+ /// Accesses the wrapped object.
+ element_type* operator->() const noexcept {
+ assert(ptr && *ptr);
+ return &**ptr;
+ }
+
+ /// Access the raw pointer to the wrapped object.
+ pointer get() const noexcept {
+ assert(!ptr || (ptr && *ptr));
+ return ptr ? ptr->get() : nullptr;
+ }
+
+ /// Return the owner-shard of the contained foreign_ptr.
+ unsigned get_owner_shard() const noexcept {
+ assert(!ptr || (ptr && *ptr));
+ return ptr ? ptr->get_owner_shard() : seastar::this_shard_id();
+ }
+
+ /// Checks whether the wrapped pointer is non-null.
+ operator bool() const noexcept {
+ assert(!ptr || (ptr && *ptr));
+ return static_cast<bool>(ptr);
+ }
+
+ /// Move-assigns a \c local_shared_foreign_ptr<>.
+ local_shared_foreign_ptr& operator=(local_shared_foreign_ptr&& other) noexcept {
+ ptr = std::move(other.ptr);
+ return *this;
+ }
+
+ /// Copy-assigns a \c local_shared_foreign_ptr<>.
+ local_shared_foreign_ptr& operator=(const local_shared_foreign_ptr& other) noexcept {
+ ptr = other.ptr;
+ return *this;
+ }
+
+ /// Reset the containing ptr
+ void reset() noexcept {
+ assert(!ptr || (ptr && *ptr));
+ ptr = nullptr;
+ }
+};
+
+/// Wraps a smart_ptr T in a local_shared_foreign_ptr<>.
+template <typename T>
+local_shared_foreign_ptr<T> make_local_shared_foreign(
+ seastar::foreign_ptr<T> &&ptr) {
+ return local_shared_foreign_ptr<T>(std::move(ptr));
+}
+
+/// Wraps ptr in a local_shared_foreign_ptr<>.
+template <typename T>
+local_shared_foreign_ptr<T> make_local_shared_foreign(T &&ptr) {
+ return make_local_shared_foreign<T>(
+ ptr ? seastar::make_foreign(std::forward<T>(ptr)) : nullptr);
+}
+
+template <typename T, typename U>
+inline bool operator==(const local_shared_foreign_ptr<T> &x,
+ const local_shared_foreign_ptr<U> &y) {
+ return x.get() == y.get();
+}
+
+template <typename T>
+inline bool operator==(const local_shared_foreign_ptr<T> &x, std::nullptr_t) {
+ return x.get() == nullptr;
+}
+
+template <typename T>
+inline bool operator==(std::nullptr_t, const local_shared_foreign_ptr<T>& y) {
+ return nullptr == y.get();
+}
+
+template <typename T, typename U>
+inline bool operator!=(const local_shared_foreign_ptr<T> &x,
+ const local_shared_foreign_ptr<U> &y) {
+ return x.get() != y.get();
+}
+
+template <typename T>
+inline bool operator!=(const local_shared_foreign_ptr<T> &x, std::nullptr_t) {
+ return x.get() != nullptr;
+}
+
+template <typename T>
+inline bool operator!=(std::nullptr_t, const local_shared_foreign_ptr<T>& y) {
+ return nullptr != y.get();
+}
+
+template <typename T, typename U>
+inline bool operator<(const local_shared_foreign_ptr<T> &x,
+ const local_shared_foreign_ptr<U> &y) {
+ return x.get() < y.get();
+}
+
+template <typename T>
+inline bool operator<(const local_shared_foreign_ptr<T> &x, std::nullptr_t) {
+ return x.get() < nullptr;
+}
+
+template <typename T>
+inline bool operator<(std::nullptr_t, const local_shared_foreign_ptr<T>& y) {
+ return nullptr < y.get();
+}
+
+template <typename T, typename U>
+inline bool operator<=(const local_shared_foreign_ptr<T> &x,
+ const local_shared_foreign_ptr<U> &y) {
+ return x.get() <= y.get();
+}
+
+template <typename T>
+inline bool operator<=(const local_shared_foreign_ptr<T> &x, std::nullptr_t) {
+ return x.get() <= nullptr;
+}
+
+template <typename T>
+inline bool operator<=(std::nullptr_t, const local_shared_foreign_ptr<T>& y) {
+ return nullptr <= y.get();
+}
+
+template <typename T, typename U>
+inline bool operator>(const local_shared_foreign_ptr<T> &x,
+ const local_shared_foreign_ptr<U> &y) {
+ return x.get() > y.get();
+}
+
+template <typename T>
+inline bool operator>(const local_shared_foreign_ptr<T> &x, std::nullptr_t) {
+ return x.get() > nullptr;
+}
+
+template <typename T>
+inline bool operator>(std::nullptr_t, const local_shared_foreign_ptr<T>& y) {
+ return nullptr > y.get();
+}
+
+template <typename T, typename U>
+inline bool operator>=(const local_shared_foreign_ptr<T> &x,
+ const local_shared_foreign_ptr<U> &y) {
+ return x.get() >= y.get();
+}
+
+template <typename T>
+inline bool operator>=(const local_shared_foreign_ptr<T> &x, std::nullptr_t) {
+ return x.get() >= nullptr;
+}
+
+template <typename T>
+inline bool operator>=(std::nullptr_t, const local_shared_foreign_ptr<T>& y) {
+ return nullptr >= y.get();
+}
+
+}
+
+namespace std {
+
+template <typename T>
+struct hash<crimson::local_shared_foreign_ptr<T>>
+ : private hash<typename std::pointer_traits<T>::element_type *> {
+ size_t operator()(const crimson::local_shared_foreign_ptr<T>& p) const {
+ return hash<typename std::pointer_traits<T>::element_type *>::operator()(p.get());
+ }
+};
+
+}
+
+namespace seastar {
+
+template<typename T>
+struct is_smart_ptr<crimson::local_shared_foreign_ptr<T>> : std::true_type {};
+
+}
diff --git a/src/crimson/common/log.cc b/src/crimson/common/log.cc
new file mode 100644
index 000000000..cae9f6a7b
--- /dev/null
+++ b/src/crimson/common/log.cc
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "log.h"
+
+static std::array<seastar::logger, ceph_subsys_get_num()> loggers{
+#define SUBSYS(name, log_level, gather_level) \
+ seastar::logger(#name),
+#define DEFAULT_SUBSYS(log_level, gather_level) \
+ seastar::logger("none"),
+ #include "common/subsys.h"
+#undef SUBSYS
+#undef DEFAULT_SUBSYS
+};
+
+namespace crimson {
+seastar::logger& get_logger(int subsys) {
+ assert(subsys < ceph_subsys_max);
+ return loggers[subsys];
+}
+}
diff --git a/src/crimson/common/log.h b/src/crimson/common/log.h
new file mode 100644
index 000000000..27ff550d8
--- /dev/null
+++ b/src/crimson/common/log.h
@@ -0,0 +1,88 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <fmt/format.h>
+#include <seastar/util/log.hh>
+
+#include "common/subsys_types.h"
+
+namespace crimson {
+seastar::logger& get_logger(int subsys);
+static inline seastar::log_level to_log_level(int level) {
+ if (level < 0) {
+ return seastar::log_level::error;
+ } else if (level < 1) {
+ return seastar::log_level::warn;
+ } else if (level <= 5) {
+ return seastar::log_level::info;
+ } else if (level <= 20) {
+ return seastar::log_level::debug;
+ } else {
+ return seastar::log_level::trace;
+ }
+}
+}
+
+/* Logging convenience macros
+ *
+ * The intention here is to standardize prefixing log lines with the function name
+ * and a context prefix (like the operator<< for the PG). Place
+ *
+ * SET_SUBSYS(osd);
+ *
+ * at the top of the file to declare the log lines within the file as being (in this case)
+ * in the osd subsys. At the beginning of each method/function, add
+ *
+ * LOG_PREFIX(Class::method_name)
+ *
+ * to set the FNAME symbol to Class::method_name. In order to use the log macros
+ * within lambdas, capture FNAME by value.
+ *
+ * Log lines can then be declared using the appropriate macro below.
+ */
+
+#define SET_SUBSYS(subname_) static constexpr auto SOURCE_SUBSYS = ceph_subsys_##subname_
+#define LOCAL_LOGGER crimson::get_logger(SOURCE_SUBSYS)
+#define LOGGER(subname_) crimson::get_logger(ceph_subsys_##subname_)
+#define LOG_PREFIX(x) constexpr auto FNAME = #x
+
+#define LOG(level_, MSG, ...) \
+ LOCAL_LOGGER.log(level_, "{}: " MSG, FNAME , ##__VA_ARGS__)
+#define SUBLOG(subname_, level_, MSG, ...) \
+ LOGGER(subname_).log(level_, "{}: " MSG, FNAME , ##__VA_ARGS__)
+
+#define TRACE(...) LOG(seastar::log_level::trace, __VA_ARGS__)
+#define SUBTRACE(subname_, ...) SUBLOG(subname_, seastar::log_level::trace, __VA_ARGS__)
+
+#define DEBUG(...) LOG(seastar::log_level::debug, __VA_ARGS__)
+#define SUBDEBUG(subname_, ...) SUBLOG(subname_, seastar::log_level::debug, __VA_ARGS__)
+
+#define INFO(...) LOG(seastar::log_level::info, __VA_ARGS__)
+#define SUBINFO(subname_, ...) SUBLOG(subname_, seastar::log_level::info, __VA_ARGS__)
+
+#define WARN(...) LOG(seastar::log_level::warn, __VA_ARGS__)
+#define SUBWARN(subname_, ...) SUBLOG(subname_, seastar::log_level::warn, __VA_ARGS__)
+
+#define ERROR(...) LOG(seastar::log_level::error, __VA_ARGS__)
+#define SUBERROR(subname_, ...) SUBLOG(subname_, seastar::log_level::error, __VA_ARGS__)
+
+// *DPP macros are intended to take DoutPrefixProvider implementations, but anything with
+// an operator<< will work as a prefix
+
+#define SUBLOGDPP(subname_, level_, MSG, dpp, ...) \
+ LOGGER(subname_).log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__)
+#define SUBTRACEDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::trace, __VA_ARGS__)
+#define SUBDEBUGDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::debug, __VA_ARGS__)
+#define SUBINFODPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::info, __VA_ARGS__)
+#define SUBWARNDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::warn, __VA_ARGS__)
+#define SUBERRORDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::error, __VA_ARGS__)
+
+#define LOGDPP(level_, MSG, dpp, ...) \
+ LOCAL_LOGGER.log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__)
+#define TRACEDPP(...) LOGDPP(seastar::log_level::trace, __VA_ARGS__)
+#define DEBUGDPP(...) LOGDPP(seastar::log_level::debug, __VA_ARGS__)
+#define INFODPP(...) LOGDPP(seastar::log_level::info, __VA_ARGS__)
+#define WARNDPP(...) LOGDPP(seastar::log_level::warn, __VA_ARGS__)
+#define ERRORDPP(...) LOGDPP(seastar::log_level::error, __VA_ARGS__)
diff --git a/src/crimson/common/logclient.cc b/src/crimson/common/logclient.cc
new file mode 100644
index 000000000..d402ecd19
--- /dev/null
+++ b/src/crimson/common/logclient.cc
@@ -0,0 +1,364 @@
+#include "crimson/common/logclient.h"
+#include <fmt/ranges.h>
+#include "include/str_map.h"
+#include "messages/MLog.h"
+#include "messages/MLogAck.h"
+#include "messages/MMonGetVersion.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/mon/MonClient.h"
+#include "mon/MonMap.h"
+#include "common/Graylog.h"
+
+using std::map;
+using std::ostream;
+using std::ostringstream;
+using std::string;
+using crimson::common::local_conf;
+
+namespace {
+ seastar::logger& logger()
+ {
+ return crimson::get_logger(ceph_subsys_monc);
+ }
+}
+
+//TODO: in order to avoid unnecessary maps declarations and moving around,
+// create a named structure containing the maps and return optional
+// fit to it.
+int parse_log_client_options(CephContext *cct,
+ map<string,string> &log_to_monitors,
+ map<string,string> &log_to_syslog,
+ map<string,string> &log_channels,
+ map<string,string> &log_prios,
+ map<string,string> &log_to_graylog,
+ map<string,string> &log_to_graylog_host,
+ map<string,string> &log_to_graylog_port,
+ uuid_d &fsid,
+ string &host)
+{
+ ostringstream oss;
+
+ int r = get_conf_str_map_helper(
+ cct->_conf.get_val<string>("clog_to_monitors"), oss,
+ &log_to_monitors, CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ logger().error("{} error parsing 'clog_to_monitors'", __func__);
+ return r;
+ }
+
+ r = get_conf_str_map_helper(
+ cct->_conf.get_val<string>("clog_to_syslog"), oss,
+ &log_to_syslog, CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ logger().error("{} error parsing 'clog_to_syslog'", __func__);
+ return r;
+ }
+
+ r = get_conf_str_map_helper(
+ cct->_conf.get_val<string>("clog_to_syslog_facility"), oss,
+ &log_channels, CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ logger().error("{} error parsing 'clog_to_syslog_facility'", __func__);
+ return r;
+ }
+
+ r = get_conf_str_map_helper(
+ cct->_conf.get_val<string>("clog_to_syslog_level"), oss,
+ &log_prios, CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ logger().error("{} error parsing 'clog_to_syslog_level'", __func__);
+ return r;
+ }
+
+ r = get_conf_str_map_helper(
+ cct->_conf.get_val<string>("clog_to_graylog"), oss,
+ &log_to_graylog, CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ logger().error("{} error parsing 'clog_to_graylog'", __func__);
+ return r;
+ }
+
+ r = get_conf_str_map_helper(
+ cct->_conf.get_val<string>("clog_to_graylog_host"), oss,
+ &log_to_graylog_host, CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ logger().error("{} error parsing 'clog_to_graylog_host'", __func__);
+ return r;
+ }
+
+ r = get_conf_str_map_helper(
+ cct->_conf.get_val<string>("clog_to_graylog_port"), oss,
+ &log_to_graylog_port, CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ logger().error("{} error parsing 'clog_to_graylog_port'", __func__);
+ return r;
+ }
+
+ fsid = cct->_conf.get_val<uuid_d>("fsid");
+ host = cct->_conf->host;
+ return 0;
+}
+
+LogChannel::LogChannel(LogClient *lc, const string &channel)
+ : parent(lc), log_channel(channel), log_to_syslog(false),
+ log_to_monitors(false)
+{
+}
+
+LogChannel::LogChannel(LogClient *lc, const string &channel,
+ const string &facility, const string &prio)
+ : parent(lc), log_channel(channel), log_prio(prio),
+ syslog_facility(facility), log_to_syslog(false),
+ log_to_monitors(false)
+{
+}
+
+LogClient::LogClient(crimson::net::Messenger *m,
+ logclient_flag_t flags)
+ : messenger(m), is_mon(flags & FLAG_MON),
+ last_log_sent(0), last_log(0)
+{
+}
+
+void LogChannel::set_log_to_monitors(bool v)
+{
+ if (log_to_monitors != v) {
+ parent->reset();
+ log_to_monitors = v;
+ }
+}
+
+void LogChannel::update_config(map<string,string> &log_to_monitors,
+ map<string,string> &log_to_syslog,
+ map<string,string> &log_channels,
+ map<string,string> &log_prios,
+ map<string,string> &log_to_graylog,
+ map<string,string> &log_to_graylog_host,
+ map<string,string> &log_to_graylog_port,
+ uuid_d &fsid,
+ string &host)
+{
+ logger().debug(
+ "{} log_to_monitors {} log_to_syslog {} log_channels {} log_prios {}",
+ __func__, log_to_monitors, log_to_syslog, log_channels, log_prios);
+ bool to_monitors = (get_str_map_key(log_to_monitors, log_channel,
+ &CLOG_CONFIG_DEFAULT_KEY) == "true");
+ bool to_syslog = (get_str_map_key(log_to_syslog, log_channel,
+ &CLOG_CONFIG_DEFAULT_KEY) == "true");
+ string syslog_facility = get_str_map_key(log_channels, log_channel,
+ &CLOG_CONFIG_DEFAULT_KEY);
+ string prio = get_str_map_key(log_prios, log_channel,
+ &CLOG_CONFIG_DEFAULT_KEY);
+ bool to_graylog = (get_str_map_key(log_to_graylog, log_channel,
+ &CLOG_CONFIG_DEFAULT_KEY) == "true");
+ string graylog_host = get_str_map_key(log_to_graylog_host, log_channel,
+ &CLOG_CONFIG_DEFAULT_KEY);
+ string graylog_port_str = get_str_map_key(log_to_graylog_port, log_channel,
+ &CLOG_CONFIG_DEFAULT_KEY);
+ int graylog_port = atoi(graylog_port_str.c_str());
+
+ set_log_to_monitors(to_monitors);
+ set_log_to_syslog(to_syslog);
+ set_syslog_facility(syslog_facility);
+ set_log_prio(prio);
+
+ if (to_graylog && !graylog) { /* should but isn't */
+ graylog = seastar::make_shared<ceph::logging::Graylog>("clog");
+ } else if (!to_graylog && graylog) { /* shouldn't but is */
+ graylog = nullptr;
+ }
+
+ if (to_graylog && graylog) {
+ graylog->set_fsid(fsid);
+ graylog->set_hostname(host);
+ }
+
+ if (graylog && (!graylog_host.empty()) && (graylog_port != 0)) {
+ graylog->set_destination(graylog_host, graylog_port);
+ }
+
+ logger().debug("{} to_monitors: {} to_syslog: {}"
+ "syslog_facility: {} prio: {} to_graylog: {} graylog_host: {}"
+ "graylog_port: {}", __func__, (to_monitors ? "true" : "false"),
+ (to_syslog ? "true" : "false"), syslog_facility, prio,
+ (to_graylog ? "true" : "false"), graylog_host, graylog_port);
+}
+
+void LogChannel::do_log(clog_type prio, std::stringstream& ss)
+{
+ while (!ss.eof()) {
+ string s;
+ getline(ss, s);
+ if (!s.empty()) {
+ do_log(prio, s);
+ }
+ }
+}
+
+void LogChannel::do_log(clog_type prio, const std::string& s)
+{
+ if (CLOG_ERROR == prio) {
+ logger().error("log {} : {}", prio, s);
+ } else {
+ logger().warn("log {} : {}", prio, s);
+ }
+ LogEntry e;
+ e.stamp = ceph_clock_now();
+ e.addrs = parent->get_myaddrs();
+ e.name = parent->get_myname();
+ e.rank = parent->get_myrank();
+ e.prio = prio;
+ e.msg = s;
+ e.channel = get_log_channel();
+
+ // seq and who should be set for syslog/graylog/log_to_mon
+ // log to monitor?
+ if (log_to_monitors) {
+ e.seq = parent->queue(e);
+ } else {
+ e.seq = parent->get_next_seq();
+ }
+
+ // log to syslog?
+ if (do_log_to_syslog()) {
+ logger().warn("{} log to syslog", __func__);
+ e.log_to_syslog(get_log_prio(), get_syslog_facility());
+ }
+
+ // log to graylog?
+ if (do_log_to_graylog()) {
+ logger().warn("{} log to graylog", __func__);
+ graylog->log_log_entry(&e);
+ }
+}
+
+MessageURef LogClient::get_mon_log_message(log_flushing_t flush_flag)
+{
+ if (flush_flag == log_flushing_t::FLUSH) {
+ if (log_queue.empty()) {
+ return {};
+ }
+ // reset session
+ last_log_sent = log_queue.front().seq;
+ }
+ return _get_mon_log_message();
+}
+
+bool LogClient::are_pending() const
+{
+ return last_log > last_log_sent;
+}
+
+MessageURef LogClient::_get_mon_log_message()
+{
+ if (log_queue.empty()) {
+ return {};
+ }
+
+ // only send entries that haven't been sent yet during this mon
+ // session! monclient needs to call reset_session() on mon session
+ // reset for this to work right.
+
+ if (last_log_sent == last_log) {
+ return {};
+ }
+
+ // limit entries per message
+ const int64_t num_unsent = last_log - last_log_sent;
+ int64_t num_to_send;
+ if (local_conf()->mon_client_max_log_entries_per_message > 0) {
+ num_to_send = std::min(num_unsent,
+ local_conf()->mon_client_max_log_entries_per_message);
+ } else {
+ num_to_send = num_unsent;
+ }
+
+ logger().debug("log_queue is {} last_log {} sent {} num {} unsent {}"
+ " sending {}", log_queue.size(), last_log,
+ last_log_sent, log_queue.size(), num_unsent, num_to_send);
+ ceph_assert((unsigned)num_unsent <= log_queue.size());
+ auto log_iter = log_queue.begin();
+ std::deque<LogEntry> out_log_queue; /* will send the logs contained here */
+ while (log_iter->seq <= last_log_sent) {
+ ++log_iter;
+ ceph_assert(log_iter != log_queue.end());
+ }
+ while (num_to_send--) {
+ ceph_assert(log_iter != log_queue.end());
+ out_log_queue.push_back(*log_iter);
+ last_log_sent = log_iter->seq;
+ logger().debug(" will send {}", *log_iter);
+ ++log_iter;
+ }
+
+ return crimson::make_message<MLog>(m_fsid,
+ std::move(out_log_queue));
+}
+
+version_t LogClient::queue(LogEntry &entry)
+{
+ entry.seq = ++last_log;
+ log_queue.push_back(entry);
+
+ return entry.seq;
+}
+
+void LogClient::reset()
+{
+ if (log_queue.size()) {
+ log_queue.clear();
+ }
+ last_log_sent = last_log;
+}
+
+uint64_t LogClient::get_next_seq()
+{
+ return ++last_log;
+}
+
+entity_addrvec_t LogClient::get_myaddrs() const
+{
+ return messenger->get_myaddrs();
+}
+
+entity_name_t LogClient::get_myrank()
+{
+ return messenger->get_myname();
+}
+
+const EntityName& LogClient::get_myname() const
+{
+ return local_conf()->name;
+}
+
+seastar::future<> LogClient::handle_log_ack(Ref<MLogAck> m)
+{
+ logger().debug("handle_log_ack {}", *m);
+
+ version_t last = m->last;
+
+ auto q = log_queue.begin();
+ while (q != log_queue.end()) {
+ const LogEntry &entry(*q);
+ if (entry.seq > last)
+ break;
+ logger().debug(" logged {}", entry);
+ q = log_queue.erase(q);
+ }
+ return seastar::now();
+}
+
+LogChannelRef LogClient::create_channel(const std::string& name) {
+ auto it = channels.find(name);
+ if (it == channels.end()) {
+ it = channels.insert(it,
+ {name, seastar::make_lw_shared<LogChannel>(this, name)});
+ }
+ return it->second;
+}
+
+seastar::future<> LogClient::set_fsid(const uuid_d& fsid) {
+ m_fsid = fsid;
+ return seastar::now();
+}
+
diff --git a/src/crimson/common/logclient.h b/src/crimson/common/logclient.h
new file mode 100644
index 000000000..ab9b25091
--- /dev/null
+++ b/src/crimson/common/logclient.h
@@ -0,0 +1,232 @@
+#ifndef CEPH_LOGCLIENT_H
+#define CEPH_LOGCLIENT_H
+
+#include "common/LogEntry.h"
+#include "common/ostream_temp.h"
+#include "common/ref.h"
+#include "include/health.h"
+#include "crimson/net/Fwd.h"
+
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/lowres_clock.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/timer.hh>
+
+class LogClient;
+class MLog;
+class MLogAck;
+class Message;
+struct uuid_d;
+struct Connection;
+
+class LogChannel;
+
+namespace ceph {
+namespace logging {
+ class Graylog;
+}
+}
+
+template<typename Message> using Ref = boost::intrusive_ptr<Message>;
+namespace crimson::net {
+ class Messenger;
+}
+
+enum class log_flushing_t {
+ NO_FLUSH,
+ FLUSH
+};
+
+int parse_log_client_options(CephContext *cct,
+ std::map<std::string,std::string> &log_to_monitors,
+ std::map<std::string,std::string> &log_to_syslog,
+ std::map<std::string,std::string> &log_channels,
+ std::map<std::string,std::string> &log_prios,
+ std::map<std::string,std::string> &log_to_graylog,
+ std::map<std::string,std::string> &log_to_graylog_host,
+ std::map<std::string,std::string> &log_to_graylog_port,
+ uuid_d &fsid,
+ std::string &host);
+
+/** Manage where we output to and at which priority
+ *
+ * Not to be confused with the LogClient, which is the almighty coordinator
+ * of channels. We just deal with the boring part of the logging: send to
+ * syslog, send to file, generate LogEntry and queue it for the LogClient.
+ *
+ * Past queueing the LogEntry, the LogChannel is done with the whole thing.
+ * LogClient will deal with sending and handling of LogEntries.
+ */
+class LogChannel : public LoggerSinkSet
+{
+public:
+ LogChannel(LogClient *lc, const std::string &channel);
+ LogChannel(LogClient *lc, const std::string &channel,
+ const std::string &facility, const std::string &prio);
+
+ OstreamTemp debug() {
+ return OstreamTemp(CLOG_DEBUG, this);
+ }
+ void debug(std::stringstream &s) final {
+ do_log(CLOG_DEBUG, s);
+ }
+ /**
+ * Convenience function mapping health status to
+ * the appropriate cluster log severity.
+ */
+ OstreamTemp health(health_status_t health) {
+ switch(health) {
+ case HEALTH_OK:
+ return info();
+ case HEALTH_WARN:
+ return warn();
+ case HEALTH_ERR:
+ return error();
+ default:
+ // Invalid health_status_t value
+ ceph_abort();
+ }
+ }
+ OstreamTemp info() final {
+ return OstreamTemp(CLOG_INFO, this);
+ }
+ void info(std::stringstream &s) final {
+ do_log(CLOG_INFO, s);
+ }
+ OstreamTemp warn() final {
+ return OstreamTemp(CLOG_WARN, this);
+ }
+ void warn(std::stringstream &s) final {
+ do_log(CLOG_WARN, s);
+ }
+ OstreamTemp error() final {
+ return OstreamTemp(CLOG_ERROR, this);
+ }
+ void error(std::stringstream &s) final {
+ do_log(CLOG_ERROR, s);
+ }
+ OstreamTemp sec() final {
+ return OstreamTemp(CLOG_SEC, this);
+ }
+ void sec(std::stringstream &s) final {
+ do_log(CLOG_SEC, s);
+ }
+
+ void set_log_to_monitors(bool v);
+ void set_log_to_syslog(bool v) {
+ log_to_syslog = v;
+ }
+ void set_log_channel(const std::string& v) {
+ log_channel = v;
+ }
+ void set_log_prio(const std::string& v) {
+ log_prio = v;
+ }
+ void set_syslog_facility(const std::string& v) {
+ syslog_facility = v;
+ }
+ const std::string& get_log_prio() const { return log_prio; }
+ const std::string& get_log_channel() const { return log_channel; }
+ const std::string& get_syslog_facility() const { return syslog_facility; }
+ bool must_log_to_syslog() const { return log_to_syslog; }
+ /**
+ * Do we want to log to syslog?
+ *
+ * @return true if log_to_syslog is true and both channel and prio
+ * are not empty; false otherwise.
+ */
+ bool do_log_to_syslog() {
+ return must_log_to_syslog() &&
+ !log_prio.empty() && !log_channel.empty();
+ }
+ bool must_log_to_monitors() { return log_to_monitors; }
+
+ bool do_log_to_graylog() {
+ return (graylog != nullptr);
+ }
+
+ using Ref = seastar::lw_shared_ptr<LogChannel>;
+
+ /**
+ * update config values from parsed k/v std::map for each config option
+ *
+ * Pick out the relevant value based on our channel.
+ */
+ void update_config(std::map<std::string,std::string> &log_to_monitors,
+ std::map<std::string,std::string> &log_to_syslog,
+ std::map<std::string,std::string> &log_channels,
+ std::map<std::string,std::string> &log_prios,
+ std::map<std::string,std::string> &log_to_graylog,
+ std::map<std::string,std::string> &log_to_graylog_host,
+ std::map<std::string,std::string> &log_to_graylog_port,
+ uuid_d &fsid,
+ std::string &host);
+
+ void do_log(clog_type prio, std::stringstream& ss) final;
+ void do_log(clog_type prio, const std::string& s) final;
+
+private:
+ LogClient *parent;
+ std::string log_channel;
+ std::string log_prio;
+ std::string syslog_facility;
+ bool log_to_syslog;
+ bool log_to_monitors;
+ seastar::shared_ptr<ceph::logging::Graylog> graylog;
+};
+
+using LogChannelRef = LogChannel::Ref;
+
+class LogClient
+{
+public:
+ enum logclient_flag_t {
+ NO_FLAGS = 0,
+ FLAG_MON = 0x1,
+ };
+
+ LogClient(crimson::net::Messenger *m, logclient_flag_t flags);
+
+ virtual ~LogClient() = default;
+
+ seastar::future<> handle_log_ack(Ref<MLogAck> m);
+ MessageURef get_mon_log_message(log_flushing_t flush_flag);
+ bool are_pending() const;
+
+ LogChannelRef create_channel() {
+ return create_channel(CLOG_CHANNEL_DEFAULT);
+ }
+
+ LogChannelRef create_channel(const std::string& name);
+
+ void destroy_channel(const std::string& name) {
+ channels.erase(name);
+ }
+
+ void shutdown() {
+ channels.clear();
+ }
+
+ uint64_t get_next_seq();
+ entity_addrvec_t get_myaddrs() const;
+ const EntityName& get_myname() const;
+ entity_name_t get_myrank();
+ version_t queue(LogEntry &entry);
+ void reset();
+ seastar::future<> set_fsid(const uuid_d& fsid);
+
+private:
+ MessageURef _get_mon_log_message();
+
+ crimson::net::Messenger *messenger;
+ bool is_mon;
+ version_t last_log_sent;
+ version_t last_log;
+ std::deque<LogEntry> log_queue;
+
+ std::map<std::string, LogChannelRef> channels;
+ uuid_d m_fsid;
+};
+#endif
+
diff --git a/src/crimson/common/operation.cc b/src/crimson/common/operation.cc
new file mode 100644
index 000000000..53399fb9b
--- /dev/null
+++ b/src/crimson/common/operation.cc
@@ -0,0 +1,75 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "operation.h"
+
+namespace crimson {
+
+void Operation::dump(ceph::Formatter* f) const
+{
+ f->open_object_section("operation");
+ f->dump_string("type", get_type_name());
+ f->dump_unsigned("id", id);
+ {
+ f->open_object_section("detail");
+ dump_detail(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void Operation::dump_brief(ceph::Formatter* f) const
+{
+ f->open_object_section("operation");
+ f->dump_string("type", get_type_name());
+ f->dump_unsigned("id", id);
+ f->close_section();
+}
+
+std::ostream &operator<<(std::ostream &lhs, const Operation &rhs) {
+ lhs << rhs.get_type_name() << "(id=" << rhs.get_id() << ", detail=";
+ rhs.print(lhs);
+ lhs << ")";
+ return lhs;
+}
+
+void Blocker::dump(ceph::Formatter* f) const
+{
+ f->open_object_section("blocker");
+ f->dump_string("op_type", get_type_name());
+ {
+ f->open_object_section("detail");
+ dump_detail(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+namespace detail {
+void dump_time_event(const char* name,
+ const utime_t& timestamp,
+ ceph::Formatter* f)
+{
+ assert(f);
+ f->open_object_section("time_event");
+ f->dump_string("name", name);
+ f->dump_stream("initiated_at") << timestamp;
+ f->close_section();
+}
+
+void dump_blocking_event(const char* name,
+ const utime_t& timestamp,
+ const Blocker* const blocker,
+ ceph::Formatter* f)
+{
+ assert(f);
+ f->open_object_section("blocking_event");
+ f->dump_string("name", name);
+ f->dump_stream("initiated_at") << timestamp;
+ if (blocker) {
+ blocker->dump(f);
+ }
+ f->close_section();
+}
+} // namespace detail
+} // namespace crimson
diff --git a/src/crimson/common/operation.h b/src/crimson/common/operation.h
new file mode 100644
index 000000000..6df2c99fd
--- /dev/null
+++ b/src/crimson/common/operation.h
@@ -0,0 +1,776 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include <algorithm>
+#include <array>
+#include <set>
+#include <vector>
+#include <boost/core/demangle.hpp>
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <seastar/core/shared_mutex.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/core/lowres_clock.hh>
+#include <seastar/core/future-util.hh>
+
+#include "include/ceph_assert.h"
+#include "include/utime.h"
+#include "common/Clock.h"
+#include "common/Formatter.h"
+#include "crimson/common/interruptible_future.h"
+#include "crimson/common/smp_helpers.h"
+#include "crimson/common/log.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson {
+
+using registry_hook_t = boost::intrusive::list_member_hook<
+ boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
+
+class Operation;
+class Blocker;
+
+
+namespace detail {
+void dump_time_event(const char* name,
+ const utime_t& timestamp,
+ ceph::Formatter* f);
+void dump_blocking_event(const char* name,
+ const utime_t& timestamp,
+ const Blocker* blocker,
+ ceph::Formatter* f);
+} // namespace detail
+
+/**
+ * Provides an interface for dumping diagnostic information about
+ * why a particular op is not making progress.
+ */
+class Blocker {
+public:
+ void dump(ceph::Formatter *f) const;
+ virtual ~Blocker() = default;
+
+private:
+ virtual void dump_detail(ceph::Formatter *f) const = 0;
+ virtual const char *get_type_name() const = 0;
+};
+
+// the main template. by default an operation has no extenral
+// event handler (the empty tuple). specializing the template
+// allows to define backends on per-operation-type manner.
+// NOTE: basically this could be a function but C++ disallows
+// differentiating return type among specializations.
+template <class T>
+struct EventBackendRegistry {
+ template <typename...> static constexpr bool always_false = false;
+
+ static std::tuple<> get_backends() {
+ static_assert(always_false<T>, "Registry specialization not found");
+ return {};
+ }
+};
+
+template <class T>
+struct Event {
+ T* that() {
+ return static_cast<T*>(this);
+ }
+ const T* that() const {
+ return static_cast<const T*>(this);
+ }
+
+ template <class OpT, class... Args>
+ void trigger(OpT&& op, Args&&... args) {
+ that()->internal_backend.handle(*that(),
+ std::forward<OpT>(op),
+ std::forward<Args>(args)...);
+ // let's call `handle()` for concrete event type from each single
+ // of our backends. the order in the registry matters.
+ std::apply([&, //args=std::forward_as_tuple(std::forward<Args>(args)...),
+ this] (auto... backend) {
+ (..., backend.handle(*that(),
+ std::forward<OpT>(op),
+ std::forward<Args>(args)...));
+ }, EventBackendRegistry<std::decay_t<OpT>>::get_backends());
+ }
+};
+
+
+// simplest event type for recording things like beginning or end
+// of TrackableOperation's life.
+template <class T>
+struct TimeEvent : Event<T> {
+ struct Backend {
+ // `T` is passed solely to let implementations to discriminate
+ // basing on the type-of-event.
+ virtual void handle(T&, const Operation&) = 0;
+ };
+
+ // for the sake of dumping ops-in-flight.
+ struct InternalBackend final : Backend {
+ void handle(T&, const Operation&) override {
+ timestamp = ceph_clock_now();
+ }
+ utime_t timestamp;
+ } internal_backend;
+
+ void dump(ceph::Formatter *f) const {
+ auto demangled_name = boost::core::demangle(typeid(T).name());
+ detail::dump_time_event(
+ demangled_name.c_str(),
+ internal_backend.timestamp, f);
+ }
+
+ auto get_timestamp() const {
+ return internal_backend.timestamp;
+ }
+};
+
+
+template <typename T>
+class BlockerT : public Blocker {
+public:
+ struct BlockingEvent : Event<typename T::BlockingEvent> {
+ using Blocker = std::decay_t<T>;
+
+ struct Backend {
+ // `T` is based solely to let implementations to discriminate
+ // basing on the type-of-event.
+ virtual void handle(typename T::BlockingEvent&, const Operation&, const T&) = 0;
+ };
+
+ struct InternalBackend : Backend {
+ void handle(typename T::BlockingEvent&,
+ const Operation&,
+ const T& blocker) override {
+ this->timestamp = ceph_clock_now();
+ this->blocker = &blocker;
+ }
+
+ utime_t timestamp;
+ const T* blocker;
+ } internal_backend;
+
+ // we don't want to make any BlockerT to be aware and coupled with
+ // an operation. to not templatize an entire path from an op to
+ // a blocker, type erasuring is used.
+ struct TriggerI {
+ TriggerI(BlockingEvent& event) : event(event) {}
+
+ template <class FutureT>
+ auto maybe_record_blocking(FutureT&& fut, const T& blocker) {
+ if (!fut.available()) {
+ // a full blown call via vtable. that's the cost for templatization
+ // avoidance. anyway, most of the things actually have the type
+ // knowledge.
+ record_blocking(blocker);
+ return std::forward<FutureT>(fut).finally(
+ [&event=this->event, &blocker] () mutable {
+ // beware trigger instance may be already dead when this
+ // is executed!
+ record_unblocking(event, blocker);
+ });
+ }
+ return std::forward<FutureT>(fut);
+ }
+ virtual ~TriggerI() = default;
+ protected:
+ // it's for the sake of erasing the OpT type
+ virtual void record_blocking(const T& blocker) = 0;
+
+ static void record_unblocking(BlockingEvent& event, const T& blocker) {
+ assert(event.internal_backend.blocker == &blocker);
+ event.internal_backend.blocker = nullptr;
+ }
+
+ BlockingEvent& event;
+ };
+
+ template <class OpT>
+ struct Trigger : TriggerI {
+ Trigger(BlockingEvent& event, const OpT& op) : TriggerI(event), op(op) {}
+
+ template <class FutureT>
+ auto maybe_record_blocking(FutureT&& fut, const T& blocker) {
+ if (!fut.available()) {
+ // no need for the dynamic dispatch! if we're lucky, a compiler
+ // should collapse all these abstractions into a bunch of movs.
+ this->Trigger::record_blocking(blocker);
+ return std::forward<FutureT>(fut).finally(
+ [&event=this->event, &blocker] () mutable {
+ Trigger::record_unblocking(event, blocker);
+ });
+ }
+ return std::forward<FutureT>(fut);
+ }
+
+ const OpT &get_op() { return op; }
+
+ protected:
+ void record_blocking(const T& blocker) override {
+ this->event.trigger(op, blocker);
+ }
+
+ const OpT& op;
+ };
+
+ void dump(ceph::Formatter *f) const {
+ auto demangled_name = boost::core::demangle(typeid(T).name());
+ detail::dump_blocking_event(
+ demangled_name.c_str(),
+ internal_backend.timestamp,
+ internal_backend.blocker,
+ f);
+ }
+ };
+
+ virtual ~BlockerT() = default;
+ template <class TriggerT, class... Args>
+ decltype(auto) track_blocking(TriggerT&& trigger, Args&&... args) {
+ return std::forward<TriggerT>(trigger).maybe_record_blocking(
+ std::forward<Args>(args)..., static_cast<const T&>(*this));
+ }
+
+private:
+ const char *get_type_name() const final {
+ return static_cast<const T*>(this)->type_name;
+ }
+};
+
+template <class T>
+struct AggregateBlockingEvent {
+ struct TriggerI {
+ protected:
+ struct TriggerContainerI {
+ virtual typename T::TriggerI& get_trigger() = 0;
+ virtual ~TriggerContainerI() = default;
+ };
+ using TriggerContainerIRef = std::unique_ptr<TriggerContainerI>;
+ virtual TriggerContainerIRef create_part_trigger() = 0;
+
+ public:
+ template <class FutureT>
+ auto maybe_record_blocking(FutureT&& fut,
+ const typename T::Blocker& blocker) {
+ // AggregateBlockingEvent is supposed to be used on relatively cold
+ // paths (recovery), so we don't need to worry about the dynamic
+ // polymothps / dynamic memory's overhead.
+ auto tcont = create_part_trigger();
+ return tcont->get_trigger().maybe_record_blocking(
+ std::move(fut), blocker
+ ).finally([tcont=std::move(tcont)] {});
+ }
+
+ virtual ~TriggerI() = default;
+ };
+
+ template <class OpT>
+ struct Trigger final : TriggerI {
+ Trigger(AggregateBlockingEvent& event, const OpT& op)
+ : event(event), op(op) {}
+
+ class TriggerContainer final : public TriggerI::TriggerContainerI {
+ AggregateBlockingEvent& event;
+ typename decltype(event.events)::iterator iter;
+ typename T::template Trigger<OpT> trigger;
+
+ typename T::TriggerI &get_trigger() final {
+ return trigger;
+ }
+
+ public:
+ TriggerContainer(AggregateBlockingEvent& _event, const OpT& op) :
+ event(_event),
+ iter(event.events.emplace(event.events.end())),
+ trigger(*iter, op) {}
+
+ ~TriggerContainer() final {
+ event.events.erase(iter);
+ }
+ };
+
+ protected:
+ typename TriggerI::TriggerContainerIRef create_part_trigger() final {
+ return std::make_unique<TriggerContainer>(event, op);
+ }
+
+ private:
+ AggregateBlockingEvent& event;
+ const OpT& op;
+ };
+
+private:
+ std::list<T> events;
+ template <class OpT>
+ friend class Trigger;
+};
+
+/**
+ * Common base for all crimson-osd operations. Mainly provides
+ * an interface for registering ops in flight and dumping
+ * diagnostic information.
+ */
+class Operation : public boost::intrusive_ref_counter<
+ Operation, boost::thread_unsafe_counter> {
+ public:
+ using id_t = uint64_t;
+ static constexpr id_t NULL_ID = std::numeric_limits<uint64_t>::max();
+ id_t get_id() const {
+ return id;
+ }
+
+ static constexpr bool is_trackable = false;
+
+ virtual unsigned get_type() const = 0;
+ virtual const char *get_type_name() const = 0;
+ virtual void print(std::ostream &) const = 0;
+
+ void dump(ceph::Formatter *f) const;
+ void dump_brief(ceph::Formatter *f) const;
+ virtual ~Operation() = default;
+
+ private:
+ virtual void dump_detail(ceph::Formatter *f) const = 0;
+
+ registry_hook_t registry_hook;
+
+ id_t id = 0;
+ void set_id(id_t in_id) {
+ id = in_id;
+ }
+
+ friend class OperationRegistryI;
+ template <size_t>
+ friend class OperationRegistryT;
+};
+using OperationRef = boost::intrusive_ptr<Operation>;
+
+std::ostream &operator<<(std::ostream &, const Operation &op);
+
+/**
+ * Maintains a set of lists of all active ops.
+ */
+class OperationRegistryI {
+ using op_list_member_option = boost::intrusive::member_hook<
+ Operation,
+ registry_hook_t,
+ &Operation::registry_hook
+ >;
+
+ friend class Operation;
+ seastar::timer<seastar::lowres_clock> shutdown_timer;
+ seastar::promise<> shutdown;
+
+protected:
+ virtual void do_register(Operation *op) = 0;
+ virtual bool registries_empty() const = 0;
+ virtual void do_stop() = 0;
+
+public:
+ using op_list = boost::intrusive::list<
+ Operation,
+ op_list_member_option,
+ boost::intrusive::constant_time_size<false>>;
+
+ template <typename T, typename... Args>
+ auto create_operation(Args&&... args) {
+ boost::intrusive_ptr<T> op = new T(std::forward<Args>(args)...);
+ do_register(&*op);
+ return op;
+ }
+
+ seastar::future<> stop() {
+ crimson::get_logger(ceph_subsys_osd).info("OperationRegistryI::{}", __func__);
+ do_stop();
+ shutdown_timer.set_callback([this] {
+ if (registries_empty()) {
+ shutdown.set_value();
+ shutdown_timer.cancel();
+ }
+ });
+ shutdown_timer.arm_periodic(
+ std::chrono::milliseconds(100/*TODO: use option instead*/));
+ return shutdown.get_future();
+ }
+};
+
+
+template <size_t NUM_REGISTRIES>
+class OperationRegistryT : public OperationRegistryI {
+ Operation::id_t next_id = 0;
+ std::array<
+ op_list,
+ NUM_REGISTRIES
+ > registries;
+
+protected:
+ void do_register(Operation *op) final {
+ const auto op_type = op->get_type();
+ registries[op_type].push_back(*op);
+ op->set_id(++next_id);
+ }
+
+ bool registries_empty() const final {
+ return std::all_of(registries.begin(),
+ registries.end(),
+ [](auto& opl) {
+ return opl.empty();
+ });
+ }
+
+protected:
+ OperationRegistryT(core_id_t core)
+ // Use core to initialize upper 8 bits of counters to ensure that
+ // ids generated by different cores are disjoint
+ : next_id(static_cast<id_t>(core) <<
+ (std::numeric_limits<id_t>::digits - 8))
+ {}
+
+ template <size_t REGISTRY_INDEX>
+ const op_list& get_registry() const {
+ static_assert(
+ REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
+ return registries[REGISTRY_INDEX];
+ }
+
+ template <size_t REGISTRY_INDEX>
+ op_list& get_registry() {
+ static_assert(
+ REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
+ return registries[REGISTRY_INDEX];
+ }
+
+public:
+ /// Iterate over live ops
+ template <typename F>
+ void for_each_op(F &&f) const {
+ for (const auto &registry: registries) {
+ for (const auto &op: registry) {
+ std::invoke(f, op);
+ }
+ }
+ }
+
+ /// Removes op from registry
+ void remove_from_registry(Operation &op) {
+ const auto op_type = op.get_type();
+ registries[op_type].erase(op_list::s_iterator_to(op));
+ }
+
+ /// Adds op to registry
+ void add_to_registry(Operation &op) {
+ const auto op_type = op.get_type();
+ registries[op_type].push_back(op);
+ }
+};
+
+class PipelineExitBarrierI {
+public:
+ using Ref = std::unique_ptr<PipelineExitBarrierI>;
+
+ /// Waits for exit barrier
+ virtual std::optional<seastar::future<>> wait() = 0;
+
+ /// Releases pipeline stage, can only be called after wait
+ virtual void exit() = 0;
+
+ /// Releases pipeline resources without waiting on barrier
+ virtual void cancel() = 0;
+
+ /// Must ensure that resources are released, likely by calling cancel()
+ virtual ~PipelineExitBarrierI() {}
+};
+
+template <class T>
+class PipelineStageIT : public BlockerT<T> {
+ const core_id_t core = seastar::this_shard_id();
+public:
+ core_id_t get_core() const { return core; }
+
+ template <class... Args>
+ decltype(auto) enter(Args&&... args) {
+ return static_cast<T*>(this)->enter(std::forward<Args>(args)...);
+ }
+};
+
+class PipelineHandle {
+ PipelineExitBarrierI::Ref barrier;
+
+ std::optional<seastar::future<>> wait_barrier() {
+ return barrier ? barrier->wait() : std::nullopt;
+ }
+
+public:
+ PipelineHandle() = default;
+
+ PipelineHandle(const PipelineHandle&) = delete;
+ PipelineHandle(PipelineHandle&&) = default;
+ PipelineHandle &operator=(const PipelineHandle&) = delete;
+ PipelineHandle &operator=(PipelineHandle&&) = default;
+
+ /**
+ * Returns a future which unblocks when the handle has entered the passed
+ * OrderedPipelinePhase. If already in a phase, enter will also release
+ * that phase after placing itself in the queue for the next one to preserve
+ * ordering.
+ */
+ template <typename OpT, typename T>
+ seastar::future<>
+ enter(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
+ ceph_assert(stage.get_core() == seastar::this_shard_id());
+ auto wait_fut = wait_barrier();
+ if (wait_fut.has_value()) {
+ return wait_fut.value().then([this, &stage, t=std::move(t)] () mutable {
+ auto fut = t.maybe_record_blocking(stage.enter(t), stage);
+ exit();
+ return std::move(fut).then(
+ [this, t=std::move(t)](auto &&barrier_ref) mutable {
+ barrier = std::move(barrier_ref);
+ return seastar::now();
+ });
+ });
+ } else {
+ auto fut = t.maybe_record_blocking(stage.enter(t), stage);
+ exit();
+ return std::move(fut).then(
+ [this, t=std::move(t)](auto &&barrier_ref) mutable {
+ barrier = std::move(barrier_ref);
+ return seastar::now();
+ });
+ }
+ }
+
+ /**
+ * Completes pending exit barrier without entering a new one.
+ */
+ seastar::future<> complete() {
+ auto ret = wait_barrier();
+ barrier.reset();
+ return ret ? std::move(ret.value()) : seastar::now();
+ }
+
+ /**
+ * Exits current phase, skips exit barrier, should only be used for op
+ * failure. Permitting the handle to be destructed as the same effect.
+ */
+ void exit() {
+ barrier.reset();
+ }
+
+};
+
+/**
+ * Ensures that at most one op may consider itself in the phase at a time.
+ * Ops will see enter() unblock in the order in which they tried to enter
+ * the phase. entering (though not necessarily waiting for the future to
+ * resolve) a new phase prior to exiting the previous one will ensure that
+ * the op ordering is preserved.
+ */
+template <class T>
+class OrderedExclusivePhaseT : public PipelineStageIT<T> {
+ void dump_detail(ceph::Formatter *f) const final {
+ f->dump_unsigned("waiting", waiting);
+ if (held_by != Operation::NULL_ID) {
+ f->dump_unsigned("held_by_operation_id", held_by);
+ }
+ }
+
+ class ExitBarrier final : public PipelineExitBarrierI {
+ OrderedExclusivePhaseT *phase;
+ Operation::id_t op_id;
+ public:
+ ExitBarrier(OrderedExclusivePhaseT *phase, Operation::id_t id)
+ : phase(phase), op_id(id) {}
+
+ std::optional<seastar::future<>> wait() final {
+ return std::nullopt;
+ }
+
+ void exit() final {
+ if (phase) {
+ auto *p = phase;
+ auto id = op_id;
+ phase = nullptr;
+ std::ignore = seastar::smp::submit_to(
+ p->get_core(),
+ [p, id] {
+ p->exit(id);
+ });
+ }
+ }
+
+ void cancel() final {
+ exit();
+ }
+
+ ~ExitBarrier() final {
+ cancel();
+ }
+ };
+
+ void exit(Operation::id_t op_id) {
+ clear_held_by(op_id);
+ mutex.unlock();
+ }
+
+public:
+ template <class TriggerT>
+ seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
+ waiting++;
+ return mutex.lock().then([this, op_id=t.get_op().get_id()] {
+ ceph_assert_always(waiting > 0);
+ --waiting;
+ set_held_by(op_id);
+ return PipelineExitBarrierI::Ref(new ExitBarrier{this, op_id});
+ });
+ }
+
+private:
+ void set_held_by(Operation::id_t id) {
+ ceph_assert_always(held_by == Operation::NULL_ID);
+ held_by = id;
+ }
+
+ void clear_held_by(Operation::id_t id) {
+ ceph_assert_always(held_by == id);
+ held_by = Operation::NULL_ID;
+ }
+
+ unsigned waiting = 0;
+ seastar::shared_mutex mutex;
+ Operation::id_t held_by = Operation::NULL_ID;
+};
+
+/**
+ * Permits multiple ops to inhabit the stage concurrently, but ensures that
+ * they will proceed to the next stage in the order in which they called
+ * enter.
+ */
+template <class T>
+class OrderedConcurrentPhaseT : public PipelineStageIT<T> {
+ using base_t = PipelineStageIT<T>;
+public:
+ struct BlockingEvent : base_t::BlockingEvent {
+ using base_t::BlockingEvent::BlockingEvent;
+
+ struct ExitBarrierEvent : TimeEvent<ExitBarrierEvent> {};
+
+ template <class OpT>
+ struct Trigger : base_t::BlockingEvent::template Trigger<OpT> {
+ using base_t::BlockingEvent::template Trigger<OpT>::Trigger;
+
+ template <class FutureT>
+ decltype(auto) maybe_record_exit_barrier(FutureT&& fut) {
+ if (!fut.available()) {
+ exit_barrier_event.trigger(this->op);
+ }
+ return std::forward<FutureT>(fut);
+ }
+
+ ExitBarrierEvent exit_barrier_event;
+ };
+ };
+
+private:
+ void dump_detail(ceph::Formatter *f) const final {}
+
+ template <class TriggerT>
+ class ExitBarrier final : public PipelineExitBarrierI {
+ OrderedConcurrentPhaseT *phase;
+ std::optional<seastar::future<>> barrier;
+ TriggerT trigger;
+ public:
+ ExitBarrier(
+ OrderedConcurrentPhaseT *phase,
+ seastar::future<> &&barrier,
+ TriggerT& trigger) : phase(phase), barrier(std::move(barrier)), trigger(trigger) {}
+
+ std::optional<seastar::future<>> wait() final {
+ assert(phase);
+ assert(barrier);
+ auto ret = std::move(*barrier);
+ barrier = std::nullopt;
+ return trigger.maybe_record_exit_barrier(std::move(ret));
+ }
+
+ void exit() final {
+ if (barrier) {
+ static_cast<void>(
+ std::move(*barrier).then([phase=this->phase] { phase->mutex.unlock(); }));
+ barrier = std::nullopt;
+ phase = nullptr;
+ }
+ if (phase) {
+ std::ignore = seastar::smp::submit_to(
+ phase->get_core(),
+ [this] {
+ phase->mutex.unlock();
+ phase = nullptr;
+ });
+ }
+ }
+
+ void cancel() final {
+ exit();
+ }
+
+ ~ExitBarrier() final {
+ cancel();
+ }
+ };
+
+public:
+ template <class TriggerT>
+ seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
+ return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
+ new ExitBarrier<TriggerT>{this, mutex.lock(), t});
+ }
+
+private:
+ seastar::shared_mutex mutex;
+};
+
+/**
+ * Imposes no ordering or exclusivity at all. Ops enter without constraint and
+ * may exit in any order. Useful mainly for informational purposes between
+ * stages with constraints.
+ */
+template <class T>
+class UnorderedStageT : public PipelineStageIT<T> {
+ void dump_detail(ceph::Formatter *f) const final {}
+
+ class ExitBarrier final : public PipelineExitBarrierI {
+ public:
+ ExitBarrier() = default;
+
+ std::optional<seastar::future<>> wait() final {
+ return std::nullopt;
+ }
+
+ void exit() final {}
+
+ void cancel() final {}
+
+ ~ExitBarrier() final {}
+ };
+
+public:
+ template <class... IgnoreArgs>
+ seastar::future<PipelineExitBarrierI::Ref> enter(IgnoreArgs&&...) {
+ return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
+ new ExitBarrier);
+ }
+};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::Operation> : fmt::ostream_formatter {};
+#endif
diff --git a/src/crimson/common/perf_counters_collection.cc b/src/crimson/common/perf_counters_collection.cc
new file mode 100644
index 000000000..254d85278
--- /dev/null
+++ b/src/crimson/common/perf_counters_collection.cc
@@ -0,0 +1,41 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_context.h"
+#include "perf_counters_collection.h"
+
+namespace crimson::common {
+PerfCountersCollection::PerfCountersCollection()
+{
+ perf_collection = std::make_unique<PerfCountersCollectionImpl>();
+}
+PerfCountersCollection::~PerfCountersCollection()
+{
+ perf_collection->clear();
+}
+
+PerfCountersCollectionImpl* PerfCountersCollection:: get_perf_collection()
+{
+ return perf_collection.get();
+}
+
+void PerfCountersCollection::dump_formatted(ceph::Formatter *f, bool schema,
+ bool dump_labeled,
+ const std::string &logger,
+ const std::string &counter)
+{
+ perf_collection->dump_formatted(f, schema, dump_labeled, logger, counter);
+}
+
+PerfCountersCollection::ShardedPerfCountersCollection PerfCountersCollection::sharded_perf_coll;
+
+void PerfCountersDeleter::operator()(PerfCounters* p) noexcept
+{
+ if (cct) {
+ cct->get_perfcounters_collection()->remove(p);
+ }
+ delete p;
+}
+
+}
+
diff --git a/src/crimson/common/perf_counters_collection.h b/src/crimson/common/perf_counters_collection.h
new file mode 100644
index 000000000..ae0c8670c
--- /dev/null
+++ b/src/crimson/common/perf_counters_collection.h
@@ -0,0 +1,49 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "common/perf_counters.h"
+#include "include/common_fwd.h"
+#include <seastar/core/sharded.hh>
+
+using crimson::common::PerfCountersCollectionImpl;
+namespace crimson::common {
+class PerfCountersCollection: public seastar::sharded<PerfCountersCollection>
+{
+ using ShardedPerfCountersCollection = seastar::sharded<PerfCountersCollection>;
+
+private:
+ std::unique_ptr<PerfCountersCollectionImpl> perf_collection;
+ static ShardedPerfCountersCollection sharded_perf_coll;
+ friend PerfCountersCollection& local_perf_coll();
+ friend ShardedPerfCountersCollection& sharded_perf_coll();
+
+public:
+ PerfCountersCollection();
+ ~PerfCountersCollection();
+ PerfCountersCollectionImpl* get_perf_collection();
+ void dump_formatted(ceph::Formatter *f, bool schema, bool dump_labeled,
+ const std::string &logger = "",
+ const std::string &counter = "");
+};
+
+inline PerfCountersCollection::ShardedPerfCountersCollection& sharded_perf_coll(){
+ return PerfCountersCollection::sharded_perf_coll;
+}
+
+inline PerfCountersCollection& local_perf_coll() {
+ return PerfCountersCollection::sharded_perf_coll.local();
+}
+
+class PerfCountersDeleter {
+ CephContext* cct;
+
+public:
+ PerfCountersDeleter() noexcept : cct(nullptr) {}
+ PerfCountersDeleter(CephContext* cct) noexcept : cct(cct) {}
+ void operator()(PerfCounters* p) noexcept;
+};
+}
+using PerfCountersRef = std::unique_ptr<crimson::common::PerfCounters, crimson::common::PerfCountersDeleter>;
+
diff --git a/src/crimson/common/shared_lru.h b/src/crimson/common/shared_lru.h
new file mode 100644
index 000000000..186f02a61
--- /dev/null
+++ b/src/crimson/common/shared_lru.h
@@ -0,0 +1,180 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <boost/smart_ptr/local_shared_ptr.hpp>
+#include <boost/smart_ptr/weak_ptr.hpp>
+#include "simple_lru.h"
+
+/// SharedLRU does its best to cache objects. It not only tracks the objects
+/// in its LRU cache with strong references, it also tracks objects with
+/// weak_ptr even if the cache does not hold any strong references to them. so
+/// that it can return the objects after they are evicted, as long as they've
+/// ever been cached and have not been destroyed yet.
+template<class K, class V>
+class SharedLRU {
+ using shared_ptr_t = boost::local_shared_ptr<V>;
+ using weak_ptr_t = boost::weak_ptr<V>;
+ using value_type = std::pair<K, shared_ptr_t>;
+
+ // weak_refs is already ordered, and we don't use accessors like
+ // LRUCache::lower_bound(), so unordered LRUCache would suffice.
+ SimpleLRU<K, shared_ptr_t, false> cache;
+ std::map<K, std::pair<weak_ptr_t, V*>> weak_refs;
+
+ struct Deleter {
+ SharedLRU<K,V>* cache;
+ const K key;
+ void operator()(V* ptr) {
+ cache->_erase_weak(key);
+ delete ptr;
+ }
+ };
+ void _erase_weak(const K& key) {
+ weak_refs.erase(key);
+ }
+public:
+ SharedLRU(size_t max_size = 20)
+ : cache{max_size}
+ {}
+ ~SharedLRU() {
+ cache.clear();
+ // initially, we were assuming that no pointer obtained from SharedLRU
+ // can outlive the lru itself. However, since going with the interruption
+ // concept for handling shutdowns, this is no longer valid.
+ weak_refs.clear();
+ }
+ /**
+ * Returns a reference to the given key, and perform an insertion if such
+ * key does not already exist
+ */
+ shared_ptr_t operator[](const K& key);
+ /**
+ * Returns true iff there are no live references left to anything that has been
+ * in the cache.
+ */
+ bool empty() const {
+ return weak_refs.empty();
+ }
+ size_t size() const {
+ return cache.size();
+ }
+ size_t capacity() const {
+ return cache.capacity();
+ }
+ /***
+ * Inserts a key if not present, or bumps it to the front of the LRU if
+ * it is, and then gives you a reference to the value. If the key already
+ * existed, you are responsible for deleting the new value you tried to
+ * insert.
+ *
+ * @param key The key to insert
+ * @param value The value that goes with the key
+ * @param existed Set to true if the value was already in the
+ * map, false otherwise
+ * @return A reference to the map's value for the given key
+ */
+ shared_ptr_t insert(const K& key, std::unique_ptr<V> value);
+ // clear all strong reference from the lru.
+ void clear() {
+ cache.clear();
+ }
+ shared_ptr_t find(const K& key);
+ // return the last element that is not greater than key
+ shared_ptr_t lower_bound(const K& key);
+ // return the first element that is greater than key
+ std::optional<value_type> upper_bound(const K& key);
+
+ void erase(const K& key) {
+ cache.erase(key);
+ _erase_weak(key);
+ }
+};
+
+template<class K, class V>
+typename SharedLRU<K,V>::shared_ptr_t
+SharedLRU<K,V>::insert(const K& key, std::unique_ptr<V> value)
+{
+ shared_ptr_t val;
+ if (auto found = weak_refs.find(key); found != weak_refs.end()) {
+ val = found->second.first.lock();
+ }
+ if (!val) {
+ val.reset(value.release(), Deleter{this, key});
+ weak_refs.emplace(key, std::make_pair(val, val.get()));
+ }
+ cache.insert(key, val);
+ return val;
+}
+
+template<class K, class V>
+typename SharedLRU<K,V>::shared_ptr_t
+SharedLRU<K,V>::operator[](const K& key)
+{
+ if (auto found = cache.find(key); found) {
+ return *found;
+ }
+ shared_ptr_t val;
+ if (auto found = weak_refs.find(key); found != weak_refs.end()) {
+ val = found->second.first.lock();
+ }
+ if (!val) {
+ val.reset(new V{}, Deleter{this, key});
+ weak_refs.emplace(key, std::make_pair(val, val.get()));
+ }
+ cache.insert(key, val);
+ return val;
+}
+
+template<class K, class V>
+typename SharedLRU<K,V>::shared_ptr_t
+SharedLRU<K,V>::find(const K& key)
+{
+ if (auto found = cache.find(key); found) {
+ return *found;
+ }
+ shared_ptr_t val;
+ if (auto found = weak_refs.find(key); found != weak_refs.end()) {
+ val = found->second.first.lock();
+ }
+ if (val) {
+ cache.insert(key, val);
+ }
+ return val;
+}
+
+template<class K, class V>
+typename SharedLRU<K,V>::shared_ptr_t
+SharedLRU<K,V>::lower_bound(const K& key)
+{
+ if (weak_refs.empty()) {
+ return {};
+ }
+ auto found = weak_refs.lower_bound(key);
+ if (found == weak_refs.end()) {
+ --found;
+ }
+ if (auto val = found->second.first.lock(); val) {
+ cache.insert(key, val);
+ return val;
+ } else {
+ return {};
+ }
+}
+
+template<class K, class V>
+std::optional<typename SharedLRU<K,V>::value_type>
+SharedLRU<K,V>::upper_bound(const K& key)
+{
+ for (auto found = weak_refs.upper_bound(key);
+ found != weak_refs.end();
+ ++found) {
+ if (auto val = found->second.first.lock(); val) {
+ return std::make_pair(found->first, val);
+ }
+ }
+ return std::nullopt;
+}
diff --git a/src/crimson/common/simple_lru.h b/src/crimson/common/simple_lru.h
new file mode 100644
index 000000000..1419c4885
--- /dev/null
+++ b/src/crimson/common/simple_lru.h
@@ -0,0 +1,141 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <list>
+#include <map>
+#include <optional>
+#include <type_traits>
+#include <unordered_map>
+
+template <class Key, class Value, bool Ordered>
+class SimpleLRU {
+ static_assert(std::is_default_constructible_v<Value>);
+ using list_type = std::list<Key>;
+ template<class K, class V>
+ using map_t = std::conditional_t<Ordered,
+ std::map<K, V>,
+ std::unordered_map<K, V>>;
+ using map_type = map_t<Key, std::pair<Value, typename list_type::iterator>>;
+ list_type lru;
+ map_type cache;
+ const size_t max_size;
+
+public:
+ SimpleLRU(size_t size = 20)
+ : cache(size),
+ max_size(size)
+ {}
+ size_t size() const {
+ return cache.size();
+ }
+ size_t capacity() const {
+ return max_size;
+ }
+ using insert_return_type = std::pair<Value, bool>;
+ insert_return_type insert(const Key& key, Value value);
+ std::optional<Value> find(const Key& key);
+ std::optional<std::enable_if<Ordered, Value>> lower_bound(const Key& key);
+ void erase(const Key& key);
+ void clear();
+private:
+ // bump the item to the front of the lru list
+ Value _lru_add(typename map_type::iterator found);
+ // evict the last element of most recently used list
+ void _evict();
+};
+
+template <class Key, class Value, bool Ordered>
+typename SimpleLRU<Key,Value,Ordered>::insert_return_type
+SimpleLRU<Key,Value,Ordered>::insert(const Key& key, Value value)
+{
+ if constexpr(Ordered) {
+ auto found = cache.lower_bound(key);
+ if (found != cache.end() && found->first == key) {
+ // already exists
+ return {found->second.first, true};
+ } else {
+ if (size() >= capacity()) {
+ _evict();
+ }
+ lru.push_front(key);
+ // use lower_bound as hint to save the lookup
+ cache.emplace_hint(found, key, std::make_pair(value, lru.begin()));
+ return {std::move(value), false};
+ }
+ } else {
+ // cache is not ordered
+ auto found = cache.find(key);
+ if (found != cache.end()) {
+ // already exists
+ return {found->second.first, true};
+ } else {
+ if (size() >= capacity()) {
+ _evict();
+ }
+ lru.push_front(key);
+ cache.emplace(key, std::make_pair(value, lru.begin()));
+ return {std::move(value), false};
+ }
+ }
+}
+
+template <class Key, class Value, bool Ordered>
+std::optional<Value> SimpleLRU<Key,Value,Ordered>::find(const Key& key)
+{
+ if (auto found = cache.find(key); found != cache.end()){
+ return _lru_add(found);
+ } else {
+ return {};
+ }
+}
+
+template <class Key, class Value, bool Ordered>
+std::optional<std::enable_if<Ordered, Value>>
+SimpleLRU<Key,Value,Ordered>::lower_bound(const Key& key)
+{
+ if (auto found = cache.lower_bound(key); found != cache.end()) {
+ return _lru_add(found);
+ } else {
+ return {};
+ }
+}
+
+template <class Key, class Value, bool Ordered>
+void SimpleLRU<Key,Value,Ordered>::clear()
+{
+ lru.clear();
+ cache.clear();
+}
+
+template <class Key, class Value, bool Ordered>
+void SimpleLRU<Key,Value,Ordered>::erase(const Key& key)
+{
+ if (auto found = cache.find(key); found != cache.end()) {
+ lru.erase(found->second.second);
+ cache.erase(found);
+ }
+}
+
+template <class Key, class Value, bool Ordered>
+Value SimpleLRU<Key,Value,Ordered>::_lru_add(
+ typename SimpleLRU<Key,Value,Ordered>::map_type::iterator found)
+{
+ auto& [value, in_lru] = found->second;
+ if (in_lru != lru.begin()){
+ // move item to the front
+ lru.splice(lru.begin(), lru, in_lru);
+ }
+ // the item is already at the front
+ return value;
+}
+
+template <class Key, class Value, bool Ordered>
+void SimpleLRU<Key,Value,Ordered>::_evict()
+{
+ // evict the last element of most recently used list
+ auto last = --lru.end();
+ cache.erase(*last);
+ lru.erase(last);
+}
diff --git a/src/crimson/common/smp_helpers.h b/src/crimson/common/smp_helpers.h
new file mode 100644
index 000000000..c2b7bd964
--- /dev/null
+++ b/src/crimson/common/smp_helpers.h
@@ -0,0 +1,92 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <limits>
+
+#include <seastar/core/smp.hh>
+
+#include "crimson/common/errorator.h"
+#include "crimson/common/utility.h"
+
+namespace crimson {
+
+using core_id_t = seastar::shard_id;
+static constexpr core_id_t NULL_CORE = std::numeric_limits<core_id_t>::max();
+
+auto submit_to(core_id_t core, auto &&f) {
+ using ret_type = decltype(f());
+ if constexpr (is_errorated_future_v<ret_type>) {
+ auto ret = seastar::smp::submit_to(
+ core,
+ [f=std::move(f)]() mutable {
+ return f().to_base();
+ });
+ return ret_type(std::move(ret));
+ } else {
+ return seastar::smp::submit_to(core, std::move(f));
+ }
+}
+
+template <typename Obj, typename Method, typename... Args>
+auto proxy_method_on_core(
+ core_id_t core, Obj &obj, Method method, Args&&... args) {
+ return crimson::submit_to(
+ core,
+ [&obj, method,
+ arg_tuple=std::make_tuple(std::forward<Args>(args)...)]() mutable {
+ return apply_method_to_tuple(obj, method, std::move(arg_tuple));
+ });
+}
+
+/**
+ * reactor_map_seq
+ *
+ * Invokes f on each reactor sequentially, Caller may assume that
+ * f will not be invoked concurrently on multiple cores.
+ */
+template <typename F>
+auto reactor_map_seq(F &&f) {
+ using ret_type = decltype(f());
+ if constexpr (is_errorated_future_v<ret_type>) {
+ auto ret = crimson::do_for_each(
+ seastar::smp::all_cpus().begin(),
+ seastar::smp::all_cpus().end(),
+ [f=std::move(f)](auto core) mutable {
+ return seastar::smp::submit_to(
+ core,
+ [&f] {
+ return std::invoke(f);
+ });
+ });
+ return ret_type(ret);
+ } else {
+ return seastar::do_for_each(
+ seastar::smp::all_cpus().begin(),
+ seastar::smp::all_cpus().end(),
+ [f=std::move(f)](auto core) mutable {
+ return seastar::smp::submit_to(
+ core,
+ [&f] {
+ return std::invoke(f);
+ });
+ });
+ }
+}
+
+/**
+ * sharded_map_seq
+ *
+ * Invokes f on each shard of t sequentially. Caller may assume that
+ * f will not be invoked concurrently on multiple cores.
+ */
+template <typename T, typename F>
+auto sharded_map_seq(T &t, F &&f) {
+ return reactor_map_seq(
+ [&t, f=std::forward<F>(f)]() mutable {
+ return std::invoke(f, t.local());
+ });
+}
+
+}
diff --git a/src/crimson/common/throttle.cc b/src/crimson/common/throttle.cc
new file mode 100644
index 000000000..88d1859f3
--- /dev/null
+++ b/src/crimson/common/throttle.cc
@@ -0,0 +1,64 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "throttle.h"
+
+namespace crimson::common {
+
+int64_t Throttle::take(int64_t c)
+{
+ if (max == 0u) {
+ return 0;
+ }
+ count += c;
+ return count;
+}
+
+int64_t Throttle::put(int64_t c)
+{
+ if (max == 0u) {
+ return 0;
+ }
+ if (!c) {
+ return count;
+ }
+ on_free_slots.signal();
+ count -= c;
+ return count;
+}
+
+seastar::future<> Throttle::get(size_t c)
+{
+ if (max == 0u) {
+ return seastar::make_ready_future<>();
+ }
+ pending++;
+ return on_free_slots.wait([this, c] {
+ return !_should_wait(c);
+ }).then([this, c] {
+ pending--;
+ count += c;
+ return seastar::make_ready_future<>();
+ });
+}
+
+void Throttle::reset_max(size_t m) {
+ if (max == m) {
+ return;
+ }
+
+ if (m > max) {
+ on_free_slots.signal();
+ }
+ max = m;
+}
+
+bool Throttle::_should_wait(size_t c) const {
+ if (!max) {
+ return false;
+ }
+ return ((c <= max && count + c > max) || // normally stay under max
+ (c >= max && count > max)); // except for large c
+}
+
+} // namespace crimson::common
diff --git a/src/crimson/common/throttle.h b/src/crimson/common/throttle.h
new file mode 100644
index 000000000..2998cb5f8
--- /dev/null
+++ b/src/crimson/common/throttle.h
@@ -0,0 +1,43 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/condition-variable.hh>
+// pull seastar::timer<...>::timer definitions. FIX SEASTAR or reactor.hh
+// is obligatory and should be included everywhere?
+#include <seastar/core/reactor.hh>
+
+#include "common/ThrottleInterface.h"
+
+namespace crimson::common {
+
+class Throttle final : public ThrottleInterface {
+ size_t max = 0;
+ size_t count = 0;
+ size_t pending = 0;
+ // we cannot change the "count" of seastar::semaphore after it is created,
+ // so use condition_variable instead.
+ seastar::condition_variable on_free_slots;
+public:
+ explicit Throttle(size_t m)
+ : max(m)
+ {}
+ int64_t take(int64_t c = 1) override;
+ int64_t put(int64_t c = 1) override;
+ seastar::future<> get(size_t c);
+ size_t get_current() const {
+ return count;
+ }
+ size_t get_max() const {
+ return max;
+ }
+ size_t get_pending() const {
+ return pending;
+ }
+ void reset_max(size_t m);
+private:
+ bool _should_wait(size_t c) const;
+};
+
+} // namespace crimson::common
diff --git a/src/crimson/common/tmap_helpers.cc b/src/crimson/common/tmap_helpers.cc
new file mode 100644
index 000000000..9c14ebc45
--- /dev/null
+++ b/src/crimson/common/tmap_helpers.cc
@@ -0,0 +1,131 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/common/tmap_helpers.h"
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "include/rados.h"
+
+namespace detail {
+
+#define decode_or_return(v, bp) \
+ try { \
+ ::decode(v, bp); \
+ } catch (...) { \
+ return -EINVAL; \
+ }
+
+class TMapContents {
+ std::map<std::string, bufferlist> keys;
+ bufferlist header;
+public:
+ TMapContents() = default;
+
+ int decode(bufferlist::const_iterator &bliter) {
+ keys.clear();
+ header.clear();
+ if (bliter.end()) {
+ return 0;
+ }
+ decode_or_return(header, bliter);
+ __u32 num_keys;
+ decode_or_return(num_keys, bliter);
+ for (; num_keys > 0; --num_keys) {
+ std::string key;
+ decode_or_return(key, bliter);
+ decode_or_return(keys[key], bliter);
+ }
+ return 0;
+ }
+
+ bufferlist encode() {
+ bufferlist bl;
+ ::encode(header, bl);
+ ::encode(static_cast<__u32>(keys.size()), bl);
+ for (auto &[k, v]: keys) {
+ ::encode(k, bl);
+ ::encode(v, bl);
+ }
+ return bl;
+ }
+
+ int update(bufferlist::const_iterator in) {
+ while (!in.end()) {
+ __u8 op;
+ decode_or_return(op, in);
+
+ if (op == CEPH_OSD_TMAP_HDR) {
+ decode_or_return(header, in);
+ continue;
+ }
+
+ std::string key;
+ decode_or_return(key, in);
+
+ switch (op) {
+ case CEPH_OSD_TMAP_SET: {
+ decode_or_return(keys[key], in);
+ break;
+ }
+ case CEPH_OSD_TMAP_CREATE: {
+ if (keys.contains(key)) {
+ return -EEXIST;
+ }
+ decode_or_return(keys[key], in);
+ break;
+ }
+ case CEPH_OSD_TMAP_RM: {
+ auto kiter = keys.find(key);
+ if (kiter == keys.end()) {
+ return -ENOENT;
+ }
+ keys.erase(kiter);
+ break;
+ }
+ case CEPH_OSD_TMAP_RMSLOPPY: {
+ keys.erase(key);
+ break;
+ }
+ }
+ }
+ return 0;
+ }
+
+ int put(bufferlist::const_iterator in) {
+ return 0;
+ }
+};
+
+}
+
+namespace crimson::common {
+
+using do_tmap_up_ret = tl::expected<bufferlist, int>;
+do_tmap_up_ret do_tmap_up(bufferlist::const_iterator in, bufferlist contents)
+{
+ detail::TMapContents tmap;
+ auto bliter = contents.cbegin();
+ int r = tmap.decode(bliter);
+ if (r < 0) {
+ return tl::unexpected(r);
+ }
+ r = tmap.update(in);
+ if (r < 0) {
+ return tl::unexpected(r);
+ }
+ return tmap.encode();
+}
+
+using do_tmap_up_ret = tl::expected<bufferlist, int>;
+do_tmap_up_ret do_tmap_put(bufferlist::const_iterator in)
+{
+ detail::TMapContents tmap;
+ int r = tmap.decode(in);
+ if (r < 0) {
+ return tl::unexpected(r);
+ }
+ return tmap.encode();
+}
+
+}
diff --git a/src/crimson/common/tmap_helpers.h b/src/crimson/common/tmap_helpers.h
new file mode 100644
index 000000000..446dbea2a
--- /dev/null
+++ b/src/crimson/common/tmap_helpers.h
@@ -0,0 +1,40 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "include/expected.hpp"
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+
+namespace crimson::common {
+
+/**
+ * do_tmap_up
+ *
+ * Performs tmap update instructions encoded in buffer referenced by in.
+ *
+ * @param [in] in iterator to buffer containing encoded tmap update operations
+ * @param [in] contents current contents of object
+ * @return buffer containing new object contents,
+ * -EINVAL for decoding errors,
+ * -EEXIST for CEPH_OSD_TMAP_CREATE on a key that exists
+ * -ENOENT for CEPH_OSD_TMAP_RM on a key that does not exist
+ */
+using do_tmap_up_ret = tl::expected<bufferlist, int>;
+do_tmap_up_ret do_tmap_up(bufferlist::const_iterator in, bufferlist contents);
+
+/**
+ * do_tmap_put
+ *
+ * Validates passed buffer pointed to by in and returns resulting object buffer.
+ *
+ * @param [in] in iterator to buffer containing tmap encoding
+ * @return buffer containing validated tmap encoded by in
+ * -EINVAL for decoding errors,
+ */
+using do_tmap_up_ret = tl::expected<bufferlist, int>;
+do_tmap_up_ret do_tmap_put(bufferlist::const_iterator in);
+
+}
diff --git a/src/crimson/common/tri_mutex.cc b/src/crimson/common/tri_mutex.cc
new file mode 100644
index 000000000..e4b181280
--- /dev/null
+++ b/src/crimson/common/tri_mutex.cc
@@ -0,0 +1,225 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "tri_mutex.h"
+
+seastar::future<> read_lock::lock()
+{
+ return static_cast<tri_mutex*>(this)->lock_for_read();
+}
+
+void read_lock::unlock()
+{
+ static_cast<tri_mutex*>(this)->unlock_for_read();
+}
+
+seastar::future<> write_lock::lock()
+{
+ return static_cast<tri_mutex*>(this)->lock_for_write(false);
+}
+
+void write_lock::unlock()
+{
+ static_cast<tri_mutex*>(this)->unlock_for_write();
+}
+
+seastar::future<> excl_lock::lock()
+{
+ return static_cast<tri_mutex*>(this)->lock_for_excl();
+}
+
+void excl_lock::unlock()
+{
+ static_cast<tri_mutex*>(this)->unlock_for_excl();
+}
+
+seastar::future<> excl_lock_from_read::lock()
+{
+ static_cast<tri_mutex*>(this)->promote_from_read();
+ return seastar::make_ready_future<>();
+}
+
+void excl_lock_from_read::unlock()
+{
+ static_cast<tri_mutex*>(this)->demote_to_read();
+}
+
+seastar::future<> excl_lock_from_write::lock()
+{
+ static_cast<tri_mutex*>(this)->promote_from_write();
+ return seastar::make_ready_future<>();
+}
+
+void excl_lock_from_write::unlock()
+{
+ static_cast<tri_mutex*>(this)->demote_to_write();
+}
+
+seastar::future<> excl_lock_from_excl::lock()
+{
+ return seastar::make_ready_future<>();
+}
+
+void excl_lock_from_excl::unlock()
+{
+}
+
+tri_mutex::~tri_mutex()
+{
+ assert(!is_acquired());
+}
+
+seastar::future<> tri_mutex::lock_for_read()
+{
+ if (try_lock_for_read()) {
+ return seastar::make_ready_future<>();
+ }
+ waiters.emplace_back(seastar::promise<>(), type_t::read);
+ return waiters.back().pr.get_future();
+}
+
+bool tri_mutex::try_lock_for_read() noexcept
+{
+ if (!writers && !exclusively_used && waiters.empty()) {
+ ++readers;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void tri_mutex::unlock_for_read()
+{
+ assert(readers > 0);
+ if (--readers == 0) {
+ wake();
+ }
+}
+
+void tri_mutex::promote_from_read()
+{
+ assert(readers == 1);
+ --readers;
+ exclusively_used = true;
+}
+
+void tri_mutex::demote_to_read()
+{
+ assert(exclusively_used);
+ exclusively_used = false;
+ ++readers;
+}
+
+seastar::future<> tri_mutex::lock_for_write(bool greedy)
+{
+ if (try_lock_for_write(greedy)) {
+ return seastar::make_ready_future<>();
+ }
+ waiters.emplace_back(seastar::promise<>(), type_t::write);
+ return waiters.back().pr.get_future();
+}
+
+bool tri_mutex::try_lock_for_write(bool greedy) noexcept
+{
+ if (!readers && !exclusively_used) {
+ if (greedy || waiters.empty()) {
+ ++writers;
+ return true;
+ }
+ }
+ return false;
+}
+
+void tri_mutex::unlock_for_write()
+{
+ assert(writers > 0);
+ if (--writers == 0) {
+ wake();
+ }
+}
+
+void tri_mutex::promote_from_write()
+{
+ assert(writers == 1);
+ --writers;
+ exclusively_used = true;
+}
+
+void tri_mutex::demote_to_write()
+{
+ assert(exclusively_used);
+ exclusively_used = false;
+ ++writers;
+}
+
+// for exclusive users
+seastar::future<> tri_mutex::lock_for_excl()
+{
+ if (try_lock_for_excl()) {
+ return seastar::make_ready_future<>();
+ }
+ waiters.emplace_back(seastar::promise<>(), type_t::exclusive);
+ return waiters.back().pr.get_future();
+}
+
+bool tri_mutex::try_lock_for_excl() noexcept
+{
+ if (readers == 0u && writers == 0u && !exclusively_used) {
+ exclusively_used = true;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void tri_mutex::unlock_for_excl()
+{
+ assert(exclusively_used);
+ exclusively_used = false;
+ wake();
+}
+
+bool tri_mutex::is_acquired() const
+{
+ if (readers != 0u) {
+ return true;
+ } else if (writers != 0u) {
+ return true;
+ } else if (exclusively_used) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void tri_mutex::wake()
+{
+ assert(!readers && !writers && !exclusively_used);
+ type_t type = type_t::none;
+ while (!waiters.empty()) {
+ auto& waiter = waiters.front();
+ if (type == type_t::exclusive) {
+ break;
+ } if (type == type_t::none) {
+ type = waiter.type;
+ } else if (type != waiter.type) {
+ // to be woken in the next batch
+ break;
+ }
+ switch (type) {
+ case type_t::read:
+ ++readers;
+ break;
+ case type_t::write:
+ ++writers;
+ break;
+ case type_t::exclusive:
+ exclusively_used = true;
+ break;
+ default:
+ assert(0);
+ }
+ waiter.pr.set_value();
+ waiters.pop_front();
+ }
+}
diff --git a/src/crimson/common/tri_mutex.h b/src/crimson/common/tri_mutex.h
new file mode 100644
index 000000000..0533f3539
--- /dev/null
+++ b/src/crimson/common/tri_mutex.h
@@ -0,0 +1,156 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <seastar/core/circular_buffer.hh>
+
+class read_lock {
+public:
+ seastar::future<> lock();
+ void unlock();
+};
+
+class write_lock {
+public:
+ seastar::future<> lock();
+ void unlock();
+};
+
+class excl_lock {
+public:
+ seastar::future<> lock();
+ void unlock();
+};
+
+// promote from read to excl
+class excl_lock_from_read {
+public:
+ seastar::future<> lock();
+ void unlock();
+};
+
+// promote from write to excl
+class excl_lock_from_write {
+public:
+ seastar::future<> lock();
+ void unlock();
+};
+
+// promote from excl to excl
+class excl_lock_from_excl {
+public:
+ seastar::future<> lock();
+ void unlock();
+};
+
+/// shared/exclusive mutual exclusion
+///
+/// this lock design uses reader and writer is entirely and completely
+/// independent of the conventional reader/writer lock usage. Here, what we
+/// mean is that we can pipeline reads, and we can pipeline writes, but we
+/// cannot allow a read while writes are in progress or a write while reads are
+/// in progress. Any rmw operation is therefore exclusive.
+///
+/// tri_mutex is based on seastar::shared_mutex, but instead of two kinds of
+/// waiters, tri_mutex keeps track of three kinds of lock users:
+/// - readers
+/// - writers
+/// - exclusive users
+class tri_mutex : private read_lock,
+ write_lock,
+ excl_lock,
+ excl_lock_from_read,
+ excl_lock_from_write,
+ excl_lock_from_excl
+{
+public:
+ tri_mutex() = default;
+ ~tri_mutex();
+
+ read_lock& for_read() {
+ return *this;
+ }
+ write_lock& for_write() {
+ return *this;
+ }
+ excl_lock& for_excl() {
+ return *this;
+ }
+ excl_lock_from_read& excl_from_read() {
+ return *this;
+ }
+ excl_lock_from_write& excl_from_write() {
+ return *this;
+ }
+ excl_lock_from_excl& excl_from_excl() {
+ return *this;
+ }
+
+ // for shared readers
+ seastar::future<> lock_for_read();
+ bool try_lock_for_read() noexcept;
+ void unlock_for_read();
+ void promote_from_read();
+ void demote_to_read();
+ unsigned get_readers() const {
+ return readers;
+ }
+
+ // for shared writers
+ seastar::future<> lock_for_write(bool greedy);
+ bool try_lock_for_write(bool greedy) noexcept;
+ void unlock_for_write();
+ void promote_from_write();
+ void demote_to_write();
+ unsigned get_writers() const {
+ return writers;
+ }
+
+ // for exclusive users
+ seastar::future<> lock_for_excl();
+ bool try_lock_for_excl() noexcept;
+ void unlock_for_excl();
+ bool is_excl_acquired() const {
+ return exclusively_used;
+ }
+
+ bool is_acquired() const;
+
+ /// pass the provided exception to any waiting waiters
+ template<typename Exception>
+ void abort(Exception ex) {
+ while (!waiters.empty()) {
+ auto& waiter = waiters.front();
+ waiter.pr.set_exception(std::make_exception_ptr(ex));
+ waiters.pop_front();
+ }
+ }
+
+private:
+ void wake();
+ unsigned readers = 0;
+ unsigned writers = 0;
+ bool exclusively_used = false;
+ enum class type_t : uint8_t {
+ read,
+ write,
+ exclusive,
+ none,
+ };
+ struct waiter_t {
+ waiter_t(seastar::promise<>&& pr, type_t type)
+ : pr(std::move(pr)), type(type)
+ {}
+ seastar::promise<> pr;
+ type_t type;
+ };
+ seastar::circular_buffer<waiter_t> waiters;
+ friend class read_lock;
+ friend class write_lock;
+ friend class excl_lock;
+ friend class excl_lock_from_read;
+ friend class excl_lock_from_write;
+ friend class excl_lock_from_excl;
+};
diff --git a/src/crimson/common/type_helpers.h b/src/crimson/common/type_helpers.h
new file mode 100644
index 000000000..4c606581f
--- /dev/null
+++ b/src/crimson/common/type_helpers.h
@@ -0,0 +1,8 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "boost/intrusive_ptr.hpp"
+
+template<typename T> using Ref = boost::intrusive_ptr<T>;
diff --git a/src/crimson/common/utility.h b/src/crimson/common/utility.h
new file mode 100644
index 000000000..86b308155
--- /dev/null
+++ b/src/crimson/common/utility.h
@@ -0,0 +1,38 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include <type_traits>
+
+namespace _impl {
+ template <class T> struct always_false : std::false_type {};
+};
+
+template <class T>
+void assert_moveable(T& t) {
+ // It's fine
+}
+template <class T>
+void assert_moveable(const T& t) {
+ static_assert(_impl::always_false<T>::value, "unable to move-out from T");
+}
+
+namespace internal {
+
+template <typename Obj, typename Method, typename ArgTuple, size_t... I>
+static auto _apply_method_to_tuple(
+ Obj &obj, Method method, ArgTuple &&tuple,
+ std::index_sequence<I...>) {
+ return (obj.*method)(std::get<I>(std::forward<ArgTuple>(tuple))...);
+}
+
+}
+
+template <typename Obj, typename Method, typename ArgTuple>
+auto apply_method_to_tuple(Obj &obj, Method method, ArgTuple &&tuple) {
+ constexpr auto tuple_size = std::tuple_size_v<ArgTuple>;
+ return internal::_apply_method_to_tuple(
+ obj, method, std::forward<ArgTuple>(tuple),
+ std::make_index_sequence<tuple_size>());
+}