diff options
Diffstat (limited to 'src/test/kv_store_bench.cc')
-rw-r--r-- | src/test/kv_store_bench.cc | 564 |
1 files changed, 564 insertions, 0 deletions
diff --git a/src/test/kv_store_bench.cc b/src/test/kv_store_bench.cc new file mode 100644 index 00000000..76cc375e --- /dev/null +++ b/src/test/kv_store_bench.cc @@ -0,0 +1,564 @@ +/* + * KvStoreBench.cc + * + * Created on: Aug 23, 2012 + * Author: eleanor + */ + +#include "test/kv_store_bench.h" +#include "key_value_store/key_value_structure.h" +#include "key_value_store/kv_flat_btree_async.h" +#include "include/rados/librados.hpp" +#include "test/omap_bench.h" +#include "common/ceph_argparse.h" + + +#include <string> +#include <climits> +#include <iostream> +#include <sstream> +#include <cmath> + +KvStoreBench::KvStoreBench() +: entries(30), + ops(100), + clients(5), + key_size(5), + val_size(7), + max_ops_in_flight(8), + clear_first(false), + k(2), + cache_size(10), + cache_refresh(1), + client_name("admin"), + verbose(false), + kvs(NULL), + data_lock("data lock"), + ops_in_flight(0), + ops_in_flight_lock("KvStoreBench::ops_in_flight_lock"), + rados_id("admin"), + pool_name("rbd"), + io_ctx_ready(false) +{ + probs[25] = 'i'; + probs[50] = 'u'; + probs[75] = 'd'; + probs[100] = 'r'; +} + +KvStoreBench::~KvStoreBench() +{ + if (io_ctx_ready) { + librados::ObjectWriteOperation owo; + owo.remove(); + io_ctx.operate(client_name + ".done-setting", &owo); + } + delete kvs; +} + +int KvStoreBench::setup(int argc, const char** argv) { + vector<const char*> args; + argv_to_vec(argc,argv,args); + srand(time(NULL)); + + stringstream help; + help + << "Usage: KvStoreBench [options]\n" + << "Generate latency and throughput statistics for the key value store\n" + << "\n" + << "There are two sets of options - workload options affect the kind of\n" + << "test to run, while algorithm options affect how the key value\n" + << "store handles the workload.\n" + << "\n" + << "There are about entries / k objects in the store to begin with.\n" + << "Higher k values reduce the likelihood of splits and the likelihood\n" + << "multiple writers simultaneously faling to write because an object \n" + << "is full, but having a high k also means there will be more object\n" + << "contention.\n" + << "\n" + << "WORKLOAD OPTIONS\n" + << " --name <client name> client name (default admin)\n" + << " --entries <number> number of key/value pairs to store initially\n" + << " (default " << entries << ")\n" + << " --ops <number> number of operations to run\n" + << " --keysize <number> number of characters per key (default " << key_size << ")\n" + << " --valsize <number> number of characters per value (default " << val_size << ")\n" + << " -t <number> number of operations in flight concurrently\n" + << " (default " << max_ops_in_flight << ")\n" + << " --clients <number> tells this instance how many total clients are. Note that\n" + << " changing this does not change the number of clients." + << " -d <insert> <update> <delete> <read> percent (1-100) of operations that should be of each type\n" + << " (default 25 25 25 25)\n" + << " -r <number> random seed to use (default time(0))\n" + << "ALGORITHM OPTIONS\n" + << " --kval k, where each object has a number of entries\n" + << " >= k and <= 2k.\n" + << " --cache-size number of index entries to keep in cache\n" + << " (default " << cache_size << ")\n" + << " --cache-refresh percent (1-100) of cache-size to read each \n" + << " time the index is read\n" + << "OTHER OPTIONS\n" + << " --verbosity-on display debug output\n" + << " --clear-first delete all existing objects in the pool before running tests\n"; + for (unsigned i = 0; i < args.size(); i++) { + if(i < args.size() - 1) { + if (strcmp(args[i], "--ops") == 0) { + ops = atoi(args[i+1]); + } else if (strcmp(args[i], "--entries") == 0) { + entries = atoi(args[i+1]); + } else if (strcmp(args[i], "--kval") == 0) { + k = atoi(args[i+1]); + } else if (strcmp(args[i], "--keysize") == 0) { + key_size = atoi(args[i+1]); + } else if (strcmp(args[i], "--valsize") == 0) { + val_size = atoi(args[i+1]); + } else if (strcmp(args[i], "--cache-size") == 0) { + cache_size = atoi(args[i+1]); + } else if (strcmp(args[i], "--cache-refresh") == 0) { + auto temp = atoi(args[i+1]); + assert (temp != 0); + cache_refresh = 100 / (double)temp; + } else if (strcmp(args[i], "-t") == 0) { + max_ops_in_flight = atoi(args[i+1]); + } else if (strcmp(args[i], "--clients") == 0) { + clients = atoi(args[i+1]); + } else if (strcmp(args[i], "-d") == 0) { + if (i + 4 >= args.size()) { + cout << "Invalid arguments after -d: there must be 4 of them." + << std::endl; + continue; + } else { + probs.clear(); + int sum = atoi(args[i + 1]); + probs[sum] = 'i'; + sum += atoi(args[i + 2]); + probs[sum] = 'u'; + sum += atoi(args[i + 3]); + probs[sum] = 'd'; + sum += atoi(args[i + 4]); + probs[sum] = 'r'; + if (sum != 100) { + cout << "Invalid arguments after -d: they must add to 100." + << std::endl; + } + } + } else if (strcmp(args[i], "--name") == 0) { + client_name = args[i+1]; + } else if (strcmp(args[i], "-r") == 0) { + srand(atoi(args[i+1])); + } + } else if (strcmp(args[i], "--verbosity-on") == 0) { + verbose = true; + } else if (strcmp(args[i], "--clear-first") == 0) { + clear_first = true; + } else if (strcmp(args[i], "--help") == 0) { + cout << help.str() << std::endl; + exit(1); + } + } + + KvFlatBtreeAsync * kvba = new KvFlatBtreeAsync(k, client_name, cache_size, + cache_refresh, verbose); + kvs = kvba; + + int r = rados.init(rados_id.c_str()); + if (r < 0) { + cout << "error during init" << std::endl; + return r; + } + r = rados.conf_parse_argv(argc, argv); + if (r < 0) { + cout << "error during parsing args" << std::endl; + return r; + } + r = rados.conf_parse_env(NULL); + if (r < 0) { + cout << "error during parsing env" << std::endl; + return r; + } + r = rados.conf_read_file(NULL); + if (r < 0) { + cout << "error during read file" << std::endl; + return r; + } + r = rados.connect(); + if (r < 0) { + cout << "error during connect: " << r << std::endl; + return r; + } + r = rados.ioctx_create(pool_name.c_str(), io_ctx); + if (r < 0) { + cout << "error creating io ctx" << std::endl; + rados.shutdown(); + return r; + } + io_ctx_ready = true; + + if (clear_first) { + librados::NObjectIterator it; + for (it = io_ctx.nobjects_begin(); it != io_ctx.nobjects_end(); ++it) { + librados::ObjectWriteOperation rm; + rm.remove(); + io_ctx.operate(it->get_oid(), &rm); + } + } + + int err = kvs->setup(argc, argv); + if (err < 0 && err != -17) { + cout << "error during setup of kvs: " << err << std::endl; + return err; + } + + return 0; +} + +string KvStoreBench::random_string(int len) { + string ret; + string alphanum = "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + for (int i = 0; i < len; ++i) { + ret.push_back(alphanum[rand() % (alphanum.size() - 1)]); + } + + return ret; +} + +pair<string, bufferlist> KvStoreBench::rand_distr(bool new_elem) { + pair<string, bufferlist> ret; + if (new_elem) { + ret = make_pair(random_string(key_size), + KvFlatBtreeAsync::to_bl(random_string(val_size))); + key_set.insert(ret.first); + } else { + if (key_set.size() == 0) { + return make_pair("",KvFlatBtreeAsync::to_bl("")); + } + string get_string = random_string(key_size); + std::set<string>::iterator it = key_set.lower_bound(get_string); + if (it == key_set.end()) { + ret.first = *(key_set.rbegin()); + } else { + ret.first = *it; + } + ret.second = KvFlatBtreeAsync::to_bl(random_string(val_size)); + } + return ret; +} + +int KvStoreBench::test_random_insertions() { + int err; + if (entries == 0) { + return 0; + } + stringstream prev_ss; + prev_ss << (atoi(client_name.c_str()) - 1); + string prev_rid = prev_ss.str(); + stringstream last_ss; + if (client_name.size() > 1) { + last_ss << client_name.substr(0,client_name.size() - 2); + } + last_ss << clients - 1; + string last_rid = client_name == "admin" ? "admin" : last_ss.str(); + + map<string, bufferlist> big_map; + for (int i = 0; i < entries; i++) { + bufferlist bfr; + bfr.append(random_string(7)); + big_map[random_string(5)] = bfr; + } + + uint64_t uint; + time_t t; + if (client_name[client_name.size() - 1] != '0' && client_name != "admin") { + do { + librados::ObjectReadOperation oro; + oro.stat(&uint, &t, &err); + err = io_ctx.operate(prev_rid + ".done-setting", &oro, NULL); + if (verbose) cout << "reading " << prev_rid << ": err = " << err + << std::endl; + } while (err != 0); + cout << "detected " << prev_rid << ".done-setting" << std::endl; + } + + cout << "testing random insertions"; + err = kvs->set_many(big_map); + if (err < 0) { + cout << "error setting things" << std::endl; + return err; + } + + librados::ObjectWriteOperation owo; + owo.create(true); + io_ctx.operate(client_name + ".done-setting", &owo); + cout << "created " << client_name + ".done-setting. waiting for " + << last_rid << ".done-setting" << std::endl; + + do { + librados::ObjectReadOperation oro; + oro.stat(&uint, &t, &err); + err = io_ctx.operate(last_rid + ".done-setting", &oro, NULL); + } while (err != 0); + cout << "detected " << last_rid << ".done-setting" << std::endl; + + return err; +} + +void KvStoreBench::aio_callback_timed(int * err, void *arg) { + timed_args *args = reinterpret_cast<timed_args *>(arg); + Mutex * ops_in_flight_lock = &args->kvsb->ops_in_flight_lock; + Mutex * data_lock = &args->kvsb->data_lock; + Cond * op_avail = &args->kvsb->op_avail; + int *ops_in_flight = &args->kvsb->ops_in_flight; + if (*err < 0 && *err != -61) { + cerr << "Error during " << args->op << " operation: " << *err << std::endl; + } + + args->sw.stop_time(); + double time = args->sw.get_time(); + args->sw.clear(); + + data_lock->Lock(); + //latency + args->kvsb->data.latency_jf.open_object_section("latency"); + args->kvsb->data.latency_jf.dump_float(string(1, args->op).c_str(), + time); + args->kvsb->data.latency_jf.close_section(); + + //throughput + args->kvsb->data.throughput_jf.open_object_section("throughput"); + args->kvsb->data.throughput_jf.dump_unsigned(string(1, args->op).c_str(), + ceph_clock_now()); + args->kvsb->data.throughput_jf.close_section(); + + data_lock->Unlock(); + + ops_in_flight_lock->Lock(); + (*ops_in_flight)--; + op_avail->Signal(); + ops_in_flight_lock->Unlock(); + + delete args; +} + +int KvStoreBench::test_teuthology_aio(next_gen_t distr, + const map<int, char> &probs) +{ + int err = 0; + cout << "inserting initial entries..." << std::endl; + err = test_random_insertions(); + if (err < 0) { + return err; + } + cout << "finished inserting initial entries. Waiting 10 seconds for everyone" + << " to catch up..." << std::endl; + + sleep(10); + + cout << "done waiting. Starting random operations..." << std::endl; + + Mutex::Locker l(ops_in_flight_lock); + for (int i = 0; i < ops; i++) { + ceph_assert(ops_in_flight <= max_ops_in_flight); + if (ops_in_flight == max_ops_in_flight) { + int err = op_avail.Wait(ops_in_flight_lock); + if (err < 0) { + ceph_abort(); + return err; + } + ceph_assert(ops_in_flight < max_ops_in_flight); + } + cout << "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" << i + 1 << " / " + << ops << std::endl; + timed_args * cb_args = new timed_args(this); + pair<string, bufferlist> kv; + int random = (rand() % 100); + cb_args->op = probs.lower_bound(random)->second; + switch (cb_args->op) { + case 'i': + kv = (((KvStoreBench *)this)->*distr)(true); + if (kv.first == "") { + i--; + delete cb_args; + continue; + } + ops_in_flight++; + cb_args->sw.start_time(); + kvs->aio_set(kv.first, kv.second, false, aio_callback_timed, + cb_args, &cb_args->err); + break; + case 'u': + kv = (((KvStoreBench *)this)->*distr)(false); + if (kv.first == "") { + i--; + delete cb_args; + continue; + } + ops_in_flight++; + cb_args->sw.start_time(); + kvs->aio_set(kv.first, kv.second, true, aio_callback_timed, + cb_args, &cb_args->err); + break; + case 'd': + kv = (((KvStoreBench *)this)->*distr)(false); + if (kv.first == "") { + i--; + delete cb_args; + continue; + } + key_set.erase(kv.first); + ops_in_flight++; + cb_args->sw.start_time(); + kvs->aio_remove(kv.first, aio_callback_timed, cb_args, &cb_args->err); + break; + case 'r': + kv = (((KvStoreBench *)this)->*distr)(false); + if (kv.first == "") { + i--; + delete cb_args; + continue; + } + bufferlist val; + ops_in_flight++; + cb_args->sw.start_time(); + kvs->aio_get(kv.first, &cb_args->val, aio_callback_timed, + cb_args, &cb_args->err); + break; + } + + delete cb_args; + } + + while(ops_in_flight > 0) { + op_avail.Wait(ops_in_flight_lock); + } + + print_time_data(); + return err; +} + +int KvStoreBench::test_teuthology_sync(next_gen_t distr, + const map<int, char> &probs) +{ + int err = 0; + err = test_random_insertions(); + if (err < 0) { + return err; + } + sleep(10); + for (int i = 0; i < ops; i++) { + StopWatch sw; + pair<char, double> d; + cout << "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" << i + 1 << " / " + << ops << std::endl; + pair<string, bufferlist> kv; + int random = (rand() % 100); + d.first = probs.lower_bound(random)->second; + switch (d.first) { + case 'i': + kv = (((KvStoreBench *)this)->*distr)(true); + if (kv.first == "") { + i--; + continue; + } + sw.start_time(); + err = kvs->set(kv.first, kv.second, true); + sw.stop_time(); + if (err < 0) { + cout << "Error setting " << kv << ": " << err << std::endl; + return err; + } + break; + case 'u': + kv = (((KvStoreBench *)this)->*distr)(false); + if (kv.first == "") { + i--; + continue; + } + sw.start_time(); + err = kvs->set(kv.first, kv.second, true); + sw.stop_time(); + if (err < 0 && err != -61) { + cout << "Error updating " << kv << ": " << err << std::endl; + return err; + } + break; + case 'd': + kv = (((KvStoreBench *)this)->*distr)(false); + if (kv.first == "") { + i--; + continue; + } + key_set.erase(kv.first); + sw.start_time(); + err = kvs->remove(kv.first); + sw.stop_time(); + if (err < 0 && err != -61) { + cout << "Error removing " << kv << ": " << err << std::endl; + return err; + } + break; + case 'r': + kv = (((KvStoreBench *)this)->*distr)(false); + if (kv.first == "") { + i--; + continue; + } + bufferlist val; + sw.start_time(); + err = kvs->get(kv.first, &kv.second); + sw.stop_time(); + if (err < 0 && err != -61) { + cout << "Error getting " << kv << ": " << err << std::endl; + return err; + } + break; + } + + double time = sw.get_time(); + d.second = time; + sw.clear(); + //latency + data.latency_jf.open_object_section("latency"); + data.latency_jf.dump_float(string(1, d.first).c_str(), + time); + data.latency_jf.close_section(); + } + + print_time_data(); + return err; +} + +void KvStoreBench::print_time_data() { + cout << "========================================================\n"; + cout << "latency:" << std::endl; + data.latency_jf.flush(cout); + cout << std::endl; + cout << "throughput:" << std::endl; + data.throughput_jf.flush(cout); + cout << "\n========================================================" + << std::endl; +} + +int KvStoreBench::teuthology_tests() { + int err = 0; + if (max_ops_in_flight > 1) { + test_teuthology_aio(&KvStoreBench::rand_distr, probs); + } else { + err = test_teuthology_sync(&KvStoreBench::rand_distr, probs); + } + return err; +} + +int main(int argc, const char** argv) { + KvStoreBench kvsb; + int err = kvsb.setup(argc, argv); + if (err == 0) cout << "setup successful" << std::endl; + else{ + cout << "error " << err << std::endl; + return err; + } + err = kvsb.teuthology_tests(); + if (err < 0) return err; + return 0; +}; |