summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/forward_iterator_bench.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/db/forward_iterator_bench.cc377
1 files changed, 377 insertions, 0 deletions
diff --git a/src/rocksdb/db/forward_iterator_bench.cc b/src/rocksdb/db/forward_iterator_bench.cc
new file mode 100644
index 000000000..6f1223537
--- /dev/null
+++ b/src/rocksdb/db/forward_iterator_bench.cc
@@ -0,0 +1,377 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#if !defined(GFLAGS) || defined(ROCKSDB_LITE)
+#include <cstdio>
+int main() {
+ fprintf(stderr, "Please install gflags to run rocksdb tools\n");
+ return 1;
+}
+#elif defined(OS_MACOSX) || defined(OS_WIN)
+// Block forward_iterator_bench under MAC and Windows
+int main() { return 0; }
+#else
+#include <semaphore.h>
+#include <atomic>
+#include <bitset>
+#include <chrono>
+#include <climits>
+#include <condition_variable>
+#include <limits>
+#include <mutex>
+#include <queue>
+#include <random>
+#include <thread>
+
+#include "port/port.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/db.h"
+#include "rocksdb/status.h"
+#include "rocksdb/table.h"
+#include "test_util/testharness.h"
+#include "util/gflags_compat.h"
+
+const int MAX_SHARDS = 100000;
+
+DEFINE_int32(writers, 8, "");
+DEFINE_int32(readers, 8, "");
+DEFINE_int64(rate, 100000, "");
+DEFINE_int64(value_size, 300, "");
+DEFINE_int64(shards, 1000, "");
+DEFINE_int64(memtable_size, 500000000, "");
+DEFINE_int64(block_cache_size, 300000000, "");
+DEFINE_int64(block_size, 65536, "");
+DEFINE_double(runtime, 300.0, "");
+DEFINE_bool(cache_only_first, true, "");
+DEFINE_bool(iterate_upper_bound, true, "");
+
+struct Stats {
+ char pad1[128] __attribute__((__unused__));
+ std::atomic<uint64_t> written{0};
+ char pad2[128] __attribute__((__unused__));
+ std::atomic<uint64_t> read{0};
+ std::atomic<uint64_t> cache_misses{0};
+ char pad3[128] __attribute__((__unused__));
+} stats;
+
+struct Key {
+ Key() {}
+ Key(uint64_t shard_in, uint64_t seqno_in)
+ : shard_be(htobe64(shard_in)), seqno_be(htobe64(seqno_in)) {}
+
+ uint64_t shard() const { return be64toh(shard_be); }
+ uint64_t seqno() const { return be64toh(seqno_be); }
+
+ private:
+ uint64_t shard_be;
+ uint64_t seqno_be;
+} __attribute__((__packed__));
+
+struct Reader;
+struct Writer;
+
+struct ShardState {
+ char pad1[128] __attribute__((__unused__));
+ std::atomic<uint64_t> last_written{0};
+ Writer* writer;
+ Reader* reader;
+ char pad2[128] __attribute__((__unused__));
+ std::atomic<uint64_t> last_read{0};
+ std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it;
+ std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it_cacheonly;
+ Key upper_bound;
+ ROCKSDB_NAMESPACE::Slice upper_bound_slice;
+ char pad3[128] __attribute__((__unused__));
+};
+
+struct Reader {
+ public:
+ explicit Reader(std::vector<ShardState>* shard_states,
+ ROCKSDB_NAMESPACE::DB* db)
+ : shard_states_(shard_states), db_(db) {
+ sem_init(&sem_, 0, 0);
+ thread_ = port::Thread(&Reader::run, this);
+ }
+
+ void run() {
+ while (1) {
+ sem_wait(&sem_);
+ if (done_.load()) {
+ break;
+ }
+
+ uint64_t shard;
+ {
+ std::lock_guard<std::mutex> guard(queue_mutex_);
+ assert(!shards_pending_queue_.empty());
+ shard = shards_pending_queue_.front();
+ shards_pending_queue_.pop();
+ shards_pending_set_.reset(shard);
+ }
+ readOnceFromShard(shard);
+ }
+ }
+
+ void readOnceFromShard(uint64_t shard) {
+ ShardState& state = (*shard_states_)[shard];
+ if (!state.it) {
+ // Initialize iterators
+ ROCKSDB_NAMESPACE::ReadOptions options;
+ options.tailing = true;
+ if (FLAGS_iterate_upper_bound) {
+ state.upper_bound = Key(shard, std::numeric_limits<uint64_t>::max());
+ state.upper_bound_slice = ROCKSDB_NAMESPACE::Slice(
+ (const char*)&state.upper_bound, sizeof(state.upper_bound));
+ options.iterate_upper_bound = &state.upper_bound_slice;
+ }
+
+ state.it.reset(db_->NewIterator(options));
+
+ if (FLAGS_cache_only_first) {
+ options.read_tier = ROCKSDB_NAMESPACE::ReadTier::kBlockCacheTier;
+ state.it_cacheonly.reset(db_->NewIterator(options));
+ }
+ }
+
+ const uint64_t upto = state.last_written.load();
+ for (ROCKSDB_NAMESPACE::Iterator* it :
+ {state.it_cacheonly.get(), state.it.get()}) {
+ if (it == nullptr) {
+ continue;
+ }
+ if (state.last_read.load() >= upto) {
+ break;
+ }
+ bool need_seek = true;
+ for (uint64_t seq = state.last_read.load() + 1; seq <= upto; ++seq) {
+ if (need_seek) {
+ Key from(shard, state.last_read.load() + 1);
+ it->Seek(ROCKSDB_NAMESPACE::Slice((const char*)&from, sizeof(from)));
+ need_seek = false;
+ } else {
+ it->Next();
+ }
+ if (it->status().IsIncomplete()) {
+ ++::stats.cache_misses;
+ break;
+ }
+ assert(it->Valid());
+ assert(it->key().size() == sizeof(Key));
+ Key key;
+ memcpy(&key, it->key().data(), it->key().size());
+ // fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n",
+ // shard, seq, key.shard(), key.seqno());
+ assert(key.shard() == shard);
+ assert(key.seqno() == seq);
+ state.last_read.store(seq);
+ ++::stats.read;
+ }
+ }
+ }
+
+ void onWrite(uint64_t shard) {
+ {
+ std::lock_guard<std::mutex> guard(queue_mutex_);
+ if (!shards_pending_set_.test(shard)) {
+ shards_pending_queue_.push(shard);
+ shards_pending_set_.set(shard);
+ sem_post(&sem_);
+ }
+ }
+ }
+
+ ~Reader() {
+ done_.store(true);
+ sem_post(&sem_);
+ thread_.join();
+ }
+
+ private:
+ char pad1[128] __attribute__((__unused__));
+ std::vector<ShardState>* shard_states_;
+ ROCKSDB_NAMESPACE::DB* db_;
+ ROCKSDB_NAMESPACE::port::Thread thread_;
+ sem_t sem_;
+ std::mutex queue_mutex_;
+ std::bitset<MAX_SHARDS + 1> shards_pending_set_;
+ std::queue<uint64_t> shards_pending_queue_;
+ std::atomic<bool> done_{false};
+ char pad2[128] __attribute__((__unused__));
+};
+
+struct Writer {
+ explicit Writer(std::vector<ShardState>* shard_states,
+ ROCKSDB_NAMESPACE::DB* db)
+ : shard_states_(shard_states), db_(db) {}
+
+ void start() { thread_ = port::Thread(&Writer::run, this); }
+
+ void run() {
+ std::queue<std::chrono::steady_clock::time_point> workq;
+ std::chrono::steady_clock::time_point deadline(
+ std::chrono::steady_clock::now() +
+ std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime)));
+ std::vector<uint64_t> my_shards;
+ for (int i = 1; i <= FLAGS_shards; ++i) {
+ if ((*shard_states_)[i].writer == this) {
+ my_shards.push_back(i);
+ }
+ }
+
+ std::mt19937 rng{std::random_device()()};
+ std::uniform_int_distribution<int> shard_dist(
+ 0, static_cast<int>(my_shards.size()) - 1);
+ std::string value(FLAGS_value_size, '*');
+
+ while (1) {
+ auto now = std::chrono::steady_clock::now();
+ if (FLAGS_runtime >= 0 && now >= deadline) {
+ break;
+ }
+ if (workq.empty()) {
+ for (int i = 0; i < FLAGS_rate; i += FLAGS_writers) {
+ std::chrono::nanoseconds offset(1000000000LL * i / FLAGS_rate);
+ workq.push(now + offset);
+ }
+ }
+ while (!workq.empty() && workq.front() < now) {
+ workq.pop();
+ uint64_t shard = my_shards[shard_dist(rng)];
+ ShardState& state = (*shard_states_)[shard];
+ uint64_t seqno = state.last_written.load() + 1;
+ Key key(shard, seqno);
+ // fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno);
+ ROCKSDB_NAMESPACE::Status status =
+ db_->Put(ROCKSDB_NAMESPACE::WriteOptions(),
+ ROCKSDB_NAMESPACE::Slice((const char*)&key, sizeof(key)),
+ ROCKSDB_NAMESPACE::Slice(value));
+ assert(status.ok());
+ state.last_written.store(seqno);
+ state.reader->onWrite(shard);
+ ++::stats.written;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ // fprintf(stderr, "Writer done\n");
+ }
+
+ ~Writer() { thread_.join(); }
+
+ private:
+ char pad1[128] __attribute__((__unused__));
+ std::vector<ShardState>* shard_states_;
+ ROCKSDB_NAMESPACE::DB* db_;
+ ROCKSDB_NAMESPACE::port::Thread thread_;
+ char pad2[128] __attribute__((__unused__));
+};
+
+struct StatsThread {
+ explicit StatsThread(ROCKSDB_NAMESPACE::DB* db)
+ : db_(db), thread_(&StatsThread::run, this) {}
+
+ void run() {
+ // using namespace std::chrono;
+ auto tstart = std::chrono::steady_clock::now(), tlast = tstart;
+ uint64_t wlast = 0, rlast = 0;
+ while (!done_.load()) {
+ {
+ std::unique_lock<std::mutex> lock(cvm_);
+ cv_.wait_for(lock, std::chrono::seconds(1));
+ }
+ auto now = std::chrono::steady_clock::now();
+ double elapsed =
+ std::chrono::duration_cast<std::chrono::duration<double> >(
+ now - tlast).count();
+ uint64_t w = ::stats.written.load();
+ uint64_t r = ::stats.read.load();
+ fprintf(stderr,
+ "%s elapsed %4lds | written %10ld | w/s %10.0f | read %10ld | "
+ "r/s %10.0f | cache misses %10ld\n",
+ db_->GetEnv()->TimeToString(time(nullptr)).c_str(),
+ std::chrono::duration_cast<std::chrono::seconds>(now - tstart)
+ .count(),
+ w, (w - wlast) / elapsed, r, (r - rlast) / elapsed,
+ ::stats.cache_misses.load());
+ wlast = w;
+ rlast = r;
+ tlast = now;
+ }
+ }
+
+ ~StatsThread() {
+ {
+ std::lock_guard<std::mutex> guard(cvm_);
+ done_.store(true);
+ }
+ cv_.notify_all();
+ thread_.join();
+ }
+
+ private:
+ ROCKSDB_NAMESPACE::DB* db_;
+ std::mutex cvm_;
+ std::condition_variable cv_;
+ ROCKSDB_NAMESPACE::port::Thread thread_;
+ std::atomic<bool> done_{false};
+};
+
+int main(int argc, char** argv) {
+ GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
+
+ std::mt19937 rng{std::random_device()()};
+ ROCKSDB_NAMESPACE::Status status;
+ std::string path =
+ ROCKSDB_NAMESPACE::test::PerThreadDBPath("forward_iterator_test");
+ fprintf(stderr, "db path is %s\n", path.c_str());
+ ROCKSDB_NAMESPACE::Options options;
+ options.create_if_missing = true;
+ options.compression = ROCKSDB_NAMESPACE::CompressionType::kNoCompression;
+ options.compaction_style =
+ ROCKSDB_NAMESPACE::CompactionStyle::kCompactionStyleNone;
+ options.level0_slowdown_writes_trigger = 99999;
+ options.level0_stop_writes_trigger = 99999;
+ options.use_direct_io_for_flush_and_compaction = true;
+ options.write_buffer_size = FLAGS_memtable_size;
+ ROCKSDB_NAMESPACE::BlockBasedTableOptions table_options;
+ table_options.block_cache =
+ ROCKSDB_NAMESPACE::NewLRUCache(FLAGS_block_cache_size);
+ table_options.block_size = FLAGS_block_size;
+ options.table_factory.reset(
+ ROCKSDB_NAMESPACE::NewBlockBasedTableFactory(table_options));
+
+ status = ROCKSDB_NAMESPACE::DestroyDB(path, options);
+ assert(status.ok());
+ ROCKSDB_NAMESPACE::DB* db_raw;
+ status = ROCKSDB_NAMESPACE::DB::Open(options, path, &db_raw);
+ assert(status.ok());
+ std::unique_ptr<ROCKSDB_NAMESPACE::DB> db(db_raw);
+
+ std::vector<ShardState> shard_states(FLAGS_shards + 1);
+ std::deque<Reader> readers;
+ while (static_cast<int>(readers.size()) < FLAGS_readers) {
+ readers.emplace_back(&shard_states, db_raw);
+ }
+ std::deque<Writer> writers;
+ while (static_cast<int>(writers.size()) < FLAGS_writers) {
+ writers.emplace_back(&shard_states, db_raw);
+ }
+
+ // Each shard gets a random reader and random writer assigned to it
+ for (int i = 1; i <= FLAGS_shards; ++i) {
+ std::uniform_int_distribution<int> reader_dist(0, FLAGS_readers - 1);
+ std::uniform_int_distribution<int> writer_dist(0, FLAGS_writers - 1);
+ shard_states[i].reader = &readers[reader_dist(rng)];
+ shard_states[i].writer = &writers[writer_dist(rng)];
+ }
+
+ StatsThread stats_thread(db_raw);
+ for (Writer& w : writers) {
+ w.start();
+ }
+
+ writers.clear();
+ readers.clear();
+}
+#endif // !defined(GFLAGS) || defined(ROCKSDB_LITE)