diff options
Diffstat (limited to 'src/crimson')
61 files changed, 8541 insertions, 0 deletions
diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt new file mode 100644 index 00000000..d7b58521 --- /dev/null +++ b/src/crimson/CMakeLists.txt @@ -0,0 +1,141 @@ +add_library(crimson::cflags INTERFACE IMPORTED) +set_target_properties(crimson::cflags PROPERTIES + INTERFACE_COMPILE_DEFINITIONS "WITH_SEASTAR=1" + INTERFACE_LINK_LIBRARIES Seastar::seastar) + +set(crimson_common_srcs + common/config_proxy.cc + common/perf_counters_collection.cc + common/assert.cc + common/log.cc) + +# the specialized version of ceph-common, where +# - the logging is sent to Seastar backend +# - and the template parameter of lock_policy is SINGLE +add_library(crimson-common STATIC + ${PROJECT_SOURCE_DIR}/src/common/admin_socket.cc + ${PROJECT_SOURCE_DIR}/src/common/admin_socket_client.cc + ${PROJECT_SOURCE_DIR}/src/common/bit_str.cc + ${PROJECT_SOURCE_DIR}/src/common/bloom_filter.cc + ${PROJECT_SOURCE_DIR}/src/common/ceph_argparse.cc + ${PROJECT_SOURCE_DIR}/src/common/ceph_context.cc + ${PROJECT_SOURCE_DIR}/src/common/ceph_crypto.cc + ${PROJECT_SOURCE_DIR}/src/common/ceph_hash.cc + ${PROJECT_SOURCE_DIR}/src/common/ceph_time.cc + ${PROJECT_SOURCE_DIR}/src/common/ceph_strings.cc + ${PROJECT_SOURCE_DIR}/src/common/cmdparse.cc + ${PROJECT_SOURCE_DIR}/src/common/common_init.cc + ${PROJECT_SOURCE_DIR}/src/common/compat.cc + ${PROJECT_SOURCE_DIR}/src/common/code_environment.cc + ${PROJECT_SOURCE_DIR}/src/common/config.cc + ${PROJECT_SOURCE_DIR}/src/common/config_values.cc + ${PROJECT_SOURCE_DIR}/src/common/dout.cc + ${PROJECT_SOURCE_DIR}/src/common/entity_name.cc + ${PROJECT_SOURCE_DIR}/src/common/environment.cc + ${PROJECT_SOURCE_DIR}/src/common/errno.cc + ${PROJECT_SOURCE_DIR}/src/common/escape.cc + ${PROJECT_SOURCE_DIR}/src/common/hex.cc + ${PROJECT_SOURCE_DIR}/src/common/fs_types.cc + ${PROJECT_SOURCE_DIR}/src/common/histogram.cc + ${PROJECT_SOURCE_DIR}/src/common/hobject.cc + ${PROJECT_SOURCE_DIR}/src/common/hostname.cc + ${PROJECT_SOURCE_DIR}/src/common/ipaddr.cc + ${PROJECT_SOURCE_DIR}/src/common/lockdep.cc + ${PROJECT_SOURCE_DIR}/src/common/mutex_debug.cc + ${PROJECT_SOURCE_DIR}/src/common/mempool.cc + ${PROJECT_SOURCE_DIR}/src/common/options.cc + ${PROJECT_SOURCE_DIR}/src/common/perf_counters.cc + ${PROJECT_SOURCE_DIR}/src/common/perf_histogram.cc + ${PROJECT_SOURCE_DIR}/src/common/page.cc + ${PROJECT_SOURCE_DIR}/src/common/pick_address.cc + ${PROJECT_SOURCE_DIR}/src/common/snap_types.cc + ${PROJECT_SOURCE_DIR}/src/common/signal.cc + ${PROJECT_SOURCE_DIR}/src/common/str_list.cc + ${PROJECT_SOURCE_DIR}/src/common/str_map.cc + ${PROJECT_SOURCE_DIR}/src/common/strtol.cc + ${PROJECT_SOURCE_DIR}/src/common/reverse.c + ${PROJECT_SOURCE_DIR}/src/common/types.cc + ${PROJECT_SOURCE_DIR}/src/common/utf8.c + ${PROJECT_SOURCE_DIR}/src/common/version.cc + ${PROJECT_SOURCE_DIR}/src/common/BackTrace.cc + ${PROJECT_SOURCE_DIR}/src/common/ConfUtils.cc + ${PROJECT_SOURCE_DIR}/src/common/DecayCounter.cc + ${PROJECT_SOURCE_DIR}/src/common/HTMLFormatter.cc + ${PROJECT_SOURCE_DIR}/src/common/Formatter.cc + ${PROJECT_SOURCE_DIR}/src/common/Graylog.cc + ${PROJECT_SOURCE_DIR}/src/common/LogEntry.cc + ${PROJECT_SOURCE_DIR}/src/common/Mutex.cc + ${PROJECT_SOURCE_DIR}/src/common/SubProcess.cc + ${PROJECT_SOURCE_DIR}/src/common/TextTable.cc + ${PROJECT_SOURCE_DIR}/src/common/Thread.cc + ${PROJECT_SOURCE_DIR}/src/common/HeartbeatMap.cc + ${PROJECT_SOURCE_DIR}/src/common/PluginRegistry.cc + ${PROJECT_SOURCE_DIR}/src/librbd/Features.cc + ${PROJECT_SOURCE_DIR}/src/log/Log.cc + ${PROJECT_SOURCE_DIR}/src/mgr/ServiceMap.cc + ${PROJECT_SOURCE_DIR}/src/mds/inode_backtrace.cc + ${PROJECT_SOURCE_DIR}/src/mds/mdstypes.cc + ${PROJECT_SOURCE_DIR}/src/mds/FSMap.cc + ${PROJECT_SOURCE_DIR}/src/mds/FSMapUser.cc + ${PROJECT_SOURCE_DIR}/src/mds/MDSMap.cc + ${PROJECT_SOURCE_DIR}/src/msg/msg_types.cc + ${PROJECT_SOURCE_DIR}/src/msg/Message.cc + ${PROJECT_SOURCE_DIR}/src/mon/PGMap.cc + ${PROJECT_SOURCE_DIR}/src/mon/MonCap.cc + ${PROJECT_SOURCE_DIR}/src/mon/MonMap.cc + ${PROJECT_SOURCE_DIR}/src/osd/osd_types.cc + ${PROJECT_SOURCE_DIR}/src/osd/ECMsgTypes.cc + ${PROJECT_SOURCE_DIR}/src/osd/HitSet.cc + ${PROJECT_SOURCE_DIR}/src/osd/OSDMap.cc + ${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc + ${crimson_common_srcs} + $<TARGET_OBJECTS:crimson-auth> + $<TARGET_OBJECTS:common_buffer_obj> + $<TARGET_OBJECTS:common_mountcephfs_objs> + $<TARGET_OBJECTS:crimson-crush> + $<TARGET_OBJECTS:global_common_objs>) + +target_compile_definitions(crimson-common PRIVATE + "CEPH_LIBDIR=\"${CMAKE_INSTALL_FULL_LIBDIR}\"" + "CEPH_PKGLIBDIR=\"${CEPH_INSTALL_FULL_PKGLIBDIR}\"" + "CEPH_DATADIR=\"${CEPH_INSTALL_DATADIR}\"") + +target_link_libraries(crimson-common + PUBLIC + json_spirit + PRIVATE + crc32 + crimson::cflags + Boost::iostreams + Boost::random + ${NSS_LIBRARIES} ${NSPR_LIBRARIES} OpenSSL::Crypto) + +set(crimson_auth_srcs + auth/KeyRing.cc) +set(crimson_mon_srcs + mon/MonClient.cc + ${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc) +set(crimson_net_srcs + net/Dispatcher.cc + net/Errors.cc + net/Messenger.cc + net/SocketConnection.cc + net/SocketMessenger.cc + net/Socket.cc) +set(crimson_thread_srcs + thread/ThreadPool.cc + thread/Throttle.cc) +add_library(crimson STATIC + ${crimson_auth_srcs} + ${crimson_mon_srcs} + ${crimson_net_srcs} + ${crimson_thread_srcs} + ${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc) +target_compile_options(crimson PUBLIC + "-ftemplate-backtrace-limit=0") +target_link_libraries(crimson + PUBLIC + crimson-common + crimson::cflags) +add_subdirectory(os) +add_subdirectory(osd) diff --git a/src/crimson/auth/Errors.cc b/src/crimson/auth/Errors.cc new file mode 100644 index 00000000..c5f1b8d8 --- /dev/null +++ b/src/crimson/auth/Errors.cc @@ -0,0 +1,31 @@ +#include "Errors.h" + +namespace ceph::net { + +const std::error_category& auth_category() +{ + struct category : public std::error_category { + const char* name() const noexcept override { + return "ceph::auth"; + } + + std::string message(int ev) const override { + switch (static_cast<error>(ev)) { + case error::success: + return "success", + case error::key_not_found: + return "key not found"; + case error::invalid_key: + return "corrupted key"; + case error::unknown_service: + return "unknown service"; + default: + return "unknown"; + } + } + }; + static category instance; + return instance; +} + +} // namespace ceph::auth diff --git a/src/crimson/auth/Errors.h b/src/crimson/auth/Errors.h new file mode 100644 index 00000000..92f5c733 --- /dev/null +++ b/src/crimson/auth/Errors.h @@ -0,0 +1,37 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +namespace ceph::auth { + +enum class error { + success = 0, + key_not_found, + invalid_key, + unknown_service, // no ticket handler for required service +}; + +const std::error_category& auth_category(); + +inline std::error_code make_error_code(error e) +{ + return {static_cast<int>(e), auth_category()}; +} + +inline std::error_condition make_error_condition(error e) +{ + return {static_cast<int>(e), auth_category()}; +} + +class auth_error : public std::runtime_error {}; + +} // namespace ceph::auth + +namespace std { + +/// enables implicit conversion to std::error_condition +template <> +struct is_error_condition_enum<ceph::auth::error> : public true_type {}; + +} // namespace std diff --git a/src/crimson/auth/KeyRing.cc b/src/crimson/auth/KeyRing.cc new file mode 100644 index 00000000..5f82a262 --- /dev/null +++ b/src/crimson/auth/KeyRing.cc @@ -0,0 +1,89 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "KeyRing.h" + +#include <boost/algorithm/string.hpp> + +#include <seastar/core/do_with.hh> +#include <seastar/core/fstream.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/reactor.hh> + +#include "common/buffer_seastar.h" +#include "auth/KeyRing.h" +#include "include/denc.h" +#include "crimson/common/config_proxy.h" + +namespace ceph::auth { + +seastar::future<seastar::temporary_buffer<char>> read_file(const std::string& path) +{ + return seastar::open_file_dma(path, seastar::open_flags::ro).then([] (seastar::file f) { + return f.size().then([f = std::move(f)](size_t s) { + return seastar::do_with(seastar::make_file_input_stream(f), [s](seastar::input_stream<char>& in) { + return in.read_exactly(s); + }); + }); + }); +} + +seastar::future<KeyRing*> load_from_keyring(KeyRing* keyring) +{ + std::vector<std::string> paths; + boost::split(paths, ceph::common::local_conf()->keyring, + boost::is_any_of(",;")); + std::pair<bool, std::string> found; + return seastar::map_reduce(paths, [](auto path) { + return seastar::engine().file_exists(path).then([path](bool file_exists) { + return std::make_pair(file_exists, path); + }); + }, std::move(found), [](auto found, auto file_exists_and_path) { + if (!found.first && file_exists_and_path.first) { + found = std::move(file_exists_and_path); + } + return found; + }).then([keyring] (auto file_exists_and_path) { + const auto& [exists, path] = file_exists_and_path; + if (exists) { + return read_file(path).then([keyring](auto buf) { + bufferlist bl; + bl.append(buffer::create(std::move(buf))); + auto i = bl.cbegin(); + keyring->decode(i); + return seastar::make_ready_future<KeyRing*>(keyring); + }); + } else { + return seastar::make_ready_future<KeyRing*>(keyring); + } + }); +} + +seastar::future<KeyRing*> load_from_keyfile(KeyRing* keyring) +{ + auto& path = ceph::common::local_conf()->keyfile; + if (!path.empty()) { + return read_file(path).then([keyring](auto buf) { + EntityAuth ea; + ea.key.decode_base64(std::string(buf.begin(), + buf.end())); + keyring->add(ceph::common::local_conf()->name, ea); + return seastar::make_ready_future<KeyRing*>(keyring); + }); + } else { + return seastar::make_ready_future<KeyRing*>(keyring); + } +} + +seastar::future<KeyRing*> load_from_key(KeyRing* keyring) +{ + auto& key = ceph::common::local_conf()->key; + if (!key.empty()) { + EntityAuth ea; + ea.key.decode_base64(key); + keyring->add(ceph::common::local_conf()->name, ea); + } + return seastar::make_ready_future<KeyRing*>(keyring); +} + +} // namespace ceph::auth diff --git a/src/crimson/auth/KeyRing.h b/src/crimson/auth/KeyRing.h new file mode 100644 index 00000000..b68e6389 --- /dev/null +++ b/src/crimson/auth/KeyRing.h @@ -0,0 +1,15 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/future.hh> + +class KeyRing; + +namespace ceph::auth { + // see KeyRing::from_ceph_context + seastar::future<KeyRing*> load_from_keyring(KeyRing* keyring); + seastar::future<KeyRing*> load_from_keyfile(KeyRing* keyring); + seastar::future<KeyRing*> load_from_key(KeyRing* keyring); +} diff --git a/src/crimson/common/assert.cc b/src/crimson/common/assert.cc new file mode 100644 index 00000000..9ed6b7e3 --- /dev/null +++ b/src/crimson/common/assert.cc @@ -0,0 +1,56 @@ +#include <cstdarg> + +#include <seastar/util/backtrace.hh> +#include <seastar/core/reactor.hh> + +#include "include/ceph_assert.h" + +#include "crimson/common/log.h" + +namespace ceph { + [[gnu::cold]] void __ceph_assert_fail(const ceph::assert_data &ctx) + { + __ceph_assert_fail(ctx.assertion, ctx.file, ctx.line, ctx.function); + } + + [[gnu::cold]] void __ceph_assert_fail(const char* assertion, + const char* file, int line, + const char* func) + { + seastar::logger& logger = ceph::get_logger(0); + logger.error("{}:{} : In function '{}', ceph_assert(%s)\n" + "{}", + file, line, func, assertion, + seastar::current_backtrace()); + abort(); + } + [[gnu::cold]] void __ceph_assertf_fail(const char *assertion, + const char *file, int line, + const char *func, const char* msg, + ...) + { + char buf[8096]; + va_list args; + va_start(args, msg); + std::vsnprintf(buf, sizeof(buf), msg, args); + va_end(args); + + seastar::logger& logger = ceph::get_logger(0); + logger.error("{}:{} : In function '{}', ceph_assert(%s)\n" + "{}", + file, line, func, assertion, + seastar::current_backtrace()); + abort(); + } + + [[gnu::cold]] void __ceph_abort(const char* file, int line, + const char* func, const std::string& msg) + { + seastar::logger& logger = ceph::get_logger(0); + logger.error("{}:{} : In function '{}', abort(%s)\n" + "{}", + file, line, func, msg, + seastar::current_backtrace()); + abort(); + } +} diff --git a/src/crimson/common/config_proxy.cc b/src/crimson/common/config_proxy.cc new file mode 100644 index 00000000..720fcffd --- /dev/null +++ b/src/crimson/common/config_proxy.cc @@ -0,0 +1,45 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + +#include "config_proxy.h" + +namespace ceph::common { + +ConfigProxy::ConfigProxy(const EntityName& name, std::string_view cluster) +{ + if (seastar::engine().cpu_id() != 0) { + return; + } + // set the initial value on CPU#0 + values.reset(seastar::make_lw_shared<ConfigValues>()); + values.get()->name = name; + values.get()->cluster = cluster; + // and the only copy of md_config_impl<> is allocated on CPU#0 + local_config.reset(new md_config_t{*values, obs_mgr, true}); + if (name.is_mds()) { + local_config->set_val_default(*values, obs_mgr, + "keyring", "$mds_data/keyring"); + } else if (name.is_osd()) { + local_config->set_val_default(*values, obs_mgr, + "keyring", "$osd_data/keyring"); + } +} + +seastar::future<> ConfigProxy::start() +{ + // populate values and config to all other shards + if (!values) { + return seastar::make_ready_future<>(); + } + return container().invoke_on_others([this](auto& proxy) { + return values.copy().then([config=local_config.get(), + &proxy](auto foreign_values) { + proxy.values.reset(); + proxy.values = std::move(foreign_values); + proxy.remote_config = config; + return seastar::make_ready_future<>(); + }); + }); +} + +ConfigProxy::ShardedConfig ConfigProxy::sharded_conf; +} diff --git a/src/crimson/common/config_proxy.h b/src/crimson/common/config_proxy.h new file mode 100644 index 00000000..46690009 --- /dev/null +++ b/src/crimson/common/config_proxy.h @@ -0,0 +1,178 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + +#pragma once + +#include <seastar/core/reactor.hh> +#include <seastar/core/sharded.hh> +#include "common/config.h" +#include "common/config_obs.h" +#include "common/config_obs_mgr.h" +#include "common/errno.h" + +namespace ceph::common { + +// a facade for managing config. each shard has its own copy of ConfigProxy. +// +// In seastar-osd, there could be multiple instances of @c ConfigValues in a +// single process, as we are using a variant of read-copy-update mechinary to +// update the settings at runtime. +class ConfigProxy : public seastar::peering_sharded_service<ConfigProxy> +{ + using LocalConfigValues = seastar::lw_shared_ptr<ConfigValues>; + seastar::foreign_ptr<LocalConfigValues> values; + + md_config_t* remote_config = nullptr; + std::unique_ptr<md_config_t> local_config; + + using ConfigObserver = ceph::md_config_obs_impl<ConfigProxy>; + ObserverMgr<ConfigObserver> obs_mgr; + + const md_config_t& get_config() const { + return remote_config ? *remote_config : * local_config; + } + md_config_t& get_config() { + return remote_config ? *remote_config : * local_config; + } + + // apply changes to all shards + // @param func a functor which accepts @c "ConfigValues&" + template<typename Func> + seastar::future<> do_change(Func&& func) { + return container().invoke_on(values.get_owner_shard(), + [func = std::move(func)](ConfigProxy& owner) { + // apply the changes to a copy of the values + auto new_values = seastar::make_lw_shared(*owner.values); + new_values->changed.clear(); + func(*new_values); + + // always apply the new settings synchronously on the owner shard, to + // avoid racings with other do_change() calls in parallel. + ObserverMgr<ConfigObserver>::rev_obs_map rev_obs; + owner.values.reset(new_values); + owner.obs_mgr.for_each_change(owner.values->changed, owner, + [&rev_obs](ConfigObserver *obs, + const std::string &key) { + rev_obs[obs].insert(key); + }, nullptr); + for (auto& [obs, keys] : rev_obs) { + obs->handle_conf_change(owner, keys); + } + + return seastar::parallel_for_each(boost::irange(1u, seastar::smp::count), + [&owner, new_values] (auto cpu) { + return owner.container().invoke_on(cpu, + [foreign_values = seastar::make_foreign(new_values)](ConfigProxy& proxy) mutable { + proxy.values.reset(); + proxy.values = std::move(foreign_values); + + ObserverMgr<ConfigObserver>::rev_obs_map rev_obs; + proxy.obs_mgr.for_each_change(proxy.values->changed, proxy, + [&rev_obs](md_config_obs_t *obs, + const std::string &key) { + rev_obs[obs].insert(key); + }, nullptr); + for (auto& obs_keys : rev_obs) { + obs_keys.first->handle_conf_change(proxy, obs_keys.second); + } + }); + }).finally([new_values] { + new_values->changed.clear(); + }); + }); + } +public: + ConfigProxy(const EntityName& name, std::string_view cluster); + const ConfigValues* operator->() const noexcept { + return values.get(); + } + ConfigValues* operator->() noexcept { + return values.get(); + } + + // required by sharded<> + seastar::future<> start(); + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + void add_observer(ConfigObserver* obs) { + obs_mgr.add_observer(obs); + } + void remove_observer(ConfigObserver* obs) { + obs_mgr.remove_observer(obs); + } + seastar::future<> rm_val(const std::string& key) { + return do_change([key, this](ConfigValues& values) { + auto ret = get_config().rm_val(values, key); + if (ret < 0) { + throw std::invalid_argument(cpp_strerror(ret)); + } + }); + } + seastar::future<> set_val(const std::string& key, + const std::string& val) { + return do_change([key, val, this](ConfigValues& values) { + std::stringstream err; + auto ret = get_config().set_val(values, obs_mgr, key, val, &err); + if (ret < 0) { + throw std::invalid_argument(err.str()); + } + }); + } + int get_val(const std::string &key, std::string *val) const { + return get_config().get_val(*values, key, val); + } + template<typename T> + const T get_val(const std::string& key) const { + return get_config().template get_val<T>(*values, key); + } + + int get_all_sections(std::vector<std::string>& sections) const { + return get_config().get_all_sections(sections); + } + + int get_val_from_conf_file(const std::vector<std::string>& sections, + const std::string& key, std::string& out, + bool expand_meta) const { + return get_config().get_val_from_conf_file(*values, sections, key, + out, expand_meta); + } + + unsigned get_osd_pool_default_min_size(uint8_t size) const { + return get_config().get_osd_pool_default_min_size(*values, size); + } + + seastar::future<> set_mon_vals(const std::map<std::string,std::string>& kv) { + return do_change([kv, this](ConfigValues& values) { + get_config().set_mon_vals(nullptr, values, obs_mgr, kv, nullptr); + }); + } + + seastar::future<> parse_config_files(const std::string& conf_files) { + return do_change([this, conf_files](ConfigValues& values) { + const char* conf_file_paths = + conf_files.empty() ? nullptr : conf_files.c_str(); + get_config().parse_config_files(values, + obs_mgr, + conf_file_paths, + &std::cerr, + CODE_ENVIRONMENT_DAEMON); + }); + } + + using ShardedConfig = seastar::sharded<ConfigProxy>; + +private: + static ShardedConfig sharded_conf; + friend ConfigProxy& local_conf(); + friend ShardedConfig& sharded_conf(); +}; + +inline ConfigProxy& local_conf() { + return ConfigProxy::sharded_conf.local(); +} + +inline ConfigProxy::ShardedConfig& sharded_conf() { + return ConfigProxy::sharded_conf; +} + +} diff --git a/src/crimson/common/log.cc b/src/crimson/common/log.cc new file mode 100644 index 00000000..6a57b233 --- /dev/null +++ b/src/crimson/common/log.cc @@ -0,0 +1,18 @@ +#include "log.h" + +static std::array<seastar::logger, ceph_subsys_get_num()> loggers{ +#define SUBSYS(name, log_level, gather_level) \ + seastar::logger(#name), +#define DEFAULT_SUBSYS(log_level, gather_level) \ + seastar::logger("none"), + #include "common/subsys.h" +#undef SUBSYS +#undef DEFAULT_SUBSYS +}; + +namespace ceph { +seastar::logger& get_logger(int subsys) { + assert(subsys < ceph_subsys_max); + return loggers[subsys]; +} +} diff --git a/src/crimson/common/log.h b/src/crimson/common/log.h new file mode 100644 index 00000000..64ff3365 --- /dev/null +++ b/src/crimson/common/log.h @@ -0,0 +1,6 @@ +#include <seastar/util/log.hh> +#include "common/subsys_types.h" + +namespace ceph { +seastar::logger& get_logger(int subsys); +} diff --git a/src/crimson/common/perf_counters_collection.cc b/src/crimson/common/perf_counters_collection.cc new file mode 100644 index 00000000..7a19a9f2 --- /dev/null +++ b/src/crimson/common/perf_counters_collection.cc @@ -0,0 +1,22 @@ +#include "perf_counters_collection.h" + +namespace ceph::common { +PerfCountersCollection::PerfCountersCollection() +{ + perf_collection = std::make_unique<PerfCountersCollectionImpl>(); +} +PerfCountersCollection::~PerfCountersCollection() +{ + perf_collection->clear(); +} + +PerfCountersCollectionImpl* PerfCountersCollection:: get_perf_collection() +{ + return perf_collection.get(); +} + +PerfCountersCollection::ShardedPerfCountersCollection PerfCountersCollection::sharded_perf_coll; + +} + + diff --git a/src/crimson/common/perf_counters_collection.h b/src/crimson/common/perf_counters_collection.h new file mode 100644 index 00000000..95e2a4f2 --- /dev/null +++ b/src/crimson/common/perf_counters_collection.h @@ -0,0 +1,33 @@ +#pragma once + +#include "common/perf_counters.h" +#include <seastar/core/sharded.hh> + +namespace ceph::common { +class PerfCountersCollection: public seastar::sharded<PerfCountersCollection> +{ + using ShardedPerfCountersCollection = seastar::sharded<PerfCountersCollection>; + +private: + std::unique_ptr<PerfCountersCollectionImpl> perf_collection; + static ShardedPerfCountersCollection sharded_perf_coll; + friend PerfCountersCollection& local_perf_coll(); + friend ShardedPerfCountersCollection& sharded_perf_coll(); + +public: + PerfCountersCollection(); + ~PerfCountersCollection(); + PerfCountersCollectionImpl* get_perf_collection(); + +}; + +inline PerfCountersCollection::ShardedPerfCountersCollection& sharded_perf_coll(){ + return PerfCountersCollection::sharded_perf_coll; +} + +inline PerfCountersCollection& local_perf_coll() { + return PerfCountersCollection::sharded_perf_coll.local(); +} + +} + diff --git a/src/crimson/common/shared_lru.h b/src/crimson/common/shared_lru.h new file mode 100644 index 00000000..25b1fe5e --- /dev/null +++ b/src/crimson/common/shared_lru.h @@ -0,0 +1,173 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <memory> +#include <optional> +#include <boost/smart_ptr/local_shared_ptr.hpp> +#include <boost/smart_ptr/weak_ptr.hpp> +#include "simple_lru.h" + +/// SharedLRU does its best to cache objects. It not only tracks the objects +/// in its LRU cache with strong references, it also tracks objects with +/// weak_ptr even if the cache does not hold any strong references to them. so +/// that it can return the objects after they are evicted, as long as they've +/// ever been cached and have not been destroyed yet. +template<class K, class V> +class SharedLRU { + using shared_ptr_t = boost::local_shared_ptr<V>; + using weak_ptr_t = boost::weak_ptr<V>; + using value_type = std::pair<K, shared_ptr_t>; + + // weak_refs is already ordered, and we don't use accessors like + // LRUCache::lower_bound(), so unordered LRUCache would suffice. + SimpleLRU<K, shared_ptr_t, false> cache; + std::map<K, std::pair<weak_ptr_t, V*>> weak_refs; + + struct Deleter { + SharedLRU<K,V>* cache; + const K key; + void operator()(V* ptr) { + cache->_erase(key); + delete ptr; + } + }; + void _erase(const K& key) { + weak_refs.erase(key); + } +public: + SharedLRU(size_t max_size = 20) + : cache{max_size} + {} + ~SharedLRU() { + cache.clear(); + // use plain assert() in utiliy classes to avoid dependencies on logging + assert(weak_refs.empty()); + } + /** + * Returns a reference to the given key, and perform an insertion if such + * key does not already exist + */ + shared_ptr_t operator[](const K& key); + /** + * Returns true iff there are no live references left to anything that has been + * in the cache. + */ + bool empty() const { + return weak_refs.empty(); + } + size_t size() const { + return cache.size(); + } + size_t capacity() const { + return cache.capacity(); + } + /*** + * Inserts a key if not present, or bumps it to the front of the LRU if + * it is, and then gives you a reference to the value. If the key already + * existed, you are responsible for deleting the new value you tried to + * insert. + * + * @param key The key to insert + * @param value The value that goes with the key + * @param existed Set to true if the value was already in the + * map, false otherwise + * @return A reference to the map's value for the given key + */ + shared_ptr_t insert(const K& key, std::unique_ptr<V> value); + // clear all strong reference from the lru. + void clear() { + cache.clear(); + } + shared_ptr_t find(const K& key); + // return the last element that is not greater than key + shared_ptr_t lower_bound(const K& key); + // return the first element that is greater than key + std::optional<value_type> upper_bound(const K& key); +}; + +template<class K, class V> +typename SharedLRU<K,V>::shared_ptr_t +SharedLRU<K,V>::insert(const K& key, std::unique_ptr<V> value) +{ + shared_ptr_t val; + if (auto found = weak_refs.find(key); found != weak_refs.end()) { + val = found->second.first.lock(); + } + if (!val) { + val.reset(value.release(), Deleter{this, key}); + weak_refs.emplace(key, std::make_pair(val, val.get())); + } + cache.insert(key, val); + return val; +} + +template<class K, class V> +typename SharedLRU<K,V>::shared_ptr_t +SharedLRU<K,V>::operator[](const K& key) +{ + if (auto found = cache.find(key); found) { + return *found; + } + shared_ptr_t val; + if (auto found = weak_refs.find(key); found != weak_refs.end()) { + val = found->second.first.lock(); + } + if (!val) { + val.reset(new V{}, Deleter{this, key}); + weak_refs.emplace(key, std::make_pair(val, val.get())); + } + cache.insert(key, val); + return val; +} + +template<class K, class V> +typename SharedLRU<K,V>::shared_ptr_t +SharedLRU<K,V>::find(const K& key) +{ + if (auto found = cache.find(key); found) { + return *found; + } + shared_ptr_t val; + if (auto found = weak_refs.find(key); found != weak_refs.end()) { + val = found->second.first.lock(); + } + if (val) { + cache.insert(key, val); + } + return val; +} + +template<class K, class V> +typename SharedLRU<K,V>::shared_ptr_t +SharedLRU<K,V>::lower_bound(const K& key) +{ + if (weak_refs.empty()) { + return {}; + } + auto found = weak_refs.lower_bound(key); + if (found == weak_refs.end()) { + --found; + } + if (auto val = found->second.first.lock(); val) { + cache.insert(key, val); + return val; + } else { + return {}; + } +} + +template<class K, class V> +std::optional<typename SharedLRU<K,V>::value_type> +SharedLRU<K,V>::upper_bound(const K& key) +{ + for (auto found = weak_refs.upper_bound(key); + found != weak_refs.end(); + ++found) { + if (auto val = found->second.first.lock(); val) { + return std::make_pair(found->first, val); + } + } + return std::nullopt; +} diff --git a/src/crimson/common/simple_lru.h b/src/crimson/common/simple_lru.h new file mode 100644 index 00000000..fca1061f --- /dev/null +++ b/src/crimson/common/simple_lru.h @@ -0,0 +1,141 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <list> +#include <map> +#include <optional> +#include <type_traits> +#include <unordered_map> + +template <class Key, class Value, bool Ordered> +class SimpleLRU { + static_assert(std::is_default_constructible_v<Value>); + using list_type = std::list<Key>; + template<class K, class V> + using map_t = std::conditional_t<Ordered, + std::map<K, V>, + std::unordered_map<K, V>>; + using map_type = map_t<Key, std::pair<Value, typename list_type::iterator>>; + list_type lru; + map_type cache; + const size_t max_size; + +public: + SimpleLRU(size_t size = 20) + : cache(size), + max_size(size) + {} + size_t size() const { + return cache.size(); + } + size_t capacity() const { + return max_size; + } + using insert_return_type = std::pair<Value, bool>; + insert_return_type insert(const Key& key, Value value); + std::optional<Value> find(const Key& key); + std::optional<std::enable_if<Ordered, Value>> lower_bound(const Key& key); + void erase(const Key& key); + void clear(); +private: + // bump the item to the front of the lru list + Value _lru_add(typename map_type::iterator found); + // evict the last element of most recently used list + void _evict(); +}; + +template <class Key, class Value, bool Ordered> +typename SimpleLRU<Key,Value,Ordered>::insert_return_type +SimpleLRU<Key,Value,Ordered>::insert(const Key& key, Value value) +{ + if constexpr(Ordered) { + auto found = cache.lower_bound(key); + if (found != cache.end() && found->first == key) { + // already exists + return {found->second.first, true}; + } else { + if (size() >= capacity()) { + _evict(); + } + lru.push_front(key); + // use lower_bound as hint to save the lookup + cache.emplace_hint(found, key, std::make_pair(value, lru.begin())); + return {std::move(value), false}; + } + } else { + // cache is not ordered + auto found = cache.find(key); + if (found != cache.end()) { + // already exists + return {found->second.first, true}; + } else { + if (size() >= capacity()) { + _evict(); + } + lru.push_front(key); + cache.emplace(key, std::make_pair(value, lru.begin())); + return {std::move(value), false}; + } + } +} + +template <class Key, class Value, bool Ordered> +std::optional<Value> SimpleLRU<Key,Value,Ordered>::find(const Key& key) +{ + if (auto found = cache.find(key); found != cache.end()){ + return _lru_add(found); + } else { + return {}; + } +} + +template <class Key, class Value, bool Ordered> +std::optional<std::enable_if<Ordered, Value>> +SimpleLRU<Key,Value,Ordered>::lower_bound(const Key& key) +{ + if (auto found = cache.lower_bound(key); found != cache.end()) { + return _lru_add(found); + } else { + return {}; + } +} + +template <class Key, class Value, bool Ordered> +void SimpleLRU<Key,Value,Ordered>::clear() +{ + lru.clear(); + cache.clear(); +} + +template <class Key, class Value, bool Ordered> +void SimpleLRU<Key,Value,Ordered>::erase(const Key& key) +{ + if (auto found = cache.find(key); found != cache.end()) { + lru.erase(found->second->second); + cache.erase(found); + } +} + +template <class Key, class Value, bool Ordered> +Value SimpleLRU<Key,Value,Ordered>::_lru_add( + typename SimpleLRU<Key,Value,Ordered>::map_type::iterator found) +{ + auto& [value, in_lru] = found->second; + if (in_lru != lru.begin()){ + // move item to the front + lru.splice(lru.begin(), lru, in_lru); + } + // the item is already at the front + return value; +} + +template <class Key, class Value, bool Ordered> +void SimpleLRU<Key,Value,Ordered>::_evict() +{ + // evict the last element of most recently used list + auto last = --lru.end(); + cache.erase(*last); + lru.erase(last); +} diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc new file mode 100644 index 00000000..4b61270c --- /dev/null +++ b/src/crimson/mon/MonClient.cc @@ -0,0 +1,606 @@ +#include "MonClient.h" + +#include <random> + +#include <seastar/core/future-util.hh> +#include <seastar/core/lowres_clock.hh> +#include <seastar/util/log.hh> + +#include "auth/AuthClientHandler.h" +#include "auth/AuthMethodList.h" +#include "auth/RotatingKeyRing.h" + +#include "common/hostname.h" + +#include "crimson/auth/KeyRing.h" +#include "crimson/common/config_proxy.h" +#include "crimson/common/log.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Errors.h" +#include "crimson/net/Messenger.h" + +#include "messages/MAuth.h" +#include "messages/MAuthReply.h" +#include "messages/MConfig.h" +#include "messages/MLogAck.h" +#include "messages/MMonCommand.h" +#include "messages/MMonCommandAck.h" +#include "messages/MMonGetVersion.h" +#include "messages/MMonGetVersionReply.h" +#include "messages/MMonMap.h" +#include "messages/MMonSubscribe.h" +#include "messages/MMonSubscribeAck.h" + +namespace { + seastar::logger& logger() + { + return ceph::get_logger(ceph_subsys_monc); + } + + template<typename Message, typename... Args> + Ref<Message> make_message(Args&&... args) + { + return {new Message{std::forward<Args>(args)...}, false}; + } +} + +namespace ceph::mon { + + +class Connection { +public: + Connection(ceph::net::ConnectionRef conn, + KeyRing* keyring); + seastar::future<> handle_auth_reply(Ref<MAuthReply> m); + seastar::future<> authenticate(epoch_t epoch, + const EntityName& name, + const AuthMethodList& auth_methods, + uint32_t want_keys); + seastar::future<> close(); + bool is_my_peer(const entity_addr_t& addr) const; + + seastar::future<> renew_tickets(); + ceph::net::ConnectionRef get_conn(); + +private: + seastar::future<> setup_session(epoch_t epoch, + const AuthMethodList& auth_methods, + const EntityName& name); + std::unique_ptr<AuthClientHandler> create_auth(Ref<MAuthReply> m, + const EntityName& name, + uint32_t want_keys); + seastar::future<bool> do_auth(); + +private: + bool closed = false; + seastar::promise<Ref<MAuthReply>> reply; + ceph::net::ConnectionRef conn; + std::unique_ptr<AuthClientHandler> auth; + RotatingKeyRing rotating_keyring; + uint64_t global_id; +}; + +Connection::Connection(ceph::net::ConnectionRef conn, + KeyRing* keyring) + : conn{conn}, + rotating_keyring{nullptr, CEPH_ENTITY_TYPE_OSD, keyring} +{} + +seastar::future<> Connection::handle_auth_reply(Ref<MAuthReply> m) +{ + reply.set_value(m); + return seastar::now(); +} + +seastar::future<> Connection::renew_tickets() +{ + if (auth->need_tickets()) { + return do_auth().then([](bool success) { + if (!success) { + throw std::system_error(make_error_code( + ceph::net::error::negotiation_failure)); + } + }); + } + return seastar::now(); +} + +std::unique_ptr<AuthClientHandler> +Connection::create_auth(Ref<MAuthReply> m, + const EntityName& name, + uint32_t want_keys) +{ + static CephContext cct; + std::unique_ptr<AuthClientHandler> auth; + auth.reset(AuthClientHandler::create(&cct, + m->protocol, + &rotating_keyring)); + if (!auth) { + logger().error("no handler for protocol {}", m->protocol); + throw std::system_error(make_error_code( + ceph::net::error::negotiation_failure)); + } + auth->init(name); + auth->set_want_keys(want_keys); + auth->set_global_id(global_id); + + if (m->global_id != global_id) { + // it's a new session + auth->set_global_id(global_id); + auth->reset(); + } + return auth; +} + +seastar::future<> +Connection::setup_session(epoch_t epoch, + const AuthMethodList& auth_methods, + const EntityName& name) +{ + auto m = make_message<MAuth>(); + m->protocol = 0; + m->monmap_epoch = epoch; + __u8 struct_v = 1; + encode(struct_v, m->auth_payload); + encode(auth_methods.get_supported_set(), m->auth_payload); + encode(name, m->auth_payload); + encode(global_id, m->auth_payload); + return conn->send(m); +} + +seastar::future<bool> Connection::do_auth() +{ + auto m = make_message<MAuth>(); + m->protocol = auth->get_protocol(); + auth->prepare_build_request(); + if (int ret = auth->build_request(m->auth_payload); ret) { + logger().error("missing/bad key for '{}'", + ceph::common::local_conf()->name); + throw std::system_error(make_error_code( + ceph::net::error::negotiation_failure)); + } + logger().info("sending {}", *m); + return conn->send(m).then([this] { + logger().info("waiting"); + return reply.get_future(); + }).then([this] (Ref<MAuthReply> m) { + logger().info("mon {} => {} returns {}: {}", + conn->get_messenger()->get_myaddr(), + conn->get_peer_addr(), *m, m->result); + reply = decltype(reply){}; + auto p = m->result_bl.cbegin(); + auto ret = auth->handle_response(m->result, p, +#warning fix crimson: session_key, connection_secret + nullptr, nullptr); + if (ret != 0 && ret != -EAGAIN) { + throw std::system_error(make_error_code( + ceph::net::error::negotiation_failure)); + } + return seastar::make_ready_future<bool>(ret == 0); + }); +} + +seastar::future<> +Connection::authenticate(epoch_t epoch, + const EntityName& name, + const AuthMethodList& auth_methods, + uint32_t want_keys) +{ + return conn->keepalive().then([epoch, auth_methods, name, this] { + return setup_session(epoch, auth_methods, name); + }).then([this] { + return reply.get_future(); + }).then([name, want_keys, this](Ref<MAuthReply> m) { + reply = decltype(reply){}; + auth = create_auth(m, name, want_keys); + global_id = m->global_id; + switch (auto p = m->result_bl.cbegin(); + auth->handle_response(m->result, p, +#warning fix crimson: session_key, connection_secret + nullptr, nullptr)) { + case 0: + // none + return seastar::now(); + case -EAGAIN: + // cephx + return seastar::repeat([this] { + return do_auth().then([](bool success) { + return seastar::make_ready_future<seastar::stop_iteration>( + success ? + seastar::stop_iteration::yes: + seastar::stop_iteration::no); + }); + }); + default: + ceph_assert_always(0); + } + }); +} + +seastar::future<> Connection::close() +{ + if (conn && !std::exchange(closed, true)) { + return conn->close(); + } else { + return seastar::now(); + } +} + +bool Connection::is_my_peer(const entity_addr_t& addr) const +{ + return conn->get_peer_addr() == addr; +} + +ceph::net::ConnectionRef Connection::get_conn() { + return conn; +} + +namespace { +auto create_auth_methods(uint32_t entity_type) +{ + auto& conf = ceph::common::local_conf(); + std::string method; + const auto auth_supported = conf.get_val<std::string>("auth_supported"); + if (!auth_supported.empty()) { + method = auth_supported; + } else if (entity_type & (CEPH_ENTITY_TYPE_OSD | + CEPH_ENTITY_TYPE_MDS | + CEPH_ENTITY_TYPE_MON | + CEPH_ENTITY_TYPE_MGR)) { + method = conf.get_val<std::string>("auth_cluster_required"); + } else { + method = conf.get_val<std::string>("auth_client_required"); + } + return std::make_unique<AuthMethodList>(nullptr, method); +} +} + +Client::Client(ceph::net::Messenger& messenger) + // currently, crimson is OSD-only + : want_keys{CEPH_ENTITY_TYPE_MON | + CEPH_ENTITY_TYPE_OSD | + CEPH_ENTITY_TYPE_MGR}, + timer{[this] { tick(); }}, + msgr{messenger} +{} + +Client::Client(Client&&) = default; +Client::~Client() = default; + +seastar::future<> Client::start() { + entity_name = ceph::common::local_conf()->name; + // should always be OSD, though + auth_methods = create_auth_methods(entity_name.get_type()); + return load_keyring().then([this] { + return monmap.build_initial(ceph::common::local_conf(), false); + }).then([this] { + return authenticate(); + }); +} + +seastar::future<> Client::load_keyring() +{ + if (!auth_methods->is_supported_auth(CEPH_AUTH_CEPHX)) { + return seastar::now(); + } else { + return ceph::auth::load_from_keyring(&keyring).then([](KeyRing* keyring) { + return ceph::auth::load_from_keyfile(keyring); + }).then([](KeyRing* keyring) { + return ceph::auth::load_from_key(keyring); + }).then([](KeyRing*) { + return seastar::now(); + }); + } +} + +void Client::tick() +{ + seastar::with_gate(tick_gate, [this] { + return active_con->renew_tickets(); + }); +} + +bool Client::is_hunting() const { + return !active_con; +} + +seastar::future<> +Client::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) +{ + logger().info("ms_dispatch {}", *m); + // we only care about these message types + switch (m->get_type()) { + case CEPH_MSG_MON_MAP: + return handle_monmap(conn, boost::static_pointer_cast<MMonMap>(m)); + case CEPH_MSG_AUTH_REPLY: + return handle_auth_reply( + conn, boost::static_pointer_cast<MAuthReply>(m)); + case CEPH_MSG_MON_SUBSCRIBE_ACK: + return handle_subscribe_ack( + boost::static_pointer_cast<MMonSubscribeAck>(m)); + case CEPH_MSG_MON_GET_VERSION_REPLY: + return handle_get_version_reply( + boost::static_pointer_cast<MMonGetVersionReply>(m)); + case MSG_MON_COMMAND_ACK: + return handle_mon_command_ack( + boost::static_pointer_cast<MMonCommandAck>(m)); + case MSG_LOGACK: + return handle_log_ack( + boost::static_pointer_cast<MLogAck>(m)); + case MSG_CONFIG: + return handle_config( + boost::static_pointer_cast<MConfig>(m)); + default: + return seastar::now(); + } +} + +seastar::future<> Client::ms_handle_reset(ceph::net::ConnectionRef conn) +{ + auto found = std::find_if(pending_conns.begin(), pending_conns.end(), + [peer_addr = conn->get_peer_addr()](auto& mc) { + return mc.is_my_peer(peer_addr); + }); + if (found != pending_conns.end()) { + logger().warn("pending conn reset by {}", conn->get_peer_addr()); + return found->close(); + } else if (active_con && active_con->is_my_peer(conn->get_peer_addr())) { + logger().warn("active conn reset {}", conn->get_peer_addr()); + active_con.reset(); + return reopen_session(-1); + } else { + logger().error("unknown reset from {}", conn->get_peer_addr()); + return seastar::now(); + } +} + +seastar::future<> Client::handle_monmap(ceph::net::ConnectionRef conn, + Ref<MMonMap> m) +{ + monmap.decode(m->monmapbl); + const auto peer_addr = conn->get_peer_addr(); + auto cur_mon = monmap.get_name(peer_addr); + logger().info("got monmap {}, mon.{}, is now rank {}", + monmap.epoch, cur_mon, monmap.get_rank(cur_mon)); + sub.got("monmap", monmap.get_epoch()); + + if (monmap.get_addr_name(peer_addr, cur_mon)) { + return seastar::now(); + } else { + logger().warn("mon.{} went away", cur_mon); + return reopen_session(-1); + } +} + +seastar::future<> Client::handle_auth_reply(ceph::net::ConnectionRef conn, + Ref<MAuthReply> m) +{ + logger().info("mon {} => {} returns {}: {}", + conn->get_messenger()->get_myaddr(), + conn->get_peer_addr(), *m, m->result); + auto found = std::find_if(pending_conns.begin(), pending_conns.end(), + [peer_addr = conn->get_peer_addr()](auto& mc) { + return mc.is_my_peer(peer_addr); + }); + if (found != pending_conns.end()) { + return found->handle_auth_reply(m); + } else if (active_con) { + return active_con->handle_auth_reply(m); + } else { + logger().error("unknown auth reply from {}", conn->get_peer_addr()); + return seastar::now(); + } +} + +seastar::future<> Client::handle_subscribe_ack(Ref<MMonSubscribeAck> m) +{ + sub.acked(m->interval); + return seastar::now(); +} + +Client::get_version_t Client::get_version(const std::string& map) +{ + auto m = make_message<MMonGetVersion>(); + auto tid = ++last_version_req_id; + m->handle = tid; + m->what = map; + auto& req = version_reqs[tid]; + return active_con->get_conn()->send(m).then([&req] { + return req.get_future(); + }); +} + +seastar::future<> +Client::handle_get_version_reply(Ref<MMonGetVersionReply> m) +{ + if (auto found = version_reqs.find(m->handle); + found != version_reqs.end()) { + auto& result = found->second; + logger().trace("{}: {} returns {}", + __func__, m->handle, m->version); + result.set_value(m->version, m->oldest_version); + version_reqs.erase(found); + } else { + logger().warn("{}: version request with handle {} not found", + __func__, m->handle); + } + return seastar::now(); +} + +seastar::future<> Client::handle_mon_command_ack(Ref<MMonCommandAck> m) +{ + const auto tid = m->get_tid(); + if (auto found = mon_commands.find(tid); + found != mon_commands.end()) { + auto& result = found->second; + logger().trace("{} {}", __func__, tid); + result.set_value(m->r, m->rs, std::move(m->get_data())); + mon_commands.erase(found); + } else { + logger().warn("{} {} not found", __func__, tid); + } + return seastar::now(); +} + +seastar::future<> Client::handle_log_ack(Ref<MLogAck> m) +{ + // XXX + return seastar::now(); +} + +seastar::future<> Client::handle_config(Ref<MConfig> m) +{ + return ceph::common::local_conf().set_mon_vals(m->config); +} + +std::vector<unsigned> Client::get_random_mons(unsigned n) const +{ + uint16_t min_priority = std::numeric_limits<uint16_t>::max(); + for (const auto& m : monmap.mon_info) { + if (m.second.priority < min_priority) { + min_priority = m.second.priority; + } + } + vector<unsigned> ranks; + for (auto [name, info] : monmap.mon_info) { + // TODO: #msgr-v2 + if (info.public_addrs.legacy_addr().is_blank_ip()) { + continue; + } + if (info.priority == min_priority) { + ranks.push_back(monmap.get_rank(name)); + } + } + std::random_device rd; + std::default_random_engine rng{rd()}; + std::shuffle(ranks.begin(), ranks.end(), rng); + if (n == 0 || n > ranks.size()) { + return ranks; + } else { + return {ranks.begin(), ranks.begin() + n}; + } +} + +seastar::future<> Client::authenticate() +{ + return reopen_session(-1); +} + +seastar::future<> Client::stop() +{ + return tick_gate.close().then([this] { + if (active_con) { + return active_con->close(); + } else { + return seastar::now(); + } + }); +} + +seastar::future<> Client::reopen_session(int rank) +{ + vector<unsigned> mons; + if (rank >= 0) { + mons.push_back(rank); + } else { + const auto parallel = + ceph::common::local_conf().get_val<uint64_t>("mon_client_hunt_parallel"); + mons = get_random_mons(parallel); + } + pending_conns.reserve(mons.size()); + return seastar::parallel_for_each(mons, [this](auto rank) { +#warning fixme + auto peer = monmap.get_addrs(rank).legacy_addr(); + logger().info("connecting to mon.{}", rank); + return msgr.connect(peer, CEPH_ENTITY_TYPE_MON) + .then([this] (auto xconn) { + // sharded-messenger compatible mode assumes all connections running + // in one shard. + ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id()); + ceph::net::ConnectionRef conn = xconn->release(); + auto& mc = pending_conns.emplace_back(conn, &keyring); + return mc.authenticate( + monmap.get_epoch(), entity_name, + *auth_methods, want_keys).handle_exception([conn](auto ep) { + return conn->close().then([ep = std::move(ep)] { + std::rethrow_exception(ep); + }); + }); + }).then([peer, this] { + if (!is_hunting()) { + return seastar::now(); + } + auto found = std::find_if(pending_conns.begin(), pending_conns.end(), + [peer](auto& mc) { + return mc.is_my_peer(peer); + }); + ceph_assert_always(found != pending_conns.end()); + active_con.reset(new Connection{std::move(*found)}); + return seastar::parallel_for_each(pending_conns, [] (auto& conn) { + return conn.close(); + }); + }); + }).then([this] { + pending_conns.clear(); + }); +} + +Client::command_result_t +Client::run_command(const std::vector<std::string>& cmd, + const bufferlist& bl) +{ + auto m = make_message<MMonCommand>(monmap.fsid); + auto tid = ++last_mon_command_id; + m->set_tid(tid); + m->cmd = cmd; + m->set_data(bl); + auto& req = mon_commands[tid]; + return active_con->get_conn()->send(m).then([&req] { + return req.get_future(); + }); +} + +seastar::future<> Client::send_message(MessageRef m) +{ + return active_con->get_conn()->send(m); +} + +bool Client::sub_want(const std::string& what, version_t start, unsigned flags) +{ + return sub.want(what, start, flags); +} + +void Client::sub_got(const std::string& what, version_t have) +{ + sub.got(what, have); +} + +void Client::sub_unwant(const std::string& what) +{ + sub.unwant(what); +} + +bool Client::sub_want_increment(const std::string& what, + version_t start, + unsigned flags) +{ + return sub.inc_want(what, start, flags); +} + +seastar::future<> Client::renew_subs() +{ + if (!sub.have_new()) { + logger().warn("{} - empty", __func__); + return seastar::now(); + } + logger().trace("{}", __func__); + + auto m = make_message<MMonSubscribe>(); + m->what = sub.get_subs(); + m->hostname = ceph_get_short_hostname(); + return active_con->get_conn()->send(m).then([this] { + sub.renewed(); + }); +} + +} // namespace ceph::mon diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h new file mode 100644 index 00000000..ef9bcb69 --- /dev/null +++ b/src/crimson/mon/MonClient.h @@ -0,0 +1,114 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + +#pragma once + +#include <memory> + +#include <seastar/core/future.hh> +#include <seastar/core/gate.hh> +#include <seastar/core/lowres_clock.hh> +#include <seastar/core/timer.hh> + +#include "auth/KeyRing.h" + +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Fwd.h" + +#include "mon/MonMap.h" + +#include "mon/MonSub.h" + +template<typename Message> using Ref = boost::intrusive_ptr<Message>; +namespace ceph::net { + class Messenger; +} + +class AuthMethodList; +class MAuthReply; +struct MMonMap; +struct MMonSubscribeAck; +struct MMonGetVersionReply; +struct MMonCommandAck; +struct MLogAck; +struct MConfig; + +namespace ceph::mon { + +class Connection; + +class Client : public ceph::net::Dispatcher { + EntityName entity_name; + KeyRing keyring; + std::unique_ptr<AuthMethodList> auth_methods; + const uint32_t want_keys; + + MonMap monmap; + seastar::promise<MessageRef> reply; + std::unique_ptr<Connection> active_con; + std::vector<Connection> pending_conns; + seastar::timer<seastar::lowres_clock> timer; + seastar::gate tick_gate; + + ceph::net::Messenger& msgr; + + // commands + using get_version_t = seastar::future<version_t, version_t>; + + ceph_tid_t last_version_req_id = 0; + std::map<ceph_tid_t, typename get_version_t::promise_type> version_reqs; + + ceph_tid_t last_mon_command_id = 0; + using command_result_t = + seastar::future<std::int32_t, string, ceph::bufferlist>; + std::map<ceph_tid_t, typename command_result_t::promise_type> mon_commands; + + MonSub sub; + +public: + Client(ceph::net::Messenger& messenger); + Client(Client&&); + ~Client(); + seastar::future<> start(); + seastar::future<> stop(); + + const uuid_d& get_fsid() const { + return monmap.fsid; + } + get_version_t get_version(const std::string& map); + command_result_t run_command(const std::vector<std::string>& cmd, + const bufferlist& bl); + seastar::future<> send_message(MessageRef); + bool sub_want(const std::string& what, version_t start, unsigned flags); + void sub_got(const std::string& what, version_t have); + void sub_unwant(const std::string& what); + bool sub_want_increment(const std::string& what, version_t start, unsigned flags); + seastar::future<> renew_subs(); + +private: + void tick(); + + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) override; + seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; + + seastar::future<> handle_monmap(ceph::net::ConnectionRef conn, + Ref<MMonMap> m); + seastar::future<> handle_auth_reply(ceph::net::ConnectionRef conn, + Ref<MAuthReply> m); + seastar::future<> handle_subscribe_ack(Ref<MMonSubscribeAck> m); + seastar::future<> handle_get_version_reply(Ref<MMonGetVersionReply> m); + seastar::future<> handle_mon_command_ack(Ref<MMonCommandAck> m); + seastar::future<> handle_log_ack(Ref<MLogAck> m); + seastar::future<> handle_config(Ref<MConfig> m); + +private: + seastar::future<> load_keyring(); + seastar::future<> authenticate(); + + bool is_hunting() const; + seastar::future<> reopen_session(int rank); + std::vector<unsigned> get_random_mons(unsigned n) const; + seastar::future<> _add_conn(unsigned rank, uint64_t global_id); +}; + +} // namespace ceph::mon diff --git a/src/crimson/net/Config.h b/src/crimson/net/Config.h new file mode 100644 index 00000000..90929bde --- /dev/null +++ b/src/crimson/net/Config.h @@ -0,0 +1,26 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +// XXX: a poor man's md_config_t +#pragma once + +#include "include/msgr.h" +#include <chrono> + +namespace ceph::net { + +using namespace std::literals::chrono_literals; + +constexpr struct simple_md_config_t { + uint32_t host_type = CEPH_ENTITY_TYPE_OSD; + bool cephx_require_signatures = false; + bool cephx_cluster_require_signatures = false; + bool cephx_service_require_signatures = false; + bool ms_die_on_old_message = true; + bool ms_die_on_skipped_message = true; + std::chrono::milliseconds ms_initial_backoff = 200ms; + std::chrono::milliseconds ms_max_backoff = 15000ms; + std::chrono::milliseconds threadpool_empty_queue_max_wait = 100ms; + size_t osd_client_message_size_cap = 500ULL << 20; +} conf; +} diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h new file mode 100644 index 00000000..b1b72c74 --- /dev/null +++ b/src/crimson/net/Connection.h @@ -0,0 +1,66 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <queue> +#include <seastar/core/future.hh> +#include <seastar/core/shared_ptr.hh> + +#include "Fwd.h" + +namespace ceph::net { + +using seq_num_t = uint64_t; + +class Connection : public seastar::enable_shared_from_this<Connection> { + protected: + entity_addr_t peer_addr; + peer_type_t peer_type = -1; + + public: + Connection() {} + virtual ~Connection() {} + + virtual Messenger* get_messenger() const = 0; + const entity_addr_t& get_peer_addr() const { return peer_addr; } + virtual int get_peer_type() const = 0; + + /// true if the handshake has completed and no errors have been encountered + virtual seastar::future<bool> is_connected() = 0; + + /// send a message over a connection that has completed its handshake + virtual seastar::future<> send(MessageRef msg) = 0; + + /// send a keepalive message over a connection that has completed its + /// handshake + virtual seastar::future<> keepalive() = 0; + + /// close the connection and cancel any any pending futures from read/send + virtual seastar::future<> close() = 0; + + /// which shard id the connection lives + virtual seastar::shard_id shard_id() const = 0; + + virtual void print(ostream& out) const = 0; +}; + +inline ostream& operator<<(ostream& out, const Connection& conn) { + out << "["; + conn.print(out); + out << "]"; + return out; +} + +} // namespace ceph::net diff --git a/src/crimson/net/Dispatcher.cc b/src/crimson/net/Dispatcher.cc new file mode 100644 index 00000000..336ded38 --- /dev/null +++ b/src/crimson/net/Dispatcher.cc @@ -0,0 +1,11 @@ +#include "auth/Auth.h" +#include "Dispatcher.h" + +namespace ceph::net +{ +seastar::future<std::unique_ptr<AuthAuthorizer>> +Dispatcher::ms_get_authorizer(peer_type_t) +{ + return seastar::make_ready_future<std::unique_ptr<AuthAuthorizer>>(nullptr); +} +} diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h new file mode 100644 index 00000000..cbde1549 --- /dev/null +++ b/src/crimson/net/Dispatcher.h @@ -0,0 +1,65 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/future.hh> +#include <seastar/core/sharded.hh> + +#include "Fwd.h" + +class AuthAuthorizer; + +namespace ceph::net { + +class Dispatcher { + public: + virtual ~Dispatcher() {} + + virtual seastar::future<> ms_dispatch(ConnectionRef conn, MessageRef m) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> ms_handle_accept(ConnectionRef conn) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> ms_handle_connect(ConnectionRef conn) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> ms_handle_reset(ConnectionRef conn) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> ms_handle_remote_reset(ConnectionRef conn) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<msgr_tag_t, bufferlist> + ms_verify_authorizer(peer_type_t, + auth_proto_t, + bufferlist&) { + return seastar::make_ready_future<msgr_tag_t, bufferlist>(0, bufferlist{}); + } + virtual seastar::future<std::unique_ptr<AuthAuthorizer>> + ms_get_authorizer(peer_type_t); + + // get the local dispatcher shard if it is accessed by another core + virtual Dispatcher* get_local_shard() { + return this; + } +}; + +} // namespace ceph::net diff --git a/src/crimson/net/Errors.cc b/src/crimson/net/Errors.cc new file mode 100644 index 00000000..62d60ce1 --- /dev/null +++ b/src/crimson/net/Errors.cc @@ -0,0 +1,107 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Errors.h" + +namespace ceph::net { + +const std::error_category& net_category() +{ + struct category : public std::error_category { + const char* name() const noexcept override { + return "ceph::net"; + } + + std::string message(int ev) const override { + switch (static_cast<error>(ev)) { + case error::success: + return "success"; + case error::bad_connect_banner: + return "bad connect banner"; + case error::bad_peer_address: + return "bad peer address"; + case error::negotiation_failure: + return "negotiation failure"; + case error::read_eof: + return "read eof"; + case error::connection_aborted: + return "connection aborted"; + case error::connection_refused: + return "connection refused"; + case error::connection_reset: + return "connection reset"; + default: + return "unknown"; + } + } + + // unfortunately, seastar throws connection errors with the system category, + // rather than the generic category that would match their counterparts + // in std::errc. we add our own errors for them, so we can match either + std::error_condition default_error_condition(int ev) const noexcept override { + switch (static_cast<error>(ev)) { + case error::connection_aborted: + return std::errc::connection_aborted; + case error::connection_refused: + return std::errc::connection_refused; + case error::connection_reset: + return std::errc::connection_reset; + default: + return std::error_condition(ev, *this); + } + } + + bool equivalent(int code, const std::error_condition& cond) const noexcept override { + if (error_category::equivalent(code, cond)) { + return true; + } + switch (static_cast<error>(code)) { + case error::connection_aborted: + return cond == std::errc::connection_aborted + || cond == std::error_condition(ECONNABORTED, std::system_category()); + case error::connection_refused: + return cond == std::errc::connection_refused + || cond == std::error_condition(ECONNREFUSED, std::system_category()); + case error::connection_reset: + return cond == std::errc::connection_reset + || cond == std::error_condition(ECONNRESET, std::system_category()); + default: + return false; + } + } + + bool equivalent(const std::error_code& code, int cond) const noexcept override { + if (error_category::equivalent(code, cond)) { + return true; + } + switch (static_cast<error>(cond)) { + case error::connection_aborted: + return code == std::errc::connection_aborted + || code == std::error_code(ECONNABORTED, std::system_category()); + case error::connection_refused: + return code == std::errc::connection_refused + || code == std::error_code(ECONNREFUSED, std::system_category()); + case error::connection_reset: + return code == std::errc::connection_reset + || code == std::error_code(ECONNRESET, std::system_category()); + default: + return false; + } + } + }; + static category instance; + return instance; +} + +} // namespace ceph::net diff --git a/src/crimson/net/Errors.h b/src/crimson/net/Errors.h new file mode 100644 index 00000000..d75082fd --- /dev/null +++ b/src/crimson/net/Errors.h @@ -0,0 +1,54 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <system_error> + +namespace ceph::net { + +/// net error codes +enum class error { + success = 0, + bad_connect_banner, + bad_peer_address, + negotiation_failure, + read_eof, + connection_aborted, + connection_refused, + connection_reset, +}; + +/// net error category +const std::error_category& net_category(); + +inline std::error_code make_error_code(error e) +{ + return {static_cast<int>(e), net_category()}; +} + +inline std::error_condition make_error_condition(error e) +{ + return {static_cast<int>(e), net_category()}; +} + +} // namespace ceph::net + +namespace std { + +/// enables implicit conversion to std::error_condition +template <> +struct is_error_condition_enum<ceph::net::error> : public true_type {}; + +} // namespace std diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h new file mode 100644 index 00000000..8a0a1c96 --- /dev/null +++ b/src/crimson/net/Fwd.h @@ -0,0 +1,52 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/sharded.hh> + +#include "msg/msg_types.h" +#include "msg/Message.h" + +using peer_type_t = int; +using auth_proto_t = int; + +namespace ceph::net { + +using msgr_tag_t = uint8_t; + +class Connection; +using ConnectionRef = seastar::shared_ptr<Connection>; +// NOTE: ConnectionXRef should only be used in seastar world, because +// lw_shared_ptr<> is not safe to be accessed by unpinned alien threads. +using ConnectionXRef = seastar::lw_shared_ptr<seastar::foreign_ptr<ConnectionRef>>; + +class Dispatcher; + +class Messenger; + +template <typename T, typename... Args> +seastar::future<T*> create_sharded(Args... args) { + auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>(); + return sharded_obj->start(args...).then([sharded_obj]() { + auto ret = &sharded_obj->local(); + seastar::engine().at_exit([sharded_obj]() { + return sharded_obj->stop().finally([sharded_obj] {}); + }); + return ret; + }); +} + +} // namespace ceph::net diff --git a/src/crimson/net/Messenger.cc b/src/crimson/net/Messenger.cc new file mode 100644 index 00000000..7f8665ef --- /dev/null +++ b/src/crimson/net/Messenger.cc @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Messenger.h" +#include "SocketMessenger.h" + +namespace ceph::net { + +seastar::future<Messenger*> +Messenger::create(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce, + const int master_sid) +{ + return create_sharded<SocketMessenger>(name, lname, nonce, master_sid) + .then([](Messenger *msgr) { + return msgr; + }); +} + +} // namespace ceph::net diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h new file mode 100644 index 00000000..9d766cb0 --- /dev/null +++ b/src/crimson/net/Messenger.h @@ -0,0 +1,117 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/future.hh> + +#include "Fwd.h" +#include "crimson/thread/Throttle.h" +#include "msg/Policy.h" + +class AuthAuthorizer; + +namespace ceph::net { + +using Throttle = ceph::thread::Throttle; +using SocketPolicy = ceph::net::Policy<Throttle>; + +class Messenger { + entity_name_t my_name; + entity_addrvec_t my_addrs; + uint32_t global_seq = 0; + uint32_t crc_flags = 0; + + public: + Messenger(const entity_name_t& name) + : my_name(name) + {} + virtual ~Messenger() {} + + const entity_name_t& get_myname() const { return my_name; } + const entity_addrvec_t& get_myaddrs() const { return my_addrs; } + entity_addr_t get_myaddr() const { return my_addrs.front(); } + virtual seastar::future<> set_myaddrs(const entity_addrvec_t& addrs) { + my_addrs = addrs; + return seastar::now(); + } + + /// bind to the given address + virtual seastar::future<> bind(const entity_addrvec_t& addr) = 0; + + /// try to bind to the first unused port of given address + virtual seastar::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port) = 0; + + /// start the messenger + virtual seastar::future<> start(Dispatcher *dispatcher) = 0; + + /// either return an existing connection to the peer, + /// or a new pending connection + virtual seastar::future<ConnectionXRef> + connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) = 0; + + // wait for messenger shutdown + virtual seastar::future<> wait() = 0; + + /// stop listenening and wait for all connections to close. safe to destruct + /// after this future becomes available + virtual seastar::future<> shutdown() = 0; + + uint32_t get_global_seq(uint32_t old=0) { + if (old > global_seq) { + global_seq = old; + } + return ++global_seq; + } + + uint32_t get_crc_flags() const { + return crc_flags; + } + void set_crc_data() { + crc_flags |= MSG_CRC_DATA; + } + void set_crc_header() { + crc_flags |= MSG_CRC_HEADER; + } + + // get the local messenger shard if it is accessed by another core + virtual Messenger* get_local_shard() { + return this; + } + + virtual void print(ostream& out) const = 0; + + virtual void set_default_policy(const SocketPolicy& p) = 0; + + virtual void set_policy(entity_type_t peer_type, const SocketPolicy& p) = 0; + + virtual void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) = 0; + + static seastar::future<Messenger*> + create(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce, + const int master_sid=-1); +}; + +inline ostream& operator<<(ostream& out, const Messenger& msgr) { + out << "["; + msgr.print(out); + out << "]"; + return out; +} + +} // namespace ceph::net diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc new file mode 100644 index 00000000..a22e9b2e --- /dev/null +++ b/src/crimson/net/Socket.cc @@ -0,0 +1,81 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Socket.h" + +#include "Errors.h" + +namespace ceph::net { + +namespace { + +// an input_stream consumer that reads buffer segments into a bufferlist up to +// the given number of remaining bytes +struct bufferlist_consumer { + bufferlist& bl; + size_t& remaining; + + bufferlist_consumer(bufferlist& bl, size_t& remaining) + : bl(bl), remaining(remaining) {} + + using tmp_buf = seastar::temporary_buffer<char>; + using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type; + + // consume some or all of a buffer segment + seastar::future<consumption_result_type> operator()(tmp_buf&& data) { + if (remaining >= data.size()) { + // consume the whole buffer + remaining -= data.size(); + bl.append(buffer::create_foreign(std::move(data))); + if (remaining > 0) { + // return none to request more segments + return seastar::make_ready_future<consumption_result_type>( + seastar::continue_consuming{}); + } else { + // return an empty buffer to singal that we're done + return seastar::make_ready_future<consumption_result_type>( + consumption_result_type::stop_consuming_type({})); + } + } + if (remaining > 0) { + // consume the front + bl.append(buffer::create_foreign(data.share(0, remaining))); + data.trim_front(remaining); + remaining = 0; + } + // give the rest back to signal that we're done + return seastar::make_ready_future<consumption_result_type>( + consumption_result_type::stop_consuming_type{std::move(data)}); + }; +}; + +} // anonymous namespace + +seastar::future<bufferlist> Socket::read(size_t bytes) +{ + if (bytes == 0) { + return seastar::make_ready_future<bufferlist>(); + } + r.buffer.clear(); + r.remaining = bytes; + return in.consume(bufferlist_consumer{r.buffer, r.remaining}) + .then([this] { + if (r.remaining) { // throw on short reads + throw std::system_error(make_error_code(error::read_eof)); + } + return seastar::make_ready_future<bufferlist>(std::move(r.buffer)); + }); +} + +seastar::future<seastar::temporary_buffer<char>> +Socket::read_exactly(size_t bytes) { + return in.read_exactly(bytes) + .then([this](auto buf) { + if (buf.empty()) { + throw std::system_error(make_error_code(error::read_eof)); + } + return seastar::make_ready_future<tmp_buf>(std::move(buf)); + }); +} + +} // namespace ceph::net diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h new file mode 100644 index 00000000..c1a2ed59 --- /dev/null +++ b/src/crimson/net/Socket.h @@ -0,0 +1,59 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/reactor.hh> +#include <seastar/net/packet.hh> + +#include "include/buffer.h" + +namespace ceph::net { + +class Socket +{ + const seastar::shard_id sid; + seastar::connected_socket socket; + seastar::input_stream<char> in; + seastar::output_stream<char> out; + + /// buffer state for read() + struct { + bufferlist buffer; + size_t remaining; + } r; + + public: + explicit Socket(seastar::connected_socket&& _socket) + : sid{seastar::engine().cpu_id()}, + socket(std::move(_socket)), + in(socket.input()), + out(socket.output()) {} + Socket(Socket&& o) = delete; + + /// read the requested number of bytes into a bufferlist + seastar::future<bufferlist> read(size_t bytes); + using tmp_buf = seastar::temporary_buffer<char>; + using packet = seastar::net::packet; + seastar::future<tmp_buf> read_exactly(size_t bytes); + + seastar::future<> write(packet&& buf) { + return out.write(std::move(buf)); + } + seastar::future<> flush() { + return out.flush(); + } + seastar::future<> write_flush(packet&& buf) { + return out.write(std::move(buf)).then([this] { return out.flush(); }); + } + + /// Socket can only be closed once. + seastar::future<> close() { + return seastar::smp::submit_to(sid, [this] { + return seastar::when_all( + in.close(), out.close()).discard_result(); + }); + } +}; + +} // namespace ceph::net diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc new file mode 100644 index 00000000..2907c486 --- /dev/null +++ b/src/crimson/net/SocketConnection.cc @@ -0,0 +1,972 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "SocketConnection.h" + +#include <algorithm> +#include <seastar/core/shared_future.hh> +#include <seastar/core/sleep.hh> +#include <seastar/net/packet.hh> + +#include "include/msgr.h" +#include "include/random.h" +#include "auth/Auth.h" +#include "auth/AuthSessionHandler.h" + +#include "crimson/common/log.h" +#include "Config.h" +#include "Dispatcher.h" +#include "Errors.h" +#include "SocketMessenger.h" + +using namespace ceph::net; + +template <typename T> +seastar::net::packet make_static_packet(const T& value) { + return { reinterpret_cast<const char*>(&value), sizeof(value) }; +} + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); + } +} + +SocketConnection::SocketConnection(SocketMessenger& messenger, + Dispatcher& dispatcher) + : messenger(messenger), + dispatcher(dispatcher), + send_ready(h.promise.get_future()) +{ + ceph_assert(&messenger.container().local() == &messenger); +} + +SocketConnection::~SocketConnection() +{ + ceph_assert(pending_dispatch.is_closed()); +} + +ceph::net::Messenger* +SocketConnection::get_messenger() const { + return &messenger; +} + +seastar::future<bool> SocketConnection::is_connected() +{ + return seastar::smp::submit_to(shard_id(), [this] { + return !send_ready.failed(); + }); +} + +seastar::future<> SocketConnection::send(MessageRef msg) +{ + return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] { + if (state == state_t::closing) + return seastar::now(); + return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] { + return do_send(std::move(msg)) + .handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} send fault: {}", *this, eptr); + close(); + }); + }); + }); +} + +seastar::future<> SocketConnection::keepalive() +{ + return seastar::smp::submit_to(shard_id(), [this] { + if (state == state_t::closing) + return seastar::now(); + return seastar::with_gate(pending_dispatch, [this] { + return do_keepalive() + .handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} keepalive fault: {}", *this, eptr); + close(); + }); + }); + }); +} + +seastar::future<> SocketConnection::close() +{ + return seastar::smp::submit_to(shard_id(), [this] { + return do_close(); + }); +} + +seastar::future<> SocketConnection::handle_tags() +{ + return seastar::keep_doing([this] { + // read the next tag + return socket->read_exactly(1) + .then([this] (auto buf) { + switch (buf[0]) { + case CEPH_MSGR_TAG_MSG: + return read_message(); + case CEPH_MSGR_TAG_ACK: + return handle_ack(); + case CEPH_MSGR_TAG_KEEPALIVE: + return seastar::now(); + case CEPH_MSGR_TAG_KEEPALIVE2: + return handle_keepalive2(); + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + return handle_keepalive2_ack(); + case CEPH_MSGR_TAG_CLOSE: + logger().info("{} got tag close", *this); + throw std::system_error(make_error_code(error::connection_aborted)); + default: + logger().error("{} got unknown msgr tag {}", *this, static_cast<int>(buf[0])); + throw std::system_error(make_error_code(error::read_eof)); + } + }); + }); +} + +seastar::future<> SocketConnection::handle_ack() +{ + return socket->read_exactly(sizeof(ceph_le64)) + .then([this] (auto buf) { + auto seq = reinterpret_cast<const ceph_le64*>(buf.get()); + discard_up_to(&sent, *seq); + }); +} + +void SocketConnection::discard_up_to(std::queue<MessageRef>* queue, + seq_num_t seq) +{ + while (!queue->empty() && + queue->front()->get_seq() < seq) { + queue->pop(); + } +} + +void SocketConnection::requeue_sent() +{ + out_seq -= sent.size(); + while (!sent.empty()) { + auto m = sent.front(); + sent.pop(); + out_q.push(std::move(m)); + } +} + +seastar::future<> SocketConnection::maybe_throttle() +{ + if (!policy.throttler_bytes) { + return seastar::now(); + } + const auto to_read = (m.header.front_len + + m.header.middle_len + + m.header.data_len); + return policy.throttler_bytes->get(to_read); +} + +seastar::future<> SocketConnection::read_message() +{ + return socket->read(sizeof(m.header)) + .then([this] (bufferlist bl) { + // throttle the traffic, maybe + auto p = bl.cbegin(); + ::decode(m.header, p); + return maybe_throttle(); + }).then([this] { + // read front + return socket->read(m.header.front_len); + }).then([this] (bufferlist bl) { + m.front = std::move(bl); + // read middle + return socket->read(m.header.middle_len); + }).then([this] (bufferlist bl) { + m.middle = std::move(bl); + // read data + return socket->read(m.header.data_len); + }).then([this] (bufferlist bl) { + m.data = std::move(bl); + // read footer + return socket->read(sizeof(m.footer)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(m.footer, p); + auto msg = ::decode_message(nullptr, 0, m.header, m.footer, + m.front, m.middle, m.data, nullptr); + // TODO: set time stamps + msg->set_byte_throttler(policy.throttler_bytes); + + if (!update_rx_seq(msg->get_seq())) { + // skip this message + return; + } + + constexpr bool add_ref = false; // Message starts with 1 ref + // TODO: change MessageRef with foreign_ptr + auto msg_ref = MessageRef{msg, add_ref}; + // start dispatch, ignoring exceptions from the application layer + seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { + return dispatcher.ms_dispatch( + seastar::static_pointer_cast<SocketConnection>(shared_from_this()), + std::move(msg)) + .handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_dispatch caught exception: {}", *this, eptr); + ceph_assert(false); + }); + }); + }); +} + +bool SocketConnection::update_rx_seq(seq_num_t seq) +{ + if (seq <= in_seq) { + if (HAVE_FEATURE(features, RECONNECT_SEQ) && + conf.ms_die_on_old_message) { + ceph_abort_msg("old msgs despite reconnect_seq feature"); + } + return false; + } else if (seq > in_seq + 1) { + if (conf.ms_die_on_skipped_message) { + ceph_abort_msg("skipped incoming seq"); + } + return false; + } else { + in_seq = seq; + return true; + } +} + +seastar::future<> SocketConnection::write_message(MessageRef msg) +{ + msg->set_seq(++out_seq); + auto& header = msg->get_header(); + header.src = messenger.get_myname(); + msg->encode(features, messenger.get_crc_flags()); + bufferlist bl; + bl.append(CEPH_MSGR_TAG_MSG); + bl.append((const char*)&header, sizeof(header)); + bl.append(msg->get_payload()); + bl.append(msg->get_middle()); + bl.append(msg->get_data()); + auto& footer = msg->get_footer(); + if (HAVE_FEATURE(features, MSG_AUTH)) { + bl.append((const char*)&footer, sizeof(footer)); + } else { + ceph_msg_footer_old old_footer; + if (messenger.get_crc_flags() & MSG_CRC_HEADER) { + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + } else { + old_footer.front_crc = old_footer.middle_crc = 0; + } + if (messenger.get_crc_flags() & MSG_CRC_DATA) { + old_footer.data_crc = footer.data_crc; + } else { + old_footer.data_crc = 0; + } + old_footer.flags = footer.flags; + bl.append((const char*)&old_footer, sizeof(old_footer)); + } + // write as a seastar::net::packet + return socket->write_flush(std::move(bl)); + // TODO: lossless policy + // .then([this, msg = std::move(msg)] { + // if (!policy.lossy) { + // sent.push(std::move(msg)); + // } + // }); +} + +seastar::future<> SocketConnection::do_send(MessageRef msg) +{ + // chain the message after the last message is sent + // TODO: retry send for lossless connection + seastar::shared_future<> f = send_ready.then( + [this, msg = std::move(msg)] { + if (state == state_t::closing) + return seastar::now(); + return write_message(std::move(msg)); + }); + + // chain any later messages after this one completes + send_ready = f.get_future(); + // allow the caller to wait on the same future + return f.get_future(); +} + +seastar::future<> SocketConnection::do_keepalive() +{ + // TODO: retry keepalive for lossless connection + seastar::shared_future<> f = send_ready.then([this] { + if (state == state_t::closing) + return seastar::now(); + k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( + ceph::coarse_real_clock::now()); + return socket->write_flush(make_static_packet(k.req)); + }); + send_ready = f.get_future(); + return f.get_future(); +} + +seastar::future<> SocketConnection::do_close() +{ + if (state == state_t::closing) { + // already closing + assert(close_ready.valid()); + return close_ready.get_future(); + } + + // unregister_conn() drops a reference, so hold another until completion + auto cleanup = [conn_ref = shared_from_this(), this] { + logger().debug("{} closed!", *this); + }; + + if (state == state_t::accepting) { + messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + } else if (state >= state_t::connecting && state < state_t::closing) { + messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + } else { + // cannot happen + ceph_assert(false); + } + + // close_ready become valid only after state is state_t::closing + assert(!close_ready.valid()); + + if (socket) { + close_ready = socket->close() + .then([this] { + return pending_dispatch.close(); + }).finally(std::move(cleanup)); + } else { + ceph_assert(state == state_t::connecting); + close_ready = pending_dispatch.close().finally(std::move(cleanup)); + } + logger().debug("{} trigger closing, was {}", *this, static_cast<int>(state)); + state = state_t::closing; + return close_ready.get_future(); +} + +// handshake + +/// store the banner in a non-const string for buffer::create_static() +static char banner[] = CEPH_BANNER; +constexpr size_t banner_size = sizeof(CEPH_BANNER)-1; + +constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr); +constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr); + +WRITE_RAW_ENCODER(ceph_msg_connect); +WRITE_RAW_ENCODER(ceph_msg_connect_reply); + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c) +{ + return out << "connect{features=" << std::hex << c.features << std::dec + << " host_type=" << c.host_type + << " global_seq=" << c.global_seq + << " connect_seq=" << c.connect_seq + << " protocol_version=" << c.protocol_version + << " authorizer_protocol=" << c.authorizer_protocol + << " authorizer_len=" << c.authorizer_len + << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}'; +} + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r) +{ + return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag) + << " features=" << std::hex << r.features << std::dec + << " global_seq=" << r.global_seq + << " connect_seq=" << r.connect_seq + << " protocol_version=" << r.protocol_version + << " authorizer_len=" << r.authorizer_len + << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}'; +} + +// check that the buffer starts with a valid banner without requiring it to +// be contiguous in memory +static void validate_banner(bufferlist::const_iterator& p) +{ + auto b = std::cbegin(banner); + auto end = b + banner_size; + while (b != end) { + const char *buf{nullptr}; + auto remaining = std::distance(b, end); + auto len = p.get_ptr_and_advance(remaining, &buf); + if (!std::equal(buf, buf + len, b)) { + throw std::system_error(make_error_code(error::bad_connect_banner)); + } + b += len; + } +} + +// make sure that we agree with the peer about its address +static void validate_peer_addr(const entity_addr_t& addr, + const entity_addr_t& expected) +{ + if (addr == expected) { + return; + } + // ok if server bound anonymously, as long as port/nonce match + if (addr.is_blank_ip() && + addr.get_port() == expected.get_port() && + addr.get_nonce() == expected.get_nonce()) { + return; + } else { + throw std::system_error(make_error_code(error::bad_peer_address)); + } +} + +/// return a static bufferptr to the given object +template <typename T> +bufferptr create_static(T& obj) +{ + return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj)); +} + +bool SocketConnection::require_auth_feature() const +{ + if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { + return false; + } + if (conf.cephx_require_signatures) { + return true; + } + if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || + h.connect.host_type == CEPH_ENTITY_TYPE_MDS) { + return conf.cephx_cluster_require_signatures; + } else { + return conf.cephx_service_require_signatures; + } +} + +uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const +{ + constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD; + // see also OSD.h, unlike other connection of simple/async messenger, + // crimson msgr is only used by osd + constexpr uint32_t CEPH_OSD_PROTOCOL = 10; + if (peer_type == my_type) { + // internal + return CEPH_OSD_PROTOCOL; + } else { + // public + switch (connect ? peer_type : my_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + default: return 0; + } + } +} + +seastar::future<seastar::stop_iteration> +SocketConnection::repeat_handle_connect() +{ + return socket->read(sizeof(h.connect)) + .then([this](bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.connect, p); + peer_type = h.connect.host_type; + return socket->read(h.connect.authorizer_len); + }).then([this] (bufferlist authorizer) { + if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) { + return seastar::make_ready_future<msgr_tag_t, bufferlist>( + CEPH_MSGR_TAG_BADPROTOVER, bufferlist{}); + } + if (require_auth_feature()) { + policy.features_required |= CEPH_FEATURE_MSG_AUTH; + } + if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features; + feat_missing != 0) { + return seastar::make_ready_future<msgr_tag_t, bufferlist>( + CEPH_MSGR_TAG_FEATURES, bufferlist{}); + } + return dispatcher.ms_verify_authorizer(peer_type, + h.connect.authorizer_protocol, + authorizer); + }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) { + memset(&h.reply, 0, sizeof(h.reply)); + if (tag) { + return send_connect_reply(tag, std::move(authorizer_reply)); + } + if (auto existing = messenger.lookup_conn(peer_addr); existing) { + return handle_connect_with_existing(existing, std::move(authorizer_reply)); + } else if (h.connect.connect_seq > 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION, + std::move(authorizer_reply)); + } + h.connect_seq = h.connect.connect_seq + 1; + h.peer_global_seq = h.connect.global_seq; + set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features); + // TODO: cct + return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply)); + }); +} + +seastar::future<seastar::stop_iteration> +SocketConnection::send_connect_reply(msgr_tag_t tag, + bufferlist&& authorizer_reply) +{ + h.reply.tag = tag; + h.reply.features = static_cast<uint64_t>((h.connect.features & + policy.features_supported) | + policy.features_required); + h.reply.authorizer_len = authorizer_reply.length(); + return socket->write(make_static_packet(h.reply)) + .then([this, reply=std::move(authorizer_reply)]() mutable { + return socket->write_flush(std::move(reply)); + }).then([] { + return stop_t::no; + }); +} + +seastar::future<seastar::stop_iteration> +SocketConnection::send_connect_reply_ready(msgr_tag_t tag, + bufferlist&& authorizer_reply) +{ + h.global_seq = messenger.get_global_seq(); + h.reply.tag = tag; + h.reply.features = policy.features_supported; + h.reply.global_seq = h.global_seq; + h.reply.connect_seq = h.connect_seq; + h.reply.flags = 0; + if (policy.lossy) { + h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; + } + h.reply.authorizer_len = authorizer_reply.length(); + return socket->write(make_static_packet(h.reply)) + .then([this, reply=std::move(authorizer_reply)]() mutable { + if (reply.length()) { + return socket->write(std::move(reply)); + } else { + return seastar::now(); + } + }).then([this] { + if (h.reply.tag == CEPH_MSGR_TAG_SEQ) { + return socket->write_flush(make_static_packet(in_seq)) + .then([this] { + return socket->read_exactly(sizeof(seq_num_t)); + }).then([this] (auto buf) { + auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get()); + discard_up_to(&out_q, *acked_seq); + }); + } else { + return socket->flush(); + } + }).then([this] { + return stop_t::yes; + }); +} + +seastar::future<> +SocketConnection::handle_keepalive2() +{ + return socket->read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get()); + seastar::shared_future<> f = send_ready.then([this] { + logger().debug("{} keepalive2 {}", *this, k.ack.stamp.tv_sec); + return socket->write_flush(make_static_packet(k.ack)); + }); + send_ready = f.get_future(); + return f.get_future(); + }); +} + +seastar::future<> +SocketConnection::handle_keepalive2_ack() +{ + return socket->read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + auto t = reinterpret_cast<const ceph_timespec*>(buf.get()); + k.ack_stamp = *t; + logger().debug("{} keepalive2 ack {}", *this, t->tv_sec); + }); +} + +seastar::future<seastar::stop_iteration> +SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply) +{ + if (h.connect.global_seq < existing->peer_global_seq()) { + h.reply.global_seq = existing->peer_global_seq(); + return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL); + } else if (existing->is_lossy()) { + return replace_existing(existing, std::move(authorizer_reply)); + } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) { + return replace_existing(existing, std::move(authorizer_reply), true); + } else if (h.connect.connect_seq < existing->connect_seq()) { + // old attempt, or we sent READY but they didn't get it. + h.reply.connect_seq = existing->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } else if (h.connect.connect_seq == existing->connect_seq()) { + // if the existing connection successfully opened, and/or + // subsequently went to standby, then the peer should bump + // their connect_seq and retry: this is not a connection race + // we need to resolve here. + if (existing->get_state() == state_t::open || + existing->get_state() == state_t::standby) { + if (policy.resetcheck && existing->connect_seq() == 0) { + return replace_existing(existing, std::move(authorizer_reply)); + } else { + h.reply.connect_seq = existing->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } + } else if (peer_addr < messenger.get_myaddr() || + existing->is_server_side()) { + // incoming wins + return replace_existing(existing, std::move(authorizer_reply)); + } else { + return send_connect_reply(CEPH_MSGR_TAG_WAIT); + } + } else if (policy.resetcheck && + existing->connect_seq() == 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION); + } else { + return replace_existing(existing, std::move(authorizer_reply)); + } +} + +seastar::future<seastar::stop_iteration> +SocketConnection::replace_existing(SocketConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer) +{ + msgr_tag_t reply_tag; + if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) && + !is_reset_from_peer) { + reply_tag = CEPH_MSGR_TAG_SEQ; + } else { + reply_tag = CEPH_MSGR_TAG_READY; + } + messenger.unregister_conn(existing); + if (!existing->is_lossy()) { + // reset the in_seq if this is a hard reset from peer, + // otherwise we respect our original connection's value + in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num(); + // steal outgoing queue and out_seq + existing->requeue_sent(); + std::tie(out_seq, out_q) = existing->get_out_queue(); + } + return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); +} + +seastar::future<seastar::stop_iteration> +SocketConnection::handle_connect_reply(msgr_tag_t tag) +{ + switch (tag) { + case CEPH_MSGR_TAG_FEATURES: + logger().error("{} connect protocol feature mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_BADPROTOVER: + logger().error("{} connect protocol version mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_BADAUTHORIZER: + logger().error("{} got bad authorizer", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_RESETSESSION: + reset_session(); + return seastar::make_ready_future<stop_t>(stop_t::no); + case CEPH_MSGR_TAG_RETRY_GLOBAL: + h.global_seq = messenger.get_global_seq(h.reply.global_seq); + return seastar::make_ready_future<stop_t>(stop_t::no); + case CEPH_MSGR_TAG_RETRY_SESSION: + ceph_assert(h.reply.connect_seq > h.connect_seq); + h.connect_seq = h.reply.connect_seq; + return seastar::make_ready_future<stop_t>(stop_t::no); + case CEPH_MSGR_TAG_WAIT: + // TODO: state wait + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_SEQ: + case CEPH_MSGR_TAG_READY: + if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features); + missing) { + logger().error("{} missing required features", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + } + return seastar::futurize_apply([this, tag] { + if (tag == CEPH_MSGR_TAG_SEQ) { + return socket->read_exactly(sizeof(seq_num_t)) + .then([this] (auto buf) { + auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get()); + discard_up_to(&out_q, *acked_seq); + return socket->write_flush(make_static_packet(in_seq)); + }); + } + // tag CEPH_MSGR_TAG_READY + return seastar::now(); + }).then([this] { + // hooray! + h.peer_global_seq = h.reply.global_seq; + policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY; + h.connect_seq++; + h.backoff = 0ms; + set_features(h.reply.features & h.connect.features); + if (h.authorizer) { + session_security.reset( + get_auth_session_handler(nullptr, + h.authorizer->protocol, + h.authorizer->session_key, + features)); + } + h.authorizer.reset(); + return seastar::make_ready_future<stop_t>(stop_t::yes); + }); + break; + default: + // unknown tag + logger().error("{} got unknown tag", __func__, int(tag)); + throw std::system_error(make_error_code(error::negotiation_failure)); + } +} + +void SocketConnection::reset_session() +{ + decltype(out_q){}.swap(out_q); + decltype(sent){}.swap(sent); + in_seq = 0; + h.connect_seq = 0; + if (HAVE_FEATURE(features, MSG_AUTH)) { + // Set out_seq to a random value, so CRC won't be predictable. + // Constant to limit starting sequence number to 2^31. Nothing special + // about it, just a big number. + constexpr uint64_t SEQ_MASK = 0x7fffffff; + out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK); + } else { + // previously, seq #'s always started at 0. + out_seq = 0; + } +} + +seastar::future<seastar::stop_iteration> +SocketConnection::repeat_connect() +{ + // encode ceph_msg_connect + memset(&h.connect, 0, sizeof(h.connect)); + h.connect.features = policy.features_supported; + h.connect.host_type = messenger.get_myname().type(); + h.connect.global_seq = h.global_seq; + h.connect.connect_seq = h.connect_seq; + h.connect.protocol_version = get_proto_version(peer_type, true); + // this is fyi, actually, server decides! + h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0; + + return dispatcher.ms_get_authorizer(peer_type) + .then([this](auto&& auth) { + h.authorizer = std::move(auth); + bufferlist bl; + if (h.authorizer) { + h.connect.authorizer_protocol = h.authorizer->protocol; + h.connect.authorizer_len = h.authorizer->bl.length(); + bl.append(create_static(h.connect)); + bl.append(h.authorizer->bl); + } else { + h.connect.authorizer_protocol = 0; + h.connect.authorizer_len = 0; + bl.append(create_static(h.connect)); + }; + return socket->write_flush(std::move(bl)); + }).then([this] { + // read the reply + return socket->read(sizeof(h.reply)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.reply, p); + ceph_assert(p.end()); + return socket->read(h.reply.authorizer_len); + }).then([this] (bufferlist bl) { + if (h.authorizer) { + auto reply = bl.cbegin(); + if (!h.authorizer->verify_reply(reply, nullptr)) { + logger().error("{} authorizer failed to verify reply", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + } + } + return handle_connect_reply(h.reply.tag); + }); +} + +void +SocketConnection::start_connect(const entity_addr_t& _peer_addr, + const entity_type_t& _peer_type) +{ + ceph_assert(state == state_t::none); + ceph_assert(!socket); + peer_addr = _peer_addr; + peer_type = _peer_type; + messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + logger().debug("{} trigger connecting, was {}", *this, static_cast<int>(state)); + state = state_t::connecting; + seastar::with_gate(pending_dispatch, [this] { + return seastar::connect(peer_addr.in4_addr()) + .then([this](seastar::connected_socket fd) { + if (state == state_t::closing) { + fd.shutdown_input(); + fd.shutdown_output(); + throw std::system_error(make_error_code(error::connection_aborted)); + } + socket = seastar::make_foreign(std::make_unique<Socket>(std::move(fd))); + // read server's handshake header + return socket->read(server_header_size); + }).then([this] (bufferlist headerbl) { + auto p = headerbl.cbegin(); + validate_banner(p); + entity_addr_t saddr, caddr; + ::decode(saddr, p); + ::decode(caddr, p); + ceph_assert(p.end()); + validate_peer_addr(saddr, peer_addr); + + side = side_t::connector; + socket_port = caddr.get_port(); + return messenger.learned_addr(caddr); + }).then([this] { + // encode/send client's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(messenger.get_myaddr(), bl, 0); + h.global_seq = messenger.get_global_seq(); + return socket->write_flush(std::move(bl)); + }).then([=] { + return seastar::repeat([this] { + return repeat_connect(); + }); + }).then([this] { + // notify the dispatcher and allow them to reject the connection + return dispatcher.ms_handle_connect(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + }).then([this] { + execute_open(); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the connecting state + logger().warn("{} connecting fault: {}", *this, eptr); + h.promise.set_value(); + close(); + }); + }); +} + +void +SocketConnection::start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& sock, + const entity_addr_t& _peer_addr) +{ + ceph_assert(state == state_t::none); + ceph_assert(!socket); + peer_addr.u = _peer_addr.u; + peer_addr.set_port(0); + side = side_t::acceptor; + socket_port = _peer_addr.get_port(); + socket = std::move(sock); + messenger.accept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state)); + state = state_t::accepting; + seastar::with_gate(pending_dispatch, [this, _peer_addr] { + // encode/send server's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(messenger.get_myaddr(), bl, 0); + ::encode(_peer_addr, bl, 0); + return socket->write_flush(std::move(bl)) + .then([this] { + // read client's handshake header and connect request + return socket->read(client_header_size); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + validate_banner(p); + entity_addr_t addr; + ::decode(addr, p); + ceph_assert(p.end()); + peer_addr.set_type(addr.get_type()); + peer_addr.set_port(addr.get_port()); + peer_addr.set_nonce(addr.get_nonce()); + return seastar::repeat([this] { + return repeat_handle_connect(); + }); + }).then([this] { + // notify the dispatcher and allow them to reject the connection + return dispatcher.ms_handle_accept(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + }).then([this] { + messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + execute_open(); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the accepting state + logger().warn("{} accepting fault: {}", *this, eptr); + h.promise.set_value(); + close(); + }); + }); +} + +void +SocketConnection::execute_open() +{ + logger().debug("{} trigger open, was {}", *this, static_cast<int>(state)); + state = state_t::open; + // satisfy the handshake's promise + h.promise.set_value(); + seastar::with_gate(pending_dispatch, [this] { + // start background processing of tags + return handle_tags() + .handle_exception_type([this] (const std::system_error& e) { + logger().warn("{} open fault: {}", *this, e); + if (e.code() == error::connection_aborted || + e.code() == error::connection_reset) { + return dispatcher.ms_handle_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this())) + .then([this] { + close(); + }); + } else if (e.code() == error::read_eof) { + return dispatcher.ms_handle_remote_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this())) + .then([this] { + close(); + }); + } else { + throw e; + } + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the open state + logger().warn("{} open fault: {}", *this, eptr); + close(); + }); + }); +} + +seastar::future<> SocketConnection::fault() +{ + if (policy.lossy) { + messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this())); + } + if (h.backoff.count()) { + h.backoff += h.backoff; + } else { + h.backoff = conf.ms_initial_backoff; + } + if (h.backoff > conf.ms_max_backoff) { + h.backoff = conf.ms_max_backoff; + } + return seastar::sleep(h.backoff); +} + +seastar::shard_id SocketConnection::shard_id() const { + return messenger.shard_id(); +} + +void SocketConnection::print(ostream& out) const { + messenger.print(out); + if (side == side_t::none) { + out << " >> " << peer_addr; + } else if (side == side_t::acceptor) { + out << " >> " << peer_addr + << "@" << socket_port; + } else { // side == side_t::connector + out << "@" << socket_port + << " >> " << peer_addr; + } +} diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h new file mode 100644 index 00000000..62cc77d5 --- /dev/null +++ b/src/crimson/net/SocketConnection.h @@ -0,0 +1,235 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <seastar/core/gate.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/shared_future.hh> +#include <seastar/core/sharded.hh> + +#include "msg/Policy.h" +#include "Connection.h" +#include "Socket.h" +#include "crimson/thread/Throttle.h" + +class AuthAuthorizer; +class AuthSessionHandler; + +namespace ceph::net { + +using stop_t = seastar::stop_iteration; + +class SocketMessenger; +class SocketConnection; +using SocketConnectionRef = seastar::shared_ptr<SocketConnection>; + +class SocketConnection : public Connection { + SocketMessenger& messenger; + seastar::foreign_ptr<std::unique_ptr<Socket>> socket; + Dispatcher& dispatcher; + seastar::gate pending_dispatch; + + // if acceptor side, socket_port is different from peer_addr.get_port(); + // if connector side, socket_port is different from my_addr.get_port(). + enum class side_t { + none, + acceptor, + connector + }; + side_t side = side_t::none; + uint16_t socket_port = 0; + + enum class state_t { + none, + accepting, + connecting, + open, + standby, + wait, + closing + }; + state_t state = state_t::none; + + /// become valid only when state is state_t::closing + seastar::shared_future<> close_ready; + + /// state for handshake + struct Handshake { + ceph_msg_connect connect; + ceph_msg_connect_reply reply; + std::unique_ptr<AuthAuthorizer> authorizer; + std::chrono::milliseconds backoff; + uint32_t connect_seq = 0; + uint32_t peer_global_seq = 0; + uint32_t global_seq; + seastar::promise<> promise; + } h; + + /// server side of handshake negotiation + seastar::future<stop_t> repeat_handle_connect(); + seastar::future<stop_t> handle_connect_with_existing(SocketConnectionRef existing, + bufferlist&& authorizer_reply); + seastar::future<stop_t> replace_existing(SocketConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer = false); + seastar::future<stop_t> send_connect_reply(ceph::net::msgr_tag_t tag, + bufferlist&& authorizer_reply = {}); + seastar::future<stop_t> send_connect_reply_ready(ceph::net::msgr_tag_t tag, + bufferlist&& authorizer_reply); + + seastar::future<> handle_keepalive2(); + seastar::future<> handle_keepalive2_ack(); + + bool require_auth_feature() const; + uint32_t get_proto_version(entity_type_t peer_type, bool connec) const; + /// client side of handshake negotiation + seastar::future<stop_t> repeat_connect(); + seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag); + void reset_session(); + + /// state for an incoming message + struct MessageReader { + ceph_msg_header header; + ceph_msg_footer footer; + bufferlist front; + bufferlist middle; + bufferlist data; + } m; + + seastar::future<> maybe_throttle(); + seastar::future<> handle_tags(); + seastar::future<> handle_ack(); + + /// becomes available when handshake completes, and when all previous messages + /// have been sent to the output stream. send() chains new messages as + /// continuations to this future to act as a queue + seastar::future<> send_ready; + + /// encode/write a message + seastar::future<> write_message(MessageRef msg); + + ceph::net::Policy<ceph::thread::Throttle> policy; + uint64_t features; + void set_features(uint64_t new_features) { + features = new_features; + } + + /// the seq num of the last transmitted message + seq_num_t out_seq = 0; + /// the seq num of the last received message + seq_num_t in_seq = 0; + /// update the seq num of last received message + /// @returns true if the @c seq is valid, and @c in_seq is updated, + /// false otherwise. + bool update_rx_seq(seq_num_t seq); + + seastar::future<> read_message(); + + std::unique_ptr<AuthSessionHandler> session_security; + + // messages to be resent after connection gets reset + std::queue<MessageRef> out_q; + // messages sent, but not yet acked by peer + std::queue<MessageRef> sent; + static void discard_up_to(std::queue<MessageRef>*, seq_num_t); + + struct Keepalive { + struct { + const char tag = CEPH_MSGR_TAG_KEEPALIVE2; + ceph_timespec stamp; + } __attribute__((packed)) req; + struct { + const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK; + ceph_timespec stamp; + } __attribute__((packed)) ack; + ceph_timespec ack_stamp; + } k; + + seastar::future<> fault(); + + void execute_open(); + + seastar::future<> do_send(MessageRef msg); + seastar::future<> do_keepalive(); + seastar::future<> do_close(); + + public: + SocketConnection(SocketMessenger& messenger, + Dispatcher& dispatcher); + ~SocketConnection(); + + Messenger* get_messenger() const override; + + int get_peer_type() const override { + return peer_type; + } + + seastar::future<bool> is_connected() override; + + seastar::future<> send(MessageRef msg) override; + + seastar::future<> keepalive() override; + + seastar::future<> close() override; + + seastar::shard_id shard_id() const override; + + void print(ostream& out) const override; + + public: + /// start a handshake from the client's perspective, + /// only call when SocketConnection first construct + void start_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type); + /// start a handshake from the server's perspective, + /// only call when SocketConnection first construct + void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket, + const entity_addr_t& peer_addr); + + /// the number of connections initiated in this session, increment when a + /// new connection is established + uint32_t connect_seq() const { + return h.connect_seq; + } + + /// the client side should connect us with a gseq. it will be reset with + /// the one of exsting connection if it's greater. + uint32_t peer_global_seq() const { + return h.peer_global_seq; + } + seq_num_t rx_seq_num() const { + return in_seq; + } + + /// current state of connection + state_t get_state() const { + return state; + } + bool is_server_side() const { + return policy.server; + } + bool is_lossy() const { + return policy.lossy; + } + + /// move all messages in the sent list back into the queue + void requeue_sent(); + + std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() { + return {out_seq, std::move(out_q)}; + } +}; + +} // namespace ceph::net diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc new file mode 100644 index 00000000..46a38ff7 --- /dev/null +++ b/src/crimson/net/SocketMessenger.cc @@ -0,0 +1,283 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "SocketMessenger.h" + +#include <tuple> +#include <boost/functional/hash.hpp> + +#include "auth/Auth.h" +#include "Errors.h" +#include "Dispatcher.h" +#include "Socket.h" + +using namespace ceph::net; + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); + } +} + +SocketMessenger::SocketMessenger(const entity_name_t& myname, + const std::string& logic_name, + uint32_t nonce, + int master_sid) + : Messenger{myname}, + master_sid{master_sid}, + sid{seastar::engine().cpu_id()}, + logic_name{logic_name}, + nonce{nonce} +{} + +seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) +{ + auto my_addrs = addrs; + for (auto& addr : my_addrs.v) { + addr.nonce = nonce; + } + return container().invoke_on_all([my_addrs](auto& msgr) { + return msgr.Messenger::set_myaddrs(my_addrs); + }); +} + +seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) +{ + ceph_assert(addrs.legacy_addr().get_family() == AF_INET); + auto my_addrs = addrs; + for (auto& addr : my_addrs.v) { + addr.nonce = nonce; + } + logger().info("listening on {}", my_addrs.legacy_addr().in4_addr()); + return container().invoke_on_all([my_addrs](auto& msgr) { + msgr.do_bind(my_addrs); + }); +} + +seastar::future<> +SocketMessenger::try_bind(const entity_addrvec_t& addrs, + uint32_t min_port, uint32_t max_port) +{ + auto addr = addrs.legacy_or_front_addr(); + if (addr.get_port() != 0) { + return bind(addrs); + } + ceph_assert(min_port <= max_port); + return seastar::do_with(uint32_t(min_port), + [this, max_port, addr] (auto& port) { + return seastar::repeat([this, max_port, addr, &port] { + auto to_bind = addr; + to_bind.set_port(port); + return bind(entity_addrvec_t{to_bind}) + .then([this] { + logger().info("{}: try_bind: done", *this); + return stop_t::yes; + }).handle_exception_type([this, max_port, &port] (const std::system_error& e) { + logger().debug("{}: try_bind: {} already used", *this, port); + if (port == max_port) { + throw e; + } + ++port; + return stop_t::no; + }); + }); + }); +} + +seastar::future<> SocketMessenger::start(Dispatcher *disp) { + return container().invoke_on_all([disp](auto& msgr) { + return msgr.do_start(disp->get_local_shard()); + }); +} + +seastar::future<ceph::net::ConnectionXRef> +SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) +{ + auto shard = locate_shard(peer_addr); + return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) { + return msgr.do_connect(peer_addr, peer_type); + }).then([](seastar::foreign_ptr<ConnectionRef>&& conn) { + return seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(std::move(conn)); + }); +} + +seastar::future<> SocketMessenger::shutdown() +{ + return container().invoke_on_all([](auto& msgr) { + return msgr.do_shutdown(); + }).finally([this] { + return container().invoke_on_all([](auto& msgr) { + msgr.shutdown_promise.set_value(); + }); + }); +} + +void SocketMessenger::do_bind(const entity_addrvec_t& addrs) +{ + Messenger::set_myaddrs(addrs); + + // TODO: v2: listen on multiple addresses + seastar::socket_address address(addrs.legacy_addr().in4_addr()); + seastar::listen_options lo; + lo.reuse_address = true; + listener = seastar::listen(address, lo); +} + +seastar::future<> SocketMessenger::do_start(Dispatcher *disp) +{ + dispatcher = disp; + + // start listening if bind() was called + if (listener) { + seastar::keep_doing([this] { + return listener->accept() + .then([this] (seastar::connected_socket socket, + seastar::socket_address paddr) { + // allocate the connection + entity_addr_t peer_addr; + peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); + auto shard = locate_shard(peer_addr); +#warning fixme + // we currently do dangerous i/o from a Connection core, different from the Socket core. + auto sock = seastar::make_foreign(std::make_unique<Socket>(std::move(socket))); + // don't wait before accepting another + container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable { + SocketConnectionRef conn = seastar::make_shared<SocketConnection>(msgr, *msgr.dispatcher); + conn->start_accept(std::move(sock), peer_addr); + }); + }); + }).handle_exception_type([this] (const std::system_error& e) { + // stop gracefully on connection_aborted + if (e.code() != error::connection_aborted) { + logger().error("{} unexpected error during accept: {}", *this, e); + } + }); + } + + return seastar::now(); +} + +seastar::foreign_ptr<ceph::net::ConnectionRef> +SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) +{ + if (auto found = lookup_conn(peer_addr); found) { + return seastar::make_foreign(found->shared_from_this()); + } + SocketConnectionRef conn = seastar::make_shared<SocketConnection>(*this, *dispatcher); + conn->start_connect(peer_addr, peer_type); + return seastar::make_foreign(conn->shared_from_this()); +} + +seastar::future<> SocketMessenger::do_shutdown() +{ + if (listener) { + listener->abort_accept(); + } + // close all connections + return seastar::parallel_for_each(accepting_conns, [] (auto conn) { + return conn->close(); + }).then([this] { + ceph_assert(accepting_conns.empty()); + return seastar::parallel_for_each(connections, [] (auto conn) { + return conn.second->close(); + }); + }).finally([this] { + ceph_assert(connections.empty()); + }); +} + +seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + if (!get_myaddr().is_blank_ip()) { + // already learned or binded + return seastar::now(); + } + + // Only learn IP address if blank. + entity_addr_t addr = get_myaddr(); + addr.u = peer_addr_for_me.u; + addr.set_type(peer_addr_for_me.get_type()); + addr.set_port(get_myaddr().get_port()); + return set_myaddrs(entity_addrvec_t{addr}); +} + +void SocketMessenger::set_default_policy(const SocketPolicy& p) +{ + policy_set.set_default(p); +} + +void SocketMessenger::set_policy(entity_type_t peer_type, + const SocketPolicy& p) +{ + policy_set.set(peer_type, p); +} + +void SocketMessenger::set_policy_throttler(entity_type_t peer_type, + Throttle* throttle) +{ + // only byte throttler is used in OSD + policy_set.set_throttlers(peer_type, throttle, nullptr); +} + +seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr) +{ + ceph_assert(addr.get_family() == AF_INET); + if (master_sid >= 0) { + return master_sid; + } + std::size_t seed = 0; + boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr); + //boost::hash_combine(seed, addr.u.sin.sin_port); + //boost::hash_combine(seed, addr.nonce); + return seed % seastar::smp::count; +} + +ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) +{ + if (auto found = connections.find(addr); + found != connections.end()) { + return found->second; + } else { + return nullptr; + } +} + +void SocketMessenger::accept_conn(SocketConnectionRef conn) +{ + accepting_conns.insert(conn); +} + +void SocketMessenger::unaccept_conn(SocketConnectionRef conn) +{ + accepting_conns.erase(conn); +} + +void SocketMessenger::register_conn(SocketConnectionRef conn) +{ + if (master_sid >= 0) { + ceph_assert(static_cast<int>(sid) == master_sid); + } + auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); + std::ignore = i; + ceph_assert(added); +} + +void SocketMessenger::unregister_conn(SocketConnectionRef conn) +{ + ceph_assert(conn); + auto found = connections.find(conn->get_peer_addr()); + ceph_assert(found != connections.end()); + ceph_assert(found->second == conn); + connections.erase(found); +} diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h new file mode 100644 index 00000000..535dea3a --- /dev/null +++ b/src/crimson/net/SocketMessenger.h @@ -0,0 +1,119 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include <map> +#include <optional> +#include <set> +#include <seastar/core/gate.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/sharded.hh> + +#include "Messenger.h" +#include "SocketConnection.h" + +namespace ceph::net { + +class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> { + const int master_sid; + const seastar::shard_id sid; + seastar::promise<> shutdown_promise; + + std::optional<seastar::server_socket> listener; + Dispatcher *dispatcher = nullptr; + std::map<entity_addr_t, SocketConnectionRef> connections; + std::set<SocketConnectionRef> accepting_conns; + ceph::net::PolicySet<Throttle> policy_set; + // Distinguish messengers with meaningful names for debugging + const std::string logic_name; + const uint32_t nonce; + + seastar::future<> accept(seastar::connected_socket socket, + seastar::socket_address paddr); + + void do_bind(const entity_addrvec_t& addr); + seastar::future<> do_start(Dispatcher *disp); + seastar::foreign_ptr<ConnectionRef> do_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type); + seastar::future<> do_shutdown(); + // conn sharding options: + // 0. Compatible (master_sid >= 0): place all connections to one master shard + // 1. Simplest (master_sid < 0): sharded by ip only + // 2. Balanced (not implemented): sharded by ip + port + nonce, + // but, need to move SocketConnection between cores. + seastar::shard_id locate_shard(const entity_addr_t& addr); + + public: + SocketMessenger(const entity_name_t& myname, + const std::string& logic_name, + uint32_t nonce, + int master_sid); + + seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override; + + // Messenger interfaces are assumed to be called from its own shard, but its + // behavior should be symmetric when called from any shard. + seastar::future<> bind(const entity_addrvec_t& addr) override; + + seastar::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port) override; + + seastar::future<> start(Dispatcher *dispatcher) override; + + seastar::future<ConnectionXRef> connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) override; + // can only wait once + seastar::future<> wait() override { + return shutdown_promise.get_future(); + } + + seastar::future<> shutdown() override; + + Messenger* get_local_shard() override { + return &container().local(); + } + + void print(ostream& out) const override { + out << get_myname() + << "(" << logic_name + << ") " << get_myaddr(); + } + + void set_default_policy(const SocketPolicy& p) override; + + void set_policy(entity_type_t peer_type, const SocketPolicy& p) override; + + void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override; + + public: + seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me); + + SocketConnectionRef lookup_conn(const entity_addr_t& addr); + void accept_conn(SocketConnectionRef); + void unaccept_conn(SocketConnectionRef); + void register_conn(SocketConnectionRef); + void unregister_conn(SocketConnectionRef); + + // required by sharded<> + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + + seastar::shard_id shard_id() const { + return sid; + } +}; + +} // namespace ceph::net diff --git a/src/crimson/os/CMakeLists.txt b/src/crimson/os/CMakeLists.txt new file mode 100644 index 00000000..264e3525 --- /dev/null +++ b/src/crimson/os/CMakeLists.txt @@ -0,0 +1,7 @@ +add_library(crimson-os + cyan_store.cc + cyan_collection.cc + cyan_object.cc + Transaction.cc) +target_link_libraries(crimson-os + crimson) diff --git a/src/crimson/os/Transaction.cc b/src/crimson/os/Transaction.cc new file mode 100644 index 00000000..acf99c59 --- /dev/null +++ b/src/crimson/os/Transaction.cc @@ -0,0 +1,507 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Transaction.h" +#include "include/denc.h" + +namespace ceph::os +{ + +void Transaction::iterator::decode_attrset_bl(bufferlist *pbl) +{ + using ceph::decode; + auto start = data_bl_p; + __u32 n; + decode(n, data_bl_p); + unsigned len = 4; + while (n--) { + __u32 l; + decode(l, data_bl_p); + data_bl_p.advance(l); + len += 4 + l; + decode(l, data_bl_p); + data_bl_p.advance(l); + len += 4 + l; + } + start.copy(len, *pbl); +} + +void Transaction::iterator::decode_keyset_bl(bufferlist *pbl) +{ + using ceph::decode; + auto start = data_bl_p; + __u32 n; + decode(n, data_bl_p); + unsigned len = 4; + while (n--) { + __u32 l; + decode(l, data_bl_p); + data_bl_p.advance(l); + len += 4 + l; + } + start.copy(len, *pbl); +} + +void Transaction::dump(ceph::Formatter *f) +{ + f->open_array_section("ops"); + iterator i = begin(); + int op_num = 0; + bool stop_looping = false; + while (i.have_op() && !stop_looping) { + Transaction::Op *op = i.decode_op(); + f->open_object_section("op"); + f->dump_int("op_num", op_num); + + switch (op->op) { + case Transaction::OP_NOP: + f->dump_string("op_name", "nop"); + break; + case Transaction::OP_TOUCH: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "touch"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_WRITE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + uint64_t off = op->off; + uint64_t len = op->len; + bufferlist bl; + i.decode_bl(bl); + f->dump_string("op_name", "write"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_unsigned("length", len); + f->dump_unsigned("offset", off); + f->dump_unsigned("bufferlist length", bl.length()); + } + break; + + case Transaction::OP_ZERO: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + uint64_t off = op->off; + uint64_t len = op->len; + f->dump_string("op_name", "zero"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_unsigned("offset", off); + f->dump_unsigned("length", len); + } + break; + + case Transaction::OP_TRIMCACHE: + { + // deprecated, no-op + f->dump_string("op_name", "trim_cache"); + } + break; + + case Transaction::OP_TRUNCATE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + uint64_t off = op->off; + f->dump_string("op_name", "truncate"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_unsigned("offset", off); + } + break; + + case Transaction::OP_REMOVE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "remove"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_SETATTR: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + string name = i.decode_string(); + bufferlist bl; + i.decode_bl(bl); + f->dump_string("op_name", "setattr"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_string("name", name); + f->dump_unsigned("length", bl.length()); + } + break; + + case Transaction::OP_SETATTRS: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + map<string, bufferptr> aset; + i.decode_attrset(aset); + f->dump_string("op_name", "setattrs"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->open_object_section("attr_lens"); + for (map<string,bufferptr>::iterator p = aset.begin(); + p != aset.end(); ++p) { + f->dump_unsigned(p->first.c_str(), p->second.length()); + } + f->close_section(); + } + break; + + case Transaction::OP_RMATTR: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + string name = i.decode_string(); + f->dump_string("op_name", "rmattr"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_string("name", name); + } + break; + + case Transaction::OP_RMATTRS: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "rmattrs"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_CLONE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + ghobject_t noid = i.get_oid(op->dest_oid); + f->dump_string("op_name", "clone"); + f->dump_stream("collection") << cid; + f->dump_stream("src_oid") << oid; + f->dump_stream("dst_oid") << noid; + } + break; + + case Transaction::OP_CLONERANGE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + ghobject_t noid = i.get_oid(op->dest_oid); + uint64_t off = op->off; + uint64_t len = op->len; + f->dump_string("op_name", "clonerange"); + f->dump_stream("collection") << cid; + f->dump_stream("src_oid") << oid; + f->dump_stream("dst_oid") << noid; + f->dump_unsigned("offset", off); + f->dump_unsigned("len", len); + } + break; + + case Transaction::OP_CLONERANGE2: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + ghobject_t noid = i.get_oid(op->dest_oid); + uint64_t srcoff = op->off; + uint64_t len = op->len; + uint64_t dstoff = op->dest_off; + f->dump_string("op_name", "clonerange2"); + f->dump_stream("collection") << cid; + f->dump_stream("src_oid") << oid; + f->dump_stream("dst_oid") << noid; + f->dump_unsigned("src_offset", srcoff); + f->dump_unsigned("len", len); + f->dump_unsigned("dst_offset", dstoff); + } + break; + + case Transaction::OP_MKCOLL: + { + coll_t cid = i.get_cid(op->cid); + f->dump_string("op_name", "mkcoll"); + f->dump_stream("collection") << cid; + } + break; + + case Transaction::OP_COLL_HINT: + { + using ceph::decode; + coll_t cid = i.get_cid(op->cid); + uint32_t type = op->hint_type; + f->dump_string("op_name", "coll_hint"); + f->dump_stream("collection") << cid; + f->dump_unsigned("type", type); + bufferlist hint; + i.decode_bl(hint); + auto hiter = hint.cbegin(); + if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) { + uint32_t pg_num; + uint64_t num_objs; + decode(pg_num, hiter); + decode(num_objs, hiter); + f->dump_unsigned("pg_num", pg_num); + f->dump_unsigned("expected_num_objects", num_objs); + } + } + break; + + case Transaction::OP_COLL_SET_BITS: + { + coll_t cid = i.get_cid(op->cid); + f->dump_string("op_name", "coll_set_bits"); + f->dump_stream("collection") << cid; + f->dump_unsigned("bits", op->split_bits); + } + break; + + case Transaction::OP_RMCOLL: + { + coll_t cid = i.get_cid(op->cid); + f->dump_string("op_name", "rmcoll"); + f->dump_stream("collection") << cid; + } + break; + + case Transaction::OP_COLL_ADD: + { + coll_t ocid = i.get_cid(op->cid); + coll_t ncid = i.get_cid(op->dest_cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "collection_add"); + f->dump_stream("src_collection") << ocid; + f->dump_stream("dst_collection") << ncid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_COLL_REMOVE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "collection_remove"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_COLL_MOVE: + { + coll_t ocid = i.get_cid(op->cid); + coll_t ncid = i.get_cid(op->dest_cid); + ghobject_t oid = i.get_oid(op->oid); + f->open_object_section("collection_move"); + f->dump_stream("src_collection") << ocid; + f->dump_stream("dst_collection") << ncid; + f->dump_stream("oid") << oid; + f->close_section(); + } + break; + + case Transaction::OP_COLL_SETATTR: + { + coll_t cid = i.get_cid(op->cid); + string name = i.decode_string(); + bufferlist bl; + i.decode_bl(bl); + f->dump_string("op_name", "collection_setattr"); + f->dump_stream("collection") << cid; + f->dump_string("name", name); + f->dump_unsigned("length", bl.length()); + } + break; + + case Transaction::OP_COLL_RMATTR: + { + coll_t cid = i.get_cid(op->cid); + string name = i.decode_string(); + f->dump_string("op_name", "collection_rmattr"); + f->dump_stream("collection") << cid; + f->dump_string("name", name); + } + break; + + case Transaction::OP_COLL_RENAME: + { + f->dump_string("op_name", "collection_rename"); + } + break; + + case Transaction::OP_OMAP_CLEAR: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + f->dump_string("op_name", "omap_clear"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + } + break; + + case Transaction::OP_OMAP_SETKEYS: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + map<string, bufferlist> aset; + i.decode_attrset(aset); + f->dump_string("op_name", "omap_setkeys"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->open_object_section("attr_lens"); + for (map<string, bufferlist>::iterator p = aset.begin(); + p != aset.end(); ++p) { + f->dump_unsigned(p->first.c_str(), p->second.length()); + } + f->close_section(); + } + break; + + case Transaction::OP_OMAP_RMKEYS: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + set<string> keys; + i.decode_keyset(keys); + f->dump_string("op_name", "omap_rmkeys"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->open_array_section("attrs"); + for (auto& k : keys) { + f->dump_string("", k.c_str()); + } + f->close_section(); + } + break; + + case Transaction::OP_OMAP_SETHEADER: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + bufferlist bl; + i.decode_bl(bl); + f->dump_string("op_name", "omap_setheader"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_stream("header_length") << bl.length(); + } + break; + + case Transaction::OP_SPLIT_COLLECTION: + { + coll_t cid = i.get_cid(op->cid); + uint32_t bits = op->split_bits; + uint32_t rem = op->split_rem; + coll_t dest = i.get_cid(op->dest_cid); + f->dump_string("op_name", "op_split_collection_create"); + f->dump_stream("collection") << cid; + f->dump_stream("bits") << bits; + f->dump_stream("rem") << rem; + f->dump_stream("dest") << dest; + } + break; + + case Transaction::OP_SPLIT_COLLECTION2: + { + coll_t cid = i.get_cid(op->cid); + uint32_t bits = op->split_bits; + uint32_t rem = op->split_rem; + coll_t dest = i.get_cid(op->dest_cid); + f->dump_string("op_name", "op_split_collection"); + f->dump_stream("collection") << cid; + f->dump_stream("bits") << bits; + f->dump_stream("rem") << rem; + f->dump_stream("dest") << dest; + } + break; + + case Transaction::OP_MERGE_COLLECTION: + { + coll_t cid = i.get_cid(op->cid); + uint32_t bits = op->split_bits; + coll_t dest = i.get_cid(op->dest_cid); + f->dump_string("op_name", "op_merge_collection"); + f->dump_stream("collection") << cid; + f->dump_stream("dest") << dest; + f->dump_stream("bits") << bits; + } + break; + + case Transaction::OP_OMAP_RMKEYRANGE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + string first, last; + first = i.decode_string(); + last = i.decode_string(); + f->dump_string("op_name", "op_omap_rmkeyrange"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_string("first", first); + f->dump_string("last", last); + } + break; + + case Transaction::OP_COLL_MOVE_RENAME: + { + coll_t old_cid = i.get_cid(op->cid); + ghobject_t old_oid = i.get_oid(op->oid); + coll_t new_cid = i.get_cid(op->dest_cid); + ghobject_t new_oid = i.get_oid(op->dest_oid); + f->dump_string("op_name", "op_coll_move_rename"); + f->dump_stream("old_collection") << old_cid; + f->dump_stream("old_oid") << old_oid; + f->dump_stream("new_collection") << new_cid; + f->dump_stream("new_oid") << new_oid; + } + break; + + case Transaction::OP_TRY_RENAME: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t old_oid = i.get_oid(op->oid); + ghobject_t new_oid = i.get_oid(op->dest_oid); + f->dump_string("op_name", "op_coll_move_rename"); + f->dump_stream("collection") << cid; + f->dump_stream("old_oid") << old_oid; + f->dump_stream("new_oid") << new_oid; + } + break; + + case Transaction::OP_SETALLOCHINT: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + uint64_t expected_object_size = op->expected_object_size; + uint64_t expected_write_size = op->expected_write_size; + f->dump_string("op_name", "op_setallochint"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_stream("expected_object_size") << expected_object_size; + f->dump_stream("expected_write_size") << expected_write_size; + } + break; + + default: + f->dump_string("op_name", "unknown"); + f->dump_unsigned("op_code", op->op); + stop_looping = true; + break; + } + f->close_section(); + op_num++; + } + f->close_section(); +} + +} diff --git a/src/crimson/os/Transaction.h b/src/crimson/os/Transaction.h new file mode 100644 index 00000000..c6f660fe --- /dev/null +++ b/src/crimson/os/Transaction.h @@ -0,0 +1,1234 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <map> + +#include "include/int_types.h" +#include "include/buffer.h" +#include "osd/osd_types.h" + +/********************************* + * transaction + * + * A Transaction represents a sequence of primitive mutation + * operations. + * + * Three events in the life of a Transaction result in + * callbacks. Any Transaction can contain any number of callback + * objects (Context) for any combination of the three classes of + * callbacks: + * + * on_applied_sync, on_applied, and on_commit. + * + * The "on_applied" and "on_applied_sync" callbacks are invoked when + * the modifications requested by the Transaction are visible to + * subsequent ObjectStore operations, i.e., the results are + * readable. The only conceptual difference between on_applied and + * on_applied_sync is the specific thread and locking environment in + * which the callbacks operate. "on_applied_sync" is called + * directly by an ObjectStore execution thread. It is expected to + * execute quickly and must not acquire any locks of the calling + * environment. Conversely, "on_applied" is called from the separate + * Finisher thread, meaning that it can contend for calling + * environment locks. NB, on_applied and on_applied_sync are + * sometimes called on_readable and on_readable_sync. + * + * The "on_commit" callback is also called from the Finisher thread + * and indicates that all of the mutations have been durably + * committed to stable storage (i.e., are now software/hardware + * crashproof). + * + * At the implementation level, each mutation primitive (and its + * associated data) can be serialized to a single buffer. That + * serialization, however, does not copy any data, but (using the + * bufferlist library) will reference the original buffers. This + * implies that the buffer that contains the data being submitted + * must remain stable until the on_commit callback completes. In + * practice, bufferlist handles all of this for you and this + * subtlety is only relevant if you are referencing an existing + * buffer via buffer::raw_static. + * + * Some implementations of ObjectStore choose to implement their own + * form of journaling that uses the serialized form of a + * Transaction. This requires that the encode/decode logic properly + * version itself and handle version upgrades that might change the + * format of the encoded Transaction. This has already happened a + * couple of times and the Transaction object contains some helper + * variables that aid in this legacy decoding: + * + * sobject_encoding detects an older/simpler version of oid + * present in pre-bobtail versions of ceph. use_pool_override + * also detects a situation where the pool of an oid can be + * overridden for legacy operations/buffers. For non-legacy + * implementations of ObjectStore, neither of these fields are + * relevant. + * + * + * TRANSACTION ISOLATION + * + * Except as noted above, isolation is the responsibility of the + * caller. In other words, if any storage element (storage element + * == any of the four portions of an object as described above) is + * altered by a transaction (including deletion), the caller + * promises not to attempt to read that element while the + * transaction is pending (here pending means from the time of + * issuance until the "on_applied_sync" callback has been + * received). Violations of isolation need not be detected by + * ObjectStore and there is no corresponding error mechanism for + * reporting an isolation violation (crashing would be the + * appropriate way to report an isolation violation if detected). + * + * Enumeration operations may violate transaction isolation as + * described above when a storage element is being created or + * deleted as part of a transaction. In this case, ObjectStore is + * allowed to consider the enumeration operation to either precede + * or follow the violating transaction element. In other words, the + * presence/absence of the mutated element in the enumeration is + * entirely at the discretion of ObjectStore. The arbitrary ordering + * applies independently to each transaction element. For example, + * if a transaction contains two mutating elements "create A" and + * "delete B". And an enumeration operation is performed while this + * transaction is pending. It is permissible for ObjectStore to + * report any of the four possible combinations of the existence of + * A and B. + * + */ +namespace ceph::os { +class Transaction { +public: + enum { + OP_NOP = 0, + OP_TOUCH = 9, // cid, oid + OP_WRITE = 10, // cid, oid, offset, len, bl + OP_ZERO = 11, // cid, oid, offset, len + OP_TRUNCATE = 12, // cid, oid, len + OP_REMOVE = 13, // cid, oid + OP_SETATTR = 14, // cid, oid, attrname, bl + OP_SETATTRS = 15, // cid, oid, attrset + OP_RMATTR = 16, // cid, oid, attrname + OP_CLONE = 17, // cid, oid, newoid + OP_CLONERANGE = 18, // cid, oid, newoid, offset, len + OP_CLONERANGE2 = 30, // cid, oid, newoid, srcoff, len, dstoff + + OP_TRIMCACHE = 19, // cid, oid, offset, len **DEPRECATED** + + OP_MKCOLL = 20, // cid + OP_RMCOLL = 21, // cid + OP_COLL_ADD = 22, // cid, oldcid, oid + OP_COLL_REMOVE = 23, // cid, oid + OP_COLL_SETATTR = 24, // cid, attrname, bl + OP_COLL_RMATTR = 25, // cid, attrname + OP_COLL_SETATTRS = 26, // cid, attrset + OP_COLL_MOVE = 8, // newcid, oldcid, oid + + OP_RMATTRS = 28, // cid, oid + OP_COLL_RENAME = 29, // cid, newcid + + OP_OMAP_CLEAR = 31, // cid + OP_OMAP_SETKEYS = 32, // cid, attrset + OP_OMAP_RMKEYS = 33, // cid, keyset + OP_OMAP_SETHEADER = 34, // cid, header + OP_SPLIT_COLLECTION = 35, // cid, bits, destination + OP_SPLIT_COLLECTION2 = 36, /* cid, bits, destination + doesn't create the destination */ + OP_OMAP_RMKEYRANGE = 37, // cid, oid, firstkey, lastkey + OP_COLL_MOVE_RENAME = 38, // oldcid, oldoid, newcid, newoid + OP_SETALLOCHINT = 39, // cid, oid, object_size, write_size + OP_COLL_HINT = 40, // cid, type, bl + + OP_TRY_RENAME = 41, // oldcid, oldoid, newoid + + OP_COLL_SET_BITS = 42, // cid, bits + + OP_MERGE_COLLECTION = 43, // cid, destination + }; + + // Transaction hint type + enum { + COLL_HINT_EXPECTED_NUM_OBJECTS = 1, + }; + + struct Op { + ceph_le32 op; + ceph_le32 cid; + ceph_le32 oid; + ceph_le64 off; + ceph_le64 len; + ceph_le32 dest_cid; + ceph_le32 dest_oid; //OP_CLONE, OP_CLONERANGE + ceph_le64 dest_off; //OP_CLONERANGE + union { + struct { + ceph_le32 hint_type; //OP_COLL_HINT + }; + struct { + ceph_le32 alloc_hint_flags; //OP_SETALLOCHINT + }; + }; + ceph_le64 expected_object_size; //OP_SETALLOCHINT + ceph_le64 expected_write_size; //OP_SETALLOCHINT + ceph_le32 split_bits; //OP_SPLIT_COLLECTION2,OP_COLL_SET_BITS, + //OP_MKCOLL + ceph_le32 split_rem; //OP_SPLIT_COLLECTION2 + } __attribute__ ((packed)) ; + + struct TransactionData { + ceph_le64 ops; + ceph_le32 largest_data_len; + ceph_le32 largest_data_off; + ceph_le32 largest_data_off_in_data_bl; + ceph_le32 fadvise_flags; + + TransactionData() noexcept : + ops(init_le64(0)), + largest_data_len(init_le32(0)), + largest_data_off(init_le32(0)), + largest_data_off_in_data_bl(init_le32(0)), + fadvise_flags(init_le32(0)) { } + + // override default move operations to reset default values + TransactionData(TransactionData&& other) noexcept : + ops(other.ops), + largest_data_len(other.largest_data_len), + largest_data_off(other.largest_data_off), + largest_data_off_in_data_bl(other.largest_data_off_in_data_bl), + fadvise_flags(other.fadvise_flags) { + other.ops = 0; + other.largest_data_len = 0; + other.largest_data_off = 0; + other.largest_data_off_in_data_bl = 0; + other.fadvise_flags = 0; + } + TransactionData& operator=(TransactionData&& other) noexcept { + ops = other.ops; + largest_data_len = other.largest_data_len; + largest_data_off = other.largest_data_off; + largest_data_off_in_data_bl = other.largest_data_off_in_data_bl; + fadvise_flags = other.fadvise_flags; + other.ops = 0; + other.largest_data_len = 0; + other.largest_data_off = 0; + other.largest_data_off_in_data_bl = 0; + other.fadvise_flags = 0; + return *this; + } + + TransactionData(const TransactionData& other) = default; + TransactionData& operator=(const TransactionData& other) = default; + + void encode(bufferlist& bl) const { + bl.append((char*)this, sizeof(TransactionData)); + } + void decode(bufferlist::const_iterator &bl) { + bl.copy(sizeof(TransactionData), (char*)this); + } + } __attribute__ ((packed)) ; + +private: + TransactionData data; + + std::map<coll_t, __le32> coll_index; + std::map<ghobject_t, __le32> object_index; + + __le32 coll_id {0}; + __le32 object_id {0}; + + bufferlist data_bl; + bufferlist op_bl; + + std::list<Context *> on_applied; + std::list<Context *> on_commit; + std::list<Context *> on_applied_sync; + +public: + Transaction() = default; + + explicit Transaction(bufferlist::const_iterator &dp) { + decode(dp); + } + explicit Transaction(bufferlist &nbl) { + auto dp = nbl.cbegin(); + decode(dp); + } + + // override default move operations to reset default values + Transaction(Transaction&& other) noexcept : + data(std::move(other.data)), + coll_index(std::move(other.coll_index)), + object_index(std::move(other.object_index)), + coll_id(other.coll_id), + object_id(other.object_id), + data_bl(std::move(other.data_bl)), + op_bl(std::move(other.op_bl)), + on_applied(std::move(other.on_applied)), + on_commit(std::move(other.on_commit)), + on_applied_sync(std::move(other.on_applied_sync)) { + other.coll_id = 0; + other.object_id = 0; + } + + Transaction& operator=(Transaction&& other) noexcept { + data = std::move(other.data); + coll_index = std::move(other.coll_index); + object_index = std::move(other.object_index); + coll_id = other.coll_id; + object_id = other.object_id; + data_bl = std::move(other.data_bl); + op_bl = std::move(other.op_bl); + on_applied = std::move(other.on_applied); + on_commit = std::move(other.on_commit); + on_applied_sync = std::move(other.on_applied_sync); + other.coll_id = 0; + other.object_id = 0; + return *this; + } + + Transaction(const Transaction& other) = default; + Transaction& operator=(const Transaction& other) = default; + + // expose object_index for FileStore::Op's benefit + const map<ghobject_t, __le32>& get_object_index() const { + return object_index; + } + + /* Operations on callback contexts */ + void register_on_applied(Context *c) { + if (!c) return; + on_applied.push_back(c); + } + void register_on_commit(Context *c) { + if (!c) return; + on_commit.push_back(c); + } + void register_on_applied_sync(Context *c) { + if (!c) return; + on_applied_sync.push_back(c); + } + void register_on_complete(Context *c) { + if (!c) return; + RunOnDeleteRef _complete (std::make_shared<RunOnDelete>(c)); + register_on_applied(new ContainerContext<RunOnDeleteRef>(_complete)); + register_on_commit(new ContainerContext<RunOnDeleteRef>(_complete)); + } + bool has_contexts() const { + return + !on_commit.empty() || + !on_applied.empty() || + !on_applied_sync.empty(); + } + + static void collect_contexts(vector<Transaction>& t, + Context **out_on_applied, + Context **out_on_commit, + Context **out_on_applied_sync) { + ceph_assert(out_on_applied); + ceph_assert(out_on_commit); + ceph_assert(out_on_applied_sync); + std::list<Context *> on_applied, on_commit, on_applied_sync; + for (auto& i : t) { + on_applied.splice(on_applied.end(), i.on_applied); + on_commit.splice(on_commit.end(), i.on_commit); + on_applied_sync.splice(on_applied_sync.end(), i.on_applied_sync); + } + *out_on_applied = C_Contexts::list_to_context(on_applied); + *out_on_commit = C_Contexts::list_to_context(on_commit); + *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync); + } + static void collect_contexts(vector<Transaction>& t, + std::list<Context*> *out_on_applied, + std::list<Context*> *out_on_commit, + std::list<Context*> *out_on_applied_sync) { + ceph_assert(out_on_applied); + ceph_assert(out_on_commit); + ceph_assert(out_on_applied_sync); + for (auto& i : t) { + out_on_applied->splice(out_on_applied->end(), i.on_applied); + out_on_commit->splice(out_on_commit->end(), i.on_commit); + out_on_applied_sync->splice(out_on_applied_sync->end(), + i.on_applied_sync); + } + } + + Context *get_on_applied() { + return C_Contexts::list_to_context(on_applied); + } + Context *get_on_commit() { + return C_Contexts::list_to_context(on_commit); + } + Context *get_on_applied_sync() { + return C_Contexts::list_to_context(on_applied_sync); + } + + void set_fadvise_flags(uint32_t flags) { + data.fadvise_flags = flags; + } + void set_fadvise_flag(uint32_t flag) { + data.fadvise_flags = data.fadvise_flags | flag; + } + uint32_t get_fadvise_flags() { return data.fadvise_flags; } + + void swap(Transaction& other) noexcept { + std::swap(data, other.data); + std::swap(on_applied, other.on_applied); + std::swap(on_commit, other.on_commit); + std::swap(on_applied_sync, other.on_applied_sync); + + std::swap(coll_index, other.coll_index); + std::swap(object_index, other.object_index); + std::swap(coll_id, other.coll_id); + std::swap(object_id, other.object_id); + op_bl.swap(other.op_bl); + data_bl.swap(other.data_bl); + } + + void _update_op(Op* op, + vector<__le32> &cm, + vector<__le32> &om) { + + switch (op->op) { + case OP_NOP: + break; + + case OP_TOUCH: + case OP_REMOVE: + case OP_SETATTR: + case OP_SETATTRS: + case OP_RMATTR: + case OP_RMATTRS: + case OP_COLL_REMOVE: + case OP_OMAP_CLEAR: + case OP_OMAP_SETKEYS: + case OP_OMAP_RMKEYS: + case OP_OMAP_RMKEYRANGE: + case OP_OMAP_SETHEADER: + case OP_WRITE: + case OP_ZERO: + case OP_TRUNCATE: + case OP_SETALLOCHINT: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + break; + + case OP_CLONERANGE2: + case OP_CLONE: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + op->dest_oid = om[op->dest_oid]; + break; + + case OP_MKCOLL: + case OP_RMCOLL: + case OP_COLL_SETATTR: + case OP_COLL_RMATTR: + case OP_COLL_SETATTRS: + case OP_COLL_HINT: + case OP_COLL_SET_BITS: + ceph_assert(op->cid < cm.size()); + op->cid = cm[op->cid]; + break; + + case OP_COLL_ADD: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_cid < om.size()); + op->cid = cm[op->cid]; + op->dest_cid = cm[op->dest_cid]; + op->oid = om[op->oid]; + break; + + case OP_COLL_MOVE_RENAME: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_cid < cm.size()); + ceph_assert(op->dest_oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + op->dest_cid = cm[op->dest_cid]; + op->dest_oid = om[op->dest_oid]; + break; + + case OP_TRY_RENAME: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + op->dest_oid = om[op->dest_oid]; + break; + + case OP_SPLIT_COLLECTION2: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->dest_cid < cm.size()); + op->cid = cm[op->cid]; + op->dest_cid = cm[op->dest_cid]; + break; + + case OP_MERGE_COLLECTION: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->dest_cid < cm.size()); + op->cid = cm[op->cid]; + op->dest_cid = cm[op->dest_cid]; + break; + + default: + ceph_abort_msg("Unknown OP"); + } + } + void _update_op_bl( + bufferlist& bl, + vector<__le32> &cm, + vector<__le32> &om) { + for (auto& bp : bl.buffers()) { + ceph_assert(bp.length() % sizeof(Op) == 0); + + char* raw_p = const_cast<char*>(bp.c_str()); + char* raw_end = raw_p + bp.length(); + while (raw_p < raw_end) { + _update_op(reinterpret_cast<Op*>(raw_p), cm, om); + raw_p += sizeof(Op); + } + } + } + /// Append the operations of the parameter to this Transaction. Those + /// operations are removed from the parameter Transaction + void append(Transaction& other) { + + data.ops = data.ops + other.data.ops; + if (other.data.largest_data_len > data.largest_data_len) { + data.largest_data_len = other.data.largest_data_len; + data.largest_data_off = other.data.largest_data_off; + data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl; + } + data.fadvise_flags = data.fadvise_flags | other.data.fadvise_flags; + on_applied.splice(on_applied.end(), other.on_applied); + on_commit.splice(on_commit.end(), other.on_commit); + on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync); + + //append coll_index & object_index + vector<__le32> cm(other.coll_index.size()); + map<coll_t, __le32>::iterator coll_index_p; + for (coll_index_p = other.coll_index.begin(); + coll_index_p != other.coll_index.end(); + ++coll_index_p) { + cm[coll_index_p->second] = _get_coll_id(coll_index_p->first); + } + + vector<__le32> om(other.object_index.size()); + map<ghobject_t, __le32>::iterator object_index_p; + for (object_index_p = other.object_index.begin(); + object_index_p != other.object_index.end(); + ++object_index_p) { + om[object_index_p->second] = _get_object_id(object_index_p->first); + } + + //the other.op_bl SHOULD NOT be changes during append operation, + //we use additional bufferlist to avoid this problem + bufferlist other_op_bl; + { + bufferptr other_op_bl_ptr(other.op_bl.length()); + other.op_bl.copy(0, other.op_bl.length(), other_op_bl_ptr.c_str()); + other_op_bl.append(std::move(other_op_bl_ptr)); + } + + //update other_op_bl with cm & om + //When the other is appended to current transaction, all coll_index and + //object_index in other.op_buffer should be updated by new index of the + //combined transaction + _update_op_bl(other_op_bl, cm, om); + + //append op_bl + op_bl.append(other_op_bl); + //append data_bl + data_bl.append(other.data_bl); + } + + /** Inquires about the Transaction as a whole. */ + + /// How big is the encoded Transaction buffer? + uint64_t get_encoded_bytes() { + //layout: data_bl + op_bl + coll_index + object_index + data + + // coll_index size, object_index size and sizeof(transaction_data) + // all here, so they may be computed at compile-time + size_t final_size = sizeof(__u32) * 2 + sizeof(data); + + // coll_index second and object_index second + final_size += (coll_index.size() + object_index.size()) * sizeof(__le32); + + // coll_index first + for (auto p = coll_index.begin(); p != coll_index.end(); ++p) { + final_size += p->first.encoded_size(); + } + + // object_index first + for (auto p = object_index.begin(); p != object_index.end(); ++p) { + final_size += p->first.encoded_size(); + } + + return data_bl.length() + + op_bl.length() + + final_size; + } + + /// Retain old version for regression testing purposes + uint64_t get_encoded_bytes_test() { + using ceph::encode; + //layout: data_bl + op_bl + coll_index + object_index + data + bufferlist bl; + encode(coll_index, bl); + encode(object_index, bl); + + return data_bl.length() + + op_bl.length() + + bl.length() + + sizeof(data); + } + + uint64_t get_num_bytes() { + return get_encoded_bytes(); + } + /// Size of largest data buffer to the "write" operation encountered so far + uint32_t get_data_length() { + return data.largest_data_len; + } + /// offset within the encoded buffer to the start of the largest data buffer + /// that's encoded + uint32_t get_data_offset() { + if (data.largest_data_off_in_data_bl) { + return data.largest_data_off_in_data_bl + + sizeof(__u8) + // encode struct_v + sizeof(__u8) + // encode compat_v + sizeof(__u32) + // encode len + sizeof(__u32); // data_bl len + } + return 0; // none + } + /// offset of buffer as aligned to destination within object. + int get_data_alignment() { + if (!data.largest_data_len) + return 0; + return (0 - get_data_offset()) & ~CEPH_PAGE_MASK; + } + /// Is the Transaction empty (no operations) + bool empty() { + return !data.ops; + } + /// Number of operations in the transaction + int get_num_ops() { + return data.ops; + } + + /** + * iterator + * + * Helper object to parse Transactions. + * + * ObjectStore instances use this object to step down the encoded + * buffer decoding operation codes and parameters as we go. + * + */ + class iterator { + Transaction *t; + + uint64_t ops; + char* op_buffer_p; + + bufferlist::const_iterator data_bl_p; + + public: + vector<coll_t> colls; + vector<ghobject_t> objects; + + private: + explicit iterator(Transaction *t) + : t(t), + data_bl_p(t->data_bl.cbegin()), + colls(t->coll_index.size()), + objects(t->object_index.size()) { + + ops = t->data.ops; + op_buffer_p = t->op_bl.c_str(); + + map<coll_t, __le32>::iterator coll_index_p; + for (coll_index_p = t->coll_index.begin(); + coll_index_p != t->coll_index.end(); + ++coll_index_p) { + colls[coll_index_p->second] = coll_index_p->first; + } + + map<ghobject_t, __le32>::iterator object_index_p; + for (object_index_p = t->object_index.begin(); + object_index_p != t->object_index.end(); + ++object_index_p) { + objects[object_index_p->second] = object_index_p->first; + } + } + + friend class Transaction; + + public: + + bool have_op() { + return ops > 0; + } + Op* decode_op() { + ceph_assert(ops > 0); + + Op* op = reinterpret_cast<Op*>(op_buffer_p); + op_buffer_p += sizeof(Op); + ops--; + + return op; + } + string decode_string() { + using ceph::decode; + string s; + decode(s, data_bl_p); + return s; + } + void decode_bp(bufferptr& bp) { + using ceph::decode; + decode(bp, data_bl_p); + } + void decode_bl(bufferlist& bl) { + using ceph::decode; + decode(bl, data_bl_p); + } + void decode_attrset(map<string,bufferptr>& aset) { + using ceph::decode; + decode(aset, data_bl_p); + } + void decode_attrset(map<string,bufferlist>& aset) { + using ceph::decode; + decode(aset, data_bl_p); + } + void decode_attrset_bl(bufferlist *pbl); + void decode_keyset(set<string> &keys){ + using ceph::decode; + decode(keys, data_bl_p); + } + void decode_keyset_bl(bufferlist *pbl); + + const ghobject_t &get_oid(__le32 oid_id) { + ceph_assert(oid_id < objects.size()); + return objects[oid_id]; + } + const coll_t &get_cid(__le32 cid_id) { + ceph_assert(cid_id < colls.size()); + return colls[cid_id]; + } + uint32_t get_fadvise_flags() const { + return t->get_fadvise_flags(); + } + }; + + iterator begin() { + return iterator(this); + } + +private: + void _build_actions_from_tbl(); + + static constexpr size_t OPS_PER_PTR = 32u; + /** + * Helper functions to encode the various mutation elements of a + * transaction. These are 1:1 with the operation codes (see + * enumeration above). These routines ensure that the + * encoder/creator of a transaction gets the right data in the + * right place. Sadly, there's no corresponding version nor any + * form of seat belts for the decoder. + */ + Op* _get_next_op() { + if (op_bl.get_append_buffer_unused_tail_length() < sizeof(Op)) { + op_bl.reserve(sizeof(Op) * OPS_PER_PTR); + } + // append_hole ensures bptr merging. Even huge number of ops + // shouldn't result in overpopulating bl::_buffers. + char* const p = op_bl.append_hole(sizeof(Op)).c_str(); + memset(p, 0, sizeof(Op)); + return reinterpret_cast<Op*>(p); + } + __le32 _get_coll_id(const coll_t& coll) { + map<coll_t, __le32>::iterator c = coll_index.find(coll); + if (c != coll_index.end()) + return c->second; + + __le32 index_id = coll_id++; + coll_index[coll] = index_id; + return index_id; + } + __le32 _get_object_id(const ghobject_t& oid) { + map<ghobject_t, __le32>::iterator o = object_index.find(oid); + if (o != object_index.end()) + return o->second; + + __le32 index_id = object_id++; + object_index[oid] = index_id; + return index_id; + } + +public: + /// noop. 'nuf said + void nop() { + Op* _op = _get_next_op(); + _op->op = OP_NOP; + data.ops = data.ops + 1; + } + /** + * touch + * + * Ensure the existance of an object in a collection. Create an + * empty object if necessary + */ + void touch(const coll_t& cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_TOUCH; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + /** + * Write data to an offset within an object. If the object is too + * small, it is expanded as needed. It is possible to specify an + * offset beyond the current end of an object and it will be + * expanded as needed. Simple implementations of ObjectStore will + * just zero the data between the old end of the object and the + * newly provided data. More sophisticated implementations of + * ObjectStore will omit the untouched data and store it as a + * "hole" in the file. + * + * Note that a 0-length write does not affect the size of the object. + */ + void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len, + const bufferlist& write_data, uint32_t flags = 0) { + using ceph::encode; + uint32_t orig_len = data_bl.length(); + Op* _op = _get_next_op(); + _op->op = OP_WRITE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->off = off; + _op->len = len; + encode(write_data, data_bl); + + ceph_assert(len == write_data.length()); + data.fadvise_flags = data.fadvise_flags | flags; + if (write_data.length() > data.largest_data_len) { + data.largest_data_len = write_data.length(); + data.largest_data_off = off; + // we are about to + data.largest_data_off_in_data_bl = orig_len + sizeof(__u32); + } + data.ops = data.ops + 1; + } + /** + * zero out the indicated byte range within an object. Some + * ObjectStore instances may optimize this to release the + * underlying storage space. + * + * If the zero range extends beyond the end of the object, the object + * size is extended, just as if we were writing a buffer full of zeros. + * EXCEPT if the length is 0, in which case (just like a 0-length write) + * we do not adjust the object size. + */ + void zero(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len) { + Op* _op = _get_next_op(); + _op->op = OP_ZERO; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->off = off; + _op->len = len; + data.ops = data.ops + 1; + } + /// Discard all data in the object beyond the specified size. + void truncate(const coll_t& cid, const ghobject_t& oid, uint64_t off) { + Op* _op = _get_next_op(); + _op->op = OP_TRUNCATE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->off = off; + data.ops = data.ops + 1; + } + /// Remove an object. All four parts of the object are removed. + void remove(const coll_t& cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_REMOVE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + /// Set an xattr of an object + void setattr(const coll_t& cid, const ghobject_t& oid, + const char* name, + bufferlist& val) { + string n(name); + setattr(cid, oid, n, val); + } + /// Set an xattr of an object + void setattr(const coll_t& cid, const ghobject_t& oid, + const string& s, bufferlist& val) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_SETATTR; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(s, data_bl); + encode(val, data_bl); + data.ops = data.ops + 1; + } + /// Set multiple xattrs of an object + void setattrs(const coll_t& cid, const ghobject_t& oid, + const map<string,bufferptr>& attrset) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_SETATTRS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(attrset, data_bl); + data.ops = data.ops + 1; + } + /// Set multiple xattrs of an object + void setattrs(const coll_t& cid, const ghobject_t& oid, + const map<string,bufferlist>& attrset) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_SETATTRS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(attrset, data_bl); + data.ops = data.ops + 1; + } + /// remove an xattr from an object + void rmattr(const coll_t& cid, const ghobject_t& oid, const char *name) { + string n(name); + rmattr(cid, oid, n); + } + /// remove an xattr from an object + void rmattr(const coll_t& cid, const ghobject_t& oid, const string& s) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_RMATTR; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(s, data_bl); + data.ops = data.ops + 1; + } + /// remove all xattrs from an object + void rmattrs(const coll_t& cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_RMATTRS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + /** + * Clone an object into another object. + * + * Low-cost (e.g., O(1)) cloning (if supported) is best, but + * fallback to an O(n) copy is allowed. All four parts of the + * object are cloned (data, xattrs, omap header, omap + * entries). + * + * The destination named object may already exist, in + * which case its previous contents are discarded. + */ + void clone(const coll_t& cid, const ghobject_t& oid, + const ghobject_t& noid) { + Op* _op = _get_next_op(); + _op->op = OP_CLONE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->dest_oid = _get_object_id(noid); + data.ops = data.ops + 1; + } + /** + * Clone a byte range from one object to another. + * + * The data portion of the destination object receives a copy of a + * portion of the data from the source object. None of the other + * three parts of an object is copied from the source. + * + * The destination object size may be extended to the dstoff + len. + * + * The source range *must* overlap with the source object data. If it does + * not the result is undefined. + */ + void clone_range(const coll_t& cid, const ghobject_t& oid, + const ghobject_t& noid, + uint64_t srcoff, uint64_t srclen, uint64_t dstoff) { + Op* _op = _get_next_op(); + _op->op = OP_CLONERANGE2; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->dest_oid = _get_object_id(noid); + _op->off = srcoff; + _op->len = srclen; + _op->dest_off = dstoff; + data.ops = data.ops + 1; + } + + /// Create the collection + void create_collection(const coll_t& cid, int bits) { + Op* _op = _get_next_op(); + _op->op = OP_MKCOLL; + _op->cid = _get_coll_id(cid); + _op->split_bits = bits; + data.ops = data.ops + 1; + } + + /** + * Give the collection a hint. + * + * @param cid - collection id. + * @param type - hint type. + * @param hint - the hint payload, which contains the customized + * data along with the hint type. + */ + void collection_hint(const coll_t& cid, uint32_t type, const bufferlist& hint) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_COLL_HINT; + _op->cid = _get_coll_id(cid); + _op->hint_type = type; + encode(hint, data_bl); + data.ops = data.ops + 1; + } + + /// remove the collection, the collection must be empty + void remove_collection(const coll_t& cid) { + Op* _op = _get_next_op(); + _op->op = OP_RMCOLL; + _op->cid = _get_coll_id(cid); + data.ops = data.ops + 1; + } + void collection_move(const coll_t& cid, const coll_t &oldcid, + const ghobject_t& oid) + __attribute__ ((deprecated)) { + // NOTE: we encode this as a fixed combo of ADD + REMOVE. they + // always appear together, so this is effectively a single MOVE. + Op* _op = _get_next_op(); + _op->op = OP_COLL_ADD; + _op->cid = _get_coll_id(oldcid); + _op->oid = _get_object_id(oid); + _op->dest_cid = _get_coll_id(cid); + data.ops = data.ops + 1; + + _op = _get_next_op(); + _op->op = OP_COLL_REMOVE; + _op->cid = _get_coll_id(oldcid); + _op->oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + void collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid, + const coll_t &cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_COLL_MOVE_RENAME; + _op->cid = _get_coll_id(oldcid); + _op->oid = _get_object_id(oldoid); + _op->dest_cid = _get_coll_id(cid); + _op->dest_oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + void try_rename(const coll_t &cid, const ghobject_t& oldoid, + const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_TRY_RENAME; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oldoid); + _op->dest_oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + + /// Remove omap from oid + void omap_clear( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid ///< [in] Object from which to remove omap + ) { + Op* _op = _get_next_op(); + _op->op = OP_OMAP_CLEAR; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops = data.ops + 1; + } + /// Set keys on oid omap. Replaces duplicate keys. + void omap_setkeys( + const coll_t& cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object to update + const map<string, bufferlist> &attrset ///< [in] Replacement keys and values + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_SETKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(attrset, data_bl); + data.ops = data.ops + 1; + } + + /// Set keys on an oid omap (bufferlist variant). + void omap_setkeys( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object to update + const bufferlist &attrset_bl ///< [in] Replacement keys and values + ) { + Op* _op = _get_next_op(); + _op->op = OP_OMAP_SETKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data_bl.append(attrset_bl); + data.ops = data.ops + 1; + } + + /// Remove keys from oid omap + void omap_rmkeys( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object from which to remove the omap + const set<string> &keys ///< [in] Keys to clear + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_RMKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(keys, data_bl); + data.ops = data.ops + 1; + } + + /// Remove keys from oid omap + void omap_rmkeys( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object from which to remove the omap + const bufferlist &keys_bl ///< [in] Keys to clear + ) { + Op* _op = _get_next_op(); + _op->op = OP_OMAP_RMKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data_bl.append(keys_bl); + data.ops = data.ops + 1; + } + + /// Remove key range from oid omap + void omap_rmkeyrange( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object from which to remove the omap keys + const string& first, ///< [in] first key in range + const string& last ///< [in] first key past range, range is [first,last) + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_RMKEYRANGE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(first, data_bl); + encode(last, data_bl); + data.ops = data.ops + 1; + } + + /// Set omap header + void omap_setheader( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object + const bufferlist &bl ///< [in] Header value + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_SETHEADER; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(bl, data_bl); + data.ops = data.ops + 1; + } + + /// Split collection based on given prefixes, objects matching the specified + /// bits/rem are moved to the new collection + void split_collection( + const coll_t &cid, + uint32_t bits, + uint32_t rem, + const coll_t &destination) { + Op* _op = _get_next_op(); + _op->op = OP_SPLIT_COLLECTION2; + _op->cid = _get_coll_id(cid); + _op->dest_cid = _get_coll_id(destination); + _op->split_bits = bits; + _op->split_rem = rem; + data.ops = data.ops + 1; + } + + /// Merge collection into another. + void merge_collection( + coll_t cid, + coll_t destination, + uint32_t bits) { + Op* _op = _get_next_op(); + _op->op = OP_MERGE_COLLECTION; + _op->cid = _get_coll_id(cid); + _op->dest_cid = _get_coll_id(destination); + _op->split_bits = bits; + data.ops = data.ops + 1; + } + + void collection_set_bits( + const coll_t &cid, + int bits) { + Op* _op = _get_next_op(); + _op->op = OP_COLL_SET_BITS; + _op->cid = _get_coll_id(cid); + _op->split_bits = bits; + data.ops = data.ops + 1; + } + + /// Set allocation hint for an object + /// make 0 values(expected_object_size, expected_write_size) noops for all implementations + void set_alloc_hint( + const coll_t &cid, + const ghobject_t &oid, + uint64_t expected_object_size, + uint64_t expected_write_size, + uint32_t flags + ) { + Op* _op = _get_next_op(); + _op->op = OP_SETALLOCHINT; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->expected_object_size = expected_object_size; + _op->expected_write_size = expected_write_size; + _op->alloc_hint_flags = flags; + data.ops = data.ops + 1; + } + + void encode(bufferlist& bl) const { + //layout: data_bl + op_bl + coll_index + object_index + data + ENCODE_START(9, 9, bl); + encode(data_bl, bl); + encode(op_bl, bl); + encode(coll_index, bl); + encode(object_index, bl); + data.encode(bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator &bl) { + DECODE_START(9, bl); + DECODE_OLDEST(9); + + decode(data_bl, bl); + decode(op_bl, bl); + decode(coll_index, bl); + decode(object_index, bl); + data.decode(bl); + coll_id = coll_index.size(); + object_id = object_index.size(); + + DECODE_FINISH(bl); + } + + void dump(ceph::Formatter *f); +}; +} diff --git a/src/crimson/os/cyan_collection.cc b/src/crimson/os/cyan_collection.cc new file mode 100644 index 00000000..82403dbc --- /dev/null +++ b/src/crimson/os/cyan_collection.cc @@ -0,0 +1,76 @@ +#include "cyan_collection.h" + +#include "cyan_object.h" + +namespace ceph::os +{ + +Collection::Collection(const coll_t& c) + : cid{c} +{} + +Collection::~Collection() = default; + +Collection::ObjectRef Collection::create_object() const +{ + return new ceph::os::Object{}; +} + +Collection::ObjectRef Collection::get_object(ghobject_t oid) +{ + auto o = object_hash.find(oid); + if (o == object_hash.end()) + return ObjectRef(); + return o->second; +} + +Collection::ObjectRef Collection::get_or_create_object(ghobject_t oid) +{ + auto result = object_hash.emplace(oid, ObjectRef{}); + if (result.second) + object_map[oid] = result.first->second = create_object(); + return result.first->second; +} + +uint64_t Collection::used_bytes() const +{ + uint64_t result = 0; + for (auto& obj : object_map) { + result += obj.second->get_size(); + } + return result; +} + +void Collection::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + encode(xattr, bl); + encode(use_page_set, bl); + uint32_t s = object_map.size(); + encode(s, bl); + for (auto& [oid, obj] : object_map) { + encode(oid, bl); + obj->encode(bl); + } + ENCODE_FINISH(bl); +} + +void Collection::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + decode(xattr, p); + decode(use_page_set, p); + uint32_t s; + decode(s, p); + while (s--) { + ghobject_t k; + decode(k, p); + auto o = create_object(); + o->decode(p); + object_map.insert(make_pair(k, o)); + object_hash.insert(make_pair(k, o)); + } + DECODE_FINISH(p); +} + +} diff --git a/src/crimson/os/cyan_collection.h b/src/crimson/os/cyan_collection.h new file mode 100644 index 00000000..78f1aa0a --- /dev/null +++ b/src/crimson/os/cyan_collection.h @@ -0,0 +1,55 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <string> +#include <unordered_map> +#include <boost/intrusive_ptr.hpp> +#include <boost/smart_ptr/intrusive_ref_counter.hpp> + +#include "include/buffer.h" +#include "osd/osd_types.h" + +namespace ceph::os { + +class Object; +/** + * a collection also orders transactions + * + * Any transactions queued under a given collection will be applied in + * sequence. Transactions queued under different collections may run + * in parallel. + * + * ObjectStore users my get collection handles with open_collection() (or, + * for bootstrapping a new collection, create_new_collection()). + */ +struct Collection : public boost::intrusive_ref_counter< + Collection, + boost::thread_unsafe_counter> +{ + using ObjectRef = boost::intrusive_ptr<Object>; + const coll_t cid; + int bits = 0; + // always use bufferlist object for testing + bool use_page_set = false; + std::unordered_map<ghobject_t, ObjectRef> object_hash; ///< for lookup + std::map<ghobject_t, ObjectRef> object_map; ///< for iteration + std::map<std::string,bufferptr> xattr; + bool exists = true; + + Collection(const coll_t& c); + ~Collection(); + + ObjectRef create_object() const; + ObjectRef get_object(ghobject_t oid); + ObjectRef get_or_create_object(ghobject_t oid); + uint64_t used_bytes() const; + + const coll_t &get_cid() const { + return cid; + } + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); +}; + +} diff --git a/src/crimson/os/cyan_object.cc b/src/crimson/os/cyan_object.cc new file mode 100644 index 00000000..1612cebe --- /dev/null +++ b/src/crimson/os/cyan_object.cc @@ -0,0 +1,88 @@ +#include "cyan_object.h" +#include "include/encoding.h" + +namespace ceph::os { + +size_t Object::get_size() const { + return data.length(); +} + +int Object::read(uint64_t offset, uint64_t len, bufferlist &bl) +{ + bl.substr_of(data, offset, len); + return bl.length(); +} + +int Object::write(uint64_t offset, const bufferlist &src) +{ + unsigned len = src.length(); + // before + bufferlist newdata; + if (get_size() >= offset) { + newdata.substr_of(data, 0, offset); + } else { + if (get_size()) { + newdata.substr_of(data, 0, get_size()); + } + newdata.append_zero(offset - get_size()); + } + + newdata.append(src); + + // after + if (get_size() > offset + len) { + bufferlist tail; + tail.substr_of(data, offset + len, get_size() - (offset + len)); + newdata.append(tail); + } + + data.claim(newdata); + return 0; +} + +int Object::clone(Object *src, uint64_t srcoff, uint64_t len, + uint64_t dstoff) +{ + bufferlist bl; + if (srcoff == dstoff && len == src->get_size()) { + data = src->data; + return 0; + } + bl.substr_of(src->data, srcoff, len); + return write(dstoff, bl); + +} + +int Object::truncate(uint64_t size) +{ + if (get_size() > size) { + bufferlist bl; + bl.substr_of(data, 0, size); + data.claim(bl); + } else if (get_size() == size) { + // do nothing + } else { + data.append_zero(size - get_size()); + } + return 0; +} + +void Object::encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(data, bl); + encode(xattr, bl); + encode(omap_header, bl); + encode(omap, bl); + ENCODE_FINISH(bl); +} + +void Object::decode(bufferlist::const_iterator& p) { + DECODE_START(1, p); + decode(data, p); + decode(xattr, p); + decode(omap_header, p); + decode(omap, p); + DECODE_FINISH(p); +} + +} diff --git a/src/crimson/os/cyan_object.h b/src/crimson/os/cyan_object.h new file mode 100644 index 00000000..6846b1a4 --- /dev/null +++ b/src/crimson/os/cyan_object.h @@ -0,0 +1,38 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include <cstddef> +#include <map> +#include <string> +#include <boost/intrusive_ptr.hpp> +#include <boost/smart_ptr/intrusive_ref_counter.hpp> +#include "include/buffer.h" + +namespace ceph::os { + +struct Object : public boost::intrusive_ref_counter< + Object, + boost::thread_unsafe_counter> +{ + using bufferlist = ceph::bufferlist; + + bufferlist data; + std::map<std::string,bufferptr> xattr; + bufferlist omap_header; + std::map<std::string,bufferlist> omap; + + typedef boost::intrusive_ptr<Object> Ref; + + Object() = default; + + // interface for object data + size_t get_size() const; + int read(uint64_t offset, uint64_t len, bufferlist &bl); + int write(uint64_t offset, const bufferlist &bl); + int clone(Object *src, uint64_t srcoff, uint64_t len, + uint64_t dstoff); + int truncate(uint64_t offset); + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); +}; +} diff --git a/src/crimson/os/cyan_store.cc b/src/crimson/os/cyan_store.cc new file mode 100644 index 00000000..68ccae88 --- /dev/null +++ b/src/crimson/os/cyan_store.cc @@ -0,0 +1,277 @@ +#include "cyan_store.h" + +#include <fmt/format.h> + +#include "common/safe_io.h" + +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_object.h" +#include "crimson/os/Transaction.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_filestore); + } +} + +namespace ceph::os { + +using ObjectRef = boost::intrusive_ptr<Object>; + +CyanStore::CyanStore(const std::string& path) + : path{path} +{} + +CyanStore::~CyanStore() = default; + +seastar::future<> CyanStore::mount() +{ + bufferlist bl; + string fn = path + "/collections"; + string err; + if (int r = bl.read_file(fn.c_str(), &err); r < 0) { + throw std::runtime_error("read_file"); + } + + set<coll_t> collections; + auto p = bl.cbegin(); + decode(collections, p); + + for (auto& coll : collections) { + string fn = fmt::format("{}/{}", path, coll); + bufferlist cbl; + if (int r = cbl.read_file(fn.c_str(), &err); r < 0) { + throw std::runtime_error("read_file"); + } + CollectionRef c{new Collection{coll}}; + auto p = cbl.cbegin(); + c->decode(p); + coll_map[coll] = c; + used_bytes += c->used_bytes(); + } + return seastar::now(); +} + +seastar::future<> CyanStore::umount() +{ + set<coll_t> collections; + for (auto& [col, ch] : coll_map) { + collections.insert(col); + bufferlist bl; + ceph_assert(ch); + ch->encode(bl); + string fn = fmt::format("{}/{}", path, col); + if (int r = bl.write_file(fn.c_str()); r < 0) { + throw std::runtime_error("write_file"); + } + } + + string fn = path + "/collections"; + bufferlist bl; + encode(collections, bl); + if (int r = bl.write_file(fn.c_str()); r < 0) { + throw std::runtime_error("write_file"); + } + return seastar::now(); +} + +seastar::future<> CyanStore::mkfs(uuid_d osd_fsid) +{ + string fsid_str; + int r = read_meta("fsid", &fsid_str); + if (r == -ENOENT) { + write_meta("fsid", fmt::format("{}", osd_fsid)); + } else if (r < 0) { + throw std::runtime_error("read_meta"); + } else { + logger().error("{} already has fsid {}", __func__, fsid_str); + throw std::runtime_error("mkfs"); + } + + string fn = path + "/collections"; + bufferlist bl; + set<coll_t> collections; + encode(collections, bl); + r = bl.write_file(fn.c_str()); + if (r < 0) + throw std::runtime_error("write_file"); + + write_meta("type", "memstore"); + return seastar::now(); +} + +CyanStore::CollectionRef CyanStore::create_new_collection(const coll_t& cid) +{ + auto c = new Collection{cid}; + return new_coll_map[cid] = c; +} + +CyanStore::CollectionRef CyanStore::open_collection(const coll_t& cid) +{ + auto cp = coll_map.find(cid); + if (cp == coll_map.end()) + return {}; + return cp->second; +} + +std::vector<coll_t> CyanStore::list_collections() +{ + std::vector<coll_t> collections; + for (auto& coll : coll_map) { + collections.push_back(coll.first); + } + return collections; +} + +seastar::future<bufferlist> CyanStore::read(CollectionRef c, + const ghobject_t& oid, + uint64_t offset, + size_t len, + uint32_t op_flags) +{ + logger().info("{} {} {} {}~{}", + __func__, c->cid, oid, offset, len); + if (!c->exists) { + throw std::runtime_error(fmt::format("collection does not exist: {}", c->cid)); + } + ObjectRef o = c->get_object(oid); + if (!o) { + throw std::runtime_error(fmt::format("object does not exist: {}", oid)); + } + if (offset >= o->get_size()) + return seastar::make_ready_future<bufferlist>(); + size_t l = len; + if (l == 0 && offset == 0) // note: len == 0 means read the entire object + l = o->get_size(); + else if (offset + l > o->get_size()) + l = o->get_size() - offset; + bufferlist bl; + if (int r = o->read(offset, l, bl); r < 0) { + throw std::runtime_error("read"); + } + return seastar::make_ready_future<bufferlist>(std::move(bl)); +} + +seastar::future<CyanStore::omap_values_t> +CyanStore::omap_get_values(CollectionRef c, + const ghobject_t& oid, + std::vector<std::string>&& keys) +{ + logger().info("{} {} {}", + __func__, c->cid, oid); + auto o = c->get_object(oid); + if (!o) { + throw std::runtime_error(fmt::format("object does not exist: {}", oid)); + } + omap_values_t values; + for (auto& key : keys) { + if (auto found = o->omap.find(key); found != o->omap.end()) { + values.insert(*found); + } + } + return seastar::make_ready_future<omap_values_t>(std::move(values)); +} + +seastar::future<> CyanStore::do_transaction(CollectionRef ch, + Transaction&& t) +{ + auto i = t.begin(); + while (i.have_op()) { + Transaction::Op* op = i.decode_op(); + int r = 0; + switch (op->op) { + case Transaction::OP_NOP: + break; + case Transaction::OP_WRITE: + { + coll_t cid = i.get_cid(op->cid); + ghobject_t oid = i.get_oid(op->oid); + uint64_t off = op->off; + uint64_t len = op->len; + uint32_t fadvise_flags = i.get_fadvise_flags(); + bufferlist bl; + i.decode_bl(bl); + r = _write(cid, oid, off, len, bl, fadvise_flags); + } + break; + case Transaction::OP_MKCOLL: + { + coll_t cid = i.get_cid(op->cid); + r = _create_collection(cid, op->split_bits); + } + break; + default: + logger().error("bad op {}", static_cast<unsigned>(op->op)); + abort(); + } + if (r < 0) { + abort(); + } + } + return seastar::now(); +} + +int CyanStore::_write(const coll_t& cid, const ghobject_t& oid, + uint64_t offset, size_t len, const bufferlist& bl, + uint32_t fadvise_flags) +{ + logger().info("{} {} {} {} ~ {}", + __func__, cid, oid, offset, len); + assert(len == bl.length()); + + auto c = open_collection(cid); + if (!c) + return -ENOENT; + + ObjectRef o = c->get_or_create_object(oid); + if (len > 0) { + const ssize_t old_size = o->get_size(); + o->write(offset, bl); + used_bytes += (o->get_size() - old_size); + } + + return 0; +} + +int CyanStore::_create_collection(const coll_t& cid, int bits) +{ + auto result = coll_map.insert(std::make_pair(cid, CollectionRef())); + if (!result.second) + return -EEXIST; + auto p = new_coll_map.find(cid); + assert(p != new_coll_map.end()); + result.first->second = p->second; + result.first->second->bits = bits; + new_coll_map.erase(p); + return 0; +} + +void CyanStore::write_meta(const std::string& key, + const std::string& value) +{ + std::string v = value; + v += "\n"; + if (int r = safe_write_file(path.c_str(), key.c_str(), + v.c_str(), v.length(), 0600); + r < 0) { + throw std::runtime_error{fmt::format("unable to write_meta({})", key)}; + } +} + +int CyanStore::read_meta(const std::string& key, + std::string* value) +{ + char buf[4096]; + int r = safe_read_file(path.c_str(), key.c_str(), + buf, sizeof(buf)); + if (r <= 0) { + return r; + } + // drop trailing newlines + while (r && isspace(buf[r-1])) { + --r; + } + *value = string{buf, static_cast<size_t>(r)}; + return 0; +} +} diff --git a/src/crimson/os/cyan_store.h b/src/crimson/os/cyan_store.h new file mode 100644 index 00000000..0b7f3d87 --- /dev/null +++ b/src/crimson/os/cyan_store.h @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <string> +#include <unordered_map> +#include <map> +#include <vector> +#include <seastar/core/future.hh> +#include "osd/osd_types.h" +#include "include/uuid.h" + +namespace ceph::os { + +class Collection; +class Transaction; + +// a just-enough store for reading/writing the superblock +class CyanStore { + using CollectionRef = boost::intrusive_ptr<Collection>; + const std::string path; + std::unordered_map<coll_t, CollectionRef> coll_map; + std::map<coll_t,CollectionRef> new_coll_map; + uint64_t used_bytes = 0; + +public: + CyanStore(const std::string& path); + ~CyanStore(); + + seastar::future<> mount(); + seastar::future<> umount(); + + seastar::future<> mkfs(uuid_d osd_fsid); + seastar::future<bufferlist> read(CollectionRef c, + const ghobject_t& oid, + uint64_t offset, + size_t len, + uint32_t op_flags = 0); + using omap_values_t = std::map<std::string,bufferlist, std::less<>>; + seastar::future<omap_values_t> omap_get_values( + CollectionRef c, + const ghobject_t& oid, + std::vector<std::string>&& keys); + CollectionRef create_new_collection(const coll_t& cid); + CollectionRef open_collection(const coll_t& cid); + std::vector<coll_t> list_collections(); + + seastar::future<> do_transaction(CollectionRef ch, + Transaction&& txn); + + void write_meta(const std::string& key, + const std::string& value); + int read_meta(const std::string& key, std::string* value); + +private: + int _write(const coll_t& cid, const ghobject_t& oid, + uint64_t offset, size_t len, const bufferlist& bl, + uint32_t fadvise_flags); + int _create_collection(const coll_t& cid, int bits); +}; + +} diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt new file mode 100644 index 00000000..e86a11b5 --- /dev/null +++ b/src/crimson/osd/CMakeLists.txt @@ -0,0 +1,10 @@ +add_executable(crimson-osd + chained_dispatchers.cc + heartbeat.cc + main.cc + osd.cc + osd_meta.cc + pg.cc + pg_meta.cc) +target_link_libraries(crimson-osd + crimson-common crimson-os crimson) diff --git a/src/crimson/osd/chained_dispatchers.cc b/src/crimson/osd/chained_dispatchers.cc new file mode 100644 index 00000000..da4aa269 --- /dev/null +++ b/src/crimson/osd/chained_dispatchers.cc @@ -0,0 +1,72 @@ +#include "chained_dispatchers.h" +#include "crimson/net/Connection.h" + + +seastar::future<> +ChainedDispatchers::ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) { + return seastar::do_for_each(dispatchers, [conn, m](Dispatcher* dispatcher) { + return dispatcher->ms_dispatch(conn, m); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_accept(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_accept(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_connect(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_connect(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_reset(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_reset(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_remote_reset(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_remote_reset(conn); + }); +} + +seastar::future<std::unique_ptr<AuthAuthorizer>> +ChainedDispatchers::ms_get_authorizer(peer_type_t peer_type) +{ + // since dispatcher returns a nullptr if it does not have the authorizer, + // let's use the chain-of-responsibility pattern here. + struct Params { + peer_type_t peer_type; + std::deque<Dispatcher*>::iterator first, last; + } params = {peer_type, dispatchers.begin(), dispatchers.end()}; + return seastar::do_with(Params{params}, [this] (Params& params) { + using result_t = std::unique_ptr<AuthAuthorizer>; + return seastar::repeat_until_value([&] () { + auto& first = params.first; + if (first == params.last) { + // just give up + return seastar::make_ready_future<std::optional<result_t>>(result_t{}); + } else { + return (*first)->ms_get_authorizer(params.peer_type) + .then([&] (auto&& auth)-> std::optional<result_t> { + if (auth) { + // hooray! + return std::move(auth); + } else { + // try next one + ++first; + return {}; + } + }); + } + }); + }); +} diff --git a/src/crimson/osd/chained_dispatchers.h b/src/crimson/osd/chained_dispatchers.h new file mode 100644 index 00000000..c3040836 --- /dev/null +++ b/src/crimson/osd/chained_dispatchers.h @@ -0,0 +1,32 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <deque> +#include "crimson/net/Dispatcher.h" + +// in existing Messenger, dispatchers are put into a chain as described by +// chain-of-responsibility pattern. we could do the same to stop processing +// the message once any of the dispatchers claims this message, and prevent +// other dispatchers from reading it. but this change is more involved as +// it requires changing the ms_ methods to return a bool. so as an intermediate +// solution, we are using an observer dispatcher to notify all the interested +// or unintersted parties. +class ChainedDispatchers : public ceph::net::Dispatcher { + std::deque<Dispatcher*> dispatchers; +public: + void push_front(Dispatcher* dispatcher) { + dispatchers.push_front(dispatcher); + } + void push_back(Dispatcher* dispatcher) { + dispatchers.push_back(dispatcher); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override; + seastar::future<> ms_handle_accept(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override; + seastar::future<std::unique_ptr<AuthAuthorizer>> + ms_get_authorizer(peer_type_t peer_type) override; +}; diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc new file mode 100644 index 00000000..6dfefb3b --- /dev/null +++ b/src/crimson/osd/heartbeat.cc @@ -0,0 +1,415 @@ +#include "heartbeat.h" + +#include <boost/range/join.hpp> + +#include "messages/MOSDPing.h" +#include "messages/MOSDFailure.h" + +#include "crimson/common/config_proxy.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Messenger.h" +#include "crimson/osd/osdmap_service.h" +#include "crimson/mon/MonClient.h" + +#include "osd/OSDMap.h" + +using ceph::common::local_conf; + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } + + template<typename Message, typename... Args> + Ref<Message> make_message(Args&&... args) + { + return {new Message{std::forward<Args>(args)...}, false}; + } +} + +Heartbeat::Heartbeat(int whoami, + uint32_t nonce, + const OSDMapService& service, + ceph::mon::Client& monc) + : whoami{whoami}, + nonce{nonce}, + service{service}, + monc{monc}, + timer{[this] {send_heartbeats();}} +{} + +seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs, + entity_addrvec_t back_addrs) +{ + logger().info("heartbeat: start"); + // i only care about the address, so any unused port would work + for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) { + addr.set_port(0); + } + return seastar::when_all_succeed( + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "hb_front", + nonce, + seastar::engine().cpu_id()) + .then([this, front_addrs] (auto msgr) { + front_msgr = msgr; + return start_messenger(front_msgr, front_addrs); + }), + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "hb_back", + nonce, + seastar::engine().cpu_id()) + .then([this, back_addrs] (auto msgr) { + back_msgr = msgr; + return start_messenger(back_msgr, back_addrs); + })) + .then([this] { + timer.arm_periodic( + std::chrono::seconds(local_conf()->osd_heartbeat_interval)); + }); +} + +seastar::future<> +Heartbeat::start_messenger(ceph::net::Messenger* msgr, + const entity_addrvec_t& addrs) +{ + if (local_conf()->ms_crc_data) { + msgr->set_crc_data(); + } + if (local_conf()->ms_crc_header) { + msgr->set_crc_header(); + } + return msgr->try_bind(addrs, + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max).then([msgr, this] { + return msgr->start(this); + }); +} + +seastar::future<> Heartbeat::stop() +{ + return seastar::when_all_succeed(front_msgr->shutdown(), + back_msgr->shutdown()); +} + +const entity_addrvec_t& Heartbeat::get_front_addrs() const +{ + return front_msgr->get_myaddrs(); +} + +const entity_addrvec_t& Heartbeat::get_back_addrs() const +{ + return back_msgr->get_myaddrs(); +} + +seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch) +{ + auto found = peers.find(peer); + if (found == peers.end()) { + logger().info("add_peer({})", peer); + auto osdmap = service.get_map(); + // TODO: msgr v2 + return seastar::when_all_succeed( + front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(), + CEPH_ENTITY_TYPE_OSD), + back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(), + CEPH_ENTITY_TYPE_OSD)) + .then([this, peer, epoch] (auto xcon_front, auto xcon_back) { + PeerInfo info; + // sharded-messenger compatible mode + info.con_front = xcon_front->release(); + info.con_back = xcon_back->release(); + info.epoch = epoch; + peers.emplace(peer, std::move(info)); + }); + } else { + found->second.epoch = epoch; + return seastar::now(); + } +} + +seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers() +{ + osds_t osds; + for (auto& peer : peers) { + osds.push_back(peer.first); + } + return seastar::map_reduce(std::move(osds), + [this](auto& osd) { + auto osdmap = service.get_map(); + if (!osdmap->is_up(osd)) { + return remove_peer(osd).then([] { + return seastar::make_ready_future<osd_id_t>(-1); + }); + } else if (peers[osd].epoch < osdmap->get_epoch()) { + return seastar::make_ready_future<osd_id_t>(osd); + } else { + return seastar::make_ready_future<osd_id_t>(-1); + } + }, osds_t{}, + [this](osds_t&& extras, osd_id_t extra) { + if (extra >= 0) { + extras.push_back(extra); + } + return extras; + }); +} + +void Heartbeat::add_reporter_peers(int whoami) +{ + auto osdmap = service.get_map(); + // include next and previous up osds to ensure we have a fully-connected set + set<int> want; + if (auto next = osdmap->get_next_up_osd_after(whoami); next >= 0) { + want.insert(next); + } + if (auto prev = osdmap->get_previous_up_osd_before(whoami); prev >= 0) { + want.insert(prev); + } + // make sure we have at least **min_down** osds coming from different + // subtree level (e.g., hosts) for fast failure detection. + auto min_down = local_conf().get_val<uint64_t>("mon_osd_min_down_reporters"); + auto subtree = local_conf().get_val<string>("mon_osd_reporter_subtree_level"); + osdmap->get_random_up_osds_by_subtree( + whoami, subtree, min_down, want, &want); + for (auto osd : want) { + add_peer(osd, osdmap->get_epoch()); + } +} + +seastar::future<> Heartbeat::update_peers(int whoami) +{ + const auto min_peers = static_cast<size_t>( + local_conf().get_val<int64_t>("osd_heartbeat_min_peers")); + return remove_down_peers().then([=](osds_t&& extra) { + add_reporter_peers(whoami); + // too many? + struct iteration_state { + osds_t::const_iterator where; + osds_t::const_iterator end; + }; + return seastar::do_with(iteration_state{extra.begin(),extra.end()}, + [=](iteration_state& s) { + return seastar::do_until( + [min_peers, &s, this] { + return peers.size() < min_peers || s.where == s.end; }, + [&s, this] { + return remove_peer(*s.where); } + ); + }); + }).then([=] { + // or too few? + auto osdmap = service.get_map(); + for (auto next = osdmap->get_next_up_osd_after(whoami); + peers.size() < min_peers && next >= 0 && next != whoami; + next = osdmap->get_next_up_osd_after(next)) { + add_peer(next, osdmap->get_epoch()); + } + return seastar::now(); + }); +} + +seastar::future<> Heartbeat::remove_peer(osd_id_t peer) +{ + auto found = peers.find(peer); + assert(found != peers.end()); + logger().info("remove_peer({})", peer); + return seastar::when_all_succeed(found->second.con_front->close(), + found->second.con_back->close()).then( + [this, peer] { + peers.erase(peer); + return seastar::now(); + }); +} + +seastar::future<> Heartbeat::ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) +{ + logger().info("heartbeat: ms_dispatch {} from {}", + *m, m->get_source()); + switch (m->get_type()) { + case CEPH_MSG_PING: + return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m)); + default: + return seastar::now(); + } +} + +seastar::future<> Heartbeat::handle_osd_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m) +{ + switch (m->op) { + case MOSDPing::PING: + return handle_ping(conn, m); + case MOSDPing::PING_REPLY: + return handle_reply(conn, m); + case MOSDPing::YOU_DIED: + return handle_you_died(); + default: + return seastar::now(); + } +} + +seastar::future<> Heartbeat::handle_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m) +{ + auto min_message = static_cast<uint32_t>( + local_conf()->osd_heartbeat_min_size); + auto reply = + make_message<MOSDPing>(m->fsid, + service.get_map()->get_epoch(), + MOSDPing::PING_REPLY, + m->stamp, + min_message); + return conn->send(reply); +} + +seastar::future<> Heartbeat::handle_reply(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m) +{ + const osd_id_t from = m->get_source().num(); + auto found = peers.find(from); + if (found == peers.end()) { + // stale reply + return seastar::now(); + } + auto& peer = found->second; + auto ping = peer.ping_history.find(m->stamp); + if (ping == peer.ping_history.end()) { + // old replies, deprecated by newly sent pings. + return seastar::now(); + } + const auto now = clock::now(); + auto& unacked = ping->second.unacknowledged; + if (conn == peer.con_back) { + peer.last_rx_back = now; + unacked--; + } else if (conn == peer.con_front) { + peer.last_rx_front = now; + unacked--; + } + if (unacked == 0) { + peer.ping_history.erase(peer.ping_history.begin(), ++ping); + } + if (peer.is_healthy(now)) { + // cancel false reports + failure_queue.erase(from); + if (auto pending = failure_pending.find(from); + pending != failure_pending.end()) { + return send_still_alive(from, pending->second.addrs); + } + } + return seastar::now(); +} + +seastar::future<> Heartbeat::handle_you_died() +{ + // TODO: ask for newer osdmap + return seastar::now(); +} + +seastar::future<> Heartbeat::send_heartbeats() +{ + using peers_item_t = typename peers_map_t::value_type; + return seastar::parallel_for_each(peers, + [this](peers_item_t& item) { + const auto now = clock::now(); + const auto deadline = + now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); + auto& [peer, info] = item; + info.last_tx = now; + if (clock::is_zero(info.first_tx)) { + info.first_tx = now; + } + const utime_t sent_stamp{now}; + auto [reply, added] = info.ping_history.emplace(sent_stamp, + reply_t{deadline, 0}); + std::vector<ceph::net::ConnectionRef> conns{info.con_front, + info.con_back}; + return seastar::parallel_for_each(std::move(conns), + [sent_stamp, &reply=reply->second, this] (auto con) { + if (con) { + auto min_message = static_cast<uint32_t>( + local_conf()->osd_heartbeat_min_size); + auto ping = make_message<MOSDPing>(monc.get_fsid(), + service.get_map()->get_epoch(), + MOSDPing::PING, + sent_stamp, + min_message); + return con->send(ping).then([&reply] { + reply.unacknowledged++; + return seastar::now(); + }); + } else { + return seastar::now(); + } + }); + }); +} + +seastar::future<> Heartbeat::send_failures() +{ + using failure_item_t = typename failure_queue_t::value_type; + return seastar::parallel_for_each(failure_queue, + [this](failure_item_t& failure_item) { + auto [osd, failed_since] = failure_item; + if (failure_pending.count(osd)) { + return seastar::now(); + } + auto failed_for = chrono::duration_cast<chrono::seconds>( + clock::now() - failed_since).count(); + auto osdmap = service.get_map(); + auto failure_report = + make_message<MOSDFailure>(monc.get_fsid(), + osd, + osdmap->get_addrs(osd), + static_cast<int>(failed_for), + osdmap->get_epoch()); + failure_pending.emplace(osd, failure_info_t{failed_since, + osdmap->get_addrs(osd)}); + return monc.send_message(failure_report); + }).then([this] { + failure_queue.clear(); + return seastar::now(); + }); +} + +seastar::future<> Heartbeat::send_still_alive(osd_id_t osd, + const entity_addrvec_t& addrs) +{ + auto still_alive = make_message<MOSDFailure>(monc.get_fsid(), + osd, + addrs, + 0, + service.get_map()->get_epoch(), + MOSDFailure::FLAG_ALIVE); + return monc.send_message(still_alive).then([=] { + failure_pending.erase(osd); + return seastar::now(); + }); +} + +bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const +{ + if (ping_history.empty()) { + // we haven't sent a ping yet or we have got all replies, + // in either way we are safe and healthy for now + return false; + } else { + auto oldest_ping = ping_history.begin(); + return now > oldest_ping->second.deadline; + } +} + +bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const +{ + if (con_front && clock::is_zero(last_rx_front)) { + return false; + } + if (con_back && clock::is_zero(last_rx_back)) { + return false; + } + // only declare to be healthy until we have received the first + // replies from both front/back connections + return !is_unhealthy(now); +} diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h new file mode 100644 index 00000000..b5eb0f7c --- /dev/null +++ b/src/crimson/osd/heartbeat.h @@ -0,0 +1,121 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <cstdint> +#include <seastar/core/future.hh> +#include "common/ceph_time.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Fwd.h" + +class MOSDPing; +class OSDMapService; + +namespace ceph::mon { + class Client; +} + +template<typename Message> using Ref = boost::intrusive_ptr<Message>; + +class Heartbeat : public ceph::net::Dispatcher { +public: + using osd_id_t = int; + + Heartbeat(int whoami, + uint32_t nonce, + const OSDMapService& service, + ceph::mon::Client& monc); + + seastar::future<> start(entity_addrvec_t front, + entity_addrvec_t back); + seastar::future<> stop(); + + seastar::future<> add_peer(osd_id_t peer, epoch_t epoch); + seastar::future<> update_peers(int whoami); + seastar::future<> remove_peer(osd_id_t peer); + + seastar::future<> send_heartbeats(); + seastar::future<> send_failures(); + + const entity_addrvec_t& get_front_addrs() const; + const entity_addrvec_t& get_back_addrs() const; + + // Dispatcher methods + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) override; + +private: + seastar::future<> handle_osd_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m); + seastar::future<> handle_ping(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m); + seastar::future<> handle_reply(ceph::net::ConnectionRef conn, + Ref<MOSDPing> m); + seastar::future<> handle_you_died(); + + seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&); + + using osds_t = std::vector<osd_id_t>; + /// remove down OSDs + /// @return peers not needed in this epoch + seastar::future<osds_t> remove_down_peers(); + /// add enough reporters for fast failure detection + void add_reporter_peers(int whoami); + + seastar::future<> start_messenger(ceph::net::Messenger* msgr, + const entity_addrvec_t& addrs); +private: + const int whoami; + const uint32_t nonce; + ceph::net::Messenger* front_msgr = nullptr; + ceph::net::Messenger* back_msgr = nullptr; + const OSDMapService& service; + ceph::mon::Client& monc; + + seastar::timer<seastar::lowres_clock> timer; + // use real_clock so it can be converted to utime_t + using clock = ceph::coarse_real_clock; + + struct reply_t { + clock::time_point deadline; + // one sent over front conn, another sent over back conn + uint8_t unacknowledged = 0; + }; + struct PeerInfo { + /// peer connection (front) + ceph::net::ConnectionRef con_front; + /// peer connection (back) + ceph::net::ConnectionRef con_back; + /// time we sent our first ping request + clock::time_point first_tx; + /// last time we sent a ping request + clock::time_point last_tx; + /// last time we got a ping reply on the front side + clock::time_point last_rx_front; + /// last time we got a ping reply on the back side + clock::time_point last_rx_back; + /// most recent epoch we wanted this peer + epoch_t epoch; + /// history of inflight pings, arranging by timestamp we sent + std::map<utime_t, reply_t> ping_history; + + bool is_unhealthy(clock::time_point now) const; + bool is_healthy(clock::time_point now) const; + }; + using peers_map_t = std::map<osd_id_t, PeerInfo>; + peers_map_t peers; + + // osds which are considered failed + // osd_id => when was the last time that both front and back pings were acked + // use for calculating how long the OSD has been unresponsive + using failure_queue_t = std::map<osd_id_t, clock::time_point>; + failure_queue_t failure_queue; + struct failure_info_t { + clock::time_point failed_since; + entity_addrvec_t addrs; + }; + // osds we've reported to monior as failed ones, but they are not marked down + // yet + std::map<osd_id_t, failure_info_t> failure_pending; +}; diff --git a/src/crimson/osd/main.cc b/src/crimson/osd/main.cc new file mode 100644 index 00000000..04a0c97d --- /dev/null +++ b/src/crimson/osd/main.cc @@ -0,0 +1,89 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#include <sys/types.h> +#include <unistd.h> + +#include <iostream> + +#include <seastar/core/app-template.hh> +#include <seastar/core/thread.hh> + +#include "common/ceph_argparse.h" +#include "crimson/common/config_proxy.h" + +#include "osd.h" + +using config_t = ceph::common::ConfigProxy; + +void usage(const char* prog) { + std::cout << "usage: " << prog << " -i <ID>" << std::endl; + generic_server_usage(); +} + +int main(int argc, char* argv[]) +{ + std::vector<const char*> args{argv + 1, argv + argc}; + if (ceph_argparse_need_usage(args)) { + usage(argv[0]); + return EXIT_SUCCESS; + } + std::string cluster; + std::string conf_file_list; + // ceph_argparse_early_args() could _exit(), while local_conf() won't ready + // until it's started. so do the boilerplate-settings parsing here. + auto init_params = ceph_argparse_early_args(args, + CEPH_ENTITY_TYPE_OSD, + &cluster, + &conf_file_list); + seastar::app_template app; + app.add_options() + ("mkfs", "create a [new] data directory"); + seastar::sharded<OSD> osd; + + using ceph::common::sharded_conf; + using ceph::common::sharded_perf_coll; + using ceph::common::local_conf; + + args.insert(begin(args), argv[0]); + try { + return app.run_deprecated(args.size(), const_cast<char**>(args.data()), [&] { + auto& config = app.configuration(); + seastar::engine().at_exit([] { + return sharded_conf().stop(); + }); + seastar::engine().at_exit([] { + return sharded_perf_coll().stop(); + }); + seastar::engine().at_exit([&] { + return osd.stop(); + }); + return sharded_conf().start(init_params.name, cluster).then([] { + return sharded_perf_coll().start(); + }).then([&conf_file_list] { + return local_conf().parse_config_files(conf_file_list); + }).then([&] { + return osd.start_single(std::stoi(local_conf()->name.get_id()), + static_cast<uint32_t>(getpid())); + }).then([&osd, mkfs = config.count("mkfs")] { + if (mkfs) { + return osd.invoke_on(0, &OSD::mkfs, + local_conf().get_val<uuid_d>("fsid")); + } else { + return osd.invoke_on(0, &OSD::start); + } + }); + }); + } catch (...) { + seastar::fprint(std::cerr, "FATAL: Exception during startup, aborting: %s\n", std::current_exception()); + return EXIT_FAILURE; + } +} + +/* + * Local Variables: + * compile-command: "make -j4 \ + * -C ../../../build \ + * crimson-osd" + * End: + */ diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc new file mode 100644 index 00000000..bdb1642e --- /dev/null +++ b/src/crimson/osd/osd.cc @@ -0,0 +1,658 @@ +#include "osd.h" + +#include <boost/range/join.hpp> +#include <boost/smart_ptr/make_local_shared.hpp> + +#include "common/pick_address.h" +#include "messages/MOSDBeacon.h" +#include "messages/MOSDBoot.h" +#include "messages/MOSDMap.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Messenger.h" +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_object.h" +#include "crimson/os/cyan_store.h" +#include "crimson/os/Transaction.h" +#include "crimson/osd/heartbeat.h" +#include "crimson/osd/osd_meta.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/pg_meta.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } + + template<typename Message, typename... Args> + Ref<Message> make_message(Args&&... args) + { + return {new Message{std::forward<Args>(args)...}, false}; + } + static constexpr int TICK_INTERVAL = 1; +} + +using ceph::common::local_conf; +using ceph::os::CyanStore; + +OSD::OSD(int id, uint32_t nonce) + : whoami{id}, + nonce{nonce}, + beacon_timer{[this] { send_beacon(); }}, + heartbeat_timer{[this] { update_heartbeat_peers(); }} +{ + osdmaps[0] = boost::make_local_shared<OSDMap>(); +} + +OSD::~OSD() = default; + +namespace { +// Initial features in new superblock. +// Features here are also automatically upgraded +CompatSet get_osd_initial_compat_set() +{ + CompatSet::FeatureSet ceph_osd_feature_compat; + CompatSet::FeatureSet ceph_osd_feature_ro_compat; + CompatSet::FeatureSet ceph_osd_feature_incompat; + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HINTS); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGMETA); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_MISSING); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_FASTINFO); + ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_RECOVERY_DELETES); + return CompatSet(ceph_osd_feature_compat, + ceph_osd_feature_ro_compat, + ceph_osd_feature_incompat); +} +} + +seastar::future<> OSD::mkfs(uuid_d cluster_fsid) +{ + const auto data_path = local_conf().get_val<std::string>("osd_data"); + store = std::make_unique<ceph::os::CyanStore>(data_path); + uuid_d osd_fsid; + osd_fsid.generate_random(); + return store->mkfs(osd_fsid).then([this] { + return store->mount(); + }).then([cluster_fsid, osd_fsid, this] { + superblock.cluster_fsid = cluster_fsid; + superblock.osd_fsid = osd_fsid; + superblock.whoami = whoami; + superblock.compat_features = get_osd_initial_compat_set(); + + meta_coll = make_unique<OSDMeta>( + store->create_new_collection(coll_t::meta()), store.get()); + ceph::os::Transaction t; + meta_coll->create(t); + meta_coll->store_superblock(t, superblock); + return store->do_transaction(meta_coll->collection(), std::move(t)); + }).then([cluster_fsid, this] { + store->write_meta("ceph_fsid", cluster_fsid.to_string()); + store->write_meta("whoami", std::to_string(whoami)); + return seastar::now(); + }); +} + +namespace { + entity_addrvec_t pick_addresses(int what) { + entity_addrvec_t addrs; + CephContext cct; + if (int r = ::pick_addresses(&cct, what, &addrs, -1); r < 0) { + throw std::runtime_error("failed to pick address"); + } + // TODO: v2: ::pick_addresses() returns v2 addresses, but crimson-msgr does + // not support v2 yet. remove following set_type() once v2 support is ready. + for (auto addr : addrs.v) { + addr.set_type(addr.TYPE_LEGACY); + logger().info("picked address {}", addr); + } + return addrs; + } + std::pair<entity_addrvec_t, bool> + replace_unknown_addrs(entity_addrvec_t maybe_unknowns, + const entity_addrvec_t& knowns) { + bool changed = false; + auto maybe_replace = [&](entity_addr_t addr) { + if (!addr.is_blank_ip()) { + return addr; + } + for (auto& b : knowns.v) { + if (addr.get_family() == b.get_family()) { + auto a = b; + a.set_nonce(addr.get_nonce()); + a.set_type(addr.get_type()); + a.set_port(addr.get_port()); + changed = true; + return a; + } + } + throw std::runtime_error("failed to replace unknown address"); + }; + entity_addrvec_t replaced; + std::transform(maybe_unknowns.v.begin(), + maybe_unknowns.v.end(), + std::back_inserter(replaced.v), + maybe_replace); + return {replaced, changed}; + } +} + +seastar::future<> OSD::start() +{ + logger().info("start"); + + return seastar::when_all_succeed( + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "cluster", + nonce, + seastar::engine().cpu_id()) + .then([this] (auto msgr) { cluster_msgr = msgr; }), + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "client", + nonce, + seastar::engine().cpu_id()) + .then([this] (auto msgr) { public_msgr = msgr; })) + .then([this] { + monc.reset(new ceph::mon::Client{*public_msgr}); + heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc}); + + for (auto msgr : {cluster_msgr, public_msgr}) { + if (local_conf()->ms_crc_data) { + msgr->set_crc_data(); + } + if (local_conf()->ms_crc_header) { + msgr->set_crc_header(); + } + } + dispatchers.push_front(this); + dispatchers.push_front(monc.get()); + + const auto data_path = local_conf().get_val<std::string>("osd_data"); + store = std::make_unique<ceph::os::CyanStore>(data_path); + return store->mount(); + }).then([this] { + meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()), + store.get()); + return meta_coll->load_superblock(); + }).then([this](OSDSuperblock&& sb) { + superblock = std::move(sb); + return get_map(superblock.current_epoch); + }).then([this](cached_map_t&& map) { + osdmap = std::move(map); + return load_pgs(); + }).then([this] { + return seastar::when_all_succeed( + cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER), + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max) + .then([this] { return cluster_msgr->start(&dispatchers); }), + public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max) + .then([this] { return public_msgr->start(&dispatchers); })); + }).then([this] { + return monc->start(); + }).then([this] { + monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0); + monc->sub_want("mgrmap", 0, 0); + monc->sub_want("osdmap", 0, 0); + return monc->renew_subs(); + }).then([this] { + if (auto [addrs, changed] = + replace_unknown_addrs(cluster_msgr->get_myaddrs(), + public_msgr->get_myaddrs()); changed) { + cluster_msgr->set_myaddrs(addrs); + } + return heartbeat->start(public_msgr->get_myaddrs(), + cluster_msgr->get_myaddrs()); + }).then([this] { + return start_boot(); + }); +} + +seastar::future<> OSD::start_boot() +{ + state.set_preboot(); + return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) { + return _preboot(oldest, newest); + }); +} + +seastar::future<> OSD::_preboot(version_t oldest, version_t newest) +{ + logger().info("osd.{}: _preboot", whoami); + if (osdmap->get_epoch() == 0) { + logger().warn("waiting for initial osdmap"); + } else if (osdmap->is_destroyed(whoami)) { + logger().warn("osdmap says I am destroyed"); + // provide a small margin so we don't livelock seeing if we + // un-destroyed ourselves. + if (osdmap->get_epoch() > newest - 1) { + throw std::runtime_error("i am destroyed"); + } + } else if (osdmap->is_noup(whoami)) { + logger().warn("osdmap NOUP flag is set, waiting for it to clear"); + } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { + logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it"); + } else if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) { + logger().error("osdmap require_osd_release < luminous; please upgrade to luminous"); + } else if (false) { + // TODO: update mon if current fullness state is different from osdmap + } else if (version_t n = local_conf()->osd_map_message_max; + osdmap->get_epoch() >= oldest - 1 && + osdmap->get_epoch() + n > newest) { + return _send_boot(); + } + // get all the latest maps + if (osdmap->get_epoch() + 1 >= oldest) { + return osdmap_subscribe(osdmap->get_epoch() + 1, false); + } else { + return osdmap_subscribe(oldest - 1, true); + } +} + +seastar::future<> OSD::_send_boot() +{ + state.set_booting(); + + logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs()); + logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs()); + logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr()); + auto m = make_message<MOSDBoot>(superblock, + osdmap->get_epoch(), + osdmap->get_epoch(), + heartbeat->get_back_addrs(), + heartbeat->get_front_addrs(), + cluster_msgr->get_myaddrs(), + CEPH_FEATURES_ALL); + return monc->send_message(m); +} + +seastar::future<> OSD::stop() +{ + // see also OSD::shutdown() + state.set_stopping(); + return gate.close().then([this] { + return heartbeat->stop(); + }).then([this] { + return monc->stop(); + }).then([this] { + return public_msgr->shutdown(); + }).then([this] { + return cluster_msgr->shutdown(); + }); +} + +seastar::future<> OSD::load_pgs() +{ + return seastar::parallel_for_each(store->list_collections(), + [this](auto coll) { + spg_t pgid; + if (coll.is_pg(&pgid)) { + return load_pg(pgid).then([pgid, this](auto&& pg) { + logger().info("load_pgs: loaded {}", pgid); + pgs.emplace(pgid, std::move(pg)); + return seastar::now(); + }); + } else if (coll.is_temp(&pgid)) { + // TODO: remove the collection + return seastar::now(); + } else { + logger().warn("ignoring unrecognized collection: {}", coll); + return seastar::now(); + } + }); +} + +seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid) +{ + using ec_profile_t = map<string,string>; + return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) { + return get_map(e); + }).then([pgid, this] (auto&& create_map) { + if (create_map->have_pg_pool(pgid.pool())) { + pg_pool_t pi = *create_map->get_pg_pool(pgid.pool()); + string name = create_map->get_pool_name(pgid.pool()); + ec_profile_t ec_profile; + if (pi.is_erasure()) { + ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile); + } + return seastar::make_ready_future<pg_pool_t, + string, + ec_profile_t>(std::move(pi), + std::move(name), + std::move(ec_profile)); + } else { + // pool was deleted; grab final pg_pool_t off disk. + return meta_coll->load_final_pool_info(pgid.pool()); + } + }).then([this](pg_pool_t&& pool, string&& name, ec_profile_t&& ec_profile) { + Ref<PG> pg{new PG{std::move(pool), + std::move(name), + std::move(ec_profile)}}; + return seastar::make_ready_future<Ref<PG>>(std::move(pg)); + }); +} + +seastar::future<> OSD::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) +{ + logger().info("ms_dispatch {}", *m); + if (state.is_stopping()) { + return seastar::now(); + } + + switch (m->get_type()) { + case CEPH_MSG_OSD_MAP: + return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m)); + default: + return seastar::now(); + } +} + +seastar::future<> OSD::ms_handle_connect(ceph::net::ConnectionRef conn) +{ + if (conn->get_peer_type() != CEPH_ENTITY_TYPE_MON) { + return seastar::now(); + } else { + return seastar::now(); + } +} + +seastar::future<> OSD::ms_handle_reset(ceph::net::ConnectionRef conn) +{ + // TODO: cleanup the session attached to this connection + logger().warn("ms_handle_reset"); + return seastar::now(); +} + +seastar::future<> OSD::ms_handle_remote_reset(ceph::net::ConnectionRef conn) +{ + logger().warn("ms_handle_remote_reset"); + return seastar::now(); +} + +OSD::cached_map_t OSD::get_map() const +{ + return osdmap; +} + +seastar::future<OSD::cached_map_t> OSD::get_map(epoch_t e) +{ + // TODO: use LRU cache for managing osdmap, fallback to disk if we have to + if (auto found = osdmaps.find(e); found) { + return seastar::make_ready_future<cached_map_t>(std::move(found)); + } else { + return load_map_bl(e).then([e, this](bufferlist bl) { + auto osdmap = std::make_unique<OSDMap>(); + osdmap->decode(bl); + return seastar::make_ready_future<cached_map_t>( + osdmaps.insert(e, std::move(osdmap))); + }); + } +} + +void OSD::store_map_bl(ceph::os::Transaction& t, + epoch_t e, bufferlist&& bl) +{ + meta_coll->store_map(t, e, bl); + map_bl_cache.insert(e, std::move(bl)); +} + +seastar::future<bufferlist> OSD::load_map_bl(epoch_t e) +{ + if (std::optional<bufferlist> found = map_bl_cache.find(e); found) { + return seastar::make_ready_future<bufferlist>(*found); + } else { + return meta_coll->load_map(e); + } +} + +seastar::future<> OSD::store_maps(ceph::os::Transaction& t, + epoch_t start, Ref<MOSDMap> m) +{ + return seastar::do_for_each(boost::counting_iterator<epoch_t>(start), + boost::counting_iterator<epoch_t>(m->get_last() + 1), + [&t, m, this](epoch_t e) { + if (auto p = m->maps.find(e); p != m->maps.end()) { + auto o = std::make_unique<OSDMap>(); + o->decode(p->second); + logger().info("store_maps osdmap.{}", e); + store_map_bl(t, e, std::move(std::move(p->second))); + osdmaps.insert(e, std::move(o)); + return seastar::now(); + } else if (auto p = m->incremental_maps.find(e); + p != m->incremental_maps.end()) { + OSDMap::Incremental inc; + auto i = p->second.cbegin(); + inc.decode(i); + return load_map_bl(e - 1) + .then([&t, e, inc=std::move(inc), this](bufferlist bl) { + auto o = std::make_unique<OSDMap>(); + o->decode(bl); + o->apply_incremental(inc); + bufferlist fbl; + o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED); + store_map_bl(t, e, std::move(fbl)); + osdmaps.insert(e, std::move(o)); + return seastar::now(); + }); + } else { + logger().error("MOSDMap lied about what maps it had?"); + return seastar::now(); + } + }); +} + +seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request) +{ + logger().info("{}({})", __func__, epoch); + if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) || + force_request) { + return monc->renew_subs(); + } else { + return seastar::now(); + } +} + +seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn, + Ref<MOSDMap> m) +{ + logger().info("handle_osd_map {}", *m); + if (m->fsid != superblock.cluster_fsid) { + logger().warn("fsid mismatched"); + return seastar::now(); + } + if (state.is_initializing()) { + logger().warn("i am still initializing"); + return seastar::now(); + } + + const auto first = m->get_first(); + const auto last = m->get_last(); + + // make sure there is something new, here, before we bother flushing + // the queues and such + if (last <= superblock.newest_map) { + return seastar::now(); + } + // missing some? + bool skip_maps = false; + epoch_t start = superblock.newest_map + 1; + if (first > start) { + logger().info("handle_osd_map message skips epochs {}..{}", + start, first - 1); + if (m->oldest_map <= start) { + return osdmap_subscribe(start, false); + } + // always try to get the full range of maps--as many as we can. this + // 1- is good to have + // 2- is at present the only way to ensure that we get a *full* map as + // the first map! + if (m->oldest_map < first) { + return osdmap_subscribe(m->oldest_map - 1, true); + } + skip_maps = true; + start = first; + } + + return seastar::do_with(ceph::os::Transaction{}, + [=](auto& t) { + return store_maps(t, start, m).then([=, &t] { + // even if this map isn't from a mon, we may have satisfied our subscription + monc->sub_got("osdmap", last); + if (!superblock.oldest_map || skip_maps) { + superblock.oldest_map = first; + } + superblock.newest_map = last; + superblock.current_epoch = last; + + // note in the superblock that we were clean thru the prior epoch + if (boot_epoch && boot_epoch >= superblock.mounted) { + superblock.mounted = boot_epoch; + superblock.clean_thru = last; + } + meta_coll->store_superblock(t, superblock); + return store->do_transaction(meta_coll->collection(), std::move(t)); + }); + }).then([=] { + // TODO: write to superblock and commit the transaction + return committed_osd_maps(start, last, m); + }); +} + +seastar::future<> OSD::committed_osd_maps(version_t first, + version_t last, + Ref<MOSDMap> m) +{ + logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last); + // advance through the new maps + return seastar::parallel_for_each(boost::irange(first, last + 1), + [this](epoch_t cur) { + return get_map(cur).then([this](cached_map_t&& o) { + osdmap = std::move(o); + if (up_epoch != 0 && + osdmap->is_up(whoami) && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) { + up_epoch = osdmap->get_epoch(); + if (!boot_epoch) { + boot_epoch = osdmap->get_epoch(); + } + } + }); + }).then([m, this] { + if (osdmap->is_up(whoami) && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() && + bind_epoch < osdmap->get_up_from(whoami)) { + if (state.is_booting()) { + logger().info("osd.{}: activating...", whoami); + state.set_active(); + beacon_timer.arm_periodic( + std::chrono::seconds(local_conf()->osd_beacon_report_interval)); + heartbeat_timer.arm_periodic( + std::chrono::seconds(TICK_INTERVAL)); + } + } + + if (state.is_active()) { + logger().info("osd.{}: now active", whoami); + if (!osdmap->exists(whoami)) { + return shutdown(); + } + if (should_restart()) { + return restart(); + } else { + return seastar::now(); + } + } else if (state.is_preboot()) { + logger().info("osd.{}: now preboot", whoami); + + if (m->get_source().is_mon()) { + return _preboot(m->oldest_map, m->newest_map); + } else { + logger().info("osd.{}: start_boot", whoami); + return start_boot(); + } + } else { + logger().info("osd.{}: now {}", whoami, state); + // XXX + return seastar::now(); + } + }); +} + +bool OSD::should_restart() const +{ + if (!osdmap->is_up(whoami)) { + logger().info("map e {} marked osd.{} down", + osdmap->get_epoch(), whoami); + return true; + } else if (osdmap->get_addrs(whoami) != public_msgr->get_myaddrs()) { + logger().error("map e {} had wrong client addr ({} != my {})", + osdmap->get_epoch(), + osdmap->get_addrs(whoami), + public_msgr->get_myaddrs()); + return true; + } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) { + logger().error("map e {} had wrong cluster addr ({} != my {})", + osdmap->get_epoch(), + osdmap->get_cluster_addrs(whoami), + cluster_msgr->get_myaddrs()); + return true; + } else { + return false; + } +} + +seastar::future<> OSD::restart() +{ + up_epoch = 0; + bind_epoch = osdmap->get_epoch(); + // TODO: promote to shutdown if being marked down for multiple times + // rebind messengers + return start_boot(); +} + +seastar::future<> OSD::shutdown() +{ + // TODO + superblock.mounted = boot_epoch; + superblock.clean_thru = osdmap->get_epoch(); + return seastar::now(); +} + +seastar::future<> OSD::send_beacon() +{ + // FIXME: min lec should be calculated from pg_stat + // and should set m->pgs + epoch_t min_last_epoch_clean = osdmap->get_epoch(); + auto m = make_message<MOSDBeacon>(osdmap->get_epoch(), + min_last_epoch_clean); + return monc->send_message(m); +} + +void OSD::update_heartbeat_peers() +{ + if (!state.is_active()) { + return; + } + for (auto& pg : pgs) { + vector<int> up, acting; + osdmap->pg_to_up_acting_osds(pg.first.pgid, + &up, nullptr, + &acting, nullptr); + for (auto osd : boost::join(up, acting)) { + if (osd != CRUSH_ITEM_NONE) { + heartbeat->add_peer(osd, osdmap->get_epoch()); + } + } + } + heartbeat->update_peers(whoami); +} diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h new file mode 100644 index 00000000..c5aff5e2 --- /dev/null +++ b/src/crimson/osd/osd.h @@ -0,0 +1,125 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <map> +#include <seastar/core/future.hh> +#include <seastar/core/gate.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/timer.hh> + +#include "crimson/common/simple_lru.h" +#include "crimson/common/shared_lru.h" +#include "crimson/mon/MonClient.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/osd/chained_dispatchers.h" +#include "crimson/osd/osdmap_service.h" +#include "crimson/osd/state.h" + +#include "osd/OSDMap.h" + +class MOSDMap; +class OSDMap; +class OSDMeta; +class PG; +class Heartbeat; + +namespace ceph::net { + class Messenger; +} + +namespace ceph::os { + class CyanStore; + struct Collection; + class Transaction; +} + +template<typename T> using Ref = boost::intrusive_ptr<T>; + +class OSD : public ceph::net::Dispatcher, + private OSDMapService { + seastar::gate gate; + const int whoami; + const uint32_t nonce; + seastar::timer<seastar::lowres_clock> beacon_timer; + // talk with osd + ceph::net::Messenger* cluster_msgr = nullptr; + // talk with client/mon/mgr + ceph::net::Messenger* public_msgr = nullptr; + ChainedDispatchers dispatchers; + std::unique_ptr<ceph::mon::Client> monc; + + std::unique_ptr<Heartbeat> heartbeat; + seastar::timer<seastar::lowres_clock> heartbeat_timer; + + SharedLRU<epoch_t, OSDMap> osdmaps; + SimpleLRU<epoch_t, bufferlist, false> map_bl_cache; + cached_map_t osdmap; + // TODO: use a wrapper for ObjectStore + std::unique_ptr<ceph::os::CyanStore> store; + std::unique_ptr<OSDMeta> meta_coll; + + std::unordered_map<spg_t, Ref<PG>> pgs; + OSDState state; + + /// _first_ epoch we were marked up (after this process started) + epoch_t boot_epoch = 0; + /// _most_recent_ epoch we were marked up + epoch_t up_epoch = 0; + //< epoch we last did a bind to new ip:ports + epoch_t bind_epoch = 0; + //< since when there is no more pending pg creates from mon + epoch_t last_pg_create_epoch = 0; + + OSDSuperblock superblock; + + // Dispatcher methods + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override; + seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override; + +public: + OSD(int id, uint32_t nonce); + ~OSD() override; + + seastar::future<> mkfs(uuid_d fsid); + + seastar::future<> start(); + seastar::future<> stop(); + +private: + seastar::future<> start_boot(); + seastar::future<> _preboot(version_t oldest_osdmap, version_t newest_osdmap); + seastar::future<> _send_boot(); + + seastar::future<Ref<PG>> load_pg(spg_t pgid); + seastar::future<> load_pgs(); + + // OSDMapService methods + seastar::future<cached_map_t> get_map(epoch_t e) override; + cached_map_t get_map() const override; + + seastar::future<bufferlist> load_map_bl(epoch_t e); + void store_map_bl(ceph::os::Transaction& t, + epoch_t e, bufferlist&& bl); + seastar::future<> store_maps(ceph::os::Transaction& t, + epoch_t start, Ref<MOSDMap> m); + seastar::future<> osdmap_subscribe(version_t epoch, bool force_request); + + void write_superblock(ceph::os::Transaction& t); + seastar::future<> read_superblock(); + + seastar::future<> handle_osd_map(ceph::net::ConnectionRef conn, + Ref<MOSDMap> m); + seastar::future<> committed_osd_maps(version_t first, + version_t last, + Ref<MOSDMap> m); + bool should_restart() const; + seastar::future<> restart(); + seastar::future<> shutdown(); + + seastar::future<> send_beacon(); + void update_heartbeat_peers(); +}; diff --git a/src/crimson/osd/osd_meta.cc b/src/crimson/osd/osd_meta.cc new file mode 100644 index 00000000..6eb225fe --- /dev/null +++ b/src/crimson/osd/osd_meta.cc @@ -0,0 +1,80 @@ +#include "osd_meta.h" + +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_store.h" +#include "crimson/os/Transaction.h" + +void OSDMeta::create(ceph::os::Transaction& t) +{ + t.create_collection(coll->cid, 0); +} + +void OSDMeta::store_map(ceph::os::Transaction& t, + epoch_t e, const bufferlist& m) +{ + t.write(coll->cid, osdmap_oid(e), 0, m.length(), m); +} + +seastar::future<bufferlist> OSDMeta::load_map(epoch_t e) +{ + return store->read(coll, + osdmap_oid(e), 0, 0, + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); +} + +void OSDMeta::store_superblock(ceph::os::Transaction& t, + const OSDSuperblock& superblock) +{ + bufferlist bl; + encode(superblock, bl); + t.write(coll->cid, superblock_oid(), 0, bl.length(), bl); +} + +seastar::future<OSDSuperblock> OSDMeta::load_superblock() +{ + return store->read(coll, superblock_oid(), 0, 0) + .then([this] (bufferlist&& bl) { + auto p = bl.cbegin(); + OSDSuperblock superblock; + decode(superblock, p); + return seastar::make_ready_future<OSDSuperblock>(std::move(superblock)); + }); +} + +seastar::future<pg_pool_t, + std::string, + OSDMeta::ec_profile_t> +OSDMeta::load_final_pool_info(int64_t pool) { + return store->read(coll, final_pool_info_oid(pool), + 0, 0).then([this] (bufferlist&& bl) { + auto p = bl.cbegin(); + pg_pool_t pi; + string name; + ec_profile_t ec_profile; + decode(pi, p); + decode(name, p); + decode(ec_profile, p); + return seastar::make_ready_future<pg_pool_t, + string, + ec_profile_t>(std::move(pi), + std::move(name), + std::move(ec_profile)); + }); +} + +ghobject_t OSDMeta::osdmap_oid(epoch_t epoch) +{ + string name = fmt::format("osdmap.{}", epoch); + return ghobject_t(hobject_t(sobject_t(object_t(name), 0))); +} + +ghobject_t OSDMeta::final_pool_info_oid(int64_t pool) +{ + string name = fmt::format("final_pool_{}", pool); + return ghobject_t(hobject_t(sobject_t(object_t(name), CEPH_NOSNAP))); +} + +ghobject_t OSDMeta::superblock_oid() +{ + return ghobject_t(hobject_t(sobject_t(object_t("osd_superblock"), 0))); +} diff --git a/src/crimson/osd/osd_meta.h b/src/crimson/osd/osd_meta.h new file mode 100644 index 00000000..936d9548 --- /dev/null +++ b/src/crimson/osd/osd_meta.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <map> +#include <string> +#include <seastar/core/future.hh> +#include "osd/osd_types.h" + +namespace ceph::os { + class CyanStore; + class Collection; + class Transaction; +} + +/// metadata shared across PGs, or put in another way, +/// metadata not specific to certain PGs. +class OSDMeta { + template<typename T> using Ref = boost::intrusive_ptr<T>; + + ceph::os::CyanStore* store; + Ref<ceph::os::Collection> coll; + +public: + OSDMeta(Ref<ceph::os::Collection> coll, + ceph::os::CyanStore* store) + : store{store}, coll{coll} + {} + + + auto collection() { + return coll; + } + void create(ceph::os::Transaction& t); + + void store_map(ceph::os::Transaction& t, + epoch_t e, const bufferlist& m); + seastar::future<bufferlist> load_map(epoch_t e); + + void store_superblock(ceph::os::Transaction& t, + const OSDSuperblock& sb); + seastar::future<OSDSuperblock> load_superblock(); + + using ec_profile_t = std::map<std::string, std::string>; + seastar::future<pg_pool_t, + std::string, + ec_profile_t> load_final_pool_info(int64_t pool); +private: + static ghobject_t osdmap_oid(epoch_t epoch); + static ghobject_t final_pool_info_oid(int64_t pool); + static ghobject_t superblock_oid(); +}; diff --git a/src/crimson/osd/osdmap_service.h b/src/crimson/osd/osdmap_service.h new file mode 100644 index 00000000..0a3aaed3 --- /dev/null +++ b/src/crimson/osd/osdmap_service.h @@ -0,0 +1,19 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <boost/smart_ptr/local_shared_ptr.hpp> + +#include "include/types.h" + +class OSDMap; + +class OSDMapService { +public: + using cached_map_t = boost::local_shared_ptr<OSDMap>; + virtual ~OSDMapService() = default; + virtual seastar::future<cached_map_t> get_map(epoch_t e) = 0; + /// get the latest map + virtual cached_map_t get_map() const = 0; +}; diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc new file mode 100644 index 00000000..bc7b8c2d --- /dev/null +++ b/src/crimson/osd/pg.cc @@ -0,0 +1,6 @@ +#include "pg.h" + +PG::PG(pg_pool_t&& pool, std::string&& name, ec_profile_t&& ec_profile) +{ + // TODO +} diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h new file mode 100644 index 00000000..4bb672f4 --- /dev/null +++ b/src/crimson/osd/pg.h @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <boost/intrusive_ptr.hpp> +#include <boost/smart_ptr/intrusive_ref_counter.hpp> +#include <seastar/core/future.hh> + +#include "osd/osd_types.h" + +template<typename T> using Ref = boost::intrusive_ptr<T>; + +class PG : public boost::intrusive_ref_counter< + PG, + boost::thread_unsafe_counter> +{ + using ec_profile_t = std::map<std::string,std::string>; +public: + PG(pg_pool_t&& pool, std::string&& name, ec_profile_t&& ec_profile); +}; diff --git a/src/crimson/osd/pg_meta.cc b/src/crimson/osd/pg_meta.cc new file mode 100644 index 00000000..2098e50a --- /dev/null +++ b/src/crimson/osd/pg_meta.cc @@ -0,0 +1,104 @@ +#include "pg_meta.h" + +#include <string_view> + +#include "crimson/os/cyan_collection.h" +#include "crimson/os/cyan_store.h" + +// prefix pgmeta_oid keys with _ so that PGLog::read_log_and_missing() can +// easily skip them + +static const string_view infover_key = "_infover"sv; +static const string_view info_key = "_info"sv; +static const string_view biginfo_key = "_biginfo"sv; +static const string_view epoch_key = "_epoch"sv; +static const string_view fastinfo_key = "_fastinfo"sv; + +using ceph::os::CyanStore; + +PGMeta::PGMeta(CyanStore* store, spg_t pgid) + : store{store}, + pgid{pgid} +{} + +namespace { + template<typename T> + std::optional<T> find_value(const CyanStore::omap_values_t& values, + string_view key) + { + auto found = values.find(key); + if (found == values.end()) { + return {}; + } + auto p = found->second.cbegin(); + T value; + decode(value, p); + return std::make_optional(std::move(value)); + } +} +seastar::future<epoch_t> PGMeta::get_epoch() +{ + auto ch = store->open_collection(coll_t{pgid}); + return store->omap_get_values(ch, + pgid.make_pgmeta_oid(), + {string{infover_key}, + string{epoch_key}}).then( + [](auto&& values) { + { + // sanity check + auto infover = find_value<__u8>(values, infover_key); + assert(infover); + if (*infover < 10) { + throw std::runtime_error("incompatible pg meta"); + } + } + { + auto epoch = find_value<epoch_t>(values, epoch_key); + assert(epoch); + return seastar::make_ready_future<epoch_t>(*epoch); + } + }); +} + +seastar::future<pg_info_t, PastIntervals> PGMeta::load() +{ + auto ch = store->open_collection(coll_t{pgid}); + return store->omap_get_values(ch, + pgid.make_pgmeta_oid(), + {string{infover_key}, + string{info_key}, + string{biginfo_key}, + string{fastinfo_key}}).then( + [this](auto&& values) { + { + // sanity check + auto infover = find_value<__u8>(values, infover_key); + assert(infover); + if (infover < 10) { + throw std::runtime_error("incompatible pg meta"); + } + } + pg_info_t info; + { + auto found = find_value<pg_info_t>(values, info_key); + assert(found); + info = *std::move(found); + } + PastIntervals past_intervals; + { + using biginfo_t = std::pair<PastIntervals, decltype(info.purged_snaps)>; + auto big_info = find_value<biginfo_t>(values, biginfo_key); + assert(big_info); + past_intervals = std::move(big_info->first); + info.purged_snaps = std::move(big_info->second); + } + { + auto fast_info = find_value<pg_fast_info_t>(values, fastinfo_key); + assert(fast_info); + fast_info->try_apply_to(&info); + } + return seastar::make_ready_future<pg_info_t, PastIntervals>( + std::move(info), + std::move(past_intervals)); + }); +} diff --git a/src/crimson/osd/pg_meta.h b/src/crimson/osd/pg_meta.h new file mode 100644 index 00000000..10f2234a --- /dev/null +++ b/src/crimson/osd/pg_meta.h @@ -0,0 +1,22 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/future.hh> +#include "osd/osd_types.h" + +namespace ceph::os { + class CyanStore; +} + +/// PG related metadata +class PGMeta +{ + ceph::os::CyanStore* store; + const spg_t pgid; +public: + PGMeta(ceph::os::CyanStore *store, spg_t pgid); + seastar::future<epoch_t> get_epoch(); + seastar::future<pg_info_t, PastIntervals> load(); +}; diff --git a/src/crimson/osd/state.h b/src/crimson/osd/state.h new file mode 100644 index 00000000..4c445348 --- /dev/null +++ b/src/crimson/osd/state.h @@ -0,0 +1,71 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <string_view> +#include <ostream> + +class OSDMap; + +class OSDState { + + enum class State { + INITIALIZING, + PREBOOT, + BOOTING, + ACTIVE, + STOPPING, + WAITING_FOR_HEALTHY, + }; + + State state = State::INITIALIZING; + +public: + bool is_initializing() const { + return state == State::INITIALIZING; + } + bool is_preboot() const { + return state == State::PREBOOT; + } + bool is_booting() const { + return state == State::BOOTING; + } + bool is_active() const { + return state == State::ACTIVE; + } + bool is_stopping() const { + return state == State::STOPPING; + } + bool is_waiting_for_healthy() const { + return state == State::WAITING_FOR_HEALTHY; + } + void set_preboot() { + state = State::PREBOOT; + } + void set_booting() { + state = State::BOOTING; + } + void set_active() { + state = State::ACTIVE; + } + void set_stopping() { + state = State::STOPPING; + } + std::string_view to_string() const { + switch (state) { + case State::INITIALIZING: return "initializing"; + case State::PREBOOT: return "preboot"; + case State::BOOTING: return "booting"; + case State::ACTIVE: return "active"; + case State::STOPPING: return "stopping"; + case State::WAITING_FOR_HEALTHY: return "waiting_for_healthy"; + default: return "???"; + } + } +}; + +inline std::ostream& +operator<<(std::ostream& os, const OSDState& s) { + return os << s.to_string(); +} diff --git a/src/crimson/thread/Condition.h b/src/crimson/thread/Condition.h new file mode 100644 index 00000000..2a5c643d --- /dev/null +++ b/src/crimson/thread/Condition.h @@ -0,0 +1,36 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/reactor.hh> +#include <sys/eventfd.h> + +namespace ceph::thread { + +/// a synchronization primitive can be used to block a seastar thread, until +/// another thread notifies it. +class Condition { + seastar::file_desc file_desc; + int fd; + seastar::pollable_fd_state fd_state; + eventfd_t event = 0; +public: + Condition() + : file_desc{seastar::file_desc::eventfd(0, 0)}, + fd(file_desc.get()), + fd_state{std::move(file_desc)} + {} + seastar::future<> wait() { + return seastar::engine().read_some(fd_state, &event, sizeof(event)) + .then([](size_t) { + return seastar::now(); + }); + } + void notify() { + eventfd_t result = 1; + ::eventfd_write(fd, result); + } +}; + +} // namespace ceph::thread diff --git a/src/crimson/thread/ThreadPool.cc b/src/crimson/thread/ThreadPool.cc new file mode 100644 index 00000000..9df849b5 --- /dev/null +++ b/src/crimson/thread/ThreadPool.cc @@ -0,0 +1,76 @@ +#include "ThreadPool.h" + +#include <pthread.h> +#include "crimson/net/Config.h" +#include "include/intarith.h" + +#include "include/ceph_assert.h" + +namespace ceph::thread { + +ThreadPool::ThreadPool(size_t n_threads, + size_t queue_sz, + unsigned cpu_id) + : queue_size{round_up_to(queue_sz, seastar::smp::count)}, + pending{queue_size} +{ + for (size_t i = 0; i < n_threads; i++) { + threads.emplace_back([this, cpu_id] { + pin(cpu_id); + loop(); + }); + } +} + +ThreadPool::~ThreadPool() +{ + for (auto& thread : threads) { + thread.join(); + } +} + +void ThreadPool::pin(unsigned cpu_id) +{ + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpu_id, &cs); + [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(), + sizeof(cs), &cs); + ceph_assert(r == 0); +} + +void ThreadPool::loop() +{ + for (;;) { + WorkItem* work_item = nullptr; + { + std::unique_lock lock{mutex}; + cond.wait_for(lock, + ceph::net::conf.threadpool_empty_queue_max_wait, + [this, &work_item] { + return pending.pop(work_item) || is_stopping(); + }); + } + if (work_item) { + work_item->process(); + } else if (is_stopping()) { + break; + } + } +} + +seastar::future<> ThreadPool::start() +{ + auto slots_per_shard = queue_size / seastar::smp::count; + return submit_queue.start(slots_per_shard); +} + +seastar::future<> ThreadPool::stop() +{ + return submit_queue.stop().then([this] { + stopping = true; + cond.notify_all(); + }); +} + +} // namespace ceph::thread diff --git a/src/crimson/thread/ThreadPool.h b/src/crimson/thread/ThreadPool.h new file mode 100644 index 00000000..cfd72d2a --- /dev/null +++ b/src/crimson/thread/ThreadPool.h @@ -0,0 +1,118 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#pragma once + +#include <atomic> +#include <condition_variable> +#include <tuple> +#include <type_traits> +#include <boost/lockfree/queue.hpp> +#include <boost/optional.hpp> +#include <seastar/core/future.hh> +#include <seastar/core/gate.hh> +#include <seastar/core/semaphore.hh> +#include <seastar/core/sharded.hh> + +#include "Condition.h" + +namespace ceph::thread { + +struct WorkItem { + virtual ~WorkItem() {} + virtual void process() = 0; +}; + +template<typename Func, typename T = std::invoke_result_t<Func>> +struct Task final : WorkItem { + Func func; + seastar::future_state<T> state; + ceph::thread::Condition on_done; +public: + explicit Task(Func&& f) + : func(std::move(f)) + {} + void process() override { + try { + state.set(func()); + } catch (...) { + state.set_exception(std::current_exception()); + } + on_done.notify(); + } + seastar::future<T> get_future() { + return on_done.wait().then([this] { + return seastar::make_ready_future<T>(state.get0(std::move(state).get())); + }); + } +}; + +struct SubmitQueue { + seastar::semaphore free_slots; + seastar::gate pending_tasks; + explicit SubmitQueue(size_t num_free_slots) + : free_slots(num_free_slots) + {} + seastar::future<> stop() { + return pending_tasks.close(); + } +}; + +/// an engine for scheduling non-seastar tasks from seastar fibers +class ThreadPool { + std::atomic<bool> stopping = false; + std::mutex mutex; + std::condition_variable cond; + std::vector<std::thread> threads; + seastar::sharded<SubmitQueue> submit_queue; + const size_t queue_size; + boost::lockfree::queue<WorkItem*> pending; + + void loop(); + bool is_stopping() const { + return stopping.load(std::memory_order_relaxed); + } + static void pin(unsigned cpu_id); + seastar::semaphore& local_free_slots() { + return submit_queue.local().free_slots; + } + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; +public: + /** + * @param queue_sz the depth of pending queue. before a task is scheduled, + * it waits in this queue. we will round this number to + * multiple of the number of cores. + * @param n_threads the number of threads in this thread pool. + * @param cpu the CPU core to which this thread pool is assigned + * @note each @c Task has its own ceph::thread::Condition, which possesses + * possesses an fd, so we should keep the size of queue under a reasonable + * limit. + */ + ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu); + ~ThreadPool(); + seastar::future<> start(); + seastar::future<> stop(); + template<typename Func, typename...Args> + auto submit(Func&& func, Args&&... args) { + auto packaged = [func=std::move(func), + args=std::forward_as_tuple(args...)] { + return std::apply(std::move(func), std::move(args)); + }; + return seastar::with_gate(submit_queue.local().pending_tasks, + [packaged=std::move(packaged), this] { + return local_free_slots().wait() + .then([packaged=std::move(packaged), this] { + auto task = new Task{std::move(packaged)}; + auto fut = task->get_future(); + pending.push(task); + cond.notify_one(); + return fut.finally([task, this] { + local_free_slots().signal(); + delete task; + }); + }); + }); + } +}; + +} // namespace ceph::thread diff --git a/src/crimson/thread/Throttle.cc b/src/crimson/thread/Throttle.cc new file mode 100644 index 00000000..1d67e723 --- /dev/null +++ b/src/crimson/thread/Throttle.cc @@ -0,0 +1,59 @@ +#include "Throttle.h" + +namespace ceph::thread { + +int64_t Throttle::take(int64_t c) +{ + if (!max) { + return 0; + } + count += c; + return count; +} + +int64_t Throttle::put(int64_t c) +{ + if (!max) { + return 0; + } + if (!c) { + return count; + } + on_free_slots.signal(); + count -= c; + return count; +} + +seastar::future<> Throttle::get(size_t c) +{ + if (!max) { + return seastar::now(); + } + return on_free_slots.wait([this, c] { + return !_should_wait(c); + }).then([this, c] { + count += c; + return seastar::now(); + }); +} + +void Throttle::reset_max(size_t m) { + if (max == m) { + return; + } + + if (m > max) { + on_free_slots.signal(); + } + max = m; +} + +bool Throttle::_should_wait(size_t c) const { + if (!max) { + return false; + } + return ((c <= max && count + c > max) || // normally stay under max + (c >= max && count > max)); // except for large c +} + +} // namespace ceph::thread::seastar diff --git a/src/crimson/thread/Throttle.h b/src/crimson/thread/Throttle.h new file mode 100644 index 00000000..c2342171 --- /dev/null +++ b/src/crimson/thread/Throttle.h @@ -0,0 +1,36 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <seastar/core/condition-variable.hh> + +#include "common/ThrottleInterface.h" + +namespace ceph::thread { + +class Throttle final : public ThrottleInterface { + size_t max = 0; + size_t count = 0; + // we cannot change the "count" of seastar::semaphore after it is created, + // so use condition_variable instead. + seastar::condition_variable on_free_slots; +public: + explicit Throttle(size_t m) + : max(m) + {} + int64_t take(int64_t c = 1) override; + int64_t put(int64_t c = 1) override; + seastar::future<> get(size_t c); + size_t get_current() const { + return count; + } + size_t get_max() const { + return max; + } + void reset_max(size_t m); +private: + bool _should_wait(size_t c) const; +}; + +} // namespace ceph::thread |