summaryrefslogtreecommitdiffstats
path: root/src/test/fio
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/fio')
-rw-r--r--src/test/fio/CMakeLists.txt16
-rw-r--r--src/test/fio/README.md88
-rw-r--r--src/test/fio/ceph-bluestore.conf20
-rw-r--r--src/test/fio/ceph-bluestore.fio39
-rw-r--r--src/test/fio/ceph-filestore.conf26
-rw-r--r--src/test/fio/ceph-filestore.fio17
-rw-r--r--src/test/fio/ceph-memstore.conf18
-rw-r--r--src/test/fio/ceph-memstore.fio17
-rw-r--r--src/test/fio/fio_ceph_objectstore.cc772
9 files changed, 1013 insertions, 0 deletions
diff --git a/src/test/fio/CMakeLists.txt b/src/test/fio/CMakeLists.txt
new file mode 100644
index 00000000..81783d45
--- /dev/null
+++ b/src/test/fio/CMakeLists.txt
@@ -0,0 +1,16 @@
+add_library(fio_ceph_objectstore SHARED fio_ceph_objectstore.cc)
+target_include_directories(fio_ceph_objectstore SYSTEM PUBLIC ${FIO_INCLUDE_DIR})
+
+# prevent fio from adding a 'typedef int bool'
+set(FIO_CFLAGS "-DCONFIG_HAVE_BOOL")
+
+# fio headers use typeof(), a GNU extension, so compiler extensions must be enabled
+set_target_properties(fio_ceph_objectstore PROPERTIES
+ CXX_EXTENSIONS ON
+ COMPILE_FLAGS "${FIO_CFLAGS}")
+
+if(WITH_FIO)
+ add_dependencies(fio_ceph_objectstore fio_ext)
+endif()
+target_link_libraries(fio_ceph_objectstore os global)
+install(TARGETS fio_ceph_objectstore DESTINATION lib)
diff --git a/src/test/fio/README.md b/src/test/fio/README.md
new file mode 100644
index 00000000..a2f67a38
--- /dev/null
+++ b/src/test/fio/README.md
@@ -0,0 +1,88 @@
+FIO
+===
+
+Ceph uses the fio workload generator and benchmarking utility.
+(https://github.com/axboe/fio.git)
+
+The fio tool is automatically fetched to build/src/fio and built if necessary.
+
+RBD
+---
+
+The fio engine for rbd is located in the fio tree itself, so you'll need to
+build it from source.
+
+If you install the ceph libraries to a location that isn't in your
+LD_LIBRARY_PATH, be sure to add it:
+
+ export LD_LIBRARY_PATH=/path/to/install/lib
+
+To build fio with rbd:
+
+ ./configure --extra-cflags="-I/path/to/install/include -L/path/to/install/lib"
+ make
+
+If configure fails with "Rados Block Device engine no", see config.log for
+details and adjust the cflags as necessary.
+
+If ceph was compiled with tcmalloc, it may be necessary to compile fio with:
+ make EXTLIBS=tcmalloc
+Otherwise fio might crash in malloc_usable_size().
+
+To view the fio options specific to the rbd engine:
+
+ ./fio --enghelp=rbd
+
+See examples/rbd.fio for an example job file. To run:
+
+ ./fio examples/rbd.fio
+
+ObjectStore
+-----------
+
+This fio engine allows you to mount and use a ceph object store directly,
+without having to build a ceph cluster or start any daemons.
+
+Because the ObjectStore is not a public-facing interface, we build it inside
+of the ceph tree and load libfio_ceph_objectstore.so into fio as an external
+engine.
+
+To build fio_ceph_objectstore run:
+```
+ ./do_cmake.sh -DWITH_FIO=ON
+ cd build
+ make fio_ceph_objectstore
+```
+This fetches fio into the build/src/fio directory and compiles both the fio
+tool and libfio_ceph_objectstore.so.
+
+If you install the ceph libraries to a location that isn't in your
+LD_LIBRARY_PATH, be sure to add it:
+
+ export LD_LIBRARY_PATH=/path/to/install/lib
+
+To view the fio options specific to the objectstore engine:
+
+ ./fio --enghelp=libfio_ceph_objectstore.so
+
+The conf= option requires a ceph configuration file (ceph.conf). Example job
+and conf files for each object store are provided in the same directory as
+this README.
+
+To run:
+
+ ./fio /path/to/job.fio
+
+RADOS
+-----
+
+By default, FIO can be compiled with support for RADOS.
+When ceph is installed on your system, the default build of FIO includes the RADOS ioengine.
+If you installed ceph anywhere else (cmake -DCMAKE_INSTALL_PREFIX=${CEPH_INSTALL_ROOT} ..), you can build FIO the following way:
+
+ LIBS="-lrados -ltcmalloc" LDFLAGS="-L${CEPH_INSTALL_ROOT}/lib" EXTFLAGS="-I${CEPH_INSTALL_ROOT}/include" \
+ rados=yes ./configure
+ LIBS="-lrados -ltcmalloc" LDFLAGS="-L${CEPH_INSTALL_ROOT}/lib" EXTFLAGS="-I${CEPH_INSTALL_ROOT}/include" \
+ rados=yes make
+
+"-ltcmalloc" is necessary if ceph was compiled with tcmalloc.
diff --git a/src/test/fio/ceph-bluestore.conf b/src/test/fio/ceph-bluestore.conf
new file mode 100644
index 00000000..6dd4f1af
--- /dev/null
+++ b/src/test/fio/ceph-bluestore.conf
@@ -0,0 +1,20 @@
+# example configuration file for ceph-bluestore.fio
+
+[global]
+ debug bluestore = 0/0
+ debug bluefs = 0/0
+ debug bdev = 0/0
+ debug rocksdb = 0/0
+ # spread objects over 8 collections
+ osd pool default pg num = 8
+ # increasing shards can help when scaling number of collections
+ osd op num shards = 5
+
+[osd]
+ osd objectstore = bluestore
+
+ # use directory= option from fio job file
+ osd data = ${fio_dir}
+
+ # log inside fio_dir
+ log file = ${fio_dir}/log
diff --git a/src/test/fio/ceph-bluestore.fio b/src/test/fio/ceph-bluestore.fio
new file mode 100644
index 00000000..dbadf701
--- /dev/null
+++ b/src/test/fio/ceph-bluestore.fio
@@ -0,0 +1,39 @@
+# Runs a 64k random write test against the ceph BlueStore.
+[global]
+ioengine=libfio_ceph_objectstore.so # must be found in your LD_LIBRARY_PATH
+
+conf=ceph-bluestore.conf # must point to a valid ceph configuration file
+directory=/mnt/fio-bluestore # directory for osd_data
+
+#oi_attr_len=350-4000 # specifies OI(aka '_') attribute length range to couple
+ # writes with. Default: 0 (disabled)
+
+#snapset_attr_len=35 # specifies snapset attribute length range to couple
+ # writes with. Default: 0 (disabled)
+
+#_fastinfo_omap_len=186 # specifies _fastinfo omap entry length range to
+ # couple writes with. Default: 0 (disabled)
+
+#pglog_simulation=1 # couples write and omap generation in OSD PG log manner.
+ # Ceph's osd_min_pg_log_entries, osd_pg_log_trim_min,
+ # osd_pg_log_dups_tracked settings control cyclic
+ # omap keys creation/removal.
+ # The following additional FIO pglog_ settings apply as well:
+
+#pglog_omap_len=173 # specifies PG log entry length range to couple
+ # writes with. Default: 0 (disabled)
+
+#pglog_dup_omap_len=57 # specifies duplicate PG log entry length range
+ # to couple writes with. Default: 0 (disabled)
+#single_pool_mode=0 # Enables the mode where all jobs run against the same pool.
+
+rw=randwrite
+iodepth=16
+
+time_based=1
+runtime=20s
+
+[bluestore]
+nr_files=64
+size=256m
+bs=64k
diff --git a/src/test/fio/ceph-filestore.conf b/src/test/fio/ceph-filestore.conf
new file mode 100644
index 00000000..06266656
--- /dev/null
+++ b/src/test/fio/ceph-filestore.conf
@@ -0,0 +1,26 @@
+# example configuration file for ceph-filestore.fio
+
+[global]
+ debug filestore = 0/0
+ debug journal = 0/0
+
+ # spread objects over 8 collections
+ osd pool default pg num = 8
+ # increasing shards can help when scaling number of collections
+ osd op num shards = 5
+
+ filestore fd cache size = 32
+
+[osd]
+ osd objectstore = filestore
+
+ # use directory= option from fio job file
+ osd data = ${fio_dir}
+
+ # journal inside fio_dir
+ osd journal = ${fio_dir}/journal
+ osd journal size = 500
+ journal force aio = 1
+
+ # log outside fio_dir
+ log file = ${fio_dir}.log
diff --git a/src/test/fio/ceph-filestore.fio b/src/test/fio/ceph-filestore.fio
new file mode 100644
index 00000000..bb93c8df
--- /dev/null
+++ b/src/test/fio/ceph-filestore.fio
@@ -0,0 +1,17 @@
+# Runs a 64k random write test against the ceph FileStore.
+[global]
+ioengine=libfio_ceph_objectstore.so # must be found in your LD_LIBRARY_PATH
+
+conf=ceph-filestore.conf # must point to a valid ceph configuration file
+directory=/mnt/fio-filestore # directory for osd_data
+
+rw=randwrite
+iodepth=16
+
+time_based=1
+runtime=20s
+
+[filestore]
+nr_files=64
+size=256m
+bs=64k
diff --git a/src/test/fio/ceph-memstore.conf b/src/test/fio/ceph-memstore.conf
new file mode 100644
index 00000000..3553d1bc
--- /dev/null
+++ b/src/test/fio/ceph-memstore.conf
@@ -0,0 +1,18 @@
+# example configuration file for ceph-memstore.fio
+
+[global]
+ debug filestore = 0
+
+ # spread objects over 8 collections
+ osd pool default pg num = 8
+ # increasing shards can help when scaling number of collections
+ osd op num shards = 5
+
+[osd]
+ osd objectstore = memstore
+
+ # use directory= option from fio job file
+ osd data = ${fio_dir}
+
+ # log inside fio_dir
+ log file = ${fio_dir}/log
diff --git a/src/test/fio/ceph-memstore.fio b/src/test/fio/ceph-memstore.fio
new file mode 100644
index 00000000..ceb6671d
--- /dev/null
+++ b/src/test/fio/ceph-memstore.fio
@@ -0,0 +1,17 @@
+# Runs a 64k random write test against the ceph MemStore.
+[global]
+ioengine=libfio_ceph_objectstore.so # must be found in your LD_LIBRARY_PATH
+
+conf=ceph-memstore.conf # must point to a valid ceph configuration file
+directory=/mnt/fio-memstore # directory for osd_data
+
+rw=randwrite
+iodepth=16
+
+time_based=1
+runtime=20s
+
+[memstore]
+nr_files=64
+size=256m
+bs=64k
diff --git a/src/test/fio/fio_ceph_objectstore.cc b/src/test/fio/fio_ceph_objectstore.cc
new file mode 100644
index 00000000..b8c29624
--- /dev/null
+++ b/src/test/fio/fio_ceph_objectstore.cc
@@ -0,0 +1,772 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph ObjectStore engine
+ *
+ * IO engine using Ceph's ObjectStore class to test low-level performance of
+ * Ceph OSDs.
+ *
+ */
+
+#include <memory>
+#include <system_error>
+#include <vector>
+
+#include "os/ObjectStore.h"
+#include "global/global_init.h"
+#include "common/errno.h"
+#include "include/intarith.h"
+#include "include/stringify.h"
+#include "include/random.h"
+#include "common/perf_counters.h"
+
+#include <fio.h>
+#include <optgroup.h>
+
+#include "include/ceph_assert.h" // fio.h clobbers our assert.h
+#include <algorithm>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_
+
+namespace {
+
+/// Engine-specific settings read from the fio job file. ceph_options records
+/// the offset of each field (via offsetof) next to the option name it serves.
+struct Options {
+  thread_data* td;
+  char* conf;  //< "conf": path to the ceph configuration file
+
+  // every *_len option is kept as a low/high pair
+  unsigned long long oi_attr_len_low;          //< "oi_attr_len"
+  unsigned long long oi_attr_len_high;
+  unsigned long long snapset_attr_len_low;     //< "snapset_attr_len"
+  unsigned long long snapset_attr_len_high;
+  unsigned long long pglog_omap_len_low;       //< "pglog_omap_len"
+  unsigned long long pglog_omap_len_high;
+  unsigned long long pglog_dup_omap_len_low;   //< "pglog_dup_omap_len"
+  unsigned long long pglog_dup_omap_len_high;
+  unsigned long long _fastinfo_omap_len_low;   //< "_fastinfo_omap_len"
+  unsigned long long _fastinfo_omap_len_high;
+
+  unsigned simulate_pglog;     //< "pglog_simulation"
+  unsigned single_pool_mode;   //< "single_pool_mode"
+  unsigned preallocate_files;  //< "preallocate_files"
+};
+
+/// Build one fio_option: value-initialize it, apply the category/group shared
+/// by every option of this engine, then hand it to @p func to fill in the
+/// option-specific fields.
+template <class Func> // void Func(fio_option&)
+fio_option make_option(Func&& func)
+{
+  fio_option opt{};
+  opt.category = FIO_OPT_C_ENGINE;
+  opt.group = FIO_OPT_G_RBD;
+  func(opt);
+  return opt;
+}
+
+/// Option table handed to fio through ceph_ioengine::options. Each entry maps
+/// a job-file option name onto the Options field(s) that store its value;
+/// the *_len options register two offsets (low/high).
+static std::vector<fio_option> ceph_options{
+  make_option([] (fio_option& o) {
+    o.name = "conf";
+    o.lname = "ceph configuration file";
+    o.type = FIO_OPT_STR_STORE;
+    o.help = "Path to a ceph configuration file";
+    o.off1 = offsetof(Options, conf);
+  }),
+  make_option([] (fio_option& o) {
+    o.name = "oi_attr_len";
+    o.lname = "OI Attr length";
+    o.type = FIO_OPT_STR_VAL;
+    o.help = "Set OI(aka '_') attribute to specified length";
+    o.off1 = offsetof(Options, oi_attr_len_low);
+    o.off2 = offsetof(Options, oi_attr_len_high);
+    o.def = nullptr; // no default string
+    o.minval = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name = "snapset_attr_len";
+    o.lname = "Attr 'snapset' length";
+    o.type = FIO_OPT_STR_VAL;
+    o.help = "Set 'snapset' attribute to specified length";
+    o.off1 = offsetof(Options, snapset_attr_len_low);
+    o.off2 = offsetof(Options, snapset_attr_len_high);
+    o.def = nullptr; // no default string
+    o.minval = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name = "_fastinfo_omap_len";
+    o.lname = "'_fastinfo' omap entry length";
+    o.type = FIO_OPT_STR_VAL;
+    o.help = "Set '_fastinfo' OMAP attribute to specified length";
+    o.off1 = offsetof(Options, _fastinfo_omap_len_low);
+    o.off2 = offsetof(Options, _fastinfo_omap_len_high);
+    o.def = nullptr; // no default string
+    o.minval = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name = "pglog_simulation";
+    o.lname = "pglog behavior simulation";
+    o.type = FIO_OPT_BOOL;
+    o.help = "Enables PG Log simulation behavior";
+    o.off1 = offsetof(Options, simulate_pglog);
+    o.def = "0";
+  }),
+  make_option([] (fio_option& o) {
+    o.name = "pglog_omap_len";
+    o.lname = "pglog omap entry length";
+    o.type = FIO_OPT_STR_VAL;
+    o.help = "Set pglog omap entry to specified length";
+    o.off1 = offsetof(Options, pglog_omap_len_low);
+    o.off2 = offsetof(Options, pglog_omap_len_high);
+    o.def = nullptr; // no default string
+    o.minval = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name = "pglog_dup_omap_len";
+    o.lname = "duplicate pglog omap entry length";
+    o.type = FIO_OPT_STR_VAL;
+    o.help = "Set duplicate pglog omap entry to specified length";
+    o.off1 = offsetof(Options, pglog_dup_omap_len_low);
+    o.off2 = offsetof(Options, pglog_dup_omap_len_high);
+    o.def = nullptr; // no default string
+    o.minval = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name = "single_pool_mode";
+    o.lname = "single(shared among jobs) pool mode";
+    o.type = FIO_OPT_BOOL;
+    o.help = "Enables the mode when all jobs run against the same pool";
+    o.off1 = offsetof(Options, single_pool_mode);
+    o.def = "0";
+  }),
+  make_option([] (fio_option& o) {
+    o.name = "preallocate_files";
+    o.lname = "preallocate files on init";
+    o.type = FIO_OPT_BOOL;
+    o.help = "Enables/disables file preallocation (touch and resize) on init";
+    o.off1 = offsetof(Options, preallocate_files);
+    o.def = "1";
+  }),
+  {} // fio expects a 'null'-terminated list
+};
+
+
+/// One placement-group collection in the object store, together with the
+/// version counters used by the PG log simulation in fio_ceph_os_queue().
+struct Collection {
+  spg_t pg;
+  coll_t cid;
+  ObjectStore::CollectionHandle ch;
+
+  // the mutex lives behind a pointer so that Collection can be stored in a
+  // std::vector
+  std::unique_ptr<std::mutex> lock;
+  uint64_t pglog_ver_head = 1;
+  uint64_t pglog_ver_tail = 1;
+  uint64_t pglog_dup_ver_tail = 1;
+
+  // use big pool ids to avoid clashing with existing collections
+  static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
+
+  Collection(const spg_t& pg, ObjectStore::CollectionHandle _ch)
+    : pg(pg),
+      cid(pg),
+      ch(_ch),
+      lock(std::make_unique<std::mutex>())
+  {}
+};
+
+/// Queue removal of every collection in @p collections (and of its pgmeta
+/// object) on the given object store.
+///
+/// Removal is best-effort: every collection is attempted even after a
+/// failure, and only the first failure is logged.
+///
+/// @return 0 on success, otherwise the result of the first failing
+///         queue_transaction() call
+int destroy_collections(
+  std::unique_ptr<ObjectStore>& os,
+  std::vector<Collection>& collections)
+{
+  int first_error = 0;
+  // remove our collections
+  for (auto& coll : collections) {
+    // build a fresh transaction each time: the previous one was moved into
+    // queue_transaction() and must not be reused
+    ObjectStore::Transaction t;
+    ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+    t.remove(coll.cid, pgmeta_oid);
+    t.remove_collection(coll.cid);
+    int r = os->queue_transaction(coll.ch, std::move(t));
+    if (r && !first_error) {
+      derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl;
+      first_error = r;
+    }
+  }
+  return first_error;
+}
+
+/// Open or create @p count collections for pool @p pool and append them to
+/// @p collections. Also writes an OSD superblock object into the meta
+/// collection when that collection does not exist yet.
+///
+/// @return 0 on success, otherwise the failing queue_transaction() result.
+///         When creating one of the pool's collections fails, everything
+///         gathered in @p collections so far is passed to
+///         destroy_collections() first.
+int init_collections(std::unique_ptr<ObjectStore>& os,
+                     uint64_t pool,
+                     std::vector<Collection>& collections,
+                     uint64_t count)
+{
+  ceph_assert(count > 0);
+  collections.reserve(count);
+
+  const int split_bits = cbits(count - 1);
+
+  // propagate Superblock object to ensure proper functioning of tools that
+  // need it. E.g. ceph-objectstore-tool
+  coll_t meta_cid(coll_t::meta());
+  if (!os->collection_exists(meta_cid)) {
+    auto meta_ch = os->create_new_collection(meta_cid);
+
+    OSDSuperblock superblock;
+    bufferlist bl;
+    encode(superblock, bl);
+
+    ObjectStore::Transaction meta_txn;
+    meta_txn.create_collection(meta_cid, split_bits);
+    meta_txn.write(meta_cid, OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
+
+    const int r = os->queue_transaction(meta_ch, std::move(meta_txn));
+    if (r < 0) {
+      derr << "Failure to write OSD superblock: " << cpp_strerror(-r) << dendl;
+      return r;
+    }
+  }
+
+  for (uint32_t i = 0; i < count; i++) {
+    auto pg = spg_t{pg_t{i, pool}};
+    coll_t cid(pg);
+
+    const bool exists = os->collection_exists(cid);
+    auto ch = exists ? os->open_collection(cid)
+                     : os->create_new_collection(cid);
+    collections.emplace_back(pg, ch);
+
+    // an existing collection needs no further setup
+    if (exists)
+      continue;
+
+    auto& coll = collections.back();
+    ObjectStore::Transaction t;
+    t.create_collection(coll.cid, split_bits);
+    ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+    t.touch(coll.cid, pgmeta_oid);
+
+    const int r = os->queue_transaction(coll.ch, std::move(t));
+    if (r) {
+      derr << "Engine init failed with " << cpp_strerror(-r) << dendl;
+      destroy_collections(os, collections);
+      return r;
+    }
+  }
+  return 0;
+}
+
+/// global engine state shared between all jobs within the process. this
+/// includes g_ceph_context and the ObjectStore instance
+struct Engine {
+  /// the initial g_ceph_context reference to be dropped on destruction
+  boost::intrusive_ptr<CephContext> cct;
+  std::unique_ptr<ObjectStore> os;
+
+  std::vector<Collection> collections; //< shared collections to spread objects over
+
+  std::mutex lock;   //< held by ref()/deref() while they touch ref_count
+  int ref_count;
+  const bool unlink; //< destroy the shared collections on the last deref()
+
+  explicit Engine(thread_data* td);
+  ~Engine();
+
+  static Engine* get_instance(thread_data* td) {
+    // note: creates an Engine with the options associated with the first job
+    static Engine engine(td);
+    return &engine;
+  }
+
+  /// take a reference on the engine
+  void ref() {
+    std::lock_guard<std::mutex> l(lock);
+    ++ref_count;
+  }
+
+  /// drop a reference. when the count reaches zero, collect perf counters,
+  /// optional db statistics, mempool usage and the db histogram into one
+  /// report, destroy the shared collections if unlink is set, unmount the
+  /// store and finally log the report
+  void deref() {
+    std::lock_guard<std::mutex> l(lock);
+    --ref_count;
+    if (ref_count)
+      return;
+
+    ostringstream ostr;
+    {
+      // own the formatter so it is released even if a dump call throws
+      std::unique_ptr<Formatter> f(
+        Formatter::create("json-pretty", "json-pretty", "json-pretty"));
+      cct->get_perfcounters_collection()->dump_formatted(f.get(), false);
+      ostr << "FIO plugin ";
+      f->flush(ostr);
+      if (g_conf()->rocksdb_perf) {
+        os->get_db_statistics(f.get());
+        ostr << "FIO get_db_statistics ";
+        f->flush(ostr);
+      }
+      ostr << "Mempools: ";
+      f->open_object_section("mempools");
+      mempool::dump(f.get());
+      f->close_section();
+      f->flush(ostr);
+
+      ostr << "Generate db histogram: ";
+      os->generate_db_histogram(f.get());
+      f->flush(ostr);
+    }
+
+    if (unlink) {
+      destroy_collections(os, collections);
+    }
+    os->umount();
+    dout(0) << ostr.str() << dendl;
+  }
+};
+
+/// Initialize the ceph context from the job's conf= option, create, mkfs and
+/// mount the ObjectStore and, in single_pool_mode, create the collections
+/// shared by all jobs.
+///
+/// Also normalizes the *_len option pairs in td->eo so that every *_high is
+/// at least its *_low.
+///
+/// @throws std::runtime_error when conf= is missing or the objectstore type
+///         is not recognized
+/// @throws std::system_error when mkfs, mount or collection setup fails
+Engine::Engine(thread_data* td)
+  : ref_count(0),
+    unlink(td->o.unlink)
+{
+  // add the ceph command line arguments
+  auto o = static_cast<Options*>(td->eo);
+  if (!o->conf) {
+    throw std::runtime_error("missing conf option for ceph configuration file");
+  }
+  std::vector<const char*> args{
+    "-i", "0", // identify as osd.0 for osd_data and osd_journal
+    "--conf", o->conf, // use the requested conf file
+  };
+  if (td->o.directory) { // allow conf files to use ${fio_dir} for data
+    args.emplace_back("--fio_dir");
+    args.emplace_back(td->o.directory);
+  }
+
+  // claim the g_ceph_context reference and release it on destruction
+  cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
+                    CODE_ENVIRONMENT_UTILITY,
+                    CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+  common_init_finish(g_ceph_context);
+
+  // create the ObjectStore
+  os.reset(ObjectStore::create(g_ceph_context,
+                               g_conf().get_val<std::string>("osd objectstore"),
+                               g_conf().get_val<std::string>("osd data"),
+                               g_conf().get_val<std::string>("osd journal")));
+  if (!os)
+    throw std::runtime_error("bad objectstore type " + g_conf()->osd_objectstore);
+
+  // an explicit osd_op_num_shards wins; otherwise pick the hdd or ssd value
+  unsigned num_shards;
+  if (g_conf()->osd_op_num_shards)
+    num_shards = g_conf()->osd_op_num_shards;
+  else if (os->is_rotational())
+    num_shards = g_conf()->osd_op_num_shards_hdd;
+  else
+    num_shards = g_conf()->osd_op_num_shards_ssd;
+  os->set_cache_shards(num_shards);
+
+  // normalize options
+  o->oi_attr_len_high = std::max(o->oi_attr_len_low, o->oi_attr_len_high);
+  o->snapset_attr_len_high = std::max(o->snapset_attr_len_low,
+                                      o->snapset_attr_len_high);
+  o->pglog_omap_len_high = std::max(o->pglog_omap_len_low,
+                                    o->pglog_omap_len_high);
+  o->pglog_dup_omap_len_high = std::max(o->pglog_dup_omap_len_low,
+                                        o->pglog_dup_omap_len_high);
+  o->_fastinfo_omap_len_high = std::max(o->_fastinfo_omap_len_low,
+                                        o->_fastinfo_omap_len_high);
+
+  int r = os->mkfs();
+  if (r < 0)
+    throw std::system_error(-r, std::system_category(), "mkfs failed");
+
+  r = os->mount();
+  if (r < 0)
+    throw std::system_error(-r, std::system_category(), "mount failed");
+
+  // create shared collections up to osd_pool_default_pg_num
+  if (o->single_pool_mode) {
+    uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num");
+    if (count > td->o.nr_files)
+      count = td->o.nr_files;
+    r = init_collections(os, Collection::MIN_POOL_ID, collections, count);
+    if (r) {
+      // do not carry on with missing shared collections. unmount before
+      // unwinding so the store is not torn down while still mounted
+      os->umount();
+      throw std::system_error(r < 0 ? -r : r, std::system_category(),
+                              "collection init failed");
+    }
+  }
+}
+
+/// Every reference must have been dropped before the engine is destroyed.
+Engine::~Engine()
+{
+  ceph_assert(ref_count == 0);
+}
+
+/// An object id paired with the collection it is placed in. The id is built
+/// from the given name plus the placement seed and pool of that collection.
+struct Object {
+  ghobject_t oid;
+  Collection& coll;
+
+  Object(const char* name, Collection& coll)
+    : oid(hobject_t(name,
+                    "",
+                    CEPH_NOSNAP,
+                    coll.pg.ps(),
+                    coll.pg.pool(),
+                    "")),
+      coll(coll)
+  {}
+};
+
+/// per-job state. a fio job acts either as a separate pool with its own
+/// collections and objects, or (in single_pool_mode) as a client that keeps
+/// its own objects in the engine's shared collections
+struct Job {
+  Engine* engine;                      //< the global Engine; a reference is held via ref()/deref()
+  std::vector<Collection> collections; //< job's private collections to spread objects over
+  std::vector<Object> objects;         //< associate an object with each fio_file
+  std::vector<io_u*> events;           //< completions for fio_ceph_os_event()
+  const bool unlink;                   //< unlink objects on destruction
+
+  bufferptr one_for_all_data; //< preallocated buffer long enough
+                              //< to use for various operations
+
+  Job(Engine* engine, const thread_data* td);
+  ~Job();
+};
+
+/// Set up one fio job: take a reference on the engine, size the scratch
+/// buffer for the largest configured attr/omap length, create the job's
+/// private collections (unless single_pool_mode is set) and map every fio
+/// file onto an object, optionally preallocating it.
+///
+/// @throws std::system_error when collection setup or object preallocation
+///         fails; the engine reference is dropped before throwing
+Job::Job(Engine* engine, const thread_data* td)
+  : engine(engine),
+    events(td->o.iodepth),
+    unlink(td->o.unlink)
+{
+  engine->ref();
+  auto o = static_cast<Options*>(td->eo);
+
+  // one buffer large enough for any of the configured attr/omap values
+  unsigned long long max_data = std::max(o->oi_attr_len_high,
+                                         o->snapset_attr_len_high);
+  max_data = std::max(max_data, o->pglog_omap_len_high);
+  max_data = std::max(max_data, o->pglog_dup_omap_len_high);
+  max_data = std::max(max_data, o->_fastinfo_omap_len_high);
+  one_for_all_data = buffer::create(max_data);
+
+  std::vector<Collection>* colls;
+  // create private collections up to osd_pool_default_pg_num
+  if (!o->single_pool_mode) {
+    uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num");
+    if (count > td->o.nr_files)
+      count = td->o.nr_files;
+    // use the fio thread_number for our unique pool id
+    const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1;
+    const int r = init_collections(engine->os, pool, collections, count);
+    if (r) {
+      // the destructor will not run for a partially constructed Job, so
+      // give the engine reference back here
+      engine->deref();
+      throw std::system_error(r < 0 ? -r : r, std::system_category(),
+                              "job init");
+    }
+    colls = &collections;
+  } else {
+    colls = &engine->collections;
+  }
+  const uint64_t file_size = td->o.size / std::max(1u, td->o.nr_files);
+
+  // create an object for each file in the job
+  objects.reserve(td->o.nr_files);
+  for (uint32_t i = 0; i < td->o.nr_files; i++) {
+    auto f = td->files[i];
+    f->real_file_size = file_size;
+    f->engine_pos = i;
+
+    // associate each object with a collection in a round-robin fashion.
+    auto& coll = (*colls)[i % colls->size()];
+
+    objects.emplace_back(f->file_name, coll);
+    if (o->preallocate_files) {
+      auto& oid = objects.back().oid;
+      // fresh transaction per object: the previous one was moved into
+      // queue_transaction() and must not be reused
+      ObjectStore::Transaction t;
+      t.touch(coll.cid, oid);
+      t.truncate(coll.cid, oid, file_size);
+      const int r = engine->os->queue_transaction(coll.ch, std::move(t));
+      if (r) {
+        engine->deref();
+        // std::system_error expects a positive errno value
+        throw std::system_error(r < 0 ? -r : r, std::system_category(),
+                                "job init");
+      }
+    }
+  }
+}
+
+/// Tear the job down. When unlink is set, queue removal of every object the
+/// job created and destroy its private collections; only the first removal
+/// failure is logged. The engine reference is dropped in every case.
+Job::~Job()
+{
+  if (unlink) {
+    bool failed = false;
+    // remove our objects
+    for (auto& obj : objects) {
+      // fresh transaction per object: the previous one was moved into
+      // queue_transaction() and must not be reused
+      ObjectStore::Transaction t;
+      t.remove(obj.coll.cid, obj.oid);
+      int r = engine->os->queue_transaction(obj.coll.ch, std::move(t));
+      if (r && !failed) {
+        derr << "job cleanup failed with " << cpp_strerror(-r) << dendl;
+        failed = true;
+      }
+    }
+    destroy_collections(engine->os, collections);
+  }
+  engine->deref();
+}
+
+/// fio 'setup' hook: force threaded execution, then attach a new Job (backed
+/// by the process-wide Engine) to the thread.
+///
+/// @return 0 on success, -1 if engine or job construction threw
+int fio_ceph_os_setup(thread_data* td)
+{
+  // if there are multiple jobs, they must run in the same process against a
+  // single instance of the ObjectStore. explicitly disable fio's default
+  // job-per-process configuration
+  td->o.use_thread = 1;
+
+  try {
+    // get or create the global Engine instance, then create a Job for this
+    // thread on top of it
+    Engine* const shared_engine = Engine::get_instance(td);
+    td->io_ops_data = new Job(shared_engine, td);
+    return 0;
+  } catch (const std::exception& e) {
+    std::cerr << "setup failed with " << e.what() << std::endl;
+  }
+  return -1;
+}
+
+/// fio 'cleanup' hook: detach the Job from the thread, then destroy it.
+void fio_ceph_os_cleanup(thread_data* td)
+{
+  Job* const finished = static_cast<Job*>(td->io_ops_data);
+  // clear the slot first so the thread no longer refers to the Job
+  td->io_ops_data = nullptr;
+  delete finished;
+}
+
+
+/// fio 'event' hook: hand back the completion that the last
+/// fio_ceph_os_getevents() call stored at index @p event.
+io_u* fio_ceph_os_event(thread_data* td, int event)
+{
+  const Job* const job = static_cast<Job*>(td->io_ops_data);
+  return job->events[event];
+}
+
+/// fio 'getevents' hook: poll the thread's io_us until at least @p min of
+/// the in-flight ones have been marked complete by UnitComplete, recording
+/// each one in Job::events.
+///
+/// Note that @p max and @p t are not consulted.
+///
+/// @return the number of completions recorded
+int fio_ceph_os_getevents(thread_data* td, unsigned int min,
+                          unsigned int max, const timespec* t)
+{
+  auto job = static_cast<Job*>(td->io_ops_data);
+  unsigned int completed = 0;
+  io_u* unit = nullptr;
+  unsigned int idx = 0;
+
+  // loop through inflight ios until we find 'min' completions
+  for (;;) {
+    io_u_qiter(&td->io_u_all, unit, idx) {
+      // a non-null engine_data on an in-flight io marks it as completed
+      if ((unit->flags & IO_U_F_FLIGHT) && unit->engine_data) {
+        unit->engine_data = nullptr;
+        job->events[completed] = unit;
+        completed++;
+      }
+    }
+    if (completed >= min)
+      break;
+    usleep(100);
+  }
+
+  return completed;
+}
+
+/// completion context for ObjectStore::queue_transaction(): flags the io_u
+/// it was created for as finished
+class UnitComplete : public Context {
+  io_u* u;
+
+ public:
+  explicit UnitComplete(io_u* u) : u(u) {}
+
+  /// the result code is not inspected
+  void finish(int r) override {
+    // mark the pointer to indicate completion for fio_ceph_os_getevents()
+    u->engine_data = reinterpret_cast<void*>(1ull);
+  }
+};
+
+/// fio 'queue' hook: submit one io_u against the object mapped to its file.
+///
+/// Writes are queued as an ObjectStore transaction that may also carry
+/// randomly sized attrs and omap entries (see the *_len options) and, with
+/// pglog_simulation, cyclic PG log omap key insertion/removal; completion is
+/// signalled asynchronously through UnitComplete. Reads are performed
+/// synchronously.
+///
+/// @return FIO_Q_QUEUED for writes; FIO_Q_COMPLETED for reads and for
+///         unsupported directions (with u->error set to a positive errno on
+///         failure)
+enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u)
+{
+  fio_ro_check(td, u);
+
+  auto o = static_cast<const Options*>(td->eo);
+  auto job = static_cast<Job*>(td->io_ops_data);
+  auto& object = job->objects[u->file->engine_pos];
+  auto& coll = object.coll;
+  auto& os = job->engine->os;
+
+  if (u->ddir == DDIR_WRITE) {
+    // provide a hint if we're likely to read this data back
+    const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0;
+
+    bufferlist bl;
+    bl.push_back(buffer::copy(reinterpret_cast<char*>(u->xfer_buf),
+                              u->xfer_buflen));
+
+    map<string,bufferptr> attrset;
+    map<string, bufferlist> omaps;
+    // enqueue a write transaction on the collection's handle
+    ObjectStore::Transaction t;
+    char ver_key[64];
+
+    // fill attrs if any
+    if (o->oi_attr_len_high) {
+      ceph_assert(o->oi_attr_len_high >= o->oi_attr_len_low);
+      // fill with the garbage as we do not care of the actual content...
+      job->one_for_all_data.set_length(
+        ceph::util::generate_random_number(
+          o->oi_attr_len_low, o->oi_attr_len_high));
+      attrset["_"] = job->one_for_all_data;
+    }
+    if (o->snapset_attr_len_high) {
+      ceph_assert(o->snapset_attr_len_high >= o->snapset_attr_len_low);
+      job->one_for_all_data.set_length(
+        ceph::util::generate_random_number
+          (o->snapset_attr_len_low, o->snapset_attr_len_high));
+      attrset["snapset"] = job->one_for_all_data;
+    }
+    if (o->_fastinfo_omap_len_high) {
+      ceph_assert(o->_fastinfo_omap_len_high >= o->_fastinfo_omap_len_low);
+      // fill with the garbage as we do not care of the actual content...
+      job->one_for_all_data.set_length(
+        ceph::util::generate_random_number(
+          o->_fastinfo_omap_len_low, o->_fastinfo_omap_len_high));
+      omaps["_fastinfo"].append(job->one_for_all_data);
+    }
+
+    // [tail, head) ranges of pglog / dup versions to trim in this write;
+    // empty unless the simulation below decides a trim is due
+    uint64_t pglog_trim_head = 0, pglog_trim_tail = 0;
+    uint64_t pglog_dup_trim_head = 0, pglog_dup_trim_tail = 0;
+    if (o->simulate_pglog) {
+
+      uint64_t pglog_ver_cnt = 0;
+      {
+        // the version counters are per collection and shared between jobs
+        std::lock_guard<std::mutex> l(*coll.lock);
+        pglog_ver_cnt = coll.pglog_ver_head++;
+        if (o->pglog_omap_len_high &&
+            pglog_ver_cnt >=
+              coll.pglog_ver_tail +
+                g_conf()->osd_min_pg_log_entries + g_conf()->osd_pg_log_trim_min) {
+          pglog_trim_tail = coll.pglog_ver_tail;
+          coll.pglog_ver_tail = pglog_trim_head =
+            pglog_trim_tail + g_conf()->osd_pg_log_trim_min;
+
+          if (o->pglog_dup_omap_len_high &&
+              pglog_ver_cnt >=
+                coll.pglog_dup_ver_tail + g_conf()->osd_pg_log_dups_tracked +
+                  g_conf()->osd_pg_log_trim_min) {
+            pglog_dup_trim_tail = coll.pglog_dup_ver_tail;
+            coll.pglog_dup_ver_tail = pglog_dup_trim_head =
+              pglog_dup_trim_tail + g_conf()->osd_pg_log_trim_min;
+          }
+        }
+      }
+
+      if (o->pglog_omap_len_high) {
+        ceph_assert(o->pglog_omap_len_high >= o->pglog_omap_len_low);
+        snprintf(ver_key, sizeof(ver_key),
+                 "0000000011.%020llu", (unsigned long long)pglog_ver_cnt);
+        // fill with the garbage as we do not care of the actual content...
+        job->one_for_all_data.set_length(
+          ceph::util::generate_random_number(
+            o->pglog_omap_len_low, o->pglog_omap_len_high));
+        omaps[ver_key].append(job->one_for_all_data);
+      }
+      if (o->pglog_dup_omap_len_high) {
+        //insert dup
+        ceph_assert(o->pglog_dup_omap_len_high >= o->pglog_dup_omap_len_low);
+        for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
+          snprintf(ver_key, sizeof(ver_key),
+                   "dup_0000000011.%020llu", (unsigned long long)i);
+          // fill with the garbage as we do not care of the actual content...
+          job->one_for_all_data.set_length(
+            ceph::util::generate_random_number(
+              o->pglog_dup_omap_len_low, o->pglog_dup_omap_len_high));
+          omaps[ver_key].append(job->one_for_all_data);
+        }
+      }
+    }
+
+    if (attrset.size()) {
+      t.setattrs(coll.cid, object.oid, attrset);
+    }
+    t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags);
+
+    // collect the omap keys of the trimmed pglog / dup entries
+    set<string> rmkeys;
+    for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
+      snprintf(ver_key, sizeof(ver_key),
+               "0000000011.%020llu", (unsigned long long)i);
+      rmkeys.emplace(ver_key);
+    }
+    for( auto i = pglog_dup_trim_tail; i < pglog_dup_trim_head; ++i) {
+      snprintf(ver_key, sizeof(ver_key),
+               "dup_0000000011.%020llu", (unsigned long long)i);
+      rmkeys.emplace(ver_key);
+    }
+
+    if (rmkeys.size()) {
+      ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+      t.omap_rmkeys(coll.cid, pgmeta_oid, rmkeys);
+    }
+
+    if (omaps.size()) {
+      ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+      t.omap_setkeys(coll.cid, pgmeta_oid, omaps);
+    }
+    t.register_on_commit(new UnitComplete(u));
+    os->queue_transaction(coll.ch,
+                          std::move(t));
+    return FIO_Q_QUEUED;
+  }
+
+  if (u->ddir == DDIR_READ) {
+    // ObjectStore reads are synchronous, so make the call and return COMPLETED
+    bufferlist bl;
+    int r = os->read(coll.ch, object.oid, u->offset, u->xfer_buflen, bl);
+    if (r < 0) {
+      // the store reports -errno; fio's error field holds a positive errno
+      u->error = -r;
+      td_verror(td, u->error, "xfer");
+    } else {
+      bl.copy(0, bl.length(), static_cast<char*>(u->xfer_buf));
+      u->resid = u->xfer_buflen - r;
+    }
+    return FIO_Q_COMPLETED;
+  }
+
+  derr << "WARNING: Only DDIR_READ and DDIR_WRITE are supported!" << dendl;
+  u->error = EINVAL;
+  td_verror(td, u->error, "xfer");
+  return FIO_Q_COMPLETED;
+}
+
+/// fio 'commit' hook: nothing to do here, because fio_ceph_os_queue() has
+/// already submitted every transaction.
+///
+/// commit() exists so an engine can batch queued requests and submit them
+/// together; queue() could collect transactions in a list and commit() could
+/// pass them all to ObjectStore::queue_transactions(). We don't do that
+/// because objects are spread over multiple collections, so we a) need a
+/// different sequencer for each collection, and b) are less likely to see a
+/// benefit from batching requests within a collection.
+int fio_ceph_os_commit(thread_data* td)
+{
+  return 0;
+}
+
+// open/close are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
+// prevent fio from creating the files
+
+/// fio 'open_file' hook: always succeeds without doing anything
+int fio_ceph_os_open(thread_data* td, fio_file* f)
+{
+  return 0;
+}
+
+/// fio 'close_file' hook: always succeeds without doing anything
+int fio_ceph_os_close(thread_data* td, fio_file* f)
+{
+  return 0;
+}
+
+/// fio 'io_u_init' hook: start every io_u in the 'not completed' state.
+int fio_ceph_os_io_u_init(thread_data* td, io_u* u)
+{
+  // engine_data owns nothing; the pointer itself serves as a boolean
+  // 'completed' flag
+  u->engine_data = nullptr;
+  return 0;
+}
+
+/// fio 'io_u_free' hook: reset the completion flag kept in engine_data.
+void fio_ceph_os_io_u_free(thread_data* td, io_u* u)
+{
+  u->engine_data = nullptr;
+}
+
+
+// ioengine_ops for get_ioengine(): a value-initialized ioengine_ops with this
+// engine's identity, hooks and option table filled in
+struct ceph_ioengine : public ioengine_ops {
+  ceph_ioengine() : ioengine_ops({}) {
+    // identity
+    name = "ceph-os";
+    version = FIO_IOOPS_VERSION;
+    flags = FIO_DISKLESSIO;
+
+    // job lifecycle
+    setup = fio_ceph_os_setup;
+    cleanup = fio_ceph_os_cleanup;
+
+    // io submission and completion
+    queue = fio_ceph_os_queue;
+    commit = fio_ceph_os_commit;
+    getevents = fio_ceph_os_getevents;
+    event = fio_ceph_os_event;
+
+    // file and io_u hooks
+    open_file = fio_ceph_os_open;
+    close_file = fio_ceph_os_close;
+    io_u_init = fio_ceph_os_io_u_init;
+    io_u_free = fio_ceph_os_io_u_free;
+
+    // engine-specific job options
+    options = ceph_options.data();
+    option_struct_size = sizeof(struct Options);
+  }
+};
+
+} // anonymous namespace
+
+// the exported fio engine interface: stores the address of the single,
+// lazily constructed ceph_ioengine in *ioengine_ptr
+extern "C" void get_ioengine(struct ioengine_ops** ioengine_ptr)
+{
+  static ceph_ioengine the_engine;
+  *ioengine_ptr = &the_engine;
+}