diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/common | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/common')
38 files changed, 8414 insertions, 0 deletions
diff --git a/src/crimson/common/assert.cc b/src/crimson/common/assert.cc new file mode 100644 index 000000000..07610c33f --- /dev/null +++ b/src/crimson/common/assert.cc @@ -0,0 +1,81 @@ +#include <cstdarg> +#include <iostream> + +#include <seastar/util/backtrace.hh> +#include <seastar/core/reactor.hh> + +#include "include/ceph_assert.h" + +#include "crimson/common/log.h" + +namespace ceph { + [[gnu::cold]] void __ceph_assert_fail(const ceph::assert_data &ctx) + { + __ceph_assert_fail(ctx.assertion, ctx.file, ctx.line, ctx.function); + } + + [[gnu::cold]] void __ceph_assert_fail(const char* assertion, + const char* file, int line, + const char* func) + { + seastar::logger& logger = crimson::get_logger(0); + logger.error("{}:{} : In function '{}', ceph_assert(%s)\n" + "{}", + file, line, func, assertion, + seastar::current_backtrace()); + std::cout << std::flush; + abort(); + } + [[gnu::cold]] void __ceph_assertf_fail(const char *assertion, + const char *file, int line, + const char *func, const char* msg, + ...) + { + char buf[8096]; + va_list args; + va_start(args, msg); + std::vsnprintf(buf, sizeof(buf), msg, args); + va_end(args); + + seastar::logger& logger = crimson::get_logger(0); + logger.error("{}:{} : In function '{}', ceph_assert(%s)\n" + "{}\n{}\n", + file, line, func, assertion, + buf, + seastar::current_backtrace()); + std::cout << std::flush; + abort(); + } + + [[gnu::cold]] void __ceph_abort(const char* file, int line, + const char* func, const std::string& msg) + { + seastar::logger& logger = crimson::get_logger(0); + logger.error("{}:{} : In function '{}', abort(%s)\n" + "{}", + file, line, func, msg, + seastar::current_backtrace()); + std::cout << std::flush; + abort(); + } + + [[gnu::cold]] void __ceph_abortf(const char* file, int line, + const char* func, const char* fmt, + ...) + { + char buf[8096]; + va_list args; + va_start(args, fmt); + std::vsnprintf(buf, sizeof(buf), fmt, args); + va_end(args); + + seastar::logger& logger = crimson::get_logger(0); + logger.error("{}:{} : In function '{}', abort()\n" + "{}\n{}\n", + file, line, func, + buf, + seastar::current_backtrace()); + std::cout << std::flush; + abort(); + } +} diff --git a/src/crimson/common/auth_handler.h b/src/crimson/common/auth_handler.h new file mode 100644 index 000000000..d4140b6a2 --- /dev/null +++ b/src/crimson/common/auth_handler.h @@ -0,0 +1,17 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +class EntityName; +class AuthCapsInfo; + +namespace crimson::common { +class AuthHandler { +public: + // the peer just got authorized + virtual void handle_authentication(const EntityName& name, + const AuthCapsInfo& caps) = 0; + virtual ~AuthHandler() = default; +}; +} diff --git a/src/crimson/common/buffer_io.cc b/src/crimson/common/buffer_io.cc new file mode 100644 index 000000000..86edf7a6f --- /dev/null +++ b/src/crimson/common/buffer_io.cc @@ -0,0 +1,57 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "buffer_io.h" + +#include <seastar/core/reactor.hh> +#include <seastar/core/fstream.hh> +#include <seastar/core/do_with.hh> + +#include "include/buffer.h" + +namespace crimson { + +seastar::future<> write_file(ceph::buffer::list&& bl, + seastar::sstring fn, + seastar::file_permissions permissions) +{ + const auto flags = (seastar::open_flags::wo | + seastar::open_flags::create | + seastar::open_flags::truncate); + seastar::file_open_options foo; + foo.create_permissions = permissions; + return seastar::open_file_dma(fn, flags, foo).then( + [bl=std::move(bl)](seastar::file f) { + return seastar::make_file_output_stream(f).then( + [bl=std::move(bl), f=std::move(f)](seastar::output_stream<char> out) { + return seastar::do_with(std::move(out), + std::move(f), + std::move(bl), + [](seastar::output_stream<char>& out, + seastar::file& f, + ceph::buffer::list& bl) { + return seastar::do_for_each(bl.buffers(), [&out](auto& buf) { + return out.write(buf.c_str(), buf.length()); + }).then([&out] { + return out.close(); + }); + }); + }); + }); +} + +seastar::future<seastar::temporary_buffer<char>> +read_file(const seastar::sstring fn) +{ + return seastar::open_file_dma(fn, seastar::open_flags::ro).then( + [] (seastar::file f) { + return f.size().then([f = std::move(f)](size_t s) { + return seastar::do_with(seastar::make_file_input_stream(f), + [s](seastar::input_stream<char>& in) { + return in.read_exactly(s); + }); + }); + }); +} + +} diff --git a/src/crimson/common/buffer_io.h b/src/crimson/common/buffer_io.h new file mode 100644 index 000000000..c5ece4a6f --- /dev/null +++ b/src/crimson/common/buffer_io.h @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/future.hh> +#include <seastar/core/file-types.hh> + +#include "include/buffer_fwd.h" + +namespace crimson { + seastar::future<> write_file(ceph::buffer::list&& bl, + seastar::sstring fn, + seastar::file_permissions= // 0644 + (seastar::file_permissions::user_read | + seastar::file_permissions::user_write | + seastar::file_permissions::group_read | + seastar::file_permissions::others_read)); + seastar::future<seastar::temporary_buffer<char>> + read_file(const seastar::sstring fn); +} diff --git a/src/crimson/common/condition_variable.h b/src/crimson/common/condition_variable.h new file mode 100644 index 000000000..19267f38a --- /dev/null +++ b/src/crimson/common/condition_variable.h @@ -0,0 +1,43 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/future.hh> +#include <seastar/core/condition-variable.hh> +#include <seastar/core/loop.hh> + +#include "crimson/common/interruptible_future.h" + +namespace crimson { + +class condition_variable : public seastar::condition_variable { +public: + template <typename Pred, typename Func> + auto wait( + Pred&& pred, + Func&& action) noexcept { + using func_result_t = std::invoke_result_t<Func>; + using intr_errorator_t = typename func_result_t::interrupt_errorator_type; + using intr_cond_t = typename func_result_t::interrupt_cond_type; + using interruptor = crimson::interruptible::interruptor<intr_cond_t>; + return interruptor::repeat( + [this, pred=std::forward<Pred>(pred), + action=std::forward<Func>(action)]() + -> typename intr_errorator_t::template future<seastar::stop_iteration> { + if (!pred()) { + return seastar::condition_variable::wait().then([] { + return seastar::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::no); + }); + } else { + return action().si_then([] { + return seastar::make_ready_future< + seastar::stop_iteration>(seastar::stop_iteration::yes); + }); + } + }); + } +}; + +} // namespace crimson diff --git a/src/crimson/common/config_proxy.cc b/src/crimson/common/config_proxy.cc new file mode 100644 index 000000000..88d4679d5 --- /dev/null +++ b/src/crimson/common/config_proxy.cc @@ -0,0 +1,93 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "config_proxy.h" + +#include <filesystem> + +#include "crimson/common/buffer_io.h" + +namespace crimson::common { + +ConfigProxy::ConfigProxy(const EntityName& name, std::string_view cluster) +{ + if (seastar::this_shard_id() != 0) { + return; + } + // set the initial value on CPU#0 + values.reset(seastar::make_lw_shared<ConfigValues>()); + values.get()->name = name; + values.get()->cluster = cluster; + // and the only copy of md_config_impl<> is allocated on CPU#0 + local_config.reset(new md_config_t{*values, obs_mgr, true}); + if (name.is_mds()) { + local_config->set_val_default(*values, obs_mgr, + "keyring", "$mds_data/keyring"); + } else if (name.is_osd()) { + local_config->set_val_default(*values, obs_mgr, + "keyring", "$osd_data/keyring"); + } +} + +seastar::future<> ConfigProxy::start() +{ + // populate values and config to all other shards + if (!values) { + return seastar::make_ready_future<>(); + } + return container().invoke_on_others([this](auto& proxy) { + return values.copy().then([config=local_config.get(), + &proxy](auto foreign_values) { + proxy.values.reset(); + proxy.values = std::move(foreign_values); + proxy.remote_config = config; + return seastar::make_ready_future<>(); + }); + }); +} + +void ConfigProxy::show_config(ceph::Formatter* f) const { + get_config().show_config(*values, f); +} + +seastar::future<> ConfigProxy::parse_config_files(const std::string& conf_files) +{ + auto conffile_paths = + get_config().get_conffile_paths(*values, + conf_files.empty() ? nullptr : conf_files.c_str(), + &std::cerr, + CODE_ENVIRONMENT_DAEMON); + return seastar::do_with(std::move(conffile_paths), [this] (auto& paths) { + return seastar::repeat([path=paths.begin(), e=paths.end(), this]() mutable { + if (path == e) { + // tried all conffile, none of them works + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::yes); + } + return crimson::read_file(*path++).then([this](auto&& buf) { + return do_change([buf=std::move(buf), this](ConfigValues& values) { + if (get_config().parse_buffer(values, obs_mgr, + buf.get(), buf.size(), + &std::cerr) == 0) { + get_config().update_legacy_vals(values); + } else { + throw std::invalid_argument("parse error"); + } + }).then([] { + // this one works! + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::yes); + }); + }).handle_exception_type([] (const std::filesystem::filesystem_error&) { + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::no); + }).handle_exception_type([] (const std::invalid_argument&) { + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::no); + }); + }); + }); +} + +ConfigProxy::ShardedConfig ConfigProxy::sharded_conf; +} diff --git a/src/crimson/common/config_proxy.h b/src/crimson/common/config_proxy.h new file mode 100644 index 000000000..4c0e65507 --- /dev/null +++ b/src/crimson/common/config_proxy.h @@ -0,0 +1,222 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/reactor.hh> +#include <seastar/core/sharded.hh> +#include "common/config.h" +#include "common/config_obs.h" +#include "common/config_obs_mgr.h" +#include "common/errno.h" + +namespace ceph { +class Formatter; +} + +namespace crimson::common { + +// a facade for managing config. each shard has its own copy of ConfigProxy. +// +// In seastar-osd, there could be multiple instances of @c ConfigValues in a +// single process, as we are using a variant of read-copy-update mechinary to +// update the settings at runtime. +class ConfigProxy : public seastar::peering_sharded_service<ConfigProxy> +{ + using LocalConfigValues = seastar::lw_shared_ptr<ConfigValues>; + seastar::foreign_ptr<LocalConfigValues> values; + + md_config_t* remote_config = nullptr; + std::unique_ptr<md_config_t> local_config; + + using ConfigObserver = ceph::md_config_obs_impl<ConfigProxy>; + ObserverMgr<ConfigObserver> obs_mgr; + + const md_config_t& get_config() const { + return remote_config ? *remote_config : * local_config; + } + md_config_t& get_config() { + return remote_config ? *remote_config : * local_config; + } + + // apply changes to all shards + // @param func a functor which accepts @c "ConfigValues&" + template<typename Func> + seastar::future<> do_change(Func&& func) { + return container().invoke_on(values.get_owner_shard(), + [func = std::move(func)](ConfigProxy& owner) { + // apply the changes to a copy of the values + auto new_values = seastar::make_lw_shared(*owner.values); + new_values->changed.clear(); + func(*new_values); + + // always apply the new settings synchronously on the owner shard, to + // avoid racings with other do_change() calls in parallel. + ObserverMgr<ConfigObserver>::rev_obs_map rev_obs; + owner.values.reset(new_values); + owner.obs_mgr.for_each_change(owner.values->changed, owner, + [&rev_obs](ConfigObserver *obs, + const std::string &key) { + rev_obs[obs].insert(key); + }, nullptr); + for (auto& [obs, keys] : rev_obs) { + obs->handle_conf_change(owner, keys); + } + + return seastar::parallel_for_each(boost::irange(1u, seastar::smp::count), + [&owner, new_values] (auto cpu) { + return owner.container().invoke_on(cpu, + [foreign_values = seastar::make_foreign(new_values)](ConfigProxy& proxy) mutable { + proxy.values.reset(); + proxy.values = std::move(foreign_values); + + ObserverMgr<ConfigObserver>::rev_obs_map rev_obs; + proxy.obs_mgr.for_each_change(proxy.values->changed, proxy, + [&rev_obs](ConfigObserver *obs, const std::string& key) { + rev_obs[obs].insert(key); + }, nullptr); + for (auto& obs_keys : rev_obs) { + obs_keys.first->handle_conf_change(proxy, obs_keys.second); + } + }); + }).finally([new_values] { + new_values->changed.clear(); + }); + }); + } +public: + ConfigProxy(const EntityName& name, std::string_view cluster); + const ConfigValues* operator->() const noexcept { + return values.get(); + } + const ConfigValues get_config_values() { + return *values.get(); + } + ConfigValues* operator->() noexcept { + return values.get(); + } + + void get_config_bl(uint64_t have_version, + ceph::buffer::list *bl, + uint64_t *got_version) { + get_config().get_config_bl(get_config_values(), have_version, + bl, got_version); + } + void get_defaults_bl(ceph::buffer::list *bl) { + get_config().get_defaults_bl(get_config_values(), bl); + } + seastar::future<> start(); + // required by sharded<> + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + void add_observer(ConfigObserver* obs) { + obs_mgr.add_observer(obs); + } + void remove_observer(ConfigObserver* obs) { + obs_mgr.remove_observer(obs); + } + seastar::future<> rm_val(const std::string& key) { + return do_change([key, this](ConfigValues& values) { + auto ret = get_config().rm_val(values, key); + if (ret < 0) { + throw std::invalid_argument(cpp_strerror(ret)); + } + }); + } + seastar::future<> set_val(const std::string& key, + const std::string& val) { + return do_change([key, val, this](ConfigValues& values) { + std::stringstream err; + auto ret = get_config().set_val(values, obs_mgr, key, val, &err); + if (ret < 0) { + throw std::invalid_argument(err.str()); + } + }); + } + int get_val(std::string_view key, std::string *val) const { + return get_config().get_val(*values, key, val); + } + template<typename T> + const T get_val(std::string_view key) const { + return get_config().template get_val<T>(*values, key); + } + + int get_all_sections(std::vector<std::string>& sections) const { + return get_config().get_all_sections(sections); + } + + int get_val_from_conf_file(const std::vector<std::string>& sections, + const std::string& key, std::string& out, + bool expand_meta) const { + return get_config().get_val_from_conf_file(*values, sections, key, + out, expand_meta); + } + + unsigned get_osd_pool_default_min_size(uint8_t size) const { + return get_config().get_osd_pool_default_min_size(*values, size); + } + + seastar::future<> + set_mon_vals(const std::map<std::string,std::string,std::less<>>& kv) { + return do_change([kv, this](ConfigValues& values) { + get_config().set_mon_vals(nullptr, values, obs_mgr, kv, nullptr); + }); + } + + seastar::future<> inject_args(const std::string& s) { + return do_change([s, this](ConfigValues& values) { + std::stringstream err; + if (get_config().injectargs(values, obs_mgr, s, &err)) { + throw std::invalid_argument(err.str()); + } + }); + } + void show_config(ceph::Formatter* f) const; + + seastar::future<> parse_argv(std::vector<const char*>& argv) { + // we could pass whatever is unparsed to seastar, but seastar::app_template + // is used for driving the seastar application, and + // crimson::common::ConfigProxy is not available until seastar engine is up + // and running, so we have to feed the command line args to app_template + // first, then pass them to ConfigProxy. + return do_change([&argv, this](ConfigValues& values) { + get_config().parse_argv(values, + obs_mgr, + argv, + CONF_CMDLINE); + }); + } + + seastar::future<> parse_env() { + return do_change([this](ConfigValues& values) { + get_config().parse_env(CEPH_ENTITY_TYPE_OSD, + values, + obs_mgr); + }); + } + + seastar::future<> parse_config_files(const std::string& conf_files); + + using ShardedConfig = seastar::sharded<ConfigProxy>; + +private: + static ShardedConfig sharded_conf; + friend ConfigProxy& local_conf(); + friend ShardedConfig& sharded_conf(); +}; + +inline ConfigProxy& local_conf() { + return ConfigProxy::sharded_conf.local(); +} + +inline ConfigProxy::ShardedConfig& sharded_conf() { + return ConfigProxy::sharded_conf; +} + +template<typename T> +const T get_conf(const std::string& key) { + return local_conf().template get_val<T>(key); +} + +} diff --git a/src/crimson/common/errorator-loop.h b/src/crimson/common/errorator-loop.h new file mode 100644 index 000000000..bb3b7fb15 --- /dev/null +++ b/src/crimson/common/errorator-loop.h @@ -0,0 +1,91 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include <seastar/core/future.hh> + +#include "crimson/common/errorator.h" + + +namespace crimson { +template <class... AllowedErrors> +class parallel_for_each_state final : private seastar::continuation_base<> { + using future_t = typename errorator<AllowedErrors...>::template future<>; + std::vector<future_t> _incomplete; + seastar::promise<> _result; + std::exception_ptr _ex; +private: + void wait_for_one() noexcept { + while (!_incomplete.empty() && _incomplete.back().available()) { + if (_incomplete.back().failed()) { + _ex = _incomplete.back().get_exception(); + } + _incomplete.pop_back(); + } + if (!_incomplete.empty()) { + seastar::internal::set_callback(std::move(_incomplete.back()), + static_cast<continuation_base<>*>(this)); + _incomplete.pop_back(); + return; + } + if (__builtin_expect(bool(_ex), false)) { + _result.set_exception(std::move(_ex)); + } else { + _result.set_value(); + } + delete this; + } + virtual void run_and_dispose() noexcept override { + if (_state.failed()) { + _ex = std::move(_state).get_exception(); + } + _state = {}; + wait_for_one(); + } + task* waiting_task() noexcept override { return _result.waiting_task(); } +public: + parallel_for_each_state(size_t n) { + _incomplete.reserve(n); + } + void add_future(future_t&& f) { + _incomplete.push_back(std::move(f)); + } + future_t get_future() { + auto ret = _result.get_future(); + wait_for_one(); + return ret; + } +}; + +template <typename Iterator, typename Func, typename... AllowedErrors> +static inline typename errorator<AllowedErrors...>::template future<> +parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept { + parallel_for_each_state<AllowedErrors...>* s = nullptr; + // Process all elements, giving each future the following treatment: + // - available, not failed: do nothing + // - available, failed: collect exception in ex + // - not available: collect in s (allocating it if needed) + for (;first != last; ++first) { + auto f = seastar::futurize_invoke(std::forward<Func>(func), *first); + if (!f.available() || f.failed()) { + if (!s) { + using itraits = std::iterator_traits<Iterator>; + auto n = (seastar::internal::iterator_range_estimate_vector_capacity( + first, last, typename itraits::iterator_category()) + 1); + s = new parallel_for_each_state<AllowedErrors...>(n); + } + s->add_future(std::move(f)); + } + } + // If any futures were not available, hand off to parallel_for_each_state::start(). + // Otherwise we can return a result immediately. + if (s) { + // s->get_future() takes ownership of s (and chains it to one of the futures it contains) + // so this isn't a leak + return s->get_future(); + } + return seastar::make_ready_future<>(); +} + +} // namespace crimson diff --git a/src/crimson/common/errorator.h b/src/crimson/common/errorator.h new file mode 100644 index 000000000..c5d63d5b9 --- /dev/null +++ b/src/crimson/common/errorator.h @@ -0,0 +1,1358 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include <exception> +#include <system_error> + +#include <seastar/core/future-util.hh> + +#include "crimson/common/utility.h" +#include "include/ceph_assert.h" + +namespace crimson::interruptible { + +template <typename, typename> +class parallel_for_each_state; + +template <typename, typename> +class interruptible_future_detail; + +} + +namespace crimson { + +// crimson::do_for_each_state is the mirror of seastar::do_for_each_state with FutureT +template <typename Iterator, typename AsyncAction, typename FutureT> +class do_for_each_state final : public seastar::continuation_base<> { + Iterator _begin; + Iterator _end; + AsyncAction _action; + seastar::promise<> _pr; + +public: + do_for_each_state(Iterator begin, Iterator end, AsyncAction action, + FutureT&& first_unavailable) + : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) { + seastar::internal::set_callback(std::move(first_unavailable), this); + } + virtual void run_and_dispose() noexcept override { + std::unique_ptr<do_for_each_state> zis(this); + if (_state.failed()) { + _pr.set_urgent_state(std::move(_state)); + return; + } + while (_begin != _end) { + auto f = seastar::futurize_invoke(_action, *_begin); + ++_begin; + if (f.failed()) { + f._forward_to(std::move(_pr)); + return; + } + if (!f.available() || seastar::need_preempt()) { + _state = {}; + seastar::internal::set_callback(std::move(f), this); + zis.release(); + return; + } + } + _pr.set_value(); + } + task* waiting_task() noexcept override { + return _pr.waiting_task(); + } + FutureT get_future() { + return _pr.get_future(); + } +}; + +template<typename Iterator, typename AsyncAction, + typename FutureT = std::invoke_result_t<AsyncAction, typename Iterator::reference>> +inline FutureT do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) { + while (begin != end) { + auto f = seastar::futurize_invoke(action, *begin); + ++begin; + if (f.failed()) { + return f; + } + if (!f.available() || seastar::need_preempt()) { + // s will be freed by run_and_dispose() + auto* s = new crimson::do_for_each_state<Iterator, AsyncAction, FutureT>{ + std::move(begin), std::move(end), std::move(action), std::move(f)}; + return s->get_future(); + } + } + return seastar::make_ready_future<>(); +} + +template<typename Iterator, typename AsyncAction> +inline auto do_for_each(Iterator begin, Iterator end, AsyncAction action) { + return ::crimson::do_for_each_impl(begin, end, std::move(action)); +} + +template<typename Container, typename AsyncAction> +inline auto do_for_each(Container& c, AsyncAction action) { + return ::crimson::do_for_each(std::begin(c), std::end(c), std::move(action)); +} + +template<typename AsyncAction> +inline auto repeat(AsyncAction action) { + using errorator_t = + typename ::seastar::futurize_t<std::invoke_result_t<AsyncAction>>::errorator_type; + + while (true) { + auto f = ::seastar::futurize_invoke(action); + if (f.failed()) { + return errorator_t::template make_exception_future2<>( + f.get_exception() + ); + } else if (f.available()) { + if (auto done = f.get0()) { + return errorator_t::template make_ready_future<>(); + } + } else { + return std::move(f)._then( + [action = std::move(action)] (auto stop) mutable { + if (stop == seastar::stop_iteration::yes) { + return errorator_t::template make_ready_future<>(); + } + return ::crimson::repeat( + std::move(action)); + }); + } + } +} + +// define the interface between error types and errorator +template <class ConcreteErrorT> +class error_t { + static constexpr const std::type_info& get_exception_ptr_type_info() { + return ConcreteErrorT::exception_ptr_type_info(); + } + + decltype(auto) static from_exception_ptr(std::exception_ptr ep) { + return ConcreteErrorT::from_exception_ptr(std::move(ep)); + } + + template <class... AllowedErrorsT> + friend struct errorator; + + template <class ErrorVisitorT, class FuturatorT> + friend class maybe_handle_error_t; + +protected: + std::exception_ptr to_exception_ptr() const { + const auto* concrete_error = static_cast<const ConcreteErrorT*>(this); + return concrete_error->to_exception_ptr(); + } + +public: + template <class Func> + static decltype(auto) handle(Func&& func) { + return ConcreteErrorT::handle(std::forward<Func>(func)); + } +}; + +// unthrowable_wrapper ensures compilation failure when somebody +// would like to `throw make_error<...>)()` instead of returning. +// returning allows for the compile-time verification of future's +// AllowedErrorsV and also avoid the burden of throwing. +template <class ErrorT, ErrorT ErrorV> +struct unthrowable_wrapper : error_t<unthrowable_wrapper<ErrorT, ErrorV>> { + unthrowable_wrapper(const unthrowable_wrapper&) = delete; + [[nodiscard]] static const auto& make() { + static constexpr unthrowable_wrapper instance{}; + return instance; + } + + static auto exception_ptr() { + return make().to_exception_ptr(); + } + + template<class Func> + static auto handle(Func&& func) { + return [ + func = std::forward<Func>(func) + ] (const unthrowable_wrapper& raw_error) mutable -> decltype(auto) { + if constexpr (std::is_invocable_v<Func, ErrorT, decltype(raw_error)>) { + // check whether the handler wants to take the raw error object which + // would be the case if it wants conditionally handle-or-pass-further. + return std::invoke(std::forward<Func>(func), + ErrorV, + std::move(raw_error)); + } else if constexpr (std::is_invocable_v<Func, ErrorT>) { + return std::invoke(std::forward<Func>(func), ErrorV); + } else { + return std::invoke(std::forward<Func>(func)); + } + }; + } + + struct pass_further { + decltype(auto) operator()(const unthrowable_wrapper& e) { + return e; + } + }; + + struct discard { + decltype(auto) operator()(const unthrowable_wrapper&) { + } + }; + + +private: + // can be used only to initialize the `instance` member + explicit unthrowable_wrapper() = default; + + // implement the errorable interface + struct throwable_carrier{}; + static std::exception_ptr carrier_instance; + + static constexpr const std::type_info& exception_ptr_type_info() { + return typeid(throwable_carrier); + } + auto to_exception_ptr() const { + // error codes don't need to instantiate `std::exception_ptr` each + // time as the code is actually a part of the type itself. + // `std::make_exception_ptr()` on modern enough GCCs is quite cheap + // (see the Gleb Natapov's patch eradicating throw/catch there), + // but using one instance per type boils down the overhead to just + // ref-counting. + return carrier_instance; + } + static const auto& from_exception_ptr(std::exception_ptr) { + return make(); + } + + friend class error_t<unthrowable_wrapper<ErrorT, ErrorV>>; +}; + +template <class ErrorT, ErrorT ErrorV> +std::exception_ptr unthrowable_wrapper<ErrorT, ErrorV>::carrier_instance = \ + std::make_exception_ptr< + unthrowable_wrapper<ErrorT, ErrorV>::throwable_carrier>({}); + + +template <class ErrorT> +struct stateful_error_t : error_t<stateful_error_t<ErrorT>> { + template <class... Args> + explicit stateful_error_t(Args&&... args) + : ep(std::make_exception_ptr<ErrorT>(std::forward<Args>(args)...)) { + } + + template<class Func> + static auto handle(Func&& func) { + return [ + func = std::forward<Func>(func) + ] (stateful_error_t<ErrorT>&& e) mutable -> decltype(auto) { + if constexpr (std::is_invocable_v<Func>) { + return std::invoke(std::forward<Func>(func)); + } + try { + std::rethrow_exception(e.ep); + } catch (const ErrorT& obj) { + if constexpr (std::is_invocable_v<Func, decltype(obj), decltype(e)>) { + return std::invoke(std::forward<Func>(func), obj, e); + } else if constexpr (std::is_invocable_v<Func, decltype(obj)>) { + return std::invoke(std::forward<Func>(func), obj); + } + } + ceph_abort_msg("exception type mismatch -- impossible!"); + }; + } + +private: + std::exception_ptr ep; + + explicit stateful_error_t(std::exception_ptr ep) : ep(std::move(ep)) {} + + static constexpr const std::type_info& exception_ptr_type_info() { + return typeid(ErrorT); + } + auto to_exception_ptr() const { + return ep; + } + static stateful_error_t<ErrorT> from_exception_ptr(std::exception_ptr ep) { + return stateful_error_t<ErrorT>(std::move(ep)); + } + + friend class error_t<stateful_error_t<ErrorT>>; +}; + +namespace _impl { + template <class T> struct always_false : std::false_type {}; +}; + +template <class ErrorVisitorT, class FuturatorT> +class maybe_handle_error_t { + const std::type_info& type_info; + typename FuturatorT::type result; + ErrorVisitorT errfunc; + +public: + maybe_handle_error_t(ErrorVisitorT&& errfunc, std::exception_ptr ep) + : type_info(*ep.__cxa_exception_type()), + result(FuturatorT::make_exception_future(std::move(ep))), + errfunc(std::forward<ErrorVisitorT>(errfunc)) { + } + + template <class ErrorT> + void handle() { + static_assert(std::is_invocable<ErrorVisitorT, ErrorT>::value, + "provided Error Visitor is not exhaustive"); + // In C++ throwing an exception isn't the sole way to signal + // error with it. This approach nicely fits cold, infrequent cases + // but when applied to a hot one, it will likely hurt performance. + // + // Alternative approach is to create `std::exception_ptr` on our + // own and place it in the future via `make_exception_future()`. + // When it comes to handling, the pointer can be interrogated for + // pointee's type with `__cxa_exception_type()` instead of costly + // re-throwing (via `std::rethrow_exception()`) and matching with + // `catch`. The limitation here is lack of support for hierarchies + // of exceptions. The code below checks for exact match only while + // `catch` would allow to match against a base class as well. + // However, this shouldn't be a big issue for `errorator` as Error + // Visitors are already checked for exhaustiveness at compile-time. + // + // NOTE: `__cxa_exception_type()` is an extension of the language. + // It should be available both in GCC and Clang but a fallback + // (based on `std::rethrow_exception()` and `catch`) can be made + // to handle other platforms if necessary. + if (type_info == ErrorT::error_t::get_exception_ptr_type_info()) { + // set `state::invalid` in internals of `seastar::future` to not + // call `report_failed_future()` during `operator=()`. + [[maybe_unused]] auto&& ep = std::move(result).get_exception(); + + using return_t = std::invoke_result_t<ErrorVisitorT, ErrorT>; + if constexpr (std::is_assignable_v<decltype(result), return_t>) { + result = std::invoke(std::forward<ErrorVisitorT>(errfunc), + ErrorT::error_t::from_exception_ptr(std::move(ep))); + } else if constexpr (std::is_same_v<return_t, void>) { + // void denotes explicit discarding + // execute for the sake a side effects. Typically this boils down + // to throwing an exception by the handler. + std::invoke(std::forward<ErrorVisitorT>(errfunc), + ErrorT::error_t::from_exception_ptr(std::move(ep))); + } else if constexpr (seastar::Future<decltype(result)>) { + // result is seastar::future but return_t is e.g. int. If so, + // the else clause cannot be used as seastar::future lacks + // errorator_type member. + result = seastar::make_ready_future<return_t>( + std::invoke(std::forward<ErrorVisitorT>(errfunc), + ErrorT::error_t::from_exception_ptr(std::move(ep)))); + } else { + result = FuturatorT::type::errorator_type::template make_ready_future<return_t>( + std::invoke(std::forward<ErrorVisitorT>(errfunc), + ErrorT::error_t::from_exception_ptr(std::move(ep)))); + } + } + } + + auto get_result() && { + return std::move(result); + } +}; + +template <class FuncHead, class... FuncTail> +static constexpr auto composer(FuncHead&& head, FuncTail&&... tail) { + return [ + head = std::forward<FuncHead>(head), + // perfect forwarding in lambda's closure isn't available in C++17 + // using tuple as workaround; see: https://stackoverflow.com/a/49902823 + tail = std::make_tuple(std::forward<FuncTail>(tail)...) + ] (auto&&... args) mutable -> decltype(auto) { + if constexpr (std::is_invocable_v<FuncHead, decltype(args)...>) { + return std::invoke(std::forward<FuncHead>(head), + std::forward<decltype(args)>(args)...); + } else if constexpr (sizeof...(FuncTail) > 0) { + using next_composer_t = decltype(composer<FuncTail...>); + auto&& next = std::apply<next_composer_t>(composer<FuncTail...>, + std::move(tail)); + return std::invoke(std::move(next), + std::forward<decltype(args)>(args)...); + } else { + static_assert( + std::is_invocable_v<FuncHead, decltype(args)...> || + (sizeof...(FuncTail) > 0), + "composition is not exhaustive"); + } + }; +} + +template <class ValueT> +struct errorated_future_marker{}; + +template <class... AllowedErrors> +class parallel_for_each_state; + +template <class T> +static inline constexpr bool is_error_v = std::is_base_of_v<error_t<T>, T>; + +template <typename... AllowedErrors> +struct errorator; + +template <typename Iterator, typename Func, typename... AllowedErrors> +static inline typename errorator<AllowedErrors...>::template future<> +parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept; + +template <class... AllowedErrors> +struct errorator { + + static_assert((... && is_error_v<AllowedErrors>), + "errorator expects presence of ::is_error in all error types"); + + template <class ErrorT> + struct contains_once { + static constexpr bool value = + (0 + ... + std::is_same_v<ErrorT, AllowedErrors>) == 1; + }; + template <class... Errors> + struct contains_once<errorator<Errors...>> { + static constexpr bool value = (... && contains_once<Errors>::value); + }; + template <class T> + static constexpr bool contains_once_v = contains_once<T>::value; + + static_assert((... && contains_once_v<AllowedErrors>), + "no error type in errorator can be duplicated"); + + struct ready_future_marker{}; + struct exception_future_marker{}; + +private: + // see the comment for `using future = _future` below. + template <class> + class [[nodiscard]] _future {}; + template <class ValueT> + class [[nodiscard]] _future<::crimson::errorated_future_marker<ValueT>> + : private seastar::future<ValueT> { + using base_t = seastar::future<ValueT>; + // we need the friendship for the sake of `get_exception() &&` when + // `safe_then()` is going to return an errorated future as a result of + // chaining. In contrast to `seastar::future`, errorator<T...>::future` + // has this member private. + template <class ErrorVisitor, class Futurator> + friend class maybe_handle_error_t; + + // any `seastar::futurize` specialization must be able to access the base. + // see : `satisfy_with_result_of()` far below. + template <typename> + friend struct seastar::futurize; + + template <typename T1, typename T2, typename... More> + friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more); + + template <class, class = std::void_t<>> + struct get_errorator { + // generic template for non-errorated things (plain types and + // vanilla seastar::future as well). + using type = errorator<>; + }; + template <class FutureT> + struct get_errorator<FutureT, + std::void_t<typename FutureT::errorator_type>> { + using type = typename FutureT::errorator_type; + }; + template <class T> + using get_errorator_t = typename get_errorator<T>::type; + + template <class ValueFuncErroratorT, class... ErrorVisitorRetsT> + struct make_errorator { + // NOP. The generic template. + }; + template <class... ValueFuncAllowedErrors, + class ErrorVisitorRetsHeadT, + class... ErrorVisitorRetsTailT> + struct make_errorator<errorator<ValueFuncAllowedErrors...>, + ErrorVisitorRetsHeadT, + ErrorVisitorRetsTailT...> { + private: + using step_errorator = errorator<ValueFuncAllowedErrors...>; + // add ErrorVisitorRetsHeadT only if 1) it's an error type and + // 2) isn't already included in the errorator's error set. + // It's enough to negate contains_once_v as any errorator<...> + // type is already guaranteed to be free of duplications. + using _next_errorator = std::conditional_t< + is_error_v<ErrorVisitorRetsHeadT> && + !step_errorator::template contains_once_v<ErrorVisitorRetsHeadT>, + typename step_errorator::template extend<ErrorVisitorRetsHeadT>, + step_errorator>; + using maybe_head_ertr = get_errorator_t<ErrorVisitorRetsHeadT>; + using next_errorator = + typename _next_errorator::template extend_ertr<maybe_head_ertr>; + + public: + using type = typename make_errorator<next_errorator, + ErrorVisitorRetsTailT...>::type; + }; + // finish the recursion + template <class... ValueFuncAllowedErrors> + struct make_errorator<errorator<ValueFuncAllowedErrors...>> { + using type = ::crimson::errorator<ValueFuncAllowedErrors...>; + }; + template <class... Args> + using make_errorator_t = typename make_errorator<Args...>::type; + + using base_t::base_t; + + template <class Futurator, class Future, class ErrorVisitor> + [[gnu::noinline]] + static auto _safe_then_handle_errors(Future&& future, + ErrorVisitor&& errfunc) { + maybe_handle_error_t<ErrorVisitor, Futurator> maybe_handle_error( + std::forward<ErrorVisitor>(errfunc), + std::move(future).get_exception() + ); + (maybe_handle_error.template handle<AllowedErrors>() , ...); + return std::move(maybe_handle_error).get_result(); + } + + protected: + using base_t::get_exception; + public: + using errorator_type = ::crimson::errorator<AllowedErrors...>; + using promise_type = seastar::promise<ValueT>; + + using base_t::available; + using base_t::failed; + // need this because of the legacy in PG::do_osd_ops(). + using base_t::handle_exception_type; + + [[gnu::always_inline]] + _future(base_t&& base) + : base_t(std::move(base)) { + } + + base_t to_base() && { + return std::move(*this); + } + + template <class... A> + [[gnu::always_inline]] + _future(ready_future_marker, A&&... a) + : base_t(::seastar::make_ready_future<ValueT>(std::forward<A>(a)...)) { + } + [[gnu::always_inline]] + _future(exception_future_marker, ::seastar::future_state_base&& state) noexcept + : base_t(::seastar::futurize<base_t>::make_exception_future(std::move(state))) { + } + [[gnu::always_inline]] + _future(exception_future_marker, std::exception_ptr&& ep) noexcept + : base_t(::seastar::futurize<base_t>::make_exception_future(std::move(ep))) { + } + + template <template <class...> class ErroratedFuture, + class = std::void_t< + typename ErroratedFuture< + ::crimson::errorated_future_marker<ValueT>>::errorator_type>> + operator ErroratedFuture<errorated_future_marker<ValueT>> () && { + using dest_errorator_t = \ + typename ErroratedFuture< + ::crimson::errorated_future_marker<ValueT>>::errorator_type; + static_assert(dest_errorator_t::template contains_once_v<errorator_type>, + "conversion is possible to more-or-eq errorated future!"); + return static_cast<base_t&&>(*this); + } + + // initialize future as failed without throwing. `make_exception_future()` + // internally uses `std::make_exception_ptr()`. cppreference.com shouldn't + // be misinterpreted when it says: + // + // "This is done as if executing the following code: + // try { + // throw e; + // } catch(...) { + // return std::current_exception(); + // }", + // + // the "as if" is absolutely crucial because modern GCCs employ optimized + // path for it. See: + // * https://gcc.gnu.org/git/?p=gcc.git;a=commit;h=cce8e59224e18858749a2324bce583bcfd160d6c, + // * https://gcc.gnu.org/ml/gcc-patches/2016-08/msg00373.html. + // + // This behavior, combined with `__cxa_exception_type()` for inspecting + // exception's type, allows for throw/catch-free handling of stateless + // exceptions (which is fine for error codes). Stateful jumbos would be + // actually a bit harder as `_M_get()` is private, and thus rethrowing is + // necessary to get to the state inside. However, it's not unthinkable to + // see another extension bringing operator*() to the exception pointer... + // + // TODO: we don't really need to `make_exception_ptr` each time. It still + // allocates memory underneath while can be replaced with single instance + // per type created on start-up. + template <class ErrorT, + class DecayedT = std::decay_t<ErrorT>, + bool IsError = is_error_v<DecayedT>, + class = std::enable_if_t<IsError>> + _future(ErrorT&& e) + : base_t( + seastar::make_exception_future<ValueT>( + errorator_type::make_exception_ptr(e))) { + static_assert(errorator_type::contains_once_v<DecayedT>, + "ErrorT is not enlisted in errorator"); + } + + template <class ValueFuncT, class ErrorVisitorT> + auto safe_then(ValueFuncT&& valfunc, ErrorVisitorT&& errfunc) { + static_assert((... && std::is_invocable_v<ErrorVisitorT, + AllowedErrors>), + "provided Error Visitor is not exhaustive"); + static_assert(std::is_void_v<ValueT> ? std::is_invocable_v<ValueFuncT> + : std::is_invocable_v<ValueFuncT, ValueT>, + "Value Func is not invocable with future's value"); + using value_func_result_t = + typename std::conditional_t<std::is_void_v<ValueT>, + std::invoke_result<ValueFuncT>, + std::invoke_result<ValueFuncT, ValueT>>::type; + // recognize whether there can be any error coming from the Value + // Function. + using value_func_errorator_t = get_errorator_t<value_func_result_t>; + // mutate the Value Function's errorator to harvest errors coming + // from the Error Visitor. Yes, it's perfectly fine to fail error + // handling at one step and delegate even broader set of issues + // to next continuation. + using return_errorator_t = make_errorator_t< + value_func_errorator_t, + std::decay_t<std::invoke_result_t<ErrorVisitorT, AllowedErrors>>...>; + // OK, now we know about all errors next continuation must take + // care about. If Visitor handled everything and the Value Func + // doesn't return any, we'll finish with errorator<>::future + // which is just vanilla seastar::future – that's it, next cont + // finally could use `.then()`! + using futurator_t = \ + typename return_errorator_t::template futurize<value_func_result_t>; + // `seastar::futurize`, used internally by `then_wrapped()`, would + // wrap any non-`seastar::future` type coming from Value Func into + // `seastar::future`. As we really don't want to end with things + // like `seastar::future<errorator::future<...>>`, we need either: + // * convert the errorated future into plain in the lambda below + // and back here or + // * specialize the `seastar::futurize<T>` to get proper kind of + // future directly from `::then_wrapped()`. + // As C++17 doesn't guarantee copy elision when non-same types are + // involved while examination of assemblies from GCC 8.1 confirmed + // extra copying, switch to the second approach has been made. + return this->then_wrapped( + [ valfunc = std::forward<ValueFuncT>(valfunc), + errfunc = std::forward<ErrorVisitorT>(errfunc) + ] (auto&& future) mutable noexcept { + if (__builtin_expect(future.failed(), false)) { + return _safe_then_handle_errors<futurator_t>( + std::move(future), std::forward<ErrorVisitorT>(errfunc)); + } else { + // NOTE: using `seastar::future::get()` here is a bit bloaty + // as the method rechecks availability of future's value and, + // if it's unavailable, does the `::do_wait()` path (yes, it + // targets `seastar::thread`). Actually this is dead code as + // `then_wrapped()` executes the lambda only when the future + // is available (which means: failed or ready). However, GCC + // hasn't optimized it out: + // + // if (__builtin_expect(future.failed(), false)) { + // ea25: 48 83 bd c8 fe ff ff cmpq $0x2,-0x138(%rbp) + // ea2c: 02 + // ea2d: 0f 87 f0 05 00 00 ja f023 <ceph::osd:: + // ... + // /// If get() is called in a \ref seastar::thread context, + // /// then it need not be available; instead, the thread will + // /// be paused until the future becomes available. + // [[gnu::always_inline]] + // std::tuple<T...> get() { + // if (!_state.available()) { + // ea3a: 0f 85 1b 05 00 00 jne ef5b <ceph::osd:: + // } + // ... + // + // I don't perceive this as huge issue. Though, it cannot be + // claimed errorator has 0 overhead on hot path. The perfect + // solution here would be mark the `::get_available_state()` + // as `protected` and use dedicated `get_value()` exactly as + // `::then()` already does. + return futurator_t::invoke(std::forward<ValueFuncT>(valfunc), + std::move(future).get()); + } + }); + } + + /** + * unsafe_thread_get + * + * Only valid within a seastar_thread. Ignores errorator protections + * and throws any contained exceptions. + * + * Should really only be used within test code + * (see test/crimson/gtest_seastar.h). + */ + auto &&unsafe_get() { + return seastar::future<ValueT>::get(); + } + auto unsafe_get0() { + return seastar::future<ValueT>::get0(); + } + + template <class FuncT> + _future finally(FuncT &&func) { + return this->then_wrapped( + [func = std::forward<FuncT>(func)](auto &&result) mutable noexcept { + if constexpr (seastar::InvokeReturnsAnyFuture<FuncT>) { + return ::seastar::futurize_invoke(std::forward<FuncT>(func)).then_wrapped( + [result = std::move(result)](auto&& f_res) mutable { + // TODO: f_res.failed() + (void)f_res.discard_result(); + return std::move(result); + }); + } else { + try { + func(); + } catch (...) { + // TODO: rethrow + } + return std::move(result); + } + }); + } + + _future<::crimson::errorated_future_marker<void>> + discard_result() noexcept { + return safe_then([](auto&&) {}); + } + + // taking ErrorFuncOne and ErrorFuncTwo separately from ErrorFuncTail + // to avoid SFINAE + template <class ValueFunc, + class ErrorFuncHead, + class... ErrorFuncTail> + auto safe_then(ValueFunc&& value_func, + ErrorFuncHead&& error_func_head, + ErrorFuncTail&&... error_func_tail) { + static_assert(sizeof...(ErrorFuncTail) > 0); + return safe_then( + std::forward<ValueFunc>(value_func), + composer(std::forward<ErrorFuncHead>(error_func_head), + std::forward<ErrorFuncTail>(error_func_tail)...)); + } + + template <class ValueFunc> + auto safe_then(ValueFunc&& value_func) { + return safe_then(std::forward<ValueFunc>(value_func), + errorator_type::pass_further{}); + } + + template <class ValueFunc, + class... ErrorFuncs> + auto safe_then_unpack(ValueFunc&& value_func, + ErrorFuncs&&... error_funcs) { + return safe_then( + [value_func=std::move(value_func)] (ValueT&& tuple) mutable { + assert_moveable(value_func); + return std::apply(std::move(value_func), std::move(tuple)); + }, + std::forward<ErrorFuncs>(error_funcs)... + ); + } + + template <class Func> + void then(Func&&) = delete; + + template <class ErrorVisitorT> + auto handle_error(ErrorVisitorT&& errfunc) { + static_assert((... && std::is_invocable_v<ErrorVisitorT, + AllowedErrors>), + "provided Error Visitor is not exhaustive"); + using return_errorator_t = make_errorator_t< + errorator<>, + std::decay_t<std::invoke_result_t<ErrorVisitorT, AllowedErrors>>...>; + using futurator_t = \ + typename return_errorator_t::template futurize<::seastar::future<ValueT>>; + return this->then_wrapped( + [ errfunc = std::forward<ErrorVisitorT>(errfunc) + ] (auto&& future) mutable noexcept { + if (__builtin_expect(future.failed(), false)) { + return _safe_then_handle_errors<futurator_t>( + std::move(future), std::forward<ErrorVisitorT>(errfunc)); + } else { + return typename futurator_t::type{ std::move(future) }; + } + }); + } + + template <class ErrorFuncHead, + class... ErrorFuncTail> + auto handle_error(ErrorFuncHead&& error_func_head, + ErrorFuncTail&&... error_func_tail) { + static_assert(sizeof...(ErrorFuncTail) > 0); + return this->handle_error( + composer(std::forward<ErrorFuncHead>(error_func_head), + std::forward<ErrorFuncTail>(error_func_tail)...)); + } + + private: + // for ::crimson::do_for_each + template <class Func> + auto _then(Func&& func) { + return base_t::then(std::forward<Func>(func)); + } + template <class T> + auto _forward_to(T&& pr) { + return base_t::forward_to(std::forward<T>(pr)); + } + template<typename Iterator, typename AsyncAction> + friend inline auto ::crimson::do_for_each(Iterator begin, + Iterator end, + AsyncAction action); + + template <typename Iterator, typename AsyncAction, typename FutureT> + friend class ::crimson::do_for_each_state; + + template<typename AsyncAction> + friend inline auto ::crimson::repeat(AsyncAction action); + + template <typename Result> + friend class ::seastar::future; + + // let seastar::do_with_impl to up-cast us to seastar::future. + template<typename T, typename F> + friend inline auto ::seastar::internal::do_with_impl(T&& rvalue, F&& f); + template<typename T1, typename T2, typename T3_or_F, typename... More> + friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more); + template<typename, typename> + friend class ::crimson::interruptible::interruptible_future_detail; + friend class ::crimson::parallel_for_each_state<AllowedErrors...>; + template <typename IC, typename FT> + friend class ::crimson::interruptible::parallel_for_each_state; + }; + + class Enabler {}; + + template <typename T> + using EnableIf = typename std::enable_if<contains_once_v<std::decay_t<T>>, Enabler>::type; + + template <typename ErrorFunc> + struct all_same_way_t { + ErrorFunc func; + all_same_way_t(ErrorFunc &&error_func) + : func(std::forward<ErrorFunc>(error_func)) {} + + template <typename ErrorT, EnableIf<ErrorT>...> + decltype(auto) operator()(ErrorT&& e) { + using decayed_t = std::decay_t<decltype(e)>; + auto&& handler = + decayed_t::error_t::handle(std::forward<ErrorFunc>(func)); + static_assert(std::is_invocable_v<decltype(handler), ErrorT>); + return std::invoke(std::move(handler), std::forward<ErrorT>(e)); + } + }; + +public: + // HACK: `errorated_future_marker` and `_future` is just a hack to + // specialize `seastar::futurize` for category of class templates: + // `future<...>` from distinct errorators. Such tricks are usually + // performed basing on SFINAE and `std::void_t` to check existence + // of a trait/member (`future<...>::errorator_type` in our case). + // Unfortunately, this technique can't be applied as the `futurize` + // lacks the optional parameter. The problem looks awfully similar + // to following SO item: https://stackoverflow.com/a/38860413. + template <class ValueT=void> + using future = _future<::crimson::errorated_future_marker<ValueT>>; + + // the visitor that forwards handling of all errors to next continuation + struct pass_further { + template <class ErrorT, EnableIf<ErrorT>...> + decltype(auto) operator()(ErrorT&& e) { + static_assert(contains_once_v<std::decay_t<ErrorT>>, + "passing further disallowed ErrorT"); + return std::forward<ErrorT>(e); + } + }; + + struct discard_all { + template <class ErrorT, EnableIf<ErrorT>...> + void operator()(ErrorT&&) { + static_assert(contains_once_v<std::decay_t<ErrorT>>, + "discarding disallowed ErrorT"); + } + }; + + template <typename T> + static future<T> make_errorator_future(seastar::future<T>&& fut) { + return std::move(fut); + } + + // assert_all{ "TODO" }; + class assert_all { + const char* const msg = nullptr; + public: + template <std::size_t N> + assert_all(const char (&msg)[N]) + : msg(msg) { + } + assert_all() = default; + + template <class ErrorT, EnableIf<ErrorT>...> + void operator()(ErrorT&&) { + static_assert(contains_once_v<std::decay_t<ErrorT>>, + "discarding disallowed ErrorT"); + if (msg) { + ceph_abort_msg(msg); + } else { + ceph_abort(); + } + } + }; + + template <class ErrorFunc> + static decltype(auto) all_same_way(ErrorFunc&& error_func) { + return all_same_way_t<ErrorFunc>{std::forward<ErrorFunc>(error_func)}; + }; + + // get a new errorator by extending current one with new errors + template <class... NewAllowedErrorsT> + using extend = errorator<AllowedErrors..., NewAllowedErrorsT...>; + + // get a new errorator by summing and deduplicating error set of + // the errorator `unify<>` is applied on with another errorator + // provided as template parameter. + template <class OtherErroratorT> + struct unify { + // 1st: generic NOP template + }; + template <class OtherAllowedErrorsHead, + class... OtherAllowedErrorsTail> + struct unify<errorator<OtherAllowedErrorsHead, + OtherAllowedErrorsTail...>> { + private: + // 2nd: specialization for errorators with non-empty error set. + // + // split error set of other errorator, passed as template param, + // into head and tail. Mix error set of this errorator with head + // of the other one only if it isn't already present in the set. + using step_errorator = std::conditional_t< + contains_once_v<OtherAllowedErrorsHead> == false, + errorator<AllowedErrors..., OtherAllowedErrorsHead>, + errorator<AllowedErrors...>>; + using rest_errorator = errorator<OtherAllowedErrorsTail...>; + + public: + using type = typename step_errorator::template unify<rest_errorator>::type; + }; + template <class... EmptyPack> + struct unify<errorator<EmptyPack...>> { + // 3rd: recursion finisher + static_assert(sizeof...(EmptyPack) == 0); + using type = errorator<AllowedErrors...>; + }; + + // get a new errorator by extending current one with another errorator + template <class E> + using extend_ertr = typename unify<E>::type; + + template <typename T=void, typename... A> + static future<T> make_ready_future(A&&... value) { + return future<T>(ready_future_marker(), std::forward<A>(value)...); + } + + template <typename T=void> + static + future<T> make_exception_future2(std::exception_ptr&& ex) noexcept { + return future<T>(exception_future_marker(), std::move(ex)); + } + template <typename T=void> + static + future<T> make_exception_future2(seastar::future_state_base&& state) noexcept { + return future<T>(exception_future_marker(), std::move(state)); + } + template <typename T=void, typename Exception> + static + future<T> make_exception_future2(Exception&& ex) noexcept { + return make_exception_future2<T>(std::make_exception_ptr(std::forward<Exception>(ex))); + } + + static auto now() { + return make_ready_future<>(); + } + + template <typename Container, typename Func> + static inline auto parallel_for_each(Container&& container, Func&& func) noexcept { + return crimson::parallel_for_each<decltype(std::begin(container)), Func, AllowedErrors...>( + std::begin(container), + std::end(container), + std::forward<Func>(func)); + } + + template <typename Iterator, typename Func> + static inline errorator<AllowedErrors...>::future<> + parallel_for_each(Iterator first, Iterator last, Func&& func) noexcept { + return crimson::parallel_for_each<Iterator, Func, AllowedErrors...>( + first, + last, + std::forward<Func>(func)); + } +private: + template <class T> + class futurize { + using vanilla_futurize = seastar::futurize<T>; + + // explicit specializations for nested type is not allowed unless both + // the member template and the enclosing template are specialized. see + // section temp.expl.spec, N4659 + template <class Stored, int Dummy = 0> + struct stored_to_future { + using type = future<Stored>; + }; + template <int Dummy> + struct stored_to_future <seastar::internal::monostate, Dummy> { + using type = future<>; + }; + + public: + using type = + typename stored_to_future<typename vanilla_futurize::value_type>::type; + + template <class Func, class... Args> + static type invoke(Func&& func, Args&&... args) { + try { + return vanilla_futurize::invoke(std::forward<Func>(func), + std::forward<Args>(args)...); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <class Func> + static type invoke(Func&& func, seastar::internal::monostate) { + try { + return vanilla_futurize::invoke(std::forward<Func>(func)); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <typename Arg> + static type make_exception_future(Arg&& arg) { + return vanilla_futurize::make_exception_future(std::forward<Arg>(arg)); + } + }; + template <template <class...> class ErroratedFutureT, + class ValueT> + class futurize<ErroratedFutureT<::crimson::errorated_future_marker<ValueT>>> { + public: + using type = ::crimson::errorator<AllowedErrors...>::future<ValueT>; + + template <class Func, class... Args> + static type invoke(Func&& func, Args&&... args) { + try { + return ::seastar::futurize_invoke(std::forward<Func>(func), + std::forward<Args>(args)...); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <class Func> + static type invoke(Func&& func, seastar::internal::monostate) { + try { + return ::seastar::futurize_invoke(std::forward<Func>(func)); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <typename Arg> + static type make_exception_future(Arg&& arg) { + return ::crimson::errorator<AllowedErrors...>::make_exception_future2<ValueT>(std::forward<Arg>(arg)); + } + }; + + template <typename InterruptCond, typename FutureType> + class futurize< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, FutureType>> { + public: + using type = ::crimson::interruptible::interruptible_future_detail< + InterruptCond, typename futurize<FutureType>::type>; + + template <typename Func, typename... Args> + static type invoke(Func&& func, Args&&... args) { + try { + return ::seastar::futurize_invoke(std::forward<Func>(func), + std::forward<Args>(args)...); + } catch(...) { + return seastar::futurize< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, FutureType>>::make_exception_future( + std::current_exception()); + } + } + template <typename Func> + static type invoke(Func&& func, seastar::internal::monostate) { + try { + return ::seastar::futurize_invoke(std::forward<Func>(func)); + } catch(...) { + return seastar::futurize< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, FutureType>>::make_exception_future( + std::current_exception()); + } + } + template <typename Arg> + static type make_exception_future(Arg&& arg) { + return ::seastar::futurize< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, FutureType>>::make_exception_future( + std::forward<Arg>(arg)); + } + }; + + template <class ErrorT> + static std::exception_ptr make_exception_ptr(ErrorT&& e) { + // calling via interface class due to encapsulation and friend relations. + return e.error_t<std::decay_t<ErrorT>>::to_exception_ptr(); + } + + // needed because of: + // * return_errorator_t::template futurize<...> in `safe_then()`, + // * conversion to `std::exception_ptr` in `future::future(ErrorT&&)`. + // the friendship with all errorators is an idea from Kefu to fix build + // issues on GCC 9. This version likely fixes some access violation bug + // we were exploiting before. + template <class...> + friend class errorator; + template<typename, typename> + friend class ::crimson::interruptible::interruptible_future_detail; +}; // class errorator, generic template + +// no errors? errorator<>::future is plain seastar::future then! +template <> +class errorator<> { +public: + template <class ValueT=void> + using future = ::seastar::futurize_t<ValueT>; + + template <class T> + using futurize = ::seastar::futurize<T>; + + // get a new errorator by extending current one with errors + template <class... NewAllowedErrors> + using extend = errorator<NewAllowedErrors...>; + + // get a new errorator by extending current one with another errorator + template <class E> + using extend_ertr = E; + + // errorator with empty error set never contains any error + template <class T> + static constexpr bool contains_once_v = false; +}; // class errorator, <> specialization + + +template <class ErroratorOne, + class ErroratorTwo, + class... FurtherErrators> +struct compound_errorator { +private: + // generic template. Empty `FurtherErrators` are handled by + // the specialization below. + static_assert(sizeof...(FurtherErrators) > 0); + using step = + typename compound_errorator<ErroratorOne, ErroratorTwo>::type; + +public: + using type = + typename compound_errorator<step, FurtherErrators...>::type; +}; +template <class ErroratorOne, + class ErroratorTwo> +struct compound_errorator<ErroratorOne, ErroratorTwo> { + // specialization for empty `FurtherErrators` arg pack + using type = + typename ErroratorOne::template unify<ErroratorTwo>::type; +}; +template <class... Args> +using compound_errorator_t = typename compound_errorator<Args...>::type; + +// this is conjunction of two nasty features: C++14's variable template +// and inline global variable of C++17. The latter is crucial to ensure +// the variable will get the same address across all translation units. +template <int ErrorV> +inline std::error_code ec = std::error_code(ErrorV, std::generic_category()); + +template <int ErrorV> +using ct_error_code = unthrowable_wrapper<const std::error_code&, ec<ErrorV>>; + +namespace ct_error { + using enoent = ct_error_code<static_cast<int>(std::errc::no_such_file_or_directory)>; + using enodata = ct_error_code<static_cast<int>(std::errc::no_message_available)>; + using invarg = ct_error_code<static_cast<int>(std::errc::invalid_argument)>; + using input_output_error = ct_error_code<static_cast<int>(std::errc::io_error)>; + using object_corrupted = ct_error_code<static_cast<int>(std::errc::illegal_byte_sequence)>; + using permission_denied = ct_error_code<static_cast<int>(std::errc::permission_denied)>; + using operation_not_supported = + ct_error_code<static_cast<int>(std::errc::operation_not_supported)>; + using not_connected = ct_error_code<static_cast<int>(std::errc::not_connected)>; + using timed_out = ct_error_code<static_cast<int>(std::errc::timed_out)>; + using erange = + ct_error_code<static_cast<int>(std::errc::result_out_of_range)>; + using ebadf = + ct_error_code<static_cast<int>(std::errc::bad_file_descriptor)>; + using enospc = + ct_error_code<static_cast<int>(std::errc::no_space_on_device)>; + using value_too_large = ct_error_code<static_cast<int>(std::errc::value_too_large)>; + using eagain = + ct_error_code<static_cast<int>(std::errc::resource_unavailable_try_again)>; + using file_too_large = + ct_error_code<static_cast<int>(std::errc::file_too_large)>; + using address_in_use = ct_error_code<static_cast<int>(std::errc::address_in_use)>; + using address_not_available = ct_error_code<static_cast<int>(std::errc::address_not_available)>; + using ecanceled = ct_error_code<static_cast<int>(std::errc::operation_canceled)>; + using einprogress = ct_error_code<static_cast<int>(std::errc::operation_in_progress)>; + using enametoolong = ct_error_code<static_cast<int>(std::errc::filename_too_long)>; + using eexist = ct_error_code<static_cast<int>(std::errc::file_exists)>; + using edquot = ct_error_code<int(122)>; + constexpr int cmp_fail_error_value = 4095; + using cmp_fail = ct_error_code<int(cmp_fail_error_value)>; + + struct pass_further_all { + template <class ErrorT> + decltype(auto) operator()(ErrorT&& e) { + return std::forward<ErrorT>(e); + } + }; + + struct discard_all { + template <class ErrorT> + void operator()(ErrorT&&) { + } + }; + + class assert_all { + const char* const msg = nullptr; + public: + template <std::size_t N> + assert_all(const char (&msg)[N]) + : msg(msg) { + } + assert_all() = default; + + template <class ErrorT> + void operator()(ErrorT&&) { + if (msg) { + ceph_abort(msg); + } else { + ceph_abort(); + } + } + }; + + template <class ErrorFunc> + static decltype(auto) all_same_way(ErrorFunc&& error_func) { + return [ + error_func = std::forward<ErrorFunc>(error_func) + ] (auto&& e) mutable -> decltype(auto) { + using decayed_t = std::decay_t<decltype(e)>; + auto&& handler = + decayed_t::error_t::handle(std::forward<ErrorFunc>(error_func)); + return std::invoke(std::move(handler), std::forward<decltype(e)>(e)); + }; + }; +} + +using stateful_errc = stateful_error_t<std::errc>; +using stateful_errint = stateful_error_t<int>; +using stateful_ec = stateful_error_t<std::error_code>; + +template <typename F> +struct is_errorated_future { + static constexpr bool value = false; +}; +template <template <class...> class ErroratedFutureT, + class ValueT> +struct is_errorated_future< + ErroratedFutureT<::crimson::errorated_future_marker<ValueT>> + > { + static constexpr bool value = true; +}; +template <typename T> +constexpr bool is_errorated_future_v = is_errorated_future<T>::value; + +} // namespace crimson + + +// open the `seastar` namespace to specialize `futurize`. This is not +// pretty for sure. I just hope it's not worse than e.g. specializing +// `hash` in the `std` namespace. The justification is copy avoidance +// in `future<...>::safe_then()`. See the comments there for details. +namespace seastar { + +// Container is a placeholder for errorator::_future<> template +template <template <class> class Container, + class Value> +struct futurize<Container<::crimson::errorated_future_marker<Value>>> { + using errorator_type = typename Container< + ::crimson::errorated_future_marker<Value>>::errorator_type; + + using type = typename errorator_type::template future<Value>; + using value_type = seastar::internal::future_stored_type_t<Value>; + + template<typename Func, typename... FuncArgs> + [[gnu::always_inline]] + static type apply(Func&& func, std::tuple<FuncArgs...>&& args) noexcept { + try { + return std::apply( + std::forward<Func>(func), + std::forward<std::tuple<FuncArgs...>>(args)); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template<typename Func, typename... FuncArgs> + [[gnu::always_inline]] + static inline type invoke(Func&& func, FuncArgs&&... args) noexcept { + try { + return func(std::forward<FuncArgs>(args)...); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <class Func> + [[gnu::always_inline]] + static type invoke(Func&& func, seastar::internal::monostate) noexcept { + try { + return func(); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <typename Arg> + [[gnu::always_inline]] + static type make_exception_future(Arg&& arg) { + return errorator_type::template make_exception_future2<Value>(std::forward<Arg>(arg)); + } + +private: + template<typename PromiseT, typename Func> + static void satisfy_with_result_of(PromiseT&& pr, Func&& func) { + // this may use the protected variant of `seastar::future::forward_to()` + // because: + // 1. `seastar::future` established a friendship with with all + // specializations of `seastar::futurize`, including this + // one (we're in the `seastar` namespace!) WHILE + // 2. any errorated future declares now the friendship with any + // `seastar::futurize<...>`. + func().forward_to(std::move(pr)); + } + template <typename U> + friend class future; +}; + +template <template <class> class Container, + class Value> +struct continuation_base_from_future<Container<::crimson::errorated_future_marker<Value>>> { + using type = continuation_base<Value>; +}; + +} // namespace seastar diff --git a/src/crimson/common/exception.h b/src/crimson/common/exception.h new file mode 100644 index 000000000..682fef69b --- /dev/null +++ b/src/crimson/common/exception.h @@ -0,0 +1,54 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <exception> +#include <seastar/core/future.hh> +#include <seastar/core/future-util.hh> + +#include "crimson/common/log.h" +#include "crimson/common/interruptible_future.h" + +namespace crimson::common { + +class interruption : public std::exception +{}; + +class system_shutdown_exception final : public interruption{ +public: + const char* what() const noexcept final { + return "system shutting down"; + } +}; + +class actingset_changed final : public interruption { +public: + actingset_changed(bool sp) : still_primary(sp) {} + const char* what() const noexcept final { + return "acting set changed"; + } + bool is_primary() const { + return still_primary; + } +private: + const bool still_primary; +}; + +template<typename Func, typename... Args> +inline seastar::future<> handle_system_shutdown(Func&& func, Args&&... args) +{ + return seastar::futurize_invoke(std::forward<Func>(func), + std::forward<Args>(args)...) + .handle_exception([](std::exception_ptr eptr) { + if (*eptr.__cxa_exception_type() == + typeid(crimson::common::system_shutdown_exception)) { + crimson::get_logger(ceph_subsys_osd).debug( + "operation skipped, system shutdown"); + return seastar::now(); + } + std::rethrow_exception(eptr); + }); +} + +} diff --git a/src/crimson/common/fatal_signal.cc b/src/crimson/common/fatal_signal.cc new file mode 100644 index 000000000..f2983769d --- /dev/null +++ b/src/crimson/common/fatal_signal.cc @@ -0,0 +1,172 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#include "fatal_signal.h" + +#include <csignal> +#include <iostream> +#include <string_view> + +#define BOOST_STACKTRACE_USE_ADDR2LINE +#include <boost/stacktrace.hpp> +#include <seastar/core/reactor.hh> + +#include "common/safe_io.h" +#include "include/scope_guard.h" + +FatalSignal::FatalSignal() +{ + install_oneshot_signals_handler<SIGSEGV, + SIGABRT, + SIGBUS, + SIGILL, + SIGFPE, + SIGXCPU, + SIGXFSZ, + SIGSYS>(); +} + +template <int... SigNums> +void FatalSignal::install_oneshot_signals_handler() +{ + (install_oneshot_signal_handler<SigNums>() , ...); +} + +static void reraise_fatal(const int signum) +{ + // use default handler to dump core + ::signal(signum, SIG_DFL); + + // normally, we won't get here. if we do, something is very weird. + if (::raise(signum)) { + std::cerr << "reraise_fatal: failed to re-raise signal " << signum + << std::endl; + } else { + std::cerr << "reraise_fatal: default handler for signal " << signum + << " didn't terminate the process?" << std::endl; + } + std::cerr << std::flush; + ::_exit(1); +} + +[[gnu::noinline]] void FatalSignal::signal_entry( + const int signum, + siginfo_t* const info, + void*) +{ + if (static std::atomic_bool handled{false}; handled.exchange(true)) { + return; + } + assert(info); + FatalSignal::signaled(signum, *info); + reraise_fatal(signum); +} + +template <int SigNum> +void FatalSignal::install_oneshot_signal_handler() +{ + struct sigaction sa; + // it's a bad idea to use a lambda here. On GCC there are `operator()` + // and `_FUN()`. Controlling their inlineability is hard (impossible?). + sa.sa_sigaction = signal_entry; + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_SIGINFO | SA_RESTART | SA_NODEFER; + if constexpr (SigNum == SIGSEGV) { + sa.sa_flags |= SA_ONSTACK; + } + [[maybe_unused]] auto r = ::sigaction(SigNum, &sa, nullptr); + assert(r == 0); +} + + +[[gnu::noinline]] static void print_backtrace(std::string_view cause) { + std::cerr << cause; + if (seastar::engine_is_ready()) { + std::cerr << " on shard " << seastar::this_shard_id(); + } + // nobody wants to see things like `FatalSignal::signaled()` or + // `print_backtrace()` in our backtraces. `+ 1` is for the extra + // frame created by kernel (signal trampoline, it will take care + // about e.g. sigreturn(2) calling; see the man page). + constexpr std::size_t FRAMES_TO_SKIP = 3 + 1; + std::cerr << ".\nBacktrace:\n"; + std::cerr << boost::stacktrace::stacktrace( + FRAMES_TO_SKIP, + static_cast<std::size_t>(-1)/* max depth same as the default one */); + std::cerr << std::flush; + // TODO: dump crash related meta data to $crash_dir + // see handle_fatal_signal() +} + +static void print_segv_info(const siginfo_t& siginfo) +{ + std::cerr \ + << "Dump of siginfo:" << std::endl + << " si_signo: " << siginfo.si_signo << std::endl + << " si_errno: " << siginfo.si_errno << std::endl + << " si_code: " << siginfo.si_code << std::endl + << " si_pid: " << siginfo.si_pid << std::endl + << " si_uid: " << siginfo.si_uid << std::endl + << " si_status: " << siginfo.si_status << std::endl + << " si_utime: " << siginfo.si_utime << std::endl + << " si_stime: " << siginfo.si_stime << std::endl + << " si_int: " << siginfo.si_int << std::endl + << " si_ptr: " << siginfo.si_ptr << std::endl + << " si_overrun: " << siginfo.si_overrun << std::endl + << " si_timerid: " << siginfo.si_timerid << std::endl + << " si_addr: " << siginfo.si_addr << std::endl + << " si_band: " << siginfo.si_band << std::endl + << " si_fd: " << siginfo.si_fd << std::endl + << " si_addr_lsb: " << siginfo.si_addr_lsb << std::endl + << " si_lower: " << siginfo.si_lower << std::endl + << " si_upper: " << siginfo.si_upper << std::endl + << " si_pkey: " << siginfo.si_pkey << std::endl + << " si_call_addr: " << siginfo.si_call_addr << std::endl + << " si_syscall: " << siginfo.si_syscall << std::endl + << " si_arch: " << siginfo.si_arch << std::endl; + std::cerr << std::flush; +} + +static void print_proc_maps() +{ + const int fd = ::open("/proc/self/maps", O_RDONLY); + if (fd < 0) { + std::cerr << "can't open /proc/self/maps. procfs not mounted?" << std::endl; + return; + } + const auto fd_guard = make_scope_guard([fd] { + ::close(fd); + }); + std::cerr << "Content of /proc/self/maps:" << std::endl; + while (true) { + char chunk[4096] = {0, }; + const ssize_t r = safe_read(fd, chunk, sizeof(chunk) - 1); + if (r < 0) { + std::cerr << "error while reading /proc/self/maps: " << r << std::endl; + return; + } else { + std::cerr << chunk << std::flush; + if (r < static_cast<ssize_t>(sizeof(chunk) - 1)) { + return; // eof + } + } + } +} + +[[gnu::noinline]] void FatalSignal::signaled(const int signum, + const siginfo_t& siginfo) +{ + switch (signum) { + case SIGSEGV: + print_backtrace("Segmentation fault"); + print_segv_info(siginfo); + break; + case SIGABRT: + print_backtrace("Aborting"); + break; + default: + print_backtrace(fmt::format("Signal {}", signum)); + break; + } + print_proc_maps(); +} diff --git a/src/crimson/common/fatal_signal.h b/src/crimson/common/fatal_signal.h new file mode 100644 index 000000000..626017c93 --- /dev/null +++ b/src/crimson/common/fatal_signal.h @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <csignal> + +class FatalSignal { +public: + FatalSignal(); + +private: + static void signal_entry(int signum, siginfo_t* siginfo, void* p); + static void signaled(int signum, const siginfo_t& siginfo); + + template <int... SigNums> + void install_oneshot_signals_handler(); + + template <int SigNum> + void install_oneshot_signal_handler(); +}; diff --git a/src/crimson/common/fixed_kv_node_layout.h b/src/crimson/common/fixed_kv_node_layout.h new file mode 100644 index 000000000..676563594 --- /dev/null +++ b/src/crimson/common/fixed_kv_node_layout.h @@ -0,0 +1,730 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <algorithm> +#include <iostream> + +#include <boost/iterator/counting_iterator.hpp> + +#include "include/byteorder.h" + +#include "crimson/common/layout.h" + +namespace crimson::common { + +template <typename T, bool is_const> +struct maybe_const_t { +}; +template<typename T> +struct maybe_const_t<T, true> { + using type = const T*; +}; +template<typename T> +struct maybe_const_t<T, false> { + using type = T*; +}; + + +/** + * FixedKVNodeLayout + * + * Reusable implementation of a fixed size block mapping + * K -> V with internal representations KINT and VINT. + * + * Uses absl::container_internal::Layout for the actual memory layout. + * + * The primary interface exposed is centered on the iterator + * and related methods. + * + * Also included are helpers for doing splits and merges as for a btree. + */ +template < + size_t CAPACITY, + typename Meta, + typename MetaInt, + typename K, + typename KINT, + typename V, + typename VINT, + bool VALIDATE_INVARIANTS=true> +class FixedKVNodeLayout { + char *buf = nullptr; + + using L = absl::container_internal::Layout<ceph_le32, MetaInt, KINT, VINT>; + static constexpr L layout{1, 1, CAPACITY, CAPACITY}; + +public: + template <bool is_const> + struct iter_t { + friend class FixedKVNodeLayout; + using parent_t = typename maybe_const_t<FixedKVNodeLayout, is_const>::type; + + parent_t node; + uint16_t offset = 0; + + iter_t() = default; + iter_t( + parent_t parent, + uint16_t offset) : node(parent), offset(offset) {} + + iter_t(const iter_t &) noexcept = default; + iter_t(iter_t &&) noexcept = default; + template<bool is_const_ = is_const> + iter_t(const iter_t<false>& it, std::enable_if_t<is_const_, int> = 0) + : iter_t{it.node, it.offset} + {} + iter_t &operator=(const iter_t &) = default; + iter_t &operator=(iter_t &&) = default; + + // Work nicely with for loops without requiring a nested type. + using reference = iter_t&; + iter_t &operator*() { return *this; } + iter_t *operator->() { return this; } + + iter_t operator++(int) { + auto ret = *this; + ++offset; + return ret; + } + + iter_t &operator++() { + ++offset; + return *this; + } + + iter_t operator--(int) { + assert(offset > 0); + auto ret = *this; + --offset; + return ret; + } + + iter_t &operator--() { + assert(offset > 0); + --offset; + return *this; + } + + uint16_t operator-(const iter_t &rhs) const { + assert(rhs.node == node); + return offset - rhs.offset; + } + + iter_t operator+(uint16_t off) const { + return iter_t( + node, + offset + off); + } + iter_t operator-(uint16_t off) const { + return iter_t( + node, + offset - off); + } + + friend bool operator==(const iter_t &lhs, const iter_t &rhs) { + assert(lhs.node == rhs.node); + return lhs.offset == rhs.offset; + } + + friend bool operator!=(const iter_t &lhs, const iter_t &rhs) { + return !(lhs == rhs); + } + + friend bool operator==(const iter_t<is_const> &lhs, const iter_t<!is_const> &rhs) { + assert(lhs.node == rhs.node); + return lhs.offset == rhs.offset; + } + friend bool operator!=(const iter_t<is_const> &lhs, const iter_t<!is_const> &rhs) { + return !(lhs == rhs); + } + K get_key() const { + return K(node->get_key_ptr()[offset]); + } + + K get_next_key_or_max() const { + auto next = *this + 1; + if (next == node->end()) + return std::numeric_limits<K>::max(); + else + return next->get_key(); + } + + void set_val(V val) const { + static_assert(!is_const); + node->get_val_ptr()[offset] = VINT(val); + } + + V get_val() const { + return V(node->get_val_ptr()[offset]); + }; + + bool contains(K addr) const { + return (get_key() <= addr) && (get_next_key_or_max() > addr); + } + + uint16_t get_offset() const { + return offset; + } + + private: + void set_key(K _lb) const { + static_assert(!is_const); + KINT lb; + lb = _lb; + node->get_key_ptr()[offset] = lb; + } + + typename maybe_const_t<char, is_const>::type get_key_ptr() const { + return reinterpret_cast< + typename maybe_const_t<char, is_const>::type>( + node->get_key_ptr() + offset); + } + + typename maybe_const_t<char, is_const>::type get_val_ptr() const { + return reinterpret_cast< + typename maybe_const_t<char, is_const>::type>( + node->get_val_ptr() + offset); + } + }; + using const_iterator = iter_t<true>; + using iterator = iter_t<false>; + + struct delta_t { + enum class op_t : uint8_t { + INSERT, + REMOVE, + UPDATE, + } op; + KINT key; + VINT val; + + void replay(FixedKVNodeLayout &l) { + switch (op) { + case op_t::INSERT: { + l.insert(l.lower_bound(key), key, val); + break; + } + case op_t::REMOVE: { + auto iter = l.find(key); + assert(iter != l.end()); + l.remove(iter); + break; + } + case op_t::UPDATE: { + auto iter = l.find(key); + assert(iter != l.end()); + l.update(iter, val); + break; + } + default: + assert(0 == "Impossible"); + } + } + + bool operator==(const delta_t &rhs) const { + return op == rhs.op && + key == rhs.key && + val == rhs.val; + } + }; + +public: + class delta_buffer_t { + std::vector<delta_t> buffer; + public: + bool empty() const { + return buffer.empty(); + } + void insert( + const K &key, + const V &val) { + KINT k; + k = key; + buffer.push_back( + delta_t{ + delta_t::op_t::INSERT, + k, + VINT(val) + }); + } + void update( + const K &key, + const V &val) { + KINT k; + k = key; + buffer.push_back( + delta_t{ + delta_t::op_t::UPDATE, + k, + VINT(val) + }); + } + void remove(const K &key) { + KINT k; + k = key; + buffer.push_back( + delta_t{ + delta_t::op_t::REMOVE, + k, + VINT() + }); + } + void replay(FixedKVNodeLayout &node) { + for (auto &i: buffer) { + i.replay(node); + } + } + size_t get_bytes() const { + return buffer.size() * sizeof(delta_t); + } + void copy_out(char *out, size_t len) { + assert(len == get_bytes()); + ::memcpy(out, reinterpret_cast<const void *>(buffer.data()), get_bytes()); + buffer.clear(); + } + void copy_in(const char *out, size_t len) { + assert(empty()); + assert(len % sizeof(delta_t) == 0); + buffer = std::vector( + reinterpret_cast<const delta_t*>(out), + reinterpret_cast<const delta_t*>(out + len)); + } + bool operator==(const delta_buffer_t &rhs) const { + return buffer == rhs.buffer; + } + }; + + void journal_insert( + const_iterator _iter, + const K &key, + const V &val, + delta_buffer_t *recorder) { + auto iter = iterator(this, _iter.offset); + if (recorder) { + recorder->insert( + key, + val); + } + insert(iter, key, val); + } + + void journal_update( + const_iterator _iter, + const V &val, + delta_buffer_t *recorder) { + auto iter = iterator(this, _iter.offset); + if (recorder) { + recorder->update(iter->get_key(), val); + } + update(iter, val); + } + + void journal_replace( + const_iterator _iter, + const K &key, + const V &val, + delta_buffer_t *recorder) { + auto iter = iterator(this, _iter.offset); + if (recorder) { + recorder->remove(iter->get_key()); + recorder->insert(key, val); + } + replace(iter, key, val); + } + + + void journal_remove( + const_iterator _iter, + delta_buffer_t *recorder) { + auto iter = iterator(this, _iter.offset); + if (recorder) { + recorder->remove(iter->get_key()); + } + remove(iter); + } + + + FixedKVNodeLayout(char *buf) : + buf(buf) {} + + virtual ~FixedKVNodeLayout() = default; + + const_iterator begin() const { + return const_iterator( + this, + 0); + } + + const_iterator end() const { + return const_iterator( + this, + get_size()); + } + + iterator begin() { + return iterator( + this, + 0); + } + + iterator end() { + return iterator( + this, + get_size()); + } + + const_iterator iter_idx(uint16_t off) const { + return const_iterator( + this, + off); + } + + const_iterator find(K l) const { + auto ret = begin(); + for (; ret != end(); ++ret) { + if (ret->get_key() == l) + break; + } + return ret; + } + iterator find(K l) { + const auto &tref = *this; + return iterator(this, tref.find(l).offset); + } + + const_iterator lower_bound(K l) const { + auto it = std::lower_bound(boost::make_counting_iterator<uint16_t>(0), + boost::make_counting_iterator<uint16_t>(get_size()), + l, + [this](uint16_t i, K key) { + const_iterator iter(this, i); + return iter->get_key() < key; + }); + return const_iterator(this, *it); + } + + iterator lower_bound(K l) { + const auto &tref = *this; + return iterator(this, tref.lower_bound(l).offset); + } + + const_iterator upper_bound(K l) const { + auto it = std::upper_bound(boost::make_counting_iterator<uint16_t>(0), + boost::make_counting_iterator<uint16_t>(get_size()), + l, + [this](K key, uint16_t i) { + const_iterator iter(this, i); + return key < iter->get_key(); + }); + return const_iterator(this, *it); + } + + iterator upper_bound(K l) { + const auto &tref = *this; + return iterator(this, tref.upper_bound(l).offset); + } + + const_iterator get_split_pivot() const { + return iter_idx(get_size() / 2); + } + + uint16_t get_size() const { + return *layout.template Pointer<0>(buf); + } + + /** + * set_size + * + * Set size representation to match size + */ + void set_size(uint16_t size) { + *layout.template Pointer<0>(buf) = size; + } + + /** + * get_meta/set_meta + * + * Enables stashing a templated type within the layout. + * Cannot be modified after initial write as it is not represented + * in delta_t + */ + Meta get_meta() const { + MetaInt &metaint = *layout.template Pointer<1>(buf); + return Meta(metaint); + } + void set_meta(const Meta &meta) { + *layout.template Pointer<1>(buf) = MetaInt(meta); + } + + constexpr static size_t get_capacity() { + return CAPACITY; + } + + bool operator==(const FixedKVNodeLayout &rhs) const { + if (get_size() != rhs.get_size()) { + return false; + } + + auto iter = begin(); + auto iter2 = rhs.begin(); + while (iter != end()) { + if (iter->get_key() != iter2->get_key() || + iter->get_val() != iter2->get_val()) { + return false; + } + iter++; + iter2++; + } + return true; + } + + /** + * split_into + * + * Takes *this and splits its contents into left and right. + */ + K split_into( + FixedKVNodeLayout &left, + FixedKVNodeLayout &right) const { + auto piviter = get_split_pivot(); + + left.copy_from_foreign(left.begin(), begin(), piviter); + left.set_size(piviter - begin()); + + right.copy_from_foreign(right.begin(), piviter, end()); + right.set_size(end() - piviter); + + auto [lmeta, rmeta] = get_meta().split_into(piviter->get_key()); + left.set_meta(lmeta); + right.set_meta(rmeta); + + return piviter->get_key(); + } + + /** + * merge_from + * + * Takes two nodes and copies their contents into *this. + * + * precondition: left.size() + right.size() < CAPACITY + */ + void merge_from( + const FixedKVNodeLayout &left, + const FixedKVNodeLayout &right) + { + copy_from_foreign( + end(), + left.begin(), + left.end()); + set_size(left.get_size()); + copy_from_foreign( + end(), + right.begin(), + right.end()); + set_size(left.get_size() + right.get_size()); + set_meta(Meta::merge_from(left.get_meta(), right.get_meta())); + } + + /** + * balance_into_new_nodes + * + * Takes the contents of left and right and copies them into + * replacement_left and replacement_right such that in the + * event that the number of elements is odd the extra goes to + * the left side iff prefer_left. + */ + static K balance_into_new_nodes( + const FixedKVNodeLayout &left, + const FixedKVNodeLayout &right, + bool prefer_left, + FixedKVNodeLayout &replacement_left, + FixedKVNodeLayout &replacement_right) + { + auto total = left.get_size() + right.get_size(); + auto pivot_idx = (left.get_size() + right.get_size()) / 2; + if (total % 2 && prefer_left) { + pivot_idx++; + } + auto replacement_pivot = pivot_idx >= left.get_size() ? + right.iter_idx(pivot_idx - left.get_size())->get_key() : + left.iter_idx(pivot_idx)->get_key(); + + if (pivot_idx < left.get_size()) { + replacement_left.copy_from_foreign( + replacement_left.end(), + left.begin(), + left.iter_idx(pivot_idx)); + replacement_left.set_size(pivot_idx); + + replacement_right.copy_from_foreign( + replacement_right.end(), + left.iter_idx(pivot_idx), + left.end()); + + replacement_right.set_size(left.get_size() - pivot_idx); + replacement_right.copy_from_foreign( + replacement_right.end(), + right.begin(), + right.end()); + replacement_right.set_size(total - pivot_idx); + } else { + replacement_left.copy_from_foreign( + replacement_left.end(), + left.begin(), + left.end()); + replacement_left.set_size(left.get_size()); + + replacement_left.copy_from_foreign( + replacement_left.end(), + right.begin(), + right.iter_idx(pivot_idx - left.get_size())); + replacement_left.set_size(pivot_idx); + + replacement_right.copy_from_foreign( + replacement_right.end(), + right.iter_idx(pivot_idx - left.get_size()), + right.end()); + replacement_right.set_size(total - pivot_idx); + } + + auto [lmeta, rmeta] = Meta::rebalance( + left.get_meta(), right.get_meta(), replacement_pivot); + replacement_left.set_meta(lmeta); + replacement_right.set_meta(rmeta); + return replacement_pivot; + } + +private: + void insert( + iterator iter, + const K &key, + const V &val) { + if (VALIDATE_INVARIANTS) { + if (iter != begin()) { + assert((iter - 1)->get_key() < key); + } + if (iter != end()) { + assert(iter->get_key() > key); + } + assert(get_size() < CAPACITY); + } + copy_from_local(iter + 1, iter, end()); + iter->set_key(key); + iter->set_val(val); + set_size(get_size() + 1); + } + + void update( + iterator iter, + V val) { + assert(iter != end()); + iter->set_val(val); + } + + void replace( + iterator iter, + const K &key, + const V &val) { + assert(iter != end()); + if (VALIDATE_INVARIANTS) { + if (iter != begin()) { + assert((iter - 1)->get_key() < key); + } + if ((iter + 1) != end()) { + assert((iter + 1)->get_key() > key); + } + } + iter->set_key(key); + iter->set_val(val); + } + + void remove(iterator iter) { + assert(iter != end()); + copy_from_local(iter, iter + 1, end()); + set_size(get_size() - 1); + } + + /** + * get_key_ptr + * + * Get pointer to start of key array + */ + KINT *get_key_ptr() { + return layout.template Pointer<2>(buf); + } + const KINT *get_key_ptr() const { + return layout.template Pointer<2>(buf); + } + + /** + * get_val_ptr + * + * Get pointer to start of val array + */ + VINT *get_val_ptr() { + return layout.template Pointer<3>(buf); + } + const VINT *get_val_ptr() const { + return layout.template Pointer<3>(buf); + } + + /** + * node_resolve/unresolve_vals + * + * If the representation for values depends in some way on the + * node in which they are located, users may implement + * resolve/unresolve to enable copy_from_foreign to handle that + * transition. + */ + virtual void node_resolve_vals(iterator from, iterator to) const {} + virtual void node_unresolve_vals(iterator from, iterator to) const {} + + /** + * copy_from_foreign + * + * Copies entries from [from_src, to_src) to tgt. + * + * tgt and from_src must be from different nodes. + * from_src and to_src must be from the same node. + */ + static void copy_from_foreign( + iterator tgt, + const_iterator from_src, + const_iterator to_src) { + assert(tgt->node != from_src->node); + assert(to_src->node == from_src->node); + memcpy( + tgt->get_val_ptr(), from_src->get_val_ptr(), + to_src->get_val_ptr() - from_src->get_val_ptr()); + memcpy( + tgt->get_key_ptr(), from_src->get_key_ptr(), + to_src->get_key_ptr() - from_src->get_key_ptr()); + from_src->node->node_resolve_vals(tgt, tgt + (to_src - from_src)); + tgt->node->node_unresolve_vals(tgt, tgt + (to_src - from_src)); + } + + /** + * copy_from_local + * + * Copies entries from [from_src, to_src) to tgt. + * + * tgt, from_src, and to_src must be from the same node. + */ + static void copy_from_local( + iterator tgt, + iterator from_src, + iterator to_src) { + assert(tgt->node == from_src->node); + assert(to_src->node == from_src->node); + memmove( + tgt->get_val_ptr(), from_src->get_val_ptr(), + to_src->get_val_ptr() - from_src->get_val_ptr()); + memmove( + tgt->get_key_ptr(), from_src->get_key_ptr(), + to_src->get_key_ptr() - from_src->get_key_ptr()); + } +}; + +} diff --git a/src/crimson/common/formatter.cc b/src/crimson/common/formatter.cc new file mode 100644 index 000000000..ab371ddbf --- /dev/null +++ b/src/crimson/common/formatter.cc @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "formatter.h" + +#include <fmt/format.h> +#if FMT_VERSION >= 60000 +#include <fmt/chrono.h> +#else +#include <fmt/time.h> +#endif + + +template <> +struct fmt::formatter<seastar::lowres_system_clock::time_point> { + // ignore the format string + template <typename ParseContext> + constexpr auto parse(ParseContext &ctx) { return ctx.begin(); } + + template <typename FormatContext> + auto format(const seastar::lowres_system_clock::time_point& t, + FormatContext& ctx) { + std::time_t tt = std::chrono::duration_cast<std::chrono::seconds>( + t.time_since_epoch()).count(); + auto milliseconds = (t.time_since_epoch() % + std::chrono::seconds(1)).count(); + return fmt::format_to(ctx.out(), "{:%Y-%m-%d %H:%M:%S} {:03d}", + fmt::localtime(tt), milliseconds); + } +}; + +namespace std { + +ostream& operator<<(ostream& out, + const seastar::lowres_system_clock::time_point& t) +{ + return out << fmt::format("{}", t); +} + +} diff --git a/src/crimson/common/formatter.h b/src/crimson/common/formatter.h new file mode 100644 index 000000000..9b7be428a --- /dev/null +++ b/src/crimson/common/formatter.h @@ -0,0 +1,13 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <seastar/core/lowres_clock.hh> + +#include "common/ceph_time.h" + +namespace std { + +ostream& operator<<(ostream& out, + const seastar::lowres_system_clock::time_point& t); + +} diff --git a/src/crimson/common/gated.h b/src/crimson/common/gated.h new file mode 100644 index 000000000..559a889a3 --- /dev/null +++ b/src/crimson/common/gated.h @@ -0,0 +1,55 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/gate.hh> +#include <seastar/core/future.hh> +#include <seastar/core/future-util.hh> + +#include "crimson/common/exception.h" +#include "crimson/common/log.h" +#include "include/ceph_assert.h" + +namespace crimson::common { + +class Gated { + public: + static seastar::logger& gated_logger() { + return crimson::get_logger(ceph_subsys_osd); + } + template <typename Func, typename T> + inline void dispatch_in_background(const char* what, T& who, Func&& func) { + (void) dispatch(what, who, func); + } + template <typename Func, typename T> + inline seastar::future<> dispatch(const char* what, T& who, Func&& func) { + return seastar::with_gate(pending_dispatch, std::forward<Func>(func) + ).handle_exception([what, &who] (std::exception_ptr eptr) { + if (*eptr.__cxa_exception_type() == typeid(system_shutdown_exception)) { + gated_logger().debug( + "{}, {} skipped, system shutdown", who, what); + return; + } + try { + std::rethrow_exception(eptr); + } catch (std::exception& e) { + gated_logger().error( + "{} dispatch() {} caught exception: {}", who, what, e.what()); + } + assert(*eptr.__cxa_exception_type() + == typeid(seastar::gate_closed_exception)); + }); + } + + seastar::future<> close() { + return pending_dispatch.close(); + } + bool is_closed() const { + return pending_dispatch.is_closed(); + } + private: + seastar::gate pending_dispatch; +}; + +}// namespace crimson::common diff --git a/src/crimson/common/interruptible_future.h b/src/crimson/common/interruptible_future.h new file mode 100644 index 000000000..c0e2c346c --- /dev/null +++ b/src/crimson/common/interruptible_future.h @@ -0,0 +1,1600 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/future-util.hh> +#include <seastar/core/do_with.hh> +#include <seastar/core/when_all.hh> +#include <seastar/core/thread.hh> + +#include "crimson/common/log.h" +#include "crimson/common/errorator.h" +#ifndef NDEBUG +#define INTR_FUT_DEBUG(FMT_MSG, ...) crimson::get_logger(ceph_subsys_).trace(FMT_MSG, ##__VA_ARGS__) +#else +#define INTR_FUT_DEBUG(FMT_MSG, ...) +#endif + +// The interrupt condition generally works this way: +// +// 1. It is created by call_with_interruption_impl method, and is recorded in the thread +// local global variable "::crimson::interruptible::interrupt_cond". +// 2. Any continuation that's created within the execution of the continuation +// that calls the call_with_interruption_impl method will capture the "interrupt_cond"; +// and when they starts to run, they will put that capture interruption condition +// into "::crimson::interruptible::interrupt_cond" so that further continuations +// created can also capture the interruption condition; +// 3. At the end of the continuation run, the global "interrupt_cond" will be cleared +// to prevent other continuations that are not supposed to be interrupted wrongly +// capture an interruption condition. +// With this approach, continuations capture the interrupt condition at their creation, +// restore the interrupt conditions at the beginning of their execution and clear those +// interrupt conditions at the end of their execution. So the global "interrupt_cond" +// only hold valid interrupt conditions when the corresponding continuations are actually +// running after which it gets cleared. Since continuations can't be executed simultaneously, +// different continuation chains won't be able to interfere with each other. +// +// The global "interrupt_cond" can work as a signal about whether the continuation +// is supposed to be interrupted, the reason that the global "interrupt_cond" +// exists is that there may be this scenario: +// +// Say there's some method PG::func1(), in which the continuations created may +// or may not be supposed to be interrupted in different situations. If we don't +// have a global signal, we have to add an extra parameter to every method like +// PG::func1() to indicate whether the current run should create to-be-interrupted +// continuations or not. +// +// interruptor::with_interruption() and helpers can be used by users to wrap a future in +// the interruption machinery. + +namespace crimson::os::seastore { + class TransactionConflictCondition; +} + +// GCC tries to instantiate +// seastar::lw_shared_ptr<crimson::os::seastore::TransactionConflictCondition>. +// but we *may* not have the definition of TransactionConflictCondition at this moment, +// a full specialization for lw_shared_ptr_accessors helps to bypass the default +// lw_shared_ptr_accessors implementation, where std::is_base_of<.., T> is used. +namespace seastar::internal { + template<> + struct lw_shared_ptr_accessors<::crimson::os::seastore::TransactionConflictCondition, void> + : lw_shared_ptr_accessors_no_esft<::crimson::os::seastore::TransactionConflictCondition> + {}; +} + +SEASTAR_CONCEPT( +namespace crimson::interruptible { + template<typename InterruptCond, typename FutureType> + class interruptible_future_detail; +} +namespace seastar::impl { + template <typename InterruptCond, typename FutureType, typename... Rest> + struct is_tuple_of_futures<std::tuple<crimson::interruptible::interruptible_future_detail<InterruptCond, FutureType>, Rest...>> + : is_tuple_of_futures<std::tuple<Rest...>> {}; +} +) + +namespace crimson::interruptible { + +struct ready_future_marker {}; +struct exception_future_marker {}; + +template <typename InterruptCond> +class interruptible_future_builder; + +template <typename InterruptCond> +struct interruptor; + +template <typename InterruptCond> +using InterruptCondRef = seastar::lw_shared_ptr<InterruptCond>; + +template <typename InterruptCond> +struct interrupt_cond_t { + InterruptCondRef<InterruptCond> interrupt_cond; + uint64_t ref_count = 0; + void set( + InterruptCondRef<InterruptCond>& ic) { + INTR_FUT_DEBUG( + "{}: going to set interrupt_cond: {}, ic: {}", + __func__, + (void*)interrupt_cond.get(), + (void*)ic.get()); + if (!interrupt_cond) { + interrupt_cond = ic; + } + assert(interrupt_cond.get() == ic.get()); + ref_count++; + INTR_FUT_DEBUG( + "{}: interrupt_cond: {}, ref_count: {}", + __func__, + (void*)interrupt_cond.get(), + ref_count); + } + void reset() { + if (--ref_count == 0) { + INTR_FUT_DEBUG( + "{}: clearing interrupt_cond: {},{}", + __func__, + (void*)interrupt_cond.get(), + typeid(InterruptCond).name()); + interrupt_cond.release(); + } else { + INTR_FUT_DEBUG( + "{}: end without clearing interrupt_cond: {},{}, ref_count: {}", + __func__, + (void*)interrupt_cond.get(), + typeid(InterruptCond).name(), + ref_count); + } + } +}; + +template <typename InterruptCond> +thread_local interrupt_cond_t<InterruptCond> interrupt_cond; + +extern template thread_local interrupt_cond_t<crimson::os::seastore::TransactionConflictCondition> +interrupt_cond<crimson::os::seastore::TransactionConflictCondition>; + +template <typename InterruptCond, typename FutureType> +class [[nodiscard]] interruptible_future_detail {}; + +template <typename FutureType> +struct is_interruptible_future : public std::false_type {}; + +template <typename InterruptCond, typename FutureType> +struct is_interruptible_future< + interruptible_future_detail< + InterruptCond, + FutureType>> + : public std::true_type {}; +template <typename FutureType> +concept IsInterruptibleFuture = is_interruptible_future<FutureType>::value; +template <typename Func, typename... Args> +concept InvokeReturnsInterruptibleFuture = + IsInterruptibleFuture<std::invoke_result_t<Func, Args...>>; + +namespace internal { + +template <typename InterruptCond, typename Func, typename... Args> +auto call_with_interruption_impl( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func, Args&&... args) +{ + using futurator_t = seastar::futurize<std::invoke_result_t<Func, Args...>>; + // there might be a case like this: + // with_interruption([] { + // interruptor::do_for_each([] { + // ... + // return interruptible_errorated_future(); + // }).safe_then_interruptible([] { + // ... + // }); + // }) + // In this case, as crimson::do_for_each would directly do futurize_invoke + // for "call_with_interruption", we have to make sure this invocation would + // not errorly release ::crimson::interruptible::interrupt_cond<InterruptCond> + + // If there exists an interrupt condition, which means "Func" may not be + // permitted to run as a result of the interruption, test it. If it does + // need to be interrupted, return an interruption; otherwise, restore the + // global "interrupt_cond" with the interruption condition, and go ahead + // executing the Func. + assert(interrupt_condition); + auto fut = interrupt_condition->template may_interrupt< + typename futurator_t::type>(); + INTR_FUT_DEBUG( + "call_with_interruption_impl: may_interrupt: {}, " + "local interrupt_condition: {}, " + "global interrupt_cond: {},{}", + (bool)fut, + (void*)interrupt_condition.get(), + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + if (fut) { + return std::move(*fut); + } + interrupt_cond<InterruptCond>.set(interrupt_condition); + + auto fut2 = seastar::futurize_invoke( + std::forward<Func>(func), + std::forward<Args>(args)...); + // Clear the global "interrupt_cond" to prevent it from interfering other + // continuation chains. + interrupt_cond<InterruptCond>.reset(); + return fut2; +} + +} + +template <typename InterruptCond, typename Func, seastar::Future Ret> +requires (!InterruptCond::template is_interruption_v<Ret>) +auto call_with_interruption( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func, Ret&& fut) +{ + using Result = std::invoke_result_t<Func, Ret>; + // if "T" is already an interrupt exception, return it directly; + // otherwise, upper layer application may encounter errors executing + // the "Func" body. + if (fut.failed()) { + std::exception_ptr eptr = fut.get_exception(); + if (interrupt_condition->is_interruption(eptr)) { + return seastar::futurize<Result>::make_exception_future(std::move(eptr)); + } + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward<Func>(func), + seastar::futurize<Ret>::make_exception_future( + std::move(eptr))); + } + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward<Func>(func), + std::move(fut)); +} + +template <typename InterruptCond, typename Func, typename T> +requires (InterruptCond::template is_interruption_v<T>) +auto call_with_interruption( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func, T&& arg) +{ + using Result = std::invoke_result_t<Func, T>; + // if "T" is already an interrupt exception, return it directly; + // otherwise, upper layer application may encounter errors executing + // the "Func" body. + return seastar::futurize<Result>::make_exception_future( + std::get<0>(std::tuple(std::forward<T>(arg)))); +} + +template <typename InterruptCond, typename Func, typename T> +requires (!InterruptCond::template is_interruption_v<T>) && (!seastar::Future<T>) +auto call_with_interruption( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func, T&& arg) +{ + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward<Func>(func), + std::forward<T>(arg)); +} + +template <typename InterruptCond, typename Func, + typename Result = std::invoke_result_t<Func>> +auto call_with_interruption( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func) +{ + return internal::call_with_interruption_impl( + interrupt_condition, + std::forward<Func>(func)); +} + +template <typename InterruptCond, typename Func, typename... T, + typename Result = std::invoke_result_t<Func, T...>> +Result non_futurized_call_with_interruption( + InterruptCondRef<InterruptCond> interrupt_condition, + Func&& func, T&&... args) +{ + assert(interrupt_condition); + auto fut = interrupt_condition->template may_interrupt<seastar::future<>>(); + INTR_FUT_DEBUG( + "non_futurized_call_with_interruption may_interrupt: {}, " + "interrupt_condition: {}, interrupt_cond: {},{}", + (bool)fut, + (void*)interrupt_condition.get(), + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + if (fut) { + std::rethrow_exception(fut->get_exception()); + } + interrupt_cond<InterruptCond>.set(interrupt_condition); + try { + if constexpr (std::is_void_v<Result>) { + std::invoke(std::forward<Func>(func), std::forward<T>(args)...); + + // Clear the global "interrupt_cond" to prevent it from interfering other + // continuation chains. + interrupt_cond<InterruptCond>.reset(); + return; + } else { + auto&& err = std::invoke(std::forward<Func>(func), std::forward<T>(args)...); + interrupt_cond<InterruptCond>.reset(); + return std::forward<Result>(err); + } + } catch (std::exception& e) { + // Clear the global "interrupt_cond" to prevent it from interfering other + // continuation chains. + interrupt_cond<InterruptCond>.reset(); + throw e; + } +} + +template <typename InterruptCond, typename Errorator> +struct interruptible_errorator; + +template <typename T> +struct parallel_for_each_ret { + static_assert(seastar::Future<T>); + using type = seastar::future<>; +}; + +template <template <typename...> typename ErroratedFuture, typename T> +struct parallel_for_each_ret< + ErroratedFuture< + ::crimson::errorated_future_marker<T>>> { + using type = ErroratedFuture<::crimson::errorated_future_marker<void>>; +}; + +template <typename InterruptCond, typename FutureType> +class parallel_for_each_state final : private seastar::continuation_base<> { + using elem_ret_t = std::conditional_t< + IsInterruptibleFuture<FutureType>, + typename FutureType::core_type, + FutureType>; + using future_t = interruptible_future_detail< + InterruptCond, + typename parallel_for_each_ret<elem_ret_t>::type>; + std::vector<future_t> _incomplete; + seastar::promise<> _result; + std::exception_ptr _ex; +private: + void wait_for_one() noexcept { + while (!_incomplete.empty() && _incomplete.back().available()) { + if (_incomplete.back().failed()) { + _ex = _incomplete.back().get_exception(); + } + _incomplete.pop_back(); + } + if (!_incomplete.empty()) { + seastar::internal::set_callback(std::move(_incomplete.back()), + static_cast<continuation_base<>*>(this)); + _incomplete.pop_back(); + return; + } + if (__builtin_expect(bool(_ex), false)) { + _result.set_exception(std::move(_ex)); + } else { + _result.set_value(); + } + delete this; + } + virtual void run_and_dispose() noexcept override { + if (_state.failed()) { + _ex = std::move(_state).get_exception(); + } + _state = {}; + wait_for_one(); + } + task* waiting_task() noexcept override { return _result.waiting_task(); } +public: + parallel_for_each_state(size_t n) { + _incomplete.reserve(n); + } + void add_future(future_t&& f) { + _incomplete.push_back(std::move(f)); + } + future_t get_future() { + auto ret = _result.get_future(); + wait_for_one(); + return ret; + } + static future_t now() { + return seastar::now(); + } +}; + +template <typename InterruptCond, typename T> +class [[nodiscard]] interruptible_future_detail<InterruptCond, seastar::future<T>> + : private seastar::future<T> { +public: + using core_type = seastar::future<T>; + template <typename U> + using interrupt_futurize_t = + typename interruptor<InterruptCond>::template futurize_t<U>; + using core_type::get0; + using core_type::core_type; + using core_type::get_exception; + using core_type::ignore_ready_future; + + [[gnu::always_inline]] + interruptible_future_detail(seastar::future<T>&& base) + : core_type(std::move(base)) + {} + + using value_type = typename seastar::future<T>::value_type; + using tuple_type = typename seastar::future<T>::tuple_type; + + [[gnu::always_inline]] + value_type&& get() { + if (core_type::available()) { + return core_type::get(); + } else { + // destined to wait! + auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; + INTR_FUT_DEBUG( + "interruptible_future_detail::get() waiting, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + interrupt_cond<InterruptCond>.reset(); + auto&& value = core_type::get(); + interrupt_cond<InterruptCond>.set(interruption_condition); + INTR_FUT_DEBUG( + "interruptible_future_detail::get() got, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + return std::move(value); + } + } + + using core_type::available; + using core_type::failed; + + template <typename Func, + typename Result = interrupt_futurize_t< + std::invoke_result_t<Func, seastar::future<T>>>> + [[gnu::always_inline]] + Result then_wrapped_interruptible(Func&& func) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + return core_type::then_wrapped( + [func=std::move(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable { + return call_with_interruption( + std::move(interrupt_condition), + std::forward<Func>(func), + std::move(fut)); + }); + } + + template <typename Func> + [[gnu::always_inline]] + auto then_interruptible(Func&& func) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + if constexpr (std::is_void_v<T>) { + auto fut = core_type::then( + [func=std::move(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + () mutable { + return call_with_interruption( + interrupt_condition, + std::move(func)); + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } else { + auto fut = core_type::then( + [func=std::move(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (T&& arg) mutable { + return call_with_interruption( + interrupt_condition, + std::move(func), + std::forward<T>(arg)); + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + } + + template <typename Func> + [[gnu::always_inline]] + auto then_unpack_interruptible(Func&& func) { + return then_interruptible([func=std::forward<Func>(func)](T&& tuple) mutable { + return std::apply(std::forward<Func>(func), std::move(tuple)); + }); + } + + template <typename Func, + typename Result =interrupt_futurize_t< + std::result_of_t<Func(std::exception_ptr)>>> + [[gnu::always_inline]] + Result handle_exception_interruptible(Func&& func) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + return core_type::then_wrapped( + [func=std::forward<Func>(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable { + if (!fut.failed()) { + return seastar::make_ready_future<T>(fut.get()); + } else { + return call_with_interruption( + interrupt_condition, + std::move(func), + fut.get_exception()); + } + }); + } + + template <bool may_interrupt = true, typename Func, + typename Result = interrupt_futurize_t< + std::result_of_t<Func()>>> + [[gnu::always_inline]] + Result finally_interruptible(Func&& func) { + if constexpr (may_interrupt) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + return core_type::then_wrapped( + [func=std::forward<Func>(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable { + return call_with_interruption( + interrupt_condition, + std::move(func)); + }); + } else { + return core_type::finally(std::forward<Func>(func)); + } + } + + template <typename Func, + typename Result = interrupt_futurize_t< + std::result_of_t<Func( + typename seastar::function_traits<Func>::template arg<0>::type)>>> + [[gnu::always_inline]] + Result handle_exception_type_interruptible(Func&& func) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + using trait = seastar::function_traits<Func>; + static_assert(trait::arity == 1, "func can take only one parameter"); + using ex_type = typename trait::template arg<0>::type; + return core_type::then_wrapped( + [func=std::forward<Func>(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable -> Result { + if (!fut.failed()) { + return seastar::make_ready_future<T>(fut.get()); + } else { + try { + std::rethrow_exception(fut.get_exception()); + } catch (ex_type& ex) { + return call_with_interruption( + interrupt_condition, + std::move(func), ex); + } + } + }); + } + + + using my_type = interruptible_future_detail<InterruptCond, seastar::future<T>>; + + template <typename Func> + [[gnu::always_inline]] + my_type finally(Func&& func) { + return core_type::finally(std::forward<Func>(func)); + } +private: + template <typename Func> + [[gnu::always_inline]] + auto handle_interruption(Func&& func) { + return core_type::then_wrapped( + [func=std::move(func)](auto&& fut) mutable { + if (fut.failed()) { + std::exception_ptr ex = fut.get_exception(); + if (InterruptCond::is_interruption(ex)) { + return seastar::futurize_invoke(std::move(func), std::move(ex)); + } else { + return seastar::make_exception_future<T>(std::move(ex)); + } + } else { + return seastar::make_ready_future<T>(fut.get()); + } + }); + } + + seastar::future<T> to_future() { + return static_cast<core_type&&>(std::move(*this)); + } + // this is only supposed to be invoked by seastar functions + template <typename Func, + typename Result = interrupt_futurize_t< + std::result_of_t<Func(seastar::future<T>)>>> + [[gnu::always_inline]] + Result then_wrapped(Func&& func) { + return core_type::then_wrapped( + [func=std::move(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable { + return call_with_interruption( + interrupt_condition, + std::forward<Func>(func), + std::move(fut)); + }); + } + friend interruptor<InterruptCond>; + friend class interruptible_future_builder<InterruptCond>; + template <typename U> + friend struct ::seastar::futurize; + template <typename> + friend class ::seastar::future; + template <typename HeldState, typename Future> + friend class seastar::internal::do_with_state; + template<typename TX, typename F> + friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f); + template<typename T1, typename T2, typename T3_or_F, typename... More> + friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more); + template <typename T1, typename T2, typename... More> + friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more); + template <typename, typename> + friend class ::crimson::maybe_handle_error_t; + template <typename> + friend class ::seastar::internal::extract_values_from_futures_vector; + template <typename, typename> + friend class interruptible_future_detail; + template <typename ResolvedVectorTransform, typename Future> + friend inline typename ResolvedVectorTransform::future_type + seastar::internal::complete_when_all( + std::vector<Future>&& futures, + typename std::vector<Future>::iterator pos) noexcept; + template <typename> + friend class ::seastar::internal::when_all_state_component; + template <typename Lock, typename Func> + friend inline auto seastar::with_lock(Lock& lock, Func&& f); + template <typename IC, typename FT> + friend class parallel_for_each_state; +}; + +template <typename InterruptCond, typename Errorator> +struct interruptible_errorator { + using base_ertr = Errorator; + using intr_cond_t = InterruptCond; + + template <typename ValueT = void> + using future = interruptible_future_detail<InterruptCond, + typename Errorator::template future<ValueT>>; + + template <class... NewAllowedErrorsT> + using extend = interruptible_errorator< + InterruptCond, + typename Errorator::template extend<NewAllowedErrorsT...>>; + + template <class Ertr> + using extend_ertr = interruptible_errorator< + InterruptCond, + typename Errorator::template extend_ertr<Ertr>>; + + template <typename ValueT = void, typename... A> + static interruptible_future_detail< + InterruptCond, + typename Errorator::template future<ValueT>> + make_ready_future(A&&... value) { + return interruptible_future_detail< + InterruptCond, typename Errorator::template future<ValueT>>( + Errorator::template make_ready_future<ValueT>( + std::forward<A>(value)...)); + } + static interruptible_future_detail< + InterruptCond, + typename Errorator::template future<>> now() { + return interruptible_future_detail< + InterruptCond, typename Errorator::template future<>>( + Errorator::now()); + } + + using pass_further = typename Errorator::pass_further; +}; + +template <typename InterruptCond, + template <typename...> typename ErroratedFuture, + typename T> +class [[nodiscard]] interruptible_future_detail< + InterruptCond, + ErroratedFuture<::crimson::errorated_future_marker<T>>> + : private ErroratedFuture<::crimson::errorated_future_marker<T>> +{ +public: + using core_type = ErroratedFuture<crimson::errorated_future_marker<T>>; + using errorator_type = typename core_type::errorator_type; + using interrupt_errorator_type = + interruptible_errorator<InterruptCond, errorator_type>; + using interrupt_cond_type = InterruptCond; + + template <typename U> + using interrupt_futurize_t = + typename interruptor<InterruptCond>::template futurize_t<U>; + + using core_type::available; + using core_type::failed; + using core_type::core_type; + using core_type::get_exception; + + using value_type = typename core_type::value_type; + + interruptible_future_detail(seastar::future<T>&& fut) + : core_type(std::move(fut)) + {} + + template <template <typename...> typename ErroratedFuture2, + typename... U> + [[gnu::always_inline]] + interruptible_future_detail( + ErroratedFuture2<::crimson::errorated_future_marker<U...>>&& fut) + : core_type(std::move(fut)) {} + + template <template <typename...> typename ErroratedFuture2, + typename... U> + [[gnu::always_inline]] + interruptible_future_detail( + interruptible_future_detail<InterruptCond, + ErroratedFuture2<::crimson::errorated_future_marker<U...>>>&& fut) + : core_type(static_cast<typename std::decay_t<decltype(fut)>::core_type&&>(fut)) { + using src_errorator_t = \ + typename ErroratedFuture2< + ::crimson::errorated_future_marker<U...>>::errorator_type; + static_assert(core_type::errorator_type::template contains_once_v< + src_errorator_t>, + "conversion is only possible from less-or-eq errorated future!"); + } + + [[gnu::always_inline]] + interruptible_future_detail( + interruptible_future_detail<InterruptCond, seastar::future<T>>&& fut) + : core_type(static_cast<seastar::future<T>&&>(fut)) {} + + template <class... A> + [[gnu::always_inline]] + interruptible_future_detail(ready_future_marker, A&&... a) + : core_type(::seastar::make_ready_future<typename core_type::value_type>( + std::forward<A>(a)...)) { + } + [[gnu::always_inline]] + interruptible_future_detail(exception_future_marker, ::seastar::future_state_base&& state) noexcept + : core_type(::seastar::futurize<core_type>::make_exception_future(std::move(state))) { + } + [[gnu::always_inline]] + interruptible_future_detail(exception_future_marker, std::exception_ptr&& ep) noexcept + : core_type(::seastar::futurize<core_type>::make_exception_future(std::move(ep))) { + } + + template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT, + std::enable_if_t<!interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) { + auto fut = core_type::safe_then( + std::forward<ValueInterruptCondT>(valfunc), + std::forward<ErrorVisitorT>(errfunc)); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template <typename... Args> + auto si_then(Args&&... args) { + return safe_then_interruptible(std::forward<Args>(args)...); + } + + + template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT, + typename U = T, std::enable_if_t<!std::is_void_v<U> && interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto fut = core_type::safe_then( + [func=std::move(valfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (T&& args) mutable { + return call_with_interruption( + interrupt_condition, + std::move(func), + std::forward<T>(args)); + }, [func=std::move(errfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& err) mutable -> decltype(auto) { + constexpr bool return_void = std::is_void_v< + std::invoke_result_t<ErrorVisitorT, + std::decay_t<decltype(err)>>>; + constexpr bool return_err = ::crimson::is_error_v< + std::decay_t<std::invoke_result_t<ErrorVisitorT, + std::decay_t<decltype(err)>>>>; + if constexpr (return_err || return_void) { + return non_futurized_call_with_interruption( + interrupt_condition, + std::move(func), + std::move(err)); + } else { + return call_with_interruption( + interrupt_condition, + std::move(func), + std::move(err)); + } + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template<bool interruptible = true, typename ValueInterruptCondT, typename ErrorVisitorT, + typename U = T, std::enable_if_t<std::is_void_v<U> && interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto fut = core_type::safe_then( + [func=std::move(valfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + () mutable { + return call_with_interruption( + interrupt_condition, + std::move(func)); + }, [func=std::move(errfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& err) mutable -> decltype(auto) { + constexpr bool return_void = std::is_void_v< + std::invoke_result_t<ErrorVisitorT, + std::decay_t<decltype(err)>>>; + constexpr bool return_err = ::crimson::is_error_v< + std::decay_t<std::invoke_result_t<ErrorVisitorT, + std::decay_t<decltype(err)>>>>; + if constexpr (return_err || return_void) { + return non_futurized_call_with_interruption( + interrupt_condition, + std::move(func), + std::move(err)); + } else { + return call_with_interruption( + interrupt_condition, + std::move(func), + std::move(err)); + } + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template <bool interruptible = true, typename ValueInterruptCondT, + typename U = T, std::enable_if_t<std::is_void_v<T> && interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto fut = core_type::safe_then( + [func=std::move(valfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + () mutable { + return call_with_interruption( + interrupt_condition, + std::move(func)); + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template <typename ValFuncT, typename ErrorFuncT> + [[gnu::always_inline]] + auto safe_then_unpack_interruptible(ValFuncT&& func, ErrorFuncT&& errfunc) { + return safe_then_interruptible([func=std::forward<ValFuncT>(func)](T&& tuple) mutable { + return std::apply(std::forward<ValFuncT>(func), std::move(tuple)); + }, std::forward<ErrorFuncT>(errfunc)); + } + + template <typename ValFuncT> + [[gnu::always_inline]] + auto safe_then_unpack_interruptible(ValFuncT&& func) { + return safe_then_interruptible([func=std::forward<ValFuncT>(func)](T&& tuple) mutable { + return std::apply(std::forward<ValFuncT>(func), std::move(tuple)); + }); + } + + template <bool interruptible = true, typename ValueInterruptCondT, + typename U = T, std::enable_if_t<!std::is_void_v<T> && interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto fut = core_type::safe_then( + [func=std::move(valfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (T&& arg) mutable { + return call_with_interruption( + interrupt_condition, + std::move(func), + std::forward<T>(arg)); + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template <bool interruptible = true, typename ValueInterruptCondT, + std::enable_if_t<!interruptible, int> = 0> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc) { + auto fut = core_type::safe_then(std::forward<ValueInterruptCondT>(valfunc)); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + + template <typename ValueInterruptCondT, + typename ErrorVisitorHeadT, + typename... ErrorVisitorTailT> + [[gnu::always_inline]] + auto safe_then_interruptible(ValueInterruptCondT&& valfunc, + ErrorVisitorHeadT&& err_func_head, + ErrorVisitorTailT&&... err_func_tail) { + return safe_then_interruptible( + std::forward<ValueInterruptCondT>(valfunc), + ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head), + std::forward<ErrorVisitorTailT>(err_func_tail)...)); + } + + template <typename ValueInterruptCondT, + typename ErrorVisitorHeadT, + typename... ErrorVisitorTailT> + [[gnu::always_inline]] + auto safe_then_interruptible_tuple(ValueInterruptCondT&& valfunc, + ErrorVisitorHeadT&& err_func_head, + ErrorVisitorTailT&&... err_func_tail) { + return safe_then_interruptible( + std::forward<ValueInterruptCondT>(valfunc), + ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head), + std::forward<ErrorVisitorTailT>(err_func_tail)...)); + } + + template <typename ValFuncT, + typename ErrorVisitorHeadT, + typename... ErrorVisitorTailT> + [[gnu::always_inline]] + auto safe_then_unpack_interruptible_tuple( + ValFuncT&& valfunc, + ErrorVisitorHeadT&& err_func_head, + ErrorVisitorTailT&&... err_func_tail) { + return safe_then_interruptible_tuple( + [valfunc=std::forward<ValFuncT>(valfunc)](T&& tuple) mutable { + return std::apply(std::forward<ValFuncT>(valfunc), std::move(tuple)); + }, + ::crimson::composer(std::forward<ErrorVisitorHeadT>(err_func_head), + std::forward<ErrorVisitorTailT>(err_func_tail)...)); + } + + template <bool interruptible = true, typename ErrorFunc> + auto handle_error_interruptible(ErrorFunc&& errfunc) { + if constexpr (interruptible) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto fut = core_type::handle_error( + [errfunc=std::move(errfunc), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& err) mutable -> decltype(auto) { + constexpr bool return_void = std::is_void_v< + std::invoke_result_t<ErrorFunc, + std::decay_t<decltype(err)>>>; + constexpr bool return_err = ::crimson::is_error_v< + std::decay_t<std::invoke_result_t<ErrorFunc, + std::decay_t<decltype(err)>>>>; + if constexpr (return_err || return_void) { + return non_futurized_call_with_interruption( + interrupt_condition, + std::move(errfunc), + std::move(err)); + } else { + return call_with_interruption( + interrupt_condition, + std::move(errfunc), + std::move(err)); + } + }); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } else { + return core_type::handle_error(std::forward<ErrorFunc>(errfunc)); + } + } + + template <typename ErrorFuncHead, + typename... ErrorFuncTail> + auto handle_error_interruptible(ErrorFuncHead&& error_func_head, + ErrorFuncTail&&... error_func_tail) { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + static_assert(sizeof...(ErrorFuncTail) > 0); + return this->handle_error_interruptible( + ::crimson::composer( + std::forward<ErrorFuncHead>(error_func_head), + std::forward<ErrorFuncTail>(error_func_tail)...)); + } + + template <typename Func> + [[gnu::always_inline]] + auto finally(Func&& func) { + auto fut = core_type::finally(std::forward<Func>(func)); + return (interrupt_futurize_t<decltype(fut)>)(std::move(fut)); + } + +private: + using core_type::_then; + template <typename Func> + [[gnu::always_inline]] + auto handle_interruption(Func&& func) { + // see errorator.h safe_then definition + using func_result_t = + typename std::invoke_result<Func, std::exception_ptr>::type; + using func_ertr_t = + typename core_type::template get_errorator_t<func_result_t>; + using this_ertr_t = typename core_type::errorator_type; + using ret_ertr_t = typename this_ertr_t::template extend_ertr<func_ertr_t>; + using futurator_t = typename ret_ertr_t::template futurize<func_result_t>; + return core_type::then_wrapped( + [func=std::move(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (auto&& fut) mutable + -> typename futurator_t::type { + if (fut.failed()) { + std::exception_ptr ex = fut.get_exception(); + if (InterruptCond::is_interruption(ex)) { + return futurator_t::invoke(std::move(func), std::move(ex)); + } else { + return futurator_t::make_exception_future(std::move(ex)); + } + } else { + return std::move(fut); + } + }); + } + + ErroratedFuture<::crimson::errorated_future_marker<T>> + to_future() { + return static_cast<core_type&&>(std::move(*this)); + } + + friend class interruptor<InterruptCond>; + friend class interruptible_future_builder<InterruptCond>; + template <typename U> + friend struct ::seastar::futurize; + template <typename> + friend class ::seastar::future; + template<typename TX, typename F> + friend inline auto ::seastar::internal::do_with_impl(TX&& rvalue, F&& f); + template<typename T1, typename T2, typename T3_or_F, typename... More> + friend inline auto ::seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more); + template <typename T1, typename T2, typename... More> + friend auto seastar::internal::do_with_impl(T1&& rv1, T2&& rv2, More&&... more); + template <typename HeldState, typename Future> + friend class seastar::internal::do_with_state; + template <typename, typename> + friend class ::crimson::maybe_handle_error_t; + template <typename, typename> + friend class interruptible_future_detail; + template <typename Lock, typename Func> + friend inline auto seastar::with_lock(Lock& lock, Func&& f); + template <typename IC, typename FT> + friend class parallel_for_each_state; +}; + +template <typename InterruptCond, typename T = void> +using interruptible_future = + interruptible_future_detail<InterruptCond, seastar::future<T>>; + +template <typename InterruptCond, typename Errorator, typename T = void> +using interruptible_errorated_future = + interruptible_future_detail< + InterruptCond, + typename Errorator::template future<T>>; + +template <typename InterruptCond> +struct interruptor +{ +public: + using condition = InterruptCond; + + template <typename FutureType> + [[gnu::always_inline]] + static interruptible_future_detail<InterruptCond, FutureType> + make_interruptible(FutureType&& fut) { + return interruptible_future_detail<InterruptCond, FutureType>(std::move(fut)); + } + + [[gnu::always_inline]] + static interruptible_future_detail<InterruptCond, seastar::future<>> now() { + return interruptible_future_detail< + InterruptCond, + seastar::future<>>(seastar::now()); + } + + template <typename ValueT = void, typename... A> + [[gnu::always_inline]] + static interruptible_future_detail<InterruptCond, seastar::future<ValueT>> + make_ready_future(A&&... value) { + return interruptible_future_detail<InterruptCond, seastar::future<ValueT>>( + seastar::make_ready_future<ValueT>(std::forward<A>(value)...)); + } + + template <typename T> + struct futurize { + using type = interruptible_future_detail< + InterruptCond, typename seastar::futurize<T>::type>; + }; + + template <typename FutureType> + struct futurize<interruptible_future_detail<InterruptCond, FutureType>> { + using type = interruptible_future_detail<InterruptCond, FutureType>; + }; + + template <typename T> + using futurize_t = typename futurize<T>::type; + + template <typename Container, typename AsyncAction> + [[gnu::always_inline]] + static auto do_for_each(Container& c, AsyncAction&& action) { + return do_for_each(std::begin(c), std::end(c), + std::forward<AsyncAction>(action)); + } + + template <typename OpFunc, typename OnInterrupt, + typename... Params> + static inline auto with_interruption_cond( + OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCond &&cond, Params&&... params) { + INTR_FUT_DEBUG( + "with_interruption_cond: interrupt_cond: {}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get()); + return internal::call_with_interruption_impl( + seastar::make_lw_shared<InterruptCond>(std::move(cond)), + std::forward<OpFunc>(opfunc), + std::forward<Params>(params)... + ).template handle_interruption(std::move(efunc)); + } + + template <typename OpFunc, typename OnInterrupt, + typename... InterruptCondParams> + static inline auto with_interruption( + OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCondParams&&... params) { + return with_interruption_cond( + std::forward<OpFunc>(opfunc), + std::forward<OnInterrupt>(efunc), + InterruptCond(std::forward<InterruptCondParams>(params)...)); + } + + template <typename Error, + typename Func, + typename... Params> + static inline auto with_interruption_to_error( + Func &&f, InterruptCond &&cond, Params&&... params) { + using func_result_t = std::invoke_result_t<Func, Params...>; + using func_ertr_t = + typename seastar::template futurize< + func_result_t>::core_type::errorator_type; + using with_trans_ertr = + typename func_ertr_t::template extend_ertr<errorator<Error>>; + + using value_type = typename func_result_t::value_type; + using ftype = typename std::conditional_t< + std::is_same_v<value_type, seastar::internal::monostate>, + typename with_trans_ertr::template future<>, + typename with_trans_ertr::template future<value_type>>; + + return with_interruption_cond( + std::forward<Func>(f), + [](auto e) -> ftype { + return Error::make(); + }, + std::forward<InterruptCond>(cond), + std::forward<Params>(params)...); + } + + template <typename Func> + [[gnu::always_inline]] + static auto wrap_function(Func&& func) { + return [func=std::forward<Func>(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable { + return call_with_interruption( + interrupt_condition, + std::forward<Func>(func)); + }; + } + + template <typename Iterator, + InvokeReturnsInterruptibleFuture<typename Iterator::reference> AsyncAction> + [[gnu::always_inline]] + static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) { + using Result = std::invoke_result_t<AsyncAction, typename Iterator::reference>; + if constexpr (seastar::Future<typename Result::core_type>) { + return make_interruptible( + ::seastar::do_for_each(begin, end, + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (typename Iterator::reference x) mutable { + return call_with_interruption( + interrupt_condition, + std::move(action), + std::forward<decltype(*begin)>(x)).to_future(); + }) + ); + } else { + return make_interruptible( + ::crimson::do_for_each(begin, end, + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (typename Iterator::reference x) mutable { + return call_with_interruption( + interrupt_condition, + std::move(action), + std::forward<decltype(*begin)>(x)).to_future(); + }) + ); + } + } + + template <typename Iterator, typename AsyncAction> + requires (!InvokeReturnsInterruptibleFuture<AsyncAction, typename Iterator::reference>) + [[gnu::always_inline]] + static auto do_for_each(Iterator begin, Iterator end, AsyncAction&& action) { + if constexpr (seastar::InvokeReturnsAnyFuture<AsyncAction, typename Iterator::reference>) { + return make_interruptible( + ::seastar::do_for_each(begin, end, + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (typename Iterator::reference x) mutable { + return call_with_interruption( + interrupt_condition, + std::move(action), + std::forward<decltype(*begin)>(x)); + }) + ); + } else { + return make_interruptible( + ::crimson::do_for_each(begin, end, + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (typename Iterator::reference x) mutable { + return call_with_interruption( + interrupt_condition, + std::move(action), + std::forward<decltype(*begin)>(x)); + }) + ); + } + } + + template <InvokeReturnsInterruptibleFuture AsyncAction> + [[gnu::always_inline]] + static auto repeat(AsyncAction&& action) { + using Result = std::invoke_result_t<AsyncAction>; + if constexpr (seastar::Future<typename Result::core_type>) { + return make_interruptible( + ::seastar::repeat( + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] { + return call_with_interruption( + interrupt_condition, + std::move(action)).to_future(); + }) + ); + } else { + return make_interruptible( + ::crimson::repeat( + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable { + return call_with_interruption( + interrupt_condition, + std::move(action)).to_future(); + }) + ); + } + } + template <typename AsyncAction> + requires (!InvokeReturnsInterruptibleFuture<AsyncAction>) + [[gnu::always_inline]] + static auto repeat(AsyncAction&& action) { + if constexpr (seastar::InvokeReturnsAnyFuture<AsyncAction>) { + return make_interruptible( + ::seastar::repeat( + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] { + return call_with_interruption( + interrupt_condition, + std::move(action)); + }) + ); + } else { + return make_interruptible( + ::crimson::repeat( + [action=std::move(action), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] { + return call_with_interruption( + interrupt_condition, + std::move(action)); + }) + ); + } + } + + template <typename Iterator, typename Func> + static inline auto parallel_for_each( + Iterator begin, + Iterator end, + Func&& func + ) noexcept { + using ResultType = std::invoke_result_t<Func, typename Iterator::reference>; + parallel_for_each_state<InterruptCond, ResultType>* s = nullptr; + auto decorated_func = + [func=std::forward<Func>(func), + interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] + (decltype(*Iterator())&& x) mutable { + return call_with_interruption( + interrupt_condition, + std::forward<Func>(func), + std::forward<decltype(*begin)>(x)); + }; + // Process all elements, giving each future the following treatment: + // - available, not failed: do nothing + // - available, failed: collect exception in ex + // - not available: collect in s (allocating it if needed) + while (begin != end) { + auto f = seastar::futurize_invoke(decorated_func, *begin++); + if (!f.available() || f.failed()) { + if (!s) { + using itraits = std::iterator_traits<Iterator>; + auto n = (seastar::internal::iterator_range_estimate_vector_capacity( + begin, end, typename itraits::iterator_category()) + 1); + s = new parallel_for_each_state<InterruptCond, ResultType>(n); + } + s->add_future(std::move(f)); + } + } + // If any futures were not available, hand off to parallel_for_each_state::start(). + // Otherwise we can return a result immediately. + if (s) { + // s->get_future() takes ownership of s (and chains it to one of the futures it contains) + // so this isn't a leak + return s->get_future(); + } + return parallel_for_each_state<InterruptCond, ResultType>::now(); + } + + template <typename Container, typename Func> + static inline auto parallel_for_each(Container& container, Func&& func) noexcept { + return parallel_for_each( + std::begin(container), + std::end(container), + std::forward<Func>(func)); + } + + template <typename Iterator, typename Mapper, typename Initial, typename Reduce> + static inline interruptible_future<InterruptCond, Initial> map_reduce( + Iterator begin, Iterator end, Mapper&& mapper, Initial initial, Reduce&& reduce) { + struct state { + Initial result; + Reduce reduce; + }; + auto s = seastar::make_lw_shared(state{std::move(initial), std::move(reduce)}); + interruptible_future<InterruptCond> ret = seastar::make_ready_future<>(); + while (begin != end) { + ret = seastar::futurize_invoke(mapper, *begin++).then_wrapped_interruptible( + [s = s.get(), ret = std::move(ret)] (auto f) mutable { + try { + s->result = s->reduce(std::move(s->result), std::move(f.get0())); + return std::move(ret); + } catch (...) { + return std::move(ret).then_wrapped_interruptible([ex = std::current_exception()] (auto f) { + f.ignore_ready_future(); + return seastar::make_exception_future<>(ex); + }); + } + }); + } + return ret.then_interruptible([s] { + return seastar::make_ready_future<Initial>(std::move(s->result)); + }); + } + template <typename Range, typename Mapper, typename Initial, typename Reduce> + static inline interruptible_future<InterruptCond, Initial> map_reduce( + Range&& range, Mapper&& mapper, Initial initial, Reduce&& reduce) { + return map_reduce(std::begin(range), std::end(range), std::forward<Mapper>(mapper), + std::move(initial), std::move(reduce)); + } + + template<typename Fut> + requires seastar::Future<Fut> || IsInterruptibleFuture<Fut> + static auto futurize_invoke_if_func(Fut&& fut) noexcept { + return std::forward<Fut>(fut); + } + + template<typename Func> + requires (!seastar::Future<Func>) && (!IsInterruptibleFuture<Func>) + static auto futurize_invoke_if_func(Func&& func) noexcept { + return seastar::futurize_invoke(std::forward<Func>(func)); + } + + template <typename... FutOrFuncs> + static inline auto when_all(FutOrFuncs&&... fut_or_funcs) noexcept { + return ::seastar::internal::when_all_impl( + futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...); + } + + template <typename... FutOrFuncs> + static inline auto when_all_succeed(FutOrFuncs&&... fut_or_funcs) noexcept { + return ::seastar::internal::when_all_succeed_impl( + futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...); + } + + template <typename Func, + typename Result = futurize_t<std::invoke_result_t<Func>>> + static inline Result async(Func&& func) { + auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; + INTR_FUT_DEBUG( + "interruptible_future_detail::async() yielding out, " + "interrupt_cond {},{} cleared", + (void*)interruption_condition.get(), + typeid(InterruptCond).name()); + interrupt_cond<InterruptCond>.reset(); + auto ret = seastar::async([func=std::forward<Func>(func), + interruption_condition] () mutable { + return non_futurized_call_with_interruption( + interruption_condition, std::forward<Func>(func)); + }); + interrupt_cond<InterruptCond>.set(interruption_condition); + INTR_FUT_DEBUG( + "interruptible_future_detail::async() yield back, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + return ret; + } + + template <class FutureT> + static decltype(auto) green_get(FutureT&& fut) { + if (fut.available()) { + return fut.get(); + } else { + // destined to wait! + auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; + INTR_FUT_DEBUG( + "green_get() waiting, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + interrupt_cond<InterruptCond>.reset(); + auto&& value = fut.get(); + interrupt_cond<InterruptCond>.set(interruption_condition); + INTR_FUT_DEBUG( + "green_get() got, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + return std::move(value); + } + } + + static void yield() { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; + INTR_FUT_DEBUG( + "interruptible_future_detail::yield() yielding out, " + "interrupt_cond {},{} cleared", + (void*)interruption_condition.get(), + typeid(InterruptCond).name()); + interrupt_cond<InterruptCond>.reset(); + seastar::thread::yield(); + interrupt_cond<InterruptCond>.set(interruption_condition); + INTR_FUT_DEBUG( + "interruptible_future_detail::yield() yield back, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + } + + static void maybe_yield() { + ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond); + if (seastar::thread::should_yield()) { + auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond; + INTR_FUT_DEBUG( + "interruptible_future_detail::may_yield() yielding out, " + "interrupt_cond {},{} cleared", + (void*)interruption_condition.get(), + typeid(InterruptCond).name()); + interrupt_cond<InterruptCond>.reset(); + seastar::thread::yield(); + interrupt_cond<InterruptCond>.set(interruption_condition); + INTR_FUT_DEBUG( + "interruptible_future_detail::may_yield() yield back, interrupt_cond: {},{}", + (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(), + typeid(InterruptCond).name()); + } + } +}; + +} // namespace crimson::interruptible + +namespace seastar { + +template <typename InterruptCond, typename... T> +struct futurize<::crimson::interruptible::interruptible_future_detail< + InterruptCond, seastar::future<T...>>> { + using type = ::crimson::interruptible::interruptible_future_detail< + InterruptCond, seastar::future<T...>>; + + using value_type = typename type::value_type; + using tuple_type = typename type::tuple_type; + + static type from_tuple(tuple_type&& value) { + return type(ready_future_marker(), std::move(value)); + } + static type from_tuple(const tuple_type& value) { + return type(ready_future_marker(), value); + } + static type from_tuple(value_type&& value) { + return type(ready_future_marker(), std::move(value)); + } + static type from_tuple(const value_type& value) { + return type(ready_future_marker(), value); + } + + template <typename Func, typename... FuncArgs> + [[gnu::always_inline]] + static inline type invoke(Func&& func, FuncArgs&&... args) noexcept { + try { + return func(std::forward<FuncArgs>(args)...); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <typename Func> + [[gnu::always_inline]] + static type invoke(Func&& func, seastar::internal::monostate) noexcept { + try { + return ::seastar::futurize_invoke(std::forward<Func>(func)); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <typename Arg> + static inline type make_exception_future(Arg&& arg) noexcept { + return seastar::make_exception_future<T...>(std::forward<Arg>(arg)); + } + + static inline type make_exception_future(future_state_base&& state) noexcept { + return seastar::internal::make_exception_future<T...>(std::move(state)); + } + + template<typename PromiseT, typename Func> + static void satisfy_with_result_of(PromiseT&& pr, Func&& func) { + func().forward_to(std::move(pr)); + } +}; + +template <typename InterruptCond, + template <typename...> typename ErroratedFuture, + typename... T> +struct futurize< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, + ErroratedFuture<::crimson::errorated_future_marker<T...>> + > +> { + using type = ::crimson::interruptible::interruptible_future_detail< + InterruptCond, + ErroratedFuture<::crimson::errorated_future_marker<T...>>>; + using core_type = ErroratedFuture< + ::crimson::errorated_future_marker<T...>>; + using errorator_type = + ::crimson::interruptible::interruptible_errorator< + InterruptCond, + typename ErroratedFuture< + ::crimson::errorated_future_marker<T...>>::errorator_type>; + + template<typename Func, typename... FuncArgs> + static inline type invoke(Func&& func, FuncArgs&&... args) noexcept { + try { + return func(std::forward<FuncArgs>(args)...); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <typename Func> + [[gnu::always_inline]] + static type invoke(Func&& func, seastar::internal::monostate) noexcept { + try { + return ::seastar::futurize_invoke(std::forward<Func>(func)); + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + template <typename Arg> + static inline type make_exception_future(Arg&& arg) noexcept { + return core_type::errorator_type::template make_exception_future2<T...>( + std::forward<Arg>(arg)); + } + + template<typename PromiseT, typename Func> + static void satisfy_with_result_of(PromiseT&& pr, Func&& func) { + func().forward_to(std::move(pr)); + } + +}; + +template <typename InterruptCond, typename FutureType> +struct continuation_base_from_future< + ::crimson::interruptible::interruptible_future_detail<InterruptCond, FutureType>> { + using type = typename seastar::continuation_base_from_future<FutureType>::type; +}; + +template <typename InterruptCond, typename FutureType> +struct is_future< + ::crimson::interruptible::interruptible_future_detail< + InterruptCond, + FutureType>> + : std::true_type {}; +} // namespace seastar diff --git a/src/crimson/common/layout.h b/src/crimson/common/layout.h new file mode 100644 index 000000000..9d54ecd1d --- /dev/null +++ b/src/crimson/common/layout.h @@ -0,0 +1,737 @@ +// Copyright 2018 The Abseil Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// MOTIVATION AND TUTORIAL +// +// If you want to put in a single heap allocation N doubles followed by M ints, +// it's easy if N and M are known at compile time. +// +// struct S { +// double a[N]; +// int b[M]; +// }; +// +// S* p = new S; +// +// But what if N and M are known only in run time? Class template Layout to the +// rescue! It's a portable generalization of the technique known as struct hack. +// +// // This object will tell us everything we need to know about the memory +// // layout of double[N] followed by int[M]. It's structurally identical to +// // size_t[2] that stores N and M. It's very cheap to create. +// const Layout<double, int> layout(N, M); +// +// // Allocate enough memory for both arrays. `AllocSize()` tells us how much +// // memory is needed. We are free to use any allocation function we want as +// // long as it returns aligned memory. +// std::unique_ptr<unsigned char[]> p(new unsigned char[layout.AllocSize()]); +// +// // Obtain the pointer to the array of doubles. +// // Equivalent to `reinterpret_cast<double*>(p.get())`. +// // +// // We could have written layout.Pointer<0>(p) instead. If all the types are +// // unique you can use either form, but if some types are repeated you must +// // use the index form. +// double* a = layout.Pointer<double>(p.get()); +// +// // Obtain the pointer to the array of ints. +// // Equivalent to `reinterpret_cast<int*>(p.get() + N * 8)`. +// int* b = layout.Pointer<int>(p); +// +// If we are unable to specify sizes of all fields, we can pass as many sizes as +// we can to `Partial()`. In return, it'll allow us to access the fields whose +// locations and sizes can be computed from the provided information. +// `Partial()` comes in handy when the array sizes are embedded into the +// allocation. +// +// // size_t[1] containing N, size_t[1] containing M, double[N], int[M]. +// using L = Layout<size_t, size_t, double, int>; +// +// unsigned char* Allocate(size_t n, size_t m) { +// const L layout(1, 1, n, m); +// unsigned char* p = new unsigned char[layout.AllocSize()]; +// *layout.Pointer<0>(p) = n; +// *layout.Pointer<1>(p) = m; +// return p; +// } +// +// void Use(unsigned char* p) { +// // First, extract N and M. +// // Specify that the first array has only one element. Using `prefix` we +// // can access the first two arrays but not more. +// constexpr auto prefix = L::Partial(1); +// size_t n = *prefix.Pointer<0>(p); +// size_t m = *prefix.Pointer<1>(p); +// +// // Now we can get pointers to the payload. +// const L layout(1, 1, n, m); +// double* a = layout.Pointer<double>(p); +// int* b = layout.Pointer<int>(p); +// } +// +// The layout we used above combines fixed-size with dynamically-sized fields. +// This is quite common. Layout is optimized for this use case and generates +// optimal code. All computations that can be performed at compile time are +// indeed performed at compile time. +// +// Efficiency tip: The order of fields matters. In `Layout<T1, ..., TN>` try to +// ensure that `alignof(T1) >= ... >= alignof(TN)`. This way you'll have no +// padding in between arrays. +// +// You can manually override the alignment of an array by wrapping the type in +// `Aligned<T, N>`. `Layout<..., Aligned<T, N>, ...>` has exactly the same API +// and behavior as `Layout<..., T, ...>` except that the first element of the +// array of `T` is aligned to `N` (the rest of the elements follow without +// padding). `N` cannot be less than `alignof(T)`. +// +// `AllocSize()` and `Pointer()` are the most basic methods for dealing with +// memory layouts. Check out the reference or code below to discover more. +// +// EXAMPLE +// +// // Immutable move-only string with sizeof equal to sizeof(void*). The +// // string size and the characters are kept in the same heap allocation. +// class CompactString { +// public: +// CompactString(const char* s = "") { +// const size_t size = strlen(s); +// // size_t[1] followed by char[size + 1]. +// const L layout(1, size + 1); +// p_.reset(new unsigned char[layout.AllocSize()]); +// // If running under ASAN, mark the padding bytes, if any, to catch +// // memory errors. +// layout.PoisonPadding(p_.get()); +// // Store the size in the allocation. +// *layout.Pointer<size_t>(p_.get()) = size; +// // Store the characters in the allocation. +// memcpy(layout.Pointer<char>(p_.get()), s, size + 1); +// } +// +// size_t size() const { +// // Equivalent to reinterpret_cast<size_t&>(*p). +// return *L::Partial().Pointer<size_t>(p_.get()); +// } +// +// const char* c_str() const { +// // Equivalent to reinterpret_cast<char*>(p.get() + sizeof(size_t)). +// // The argument in Partial(1) specifies that we have size_t[1] in front +// // of the characters. +// return L::Partial(1).Pointer<char>(p_.get()); +// } +// +// private: +// // Our heap allocation contains a size_t followed by an array of chars. +// using L = Layout<size_t, char>; +// std::unique_ptr<unsigned char[]> p_; +// }; +// +// int main() { +// CompactString s = "hello"; +// assert(s.size() == 5); +// assert(strcmp(s.c_str(), "hello") == 0); +// } +// +// DOCUMENTATION +// +// The interface exported by this file consists of: +// - class `Layout<>` and its public members. +// - The public members of class `internal_layout::LayoutImpl<>`. That class +// isn't intended to be used directly, and its name and template parameter +// list are internal implementation details, but the class itself provides +// most of the functionality in this file. See comments on its members for +// detailed documentation. +// +// `Layout<T1,... Tn>::Partial(count1,..., countm)` (where `m` <= `n`) returns a +// `LayoutImpl<>` object. `Layout<T1,..., Tn> layout(count1,..., countn)` +// creates a `Layout` object, which exposes the same functionality by inheriting +// from `LayoutImpl<>`. + +#ifndef ABSL_CONTAINER_INTERNAL_LAYOUT_H_ +#define ABSL_CONTAINER_INTERNAL_LAYOUT_H_ + +#include <assert.h> +#include <stddef.h> +#include <stdint.h> +#include <ostream> +#include <string> +#include <tuple> +#include <type_traits> +#include <typeinfo> +#include <utility> + +#ifdef ADDRESS_SANITIZER +#include <sanitizer/asan_interface.h> +#endif + +// for C++20 std::span +#include <boost/beast/core/span.hpp> +#include <fmt/format.h> + +#if defined(__GXX_RTTI) +#define ABSL_INTERNAL_HAS_CXA_DEMANGLE +#endif + +#ifdef ABSL_INTERNAL_HAS_CXA_DEMANGLE +#include <cxxabi.h> +#endif + +namespace absl { +namespace container_internal { + +// A type wrapper that instructs `Layout` to use the specific alignment for the +// array. `Layout<..., Aligned<T, N>, ...>` has exactly the same API +// and behavior as `Layout<..., T, ...>` except that the first element of the +// array of `T` is aligned to `N` (the rest of the elements follow without +// padding). +// +// Requires: `N >= alignof(T)` and `N` is a power of 2. +template <class T, size_t N> +struct Aligned; + +namespace internal_layout { + +template <class T> +struct NotAligned {}; + +template <class T, size_t N> +struct NotAligned<const Aligned<T, N>> { + static_assert(sizeof(T) == 0, "Aligned<T, N> cannot be const-qualified"); +}; + +template <size_t> +using IntToSize = size_t; + +template <class> +using TypeToSize = size_t; + +template <class T> +struct Type : NotAligned<T> { + using type = T; +}; + +template <class T, size_t N> +struct Type<Aligned<T, N>> { + using type = T; +}; + +template <class T> +struct SizeOf : NotAligned<T>, std::integral_constant<size_t, sizeof(T)> {}; + +template <class T, size_t N> +struct SizeOf<Aligned<T, N>> : std::integral_constant<size_t, sizeof(T)> {}; + +// Note: workaround for https://gcc.gnu.org/PR88115 +template <class T> +struct AlignOf : NotAligned<T> { + static constexpr size_t value = alignof(T); +}; + +template <class T, size_t N> +struct AlignOf<Aligned<T, N>> { + static_assert(N % alignof(T) == 0, + "Custom alignment can't be lower than the type's alignment"); + static constexpr size_t value = N; +}; + +// Does `Ts...` contain `T`? +template <class T, class... Ts> +using Contains = std::disjunction<std::is_same<T, Ts>...>; + +template <class From, class To> +using CopyConst = + typename std::conditional_t<std::is_const_v<From>, const To, To>; + +// Note: We're not qualifying this with absl:: because it doesn't compile under +// MSVC. +template <class T> +using SliceType = boost::beast::span<T>; + +// This namespace contains no types. It prevents functions defined in it from +// being found by ADL. +namespace adl_barrier { + +template <class Needle, class... Ts> +constexpr size_t Find(Needle, Needle, Ts...) { + static_assert(!Contains<Needle, Ts...>(), "Duplicate element type"); + return 0; +} + +template <class Needle, class T, class... Ts> +constexpr size_t Find(Needle, T, Ts...) { + return adl_barrier::Find(Needle(), Ts()...) + 1; +} + +constexpr bool IsPow2(size_t n) { return !(n & (n - 1)); } + +// Returns `q * m` for the smallest `q` such that `q * m >= n`. +// Requires: `m` is a power of two. It's enforced by IsLegalElementType below. +constexpr size_t Align(size_t n, size_t m) { return (n + m - 1) & ~(m - 1); } + +constexpr size_t Min(size_t a, size_t b) { return b < a ? b : a; } + +constexpr size_t Max(size_t a) { return a; } + +template <class... Ts> +constexpr size_t Max(size_t a, size_t b, Ts... rest) { + return adl_barrier::Max(b < a ? a : b, rest...); +} + +template <class T> +std::string TypeName() { + std::string out; + int status = 0; + char* demangled = nullptr; +#ifdef ABSL_INTERNAL_HAS_CXA_DEMANGLE + demangled = abi::__cxa_demangle(typeid(T).name(), nullptr, nullptr, &status); +#endif + if (status == 0 && demangled != nullptr) { // Demangling succeeded. + out = fmt::format("<{}>", demangled); + free(demangled); + } else { +#if defined(__GXX_RTTI) || defined(_CPPRTTI) + out = fmt::format("<{}>", typeid(T).name()); +#endif + } + return out; +} + +} // namespace adl_barrier + +template <bool C> +using EnableIf = typename std::enable_if_t<C, int>; + +// Can `T` be a template argument of `Layout`? +template <class T> +using IsLegalElementType = std::integral_constant< + bool, !std::is_reference_v<T> && !std::is_volatile_v<T> && + !std::is_reference_v<typename Type<T>::type> && + !std::is_volatile_v<typename Type<T>::type> && + adl_barrier::IsPow2(AlignOf<T>::value)>; + +template <class Elements, class SizeSeq, class OffsetSeq> +class LayoutImpl; + +// Public base class of `Layout` and the result type of `Layout::Partial()`. +// +// `Elements...` contains all template arguments of `Layout` that created this +// instance. +// +// `SizeSeq...` is `[0, NumSizes)` where `NumSizes` is the number of arguments +// passed to `Layout::Partial()` or `Layout::Layout()`. +// +// `OffsetSeq...` is `[0, NumOffsets)` where `NumOffsets` is +// `Min(sizeof...(Elements), NumSizes + 1)` (the number of arrays for which we +// can compute offsets). +template <class... Elements, size_t... SizeSeq, size_t... OffsetSeq> +class LayoutImpl<std::tuple<Elements...>, std::index_sequence<SizeSeq...>, + std::index_sequence<OffsetSeq...>> { + private: + static_assert(sizeof...(Elements) > 0, "At least one field is required"); + static_assert(std::conjunction_v<IsLegalElementType<Elements>...>, + "Invalid element type (see IsLegalElementType)"); + + enum { + NumTypes = sizeof...(Elements), + NumSizes = sizeof...(SizeSeq), + NumOffsets = sizeof...(OffsetSeq), + }; + + // These are guaranteed by `Layout`. + static_assert(NumOffsets == adl_barrier::Min(NumTypes, NumSizes + 1), + "Internal error"); + static_assert(NumTypes > 0, "Internal error"); + + // Returns the index of `T` in `Elements...`. Results in a compilation error + // if `Elements...` doesn't contain exactly one instance of `T`. + template <class T> + static constexpr size_t ElementIndex() { + static_assert(Contains<Type<T>, Type<typename Type<Elements>::type>...>(), + "Type not found"); + return adl_barrier::Find(Type<T>(), + Type<typename Type<Elements>::type>()...); + } + + template <size_t N> + using ElementAlignment = + AlignOf<typename std::tuple_element<N, std::tuple<Elements...>>::type>; + + public: + // Element types of all arrays packed in a tuple. + using ElementTypes = std::tuple<typename Type<Elements>::type...>; + + // Element type of the Nth array. + template <size_t N> + using ElementType = typename std::tuple_element<N, ElementTypes>::type; + + constexpr explicit LayoutImpl(IntToSize<SizeSeq>... sizes) + : size_{sizes...} {} + + // Alignment of the layout, equal to the strictest alignment of all elements. + // All pointers passed to the methods of layout must be aligned to this value. + static constexpr size_t Alignment() { + return adl_barrier::Max(AlignOf<Elements>::value...); + } + + // Offset in bytes of the Nth array. + // + // // int[3], 4 bytes of padding, double[4]. + // Layout<int, double> x(3, 4); + // assert(x.Offset<0>() == 0); // The ints starts from 0. + // assert(x.Offset<1>() == 16); // The doubles starts from 16. + // + // Requires: `N <= NumSizes && N < sizeof...(Ts)`. + template <size_t N, EnableIf<N == 0> = 0> + constexpr size_t Offset() const { + return 0; + } + + template <size_t N, EnableIf<N != 0> = 0> + constexpr size_t Offset() const { + static_assert(N < NumOffsets, "Index out of bounds"); + return adl_barrier::Align( + Offset<N - 1>() + SizeOf<ElementType<N - 1>>() * size_[N - 1], + ElementAlignment<N>::value); + } + + // Offset in bytes of the array with the specified element type. There must + // be exactly one such array and its zero-based index must be at most + // `NumSizes`. + // + // // int[3], 4 bytes of padding, double[4]. + // Layout<int, double> x(3, 4); + // assert(x.Offset<int>() == 0); // The ints starts from 0. + // assert(x.Offset<double>() == 16); // The doubles starts from 16. + template <class T> + constexpr size_t Offset() const { + return Offset<ElementIndex<T>()>(); + } + + // Offsets in bytes of all arrays for which the offsets are known. + constexpr std::array<size_t, NumOffsets> Offsets() const { + return {{Offset<OffsetSeq>()...}}; + } + + // The number of elements in the Nth array. This is the Nth argument of + // `Layout::Partial()` or `Layout::Layout()` (zero-based). + // + // // int[3], 4 bytes of padding, double[4]. + // Layout<int, double> x(3, 4); + // assert(x.Size<0>() == 3); + // assert(x.Size<1>() == 4); + // + // Requires: `N < NumSizes`. + template <size_t N> + constexpr size_t Size() const { + static_assert(N < NumSizes, "Index out of bounds"); + return size_[N]; + } + + // The number of elements in the array with the specified element type. + // There must be exactly one such array and its zero-based index must be + // at most `NumSizes`. + // + // // int[3], 4 bytes of padding, double[4]. + // Layout<int, double> x(3, 4); + // assert(x.Size<int>() == 3); + // assert(x.Size<double>() == 4); + template <class T> + constexpr size_t Size() const { + return Size<ElementIndex<T>()>(); + } + + // The number of elements of all arrays for which they are known. + constexpr std::array<size_t, NumSizes> Sizes() const { + return {{Size<SizeSeq>()...}}; + } + + // Pointer to the beginning of the Nth array. + // + // `Char` must be `[const] [signed|unsigned] char`. + // + // // int[3], 4 bytes of padding, double[4]. + // Layout<int, double> x(3, 4); + // unsigned char* p = new unsigned char[x.AllocSize()]; + // int* ints = x.Pointer<0>(p); + // double* doubles = x.Pointer<1>(p); + // + // Requires: `N <= NumSizes && N < sizeof...(Ts)`. + // Requires: `p` is aligned to `Alignment()`. + template <size_t N, class Char> + CopyConst<Char, ElementType<N>>* Pointer(Char* p) const { + using C = typename std::remove_const<Char>::type; + static_assert( + std::is_same<C, char>() || std::is_same<C, unsigned char>() || + std::is_same<C, signed char>(), + "The argument must be a pointer to [const] [signed|unsigned] char"); + constexpr size_t alignment = Alignment(); + (void)alignment; + assert(reinterpret_cast<uintptr_t>(p) % alignment == 0); + return reinterpret_cast<CopyConst<Char, ElementType<N>>*>(p + Offset<N>()); + } + + // Pointer to the beginning of the array with the specified element type. + // There must be exactly one such array and its zero-based index must be at + // most `NumSizes`. + // + // `Char` must be `[const] [signed|unsigned] char`. + // + // // int[3], 4 bytes of padding, double[4]. + // Layout<int, double> x(3, 4); + // unsigned char* p = new unsigned char[x.AllocSize()]; + // int* ints = x.Pointer<int>(p); + // double* doubles = x.Pointer<double>(p); + // + // Requires: `p` is aligned to `Alignment()`. + template <class T, class Char> + CopyConst<Char, T>* Pointer(Char* p) const { + return Pointer<ElementIndex<T>()>(p); + } + + // Pointers to all arrays for which pointers are known. + // + // `Char` must be `[const] [signed|unsigned] char`. + // + // // int[3], 4 bytes of padding, double[4]. + // Layout<int, double> x(3, 4); + // unsigned char* p = new unsigned char[x.AllocSize()]; + // + // int* ints; + // double* doubles; + // std::tie(ints, doubles) = x.Pointers(p); + // + // Requires: `p` is aligned to `Alignment()`. + // + // Note: We're not using ElementType alias here because it does not compile + // under MSVC. + template <class Char> + std::tuple<CopyConst< + Char, typename std::tuple_element<OffsetSeq, ElementTypes>::type>*...> + Pointers(Char* p) const { + return std::tuple<CopyConst<Char, ElementType<OffsetSeq>>*...>( + Pointer<OffsetSeq>(p)...); + } + + // The Nth array. + // + // `Char` must be `[const] [signed|unsigned] char`. + // + // // int[3], 4 bytes of padding, double[4]. + // Layout<int, double> x(3, 4); + // unsigned char* p = new unsigned char[x.AllocSize()]; + // Span<int> ints = x.Slice<0>(p); + // Span<double> doubles = x.Slice<1>(p); + // + // Requires: `N < NumSizes`. + // Requires: `p` is aligned to `Alignment()`. + template <size_t N, class Char> + SliceType<CopyConst<Char, ElementType<N>>> Slice(Char* p) const { + return SliceType<CopyConst<Char, ElementType<N>>>(Pointer<N>(p), Size<N>()); + } + + // The array with the specified element type. There must be exactly one + // such array and its zero-based index must be less than `NumSizes`. + // + // `Char` must be `[const] [signed|unsigned] char`. + // + // // int[3], 4 bytes of padding, double[4]. + // Layout<int, double> x(3, 4); + // unsigned char* p = new unsigned char[x.AllocSize()]; + // Span<int> ints = x.Slice<int>(p); + // Span<double> doubles = x.Slice<double>(p); + // + // Requires: `p` is aligned to `Alignment()`. + template <class T, class Char> + SliceType<CopyConst<Char, T>> Slice(Char* p) const { + return Slice<ElementIndex<T>()>(p); + } + + // All arrays with known sizes. + // + // `Char` must be `[const] [signed|unsigned] char`. + // + // // int[3], 4 bytes of padding, double[4]. + // Layout<int, double> x(3, 4); + // unsigned char* p = new unsigned char[x.AllocSize()]; + // + // Span<int> ints; + // Span<double> doubles; + // std::tie(ints, doubles) = x.Slices(p); + // + // Requires: `p` is aligned to `Alignment()`. + // + // Note: We're not using ElementType alias here because it does not compile + // under MSVC. + template <class Char> + std::tuple<SliceType<CopyConst< + Char, typename std::tuple_element<SizeSeq, ElementTypes>::type>>...> + Slices(Char* p) const { + // Workaround for https://gcc.gnu.org/bugzilla/show_bug.cgi?id=63875 (fixed + // in 6.1). + (void)p; + return std::tuple<SliceType<CopyConst<Char, ElementType<SizeSeq>>>...>( + Slice<SizeSeq>(p)...); + } + + // The size of the allocation that fits all arrays. + // + // // int[3], 4 bytes of padding, double[4]. + // Layout<int, double> x(3, 4); + // unsigned char* p = new unsigned char[x.AllocSize()]; // 48 bytes + // + // Requires: `NumSizes == sizeof...(Ts)`. + constexpr size_t AllocSize() const { + static_assert(NumTypes == NumSizes, "You must specify sizes of all fields"); + return Offset<NumTypes - 1>() + + SizeOf<ElementType<NumTypes - 1>>() * size_[NumTypes - 1]; + } + + // If built with --config=asan, poisons padding bytes (if any) in the + // allocation. The pointer must point to a memory block at least + // `AllocSize()` bytes in length. + // + // `Char` must be `[const] [signed|unsigned] char`. + // + // Requires: `p` is aligned to `Alignment()`. + template <class Char, size_t N = NumOffsets - 1, EnableIf<N == 0> = 0> + void PoisonPadding(const Char* p) const { + Pointer<0>(p); // verify the requirements on `Char` and `p` + } + + template <class Char, size_t N = NumOffsets - 1, EnableIf<N != 0> = 0> + void PoisonPadding(const Char* p) const { + static_assert(N < NumOffsets, "Index out of bounds"); + (void)p; +#ifdef ADDRESS_SANITIZER + PoisonPadding<Char, N - 1>(p); + // The `if` is an optimization. It doesn't affect the observable behaviour. + if (ElementAlignment<N - 1>::value % ElementAlignment<N>::value) { + size_t start = + Offset<N - 1>() + SizeOf<ElementType<N - 1>>() * size_[N - 1]; + ASAN_POISON_MEMORY_REGION(p + start, Offset<N>() - start); + } +#endif + } + + // Human-readable description of the memory layout. Useful for debugging. + // Slow. + // + // // char[5], 3 bytes of padding, int[3], 4 bytes of padding, followed + // // by an unknown number of doubles. + // auto x = Layout<char, int, double>::Partial(5, 3); + // assert(x.DebugString() == + // "@0<char>(1)[5]; @8<int>(4)[3]; @24<double>(8)"); + // + // Each field is in the following format: @offset<type>(sizeof)[size] (<type> + // may be missing depending on the target platform). For example, + // @8<int>(4)[3] means that at offset 8 we have an array of ints, where each + // int is 4 bytes, and we have 3 of those ints. The size of the last field may + // be missing (as in the example above). Only fields with known offsets are + // described. Type names may differ across platforms: one compiler might + // produce "unsigned*" where another produces "unsigned int *". + std::string DebugString() const { + const auto offsets = Offsets(); + const size_t sizes[] = {SizeOf<ElementType<OffsetSeq>>()...}; + const std::string types[] = { + adl_barrier::TypeName<ElementType<OffsetSeq>>()...}; + std::string res = fmt::format("@0{}({})", types[0], sizes[0]); + for (size_t i = 0; i != NumOffsets - 1; ++i) { + res += fmt::format("[{}]; @({})", size_[i], offsets[i + 1], types[i + 1], sizes[i + 1]); + } + // NumSizes is a constant that may be zero. Some compilers cannot see that + // inside the if statement "size_[NumSizes - 1]" must be valid. + int last = static_cast<int>(NumSizes) - 1; + if (NumTypes == NumSizes && last >= 0) { + res += fmt::format("[{}]", size_[last]); + } + return res; + } + + private: + // Arguments of `Layout::Partial()` or `Layout::Layout()`. + size_t size_[NumSizes > 0 ? NumSizes : 1]; +}; + +template <size_t NumSizes, class... Ts> +using LayoutType = LayoutImpl< + std::tuple<Ts...>, std::make_index_sequence<NumSizes>, + std::make_index_sequence<adl_barrier::Min(sizeof...(Ts), NumSizes + 1)>>; + +} // namespace internal_layout + +// Descriptor of arrays of various types and sizes laid out in memory one after +// another. See the top of the file for documentation. +// +// Check out the public API of internal_layout::LayoutImpl above. The type is +// internal to the library but its methods are public, and they are inherited +// by `Layout`. +template <class... Ts> +class Layout : public internal_layout::LayoutType<sizeof...(Ts), Ts...> { + public: + static_assert(sizeof...(Ts) > 0, "At least one field is required"); + static_assert( + std::conjunction_v<internal_layout::IsLegalElementType<Ts>...>, + "Invalid element type (see IsLegalElementType)"); + + // The result type of `Partial()` with `NumSizes` arguments. + template <size_t NumSizes> + using PartialType = internal_layout::LayoutType<NumSizes, Ts...>; + + // `Layout` knows the element types of the arrays we want to lay out in + // memory but not the number of elements in each array. + // `Partial(size1, ..., sizeN)` allows us to specify the latter. The + // resulting immutable object can be used to obtain pointers to the + // individual arrays. + // + // It's allowed to pass fewer array sizes than the number of arrays. E.g., + // if all you need is to the offset of the second array, you only need to + // pass one argument -- the number of elements in the first array. + // + // // int[3] followed by 4 bytes of padding and an unknown number of + // // doubles. + // auto x = Layout<int, double>::Partial(3); + // // doubles start at byte 16. + // assert(x.Offset<1>() == 16); + // + // If you know the number of elements in all arrays, you can still call + // `Partial()` but it's more convenient to use the constructor of `Layout`. + // + // Layout<int, double> x(3, 5); + // + // Note: The sizes of the arrays must be specified in number of elements, + // not in bytes. + // + // Requires: `sizeof...(Sizes) <= sizeof...(Ts)`. + // Requires: all arguments are convertible to `size_t`. + template <class... Sizes> + static constexpr PartialType<sizeof...(Sizes)> Partial(Sizes&&... sizes) { + static_assert(sizeof...(Sizes) <= sizeof...(Ts)); + return PartialType<sizeof...(Sizes)>(std::forward<Sizes>(sizes)...); + } + + // Creates a layout with the sizes of all arrays specified. If you know + // only the sizes of the first N arrays (where N can be zero), you can use + // `Partial()` defined above. The constructor is essentially equivalent to + // calling `Partial()` and passing in all array sizes; the constructor is + // provided as a convenient abbreviation. + // + // Note: The sizes of the arrays must be specified in number of elements, + // not in bytes. + constexpr explicit Layout(internal_layout::TypeToSize<Ts>... sizes) + : internal_layout::LayoutType<sizeof...(Ts), Ts...>(sizes...) {} +}; + +} // namespace container_internal +} // namespace absl + +#endif // ABSL_CONTAINER_INTERNAL_LAYOUT_H_ diff --git a/src/crimson/common/local_shared_foreign_ptr.h b/src/crimson/common/local_shared_foreign_ptr.h new file mode 100644 index 000000000..c4bd1099a --- /dev/null +++ b/src/crimson/common/local_shared_foreign_ptr.h @@ -0,0 +1,245 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <boost/intrusive_ptr.hpp> +#include <boost/smart_ptr/intrusive_ref_counter.hpp> + +#include <seastar/core/smp.hh> +#include <seastar/core/future.hh> +#include <seastar/core/sharded.hh> + +namespace crimson { + +/** + * local_shared_foreign_ptr + * + * See seastar/include/seastar/core/sharded.hh:foreign_ptr + * + * seastar::foreign_ptr wraps a smart ptr by proxying the copy() and destructor + * operations back to the original core. This works well except that copy() + * requires a cross-core call. We need a smart_ptr which allows cross-core + * caching of (for example) OSDMaps, but we want to avoid the overhead inherent + * in incrementing the source smart_ptr on every copy. Thus, + * local_shared_foreign_ptr maintains a core-local foreign_ptr back to the + * original core instance with core-local ref counting. + */ +template <typename PtrType> +class local_shared_foreign_ptr { + using element_type = typename std::pointer_traits<PtrType>::element_type; + using pointer = element_type*; + + seastar::lw_shared_ptr<seastar::foreign_ptr<PtrType>> ptr; + + /// Wraps a pointer object and remembers the current core. + local_shared_foreign_ptr(seastar::foreign_ptr<PtrType> &&fptr) + : ptr(fptr ? seastar::make_lw_shared(std::move(fptr)) : nullptr) { + assert(!ptr || (ptr && *ptr)); + } + + template <typename T> + friend local_shared_foreign_ptr<T> make_local_shared_foreign( + seastar::foreign_ptr<T> &&); + +public: + /// Constructs a null local_shared_foreign_ptr<>. + local_shared_foreign_ptr() = default; + + /// Constructs a null local_shared_foreign_ptr<>. + local_shared_foreign_ptr(std::nullptr_t) : local_shared_foreign_ptr() {} + + /// Moves a local_shared_foreign_ptr<> to another object. + local_shared_foreign_ptr(local_shared_foreign_ptr&& other) = default; + + /// Copies a local_shared_foreign_ptr<> + local_shared_foreign_ptr(const local_shared_foreign_ptr &other) = default; + + /// Releases reference to ptr eventually releasing the contained foreign_ptr + ~local_shared_foreign_ptr() = default; + + /// Creates a copy of this foreign ptr. Only works if the stored ptr is copyable. + seastar::future<seastar::foreign_ptr<PtrType>> get_foreign() const noexcept { + assert(!ptr || (ptr && *ptr)); + return ptr ? ptr->copy() : + seastar::make_ready_future<seastar::foreign_ptr<PtrType>>(nullptr); + } + + /// Accesses the wrapped object. + element_type& operator*() const noexcept { + assert(ptr && *ptr); + return **ptr; + } + /// Accesses the wrapped object. + element_type* operator->() const noexcept { + assert(ptr && *ptr); + return &**ptr; + } + + /// Access the raw pointer to the wrapped object. + pointer get() const noexcept { + assert(!ptr || (ptr && *ptr)); + return ptr ? ptr->get() : nullptr; + } + + /// Return the owner-shard of the contained foreign_ptr. + unsigned get_owner_shard() const noexcept { + assert(!ptr || (ptr && *ptr)); + return ptr ? ptr->get_owner_shard() : seastar::this_shard_id(); + } + + /// Checks whether the wrapped pointer is non-null. + operator bool() const noexcept { + assert(!ptr || (ptr && *ptr)); + return static_cast<bool>(ptr); + } + + /// Move-assigns a \c local_shared_foreign_ptr<>. + local_shared_foreign_ptr& operator=(local_shared_foreign_ptr&& other) noexcept { + ptr = std::move(other.ptr); + return *this; + } + + /// Copy-assigns a \c local_shared_foreign_ptr<>. + local_shared_foreign_ptr& operator=(const local_shared_foreign_ptr& other) noexcept { + ptr = other.ptr; + return *this; + } + + /// Reset the containing ptr + void reset() noexcept { + assert(!ptr || (ptr && *ptr)); + ptr = nullptr; + } +}; + +/// Wraps a smart_ptr T in a local_shared_foreign_ptr<>. +template <typename T> +local_shared_foreign_ptr<T> make_local_shared_foreign( + seastar::foreign_ptr<T> &&ptr) { + return local_shared_foreign_ptr<T>(std::move(ptr)); +} + +/// Wraps ptr in a local_shared_foreign_ptr<>. +template <typename T> +local_shared_foreign_ptr<T> make_local_shared_foreign(T &&ptr) { + return make_local_shared_foreign<T>( + ptr ? seastar::make_foreign(std::forward<T>(ptr)) : nullptr); +} + +template <typename T, typename U> +inline bool operator==(const local_shared_foreign_ptr<T> &x, + const local_shared_foreign_ptr<U> &y) { + return x.get() == y.get(); +} + +template <typename T> +inline bool operator==(const local_shared_foreign_ptr<T> &x, std::nullptr_t) { + return x.get() == nullptr; +} + +template <typename T> +inline bool operator==(std::nullptr_t, const local_shared_foreign_ptr<T>& y) { + return nullptr == y.get(); +} + +template <typename T, typename U> +inline bool operator!=(const local_shared_foreign_ptr<T> &x, + const local_shared_foreign_ptr<U> &y) { + return x.get() != y.get(); +} + +template <typename T> +inline bool operator!=(const local_shared_foreign_ptr<T> &x, std::nullptr_t) { + return x.get() != nullptr; +} + +template <typename T> +inline bool operator!=(std::nullptr_t, const local_shared_foreign_ptr<T>& y) { + return nullptr != y.get(); +} + +template <typename T, typename U> +inline bool operator<(const local_shared_foreign_ptr<T> &x, + const local_shared_foreign_ptr<U> &y) { + return x.get() < y.get(); +} + +template <typename T> +inline bool operator<(const local_shared_foreign_ptr<T> &x, std::nullptr_t) { + return x.get() < nullptr; +} + +template <typename T> +inline bool operator<(std::nullptr_t, const local_shared_foreign_ptr<T>& y) { + return nullptr < y.get(); +} + +template <typename T, typename U> +inline bool operator<=(const local_shared_foreign_ptr<T> &x, + const local_shared_foreign_ptr<U> &y) { + return x.get() <= y.get(); +} + +template <typename T> +inline bool operator<=(const local_shared_foreign_ptr<T> &x, std::nullptr_t) { + return x.get() <= nullptr; +} + +template <typename T> +inline bool operator<=(std::nullptr_t, const local_shared_foreign_ptr<T>& y) { + return nullptr <= y.get(); +} + +template <typename T, typename U> +inline bool operator>(const local_shared_foreign_ptr<T> &x, + const local_shared_foreign_ptr<U> &y) { + return x.get() > y.get(); +} + +template <typename T> +inline bool operator>(const local_shared_foreign_ptr<T> &x, std::nullptr_t) { + return x.get() > nullptr; +} + +template <typename T> +inline bool operator>(std::nullptr_t, const local_shared_foreign_ptr<T>& y) { + return nullptr > y.get(); +} + +template <typename T, typename U> +inline bool operator>=(const local_shared_foreign_ptr<T> &x, + const local_shared_foreign_ptr<U> &y) { + return x.get() >= y.get(); +} + +template <typename T> +inline bool operator>=(const local_shared_foreign_ptr<T> &x, std::nullptr_t) { + return x.get() >= nullptr; +} + +template <typename T> +inline bool operator>=(std::nullptr_t, const local_shared_foreign_ptr<T>& y) { + return nullptr >= y.get(); +} + +} + +namespace std { + +template <typename T> +struct hash<crimson::local_shared_foreign_ptr<T>> + : private hash<typename std::pointer_traits<T>::element_type *> { + size_t operator()(const crimson::local_shared_foreign_ptr<T>& p) const { + return hash<typename std::pointer_traits<T>::element_type *>::operator()(p.get()); + } +}; + +} + +namespace seastar { + +template<typename T> +struct is_smart_ptr<crimson::local_shared_foreign_ptr<T>> : std::true_type {}; + +} diff --git a/src/crimson/common/log.cc b/src/crimson/common/log.cc new file mode 100644 index 000000000..cae9f6a7b --- /dev/null +++ b/src/crimson/common/log.cc @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "log.h" + +static std::array<seastar::logger, ceph_subsys_get_num()> loggers{ +#define SUBSYS(name, log_level, gather_level) \ + seastar::logger(#name), +#define DEFAULT_SUBSYS(log_level, gather_level) \ + seastar::logger("none"), + #include "common/subsys.h" +#undef SUBSYS +#undef DEFAULT_SUBSYS +}; + +namespace crimson { +seastar::logger& get_logger(int subsys) { + assert(subsys < ceph_subsys_max); + return loggers[subsys]; +} +} diff --git a/src/crimson/common/log.h b/src/crimson/common/log.h new file mode 100644 index 000000000..27ff550d8 --- /dev/null +++ b/src/crimson/common/log.h @@ -0,0 +1,88 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <fmt/format.h> +#include <seastar/util/log.hh> + +#include "common/subsys_types.h" + +namespace crimson { +seastar::logger& get_logger(int subsys); +static inline seastar::log_level to_log_level(int level) { + if (level < 0) { + return seastar::log_level::error; + } else if (level < 1) { + return seastar::log_level::warn; + } else if (level <= 5) { + return seastar::log_level::info; + } else if (level <= 20) { + return seastar::log_level::debug; + } else { + return seastar::log_level::trace; + } +} +} + +/* Logging convenience macros + * + * The intention here is to standardize prefixing log lines with the function name + * and a context prefix (like the operator<< for the PG). Place + * + * SET_SUBSYS(osd); + * + * at the top of the file to declare the log lines within the file as being (in this case) + * in the osd subsys. At the beginning of each method/function, add + * + * LOG_PREFIX(Class::method_name) + * + * to set the FNAME symbol to Class::method_name. In order to use the log macros + * within lambdas, capture FNAME by value. + * + * Log lines can then be declared using the appropriate macro below. + */ + +#define SET_SUBSYS(subname_) static constexpr auto SOURCE_SUBSYS = ceph_subsys_##subname_ +#define LOCAL_LOGGER crimson::get_logger(SOURCE_SUBSYS) +#define LOGGER(subname_) crimson::get_logger(ceph_subsys_##subname_) +#define LOG_PREFIX(x) constexpr auto FNAME = #x + +#define LOG(level_, MSG, ...) \ + LOCAL_LOGGER.log(level_, "{}: " MSG, FNAME , ##__VA_ARGS__) +#define SUBLOG(subname_, level_, MSG, ...) \ + LOGGER(subname_).log(level_, "{}: " MSG, FNAME , ##__VA_ARGS__) + +#define TRACE(...) LOG(seastar::log_level::trace, __VA_ARGS__) +#define SUBTRACE(subname_, ...) SUBLOG(subname_, seastar::log_level::trace, __VA_ARGS__) + +#define DEBUG(...) LOG(seastar::log_level::debug, __VA_ARGS__) +#define SUBDEBUG(subname_, ...) SUBLOG(subname_, seastar::log_level::debug, __VA_ARGS__) + +#define INFO(...) LOG(seastar::log_level::info, __VA_ARGS__) +#define SUBINFO(subname_, ...) SUBLOG(subname_, seastar::log_level::info, __VA_ARGS__) + +#define WARN(...) LOG(seastar::log_level::warn, __VA_ARGS__) +#define SUBWARN(subname_, ...) SUBLOG(subname_, seastar::log_level::warn, __VA_ARGS__) + +#define ERROR(...) LOG(seastar::log_level::error, __VA_ARGS__) +#define SUBERROR(subname_, ...) SUBLOG(subname_, seastar::log_level::error, __VA_ARGS__) + +// *DPP macros are intended to take DoutPrefixProvider implementations, but anything with +// an operator<< will work as a prefix + +#define SUBLOGDPP(subname_, level_, MSG, dpp, ...) \ + LOGGER(subname_).log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__) +#define SUBTRACEDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::trace, __VA_ARGS__) +#define SUBDEBUGDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::debug, __VA_ARGS__) +#define SUBINFODPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::info, __VA_ARGS__) +#define SUBWARNDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::warn, __VA_ARGS__) +#define SUBERRORDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::error, __VA_ARGS__) + +#define LOGDPP(level_, MSG, dpp, ...) \ + LOCAL_LOGGER.log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__) +#define TRACEDPP(...) LOGDPP(seastar::log_level::trace, __VA_ARGS__) +#define DEBUGDPP(...) LOGDPP(seastar::log_level::debug, __VA_ARGS__) +#define INFODPP(...) LOGDPP(seastar::log_level::info, __VA_ARGS__) +#define WARNDPP(...) LOGDPP(seastar::log_level::warn, __VA_ARGS__) +#define ERRORDPP(...) LOGDPP(seastar::log_level::error, __VA_ARGS__) diff --git a/src/crimson/common/logclient.cc b/src/crimson/common/logclient.cc new file mode 100644 index 000000000..d402ecd19 --- /dev/null +++ b/src/crimson/common/logclient.cc @@ -0,0 +1,364 @@ +#include "crimson/common/logclient.h" +#include <fmt/ranges.h> +#include "include/str_map.h" +#include "messages/MLog.h" +#include "messages/MLogAck.h" +#include "messages/MMonGetVersion.h" +#include "crimson/net/Messenger.h" +#include "crimson/mon/MonClient.h" +#include "mon/MonMap.h" +#include "common/Graylog.h" + +using std::map; +using std::ostream; +using std::ostringstream; +using std::string; +using crimson::common::local_conf; + +namespace { + seastar::logger& logger() + { + return crimson::get_logger(ceph_subsys_monc); + } +} + +//TODO: in order to avoid unnecessary maps declarations and moving around, +// create a named structure containing the maps and return optional +// fit to it. +int parse_log_client_options(CephContext *cct, + map<string,string> &log_to_monitors, + map<string,string> &log_to_syslog, + map<string,string> &log_channels, + map<string,string> &log_prios, + map<string,string> &log_to_graylog, + map<string,string> &log_to_graylog_host, + map<string,string> &log_to_graylog_port, + uuid_d &fsid, + string &host) +{ + ostringstream oss; + + int r = get_conf_str_map_helper( + cct->_conf.get_val<string>("clog_to_monitors"), oss, + &log_to_monitors, CLOG_CONFIG_DEFAULT_KEY); + if (r < 0) { + logger().error("{} error parsing 'clog_to_monitors'", __func__); + return r; + } + + r = get_conf_str_map_helper( + cct->_conf.get_val<string>("clog_to_syslog"), oss, + &log_to_syslog, CLOG_CONFIG_DEFAULT_KEY); + if (r < 0) { + logger().error("{} error parsing 'clog_to_syslog'", __func__); + return r; + } + + r = get_conf_str_map_helper( + cct->_conf.get_val<string>("clog_to_syslog_facility"), oss, + &log_channels, CLOG_CONFIG_DEFAULT_KEY); + if (r < 0) { + logger().error("{} error parsing 'clog_to_syslog_facility'", __func__); + return r; + } + + r = get_conf_str_map_helper( + cct->_conf.get_val<string>("clog_to_syslog_level"), oss, + &log_prios, CLOG_CONFIG_DEFAULT_KEY); + if (r < 0) { + logger().error("{} error parsing 'clog_to_syslog_level'", __func__); + return r; + } + + r = get_conf_str_map_helper( + cct->_conf.get_val<string>("clog_to_graylog"), oss, + &log_to_graylog, CLOG_CONFIG_DEFAULT_KEY); + if (r < 0) { + logger().error("{} error parsing 'clog_to_graylog'", __func__); + return r; + } + + r = get_conf_str_map_helper( + cct->_conf.get_val<string>("clog_to_graylog_host"), oss, + &log_to_graylog_host, CLOG_CONFIG_DEFAULT_KEY); + if (r < 0) { + logger().error("{} error parsing 'clog_to_graylog_host'", __func__); + return r; + } + + r = get_conf_str_map_helper( + cct->_conf.get_val<string>("clog_to_graylog_port"), oss, + &log_to_graylog_port, CLOG_CONFIG_DEFAULT_KEY); + if (r < 0) { + logger().error("{} error parsing 'clog_to_graylog_port'", __func__); + return r; + } + + fsid = cct->_conf.get_val<uuid_d>("fsid"); + host = cct->_conf->host; + return 0; +} + +LogChannel::LogChannel(LogClient *lc, const string &channel) + : parent(lc), log_channel(channel), log_to_syslog(false), + log_to_monitors(false) +{ +} + +LogChannel::LogChannel(LogClient *lc, const string &channel, + const string &facility, const string &prio) + : parent(lc), log_channel(channel), log_prio(prio), + syslog_facility(facility), log_to_syslog(false), + log_to_monitors(false) +{ +} + +LogClient::LogClient(crimson::net::Messenger *m, + logclient_flag_t flags) + : messenger(m), is_mon(flags & FLAG_MON), + last_log_sent(0), last_log(0) +{ +} + +void LogChannel::set_log_to_monitors(bool v) +{ + if (log_to_monitors != v) { + parent->reset(); + log_to_monitors = v; + } +} + +void LogChannel::update_config(map<string,string> &log_to_monitors, + map<string,string> &log_to_syslog, + map<string,string> &log_channels, + map<string,string> &log_prios, + map<string,string> &log_to_graylog, + map<string,string> &log_to_graylog_host, + map<string,string> &log_to_graylog_port, + uuid_d &fsid, + string &host) +{ + logger().debug( + "{} log_to_monitors {} log_to_syslog {} log_channels {} log_prios {}", + __func__, log_to_monitors, log_to_syslog, log_channels, log_prios); + bool to_monitors = (get_str_map_key(log_to_monitors, log_channel, + &CLOG_CONFIG_DEFAULT_KEY) == "true"); + bool to_syslog = (get_str_map_key(log_to_syslog, log_channel, + &CLOG_CONFIG_DEFAULT_KEY) == "true"); + string syslog_facility = get_str_map_key(log_channels, log_channel, + &CLOG_CONFIG_DEFAULT_KEY); + string prio = get_str_map_key(log_prios, log_channel, + &CLOG_CONFIG_DEFAULT_KEY); + bool to_graylog = (get_str_map_key(log_to_graylog, log_channel, + &CLOG_CONFIG_DEFAULT_KEY) == "true"); + string graylog_host = get_str_map_key(log_to_graylog_host, log_channel, + &CLOG_CONFIG_DEFAULT_KEY); + string graylog_port_str = get_str_map_key(log_to_graylog_port, log_channel, + &CLOG_CONFIG_DEFAULT_KEY); + int graylog_port = atoi(graylog_port_str.c_str()); + + set_log_to_monitors(to_monitors); + set_log_to_syslog(to_syslog); + set_syslog_facility(syslog_facility); + set_log_prio(prio); + + if (to_graylog && !graylog) { /* should but isn't */ + graylog = seastar::make_shared<ceph::logging::Graylog>("clog"); + } else if (!to_graylog && graylog) { /* shouldn't but is */ + graylog = nullptr; + } + + if (to_graylog && graylog) { + graylog->set_fsid(fsid); + graylog->set_hostname(host); + } + + if (graylog && (!graylog_host.empty()) && (graylog_port != 0)) { + graylog->set_destination(graylog_host, graylog_port); + } + + logger().debug("{} to_monitors: {} to_syslog: {}" + "syslog_facility: {} prio: {} to_graylog: {} graylog_host: {}" + "graylog_port: {}", __func__, (to_monitors ? "true" : "false"), + (to_syslog ? "true" : "false"), syslog_facility, prio, + (to_graylog ? "true" : "false"), graylog_host, graylog_port); +} + +void LogChannel::do_log(clog_type prio, std::stringstream& ss) +{ + while (!ss.eof()) { + string s; + getline(ss, s); + if (!s.empty()) { + do_log(prio, s); + } + } +} + +void LogChannel::do_log(clog_type prio, const std::string& s) +{ + if (CLOG_ERROR == prio) { + logger().error("log {} : {}", prio, s); + } else { + logger().warn("log {} : {}", prio, s); + } + LogEntry e; + e.stamp = ceph_clock_now(); + e.addrs = parent->get_myaddrs(); + e.name = parent->get_myname(); + e.rank = parent->get_myrank(); + e.prio = prio; + e.msg = s; + e.channel = get_log_channel(); + + // seq and who should be set for syslog/graylog/log_to_mon + // log to monitor? + if (log_to_monitors) { + e.seq = parent->queue(e); + } else { + e.seq = parent->get_next_seq(); + } + + // log to syslog? + if (do_log_to_syslog()) { + logger().warn("{} log to syslog", __func__); + e.log_to_syslog(get_log_prio(), get_syslog_facility()); + } + + // log to graylog? + if (do_log_to_graylog()) { + logger().warn("{} log to graylog", __func__); + graylog->log_log_entry(&e); + } +} + +MessageURef LogClient::get_mon_log_message(log_flushing_t flush_flag) +{ + if (flush_flag == log_flushing_t::FLUSH) { + if (log_queue.empty()) { + return {}; + } + // reset session + last_log_sent = log_queue.front().seq; + } + return _get_mon_log_message(); +} + +bool LogClient::are_pending() const +{ + return last_log > last_log_sent; +} + +MessageURef LogClient::_get_mon_log_message() +{ + if (log_queue.empty()) { + return {}; + } + + // only send entries that haven't been sent yet during this mon + // session! monclient needs to call reset_session() on mon session + // reset for this to work right. + + if (last_log_sent == last_log) { + return {}; + } + + // limit entries per message + const int64_t num_unsent = last_log - last_log_sent; + int64_t num_to_send; + if (local_conf()->mon_client_max_log_entries_per_message > 0) { + num_to_send = std::min(num_unsent, + local_conf()->mon_client_max_log_entries_per_message); + } else { + num_to_send = num_unsent; + } + + logger().debug("log_queue is {} last_log {} sent {} num {} unsent {}" + " sending {}", log_queue.size(), last_log, + last_log_sent, log_queue.size(), num_unsent, num_to_send); + ceph_assert((unsigned)num_unsent <= log_queue.size()); + auto log_iter = log_queue.begin(); + std::deque<LogEntry> out_log_queue; /* will send the logs contained here */ + while (log_iter->seq <= last_log_sent) { + ++log_iter; + ceph_assert(log_iter != log_queue.end()); + } + while (num_to_send--) { + ceph_assert(log_iter != log_queue.end()); + out_log_queue.push_back(*log_iter); + last_log_sent = log_iter->seq; + logger().debug(" will send {}", *log_iter); + ++log_iter; + } + + return crimson::make_message<MLog>(m_fsid, + std::move(out_log_queue)); +} + +version_t LogClient::queue(LogEntry &entry) +{ + entry.seq = ++last_log; + log_queue.push_back(entry); + + return entry.seq; +} + +void LogClient::reset() +{ + if (log_queue.size()) { + log_queue.clear(); + } + last_log_sent = last_log; +} + +uint64_t LogClient::get_next_seq() +{ + return ++last_log; +} + +entity_addrvec_t LogClient::get_myaddrs() const +{ + return messenger->get_myaddrs(); +} + +entity_name_t LogClient::get_myrank() +{ + return messenger->get_myname(); +} + +const EntityName& LogClient::get_myname() const +{ + return local_conf()->name; +} + +seastar::future<> LogClient::handle_log_ack(Ref<MLogAck> m) +{ + logger().debug("handle_log_ack {}", *m); + + version_t last = m->last; + + auto q = log_queue.begin(); + while (q != log_queue.end()) { + const LogEntry &entry(*q); + if (entry.seq > last) + break; + logger().debug(" logged {}", entry); + q = log_queue.erase(q); + } + return seastar::now(); +} + +LogChannelRef LogClient::create_channel(const std::string& name) { + auto it = channels.find(name); + if (it == channels.end()) { + it = channels.insert(it, + {name, seastar::make_lw_shared<LogChannel>(this, name)}); + } + return it->second; +} + +seastar::future<> LogClient::set_fsid(const uuid_d& fsid) { + m_fsid = fsid; + return seastar::now(); +} + diff --git a/src/crimson/common/logclient.h b/src/crimson/common/logclient.h new file mode 100644 index 000000000..ab9b25091 --- /dev/null +++ b/src/crimson/common/logclient.h @@ -0,0 +1,232 @@ +#ifndef CEPH_LOGCLIENT_H +#define CEPH_LOGCLIENT_H + +#include "common/LogEntry.h" +#include "common/ostream_temp.h" +#include "common/ref.h" +#include "include/health.h" +#include "crimson/net/Fwd.h" + +#include <seastar/core/future.hh> +#include <seastar/core/gate.hh> +#include <seastar/core/lowres_clock.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/timer.hh> + +class LogClient; +class MLog; +class MLogAck; +class Message; +struct uuid_d; +struct Connection; + +class LogChannel; + +namespace ceph { +namespace logging { + class Graylog; +} +} + +template<typename Message> using Ref = boost::intrusive_ptr<Message>; +namespace crimson::net { + class Messenger; +} + +enum class log_flushing_t { + NO_FLUSH, + FLUSH +}; + +int parse_log_client_options(CephContext *cct, + std::map<std::string,std::string> &log_to_monitors, + std::map<std::string,std::string> &log_to_syslog, + std::map<std::string,std::string> &log_channels, + std::map<std::string,std::string> &log_prios, + std::map<std::string,std::string> &log_to_graylog, + std::map<std::string,std::string> &log_to_graylog_host, + std::map<std::string,std::string> &log_to_graylog_port, + uuid_d &fsid, + std::string &host); + +/** Manage where we output to and at which priority + * + * Not to be confused with the LogClient, which is the almighty coordinator + * of channels. We just deal with the boring part of the logging: send to + * syslog, send to file, generate LogEntry and queue it for the LogClient. + * + * Past queueing the LogEntry, the LogChannel is done with the whole thing. + * LogClient will deal with sending and handling of LogEntries. + */ +class LogChannel : public LoggerSinkSet +{ +public: + LogChannel(LogClient *lc, const std::string &channel); + LogChannel(LogClient *lc, const std::string &channel, + const std::string &facility, const std::string &prio); + + OstreamTemp debug() { + return OstreamTemp(CLOG_DEBUG, this); + } + void debug(std::stringstream &s) final { + do_log(CLOG_DEBUG, s); + } + /** + * Convenience function mapping health status to + * the appropriate cluster log severity. + */ + OstreamTemp health(health_status_t health) { + switch(health) { + case HEALTH_OK: + return info(); + case HEALTH_WARN: + return warn(); + case HEALTH_ERR: + return error(); + default: + // Invalid health_status_t value + ceph_abort(); + } + } + OstreamTemp info() final { + return OstreamTemp(CLOG_INFO, this); + } + void info(std::stringstream &s) final { + do_log(CLOG_INFO, s); + } + OstreamTemp warn() final { + return OstreamTemp(CLOG_WARN, this); + } + void warn(std::stringstream &s) final { + do_log(CLOG_WARN, s); + } + OstreamTemp error() final { + return OstreamTemp(CLOG_ERROR, this); + } + void error(std::stringstream &s) final { + do_log(CLOG_ERROR, s); + } + OstreamTemp sec() final { + return OstreamTemp(CLOG_SEC, this); + } + void sec(std::stringstream &s) final { + do_log(CLOG_SEC, s); + } + + void set_log_to_monitors(bool v); + void set_log_to_syslog(bool v) { + log_to_syslog = v; + } + void set_log_channel(const std::string& v) { + log_channel = v; + } + void set_log_prio(const std::string& v) { + log_prio = v; + } + void set_syslog_facility(const std::string& v) { + syslog_facility = v; + } + const std::string& get_log_prio() const { return log_prio; } + const std::string& get_log_channel() const { return log_channel; } + const std::string& get_syslog_facility() const { return syslog_facility; } + bool must_log_to_syslog() const { return log_to_syslog; } + /** + * Do we want to log to syslog? + * + * @return true if log_to_syslog is true and both channel and prio + * are not empty; false otherwise. + */ + bool do_log_to_syslog() { + return must_log_to_syslog() && + !log_prio.empty() && !log_channel.empty(); + } + bool must_log_to_monitors() { return log_to_monitors; } + + bool do_log_to_graylog() { + return (graylog != nullptr); + } + + using Ref = seastar::lw_shared_ptr<LogChannel>; + + /** + * update config values from parsed k/v std::map for each config option + * + * Pick out the relevant value based on our channel. + */ + void update_config(std::map<std::string,std::string> &log_to_monitors, + std::map<std::string,std::string> &log_to_syslog, + std::map<std::string,std::string> &log_channels, + std::map<std::string,std::string> &log_prios, + std::map<std::string,std::string> &log_to_graylog, + std::map<std::string,std::string> &log_to_graylog_host, + std::map<std::string,std::string> &log_to_graylog_port, + uuid_d &fsid, + std::string &host); + + void do_log(clog_type prio, std::stringstream& ss) final; + void do_log(clog_type prio, const std::string& s) final; + +private: + LogClient *parent; + std::string log_channel; + std::string log_prio; + std::string syslog_facility; + bool log_to_syslog; + bool log_to_monitors; + seastar::shared_ptr<ceph::logging::Graylog> graylog; +}; + +using LogChannelRef = LogChannel::Ref; + +class LogClient +{ +public: + enum logclient_flag_t { + NO_FLAGS = 0, + FLAG_MON = 0x1, + }; + + LogClient(crimson::net::Messenger *m, logclient_flag_t flags); + + virtual ~LogClient() = default; + + seastar::future<> handle_log_ack(Ref<MLogAck> m); + MessageURef get_mon_log_message(log_flushing_t flush_flag); + bool are_pending() const; + + LogChannelRef create_channel() { + return create_channel(CLOG_CHANNEL_DEFAULT); + } + + LogChannelRef create_channel(const std::string& name); + + void destroy_channel(const std::string& name) { + channels.erase(name); + } + + void shutdown() { + channels.clear(); + } + + uint64_t get_next_seq(); + entity_addrvec_t get_myaddrs() const; + const EntityName& get_myname() const; + entity_name_t get_myrank(); + version_t queue(LogEntry &entry); + void reset(); + seastar::future<> set_fsid(const uuid_d& fsid); + +private: + MessageURef _get_mon_log_message(); + + crimson::net::Messenger *messenger; + bool is_mon; + version_t last_log_sent; + version_t last_log; + std::deque<LogEntry> log_queue; + + std::map<std::string, LogChannelRef> channels; + uuid_d m_fsid; +}; +#endif + diff --git a/src/crimson/common/operation.cc b/src/crimson/common/operation.cc new file mode 100644 index 000000000..53399fb9b --- /dev/null +++ b/src/crimson/common/operation.cc @@ -0,0 +1,75 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "operation.h" + +namespace crimson { + +void Operation::dump(ceph::Formatter* f) const +{ + f->open_object_section("operation"); + f->dump_string("type", get_type_name()); + f->dump_unsigned("id", id); + { + f->open_object_section("detail"); + dump_detail(f); + f->close_section(); + } + f->close_section(); +} + +void Operation::dump_brief(ceph::Formatter* f) const +{ + f->open_object_section("operation"); + f->dump_string("type", get_type_name()); + f->dump_unsigned("id", id); + f->close_section(); +} + +std::ostream &operator<<(std::ostream &lhs, const Operation &rhs) { + lhs << rhs.get_type_name() << "(id=" << rhs.get_id() << ", detail="; + rhs.print(lhs); + lhs << ")"; + return lhs; +} + +void Blocker::dump(ceph::Formatter* f) const +{ + f->open_object_section("blocker"); + f->dump_string("op_type", get_type_name()); + { + f->open_object_section("detail"); + dump_detail(f); + f->close_section(); + } + f->close_section(); +} + +namespace detail { +void dump_time_event(const char* name, + const utime_t& timestamp, + ceph::Formatter* f) +{ + assert(f); + f->open_object_section("time_event"); + f->dump_string("name", name); + f->dump_stream("initiated_at") << timestamp; + f->close_section(); +} + +void dump_blocking_event(const char* name, + const utime_t& timestamp, + const Blocker* const blocker, + ceph::Formatter* f) +{ + assert(f); + f->open_object_section("blocking_event"); + f->dump_string("name", name); + f->dump_stream("initiated_at") << timestamp; + if (blocker) { + blocker->dump(f); + } + f->close_section(); +} +} // namespace detail +} // namespace crimson diff --git a/src/crimson/common/operation.h b/src/crimson/common/operation.h new file mode 100644 index 000000000..6df2c99fd --- /dev/null +++ b/src/crimson/common/operation.h @@ -0,0 +1,776 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include <algorithm> +#include <array> +#include <set> +#include <vector> +#include <boost/core/demangle.hpp> +#include <boost/intrusive/list.hpp> +#include <boost/intrusive_ptr.hpp> +#include <boost/smart_ptr/intrusive_ref_counter.hpp> +#include <seastar/core/shared_mutex.hh> +#include <seastar/core/future.hh> +#include <seastar/core/timer.hh> +#include <seastar/core/lowres_clock.hh> +#include <seastar/core/future-util.hh> + +#include "include/ceph_assert.h" +#include "include/utime.h" +#include "common/Clock.h" +#include "common/Formatter.h" +#include "crimson/common/interruptible_future.h" +#include "crimson/common/smp_helpers.h" +#include "crimson/common/log.h" + +namespace ceph { + class Formatter; +} + +namespace crimson { + +using registry_hook_t = boost::intrusive::list_member_hook< + boost::intrusive::link_mode<boost::intrusive::auto_unlink>>; + +class Operation; +class Blocker; + + +namespace detail { +void dump_time_event(const char* name, + const utime_t& timestamp, + ceph::Formatter* f); +void dump_blocking_event(const char* name, + const utime_t& timestamp, + const Blocker* blocker, + ceph::Formatter* f); +} // namespace detail + +/** + * Provides an interface for dumping diagnostic information about + * why a particular op is not making progress. + */ +class Blocker { +public: + void dump(ceph::Formatter *f) const; + virtual ~Blocker() = default; + +private: + virtual void dump_detail(ceph::Formatter *f) const = 0; + virtual const char *get_type_name() const = 0; +}; + +// the main template. by default an operation has no extenral +// event handler (the empty tuple). specializing the template +// allows to define backends on per-operation-type manner. +// NOTE: basically this could be a function but C++ disallows +// differentiating return type among specializations. +template <class T> +struct EventBackendRegistry { + template <typename...> static constexpr bool always_false = false; + + static std::tuple<> get_backends() { + static_assert(always_false<T>, "Registry specialization not found"); + return {}; + } +}; + +template <class T> +struct Event { + T* that() { + return static_cast<T*>(this); + } + const T* that() const { + return static_cast<const T*>(this); + } + + template <class OpT, class... Args> + void trigger(OpT&& op, Args&&... args) { + that()->internal_backend.handle(*that(), + std::forward<OpT>(op), + std::forward<Args>(args)...); + // let's call `handle()` for concrete event type from each single + // of our backends. the order in the registry matters. + std::apply([&, //args=std::forward_as_tuple(std::forward<Args>(args)...), + this] (auto... backend) { + (..., backend.handle(*that(), + std::forward<OpT>(op), + std::forward<Args>(args)...)); + }, EventBackendRegistry<std::decay_t<OpT>>::get_backends()); + } +}; + + +// simplest event type for recording things like beginning or end +// of TrackableOperation's life. +template <class T> +struct TimeEvent : Event<T> { + struct Backend { + // `T` is passed solely to let implementations to discriminate + // basing on the type-of-event. + virtual void handle(T&, const Operation&) = 0; + }; + + // for the sake of dumping ops-in-flight. + struct InternalBackend final : Backend { + void handle(T&, const Operation&) override { + timestamp = ceph_clock_now(); + } + utime_t timestamp; + } internal_backend; + + void dump(ceph::Formatter *f) const { + auto demangled_name = boost::core::demangle(typeid(T).name()); + detail::dump_time_event( + demangled_name.c_str(), + internal_backend.timestamp, f); + } + + auto get_timestamp() const { + return internal_backend.timestamp; + } +}; + + +template <typename T> +class BlockerT : public Blocker { +public: + struct BlockingEvent : Event<typename T::BlockingEvent> { + using Blocker = std::decay_t<T>; + + struct Backend { + // `T` is based solely to let implementations to discriminate + // basing on the type-of-event. + virtual void handle(typename T::BlockingEvent&, const Operation&, const T&) = 0; + }; + + struct InternalBackend : Backend { + void handle(typename T::BlockingEvent&, + const Operation&, + const T& blocker) override { + this->timestamp = ceph_clock_now(); + this->blocker = &blocker; + } + + utime_t timestamp; + const T* blocker; + } internal_backend; + + // we don't want to make any BlockerT to be aware and coupled with + // an operation. to not templatize an entire path from an op to + // a blocker, type erasuring is used. + struct TriggerI { + TriggerI(BlockingEvent& event) : event(event) {} + + template <class FutureT> + auto maybe_record_blocking(FutureT&& fut, const T& blocker) { + if (!fut.available()) { + // a full blown call via vtable. that's the cost for templatization + // avoidance. anyway, most of the things actually have the type + // knowledge. + record_blocking(blocker); + return std::forward<FutureT>(fut).finally( + [&event=this->event, &blocker] () mutable { + // beware trigger instance may be already dead when this + // is executed! + record_unblocking(event, blocker); + }); + } + return std::forward<FutureT>(fut); + } + virtual ~TriggerI() = default; + protected: + // it's for the sake of erasing the OpT type + virtual void record_blocking(const T& blocker) = 0; + + static void record_unblocking(BlockingEvent& event, const T& blocker) { + assert(event.internal_backend.blocker == &blocker); + event.internal_backend.blocker = nullptr; + } + + BlockingEvent& event; + }; + + template <class OpT> + struct Trigger : TriggerI { + Trigger(BlockingEvent& event, const OpT& op) : TriggerI(event), op(op) {} + + template <class FutureT> + auto maybe_record_blocking(FutureT&& fut, const T& blocker) { + if (!fut.available()) { + // no need for the dynamic dispatch! if we're lucky, a compiler + // should collapse all these abstractions into a bunch of movs. + this->Trigger::record_blocking(blocker); + return std::forward<FutureT>(fut).finally( + [&event=this->event, &blocker] () mutable { + Trigger::record_unblocking(event, blocker); + }); + } + return std::forward<FutureT>(fut); + } + + const OpT &get_op() { return op; } + + protected: + void record_blocking(const T& blocker) override { + this->event.trigger(op, blocker); + } + + const OpT& op; + }; + + void dump(ceph::Formatter *f) const { + auto demangled_name = boost::core::demangle(typeid(T).name()); + detail::dump_blocking_event( + demangled_name.c_str(), + internal_backend.timestamp, + internal_backend.blocker, + f); + } + }; + + virtual ~BlockerT() = default; + template <class TriggerT, class... Args> + decltype(auto) track_blocking(TriggerT&& trigger, Args&&... args) { + return std::forward<TriggerT>(trigger).maybe_record_blocking( + std::forward<Args>(args)..., static_cast<const T&>(*this)); + } + +private: + const char *get_type_name() const final { + return static_cast<const T*>(this)->type_name; + } +}; + +template <class T> +struct AggregateBlockingEvent { + struct TriggerI { + protected: + struct TriggerContainerI { + virtual typename T::TriggerI& get_trigger() = 0; + virtual ~TriggerContainerI() = default; + }; + using TriggerContainerIRef = std::unique_ptr<TriggerContainerI>; + virtual TriggerContainerIRef create_part_trigger() = 0; + + public: + template <class FutureT> + auto maybe_record_blocking(FutureT&& fut, + const typename T::Blocker& blocker) { + // AggregateBlockingEvent is supposed to be used on relatively cold + // paths (recovery), so we don't need to worry about the dynamic + // polymothps / dynamic memory's overhead. + auto tcont = create_part_trigger(); + return tcont->get_trigger().maybe_record_blocking( + std::move(fut), blocker + ).finally([tcont=std::move(tcont)] {}); + } + + virtual ~TriggerI() = default; + }; + + template <class OpT> + struct Trigger final : TriggerI { + Trigger(AggregateBlockingEvent& event, const OpT& op) + : event(event), op(op) {} + + class TriggerContainer final : public TriggerI::TriggerContainerI { + AggregateBlockingEvent& event; + typename decltype(event.events)::iterator iter; + typename T::template Trigger<OpT> trigger; + + typename T::TriggerI &get_trigger() final { + return trigger; + } + + public: + TriggerContainer(AggregateBlockingEvent& _event, const OpT& op) : + event(_event), + iter(event.events.emplace(event.events.end())), + trigger(*iter, op) {} + + ~TriggerContainer() final { + event.events.erase(iter); + } + }; + + protected: + typename TriggerI::TriggerContainerIRef create_part_trigger() final { + return std::make_unique<TriggerContainer>(event, op); + } + + private: + AggregateBlockingEvent& event; + const OpT& op; + }; + +private: + std::list<T> events; + template <class OpT> + friend class Trigger; +}; + +/** + * Common base for all crimson-osd operations. Mainly provides + * an interface for registering ops in flight and dumping + * diagnostic information. + */ +class Operation : public boost::intrusive_ref_counter< + Operation, boost::thread_unsafe_counter> { + public: + using id_t = uint64_t; + static constexpr id_t NULL_ID = std::numeric_limits<uint64_t>::max(); + id_t get_id() const { + return id; + } + + static constexpr bool is_trackable = false; + + virtual unsigned get_type() const = 0; + virtual const char *get_type_name() const = 0; + virtual void print(std::ostream &) const = 0; + + void dump(ceph::Formatter *f) const; + void dump_brief(ceph::Formatter *f) const; + virtual ~Operation() = default; + + private: + virtual void dump_detail(ceph::Formatter *f) const = 0; + + registry_hook_t registry_hook; + + id_t id = 0; + void set_id(id_t in_id) { + id = in_id; + } + + friend class OperationRegistryI; + template <size_t> + friend class OperationRegistryT; +}; +using OperationRef = boost::intrusive_ptr<Operation>; + +std::ostream &operator<<(std::ostream &, const Operation &op); + +/** + * Maintains a set of lists of all active ops. + */ +class OperationRegistryI { + using op_list_member_option = boost::intrusive::member_hook< + Operation, + registry_hook_t, + &Operation::registry_hook + >; + + friend class Operation; + seastar::timer<seastar::lowres_clock> shutdown_timer; + seastar::promise<> shutdown; + +protected: + virtual void do_register(Operation *op) = 0; + virtual bool registries_empty() const = 0; + virtual void do_stop() = 0; + +public: + using op_list = boost::intrusive::list< + Operation, + op_list_member_option, + boost::intrusive::constant_time_size<false>>; + + template <typename T, typename... Args> + auto create_operation(Args&&... args) { + boost::intrusive_ptr<T> op = new T(std::forward<Args>(args)...); + do_register(&*op); + return op; + } + + seastar::future<> stop() { + crimson::get_logger(ceph_subsys_osd).info("OperationRegistryI::{}", __func__); + do_stop(); + shutdown_timer.set_callback([this] { + if (registries_empty()) { + shutdown.set_value(); + shutdown_timer.cancel(); + } + }); + shutdown_timer.arm_periodic( + std::chrono::milliseconds(100/*TODO: use option instead*/)); + return shutdown.get_future(); + } +}; + + +template <size_t NUM_REGISTRIES> +class OperationRegistryT : public OperationRegistryI { + Operation::id_t next_id = 0; + std::array< + op_list, + NUM_REGISTRIES + > registries; + +protected: + void do_register(Operation *op) final { + const auto op_type = op->get_type(); + registries[op_type].push_back(*op); + op->set_id(++next_id); + } + + bool registries_empty() const final { + return std::all_of(registries.begin(), + registries.end(), + [](auto& opl) { + return opl.empty(); + }); + } + +protected: + OperationRegistryT(core_id_t core) + // Use core to initialize upper 8 bits of counters to ensure that + // ids generated by different cores are disjoint + : next_id(static_cast<id_t>(core) << + (std::numeric_limits<id_t>::digits - 8)) + {} + + template <size_t REGISTRY_INDEX> + const op_list& get_registry() const { + static_assert( + REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value); + return registries[REGISTRY_INDEX]; + } + + template <size_t REGISTRY_INDEX> + op_list& get_registry() { + static_assert( + REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value); + return registries[REGISTRY_INDEX]; + } + +public: + /// Iterate over live ops + template <typename F> + void for_each_op(F &&f) const { + for (const auto ®istry: registries) { + for (const auto &op: registry) { + std::invoke(f, op); + } + } + } + + /// Removes op from registry + void remove_from_registry(Operation &op) { + const auto op_type = op.get_type(); + registries[op_type].erase(op_list::s_iterator_to(op)); + } + + /// Adds op to registry + void add_to_registry(Operation &op) { + const auto op_type = op.get_type(); + registries[op_type].push_back(op); + } +}; + +class PipelineExitBarrierI { +public: + using Ref = std::unique_ptr<PipelineExitBarrierI>; + + /// Waits for exit barrier + virtual std::optional<seastar::future<>> wait() = 0; + + /// Releases pipeline stage, can only be called after wait + virtual void exit() = 0; + + /// Releases pipeline resources without waiting on barrier + virtual void cancel() = 0; + + /// Must ensure that resources are released, likely by calling cancel() + virtual ~PipelineExitBarrierI() {} +}; + +template <class T> +class PipelineStageIT : public BlockerT<T> { + const core_id_t core = seastar::this_shard_id(); +public: + core_id_t get_core() const { return core; } + + template <class... Args> + decltype(auto) enter(Args&&... args) { + return static_cast<T*>(this)->enter(std::forward<Args>(args)...); + } +}; + +class PipelineHandle { + PipelineExitBarrierI::Ref barrier; + + std::optional<seastar::future<>> wait_barrier() { + return barrier ? barrier->wait() : std::nullopt; + } + +public: + PipelineHandle() = default; + + PipelineHandle(const PipelineHandle&) = delete; + PipelineHandle(PipelineHandle&&) = default; + PipelineHandle &operator=(const PipelineHandle&) = delete; + PipelineHandle &operator=(PipelineHandle&&) = default; + + /** + * Returns a future which unblocks when the handle has entered the passed + * OrderedPipelinePhase. If already in a phase, enter will also release + * that phase after placing itself in the queue for the next one to preserve + * ordering. + */ + template <typename OpT, typename T> + seastar::future<> + enter(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) { + ceph_assert(stage.get_core() == seastar::this_shard_id()); + auto wait_fut = wait_barrier(); + if (wait_fut.has_value()) { + return wait_fut.value().then([this, &stage, t=std::move(t)] () mutable { + auto fut = t.maybe_record_blocking(stage.enter(t), stage); + exit(); + return std::move(fut).then( + [this, t=std::move(t)](auto &&barrier_ref) mutable { + barrier = std::move(barrier_ref); + return seastar::now(); + }); + }); + } else { + auto fut = t.maybe_record_blocking(stage.enter(t), stage); + exit(); + return std::move(fut).then( + [this, t=std::move(t)](auto &&barrier_ref) mutable { + barrier = std::move(barrier_ref); + return seastar::now(); + }); + } + } + + /** + * Completes pending exit barrier without entering a new one. + */ + seastar::future<> complete() { + auto ret = wait_barrier(); + barrier.reset(); + return ret ? std::move(ret.value()) : seastar::now(); + } + + /** + * Exits current phase, skips exit barrier, should only be used for op + * failure. Permitting the handle to be destructed as the same effect. + */ + void exit() { + barrier.reset(); + } + +}; + +/** + * Ensures that at most one op may consider itself in the phase at a time. + * Ops will see enter() unblock in the order in which they tried to enter + * the phase. entering (though not necessarily waiting for the future to + * resolve) a new phase prior to exiting the previous one will ensure that + * the op ordering is preserved. + */ +template <class T> +class OrderedExclusivePhaseT : public PipelineStageIT<T> { + void dump_detail(ceph::Formatter *f) const final { + f->dump_unsigned("waiting", waiting); + if (held_by != Operation::NULL_ID) { + f->dump_unsigned("held_by_operation_id", held_by); + } + } + + class ExitBarrier final : public PipelineExitBarrierI { + OrderedExclusivePhaseT *phase; + Operation::id_t op_id; + public: + ExitBarrier(OrderedExclusivePhaseT *phase, Operation::id_t id) + : phase(phase), op_id(id) {} + + std::optional<seastar::future<>> wait() final { + return std::nullopt; + } + + void exit() final { + if (phase) { + auto *p = phase; + auto id = op_id; + phase = nullptr; + std::ignore = seastar::smp::submit_to( + p->get_core(), + [p, id] { + p->exit(id); + }); + } + } + + void cancel() final { + exit(); + } + + ~ExitBarrier() final { + cancel(); + } + }; + + void exit(Operation::id_t op_id) { + clear_held_by(op_id); + mutex.unlock(); + } + +public: + template <class TriggerT> + seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) { + waiting++; + return mutex.lock().then([this, op_id=t.get_op().get_id()] { + ceph_assert_always(waiting > 0); + --waiting; + set_held_by(op_id); + return PipelineExitBarrierI::Ref(new ExitBarrier{this, op_id}); + }); + } + +private: + void set_held_by(Operation::id_t id) { + ceph_assert_always(held_by == Operation::NULL_ID); + held_by = id; + } + + void clear_held_by(Operation::id_t id) { + ceph_assert_always(held_by == id); + held_by = Operation::NULL_ID; + } + + unsigned waiting = 0; + seastar::shared_mutex mutex; + Operation::id_t held_by = Operation::NULL_ID; +}; + +/** + * Permits multiple ops to inhabit the stage concurrently, but ensures that + * they will proceed to the next stage in the order in which they called + * enter. + */ +template <class T> +class OrderedConcurrentPhaseT : public PipelineStageIT<T> { + using base_t = PipelineStageIT<T>; +public: + struct BlockingEvent : base_t::BlockingEvent { + using base_t::BlockingEvent::BlockingEvent; + + struct ExitBarrierEvent : TimeEvent<ExitBarrierEvent> {}; + + template <class OpT> + struct Trigger : base_t::BlockingEvent::template Trigger<OpT> { + using base_t::BlockingEvent::template Trigger<OpT>::Trigger; + + template <class FutureT> + decltype(auto) maybe_record_exit_barrier(FutureT&& fut) { + if (!fut.available()) { + exit_barrier_event.trigger(this->op); + } + return std::forward<FutureT>(fut); + } + + ExitBarrierEvent exit_barrier_event; + }; + }; + +private: + void dump_detail(ceph::Formatter *f) const final {} + + template <class TriggerT> + class ExitBarrier final : public PipelineExitBarrierI { + OrderedConcurrentPhaseT *phase; + std::optional<seastar::future<>> barrier; + TriggerT trigger; + public: + ExitBarrier( + OrderedConcurrentPhaseT *phase, + seastar::future<> &&barrier, + TriggerT& trigger) : phase(phase), barrier(std::move(barrier)), trigger(trigger) {} + + std::optional<seastar::future<>> wait() final { + assert(phase); + assert(barrier); + auto ret = std::move(*barrier); + barrier = std::nullopt; + return trigger.maybe_record_exit_barrier(std::move(ret)); + } + + void exit() final { + if (barrier) { + static_cast<void>( + std::move(*barrier).then([phase=this->phase] { phase->mutex.unlock(); })); + barrier = std::nullopt; + phase = nullptr; + } + if (phase) { + std::ignore = seastar::smp::submit_to( + phase->get_core(), + [this] { + phase->mutex.unlock(); + phase = nullptr; + }); + } + } + + void cancel() final { + exit(); + } + + ~ExitBarrier() final { + cancel(); + } + }; + +public: + template <class TriggerT> + seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) { + return seastar::make_ready_future<PipelineExitBarrierI::Ref>( + new ExitBarrier<TriggerT>{this, mutex.lock(), t}); + } + +private: + seastar::shared_mutex mutex; +}; + +/** + * Imposes no ordering or exclusivity at all. Ops enter without constraint and + * may exit in any order. Useful mainly for informational purposes between + * stages with constraints. + */ +template <class T> +class UnorderedStageT : public PipelineStageIT<T> { + void dump_detail(ceph::Formatter *f) const final {} + + class ExitBarrier final : public PipelineExitBarrierI { + public: + ExitBarrier() = default; + + std::optional<seastar::future<>> wait() final { + return std::nullopt; + } + + void exit() final {} + + void cancel() final {} + + ~ExitBarrier() final {} + }; + +public: + template <class... IgnoreArgs> + seastar::future<PipelineExitBarrierI::Ref> enter(IgnoreArgs&&...) { + return seastar::make_ready_future<PipelineExitBarrierI::Ref>( + new ExitBarrier); + } +}; + +} + +#if FMT_VERSION >= 90000 +template <> struct fmt::formatter<crimson::Operation> : fmt::ostream_formatter {}; +#endif diff --git a/src/crimson/common/perf_counters_collection.cc b/src/crimson/common/perf_counters_collection.cc new file mode 100644 index 000000000..254d85278 --- /dev/null +++ b/src/crimson/common/perf_counters_collection.cc @@ -0,0 +1,41 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_context.h" +#include "perf_counters_collection.h" + +namespace crimson::common { +PerfCountersCollection::PerfCountersCollection() +{ + perf_collection = std::make_unique<PerfCountersCollectionImpl>(); +} +PerfCountersCollection::~PerfCountersCollection() +{ + perf_collection->clear(); +} + +PerfCountersCollectionImpl* PerfCountersCollection:: get_perf_collection() +{ + return perf_collection.get(); +} + +void PerfCountersCollection::dump_formatted(ceph::Formatter *f, bool schema, + bool dump_labeled, + const std::string &logger, + const std::string &counter) +{ + perf_collection->dump_formatted(f, schema, dump_labeled, logger, counter); +} + +PerfCountersCollection::ShardedPerfCountersCollection PerfCountersCollection::sharded_perf_coll; + +void PerfCountersDeleter::operator()(PerfCounters* p) noexcept +{ + if (cct) { + cct->get_perfcounters_collection()->remove(p); + } + delete p; +} + +} + diff --git a/src/crimson/common/perf_counters_collection.h b/src/crimson/common/perf_counters_collection.h new file mode 100644 index 000000000..ae0c8670c --- /dev/null +++ b/src/crimson/common/perf_counters_collection.h @@ -0,0 +1,49 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "common/perf_counters.h" +#include "include/common_fwd.h" +#include <seastar/core/sharded.hh> + +using crimson::common::PerfCountersCollectionImpl; +namespace crimson::common { +class PerfCountersCollection: public seastar::sharded<PerfCountersCollection> +{ + using ShardedPerfCountersCollection = seastar::sharded<PerfCountersCollection>; + +private: + std::unique_ptr<PerfCountersCollectionImpl> perf_collection; + static ShardedPerfCountersCollection sharded_perf_coll; + friend PerfCountersCollection& local_perf_coll(); + friend ShardedPerfCountersCollection& sharded_perf_coll(); + +public: + PerfCountersCollection(); + ~PerfCountersCollection(); + PerfCountersCollectionImpl* get_perf_collection(); + void dump_formatted(ceph::Formatter *f, bool schema, bool dump_labeled, + const std::string &logger = "", + const std::string &counter = ""); +}; + +inline PerfCountersCollection::ShardedPerfCountersCollection& sharded_perf_coll(){ + return PerfCountersCollection::sharded_perf_coll; +} + +inline PerfCountersCollection& local_perf_coll() { + return PerfCountersCollection::sharded_perf_coll.local(); +} + +class PerfCountersDeleter { + CephContext* cct; + +public: + PerfCountersDeleter() noexcept : cct(nullptr) {} + PerfCountersDeleter(CephContext* cct) noexcept : cct(cct) {} + void operator()(PerfCounters* p) noexcept; +}; +} +using PerfCountersRef = std::unique_ptr<crimson::common::PerfCounters, crimson::common::PerfCountersDeleter>; + diff --git a/src/crimson/common/shared_lru.h b/src/crimson/common/shared_lru.h new file mode 100644 index 000000000..186f02a61 --- /dev/null +++ b/src/crimson/common/shared_lru.h @@ -0,0 +1,180 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <memory> +#include <optional> +#include <boost/smart_ptr/local_shared_ptr.hpp> +#include <boost/smart_ptr/weak_ptr.hpp> +#include "simple_lru.h" + +/// SharedLRU does its best to cache objects. It not only tracks the objects +/// in its LRU cache with strong references, it also tracks objects with +/// weak_ptr even if the cache does not hold any strong references to them. so +/// that it can return the objects after they are evicted, as long as they've +/// ever been cached and have not been destroyed yet. +template<class K, class V> +class SharedLRU { + using shared_ptr_t = boost::local_shared_ptr<V>; + using weak_ptr_t = boost::weak_ptr<V>; + using value_type = std::pair<K, shared_ptr_t>; + + // weak_refs is already ordered, and we don't use accessors like + // LRUCache::lower_bound(), so unordered LRUCache would suffice. + SimpleLRU<K, shared_ptr_t, false> cache; + std::map<K, std::pair<weak_ptr_t, V*>> weak_refs; + + struct Deleter { + SharedLRU<K,V>* cache; + const K key; + void operator()(V* ptr) { + cache->_erase_weak(key); + delete ptr; + } + }; + void _erase_weak(const K& key) { + weak_refs.erase(key); + } +public: + SharedLRU(size_t max_size = 20) + : cache{max_size} + {} + ~SharedLRU() { + cache.clear(); + // initially, we were assuming that no pointer obtained from SharedLRU + // can outlive the lru itself. However, since going with the interruption + // concept for handling shutdowns, this is no longer valid. + weak_refs.clear(); + } + /** + * Returns a reference to the given key, and perform an insertion if such + * key does not already exist + */ + shared_ptr_t operator[](const K& key); + /** + * Returns true iff there are no live references left to anything that has been + * in the cache. + */ + bool empty() const { + return weak_refs.empty(); + } + size_t size() const { + return cache.size(); + } + size_t capacity() const { + return cache.capacity(); + } + /*** + * Inserts a key if not present, or bumps it to the front of the LRU if + * it is, and then gives you a reference to the value. If the key already + * existed, you are responsible for deleting the new value you tried to + * insert. + * + * @param key The key to insert + * @param value The value that goes with the key + * @param existed Set to true if the value was already in the + * map, false otherwise + * @return A reference to the map's value for the given key + */ + shared_ptr_t insert(const K& key, std::unique_ptr<V> value); + // clear all strong reference from the lru. + void clear() { + cache.clear(); + } + shared_ptr_t find(const K& key); + // return the last element that is not greater than key + shared_ptr_t lower_bound(const K& key); + // return the first element that is greater than key + std::optional<value_type> upper_bound(const K& key); + + void erase(const K& key) { + cache.erase(key); + _erase_weak(key); + } +}; + +template<class K, class V> +typename SharedLRU<K,V>::shared_ptr_t +SharedLRU<K,V>::insert(const K& key, std::unique_ptr<V> value) +{ + shared_ptr_t val; + if (auto found = weak_refs.find(key); found != weak_refs.end()) { + val = found->second.first.lock(); + } + if (!val) { + val.reset(value.release(), Deleter{this, key}); + weak_refs.emplace(key, std::make_pair(val, val.get())); + } + cache.insert(key, val); + return val; +} + +template<class K, class V> +typename SharedLRU<K,V>::shared_ptr_t +SharedLRU<K,V>::operator[](const K& key) +{ + if (auto found = cache.find(key); found) { + return *found; + } + shared_ptr_t val; + if (auto found = weak_refs.find(key); found != weak_refs.end()) { + val = found->second.first.lock(); + } + if (!val) { + val.reset(new V{}, Deleter{this, key}); + weak_refs.emplace(key, std::make_pair(val, val.get())); + } + cache.insert(key, val); + return val; +} + +template<class K, class V> +typename SharedLRU<K,V>::shared_ptr_t +SharedLRU<K,V>::find(const K& key) +{ + if (auto found = cache.find(key); found) { + return *found; + } + shared_ptr_t val; + if (auto found = weak_refs.find(key); found != weak_refs.end()) { + val = found->second.first.lock(); + } + if (val) { + cache.insert(key, val); + } + return val; +} + +template<class K, class V> +typename SharedLRU<K,V>::shared_ptr_t +SharedLRU<K,V>::lower_bound(const K& key) +{ + if (weak_refs.empty()) { + return {}; + } + auto found = weak_refs.lower_bound(key); + if (found == weak_refs.end()) { + --found; + } + if (auto val = found->second.first.lock(); val) { + cache.insert(key, val); + return val; + } else { + return {}; + } +} + +template<class K, class V> +std::optional<typename SharedLRU<K,V>::value_type> +SharedLRU<K,V>::upper_bound(const K& key) +{ + for (auto found = weak_refs.upper_bound(key); + found != weak_refs.end(); + ++found) { + if (auto val = found->second.first.lock(); val) { + return std::make_pair(found->first, val); + } + } + return std::nullopt; +} diff --git a/src/crimson/common/simple_lru.h b/src/crimson/common/simple_lru.h new file mode 100644 index 000000000..1419c4885 --- /dev/null +++ b/src/crimson/common/simple_lru.h @@ -0,0 +1,141 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <list> +#include <map> +#include <optional> +#include <type_traits> +#include <unordered_map> + +template <class Key, class Value, bool Ordered> +class SimpleLRU { + static_assert(std::is_default_constructible_v<Value>); + using list_type = std::list<Key>; + template<class K, class V> + using map_t = std::conditional_t<Ordered, + std::map<K, V>, + std::unordered_map<K, V>>; + using map_type = map_t<Key, std::pair<Value, typename list_type::iterator>>; + list_type lru; + map_type cache; + const size_t max_size; + +public: + SimpleLRU(size_t size = 20) + : cache(size), + max_size(size) + {} + size_t size() const { + return cache.size(); + } + size_t capacity() const { + return max_size; + } + using insert_return_type = std::pair<Value, bool>; + insert_return_type insert(const Key& key, Value value); + std::optional<Value> find(const Key& key); + std::optional<std::enable_if<Ordered, Value>> lower_bound(const Key& key); + void erase(const Key& key); + void clear(); +private: + // bump the item to the front of the lru list + Value _lru_add(typename map_type::iterator found); + // evict the last element of most recently used list + void _evict(); +}; + +template <class Key, class Value, bool Ordered> +typename SimpleLRU<Key,Value,Ordered>::insert_return_type +SimpleLRU<Key,Value,Ordered>::insert(const Key& key, Value value) +{ + if constexpr(Ordered) { + auto found = cache.lower_bound(key); + if (found != cache.end() && found->first == key) { + // already exists + return {found->second.first, true}; + } else { + if (size() >= capacity()) { + _evict(); + } + lru.push_front(key); + // use lower_bound as hint to save the lookup + cache.emplace_hint(found, key, std::make_pair(value, lru.begin())); + return {std::move(value), false}; + } + } else { + // cache is not ordered + auto found = cache.find(key); + if (found != cache.end()) { + // already exists + return {found->second.first, true}; + } else { + if (size() >= capacity()) { + _evict(); + } + lru.push_front(key); + cache.emplace(key, std::make_pair(value, lru.begin())); + return {std::move(value), false}; + } + } +} + +template <class Key, class Value, bool Ordered> +std::optional<Value> SimpleLRU<Key,Value,Ordered>::find(const Key& key) +{ + if (auto found = cache.find(key); found != cache.end()){ + return _lru_add(found); + } else { + return {}; + } +} + +template <class Key, class Value, bool Ordered> +std::optional<std::enable_if<Ordered, Value>> +SimpleLRU<Key,Value,Ordered>::lower_bound(const Key& key) +{ + if (auto found = cache.lower_bound(key); found != cache.end()) { + return _lru_add(found); + } else { + return {}; + } +} + +template <class Key, class Value, bool Ordered> +void SimpleLRU<Key,Value,Ordered>::clear() +{ + lru.clear(); + cache.clear(); +} + +template <class Key, class Value, bool Ordered> +void SimpleLRU<Key,Value,Ordered>::erase(const Key& key) +{ + if (auto found = cache.find(key); found != cache.end()) { + lru.erase(found->second.second); + cache.erase(found); + } +} + +template <class Key, class Value, bool Ordered> +Value SimpleLRU<Key,Value,Ordered>::_lru_add( + typename SimpleLRU<Key,Value,Ordered>::map_type::iterator found) +{ + auto& [value, in_lru] = found->second; + if (in_lru != lru.begin()){ + // move item to the front + lru.splice(lru.begin(), lru, in_lru); + } + // the item is already at the front + return value; +} + +template <class Key, class Value, bool Ordered> +void SimpleLRU<Key,Value,Ordered>::_evict() +{ + // evict the last element of most recently used list + auto last = --lru.end(); + cache.erase(*last); + lru.erase(last); +} diff --git a/src/crimson/common/smp_helpers.h b/src/crimson/common/smp_helpers.h new file mode 100644 index 000000000..c2b7bd964 --- /dev/null +++ b/src/crimson/common/smp_helpers.h @@ -0,0 +1,92 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <limits> + +#include <seastar/core/smp.hh> + +#include "crimson/common/errorator.h" +#include "crimson/common/utility.h" + +namespace crimson { + +using core_id_t = seastar::shard_id; +static constexpr core_id_t NULL_CORE = std::numeric_limits<core_id_t>::max(); + +auto submit_to(core_id_t core, auto &&f) { + using ret_type = decltype(f()); + if constexpr (is_errorated_future_v<ret_type>) { + auto ret = seastar::smp::submit_to( + core, + [f=std::move(f)]() mutable { + return f().to_base(); + }); + return ret_type(std::move(ret)); + } else { + return seastar::smp::submit_to(core, std::move(f)); + } +} + +template <typename Obj, typename Method, typename... Args> +auto proxy_method_on_core( + core_id_t core, Obj &obj, Method method, Args&&... args) { + return crimson::submit_to( + core, + [&obj, method, + arg_tuple=std::make_tuple(std::forward<Args>(args)...)]() mutable { + return apply_method_to_tuple(obj, method, std::move(arg_tuple)); + }); +} + +/** + * reactor_map_seq + * + * Invokes f on each reactor sequentially, Caller may assume that + * f will not be invoked concurrently on multiple cores. + */ +template <typename F> +auto reactor_map_seq(F &&f) { + using ret_type = decltype(f()); + if constexpr (is_errorated_future_v<ret_type>) { + auto ret = crimson::do_for_each( + seastar::smp::all_cpus().begin(), + seastar::smp::all_cpus().end(), + [f=std::move(f)](auto core) mutable { + return seastar::smp::submit_to( + core, + [&f] { + return std::invoke(f); + }); + }); + return ret_type(ret); + } else { + return seastar::do_for_each( + seastar::smp::all_cpus().begin(), + seastar::smp::all_cpus().end(), + [f=std::move(f)](auto core) mutable { + return seastar::smp::submit_to( + core, + [&f] { + return std::invoke(f); + }); + }); + } +} + +/** + * sharded_map_seq + * + * Invokes f on each shard of t sequentially. Caller may assume that + * f will not be invoked concurrently on multiple cores. + */ +template <typename T, typename F> +auto sharded_map_seq(T &t, F &&f) { + return reactor_map_seq( + [&t, f=std::forward<F>(f)]() mutable { + return std::invoke(f, t.local()); + }); +} + +} diff --git a/src/crimson/common/throttle.cc b/src/crimson/common/throttle.cc new file mode 100644 index 000000000..88d1859f3 --- /dev/null +++ b/src/crimson/common/throttle.cc @@ -0,0 +1,64 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#include "throttle.h" + +namespace crimson::common { + +int64_t Throttle::take(int64_t c) +{ + if (max == 0u) { + return 0; + } + count += c; + return count; +} + +int64_t Throttle::put(int64_t c) +{ + if (max == 0u) { + return 0; + } + if (!c) { + return count; + } + on_free_slots.signal(); + count -= c; + return count; +} + +seastar::future<> Throttle::get(size_t c) +{ + if (max == 0u) { + return seastar::make_ready_future<>(); + } + pending++; + return on_free_slots.wait([this, c] { + return !_should_wait(c); + }).then([this, c] { + pending--; + count += c; + return seastar::make_ready_future<>(); + }); +} + +void Throttle::reset_max(size_t m) { + if (max == m) { + return; + } + + if (m > max) { + on_free_slots.signal(); + } + max = m; +} + +bool Throttle::_should_wait(size_t c) const { + if (!max) { + return false; + } + return ((c <= max && count + c > max) || // normally stay under max + (c >= max && count > max)); // except for large c +} + +} // namespace crimson::common diff --git a/src/crimson/common/throttle.h b/src/crimson/common/throttle.h new file mode 100644 index 000000000..2998cb5f8 --- /dev/null +++ b/src/crimson/common/throttle.h @@ -0,0 +1,43 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/condition-variable.hh> +// pull seastar::timer<...>::timer definitions. FIX SEASTAR or reactor.hh +// is obligatory and should be included everywhere? +#include <seastar/core/reactor.hh> + +#include "common/ThrottleInterface.h" + +namespace crimson::common { + +class Throttle final : public ThrottleInterface { + size_t max = 0; + size_t count = 0; + size_t pending = 0; + // we cannot change the "count" of seastar::semaphore after it is created, + // so use condition_variable instead. + seastar::condition_variable on_free_slots; +public: + explicit Throttle(size_t m) + : max(m) + {} + int64_t take(int64_t c = 1) override; + int64_t put(int64_t c = 1) override; + seastar::future<> get(size_t c); + size_t get_current() const { + return count; + } + size_t get_max() const { + return max; + } + size_t get_pending() const { + return pending; + } + void reset_max(size_t m); +private: + bool _should_wait(size_t c) const; +}; + +} // namespace crimson::common diff --git a/src/crimson/common/tmap_helpers.cc b/src/crimson/common/tmap_helpers.cc new file mode 100644 index 000000000..9c14ebc45 --- /dev/null +++ b/src/crimson/common/tmap_helpers.cc @@ -0,0 +1,131 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/common/tmap_helpers.h" + +#include "include/buffer.h" +#include "include/encoding.h" +#include "include/rados.h" + +namespace detail { + +#define decode_or_return(v, bp) \ + try { \ + ::decode(v, bp); \ + } catch (...) { \ + return -EINVAL; \ + } + +class TMapContents { + std::map<std::string, bufferlist> keys; + bufferlist header; +public: + TMapContents() = default; + + int decode(bufferlist::const_iterator &bliter) { + keys.clear(); + header.clear(); + if (bliter.end()) { + return 0; + } + decode_or_return(header, bliter); + __u32 num_keys; + decode_or_return(num_keys, bliter); + for (; num_keys > 0; --num_keys) { + std::string key; + decode_or_return(key, bliter); + decode_or_return(keys[key], bliter); + } + return 0; + } + + bufferlist encode() { + bufferlist bl; + ::encode(header, bl); + ::encode(static_cast<__u32>(keys.size()), bl); + for (auto &[k, v]: keys) { + ::encode(k, bl); + ::encode(v, bl); + } + return bl; + } + + int update(bufferlist::const_iterator in) { + while (!in.end()) { + __u8 op; + decode_or_return(op, in); + + if (op == CEPH_OSD_TMAP_HDR) { + decode_or_return(header, in); + continue; + } + + std::string key; + decode_or_return(key, in); + + switch (op) { + case CEPH_OSD_TMAP_SET: { + decode_or_return(keys[key], in); + break; + } + case CEPH_OSD_TMAP_CREATE: { + if (keys.contains(key)) { + return -EEXIST; + } + decode_or_return(keys[key], in); + break; + } + case CEPH_OSD_TMAP_RM: { + auto kiter = keys.find(key); + if (kiter == keys.end()) { + return -ENOENT; + } + keys.erase(kiter); + break; + } + case CEPH_OSD_TMAP_RMSLOPPY: { + keys.erase(key); + break; + } + } + } + return 0; + } + + int put(bufferlist::const_iterator in) { + return 0; + } +}; + +} + +namespace crimson::common { + +using do_tmap_up_ret = tl::expected<bufferlist, int>; +do_tmap_up_ret do_tmap_up(bufferlist::const_iterator in, bufferlist contents) +{ + detail::TMapContents tmap; + auto bliter = contents.cbegin(); + int r = tmap.decode(bliter); + if (r < 0) { + return tl::unexpected(r); + } + r = tmap.update(in); + if (r < 0) { + return tl::unexpected(r); + } + return tmap.encode(); +} + +using do_tmap_up_ret = tl::expected<bufferlist, int>; +do_tmap_up_ret do_tmap_put(bufferlist::const_iterator in) +{ + detail::TMapContents tmap; + int r = tmap.decode(in); + if (r < 0) { + return tl::unexpected(r); + } + return tmap.encode(); +} + +} diff --git a/src/crimson/common/tmap_helpers.h b/src/crimson/common/tmap_helpers.h new file mode 100644 index 000000000..446dbea2a --- /dev/null +++ b/src/crimson/common/tmap_helpers.h @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "include/expected.hpp" + +#include "include/buffer.h" +#include "include/encoding.h" + +namespace crimson::common { + +/** + * do_tmap_up + * + * Performs tmap update instructions encoded in buffer referenced by in. + * + * @param [in] in iterator to buffer containing encoded tmap update operations + * @param [in] contents current contents of object + * @return buffer containing new object contents, + * -EINVAL for decoding errors, + * -EEXIST for CEPH_OSD_TMAP_CREATE on a key that exists + * -ENOENT for CEPH_OSD_TMAP_RM on a key that does not exist + */ +using do_tmap_up_ret = tl::expected<bufferlist, int>; +do_tmap_up_ret do_tmap_up(bufferlist::const_iterator in, bufferlist contents); + +/** + * do_tmap_put + * + * Validates passed buffer pointed to by in and returns resulting object buffer. + * + * @param [in] in iterator to buffer containing tmap encoding + * @return buffer containing validated tmap encoded by in + * -EINVAL for decoding errors, + */ +using do_tmap_up_ret = tl::expected<bufferlist, int>; +do_tmap_up_ret do_tmap_put(bufferlist::const_iterator in); + +} diff --git a/src/crimson/common/tri_mutex.cc b/src/crimson/common/tri_mutex.cc new file mode 100644 index 000000000..e4b181280 --- /dev/null +++ b/src/crimson/common/tri_mutex.cc @@ -0,0 +1,225 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#include "tri_mutex.h" + +seastar::future<> read_lock::lock() +{ + return static_cast<tri_mutex*>(this)->lock_for_read(); +} + +void read_lock::unlock() +{ + static_cast<tri_mutex*>(this)->unlock_for_read(); +} + +seastar::future<> write_lock::lock() +{ + return static_cast<tri_mutex*>(this)->lock_for_write(false); +} + +void write_lock::unlock() +{ + static_cast<tri_mutex*>(this)->unlock_for_write(); +} + +seastar::future<> excl_lock::lock() +{ + return static_cast<tri_mutex*>(this)->lock_for_excl(); +} + +void excl_lock::unlock() +{ + static_cast<tri_mutex*>(this)->unlock_for_excl(); +} + +seastar::future<> excl_lock_from_read::lock() +{ + static_cast<tri_mutex*>(this)->promote_from_read(); + return seastar::make_ready_future<>(); +} + +void excl_lock_from_read::unlock() +{ + static_cast<tri_mutex*>(this)->demote_to_read(); +} + +seastar::future<> excl_lock_from_write::lock() +{ + static_cast<tri_mutex*>(this)->promote_from_write(); + return seastar::make_ready_future<>(); +} + +void excl_lock_from_write::unlock() +{ + static_cast<tri_mutex*>(this)->demote_to_write(); +} + +seastar::future<> excl_lock_from_excl::lock() +{ + return seastar::make_ready_future<>(); +} + +void excl_lock_from_excl::unlock() +{ +} + +tri_mutex::~tri_mutex() +{ + assert(!is_acquired()); +} + +seastar::future<> tri_mutex::lock_for_read() +{ + if (try_lock_for_read()) { + return seastar::make_ready_future<>(); + } + waiters.emplace_back(seastar::promise<>(), type_t::read); + return waiters.back().pr.get_future(); +} + +bool tri_mutex::try_lock_for_read() noexcept +{ + if (!writers && !exclusively_used && waiters.empty()) { + ++readers; + return true; + } else { + return false; + } +} + +void tri_mutex::unlock_for_read() +{ + assert(readers > 0); + if (--readers == 0) { + wake(); + } +} + +void tri_mutex::promote_from_read() +{ + assert(readers == 1); + --readers; + exclusively_used = true; +} + +void tri_mutex::demote_to_read() +{ + assert(exclusively_used); + exclusively_used = false; + ++readers; +} + +seastar::future<> tri_mutex::lock_for_write(bool greedy) +{ + if (try_lock_for_write(greedy)) { + return seastar::make_ready_future<>(); + } + waiters.emplace_back(seastar::promise<>(), type_t::write); + return waiters.back().pr.get_future(); +} + +bool tri_mutex::try_lock_for_write(bool greedy) noexcept +{ + if (!readers && !exclusively_used) { + if (greedy || waiters.empty()) { + ++writers; + return true; + } + } + return false; +} + +void tri_mutex::unlock_for_write() +{ + assert(writers > 0); + if (--writers == 0) { + wake(); + } +} + +void tri_mutex::promote_from_write() +{ + assert(writers == 1); + --writers; + exclusively_used = true; +} + +void tri_mutex::demote_to_write() +{ + assert(exclusively_used); + exclusively_used = false; + ++writers; +} + +// for exclusive users +seastar::future<> tri_mutex::lock_for_excl() +{ + if (try_lock_for_excl()) { + return seastar::make_ready_future<>(); + } + waiters.emplace_back(seastar::promise<>(), type_t::exclusive); + return waiters.back().pr.get_future(); +} + +bool tri_mutex::try_lock_for_excl() noexcept +{ + if (readers == 0u && writers == 0u && !exclusively_used) { + exclusively_used = true; + return true; + } else { + return false; + } +} + +void tri_mutex::unlock_for_excl() +{ + assert(exclusively_used); + exclusively_used = false; + wake(); +} + +bool tri_mutex::is_acquired() const +{ + if (readers != 0u) { + return true; + } else if (writers != 0u) { + return true; + } else if (exclusively_used) { + return true; + } else { + return false; + } +} + +void tri_mutex::wake() +{ + assert(!readers && !writers && !exclusively_used); + type_t type = type_t::none; + while (!waiters.empty()) { + auto& waiter = waiters.front(); + if (type == type_t::exclusive) { + break; + } if (type == type_t::none) { + type = waiter.type; + } else if (type != waiter.type) { + // to be woken in the next batch + break; + } + switch (type) { + case type_t::read: + ++readers; + break; + case type_t::write: + ++writers; + break; + case type_t::exclusive: + exclusively_used = true; + break; + default: + assert(0); + } + waiter.pr.set_value(); + waiters.pop_front(); + } +} diff --git a/src/crimson/common/tri_mutex.h b/src/crimson/common/tri_mutex.h new file mode 100644 index 000000000..0533f3539 --- /dev/null +++ b/src/crimson/common/tri_mutex.h @@ -0,0 +1,156 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/future.hh> +#include <seastar/core/circular_buffer.hh> + +class read_lock { +public: + seastar::future<> lock(); + void unlock(); +}; + +class write_lock { +public: + seastar::future<> lock(); + void unlock(); +}; + +class excl_lock { +public: + seastar::future<> lock(); + void unlock(); +}; + +// promote from read to excl +class excl_lock_from_read { +public: + seastar::future<> lock(); + void unlock(); +}; + +// promote from write to excl +class excl_lock_from_write { +public: + seastar::future<> lock(); + void unlock(); +}; + +// promote from excl to excl +class excl_lock_from_excl { +public: + seastar::future<> lock(); + void unlock(); +}; + +/// shared/exclusive mutual exclusion +/// +/// this lock design uses reader and writer is entirely and completely +/// independent of the conventional reader/writer lock usage. Here, what we +/// mean is that we can pipeline reads, and we can pipeline writes, but we +/// cannot allow a read while writes are in progress or a write while reads are +/// in progress. Any rmw operation is therefore exclusive. +/// +/// tri_mutex is based on seastar::shared_mutex, but instead of two kinds of +/// waiters, tri_mutex keeps track of three kinds of lock users: +/// - readers +/// - writers +/// - exclusive users +class tri_mutex : private read_lock, + write_lock, + excl_lock, + excl_lock_from_read, + excl_lock_from_write, + excl_lock_from_excl +{ +public: + tri_mutex() = default; + ~tri_mutex(); + + read_lock& for_read() { + return *this; + } + write_lock& for_write() { + return *this; + } + excl_lock& for_excl() { + return *this; + } + excl_lock_from_read& excl_from_read() { + return *this; + } + excl_lock_from_write& excl_from_write() { + return *this; + } + excl_lock_from_excl& excl_from_excl() { + return *this; + } + + // for shared readers + seastar::future<> lock_for_read(); + bool try_lock_for_read() noexcept; + void unlock_for_read(); + void promote_from_read(); + void demote_to_read(); + unsigned get_readers() const { + return readers; + } + + // for shared writers + seastar::future<> lock_for_write(bool greedy); + bool try_lock_for_write(bool greedy) noexcept; + void unlock_for_write(); + void promote_from_write(); + void demote_to_write(); + unsigned get_writers() const { + return writers; + } + + // for exclusive users + seastar::future<> lock_for_excl(); + bool try_lock_for_excl() noexcept; + void unlock_for_excl(); + bool is_excl_acquired() const { + return exclusively_used; + } + + bool is_acquired() const; + + /// pass the provided exception to any waiting waiters + template<typename Exception> + void abort(Exception ex) { + while (!waiters.empty()) { + auto& waiter = waiters.front(); + waiter.pr.set_exception(std::make_exception_ptr(ex)); + waiters.pop_front(); + } + } + +private: + void wake(); + unsigned readers = 0; + unsigned writers = 0; + bool exclusively_used = false; + enum class type_t : uint8_t { + read, + write, + exclusive, + none, + }; + struct waiter_t { + waiter_t(seastar::promise<>&& pr, type_t type) + : pr(std::move(pr)), type(type) + {} + seastar::promise<> pr; + type_t type; + }; + seastar::circular_buffer<waiter_t> waiters; + friend class read_lock; + friend class write_lock; + friend class excl_lock; + friend class excl_lock_from_read; + friend class excl_lock_from_write; + friend class excl_lock_from_excl; +}; diff --git a/src/crimson/common/type_helpers.h b/src/crimson/common/type_helpers.h new file mode 100644 index 000000000..4c606581f --- /dev/null +++ b/src/crimson/common/type_helpers.h @@ -0,0 +1,8 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "boost/intrusive_ptr.hpp" + +template<typename T> using Ref = boost::intrusive_ptr<T>; diff --git a/src/crimson/common/utility.h b/src/crimson/common/utility.h new file mode 100644 index 000000000..86b308155 --- /dev/null +++ b/src/crimson/common/utility.h @@ -0,0 +1,38 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include <type_traits> + +namespace _impl { + template <class T> struct always_false : std::false_type {}; +}; + +template <class T> +void assert_moveable(T& t) { + // It's fine +} +template <class T> +void assert_moveable(const T& t) { + static_assert(_impl::always_false<T>::value, "unable to move-out from T"); +} + +namespace internal { + +template <typename Obj, typename Method, typename ArgTuple, size_t... I> +static auto _apply_method_to_tuple( + Obj &obj, Method method, ArgTuple &&tuple, + std::index_sequence<I...>) { + return (obj.*method)(std::get<I>(std::forward<ArgTuple>(tuple))...); +} + +} + +template <typename Obj, typename Method, typename ArgTuple> +auto apply_method_to_tuple(Obj &obj, Method method, ArgTuple &&tuple) { + constexpr auto tuple_size = std::tuple_size_v<ArgTuple>; + return internal::_apply_method_to_tuple( + obj, method, std::forward<ArgTuple>(tuple), + std::make_index_sequence<tuple_size>()); +} |