diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/crimson/tools/store_nbd | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/crimson/tools/store_nbd')
-rw-r--r-- | src/crimson/tools/store_nbd/block_driver.cc | 19 | ||||
-rw-r--r-- | src/crimson/tools/store_nbd/block_driver.h | 134 | ||||
-rw-r--r-- | src/crimson/tools/store_nbd/fs_driver.cc | 310 | ||||
-rw-r--r-- | src/crimson/tools/store_nbd/fs_driver.h | 72 | ||||
-rw-r--r-- | src/crimson/tools/store_nbd/store-nbd.cc | 456 | ||||
-rw-r--r-- | src/crimson/tools/store_nbd/tm_driver.cc | 222 | ||||
-rw-r--r-- | src/crimson/tools/store_nbd/tm_driver.h | 56 |
7 files changed, 1269 insertions, 0 deletions
diff --git a/src/crimson/tools/store_nbd/block_driver.cc b/src/crimson/tools/store_nbd/block_driver.cc new file mode 100644 index 000000000..10e77a34b --- /dev/null +++ b/src/crimson/tools/store_nbd/block_driver.cc @@ -0,0 +1,19 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "fs_driver.h" +#include "block_driver.h" + +#include "tm_driver.h" + +BlockDriverRef get_backend(BlockDriver::config_t config) +{ + if (config.type == "transaction_manager") { + return std::make_unique<TMDriver>(config); + } else if (config.is_futurized_store()) { + return std::make_unique<FSDriver>(config); + } else { + ceph_assert(0 == "invalid option"); + return BlockDriverRef(); + } +} diff --git a/src/crimson/tools/store_nbd/block_driver.h b/src/crimson/tools/store_nbd/block_driver.h new file mode 100644 index 000000000..ea3453ef3 --- /dev/null +++ b/src/crimson/tools/store_nbd/block_driver.h @@ -0,0 +1,134 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include <boost/program_options/variables_map.hpp> +#include <boost/program_options/parsers.hpp> + +#include <seastar/core/future.hh> + +#include <string> +#include <optional> + +#include "include/buffer.h" + +/** + * BlockDriver + * + * Simple interface to enable throughput test to compare raw disk to + * transaction_manager, etc + */ +class BlockDriver { +public: + struct config_t { + std::string type; + bool mkfs = false; + unsigned num_pgs = 128; + unsigned log_size = 1000; + unsigned object_size = 4<<20 /* 4MB, rbd default */; + unsigned oi_size = 1<<9 /* 512b */; + unsigned log_entry_size = 1<<9 /* 512b */; + bool prepopulate_log = false; + std::optional<std::string> path; + + bool is_futurized_store() const { + return type == "seastore" || type == "bluestore"; + } + + std::string get_fs_type() const { + ceph_assert(is_futurized_store()); + return type; + } + + bool oi_enabled() const { + return oi_size > 0; + } + + bool log_enabled() const { + return log_entry_size > 0 && log_size > 0; + } + + bool prepopulate_log_enabled() const { + return prepopulate_log; + } + + void populate_options( + boost::program_options::options_description &desc) + { + namespace po = boost::program_options; + desc.add_options() + ("type", + po::value<std::string>() + ->default_value("transaction_manager") + ->notifier([this](auto s) { type = s; }), + "Backend to use, options are transaction_manager, seastore" + ) + ("device-path", + po::value<std::string>() + ->required() + ->notifier([this](auto s) { path = s; }), + "Path to device for backend" + ) + ("num-pgs", + po::value<unsigned>() + ->notifier([this](auto s) { num_pgs = s; }), + "Number of pgs to use for futurized_store backends" + ) + ("log-size", + po::value<unsigned>() + ->notifier([this](auto s) { log_size = s; }), + "Number of log entries per pg to use for futurized_store backends" + ", 0 to disable" + ) + ("log-entry-size", + po::value<unsigned>() + ->notifier([this](auto s) { log_entry_size = s; }), + "Size of each log entry per pg to use for futurized_store backends" + ", 0 to disable" + ) + ("prepopulate-log", + po::value<bool>() + ->notifier([this](auto s) { prepopulate_log = s; }), + "Prepopulate log on mount" + ) + ("object-info-size", + po::value<unsigned>() + ->notifier([this](auto s) { log_entry_size = s; }), + "Size of each log entry per pg to use for futurized_store backends" + ", 0 to disable" + ) + ("object-size", + po::value<unsigned>() + ->notifier([this](auto s) { object_size = s; }), + "Object size to use for futurized_store backends" + ) + ("mkfs", + po::value<bool>() + ->default_value(false) + ->notifier([this](auto s) { mkfs = s; }), + "Do mkfs first" + ); + } + }; + + virtual ceph::bufferptr get_buffer(size_t size) = 0; + + virtual seastar::future<> write( + off_t offset, + ceph::bufferptr ptr) = 0; + + virtual seastar::future<ceph::bufferlist> read( + off_t offset, + size_t size) = 0; + + virtual size_t get_size() const = 0; + + virtual seastar::future<> mount() = 0; + virtual seastar::future<> close() = 0; + + virtual ~BlockDriver() {} +}; +using BlockDriverRef = std::unique_ptr<BlockDriver>; + +BlockDriverRef get_backend(BlockDriver::config_t config); diff --git a/src/crimson/tools/store_nbd/fs_driver.cc b/src/crimson/tools/store_nbd/fs_driver.cc new file mode 100644 index 000000000..18f836766 --- /dev/null +++ b/src/crimson/tools/store_nbd/fs_driver.cc @@ -0,0 +1,310 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <boost/iterator/counting_iterator.hpp> +#include <fmt/format.h> + +#include "os/Transaction.h" +#include "fs_driver.h" + +using namespace crimson; +using namespace crimson::os; + +coll_t get_coll(unsigned num) { + return coll_t(spg_t(pg_t(0, num))); +} + +ghobject_t get_log_object(unsigned coll) +{ + return ghobject_t( + shard_id_t::NO_SHARD, + 0, + (coll << 16), + "", + "", + 0, + ghobject_t::NO_GEN); +} + +std::string make_log_key( + unsigned i) +{ + return fmt::format("log_entry_{}", i); +} + +void add_log_entry( + unsigned i, + unsigned entry_size, + std::map<std::string, ceph::buffer::list> *omap) +{ + assert(omap); + bufferlist bl; + bl.append(ceph::buffer::create('0', entry_size)); + + omap->emplace(std::make_pair(make_log_key(i), bl)); +} + +void populate_log( + ceph::os::Transaction &t, + FSDriver::pg_analogue_t &pg, + unsigned entry_size, + unsigned entries) +{ + t.touch(pg.collection->get_cid(), pg.log_object); + // omap_clear not yet implemented, TODO + // t.omap_clear(pg.collection->get_cid(), pg.log_object); + + std::map<std::string, ceph::buffer::list> omap; + for (unsigned i = 0; i < entries; ++i) { + add_log_entry(i, entry_size, &omap); + } + + t.omap_setkeys( + pg.collection->get_cid(), + pg.log_object, + omap); + + pg.log_head = entries; +} + +void update_log( + ceph::os::Transaction &t, + FSDriver::pg_analogue_t &pg, + unsigned entry_size, + unsigned entries) +{ + ++pg.log_head; + std::map<std::string, ceph::buffer::list> key; + add_log_entry(pg.log_head, entry_size, &key); + + t.omap_setkeys( + pg.collection->get_cid(), + pg.log_object, + key); + + + while ((pg.log_head - pg.log_tail) > entries) { + t.omap_rmkey( + pg.collection->get_cid(), + pg.log_object, + make_log_key(pg.log_tail)); + ++pg.log_tail; + } +} + +FSDriver::offset_mapping_t FSDriver::map_offset(off_t offset) +{ + uint32_t objid = offset / config.object_size; + uint32_t collid = objid % config.num_pgs; + return offset_mapping_t{ + collections[collid], + ghobject_t( + shard_id_t::NO_SHARD, + 0, + (collid << 16) | (objid + 1), + "", + "", + 0, + ghobject_t::NO_GEN), + offset % config.object_size + }; +} + +seastar::future<> FSDriver::write( + off_t offset, + bufferptr ptr) +{ + auto mapping = map_offset(offset); + ceph_assert(mapping.offset + ptr.length() <= config.object_size); + ceph::os::Transaction t; + bufferlist bl; + bl.append(ptr); + t.write( + mapping.pg.collection->get_cid(), + mapping.object, + mapping.offset, + ptr.length(), + bl, + 0); + + if (config.oi_enabled() ) { + bufferlist attr; + attr.append(ceph::buffer::create(config.oi_size, '0')); + t.setattr( + mapping.pg.collection->get_cid(), + mapping.object, + "_", + attr); + } + + if (config.log_enabled()) { + update_log( + t, + mapping.pg, + config.log_entry_size, + config.log_size); + } + + return sharded_fs->do_transaction( + mapping.pg.collection, + std::move(t)); +} + +seastar::future<bufferlist> FSDriver::read( + off_t offset, + size_t size) +{ + auto mapping = map_offset(offset); + ceph_assert((mapping.offset + size) <= config.object_size); + return sharded_fs->read( + mapping.pg.collection, + mapping.object, + mapping.offset, + size, + 0 + ).handle_error( + crimson::ct_error::enoent::handle([size](auto &e) { + bufferlist bl; + bl.append_zero(size); + return seastar::make_ready_future<bufferlist>(std::move(bl)); + }), + crimson::ct_error::assert_all{"Unrecoverable error in FSDriver::read"} + ).then([size](auto &&bl) { + if (bl.length() < size) { + bl.append_zero(size - bl.length()); + } + return seastar::make_ready_future<bufferlist>(std::move(bl)); + }); +} + +seastar::future<> FSDriver::mkfs() +{ + return init( + ).then([this] { + assert(fs); + uuid_d uuid; + uuid.generate_random(); + return fs->mkfs(uuid).handle_error( + crimson::stateful_ec::handle([] (const auto& ec) { + crimson::get_logger(ceph_subsys_test) + .error("error creating empty object store in {}: ({}) {}", + crimson::common::local_conf().get_val<std::string>("osd_data"), + ec.value(), ec.message()); + std::exit(EXIT_FAILURE); + })); + }).then([this] { + return fs->stop(); + }).then([this] { + return init(); + }).then([this] { + return fs->mount( + ).handle_error( + crimson::stateful_ec::handle([] (const auto& ec) { + crimson::get_logger( + ceph_subsys_test + ).error( + "error mounting object store in {}: ({}) {}", + crimson::common::local_conf().get_val<std::string>("osd_data"), + ec.value(), + ec.message()); + std::exit(EXIT_FAILURE); + })); + }).then([this] { + return seastar::do_for_each( + boost::counting_iterator<unsigned>(0), + boost::counting_iterator<unsigned>(config.num_pgs), + [this](auto i) { + return sharded_fs->create_new_collection(get_coll(i) + ).then([this, i](auto coll) { + ceph::os::Transaction t; + t.create_collection(get_coll(i), 0); + return sharded_fs->do_transaction(coll, std::move(t)); + }); + }); + }).then([this] { + return fs->umount(); + }).then([this] { + return fs->stop(); + }).then([this] { + fs.reset(); + return seastar::now(); + }); +} + +seastar::future<> FSDriver::mount() +{ + ceph_assert(config.path); + return ( + config.mkfs ? mkfs() : seastar::now() + ).then([this] { + return init(); + }).then([this] { + return fs->mount( + ).handle_error( + crimson::stateful_ec::handle([] (const auto& ec) { + crimson::get_logger( + ceph_subsys_test + ).error( + "error mounting object store in {}: ({}) {}", + crimson::common::local_conf().get_val<std::string>("osd_data"), + ec.value(), + ec.message()); + std::exit(EXIT_FAILURE); + })); + }).then([this] { + return seastar::do_for_each( + boost::counting_iterator<unsigned>(0), + boost::counting_iterator<unsigned>(config.num_pgs), + [this](auto i) { + return sharded_fs->open_collection(get_coll(i) + ).then([this, i](auto ref) { + collections[i].collection = ref; + collections[i].log_object = get_log_object(i); + if (config.log_enabled()) { + ceph::os::Transaction t; + if (config.prepopulate_log_enabled()) { + populate_log( + t, + collections[i], + config.log_entry_size, + config.log_size); + } + return sharded_fs->do_transaction( + collections[i].collection, + std::move(t)); + } else { + return seastar::now(); + } + }); + }); + }).then([this] { + return fs->stat(); + }).then([this](auto s) { + size = s.total; + }); +}; + +seastar::future<> FSDriver::close() +{ + collections.clear(); + return fs->umount( + ).then([this] { + return fs->stop(); + }).then([this] { + fs.reset(); + return seastar::now(); + }); +} + +seastar::future<> FSDriver::init() +{ + fs.reset(); + fs = FuturizedStore::create( + config.get_fs_type(), + *config.path, + crimson::common::local_conf().get_config_values() + ); + return fs->start().then([this] { + sharded_fs = &(fs->get_sharded_store()); + }); +} diff --git a/src/crimson/tools/store_nbd/fs_driver.h b/src/crimson/tools/store_nbd/fs_driver.h new file mode 100644 index 000000000..89aca075f --- /dev/null +++ b/src/crimson/tools/store_nbd/fs_driver.h @@ -0,0 +1,72 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "block_driver.h" + +#include "crimson/os/futurized_collection.h" +#include "crimson/os/futurized_store.h" + +class FSDriver final : public BlockDriver { +public: + FSDriver(config_t config) + : config(config) + {} + ~FSDriver() final {} + + bufferptr get_buffer(size_t size) final { + return ceph::buffer::create_page_aligned(size); + } + + seastar::future<> write( + off_t offset, + bufferptr ptr) final; + + seastar::future<bufferlist> read( + off_t offset, + size_t size) final; + + size_t get_size() const { + return size; + } + + seastar::future<> mount() final; + + seastar::future<> close() final; + +private: + size_t size = 0; + const config_t config; + std::unique_ptr<crimson::os::FuturizedStore> fs; + crimson::os::FuturizedStore::Shard* sharded_fs; + + struct pg_analogue_t { + crimson::os::CollectionRef collection; + + ghobject_t log_object; + unsigned log_tail = 0; + unsigned log_head = 0; + }; + std::map<unsigned, pg_analogue_t> collections; + + struct offset_mapping_t { + pg_analogue_t &pg; + ghobject_t object; + off_t offset; + }; + offset_mapping_t map_offset(off_t offset); + + seastar::future<> mkfs(); + seastar::future<> init(); + + friend void populate_log( + ceph::os::Transaction &, + pg_analogue_t &, + unsigned, + unsigned); + + friend void update_log( + ceph::os::Transaction &, + FSDriver::pg_analogue_t &, + unsigned, + unsigned); +}; diff --git a/src/crimson/tools/store_nbd/store-nbd.cc b/src/crimson/tools/store_nbd/store-nbd.cc new file mode 100644 index 000000000..9f80c3b2c --- /dev/null +++ b/src/crimson/tools/store_nbd/store-nbd.cc @@ -0,0 +1,456 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- + +/** + * crimson-store-nbd + * + * This tool exposes crimson object store internals as an nbd server + * for use with fio in basic benchmarking. + * + * Example usage: + * + * $ ./bin/crimson-store-nbd --device-path /dev/nvme1n1 -c 1 --mkfs true --uds-path /tmp/store_nbd_socket.sock + * + * $ cat nbd.fio + * [global] + * ioengine=nbd + * uri=nbd+unix:///?socket=/tmp/store_nbd_socket.sock + * rw=randrw + * time_based + * runtime=120 + * group_reporting + * iodepth=1 + * size=500G + * + * [job0] + * offset=0 + * + * $ fio nbd.fio + */ + +#include <random> + +#include <boost/program_options/variables_map.hpp> +#include <boost/program_options/parsers.hpp> + +#include <linux/nbd.h> +#include <linux/fs.h> + +#include <seastar/apps/lib/stop_signal.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/byteorder.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/gate.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/rwlock.hh> +#include <seastar/core/thread.hh> +#include <seastar/util/defer.hh> + +#include "crimson/common/config_proxy.h" +#include "crimson/common/log.h" + +#include "block_driver.h" + +namespace po = boost::program_options; + +using namespace ceph; + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_test); + } +} + +struct request_context_t { + uint32_t magic = 0; + uint32_t type = 0; + + char handle[8] = {0}; + + uint64_t from = 0; + uint32_t len = 0; + + unsigned err = 0; + std::optional<bufferptr> in_buffer; + std::optional<bufferlist> out_buffer; + + using ref = std::unique_ptr<request_context_t>; + static ref make_ref() { + return std::make_unique<request_context_t>(); + } + + bool check_magic() const { + auto ret = magic == NBD_REQUEST_MAGIC; + if (!ret) { + logger().error( + "Invalid magic {} should be {}", + magic, + NBD_REQUEST_MAGIC); + } + return ret; + } + + uint32_t get_command() const { + return type & 0xff; + } + + bool has_input_buffer() const { + return get_command() == NBD_CMD_WRITE; + } + + seastar::future<> read_request(seastar::input_stream<char> &in) { + return in.read_exactly(sizeof(struct nbd_request) + ).then([this, &in](auto buf) { + if (buf.size() < sizeof(struct nbd_request)) { + throw std::system_error( + std::make_error_code( + std::errc::connection_reset)); + } + auto p = buf.get(); + magic = seastar::consume_be<uint32_t>(p); + type = seastar::consume_be<uint32_t>(p); + memcpy(handle, p, sizeof(handle)); + p += sizeof(handle); + from = seastar::consume_be<uint64_t>(p); + len = seastar::consume_be<uint32_t>(p); + logger().debug( + "Got request, magic {}, type {}, from {}, len {}", + magic, type, from, len); + + if (!check_magic()) { + throw std::system_error( + std::make_error_code( + std::errc::invalid_argument)); + } + + if (has_input_buffer()) { + return in.read_exactly(len).then([this](auto buf) { + in_buffer = ceph::buffer::create_page_aligned(len); + in_buffer->copy_in(0, len, buf.get()); + return seastar::now(); + }); + } else { + return seastar::now(); + } + }); + } + + seastar::future<> write_reply(seastar::output_stream<char> &out) { + seastar::temporary_buffer<char> buffer{sizeof(struct nbd_reply)}; + auto p = buffer.get_write(); + seastar::produce_be<uint32_t>(p, NBD_REPLY_MAGIC); + seastar::produce_be<uint32_t>(p, err); + logger().debug("write_reply writing err {}", err); + memcpy(p, handle, sizeof(handle)); + return out.write(std::move(buffer)).then([this, &out] { + if (out_buffer) { + return seastar::do_for_each( + out_buffer->mut_buffers(), + [&out](bufferptr &ptr) { + logger().debug("write_reply writing {}", ptr.length()); + return out.write( + seastar::temporary_buffer<char>( + ptr.c_str(), + ptr.length(), + seastar::make_deleter([ptr](){})) + ); + }); + } else { + return seastar::now(); + } + }).then([&out] { + return out.flush(); + }); + } +}; + +struct RequestWriter { + seastar::rwlock lock; + seastar::output_stream<char> stream; + seastar::gate gate; + + RequestWriter( + seastar::output_stream<char> &&stream) : stream(std::move(stream)) {} + RequestWriter(RequestWriter &&) = default; + + seastar::future<> complete(request_context_t::ref &&req) { + auto &request = *req; + return lock.write_lock( + ).then([&request, this] { + return request.write_reply(stream); + }).finally([&, this, req=std::move(req)] { + lock.write_unlock(); + logger().debug("complete"); + return seastar::now(); + }); + } + + seastar::future<> close() { + return gate.close().then([this] { + return stream.close(); + }); + } +}; + +/** + * NBDHandler + * + * Simple throughput test for concurrent, single threaded + * writes to an BlockDriver. + */ +class NBDHandler { + BlockDriver &backend; + std::string uds_path; + std::optional<seastar::server_socket> server_socket; + std::optional<seastar::connected_socket> connected_socket; + seastar::gate gate; +public: + struct config_t { + std::string uds_path; + + void populate_options( + po::options_description &desc) + { + desc.add_options() + ("uds-path", + po::value<std::string>() + ->default_value("/tmp/store_nbd_socket.sock") + ->notifier([this](auto s) { + uds_path = s; + }), + "Path to domain socket for nbd" + ); + } + }; + + NBDHandler( + BlockDriver &backend, + config_t config) : + backend(backend), + uds_path(config.uds_path) + {} + + void run(); + seastar::future<> stop(); +}; + +int main(int argc, char** argv) +{ + po::options_description desc{"Allowed options"}; + bool debug = false; + desc.add_options() + ("help,h", "show help message") + ("debug", po::value<bool>(&debug)->default_value(false), + "enable debugging"); + + po::options_description nbd_pattern_options{"NBD Pattern Options"}; + NBDHandler::config_t nbd_config; + nbd_config.populate_options(nbd_pattern_options); + desc.add(nbd_pattern_options); + + po::options_description backend_pattern_options{"Backend Options"}; + BlockDriver::config_t backend_config; + backend_config.populate_options(backend_pattern_options); + desc.add(backend_pattern_options); + + po::variables_map vm; + std::vector<std::string> unrecognized_options; + try { + auto parsed = po::command_line_parser(argc, argv) + .options(desc) + .allow_unregistered() + .run(); + po::store(parsed, vm); + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + + po::notify(vm); + unrecognized_options = + po::collect_unrecognized(parsed.options, po::include_positional); + } catch(const po::error& e) { + std::cerr << "error: " << e.what() << std::endl; + return 1; + } + std::vector<const char*> args(argv, argv + argc); + + seastar::app_template::config app_cfg; + app_cfg.name = "crimson-store-nbd"; + app_cfg.auto_handle_sigint_sigterm = false; + seastar::app_template app(std::move(app_cfg)); + + std::vector<char*> av{argv[0]}; + std::transform(begin(unrecognized_options), + end(unrecognized_options), + std::back_inserter(av), + [](auto& s) { + return const_cast<char*>(s.c_str()); + }); + return app.run(av.size(), av.data(), [&] { + if (debug) { + seastar::global_logger_registry().set_all_loggers_level( + seastar::log_level::debug + ); + } + return seastar::async([&] { + seastar_apps_lib::stop_signal should_stop; + crimson::common::sharded_conf() + .start(EntityName{}, std::string_view{"ceph"}).get(); + auto stop_conf = seastar::defer([] { + crimson::common::sharded_conf().stop().get(); + }); + + auto backend = get_backend(backend_config); + NBDHandler nbd(*backend, nbd_config); + backend->mount().get(); + auto close_backend = seastar::defer([&] { + backend->close().get(); + }); + + logger().debug("Running nbd server..."); + nbd.run(); + auto stop_nbd = seastar::defer([&] { + nbd.stop().get(); + }); + should_stop.wait().get(); + return 0; + }); + }); +} + +class nbd_oldstyle_negotiation_t { + uint64_t magic = seastar::cpu_to_be(0x4e42444d41474943); // "NBDMAGIC" + uint64_t magic2 = seastar::cpu_to_be(0x00420281861253); // "IHAVEOPT" + uint64_t size = 0; + uint32_t flags = seastar::cpu_to_be(0); + char reserved[124] = {0}; + +public: + nbd_oldstyle_negotiation_t(uint64_t size, uint32_t flags) + : size(seastar::cpu_to_be(size)), flags(seastar::cpu_to_be(flags)) {} +} __attribute__((packed)); + +seastar::future<> send_negotiation( + size_t size, + seastar::output_stream<char>& out) +{ + seastar::temporary_buffer<char> buf{sizeof(nbd_oldstyle_negotiation_t)}; + new (buf.get_write()) nbd_oldstyle_negotiation_t(size, 1); + return out.write(std::move(buf) + ).then([&out] { + return out.flush(); + }); +} + +seastar::future<> handle_command( + BlockDriver &backend, + request_context_t::ref request_ref, + RequestWriter &out) +{ + auto &request = *request_ref; + logger().debug("got command {}", request.get_command()); + return ([&] { + switch (request.get_command()) { + case NBD_CMD_WRITE: + return backend.write( + request.from, + *request.in_buffer); + case NBD_CMD_READ: + return backend.read( + request.from, + request.len).then([&] (auto buffer) { + logger().debug("read returned buffer len {}", buffer.length()); + request.out_buffer = buffer; + }); + case NBD_CMD_DISC: + throw std::system_error(std::make_error_code(std::errc::bad_message)); + case NBD_CMD_TRIM: + throw std::system_error(std::make_error_code(std::errc::bad_message)); + default: + throw std::system_error(std::make_error_code(std::errc::bad_message)); + } + })().then([&, request_ref=std::move(request_ref)]() mutable { + logger().debug("handle_command complete"); + return out.complete(std::move(request_ref)); + }); +} + + +seastar::future<> handle_commands( + BlockDriver &backend, + seastar::input_stream<char>& in, + RequestWriter &out) +{ + logger().debug("handle_commands"); + return seastar::keep_doing([&] { + logger().debug("waiting for command"); + auto request_ref = request_context_t::make_ref(); + auto &request = *request_ref; + return request.read_request(in).then( + [&, request_ref=std::move(request_ref)]() mutable { + // keep running in background + (void)seastar::try_with_gate(out.gate, + [&backend, &out, request_ref=std::move(request_ref)]() mutable { + return handle_command(backend, std::move(request_ref), out); + }); + logger().debug("handle_commands after fork"); + }); + }).handle_exception_type([](const seastar::gate_closed_exception&) {}); +} + +void NBDHandler::run() +{ + logger().debug("About to listen on {}", uds_path); + server_socket = seastar::engine().listen( + seastar::socket_address{ + seastar::unix_domain_addr{uds_path}}); + + // keep running in background + (void)seastar::keep_doing([this] { + return seastar::try_with_gate(gate, [this] { + return server_socket->accept().then([this](auto acc) { + logger().debug("Accepted"); + connected_socket = std::move(acc.connection); + return seastar::do_with( + connected_socket->input(), + RequestWriter{connected_socket->output()}, + [&, this](auto &input, auto &output) { + return send_negotiation( + backend.get_size(), + output.stream + ).then([&, this] { + return handle_commands(backend, input, output); + }).finally([&] { + std::cout << "closing input and output" << std::endl; + return seastar::when_all(input.close(), + output.close()); + }).discard_result().handle_exception([](auto e) { + logger().error("NBDHandler::run saw exception {}", e); + }); + }); + }).handle_exception_type([] (const std::system_error &e) { + // an ECONNABORTED is expected when we are being stopped. + if (e.code() != std::errc::connection_aborted) { + logger().error("accept failed: {}", e); + } + }); + }); + }).handle_exception_type([](const seastar::gate_closed_exception&) {}); +} + +seastar::future<> NBDHandler::stop() +{ + if (server_socket) { + server_socket->abort_accept(); + } + if (connected_socket) { + connected_socket->shutdown_input(); + connected_socket->shutdown_output(); + } + return gate.close().then([this] { + if (!server_socket.has_value()) { + return seastar::now(); + } + return seastar::remove_file(uds_path); + }); +} diff --git a/src/crimson/tools/store_nbd/tm_driver.cc b/src/crimson/tools/store_nbd/tm_driver.cc new file mode 100644 index 000000000..bd216fd58 --- /dev/null +++ b/src/crimson/tools/store_nbd/tm_driver.cc @@ -0,0 +1,222 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "tm_driver.h" + +using namespace crimson; +using namespace crimson::os; +using namespace crimson::os::seastore; + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_test); + } +} + +seastar::future<> TMDriver::write( + off_t offset, + bufferptr ptr) +{ + logger().debug("Writing offset {}", offset); + assert(offset % device->get_block_size() == 0); + assert((ptr.length() % device->get_block_size()) == 0); + return seastar::do_with(ptr, [this, offset](auto& ptr) { + return repeat_eagain([this, offset, &ptr] { + return tm->with_transaction_intr( + Transaction::src_t::MUTATE, + "write", + [this, offset, &ptr](auto& t) + { + return tm->dec_ref(t, offset + ).si_then([](auto){}).handle_error_interruptible( + crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }), + crimson::ct_error::pass_further_all{} + ).si_then([this, offset, &t, &ptr] { + logger().debug("dec_ref complete"); + return tm->alloc_extent<TestBlock>(t, offset, ptr.length()); + }).si_then([this, offset, &t, &ptr](auto ext) { + boost::ignore_unused(offset); // avoid clang warning; + assert(ext->get_laddr() == (size_t)offset); + assert(ext->get_bptr().length() == ptr.length()); + ext->get_bptr().swap(ptr); + logger().debug("submitting transaction"); + return tm->submit_transaction(t); + }); + }); + }); + }).handle_error( + crimson::ct_error::assert_all{"store-nbd write"} + ); +} + +TMDriver::read_extents_ret TMDriver::read_extents( + Transaction &t, + laddr_t offset, + extent_len_t length) +{ + return seastar::do_with( + lba_pin_list_t(), + lextent_list_t<TestBlock>(), + [this, &t, offset, length](auto &pins, auto &ret) { + return tm->get_pins( + t, offset, length + ).si_then([this, &t, &pins, &ret](auto _pins) { + _pins.swap(pins); + logger().debug("read_extents: mappings {}", pins); + return trans_intr::do_for_each( + pins.begin(), + pins.end(), + [this, &t, &ret](auto &&pin) { + logger().debug( + "read_extents: get_extent {}~{}", + pin->get_val(), + pin->get_length()); + return tm->read_pin<TestBlock>( + t, + std::move(pin) + ).si_then([&ret](auto ref) mutable { + ret.push_back(std::make_pair(ref->get_laddr(), ref)); + logger().debug( + "read_extents: got extent {}", + *ref); + return seastar::now(); + }); + }).si_then([&ret] { + return std::move(ret); + }); + }); + }); +} + +seastar::future<bufferlist> TMDriver::read( + off_t offset, + size_t size) +{ + logger().debug("Reading offset {}", offset); + assert(offset % device->get_block_size() == 0); + assert(size % device->get_block_size() == 0); + auto blptrret = std::make_unique<bufferlist>(); + auto &blret = *blptrret; + return repeat_eagain([=, &blret, this] { + return tm->with_transaction_intr( + Transaction::src_t::READ, + "read", + [=, &blret, this](auto& t) + { + return read_extents(t, offset, size + ).si_then([=, &blret](auto ext_list) { + size_t cur = offset; + for (auto &i: ext_list) { + if (cur != i.first) { + assert(cur < i.first); + blret.append_zero(i.first - cur); + cur = i.first; + } + blret.append(i.second->get_bptr()); + cur += i.second->get_bptr().length(); + } + if (blret.length() != size) { + assert(blret.length() < size); + blret.append_zero(size - blret.length()); + } + }); + }); + }).handle_error( + crimson::ct_error::assert_all{"store-nbd read"} + ).then([blptrret=std::move(blptrret)]() mutable { + logger().debug("read complete"); + return std::move(*blptrret); + }); +} + +void TMDriver::init() +{ + std::vector<Device*> sec_devices; +#ifndef NDEBUG + tm = make_transaction_manager(device.get(), sec_devices, true); +#else + tm = make_transaction_manager(device.get(), sec_devices, false); +#endif +} + +void TMDriver::clear() +{ + tm.reset(); +} + +size_t TMDriver::get_size() const +{ + return device->get_available_size() * .5; +} + +seastar::future<> TMDriver::mkfs() +{ + assert(config.path); + logger().debug("mkfs"); + return Device::make_device(*config.path, device_type_t::SSD + ).then([this](DeviceRef dev) { + device = std::move(dev); + seastore_meta_t meta; + meta.seastore_id.generate_random(); + return device->mkfs( + device_config_t{ + true, + (magic_t)std::rand(), + device_type_t::SSD, + 0, + meta, + secondary_device_set_t()}); + }).safe_then([this] { + logger().debug("device mkfs done"); + return device->mount(); + }).safe_then([this] { + init(); + logger().debug("tm mkfs"); + return tm->mkfs(); + }).safe_then([this] { + logger().debug("tm close"); + return tm->close(); + }).safe_then([this] { + logger().debug("sm close"); + return device->close(); + }).safe_then([this] { + clear(); + device.reset(); + logger().debug("mkfs complete"); + return TransactionManager::mkfs_ertr::now(); + }).handle_error( + crimson::ct_error::assert_all{ + "Invalid errror during TMDriver::mkfs" + } + ); +} + +seastar::future<> TMDriver::mount() +{ + return (config.mkfs ? mkfs() : seastar::now() + ).then([this] { + return Device::make_device(*config.path, device_type_t::SSD); + }).then([this](DeviceRef dev) { + device = std::move(dev); + return device->mount(); + }).safe_then([this] { + init(); + return tm->mount(); + }).handle_error( + crimson::ct_error::assert_all{ + "Invalid errror during TMDriver::mount" + } + ); +}; + +seastar::future<> TMDriver::close() +{ + return tm->close().safe_then([this] { + clear(); + return device->close(); + }).handle_error( + crimson::ct_error::assert_all{ + "Invalid errror during TMDriver::close" + } + ); +} diff --git a/src/crimson/tools/store_nbd/tm_driver.h b/src/crimson/tools/store_nbd/tm_driver.h new file mode 100644 index 000000000..24aabdeb6 --- /dev/null +++ b/src/crimson/tools/store_nbd/tm_driver.h @@ -0,0 +1,56 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "block_driver.h" + +#include "crimson/os/seastore/cache.h" +#include "crimson/os/seastore/device.h" +#include "crimson/os/seastore/transaction_manager.h" +#include "test/crimson/seastore/test_block.h" + +class TMDriver final : public BlockDriver { +public: + TMDriver(config_t config) : config(config) {} + ~TMDriver() final {} + + bufferptr get_buffer(size_t size) final { + return ceph::buffer::create_page_aligned(size); + } + + seastar::future<> write( + off_t offset, + bufferptr ptr) final; + + seastar::future<bufferlist> read( + off_t offset, + size_t size) final; + + size_t get_size() const final; + + seastar::future<> mount() final; + + seastar::future<> close() final; + +private: + const config_t config; + + using DeviceRef = crimson::os::seastore::DeviceRef; + DeviceRef device; + + using TransactionManager = crimson::os::seastore::TransactionManager; + using TransactionManagerRef = crimson::os::seastore::TransactionManagerRef; + TransactionManagerRef tm; + + seastar::future<> mkfs(); + void init(); + void clear(); + + using read_extents_iertr = TransactionManager::read_extent_iertr; + using read_extents_ret = read_extents_iertr::future< + crimson::os::seastore::lextent_list_t<crimson::os::seastore::TestBlock> + >; + read_extents_ret read_extents( + crimson::os::seastore::Transaction &t, + crimson::os::seastore::laddr_t offset, + crimson::os::seastore::extent_len_t length); +}; |