From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/rocksdb/examples/.gitignore | 9 + src/rocksdb/examples/Makefile | 53 +++ src/rocksdb/examples/README.md | 2 + src/rocksdb/examples/c_simple_example.c | 79 +++++ src/rocksdb/examples/column_families_example.cc | 72 ++++ src/rocksdb/examples/compact_files_example.cc | 171 +++++++++ src/rocksdb/examples/compaction_filter_example.cc | 88 +++++ src/rocksdb/examples/multi_processes_example.cc | 395 +++++++++++++++++++++ .../examples/optimistic_transaction_example.cc | 180 ++++++++++ src/rocksdb/examples/options_file_example.cc | 113 ++++++ .../examples/rocksdb_option_file_example.ini | 144 ++++++++ src/rocksdb/examples/simple_example.cc | 83 +++++ src/rocksdb/examples/transaction_example.cc | 186 ++++++++++ 13 files changed, 1575 insertions(+) create mode 100644 src/rocksdb/examples/.gitignore create mode 100644 src/rocksdb/examples/Makefile create mode 100644 src/rocksdb/examples/README.md create mode 100644 src/rocksdb/examples/c_simple_example.c create mode 100644 src/rocksdb/examples/column_families_example.cc create mode 100644 src/rocksdb/examples/compact_files_example.cc create mode 100644 src/rocksdb/examples/compaction_filter_example.cc create mode 100644 src/rocksdb/examples/multi_processes_example.cc create mode 100644 src/rocksdb/examples/optimistic_transaction_example.cc create mode 100644 src/rocksdb/examples/options_file_example.cc create mode 100644 src/rocksdb/examples/rocksdb_option_file_example.ini create mode 100644 src/rocksdb/examples/simple_example.cc create mode 100644 src/rocksdb/examples/transaction_example.cc (limited to 'src/rocksdb/examples') diff --git a/src/rocksdb/examples/.gitignore b/src/rocksdb/examples/.gitignore new file mode 100644 index 000000000..823664ae1 --- /dev/null +++ b/src/rocksdb/examples/.gitignore @@ -0,0 +1,9 @@ +c_simple_example +column_families_example +compact_files_example +compaction_filter_example +multi_processes_example +optimistic_transaction_example +options_file_example +simple_example +transaction_example diff --git a/src/rocksdb/examples/Makefile b/src/rocksdb/examples/Makefile new file mode 100644 index 000000000..27a6f0f42 --- /dev/null +++ b/src/rocksdb/examples/Makefile @@ -0,0 +1,53 @@ +include ../make_config.mk + +ifndef DISABLE_JEMALLOC + ifdef JEMALLOC + PLATFORM_CXXFLAGS += -DROCKSDB_JEMALLOC -DJEMALLOC_NO_DEMANGLE + endif + EXEC_LDFLAGS := $(JEMALLOC_LIB) $(EXEC_LDFLAGS) -lpthread + PLATFORM_CXXFLAGS += $(JEMALLOC_INCLUDE) +endif + +ifneq ($(USE_RTTI), 1) + CXXFLAGS += -fno-rtti +endif + +.PHONY: clean librocksdb + +all: simple_example column_families_example compact_files_example c_simple_example optimistic_transaction_example transaction_example compaction_filter_example options_file_example + +simple_example: librocksdb simple_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + +column_families_example: librocksdb column_families_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + +compaction_filter_example: librocksdb compaction_filter_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + +compact_files_example: librocksdb compact_files_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + +.c.o: + $(CC) $(CFLAGS) -c $< -o $@ -I../include + +c_simple_example: librocksdb c_simple_example.o + $(CXX) $@.o -o$@ ../librocksdb.a $(PLATFORM_LDFLAGS) $(EXEC_LDFLAGS) + +optimistic_transaction_example: librocksdb optimistic_transaction_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + +transaction_example: librocksdb transaction_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + +options_file_example: librocksdb options_file_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + +multi_processes_example: librocksdb multi_processes_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + +clean: + rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example + +librocksdb: + cd .. && $(MAKE) static_lib diff --git a/src/rocksdb/examples/README.md b/src/rocksdb/examples/README.md new file mode 100644 index 000000000..f4ba2384b --- /dev/null +++ b/src/rocksdb/examples/README.md @@ -0,0 +1,2 @@ +1. Compile RocksDB first by executing `make static_lib` in parent dir +2. Compile all examples: `cd examples/; make all` diff --git a/src/rocksdb/examples/c_simple_example.c b/src/rocksdb/examples/c_simple_example.c new file mode 100644 index 000000000..5564361d1 --- /dev/null +++ b/src/rocksdb/examples/c_simple_example.c @@ -0,0 +1,79 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include +#include +#include +#include + +#include "rocksdb/c.h" + +#include // sysconf() - get CPU count + +const char DBPath[] = "/tmp/rocksdb_simple_example"; +const char DBBackupPath[] = "/tmp/rocksdb_simple_example_backup"; + +int main(int argc, char **argv) { + rocksdb_t *db; + rocksdb_backup_engine_t *be; + rocksdb_options_t *options = rocksdb_options_create(); + // Optimize RocksDB. This is the easiest way to + // get RocksDB to perform well + long cpus = sysconf(_SC_NPROCESSORS_ONLN); // get # of online cores + rocksdb_options_increase_parallelism(options, (int)(cpus)); + rocksdb_options_optimize_level_style_compaction(options, 0); + // create the DB if it's not already present + rocksdb_options_set_create_if_missing(options, 1); + + // open DB + char *err = NULL; + db = rocksdb_open(options, DBPath, &err); + assert(!err); + + // open Backup Engine that we will use for backing up our database + be = rocksdb_backup_engine_open(options, DBBackupPath, &err); + assert(!err); + + // Put key-value + rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create(); + const char key[] = "key"; + const char *value = "value"; + rocksdb_put(db, writeoptions, key, strlen(key), value, strlen(value) + 1, + &err); + assert(!err); + // Get value + rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); + size_t len; + char *returned_value = + rocksdb_get(db, readoptions, key, strlen(key), &len, &err); + assert(!err); + assert(strcmp(returned_value, "value") == 0); + free(returned_value); + + // create new backup in a directory specified by DBBackupPath + rocksdb_backup_engine_create_new_backup(be, db, &err); + assert(!err); + + rocksdb_close(db); + + // If something is wrong, you might want to restore data from last backup + rocksdb_restore_options_t *restore_options = rocksdb_restore_options_create(); + rocksdb_backup_engine_restore_db_from_latest_backup(be, DBPath, DBPath, + restore_options, &err); + assert(!err); + rocksdb_restore_options_destroy(restore_options); + + db = rocksdb_open(options, DBPath, &err); + assert(!err); + + // cleanup + rocksdb_writeoptions_destroy(writeoptions); + rocksdb_readoptions_destroy(readoptions); + rocksdb_options_destroy(options); + rocksdb_backup_engine_close(be); + rocksdb_close(db); + + return 0; +} diff --git a/src/rocksdb/examples/column_families_example.cc b/src/rocksdb/examples/column_families_example.cc new file mode 100644 index 000000000..df936d46a --- /dev/null +++ b/src/rocksdb/examples/column_families_example.cc @@ -0,0 +1,72 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#include +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/slice.h" +#include "rocksdb/options.h" + +using namespace ROCKSDB_NAMESPACE; + +std::string kDBPath = "/tmp/rocksdb_column_families_example"; + +int main() { + // open DB + Options options; + options.create_if_missing = true; + DB* db; + Status s = DB::Open(options, kDBPath, &db); + assert(s.ok()); + + // create column family + ColumnFamilyHandle* cf; + s = db->CreateColumnFamily(ColumnFamilyOptions(), "new_cf", &cf); + assert(s.ok()); + + // close DB + delete cf; + delete db; + + // open DB with two column families + std::vector column_families; + // have to open default column family + column_families.push_back(ColumnFamilyDescriptor( + kDefaultColumnFamilyName, ColumnFamilyOptions())); + // open the new one, too + column_families.push_back(ColumnFamilyDescriptor( + "new_cf", ColumnFamilyOptions())); + std::vector handles; + s = DB::Open(DBOptions(), kDBPath, column_families, &handles, &db); + assert(s.ok()); + + // put and get from non-default column family + s = db->Put(WriteOptions(), handles[1], Slice("key"), Slice("value")); + assert(s.ok()); + std::string value; + s = db->Get(ReadOptions(), handles[1], Slice("key"), &value); + assert(s.ok()); + + // atomic write + WriteBatch batch; + batch.Put(handles[0], Slice("key2"), Slice("value2")); + batch.Put(handles[1], Slice("key3"), Slice("value3")); + batch.Delete(handles[0], Slice("key")); + s = db->Write(WriteOptions(), &batch); + assert(s.ok()); + + // drop column family + s = db->DropColumnFamily(handles[1]); + assert(s.ok()); + + // close db + for (auto handle : handles) { + delete handle; + } + delete db; + + return 0; +} diff --git a/src/rocksdb/examples/compact_files_example.cc b/src/rocksdb/examples/compact_files_example.cc new file mode 100644 index 000000000..a0a9fa90a --- /dev/null +++ b/src/rocksdb/examples/compact_files_example.cc @@ -0,0 +1,171 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// An example code demonstrating how to use CompactFiles, EventListener, +// and GetColumnFamilyMetaData APIs to implement custom compaction algorithm. + +#include +#include +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" + +using namespace ROCKSDB_NAMESPACE; +std::string kDBPath = "/tmp/rocksdb_compact_files_example"; +struct CompactionTask; + +// This is an example interface of external-compaction algorithm. +// Compaction algorithm can be implemented outside the core-RocksDB +// code by using the pluggable compaction APIs that RocksDb provides. +class Compactor : public EventListener { + public: + // Picks and returns a compaction task given the specified DB + // and column family. It is the caller's responsibility to + // destroy the returned CompactionTask. Returns "nullptr" + // if it cannot find a proper compaction task. + virtual CompactionTask* PickCompaction( + DB* db, const std::string& cf_name) = 0; + + // Schedule and run the specified compaction task in background. + virtual void ScheduleCompaction(CompactionTask *task) = 0; +}; + +// Example structure that describes a compaction task. +struct CompactionTask { + CompactionTask( + DB* _db, Compactor* _compactor, + const std::string& _column_family_name, + const std::vector& _input_file_names, + const int _output_level, + const CompactionOptions& _compact_options, + bool _retry_on_fail) + : db(_db), + compactor(_compactor), + column_family_name(_column_family_name), + input_file_names(_input_file_names), + output_level(_output_level), + compact_options(_compact_options), + retry_on_fail(_retry_on_fail) {} + DB* db; + Compactor* compactor; + const std::string& column_family_name; + std::vector input_file_names; + int output_level; + CompactionOptions compact_options; + bool retry_on_fail; +}; + +// A simple compaction algorithm that always compacts everything +// to the highest level whenever possible. +class FullCompactor : public Compactor { + public: + explicit FullCompactor(const Options options) : options_(options) { + compact_options_.compression = options_.compression; + compact_options_.output_file_size_limit = + options_.target_file_size_base; + } + + // When flush happens, it determines whether to trigger compaction. If + // triggered_writes_stop is true, it will also set the retry flag of + // compaction-task to true. + void OnFlushCompleted( + DB* db, const FlushJobInfo& info) override { + CompactionTask* task = PickCompaction(db, info.cf_name); + if (task != nullptr) { + if (info.triggered_writes_stop) { + task->retry_on_fail = true; + } + // Schedule compaction in a different thread. + ScheduleCompaction(task); + } + } + + // Always pick a compaction which includes all files whenever possible. + CompactionTask* PickCompaction( + DB* db, const std::string& cf_name) override { + ColumnFamilyMetaData cf_meta; + db->GetColumnFamilyMetaData(&cf_meta); + + std::vector input_file_names; + for (auto level : cf_meta.levels) { + for (auto file : level.files) { + if (file.being_compacted) { + return nullptr; + } + input_file_names.push_back(file.name); + } + } + return new CompactionTask( + db, this, cf_name, input_file_names, + options_.num_levels - 1, compact_options_, false); + } + + // Schedule the specified compaction task in background. + void ScheduleCompaction(CompactionTask* task) override { + options_.env->Schedule(&FullCompactor::CompactFiles, task); + } + + static void CompactFiles(void* arg) { + std::unique_ptr task( + reinterpret_cast(arg)); + assert(task); + assert(task->db); + Status s = task->db->CompactFiles( + task->compact_options, + task->input_file_names, + task->output_level); + printf("CompactFiles() finished with status %s\n", s.ToString().c_str()); + if (!s.ok() && !s.IsIOError() && task->retry_on_fail) { + // If a compaction task with its retry_on_fail=true failed, + // try to schedule another compaction in case the reason + // is not an IO error. + CompactionTask* new_task = task->compactor->PickCompaction( + task->db, task->column_family_name); + task->compactor->ScheduleCompaction(new_task); + } + } + + private: + Options options_; + CompactionOptions compact_options_; +}; + +int main() { + Options options; + options.create_if_missing = true; + // Disable RocksDB background compaction. + options.compaction_style = kCompactionStyleNone; + // Small slowdown and stop trigger for experimental purpose. + options.level0_slowdown_writes_trigger = 3; + options.level0_stop_writes_trigger = 5; + options.IncreaseParallelism(5); + options.listeners.emplace_back(new FullCompactor(options)); + + DB* db = nullptr; + DestroyDB(kDBPath, options); + Status s = DB::Open(options, kDBPath, &db); + assert(s.ok()); + assert(db); + + // if background compaction is not working, write will stall + // because of options.level0_stop_writes_trigger + for (int i = 1000; i < 99999; ++i) { + db->Put(WriteOptions(), std::to_string(i), + std::string(500, 'a' + (i % 26))); + } + + // verify the values are still there + std::string value; + for (int i = 1000; i < 99999; ++i) { + db->Get(ReadOptions(), std::to_string(i), + &value); + assert(value == std::string(500, 'a' + (i % 26))); + } + + // close the db. + delete db; + + return 0; +} diff --git a/src/rocksdb/examples/compaction_filter_example.cc b/src/rocksdb/examples/compaction_filter_example.cc new file mode 100644 index 000000000..cee763195 --- /dev/null +++ b/src/rocksdb/examples/compaction_filter_example.cc @@ -0,0 +1,88 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include +#include +#include +#include + +class MyMerge : public ROCKSDB_NAMESPACE::MergeOperator { + public: + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + merge_out->new_value.clear(); + if (merge_in.existing_value != nullptr) { + merge_out->new_value.assign(merge_in.existing_value->data(), + merge_in.existing_value->size()); + } + for (const ROCKSDB_NAMESPACE::Slice& m : merge_in.operand_list) { + fprintf(stderr, "Merge(%s)\n", m.ToString().c_str()); + // the compaction filter filters out bad values + assert(m.ToString() != "bad"); + merge_out->new_value.assign(m.data(), m.size()); + } + return true; + } + + const char* Name() const override { return "MyMerge"; } +}; + +class MyFilter : public ROCKSDB_NAMESPACE::CompactionFilter { + public: + bool Filter(int level, const ROCKSDB_NAMESPACE::Slice& key, + const ROCKSDB_NAMESPACE::Slice& existing_value, + std::string* new_value, bool* value_changed) const override { + fprintf(stderr, "Filter(%s)\n", key.ToString().c_str()); + ++count_; + assert(*value_changed == false); + return false; + } + + bool FilterMergeOperand( + int level, const ROCKSDB_NAMESPACE::Slice& key, + const ROCKSDB_NAMESPACE::Slice& existing_value) const override { + fprintf(stderr, "FilterMerge(%s)\n", key.ToString().c_str()); + ++merge_count_; + return existing_value == "bad"; + } + + const char* Name() const override { return "MyFilter"; } + + mutable int count_ = 0; + mutable int merge_count_ = 0; +}; + +int main() { + ROCKSDB_NAMESPACE::DB* raw_db; + ROCKSDB_NAMESPACE::Status status; + + MyFilter filter; + + int ret = system("rm -rf /tmp/rocksmergetest"); + if (ret != 0) { + fprintf(stderr, "Error deleting /tmp/rocksmergetest, code: %d\n", ret); + return ret; + } + ROCKSDB_NAMESPACE::Options options; + options.create_if_missing = true; + options.merge_operator.reset(new MyMerge); + options.compaction_filter = &filter; + status = ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/rocksmergetest", &raw_db); + assert(status.ok()); + std::unique_ptr db(raw_db); + + ROCKSDB_NAMESPACE::WriteOptions wopts; + db->Merge(wopts, "0", "bad"); // This is filtered out + db->Merge(wopts, "1", "data1"); + db->Merge(wopts, "1", "bad"); + db->Merge(wopts, "1", "data2"); + db->Merge(wopts, "1", "bad"); + db->Merge(wopts, "3", "data3"); + db->CompactRange(ROCKSDB_NAMESPACE::CompactRangeOptions(), nullptr, nullptr); + fprintf(stderr, "filter.count_ = %d\n", filter.count_); + assert(filter.count_ == 0); + fprintf(stderr, "filter.merge_count_ = %d\n", filter.merge_count_); + assert(filter.merge_count_ == 6); +} diff --git a/src/rocksdb/examples/multi_processes_example.cc b/src/rocksdb/examples/multi_processes_example.cc new file mode 100644 index 000000000..3241e36ea --- /dev/null +++ b/src/rocksdb/examples/multi_processes_example.cc @@ -0,0 +1,395 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +// How to use this example +// Open two terminals, in one of them, run `./multi_processes_example 0` to +// start a process running the primary instance. This will create a new DB in +// kDBPath. The process will run for a while inserting keys to the normal +// RocksDB database. +// Next, go to the other terminal and run `./multi_processes_example 1` to +// start a process running the secondary instance. This will create a secondary +// instance following the aforementioned primary instance. This process will +// run for a while, tailing the logs of the primary. After process with primary +// instance exits, this process will keep running until you hit 'CTRL+C'. + +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(OS_LINUX) +#include +#include +#include +#include +#include +#include +#endif // !OS_LINUX + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" + +using ROCKSDB_NAMESPACE::ColumnFamilyDescriptor; +using ROCKSDB_NAMESPACE::ColumnFamilyHandle; +using ROCKSDB_NAMESPACE::ColumnFamilyOptions; +using ROCKSDB_NAMESPACE::DB; +using ROCKSDB_NAMESPACE::FlushOptions; +using ROCKSDB_NAMESPACE::Iterator; +using ROCKSDB_NAMESPACE::Options; +using ROCKSDB_NAMESPACE::ReadOptions; +using ROCKSDB_NAMESPACE::Slice; +using ROCKSDB_NAMESPACE::Status; +using ROCKSDB_NAMESPACE::WriteOptions; + +const std::string kDBPath = "/tmp/rocksdb_multi_processes_example"; +const std::string kPrimaryStatusFile = + "/tmp/rocksdb_multi_processes_example_primary_status"; +const uint64_t kMaxKey = 600000; +const size_t kMaxValueLength = 256; +const size_t kNumKeysPerFlush = 1000; + +const std::vector& GetColumnFamilyNames() { + static std::vector column_family_names = { + ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, "pikachu"}; + return column_family_names; +} + +inline bool IsLittleEndian() { + uint32_t x = 1; + return *reinterpret_cast(&x) != 0; +} + +static std::atomic& ShouldSecondaryWait() { + static std::atomic should_secondary_wait{1}; + return should_secondary_wait; +} + +static std::string Key(uint64_t k) { + std::string ret; + if (IsLittleEndian()) { + ret.append(reinterpret_cast(&k), sizeof(k)); + } else { + char buf[sizeof(k)]; + buf[0] = k & 0xff; + buf[1] = (k >> 8) & 0xff; + buf[2] = (k >> 16) & 0xff; + buf[3] = (k >> 24) & 0xff; + buf[4] = (k >> 32) & 0xff; + buf[5] = (k >> 40) & 0xff; + buf[6] = (k >> 48) & 0xff; + buf[7] = (k >> 56) & 0xff; + ret.append(buf, sizeof(k)); + } + size_t i = 0, j = ret.size() - 1; + while (i < j) { + char tmp = ret[i]; + ret[i] = ret[j]; + ret[j] = tmp; + ++i; + --j; + } + return ret; +} + +static uint64_t Key(std::string key) { + assert(key.size() == sizeof(uint64_t)); + size_t i = 0, j = key.size() - 1; + while (i < j) { + char tmp = key[i]; + key[i] = key[j]; + key[j] = tmp; + ++i; + --j; + } + uint64_t ret = 0; + if (IsLittleEndian()) { + memcpy(&ret, key.c_str(), sizeof(uint64_t)); + } else { + const char* buf = key.c_str(); + ret |= static_cast(buf[0]); + ret |= (static_cast(buf[1]) << 8); + ret |= (static_cast(buf[2]) << 16); + ret |= (static_cast(buf[3]) << 24); + ret |= (static_cast(buf[4]) << 32); + ret |= (static_cast(buf[5]) << 40); + ret |= (static_cast(buf[6]) << 48); + ret |= (static_cast(buf[7]) << 56); + } + return ret; +} + +static Slice GenerateRandomValue(const size_t max_length, char scratch[]) { + size_t sz = 1 + (std::rand() % max_length); + int rnd = std::rand(); + for (size_t i = 0; i != sz; ++i) { + scratch[i] = static_cast(rnd ^ i); + } + return Slice(scratch, sz); +} + +static bool ShouldCloseDB() { return true; } + +// TODO: port this example to other systems. It should be straightforward for +// POSIX-compliant systems. +#if defined(OS_LINUX) +void CreateDB() { + long my_pid = static_cast(getpid()); + Options options; + Status s = ROCKSDB_NAMESPACE::DestroyDB(kDBPath, options); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid, + s.ToString().c_str()); + assert(false); + } + options.create_if_missing = true; + DB* db = nullptr; + s = DB::Open(options, kDBPath, &db); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid, + s.ToString().c_str()); + assert(false); + } + std::vector handles; + ColumnFamilyOptions cf_opts(options); + for (const auto& cf_name : GetColumnFamilyNames()) { + if (ROCKSDB_NAMESPACE::kDefaultColumnFamilyName != cf_name) { + ColumnFamilyHandle* handle = nullptr; + s = db->CreateColumnFamily(cf_opts, cf_name, &handle); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid, + cf_name.c_str(), s.ToString().c_str()); + assert(false); + } + handles.push_back(handle); + } + } + fprintf(stdout, "[process %ld] Column families created\n", my_pid); + for (auto h : handles) { + delete h; + } + handles.clear(); + delete db; +} + +void RunPrimary() { + long my_pid = static_cast(getpid()); + fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid); + CreateDB(); + std::srand(time(nullptr)); + DB* db = nullptr; + Options options; + options.create_if_missing = false; + std::vector column_families; + for (const auto& cf_name : GetColumnFamilyNames()) { + column_families.push_back(ColumnFamilyDescriptor(cf_name, options)); + } + std::vector handles; + WriteOptions write_opts; + char val_buf[kMaxValueLength] = {0}; + uint64_t curr_key = 0; + while (curr_key < kMaxKey) { + Status s; + if (nullptr == db) { + s = DB::Open(options, kDBPath, column_families, &handles, &db); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid, + s.ToString().c_str()); + assert(false); + } + } + assert(nullptr != db); + assert(handles.size() == GetColumnFamilyNames().size()); + for (auto h : handles) { + assert(nullptr != h); + for (size_t i = 0; i != kNumKeysPerFlush; ++i) { + Slice key = Key(curr_key + static_cast(i)); + Slice value = GenerateRandomValue(kMaxValueLength, val_buf); + s = db->Put(write_opts, h, key, value); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to insert\n", my_pid); + assert(false); + } + } + s = db->Flush(FlushOptions(), h); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to flush\n", my_pid); + assert(false); + } + } + curr_key += static_cast(kNumKeysPerFlush); + if (ShouldCloseDB()) { + for (auto h : handles) { + delete h; + } + handles.clear(); + delete db; + db = nullptr; + } + } + if (nullptr != db) { + for (auto h : handles) { + delete h; + } + handles.clear(); + delete db; + db = nullptr; + } + fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid); +} + +void secondary_instance_sigint_handler(int signal) { + ShouldSecondaryWait().store(0, std::memory_order_relaxed); + fprintf(stdout, "\n"); + fflush(stdout); +}; + +void RunSecondary() { + ::signal(SIGINT, secondary_instance_sigint_handler); + long my_pid = static_cast(getpid()); + const std::string kSecondaryPath = + "/tmp/rocksdb_multi_processes_example_secondary"; + // Create directory if necessary + if (nullptr == opendir(kSecondaryPath.c_str())) { + int ret = + mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + if (ret < 0) { + perror("failed to create directory for secondary instance"); + exit(0); + } + } + DB* db = nullptr; + Options options; + options.create_if_missing = false; + options.max_open_files = -1; + Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db); + if (!s.ok()) { + fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n", + my_pid, s.ToString().c_str()); + assert(false); + } else { + fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid); + } + + ReadOptions ropts; + ropts.verify_checksums = true; + ropts.total_order_seek = true; + + std::vector test_threads; + test_threads.emplace_back([&]() { + while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { + std::unique_ptr iter(db->NewIterator(ropts)); + iter->SeekToFirst(); + size_t count = 0; + for (; iter->Valid(); iter->Next()) { + ++count; + } + } + fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid); + }); + + test_threads.emplace_back([&]() { + std::srand(time(nullptr)); + while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { + Slice key = Key(std::rand() % kMaxKey); + std::string value; + db->Get(ropts, key, &value); + } + fprintf(stdout, "[process %ld] Point lookup thread finished\n"); + }); + + uint64_t curr_key = 0; + while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) { + s = db->TryCatchUpWithPrimary(); + if (!s.ok()) { + fprintf(stderr, + "[process %ld] error while trying to catch up with " + "primary %s\n", + my_pid, s.ToString().c_str()); + assert(false); + } + { + std::unique_ptr iter(db->NewIterator(ropts)); + if (!iter) { + fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid); + assert(false); + } + iter->SeekToLast(); + if (iter->Valid()) { + uint64_t curr_max_key = Key(iter->key().ToString()); + if (curr_max_key != curr_key) { + fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid, + curr_key); + curr_key = curr_max_key; + } + } + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + s = db->TryCatchUpWithPrimary(); + if (!s.ok()) { + fprintf(stderr, + "[process %ld] error while trying to catch up with " + "primary %s\n", + my_pid, s.ToString().c_str()); + assert(false); + } + + std::vector column_families; + for (const auto& cf_name : GetColumnFamilyNames()) { + column_families.push_back(ColumnFamilyDescriptor(cf_name, options)); + } + std::vector handles; + DB* verification_db = nullptr; + s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles, + &verification_db); + assert(s.ok()); + Iterator* iter1 = verification_db->NewIterator(ropts); + iter1->SeekToFirst(); + + Iterator* iter = db->NewIterator(ropts); + iter->SeekToFirst(); + for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) { + if (iter->key().ToString() != iter1->key().ToString()) { + fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n", + Key(iter->key().ToString()), Key(iter1->key().ToString())); + assert(false); + } else if (iter->value().ToString() != iter1->value().ToString()) { + fprintf(stderr, "Value mismatch\n"); + assert(false); + } + } + fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid); + for (auto& thr : test_threads) { + thr.join(); + } + delete iter; + delete iter1; + delete db; + delete verification_db; +} + +int main(int argc, char** argv) { + if (argc < 2) { + fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]); + return 0; + } + if (atoi(argv[1]) == 0) { + RunPrimary(); + } else { + RunSecondary(); + } + return 0; +} +#else // OS_LINUX +int main() { + fpritnf(stderr, "Not implemented.\n"); + return 0; +} +#endif // !OS_LINUX diff --git a/src/rocksdb/examples/optimistic_transaction_example.cc b/src/rocksdb/examples/optimistic_transaction_example.cc new file mode 100644 index 000000000..fd6dbad63 --- /dev/null +++ b/src/rocksdb/examples/optimistic_transaction_example.cc @@ -0,0 +1,180 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" + +using namespace ROCKSDB_NAMESPACE; + +std::string kDBPath = "/tmp/rocksdb_transaction_example"; + +int main() { + // open DB + Options options; + options.create_if_missing = true; + DB* db; + OptimisticTransactionDB* txn_db; + + Status s = OptimisticTransactionDB::Open(options, kDBPath, &txn_db); + assert(s.ok()); + db = txn_db->GetBaseDB(); + + WriteOptions write_options; + ReadOptions read_options; + OptimisticTransactionOptions txn_options; + std::string value; + + //////////////////////////////////////////////////////// + // + // Simple OptimisticTransaction Example ("Read Committed") + // + //////////////////////////////////////////////////////// + + // Start a transaction + Transaction* txn = txn_db->BeginTransaction(write_options); + assert(txn); + + // Read a key in this transaction + s = txn->Get(read_options, "abc", &value); + assert(s.IsNotFound()); + + // Write a key in this transaction + s = txn->Put("abc", "xyz"); + assert(s.ok()); + + // Read a key OUTSIDE this transaction. Does not affect txn. + s = db->Get(read_options, "abc", &value); + assert(s.IsNotFound()); + + // Write a key OUTSIDE of this transaction. + // Does not affect txn since this is an unrelated key. If we wrote key 'abc' + // here, the transaction would fail to commit. + s = db->Put(write_options, "xyz", "zzz"); + assert(s.ok()); + s = db->Put(write_options, "abc", "def"); + assert(s.ok()); + + // Commit transaction + s = txn->Commit(); + assert(s.IsBusy()); + delete txn; + + s = db->Get(read_options, "xyz", &value); + assert(s.ok()); + assert(value == "zzz"); + + s = db->Get(read_options, "abc", &value); + assert(s.ok()); + assert(value == "def"); + + //////////////////////////////////////////////////////// + // + // "Repeatable Read" (Snapshot Isolation) Example + // -- Using a single Snapshot + // + //////////////////////////////////////////////////////// + + // Set a snapshot at start of transaction by setting set_snapshot=true + txn_options.set_snapshot = true; + txn = txn_db->BeginTransaction(write_options, txn_options); + + const Snapshot* snapshot = txn->GetSnapshot(); + + // Write a key OUTSIDE of transaction + s = db->Put(write_options, "abc", "xyz"); + assert(s.ok()); + + // Read a key using the snapshot + read_options.snapshot = snapshot; + s = txn->GetForUpdate(read_options, "abc", &value); + assert(s.ok()); + assert(value == "def"); + + // Attempt to commit transaction + s = txn->Commit(); + + // Transaction could not commit since the write outside of the txn conflicted + // with the read! + assert(s.IsBusy()); + + delete txn; + // Clear snapshot from read options since it is no longer valid + read_options.snapshot = nullptr; + snapshot = nullptr; + + s = db->Get(read_options, "abc", &value); + assert(s.ok()); + assert(value == "xyz"); + + //////////////////////////////////////////////////////// + // + // "Read Committed" (Monotonic Atomic Views) Example + // --Using multiple Snapshots + // + //////////////////////////////////////////////////////// + + // In this example, we set the snapshot multiple times. This is probably + // only necessary if you have very strict isolation requirements to + // implement. + + // Set a snapshot at start of transaction + txn_options.set_snapshot = true; + txn = txn_db->BeginTransaction(write_options, txn_options); + + // Do some reads and writes to key "x" + read_options.snapshot = db->GetSnapshot(); + s = txn->Get(read_options, "x", &value); + assert(s.IsNotFound()); + s = txn->Put("x", "x"); + assert(s.ok()); + + // The transaction hasn't committed, so the write is not visible + // outside of txn. + s = db->Get(read_options, "x", &value); + assert(s.IsNotFound()); + + // Do a write outside of the transaction to key "y" + s = db->Put(write_options, "y", "z"); + assert(s.ok()); + + // Set a new snapshot in the transaction + txn->SetSnapshot(); + read_options.snapshot = db->GetSnapshot(); + + // Do some reads and writes to key "y" + s = txn->GetForUpdate(read_options, "y", &value); + assert(s.ok()); + assert(value == "z"); + txn->Put("y", "y"); + + // Commit. Since the snapshot was advanced, the write done outside of the + // transaction does not prevent this transaction from Committing. + s = txn->Commit(); + assert(s.ok()); + delete txn; + // Clear snapshot from read options since it is no longer valid + read_options.snapshot = nullptr; + + // txn is committed, read the latest values. + s = db->Get(read_options, "x", &value); + assert(s.ok()); + assert(value == "x"); + + s = db->Get(read_options, "y", &value); + assert(s.ok()); + assert(value == "y"); + + // Cleanup + delete txn_db; + DestroyDB(kDBPath, options); + return 0; +} + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/examples/options_file_example.cc b/src/rocksdb/examples/options_file_example.cc new file mode 100644 index 000000000..e6a1d0e9a --- /dev/null +++ b/src/rocksdb/examples/options_file_example.cc @@ -0,0 +1,113 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// This file demonstrates how to use the utility functions defined in +// rocksdb/utilities/options_util.h to open a rocksdb database without +// remembering all the rocksdb options. +#include +#include +#include + +#include "rocksdb/cache.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/table.h" +#include "rocksdb/utilities/options_util.h" + +using namespace ROCKSDB_NAMESPACE; + +std::string kDBPath = "/tmp/rocksdb_options_file_example"; + +namespace { +// A dummy compaction filter +class DummyCompactionFilter : public CompactionFilter { + public: + virtual ~DummyCompactionFilter() {} + virtual bool Filter(int level, const Slice& key, const Slice& existing_value, + std::string* new_value, bool* value_changed) const { + return false; + } + virtual const char* Name() const { return "DummyCompactionFilter"; } +}; + +} // namespace + +int main() { + DBOptions db_opt; + db_opt.create_if_missing = true; + + std::vector cf_descs; + cf_descs.push_back({kDefaultColumnFamilyName, ColumnFamilyOptions()}); + cf_descs.push_back({"new_cf", ColumnFamilyOptions()}); + + // initialize BlockBasedTableOptions + auto cache = NewLRUCache(1 * 1024 * 1024 * 1024); + BlockBasedTableOptions bbt_opts; + bbt_opts.block_size = 32 * 1024; + bbt_opts.block_cache = cache; + + // initialize column families options + std::unique_ptr compaction_filter; + compaction_filter.reset(new DummyCompactionFilter()); + cf_descs[0].options.table_factory.reset(NewBlockBasedTableFactory(bbt_opts)); + cf_descs[0].options.compaction_filter = compaction_filter.get(); + cf_descs[1].options.table_factory.reset(NewBlockBasedTableFactory(bbt_opts)); + + // destroy and open DB + DB* db; + Status s = DestroyDB(kDBPath, Options(db_opt, cf_descs[0].options)); + assert(s.ok()); + s = DB::Open(Options(db_opt, cf_descs[0].options), kDBPath, &db); + assert(s.ok()); + + // Create column family, and rocksdb will persist the options. + ColumnFamilyHandle* cf; + s = db->CreateColumnFamily(ColumnFamilyOptions(), "new_cf", &cf); + assert(s.ok()); + + // close DB + delete cf; + delete db; + + // In the following code, we will reopen the rocksdb instance using + // the options file stored in the db directory. + + // Load the options file. + DBOptions loaded_db_opt; + std::vector loaded_cf_descs; + s = LoadLatestOptions(kDBPath, Env::Default(), &loaded_db_opt, + &loaded_cf_descs); + assert(s.ok()); + assert(loaded_db_opt.create_if_missing == db_opt.create_if_missing); + + // Initialize pointer options for each column family + for (size_t i = 0; i < loaded_cf_descs.size(); ++i) { + auto* loaded_bbt_opt = reinterpret_cast( + loaded_cf_descs[0].options.table_factory->GetOptions()); + // Expect the same as BlockBasedTableOptions will be loaded form file. + assert(loaded_bbt_opt->block_size == bbt_opts.block_size); + // However, block_cache needs to be manually initialized as documented + // in rocksdb/utilities/options_util.h. + loaded_bbt_opt->block_cache = cache; + } + // In addition, as pointer options are initialized with default value, + // we need to properly initialized all the pointer options if non-defalut + // values are used before calling DB::Open(). + assert(loaded_cf_descs[0].options.compaction_filter == nullptr); + loaded_cf_descs[0].options.compaction_filter = compaction_filter.get(); + + // reopen the db using the loaded options. + std::vector handles; + s = DB::Open(loaded_db_opt, kDBPath, loaded_cf_descs, &handles, &db); + assert(s.ok()); + + // close DB + for (auto* handle : handles) { + delete handle; + } + delete db; +} diff --git a/src/rocksdb/examples/rocksdb_option_file_example.ini b/src/rocksdb/examples/rocksdb_option_file_example.ini new file mode 100644 index 000000000..dcbc9a308 --- /dev/null +++ b/src/rocksdb/examples/rocksdb_option_file_example.ini @@ -0,0 +1,144 @@ +# This is a RocksDB option file. +# +# A typical RocksDB options file has four sections, which are +# Version section, DBOptions section, at least one CFOptions +# section, and one TableOptions section for each column family. +# The RocksDB options file in general follows the basic INI +# file format with the following extensions / modifications: +# +# * Escaped characters +# We escaped the following characters: +# - \n -- line feed - new line +# - \r -- carriage return +# - \\ -- backslash \ +# - \: -- colon symbol : +# - \# -- hash tag # +# * Comments +# We support # style comments. Comments can appear at the ending +# part of a line. +# * Statements +# A statement is of the form option_name = value. +# Each statement contains a '=', where extra white-spaces +# are supported. However, we don't support multi-lined statement. +# Furthermore, each line can only contain at most one statement. +# * Sections +# Sections are of the form [SecitonTitle "SectionArgument"], +# where section argument is optional. +# * List +# We use colon-separated string to represent a list. +# For instance, n1:n2:n3:n4 is a list containing four values. +# +# Below is an example of a RocksDB options file: +[Version] + rocksdb_version=4.3.0 + options_file_version=1.1 + +[DBOptions] + stats_dump_period_sec=600 + max_manifest_file_size=18446744073709551615 + bytes_per_sync=8388608 + delayed_write_rate=2097152 + WAL_ttl_seconds=0 + WAL_size_limit_MB=0 + max_subcompactions=1 + wal_dir= + wal_bytes_per_sync=0 + db_write_buffer_size=0 + keep_log_file_num=1000 + table_cache_numshardbits=4 + max_file_opening_threads=1 + writable_file_max_buffer_size=1048576 + random_access_max_buffer_size=1048576 + use_fsync=false + max_total_wal_size=0 + max_open_files=-1 + skip_stats_update_on_db_open=false + max_background_compactions=16 + manifest_preallocation_size=4194304 + max_background_flushes=7 + is_fd_close_on_exec=true + max_log_file_size=0 + advise_random_on_open=true + create_missing_column_families=false + paranoid_checks=true + delete_obsolete_files_period_micros=21600000000 + log_file_time_to_roll=0 + compaction_readahead_size=0 + create_if_missing=false + use_adaptive_mutex=false + enable_thread_tracking=false + allow_fallocate=true + error_if_exists=false + recycle_log_file_num=0 + skip_log_error_on_recovery=false + db_log_dir= + new_table_reader_for_compaction_inputs=true + allow_mmap_reads=false + allow_mmap_writes=false + use_direct_reads=false + use_direct_writes=false + + +[CFOptions "default"] + compaction_style=kCompactionStyleLevel + compaction_filter=nullptr + num_levels=6 + table_factory=BlockBasedTable + comparator=leveldb.BytewiseComparator + max_sequential_skip_in_iterations=8 + soft_rate_limit=0.000000 + max_bytes_for_level_base=1073741824 + memtable_prefix_bloom_probes=6 + memtable_prefix_bloom_bits=0 + memtable_prefix_bloom_huge_page_tlb_size=0 + max_successive_merges=0 + arena_block_size=16777216 + min_write_buffer_number_to_merge=1 + target_file_size_multiplier=1 + source_compaction_factor=1 + max_bytes_for_level_multiplier=8 + max_bytes_for_level_multiplier_additional=2:3:5 + compaction_filter_factory=nullptr + max_write_buffer_number=8 + level0_stop_writes_trigger=20 + compression=kSnappyCompression + level0_file_num_compaction_trigger=4 + purge_redundant_kvs_while_flush=true + max_write_buffer_size_to_maintain=0 + memtable_factory=SkipListFactory + max_grandparent_overlap_factor=8 + expanded_compaction_factor=25 + hard_pending_compaction_bytes_limit=137438953472 + inplace_update_num_locks=10000 + level_compaction_dynamic_level_bytes=true + level0_slowdown_writes_trigger=12 + filter_deletes=false + verify_checksums_in_compaction=true + min_partial_merge_operands=2 + paranoid_file_checks=false + target_file_size_base=134217728 + optimize_filters_for_hits=false + merge_operator=PutOperator + compression_per_level=kNoCompression:kNoCompression:kNoCompression:kSnappyCompression:kSnappyCompression:kSnappyCompression + compaction_measure_io_stats=false + prefix_extractor=nullptr + bloom_locality=0 + write_buffer_size=134217728 + disable_auto_compactions=false + inplace_update_support=false + +[TableOptions/BlockBasedTable "default"] + format_version=2 + whole_key_filtering=true + no_block_cache=false + checksum=kCRC32c + filter_policy=rocksdb.BuiltinBloomFilter + block_size_deviation=10 + block_size=8192 + block_restart_interval=16 + cache_index_and_filter_blocks=false + pin_l0_filter_and_index_blocks_in_cache=false + pin_top_level_index_and_filter=false + index_type=kBinarySearch + hash_index_allow_collision=true + flush_block_policy_factory=FlushBlockBySizePolicyFactory diff --git a/src/rocksdb/examples/simple_example.cc b/src/rocksdb/examples/simple_example.cc new file mode 100644 index 000000000..256c965d7 --- /dev/null +++ b/src/rocksdb/examples/simple_example.cc @@ -0,0 +1,83 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/slice.h" +#include "rocksdb/options.h" + +using namespace ROCKSDB_NAMESPACE; + +std::string kDBPath = "/tmp/rocksdb_simple_example"; + +int main() { + DB* db; + Options options; + // Optimize RocksDB. This is the easiest way to get RocksDB to perform well + options.IncreaseParallelism(); + options.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options.create_if_missing = true; + + // open DB + Status s = DB::Open(options, kDBPath, &db); + assert(s.ok()); + + // Put key-value + s = db->Put(WriteOptions(), "key1", "value"); + assert(s.ok()); + std::string value; + // get value + s = db->Get(ReadOptions(), "key1", &value); + assert(s.ok()); + assert(value == "value"); + + // atomically apply a set of updates + { + WriteBatch batch; + batch.Delete("key1"); + batch.Put("key2", value); + s = db->Write(WriteOptions(), &batch); + } + + s = db->Get(ReadOptions(), "key1", &value); + assert(s.IsNotFound()); + + db->Get(ReadOptions(), "key2", &value); + assert(value == "value"); + + { + PinnableSlice pinnable_val; + db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val); + assert(pinnable_val == "value"); + } + + { + std::string string_val; + // If it cannot pin the value, it copies the value to its internal buffer. + // The intenral buffer could be set during construction. + PinnableSlice pinnable_val(&string_val); + db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val); + assert(pinnable_val == "value"); + // If the value is not pinned, the internal buffer must have the value. + assert(pinnable_val.IsPinned() || string_val == "value"); + } + + PinnableSlice pinnable_val; + db->Get(ReadOptions(), db->DefaultColumnFamily(), "key1", &pinnable_val); + assert(s.IsNotFound()); + // Reset PinnableSlice after each use and before each reuse + pinnable_val.Reset(); + db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val); + assert(pinnable_val == "value"); + pinnable_val.Reset(); + // The Slice pointed by pinnable_val is not valid after this point + + delete db; + + return 0; +} diff --git a/src/rocksdb/examples/transaction_example.cc b/src/rocksdb/examples/transaction_example.cc new file mode 100644 index 000000000..41b233544 --- /dev/null +++ b/src/rocksdb/examples/transaction_example.cc @@ -0,0 +1,186 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" + +using namespace ROCKSDB_NAMESPACE; + +std::string kDBPath = "/tmp/rocksdb_transaction_example"; + +int main() { + // open DB + Options options; + TransactionDBOptions txn_db_options; + options.create_if_missing = true; + TransactionDB* txn_db; + + Status s = TransactionDB::Open(options, txn_db_options, kDBPath, &txn_db); + assert(s.ok()); + + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + std::string value; + + //////////////////////////////////////////////////////// + // + // Simple Transaction Example ("Read Committed") + // + //////////////////////////////////////////////////////// + + // Start a transaction + Transaction* txn = txn_db->BeginTransaction(write_options); + assert(txn); + + // Read a key in this transaction + s = txn->Get(read_options, "abc", &value); + assert(s.IsNotFound()); + + // Write a key in this transaction + s = txn->Put("abc", "def"); + assert(s.ok()); + + // Read a key OUTSIDE this transaction. Does not affect txn. + s = txn_db->Get(read_options, "abc", &value); + assert(s.IsNotFound()); + + // Write a key OUTSIDE of this transaction. + // Does not affect txn since this is an unrelated key. + s = txn_db->Put(write_options, "xyz", "zzz"); + assert(s.ok()); + + // Write a key OUTSIDE of this transaction. + // Fail because the key conflicts with the key written in txn. + s = txn_db->Put(write_options, "abc", "def"); + assert(s.subcode() == Status::kLockTimeout); + + // Value for key "xyz" has been committed, can be read in txn. + s = txn->Get(read_options, "xyz", &value); + assert(s.ok()); + assert(value == "zzz"); + + // Commit transaction + s = txn->Commit(); + assert(s.ok()); + delete txn; + + // Value is committed, can be read now. + s = txn_db->Get(read_options, "abc", &value); + assert(s.ok()); + assert(value == "def"); + + //////////////////////////////////////////////////////// + // + // "Repeatable Read" (Snapshot Isolation) Example + // -- Using a single Snapshot + // + //////////////////////////////////////////////////////// + + // Set a snapshot at start of transaction by setting set_snapshot=true + txn_options.set_snapshot = true; + txn = txn_db->BeginTransaction(write_options, txn_options); + + const Snapshot* snapshot = txn->GetSnapshot(); + + // Write a key OUTSIDE of transaction + s = txn_db->Put(write_options, "abc", "xyz"); + assert(s.ok()); + + // Read the latest committed value. + s = txn->Get(read_options, "abc", &value); + assert(s.ok()); + assert(value == "xyz"); + + // Read the snapshotted value. + read_options.snapshot = snapshot; + s = txn->Get(read_options, "abc", &value); + assert(s.ok()); + assert(value == "def"); + + // Attempt to read a key using the snapshot. This will fail since + // the previous write outside this txn conflicts with this read. + s = txn->GetForUpdate(read_options, "abc", &value); + assert(s.IsBusy()); + + txn->Rollback(); + + // Snapshot will be released upon deleting the transaction. + delete txn; + // Clear snapshot from read options since it is no longer valid + read_options.snapshot = nullptr; + snapshot = nullptr; + + //////////////////////////////////////////////////////// + // + // "Read Committed" (Monotonic Atomic Views) Example + // --Using multiple Snapshots + // + //////////////////////////////////////////////////////// + + // In this example, we set the snapshot multiple times. This is probably + // only necessary if you have very strict isolation requirements to + // implement. + + // Set a snapshot at start of transaction + txn_options.set_snapshot = true; + txn = txn_db->BeginTransaction(write_options, txn_options); + + // Do some reads and writes to key "x" + read_options.snapshot = txn_db->GetSnapshot(); + s = txn->Get(read_options, "x", &value); + assert(s.IsNotFound()); + s = txn->Put("x", "x"); + assert(s.ok()); + + // Do a write outside of the transaction to key "y" + s = txn_db->Put(write_options, "y", "y1"); + assert(s.ok()); + + // Set a new snapshot in the transaction + txn->SetSnapshot(); + txn->SetSavePoint(); + read_options.snapshot = txn_db->GetSnapshot(); + + // Do some reads and writes to key "y" + // Since the snapshot was advanced, the write done outside of the + // transaction does not conflict. + s = txn->GetForUpdate(read_options, "y", &value); + assert(s.ok()); + assert(value == "y1"); + s = txn->Put("y", "y2"); + assert(s.ok()); + + // Decide we want to revert the last write from this transaction. + txn->RollbackToSavePoint(); + + // Commit. + s = txn->Commit(); + assert(s.ok()); + delete txn; + // Clear snapshot from read options since it is no longer valid + read_options.snapshot = nullptr; + + // db state is at the save point. + s = txn_db->Get(read_options, "x", &value); + assert(s.ok()); + assert(value == "x"); + + s = txn_db->Get(read_options, "y", &value); + assert(s.ok()); + assert(value == "y1"); + + // Cleanup + delete txn_db; + DestroyDB(kDBPath, options); + return 0; +} + +#endif // ROCKSDB_LITE -- cgit v1.2.3