diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/test/fio/fio_ceph_objectstore.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/fio/fio_ceph_objectstore.cc')
-rw-r--r-- | src/test/fio/fio_ceph_objectstore.cc | 772 |
1 files changed, 772 insertions, 0 deletions
diff --git a/src/test/fio/fio_ceph_objectstore.cc b/src/test/fio/fio_ceph_objectstore.cc new file mode 100644 index 00000000..b8c29624 --- /dev/null +++ b/src/test/fio/fio_ceph_objectstore.cc @@ -0,0 +1,772 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph ObjectStore engine + * + * IO engine using Ceph's ObjectStore class to test low-level performance of + * Ceph OSDs. + * + */ + +#include <memory> +#include <system_error> +#include <vector> + +#include "os/ObjectStore.h" +#include "global/global_init.h" +#include "common/errno.h" +#include "include/intarith.h" +#include "include/stringify.h" +#include "include/random.h" +#include "common/perf_counters.h" + +#include <fio.h> +#include <optgroup.h> + +#include "include/ceph_assert.h" // fio.h clobbers our assert.h +#include <algorithm> + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_ + +namespace { + +/// fio configuration options read from the job file +struct Options { + thread_data* td; + char* conf; + unsigned long long + oi_attr_len_low, + oi_attr_len_high, + snapset_attr_len_low, + snapset_attr_len_high, + pglog_omap_len_low, + pglog_omap_len_high, + pglog_dup_omap_len_low, + pglog_dup_omap_len_high, + _fastinfo_omap_len_low, + _fastinfo_omap_len_high; + unsigned simulate_pglog; + unsigned single_pool_mode; + unsigned preallocate_files; +}; + +template <class Func> // void Func(fio_option&) +fio_option make_option(Func&& func) +{ + // zero-initialize and set common defaults + auto o = fio_option{}; + o.category = FIO_OPT_C_ENGINE; + o.group = FIO_OPT_G_RBD; + func(std::ref(o)); + return o; +} + +static std::vector<fio_option> ceph_options{ + make_option([] (fio_option& o) { + o.name = "conf"; + o.lname = "ceph configuration file"; + o.type = FIO_OPT_STR_STORE; + o.help = "Path to a ceph configuration file"; + o.off1 = offsetof(Options, conf); + }), + make_option([] (fio_option& o) { + o.name = "oi_attr_len"; + o.lname = "OI Attr length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set OI(aka '_') attribute to specified length"; + o.off1 = offsetof(Options, oi_attr_len_low); + o.off2 = offsetof(Options, oi_attr_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "snapset_attr_len"; + o.lname = "Attr 'snapset' length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set 'snapset' attribute to specified length"; + o.off1 = offsetof(Options, snapset_attr_len_low); + o.off2 = offsetof(Options, snapset_attr_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "_fastinfo_omap_len"; + o.lname = "'_fastinfo' omap entry length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set '_fastinfo' OMAP attribute to specified length"; + o.off1 = offsetof(Options, _fastinfo_omap_len_low); + o.off2 = offsetof(Options, _fastinfo_omap_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "pglog_simulation"; + o.lname = "pglog behavior simulation"; + o.type = FIO_OPT_BOOL; + o.help = "Enables PG Log simulation behavior"; + o.off1 = offsetof(Options, simulate_pglog); + o.def = "0"; + }), + make_option([] (fio_option& o) { + o.name = "pglog_omap_len"; + o.lname = "pglog omap entry length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set pglog omap entry to specified length"; + o.off1 = offsetof(Options, pglog_omap_len_low); + o.off2 = offsetof(Options, pglog_omap_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "pglog_dup_omap_len"; + o.lname = "uplicate pglog omap entry length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set duplicate pglog omap entry to specified length"; + o.off1 = offsetof(Options, pglog_dup_omap_len_low); + o.off2 = offsetof(Options, pglog_dup_omap_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "single_pool_mode"; + o.lname = "single(shared among jobs) pool mode"; + o.type = FIO_OPT_BOOL; + o.help = "Enables the mode when all jobs run against the same pool"; + o.off1 = offsetof(Options, single_pool_mode); + o.def = "0"; + }), + make_option([] (fio_option& o) { + o.name = "preallocate_files"; + o.lname = "preallocate files on init"; + o.type = FIO_OPT_BOOL; + o.help = "Enables/disables file preallocation (touch and resize) on init"; + o.off1 = offsetof(Options, preallocate_files); + o.def = "1"; + }), + {} // fio expects a 'null'-terminated list +}; + + +struct Collection { + spg_t pg; + coll_t cid; + ObjectStore::CollectionHandle ch; + // Can't use mutex directly in vectors hence dynamic allocation + + std::unique_ptr<std::mutex> lock; + uint64_t pglog_ver_head = 1; + uint64_t pglog_ver_tail = 1; + uint64_t pglog_dup_ver_tail = 1; + + // use big pool ids to avoid clashing with existing collections + static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff; + + Collection(const spg_t& pg, ObjectStore::CollectionHandle _ch) + : pg(pg), cid(pg), ch(_ch), + lock(new std::mutex) { + } +}; + +int destroy_collections( + std::unique_ptr<ObjectStore>& os, + std::vector<Collection>& collections) +{ + ObjectStore::Transaction t; + bool failed = false; + // remove our collections + for (auto& coll : collections) { + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.remove(coll.cid, pgmeta_oid); + t.remove_collection(coll.cid); + int r = os->queue_transaction(coll.ch, std::move(t)); + if (r && !failed) { + derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl; + failed = true; + } + } + return 0; +} + +int init_collections(std::unique_ptr<ObjectStore>& os, + uint64_t pool, + std::vector<Collection>& collections, + uint64_t count) +{ + ceph_assert(count > 0); + collections.reserve(count); + + const int split_bits = cbits(count - 1); + + { + // propagate Superblock object to ensure proper functioning of tools that + // need it. E.g. ceph-objectstore-tool + coll_t cid(coll_t::meta()); + bool exists = os->collection_exists(cid); + if (!exists) { + auto ch = os->create_new_collection(cid); + + OSDSuperblock superblock; + bufferlist bl; + encode(superblock, bl); + + ObjectStore::Transaction t; + t.create_collection(cid, split_bits); + t.write(cid, OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl); + int r = os->queue_transaction(ch, std::move(t)); + + if (r < 0) { + derr << "Failure to write OSD superblock: " << cpp_strerror(-r) << dendl; + return r; + } + } + } + + for (uint32_t i = 0; i < count; i++) { + auto pg = spg_t{pg_t{i, pool}}; + coll_t cid(pg); + + bool exists = os->collection_exists(cid); + auto ch = exists ? + os->open_collection(cid) : + os->create_new_collection(cid) ; + + collections.emplace_back(pg, ch); + + ObjectStore::Transaction t; + auto& coll = collections.back(); + if (!exists) { + t.create_collection(coll.cid, split_bits); + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.touch(coll.cid, pgmeta_oid); + int r = os->queue_transaction(coll.ch, std::move(t)); + if (r) { + derr << "Engine init failed with " << cpp_strerror(-r) << dendl; + destroy_collections(os, collections); + return r; + } + } + } + return 0; +} + +/// global engine state shared between all jobs within the process. this +/// includes g_ceph_context and the ObjectStore instance +struct Engine { + /// the initial g_ceph_context reference to be dropped on destruction + boost::intrusive_ptr<CephContext> cct; + std::unique_ptr<ObjectStore> os; + + std::vector<Collection> collections; //< shared collections to spread objects over + + std::mutex lock; + int ref_count; + const bool unlink; //< unlink objects on destruction + + explicit Engine(thread_data* td); + ~Engine(); + + static Engine* get_instance(thread_data* td) { + // note: creates an Engine with the options associated with the first job + static Engine engine(td); + return &engine; + } + + void ref() { + std::lock_guard<std::mutex> l(lock); + ++ref_count; + } + void deref() { + std::lock_guard<std::mutex> l(lock); + --ref_count; + if (!ref_count) { + ostringstream ostr; + Formatter* f = Formatter::create("json-pretty", "json-pretty", "json-pretty"); + cct->get_perfcounters_collection()->dump_formatted(f, false); + ostr << "FIO plugin "; + f->flush(ostr); + if (g_conf()->rocksdb_perf) { + os->get_db_statistics(f); + ostr << "FIO get_db_statistics "; + f->flush(ostr); + } + ostr << "Mempools: "; + f->open_object_section("mempools"); + mempool::dump(f); + f->close_section(); + f->flush(ostr); + + ostr << "Generate db histogram: "; + os->generate_db_histogram(f); + f->flush(ostr); + delete f; + + if (unlink) { + destroy_collections(os, collections); + } + os->umount(); + dout(0) << ostr.str() << dendl; + } + } +}; + +Engine::Engine(thread_data* td) + : ref_count(0), + unlink(td->o.unlink) +{ + // add the ceph command line arguments + auto o = static_cast<Options*>(td->eo); + if (!o->conf) { + throw std::runtime_error("missing conf option for ceph configuration file"); + } + std::vector<const char*> args{ + "-i", "0", // identify as osd.0 for osd_data and osd_journal + "--conf", o->conf, // use the requested conf file + }; + if (td->o.directory) { // allow conf files to use ${fio_dir} for data + args.emplace_back("--fio_dir"); + args.emplace_back(td->o.directory); + } + + // claim the g_ceph_context reference and release it on destruction + cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(g_ceph_context); + + // create the ObjectStore + os.reset(ObjectStore::create(g_ceph_context, + g_conf().get_val<std::string>("osd objectstore"), + g_conf().get_val<std::string>("osd data"), + g_conf().get_val<std::string>("osd journal"))); + if (!os) + throw std::runtime_error("bad objectstore type " + g_conf()->osd_objectstore); + + unsigned num_shards; + if(g_conf()->osd_op_num_shards) + num_shards = g_conf()->osd_op_num_shards; + else if(os->is_rotational()) + num_shards = g_conf()->osd_op_num_shards_hdd; + else + num_shards = g_conf()->osd_op_num_shards_ssd; + os->set_cache_shards(num_shards); + + //normalize options + o->oi_attr_len_high = max(o->oi_attr_len_low, o->oi_attr_len_high); + o->snapset_attr_len_high = max(o->snapset_attr_len_low, + o->snapset_attr_len_high); + o->pglog_omap_len_high = max(o->pglog_omap_len_low, + o->pglog_omap_len_high); + o->pglog_dup_omap_len_high = max(o->pglog_dup_omap_len_low, + o->pglog_dup_omap_len_high); + o->_fastinfo_omap_len_high = max(o->_fastinfo_omap_len_low, + o->_fastinfo_omap_len_high); + + int r = os->mkfs(); + if (r < 0) + throw std::system_error(-r, std::system_category(), "mkfs failed"); + + r = os->mount(); + if (r < 0) + throw std::system_error(-r, std::system_category(), "mount failed"); + + // create shared collections up to osd_pool_default_pg_num + if (o->single_pool_mode) { + uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num"); + if (count > td->o.nr_files) + count = td->o.nr_files; + init_collections(os, Collection::MIN_POOL_ID, collections, count); + } +} + +Engine::~Engine() +{ + ceph_assert(!ref_count); +} + +struct Object { + ghobject_t oid; + Collection& coll; + + Object(const char* name, Collection& coll) + : oid(hobject_t(name, "", CEPH_NOSNAP, coll.pg.ps(), coll.pg.pool(), "")), + coll(coll) {} +}; + +/// treat each fio job either like a separate pool with its own collections and objects +/// or just a client using its own objects from the shared pool +struct Job { + Engine* engine; //< shared ptr to the global Engine + std::vector<Collection> collections; //< job's private collections to spread objects over + std::vector<Object> objects; //< associate an object with each fio_file + std::vector<io_u*> events; //< completions for fio_ceph_os_event() + const bool unlink; //< unlink objects on destruction + + bufferptr one_for_all_data; //< preallocated buffer long enough + //< to use for vairious operations + + Job(Engine* engine, const thread_data* td); + ~Job(); +}; + +Job::Job(Engine* engine, const thread_data* td) + : engine(engine), + events(td->o.iodepth), + unlink(td->o.unlink) +{ + engine->ref(); + auto o = static_cast<Options*>(td->eo); + unsigned long long max_data = max(o->oi_attr_len_high, + o->snapset_attr_len_high); + max_data = max(max_data, o->pglog_omap_len_high); + max_data = max(max_data, o->pglog_dup_omap_len_high); + max_data = max(max_data, o->_fastinfo_omap_len_high); + one_for_all_data = buffer::create(max_data); + + std::vector<Collection>* colls; + // create private collections up to osd_pool_default_pg_num + if (!o->single_pool_mode) { + uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num"); + if (count > td->o.nr_files) + count = td->o.nr_files; + // use the fio thread_number for our unique pool id + const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1; + init_collections(engine->os, pool, collections, count); + colls = &collections; + } else { + colls = &engine->collections; + } + const uint64_t file_size = td->o.size / max(1u, td->o.nr_files); + ObjectStore::Transaction t; + + // create an object for each file in the job + objects.reserve(td->o.nr_files); + for (uint32_t i = 0; i < td->o.nr_files; i++) { + auto f = td->files[i]; + f->real_file_size = file_size; + f->engine_pos = i; + + // associate each object with a collection in a round-robin fashion. + auto& coll = (*colls)[i % colls->size()]; + + objects.emplace_back(f->file_name, coll); + if (o->preallocate_files) { + auto& oid = objects.back().oid; + t.touch(coll.cid, oid); + t.truncate(coll.cid, oid, file_size); + int r = engine->os->queue_transaction(coll.ch, std::move(t)); + if (r) { + engine->deref(); + throw std::system_error(r, std::system_category(), "job init"); + } + } + } +} + +Job::~Job() +{ + if (unlink) { + ObjectStore::Transaction t; + bool failed = false; + // remove our objects + for (auto& obj : objects) { + t.remove(obj.coll.cid, obj.oid); + int r = engine->os->queue_transaction(obj.coll.ch, std::move(t)); + if (r && !failed) { + derr << "job cleanup failed with " << cpp_strerror(-r) << dendl; + failed = true; + } + } + destroy_collections(engine->os, collections); + } + engine->deref(); +} + +int fio_ceph_os_setup(thread_data* td) +{ + // if there are multiple jobs, they must run in the same process against a + // single instance of the ObjectStore. explicitly disable fio's default + // job-per-process configuration + td->o.use_thread = 1; + + try { + // get or create the global Engine instance + auto engine = Engine::get_instance(td); + // create a Job for this thread + td->io_ops_data = new Job(engine, td); + } catch (std::exception& e) { + std::cerr << "setup failed with " << e.what() << std::endl; + return -1; + } + return 0; +} + +void fio_ceph_os_cleanup(thread_data* td) +{ + auto job = static_cast<Job*>(td->io_ops_data); + td->io_ops_data = nullptr; + delete job; +} + + +io_u* fio_ceph_os_event(thread_data* td, int event) +{ + // return the requested event from fio_ceph_os_getevents() + auto job = static_cast<Job*>(td->io_ops_data); + return job->events[event]; +} + +int fio_ceph_os_getevents(thread_data* td, unsigned int min, + unsigned int max, const timespec* t) +{ + auto job = static_cast<Job*>(td->io_ops_data); + unsigned int events = 0; + io_u* u = NULL; + unsigned int i = 0; + + // loop through inflight ios until we find 'min' completions + do { + io_u_qiter(&td->io_u_all, u, i) { + if (!(u->flags & IO_U_F_FLIGHT)) + continue; + + if (u->engine_data) { + u->engine_data = nullptr; + job->events[events] = u; + events++; + } + } + if (events >= min) + break; + usleep(100); + } while (1); + + return events; +} + +/// completion context for ObjectStore::queue_transaction() +class UnitComplete : public Context { + io_u* u; + public: + explicit UnitComplete(io_u* u) : u(u) {} + void finish(int r) { + // mark the pointer to indicate completion for fio_ceph_os_getevents() + u->engine_data = reinterpret_cast<void*>(1ull); + } +}; + +enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u) +{ + fio_ro_check(td, u); + + + + auto o = static_cast<const Options*>(td->eo); + auto job = static_cast<Job*>(td->io_ops_data); + auto& object = job->objects[u->file->engine_pos]; + auto& coll = object.coll; + auto& os = job->engine->os; + + if (u->ddir == DDIR_WRITE) { + // provide a hint if we're likely to read this data back + const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0; + + bufferlist bl; + bl.push_back(buffer::copy(reinterpret_cast<char*>(u->xfer_buf), + u->xfer_buflen ) ); + + map<string,bufferptr> attrset; + map<string, bufferlist> omaps; + // enqueue a write transaction on the collection's handle + ObjectStore::Transaction t; + char ver_key[64]; + + // fill attrs if any + if (o->oi_attr_len_high) { + ceph_assert(o->oi_attr_len_high >= o->oi_attr_len_low); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->oi_attr_len_low, o->oi_attr_len_high)); + attrset["_"] = job->one_for_all_data; + } + if (o->snapset_attr_len_high) { + ceph_assert(o->snapset_attr_len_high >= o->snapset_attr_len_low); + job->one_for_all_data.set_length( + ceph::util::generate_random_number + (o->snapset_attr_len_low, o->snapset_attr_len_high)); + attrset["snapset"] = job->one_for_all_data; + + } + if (o->_fastinfo_omap_len_high) { + ceph_assert(o->_fastinfo_omap_len_high >= o->_fastinfo_omap_len_low); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->_fastinfo_omap_len_low, o->_fastinfo_omap_len_high)); + omaps["_fastinfo"].append(job->one_for_all_data); + } + + uint64_t pglog_trim_head = 0, pglog_trim_tail = 0; + uint64_t pglog_dup_trim_head = 0, pglog_dup_trim_tail = 0; + if (o->simulate_pglog) { + + uint64_t pglog_ver_cnt = 0; + { + std::lock_guard<std::mutex> l(*coll.lock); + pglog_ver_cnt = coll.pglog_ver_head++; + if (o->pglog_omap_len_high && + pglog_ver_cnt >= + coll.pglog_ver_tail + + g_conf()->osd_min_pg_log_entries + g_conf()->osd_pg_log_trim_min) { + pglog_trim_tail = coll.pglog_ver_tail; + coll.pglog_ver_tail = pglog_trim_head = + pglog_trim_tail + g_conf()->osd_pg_log_trim_min; + + if (o->pglog_dup_omap_len_high && + pglog_ver_cnt >= + coll.pglog_dup_ver_tail + g_conf()->osd_pg_log_dups_tracked + + g_conf()->osd_pg_log_trim_min) { + pglog_dup_trim_tail = coll.pglog_dup_ver_tail; + coll.pglog_dup_ver_tail = pglog_dup_trim_head = + pglog_dup_trim_tail + g_conf()->osd_pg_log_trim_min; + } + } + } + + if (o->pglog_omap_len_high) { + ceph_assert(o->pglog_omap_len_high >= o->pglog_omap_len_low); + snprintf(ver_key, sizeof(ver_key), + "0000000011.%020llu", (unsigned long long)pglog_ver_cnt); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->pglog_omap_len_low, o->pglog_omap_len_high)); + omaps[ver_key].append(job->one_for_all_data); + } + if (o->pglog_dup_omap_len_high) { + //insert dup + ceph_assert(o->pglog_dup_omap_len_high >= o->pglog_dup_omap_len_low); + for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) { + snprintf(ver_key, sizeof(ver_key), + "dup_0000000011.%020llu", (unsigned long long)i); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->pglog_dup_omap_len_low, o->pglog_dup_omap_len_high)); + omaps[ver_key].append(job->one_for_all_data); + } + } + } + + if (attrset.size()) { + t.setattrs(coll.cid, object.oid, attrset); + } + t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags); + + set<string> rmkeys; + for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) { + snprintf(ver_key, sizeof(ver_key), + "0000000011.%020llu", (unsigned long long)i); + rmkeys.emplace(ver_key); + } + for( auto i = pglog_dup_trim_tail; i < pglog_dup_trim_head; ++i) { + snprintf(ver_key, sizeof(ver_key), + "dup_0000000011.%020llu", (unsigned long long)i); + rmkeys.emplace(ver_key); + } + + if (rmkeys.size()) { + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.omap_rmkeys(coll.cid, pgmeta_oid, rmkeys); + } + + if (omaps.size()) { + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.omap_setkeys(coll.cid, pgmeta_oid, omaps); + } + t.register_on_commit(new UnitComplete(u)); + os->queue_transaction(coll.ch, + std::move(t)); + return FIO_Q_QUEUED; + } + + if (u->ddir == DDIR_READ) { + // ObjectStore reads are synchronous, so make the call and return COMPLETED + bufferlist bl; + int r = os->read(coll.ch, object.oid, u->offset, u->xfer_buflen, bl); + if (r < 0) { + u->error = r; + td_verror(td, u->error, "xfer"); + } else { + bl.copy(0, bl.length(), static_cast<char*>(u->xfer_buf)); + u->resid = u->xfer_buflen - r; + } + return FIO_Q_COMPLETED; + } + + derr << "WARNING: Only DDIR_READ and DDIR_WRITE are supported!" << dendl; + u->error = -EINVAL; + td_verror(td, u->error, "xfer"); + return FIO_Q_COMPLETED; +} + +int fio_ceph_os_commit(thread_data* td) +{ + // commit() allows the engine to batch up queued requests to be submitted all + // at once. it would be natural for queue() to collect transactions in a list, + // and use commit() to pass them all to ObjectStore::queue_transactions(). but + // because we spread objects over multiple collections, we a) need to use a + // different sequencer for each collection, and b) are less likely to see a + // benefit from batching requests within a collection + return 0; +} + +// open/close are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to +// prevent fio from creating the files +int fio_ceph_os_open(thread_data* td, fio_file* f) { return 0; } +int fio_ceph_os_close(thread_data* td, fio_file* f) { return 0; } + +int fio_ceph_os_io_u_init(thread_data* td, io_u* u) +{ + // no data is allocated, we just use the pointer as a boolean 'completed' flag + u->engine_data = nullptr; + return 0; +} + +void fio_ceph_os_io_u_free(thread_data* td, io_u* u) +{ + u->engine_data = nullptr; +} + + +// ioengine_ops for get_ioengine() +struct ceph_ioengine : public ioengine_ops { + ceph_ioengine() : ioengine_ops({}) { + name = "ceph-os"; + version = FIO_IOOPS_VERSION; + flags = FIO_DISKLESSIO; + setup = fio_ceph_os_setup; + queue = fio_ceph_os_queue; + commit = fio_ceph_os_commit; + getevents = fio_ceph_os_getevents; + event = fio_ceph_os_event; + cleanup = fio_ceph_os_cleanup; + open_file = fio_ceph_os_open; + close_file = fio_ceph_os_close; + io_u_init = fio_ceph_os_io_u_init; + io_u_free = fio_ceph_os_io_u_free; + options = ceph_options.data(); + option_struct_size = sizeof(struct Options); + } +}; + +} // anonymous namespace + +extern "C" { +// the exported fio engine interface +void get_ioengine(struct ioengine_ops** ioengine_ptr) { + static ceph_ioengine ioengine; + *ioengine_ptr = &ioengine; +} +} // extern "C" |