// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // How to use this example // Open two terminals, in one of them, run `./multi_processes_example 0` to // start a process running the primary instance. This will create a new DB in // kDBPath. The process will run for a while inserting keys to the normal // RocksDB database. // Next, go to the other terminal and run `./multi_processes_example 1` to // start a process running the secondary instance. This will create a secondary // instance following the aforementioned primary instance. This process will // run for a while, tailing the logs of the primary. After process with primary // instance exits, this process will keep running until you hit 'CTRL+C'. #include #include #include #include #include #include #include #include #if defined(OS_LINUX) #include #include #include #include #include #include #endif // !OS_LINUX #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" using ROCKSDB_NAMESPACE::ColumnFamilyDescriptor; using ROCKSDB_NAMESPACE::ColumnFamilyHandle; using ROCKSDB_NAMESPACE::ColumnFamilyOptions; using ROCKSDB_NAMESPACE::DB; using ROCKSDB_NAMESPACE::FlushOptions; using ROCKSDB_NAMESPACE::Iterator; using ROCKSDB_NAMESPACE::Options; using ROCKSDB_NAMESPACE::ReadOptions; using ROCKSDB_NAMESPACE::Slice; using ROCKSDB_NAMESPACE::Status; using ROCKSDB_NAMESPACE::WriteOptions; const std::string kDBPath = "/tmp/rocksdb_multi_processes_example"; const std::string kPrimaryStatusFile = "/tmp/rocksdb_multi_processes_example_primary_status"; const uint64_t kMaxKey = 600000; const size_t kMaxValueLength = 256; const size_t kNumKeysPerFlush = 1000; const std::vector& GetColumnFamilyNames() { static std::vector column_family_names = { ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, "pikachu"}; 
return column_family_names; } inline bool IsLittleEndian() { uint32_t x = 1; return *reinterpret_cast(&x) != 0; } static std::atomic& ShouldSecondaryWait() { static std::atomic should_secondary_wait{1}; return should_secondary_wait; } static std::string Key(uint64_t k) { std::string ret; if (IsLittleEndian()) { ret.append(reinterpret_cast(&k), sizeof(k)); } else { char buf[sizeof(k)]; buf[0] = k & 0xff; buf[1] = (k >> 8) & 0xff; buf[2] = (k >> 16) & 0xff; buf[3] = (k >> 24) & 0xff; buf[4] = (k >> 32) & 0xff; buf[5] = (k >> 40) & 0xff; buf[6] = (k >> 48) & 0xff; buf[7] = (k >> 56) & 0xff; ret.append(buf, sizeof(k)); } size_t i = 0, j = ret.size() - 1; while (i < j) { char tmp = ret[i]; ret[i] = ret[j]; ret[j] = tmp; ++i; --j; } return ret; } static uint64_t Key(std::string key) { assert(key.size() == sizeof(uint64_t)); size_t i = 0, j = key.size() - 1; while (i < j) { char tmp = key[i]; key[i] = key[j]; key[j] = tmp; ++i; --j; } uint64_t ret = 0; if (IsLittleEndian()) { memcpy(&ret, key.c_str(), sizeof(uint64_t)); } else { const char* buf = key.c_str(); ret |= static_cast(buf[0]); ret |= (static_cast(buf[1]) << 8); ret |= (static_cast(buf[2]) << 16); ret |= (static_cast(buf[3]) << 24); ret |= (static_cast(buf[4]) << 32); ret |= (static_cast(buf[5]) << 40); ret |= (static_cast(buf[6]) << 48); ret |= (static_cast(buf[7]) << 56); } return ret; } static Slice GenerateRandomValue(const size_t max_length, char scratch[]) { size_t sz = 1 + (std::rand() % max_length); int rnd = std::rand(); for (size_t i = 0; i != sz; ++i) { scratch[i] = static_cast(rnd ^ i); } return Slice(scratch, sz); } static bool ShouldCloseDB() { return true; } // TODO: port this example to other systems. It should be straightforward for // POSIX-compliant systems. 
#if defined(OS_LINUX)
// Deletes every column family handle in `handles`, then the DB itself, and
// resets `*db` to nullptr. Handles are released before the DB they belong to.
static void CloseDBAndHandles(std::vector<ColumnFamilyHandle*>* handles,
                              DB** db) {
  for (auto h : *handles) {
    delete h;
  }
  handles->clear();
  delete *db;
  *db = nullptr;
}

