From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 7 Apr 2024 20:45:59 +0200
Subject: Adding upstream version 16.2.11+ds.

Signed-off-by: Daniel Baumann
---
 src/crimson/os/alienstore/CMakeLists.txt     |  76 ++++
 src/crimson/os/alienstore/alien_collection.h |  26 ++
 src/crimson/os/alienstore/alien_store.cc     | 575 +++++++++++++++++++++++++++
 src/crimson/os/alienstore/alien_store.h      | 125 ++++++
 src/crimson/os/alienstore/thread_pool.cc     |  80 ++++
 src/crimson/os/alienstore/thread_pool.h      | 132 ++++++
 6 files changed, 1014 insertions(+)
 create mode 100644 src/crimson/os/alienstore/CMakeLists.txt
 create mode 100644 src/crimson/os/alienstore/alien_collection.h
 create mode 100644 src/crimson/os/alienstore/alien_store.cc
 create mode 100644 src/crimson/os/alienstore/alien_store.h
 create mode 100644 src/crimson/os/alienstore/thread_pool.cc
 create mode 100644 src/crimson/os/alienstore/thread_pool.h

diff --git a/src/crimson/os/alienstore/CMakeLists.txt b/src/crimson/os/alienstore/CMakeLists.txt
new file mode 100644
index 000000000..659a3c6ce
--- /dev/null
+++ b/src/crimson/os/alienstore/CMakeLists.txt
@@ -0,0 +1,76 @@
+include_directories(SYSTEM "${CMAKE_SOURCE_DIR}/src/rocksdb/include")
+
+add_library(alien::cflags INTERFACE IMPORTED)
+set_target_properties(alien::cflags PROPERTIES
+  INTERFACE_COMPILE_DEFINITIONS "WITH_SEASTAR;WITH_ALIEN"
+  INTERFACE_INCLUDE_DIRECTORIES $<TARGET_PROPERTY:Seastar::seastar,INTERFACE_INCLUDE_DIRECTORIES>)
+
+add_library(crimson-alien-common STATIC
+  ${PROJECT_SOURCE_DIR}/src/common/admin_socket.cc
+  ${PROJECT_SOURCE_DIR}/src/common/blkdev.cc
+  ${PROJECT_SOURCE_DIR}/src/common/ceph_context.cc
+  ${PROJECT_SOURCE_DIR}/src/common/ceph_crypto.cc
+  ${PROJECT_SOURCE_DIR}/src/common/condition_variable_debug.cc
+  ${PROJECT_SOURCE_DIR}/src/common/cmdparse.cc
+  ${PROJECT_SOURCE_DIR}/src/common/Finisher.cc
+  ${PROJECT_SOURCE_DIR}/src/common/HeartbeatMap.cc
+  ${PROJECT_SOURCE_DIR}/src/common/PluginRegistry.cc
+  ${PROJECT_SOURCE_DIR}/src/common/lockdep.cc
+  ${PROJECT_SOURCE_DIR}/src/common/mutex_debug.cc
+  ${PROJECT_SOURCE_DIR}/src/common/perf_counters.cc
+  ${PROJECT_SOURCE_DIR}/src/common/perf_counters_collection.cc
+  ${PROJECT_SOURCE_DIR}/src/common/RefCountedObj.cc
+  ${PROJECT_SOURCE_DIR}/src/common/shared_mutex_debug.cc
+  ${PROJECT_SOURCE_DIR}/src/common/SubProcess.cc
+  ${PROJECT_SOURCE_DIR}/src/common/Throttle.cc
+  ${PROJECT_SOURCE_DIR}/src/common/Timer.cc
+  ${PROJECT_SOURCE_DIR}/src/common/TrackedOp.cc
+  ${PROJECT_SOURCE_DIR}/src/common/WorkQueue.cc
+  ${PROJECT_SOURCE_DIR}/src/common/util.cc
+  ${PROJECT_SOURCE_DIR}/src/crush/CrushLocation.cc
+  ${PROJECT_SOURCE_DIR}/src/global/global_context.cc
+  $<TARGET_OBJECTS:compressor_objs>
+  $<TARGET_OBJECTS:common_prioritycache_obj>)
+target_link_libraries(crimson-alien-common
+  crimson-common
+  alien::cflags)
+
+set(alien_store_srcs
+  alien_store.cc
+  thread_pool.cc
+  ${PROJECT_SOURCE_DIR}/src/os/ObjectStore.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/Allocator.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/AvlAllocator.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/BitmapFreelistManager.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/BlueFS.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/bluefs_types.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/BlueRocksEnv.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/BlueStore.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/bluestore_types.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/fastbmap_allocator_impl.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/FreelistManager.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/HybridAllocator.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/StupidAllocator.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/BitmapAllocator.cc)
+if(WITH_ZBD)
+  list(APPEND alien_store_srcs
+    ${PROJECT_SOURCE_DIR}/src/os/bluestore/zoned_types.cc
+    ${PROJECT_SOURCE_DIR}/src/os/bluestore/ZonedFreelistManager.cc
+    ${PROJECT_SOURCE_DIR}/src/os/bluestore/ZonedAllocator.cc)
+endif()
+add_library(crimson-alienstore STATIC
+  ${alien_store_srcs})
+if(WITH_LTTNG)
+  add_dependencies(crimson-alienstore bluestore-tp)
+endif()
+target_link_libraries(crimson-alienstore
+  PRIVATE
+    alien::cflags
+    fmt::fmt
+    kv
+    heap_profiler
+    crimson-alien-common
+    ${BLKID_LIBRARIES}
+    ${UDEV_LIBRARIES}
+    crimson
+    blk)
diff --git a/src/crimson/os/alienstore/alien_collection.h b/src/crimson/os/alienstore/alien_collection.h
new file mode 100644
index 000000000..98b8fdef4
--- /dev/null
+++ b/src/crimson/os/alienstore/alien_collection.h
@@ -0,0 +1,26 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "os/ObjectStore.h"
+
+#include "crimson/os/futurized_collection.h"
+#include "crimson/os/futurized_store.h"
+#include "alien_store.h"
+
+namespace crimson::os {
+
+class AlienCollection final : public FuturizedCollection {
+public:
+  AlienCollection(ObjectStore::CollectionHandle ch)
+    : FuturizedCollection(ch->cid),
+      collection(ch) {}
+
+  ~AlienCollection() {}
+
+private:
+  ObjectStore::CollectionHandle collection;
+  friend AlienStore;
+};
+}
diff --git a/src/crimson/os/alienstore/alien_store.cc b/src/crimson/os/alienstore/alien_store.cc
new file mode 100644
index 000000000..cb5553254
--- /dev/null
+++ b/src/crimson/os/alienstore/alien_store.cc
@@ -0,0 +1,575 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "alien_collection.h"
+#include "alien_store.h"
+
+#include <map>
+#include <string_view>
+#include <boost/algorithm/string/trim.hpp>
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+
+#include <seastar/core/alien.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+
+#include "common/ceph_context.h"
+#include "global/global_context.h"
+#include "include/Context.h"
+#include "os/bluestore/BlueStore.h"
+#include "os/ObjectStore.h"
+#include "os/Transaction.h"
+
+#include "crimson/common/log.h"
+#include "crimson/os/futurized_store.h"
+
+namespace {
+
+seastar::logger& logger()
+{
+  return crimson::get_logger(ceph_subsys_filestore);
+}
+
+class OnCommit final: public Context
+{
+  int cpuid;
+  Context *oncommit;
+  seastar::promise<> &alien_done;
+public:
+  OnCommit(
+    int id,
+    seastar::promise<> &done,
+    Context *oncommit,
+    ceph::os::Transaction& txn)
+    : cpuid(id), oncommit(oncommit),
+      alien_done(done) {}
+
+  void finish(int) final {
+    return seastar::alien::submit_to(cpuid, [this] {
+      if (oncommit) oncommit->complete(0);
+      alien_done.set_value();
+      return seastar::make_ready_future<>();
+    }).wait();
+  }
+};
+}
+
+namespace crimson::os {
+
+AlienStore::AlienStore(const std::string& path, const ConfigValues& values)
+  : path{path}
+{
+  cct = std::make_unique<CephContext>(CEPH_ENTITY_TYPE_OSD);
+  g_ceph_context = cct.get();
+  cct->_conf.set_config_values(values);
+  store = std::make_unique<BlueStore>(cct.get(), path);
+
+  long cpu_id = 0;
+  if (long nr_cpus = sysconf(_SC_NPROCESSORS_ONLN); nr_cpus != -1) {
+    cpu_id = nr_cpus - 1;
+  } else {
+    logger().error("{}: unable to get nproc: {}", __func__, errno);
+    cpu_id = -1;
+  }
+  tp = std::make_unique<crimson::os::ThreadPool>(1, 128, cpu_id);
+}
+
+seastar::future<> AlienStore::start()
+{
+  return tp->start();
+}
+
+seastar::future<> AlienStore::stop()
+{
+  return tp->submit([this] {
+    for (auto [cid, ch]: coll_map)
+      static_cast<AlienCollection*>(ch.get())->collection.reset();
+    store.reset();
+  }).then([this] {
+    return tp->stop();
+  });
+}
+
+AlienStore::~AlienStore() = default;
+
+seastar::future<> AlienStore::mount()
+{
+  logger().debug("{}", __func__);
+  return tp->submit([this] {
+    return store->mount();
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+seastar::future<> AlienStore::umount()
+{
+  logger().info("{}", __func__);
+  return transaction_gate.close().then([this] {
+    return tp->submit([this] {
+      return store->umount();
+    });
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+seastar::future<> AlienStore::mkfs(uuid_d osd_fsid)
+{
+  logger().debug("{}", __func__);
+  store->set_fsid(osd_fsid);
+  return tp->submit([this] {
+    return store->mkfs();
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>>
+AlienStore::list_objects(CollectionRef ch,
+                         const ghobject_t& start,
+                         const ghobject_t& end,
+                         uint64_t limit) const
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(std::vector<ghobject_t>(), ghobject_t(),
+                          [=] (auto &objects, auto &next) {
+    objects.reserve(limit);
+    return tp->submit([=, &objects, &next] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->collection_list(c->collection, start, end,
+                                    store->get_ideal_list_max(),
+                                    &objects, &next);
+    }).then([&objects, &next] (int r) {
+      assert(r == 0);
+      return seastar::make_ready_future<std::tuple<std::vector<ghobject_t>, ghobject_t>>(
+        std::make_tuple(std::move(objects), std::move(next)));
+    });
+  });
+}
+
+seastar::future<CollectionRef> AlienStore::create_new_collection(const coll_t& cid)
+{
+  logger().debug("{}", __func__);
+  return tp->submit([this, cid] {
+    return store->create_new_collection(cid);
+  }).then([this, cid] (ObjectStore::CollectionHandle c) {
+    CollectionRef ch;
+    auto cp = coll_map.find(c->cid);
+    if (cp == coll_map.end()) {
+      ch = new AlienCollection(c);
+      coll_map[c->cid] = ch;
+    } else {
+      ch = cp->second;
+      auto ach = static_cast<AlienCollection*>(ch.get());
+      if (ach->collection != c) {
+        ach->collection = c;
+      }
+    }
+    return seastar::make_ready_future<CollectionRef>(ch);
+  });
+
+}
+
+seastar::future<CollectionRef> AlienStore::open_collection(const coll_t& cid)
+{
+  logger().debug("{}", __func__);
+  return tp->submit([this, cid] {
+    return store->open_collection(cid);
+  }).then([this] (ObjectStore::CollectionHandle c) {
+    CollectionRef ch;
+    auto cp = coll_map.find(c->cid);
+    if (cp == coll_map.end()){
+      ch = new AlienCollection(c);
+      coll_map[c->cid] = ch;
+    } else {
+      ch = cp->second;
+      auto ach = static_cast<AlienCollection*>(ch.get());
+      if (ach->collection != c){
+        ach->collection = c;
+      }
+    }
+    return seastar::make_ready_future<CollectionRef>(ch);
+  });
+}
+
+seastar::future<std::vector<coll_t>> AlienStore::list_collections()
+{
+  logger().debug("{}", __func__);
+
+  return seastar::do_with(std::vector<coll_t>{}, [=] (auto &ls) {
+    return tp->submit([this, &ls] {
+      return store->list_collections(ls);
+    }).then([&ls] (int r) {
+      assert(r == 0);
+      return seastar::make_ready_future<std::vector<coll_t>>(std::move(ls));
+    });
+  });
+}
+
+AlienStore::read_errorator::future<ceph::bufferlist>
+AlienStore::read(CollectionRef ch,
+                 const ghobject_t& oid,
+                 uint64_t offset,
+                 size_t len,
+                 uint32_t op_flags)
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(ceph::bufferlist{}, [=] (auto &bl) {
+    return tp->submit([=, &bl] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->read(c->collection, oid, offset, len, bl, op_flags);
+    }).then([&bl] (int r) -> read_errorator::future<ceph::bufferlist> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else if (r == -EIO) {
+        return crimson::ct_error::input_output_error::make();
+      } else {
+        return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
+      }
+    });
+  });
+}
+
+AlienStore::read_errorator::future<ceph::bufferlist>
+AlienStore::readv(CollectionRef ch,
+                  const ghobject_t& oid,
+                  interval_set<uint64_t>& m,
+                  uint32_t op_flags)
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(ceph::bufferlist{},
+    [this, ch, oid, &m, op_flags](auto& bl) {
+    return tp->submit([this, ch, oid, &m, op_flags, &bl] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->readv(c->collection, oid, m, bl, op_flags);
+    }).then([&bl](int r) -> read_errorator::future<ceph::bufferlist> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else if (r == -EIO) {
+        return crimson::ct_error::input_output_error::make();
+      } else {
+        return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
+      }
+    });
+  });
+}
+
+AlienStore::get_attr_errorator::future<ceph::bufferptr>
+AlienStore::get_attr(CollectionRef ch,
+                     const ghobject_t& oid,
+                     std::string_view name) const
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(ceph::bufferptr{}, [=] (auto &value) {
+    return tp->submit([=, &value] {
+      auto c =static_cast<AlienCollection*>(ch.get());
+      return store->getattr(c->collection, oid,
+                            static_cast<std::string>(name).c_str(), value);
+    }).then([oid, &value] (int r) -> get_attr_errorator::future<ceph::bufferptr> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else if (r == -ENODATA) {
+        return crimson::ct_error::enodata::make();
+      } else {
+        return get_attr_errorator::make_ready_future<ceph::bufferptr>(
+          std::move(value));
+      }
+    });
+  });
+}
+
+AlienStore::get_attrs_ertr::future<AlienStore::attrs_t>
+AlienStore::get_attrs(CollectionRef ch,
+                      const ghobject_t& oid)
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(attrs_t{}, [=] (auto &aset) {
+    return tp->submit([=, &aset] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->getattrs(c->collection, oid,
+                             reinterpret_cast<map<string,bufferptr>&>(aset));
+    }).then([&aset] (int r) -> get_attrs_ertr::future<attrs_t> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else {
+        return get_attrs_ertr::make_ready_future<attrs_t>(std::move(aset));
+      }
+    });
+  });
+}
+
+auto AlienStore::omap_get_values(CollectionRef ch,
+                                 const ghobject_t& oid,
+                                 const set<string>& keys)
+  -> read_errorator::future<omap_values_t>
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(omap_values_t{}, [=] (auto &values) {
+    return tp->submit([=, &values] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->omap_get_values(c->collection, oid, keys,
+                                    reinterpret_cast<map<string, bufferlist>*>(&values));
+    }).then([&values] (int r) -> read_errorator::future<omap_values_t> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else {
+        assert(r == 0);
+        return read_errorator::make_ready_future<omap_values_t>(std::move(values));
+      }
+    });
+  });
+}
+
+auto AlienStore::omap_get_values(CollectionRef ch,
+                                 const ghobject_t &oid,
+                                 const std::optional<string> &start)
+  -> read_errorator::future<std::tuple<bool, omap_values_t>>
+{
+  logger().debug("{} with_start", __func__);
+  return seastar::do_with(omap_values_t{}, [=] (auto &values) {
+    return tp->submit([=, &values] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->omap_get_values(c->collection, oid, start,
+                                    reinterpret_cast<map<string, bufferlist>*>(&values));
+    }).then([&values] (int r)
+      -> read_errorator::future<std::tuple<bool, omap_values_t>> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else if (r < 0){
+        logger().error("omap_get_values(start): {}", r);
+        return crimson::ct_error::input_output_error::make();
+      } else {
+        return read_errorator::make_ready_future<std::tuple<bool, omap_values_t>>(
+          std::make_tuple(true, std::move(values)));
+      }
+    });
+  });
+}
+
+seastar::future<> AlienStore::do_transaction(CollectionRef ch,
+                                             ceph::os::Transaction&& txn)
+{
+  logger().debug("{}", __func__);
+  auto id = seastar::this_shard_id();
+  auto done = seastar::promise<>();
+  return seastar::do_with(
+    std::move(txn),
+    std::move(done),
+    [this, ch, id] (auto &txn, auto &done) {
+      return seastar::with_gate(transaction_gate, [this, ch, id, &txn, &done] {
+        return tp_mutex.lock().then ([this, ch, id, &txn, &done] {
+          Context *crimson_wrapper =
+            ceph::os::Transaction::collect_all_contexts(txn);
+          return tp->submit([this, ch, id, crimson_wrapper, &txn, &done] {
+            txn.register_on_commit(new OnCommit(id, done, crimson_wrapper, txn));
+            auto c = static_cast<AlienCollection*>(ch.get());
+            return store->queue_transaction(c->collection, std::move(txn));
+          });
+        }).then([this, &done] (int r) {
+          assert(r == 0);
+          tp_mutex.unlock();
+          return done.get_future();
+        });
+      });
+    });
+}
+
+seastar::future<> AlienStore::write_meta(const std::string& key,
+                                         const std::string& value)
+{
+  logger().debug("{}", __func__);
+  return tp->submit([=] {
+    return store->write_meta(key, value);
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::make_ready_future<>();
+  });
+}
+
+seastar::future<std::tuple<int, std::string>>
+AlienStore::read_meta(const std::string& key)
+{
+  logger().debug("{}", __func__);
+  return tp->submit([this, key] {
+    std::string value;
+    int r = store->read_meta(key, &value);
+    if (r > 0) {
+      value.resize(r);
+      boost::algorithm::trim_right_if(value,
+        [] (unsigned char c) {return isspace(c);});
+    } else {
+      value.clear();
+    }
+    return std::make_pair(r, value);
+  }).then([] (auto entry) {
+    return seastar::make_ready_future<std::tuple<int, std::string>>(
+      std::move(entry));
+  });
+}
+
+uuid_d AlienStore::get_fsid() const
+{
+  logger().debug("{}", __func__);
+  return store->get_fsid();
+}
+
+seastar::future<store_statfs_t> AlienStore::stat() const
+{
+  logger().info("{}", __func__);
+  return seastar::do_with(store_statfs_t{}, [this] (store_statfs_t &st) {
+    return tp->submit([this, &st] {
+      return store->statfs(&st, nullptr);
+    }).then([&st] (int r) {
+      assert(r == 0);
+      return seastar::make_ready_future<store_statfs_t>(std::move(st));
+    });
+  });
+}
+
+unsigned AlienStore::get_max_attr_name_length() const
+{
+  logger().info("{}", __func__);
+  return 256;
+}
+
+seastar::future<struct stat> AlienStore::stat(
+  CollectionRef ch,
+  const ghobject_t& oid)
+{
+  return seastar::do_with((struct stat){}, [this, ch, oid](auto& st) {
+    return tp->submit([this, ch, oid, &st] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      store->stat(c->collection, oid, &st);
+      return st;
+    });
+  });
+}
+
+auto AlienStore::omap_get_header(CollectionRef ch,
+                                 const ghobject_t& oid)
+  -> read_errorator::future<ceph::bufferlist>
+{
+  return seastar::do_with(ceph::bufferlist(), [=](auto& bl) {
+    return tp->submit([=, &bl] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->omap_get_header(c->collection, oid, &bl);
+    }).then([&bl] (int r) -> read_errorator::future<ceph::bufferlist> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else if (r < 0) {
+        logger().error("omap_get_header: {}", r);
+        return crimson::ct_error::input_output_error::make();
+      } else {
+        return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
+      }
+    });
+  });
+}
+
+seastar::future<std::map<uint64_t, uint64_t>> AlienStore::fiemap(
+  CollectionRef ch,
+  const ghobject_t& oid,
+  uint64_t off,
+  uint64_t len)
+{
+  return seastar::do_with(std::map<uint64_t, uint64_t>(), [=](auto& destmap) {
+    return tp->submit([=, &destmap] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->fiemap(c->collection, oid, off, len, destmap);
+    }).then([&destmap] (int i) {
+      return seastar::make_ready_future
+        <std::map<uint64_t, uint64_t>>
+        (std::move(destmap));
+    });
+  });
+}
+
+seastar::future<FuturizedStore::OmapIteratorRef> AlienStore::get_omap_iterator(
+  CollectionRef ch,
+  const ghobject_t& oid)
+{
+  return tp->submit([=] {
+    auto c = static_cast<AlienCollection*>(ch.get());
+    auto iter = store->get_omap_iterator(c->collection, oid);
+    return FuturizedStore::OmapIteratorRef(
+      new AlienStore::AlienOmapIterator(iter,
+                                        this));
+  });
+}
+
+//TODO: each iterator op needs one submit, this is not efficient,
+//      needs further optimization.
+seastar::future<> AlienStore::AlienOmapIterator::seek_to_first()
+{
+  return store->tp->submit([=] {
+    return iter->seek_to_first();
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+seastar::future<> AlienStore::AlienOmapIterator::upper_bound(
+  const std::string& after)
+{
+  return store->tp->submit([this, after] {
+    return iter->upper_bound(after);
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+seastar::future<> AlienStore::AlienOmapIterator::lower_bound(
+  const std::string& to)
+{
+  return store->tp->submit([this, to] {
+    return iter->lower_bound(to);
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+seastar::future<> AlienStore::AlienOmapIterator::next()
+{
+  return store->tp->submit([this] {
+    return iter->next();
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+bool AlienStore::AlienOmapIterator::valid() const
+{
+  return iter->valid();
+}
+
+std::string AlienStore::AlienOmapIterator::key()
+{
+  return iter->key();
+}
+
+seastar::future<std::string> AlienStore::AlienOmapIterator::tail_key()
+{
+  return store->tp->submit([this] {
+    return iter->tail_key();
+  });
+}
+
+ceph::buffer::list AlienStore::AlienOmapIterator::value()
+{
+  return iter->value();
+}
+
+int AlienStore::AlienOmapIterator::status() const
+{
+  return iter->status();
+}
+
+}
diff --git a/src/crimson/os/alienstore/alien_store.h b/src/crimson/os/alienstore/alien_store.h
new file mode 100644
index 000000000..92739340e
--- /dev/null
+++ b/src/crimson/os/alienstore/alien_store.h
@@ -0,0 +1,125 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_mutex.hh>
+
+#include "common/ceph_context.h"
+#include "os/ObjectStore.h"
+#include "osd/osd_types.h"
+
+#include "crimson/os/alienstore/thread_pool.h"
+#include "crimson/os/futurized_collection.h"
+#include "crimson/os/futurized_store.h"
+
+namespace ceph::os {
+class Transaction;
+}
+
+namespace crimson::os {
+class AlienStore final : public FuturizedStore {
+public:
+  class AlienOmapIterator final : public OmapIterator {
+  public:
+    AlienOmapIterator(ObjectMap::ObjectMapIterator& it,
+                      AlienStore* store) : iter(it), store(store) {}
+    seastar::future<> seek_to_first();
+    seastar::future<> upper_bound(const std::string& after);
+    seastar::future<> lower_bound(const std::string& to);
+    bool valid() const;
+    seastar::future<> next();
+    std::string key();
+    seastar::future<std::string> tail_key();
+    ceph::buffer::list value();
+    int status() const;
+  private:
+    ObjectMap::ObjectMapIterator iter;
+    AlienStore* store;
+  };
+  AlienStore(const std::string& path, const ConfigValues& values);
+  ~AlienStore() final;
+
+  seastar::future<> start() final;
+  seastar::future<> stop() final;
+  seastar::future<> mount() final;
+  seastar::future<> umount() final;
+
+  seastar::future<> mkfs(uuid_d new_osd_fsid) final;
+  read_errorator::future<ceph::bufferlist> read(CollectionRef c,
+                                                const ghobject_t& oid,
+                                                uint64_t offset,
+                                                size_t len,
+                                                uint32_t op_flags = 0) final;
+  read_errorator::future<ceph::bufferlist> readv(CollectionRef c,
+                                                 const ghobject_t& oid,
+                                                 interval_set<uint64_t>& m,
+                                                 uint32_t op_flags = 0) final;
+
+
+  get_attr_errorator::future<ceph::bufferptr> get_attr(CollectionRef c,
+                                                       const ghobject_t& oid,
+                                                       std::string_view name) const final;
+  get_attrs_ertr::future<attrs_t> get_attrs(CollectionRef c,
+                                            const ghobject_t& oid) final;
+
+  read_errorator::future<omap_values_t> omap_get_values(
+    CollectionRef c,
+    const ghobject_t& oid,
+    const omap_keys_t& keys) final;
+
+  /// Retrieves paged set of values > start (if present)
+  read_errorator::future<std::tuple<bool, omap_values_t>> omap_get_values(
+    CollectionRef c,           ///< [in] collection
+    const ghobject_t &oid,     ///< [in] oid
+    const std::optional<std::string> &start ///< [in] start, empty for begin
+    ) final; ///< @return <done, values> values.empty() iff done
+
+  seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>> list_objects(
+    CollectionRef c,
+    const ghobject_t& start,
+    const ghobject_t& end,
+    uint64_t limit) const final;
+
+  seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
+  seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
+  seastar::future<std::vector<coll_t>> list_collections() final;
+
+  seastar::future<> do_transaction(CollectionRef c,
+                                   ceph::os::Transaction&& txn) final;
+
+  seastar::future<> write_meta(const std::string& key,
+                               const std::string& value) final;
+  seastar::future<std::tuple<int, std::string>> read_meta(
+    const std::string& key) final;
+  uuid_d get_fsid() const final;
+  seastar::future<store_statfs_t> stat() const final;
+  unsigned get_max_attr_name_length() const final;
+  seastar::future<struct stat> stat(
+    CollectionRef,
+    const ghobject_t&) final;
+  read_errorator::future<ceph::bufferlist> omap_get_header(
+    CollectionRef,
+    const ghobject_t&) final;
+  seastar::future<std::map<uint64_t, uint64_t>> fiemap(
+    CollectionRef,
+    const ghobject_t&,
+    uint64_t off,
+    uint64_t len) final;
+  seastar::future<OmapIteratorRef> get_omap_iterator(
+    CollectionRef ch,
+    const ghobject_t& oid) final;
+
+private:
+  constexpr static unsigned MAX_KEYS_PER_OMAP_GET_CALL = 32;
+  mutable std::unique_ptr<crimson::os::ThreadPool> tp;
+  const std::string path;
+  uint64_t used_bytes = 0;
+  std::unique_ptr<ObjectStore> store;
+  std::unique_ptr<CephContext> cct;
+  seastar::gate transaction_gate;
+  std::unordered_map<coll_t, CollectionRef> coll_map;
+  seastar::shared_mutex tp_mutex;
+};
+}
diff --git a/src/crimson/os/alienstore/thread_pool.cc b/src/crimson/os/alienstore/thread_pool.cc
new file mode 100644
index 000000000..e127d87d5
--- /dev/null
+++ b/src/crimson/os/alienstore/thread_pool.cc
@@ -0,0 +1,80 @@
+#include "thread_pool.h"
+
+#include <chrono>
+#include <pthread.h>
+
+#include "include/ceph_assert.h"
+#include "crimson/common/config_proxy.h"
+
+using crimson::common::local_conf;
+
+namespace crimson::os {
+
+ThreadPool::ThreadPool(size_t n_threads,
+                       size_t queue_sz,
+                       long cpu_id)
+  : queue_size{round_up_to(queue_sz, seastar::smp::count)},
+    pending{queue_size}
+{
+  auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
+  for (size_t i = 0; i < n_threads; i++) {
+    threads.emplace_back([this, cpu_id, queue_max_wait] {
+      if (cpu_id >= 0) {
+        pin(cpu_id);
+      }
+      loop(queue_max_wait);
+    });
+  }
+}
+
+ThreadPool::~ThreadPool()
+{
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+void ThreadPool::pin(unsigned cpu_id)
+{
+  cpu_set_t cs;
+  CPU_ZERO(&cs);
+  CPU_SET(cpu_id, &cs);
+  [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
+                                                   sizeof(cs), &cs);
+  ceph_assert(r == 0);
+}
+
+void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
+{
+  for (;;) {
+    WorkItem* work_item = nullptr;
+    {
+      std::unique_lock lock{mutex};
+      cond.wait_for(lock, queue_max_wait,
+                    [this, &work_item] {
+        return pending.pop(work_item) || is_stopping();
+      });
+    }
+    if (work_item) {
+      work_item->process();
+    } else if (is_stopping()) {
+      break;
+    }
+  }
+}
+
+seastar::future<> ThreadPool::start()
+{
+  auto slots_per_shard = queue_size / seastar::smp::count;
+  return submit_queue.start(slots_per_shard);
+}
+
+seastar::future<> ThreadPool::stop()
+{
+  return submit_queue.stop().then([this] {
+    stopping = true;
+    cond.notify_all();
+  });
+}
+
+} // namespace crimson::os
diff --git a/src/crimson/os/alienstore/thread_pool.h b/src/crimson/os/alienstore/thread_pool.h
new file mode 100644
index 000000000..27840da18
--- /dev/null
+++ b/src/crimson/os/alienstore/thread_pool.h
@@ -0,0 +1,132 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <tuple>
+#include <type_traits>
+#include <vector>
+#include <boost/lockfree/queue.hpp>
+#include <boost/optional.hpp>
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/sharded.hh>
+
+namespace crimson::os {
+
+struct WorkItem {
+  virtual ~WorkItem() {}
+  virtual void process() = 0;
+};
+
+template<typename Func>
+struct Task final : WorkItem {
+  using T = std::invoke_result_t<Func>;
+  using future_stored_type_t =
+    std::conditional_t<std::is_void_v<T>,
+                       seastar::internal::future_stored_type_t<>,
+                       seastar::internal::future_stored_type_t<T>>;
+  using futurator_t = seastar::futurize<T>;
+public:
+  explicit Task(Func&& f)
+    : func(std::move(f))
+  {}
+  void process() override {
+    try {
+      if constexpr (std::is_void_v<T>) {
+        func();
+        state.set();
+      } else {
+        state.set(func());
+      }
+    } catch (...) {
+      state.set_exception(std::current_exception());
+    }
+    on_done.write_side().signal(1);
+  }
+  typename futurator_t::type get_future() {
+    return on_done.wait().then([this](size_t) {
+      if (state.failed()) {
+        return futurator_t::make_exception_future(state.get_exception());
+      } else {
+        return futurator_t::from_tuple(state.get_value());
+      }
+    });
+  }
+private:
+  Func func;
+  seastar::future_state<future_stored_type_t> state;
+  seastar::readable_eventfd on_done;
+};
+
+struct SubmitQueue {
+  seastar::semaphore free_slots;
+  seastar::gate pending_tasks;
+  explicit SubmitQueue(size_t num_free_slots)
+    : free_slots(num_free_slots)
+  {}
+  seastar::future<> stop() {
+    return pending_tasks.close();
+  }
+};
+
+/// an engine for scheduling non-seastar tasks from seastar fibers
+class ThreadPool {
+  std::atomic<bool> stopping = false;
+  std::mutex mutex;
+  std::condition_variable cond;
+  std::vector<std::thread> threads;
+  seastar::sharded<SubmitQueue> submit_queue;
+  const size_t queue_size;
+  boost::lockfree::queue<WorkItem*> pending;
+
+  void loop(std::chrono::milliseconds queue_max_wait);
+  bool is_stopping() const {
+    return stopping.load(std::memory_order_relaxed);
+  }
+  static void pin(unsigned cpu_id);
+  seastar::semaphore& local_free_slots() {
+    return submit_queue.local().free_slots;
+  }
+  ThreadPool(const ThreadPool&) = delete;
+  ThreadPool& operator=(const ThreadPool&) = delete;
+public:
+  /**
+   * @param queue_sz the depth of pending queue. before a task is scheduled,
+   *                 it waits in this queue. we will round this number to
+   *                 multiple of the number of cores.
+   * @param n_threads the number of threads in this thread pool.
+   * @param cpu the CPU core to which this thread pool is assigned
+   * @note each @c Task has its own crimson::thread::Condition, which possesses
+   *       an fd, so we should keep the size of queue under a reasonable limit.
+   */
+  ThreadPool(size_t n_threads, size_t queue_sz, long cpu);
+  ~ThreadPool();
+  seastar::future<> start();
+  seastar::future<> stop();
+  template<typename Func, typename...Args>
+  auto submit(Func&& func, Args&&... args) {
+    auto packaged = [func=std::move(func),
+                     args=std::forward_as_tuple(args...)] {
+      return std::apply(std::move(func), std::move(args));
+    };
+    return seastar::with_gate(submit_queue.local().pending_tasks,
+      [packaged=std::move(packaged), this] {
+        return local_free_slots().wait()
+          .then([packaged=std::move(packaged), this] {
+            auto task = new Task{std::move(packaged)};
+            auto fut = task->get_future();
+            pending.push(task);
+            cond.notify_one();
+            return fut.finally([task, this] {
+              local_free_slots().signal();
+              delete task;
+            });
+          });
+      });
+  }
+};
+
+} // namespace crimson::os
-- 
cgit v1.2.3