diff options
Diffstat (limited to '')
-rw-r--r-- | src/test/fio/CMakeLists.txt | 21 | ||||
-rw-r--r-- | src/test/fio/README.md | 116 | ||||
-rw-r--r-- | src/test/fio/ceph-bluestore.conf | 20 | ||||
-rw-r--r-- | src/test/fio/ceph-bluestore.fio | 39 | ||||
-rw-r--r-- | src/test/fio/ceph-filestore.conf | 26 | ||||
-rw-r--r-- | src/test/fio/ceph-filestore.fio | 17 | ||||
-rw-r--r-- | src/test/fio/ceph-librgw.fio | 28 | ||||
-rw-r--r-- | src/test/fio/ceph-memstore.conf | 18 | ||||
-rw-r--r-- | src/test/fio/ceph-memstore.fio | 17 | ||||
-rw-r--r-- | src/test/fio/ceph-messenger.conf | 7 | ||||
-rw-r--r-- | src/test/fio/ceph-messenger.fio | 22 | ||||
-rw-r--r-- | src/test/fio/fio_ceph_messenger.cc | 697 | ||||
-rw-r--r-- | src/test/fio/fio_ceph_objectstore.cc | 940 | ||||
-rw-r--r-- | src/test/fio/fio_librgw.cc | 542 | ||||
-rw-r--r-- | src/test/fio/ring_buffer.h | 102 |
15 files changed, 2612 insertions, 0 deletions
diff --git a/src/test/fio/CMakeLists.txt b/src/test/fio/CMakeLists.txt new file mode 100644 index 000000000..60731a0c5 --- /dev/null +++ b/src/test/fio/CMakeLists.txt @@ -0,0 +1,21 @@ +# ObjectStore +add_library(fio_ceph_objectstore SHARED fio_ceph_objectstore.cc) +target_link_libraries(fio_ceph_objectstore fio) + +# Messenger +add_library(fio_ceph_messenger SHARED fio_ceph_messenger.cc) +target_link_libraries(fio_ceph_messenger fio) + +# librgw +add_library(fio_librgw SHARED fio_librgw.cc) +target_link_libraries(fio_librgw rgw fio) + +target_link_libraries(fio_ceph_objectstore os global) +install(TARGETS fio_ceph_objectstore DESTINATION lib) + +target_link_libraries(fio_ceph_messenger os global) +install(TARGETS fio_ceph_messenger DESTINATION lib) + +target_link_libraries(fio_librgw os global rgw) +install(TARGETS fio_librgw DESTINATION lib) + diff --git a/src/test/fio/README.md b/src/test/fio/README.md new file mode 100644 index 000000000..91a98af9e --- /dev/null +++ b/src/test/fio/README.md @@ -0,0 +1,116 @@ +FIO +=== + +Ceph uses the fio workload generator and benchmarking utility. +(https://github.com/axboe/fio.git) + +FIO tool is automatically fetched to build/src/fio, and build if necessary. + +RBD +--- + +The fio engine for rbd is located in the fio tree itself, so you'll need to +build it from source. + +If you install the ceph libraries to a location that isn't in your +LD_LIBRARY_PATH, be sure to add it: + + export LD_LIBRARY_PATH=/path/to/install/lib + +To build fio with rbd: + + ./configure --extra-cflags="-I/path/to/install/include -L/path/to/install/lib" + make + +If configure fails with "Rados Block Device engine no", see config.log for +details and adjust the cflags as necessary. + +If ceph was compiled with tcmalloc, it may be necessary to compile fio with: + make EXTLIBS=tcmalloc +Otherwise fio might crash in malloc_usable_size(). + +To view the fio options specific to the rbd engine: + + ./fio --enghelp=rbd + +See examples/rbd.fio for an example job file. To run: + + ./fio examples/rbd.fio + +ObjectStore +----------- + +This fio engine allows you to mount and use a ceph object store directly, +without having to build a ceph cluster or start any daemons. + +Because the ObjectStore is not a public-facing interface, we build it inside +of the ceph tree and load libfio_ceph_objectstore.so into fio as an external +engine. + +To build fio_ceph_objectstore run: +``` + ./do_cmake.sh -DWITH_FIO=ON + cd build + make fio_ceph_objectstore +``` +This will fetch FIO to build/src/fio directory, +compile fio tool and libfio_ceph_objectstore.so. + +If you install the ceph libraries to a location that isn't in your +LD_LIBRARY_PATH, be sure to add it: + + export LD_LIBRARY_PATH=/path/to/install/lib + +To view the fio options specific to the objectstore engine: + + ./fio --enghelp=libfio_ceph_objectstore.so + +The conf= option requires a ceph configuration file (ceph.conf). Example job +and conf files for each object store are provided in the same directory as +this README. + +To run: + + ./fio /path/to/job.fio + +RADOS +----- + +By default FIO can be compiled with support for RADOS. +When ceph is installed in your system default compilation of FIO includes RADOS ioengine. +If you installed ceph in any other place (cmake -DCMAKE_INSTALL_PREFIX=${CEPH_INSTALL_ROOT} ..) you can build FIO following way: + + LIBS="-lrados -ltcmalloc" LDFLAGS="-L${CEPH_INSTALL_ROOT}/lib" EXTFLAGS="-I${CEPH_INSTALL_ROOT}/include" \ + rados=yes ./configure + LIBS="-lrados -ltcmalloc" LDFLAGS="-L${CEPH_INSTALL_ROOT}/lib" EXTFLAGS="-I${CEPH_INSTALL_ROOT}/include" \ + rados=yes make + +"-ltcmalloc" is necessary if ceph was compiled with tcmalloc. + +Messenger +--------- + +This fio engine allows you to test CEPH messenger transport layer, without +any disk activities involved. + +To build fio_ceph_messenger: +``` + ./do_cmake.sh -DWITH_FIO=ON + cd build + make fio_ceph_messenger +``` +If you install the ceph libraries to a location that isn't in your +LD_LIBRARY_PATH, be sure to add it: + + export LD_LIBRARY_PATH=/path/to/install/lib + +To view the fio options specific to the messenger engine: + + ./fio --enghelp=libfio_ceph_messenger.so + +The ceph_conf_file= option requires a ceph configuration file (ceph.conf), +see ceph-messenger.conf and ceph-messenger.fio for details. + +To run: + + ./fio ./ceph-messenger.fio diff --git a/src/test/fio/ceph-bluestore.conf b/src/test/fio/ceph-bluestore.conf new file mode 100644 index 000000000..6dd4f1afa --- /dev/null +++ b/src/test/fio/ceph-bluestore.conf @@ -0,0 +1,20 @@ +# example configuration file for ceph-bluestore.fio + +[global] + debug bluestore = 0/0 + debug bluefs = 0/0 + debug bdev = 0/0 + debug rocksdb = 0/0 + # spread objects over 8 collections + osd pool default pg num = 8 + # increasing shards can help when scaling number of collections + osd op num shards = 5 + +[osd] + osd objectstore = bluestore + + # use directory= option from fio job file + osd data = ${fio_dir} + + # log inside fio_dir + log file = ${fio_dir}/log diff --git a/src/test/fio/ceph-bluestore.fio b/src/test/fio/ceph-bluestore.fio new file mode 100644 index 000000000..dbadf701a --- /dev/null +++ b/src/test/fio/ceph-bluestore.fio @@ -0,0 +1,39 @@ +# Runs a 64k random write test against the ceph BlueStore. +[global] +ioengine=libfio_ceph_objectstore.so # must be found in your LD_LIBRARY_PATH + +conf=ceph-bluestore.conf # must point to a valid ceph configuration file +directory=/mnt/fio-bluestore # directory for osd_data + +#oi_attr_len=350-4000 # specifies OI(aka '_') attribute length range to couple + # writes with. Default: 0 (disabled) + +#snapset_attr_len=35 # specifies snapset attribute length range to couple + # writes with. Default: 0 (disabled) + +#_fastinfo_omap_len=186 # specifies _fastinfo omap entry length range to + # couple writes with. Default: 0 (disabled) + +#pglog_simulation=1 # couples write and omap generation in OSD PG log manner. + # Ceph's osd_min_pg_log_entries, osd_pg_log_trim_min, + # osd_pg_log_dups_tracked settings control cyclic + # omap keys creation/removal. + # Following additional FIO pglog_ settings to apply too: + +#pglog_omap_len=173 # specifies PG log entry length range to couple + # writes with. Default: 0 (disabled) + +#pglog_dup_omap_len=57 # specifies duplicate PG log entry length range + # to couple writes with. Default: 0 (disabled) +#single_pool_mode=0 # Enables the mode when all jobs run against for the same pool. + +rw=randwrite +iodepth=16 + +time_based=1 +runtime=20s + +[bluestore] +nr_files=64 +size=256m +bs=64k diff --git a/src/test/fio/ceph-filestore.conf b/src/test/fio/ceph-filestore.conf new file mode 100644 index 000000000..06266656a --- /dev/null +++ b/src/test/fio/ceph-filestore.conf @@ -0,0 +1,26 @@ +# example configuration file for ceph-filestore.fio + +[global] + debug filestore = 0/0 + debug journal = 0/0 + + # spread objects over 8 collections + osd pool default pg num = 8 + # increasing shards can help when scaling number of collections + osd op num shards = 5 + + filestore fd cache size = 32 + +[osd] + osd objectstore = filestore + + # use directory= option from fio job file + osd data = ${fio_dir} + + # journal inside fio_dir + osd journal = ${fio_dir}/journal + osd journal size = 500 + journal force aio = 1 + + # log outside fio_dir + log file = ${fio_dir}.log diff --git a/src/test/fio/ceph-filestore.fio b/src/test/fio/ceph-filestore.fio new file mode 100644 index 000000000..bb93c8df6 --- /dev/null +++ b/src/test/fio/ceph-filestore.fio @@ -0,0 +1,17 @@ +# Runs a 64k random write test against the ceph FileStore. +[global] +ioengine=libfio_ceph_objectstore.so # must be found in your LD_LIBRARY_PATH + +conf=ceph-filestore.conf # must point to a valid ceph configuration file +directory=/mnt/fio-filestore # directory for osd_data + +rw=randwrite +iodepth=16 + +time_based=1 +runtime=20s + +[filestore] +nr_files=64 +size=256m +bs=64k diff --git a/src/test/fio/ceph-librgw.fio b/src/test/fio/ceph-librgw.fio new file mode 100644 index 000000000..fefca6f84 --- /dev/null +++ b/src/test/fio/ceph-librgw.fio @@ -0,0 +1,28 @@ +# +# example jobfile, e.g.: +# fio --max-jobs=20 /lv2tb/ceph-cp/src/test/fio/ceph-librgw.fio +# +[global] +ioengine=external:/home/mbenjamin/ceph-cp/build/lib/libfio_librgw.so +name=fiotest +direct=0 +access_key=${AWS_ACCESS_KEY_ID} +secret_key=${AWS_SECRET_ACCESS_KEY} +userid=testuser +ceph_cluster=ceph +ceph_conf=/home/mbenjamin/ceph-cp/build/ceph.conf +#in current impl, there is only one, global bucket +bucket_name=fiotest +thread=1 +nr_files=8 +bs=256k +size=256k + +[rgw_randwrite] +numjobs=8 +rw=rw +rwmixread=70 +rwmixwrite=30 +offset=0 +time_based=1 +runtime=30s diff --git a/src/test/fio/ceph-memstore.conf b/src/test/fio/ceph-memstore.conf new file mode 100644 index 000000000..3553d1bcf --- /dev/null +++ b/src/test/fio/ceph-memstore.conf @@ -0,0 +1,18 @@ +# example configuration file for ceph-memstore.fio + +[global] + debug filestore = 0 + + # spread objects over 8 collections + osd pool default pg num = 8 + # increasing shards can help when scaling number of collections + osd op num shards = 5 + +[osd] + osd objectstore = memstore + + # use directory= option from fio job file + osd data = ${fio_dir} + + # log inside fio_dir + log file = ${fio_dir}/log diff --git a/src/test/fio/ceph-memstore.fio b/src/test/fio/ceph-memstore.fio new file mode 100644 index 000000000..ceb6671d7 --- /dev/null +++ b/src/test/fio/ceph-memstore.fio @@ -0,0 +1,17 @@ +# Runs a 64k random write test against the ceph MemStore. +[global] +ioengine=libfio_ceph_objectstore.so # must be found in your LD_LIBRARY_PATH + +conf=ceph-memstore.conf # must point to a valid ceph configuration file +directory=/mnt/fio-memstore # directory for osd_data + +rw=randwrite +iodepth=16 + +time_based=1 +runtime=20s + +[memstore] +nr_files=64 +size=256m +bs=64k diff --git a/src/test/fio/ceph-messenger.conf b/src/test/fio/ceph-messenger.conf new file mode 100644 index 000000000..8d83d3613 --- /dev/null +++ b/src/test/fio/ceph-messenger.conf @@ -0,0 +1,7 @@ +[global] + +ms_type=async+posix +ms_crc_data=false +ms_crc_header=false +ms_dispatch_throttle_bytes=0 +debug_ms=0/0 diff --git a/src/test/fio/ceph-messenger.fio b/src/test/fio/ceph-messenger.fio new file mode 100644 index 000000000..20115cb8d --- /dev/null +++ b/src/test/fio/ceph-messenger.fio @@ -0,0 +1,22 @@ +[global] +bs=4k +size=1g +iodepth=128 + +ioengine=libfio_ceph_messenger.so +#ceph_conf_file=ceph-messenger.conf + +# In order to select protocol explicitly add 'v1:' or 'v2:' prefix. +# By default v2 is used. +hostname=127.0.0.1 +port=5555 + +ms_type=async+posix # or async+dpdk or async+rdma + +[client] +receiver=0 +rw=write + +[server] +receiver=1 +rw=read diff --git a/src/test/fio/fio_ceph_messenger.cc b/src/test/fio/fio_ceph_messenger.cc new file mode 100644 index 000000000..4a4cf4fb5 --- /dev/null +++ b/src/test/fio/fio_ceph_messenger.cc @@ -0,0 +1,697 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * CEPH messenger engine + * + * FIO engine which uses ceph messenger as a transport. See corresponding + * FIO client and server jobs for details. + */ + +#include "global/global_init.h" +#include "msg/Messenger.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" +#include "common/perf_counters.h" +#include "auth/DummyAuth.h" +#include "ring_buffer.h" + +#include <fio.h> +#include <flist.h> +#include <optgroup.h> + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_ + +enum ceph_msgr_type { + CEPH_MSGR_TYPE_UNDEF, + CEPH_MSGR_TYPE_POSIX, + CEPH_MSGR_TYPE_DPDK, + CEPH_MSGR_TYPE_RDMA, +}; + +const char *ceph_msgr_types[] = { "undef", "async+posix", + "async+dpdk", "async+rdma" }; + +struct ceph_msgr_options { + struct thread_data *td__; + unsigned int is_receiver; + unsigned int is_single; + unsigned int port; + const char *hostname; + const char *conffile; + enum ceph_msgr_type ms_type; +}; + +class FioDispatcher; + +struct ceph_msgr_data { + ceph_msgr_data(struct ceph_msgr_options *o_, unsigned iodepth) : + o(o_) { + INIT_FLIST_HEAD(&io_inflight_list); + INIT_FLIST_HEAD(&io_pending_list); + ring_buffer_init(&io_completed_q, iodepth); + pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE); + } + + struct ceph_msgr_options *o; + Messenger *msgr = NULL; + FioDispatcher *disp = NULL; + pthread_spinlock_t spin; + struct ring_buffer io_completed_q; + struct flist_head io_inflight_list; + struct flist_head io_pending_list; + unsigned int io_inflight_nr = 0; + unsigned int io_pending_nr = 0; +}; + +struct ceph_msgr_io { + struct flist_head list; + struct ceph_msgr_data *data; + struct io_u *io_u; + MOSDOp *req_msg; /** Cached request, valid only for sender */ +}; + +struct ceph_msgr_reply_io { + struct flist_head list; + MOSDOpReply *rep; +}; + +static void *str_to_ptr(const std::string &str) +{ + return (void *)strtoul(str.c_str(), NULL, 16); +} + +static std::string ptr_to_str(void *ptr) +{ + char buf[32]; + + snprintf(buf, sizeof(buf), "%llx", (unsigned long long)ptr); + return std::string(buf); +} + +/* + * Used for refcounters print on the last context put, almost duplicates + * global context refcounter, sigh. + */ +static std::atomic<int> ctx_ref(1); +static DummyAuthClientServer *g_dummy_auth; + +static void create_or_get_ceph_context(struct ceph_msgr_options *o) +{ + if (g_ceph_context) { + g_ceph_context->get(); + ctx_ref++; + return; + } + + boost::intrusive_ptr<CephContext> cct; + vector<const char*> args; + + if (o->conffile) + args = { "--conf", o->conffile }; + + cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + /* Will use g_ceph_context instead */ + cct.detach(); + + common_init_finish(g_ceph_context); + g_ceph_context->_conf.apply_changes(NULL); + g_dummy_auth = new DummyAuthClientServer(g_ceph_context); + g_dummy_auth->auth_registry.refresh_config(); +} + +static void put_ceph_context(void) +{ + if (--ctx_ref == 0) { + ostringstream ostr; + Formatter* f; + + f = Formatter::create("json-pretty"); + g_ceph_context->get_perfcounters_collection()->dump_formatted(f, false); + ostr << ">>>>>>>>>>>>> PERFCOUNTERS BEGIN <<<<<<<<<<<<" << std::endl; + f->flush(ostr); + ostr << ">>>>>>>>>>>>> PERFCOUNTERS END <<<<<<<<<<<<" << std::endl; + + delete f; + delete g_dummy_auth; + dout(0) << ostr.str() << dendl; + } + + g_ceph_context->put(); +} + +static void ceph_msgr_sender_on_reply(const object_t &oid) +{ + struct ceph_msgr_data *data; + struct ceph_msgr_io *io; + + /* + * Here we abuse object and use it as a raw pointer. Since this is + * only for benchmarks and testing we do not care about anything + * but performance. So no need to use global structure in order + * to search for reply, just send a pointer and get it back. + */ + + io = (decltype(io))str_to_ptr(oid.name); + data = io->data; + ring_buffer_enqueue(&data->io_completed_q, (void *)io); +} + + +class ReplyCompletion : public Message::CompletionHook { + struct ceph_msgr_io *m_io; + +public: + ReplyCompletion(MOSDOpReply *rep, struct ceph_msgr_io *io) : + Message::CompletionHook(rep), + m_io(io) { + } + void finish(int err) override { + struct ceph_msgr_data *data = m_io->data; + + ring_buffer_enqueue(&data->io_completed_q, (void *)m_io); + } +}; + +static void ceph_msgr_receiver_on_request(struct ceph_msgr_data *data, + MOSDOp *req) +{ + MOSDOpReply *rep; + + rep = new MOSDOpReply(req, 0, 0, 0, false); + rep->set_connection(req->get_connection()); + + pthread_spin_lock(&data->spin); + if (data->io_inflight_nr) { + struct ceph_msgr_io *io; + + data->io_inflight_nr--; + io = flist_first_entry(&data->io_inflight_list, + struct ceph_msgr_io, list); + flist_del(&io->list); + pthread_spin_unlock(&data->spin); + + rep->set_completion_hook(new ReplyCompletion(rep, io)); + rep->get_connection()->send_message(rep); + } else { + struct ceph_msgr_reply_io *rep_io; + + rep_io = (decltype(rep_io))malloc(sizeof(*rep_io)); + rep_io->rep = rep; + + data->io_pending_nr++; + flist_add_tail(&rep_io->list, &data->io_pending_list); + pthread_spin_unlock(&data->spin); + } +} + +class FioDispatcher : public Dispatcher { + struct ceph_msgr_data *m_data; + +public: + FioDispatcher(struct ceph_msgr_data *data): + Dispatcher(g_ceph_context), + m_data(data) { + } + bool ms_can_fast_dispatch_any() const override { + return true; + } + bool ms_can_fast_dispatch(const Message *m) const override { + switch (m->get_type()) { + case CEPH_MSG_OSD_OP: + return m_data->o->is_receiver; + case CEPH_MSG_OSD_OPREPLY: + return !m_data->o->is_receiver; + default: + return false; + } + } + void ms_handle_fast_connect(Connection *con) override { + } + void ms_handle_fast_accept(Connection *con) override { + } + bool ms_dispatch(Message *m) override { + return true; + } + void ms_fast_dispatch(Message *m) override { + if (m_data->o->is_receiver) { + MOSDOp *req; + + /* + * Server side, handle request. + */ + + req = static_cast<MOSDOp*>(m); + req->finish_decode(); + + ceph_msgr_receiver_on_request(m_data, req); + } else { + MOSDOpReply *rep; + + /* + * Client side, get reply, extract objid and mark + * IO as completed. + */ + + rep = static_cast<MOSDOpReply*>(m); + ceph_msgr_sender_on_reply(rep->get_oid()); + } + m->put(); + } + bool ms_handle_reset(Connection *con) override { + return true; + } + void ms_handle_remote_reset(Connection *con) override { + } + bool ms_handle_refused(Connection *con) override { + return false; + } + int ms_handle_authentication(Connection *con) override { + return 1; + } +}; + +static entity_addr_t hostname_to_addr(struct ceph_msgr_options *o) +{ + entity_addr_t addr; + + addr.parse(o->hostname); + addr.set_port(o->port); + addr.set_nonce(0); + + return addr; +} + +static Messenger *create_messenger(struct ceph_msgr_options *o) +{ + entity_name_t ename = o->is_receiver ? + entity_name_t::OSD(0) : entity_name_t::CLIENT(0); + std::string lname = o->is_receiver ? + "receiver" : "sender"; + + std::string ms_type = o->ms_type != CEPH_MSGR_TYPE_UNDEF ? + ceph_msgr_types[o->ms_type] : + g_ceph_context->_conf.get_val<std::string>("ms_type"); + + /* o->td__>pid doesn't set value, so use getpid() instead*/ + auto nonce = o->is_receiver ? 0 : (getpid() + o->td__->thread_number); + Messenger *msgr = Messenger::create(g_ceph_context, ms_type.c_str(), + ename, lname, nonce); + if (o->is_receiver) { + msgr->set_default_policy(Messenger::Policy::stateless_server(0)); + msgr->bind(hostname_to_addr(o)); + } else { + msgr->set_default_policy(Messenger::Policy::lossless_client(0)); + } + msgr->set_auth_client(g_dummy_auth); + msgr->set_auth_server(g_dummy_auth); + msgr->set_require_authorizer(false); + msgr->start(); + + return msgr; +} + +static Messenger *single_msgr; +static std::atomic<int> single_msgr_ref; +static vector<FioDispatcher *> single_msgr_disps; + +static void init_messenger(struct ceph_msgr_data *data) +{ + struct ceph_msgr_options *o = data->o; + FioDispatcher *disp; + Messenger *msgr; + + disp = new FioDispatcher(data); + if (o->is_single) { + /* + * Single messenger instance for the whole FIO + */ + + if (!single_msgr) { + msgr = create_messenger(o); + single_msgr = msgr; + } else { + msgr = single_msgr; + } + single_msgr_disps.push_back(disp); + single_msgr_ref++; + } else { + /* + * Messenger instance per FIO thread + */ + msgr = create_messenger(o); + } + msgr->add_dispatcher_head(disp); + + data->disp = disp; + data->msgr = msgr; +} + +static void free_messenger(struct ceph_msgr_data *data) +{ + data->msgr->shutdown(); + data->msgr->wait(); + delete data->msgr; +} + +static void put_messenger(struct ceph_msgr_data *data) +{ + struct ceph_msgr_options *o = data->o; + + if (o->is_single) { + if (--single_msgr_ref == 0) { + free_messenger(data); + /* + * In case of a single messenger instance we have to + * free dispatchers after actual messenger destruction. + */ + for (auto disp : single_msgr_disps) + delete disp; + single_msgr = NULL; + } + } else { + free_messenger(data); + delete data->disp; + } + data->disp = NULL; + data->msgr = NULL; +} + +static int fio_ceph_msgr_setup(struct thread_data *td) +{ + struct ceph_msgr_options *o = (decltype(o))td->eo; + o->td__ = td; + ceph_msgr_data *data; + + /* We have to manage global resources so we use threads */ + td->o.use_thread = 1; + + create_or_get_ceph_context(o); + + if (!td->io_ops_data) { + data = new ceph_msgr_data(o, td->o.iodepth); + init_messenger(data); + td->io_ops_data = (void *)data; + } + + return 0; +} + +static void fio_ceph_msgr_cleanup(struct thread_data *td) +{ + struct ceph_msgr_data *data; + unsigned nr; + + data = (decltype(data))td->io_ops_data; + put_messenger(data); + + nr = ring_buffer_used_size(&data->io_completed_q); + if (nr) + fprintf(stderr, "fio: io_completed_nr==%d, but should be zero\n", + nr); + if (data->io_inflight_nr) + fprintf(stderr, "fio: io_inflight_nr==%d, but should be zero\n", + data->io_inflight_nr); + if (data->io_pending_nr) + fprintf(stderr, "fio: io_pending_nr==%d, but should be zero\n", + data->io_pending_nr); + if (!flist_empty(&data->io_inflight_list)) + fprintf(stderr, "fio: io_inflight_list is not empty\n"); + if (!flist_empty(&data->io_pending_list)) + fprintf(stderr, "fio: io_pending_list is not empty\n"); + + ring_buffer_deinit(&data->io_completed_q); + delete data; + put_ceph_context(); +} + +static int fio_ceph_msgr_io_u_init(struct thread_data *td, struct io_u *io_u) +{ + struct ceph_msgr_options *o = (decltype(o))td->eo; + struct ceph_msgr_io *io; + MOSDOp *req_msg = NULL; + + io = (decltype(io))malloc(sizeof(*io)); + io->io_u = io_u; + io->data = (decltype(io->data))td->io_ops_data; + + if (!o->is_receiver) { + object_t oid(ptr_to_str(io)); + pg_t pgid; + object_locator_t oloc; + hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + spg_t spgid(pgid); + entity_inst_t dest(entity_name_t::OSD(0), hostname_to_addr(o)); + + Messenger *msgr = io->data->msgr; + ConnectionRef con = msgr->connect_to(dest.name.type(), + entity_addrvec_t(dest.addr)); + + req_msg = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0); + req_msg->set_connection(con); + } + + io->req_msg = req_msg; + io_u->engine_data = (void *)io; + + return 0; +} + +static void fio_ceph_msgr_io_u_free(struct thread_data *td, struct io_u *io_u) +{ + struct ceph_msgr_io *io; + + io = (decltype(io))io_u->engine_data; + if (io) { + io_u->engine_data = NULL; + if (io->req_msg) + io->req_msg->put(); + free(io); + } +} + +static enum fio_q_status ceph_msgr_sender_queue(struct thread_data *td, + struct io_u *io_u) +{ + struct ceph_msgr_data *data; + struct ceph_msgr_io *io; + + bufferlist buflist = bufferlist::static_from_mem( + (char *)io_u->buf, io_u->buflen); + + io = (decltype(io))io_u->engine_data; + data = (decltype(data))td->io_ops_data; + + /* No handy method to clear ops before reusage? Ok */ + io->req_msg->ops.clear(); + + /* Here we do not care about direction, always send as write */ + io->req_msg->write(0, io_u->buflen, buflist); + /* Keep message alive */ + io->req_msg->get(); + io->req_msg->get_connection()->send_message(io->req_msg); + + return FIO_Q_QUEUED; +} + +static int fio_ceph_msgr_getevents(struct thread_data *td, unsigned int min, + unsigned int max, const struct timespec *ts) +{ + struct ceph_msgr_data *data; + unsigned int nr; + + data = (decltype(data))td->io_ops_data; + + /* + * Check io_u.c : if min == 0 -> ts is valid and equal to zero, + * if min != 0 -> ts is NULL. + */ + assert(!min ^ !ts); + + nr = ring_buffer_used_size(&data->io_completed_q); + if (nr >= min) + /* We got something */ + return min(nr, max); + + /* Here we are only if min != 0 and ts == NULL */ + assert(min && !ts); + + while ((nr = ring_buffer_used_size(&data->io_completed_q)) < min && + !td->terminate) { + /* Poll, no disk IO, so we expect response immediately. */ + usleep(10); + } + + return min(nr, max); +} + +static struct io_u *fio_ceph_msgr_event(struct thread_data *td, int event) +{ + struct ceph_msgr_data *data; + struct ceph_msgr_io *io; + + data = (decltype(data))td->io_ops_data; + io = (decltype(io))ring_buffer_dequeue(&data->io_completed_q); + + return io->io_u; +} + +static enum fio_q_status ceph_msgr_receiver_queue(struct thread_data *td, + struct io_u *io_u) +{ + struct ceph_msgr_data *data; + struct ceph_msgr_io *io; + + io = (decltype(io))io_u->engine_data; + data = io->data; + pthread_spin_lock(&data->spin); + if (data->io_pending_nr) { + struct ceph_msgr_reply_io *rep_io; + MOSDOpReply *rep; + + data->io_pending_nr--; + rep_io = flist_first_entry(&data->io_pending_list, + struct ceph_msgr_reply_io, + list); + flist_del(&rep_io->list); + rep = rep_io->rep; + pthread_spin_unlock(&data->spin); + free(rep_io); + + rep->set_completion_hook(new ReplyCompletion(rep, io)); + rep->get_connection()->send_message(rep); + } else { + data->io_inflight_nr++; + flist_add_tail(&io->list, &data->io_inflight_list); + pthread_spin_unlock(&data->spin); + } + + return FIO_Q_QUEUED; +} + +static enum fio_q_status fio_ceph_msgr_queue(struct thread_data *td, + struct io_u *io_u) +{ + struct ceph_msgr_options *o = (decltype(o))td->eo; + + if (o->is_receiver) + return ceph_msgr_receiver_queue(td, io_u); + else + return ceph_msgr_sender_queue(td, io_u); +} + +static int fio_ceph_msgr_open_file(struct thread_data *td, struct fio_file *f) +{ + return 0; +} + +static int fio_ceph_msgr_close_file(struct thread_data *, struct fio_file *) +{ + return 0; +} + +template <class Func> +fio_option make_option(Func&& func) +{ + auto o = fio_option{}; + o.category = FIO_OPT_C_ENGINE; + func(std::ref(o)); + return o; +} + +static std::vector<fio_option> options { + make_option([] (fio_option& o) { + o.name = "receiver"; + o.lname = "CEPH messenger is receiver"; + o.type = FIO_OPT_BOOL; + o.off1 = offsetof(struct ceph_msgr_options, is_receiver); + o.help = "CEPH messenger is sender or receiver"; + o.def = "0"; + }), + make_option([] (fio_option& o) { + o.name = "single_instance"; + o.lname = "Single instance of CEPH messenger "; + o.type = FIO_OPT_BOOL; + o.off1 = offsetof(struct ceph_msgr_options, is_single); + o.help = "CEPH messenger is a created once for all threads"; + o.def = "0"; + }), + make_option([] (fio_option& o) { + o.name = "hostname"; + o.lname = "CEPH messenger hostname"; + o.type = FIO_OPT_STR_STORE; + o.off1 = offsetof(struct ceph_msgr_options, hostname); + o.help = "Hostname for CEPH messenger engine"; + }), + make_option([] (fio_option& o) { + o.name = "port"; + o.lname = "CEPH messenger engine port"; + o.type = FIO_OPT_INT; + o.off1 = offsetof(struct ceph_msgr_options, port); + o.maxval = 65535; + o.minval = 1; + o.help = "Port to use for CEPH messenger"; + }), + make_option([] (fio_option& o) { + o.name = "ms_type"; + o.lname = "CEPH messenger transport type: async+posix, async+dpdk, async+rdma"; + o.type = FIO_OPT_STR; + o.off1 = offsetof(struct ceph_msgr_options, ms_type); + o.help = "Transport type for CEPH messenger, see 'ms async transport type' corresponding CEPH documentation page"; + o.def = "undef"; + + o.posval[0].ival = "undef"; + o.posval[0].oval = CEPH_MSGR_TYPE_UNDEF; + + o.posval[1].ival = "async+posix"; + o.posval[1].oval = CEPH_MSGR_TYPE_POSIX; + o.posval[1].help = "POSIX API"; + + o.posval[2].ival = "async+dpdk"; + o.posval[2].oval = CEPH_MSGR_TYPE_DPDK; + o.posval[2].help = "DPDK"; + + o.posval[3].ival = "async+rdma"; + o.posval[3].oval = CEPH_MSGR_TYPE_RDMA; + o.posval[3].help = "RDMA"; + }), + make_option([] (fio_option& o) { + o.name = "ceph_conf_file"; + o.lname = "CEPH configuration file"; + o.type = FIO_OPT_STR_STORE; + o.off1 = offsetof(struct ceph_msgr_options, conffile); + o.help = "Path to CEPH configuration file"; + }), + {} /* Last NULL */ +}; + +static struct ioengine_ops ioengine; + +extern "C" { + +void get_ioengine(struct ioengine_ops** ioengine_ptr) +{ + /* + * Main ioengine structure + */ + ioengine.name = "ceph-msgr"; + ioengine.version = FIO_IOOPS_VERSION; + ioengine.flags = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO; + ioengine.setup = fio_ceph_msgr_setup; + ioengine.queue = fio_ceph_msgr_queue; + ioengine.getevents = fio_ceph_msgr_getevents; + ioengine.event = fio_ceph_msgr_event; + ioengine.cleanup = fio_ceph_msgr_cleanup; + ioengine.open_file = fio_ceph_msgr_open_file; + ioengine.close_file = fio_ceph_msgr_close_file; + ioengine.io_u_init = fio_ceph_msgr_io_u_init; + ioengine.io_u_free = fio_ceph_msgr_io_u_free; + ioengine.option_struct_size = sizeof(struct ceph_msgr_options); + ioengine.options = options.data(); + + *ioengine_ptr = &ioengine; +} +} // extern "C" diff --git a/src/test/fio/fio_ceph_objectstore.cc b/src/test/fio/fio_ceph_objectstore.cc new file mode 100644 index 000000000..33f900b80 --- /dev/null +++ b/src/test/fio/fio_ceph_objectstore.cc @@ -0,0 +1,940 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph ObjectStore engine + * + * IO engine using Ceph's ObjectStore class to test low-level performance of + * Ceph OSDs. + * + */ + +#include <memory> +#include <system_error> +#include <vector> +#include <fstream> + +#include "os/ObjectStore.h" +#include "global/global_init.h" +#include "common/errno.h" +#include "include/intarith.h" +#include "include/stringify.h" +#include "include/random.h" +#include "include/str_list.h" +#include "common/perf_counters.h" +#include "common/TracepointProvider.h" + +#include <fio.h> +#include <optgroup.h> + +#include "include/ceph_assert.h" // fio.h clobbers our assert.h +#include <algorithm> + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_ + +namespace { + +/// fio configuration options read from the job file +struct Options { + thread_data* td; + char* conf; + char* perf_output_file; + char* throttle_values; + char* deferred_throttle_values; + unsigned long long + cycle_throttle_period, + oi_attr_len_low, + oi_attr_len_high, + snapset_attr_len_low, + snapset_attr_len_high, + pglog_omap_len_low, + pglog_omap_len_high, + pglog_dup_omap_len_low, + pglog_dup_omap_len_high, + _fastinfo_omap_len_low, + _fastinfo_omap_len_high; + unsigned simulate_pglog; + unsigned single_pool_mode; + unsigned preallocate_files; + unsigned check_files; +}; + +template <class Func> // void Func(fio_option&) +fio_option make_option(Func&& func) +{ + // zero-initialize and set common defaults + auto o = fio_option{}; + o.category = FIO_OPT_C_ENGINE; + o.group = FIO_OPT_G_RBD; + func(std::ref(o)); + return o; +} + +static std::vector<fio_option> ceph_options{ + make_option([] (fio_option& o) { + o.name = "conf"; + o.lname = "ceph configuration file"; + o.type = FIO_OPT_STR_STORE; + o.help = "Path to a ceph configuration file"; + o.off1 = offsetof(Options, conf); + }), + make_option([] (fio_option& o) { + o.name = "perf_output_file"; + o.lname = "perf output target"; + o.type = FIO_OPT_STR_STORE; + o.help = "Path to which to write json formatted perf output"; + o.off1 = offsetof(Options, perf_output_file); + o.def = 0; + }), + make_option([] (fio_option& o) { + o.name = "oi_attr_len"; + o.lname = "OI Attr length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set OI(aka '_') attribute to specified length"; + o.off1 = offsetof(Options, oi_attr_len_low); + o.off2 = offsetof(Options, oi_attr_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "snapset_attr_len"; + o.lname = "Attr 'snapset' length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set 'snapset' attribute to specified length"; + o.off1 = offsetof(Options, snapset_attr_len_low); + o.off2 = offsetof(Options, snapset_attr_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "_fastinfo_omap_len"; + o.lname = "'_fastinfo' omap entry length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set '_fastinfo' OMAP attribute to specified length"; + o.off1 = offsetof(Options, _fastinfo_omap_len_low); + o.off2 = offsetof(Options, _fastinfo_omap_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "pglog_simulation"; + o.lname = "pglog behavior simulation"; + o.type = FIO_OPT_BOOL; + o.help = "Enables PG Log simulation behavior"; + o.off1 = offsetof(Options, simulate_pglog); + o.def = "0"; + }), + make_option([] (fio_option& o) { + o.name = "pglog_omap_len"; + o.lname = "pglog omap entry length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set pglog omap entry to specified length"; + o.off1 = offsetof(Options, pglog_omap_len_low); + o.off2 = offsetof(Options, pglog_omap_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "pglog_dup_omap_len"; + o.lname = "uplicate pglog omap entry length"; + o.type = FIO_OPT_STR_VAL; + o.help = "Set duplicate pglog omap entry to specified length"; + o.off1 = offsetof(Options, pglog_dup_omap_len_low); + o.off2 = offsetof(Options, pglog_dup_omap_len_high); + o.def = 0; + o.minval = 0; + }), + make_option([] (fio_option& o) { + o.name = "single_pool_mode"; + o.lname = "single(shared among jobs) pool mode"; + o.type = FIO_OPT_BOOL; + o.help = "Enables the mode when all jobs run against the same pool"; + o.off1 = offsetof(Options, single_pool_mode); + o.def = "0"; + }), + make_option([] (fio_option& o) { + o.name = "preallocate_files"; + o.lname = "preallocate files on init"; + o.type = FIO_OPT_BOOL; + o.help = "Enables/disables file preallocation (touch and resize) on init"; + o.off1 = offsetof(Options, preallocate_files); + o.def = "1"; + }), + make_option([] (fio_option& o) { + o.name = "check_files"; + o.lname = "ensure files exist and are correct on init"; + o.type = FIO_OPT_BOOL; + o.help = "Enables/disables checking of files on init"; + o.off1 = offsetof(Options, check_files); + o.def = "0"; + }), + make_option([] (fio_option& o) { + o.name = "bluestore_throttle"; + o.lname = "set bluestore throttle"; + o.type = FIO_OPT_STR_STORE; + o.help = "comma delimited list of throttle values", + o.off1 = offsetof(Options, throttle_values); + o.def = 0; + }), + make_option([] (fio_option& o) { + o.name = "bluestore_deferred_throttle"; + o.lname = "set bluestore deferred throttle"; + o.type = FIO_OPT_STR_STORE; + o.help = "comma delimited list of throttle values", + o.off1 = offsetof(Options, deferred_throttle_values); + o.def = 0; + }), + make_option([] (fio_option& o) { + o.name = "vary_bluestore_throttle_period"; + o.lname = "period between different throttle values"; + o.type = FIO_OPT_STR_VAL; + o.help = "set to non-zero value to periodically cycle through throttle options"; + o.off1 = offsetof(Options, cycle_throttle_period); + o.def = "0"; + o.minval = 0; + }), + {} // fio expects a 'null'-terminated list +}; + + +struct Collection { + spg_t pg; + coll_t cid; + ObjectStore::CollectionHandle ch; + // Can't use mutex directly in vectors hence dynamic allocation + + std::unique_ptr<std::mutex> lock; + uint64_t pglog_ver_head = 1; + uint64_t pglog_ver_tail = 1; + uint64_t pglog_dup_ver_tail = 1; + + // use big pool ids to avoid clashing with existing collections + static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff; + + Collection(const spg_t& pg, ObjectStore::CollectionHandle _ch) + : pg(pg), cid(pg), ch(_ch), + lock(new std::mutex) { + } +}; + +int destroy_collections( + std::unique_ptr<ObjectStore>& os, + std::vector<Collection>& collections) +{ + ObjectStore::Transaction t; + bool failed = false; + // remove our collections + for (auto& coll : collections) { + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.remove(coll.cid, pgmeta_oid); + t.remove_collection(coll.cid); + int r = os->queue_transaction(coll.ch, std::move(t)); + if (r && !failed) { + derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl; + failed = true; + } + } + return 0; +} + +int init_collections(std::unique_ptr<ObjectStore>& os, + uint64_t pool, + std::vector<Collection>& collections, + uint64_t count) +{ + ceph_assert(count > 0); + collections.reserve(count); + + const int split_bits = cbits(count - 1); + + { + // propagate Superblock object to ensure proper functioning of tools that + // need it. E.g. ceph-objectstore-tool + coll_t cid(coll_t::meta()); + bool exists = os->collection_exists(cid); + if (!exists) { + auto ch = os->create_new_collection(cid); + + OSDSuperblock superblock; + bufferlist bl; + encode(superblock, bl); + + ObjectStore::Transaction t; + t.create_collection(cid, split_bits); + t.write(cid, OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl); + int r = os->queue_transaction(ch, std::move(t)); + + if (r < 0) { + derr << "Failure to write OSD superblock: " << cpp_strerror(-r) << dendl; + return r; + } + } + } + + for (uint32_t i = 0; i < count; i++) { + auto pg = spg_t{pg_t{i, pool}}; + coll_t cid(pg); + + bool exists = os->collection_exists(cid); + auto ch = exists ? + os->open_collection(cid) : + os->create_new_collection(cid) ; + + collections.emplace_back(pg, ch); + + ObjectStore::Transaction t; + auto& coll = collections.back(); + if (!exists) { + t.create_collection(coll.cid, split_bits); + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.touch(coll.cid, pgmeta_oid); + int r = os->queue_transaction(coll.ch, std::move(t)); + if (r) { + derr << "Engine init failed with " << cpp_strerror(-r) << dendl; + destroy_collections(os, collections); + return r; + } + } + } + return 0; +} + +/// global engine state shared between all jobs within the process. this +/// includes g_ceph_context and the ObjectStore instance +struct Engine { + /// the initial g_ceph_context reference to be dropped on destruction + boost::intrusive_ptr<CephContext> cct; + std::unique_ptr<ObjectStore> os; + + std::vector<Collection> collections; //< shared collections to spread objects over + + std::mutex lock; + int ref_count; + const bool unlink; //< unlink objects on destruction + + // file to which to output formatted perf information + const std::optional<std::string> perf_output_file; + + explicit Engine(thread_data* td); + ~Engine(); + + static Engine* get_instance(thread_data* td) { + // note: creates an Engine with the options associated with the first job + static Engine engine(td); + return &engine; + } + + void ref() { + std::lock_guard<std::mutex> l(lock); + ++ref_count; + } + void deref() { + std::lock_guard<std::mutex> l(lock); + --ref_count; + if (!ref_count) { + ostringstream ostr; + Formatter* f = Formatter::create( + "json-pretty", "json-pretty", "json-pretty"); + f->open_object_section("perf_output"); + cct->get_perfcounters_collection()->dump_formatted(f, false); + if (g_conf()->rocksdb_perf) { + f->open_object_section("rocksdb_perf"); + os->get_db_statistics(f); + f->close_section(); + } + mempool::dump(f); + { + f->open_object_section("db_histogram"); + os->generate_db_histogram(f); + f->close_section(); + } + f->close_section(); + + f->flush(ostr); + delete f; + + if (unlink) { + destroy_collections(os, collections); + } + os->umount(); + dout(0) << "FIO plugin perf dump:" << dendl; + dout(0) << ostr.str() << dendl; + if (perf_output_file) { + try { + std::ofstream foutput(*perf_output_file); + foutput << ostr.str() << std::endl; + } catch (std::exception &e) { + std::cerr << "Unable to write formatted output to " + << *perf_output_file + << ", exception: " << e.what() + << std::endl; + } + } + } + } +}; + +TracepointProvider::Traits bluestore_tracepoint_traits("libbluestore_tp.so", + "bluestore_tracing"); + +Engine::Engine(thread_data* td) + : ref_count(0), + unlink(td->o.unlink), + perf_output_file( + static_cast<Options*>(td->eo)->perf_output_file ? + std::make_optional(static_cast<Options*>(td->eo)->perf_output_file) : + std::nullopt) +{ + // add the ceph command line arguments + auto o = static_cast<Options*>(td->eo); + if (!o->conf) { + throw std::runtime_error("missing conf option for ceph configuration file"); + } + std::vector<const char*> args{ + "-i", "0", // identify as osd.0 for osd_data and osd_journal + "--conf", o->conf, // use the requested conf file + }; + if (td->o.directory) { // allow conf files to use ${fio_dir} for data + args.emplace_back("--fio_dir"); + args.emplace_back(td->o.directory); + } + + // claim the g_ceph_context reference and release it on destruction + cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(g_ceph_context); + + TracepointProvider::initialize<bluestore_tracepoint_traits>(g_ceph_context); + + // create the ObjectStore + os.reset(ObjectStore::create(g_ceph_context, + g_conf().get_val<std::string>("osd objectstore"), + g_conf().get_val<std::string>("osd data"), + g_conf().get_val<std::string>("osd journal"))); + if (!os) + throw std::runtime_error("bad objectstore type " + g_conf()->osd_objectstore); + + unsigned num_shards; + if(g_conf()->osd_op_num_shards) + num_shards = g_conf()->osd_op_num_shards; + else if(os->is_rotational()) + num_shards = g_conf()->osd_op_num_shards_hdd; + else + num_shards = g_conf()->osd_op_num_shards_ssd; + os->set_cache_shards(num_shards); + + //normalize options + o->oi_attr_len_high = max(o->oi_attr_len_low, o->oi_attr_len_high); + o->snapset_attr_len_high = max(o->snapset_attr_len_low, + o->snapset_attr_len_high); + o->pglog_omap_len_high = max(o->pglog_omap_len_low, + o->pglog_omap_len_high); + o->pglog_dup_omap_len_high = max(o->pglog_dup_omap_len_low, + o->pglog_dup_omap_len_high); + o->_fastinfo_omap_len_high = max(o->_fastinfo_omap_len_low, + o->_fastinfo_omap_len_high); + + int r = os->mkfs(); + if (r < 0) + throw std::system_error(-r, std::system_category(), "mkfs failed"); + + r = os->mount(); + if (r < 0) + throw std::system_error(-r, std::system_category(), "mount failed"); + + // create shared collections up to osd_pool_default_pg_num + if (o->single_pool_mode) { + uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num"); + if (count > td->o.nr_files) + count = td->o.nr_files; + init_collections(os, Collection::MIN_POOL_ID, collections, count); + } +} + +Engine::~Engine() +{ + ceph_assert(!ref_count); +} + +struct Object { + ghobject_t oid; + Collection& coll; + + Object(const char* name, Collection& coll) + : oid(hobject_t(name, "", CEPH_NOSNAP, coll.pg.ps(), coll.pg.pool(), "")), + coll(coll) {} +}; + +/// treat each fio job either like a separate pool with its own collections and objects +/// or just a client using its own objects from the shared pool +struct Job { + Engine* engine; //< shared ptr to the global Engine + const unsigned subjob_number; //< subjob num + std::vector<Collection> collections; //< job's private collections to spread objects over + std::vector<Object> objects; //< associate an object with each fio_file + std::vector<io_u*> events; //< completions for fio_ceph_os_event() + const bool unlink; //< unlink objects on destruction + + bufferptr one_for_all_data; //< preallocated buffer long enough + //< to use for vairious operations + std::mutex throttle_lock; + const vector<unsigned> throttle_values; + const vector<unsigned> deferred_throttle_values; + std::chrono::duration<double> cycle_throttle_period; + mono_clock::time_point last = ceph::mono_clock::zero(); + unsigned index = 0; + + static vector<unsigned> parse_throttle_str(const char *p) { + vector<unsigned> ret; + if (p == nullptr) { + return ret; + } + ceph::for_each_substr(p, ",\"", [&ret] (auto &&s) mutable { + if (s.size() > 0) { + ret.push_back(std::stoul(std::string(s))); + } + }); + return ret; + } + void check_throttle(); + + Job(Engine* engine, const thread_data* td); + ~Job(); +}; + +Job::Job(Engine* engine, const thread_data* td) + : engine(engine), + subjob_number(td->subjob_number), + events(td->o.iodepth), + unlink(td->o.unlink), + throttle_values( + parse_throttle_str(static_cast<Options*>(td->eo)->throttle_values)), + deferred_throttle_values( + parse_throttle_str(static_cast<Options*>(td->eo)->deferred_throttle_values)), + cycle_throttle_period( + static_cast<Options*>(td->eo)->cycle_throttle_period) +{ + engine->ref(); + auto o = static_cast<Options*>(td->eo); + unsigned long long max_data = max(o->oi_attr_len_high, + o->snapset_attr_len_high); + max_data = max(max_data, o->pglog_omap_len_high); + max_data = max(max_data, o->pglog_dup_omap_len_high); + max_data = max(max_data, o->_fastinfo_omap_len_high); + one_for_all_data = buffer::create(max_data); + + std::vector<Collection>* colls; + // create private collections up to osd_pool_default_pg_num + if (!o->single_pool_mode) { + uint64_t count = g_conf().get_val<uint64_t>("osd_pool_default_pg_num"); + if (count > td->o.nr_files) + count = td->o.nr_files; + // use the fio thread_number for our unique pool id + const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1; + init_collections(engine->os, pool, collections, count); + colls = &collections; + } else { + colls = &engine->collections; + } + const uint64_t file_size = td->o.size / max(1u, td->o.nr_files); + ObjectStore::Transaction t; + + // create an object for each file in the job + objects.reserve(td->o.nr_files); + unsigned checked_or_preallocated = 0; + for (uint32_t i = 0; i < td->o.nr_files; i++) { + auto f = td->files[i]; + f->real_file_size = file_size; + f->engine_pos = i; + + // associate each object with a collection in a round-robin fashion. + auto& coll = (*colls)[i % colls->size()]; + + objects.emplace_back(f->file_name, coll); + if (o->preallocate_files) { + auto& oid = objects.back().oid; + t.touch(coll.cid, oid); + t.truncate(coll.cid, oid, file_size); + int r = engine->os->queue_transaction(coll.ch, std::move(t)); + if (r) { + engine->deref(); + throw std::system_error(r, std::system_category(), "job init"); + } + } + if (o->check_files) { + auto& oid = objects.back().oid; + struct stat st; + int r = engine->os->stat(coll.ch, oid, &st); + if (r || ((unsigned)st.st_size) != file_size) { + derr << "Problem checking " << oid << ", r=" << r + << ", st.st_size=" << st.st_size + << ", file_size=" << file_size + << ", nr_files=" << td->o.nr_files << dendl; + engine->deref(); + throw std::system_error( + r, std::system_category(), "job init -- cannot check file"); + } + } + if (o->check_files || o->preallocate_files) { + ++checked_or_preallocated; + } + } + if (o->check_files) { + derr << "fio_ceph_objectstore checked " << checked_or_preallocated + << " files"<< dendl; + } + if (o->preallocate_files ){ + derr << "fio_ceph_objectstore preallocated " << checked_or_preallocated + << " files"<< dendl; + } +} + +Job::~Job() +{ + if (unlink) { + ObjectStore::Transaction t; + bool failed = false; + // remove our objects + for (auto& obj : objects) { + t.remove(obj.coll.cid, obj.oid); + int r = engine->os->queue_transaction(obj.coll.ch, std::move(t)); + if (r && !failed) { + derr << "job cleanup failed with " << cpp_strerror(-r) << dendl; + failed = true; + } + } + destroy_collections(engine->os, collections); + } + engine->deref(); +} + +void Job::check_throttle() +{ + if (subjob_number != 0) + return; + + std::lock_guard<std::mutex> l(throttle_lock); + if (throttle_values.empty() && deferred_throttle_values.empty()) + return; + + if (ceph::mono_clock::is_zero(last) || + ((cycle_throttle_period != cycle_throttle_period.zero()) && + (ceph::mono_clock::now() - last) > cycle_throttle_period)) { + unsigned tvals = throttle_values.size() ? throttle_values.size() : 1; + unsigned dtvals = deferred_throttle_values.size() ? deferred_throttle_values.size() : 1; + if (!throttle_values.empty()) { + std::string val = std::to_string(throttle_values[index % tvals]); + std::cerr << "Setting bluestore_throttle_bytes to " << val << std::endl; + int r = engine->cct->_conf.set_val( + "bluestore_throttle_bytes", + val, + nullptr); + ceph_assert(r == 0); + } + if (!deferred_throttle_values.empty()) { + std::string val = std::to_string(deferred_throttle_values[(index / tvals) % dtvals]); + std::cerr << "Setting bluestore_deferred_throttle_bytes to " << val << std::endl; + int r = engine->cct->_conf.set_val( + "bluestore_throttle_deferred_bytes", + val, + nullptr); + ceph_assert(r == 0); + } + engine->cct->_conf.apply_changes(nullptr); + index++; + index %= tvals * dtvals; + last = ceph::mono_clock::now(); + } +} + +int fio_ceph_os_setup(thread_data* td) +{ + // if there are multiple jobs, they must run in the same process against a + // single instance of the ObjectStore. explicitly disable fio's default + // job-per-process configuration + td->o.use_thread = 1; + + try { + // get or create the global Engine instance + auto engine = Engine::get_instance(td); + // create a Job for this thread + td->io_ops_data = new Job(engine, td); + } catch (std::exception& e) { + std::cerr << "setup failed with " << e.what() << std::endl; + return -1; + } + return 0; +} + +void fio_ceph_os_cleanup(thread_data* td) +{ + auto job = static_cast<Job*>(td->io_ops_data); + td->io_ops_data = nullptr; + delete job; +} + + +io_u* fio_ceph_os_event(thread_data* td, int event) +{ + // return the requested event from fio_ceph_os_getevents() + auto job = static_cast<Job*>(td->io_ops_data); + return job->events[event]; +} + +int fio_ceph_os_getevents(thread_data* td, unsigned int min, + unsigned int max, const timespec* t) +{ + auto job = static_cast<Job*>(td->io_ops_data); + unsigned int events = 0; + io_u* u = NULL; + unsigned int i = 0; + + // loop through inflight ios until we find 'min' completions + do { + io_u_qiter(&td->io_u_all, u, i) { + if (!(u->flags & IO_U_F_FLIGHT)) + continue; + + if (u->engine_data) { + u->engine_data = nullptr; + job->events[events] = u; + events++; + } + } + if (events >= min) + break; + usleep(100); + } while (1); + + return events; +} + +/// completion context for ObjectStore::queue_transaction() +class UnitComplete : public Context { + io_u* u; + public: + explicit UnitComplete(io_u* u) : u(u) {} + void finish(int r) { + // mark the pointer to indicate completion for fio_ceph_os_getevents() + u->engine_data = reinterpret_cast<void*>(1ull); + } +}; + +enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u) +{ + fio_ro_check(td, u); + + + + auto o = static_cast<const Options*>(td->eo); + auto job = static_cast<Job*>(td->io_ops_data); + auto& object = job->objects[u->file->engine_pos]; + auto& coll = object.coll; + auto& os = job->engine->os; + + job->check_throttle(); + + if (u->ddir == DDIR_WRITE) { + // provide a hint if we're likely to read this data back + const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0; + + bufferlist bl; + bl.push_back(buffer::copy(reinterpret_cast<char*>(u->xfer_buf), + u->xfer_buflen ) ); + + map<string,bufferptr> attrset; + map<string, bufferlist> omaps; + // enqueue a write transaction on the collection's handle + ObjectStore::Transaction t; + char ver_key[64]; + + // fill attrs if any + if (o->oi_attr_len_high) { + ceph_assert(o->oi_attr_len_high >= o->oi_attr_len_low); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->oi_attr_len_low, o->oi_attr_len_high)); + attrset["_"] = job->one_for_all_data; + } + if (o->snapset_attr_len_high) { + ceph_assert(o->snapset_attr_len_high >= o->snapset_attr_len_low); + job->one_for_all_data.set_length( + ceph::util::generate_random_number + (o->snapset_attr_len_low, o->snapset_attr_len_high)); + attrset["snapset"] = job->one_for_all_data; + + } + if (o->_fastinfo_omap_len_high) { + ceph_assert(o->_fastinfo_omap_len_high >= o->_fastinfo_omap_len_low); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->_fastinfo_omap_len_low, o->_fastinfo_omap_len_high)); + omaps["_fastinfo"].append(job->one_for_all_data); + } + + uint64_t pglog_trim_head = 0, pglog_trim_tail = 0; + uint64_t pglog_dup_trim_head = 0, pglog_dup_trim_tail = 0; + if (o->simulate_pglog) { + + uint64_t pglog_ver_cnt = 0; + { + std::lock_guard<std::mutex> l(*coll.lock); + pglog_ver_cnt = coll.pglog_ver_head++; + if (o->pglog_omap_len_high && + pglog_ver_cnt >= + coll.pglog_ver_tail + + g_conf()->osd_min_pg_log_entries + g_conf()->osd_pg_log_trim_min) { + pglog_trim_tail = coll.pglog_ver_tail; + coll.pglog_ver_tail = pglog_trim_head = + pglog_trim_tail + g_conf()->osd_pg_log_trim_min; + + if (o->pglog_dup_omap_len_high && + pglog_ver_cnt >= + coll.pglog_dup_ver_tail + g_conf()->osd_pg_log_dups_tracked + + g_conf()->osd_pg_log_trim_min) { + pglog_dup_trim_tail = coll.pglog_dup_ver_tail; + coll.pglog_dup_ver_tail = pglog_dup_trim_head = + pglog_dup_trim_tail + g_conf()->osd_pg_log_trim_min; + } + } + } + + if (o->pglog_omap_len_high) { + ceph_assert(o->pglog_omap_len_high >= o->pglog_omap_len_low); + snprintf(ver_key, sizeof(ver_key), + "0000000011.%020llu", (unsigned long long)pglog_ver_cnt); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->pglog_omap_len_low, o->pglog_omap_len_high)); + omaps[ver_key].append(job->one_for_all_data); + } + if (o->pglog_dup_omap_len_high) { + //insert dup + ceph_assert(o->pglog_dup_omap_len_high >= o->pglog_dup_omap_len_low); + for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) { + snprintf(ver_key, sizeof(ver_key), + "dup_0000000011.%020llu", (unsigned long long)i); + // fill with the garbage as we do not care of the actual content... + job->one_for_all_data.set_length( + ceph::util::generate_random_number( + o->pglog_dup_omap_len_low, o->pglog_dup_omap_len_high)); + omaps[ver_key].append(job->one_for_all_data); + } + } + } + + if (attrset.size()) { + t.setattrs(coll.cid, object.oid, attrset); + } + t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags); + + set<string> rmkeys; + for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) { + snprintf(ver_key, sizeof(ver_key), + "0000000011.%020llu", (unsigned long long)i); + rmkeys.emplace(ver_key); + } + for( auto i = pglog_dup_trim_tail; i < pglog_dup_trim_head; ++i) { + snprintf(ver_key, sizeof(ver_key), + "dup_0000000011.%020llu", (unsigned long long)i); + rmkeys.emplace(ver_key); + } + + if (rmkeys.size()) { + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.omap_rmkeys(coll.cid, pgmeta_oid, rmkeys); + } + + if (omaps.size()) { + ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid()); + t.omap_setkeys(coll.cid, pgmeta_oid, omaps); + } + t.register_on_commit(new UnitComplete(u)); + os->queue_transaction(coll.ch, + std::move(t)); + return FIO_Q_QUEUED; + } + + if (u->ddir == DDIR_READ) { + // ObjectStore reads are synchronous, so make the call and return COMPLETED + bufferlist bl; + int r = os->read(coll.ch, object.oid, u->offset, u->xfer_buflen, bl); + if (r < 0) { + u->error = r; + td_verror(td, u->error, "xfer"); + } else { + bl.begin().copy(bl.length(), static_cast<char*>(u->xfer_buf)); + u->resid = u->xfer_buflen - r; + } + return FIO_Q_COMPLETED; + } + + derr << "WARNING: Only DDIR_READ and DDIR_WRITE are supported!" << dendl; + u->error = -EINVAL; + td_verror(td, u->error, "xfer"); + return FIO_Q_COMPLETED; +} + +int fio_ceph_os_commit(thread_data* td) +{ + // commit() allows the engine to batch up queued requests to be submitted all + // at once. it would be natural for queue() to collect transactions in a list, + // and use commit() to pass them all to ObjectStore::queue_transactions(). but + // because we spread objects over multiple collections, we a) need to use a + // different sequencer for each collection, and b) are less likely to see a + // benefit from batching requests within a collection + return 0; +} + +// open/close are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to +// prevent fio from creating the files +int fio_ceph_os_open(thread_data* td, fio_file* f) { return 0; } +int fio_ceph_os_close(thread_data* td, fio_file* f) { return 0; } + +int fio_ceph_os_io_u_init(thread_data* td, io_u* u) +{ + // no data is allocated, we just use the pointer as a boolean 'completed' flag + u->engine_data = nullptr; + return 0; +} + +void fio_ceph_os_io_u_free(thread_data* td, io_u* u) +{ + u->engine_data = nullptr; +} + + +// ioengine_ops for get_ioengine() +struct ceph_ioengine : public ioengine_ops { + ceph_ioengine() : ioengine_ops({}) { + name = "ceph-os"; + version = FIO_IOOPS_VERSION; + flags = FIO_DISKLESSIO; + setup = fio_ceph_os_setup; + queue = fio_ceph_os_queue; + commit = fio_ceph_os_commit; + getevents = fio_ceph_os_getevents; + event = fio_ceph_os_event; + cleanup = fio_ceph_os_cleanup; + open_file = fio_ceph_os_open; + close_file = fio_ceph_os_close; + io_u_init = fio_ceph_os_io_u_init; + io_u_free = fio_ceph_os_io_u_free; + options = ceph_options.data(); + option_struct_size = sizeof(struct Options); + } +}; + +} // anonymous namespace + +extern "C" { +// the exported fio engine interface +void get_ioengine(struct ioengine_ops** ioengine_ptr) { + static ceph_ioengine ioengine; + *ioengine_ptr = &ioengine; +} +} // extern "C" diff --git a/src/test/fio/fio_librgw.cc b/src/test/fio/fio_librgw.cc new file mode 100644 index 000000000..b088b68f9 --- /dev/null +++ b/src/test/fio/fio_librgw.cc @@ -0,0 +1,542 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <stdint.h> +#include <tuple> +#include <vector> +#include <functional> +#include <iostream> + +#include <semaphore.h> // XXX kill this? + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include "fmt/include/fmt/format.h" + +#include "include/rados/librgw.h" +#include "include/rados/rgw_file.h" +//#include "rgw/rgw_file.h" +//#include "rgw/rgw_lib_frontend.h" // direct requests + +/* naughty fio.h leaks min and max as C macros--include it last */ +#include <fio.h> +#include <optgroup.h> +#undef min +#undef max + +namespace { + + struct librgw_iou { + struct io_u *io_u; + int io_complete; + }; + + struct librgw_data { + io_u** aio_events; + librgw_t rgw_h; + rgw_fs* fs; + rgw_file_handle* bucket_fh; + + std::vector<rgw_file_handle*> fh_vec; + + librgw_data(thread_data* td) + : rgw_h(nullptr), fs(nullptr), bucket_fh(nullptr) + { + auto size = td->o.iodepth * sizeof(io_u*); + aio_events = static_cast<io_u**>(malloc(size)); + memset(aio_events, 0, size); + } + + void save_handle(rgw_file_handle* fh) { + fh_vec.push_back(fh); + } + + void release_handles() { + for (auto object_fh : fh_vec) { + rgw_fh_rele(fs, object_fh, RGW_FH_RELE_FLAG_NONE); + } + fh_vec.clear(); + } + + ~librgw_data() { + free(aio_events); + } + }; + + struct opt_struct { + struct thread_data *td; + + const char* config; /* can these be std::strings? */ + const char* cluster; + const char* name; // instance? + const char* init_args; + const char* access_key; + const char* secret_key; + const char* userid; + const char* bucket_name; + + uint32_t owner_uid = 867; + uint32_t owner_gid = 5309; + }; + + uint32_t create_mask = RGW_SETATTR_UID | RGW_SETATTR_GID | RGW_SETATTR_MODE; + +/* borrowed from fio_ceph_objectstore */ + template <class F> + fio_option make_option(F&& func) + { + // zero-initialize and set common defaults + auto o = fio_option{}; + o.category = FIO_OPT_C_ENGINE; + o.group = FIO_OPT_G_INVALID; + func(std::ref(o)); + return o; + } + + static std::vector<fio_option> options = { + make_option([] (fio_option& o) { + o.name = "ceph_conf"; + o.lname = "ceph configuration file"; + o.type = FIO_OPT_STR_STORE; + o.help = "Path to ceph.conf file"; + o.off1 = offsetof(opt_struct, config); + }), + make_option([] (fio_option& o) { + o.name = "ceph_name"; + o.lname = "ceph instance name"; + o.type = FIO_OPT_STR_STORE; + o.help = "Name of this program instance"; + o.off1 = offsetof(opt_struct, name); + o.category = FIO_OPT_C_ENGINE; + o.group = FIO_OPT_G_INVALID; + }), + make_option([] (fio_option& o) { + o.name = "ceph_cluster"; + o.lname = "ceph cluster name"; + o.type = FIO_OPT_STR_STORE; + o.help = "Name of ceph cluster (default=ceph)"; + o.off1 = offsetof(opt_struct, cluster); + o.category = FIO_OPT_C_ENGINE; + o.group = FIO_OPT_G_INVALID; + }), + make_option([] (fio_option& o) { + o.name = "ceph_init_args"; + o.lname = "ceph init args"; + o.type = FIO_OPT_STR_STORE; + o.help = "Extra ceph arguments (e.g., -d --debug-rgw=16)"; + o.off1 = offsetof(opt_struct, init_args); + o.category = FIO_OPT_C_ENGINE; + o.group = FIO_OPT_G_INVALID; + }), + make_option([] (fio_option& o) { + o.name = "access_key"; + o.lname = "AWS access key"; + o.type = FIO_OPT_STR_STORE; + o.help = "AWS access key"; + o.off1 = offsetof(opt_struct, access_key); + o.category = FIO_OPT_C_ENGINE; + o.group = FIO_OPT_G_INVALID; + }), + make_option([] (fio_option& o) { + o.name = "secret_key"; + o.lname = "AWS secret key"; + o.type = FIO_OPT_STR_STORE; + o.help = "AWS secret key"; + o.off1 = offsetof(opt_struct, secret_key); + o.category = FIO_OPT_C_ENGINE; + o.group = FIO_OPT_G_INVALID; + }), + make_option([] (fio_option& o) { + o.name = "userid"; + o.lname = "userid"; + o.type = FIO_OPT_STR_STORE; + o.help = "userid corresponding to access key"; + o.off1 = offsetof(opt_struct, userid); + o.category = FIO_OPT_C_ENGINE; + o.group = FIO_OPT_G_INVALID; + }), + make_option([] (fio_option& o) { + o.name = "bucket_name"; + o.lname = "S3 bucket"; + o.type = FIO_OPT_STR_STORE; + o.help = "S3 bucket to operate on"; + o.off1 = offsetof(opt_struct, bucket_name); + o.category = FIO_OPT_C_ENGINE; + o.group = FIO_OPT_G_INVALID; + }), + {} // fio expects a 'null'-terminated list + }; + + struct save_args { + int argc; + char *argv[8]; + save_args() : argc(1) + { + argv[0] = strdup("librgw"); + for (int ix = 1; ix < 8; ++ix) { + argv[ix] = nullptr; + } + } + + void push_arg(const std::string sarg) { + argv[argc++] = strdup(sarg.c_str()); + } + + ~save_args() { + for (int ix = 0; ix < argc; ++ix) { + argv[ix] = nullptr; + } + } + } args; + +/* + * It looks like the setup function is called once, on module load. + * It's not documented in the skeleton driver. + */ + static int fio_librgw_setup(struct thread_data* td) + { + opt_struct& o = *(reinterpret_cast<opt_struct*>(td->eo)); + librgw_data* data = nullptr; + int r = 0; + + dprint(FD_IO, "fio_librgw_setup\n"); + + if (! td->io_ops_data) { + data = new librgw_data(td); + + /* init args */ + std::string sopt; + if (o.config) { + sopt = fmt::format("--conf={}", o.config); + args.push_arg(sopt); + } + std::cout << o.name << std::endl; + if (o.name) { + sopt = fmt::format("--name={}", o.name); + args.push_arg(sopt); + } + if (o.cluster) { + sopt = fmt::format("--cluster={}", o.cluster); + args.push_arg(sopt); + } + if (o.init_args) { + args.push_arg(std::string(o.init_args)); + } + + r = librgw_create(&data->rgw_h, args.argc, args.argv); + if (!! r) { + dprint(FD_IO, "librgw_create failed\n"); + return r; + } + + r = rgw_mount2(data->rgw_h, o.userid, o.access_key, o.secret_key, "/", + &data->fs, RGW_MOUNT_FLAG_NONE); + if (!! r) { + dprint(FD_IO, "rgw_mount2 failed\n"); + return r; + } + + /* go ahead and lookup the bucket as well */ + r = rgw_lookup(data->fs, data->fs->root_fh, o.bucket_name, + &data->bucket_fh, nullptr, 0, RGW_LOOKUP_FLAG_NONE); + if (! data->bucket_fh) { + dprint(FD_IO, "rgw_lookup on bucket %s failed, will create\n", + o.bucket_name); + + struct stat st; + st.st_uid = o.owner_uid; + st.st_gid = o.owner_gid; + st.st_mode = 755; + + r = rgw_mkdir(data->fs, data->fs->root_fh, o.bucket_name, + &st, create_mask, &data->bucket_fh, RGW_MKDIR_FLAG_NONE); + if (! data->bucket_fh) { + dprint(FD_IO, "rgw_mkdir for bucket %s failed\n", o.bucket_name); + return EINVAL; + } + } + + td->io_ops_data = data; + } + + td->o.use_thread = 1; + + if (r != 0) { + abort(); + } + + return r; + } + +/* + * The init function is called once per thread/process, and should set up + * any structures that this io engine requires to keep track of io. Not + * required. + */ + static int fio_librgw_init(struct thread_data *td) + { + dprint(FD_IO, "fio_librgw_init\n"); + return 0; + } + +/* + * This is paired with the ->init() function and is called when a thread is + * done doing io. Should tear down anything setup by the ->init() function. + * Not required. + * + * N.b., the cohort driver made this idempotent by allocating data in + * setup, clearing data here if present, and doing nothing in the + * subsequent per-thread invocations. + */ + static void fio_librgw_cleanup(struct thread_data *td) + { + int r = 0; + + dprint(FD_IO, "fio_librgw_cleanup\n"); + + /* cleanup specific data */ + librgw_data* data = static_cast<librgw_data*>(td->io_ops_data); + if (data) { + + /* release active handles */ + data->release_handles(); + + if (data->bucket_fh) { + r = rgw_fh_rele(data->fs, data->bucket_fh, 0 /* flags */); + } + r = rgw_umount(data->fs, RGW_UMOUNT_FLAG_NONE); + librgw_shutdown(data->rgw_h); + td->io_ops_data = nullptr; + delete data; + } + } + +/* + * The ->prep() function is called for each io_u prior to being submitted + * with ->queue(). This hook allows the io engine to perform any + * preparatory actions on the io_u, before being submitted. Not required. + */ + static int fio_librgw_prep(struct thread_data *td, struct io_u *io_u) + { + return 0; + } + +/* + * The ->event() hook is called to match an event number with an io_u. + * After the core has called ->getevents() and it has returned eg 3, + * the ->event() hook must return the 3 events that have completed for + * subsequent calls to ->event() with [0-2]. Required. + */ + static struct io_u *fio_librgw_event(struct thread_data *td, int event) + { + return NULL; + } + +/* + * The ->getevents() hook is used to reap completion events from an async + * io engine. It returns the number of completed events since the last call, + * which may then be retrieved by calling the ->event() hook with the event + * numbers. Required. + */ + static int fio_librgw_getevents(struct thread_data *td, unsigned int min, + unsigned int max, const struct timespec *t) + { + return 0; + } + +/* + * The ->cancel() hook attempts to cancel the io_u. Only relevant for + * async io engines, and need not be supported. + */ + static int fio_librgw_cancel(struct thread_data *td, struct io_u *io_u) + { + return 0; + } + +/* + * The ->queue() hook is responsible for initiating io on the io_u + * being passed in. If the io engine is a synchronous one, io may complete + * before ->queue() returns. Required. + * + * The io engine must transfer in the direction noted by io_u->ddir + * to the buffer pointed to by io_u->xfer_buf for as many bytes as + * io_u->xfer_buflen. Residual data count may be set in io_u->resid + * for a short read/write. + */ + static enum fio_q_status fio_librgw_queue(struct thread_data *td, + struct io_u *io_u) + { + librgw_data* data = static_cast<librgw_data*>(td->io_ops_data); + const char* object = io_u->file->file_name; + struct rgw_file_handle* object_fh = nullptr; + size_t nbytes; + int r = 0; + + /* + * Double sanity check to catch errant write on a readonly setup + */ + fio_ro_check(td, io_u); + + if (io_u->ddir == DDIR_WRITE) { + /* Do full write cycle */ + r = rgw_lookup(data->fs, data->bucket_fh, object, &object_fh, nullptr, 0, + RGW_LOOKUP_FLAG_CREATE); + if (!! r) { + dprint(FD_IO, "rgw_lookup failed to create filehandle for %s\n", + object); + goto out; + } + + r = rgw_open(data->fs, object_fh, 0 /* posix flags */, 0 /* flags */); + if (!! r) { + dprint(FD_IO, "rgw_open failed to create filehandle for %s\n", + object); + rgw_fh_rele(data->fs, object_fh, RGW_FH_RELE_FLAG_NONE); + goto out; + } + + /* librgw can write at any offset, but only sequentially + * starting at 0, in one open/write/close cycle */ + r = rgw_write(data->fs, object_fh, 0, io_u->xfer_buflen, &nbytes, + (void*) io_u->xfer_buf, RGW_WRITE_FLAG_NONE); + if (!! r) { + dprint(FD_IO, "rgw_write failed for %s\n", + object); + } + + r = rgw_close(data->fs, object_fh, 0 /* flags */); + + /* object_fh is closed but still reachable, save it */ + data->save_handle(object_fh); + } else if (io_u->ddir == DDIR_READ) { + + r = rgw_lookup(data->fs, data->bucket_fh, object, &object_fh, + nullptr, 0, RGW_LOOKUP_FLAG_NONE); + if (!! r) { + dprint(FD_IO, "rgw_lookup failed to create filehandle for %s\n", + object); + goto out; + } + + r = rgw_open(data->fs, object_fh, 0 /* posix flags */, 0 /* flags */); + if (!! r) { + dprint(FD_IO, "rgw_open failed to create filehandle for %s\n", + object); + rgw_fh_rele(data->fs, object_fh, RGW_FH_RELE_FLAG_NONE); + goto out; + } + + r = rgw_read(data->fs, object_fh, io_u->offset, io_u->xfer_buflen, + &nbytes, io_u->xfer_buf, RGW_READ_FLAG_NONE); + if (!! r) { + dprint(FD_IO, "rgw_read failed for %s\n", + object); + } + } else { + dprint(FD_IO, "%s: Warning: unhandled ddir: %d\n", __func__, + io_u->ddir); + } + + if (object_fh) { + r = rgw_close(data->fs, object_fh, 0 /* flags */); + + /* object_fh is closed but still reachable, save it */ + data->save_handle(object_fh); + } + + out: + /* + * Could return FIO_Q_QUEUED for a queued request, + * FIO_Q_COMPLETED for a completed request, and FIO_Q_BUSY + * if we could queue no more at this point (you'd have to + * define ->commit() to handle that. + */ + return FIO_Q_COMPLETED; + } + + int fio_librgw_commit(thread_data* td) + { + // commit() allows the engine to batch up queued requests to be submitted all + // at once. it would be natural for queue() to collect transactions in a list, + // and use commit() to pass them all to ObjectStore::queue_transactions(). but + // because we spread objects over multiple collections, we a) need to use a + // different sequencer for each collection, and b) are less likely to see a + // benefit from batching requests within a collection + return 0; + } + +/* + * Hook for opening the given file. Unless the engine has special + * needs, it usually just provides generic_open_file() as the handler. + */ + static int fio_librgw_open(struct thread_data *td, struct fio_file *f) + { + /* for now, let's try to avoid doing open/close in these hooks */ + return 0; + } + +/* + * Hook for closing a file. See fio_librgw_open(). + */ + static int fio_librgw_close(struct thread_data *td, struct fio_file *f) + { + /* for now, let's try to avoid doing open/close in these hooks */ + return 0; + } + +/* XXX next two probably not needed */ + int fio_librgw_io_u_init(thread_data* td, io_u* u) + { + // no data is allocated, we just use the pointer as a boolean 'completed' flag + u->engine_data = nullptr; + return 0; + } + + void fio_librgw_io_u_free(thread_data* td, io_u* u) + { + u->engine_data = nullptr; + } + + struct librgw_ioengine : public ioengine_ops + { + librgw_ioengine() : ioengine_ops({}) { + name = "librgw"; + version = FIO_IOOPS_VERSION; + flags = FIO_DISKLESSIO; + setup = fio_librgw_setup; + init = fio_librgw_init; + queue = fio_librgw_queue; + commit = fio_librgw_commit; + getevents = fio_librgw_getevents; + event = fio_librgw_event; + cleanup = fio_librgw_cleanup; + open_file = fio_librgw_open; + close_file = fio_librgw_close; + io_u_init = fio_librgw_io_u_init; + io_u_free = fio_librgw_io_u_free; + options = ::options.data(); + option_struct_size = sizeof(opt_struct); + } + }; + +} // namespace + +extern "C" { +// the exported fio engine interface + void get_ioengine(struct ioengine_ops** ioengine_ptr) { + static librgw_ioengine ioengine; + *ioengine_ptr = &ioengine; + } +} // extern "C" diff --git a/src/test/fio/ring_buffer.h b/src/test/fio/ring_buffer.h new file mode 100644 index 000000000..0e1eb62be --- /dev/null +++ b/src/test/fio/ring_buffer.h @@ -0,0 +1,102 @@ +/* + * Very simple and fast lockless ring buffer implementatation for + * one producer and one consumer. + */ + +#include <stdint.h> +#include <stddef.h> + +/* Do not overcomplicate, choose generic x86 case */ +#define L1_CACHE_BYTES 64 +#define __cacheline_aligned __attribute__((__aligned__(L1_CACHE_BYTES))) + +struct ring_buffer +{ + unsigned int read_idx __cacheline_aligned; + unsigned int write_idx __cacheline_aligned; + unsigned int size; + unsigned int low_mask; + unsigned int high_mask; + unsigned int bit_shift; + void *data_ptr; +}; + +static inline unsigned int upper_power_of_two(unsigned int v) +{ + v--; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v++; + + return v; +} + +static inline int ring_buffer_init(struct ring_buffer* rbuf, unsigned int size) +{ + /* Must be pow2 */ + if (((size-1) & size)) + size = upper_power_of_two(size); + + size *= sizeof(void *); + rbuf->data_ptr = malloc(size); + rbuf->size = size; + rbuf->read_idx = 0; + rbuf->write_idx = 0; + rbuf->bit_shift = __builtin_ffs(sizeof(void *))-1; + rbuf->low_mask = rbuf->size - 1; + rbuf->high_mask = rbuf->size * 2 - 1; + + return 0; +} + +static inline void ring_buffer_deinit(struct ring_buffer* rbuf) +{ + free(rbuf->data_ptr); +} + +static inline unsigned int ring_buffer_used_size(const struct ring_buffer* rbuf) +{ + __sync_synchronize(); + return ((rbuf->write_idx - rbuf->read_idx) & rbuf->high_mask) >> + rbuf->bit_shift; +} + +static inline void ring_buffer_enqueue(struct ring_buffer* rbuf, void *ptr) +{ + + unsigned int idx; + + /* + * Be aware: we do not check that buffer can be full, + * assume user of the ring buffer can't submit more. + */ + + idx = rbuf->write_idx & rbuf->low_mask; + *(void **)((uintptr_t)rbuf->data_ptr + idx) = ptr; + /* Barrier to be sure stored pointer will be seen properly */ + __sync_synchronize(); + rbuf->write_idx = (rbuf->write_idx + sizeof(ptr)) & rbuf->high_mask; +} + +static inline void *ring_buffer_dequeue(struct ring_buffer* rbuf) +{ + + unsigned idx; + void *ptr; + + /* + * Be aware: we do not check that buffer can be empty, + * assume user of the ring buffer called ring_buffer_used_size(), + * which returns actual used size and introduces memory barrier + * explicitly. + */ + + idx = rbuf->read_idx & rbuf->low_mask; + ptr = *(void **)((uintptr_t)rbuf->data_ptr + idx); + rbuf->read_idx = (rbuf->read_idx + sizeof(ptr)) & rbuf->high_mask; + + return ptr; +} |