summaryrefslogtreecommitdiffstats
path: root/src/crimson/os/alienstore
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/crimson/os/alienstore
parentInitial commit. (diff)
downloadceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.tar.xz
ceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/os/alienstore')
-rw-r--r--src/crimson/os/alienstore/CMakeLists.txt76
-rw-r--r--src/crimson/os/alienstore/alien_collection.h26
-rw-r--r--src/crimson/os/alienstore/alien_store.cc575
-rw-r--r--src/crimson/os/alienstore/alien_store.h125
-rw-r--r--src/crimson/os/alienstore/thread_pool.cc80
-rw-r--r--src/crimson/os/alienstore/thread_pool.h132
6 files changed, 1014 insertions, 0 deletions
diff --git a/src/crimson/os/alienstore/CMakeLists.txt b/src/crimson/os/alienstore/CMakeLists.txt
new file mode 100644
index 000000000..659a3c6ce
--- /dev/null
+++ b/src/crimson/os/alienstore/CMakeLists.txt
@@ -0,0 +1,76 @@
+# rocksdb headers are needed by the BlueStore sources compiled into this target.
+include_directories(SYSTEM "${CMAKE_SOURCE_DIR}/src/rocksdb/include")
+
+# Interface target carrying the compile flags shared by everything that is
+# built to run on the "alien" (non-seastar) threads but linked into crimson.
+add_library(alien::cflags INTERFACE IMPORTED)
+set_target_properties(alien::cflags PROPERTIES
+  INTERFACE_COMPILE_DEFINITIONS "WITH_SEASTAR;WITH_ALIEN"
+  INTERFACE_INCLUDE_DIRECTORIES $<TARGET_PROPERTY:Seastar::seastar,INTERFACE_INCLUDE_DIRECTORIES>)
+
+# Classic-OSD common code recompiled with the WITH_SEASTAR/WITH_ALIEN
+# definitions so it can coexist with the crimson runtime.
+add_library(crimson-alien-common STATIC
+  ${PROJECT_SOURCE_DIR}/src/common/admin_socket.cc
+  ${PROJECT_SOURCE_DIR}/src/common/blkdev.cc
+  ${PROJECT_SOURCE_DIR}/src/common/ceph_context.cc
+  ${PROJECT_SOURCE_DIR}/src/common/ceph_crypto.cc
+  ${PROJECT_SOURCE_DIR}/src/common/condition_variable_debug.cc
+  ${PROJECT_SOURCE_DIR}/src/common/cmdparse.cc
+  ${PROJECT_SOURCE_DIR}/src/common/Finisher.cc
+  ${PROJECT_SOURCE_DIR}/src/common/HeartbeatMap.cc
+  ${PROJECT_SOURCE_DIR}/src/common/PluginRegistry.cc
+  ${PROJECT_SOURCE_DIR}/src/common/lockdep.cc
+  ${PROJECT_SOURCE_DIR}/src/common/mutex_debug.cc
+  ${PROJECT_SOURCE_DIR}/src/common/perf_counters.cc
+  ${PROJECT_SOURCE_DIR}/src/common/perf_counters_collection.cc
+  ${PROJECT_SOURCE_DIR}/src/common/RefCountedObj.cc
+  ${PROJECT_SOURCE_DIR}/src/common/shared_mutex_debug.cc
+  ${PROJECT_SOURCE_DIR}/src/common/SubProcess.cc
+  ${PROJECT_SOURCE_DIR}/src/common/Throttle.cc
+  ${PROJECT_SOURCE_DIR}/src/common/Timer.cc
+  ${PROJECT_SOURCE_DIR}/src/common/TrackedOp.cc
+  ${PROJECT_SOURCE_DIR}/src/common/WorkQueue.cc
+  ${PROJECT_SOURCE_DIR}/src/common/util.cc
+  ${PROJECT_SOURCE_DIR}/src/crush/CrushLocation.cc
+  ${PROJECT_SOURCE_DIR}/src/global/global_context.cc
+  $<TARGET_OBJECTS:compressor_objs>
+  $<TARGET_OBJECTS:common_prioritycache_obj>)
+target_link_libraries(crimson-alien-common
+  crimson-common
+  alien::cflags)
+
+# AlienStore itself plus the full BlueStore implementation it delegates to.
+set(alien_store_srcs
+  alien_store.cc
+  thread_pool.cc
+  ${PROJECT_SOURCE_DIR}/src/os/ObjectStore.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/Allocator.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/AvlAllocator.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/BitmapFreelistManager.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/BlueFS.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/bluefs_types.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/BlueRocksEnv.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/BlueStore.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/bluestore_types.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/fastbmap_allocator_impl.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/FreelistManager.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/HybridAllocator.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/StupidAllocator.cc
+  ${PROJECT_SOURCE_DIR}/src/os/bluestore/BitmapAllocator.cc)
+# Zoned-device (ZNS) support is optional; only compile it when enabled.
+if(WITH_ZBD)
+  list(APPEND alien_store_srcs
+    ${PROJECT_SOURCE_DIR}/src/os/bluestore/zoned_types.cc
+    ${PROJECT_SOURCE_DIR}/src/os/bluestore/ZonedFreelistManager.cc
+    ${PROJECT_SOURCE_DIR}/src/os/bluestore/ZonedAllocator.cc)
+endif()
+add_library(crimson-alienstore STATIC
+  ${alien_store_srcs})
+if(WITH_LTTNG)
+  # bluestore tracepoint provider must exist before these sources build.
+  add_dependencies(crimson-alienstore bluestore-tp)
+endif()
+target_link_libraries(crimson-alienstore
+  PRIVATE
+    alien::cflags
+    fmt::fmt
+    kv
+    heap_profiler
+    crimson-alien-common
+    ${BLKID_LIBRARIES}
+    ${UDEV_LIBRARIES}
+    crimson
+    blk)
diff --git a/src/crimson/os/alienstore/alien_collection.h b/src/crimson/os/alienstore/alien_collection.h
new file mode 100644
index 000000000..98b8fdef4
--- /dev/null
+++ b/src/crimson/os/alienstore/alien_collection.h
@@ -0,0 +1,26 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "os/ObjectStore.h"
+
+#include "crimson/os/futurized_collection.h"
+#include "crimson/os/futurized_store.h"
+#include "alien_store.h"
+
+namespace crimson::os {
+
+// Thin adaptor that lets crimson's FuturizedCollection wrap a classic
+// ObjectStore collection handle; AlienStore reaches the wrapped handle
+// directly via the friend declaration below.
+class AlienCollection final : public FuturizedCollection {
+public:
+  // NOTE(review): single-argument constructor is not `explicit`; confirm
+  // implicit conversion from CollectionHandle is intended.
+  AlienCollection(ObjectStore::CollectionHandle ch)
+    : FuturizedCollection(ch->cid),
+      collection(ch) {}
+
+  ~AlienCollection() {}
+
+private:
+  // Underlying classic-OSD collection handle; all real I/O goes through it.
+  ObjectStore::CollectionHandle collection;
+  friend AlienStore;
+};
+}
diff --git a/src/crimson/os/alienstore/alien_store.cc b/src/crimson/os/alienstore/alien_store.cc
new file mode 100644
index 000000000..cb5553254
--- /dev/null
+++ b/src/crimson/os/alienstore/alien_store.cc
@@ -0,0 +1,575 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "alien_collection.h"
+#include "alien_store.h"
+
+#include <map>
+#include <string_view>
+#include <boost/algorithm/string/trim.hpp>
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+
+#include <seastar/core/alien.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+
+#include "common/ceph_context.h"
+#include "global/global_context.h"
+#include "include/Context.h"
+#include "os/bluestore/BlueStore.h"
+#include "os/ObjectStore.h"
+#include "os/Transaction.h"
+
+#include "crimson/common/log.h"
+#include "crimson/os/futurized_store.h"
+
+namespace {
+  // All AlienStore logging goes to the filestore subsystem.
+  seastar::logger& logger()
+  {
+    return crimson::get_logger(ceph_subsys_filestore);
+  }
+
+// Commit callback handed to the classic ObjectStore: when BlueStore commits
+// the transaction on an alien thread, bounce completion back to the seastar
+// shard (`cpuid`) that issued it, run the collected crimson contexts, and
+// fulfil the promise that do_transaction() is waiting on.
+class OnCommit final: public Context
+{
+  int cpuid;
+  Context *oncommit;
+  seastar::promise<> &alien_done;
+public:
+  // NOTE(review): the `txn` parameter is never used by this class and could
+  // be dropped together with the call site — confirm before removing.
+  OnCommit(
+    int id,
+    seastar::promise<> &done,
+    Context *oncommit,
+    ceph::os::Transaction& txn)
+    : cpuid(id), oncommit(oncommit),
+      alien_done(done) {}
+
+  void finish(int) final {
+    // Blocks this alien thread (`.wait()`) until the reactor on `cpuid`
+    // has executed the completion — this runs outside seastar, so blocking
+    // here is legal.
+    return seastar::alien::submit_to(cpuid, [this] {
+      if (oncommit) oncommit->complete(0);
+      alien_done.set_value();
+      return seastar::make_ready_future<>();
+    }).wait();
+  }
+};
+}
+
+namespace crimson::os {
+
+// Builds the embedded classic-OSD environment: a CephContext (also installed
+// as the process-wide g_ceph_context), a BlueStore instance rooted at `path`,
+// and a single-threaded alien thread pool with a 128-slot queue.
+AlienStore::AlienStore(const std::string& path, const ConfigValues& values)
+  : path{path}
+{
+  cct = std::make_unique<CephContext>(CEPH_ENTITY_TYPE_OSD);
+  // Classic-OSD code reads the global context; point it at ours.
+  g_ceph_context = cct.get();
+  cct->_conf.set_config_values(values);
+  store = std::make_unique<BlueStore>(cct.get(), path);
+
+  // Pin the alien thread to the last online CPU so it stays off the
+  // reactor cores; -1 disables pinning if the core count is unavailable.
+  long cpu_id = 0;
+  if (long nr_cpus = sysconf(_SC_NPROCESSORS_ONLN); nr_cpus != -1) {
+    cpu_id = nr_cpus - 1;
+  } else {
+    logger().error("{}: unable to get nproc: {}", __func__, errno);
+    cpu_id = -1;
+  }
+  tp = std::make_unique<crimson::os::ThreadPool>(1, 128, cpu_id);
+}
+
+// Spin up the alien thread pool's per-shard submit queues.
+seastar::future<> AlienStore::start()
+{
+  return tp->start();
+}
+
+// Tear down in order: drop all cached collection handles and the BlueStore
+// instance on an alien thread (their destructors may block), then stop the
+// thread pool itself.
+seastar::future<> AlienStore::stop()
+{
+  return tp->submit([this] {
+    for (auto [cid, ch]: coll_map)
+      static_cast<AlienCollection*>(ch.get())->collection.reset();
+    store.reset();
+  }).then([this] {
+    return tp->stop();
+  });
+}
+
+AlienStore::~AlienStore() = default;
+
+// Mount the underlying BlueStore on an alien thread; any failure is fatal
+// (asserted) rather than propagated to the caller.
+seastar::future<> AlienStore::mount()
+{
+  logger().debug("{}", __func__);
+  return tp->submit([this] {
+    return store->mount();
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+// Wait for all in-flight transactions (transaction_gate) before unmounting.
+seastar::future<> AlienStore::umount()
+{
+  logger().info("{}", __func__);
+  return transaction_gate.close().then([this] {
+    return tp->submit([this] {
+      return store->umount();
+    });
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+// Format the store with the given OSD fsid; set_fsid runs on the reactor,
+// the mkfs itself on an alien thread.
+seastar::future<> AlienStore::mkfs(uuid_d osd_fsid)
+{
+  logger().debug("{}", __func__);
+  store->set_fsid(osd_fsid);
+  return tp->submit([this] {
+    return store->mkfs();
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+// Lists up to `limit` objects in [start, end) of the given collection,
+// returning the objects found plus the cursor to resume from.
+//
+// Fix: the original passed store->get_ideal_list_max() to collection_list,
+// silently ignoring the caller-supplied `limit` (despite reserving `limit`
+// slots), so callers could receive more entries than requested.
+seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>>
+AlienStore::list_objects(CollectionRef ch,
+                        const ghobject_t& start,
+                        const ghobject_t& end,
+                        uint64_t limit) const
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(std::vector<ghobject_t>(), ghobject_t(),
+                          [=] (auto &objects, auto &next) {
+    objects.reserve(limit);
+    return tp->submit([=, &objects, &next] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      // honor the caller's limit instead of the store's ideal batch size
+      return store->collection_list(c->collection, start, end,
+                                    limit,
+                                    &objects, &next);
+    }).then([&objects, &next] (int r) {
+      assert(r == 0);
+      return seastar::make_ready_future<std::tuple<std::vector<ghobject_t>, ghobject_t>>(
+        std::make_tuple(std::move(objects), std::move(next)));
+    });
+  });
+}
+
+// Creates the collection in the store, then caches (or refreshes) the
+// corresponding AlienCollection wrapper in coll_map.
+seastar::future<CollectionRef> AlienStore::create_new_collection(const coll_t& cid)
+{
+  logger().debug("{}", __func__);
+  return tp->submit([this, cid] {
+    return store->create_new_collection(cid);
+  }).then([this, cid] (ObjectStore::CollectionHandle c) {
+    CollectionRef ch;
+    auto cp = coll_map.find(c->cid);
+    if (cp == coll_map.end()) {
+      ch = new AlienCollection(c);
+      coll_map[c->cid] = ch;
+    } else {
+      ch = cp->second;
+      auto ach = static_cast<AlienCollection*>(ch.get());
+      // refresh the wrapped handle if the store returned a new one
+      if (ach->collection != c) {
+        ach->collection = c;
+      }
+    }
+    return seastar::make_ready_future<CollectionRef>(ch);
+  });
+
+}
+
+// Same caching scheme as create_new_collection, but opens an existing
+// collection.
+// NOTE(review): if the collection does not exist, ObjectStore::open_collection
+// can return a null handle and `c->cid` below would dereference it — confirm
+// callers only pass existing collections, or add a null check.
+seastar::future<CollectionRef> AlienStore::open_collection(const coll_t& cid)
+{
+  logger().debug("{}", __func__);
+  return tp->submit([this, cid] {
+    return store->open_collection(cid);
+  }).then([this] (ObjectStore::CollectionHandle c) {
+    CollectionRef ch;
+    auto cp = coll_map.find(c->cid);
+    if (cp == coll_map.end()){
+      ch = new AlienCollection(c);
+      coll_map[c->cid] = ch;
+    } else {
+      ch = cp->second;
+      auto ach = static_cast<AlienCollection*>(ch.get());
+      if (ach->collection != c){
+        ach->collection = c;
+      }
+    }
+    return seastar::make_ready_future<CollectionRef>(ch);
+  });
+}
+
+// Enumerates all collection ids known to the underlying store.  The blocking
+// list call runs on an alien thread; the filled vector is moved back to the
+// reactor once it reports success.
+seastar::future<std::vector<coll_t>> AlienStore::list_collections()
+{
+  logger().debug("{}", __func__);
+
+  return seastar::do_with(std::vector<coll_t>{}, [this] (auto& collections) {
+    return tp->submit([this, &collections] {
+      return store->list_collections(collections);
+    }).then([&collections] (int r) {
+      assert(r == 0);
+      return seastar::make_ready_future<std::vector<coll_t>>(
+        std::move(collections));
+    });
+  });
+}
+
+// Reads `len` bytes at `offset` from the object.  -ENOENT and -EIO are
+// mapped to typed errors; any other result is treated as success and the
+// buffer is returned.
+AlienStore::read_errorator::future<ceph::bufferlist>
+AlienStore::read(CollectionRef ch,
+                 const ghobject_t& oid,
+                 uint64_t offset,
+                 size_t len,
+                 uint32_t op_flags)
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(ceph::bufferlist{}, [=] (auto &bl) {
+    return tp->submit([=, &bl] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->read(c->collection, oid, offset, len, bl, op_flags);
+    }).then([&bl] (int r) -> read_errorator::future<ceph::bufferlist> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else if (r == -EIO) {
+        return crimson::ct_error::input_output_error::make();
+      } else {
+        return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
+      }
+    });
+  });
+}
+
+// Sparse read over the extents in `m`; same error mapping as read().
+// `m` is captured by reference, so it must stay alive until the future
+// resolves (guaranteed here because the caller awaits the result).
+AlienStore::read_errorator::future<ceph::bufferlist>
+AlienStore::readv(CollectionRef ch,
+                  const ghobject_t& oid,
+                  interval_set<uint64_t>& m,
+                  uint32_t op_flags)
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(ceph::bufferlist{},
+    [this, ch, oid, &m, op_flags](auto& bl) {
+    return tp->submit([this, ch, oid, &m, op_flags, &bl] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->readv(c->collection, oid, m, bl, op_flags);
+    }).then([&bl](int r) -> read_errorator::future<ceph::bufferlist> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else if (r == -EIO) {
+        return crimson::ct_error::input_output_error::make();
+      } else {
+        return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
+      }
+    });
+  });
+}
+
+// Fetches a single xattr by name.  -ENOENT (no object) and -ENODATA (no such
+// attribute) become typed errors; anything else returns the value.
+AlienStore::get_attr_errorator::future<ceph::bufferptr>
+AlienStore::get_attr(CollectionRef ch,
+                     const ghobject_t& oid,
+                     std::string_view name) const
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(ceph::bufferptr{}, [=] (auto &value) {
+    return tp->submit([=, &value] {
+      auto c =static_cast<AlienCollection*>(ch.get());
+      // getattr wants a NUL-terminated C string; string_view may not be
+      // NUL-terminated, hence the temporary std::string.
+      return store->getattr(c->collection, oid,
+                            static_cast<std::string>(name).c_str(), value);
+    }).then([oid, &value] (int r) -> get_attr_errorator::future<ceph::bufferptr> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else if (r == -ENODATA) {
+        return crimson::ct_error::enodata::make();
+      } else {
+        return get_attr_errorator::make_ready_future<ceph::bufferptr>(
+          std::move(value));
+      }
+    });
+  });
+}
+
+// Fetches all xattrs of an object.  The reinterpret_cast bridges crimson's
+// attrs_t to the classic map<string,bufferptr> signature — the two must stay
+// layout-compatible.
+AlienStore::get_attrs_ertr::future<AlienStore::attrs_t>
+AlienStore::get_attrs(CollectionRef ch,
+                      const ghobject_t& oid)
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(attrs_t{}, [=] (auto &aset) {
+    return tp->submit([=, &aset] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->getattrs(c->collection, oid,
+                             reinterpret_cast<map<string,bufferptr>&>(aset));
+    }).then([&aset] (int r) -> get_attrs_ertr::future<attrs_t> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else {
+        return get_attrs_ertr::make_ready_future<attrs_t>(std::move(aset));
+      }
+    });
+  });
+}
+
+// Fetches the omap values for an explicit key set.  As in get_attrs, the
+// reinterpret_cast relies on omap_values_t being layout-compatible with
+// map<string, bufferlist>.
+auto AlienStore::omap_get_values(CollectionRef ch,
+                                 const ghobject_t& oid,
+                                 const set<string>& keys)
+  -> read_errorator::future<omap_values_t>
+{
+  logger().debug("{}", __func__);
+  return seastar::do_with(omap_values_t{}, [=] (auto &values) {
+    return tp->submit([=, &values] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->omap_get_values(c->collection, oid, keys,
+		                    reinterpret_cast<map<string, bufferlist>*>(&values));
+    }).then([&values] (int r) -> read_errorator::future<omap_values_t> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else {
+        assert(r == 0);
+        return read_errorator::make_ready_future<omap_values_t>(std::move(values));
+      }
+    });
+  });
+}
+
+// Paged variant: fetches omap values with keys greater than `start`.
+// NOTE(review): the success path always reports `true` for the "done" flag
+// regardless of whether more keys remain — confirm against the contract in
+// alien_store.h ("values.empty() iff done").
+auto AlienStore::omap_get_values(CollectionRef ch,
+                                 const ghobject_t &oid,
+                                 const std::optional<string> &start)
+  -> read_errorator::future<std::tuple<bool, omap_values_t>>
+{
+  logger().debug("{} with_start", __func__);
+  return seastar::do_with(omap_values_t{}, [=] (auto &values) {
+    return tp->submit([=, &values] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->omap_get_values(c->collection, oid, start,
+		                    reinterpret_cast<map<string, bufferlist>*>(&values));
+    }).then([&values] (int r)
+      -> read_errorator::future<std::tuple<bool, omap_values_t>> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else if (r < 0){
+        logger().error("omap_get_values(start): {}", r);
+        return crimson::ct_error::input_output_error::make();
+      } else {
+        return read_errorator::make_ready_future<std::tuple<bool, omap_values_t>>(
+          std::make_tuple(true, std::move(values)));
+      }
+    });
+  });
+}
+
+// Queues a transaction on the classic store.  Lifetime/ordering:
+//  * transaction_gate keeps umount() from racing with in-flight txns;
+//  * tp_mutex serializes submissions to the thread pool;
+//  * the txn's completion contexts are collected into one wrapper that
+//    OnCommit runs back on this shard (`id`), fulfilling `done`;
+//  * `txn` and `done` live in do_with storage until `done` resolves.
+seastar::future<> AlienStore::do_transaction(CollectionRef ch,
+                                             ceph::os::Transaction&& txn)
+{
+  logger().debug("{}", __func__);
+  auto id = seastar::this_shard_id();
+  auto done = seastar::promise<>();
+  return seastar::do_with(
+    std::move(txn),
+    std::move(done),
+    [this, ch, id] (auto &txn, auto &done) {
+      return seastar::with_gate(transaction_gate, [this, ch, id, &txn, &done] {
+        return tp_mutex.lock().then ([this, ch, id, &txn, &done] {
+          Context *crimson_wrapper =
+            ceph::os::Transaction::collect_all_contexts(txn);
+          return tp->submit([this, ch, id, crimson_wrapper, &txn, &done] {
+            txn.register_on_commit(new OnCommit(id, done, crimson_wrapper, txn));
+            auto c = static_cast<AlienCollection*>(ch.get());
+            return store->queue_transaction(c->collection, std::move(txn));
+          });
+        }).then([this, &done] (int r) {
+          assert(r == 0);
+          // NOTE(review): the mutex is only released on the success path; if
+          // submit() rejects or throws, tp_mutex stays locked — consider an
+          // RAII/finally-style unlock.
+          tp_mutex.unlock();
+          return done.get_future();
+        });
+      });
+    });
+}
+
+// Persists a key/value pair in the store's metadata area.  The blocking
+// write runs on an alien thread; failure is fatal (asserted).
+seastar::future<> AlienStore::write_meta(const std::string& key,
+                                         const std::string& value)
+{
+  logger().debug("{}", __func__);
+  return tp->submit([this, key, value] {
+    return store->write_meta(key, value);
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+// Reads a metadata key; returns (result code, value).  A positive result is
+// the value length, which is used to clip the string before stripping any
+// trailing whitespace; non-positive results yield an empty value.
+seastar::future<std::tuple<int, std::string>>
+AlienStore::read_meta(const std::string& key)
+{
+  logger().debug("{}", __func__);
+  return tp->submit([this, key] {
+    std::string value;
+    int r = store->read_meta(key, &value);
+    if (r > 0) {
+      value.resize(r);
+      // meta files are commonly newline-terminated; drop trailing whitespace
+      boost::algorithm::trim_right_if(value,
+        [] (unsigned char c) {return isspace(c);});
+    } else {
+      value.clear();
+    }
+    return std::make_pair(r, value);
+  }).then([] (auto entry) {
+    return seastar::make_ready_future<std::tuple<int, std::string>>(
+      std::move(entry));
+  });
+}
+
+// Returns the fsid of the underlying store.  Runs on the reactor thread;
+// BlueStore::get_fsid is a plain accessor here.
+uuid_d AlienStore::get_fsid() const
+{
+  logger().debug("{}", __func__);
+  return store->get_fsid();
+}
+
+// Collects statfs-style usage numbers from the store on an alien thread.
+seastar::future<store_statfs_t> AlienStore::stat() const
+{
+  logger().info("{}", __func__);
+  return seastar::do_with(store_statfs_t{}, [this] (store_statfs_t &st) {
+    return tp->submit([this, &st] {
+      return store->statfs(&st, nullptr);
+    }).then([&st] (int r) {
+      assert(r == 0);
+      return seastar::make_ready_future<store_statfs_t>(std::move(st));
+    });
+  });
+}
+
+// Hard-coded xattr name-length cap advertised to callers.
+unsigned AlienStore::get_max_attr_name_length() const
+{
+  logger().info("{}", __func__);
+  return 256;
+}
+
+// stat(2)-style metadata for a single object.
+// NOTE(review): the return code of store->stat() is discarded, so a failed
+// stat silently yields a default/partial struct — confirm callers tolerate
+// this or propagate the error.
+seastar::future<struct stat> AlienStore::stat(
+  CollectionRef ch,
+  const ghobject_t& oid)
+{
+  return seastar::do_with((struct stat){}, [this, ch, oid](auto& st) {
+    return tp->submit([this, ch, oid, &st] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      store->stat(c->collection, oid, &st);
+      return st;
+    });
+  });
+}
+
+// Reads the omap header blob of an object; -ENOENT becomes a typed error,
+// other negative codes are logged and mapped to input_output_error.
+auto AlienStore::omap_get_header(CollectionRef ch,
+                                 const ghobject_t& oid)
+  -> read_errorator::future<ceph::bufferlist>
+{
+  return seastar::do_with(ceph::bufferlist(), [=](auto& bl) {
+    return tp->submit([=, &bl] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->omap_get_header(c->collection, oid, &bl);
+    }).then([&bl] (int r) -> read_errorator::future<ceph::bufferlist> {
+      if (r == -ENOENT) {
+        return crimson::ct_error::enoent::make();
+      } else if (r < 0) {
+        logger().error("omap_get_header: {}", r);
+        return crimson::ct_error::input_output_error::make();
+      } else {
+        return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
+      }
+    });
+  });
+}
+
+// Maps the allocated extents of [off, off+len) to an offset->length map.
+// NOTE(review): the fiemap result code `i` is ignored; errors are
+// indistinguishable from an empty mapping — confirm this is acceptable.
+seastar::future<std::map<uint64_t, uint64_t>> AlienStore::fiemap(
+  CollectionRef ch,
+  const ghobject_t& oid,
+  uint64_t off,
+  uint64_t len)
+{
+  return seastar::do_with(std::map<uint64_t, uint64_t>(), [=](auto& destmap) {
+    return tp->submit([=, &destmap] {
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->fiemap(c->collection, oid, off, len, destmap);
+    }).then([&destmap] (int i) {
+      return seastar::make_ready_future
+	  <std::map<uint64_t, uint64_t>>
+	  (std::move(destmap));
+    });
+  });
+}
+
+// Obtains a classic ObjectMap iterator on an alien thread and wraps it in
+// an AlienOmapIterator, which re-submits every positioning call to the pool.
+seastar::future<FuturizedStore::OmapIteratorRef> AlienStore::get_omap_iterator(
+  CollectionRef ch,
+  const ghobject_t& oid)
+{
+  return tp->submit([=] {
+    auto c = static_cast<AlienCollection*>(ch.get());
+    auto iter = store->get_omap_iterator(c->collection, oid);
+    return FuturizedStore::OmapIteratorRef(
+	      new AlienStore::AlienOmapIterator(iter,
+						this));
+  });
+}
+
+//TODO: each iterator op needs one submit, this is not efficient,
+// needs further optimization.
+// Position at the first omap key; the blocking call runs on the pool.
+seastar::future<> AlienStore::AlienOmapIterator::seek_to_first()
+{
+  return store->tp->submit([=] {
+    return iter->seek_to_first();
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+// Position at the first key strictly greater than `after`.
+seastar::future<> AlienStore::AlienOmapIterator::upper_bound(
+  const std::string& after)
+{
+  return store->tp->submit([this, after] {
+    return iter->upper_bound(after);
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+// Position at the first key greater than or equal to `to`.
+seastar::future<> AlienStore::AlienOmapIterator::lower_bound(
+  const std::string& to)
+{
+  return store->tp->submit([this, to] {
+    return iter->lower_bound(to);
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+// Advance to the next key.
+seastar::future<> AlienStore::AlienOmapIterator::next()
+{
+  return store->tp->submit([this] {
+    return iter->next();
+  }).then([] (int r) {
+    assert(r == 0);
+    return seastar::now();
+  });
+}
+
+// The accessors below call the wrapped iterator directly on the reactor
+// thread; they do not touch the store, only already-materialized state.
+bool AlienStore::AlienOmapIterator::valid() const
+{
+  return iter->valid();
+}
+
+std::string AlienStore::AlienOmapIterator::key()
+{
+  return iter->key();
+}
+
+seastar::future<std::string> AlienStore::AlienOmapIterator::tail_key()
+{
+  return store->tp->submit([this] {
+    return iter->tail_key();
+  });
+}
+
+ceph::buffer::list AlienStore::AlienOmapIterator::value()
+{
+  return iter->value();
+}
+
+int AlienStore::AlienOmapIterator::status() const
+{
+  return iter->status();
+}
+
+}
diff --git a/src/crimson/os/alienstore/alien_store.h b/src/crimson/os/alienstore/alien_store.h
new file mode 100644
index 000000000..92739340e
--- /dev/null
+++ b/src/crimson/os/alienstore/alien_store.h
@@ -0,0 +1,125 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_mutex.hh>
+
+#include "common/ceph_context.h"
+#include "os/ObjectStore.h"
+#include "osd/osd_types.h"
+
+#include "crimson/os/alienstore/thread_pool.h"
+#include "crimson/os/futurized_collection.h"
+#include "crimson/os/futurized_store.h"
+
+namespace ceph::os {
+class Transaction;
+}
+
+namespace crimson::os {
+// FuturizedStore backend that embeds the classic (blocking) BlueStore and
+// runs every blocking call on a dedicated "alien" thread pool, bridging
+// results back into seastar futures.
+class AlienStore final : public FuturizedStore {
+public:
+  // Futurized wrapper around a classic ObjectMap iterator; every
+  // repositioning call is bounced through the owning store's thread pool.
+  class AlienOmapIterator final : public OmapIterator {
+  public:
+    AlienOmapIterator(ObjectMap::ObjectMapIterator& it,
+		      AlienStore* store) : iter(it), store(store) {}
+    seastar::future<> seek_to_first();
+    seastar::future<> upper_bound(const std::string& after);
+    seastar::future<> lower_bound(const std::string& to);
+    bool valid() const;
+    seastar::future<> next();
+    std::string key();
+    seastar::future<std::string> tail_key();
+    ceph::buffer::list value();
+    int status() const;
+  private:
+    ObjectMap::ObjectMapIterator iter;  // wrapped classic iterator
+    AlienStore* store;                  // non-owning; provides the thread pool
+  };
+  AlienStore(const std::string& path, const ConfigValues& values);
+  ~AlienStore() final;
+
+  seastar::future<> start() final;
+  seastar::future<> stop() final;
+  seastar::future<> mount() final;
+  seastar::future<> umount() final;
+
+  seastar::future<> mkfs(uuid_d new_osd_fsid) final;
+  read_errorator::future<ceph::bufferlist> read(CollectionRef c,
+				   const ghobject_t& oid,
+				   uint64_t offset,
+				   size_t len,
+				   uint32_t op_flags = 0) final;
+  read_errorator::future<ceph::bufferlist> readv(CollectionRef c,
+						 const ghobject_t& oid,
+						 interval_set<uint64_t>& m,
+						 uint32_t op_flags = 0) final;
+
+
+  get_attr_errorator::future<ceph::bufferptr> get_attr(CollectionRef c,
+					const ghobject_t& oid,
+					std::string_view name) const final;
+  get_attrs_ertr::future<attrs_t> get_attrs(CollectionRef c,
+				     const ghobject_t& oid) final;
+
+  read_errorator::future<omap_values_t> omap_get_values(
+    CollectionRef c,
+    const ghobject_t& oid,
+    const omap_keys_t& keys) final;
+
+  /// Retrieves paged set of values > start (if present)
+  read_errorator::future<std::tuple<bool, omap_values_t>> omap_get_values(
+    CollectionRef c,           ///< [in] collection
+    const ghobject_t &oid,     ///< [in] oid
+    const std::optional<std::string> &start ///< [in] start, empty for begin
+    ) final; ///< @return <done, values> values.empty() iff done
+
+  seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>> list_objects(
+    CollectionRef c,
+    const ghobject_t& start,
+    const ghobject_t& end,
+    uint64_t limit) const final;
+
+  seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
+  seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
+  seastar::future<std::vector<coll_t>> list_collections() final;
+
+  seastar::future<> do_transaction(CollectionRef c,
+				   ceph::os::Transaction&& txn) final;
+
+  seastar::future<> write_meta(const std::string& key,
+		  const std::string& value) final;
+  seastar::future<std::tuple<int, std::string>> read_meta(
+    const std::string& key) final;
+  uuid_d get_fsid() const final;
+  seastar::future<store_statfs_t> stat() const final;
+  unsigned get_max_attr_name_length() const final;
+  seastar::future<struct stat> stat(
+    CollectionRef,
+    const ghobject_t&) final;
+  read_errorator::future<ceph::bufferlist> omap_get_header(
+    CollectionRef,
+    const ghobject_t&) final;
+  seastar::future<std::map<uint64_t, uint64_t>> fiemap(
+    CollectionRef,
+    const ghobject_t&,
+    uint64_t off,
+    uint64_t len) final;
+  seastar::future<FuturizedStore::OmapIteratorRef> get_omap_iterator(
+    CollectionRef ch,
+    const ghobject_t& oid) final;
+
+private:
+  constexpr static unsigned MAX_KEYS_PER_OMAP_GET_CALL = 32;
+  // mutable so const read paths (read, stat, ...) can still submit work
+  mutable std::unique_ptr<crimson::os::ThreadPool> tp;
+  const std::string path;
+  uint64_t used_bytes = 0;
+  std::unique_ptr<ObjectStore> store;  // the embedded BlueStore
+  std::unique_ptr<CephContext> cct;
+  // closed during umount() so no transaction can race teardown
+  seastar::gate transaction_gate;
+  // cache of coll_t -> AlienCollection wrappers (see open_collection)
+  std::unordered_map<coll_t, CollectionRef> coll_map;
+  // serializes do_transaction submissions to the thread pool
+  seastar::shared_mutex tp_mutex;
+};
+}
diff --git a/src/crimson/os/alienstore/thread_pool.cc b/src/crimson/os/alienstore/thread_pool.cc
new file mode 100644
index 000000000..e127d87d5
--- /dev/null
+++ b/src/crimson/os/alienstore/thread_pool.cc
@@ -0,0 +1,80 @@
+#include "thread_pool.h"
+
+#include <chrono>
+#include <pthread.h>
+
+#include "include/ceph_assert.h"
+#include "crimson/common/config_proxy.h"
+
+using crimson::common::local_conf;
+
+namespace crimson::os {
+
+// Creates `n_threads` worker threads.  The queue size is rounded up to a
+// multiple of the seastar shard count so start() can split it evenly into
+// per-shard semaphore slots; a non-negative `cpu_id` pins every worker to
+// that core.
+ThreadPool::ThreadPool(size_t n_threads,
+		       size_t queue_sz,
+		       long cpu_id)
+  : queue_size{round_up_to(queue_sz, seastar::smp::count)},
+    pending{queue_size}
+{
+  auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
+  for (size_t i = 0; i < n_threads; i++) {
+    threads.emplace_back([this, cpu_id, queue_max_wait] {
+      if (cpu_id >= 0) {
+        pin(cpu_id);
+      }
+      loop(queue_max_wait);
+    });
+  }
+}
+
+// Joins the workers.  They only exit their loop once `stopping` is set, so
+// stop() must have completed before destruction.
+ThreadPool::~ThreadPool()
+{
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+// Pins the calling thread to the given CPU core; failure is fatal.
+void ThreadPool::pin(unsigned cpu_id)
+{
+  cpu_set_t cs;
+  CPU_ZERO(&cs);
+  CPU_SET(cpu_id, &cs);
+  [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
+						   sizeof(cs), &cs);
+  ceph_assert(r == 0);
+}
+
+// Worker main loop: pop-and-run work items until stop is requested.
+// The condvar predicate itself performs the lock-free pop, so `work_item`
+// is populated as a side effect of waiting; the wait also times out every
+// `queue_max_wait` so shutdown is noticed even without a queued item.
+void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
+{
+  for (;;) {
+    WorkItem* work_item = nullptr;
+    {
+      std::unique_lock lock{mutex};
+      cond.wait_for(lock, queue_max_wait,
+		    [this, &work_item] {
+	return pending.pop(work_item) || is_stopping();
+      });
+    }
+    if (work_item) {
+      work_item->process();
+    } else if (is_stopping()) {
+      // stop() closes the submit gates first, so the queue is already
+      // drained of new work when we get here.
+      break;
+    }
+  }
+}
+
+// Starts the per-shard SubmitQueue instances, giving each reactor shard an
+// equal share of the queue's free slots.
+seastar::future<> ThreadPool::start()
+{
+  auto slots_per_shard = queue_size / seastar::smp::count;
+  return submit_queue.start(slots_per_shard);
+}
+
+// Stops accepting work (closes every shard's gate), then wakes all workers
+// so they observe `stopping` and exit their loops.
+seastar::future<> ThreadPool::stop()
+{
+  return submit_queue.stop().then([this] {
+    stopping = true;
+    cond.notify_all();
+  });
+}
+
+} // namespace crimson::os
diff --git a/src/crimson/os/alienstore/thread_pool.h b/src/crimson/os/alienstore/thread_pool.h
new file mode 100644
index 000000000..27840da18
--- /dev/null
+++ b/src/crimson/os/alienstore/thread_pool.h
@@ -0,0 +1,132 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <tuple>
+#include <type_traits>
+#include <boost/lockfree/queue.hpp>
+#include <boost/optional.hpp>
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/sharded.hh>
+
+namespace crimson::os {
+
+// Unit of work executed by a pool thread; process() runs on the alien side.
+struct WorkItem {
+  virtual ~WorkItem() {}
+  virtual void process() = 0;
+};
+
+// Adapts a callable into a WorkItem whose result can be awaited as a
+// seastar future.  The result/exception is stashed in a raw future_state
+// on the worker thread, and an eventfd write wakes the reactor side, which
+// then converts the state into a resolved future.
+template<typename Func>
+struct Task final : WorkItem {
+  using T = std::invoke_result_t<Func>;
+  // void results are stored as the empty future value type
+  using future_stored_type_t =
+    std::conditional_t<std::is_void_v<T>,
+		       seastar::internal::future_stored_type_t<>,
+		       seastar::internal::future_stored_type_t<T>>;
+  using futurator_t = seastar::futurize<T>;
+public:
+  explicit Task(Func&& f)
+    : func(std::move(f))
+  {}
+  // Runs on a pool thread: invoke the callable and capture value/exception.
+  void process() override {
+    try {
+      if constexpr (std::is_void_v<T>) {
+        func();
+        state.set();
+      } else {
+        state.set(func());
+      }
+    } catch (...) {
+      state.set_exception(std::current_exception());
+    }
+    // signal the reactor-side waiter (see get_future)
+    on_done.write_side().signal(1);
+  }
+  // Runs on the reactor: resolves once process() has signalled completion.
+  typename futurator_t::type get_future() {
+    return on_done.wait().then([this](size_t) {
+      if (state.failed()) {
+        return futurator_t::make_exception_future(state.get_exception());
+      } else {
+        return futurator_t::from_tuple(state.get_value());
+      }
+    });
+  }
+private:
+  Func func;
+  seastar::future_state<future_stored_type_t> state;  // result or exception
+  seastar::readable_eventfd on_done;  // cross-thread completion signal
+};
+
+// Per-reactor-shard admission state: a semaphore bounding how many tasks
+// this shard may have in flight, and a gate tracking pending submissions so
+// stop() can wait for them to drain.
+struct SubmitQueue {
+  seastar::semaphore free_slots;
+  seastar::gate pending_tasks;
+  explicit SubmitQueue(size_t num_free_slots)
+    : free_slots(num_free_slots)
+  {}
+  seastar::future<> stop() {
+    return pending_tasks.close();
+  }
+};
+
+/// an engine for scheduling non-seastar tasks from seastar fibers
+class ThreadPool {
+  std::atomic<bool> stopping = false;      // set by stop(); checked by workers
+  std::mutex mutex;                        // guards the condvar wait
+  std::condition_variable cond;            // wakes workers on submit/stop
+  std::vector<std::thread> threads;        // the alien worker threads
+  seastar::sharded<SubmitQueue> submit_queue;  // per-shard admission control
+  const size_t queue_size;                 // total slots, multiple of smp::count
+  boost::lockfree::queue<WorkItem*> pending;   // shard -> worker handoff
+
+  void loop(std::chrono::milliseconds queue_max_wait);
+  bool is_stopping() const {
+    return stopping.load(std::memory_order_relaxed);
+  }
+  static void pin(unsigned cpu_id);
+  seastar::semaphore& local_free_slots() {
+    return submit_queue.local().free_slots;
+  }
+  ThreadPool(const ThreadPool&) = delete;
+  ThreadPool& operator=(const ThreadPool&) = delete;
+public:
+  /**
+   * @param queue_sz the depth of pending queue. before a task is scheduled,
+   *                 it waits in this queue. we will round this number to
+   *                 multiple of the number of cores.
+   * @param n_threads the number of threads in this thread pool.
+   * @param cpu the CPU core to which this thread pool is assigned
+   * @note each @c Task has its own crimson::thread::Condition, which possesses
+   * an fd, so we should keep the size of queue under a reasonable limit.
+   */
+  ThreadPool(size_t n_threads, size_t queue_sz, long cpu);
+  ~ThreadPool();
+  seastar::future<> start();
+  seastar::future<> stop();
+  // Schedules func(args...) on an alien thread and returns a future for its
+  // result.  Flow: enter this shard's gate, wait for a free slot, enqueue a
+  // heap-allocated Task, wake one worker, and release slot + Task when the
+  // task's future resolves.
+  template<typename Func, typename...Args>
+  auto submit(Func&& func, Args&&... args) {
+    auto packaged = [func=std::move(func),
+                     args=std::forward_as_tuple(args...)] {
+      return std::apply(std::move(func), std::move(args));
+    };
+    return seastar::with_gate(submit_queue.local().pending_tasks,
+      [packaged=std::move(packaged), this] {
+        return local_free_slots().wait()
+	  .then([packaged=std::move(packaged), this] {
+	    auto task = new Task{std::move(packaged)};
+	    auto fut = task->get_future();
+	    pending.push(task);
+	    cond.notify_one();
+	    return fut.finally([task, this] {
+	      local_free_slots().signal();
+	      delete task;
+	    });
+	  });
+	});
+  }
+};
+
+} // namespace crimson::os