diff options
Diffstat (limited to 'src/rocksdb/tools/db_repl_stress.cc')
-rw-r--r-- | src/rocksdb/tools/db_repl_stress.cc | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/src/rocksdb/tools/db_repl_stress.cc b/src/rocksdb/tools/db_repl_stress.cc new file mode 100644 index 00000000..c640b594 --- /dev/null +++ b/src/rocksdb/tools/db_repl_stress.cc @@ -0,0 +1,159 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE +#ifndef GFLAGS +#include <cstdio> +int main() { + fprintf(stderr, "Please install gflags to run rocksdb tools\n"); + return 1; +} +#else + +#include <atomic> +#include <cstdio> + +#include "db/write_batch_internal.h" +#include "rocksdb/db.h" +#include "rocksdb/types.h" +#include "util/gflags_compat.h" +#include "util/testutil.h" + +// Run a thread to perform Put's. +// Another thread uses GetUpdatesSince API to keep getting the updates. +// options : +// --num_inserts = the num of inserts the first thread should perform. +// --wal_ttl = the wal ttl for the run. + +using namespace rocksdb; + +using GFLAGS_NAMESPACE::ParseCommandLineFlags; +using GFLAGS_NAMESPACE::SetUsageMessage; + +struct DataPumpThread { + size_t no_records; + DB* db; // Assumption DB is Open'ed already. +}; + +static std::string RandomString(Random* rnd, int len) { + std::string r; + test::RandomString(rnd, len, &r); + return r; +} + +static void DataPumpThreadBody(void* arg) { + DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg); + DB* db = t->db; + Random rnd(301); + size_t i = 0; + while (i++ < t->no_records) { + if (!db->Put(WriteOptions(), Slice(RandomString(&rnd, 500)), + Slice(RandomString(&rnd, 500))) + .ok()) { + fprintf(stderr, "Error in put\n"); + exit(1); + } + } +} + +struct ReplicationThread { + std::atomic<bool> stop; + DB* db; + volatile size_t no_read; +}; + +static void ReplicationThreadBody(void* arg) { + ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg); + DB* db = t->db; + std::unique_ptr<TransactionLogIterator> iter; + SequenceNumber currentSeqNum = 1; + while (!t->stop.load(std::memory_order_acquire)) { + iter.reset(); + Status s; + while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) { + if (t->stop.load(std::memory_order_acquire)) { + return; + } + } + fprintf(stderr, "Refreshing iterator\n"); + for (; iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) { + BatchResult res = iter->GetBatch(); + if (res.sequence != currentSeqNum) { + fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", + (long)currentSeqNum, (long)res.sequence); + exit(1); + } + } + } +} + +DEFINE_uint64(num_inserts, 1000, + "the num of inserts the first thread should" + " perform."); +DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)"); +DEFINE_uint64(wal_size_limit_MB, 10, + "the wal size limit for the run" + "(in MB)"); + +int main(int argc, const char** argv) { + SetUsageMessage( + std::string("\nUSAGE:\n") + std::string(argv[0]) + + " --num_inserts=<num_inserts> --wal_ttl_seconds=<WAL_ttl_seconds>" + + " --wal_size_limit_MB=<WAL_size_limit_MB>"); + ParseCommandLineFlags(&argc, const_cast<char***>(&argv), true); + + Env* env = Env::Default(); + std::string default_db_path; + env->GetTestDirectory(&default_db_path); + default_db_path += "db_repl_stress"; + Options options; + options.create_if_missing = true; + options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds; + options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB; + DB* db; + DestroyDB(default_db_path, options); + + Status s = DB::Open(options, default_db_path, &db); + + if (!s.ok()) { + fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str()); + exit(1); + } + + DataPumpThread dataPump; + dataPump.no_records = FLAGS_num_inserts; + dataPump.db = db; + env->StartThread(DataPumpThreadBody, &dataPump); + + ReplicationThread replThread; + replThread.db = db; + replThread.no_read = 0; + replThread.stop.store(false, std::memory_order_release); + + env->StartThread(ReplicationThreadBody, &replThread); + while (replThread.no_read < FLAGS_num_inserts) + ; + replThread.stop.store(true, std::memory_order_release); + if (replThread.no_read < dataPump.no_records) { + // no. read should be => than inserted. + fprintf(stderr, + "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt + " Written : %" ROCKSDB_PRIszt "\n", + replThread.no_read, dataPump.no_records); + exit(1); + } + fprintf(stderr, "Successful!\n"); + exit(0); +} + +#endif // GFLAGS + +#else // ROCKSDB_LITE +#include <stdio.h> +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "Not supported in lite mode.\n"); + return 1; +} +#endif // ROCKSDB_LITE |