// Destroys any existing database at kDBPath, creates a fresh one, and adds
// every non-default column family from GetColumnFamilyNames(). The DB is
// closed again before returning.
void CreateDB() {
  long my_pid = static_cast<long>(getpid());
  Options options;
  Status s = ROCKSDB_NAMESPACE::DestroyDB(kDBPath, options);
  if (!s.ok()) {
    fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid,
            s.ToString().c_str());
    assert(false);
  }
  options.create_if_missing = true;
  DB* db = nullptr;
  s = DB::Open(options, kDBPath, &db);
  if (!s.ok()) {
    fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
            s.ToString().c_str());
    assert(false);
  }
  std::vector<ColumnFamilyHandle*> handles;
  ColumnFamilyOptions cf_opts(options);
  for (const auto& cf_name : GetColumnFamilyNames()) {
    // The default column family already exists in a newly created DB.
    if (ROCKSDB_NAMESPACE::kDefaultColumnFamilyName != cf_name) {
      ColumnFamilyHandle* handle = nullptr;
      s = db->CreateColumnFamily(cf_opts, cf_name, &handle);
      if (!s.ok()) {
        fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid,
                cf_name.c_str(), s.ToString().c_str());
        assert(false);
      }
      handles.push_back(handle);
    }
  }
  fprintf(stdout, "[process %ld] Column families created\n", my_pid);
  CloseDBAndHandles(&handles, &db);
}

// Runs the primary instance: recreates the DB, then writes keys
// [0, kMaxKey) to every column family in batches of kNumKeysPerFlush,
// flushing each column family after its batch. When ShouldCloseDB() is true
// the DB is closed after every batch and reopened for the next one.
void RunPrimary() {
  long my_pid = static_cast<long>(getpid());
  fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid);
  CreateDB();
  std::srand(static_cast<unsigned int>(time(nullptr)));
  DB* db = nullptr;
  Options options;
  options.create_if_missing = false;
  std::vector<ColumnFamilyDescriptor> column_families;
  for (const auto& cf_name : GetColumnFamilyNames()) {
    column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
  }
  std::vector<ColumnFamilyHandle*> handles;
  WriteOptions write_opts;
  char val_buf[kMaxValueLength] = {0};
  uint64_t curr_key = 0;
  while (curr_key < kMaxKey) {
    Status s;
    if (nullptr == db) {
      s = DB::Open(options, kDBPath, column_families, &handles, &db);
      if (!s.ok()) {
        fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
                s.ToString().c_str());
        assert(false);
      }
    }
    assert(nullptr != db);
    assert(handles.size() == GetColumnFamilyNames().size());
    for (auto h : handles) {
      assert(nullptr != h);
      for (size_t i = 0; i != kNumKeysPerFlush; ++i) {
        // Keep the encoded key in a named std::string: a Slice does not own
        // its bytes, so binding it to the temporary returned by Key() would
        // leave it dangling by the time Put() runs.
        const std::string key = Key(curr_key + static_cast<uint64_t>(i));
        Slice value = GenerateRandomValue(kMaxValueLength, val_buf);
        s = db->Put(write_opts, h, key, value);
        if (!s.ok()) {
          fprintf(stderr, "[process %ld] Failed to insert\n", my_pid);
          assert(false);
        }
      }
      s = db->Flush(FlushOptions(), h);
      if (!s.ok()) {
        fprintf(stderr, "[process %ld] Failed to flush\n", my_pid);
        assert(false);
      }
    }
    curr_key += static_cast<uint64_t>(kNumKeysPerFlush);
    if (ShouldCloseDB()) {
      CloseDBAndHandles(&handles, &db);
    }
  }
  if (nullptr != db) {
    CloseDBAndHandles(&handles, &db);
  }
  fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid);
}

// SIGINT handler for the secondary: clears the wait flag so that the
// secondary's loops terminate, and emits a newline on stdout.
void secondary_instance_sigint_handler(int /*signal*/) {
  ShouldSecondaryWait().store(0, std::memory_order_relaxed);
  // stdio functions are not async-signal-safe; write(2) is.
  const char newline = '\n';
  const ssize_t ignored = write(STDOUT_FILENO, &newline, 1);
  (void)ignored;
}

