Diffstat (limited to 'src/test/fio')
-rw-r--r--  src/test/fio/CMakeLists.txt           |  21
-rw-r--r--  src/test/fio/README.md                | 116
-rw-r--r--  src/test/fio/ceph-bluestore.conf      |  20
-rw-r--r--  src/test/fio/ceph-bluestore.fio       |  39
-rw-r--r--  src/test/fio/ceph-filestore.conf      |  26
-rw-r--r--  src/test/fio/ceph-filestore.fio       |  17
-rw-r--r--  src/test/fio/ceph-librgw.fio          |  28
-rw-r--r--  src/test/fio/ceph-memstore.conf       |  18
-rw-r--r--  src/test/fio/ceph-memstore.fio        |  17
-rw-r--r--  src/test/fio/ceph-messenger.conf      |   7
-rw-r--r--  src/test/fio/ceph-messenger.fio       |  22
-rw-r--r--  src/test/fio/fio_ceph_messenger.cc    | 697
-rw-r--r--  src/test/fio/fio_ceph_objectstore.cc  | 940
-rw-r--r--  src/test/fio/fio_librgw.cc            | 542
-rw-r--r--  src/test/fio/ring_buffer.h            | 102
15 files changed, 2612 insertions, 0 deletions
diff --git a/src/test/fio/CMakeLists.txt b/src/test/fio/CMakeLists.txt
new file mode 100644
index 000000000..60731a0c5
--- /dev/null
+++ b/src/test/fio/CMakeLists.txt
@@ -0,0 +1,21 @@
+# ObjectStore
+add_library(fio_ceph_objectstore SHARED fio_ceph_objectstore.cc)
+target_link_libraries(fio_ceph_objectstore fio)
+
+# Messenger
+add_library(fio_ceph_messenger SHARED fio_ceph_messenger.cc)
+target_link_libraries(fio_ceph_messenger fio)
+
+# librgw
+add_library(fio_librgw SHARED fio_librgw.cc)
+target_link_libraries(fio_librgw rgw fio)
+
+target_link_libraries(fio_ceph_objectstore os global)
+install(TARGETS fio_ceph_objectstore DESTINATION lib)
+
+target_link_libraries(fio_ceph_messenger os global)
+install(TARGETS fio_ceph_messenger DESTINATION lib)
+
+target_link_libraries(fio_librgw os global rgw)
+install(TARGETS fio_librgw DESTINATION lib)
+
diff --git a/src/test/fio/README.md b/src/test/fio/README.md
new file mode 100644
index 000000000..91a98af9e
--- /dev/null
+++ b/src/test/fio/README.md
@@ -0,0 +1,116 @@
+FIO
+===
+
+Ceph uses the fio workload generator and benchmarking utility
+(https://github.com/axboe/fio.git).
+
+The fio tool is automatically fetched to build/src/fio and built if necessary.
+
+RBD
+---
+
+The fio engine for rbd is located in the fio tree itself, so you'll need to
+build it from source.
+
+If you install the ceph libraries to a location that isn't in your
+LD_LIBRARY_PATH, be sure to add it:
+
+ export LD_LIBRARY_PATH=/path/to/install/lib
+
+To build fio with rbd:
+
+ ./configure --extra-cflags="-I/path/to/install/include -L/path/to/install/lib"
+ make
+
+If configure fails with "Rados Block Device engine no", see config.log for
+details and adjust the cflags as necessary.
+
+If ceph was compiled with tcmalloc, it may be necessary to compile fio with:
+
+    make EXTLIBS=tcmalloc
+
+Otherwise fio might crash in malloc_usable_size().
+
+To view the fio options specific to the rbd engine:
+
+ ./fio --enghelp=rbd
+
+See examples/rbd.fio for an example job file. To run:
+
+ ./fio examples/rbd.fio
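+
+If you don't have the fio source tree at hand, a minimal rbd job file looks
+roughly like the sketch below (pool and image names are placeholders; the
+image must already exist, e.g. created with `rbd create`):
+```
+[global]
+ioengine=rbd
+clientname=admin   # cephx user, without the 'client.' prefix
+pool=rbd           # an existing pool
+rbdname=fio_test   # an existing rbd image
+rw=randwrite
+bs=4k
+
+[rbd_iodepth32]
+iodepth=32
+```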
+
+ObjectStore
+-----------
+
+This fio engine allows you to mount and use a ceph object store directly,
+without having to build a ceph cluster or start any daemons.
+
+Because the ObjectStore is not a public-facing interface, we build it inside
+of the ceph tree and load libfio_ceph_objectstore.so into fio as an external
+engine.
+
+To build fio_ceph_objectstore run:
+```
+ ./do_cmake.sh -DWITH_FIO=ON
+ cd build
+ make fio_ceph_objectstore
+```
+This will fetch fio into the build/src/fio directory and compile the fio
+tool together with libfio_ceph_objectstore.so.
+
+If you install the ceph libraries to a location that isn't in your
+LD_LIBRARY_PATH, be sure to add it:
+
+ export LD_LIBRARY_PATH=/path/to/install/lib
+
+To view the fio options specific to the objectstore engine:
+
+ ./fio --enghelp=libfio_ceph_objectstore.so
+
+The conf= option requires a ceph configuration file (ceph.conf). Example job
+and conf files for each object store are provided in the same directory as
+this README.
+
+To run:
+
+ ./fio /path/to/job.fio
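+
+For example, assuming the engine library is in your LD_LIBRARY_PATH and the
+job and conf files from this directory are in your working directory:
+
+    ./fio ceph-bluestore.fio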
+
+RADOS
+-----
+
+FIO can be compiled with support for RADOS. When ceph is installed in the
+system default locations, a plain FIO build includes the RADOS ioengine. If
+you installed ceph anywhere else (cmake -DCMAKE_INSTALL_PREFIX=${CEPH_INSTALL_ROOT} ..),
+you can build FIO the following way:
+
+ LIBS="-lrados -ltcmalloc" LDFLAGS="-L${CEPH_INSTALL_ROOT}/lib" EXTFLAGS="-I${CEPH_INSTALL_ROOT}/include" \
+ rados=yes ./configure
+ LIBS="-lrados -ltcmalloc" LDFLAGS="-L${CEPH_INSTALL_ROOT}/lib" EXTFLAGS="-I${CEPH_INSTALL_ROOT}/include" \
+ rados=yes make
+
+"-ltcmalloc" is necessary if ceph was compiled with tcmalloc.
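+
+A minimal rados job file sketch (the names below are placeholders; the pool
+must already exist):
+```
+[global]
+ioengine=rados
+clientname=admin
+pool=rbd
+rw=randwrite
+bs=4k
+size=256m
+```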
+
+Messenger
+---------
+
+This fio engine allows you to test the Ceph messenger transport layer without
+any disk activity involved.
+
+To build fio_ceph_messenger:
+```
+ ./do_cmake.sh -DWITH_FIO=ON
+ cd build
+ make fio_ceph_messenger
+```
+If you install the ceph libraries to a location that isn't in your
+LD_LIBRARY_PATH, be sure to add it:
+
+ export LD_LIBRARY_PATH=/path/to/install/lib
+
+To view the fio options specific to the messenger engine:
+
+ ./fio --enghelp=libfio_ceph_messenger.so
+
+The ceph_conf_file= option requires a ceph configuration file (ceph.conf);
+see ceph-messenger.conf and ceph-messenger.fio for details.
+
+To run:
+
+ ./fio ./ceph-messenger.fio
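+
+The job file defines both a [server] and a [client] section; fio's --section
+option can be used to start the two sides separately, e.g. in two terminals
+(assuming the default 127.0.0.1:5555 endpoint from the job file):
+
+    ./fio --section=server ./ceph-messenger.fio
+    ./fio --section=client ./ceph-messenger.fio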
diff --git a/src/test/fio/ceph-bluestore.conf b/src/test/fio/ceph-bluestore.conf
new file mode 100644
index 000000000..6dd4f1afa
--- /dev/null
+++ b/src/test/fio/ceph-bluestore.conf
@@ -0,0 +1,20 @@
+# example configuration file for ceph-bluestore.fio
+
+[global]
+ debug bluestore = 0/0
+ debug bluefs = 0/0
+ debug bdev = 0/0
+ debug rocksdb = 0/0
+ # spread objects over 8 collections
+ osd pool default pg num = 8
+ # increasing shards can help when scaling number of collections
+ osd op num shards = 5
+
+[osd]
+ osd objectstore = bluestore
+
+ # use directory= option from fio job file
+ osd data = ${fio_dir}
+
+ # log inside fio_dir
+ log file = ${fio_dir}/log
diff --git a/src/test/fio/ceph-bluestore.fio b/src/test/fio/ceph-bluestore.fio
new file mode 100644
index 000000000..dbadf701a
--- /dev/null
+++ b/src/test/fio/ceph-bluestore.fio
@@ -0,0 +1,39 @@
+# Runs a 64k random write test against the ceph BlueStore.
+[global]
+ioengine=libfio_ceph_objectstore.so # must be found in your LD_LIBRARY_PATH
+
+conf=ceph-bluestore.conf # must point to a valid ceph configuration file
+directory=/mnt/fio-bluestore # directory for osd_data
+
+#oi_attr_len=350-4000 # specifies OI (aka '_') attribute length range to couple
+ # writes with. Default: 0 (disabled)
+
+#snapset_attr_len=35 # specifies snapset attribute length range to couple
+ # writes with. Default: 0 (disabled)
+
+#_fastinfo_omap_len=186 # specifies _fastinfo omap entry length range to
+ # couple writes with. Default: 0 (disabled)
+
+#pglog_simulation=1 # couples writes and omap generation in the manner of the
+                    # OSD PG log. Ceph's osd_min_pg_log_entries,
+                    # osd_pg_log_trim_min and osd_pg_log_dups_tracked settings
+                    # control cyclic omap key creation/removal.
+                    # The following additional pglog_* settings apply as well:
+
+#pglog_omap_len=173 # specifies PG log entry length range to couple
+ # writes with. Default: 0 (disabled)
+
+#pglog_dup_omap_len=57 # specifies duplicate PG log entry length range
+ # to couple writes with. Default: 0 (disabled)
+#single_pool_mode=0 # Enables the mode where all jobs run against the same pool.
+
+rw=randwrite
+iodepth=16
+
+time_based=1
+runtime=20s
+
+[bluestore]
+nr_files=64
+size=256m
+bs=64k
diff --git a/src/test/fio/ceph-filestore.conf b/src/test/fio/ceph-filestore.conf
new file mode 100644
index 000000000..06266656a
--- /dev/null
+++ b/src/test/fio/ceph-filestore.conf
@@ -0,0 +1,26 @@
+# example configuration file for ceph-filestore.fio
+
+[global]
+ debug filestore = 0/0
+ debug journal = 0/0
+
+ # spread objects over 8 collections
+ osd pool default pg num = 8
+ # increasing shards can help when scaling number of collections
+ osd op num shards = 5
+
+ filestore fd cache size = 32
+
+[osd]
+ osd objectstore = filestore
+
+ # use directory= option from fio job file
+ osd data = ${fio_dir}
+
+ # journal inside fio_dir
+ osd journal = ${fio_dir}/journal
+ osd journal size = 500
+ journal force aio = 1
+
+ # log outside fio_dir
+ log file = ${fio_dir}.log
diff --git a/src/test/fio/ceph-filestore.fio b/src/test/fio/ceph-filestore.fio
new file mode 100644
index 000000000..bb93c8df6
--- /dev/null
+++ b/src/test/fio/ceph-filestore.fio
@@ -0,0 +1,17 @@
+# Runs a 64k random write test against the ceph FileStore.
+[global]
+ioengine=libfio_ceph_objectstore.so # must be found in your LD_LIBRARY_PATH
+
+conf=ceph-filestore.conf # must point to a valid ceph configuration file
+directory=/mnt/fio-filestore # directory for osd_data
+
+rw=randwrite
+iodepth=16
+
+time_based=1
+runtime=20s
+
+[filestore]
+nr_files=64
+size=256m
+bs=64k
diff --git a/src/test/fio/ceph-librgw.fio b/src/test/fio/ceph-librgw.fio
new file mode 100644
index 000000000..fefca6f84
--- /dev/null
+++ b/src/test/fio/ceph-librgw.fio
@@ -0,0 +1,28 @@
+#
+# example jobfile, e.g.:
+# fio --max-jobs=20 /lv2tb/ceph-cp/src/test/fio/ceph-librgw.fio
+#
+[global]
+ioengine=external:/home/mbenjamin/ceph-cp/build/lib/libfio_librgw.so
+name=fiotest
+direct=0
+access_key=${AWS_ACCESS_KEY_ID}
+secret_key=${AWS_SECRET_ACCESS_KEY}
+userid=testuser
+ceph_cluster=ceph
+ceph_conf=/home/mbenjamin/ceph-cp/build/ceph.conf
+# in the current implementation there is only one, global bucket
+bucket_name=fiotest
+thread=1
+nr_files=8
+bs=256k
+size=256k
+
+[rgw_randwrite]
+numjobs=8
+rw=rw
+rwmixread=70
+rwmixwrite=30
+offset=0
+time_based=1
+runtime=30s
diff --git a/src/test/fio/ceph-memstore.conf b/src/test/fio/ceph-memstore.conf
new file mode 100644
index 000000000..3553d1bcf
--- /dev/null
+++ b/src/test/fio/ceph-memstore.conf
@@ -0,0 +1,18 @@
+# example configuration file for ceph-memstore.fio
+
+[global]
+ debug filestore = 0
+
+ # spread objects over 8 collections
+ osd pool default pg num = 8
+ # increasing shards can help when scaling number of collections
+ osd op num shards = 5
+
+[osd]
+ osd objectstore = memstore
+
+ # use directory= option from fio job file
+ osd data = ${fio_dir}
+
+ # log inside fio_dir
+ log file = ${fio_dir}/log
diff --git a/src/test/fio/ceph-memstore.fio b/src/test/fio/ceph-memstore.fio
new file mode 100644
index 000000000..ceb6671d7
--- /dev/null
+++ b/src/test/fio/ceph-memstore.fio
@@ -0,0 +1,17 @@
+# Runs a 64k random write test against the ceph MemStore.
+[global]
+ioengine=libfio_ceph_objectstore.so # must be found in your LD_LIBRARY_PATH
+
+conf=ceph-memstore.conf # must point to a valid ceph configuration file
+directory=/mnt/fio-memstore # directory for osd_data
+
+rw=randwrite
+iodepth=16
+
+time_based=1
+runtime=20s
+
+[memstore]
+nr_files=64
+size=256m
+bs=64k
diff --git a/src/test/fio/ceph-messenger.conf b/src/test/fio/ceph-messenger.conf
new file mode 100644
index 000000000..8d83d3613
--- /dev/null
+++ b/src/test/fio/ceph-messenger.conf
@@ -0,0 +1,7 @@
+[global]
+
+ms_type=async+posix
+ms_crc_data=false
+ms_crc_header=false
+ms_dispatch_throttle_bytes=0
+debug_ms=0/0
diff --git a/src/test/fio/ceph-messenger.fio b/src/test/fio/ceph-messenger.fio
new file mode 100644
index 000000000..20115cb8d
--- /dev/null
+++ b/src/test/fio/ceph-messenger.fio
@@ -0,0 +1,22 @@
+[global]
+bs=4k
+size=1g
+iodepth=128
+
+ioengine=libfio_ceph_messenger.so
+#ceph_conf_file=ceph-messenger.conf
+
+# To select the protocol explicitly, add a 'v1:' or 'v2:' prefix to the
+# hostname. By default v2 is used.
+hostname=127.0.0.1
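+#hostname=v2:127.0.0.1 # example: request the v2 protocol explicitly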
+port=5555
+
+ms_type=async+posix # or async+dpdk or async+rdma
+
+[client]
+receiver=0
+rw=write
+
+[server]
+receiver=1
+rw=read
diff --git a/src/test/fio/fio_ceph_messenger.cc b/src/test/fio/fio_ceph_messenger.cc
new file mode 100644
index 000000000..4a4cf4fb5
--- /dev/null
+++ b/src/test/fio/fio_ceph_messenger.cc
@@ -0,0 +1,697 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * CEPH messenger engine
+ *
+ * FIO engine which uses ceph messenger as a transport. See corresponding
+ * FIO client and server jobs for details.
+ */
+
+#include "global/global_init.h"
+#include "msg/Messenger.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "common/perf_counters.h"
+#include "auth/DummyAuth.h"
+#include "ring_buffer.h"
+
+#include <fio.h>
+#include <flist.h>
+#include <optgroup.h>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_
+
+enum ceph_msgr_type {
+ CEPH_MSGR_TYPE_UNDEF,
+ CEPH_MSGR_TYPE_POSIX,
+ CEPH_MSGR_TYPE_DPDK,
+ CEPH_MSGR_TYPE_RDMA,
+};
+
+const char *ceph_msgr_types[] = { "undef", "async+posix",
+ "async+dpdk", "async+rdma" };
+
+struct ceph_msgr_options {
+ struct thread_data *td__;
+ unsigned int is_receiver;
+ unsigned int is_single;
+ unsigned int port;
+ const char *hostname;
+ const char *conffile;
+ enum ceph_msgr_type ms_type;
+};
+
+class FioDispatcher;
+
+struct ceph_msgr_data {
+ ceph_msgr_data(struct ceph_msgr_options *o_, unsigned iodepth) :
+ o(o_) {
+ INIT_FLIST_HEAD(&io_inflight_list);
+ INIT_FLIST_HEAD(&io_pending_list);
+ ring_buffer_init(&io_completed_q, iodepth);
+ pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
+ }
+
+ struct ceph_msgr_options *o;
+ Messenger *msgr = NULL;
+ FioDispatcher *disp = NULL;
+ pthread_spinlock_t spin;
+ struct ring_buffer io_completed_q;
+ struct flist_head io_inflight_list;
+ struct flist_head io_pending_list;
+ unsigned int io_inflight_nr = 0;
+ unsigned int io_pending_nr = 0;
+};
+
+struct ceph_msgr_io {
+ struct flist_head list;
+ struct ceph_msgr_data *data;
+ struct io_u *io_u;
+ MOSDOp *req_msg; /** Cached request, valid only for sender */
+};
+
+struct ceph_msgr_reply_io {
+ struct flist_head list;
+ MOSDOpReply *rep;
+};
+
+static void *str_to_ptr(const std::string &str)
+{
+ return (void *)strtoul(str.c_str(), NULL, 16);
+}
+
+static std::string ptr_to_str(void *ptr)
+{
+ char buf[32];
+
+ snprintf(buf, sizeof(buf), "%llx", (unsigned long long)ptr);
+ return std::string(buf);
+}
+
+/*
+ * Used to print perf counters on the last context put; almost duplicates
+ * the global context refcounter, sigh.
+ */
+static std::atomic<int> ctx_ref(1);
+static DummyAuthClientServer *g_dummy_auth;
+
+static void create_or_get_ceph_context(struct ceph_msgr_options *o)
+{
+ if (g_ceph_context) {
+ g_ceph_context->get();
+ ctx_ref++;
+ return;
+ }
+
+ boost::intrusive_ptr<CephContext> cct;
+ vector<const char*> args;
+
+ if (o->conffile)
+ args = { "--conf", o->conffile };
+
+ cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ /* Will use g_ceph_context instead */
+ cct.detach();
+
+ common_init_finish(g_ceph_context);
+ g_ceph_context->_conf.apply_changes(NULL);
+ g_dummy_auth = new DummyAuthClientServer(g_ceph_context);
+ g_dummy_auth->auth_registry.refresh_config();
+}
+
+static void put_ceph_context(void)
+{
+ if (--ctx_ref == 0) {
+ ostringstream ostr;
+ Formatter* f;
+
+ f = Formatter::create("json-pretty");
+ g_ceph_context->get_perfcounters_collection()->dump_formatted(f, false);
+ ostr << ">>>>>>>>>>>>> PERFCOUNTERS BEGIN <<<<<<<<<<<<" << std::endl;
+ f->flush(ostr);
+ ostr << ">>>>>>>>>>>>> PERFCOUNTERS END <<<<<<<<<<<<" << std::endl;
+
+ delete f;
+ delete g_dummy_auth;
+ dout(0) << ostr.str() << dendl;
+ }
+
+ g_ceph_context->put();
+}
+
+static void ceph_msgr_sender_on_reply(const object_t &oid)
+{
+ struct ceph_msgr_data *data;
+ struct ceph_msgr_io *io;
+
+  /*
+   * Here we abuse the object name and use it as a raw pointer. Since
+   * this is only for benchmarks and testing we do not care about
+   * anything but performance, so there is no need for a global lookup
+   * structure to find the reply: just send a pointer and get it back.
+   */
+
+ io = (decltype(io))str_to_ptr(oid.name);
+ data = io->data;
+ ring_buffer_enqueue(&data->io_completed_q, (void *)io);
+}
+
+
+class ReplyCompletion : public Message::CompletionHook {
+ struct ceph_msgr_io *m_io;
+
+public:
+ ReplyCompletion(MOSDOpReply *rep, struct ceph_msgr_io *io) :
+ Message::CompletionHook(rep),
+ m_io(io) {
+ }
+ void finish(int err) override {
+ struct ceph_msgr_data *data = m_io->data;
+
+ ring_buffer_enqueue(&data->io_completed_q, (void *)m_io);
+ }
+};
+
+static void ceph_msgr_receiver_on_request(struct ceph_msgr_data *data,
+ MOSDOp *req)
+{
+ MOSDOpReply *rep;
+
+ rep = new MOSDOpReply(req, 0, 0, 0, false);
+ rep->set_connection(req->get_connection());
+
+ pthread_spin_lock(&data->spin);
+ if (data->io_inflight_nr) {
+ struct ceph_msgr_io *io;
+
+ data->io_inflight_nr--;
+ io = flist_first_entry(&data->io_inflight_list,
+ struct ceph_msgr_io, list);
+ flist_del(&io->list);
+ pthread_spin_unlock(&data->spin);
+
+ rep->set_completion_hook(new ReplyCompletion(rep, io));
+ rep->get_connection()->send_message(rep);
+ } else {
+ struct ceph_msgr_reply_io *rep_io;
+
+ rep_io = (decltype(rep_io))malloc(sizeof(*rep_io));
+ rep_io->rep = rep;
+
+ data->io_pending_nr++;
+ flist_add_tail(&rep_io->list, &data->io_pending_list);
+ pthread_spin_unlock(&data->spin);
+ }
+}
+
+class FioDispatcher : public Dispatcher {
+ struct ceph_msgr_data *m_data;
+
+public:
+ FioDispatcher(struct ceph_msgr_data *data):
+ Dispatcher(g_ceph_context),
+ m_data(data) {
+ }
+ bool ms_can_fast_dispatch_any() const override {
+ return true;
+ }
+ bool ms_can_fast_dispatch(const Message *m) const override {
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ return m_data->o->is_receiver;
+ case CEPH_MSG_OSD_OPREPLY:
+ return !m_data->o->is_receiver;
+ default:
+ return false;
+ }
+ }
+ void ms_handle_fast_connect(Connection *con) override {
+ }
+ void ms_handle_fast_accept(Connection *con) override {
+ }
+ bool ms_dispatch(Message *m) override {
+ return true;
+ }
+ void ms_fast_dispatch(Message *m) override {
+ if (m_data->o->is_receiver) {
+ MOSDOp *req;
+
+ /*
+ * Server side, handle request.
+ */
+
+ req = static_cast<MOSDOp*>(m);
+ req->finish_decode();
+
+ ceph_msgr_receiver_on_request(m_data, req);
+ } else {
+ MOSDOpReply *rep;
+
+ /*
+ * Client side, get reply, extract objid and mark
+ * IO as completed.
+ */
+
+ rep = static_cast<MOSDOpReply*>(m);
+ ceph_msgr_sender_on_reply(rep->get_oid());
+ }
+ m->put();
+ }
+ bool ms_handle_reset(Connection *con) override {
+ return true;
+ }
+ void ms_handle_remote_reset(Connection *con) override {
+ }
+ bool ms_handle_refused(Connection *con) override {
+ return false;
+ }
+ int ms_handle_authentication(Connection *con) override {
+ return 1;
+ }
+};
+
+static entity_addr_t hostname_to_addr(struct ceph_msgr_options *o)
+{
+ entity_addr_t addr;
+
+ addr.parse(o->hostname);
+ addr.set_port(o->port);
+ addr.set_nonce(0);
+
+ return addr;
+}
+
+static Messenger *create_messenger(struct ceph_msgr_options *o)
+{
+ entity_name_t ename = o->is_receiver ?
+ entity_name_t::OSD(0) : entity_name_t::CLIENT(0);
+ std::string lname = o->is_receiver ?
+ "receiver" : "sender";
+
+ std::string ms_type = o->ms_type != CEPH_MSGR_TYPE_UNDEF ?
+ ceph_msgr_types[o->ms_type] :
+ g_ceph_context->_conf.get_val<std::string>("ms_type");
+
+  /* o->td__->pid is not set at this point, so use getpid() instead */
+ auto nonce = o->is_receiver ? 0 : (getpid() + o->td__->thread_number);
+ Messenger *msgr = Messenger::create(g_ceph_context, ms_type.c_str(),
+ ename, lname, nonce);
+ if (o->is_receiver) {
+ msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+ msgr->bind(hostname_to_addr(o));
+ } else {
+ msgr->set_default_policy(Messenger::Policy::lossless_client(0));
+ }
+ msgr->set_auth_client(g_dummy_auth);
+ msgr->set_auth_server(g_dummy_auth);
+ msgr->set_require_authorizer(false);
+ msgr->start();
+
+ return msgr;
+}
+
+static Messenger *single_msgr;
+static std::atomic<int> single_msgr_ref;
+static vector<FioDispatcher *> single_msgr_disps;
+
+static void init_messenger(struct ceph_msgr_data *data)
+{
+ struct ceph_msgr_options *o = data->o;
+ FioDispatcher *disp;
+ Messenger *msgr;
+
+ disp = new FioDispatcher(data);
+ if (o->is_single) {
+ /*
+ * Single messenger instance for the whole FIO
+ */
+
+ if (!single_msgr) {
+ msgr = create_messenger(o);
+ single_msgr = msgr;
+ } else {
+ msgr = single_msgr;
+ }
+ single_msgr_disps.push_back(disp);
+ single_msgr_ref++;
+ } else {
+ /*
+ * Messenger instance per FIO thread
+ */
+ msgr = create_messenger(o);
+ }
+ msgr->add_dispatcher_head(disp);
+
+ data->disp = disp;
+ data->msgr = msgr;
+}
+
+static void free_messenger(struct ceph_msgr_data *data)
+{
+ data->msgr->shutdown();
+ data->msgr->wait();
+ delete data->msgr;
+}
+
+static void put_messenger(struct ceph_msgr_data *data)
+{
+ struct ceph_msgr_options *o = data->o;
+
+ if (o->is_single) {
+ if (--single_msgr_ref == 0) {
+ free_messenger(data);
+ /*
+ * In case of a single messenger instance we have to
+ * free dispatchers after actual messenger destruction.
+ */
+ for (auto disp : single_msgr_disps)
+ delete disp;
+ single_msgr = NULL;
+ }
+ } else {
+ free_messenger(data);
+ delete data->disp;
+ }
+ data->disp = NULL;
+ data->msgr = NULL;
+}
+
+static int fio_ceph_msgr_setup(struct thread_data *td)
+{
+ struct ceph_msgr_options *o = (decltype(o))td->eo;
+ o->td__ = td;
+ ceph_msgr_data *data;
+
+ /* We have to manage global resources so we use threads */
+ td->o.use_thread = 1;
+
+ create_or_get_ceph_context(o);
+
+ if (!td->io_ops_data) {
+ data = new ceph_msgr_data(o, td->o.iodepth);
+ init_messenger(data);
+ td->io_ops_data = (void *)data;
+ }
+
+ return 0;
+}
+
+static void fio_ceph_msgr_cleanup(struct thread_data *td)
+{
+ struct ceph_msgr_data *data;
+ unsigned nr;
+
+ data = (decltype(data))td->io_ops_data;
+ put_messenger(data);
+
+ nr = ring_buffer_used_size(&data->io_completed_q);
+ if (nr)
+ fprintf(stderr, "fio: io_completed_nr==%d, but should be zero\n",
+ nr);
+ if (data->io_inflight_nr)
+ fprintf(stderr, "fio: io_inflight_nr==%d, but should be zero\n",
+ data->io_inflight_nr);
+ if (data->io_pending_nr)
+ fprintf(stderr, "fio: io_pending_nr==%d, but should be zero\n",
+ data->io_pending_nr);
+ if (!flist_empty(&data->io_inflight_list))
+ fprintf(stderr, "fio: io_inflight_list is not empty\n");
+ if (!flist_empty(&data->io_pending_list))
+ fprintf(stderr, "fio: io_pending_list is not empty\n");
+
+ ring_buffer_deinit(&data->io_completed_q);
+ delete data;
+ put_ceph_context();
+}
+
+static int fio_ceph_msgr_io_u_init(struct thread_data *td, struct io_u *io_u)
+{
+ struct ceph_msgr_options *o = (decltype(o))td->eo;
+ struct ceph_msgr_io *io;
+ MOSDOp *req_msg = NULL;
+
+ io = (decltype(io))malloc(sizeof(*io));
+ io->io_u = io_u;
+ io->data = (decltype(io->data))td->io_ops_data;
+
+ if (!o->is_receiver) {
+ object_t oid(ptr_to_str(io));
+ pg_t pgid;
+ object_locator_t oloc;
+ hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(),
+ pgid.pool(), oloc.nspace);
+ spg_t spgid(pgid);
+ entity_inst_t dest(entity_name_t::OSD(0), hostname_to_addr(o));
+
+ Messenger *msgr = io->data->msgr;
+ ConnectionRef con = msgr->connect_to(dest.name.type(),
+ entity_addrvec_t(dest.addr));
+
+ req_msg = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
+ req_msg->set_connection(con);
+ }
+
+ io->req_msg = req_msg;
+ io_u->engine_data = (void *)io;
+
+ return 0;
+}
+
+static void fio_ceph_msgr_io_u_free(struct thread_data *td, struct io_u *io_u)
+{
+ struct ceph_msgr_io *io;
+
+ io = (decltype(io))io_u->engine_data;
+ if (io) {
+ io_u->engine_data = NULL;
+ if (io->req_msg)
+ io->req_msg->put();
+ free(io);
+ }
+}
+
+static enum fio_q_status ceph_msgr_sender_queue(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct ceph_msgr_data *data;
+ struct ceph_msgr_io *io;
+
+ bufferlist buflist = bufferlist::static_from_mem(
+ (char *)io_u->buf, io_u->buflen);
+
+ io = (decltype(io))io_u->engine_data;
+ data = (decltype(data))td->io_ops_data;
+
+  /* No handy method to clear ops before reuse? Ok */
+ io->req_msg->ops.clear();
+
+ /* Here we do not care about direction, always send as write */
+ io->req_msg->write(0, io_u->buflen, buflist);
+ /* Keep message alive */
+ io->req_msg->get();
+ io->req_msg->get_connection()->send_message(io->req_msg);
+
+ return FIO_Q_QUEUED;
+}
+
+static int fio_ceph_msgr_getevents(struct thread_data *td, unsigned int min,
+ unsigned int max, const struct timespec *ts)
+{
+ struct ceph_msgr_data *data;
+ unsigned int nr;
+
+ data = (decltype(data))td->io_ops_data;
+
+ /*
+ * Check io_u.c : if min == 0 -> ts is valid and equal to zero,
+ * if min != 0 -> ts is NULL.
+ */
+ assert(!min ^ !ts);
+
+ nr = ring_buffer_used_size(&data->io_completed_q);
+ if (nr >= min)
+ /* We got something */
+ return min(nr, max);
+
+ /* Here we are only if min != 0 and ts == NULL */
+ assert(min && !ts);
+
+ while ((nr = ring_buffer_used_size(&data->io_completed_q)) < min &&
+ !td->terminate) {
+ /* Poll, no disk IO, so we expect response immediately. */
+ usleep(10);
+ }
+
+ return min(nr, max);
+}
+
+static struct io_u *fio_ceph_msgr_event(struct thread_data *td, int event)
+{
+ struct ceph_msgr_data *data;
+ struct ceph_msgr_io *io;
+
+ data = (decltype(data))td->io_ops_data;
+ io = (decltype(io))ring_buffer_dequeue(&data->io_completed_q);
+
+ return io->io_u;
+}
+
+static enum fio_q_status ceph_msgr_receiver_queue(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct ceph_msgr_data *data;
+ struct ceph_msgr_io *io;
+
+ io = (decltype(io))io_u->engine_data;
+ data = io->data;
+ pthread_spin_lock(&data->spin);
+ if (data->io_pending_nr) {
+ struct ceph_msgr_reply_io *rep_io;
+ MOSDOpReply *rep;
+
+ data->io_pending_nr--;
+ rep_io = flist_first_entry(&data->io_pending_list,
+ struct ceph_msgr_reply_io,
+ list);
+ flist_del(&rep_io->list);
+ rep = rep_io->rep;
+ pthread_spin_unlock(&data->spin);
+ free(rep_io);
+
+ rep->set_completion_hook(new ReplyCompletion(rep, io));
+ rep->get_connection()->send_message(rep);
+ } else {
+ data->io_inflight_nr++;
+ flist_add_tail(&io->list, &data->io_inflight_list);
+ pthread_spin_unlock(&data->spin);
+ }
+
+ return FIO_Q_QUEUED;
+}
+
+static enum fio_q_status fio_ceph_msgr_queue(struct thread_data *td,
+ struct io_u *io_u)
+{
+ struct ceph_msgr_options *o = (decltype(o))td->eo;
+
+ if (o->is_receiver)
+ return ceph_msgr_receiver_queue(td, io_u);
+ else
+ return ceph_msgr_sender_queue(td, io_u);
+}
+
+static int fio_ceph_msgr_open_file(struct thread_data *td, struct fio_file *f)
+{
+ return 0;
+}
+
+static int fio_ceph_msgr_close_file(struct thread_data *, struct fio_file *)
+{
+ return 0;
+}
+
+template <class Func>
+fio_option make_option(Func&& func)
+{
+ auto o = fio_option{};
+ o.category = FIO_OPT_C_ENGINE;
+ func(std::ref(o));
+ return o;
+}
+
+static std::vector<fio_option> options {
+ make_option([] (fio_option& o) {
+ o.name = "receiver";
+ o.lname = "CEPH messenger is receiver";
+ o.type = FIO_OPT_BOOL;
+ o.off1 = offsetof(struct ceph_msgr_options, is_receiver);
+ o.help = "CEPH messenger is sender or receiver";
+ o.def = "0";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "single_instance";
+    o.lname = "Single instance of CEPH messenger";
+    o.type = FIO_OPT_BOOL;
+    o.off1 = offsetof(struct ceph_msgr_options, is_single);
+    o.help = "CEPH messenger is created once and shared by all threads";
+ o.def = "0";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "hostname";
+ o.lname = "CEPH messenger hostname";
+ o.type = FIO_OPT_STR_STORE;
+ o.off1 = offsetof(struct ceph_msgr_options, hostname);
+ o.help = "Hostname for CEPH messenger engine";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "port";
+ o.lname = "CEPH messenger engine port";
+ o.type = FIO_OPT_INT;
+ o.off1 = offsetof(struct ceph_msgr_options, port);
+ o.maxval = 65535;
+ o.minval = 1;
+ o.help = "Port to use for CEPH messenger";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "ms_type";
+ o.lname = "CEPH messenger transport type: async+posix, async+dpdk, async+rdma";
+ o.type = FIO_OPT_STR;
+ o.off1 = offsetof(struct ceph_msgr_options, ms_type);
+    o.help = "Transport type for CEPH messenger; see the 'ms async transport type' page in the Ceph documentation";
+ o.def = "undef";
+
+ o.posval[0].ival = "undef";
+ o.posval[0].oval = CEPH_MSGR_TYPE_UNDEF;
+
+ o.posval[1].ival = "async+posix";
+ o.posval[1].oval = CEPH_MSGR_TYPE_POSIX;
+ o.posval[1].help = "POSIX API";
+
+ o.posval[2].ival = "async+dpdk";
+ o.posval[2].oval = CEPH_MSGR_TYPE_DPDK;
+ o.posval[2].help = "DPDK";
+
+ o.posval[3].ival = "async+rdma";
+ o.posval[3].oval = CEPH_MSGR_TYPE_RDMA;
+ o.posval[3].help = "RDMA";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "ceph_conf_file";
+ o.lname = "CEPH configuration file";
+ o.type = FIO_OPT_STR_STORE;
+ o.off1 = offsetof(struct ceph_msgr_options, conffile);
+ o.help = "Path to CEPH configuration file";
+ }),
+ {} /* Last NULL */
+};
+
+static struct ioengine_ops ioengine;
+
+extern "C" {
+
+void get_ioengine(struct ioengine_ops** ioengine_ptr)
+{
+ /*
+ * Main ioengine structure
+ */
+ ioengine.name = "ceph-msgr";
+ ioengine.version = FIO_IOOPS_VERSION;
+ ioengine.flags = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO;
+ ioengine.setup = fio_ceph_msgr_setup;
+ ioengine.queue = fio_ceph_msgr_queue;
+ ioengine.getevents = fio_ceph_msgr_getevents;
+ ioengine.event = fio_ceph_msgr_event;
+ ioengine.cleanup = fio_ceph_msgr_cleanup;
+ ioengine.open_file = fio_ceph_msgr_open_file;
+ ioengine.close_file = fio_ceph_msgr_close_file;
+ ioengine.io_u_init = fio_ceph_msgr_io_u_init;
+ ioengine.io_u_free = fio_ceph_msgr_io_u_free;
+ ioengine.option_struct_size = sizeof(struct ceph_msgr_options);
+ ioengine.options = options.data();
+
+ *ioengine_ptr = &ioengine;
+}
+} // extern "C"
diff --git a/src/test/fio/fio_ceph_objectstore.cc b/src/test/fio/fio_ceph_objectstore.cc
new file mode 100644
index 000000000..33f900b80
--- /dev/null
+++ b/src/test/fio/fio_ceph_objectstore.cc
@@ -0,0 +1,940 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph ObjectStore engine
+ *
+ * IO engine using Ceph's ObjectStore class to test low-level performance of
+ * Ceph OSDs.
+ *
+ */
+
+#include <memory>
+#include <system_error>
+#include <vector>
+#include <fstream>
+
+#include "os/ObjectStore.h"
+#include "global/global_init.h"
+#include "common/errno.h"
+#include "include/intarith.h"
+#include "include/stringify.h"
+#include "include/random.h"
+#include "include/str_list.h"
+#include "common/perf_counters.h"
+#include "common/TracepointProvider.h"
+
+#include <fio.h>
+#include <optgroup.h>
+
+#include "include/ceph_assert.h" // fio.h clobbers our assert.h
+#include <algorithm>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_
+
+namespace {
+
+/// fio configuration options read from the job file
+struct Options {
+ thread_data* td;
+ char* conf;
+ char* perf_output_file;
+ char* throttle_values;
+ char* deferred_throttle_values;
+ unsigned long long
+ cycle_throttle_period,
+ oi_attr_len_low,
+ oi_attr_len_high,
+ snapset_attr_len_low,
+ snapset_attr_len_high,
+ pglog_omap_len_low,
+ pglog_omap_len_high,
+ pglog_dup_omap_len_low,
+ pglog_dup_omap_len_high,
+ _fastinfo_omap_len_low,
+ _fastinfo_omap_len_high;
+ unsigned simulate_pglog;
+ unsigned single_pool_mode;
+ unsigned preallocate_files;
+ unsigned check_files;
+};
+
+template <class Func> // void Func(fio_option&)
+fio_option make_option(Func&& func)
+{
+ // zero-initialize and set common defaults
+ auto o = fio_option{};
+ o.category = FIO_OPT_C_ENGINE;
+ o.group = FIO_OPT_G_RBD;
+ func(std::ref(o));
+ return o;
+}
+
+static std::vector<fio_option> ceph_options{
+ make_option([] (fio_option& o) {
+ o.name = "conf";
+ o.lname = "ceph configuration file";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "Path to a ceph configuration file";
+ o.off1 = offsetof(Options, conf);
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "perf_output_file";
+ o.lname = "perf output target";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "Path to which to write json formatted perf output";
+ o.off1 = offsetof(Options, perf_output_file);
+ o.def = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "oi_attr_len";
+ o.lname = "OI Attr length";
+ o.type = FIO_OPT_STR_VAL;
+ o.help = "Set OI(aka '_') attribute to specified length";
+ o.off1 = offsetof(Options, oi_attr_len_low);
+ o.off2 = offsetof(Options, oi_attr_len_high);
+ o.def = 0;
+ o.minval = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "snapset_attr_len";
+ o.lname = "Attr 'snapset' length";
+ o.type = FIO_OPT_STR_VAL;
+ o.help = "Set 'snapset' attribute to specified length";
+ o.off1 = offsetof(Options, snapset_attr_len_low);
+ o.off2 = offsetof(Options, snapset_attr_len_high);
+ o.def = 0;
+ o.minval = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "_fastinfo_omap_len";
+ o.lname = "'_fastinfo' omap entry length";
+ o.type = FIO_OPT_STR_VAL;
+ o.help = "Set '_fastinfo' OMAP attribute to specified length";
+ o.off1 = offsetof(Options, _fastinfo_omap_len_low);
+ o.off2 = offsetof(Options, _fastinfo_omap_len_high);
+ o.def = 0;
+ o.minval = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "pglog_simulation";
+ o.lname = "pglog behavior simulation";
+ o.type = FIO_OPT_BOOL;
+ o.help = "Enables PG Log simulation behavior";
+ o.off1 = offsetof(Options, simulate_pglog);
+ o.def = "0";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "pglog_omap_len";
+ o.lname = "pglog omap entry length";
+ o.type = FIO_OPT_STR_VAL;
+ o.help = "Set pglog omap entry to specified length";
+ o.off1 = offsetof(Options, pglog_omap_len_low);
+ o.off2 = offsetof(Options, pglog_omap_len_high);
+ o.def = 0;
+ o.minval = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "pglog_dup_omap_len";
+    o.lname = "duplicate pglog omap entry length";
+ o.type = FIO_OPT_STR_VAL;
+ o.help = "Set duplicate pglog omap entry to specified length";
+ o.off1 = offsetof(Options, pglog_dup_omap_len_low);
+ o.off2 = offsetof(Options, pglog_dup_omap_len_high);
+ o.def = 0;
+ o.minval = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "single_pool_mode";
+ o.lname = "single(shared among jobs) pool mode";
+ o.type = FIO_OPT_BOOL;
+ o.help = "Enables the mode when all jobs run against the same pool";
+ o.off1 = offsetof(Options, single_pool_mode);
+ o.def = "0";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "preallocate_files";
+ o.lname = "preallocate files on init";
+ o.type = FIO_OPT_BOOL;
+ o.help = "Enables/disables file preallocation (touch and resize) on init";
+ o.off1 = offsetof(Options, preallocate_files);
+ o.def = "1";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "check_files";
+ o.lname = "ensure files exist and are correct on init";
+ o.type = FIO_OPT_BOOL;
+ o.help = "Enables/disables checking of files on init";
+ o.off1 = offsetof(Options, check_files);
+ o.def = "0";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "bluestore_throttle";
+ o.lname = "set bluestore throttle";
+ o.type = FIO_OPT_STR_STORE;
+    o.help = "comma delimited list of throttle values";
+ o.off1 = offsetof(Options, throttle_values);
+ o.def = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "bluestore_deferred_throttle";
+ o.lname = "set bluestore deferred throttle";
+ o.type = FIO_OPT_STR_STORE;
+    o.help = "comma delimited list of throttle values";
+ o.off1 = offsetof(Options, deferred_throttle_values);
+ o.def = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "vary_bluestore_throttle_period";
+ o.lname = "period between different throttle values";
+ o.type = FIO_OPT_STR_VAL;
+ o.help = "set to non-zero value to periodically cycle through throttle options";
+ o.off1 = offsetof(Options, cycle_throttle_period);
+ o.def = "0";
+ o.minval = 0;
+ }),
+ {} // fio expects a 'null'-terminated list
+};
+
+
+struct Collection {
+ spg_t pg;
+ coll_t cid;
+ ObjectStore::CollectionHandle ch;
+  // Can't hold a std::mutex directly in a vector (it is not movable),
+  // hence the dynamic allocation
+  std::unique_ptr<std::mutex> lock;
+ uint64_t pglog_ver_head = 1;
+ uint64_t pglog_ver_tail = 1;
+ uint64_t pglog_dup_ver_tail = 1;
+
+ // use big pool ids to avoid clashing with existing collections
+ static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
+
+ Collection(const spg_t& pg, ObjectStore::CollectionHandle _ch)
+ : pg(pg), cid(pg), ch(_ch),
+ lock(new std::mutex) {
+ }
+};
+
+int destroy_collections(
+ std::unique_ptr<ObjectStore>& os,
+ std::vector<Collection>& collections)
+{
+ ObjectStore::Transaction t;
+ bool failed = false;
+ // remove our collections
+ for (auto& coll : collections) {
+ ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+ t.remove(coll.cid, pgmeta_oid);
+ t.remove_collection(coll.cid);
+ int r = os->queue_transaction(coll.ch, std::move(t));
+ if (r && !failed) {
+ derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl;
+ failed = true;
+ }
+ }
+ return 0;
+}
+
+int init_collections(std::unique_ptr<ObjectStore>& os,
+ uint64_t pool,
+ std::vector<Collection>& collections,
+ uint64_t count)
+{
+ ceph_assert(count > 0);
+ collections.reserve(count);
+
+ const int split_bits = cbits(count - 1);
+
+ {
+ // propagate Superblock object to ensure proper functioning of tools that
+ // need it. E.g. ceph-objectstore-tool
+ coll_t cid(coll_t::meta());
+ bool exists = os->collection_exists(cid);
+ if (!exists) {
+ auto ch = os->create_new_collection(cid);
+
+ OSDSuperblock superblock;
+ bufferlist bl;
+ encode(superblock, bl);
+
+ ObjectStore::Transaction t;
+ t.create_collection(cid, split_bits);
+ t.write(cid, OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
+ int r = os->queue_transaction(ch, std::move(t));
+
+ if (r < 0) {
+ derr << "Failure to write OSD superblock: " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+ }
+ }
+
+ for (uint32_t i = 0; i < count; i++) {
+ auto pg = spg_t{pg_t{i, pool}};
+ coll_t cid(pg);
+
+ bool exists = os->collection_exists(cid);
+ auto ch = exists ?
+ os->open_collection(cid) :
+ os->create_new_collection(cid) ;
+
+ collections.emplace_back(pg, ch);
+
+ ObjectStore::Transaction t;
+ auto& coll = collections.back();
+ if (!exists) {
+ t.create_collection(coll.cid, split_bits);
+ ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+ t.touch(coll.cid, pgmeta_oid);
+ int r = os->queue_transaction(coll.ch, std::move(t));
+ if (r) {
+ derr << "Engine init failed with " << cpp_strerror(-r) << dendl;
+ destroy_collections(os, collections);
+ return r;
+ }
+ }
+ }
+ return 0;
+}
+
+/// global engine state shared between all jobs within the process. this
+/// includes g_ceph_context and the ObjectStore instance
+struct Engine {
+ /// the initial g_ceph_context reference to be dropped on destruction
+ boost::intrusive_ptr<CephContext> cct;
+ std::unique_ptr<ObjectStore> os;
+
+ std::vector<Collection> collections; //< shared collections to spread objects over
+
+ std::mutex lock;
+ int ref_count;
+ const bool unlink; //< unlink objects on destruction
+
+ // file to which to output formatted perf information
+ const std::optional<std::string> perf_output_file;
+
+ explicit Engine(thread_data* td);
+ ~Engine();
+
+ static Engine* get_instance(thread_data* td) {
+ // note: creates an Engine with the options associated with the first job
+ static Engine engine(td);
+ return &engine;
+ }
+
+ void ref() {
+ std::lock_guard<std::mutex> l(lock);
+ ++ref_count;
+ }
+ void deref() {
+ std::lock_guard<std::mutex> l(lock);
+ --ref_count;
+ if (!ref_count) {
+ ostringstream ostr;
+ Formatter* f = Formatter::create(
+ "json-pretty", "json-pretty", "json-pretty");
+ f->open_object_section("perf_output");
+ cct->get_perfcounters_collection()->dump_formatted(f, false);
+ if (g_conf()->rocksdb_perf) {
+ f->open_object_section("rocksdb_perf");
+ os->get_db_statistics(f);
+ f->close_section();
+ }
+ mempool::dump(f);
+ {
+ f->open_object_section("db_histogram");
+ os->generate_db_histogram(f);
+ f->close_section();
+ }
+ f->close_section();
+
+ f->flush(ostr);
+ delete f;
+
+ if (unlink) {
+ destroy_collections(os, collections);
+ }
+ os->umount();
+ dout(0) << "FIO plugin perf dump:" << dendl;
+ dout(0) << ostr.str() << dendl;
+ if (perf_output_file) {
+ try {
+ std::ofstream foutput(*perf_output_file);
+ foutput << ostr.str() << std::endl;
+ } catch (std::exception &e) {
+ std::cerr << "Unable to write formatted output to "
+ << *perf_output_file
+ << ", exception: " << e.what()
+ << std::endl;
+ }
+ }
+ }
+ }
+};
+
+TracepointProvider::Traits bluestore_tracepoint_traits("libbluestore_tp.so",
+ "bluestore_tracing");
+
+Engine::Engine(thread_data* td)
+ : ref_count(0),
+ unlink(td->o.unlink),
+ perf_output_file(
+ static_cast<Options*>(td->eo)->perf_output_file ?
+ std::make_optional(static_cast<Options*>(td->eo)->perf_output_file) :
+ std::nullopt)
+{
+ // add the ceph command line arguments
+ auto o = static_cast<Options*>(td->eo);
+ if (!o->conf) {
+ throw std::runtime_error("missing conf option for ceph configuration file");
+ }
+ std::vector<const char*> args{
+ "-i", "0", // identify as osd.0 for osd_data and osd_journal
+ "--conf", o->conf, // use the requested conf file
+ };
+ if (td->o.directory) { // allow conf files to use ${fio_dir} for data
+ args.emplace_back("--fio_dir");
+ args.emplace_back(td->o.directory);
+ }
+
+ // claim the g_ceph_context reference and release it on destruction
+ cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ common_init_finish(g_ceph_context);
+
+ TracepointProvider::initialize<bluestore_tracepoint_traits>(g_ceph_context);
+
+ // create the ObjectStore
+ os.reset(ObjectStore::create(g_ceph_context,
+ g_conf().get_val<std::string>("osd objectstore"),
+ g_conf().get_val<std::string>("osd data"),
+ g_conf().get_val<std::string>("osd journal")));
+ if (!os)
+ throw std::runtime_error("bad objectstore type " + g_conf()->osd_objectstore);
+
+ unsigned num_shards;
+ if(g_conf()->osd_op_num_shards)
+ num_shards = g_conf()->osd_op_num_shards;
+ else if(os->is_rotational())
+ num_shards = g_conf()->osd_op_num_shards_hdd;
+ else
+ num_shards = g_conf()->osd_op_num_shards_ssd;
+ os->set_cache_shards(num_shards);
+
+ //normalize options
+ o->oi_attr_len_high = max(o->oi_attr_len_low, o->oi_attr_len_high);
+ o->snapset_attr_len_high = max(o->snapset_attr_len_low,
+ o->snapset_attr_len_high);
+ o->pglog_omap_len_high = max(o->pglog_omap_len_low,
+ o->pglog_omap_len_high);
+ o->pglog_dup_omap_len_high = max(o->pglog_dup_omap_len_low,
+ o->pglog_dup_omap_len_high);
+ o->_fastinfo_omap_len_high = max(o->_fastinfo_omap_len_low,
+ o->_fastinfo_omap_len_high);
+
+ int r = os->mkfs();
+ if (r < 0)
+ throw std::system_error(-r, std::system_category(), "mkfs failed");
+
+ r = os->mount();
+ if (r < 0)
+ throw std::system_error(-r, std::system_category(), "mount failed");
+
+ // create shared collections up to osd_pool_default_pg_num
+ if (o->single_pool_mode) {
+ uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num");
+ if (count > td->o.nr_files)
+ count = td->o.nr_files;
+ init_collections(os, Collection::MIN_POOL_ID, collections, count);
+ }
+}
+
+Engine::~Engine()
+{
+ ceph_assert(!ref_count);
+}
+
+struct Object {
+ ghobject_t oid;
+ Collection& coll;
+
+ Object(const char* name, Collection& coll)
+ : oid(hobject_t(name, "", CEPH_NOSNAP, coll.pg.ps(), coll.pg.pool(), "")),
+ coll(coll) {}
+};
+
+/// treat each fio job either like a separate pool with its own collections and objects
+/// or just a client using its own objects from the shared pool
+struct Job {
+  Engine* engine; //< pointer to the shared global Engine
+ const unsigned subjob_number; //< subjob num
+ std::vector<Collection> collections; //< job's private collections to spread objects over
+ std::vector<Object> objects; //< associate an object with each fio_file
+ std::vector<io_u*> events; //< completions for fio_ceph_os_event()
+ const bool unlink; //< unlink objects on destruction
+
+ bufferptr one_for_all_data; //< preallocated buffer long enough
+                              //< to use for various operations
+ std::mutex throttle_lock;
+ const vector<unsigned> throttle_values;
+ const vector<unsigned> deferred_throttle_values;
+ std::chrono::duration<double> cycle_throttle_period;
+ mono_clock::time_point last = ceph::mono_clock::zero();
+ unsigned index = 0;
+
+ static vector<unsigned> parse_throttle_str(const char *p) {
+ vector<unsigned> ret;
+ if (p == nullptr) {
+ return ret;
+ }
+ ceph::for_each_substr(p, ",\"", [&ret] (auto &&s) mutable {
+ if (s.size() > 0) {
+ ret.push_back(std::stoul(std::string(s)));
+ }
+ });
+ return ret;
+ }
+ void check_throttle();
+
+ Job(Engine* engine, const thread_data* td);
+ ~Job();
+};
+
+Job::Job(Engine* engine, const thread_data* td)
+ : engine(engine),
+ subjob_number(td->subjob_number),
+ events(td->o.iodepth),
+ unlink(td->o.unlink),
+ throttle_values(
+ parse_throttle_str(static_cast<Options*>(td->eo)->throttle_values)),
+ deferred_throttle_values(
+ parse_throttle_str(static_cast<Options*>(td->eo)->deferred_throttle_values)),
+ cycle_throttle_period(
+ static_cast<Options*>(td->eo)->cycle_throttle_period)
+{
+ engine->ref();
+ auto o = static_cast<Options*>(td->eo);
+ unsigned long long max_data = max(o->oi_attr_len_high,
+ o->snapset_attr_len_high);
+ max_data = max(max_data, o->pglog_omap_len_high);
+ max_data = max(max_data, o->pglog_dup_omap_len_high);
+ max_data = max(max_data, o->_fastinfo_omap_len_high);
+ one_for_all_data = buffer::create(max_data);
+
+ std::vector<Collection>* colls;
+ // create private collections up to osd_pool_default_pg_num
+ if (!o->single_pool_mode) {
+ uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num");
+ if (count > td->o.nr_files)
+ count = td->o.nr_files;
+ // use the fio thread_number for our unique pool id
+ const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1;
+ init_collections(engine->os, pool, collections, count);
+ colls = &collections;
+ } else {
+ colls = &engine->collections;
+ }
+ const uint64_t file_size = td->o.size / max(1u, td->o.nr_files);
+ ObjectStore::Transaction t;
+
+ // create an object for each file in the job
+ objects.reserve(td->o.nr_files);
+ unsigned checked_or_preallocated = 0;
+ for (uint32_t i = 0; i < td->o.nr_files; i++) {
+ auto f = td->files[i];
+ f->real_file_size = file_size;
+ f->engine_pos = i;
+
+ // associate each object with a collection in a round-robin fashion.
+ auto& coll = (*colls)[i % colls->size()];
+
+ objects.emplace_back(f->file_name, coll);
+ if (o->preallocate_files) {
+ auto& oid = objects.back().oid;
+ t.touch(coll.cid, oid);
+ t.truncate(coll.cid, oid, file_size);
+ int r = engine->os->queue_transaction(coll.ch, std::move(t));
+ if (r) {
+ engine->deref();
+ throw std::system_error(r, std::system_category(), "job init");
+ }
+ }
+ if (o->check_files) {
+ auto& oid = objects.back().oid;
+ struct stat st;
+ int r = engine->os->stat(coll.ch, oid, &st);
+ if (r || ((unsigned)st.st_size) != file_size) {
+ derr << "Problem checking " << oid << ", r=" << r
+ << ", st.st_size=" << st.st_size
+ << ", file_size=" << file_size
+ << ", nr_files=" << td->o.nr_files << dendl;
+ engine->deref();
+ throw std::system_error(
+ r, std::system_category(), "job init -- cannot check file");
+ }
+ }
+ if (o->check_files || o->preallocate_files) {
+ ++checked_or_preallocated;
+ }
+ }
+ if (o->check_files) {
+ derr << "fio_ceph_objectstore checked " << checked_or_preallocated
+ << " files"<< dendl;
+ }
+  if (o->preallocate_files) {
+ derr << "fio_ceph_objectstore preallocated " << checked_or_preallocated
+ << " files"<< dendl;
+ }
+}
+
+Job::~Job()
+{
+ if (unlink) {
+ ObjectStore::Transaction t;
+ bool failed = false;
+ // remove our objects
+ for (auto& obj : objects) {
+ t.remove(obj.coll.cid, obj.oid);
+ int r = engine->os->queue_transaction(obj.coll.ch, std::move(t));
+ if (r && !failed) {
+ derr << "job cleanup failed with " << cpp_strerror(-r) << dendl;
+ failed = true;
+ }
+ }
+ destroy_collections(engine->os, collections);
+ }
+ engine->deref();
+}
+
+void Job::check_throttle()
+{
+ if (subjob_number != 0)
+ return;
+
+ std::lock_guard<std::mutex> l(throttle_lock);
+ if (throttle_values.empty() && deferred_throttle_values.empty())
+ return;
+
+ if (ceph::mono_clock::is_zero(last) ||
+ ((cycle_throttle_period != cycle_throttle_period.zero()) &&
+ (ceph::mono_clock::now() - last) > cycle_throttle_period)) {
+ unsigned tvals = throttle_values.size() ? throttle_values.size() : 1;
+ unsigned dtvals = deferred_throttle_values.size() ? deferred_throttle_values.size() : 1;
+ if (!throttle_values.empty()) {
+ std::string val = std::to_string(throttle_values[index % tvals]);
+ std::cerr << "Setting bluestore_throttle_bytes to " << val << std::endl;
+ int r = engine->cct->_conf.set_val(
+ "bluestore_throttle_bytes",
+ val,
+ nullptr);
+ ceph_assert(r == 0);
+ }
+ if (!deferred_throttle_values.empty()) {
+ std::string val = std::to_string(deferred_throttle_values[(index / tvals) % dtvals]);
+ std::cerr << "Setting bluestore_deferred_throttle_bytes to " << val << std::endl;
+ int r = engine->cct->_conf.set_val(
+ "bluestore_throttle_deferred_bytes",
+ val,
+ nullptr);
+ ceph_assert(r == 0);
+ }
+ engine->cct->_conf.apply_changes(nullptr);
+ index++;
+ index %= tvals * dtvals;
+ last = ceph::mono_clock::now();
+ }
+}
+
+int fio_ceph_os_setup(thread_data* td)
+{
+ // if there are multiple jobs, they must run in the same process against a
+ // single instance of the ObjectStore. explicitly disable fio's default
+ // job-per-process configuration
+ td->o.use_thread = 1;
+
+ try {
+ // get or create the global Engine instance
+ auto engine = Engine::get_instance(td);
+ // create a Job for this thread
+ td->io_ops_data = new Job(engine, td);
+ } catch (std::exception& e) {
+ std::cerr << "setup failed with " << e.what() << std::endl;
+ return -1;
+ }
+ return 0;
+}
+
+void fio_ceph_os_cleanup(thread_data* td)
+{
+ auto job = static_cast<Job*>(td->io_ops_data);
+ td->io_ops_data = nullptr;
+ delete job;
+}
+
+
+io_u* fio_ceph_os_event(thread_data* td, int event)
+{
+ // return the requested event from fio_ceph_os_getevents()
+ auto job = static_cast<Job*>(td->io_ops_data);
+ return job->events[event];
+}
+
+int fio_ceph_os_getevents(thread_data* td, unsigned int min,
+ unsigned int max, const timespec* t)
+{
+ auto job = static_cast<Job*>(td->io_ops_data);
+ unsigned int events = 0;
+ io_u* u = NULL;
+ unsigned int i = 0;
+
+ // loop through inflight ios until we find 'min' completions
+ do {
+ io_u_qiter(&td->io_u_all, u, i) {
+ if (!(u->flags & IO_U_F_FLIGHT))
+ continue;
+
+ if (u->engine_data) {
+ u->engine_data = nullptr;
+ job->events[events] = u;
+ events++;
+ }
+ }
+ if (events >= min)
+ break;
+ usleep(100);
+ } while (1);
+
+ return events;
+}
+
+/// completion context for ObjectStore::queue_transaction()
+class UnitComplete : public Context {
+ io_u* u;
+ public:
+ explicit UnitComplete(io_u* u) : u(u) {}
+ void finish(int r) {
+ // mark the pointer to indicate completion for fio_ceph_os_getevents()
+ u->engine_data = reinterpret_cast<void*>(1ull);
+ }
+};
+
+enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u)
+{
+ fio_ro_check(td, u);
+
+ auto o = static_cast<const Options*>(td->eo);
+ auto job = static_cast<Job*>(td->io_ops_data);
+ auto& object = job->objects[u->file->engine_pos];
+ auto& coll = object.coll;
+ auto& os = job->engine->os;
+
+ job->check_throttle();
+
+ if (u->ddir == DDIR_WRITE) {
+ // provide a hint if we're likely to read this data back
+ const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0;
+
+ bufferlist bl;
+ bl.push_back(buffer::copy(reinterpret_cast<char*>(u->xfer_buf),
+ u->xfer_buflen ) );
+
+ map<string,bufferptr> attrset;
+ map<string, bufferlist> omaps;
+ // enqueue a write transaction on the collection's handle
+ ObjectStore::Transaction t;
+ char ver_key[64];
+
+ // fill attrs if any
+ if (o->oi_attr_len_high) {
+ ceph_assert(o->oi_attr_len_high >= o->oi_attr_len_low);
+      // fill with garbage as we do not care about the actual content...
+ job->one_for_all_data.set_length(
+ ceph::util::generate_random_number(
+ o->oi_attr_len_low, o->oi_attr_len_high));
+ attrset["_"] = job->one_for_all_data;
+ }
+ if (o->snapset_attr_len_high) {
+ ceph_assert(o->snapset_attr_len_high >= o->snapset_attr_len_low);
+ job->one_for_all_data.set_length(
+ ceph::util::generate_random_number
+ (o->snapset_attr_len_low, o->snapset_attr_len_high));
+ attrset["snapset"] = job->one_for_all_data;
+    }
+ if (o->_fastinfo_omap_len_high) {
+ ceph_assert(o->_fastinfo_omap_len_high >= o->_fastinfo_omap_len_low);
+      // fill with garbage as we do not care about the actual content...
+ job->one_for_all_data.set_length(
+ ceph::util::generate_random_number(
+ o->_fastinfo_omap_len_low, o->_fastinfo_omap_len_high));
+ omaps["_fastinfo"].append(job->one_for_all_data);
+ }
+
+ uint64_t pglog_trim_head = 0, pglog_trim_tail = 0;
+ uint64_t pglog_dup_trim_head = 0, pglog_dup_trim_tail = 0;
+ if (o->simulate_pglog) {
+
+ uint64_t pglog_ver_cnt = 0;
+ {
+ std::lock_guard<std::mutex> l(*coll.lock);
+ pglog_ver_cnt = coll.pglog_ver_head++;
+ if (o->pglog_omap_len_high &&
+ pglog_ver_cnt >=
+ coll.pglog_ver_tail +
+ g_conf()->osd_min_pg_log_entries + g_conf()->osd_pg_log_trim_min) {
+ pglog_trim_tail = coll.pglog_ver_tail;
+ coll.pglog_ver_tail = pglog_trim_head =
+ pglog_trim_tail + g_conf()->osd_pg_log_trim_min;
+
+ if (o->pglog_dup_omap_len_high &&
+ pglog_ver_cnt >=
+ coll.pglog_dup_ver_tail + g_conf()->osd_pg_log_dups_tracked +
+ g_conf()->osd_pg_log_trim_min) {
+ pglog_dup_trim_tail = coll.pglog_dup_ver_tail;
+ coll.pglog_dup_ver_tail = pglog_dup_trim_head =
+ pglog_dup_trim_tail + g_conf()->osd_pg_log_trim_min;
+ }
+ }
+ }
+
+ if (o->pglog_omap_len_high) {
+ ceph_assert(o->pglog_omap_len_high >= o->pglog_omap_len_low);
+ snprintf(ver_key, sizeof(ver_key),
+ "0000000011.%020llu", (unsigned long long)pglog_ver_cnt);
+        // fill with garbage as we do not care about the actual content...
+ job->one_for_all_data.set_length(
+ ceph::util::generate_random_number(
+ o->pglog_omap_len_low, o->pglog_omap_len_high));
+ omaps[ver_key].append(job->one_for_all_data);
+ }
+ if (o->pglog_dup_omap_len_high) {
+ //insert dup
+ ceph_assert(o->pglog_dup_omap_len_high >= o->pglog_dup_omap_len_low);
+ for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
+ snprintf(ver_key, sizeof(ver_key),
+ "dup_0000000011.%020llu", (unsigned long long)i);
+        // fill with garbage as we do not care about the actual content...
+ job->one_for_all_data.set_length(
+ ceph::util::generate_random_number(
+ o->pglog_dup_omap_len_low, o->pglog_dup_omap_len_high));
+ omaps[ver_key].append(job->one_for_all_data);
+ }
+ }
+ }
+
+ if (attrset.size()) {
+ t.setattrs(coll.cid, object.oid, attrset);
+ }
+ t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags);
+
+ set<string> rmkeys;
+ for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
+ snprintf(ver_key, sizeof(ver_key),
+ "0000000011.%020llu", (unsigned long long)i);
+ rmkeys.emplace(ver_key);
+ }
+ for( auto i = pglog_dup_trim_tail; i < pglog_dup_trim_head; ++i) {
+ snprintf(ver_key, sizeof(ver_key),
+ "dup_0000000011.%020llu", (unsigned long long)i);
+ rmkeys.emplace(ver_key);
+ }
+
+ if (rmkeys.size()) {
+ ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+ t.omap_rmkeys(coll.cid, pgmeta_oid, rmkeys);
+ }
+
+ if (omaps.size()) {
+ ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+ t.omap_setkeys(coll.cid, pgmeta_oid, omaps);
+ }
+ t.register_on_commit(new UnitComplete(u));
+ os->queue_transaction(coll.ch,
+ std::move(t));
+ return FIO_Q_QUEUED;
+ }
+
+ if (u->ddir == DDIR_READ) {
+ // ObjectStore reads are synchronous, so make the call and return COMPLETED
+ bufferlist bl;
+ int r = os->read(coll.ch, object.oid, u->offset, u->xfer_buflen, bl);
+ if (r < 0) {
+ u->error = r;
+ td_verror(td, u->error, "xfer");
+ } else {
+ bl.begin().copy(bl.length(), static_cast<char*>(u->xfer_buf));
+ u->resid = u->xfer_buflen - r;
+ }
+ return FIO_Q_COMPLETED;
+ }
+
+ derr << "WARNING: Only DDIR_READ and DDIR_WRITE are supported!" << dendl;
+ u->error = -EINVAL;
+ td_verror(td, u->error, "xfer");
+ return FIO_Q_COMPLETED;
+}
+
+int fio_ceph_os_commit(thread_data* td)
+{
+ // commit() allows the engine to batch up queued requests to be submitted all
+ // at once. it would be natural for queue() to collect transactions in a list,
+ // and use commit() to pass them all to ObjectStore::queue_transactions(). but
+ // because we spread objects over multiple collections, we a) need to use a
+ // different sequencer for each collection, and b) are less likely to see a
+ // benefit from batching requests within a collection
+ return 0;
+}
+
+// open/close are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
+// prevent fio from creating the files
+int fio_ceph_os_open(thread_data* td, fio_file* f) { return 0; }
+int fio_ceph_os_close(thread_data* td, fio_file* f) { return 0; }
+
+int fio_ceph_os_io_u_init(thread_data* td, io_u* u)
+{
+ // no data is allocated, we just use the pointer as a boolean 'completed' flag
+ u->engine_data = nullptr;
+ return 0;
+}
+
+void fio_ceph_os_io_u_free(thread_data* td, io_u* u)
+{
+ u->engine_data = nullptr;
+}
+
+
+// ioengine_ops for get_ioengine()
+struct ceph_ioengine : public ioengine_ops {
+ ceph_ioengine() : ioengine_ops({}) {
+ name = "ceph-os";
+ version = FIO_IOOPS_VERSION;
+ flags = FIO_DISKLESSIO;
+ setup = fio_ceph_os_setup;
+ queue = fio_ceph_os_queue;
+ commit = fio_ceph_os_commit;
+ getevents = fio_ceph_os_getevents;
+ event = fio_ceph_os_event;
+ cleanup = fio_ceph_os_cleanup;
+ open_file = fio_ceph_os_open;
+ close_file = fio_ceph_os_close;
+ io_u_init = fio_ceph_os_io_u_init;
+ io_u_free = fio_ceph_os_io_u_free;
+ options = ceph_options.data();
+ option_struct_size = sizeof(struct Options);
+ }
+};
+
+} // anonymous namespace
+
+extern "C" {
+// the exported fio engine interface
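+// fio resolves this symbol with dlsym() when the plugin is loaded as an
+// external engine, e.g. via the job option
+// ioengine=external:/path/to/libfio_ceph_objectstore.so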
+void get_ioengine(struct ioengine_ops** ioengine_ptr) {
+ static ceph_ioengine ioengine;
+ *ioengine_ptr = &ioengine;
+}
+} // extern "C"
diff --git a/src/test/fio/fio_librgw.cc b/src/test/fio/fio_librgw.cc
new file mode 100644
index 000000000..b088b68f9
--- /dev/null
+++ b/src/test/fio/fio_librgw.cc
@@ -0,0 +1,542 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <stdint.h>
+#include <tuple>
+#include <vector>
+#include <functional>
+#include <iostream>
+
+#include <semaphore.h> // XXX kill this?
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include "fmt/include/fmt/format.h"
+
+#include "include/rados/librgw.h"
+#include "include/rados/rgw_file.h"
+//#include "rgw/rgw_file.h"
+//#include "rgw/rgw_lib_frontend.h" // direct requests
+
+/* naughty fio.h leaks min and max as C macros--include it last */
+#include <fio.h>
+#include <optgroup.h>
+#undef min
+#undef max
+
+namespace {
+
+ struct librgw_iou {
+ struct io_u *io_u;
+ int io_complete;
+ };
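+
+ /* note: librgw_iou and librgw_data::aio_events appear to be carried
+ * over from an async engine skeleton; this engine currently completes
+ * all io synchronously in queue() (see getevents/event below) */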
+
+ struct librgw_data {
+ io_u** aio_events;
+ librgw_t rgw_h;
+ rgw_fs* fs;
+ rgw_file_handle* bucket_fh;
+
+ std::vector<rgw_file_handle*> fh_vec;
+
+ librgw_data(thread_data* td)
+ : rgw_h(nullptr), fs(nullptr), bucket_fh(nullptr)
+ {
+ auto size = td->o.iodepth * sizeof(io_u*);
+ aio_events = static_cast<io_u**>(malloc(size));
+ memset(aio_events, 0, size);
+ }
+
+ void save_handle(rgw_file_handle* fh) {
+ fh_vec.push_back(fh);
+ }
+
+ void release_handles() {
+ for (auto object_fh : fh_vec) {
+ rgw_fh_rele(fs, object_fh, RGW_FH_RELE_FLAG_NONE);
+ }
+ fh_vec.clear();
+ }
+
+ ~librgw_data() {
+ free(aio_events);
+ }
+ };
+
+ struct opt_struct {
+ struct thread_data *td;
+
+ const char* config; /* can these be std::strings? */
+ const char* cluster;
+ const char* name; // instance?
+ const char* init_args;
+ const char* access_key;
+ const char* secret_key;
+ const char* userid;
+ const char* bucket_name;
+
+ uint32_t owner_uid = 867;
+ uint32_t owner_gid = 5309;
+ };
+
+ uint32_t create_mask = RGW_SETATTR_UID | RGW_SETATTR_GID | RGW_SETATTR_MODE;
+
+/* borrowed from fio_ceph_objectstore */
+ template <class F>
+ fio_option make_option(F&& func)
+ {
+ // zero-initialize and set common defaults
+ auto o = fio_option{};
+ o.category = FIO_OPT_C_ENGINE;
+ o.group = FIO_OPT_G_INVALID;
+ func(std::ref(o));
+ return o;
+ }
+
+ static std::vector<fio_option> options = {
+ make_option([] (fio_option& o) {
+ o.name = "ceph_conf";
+ o.lname = "ceph configuration file";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "Path to ceph.conf file";
+ o.off1 = offsetof(opt_struct, config);
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "ceph_name";
+ o.lname = "ceph instance name";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "Name of this program instance";
+ o.off1 = offsetof(opt_struct, name);
+ o.category = FIO_OPT_C_ENGINE;
+ o.group = FIO_OPT_G_INVALID;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "ceph_cluster";
+ o.lname = "ceph cluster name";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "Name of ceph cluster (default=ceph)";
+ o.off1 = offsetof(opt_struct, cluster);
+ o.category = FIO_OPT_C_ENGINE;
+ o.group = FIO_OPT_G_INVALID;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "ceph_init_args";
+ o.lname = "ceph init args";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "Extra ceph arguments (e.g., -d --debug-rgw=16)";
+ o.off1 = offsetof(opt_struct, init_args);
+ o.category = FIO_OPT_C_ENGINE;
+ o.group = FIO_OPT_G_INVALID;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "access_key";
+ o.lname = "AWS access key";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "AWS access key";
+ o.off1 = offsetof(opt_struct, access_key);
+ o.category = FIO_OPT_C_ENGINE;
+ o.group = FIO_OPT_G_INVALID;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "secret_key";
+ o.lname = "AWS secret key";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "AWS secret key";
+ o.off1 = offsetof(opt_struct, secret_key);
+ o.category = FIO_OPT_C_ENGINE;
+ o.group = FIO_OPT_G_INVALID;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "userid";
+ o.lname = "userid";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "userid corresponding to access key";
+ o.off1 = offsetof(opt_struct, userid);
+ o.category = FIO_OPT_C_ENGINE;
+ o.group = FIO_OPT_G_INVALID;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "bucket_name";
+ o.lname = "S3 bucket";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "S3 bucket to operate on";
+ o.off1 = offsetof(opt_struct, bucket_name);
+ o.category = FIO_OPT_C_ENGINE;
+ o.group = FIO_OPT_G_INVALID;
+ }),
+ {} // fio expects a 'null'-terminated list
+ };
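+
+ /* a minimal job file sketch using the options above (paths and key
+ * values are illustrative placeholders; see src/test/fio/ceph-librgw.fio
+ * for the in-tree example):
+ *
+ * [global]
+ * ioengine=external:lib/libfio_librgw.so
+ * ceph_conf=/etc/ceph/ceph.conf
+ * userid=testuser
+ * access_key=<access key>
+ * secret_key=<secret key>
+ * bucket_name=fio-test
+ *
+ * [rgw_write]
+ * rw=write
+ * bs=1m
+ * size=64m
+ */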
+
+ struct save_args {
+ int argc;
+ char *argv[8];
+ save_args() : argc(1)
+ {
+ argv[0] = strdup("librgw");
+ for (int ix = 1; ix < 8; ++ix) {
+ argv[ix] = nullptr;
+ }
+ }
+
+ void push_arg(const std::string& sarg) {
+ argv[argc++] = strdup(sarg.c_str());
+ }
+
+ ~save_args() {
+ for (int ix = 0; ix < argc; ++ix) {
+ free(argv[ix]); // release the strdup'd copies
+ argv[ix] = nullptr;
+ }
+ }
+ } args;
+
+/*
+ * It looks like the setup function is called once, on module load.
+ * It's not documented in the skeleton driver.
+ */
+ static int fio_librgw_setup(struct thread_data* td)
+ {
+ opt_struct& o = *(reinterpret_cast<opt_struct*>(td->eo));
+ librgw_data* data = nullptr;
+ int r = 0;
+
+ dprint(FD_IO, "fio_librgw_setup\n");
+
+ if (! td->io_ops_data) {
+ data = new librgw_data(td);
+
+ /* init args */
+ std::string sopt;
+ if (o.config) {
+ sopt = fmt::format("--conf={}", o.config);
+ args.push_arg(sopt);
+ }
+ if (o.name) {
+ sopt = fmt::format("--name={}", o.name);
+ args.push_arg(sopt);
+ }
+ if (o.cluster) {
+ sopt = fmt::format("--cluster={}", o.cluster);
+ args.push_arg(sopt);
+ }
+ if (o.init_args) {
+ args.push_arg(std::string(o.init_args));
+ }
+
+ r = librgw_create(&data->rgw_h, args.argc, args.argv);
+ if (!! r) {
+ dprint(FD_IO, "librgw_create failed\n");
+ return r;
+ }
+
+ r = rgw_mount2(data->rgw_h, o.userid, o.access_key, o.secret_key, "/",
+ &data->fs, RGW_MOUNT_FLAG_NONE);
+ if (!! r) {
+ dprint(FD_IO, "rgw_mount2 failed\n");
+ return r;
+ }
+
+ /* go ahead and lookup the bucket as well */
+ r = rgw_lookup(data->fs, data->fs->root_fh, o.bucket_name,
+ &data->bucket_fh, nullptr, 0, RGW_LOOKUP_FLAG_NONE);
+ if (! data->bucket_fh) {
+ dprint(FD_IO, "rgw_lookup on bucket %s failed, will create\n",
+ o.bucket_name);
+
+ struct stat st;
+ st.st_uid = o.owner_uid;
+ st.st_gid = o.owner_gid;
+ st.st_mode = 0755; /* octal, i.e. rwxr-xr-x */
+
+ r = rgw_mkdir(data->fs, data->fs->root_fh, o.bucket_name,
+ &st, create_mask, &data->bucket_fh, RGW_MKDIR_FLAG_NONE);
+ if (! data->bucket_fh) {
+ dprint(FD_IO, "rgw_mkdir for bucket %s failed\n", o.bucket_name);
+ return EINVAL;
+ }
+ }
+
+ td->io_ops_data = data;
+ }
+
+ td->o.use_thread = 1;
+
+ if (r != 0) {
+ abort();
+ }
+
+ return r;
+ }
+
+/*
+ * The init function is called once per thread/process, and should set up
+ * any structures that this io engine requires to keep track of io. Not
+ * required.
+ */
+ static int fio_librgw_init(struct thread_data *td)
+ {
+ dprint(FD_IO, "fio_librgw_init\n");
+ return 0;
+ }
+
+/*
+ * This is paired with the ->init() function and is called when a thread is
+ * done doing io. Should tear down anything setup by the ->init() function.
+ * Not required.
+ *
+ * N.b., the cohort driver made this idempotent by allocating data in
+ * setup, clearing data here if present, and doing nothing in the
+ * subsequent per-thread invocations.
+ */
+ static void fio_librgw_cleanup(struct thread_data *td)
+ {
+ int r = 0;
+
+ dprint(FD_IO, "fio_librgw_cleanup\n");
+
+ /* cleanup specific data */
+ librgw_data* data = static_cast<librgw_data*>(td->io_ops_data);
+ if (data) {
+
+ /* release active handles */
+ data->release_handles();
+
+ if (data->bucket_fh) {
+ r = rgw_fh_rele(data->fs, data->bucket_fh, 0 /* flags */);
+ }
+ r = rgw_umount(data->fs, RGW_UMOUNT_FLAG_NONE);
+ librgw_shutdown(data->rgw_h);
+ td->io_ops_data = nullptr;
+ delete data;
+ }
+ }
+
+/*
+ * The ->prep() function is called for each io_u prior to being submitted
+ * with ->queue(). This hook allows the io engine to perform any
+ * preparatory actions on the io_u, before being submitted. Not required.
+ */
+ static int fio_librgw_prep(struct thread_data *td, struct io_u *io_u)
+ {
+ return 0;
+ }
+
+/*
+ * The ->event() hook is called to match an event number with an io_u.
+ * After the core has called ->getevents() and it has returned eg 3,
+ * the ->event() hook must return the 3 events that have completed for
+ * subsequent calls to ->event() with [0-2]. Required.
+ */
+ static struct io_u *fio_librgw_event(struct thread_data *td, int event)
+ {
+ return NULL;
+ }
+
+/*
+ * The ->getevents() hook is used to reap completion events from an async
+ * io engine. It returns the number of completed events since the last call,
+ * which may then be retrieved by calling the ->event() hook with the event
+ * numbers. Required.
+ */
+ static int fio_librgw_getevents(struct thread_data *td, unsigned int min,
+ unsigned int max, const struct timespec *t)
+ {
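+ /* all io completes synchronously in queue(), so there is never
+ * anything to reap here */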
+ return 0;
+ }
+
+/*
+ * The ->cancel() hook attempts to cancel the io_u. Only relevant for
+ * async io engines, and need not be supported.
+ */
+ static int fio_librgw_cancel(struct thread_data *td, struct io_u *io_u)
+ {
+ return 0;
+ }
+
+/*
+ * The ->queue() hook is responsible for initiating io on the io_u
+ * being passed in. If the io engine is a synchronous one, io may complete
+ * before ->queue() returns. Required.
+ *
+ * The io engine must transfer in the direction noted by io_u->ddir
+ * to the buffer pointed to by io_u->xfer_buf for as many bytes as
+ * io_u->xfer_buflen. Residual data count may be set in io_u->resid
+ * for a short read/write.
+ */
+ static enum fio_q_status fio_librgw_queue(struct thread_data *td,
+ struct io_u *io_u)
+ {
+ librgw_data* data = static_cast<librgw_data*>(td->io_ops_data);
+ const char* object = io_u->file->file_name;
+ struct rgw_file_handle* object_fh = nullptr;
+ size_t nbytes;
+ int r = 0;
+
+ /*
+ * Double sanity check to catch errant write on a readonly setup
+ */
+ fio_ro_check(td, io_u);
+
+ if (io_u->ddir == DDIR_WRITE) {
+ /* Do full write cycle */
+ r = rgw_lookup(data->fs, data->bucket_fh, object, &object_fh, nullptr, 0,
+ RGW_LOOKUP_FLAG_CREATE);
+ if (!! r) {
+ dprint(FD_IO, "rgw_lookup failed to create filehandle for %s\n",
+ object);
+ goto out;
+ }
+
+ r = rgw_open(data->fs, object_fh, 0 /* posix flags */, 0 /* flags */);
+ if (!! r) {
+ dprint(FD_IO, "rgw_open failed to create filehandle for %s\n",
+ object);
+ rgw_fh_rele(data->fs, object_fh, RGW_FH_RELE_FLAG_NONE);
+ goto out;
+ }
+
+ /* librgw can write at any offset, but only sequentially,
+ * starting at 0, within a single open/write/close cycle */
+ r = rgw_write(data->fs, object_fh, 0, io_u->xfer_buflen, &nbytes,
+ (void*) io_u->xfer_buf, RGW_WRITE_FLAG_NONE);
+ if (!! r) {
+ dprint(FD_IO, "rgw_write failed for %s\n",
+ object);
+ }
+
+ /* the shared close below completes the open/write/close
+ * cycle; closing (and saving) the handle here as well would
+ * release it twice in release_handles() */
+ } else if (io_u->ddir == DDIR_READ) {
+
+ r = rgw_lookup(data->fs, data->bucket_fh, object, &object_fh,
+ nullptr, 0, RGW_LOOKUP_FLAG_NONE);
+ if (!! r) {
+ dprint(FD_IO, "rgw_lookup failed to create filehandle for %s\n",
+ object);
+ goto out;
+ }
+
+ r = rgw_open(data->fs, object_fh, 0 /* posix flags */, 0 /* flags */);
+ if (!! r) {
+ dprint(FD_IO, "rgw_open failed to create filehandle for %s\n",
+ object);
+ rgw_fh_rele(data->fs, object_fh, RGW_FH_RELE_FLAG_NONE);
+ goto out;
+ }
+
+ r = rgw_read(data->fs, object_fh, io_u->offset, io_u->xfer_buflen,
+ &nbytes, io_u->xfer_buf, RGW_READ_FLAG_NONE);
+ if (!! r) {
+ dprint(FD_IO, "rgw_read failed for %s\n",
+ object);
+ }
+ } else {
+ dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__,
+ io_u->ddir);
+ }
+
+ if (object_fh) {
+ r = rgw_close(data->fs, object_fh, 0 /* flags */);
+
+ /* object_fh is closed but still reachable, save it */
+ data->save_handle(object_fh);
+ }
+
+ out:
+ /*
+ * We could return FIO_Q_QUEUED for a queued request,
+ * FIO_Q_COMPLETED for a completed request, or FIO_Q_BUSY
+ * if we could not queue more at this point (in which case
+ * we would also have to define ->commit() to handle it).
+ */
+ return FIO_Q_COMPLETED;
+ }
+
+ int fio_librgw_commit(thread_data* td)
+ {
+ // commit() allows the engine to batch up queued requests to be submitted
+ // all at once. this engine completes every io_u synchronously in queue(),
+ // so there is nothing to batch or commit here
+ return 0;
+ }
+
+/*
+ * Hook for opening the given file. Unless the engine has special
+ * needs, it usually just provides generic_open_file() as the handler.
+ */
+ static int fio_librgw_open(struct thread_data *td, struct fio_file *f)
+ {
+ /* for now, let's try to avoid doing open/close in these hooks */
+ return 0;
+ }
+
+/*
+ * Hook for closing a file. See fio_librgw_open().
+ */
+ static int fio_librgw_close(struct thread_data *td, struct fio_file *f)
+ {
+ /* for now, let's try to avoid doing open/close in these hooks */
+ return 0;
+ }
+
+/* XXX next two probably not needed */
+ int fio_librgw_io_u_init(thread_data* td, io_u* u)
+ {
+ // no data is allocated, we just use the pointer as a boolean 'completed' flag
+ u->engine_data = nullptr;
+ return 0;
+ }
+
+ void fio_librgw_io_u_free(thread_data* td, io_u* u)
+ {
+ u->engine_data = nullptr;
+ }
+
+ struct librgw_ioengine : public ioengine_ops
+ {
+ librgw_ioengine() : ioengine_ops({}) {
+ name = "librgw";
+ version = FIO_IOOPS_VERSION;
+ flags = FIO_DISKLESSIO;
+ setup = fio_librgw_setup;
+ init = fio_librgw_init;
+ queue = fio_librgw_queue;
+ commit = fio_librgw_commit;
+ getevents = fio_librgw_getevents;
+ event = fio_librgw_event;
+ cleanup = fio_librgw_cleanup;
+ open_file = fio_librgw_open;
+ close_file = fio_librgw_close;
+ io_u_init = fio_librgw_io_u_init;
+ io_u_free = fio_librgw_io_u_free;
+ options = ::options.data();
+ option_struct_size = sizeof(opt_struct);
+ }
+ };
+
+} // namespace
+
+extern "C" {
+// the exported fio engine interface
+ void get_ioengine(struct ioengine_ops** ioengine_ptr) {
+ static librgw_ioengine ioengine;
+ *ioengine_ptr = &ioengine;
+ }
+} // extern "C"
diff --git a/src/test/fio/ring_buffer.h b/src/test/fio/ring_buffer.h
new file mode 100644
index 000000000..0e1eb62be
--- /dev/null
+++ b/src/test/fio/ring_buffer.h
@@ -0,0 +1,102 @@
+/*
+ * Very simple and fast lockless ring buffer implementation for
+ * one producer and one consumer.
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <stddef.h>
+#include <stdlib.h> /* malloc()/free() */
+
+/* Do not overcomplicate, choose generic x86 case */
+#define L1_CACHE_BYTES 64
+#define __cacheline_aligned __attribute__((__aligned__(L1_CACHE_BYTES)))
+
+struct ring_buffer
+{
+ unsigned int read_idx __cacheline_aligned;
+ unsigned int write_idx __cacheline_aligned;
+ unsigned int size;
+ unsigned int low_mask;
+ unsigned int high_mask;
+ unsigned int bit_shift;
+ void *data_ptr;
+};
+
+static inline unsigned int upper_power_of_two(unsigned int v)
+{
+ v--;
+ v |= v >> 1;
+ v |= v >> 2;
+ v |= v >> 4;
+ v |= v >> 8;
+ v |= v >> 16;
+ v++;
+
+ return v;
+}
+
+static inline int ring_buffer_init(struct ring_buffer* rbuf, unsigned int size)
+{
+ /* Must be pow2 */
+ if (((size-1) & size))
+ size = upper_power_of_two(size);
+
+ size *= sizeof(void *);
+ rbuf->data_ptr = malloc(size);
+ if (!rbuf->data_ptr)
+ return -1;
+ rbuf->size = size;
+ rbuf->read_idx = 0;
+ rbuf->write_idx = 0;
+ rbuf->bit_shift = __builtin_ffs(sizeof(void *))-1;
+ rbuf->low_mask = rbuf->size - 1;
+ rbuf->high_mask = rbuf->size * 2 - 1;
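+
+ /*
+ * Worked example (64-bit, size requested = 128 elements): the byte
+ * size is 1024, low_mask = 0x3ff and high_mask = 0x7ff. Indices
+ * count bytes, advance by sizeof(void *) and wrap at twice the
+ * buffer size, so a full buffer (write_idx - read_idx == size) is
+ * distinguishable from an empty one (write_idx == read_idx);
+ * low_mask extracts the slot offset and bit_shift converts a byte
+ * count back to an element count.
+ */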
+
+ return 0;
+}
+
+static inline void ring_buffer_deinit(struct ring_buffer* rbuf)
+{
+ free(rbuf->data_ptr);
+}
+
+static inline unsigned int ring_buffer_used_size(const struct ring_buffer* rbuf)
+{
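+ /* this barrier pairs with the one in ring_buffer_enqueue(): a consumer
+ * that observes an advanced write_idx also sees the stored pointer */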
+ __sync_synchronize();
+ return ((rbuf->write_idx - rbuf->read_idx) & rbuf->high_mask) >>
+ rbuf->bit_shift;
+}
+
+static inline void ring_buffer_enqueue(struct ring_buffer* rbuf, void *ptr)
+{
+
+ unsigned int idx;
+
+ /*
+ * Be aware: we do not check whether the buffer is full; we
+ * assume the user of the ring buffer never submits more
+ * entries than it can hold.
+ */
+
+ idx = rbuf->write_idx & rbuf->low_mask;
+ *(void **)((uintptr_t)rbuf->data_ptr + idx) = ptr;
+ /* Barrier to be sure stored pointer will be seen properly */
+ __sync_synchronize();
+ rbuf->write_idx = (rbuf->write_idx + sizeof(ptr)) & rbuf->high_mask;
+}
+
+static inline void *ring_buffer_dequeue(struct ring_buffer* rbuf)
+{
+
+ unsigned idx;
+ void *ptr;
+
+ /*
+ * Be aware: we do not check whether the buffer is empty; we
+ * assume the user of the ring buffer has called
+ * ring_buffer_used_size(), which returns the actual used size
+ * and introduces a memory barrier explicitly.
+ */
+
+ idx = rbuf->read_idx & rbuf->low_mask;
+ ptr = *(void **)((uintptr_t)rbuf->data_ptr + idx);
+ rbuf->read_idx = (rbuf->read_idx + sizeof(ptr)) & rbuf->high_mask;
+
+ return ptr;
+}
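+
+/*
+ * Usage sketch (single producer, single consumer; names illustrative):
+ *
+ * struct ring_buffer rb;
+ * if (ring_buffer_init(&rb, 128) != 0) // capacity in elements
+ * return -1;
+ *
+ * // producer thread:
+ * ring_buffer_enqueue(&rb, some_ptr);
+ *
+ * // consumer thread:
+ * if (ring_buffer_used_size(&rb) > 0) {
+ * void *p = ring_buffer_dequeue(&rb);
+ * // ... use p ...
+ * }
+ *
+ * ring_buffer_deinit(&rb);
+ */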