From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/crimson/common/assert.cc | 81 ++ src/crimson/common/auth_handler.h | 17 + src/crimson/common/buffer_io.cc | 57 + src/crimson/common/buffer_io.h | 21 + src/crimson/common/condition_variable.h | 43 + src/crimson/common/config_proxy.cc | 93 ++ src/crimson/common/config_proxy.h | 222 ++++ src/crimson/common/errorator-loop.h | 91 ++ src/crimson/common/errorator.h | 1358 ++++++++++++++++++++ src/crimson/common/exception.h | 54 + src/crimson/common/fatal_signal.cc | 172 +++ src/crimson/common/fatal_signal.h | 21 + src/crimson/common/fixed_kv_node_layout.h | 730 +++++++++++ src/crimson/common/formatter.cc | 40 + src/crimson/common/formatter.h | 13 + src/crimson/common/gated.h | 55 + src/crimson/common/interruptible_future.h | 1600 ++++++++++++++++++++++++ src/crimson/common/layout.h | 737 +++++++++++ src/crimson/common/local_shared_foreign_ptr.h | 245 ++++ src/crimson/common/log.cc | 21 + src/crimson/common/log.h | 88 ++ src/crimson/common/logclient.cc | 364 ++++++ src/crimson/common/logclient.h | 232 ++++ src/crimson/common/operation.cc | 75 ++ src/crimson/common/operation.h | 776 ++++++++++++ src/crimson/common/perf_counters_collection.cc | 41 + src/crimson/common/perf_counters_collection.h | 49 + src/crimson/common/shared_lru.h | 180 +++ src/crimson/common/simple_lru.h | 141 +++ src/crimson/common/smp_helpers.h | 92 ++ src/crimson/common/throttle.cc | 64 + src/crimson/common/throttle.h | 43 + src/crimson/common/tmap_helpers.cc | 131 ++ src/crimson/common/tmap_helpers.h | 40 + src/crimson/common/tri_mutex.cc | 225 ++++ src/crimson/common/tri_mutex.h | 156 +++ src/crimson/common/type_helpers.h | 8 + src/crimson/common/utility.h | 38 + 38 files changed, 8414 insertions(+) create mode 100644 src/crimson/common/assert.cc create mode 100644 src/crimson/common/auth_handler.h create mode 100644 src/crimson/common/buffer_io.cc create mode 100644 src/crimson/common/buffer_io.h create mode 100644 src/crimson/common/condition_variable.h create mode 100644 src/crimson/common/config_proxy.cc create mode 100644 src/crimson/common/config_proxy.h create mode 100644 src/crimson/common/errorator-loop.h create mode 100644 src/crimson/common/errorator.h create mode 100644 src/crimson/common/exception.h create mode 100644 src/crimson/common/fatal_signal.cc create mode 100644 src/crimson/common/fatal_signal.h create mode 100644 src/crimson/common/fixed_kv_node_layout.h create mode 100644 src/crimson/common/formatter.cc create mode 100644 src/crimson/common/formatter.h create mode 100644 src/crimson/common/gated.h create mode 100644 src/crimson/common/interruptible_future.h create mode 100644 src/crimson/common/layout.h create mode 100644 src/crimson/common/local_shared_foreign_ptr.h create mode 100644 src/crimson/common/log.cc create mode 100644 src/crimson/common/log.h create mode 100644 src/crimson/common/logclient.cc create mode 100644 src/crimson/common/logclient.h create mode 100644 src/crimson/common/operation.cc create mode 100644 src/crimson/common/operation.h create mode 100644 src/crimson/common/perf_counters_collection.cc create mode 100644 src/crimson/common/perf_counters_collection.h create mode 100644 src/crimson/common/shared_lru.h create mode 100644 src/crimson/common/simple_lru.h create mode 100644 src/crimson/common/smp_helpers.h create mode 100644 src/crimson/common/throttle.cc create mode 100644 src/crimson/common/throttle.h create mode 100644 src/crimson/common/tmap_helpers.cc create mode 100644 src/crimson/common/tmap_helpers.h create mode 100644 src/crimson/common/tri_mutex.cc create mode 100644 src/crimson/common/tri_mutex.h create mode 100644 src/crimson/common/type_helpers.h create mode 100644 src/crimson/common/utility.h (limited to 'src/crimson/common') diff --git a/src/crimson/common/assert.cc b/src/crimson/common/assert.cc new file mode 100644 index 000000000..07610c33f --- /dev/null +++ b/src/crimson/common/assert.cc @@ -0,0 +1,81 @@ +#include +#include + +#include +#include + +#include "include/ceph_assert.h" + +#include "crimson/common/log.h" + +namespace ceph { + [[gnu::cold]] void __ceph_assert_fail(const ceph::assert_data &ctx) + { + __ceph_assert_fail(ctx.assertion, ctx.file, ctx.line, ctx.function); + } + + [[gnu::cold]] void __ceph_assert_fail(const char* assertion, + const char* file, int line, + const char* func) + { + seastar::logger& logger = crimson::get_logger(0); + logger.error("{}:{} : In function '{}', ceph_assert(%s)\n" + "{}", + file, line, func, assertion, + seastar::current_backtrace()); + std::cout << std::flush; + abort(); + } + [[gnu::cold]] void __ceph_assertf_fail(const char *assertion, + const char *file, int line, + const char *func, const char* msg, + ...) + { + char buf[8096]; + va_list args; + va_start(args, msg); + std::vsnprintf(buf, sizeof(buf), msg, args); + va_end(args); + + seastar::logger& logger = crimson::get_logger(0); + logger.error("{}:{} : In function '{}', ceph_assert(%s)\n" + "{}\n{}\n", + file, line, func, assertion, + buf, + seastar::current_backtrace()); + std::cout << std::flush; + abort(); + } + + [[gnu::cold]] void __ceph_abort(const char* file, int line, + const char* func, const std::string& msg) + { + seastar::logger& logger = crimson::get_logger(0); + logger.error("{}:{} : In function '{}', abort(%s)\n" + "{}", + file, line, func, msg, + seastar::current_backtrace()); + std::cout << std::flush; + abort(); + } + + [[gnu::cold]] void __ceph_abortf(const char* file, int line, + const char* func, const char* fmt, + ...) + { + char buf[8096]; + va_list args; + va_start(args, fmt); + std::vsnprintf(buf, sizeof(buf), fmt, args); + va_end(args); + + seastar::logger& logger = crimson::get_logger(0); + logger.error("{}:{} : In function '{}', abort()\n" + "{}\n{}\n", + file, line, func, + buf, + seastar::current_backtrace()); + std::cout << std::flush; + abort(); + } +} diff --git a/src/crimson/common/auth_handler.h b/src/crimson/common/auth_handler.h new file mode 100644 index 000000000..d4140b6a2 --- /dev/null +++ b/src/crimson/common/auth_handler.h @@ -0,0 +1,17 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +class EntityName; +class AuthCapsInfo; + +namespace crimson::common { +class AuthHandler { +public: + // the peer just got authorized + virtual void handle_authentication(const EntityName& name, + const AuthCapsInfo& caps) = 0; + virtual ~AuthHandler() = default; +}; +} diff --git a/src/crimson/common/buffer_io.cc b/src/crimson/common/buffer_io.cc new file mode 100644 index 000000000..86edf7a6f --- /dev/null +++ b/src/crimson/common/buffer_io.cc @@ -0,0 +1,57 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "buffer_io.h" + +#include +#include +#include + +#include "include/buffer.h" + +namespace crimson { + +seastar::future<> write_file(ceph::buffer::list&& bl, + seastar::sstring fn, + seastar::file_permissions permissions) +{ + const auto flags = (seastar::open_flags::wo | + seastar::open_flags::create | + seastar::open_flags::truncate); + seastar::file_open_options foo; + foo.create_permissions = permissions; + return seastar::open_file_dma(fn, flags, foo).then( + [bl=std::move(bl)](seastar::file f) { + return seastar::make_file_output_stream(f).then( + [bl=std::move(bl), f=std::move(f)](seastar::output_stream out) { + return seastar::do_with(std::move(out), + std::move(f), + std::move(bl), + [](seastar::output_stream& out, + seastar::file& f, + ceph::buffer::list& bl) { + return seastar::do_for_each(bl.buffers(), [&out](auto& buf) { + return out.write(buf.c_str(), buf.length()); + }).then([&out] { + return out.close(); + }); + }); + }); + }); +} + +seastar::future> +read_file(const seastar::sstring fn) +{ + return seastar::open_file_dma(fn, seastar::open_flags::ro).then( + [] (seastar::file f) { + return f.size().then([f = std::move(f)](size_t s) { + return seastar::do_with(seastar::make_file_input_stream(f), + [s](seastar::input_stream& in) { + return in.read_exactly(s); + }); + }); + }); +} + +} diff --git a/src/crimson/common/buffer_io.h b/src/crimson/common/buffer_io.h new file mode 100644 index 000000000..c5ece4a6f --- /dev/null +++ b/src/crimson/common/buffer_io.h @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "include/buffer_fwd.h" + +namespace crimson { + seastar::future<> write_file(ceph::buffer::list&& bl, + seastar::sstring fn, + seastar::file_permissions= // 0644 + (seastar::file_permissions::user_read | + seastar::file_permissions::user_write | + seastar::file_permissions::group_read | + seastar::file_permissions::others_read)); + seastar::future> + read_file(const seastar::sstring fn); +} diff --git a/src/crimson/common/condition_variable.h b/src/crimson/common/condition_variable.h new file mode 100644 index 000000000..19267f38a --- /dev/null +++ b/src/crimson/common/condition_variable.h @@ -0,0 +1,43 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include + +#include "crimson/common/interruptible_future.h" + +namespace crimson { + +class condition_variable : public seastar::condition_variable { +public: + template + auto wait( + Pred&& pred, + Func&& action) noexcept { + using func_result_t = std::invoke_result_t; + using intr_errorator_t = typename func_result_t::interrupt_errorator_type; + using intr_cond_t = typename func_result_t::interrupt_cond_type; + using interruptor = crimson::interruptible::interruptor; + return interruptor::repeat( + [this, pred=std::forward(pred), + action=std::forward(action)]() + -> typename intr_errorator_t::template future { + if (!pred()) { + return seastar::condition_variable::wait().then([] { + return seastar::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::no); + }); + } else { + return action().si_then([] { + return seastar::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + }); + } + }); + } +}; + +} // namespace crimson diff --git a/src/crimson/common/config_proxy.cc b/src/crimson/common/config_proxy.cc new file mode 100644 index 000000000..88d4679d5 --- /dev/null +++ b/src/crimson/common/config_proxy.cc @@ -0,0 +1,93 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "config_proxy.h" + +#include + +#include "crimson/common/buffer_io.h" + +namespace crimson::common { + +ConfigProxy::ConfigProxy(const EntityName& name, std::string_view cluster) +{ + if (seastar::this_shard_id() != 0) { + return; + } + // set the initial value on CPU#0 + values.reset(seastar::make_lw_shared()); + values.get()->name = name; + values.get()->cluster = cluster; + // and the only copy of md_config_impl<> is allocated on CPU#0 + local_config.reset(new md_config_t{*values, obs_mgr, true}); + if (name.is_mds()) { + local_config->set_val_default(*values, obs_mgr, + "keyring", "$mds_data/keyring"); + } else if (name.is_osd()) { + local_config->set_val_default(*values, obs_mgr, + "keyring", "$osd_data/keyring"); + } +} + +seastar::future<> ConfigProxy::start() +{ + // populate values and config to all other shards + if (!values) { + return seastar::make_ready_future<>(); + } + return container().invoke_on_others([this](auto& proxy) { + return values.copy().then([config=local_config.get(), + &proxy](auto foreign_values) { + proxy.values.reset(); + proxy.values = std::move(foreign_values); + proxy.remote_config = config; + return seastar::make_ready_future<>(); + }); + }); +} + +void ConfigProxy::show_config(ceph::Formatter* f) const { + get_config().show_config(*values, f); +} + +seastar::future<> ConfigProxy::parse_config_files(const std::string& conf_files) +{ + auto conffile_paths = + get_config().get_conffile_paths(*values, + conf_files.empty() ? nullptr : conf_files.c_str(), + &std::cerr, + CODE_ENVIRONMENT_DAEMON); + return seastar::do_with(std::move(conffile_paths), [this] (auto& paths) { + return seastar::repeat([path=paths.begin(), e=paths.end(), this]() mutable { + if (path == e) { + // tried all conffile, none of them works + return seastar::make_ready_future( + seastar::stop_iteration::yes); + } + return crimson::read_file(*path++).then([this](auto&& buf) { + return do_change([buf=std::move(buf), this](ConfigValues& values) { + if (get_config().parse_buffer(values, obs_mgr, + buf.get(), buf.size(), + &std::cerr) == 0) { + get_config().update_legacy_vals(values); + } else { + throw std::invalid_argument("parse error"); + } + }).then([] { + // this one works! + return seastar::make_ready_future( + seastar::stop_iteration::yes); + }); + }).handle_exception_type([] (const std::filesystem::filesystem_error&) { + return seastar::make_ready_future( + seastar::stop_iteration::no); + }).handle_exception_type([] (const std::invalid_argument&) { + return seastar::make_ready_future( + seastar::stop_iteration::no); + }); + }); + }); +} + +ConfigProxy::ShardedConfig ConfigProxy::sharded_conf; +} diff --git a/src/crimson/common/config_proxy.h b/src/crimson/common/config_proxy.h new file mode 100644 index 000000000..4c0e65507 --- /dev/null +++ b/src/crimson/common/config_proxy.h @@ -0,0 +1,222 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include "common/config.h" +#include "common/config_obs.h" +#include "common/config_obs_mgr.h" +#include "common/errno.h" + +namespace ceph { +class Formatter; +} + +namespace crimson::common { + +// a facade for managing config. each shard has its own copy of ConfigProxy. +// +// In seastar-osd, there could be multiple instances of @c ConfigValues in a +// single process, as we are using a variant of read-copy-update mechinary to +// update the settings at runtime. +class ConfigProxy : public seastar::peering_sharded_service +{ + using LocalConfigValues = seastar::lw_shared_ptr; + seastar::foreign_ptr values; + + md_config_t* remote_config = nullptr; + std::unique_ptr local_config; + + using ConfigObserver = ceph::md_config_obs_impl; + ObserverMgr obs_mgr; + + const md_config_t& get_config() const { + return remote_config ? *remote_config : * local_config; + } + md_config_t& get_config() { + return remote_config ? *remote_config : * local_config; + } + + // apply changes to all shards + // @param func a functor which accepts @c "ConfigValues&" + template + seastar::future<> do_change(Func&& func) { + return container().invoke_on(values.get_owner_shard(), + [func = std::move(func)](ConfigProxy& owner) { + // apply the changes to a copy of the values + auto new_values = seastar::make_lw_shared(*owner.values); + new_values->changed.clear(); + func(*new_values); + + // always apply the new settings synchronously on the owner shard, to + // avoid racings with other do_change() calls in parallel. + ObserverMgr::rev_obs_map rev_obs; + owner.values.reset(new_values); + owner.obs_mgr.for_each_change(owner.values->changed, owner, + [&rev_obs](ConfigObserver *obs, + const std::string &key) { + rev_obs[obs].insert(key); + }, nullptr); + for (auto& [obs, keys] : rev_obs) { + obs->handle_conf_change(owner, keys); + } + + return seastar::parallel_for_each(boost::irange(1u, seastar::smp::count), + [&owner, new_values] (auto cpu) { + return owner.container().invoke_on(cpu, + [foreign_values = seastar::make_foreign(new_values)](ConfigProxy& proxy) mutable { + proxy.values.reset(); + proxy.values = std::move(foreign_values); + + ObserverMgr::rev_obs_map rev_obs; + proxy.obs_mgr.for_each_change(proxy.values->changed, proxy, + [&rev_obs](ConfigObserver *obs, const std::string& key) { + rev_obs[obs].insert(key); + }, nullptr); + for (auto& obs_keys : rev_obs) { + obs_keys.first->handle_conf_change(proxy, obs_keys.second); + } + }); + }).finally([new_values] { + new_values->changed.clear(); + }); + }); + } +public: + ConfigProxy(const EntityName& name, std::string_view cluster); + const ConfigValues* operator->() const noexcept { + return values.get(); + } + const ConfigValues get_config_values() { + return *values.get(); + } + ConfigValues* operator->() noexcept { + return values.get(); + } + + void get_config_bl(uint64_t have_version, + ceph::buffer::list *bl, + uint64_t *got_version) { + get_config().get_config_bl(get_config_values(), have_version, + bl, got_version); + } + void get_defaults_bl(ceph::buffer::list *bl) { + get_config().get_defaults_bl(get_config_values(), bl); + } + seastar::future<> start(); + // required by sharded<> + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + void add_observer(ConfigObserver* obs) { + obs_mgr.add_observer(obs); + } + void remove_observer(ConfigObserver* obs) { + obs_mgr.remove_observer(obs); + } + seastar::future<> rm_val(const std::string& key) { + return do_change([key, this](ConfigValues& values) { + auto ret = get_config().rm_val(values, key); + if (ret < 0) { + throw std::invalid_argument(cpp_strerror(ret)); + } + }); + } + seastar::future<> set_val(const std::string& key, + const std::string& val) { + return do_change([key, val, this](ConfigValues& values) { + std::stringstream err; + auto ret = get_config().set_val(values, obs_mgr, key, val, &err); + if (ret < 0) { + throw std::invalid_argument(err.str()); + } + }); + } + int get_val(std::string_view key, std::string *val) const { + return get_config().get_val(*values, key, val); + } + template + const T get_val(std::string_view key) const { + return get_config().template get_val(*values, key); + } + + int get_all_sections(std::vector& sections) const { + return get_config().get_all_sections(sections); + } + + int get_val_from_conf_file(const std::vector& sections, + const std::string& key, std::string& out, + bool expand_meta) const { + return get_config().get_val_from_conf_file(*values, sections, key, + out, expand_meta); + } + + unsigned get_osd_pool_default_min_size(uint8_t size) const { + return get_config().get_osd_pool_default_min_size(*values, size); + } + + seastar::future<> + set_mon_vals(const std::map>& kv) { + return do_change([kv, this](ConfigValues& values) { + get_config().set_mon_vals(nullptr, values, obs_mgr, kv, nullptr); + }); + } + + seastar::future<> inject_args(const std::string& s) { + return do_change([s, this](ConfigValues& values) { + std::stringstream err; + if (get_config().injectargs(values, obs_mgr, s, &err)) { + throw std::invalid_argument(err.str()); + } + }); + } + void show_config(ceph::Formatter* f) const; + + seastar::future<> parse_argv(std::vector& argv) { + // we could pass whatever is unparsed to seastar, but seastar::app_template + // is used for driving the seastar application, and + // crimson::common::ConfigProxy is not available until seastar engine is up + // and running, so we have to feed the command line args to app_template + // first, then pass them to ConfigProxy. + return do_change([&argv, this](ConfigValues& values) { + get_config().parse_argv(values, + obs_mgr, + argv, + CONF_CMDLINE); + }); + } + + seastar::future<> parse_env() { + return do_change([this](ConfigValues& values) { + get_config().parse_env(CEPH_ENTITY_TYPE_OSD, + values, + obs_mgr); + }); + } + + seastar::future<> parse_config_files(const std::string& conf_files); + + using ShardedConfig = seastar::sharded; + +private: + static ShardedConfig sharded_conf; + friend ConfigProxy& local_conf(); + friend ShardedConfig& sharded_conf(); +}; + +inline ConfigProxy& local_conf() { + return ConfigProxy::sharded_conf.local(); +} + +inline ConfigProxy::ShardedConfig& sharded_conf() { + return ConfigProxy::sharded_conf; +} + +template +const T get_conf(const std::string& key) { + return local_conf().template get_val(key); +} + +} diff --git a/src/crimson/common/errorator-loop.h b/src/crimson/common/errorator-loop.h new file mode 100644 index 000000000..bb3b7fb15 --- /dev/null +++ b/src/crimson/common/errorator-loop.h @@ -0,0 +1,91 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include + +#include "crimson/common/errorator.h" + + +namespace crimson { +template +class parallel_for_each_state final : private seastar::continuation_base<> { + using future_t = typename errorator::template future<>; + std::vector _incomplete; + seastar::promise<> _result; + std::exception_ptr _ex; +private: + void wait_for_one() noexcept { + while (!_incomplete.empty() && _incomplete.back().available()) { + if (_incomplete.back().failed()) { + _ex = _incomplete.back().get_exception(); + } + _incomplete.pop_back(); + } + if (!_incomplete.empty()) { + seastar::internal::set_callback(std::move(_incomplete.back()), + static_cast*>(this)); + _incomplete.pop_back(); + return; + } + if (__builtin_expect(bool(_ex), false)) { + _result.set_exception(std::move(_ex)); + } else { + _result.set_value(); + } + delete this; + } + virtual void run_and_dispose() noexcept override { + if (_state.failed()) { + _ex = std::move(_state).get_exception(); + } + _state = {}; + wait_for_one(); + } + task* waiting_task() noexcept override { return _result.waiting_task(); } +public: + parallel_for_each_state(size_t n) { + _incomplete.reserve(n); + } + void add_future(future_t&& f) { + _incomplete.push_back(std::move(f)); + } + future_t get_future() { + auto ret = _result.get_future(); + wait_for_one(); + return ret; + } +}; + +template +static inline typename errorator::template future<> +parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept { + parallel_for_each_state* s = nullptr; + // Process all elements, giving each future the following treatment: + // - available, not failed: do nothing + // - available, failed: collect exception in ex + // - not available: collect in s (allocating it if needed) + for (;first != last; ++first) { + auto f = seastar::futurize_invoke(std::forward(func), *first); + if (!f.available() || f.failed()) { + if (!s) { + using itraits = std::iterator_traits; + auto n = (seastar::internal::iterator_range_estimate_vector_capacity( + first, last, typename itraits::iterator_category()) + 1); + s = new parallel_for_each_state(n); + } + s->add_future(std::move(f)); + } + } + // If any futures were not available, hand off to parallel_for_each_state::start(). + // Otherwise we can return a result immediately. + if (s) { + // s->get_future() takes ownership of s (and chains it to one of the futures it contains) + // so this isn't a leak + return s->get_future(); + } + return seastar::make_ready_future<>(); +} + +} // namespace crimson diff --git a/src/crimson/common/errorator.h b/src/crimson/common/errorator.h new file mode 100644 index 000000000..c5d63d5b9 --- /dev/null +++ b/src/crimson/common/errorator.h @@ -0,0 +1,1358 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include +#include + +#include + +#include "crimson/common/utility.h" +#include "include/ceph_assert.h" + +namespace crimson::interruptible { + +template +class parallel_for_each_state; + +template +class interruptible_future_detail; + +} + +namespace crimson { + +// crimson::do_for_each_state is the mirror of seastar::do_for_each_state with FutureT +template +class do_for_each_state final : public seastar::continuation_base<> { + Iterator _begin; + Iterator _end; + AsyncAction _action; + seastar::promise<> _pr; + +public: + do_for_each_state(Iterator begin, Iterator end, AsyncAction action, + FutureT&& first_unavailable) + : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) { + seastar::internal::set_callback(std::move(first_unavailable), this); + } + virtual void run_and_dispose() noexcept override { + std::unique_ptr zis(this); + if (_state.failed()) { + _pr.set_urgent_state(std::move(_state)); + return; + } + while (_begin != _end) { + auto f = seastar::futurize_invoke(_action, *_begin); + ++_begin; + if (f.failed()) { + f._forward_to(std::move(_pr)); + return; + } + if (!f.available() || seastar::need_preempt()) { + _state = {}; + seastar::internal::set_callback(std::move(f), this); + zis.release(); + return; + } + } + _pr.set_value(); + } + task* waiting_task() noexcept override { + return _pr.waiting_task(); + } + FutureT get_future() { + return _pr.get_future(); + } +}; + +template> +inline FutureT do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) { + while (begin != end) { + auto f = seastar::futurize_invoke(action, *begin); + ++begin; + if (f.failed()) { + return f; + } + if (!f.available() || seastar::need_preempt()) { + // s will be freed by run_and_dispose() + auto* s = new crimson::do_for_each_state{ + std::move(begin), std::move(end), std::move(action), std::move(f)}; + return s->get_future(); + } + } + return seastar::make_ready_future<>(); +} + +template +inline auto do_for_each(Iterator begin, Iterator end, AsyncAction action) { + return ::crimson::do_for_each_impl(begin, end, std::move(action)); +} + +template +inline auto do_for_each(Container& c, AsyncAction action) { + return ::crimson::do_for_each(std::begin(c), std::end(c), std::move(action)); +} + +template +inline auto repeat(AsyncAction action) { + using errorator_t = + typename ::seastar::futurize_t>::errorator_type; + + while (true) { + auto f = ::seastar::futurize_invoke(action); + if (f.failed()) { + return errorator_t::template make_exception_future2<>( + f.get_exception() + ); + } else if (f.available()) { + if (auto done = f.get0()) { + return errorator_t::template make_ready_future<>(); + } + } else { + return std::move(f)._then( + [action = std::move(action)] (auto stop) mutable { + if (stop == seastar::stop_iteration::yes) { + return errorator_t::template make_ready_future<>(); + } + return ::crimson::repeat( + std::move(action)); + }); + } + } +} + +// define the interface between error types and errorator +template +class error_t { + static constexpr const std::type_info& get_exception_ptr_type_info() { + return ConcreteErrorT::exception_ptr_type_info(); + } + + decltype(auto) static from_exception_ptr(std::exception_ptr ep) { + return ConcreteErrorT::from_exception_ptr(std::move(ep)); + } + + template + friend struct errorator; + + template + friend class maybe_handle_error_t; + +protected: + std::exception_ptr to_exception_ptr() const { + const auto* concrete_error = static_cast(this); + return concrete_error->to_exception_ptr(); + } + +public: + template + static decltype(auto) handle(Func&& func) { + return ConcreteErrorT::handle(std::forward(func)); + } +}; + +// unthrowable_wrapper ensures compilation failure when somebody +// would like to `throw make_error<...>)()` instead of returning. +// returning allows for the compile-time verification of future's +// AllowedErrorsV and also avoid the burden of throwing. +template +struct unthrowable_wrapper : error_t> { + unthrowable_wrapper(const unthrowable_wrapper&) = delete; + [[nodiscard]] static const auto& make() { + static constexpr unthrowable_wrapper instance{}; + return instance; + } + + static auto exception_ptr() { + return make().to_exception_ptr(); + } + + template + static auto handle(Func&& func) { + return [ + func = std::forward(func) + ] (const unthrowable_wrapper& raw_error) mutable -> decltype(auto) { + if constexpr (std::is_invocable_v) { + // check whether the handler wants to take the raw error object which + // would be the case if it wants conditionally handle-or-pass-further. + return std::invoke(std::forward(func), + ErrorV, + std::move(raw_error)); + } else if constexpr (std::is_invocable_v) { + return std::invoke(std::forward(func), ErrorV); + } else { + return std::invoke(std::forward(func)); + } + }; + } + + struct pass_further { + decltype(auto) operator()(const unthrowable_wrapper& e) { + return e; + } + }; + + struct discard { + decltype(auto) operator()(const unthrowable_wrapper&) { + } + }; + + +private: + // can be used only to initialize the `instance` member + explicit unthrowable_wrapper() = default; + + // implement the errorable interface + struct throwable_carrier{}; + static std::exception_ptr carrier_instance; + + static constexpr const std::type_info& exception_ptr_type_info() { + return typeid(throwable_carrier); + } + auto to_exception_ptr() const { + // error codes don't need to instantiate `std::exception_ptr` each + // time as the code is actually a part of the type itself. + // `std::make_exception_ptr()` on modern enough GCCs is quite cheap + // (see the Gleb Natapov's patch eradicating throw/catch there), + // but using one instance per type boils down the overhead to just + // ref-counting. + return carrier_instance; + } + static const auto& from_exception_ptr(std::exception_ptr) { + return make(); + } + + friend class error_t>; +}; + +template +std::exception_ptr unthrowable_wrapper::carrier_instance = \ + std::make_exception_ptr< + unthrowable_wrapper::throwable_carrier>({}); + + +template +struct stateful_error_t : error_t> { + template + explicit stateful_error_t(Args&&... args) + : ep(std::make_exception_ptr(std::forward(args)...)) { + } + + template + static auto handle(Func&& func) { + return [ + func = std::forward(func) + ] (stateful_error_t&& e) mutable -> decltype(auto) { + if constexpr (std::is_invocable_v) { + return std::invoke(std::forward(func)); + } + try { + std::rethrow_exception(e.ep); + } catch (const ErrorT& obj) { + if constexpr (std::is_invocable_v) { + return std::invoke(std::forward(func), obj, e); + } else if constexpr (std::is_invocable_v) { + return std::invoke(std::forward(func), obj); + } + } + ceph_abort_msg("exception type mismatch -- impossible!"); + }; + } + +private: + std::exception_ptr ep; + + explicit stateful_error_t(std::exception_ptr ep) : ep(std::move(ep)) {} + + static constexpr const std::type_info& exception_ptr_type_info() { + return typeid(ErrorT); + } + auto to_exception_ptr() const { + return ep; + } + static stateful_error_t from_exception_ptr(std::exception_ptr ep) { + return stateful_error_t(std::move(ep)); + } + + friend class error_t>; +}; + +namespace _impl { + template struct always_false : std::false_type {}; +}; + +template +class maybe_handle_error_t { + const std::type_info& type_info; + typename FuturatorT::type result; + ErrorVisitorT errfunc; + +public: + maybe_handle_error_t(ErrorVisitorT&& errfunc, std::exception_ptr ep) + : type_info(*ep.__cxa_exception_type()), + result(FuturatorT::make_exception_future(std::move(ep))), + errfunc(std::forward(errfunc)) { + } + + template + void handle() { + static_assert(std::is_invocable::value, + "provided Error Visitor is not exhaustive"); + // In C++ throwing an exception isn't the sole way to signal + // error with it. This approach nicely fits cold, infrequent cases + // but when applied to a hot one, it will likely hurt performance. + // + // Alternative approach is to create `std::exception_ptr` on our + // own and place it in the future via `make_exception_future()`. + // When it comes to handling, the pointer can be interrogated for + // pointee's type with `__cxa_exception_type()` instead of costly + // re-throwing (via `std::rethrow_exception()`) and matching with + // `catch`. The limitation here is lack of support for hierarchies + // of exceptions. The code below checks for exact match only while + // `catch` would allow to match against a base class as well. + // However, this shouldn't be a big issue for `errorator` as Error + // Visitors are already checked for exhaustiveness at compile-time. + // + // NOTE: `__cxa_exception_type()` is an extension of the language. + // It should be available both in GCC and Clang but a fallback + // (based on `std::rethrow_exception()` and `catch`) can be made + // to handle other platforms if necessary. + if (type_info == ErrorT::error_t::get_exception_ptr_type_info()) { + // set `state::invalid` in internals of `seastar::future` to not + // call `report_failed_future()` during `operator=()`. + [[maybe_unused]] auto&& ep = std::move(result).get_exception(); + + using return_t = std::invoke_result_t; + if constexpr (std::is_assignable_v) { + result = std::invoke(std::forward(errfunc), + ErrorT::error_t::from_exception_ptr(std::move(ep))); + } else if constexpr (std::is_same_v) { + // void denotes explicit discarding + // execute for the sake a side effects. Typically this boils down + // to throwing an exception by the handler. + std::invoke(std::forward(errfunc), + ErrorT::error_t::from_exception_ptr(std::move(ep))); + } else if constexpr (seastar::Future) { + // result is seastar::future but return_t is e.g. int. If so, + // the else clause cannot be used as seastar::future lacks + // errorator_type member. + result = seastar::make_ready_future( + std::invoke(std::forward(errfunc), + ErrorT::error_t::from_exception_ptr(std::move(ep)))); + } else { + result = FuturatorT::type::errorator_type::template make_ready_future( + std::invoke(std::forward(errfunc), + ErrorT::error_t::from_exception_ptr(std::move(ep)))); + } + } + } + + auto get_result() && { + return std::move(result); + } +}; + +template +static constexpr auto composer(FuncHead&& head, FuncTail&&... tail) { + return [ + head = std::forward(head), + // perfect forwarding in lambda's closure isn't available in C++17 + // using tuple as workaround; see: https://stackoverflow.com/a/49902823 + tail = std::make_tuple(std::forward(tail)...) + ] (auto&&... args) mutable -> decltype(auto) { + if constexpr (std::is_invocable_v) { + return std::invoke(std::forward(head), + std::forward(args)...); + } else if constexpr (sizeof...(FuncTail) > 0) { + using next_composer_t = decltype(composer); + auto&& next = std::apply(composer, + std::move(tail)); + return std::invoke(std::move(next), + std::forward(args)...); + } else { + static_assert( + std::is_invocable_v || + (sizeof...(FuncTail) > 0), + "composition is not exhaustive"); + } + }; +} + +template +struct errorated_future_marker{}; + +template +class parallel_for_each_state; + +template +static inline constexpr bool is_error_v = std::is_base_of_v, T>; + +template +struct errorator; + +template +static inline typename errorator::template future<> +parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept; + +template +struct errorator { + + static_assert((... && is_error_v), + "errorator expects presence of ::is_error in all error types"); + + template + struct contains_once { + static constexpr bool value = + (0 + ... + std::is_same_v) == 1; + }; + template + struct contains_once> { + static constexpr bool value = (... && contains_once::value); + }; + template + static constexpr bool contains_once_v = contains_once::value; + + static_assert((... && contains_once_v), + "no error type in errorator can be duplicated"); + + struct ready_future_marker{}; + struct exception_future_marker{}; + +private: + // see the comment for `using future = _future` below. + template + class [[nodiscard]] _future {}; + template + class [[nodiscard]] _future<::crimson::errorated_future_marker> + : private seastar::future { + using base_t = seastar::future; + // we need the friendship for the sake of `get_exception() &&` when + // `safe_then()` is going to return an errorated future as a result of + // chaining. In contrast to `seastar::future`, errorator::future` + // has this member private. + template + friend class maybe_handle_error_t; + + // any `seastar::futurize` specialization must be able to access the base. + // see : `satisfy_with_result_of()` far below. + template + friend struct seastar::futurize; + + template + friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more); + + template > + struct get_errorator { + // generic template for non-errorated things (plain types and + // vanilla seastar::future as well). + using type = errorator<>; + }; + template + struct get_errorator> { + using type = typename FutureT::errorator_type; + }; + template + using get_errorator_t = typename get_errorator::type; + + template + struct make_errorator { + // NOP. The generic template. + }; + template + struct make_errorator, + ErrorVisitorRetsHeadT, + ErrorVisitorRetsTailT...> { + private: + using step_errorator = errorator; + // add ErrorVisitorRetsHeadT only if 1) it's an error type and + // 2) isn't already included in the errorator's error set. + // It's enough to negate contains_once_v as any errorator<...> + // type is already guaranteed to be free of duplications. + using _next_errorator = std::conditional_t< + is_error_v && + !step_errorator::template contains_once_v, + typename step_errorator::template extend, + step_errorator>; + using maybe_head_ertr = get_errorator_t; + using next_errorator = + typename _next_errorator::template extend_ertr; + + public: + using type = typename make_errorator::type; + }; + // finish the recursion + template + struct make_errorator> { + using type = ::crimson::errorator; + }; + template + using make_errorator_t = typename make_errorator::type; + + using base_t::base_t; + + template + [[gnu::noinline]] + static auto _safe_then_handle_errors(Future&& future, + ErrorVisitor&& errfunc) { + maybe_handle_error_t maybe_handle_error( + std::forward(errfunc), + std::move(future).get_exception() + ); + (maybe_handle_error.template handle() , ...); + return std::move(maybe_handle_error).get_result(); + } + + protected: + using base_t::get_exception; + public: + using errorator_type = ::crimson::errorator; + using promise_type = seastar::promise; + + using base_t::available; + using base_t::failed; + // need this because of the legacy in PG::do_osd_ops(). + using base_t::handle_exception_type; + + [[gnu::always_inline]] + _future(base_t&& base) + : base_t(std::move(base)) { + } + + base_t to_base() && { + return std::move(*this); + } + + template + [[gnu::always_inline]] + _future(ready_future_marker, A&&... a) + : base_t(::seastar::make_ready_future(std::forward(a)...)) { + } + [[gnu::always_inline]] + _future(exception_future_marker, ::seastar::future_state_base&& state) noexcept + : base_t(::seastar::futurize::make_exception_future(std::move(state))) { + } + [[gnu::always_inline]] + _future(exception_future_marker, std::exception_ptr&& ep) noexcept + : base_t(::seastar::futurize::make_exception_future(std::move(ep))) { + } + + template