From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/test/omap_bench.cc | 431 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 431 insertions(+) create mode 100644 src/test/omap_bench.cc (limited to 'src/test/omap_bench.cc') diff --git a/src/test/omap_bench.cc b/src/test/omap_bench.cc new file mode 100644 index 000000000..714774f12 --- /dev/null +++ b/src/test/omap_bench.cc @@ -0,0 +1,431 @@ +/* + * Generate latency statistics for a configurable number of write + * operations of configurable size. + * + * Created on: May 21, 2012 + * Author: Eleanor Cawthon + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "include/rados/librados.hpp" +#include "include/Context.h" +#include "common/ceph_context.h" +#include "common/ceph_mutex.h" +#include "common/Cond.h" +#include "include/utime.h" +#include "common/ceph_argparse.h" +#include "test/omap_bench.h" + +#include +#include +#include +#include +#include + +using namespace std; +using ceph::bufferlist; + +int OmapBench::setup(int argc, const char** argv) { + //parse key_value_store_bench args + auto args = argv_to_vec(argc, argv); + for (unsigned i = 0; i < args.size(); i++) { + if(i < args.size() - 1) { + if (strcmp(args[i], "-t") == 0) { + threads = atoi(args[i+1]); + } else if (strcmp(args[i], "-o") == 0) { + objects = atoi(args[i+1]); + } else if (strcmp(args[i], "--entries") == 0) { + entries_per_omap = atoi(args[i+1]); + } else if (strcmp(args[i], "--keysize") == 0) { + key_size = atoi(args[i+1]); + } else if (strcmp(args[i], "--valsize") == 0) { + value_size = atoi(args[i+1]); + } else if (strcmp(args[i], "--inc") == 0) { + increment = atoi(args[i+1]); + } else if (strcmp(args[i], "--omaptype") == 0) { + if(strcmp("rand",args[i+1]) == 0) { + omap_generator = OmapBench::generate_non_uniform_omap; + } + else if (strcmp("uniform", args[i+1]) == 0) { + omap_generator = OmapBench::generate_uniform_omap; + } + } else if (strcmp(args[i], "--name") == 0) { + rados_id = args[i+1]; + } + } else if (strcmp(args[i], "--help") == 0) { + cout << "\nUsage: ostorebench [options]\n" + << "Generate latency statistics for a configurable number of " + << "key value pair operations of\n" + << "configurable size.\n\n" + << "OPTIONS\n" + << " -t number of threads to use (default "<data_lock.lock(); + name << omap_bench->prefix << ++(ob->data.started_ops); + ob->data_lock.unlock(); + oid = name.str(); +} +void Writer::start_time() { + begin_time = ceph_clock_now(); +} +void Writer::stop_time() { + end_time = ceph_clock_now(); +} +double Writer::get_time() { + return (end_time - begin_time) * 1000; +} +string Writer::get_oid() { + return oid; +} +std::map & Writer::get_omap() { + return omap; +} + +//AioWriter functions +AioWriter::AioWriter(OmapBench *ob) : Writer(ob) { + aioc = NULL; +} +AioWriter::~AioWriter() { + if(aioc) aioc->release(); +} +librados::AioCompletion * AioWriter::get_aioc() { + return aioc; +} +void AioWriter::set_aioc(librados::callback_t complete) { + aioc = ob->rados.aio_create_completion(this, complete); +} + + +//Helper methods +void OmapBench::aio_is_complete(rados_completion_t c, void *arg) { + AioWriter *aiow = reinterpret_cast(arg); + aiow->stop_time(); + ceph::mutex * data_lock = &aiow->ob->data_lock; + ceph::mutex * thread_is_free_lock = &aiow->ob->thread_is_free_lock; + ceph::condition_variable* thread_is_free = &aiow->ob->thread_is_free; + int &busythreads_count = aiow->ob->busythreads_count; + o_bench_data &data = aiow->ob->data; + int INCREMENT = aiow->ob->increment; + int err = aiow->get_aioc()->get_return_value(); + if (err < 0) { + cout << "error writing AioCompletion"; + return; + } + double time = aiow->get_time(); + delete aiow; + data_lock->lock(); + data.avg_latency = (data.avg_latency * data.completed_ops + time) + / (data.completed_ops + 1); + data.completed_ops++; + if (time < data.min_latency) { + data.min_latency = time; + } + if (time > data.max_latency) { + data.max_latency = time; + } + data.total_latency += time; + ++(data.freq_map[time / INCREMENT]); + if(data.freq_map[time/INCREMENT] > data.mode.second) { + data.mode.first = time/INCREMENT; + data.mode.second = data.freq_map[time/INCREMENT]; + } + data_lock->unlock(); + + thread_is_free_lock->lock(); + busythreads_count--; + thread_is_free->notify_all(); + thread_is_free_lock->unlock(); +} + +string OmapBench::random_string(int len) { + string ret; + string alphanum = "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + + for (int i = 0; i < len; ++i) { + ret.push_back(alphanum[rand() % (alphanum.size() - 1)]); + } + + return ret; +} + +int OmapBench::run() { + return (((OmapBench *)this)->*OmapBench::test)(omap_generator); +} + +int OmapBench::print_written_omap() { + for (int i = 1; i <= objects; i++) { + int err = 0; + librados::ObjectReadOperation key_read; + set out_keys; + map out_vals; + std::stringstream objstrm; + objstrm << prefix; + objstrm << i; + cout << "\nPrinting omap for "<::iterator iter = out_keys.begin(); + iter != out_keys.end(); ++iter) { + cout << *iter << "\t" << (out_vals)[*iter] << std::endl; + } + } + return 0; +} + +void OmapBench::print_results() { + cout << "========================================================"; + cout << "\nNumber of kvmaps written:\t" << objects; + cout << "\nNumber of ops at once:\t" << threads; + cout << "\nEntries per kvmap:\t\t" << entries_per_omap; + cout << "\nCharacters per key:\t" << key_size; + cout << "\nCharacters per val:\t" << value_size; + cout << std::endl; + cout << std::endl; + cout << "Average latency:\t" << data.avg_latency; + cout << "ms\nMinimum latency:\t" << data.min_latency; + cout << "ms\nMaximum latency:\t" << data.max_latency; + cout << "ms\nMode latency:\t\t"<<"between "<= "<< i * increment; + cout << "ms"; + int spaces; + if (i == 0) spaces = 4; + else spaces = 3 - floor(log10(i)); + for (int j = 0; j < spaces; j++) { + cout << " "; + } + cout << "["; + for(int j = 0; j < ((data.freq_map)[i])*45/(data.mode.second); j++) { + cout << "*"; + } + cout << std::endl; + } + cout << "\n========================================================" + << std::endl; +} + +int OmapBench::write_omap_asynchronously(AioWriter *aiow, + const std::map &omap) { + librados::ObjectWriteOperation owo; + owo.create(false); + owo.omap_clear(); + owo.omap_set(omap); + aiow->start_time(); + int err = io_ctx.aio_operate(aiow->get_oid(), aiow->get_aioc(), &owo); + if (err < 0) { + cout << "writing omap failed with code "< * out_omap) { + bufferlist bl; + + //setup omap + for (int i = 0; i < omap_entries; i++) { + bufferlist omap_val; + omap_val.append(random_string(value_size)); + string key = random_string(key_size); + (*out_omap)[key]= omap_val; + } + return 0; +} + +int OmapBench::generate_non_uniform_omap(const int omap_entries, + const int key_size, const int value_size, + std::map * out_omap) { + bufferlist bl; + + int num_entries = rand() % omap_entries + 1; + int key_len = rand() % key_size +1; + int val_len = rand() % value_size +1; + + //setup omap + for (int i = 0; i < num_entries; i++) { + bufferlist omap_val; + omap_val.append(random_string(val_len)); + string key = random_string(key_len); + (*out_omap)[key] = omap_val; + } + return 0; +} + +int OmapBench::generate_small_non_random_omap(const int omap_entries, + const int key_size, const int value_size, + std::map * out_omap) { + bufferlist bl; + stringstream key; + + //setup omap + for (int i = 0; i < omap_entries; i++) { + bufferlist omap_val; + omap_val.append("Value "); + omap_val.append(i); + key << "Key " << i; + (*out_omap)[key.str()]= omap_val; + } + return 0; +} + +//tests +int OmapBench::test_write_objects_in_parallel(omap_generator_t omap_gen) { + AioWriter *this_aio_writer; + + std::unique_lock l{thread_is_free_lock}; + for (int i = 0; i < objects; i++) { + ceph_assert(busythreads_count <= threads); + //wait for a writer to be free + if (busythreads_count == threads) { + thread_is_free.wait(l); + ceph_assert(busythreads_count < threads); + } + + //set up the write + this_aio_writer = new AioWriter(this); + this_aio_writer->set_aioc(comp); + + //perform the write + busythreads_count++; + int err = omap_gen(entries_per_omap, key_size, value_size, + & this_aio_writer->get_omap()); + if (err < 0) { + return err; + } + err = OmapBench::write_omap_asynchronously(this_aio_writer, + (this_aio_writer->get_omap())); + + + if (err < 0) { + return err; + } + } + thread_is_free.wait(l, [this] { return busythreads_count <= 0;}); + return 0; +} + +/** + * runs the specified test with the specified parameters and generates + * a histogram of latencies + */ +int main(int argc, const char** argv) { + OmapBench ob; + int err = ob.setup(argc, argv); + if (err<0) { + cout << "error during setup: "<