From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/test/fio/fio_ceph_objectstore.cc | 942 +++++++++++++++++++++++++++++++++++ 1 file changed, 942 insertions(+) create mode 100644 src/test/fio/fio_ceph_objectstore.cc (limited to 'src/test/fio/fio_ceph_objectstore.cc') diff --git a/src/test/fio/fio_ceph_objectstore.cc b/src/test/fio/fio_ceph_objectstore.cc new file mode 100644 index 000000000..ade043f0c --- /dev/null +++ b/src/test/fio/fio_ceph_objectstore.cc @@ -0,0 +1,942 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph ObjectStore engine + * + * IO engine using Ceph's ObjectStore class to test low-level performance of + * Ceph OSDs. + * + */ + +#include +#include +#include +#include + +#include "os/ObjectStore.h" +#include "global/global_init.h" +#include "common/errno.h" +#include "include/intarith.h" +#include "include/stringify.h" +#include "include/random.h" +#include "include/str_list.h" +#include "common/perf_counters.h" +#include "common/TracepointProvider.h" + +#include +#include + +#include "include/ceph_assert.h" // fio.h clobbers our assert.h +#include + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_ + +using namespace std; + +namespace { + +/// fio configuration options read from the job file +struct Options { + thread_data* td; + char* conf; + char* perf_output_file; + char* throttle_values; + char* deferred_throttle_values; + unsigned long long + cycle_throttle_period, + oi_attr_len_low, + oi_attr_len_high, + snapset_attr_len_low, + snapset_attr_len_high, + pglog_omap_len_low, + pglog_omap_len_high, + pglog_dup_omap_len_low, + pglog_dup_omap_len_high, + _fastinfo_omap_len_low, + _fastinfo_omap_len_high; + unsigned simulate_pglog; + unsigned single_pool_mode; + unsigned preallocate_files; + unsigned check_files; +}; + +template // void Func(fio_option&) +fio_option make_option(Func&& func) +{ + // zero-initialize and set common defaults + auto o = fio_option{}; + o.category = FIO_OPT_C_ENGINE; + o.group = FIO_OPT_G_RBD; + func(std::ref(o)); + return o; +} + +static std::vector ceph_options{ + make_option([] (fio_option& o) { + o.name = "conf"; + o.lname = "ceph configuration file"; + o.type = FIO_OPT_STR_STORE; + o.help = "Path to a ceph configuration file"; + o.off1 = offsetof(Options, conf); + }), + make_option([] (fio_option& o) { + o.name = "perf_output_file"; + o.lname = "perf output target"; + o.type = FIO_OPT_STR_STORE; + o.help = "Path to which to write json formatted perf output"; + o.off1 = offsetof(Options, perf_output_file); + o.def = 0; + }), + make_option([] (fio_option& o) { + o.name = "oi_attr_len"; + o.lname = "OI Attr length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set OI(aka '_') attribute to specified length"; + o.off1 = offsetof(Options, oi_attr_len_low); + o.off2 = offsetof(Options, oi_attr_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "snapset_attr_len"; + o.lname = "Attr 'snapset' length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set 'snapset' attribute to specified length"; + o.off1 = offsetof(Options, snapset_attr_len_low); + o.off2 = offsetof(Options, snapset_attr_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "_fastinfo_omap_len"; + o.lname = "'_fastinfo' omap entry length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set '_fastinfo' OMAP attribute to specified length"; + o.off1 = offsetof(Options, _fastinfo_omap_len_low); + o.off2 = offsetof(Options, _fastinfo_omap_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "pglog_simulation"; + o.lname = "pglog behavior simulation"; + o.type = FIO_OPT_BOOL; + o.help = "Enables PG Log simulation behavior"; + o.off1 = offsetof(Options, simulate_pglog); + o.def = "0"; + }), + make_option([] (fio_option& o) { + o.name = "pglog_omap_len"; + o.lname = "pglog omap entry length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set pglog omap entry to specified length"; + o.off1 = offsetof(Options, pglog_omap_len_low); + o.off2 = offsetof(Options, pglog_omap_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "pglog_dup_omap_len"; + o.lname = "uplicate pglog omap entry length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set duplicate pglog omap entry to specified length"; + o.off1 = offsetof(Options, pglog_dup_omap_len_low); + o.off2 = offsetof(Options, pglog_dup_omap_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "single_pool_mode"; + o.lname = "single(shared among jobs) pool mode"; + o.type = FIO_OPT_BOOL; + o.help = "Enables the mode when all jobs run against the same pool"; + o.off1 = offsetof(Options, single_pool_mode); + o.def = "0"; + }), + make_option([] (fio_option& o) { + o.name = "preallocate_files"; + o.lname = "preallocate files on init"; + o.type = FIO_OPT_BOOL; + o.help = "Enables/disables file preallocation (touch and resize) on init"; + o.off1 = offsetof(Options, preallocate_files); + o.def = "1"; + }), + make_option([] (fio_option& o) { + o.name = "check_files"; + o.lname = "ensure files exist and are correct on init"; + o.type = FIO_OPT_BOOL; + o.help = "Enables/disables checking of files on init"; + o.off1 = offsetof(Options, check_files); + o.def = "0"; + }), + make_option([] (fio_option& o) { + o.name = "bluestore_throttle"; + o.lname = "set bluestore throttle"; + o.type = FIO_OPT_STR_STORE; + o.help = "comma delimited list of throttle values", + o.off1 = offsetof(Options, throttle_values); + o.def = 0; + }), + make_option([] (fio_option& o) { + o.name = "bluestore_deferred_throttle"; + o.lname = "set bluestore deferred throttle"; + o.type = FIO_OPT_STR_STORE; + o.help = "comma delimited list of throttle values", + o.off1 = offsetof(Options, deferred_throttle_values); + o.def = 0; + }), + make_option([] (fio_option& o) { + o.name = "vary_bluestore_throttle_period"; + o.lname = "period between different throttle values"; + o.type = FIO_OPT_STR_VAL; + o.help = "set to non-zero value to periodically cycle through throttle options"; + o.off1 = offsetof(Options, cycle_throttle_period); + o.def = "0"; + o.minval = 0; + }), + {} // fio expects a 'null'-terminated list +}; + + +struct Collection { + spg_t pg; + coll_t cid; + ObjectStore::CollectionHandle ch; + // Can't use mutex directly in vectors hence dynamic allocation + + std::unique_ptr lock; + uint64_t pglog_ver_head = 1; + uint64_t pglog_ver_tail = 1; + uint64_t pglog_dup_ver_tail = 1; + + // use big pool ids to avoid clashing with existing collections + static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff; + + Collection(const spg_t& pg, ObjectStore::CollectionHandle _ch) + : pg(pg), cid(pg), ch(_ch), + lock(new std::mutex) { + } +}; + +int destroy_collections( + std::unique_ptr& os, + std::vector& collections) +{ + ObjectStore::Transaction t; + bool failed = false; + // remove our collections + for (auto& coll : collections) { + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.remove(coll.cid, pgmeta_oid); + t.remove_collection(coll.cid); + int r = os->queue_transaction(coll.ch, std::move(t)); + if (r && !failed) { + derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl; + failed = true; + } + } + return 0; +} + +int init_collections(std::unique_ptr& os, + uint64_t pool, + std::vector& collections, + uint64_t count) +{ + ceph_assert(count > 0); + collections.reserve(count); + + const int split_bits = cbits(count - 1); + + { + // propagate Superblock object to ensure proper functioning of tools that + // need it. E.g. ceph-objectstore-tool + coll_t cid(coll_t::meta()); + bool exists = os->collection_exists(cid); + if (!exists) { + auto ch = os->create_new_collection(cid); + + OSDSuperblock superblock; + bufferlist bl; + encode(superblock, bl); + + ObjectStore::Transaction t; + t.create_collection(cid, split_bits); + t.write(cid, OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl); + int r = os->queue_transaction(ch, std::move(t)); + + if (r < 0) { + derr << "Failure to write OSD superblock: " << cpp_strerror(-r) << dendl; + return r; + } + } + } + + for (uint32_t i = 0; i < count; i++) { + auto pg = spg_t{pg_t{i, pool}}; + coll_t cid(pg); + + bool exists = os->collection_exists(cid); + auto ch = exists ? + os->open_collection(cid) : + os->create_new_collection(cid) ; + + collections.emplace_back(pg, ch); + + ObjectStore::Transaction t; + auto& coll = collections.back(); + if (!exists) { + t.create_collection(coll.cid, split_bits); + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.touch(coll.cid, pgmeta_oid); + int r = os->queue_transaction(coll.ch, std::move(t)); + if (r) { + derr << "Engine init failed with " << cpp_strerror(-r) << dendl; + destroy_collections(os, collections); + return r; + } + } + } + return 0; +} + +/// global engine state shared between all jobs within the process. this +/// includes g_ceph_context and the ObjectStore instance +struct Engine { + /// the initial g_ceph_context reference to be dropped on destruction + boost::intrusive_ptr cct; + std::unique_ptr os; + + std::vector collections; //< shared collections to spread objects over + + std::mutex lock; + int ref_count; + const bool unlink; //< unlink objects on destruction + + // file to which to output formatted perf information + const std::optional perf_output_file; + + explicit Engine(thread_data* td); + ~Engine(); + + static Engine* get_instance(thread_data* td) { + // note: creates an Engine with the options associated with the first job + static Engine engine(td); + return &engine; + } + + void ref() { + std::lock_guard l(lock); + ++ref_count; + } + void deref() { + std::lock_guard l(lock); + --ref_count; + if (!ref_count) { + ostringstream ostr; + Formatter* f = Formatter::create( + "json-pretty", "json-pretty", "json-pretty"); + f->open_object_section("perf_output"); + cct->get_perfcounters_collection()->dump_formatted(f, false, false); + if (g_conf()->rocksdb_perf) { + f->open_object_section("rocksdb_perf"); + os->get_db_statistics(f); + f->close_section(); + } + mempool::dump(f); + { + f->open_object_section("db_histogram"); + os->generate_db_histogram(f); + f->close_section(); + } + f->close_section(); + + f->flush(ostr); + delete f; + + if (unlink) { + destroy_collections(os, collections); + } + os->umount(); + dout(0) << "FIO plugin perf dump:" << dendl; + dout(0) << ostr.str() << dendl; + if (perf_output_file) { + try { + std::ofstream foutput(*perf_output_file); + foutput << ostr.str() << std::endl; + } catch (std::exception &e) { + std::cerr << "Unable to write formatted output to " + << *perf_output_file + << ", exception: " << e.what() + << std::endl; + } + } + } + } +}; + +TracepointProvider::Traits bluestore_tracepoint_traits("libbluestore_tp.so", + "bluestore_tracing"); + +Engine::Engine(thread_data* td) + : ref_count(0), + unlink(td->o.unlink), + perf_output_file( + static_cast(td->eo)->perf_output_file ? + std::make_optional(static_cast(td->eo)->perf_output_file) : + std::nullopt) +{ + // add the ceph command line arguments + auto o = static_cast(td->eo); + if (!o->conf) { + throw std::runtime_error("missing conf option for ceph configuration file"); + } + std::vector args{ + "-i", "0", // identify as osd.0 for osd_data and osd_journal + "--conf", o->conf, // use the requested conf file + }; + if (td->o.directory) { // allow conf files to use ${fio_dir} for data + args.emplace_back("--fio_dir"); + args.emplace_back(td->o.directory); + } + + // claim the g_ceph_context reference and release it on destruction + cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(g_ceph_context); + + TracepointProvider::initialize(g_ceph_context); + + // create the ObjectStore + os = ObjectStore::create(g_ceph_context, + g_conf().get_val("osd objectstore"), + g_conf().get_val("osd data"), + g_conf().get_val("osd journal")); + if (!os) + throw std::runtime_error("bad objectstore type " + g_conf()->osd_objectstore); + + unsigned num_shards; + if(g_conf()->osd_op_num_shards) + num_shards = g_conf()->osd_op_num_shards; + else if(os->is_rotational()) + num_shards = g_conf()->osd_op_num_shards_hdd; + else + num_shards = g_conf()->osd_op_num_shards_ssd; + os->set_cache_shards(num_shards); + + //normalize options + o->oi_attr_len_high = max(o->oi_attr_len_low, o->oi_attr_len_high); + o->snapset_attr_len_high = max(o->snapset_attr_len_low, + o->snapset_attr_len_high); + o->pglog_omap_len_high = max(o->pglog_omap_len_low, + o->pglog_omap_len_high); + o->pglog_dup_omap_len_high = max(o->pglog_dup_omap_len_low, + o->pglog_dup_omap_len_high); + o->_fastinfo_omap_len_high = max(o->_fastinfo_omap_len_low, + o->_fastinfo_omap_len_high); + + int r = os->mkfs(); + if (r < 0) + throw std::system_error(-r, std::system_category(), "mkfs failed"); + + r = os->mount(); + if (r < 0) + throw std::system_error(-r, std::system_category(), "mount failed"); + + // create shared collections up to osd_pool_default_pg_num + if (o->single_pool_mode) { + uint64_t count = g_conf().get_val("osd_pool_default_pg_num"); + if (count > td->o.nr_files) + count = td->o.nr_files; + init_collections(os, Collection::MIN_POOL_ID, collections, count); + } +} + +Engine::~Engine() +{ + ceph_assert(!ref_count); +} + +struct Object { + ghobject_t oid; + Collection& coll; + + Object(const char* name, Collection& coll) + : oid(hobject_t(name, "", CEPH_NOSNAP, coll.pg.ps(), coll.pg.pool(), "")), + coll(coll) {} +}; + +/// treat each fio job either like a separate pool with its own collections and objects +/// or just a client using its own objects from the shared pool +struct Job { + Engine* engine; //< shared ptr to the global Engine + const unsigned subjob_number; //< subjob num + std::vector collections; //< job's private collections to spread objects over + std::vector objects; //< associate an object with each fio_file + std::vector events; //< completions for fio_ceph_os_event() + const bool unlink; //< unlink objects on destruction + + bufferptr one_for_all_data; //< preallocated buffer long enough + //< to use for vairious operations + std::mutex throttle_lock; + const vector throttle_values; + const vector deferred_throttle_values; + std::chrono::duration cycle_throttle_period; + mono_clock::time_point last = ceph::mono_clock::zero(); + unsigned index = 0; + + static vector parse_throttle_str(const char *p) { + vector ret; + if (p == nullptr) { + return ret; + } + ceph::for_each_substr(p, ",\"", [&ret] (auto &&s) mutable { + if (s.size() > 0) { + ret.push_back(std::stoul(std::string(s))); + } + }); + return ret; + } + void check_throttle(); + + Job(Engine* engine, const thread_data* td); + ~Job(); +}; + +Job::Job(Engine* engine, const thread_data* td) + : engine(engine), + subjob_number(td->subjob_number), + events(td->o.iodepth), + unlink(td->o.unlink), + throttle_values( + parse_throttle_str(static_cast(td->eo)->throttle_values)), + deferred_throttle_values( + parse_throttle_str(static_cast(td->eo)->deferred_throttle_values)), + cycle_throttle_period( + static_cast(td->eo)->cycle_throttle_period) +{ + engine->ref(); + auto o = static_cast(td->eo); + unsigned long long max_data = max(o->oi_attr_len_high, + o->snapset_attr_len_high); + max_data = max(max_data, o->pglog_omap_len_high); + max_data = max(max_data, o->pglog_dup_omap_len_high); + max_data = max(max_data, o->_fastinfo_omap_len_high); + one_for_all_data = buffer::create(max_data); + + std::vector* colls; + // create private collections up to osd_pool_default_pg_num + if (!o->single_pool_mode) { + uint64_t count = g_conf().get_val("osd_pool_default_pg_num"); + if (count > td->o.nr_files) + count = td->o.nr_files; + // use the fio thread_number for our unique pool id + const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1; + init_collections(engine->os, pool, collections, count); + colls = &collections; + } else { + colls = &engine->collections; + } + const uint64_t file_size = td->o.size / max(1u, td->o.nr_files); + ObjectStore::Transaction t; + + // create an object for each file in the job + objects.reserve(td->o.nr_files); + unsigned checked_or_preallocated = 0; + for (uint32_t i = 0; i < td->o.nr_files; i++) { + auto f = td->files[i]; + f->real_file_size = file_size; + f->engine_pos = i; + + // associate each object with a collection in a round-robin fashion. + auto& coll = (*colls)[i % colls->size()]; + + objects.emplace_back(f->file_name, coll); + if (o->preallocate_files) { + auto& oid = objects.back().oid; + t.touch(coll.cid, oid); + t.truncate(coll.cid, oid, file_size); + int r = engine->os->queue_transaction(coll.ch, std::move(t)); + if (r) { + engine->deref(); + throw std::system_error(r, std::system_category(), "job init"); + } + } + if (o->check_files) { + auto& oid = objects.back().oid; + struct stat st; + int r = engine->os->stat(coll.ch, oid, &st); + if (r || ((unsigned)st.st_size) != file_size) { + derr << "Problem checking " << oid << ", r=" << r + << ", st.st_size=" << st.st_size + << ", file_size=" << file_size + << ", nr_files=" << td->o.nr_files << dendl; + engine->deref(); + throw std::system_error( + r, std::system_category(), "job init -- cannot check file"); + } + } + if (o->check_files || o->preallocate_files) { + ++checked_or_preallocated; + } + } + if (o->check_files) { + derr << "fio_ceph_objectstore checked " << checked_or_preallocated + << " files"<< dendl; + } + if (o->preallocate_files ){ + derr << "fio_ceph_objectstore preallocated " << checked_or_preallocated + << " files"<< dendl; + } +} + +Job::~Job() +{ + if (unlink) { + ObjectStore::Transaction t; + bool failed = false; + // remove our objects + for (auto& obj : objects) { + t.remove(obj.coll.cid, obj.oid); + int r = engine->os->queue_transaction(obj.coll.ch, std::move(t)); + if (r && !failed) { + derr << "job cleanup failed with " << cpp_strerror(-r) << dendl; + failed = true; + } + } + destroy_collections(engine->os, collections); + } + engine->deref(); +} + +void Job::check_throttle() +{ + if (subjob_number != 0) + return; + + std::lock_guard l(throttle_lock); + if (throttle_values.empty() && deferred_throttle_values.empty()) + return; + + if (ceph::mono_clock::is_zero(last) || + ((cycle_throttle_period != cycle_throttle_period.zero()) && + (ceph::mono_clock::now() - last) > cycle_throttle_period)) { + unsigned tvals = throttle_values.size() ? throttle_values.size() : 1; + unsigned dtvals = deferred_throttle_values.size() ? deferred_throttle_values.size() : 1; + if (!throttle_values.empty()) { + std::string val = std::to_string(throttle_values[index % tvals]); + std::cerr << "Setting bluestore_throttle_bytes to " << val << std::endl; + int r = engine->cct->_conf.set_val( + "bluestore_throttle_bytes", + val, + nullptr); + ceph_assert(r == 0); + } + if (!deferred_throttle_values.empty()) { + std::string val = std::to_string(deferred_throttle_values[(index / tvals) % dtvals]); + std::cerr << "Setting bluestore_deferred_throttle_bytes to " << val << std::endl; + int r = engine->cct->_conf.set_val( + "bluestore_throttle_deferred_bytes", + val, + nullptr); + ceph_assert(r == 0); + } + engine->cct->_conf.apply_changes(nullptr); + index++; + index %= tvals * dtvals; + last = ceph::mono_clock::now(); + } +} + +int fio_ceph_os_setup(thread_data* td) +{ + // if there are multiple jobs, they must run in the same process against a + // single instance of the ObjectStore. explicitly disable fio's default + // job-per-process configuration + td->o.use_thread = 1; + + try { + // get or create the global Engine instance + auto engine = Engine::get_instance(td); + // create a Job for this thread + td->io_ops_data = new Job(engine, td); + } catch (std::exception& e) { + std::cerr << "setup failed with " << e.what() << std::endl; + return -1; + } + return 0; +} + +void fio_ceph_os_cleanup(thread_data* td) +{ + auto job = static_cast(td->io_ops_data); + td->io_ops_data = nullptr; + delete job; +} + + +io_u* fio_ceph_os_event(thread_data* td, int event) +{ + // return the requested event from fio_ceph_os_getevents() + auto job = static_cast(td->io_ops_data); + return job->events[event]; +} + +int fio_ceph_os_getevents(thread_data* td, unsigned int min, + unsigned int max, const timespec* t) +{ + auto job = static_cast(td->io_ops_data); + unsigned int events = 0; + io_u* u = NULL; + unsigned int i = 0; + + // loop through inflight ios until we find 'min' completions + do { + io_u_qiter(&td->io_u_all, u, i) { + if (!(u->flags & IO_U_F_FLIGHT)) + continue; + + if (u->engine_data) { + u->engine_data = nullptr; + job->events[events] = u; + events++; + } + } + if (events >= min) + break; + usleep(100); + } while (1); + + return events; +} + +/// completion context for ObjectStore::queue_transaction() +class UnitComplete : public Context { + io_u* u; + public: + explicit UnitComplete(io_u* u) : u(u) {} + void finish(int r) { + // mark the pointer to indicate completion for fio_ceph_os_getevents() + u->engine_data = reinterpret_cast(1ull); + } +}; + +enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u) +{ + fio_ro_check(td, u); + + + + auto o = static_cast(td->eo); + auto job = static_cast(td->io_ops_data); + auto& object = job->objects[u->file->engine_pos]; + auto& coll = object.coll; + auto& os = job->engine->os; + + job->check_throttle(); + + if (u->ddir == DDIR_WRITE) { + // provide a hint if we're likely to read this data back + const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0; + + bufferlist bl; + bl.push_back(buffer::copy(reinterpret_cast(u->xfer_buf), + u->xfer_buflen ) ); + + map> attrset; + map omaps; + // enqueue a write transaction on the collection's handle + ObjectStore::Transaction t; + char ver_key[64]; + + // fill attrs if any + if (o->oi_attr_len_high) { + ceph_assert(o->oi_attr_len_high >= o->oi_attr_len_low); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->oi_attr_len_low, o->oi_attr_len_high)); + attrset["_"] = job->one_for_all_data; + } + if (o->snapset_attr_len_high) { + ceph_assert(o->snapset_attr_len_high >= o->snapset_attr_len_low); + job->one_for_all_data.set_length( + ceph::util::generate_random_number + (o->snapset_attr_len_low, o->snapset_attr_len_high)); + attrset["snapset"] = job->one_for_all_data; + + } + if (o->_fastinfo_omap_len_high) { + ceph_assert(o->_fastinfo_omap_len_high >= o->_fastinfo_omap_len_low); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->_fastinfo_omap_len_low, o->_fastinfo_omap_len_high)); + omaps["_fastinfo"].append(job->one_for_all_data); + } + + uint64_t pglog_trim_head = 0, pglog_trim_tail = 0; + uint64_t pglog_dup_trim_head = 0, pglog_dup_trim_tail = 0; + if (o->simulate_pglog) { + + uint64_t pglog_ver_cnt = 0; + { + std::lock_guard l(*coll.lock); + pglog_ver_cnt = coll.pglog_ver_head++; + if (o->pglog_omap_len_high && + pglog_ver_cnt >= + coll.pglog_ver_tail + + g_conf()->osd_min_pg_log_entries + g_conf()->osd_pg_log_trim_min) { + pglog_trim_tail = coll.pglog_ver_tail; + coll.pglog_ver_tail = pglog_trim_head = + pglog_trim_tail + g_conf()->osd_pg_log_trim_min; + + if (o->pglog_dup_omap_len_high && + pglog_ver_cnt >= + coll.pglog_dup_ver_tail + g_conf()->osd_pg_log_dups_tracked + + g_conf()->osd_pg_log_trim_min) { + pglog_dup_trim_tail = coll.pglog_dup_ver_tail; + coll.pglog_dup_ver_tail = pglog_dup_trim_head = + pglog_dup_trim_tail + g_conf()->osd_pg_log_trim_min; + } + } + } + + if (o->pglog_omap_len_high) { + ceph_assert(o->pglog_omap_len_high >= o->pglog_omap_len_low); + snprintf(ver_key, sizeof(ver_key), + "0000000011.%020llu", (unsigned long long)pglog_ver_cnt); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->pglog_omap_len_low, o->pglog_omap_len_high)); + omaps[ver_key].append(job->one_for_all_data); + } + if (o->pglog_dup_omap_len_high) { + //insert dup + ceph_assert(o->pglog_dup_omap_len_high >= o->pglog_dup_omap_len_low); + for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) { + snprintf(ver_key, sizeof(ver_key), + "dup_0000000011.%020llu", (unsigned long long)i); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->pglog_dup_omap_len_low, o->pglog_dup_omap_len_high)); + omaps[ver_key].append(job->one_for_all_data); + } + } + } + + if (!attrset.empty()) { + t.setattrs(coll.cid, object.oid, attrset); + } + t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags); + + set rmkeys; + for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) { + snprintf(ver_key, sizeof(ver_key), + "0000000011.%020llu", (unsigned long long)i); + rmkeys.emplace(ver_key); + } + for( auto i = pglog_dup_trim_tail; i < pglog_dup_trim_head; ++i) { + snprintf(ver_key, sizeof(ver_key), + "dup_0000000011.%020llu", (unsigned long long)i); + rmkeys.emplace(ver_key); + } + + if (rmkeys.size()) { + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.omap_rmkeys(coll.cid, pgmeta_oid, rmkeys); + } + + if (omaps.size()) { + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.omap_setkeys(coll.cid, pgmeta_oid, omaps); + } + t.register_on_commit(new UnitComplete(u)); + os->queue_transaction(coll.ch, + std::move(t)); + return FIO_Q_QUEUED; + } + + if (u->ddir == DDIR_READ) { + // ObjectStore reads are synchronous, so make the call and return COMPLETED + bufferlist bl; + int r = os->read(coll.ch, object.oid, u->offset, u->xfer_buflen, bl); + if (r < 0) { + u->error = r; + td_verror(td, u->error, "xfer"); + } else { + bl.begin().copy(bl.length(), static_cast(u->xfer_buf)); + u->resid = u->xfer_buflen - r; + } + return FIO_Q_COMPLETED; + } + + derr << "WARNING: Only DDIR_READ and DDIR_WRITE are supported!" << dendl; + u->error = -EINVAL; + td_verror(td, u->error, "xfer"); + return FIO_Q_COMPLETED; +} + +int fio_ceph_os_commit(thread_data* td) +{ + // commit() allows the engine to batch up queued requests to be submitted all + // at once. it would be natural for queue() to collect transactions in a list, + // and use commit() to pass them all to ObjectStore::queue_transactions(). but + // because we spread objects over multiple collections, we a) need to use a + // different sequencer for each collection, and b) are less likely to see a + // benefit from batching requests within a collection + return 0; +} + +// open/close are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to +// prevent fio from creating the files +int fio_ceph_os_open(thread_data* td, fio_file* f) { return 0; } +int fio_ceph_os_close(thread_data* td, fio_file* f) { return 0; } + +int fio_ceph_os_io_u_init(thread_data* td, io_u* u) +{ + // no data is allocated, we just use the pointer as a boolean 'completed' flag + u->engine_data = nullptr; + return 0; +} + +void fio_ceph_os_io_u_free(thread_data* td, io_u* u) +{ + u->engine_data = nullptr; +} + + +// ioengine_ops for get_ioengine() +struct ceph_ioengine : public ioengine_ops { + ceph_ioengine() : ioengine_ops({}) { + name = "ceph-os"; + version = FIO_IOOPS_VERSION; + flags = FIO_DISKLESSIO; + setup = fio_ceph_os_setup; + queue = fio_ceph_os_queue; + commit = fio_ceph_os_commit; + getevents = fio_ceph_os_getevents; + event = fio_ceph_os_event; + cleanup = fio_ceph_os_cleanup; + open_file = fio_ceph_os_open; + close_file = fio_ceph_os_close; + io_u_init = fio_ceph_os_io_u_init; + io_u_free = fio_ceph_os_io_u_free; + options = ceph_options.data(); + option_struct_size = sizeof(struct Options); + } +}; + +} // anonymous namespace + +extern "C" { +// the exported fio engine interface +void get_ioengine(struct ioengine_ops** ioengine_ptr) { + static ceph_ioengine ioengine; + *ioengine_ptr = &ioengine; +} +} // extern "C" -- cgit v1.2.3