Diffstat (limited to 'src/rocksdb/examples/compact_files_example.cc')
-rw-r--r--  src/rocksdb/examples/compact_files_example.cc | 171
1 file changed, 171 insertions(+), 0 deletions(-)
diff --git a/src/rocksdb/examples/compact_files_example.cc b/src/rocksdb/examples/compact_files_example.cc
new file mode 100644
index 00000000..c27df8ee
--- /dev/null
+++ b/src/rocksdb/examples/compact_files_example.cc
@@ -0,0 +1,171 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// An example demonstrating how to use the CompactFiles, EventListener,
+// and GetColumnFamilyMetaData APIs to implement a custom compaction
+// algorithm.
+
+#include <cassert>
+#include <cstdio>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+
+using namespace rocksdb;
+std::string kDBPath = "/tmp/rocksdb_compact_files_example";
+struct CompactionTask;
+
+// This is an example interface for an external compaction algorithm.
+// A compaction algorithm can be implemented outside the core RocksDB
+// code using the pluggable compaction APIs that RocksDB provides.
+class Compactor : public EventListener {
+ public:
+ // Picks and returns a compaction task given the specified DB
+ // and column family. It is the caller's responsibility to
+ // destroy the returned CompactionTask. Returns "nullptr"
+ // if it cannot find a proper compaction task.
+ virtual CompactionTask* PickCompaction(
+ DB* db, const std::string& cf_name) = 0;
+
+  // Schedules and runs the specified compaction task in the background.
+ virtual void ScheduleCompaction(CompactionTask *task) = 0;
+};
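+
+// Note: because Compactor derives from EventListener, a concrete
+// implementation (such as FullCompactor below) can be registered via
+// options.listeners to receive flush notifications; see main().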
+
+// Example structure that describes a compaction task.
+struct CompactionTask {
+ CompactionTask(
+ DB* _db, Compactor* _compactor,
+ const std::string& _column_family_name,
+ const std::vector<std::string>& _input_file_names,
+ const int _output_level,
+ const CompactionOptions& _compact_options,
+ bool _retry_on_fail)
+ : db(_db),
+ compactor(_compactor),
+ column_family_name(_column_family_name),
+ input_file_names(_input_file_names),
+ output_level(_output_level),
+ compact_options(_compact_options),
+ retry_on_fail(_retry_on_fail) {}
+ DB* db;
+ Compactor* compactor;
+  // Stored by value: the task is consumed on a background thread and can
+  // outlive the string passed to the constructor, so a reference member
+  // would dangle.
+  std::string column_family_name;
+ std::vector<std::string> input_file_names;
+ int output_level;
+ CompactionOptions compact_options;
+ bool retry_on_fail;
+};
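+
+// Ownership note: tasks are heap-allocated by PickCompaction() and
+// released by the background job (see FullCompactor::CompactFiles, which
+// adopts the raw pointer into a std::unique_ptr).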
+
+// A simple compaction algorithm that always compacts everything
+// to the highest level whenever possible.
+class FullCompactor : public Compactor {
+ public:
+  explicit FullCompactor(const Options& options) : options_(options) {
+ compact_options_.compression = options_.compression;
+ compact_options_.output_file_size_limit =
+ options_.target_file_size_base;
+ }
+
+  // When a flush completes, this callback decides whether to trigger a
+  // compaction. If triggered_writes_stop is true, it also sets the
+  // retry flag of the compaction task to true.
+ void OnFlushCompleted(
+ DB* db, const FlushJobInfo& info) override {
+ CompactionTask* task = PickCompaction(db, info.cf_name);
+ if (task != nullptr) {
+ if (info.triggered_writes_stop) {
+ task->retry_on_fail = true;
+ }
+ // Schedule compaction in a different thread.
+ ScheduleCompaction(task);
+ }
+ }
+
+  // Always picks a compaction that includes all files whenever possible.
+ CompactionTask* PickCompaction(
+ DB* db, const std::string& cf_name) override {
+ ColumnFamilyMetaData cf_meta;
+ db->GetColumnFamilyMetaData(&cf_meta);
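+    // Note: this no-handle overload of GetColumnFamilyMetaData() reports
+    // on the default column family, the only one this example uses.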
+
+ std::vector<std::string> input_file_names;
+    for (const auto& level : cf_meta.levels) {
+      for (const auto& file : level.files) {
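+        // Give up if any file is already being compacted: CompactFiles()
+        // would reject an input that is part of a running compaction.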
+ if (file.being_compacted) {
+ return nullptr;
+ }
+ input_file_names.push_back(file.name);
+ }
+ }
+ return new CompactionTask(
+ db, this, cf_name, input_file_names,
+ options_.num_levels - 1, compact_options_, false);
+ }
+
+  // Schedules the specified compaction task in the background.
+ void ScheduleCompaction(CompactionTask* task) override {
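+    // Env::Schedule() runs the callback on a thread from the Env's
+    // background thread pool (LOW priority by default).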
+ options_.env->Schedule(&FullCompactor::CompactFiles, task);
+ }
+
+ static void CompactFiles(void* arg) {
+    std::unique_ptr<CompactionTask> task(
+        static_cast<CompactionTask*>(arg));
+ assert(task);
+ assert(task->db);
+ Status s = task->db->CompactFiles(
+ task->compact_options,
+ task->input_file_names,
+ task->output_level);
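+    // DB::CompactFiles() runs the compaction synchronously and returns
+    // its final status once the compaction finishes.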
+ printf("CompactFiles() finished with status %s\n", s.ToString().c_str());
+ if (!s.ok() && !s.IsIOError() && task->retry_on_fail) {
+      // If a compaction task with retry_on_fail=true failed for a
+      // reason other than an IO error, try to schedule another
+      // compaction.
+      CompactionTask* new_task = task->compactor->PickCompaction(
+          task->db, task->column_family_name);
+      // PickCompaction() may return nullptr, so guard before
+      // rescheduling.
+      if (new_task != nullptr) {
+        task->compactor->ScheduleCompaction(new_task);
+      }
+ }
+ }
+
+ private:
+ Options options_;
+ CompactionOptions compact_options_;
+};
+
+int main() {
+ Options options;
+ options.create_if_missing = true;
+ // Disable RocksDB background compaction.
+ options.compaction_style = kCompactionStyleNone;
+  // Use small slowdown and stop triggers for demonstration purposes.
+ options.level0_slowdown_writes_trigger = 3;
+ options.level0_stop_writes_trigger = 5;
+ options.IncreaseParallelism(5);
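+  // Register the custom compactor as an event listener so it is
+  // notified whenever a flush completes.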
+ options.listeners.emplace_back(new FullCompactor(options));
+
+ DB* db = nullptr;
+ DestroyDB(kDBPath, options);
+ Status s = DB::Open(options, kDBPath, &db);
+ assert(s.ok());
+ assert(db);
+
+  // If background compaction is not working, writes will stall because
+  // of options.level0_stop_writes_trigger.
+ for (int i = 1000; i < 99999; ++i) {
+    s = db->Put(WriteOptions(), std::to_string(i),
+                std::string(500, 'a' + (i % 26)));
+    assert(s.ok());
+ }
+
+  // Verify the values are still there.
+  std::string value;
+  for (int i = 1000; i < 99999; ++i) {
+    s = db->Get(ReadOptions(), std::to_string(i), &value);
+    assert(s.ok());
+    assert(value == std::string(500, 'a' + (i % 26)));
+ }
+
+  // Close the DB.
+ delete db;
+
+ return 0;
+}