// Runs the secondary instance. Opens kDBPath in secondary mode, starts one
// range-scan thread and one point-lookup thread, and repeatedly calls
// TryCatchUpWithPrimary() (once per second) until SIGINT clears the wait
// flag. Afterwards it catches up one final time and compares the default
// column family of the secondary against a read-only open of the same DB.
void RunSecondary() {
  ::signal(SIGINT, secondary_instance_sigint_handler);
  long my_pid = static_cast<long>(getpid());
  const std::string kSecondaryPath =
      "/tmp/rocksdb_multi_processes_example_secondary";
  // Create directory if necessary
  DIR* existing_dir = opendir(kSecondaryPath.c_str());
  if (nullptr != existing_dir) {
    // Only existence matters; release the directory stream right away.
    closedir(existing_dir);
  } else {
    int ret =
        mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
    if (ret < 0) {
      perror("failed to create directory for secondary instance");
      exit(EXIT_FAILURE);
    }
  }
  DB* db = nullptr;
  Options options;
  options.create_if_missing = false;
  options.max_open_files = -1;
  Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db);
  if (!s.ok()) {
    fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n",
            my_pid, s.ToString().c_str());
    assert(false);
  } else {
    fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid);
  }

  ReadOptions ropts;
  ropts.verify_checksums = true;
  ropts.total_order_seek = true;

  // Both worker threads capture locals by reference; they are joined before
  // this function returns.
  std::vector<std::thread> test_threads;
  test_threads.emplace_back([&]() {
    while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
      std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
      iter->SeekToFirst();
      size_t count = 0;
      for (; iter->Valid(); iter->Next()) {
        ++count;
      }
    }
    fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid);
  });

  test_threads.emplace_back([&]() {
    std::srand(static_cast<unsigned int>(time(nullptr)));
    while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
      // Own the key bytes in a std::string so the lookup never reads through
      // a Slice that points into a destroyed temporary.
      const std::string key = Key(std::rand() % kMaxKey);
      std::string value;
      // The result is intentionally not inspected; this thread only
      // generates read load.
      db->Get(ropts, key, &value);
    }
    fprintf(stdout, "[process %ld] Point lookup thread finished\n", my_pid);
  });

  uint64_t curr_key = 0;
  while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
    s = db->TryCatchUpWithPrimary();
    if (!s.ok()) {
      fprintf(stderr,
              "[process %ld] error while trying to catch up with "
              "primary %s\n",
              my_pid, s.ToString().c_str());
      assert(false);
    }
    {
      std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
      if (!iter) {
        fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid);
        assert(false);
      }
      iter->SeekToLast();
      if (iter->Valid()) {
        uint64_t curr_max_key = Key(iter->key().ToString());
        if (curr_max_key != curr_key) {
          // Report the newly observed largest key, not the previous one.
          fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid,
                  curr_max_key);
          curr_key = curr_max_key;
        }
      }
    }
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
  s = db->TryCatchUpWithPrimary();
  if (!s.ok()) {
    fprintf(stderr,
            "[process %ld] error while trying to catch up with "
            "primary %s\n",
            my_pid, s.ToString().c_str());
    assert(false);
  }

  std::vector<ColumnFamilyDescriptor> column_families;
  for (const auto& cf_name : GetColumnFamilyNames()) {
    column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
  }
  std::vector<ColumnFamilyHandle*> handles;
  DB* verification_db = nullptr;
  s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles,
                          &verification_db);
  if (!s.ok()) {
    fprintf(stderr, "[process %ld] Failed to open DB for verification: %s\n",
            my_pid, s.ToString().c_str());
    assert(false);
  }
  {
    // Scoped so that both iterators are destroyed before their DBs.
    std::unique_ptr<Iterator> iter1(verification_db->NewIterator(ropts));
    iter1->SeekToFirst();

    std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
    iter->SeekToFirst();
    for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) {
      if (iter->key().ToString() != iter1->key().ToString()) {
        fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n",
                Key(iter->key().ToString()), Key(iter1->key().ToString()));
        assert(false);
      } else if (iter->value().ToString() != iter1->value().ToString()) {
        fprintf(stderr, "Value mismatch\n");
        assert(false);
      }
    }
  }
  fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid);
  for (auto& thr : test_threads) {
    thr.join();
  }
  delete db;
  // Release the read-only DB's column family handles before the DB itself.
  CloseDBAndHandles(&handles, &verification_db);
}

// Entry point: argument "0" runs the primary, anything else runs the
// secondary.
int main(int argc, char** argv) {
  if (argc < 2) {
    fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]);
    return 0;
  }
  if (atoi(argv[1]) == 0) {
    RunPrimary();
  } else {
    RunSecondary();
  }
  return 0;
}
#else   // OS_LINUX
// Fallback for platforms this example has not been ported to.
int main() {
  fprintf(stderr, "Not implemented.\n");
  return 0;
}
#endif  // !OS_LINUX