summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/table_properties_collectors
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.cc227
-rw-r--r--src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.h70
-rw-r--r--src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc245
3 files changed, 542 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.cc b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.cc
new file mode 100644
index 000000000..16f33934d
--- /dev/null
+++ b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.cc
@@ -0,0 +1,227 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "utilities/table_properties_collectors/compact_on_deletion_collector.h"
+
+#include <memory>
+#include <sstream>
+
+#include "rocksdb/utilities/customizable_util.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "rocksdb/utilities/options_type.h"
+#include "rocksdb/utilities/table_properties_collectors.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+#ifndef ROCKSDB_LITE
+
+CompactOnDeletionCollector::CompactOnDeletionCollector(
+ size_t sliding_window_size, size_t deletion_trigger, double deletion_ratio)
+ : bucket_size_((sliding_window_size + kNumBuckets - 1) / kNumBuckets),
+ current_bucket_(0),
+ num_keys_in_current_bucket_(0),
+ num_deletions_in_observation_window_(0),
+ deletion_trigger_(deletion_trigger),
+ deletion_ratio_(deletion_ratio),
+ deletion_ratio_enabled_(deletion_ratio > 0 && deletion_ratio <= 1),
+ need_compaction_(false),
+ finished_(false) {
+ memset(num_deletions_in_buckets_, 0, sizeof(size_t) * kNumBuckets);
+}
+
+// AddUserKey() will be called when a new key/value pair is inserted into the
+// table.
+// @params key the user key that is inserted into the table.
+// @params value the value that is inserted into the table.
+// @params file_size file size up to now
+Status CompactOnDeletionCollector::AddUserKey(const Slice& /*key*/,
+ const Slice& /*value*/,
+ EntryType type,
+ SequenceNumber /*seq*/,
+ uint64_t /*file_size*/) {
+ assert(!finished_);
+ if (!bucket_size_ && !deletion_ratio_enabled_) {
+ // This collector is effectively disabled
+ return Status::OK();
+ }
+
+ if (need_compaction_) {
+ // If the output file already needs to be compacted, skip the check.
+ return Status::OK();
+ }
+
+ if (deletion_ratio_enabled_) {
+ total_entries_++;
+ if (type == kEntryDelete) {
+ deletion_entries_++;
+ }
+ }
+
+ if (bucket_size_) {
+ if (num_keys_in_current_bucket_ == bucket_size_) {
+ // When the current bucket is full, advance the cursor of the
+ // ring buffer to the next bucket.
+ current_bucket_ = (current_bucket_ + 1) % kNumBuckets;
+
+ // Update the current count of observed deletion keys by excluding
+ // the number of deletion keys in the oldest bucket in the
+ // observation window.
+ assert(num_deletions_in_observation_window_ >=
+ num_deletions_in_buckets_[current_bucket_]);
+ num_deletions_in_observation_window_ -=
+ num_deletions_in_buckets_[current_bucket_];
+ num_deletions_in_buckets_[current_bucket_] = 0;
+ num_keys_in_current_bucket_ = 0;
+ }
+
+ num_keys_in_current_bucket_++;
+ if (type == kEntryDelete) {
+ num_deletions_in_observation_window_++;
+ num_deletions_in_buckets_[current_bucket_]++;
+ if (num_deletions_in_observation_window_ >= deletion_trigger_) {
+ need_compaction_ = true;
+ }
+ }
+ }
+
+ return Status::OK();
+}
+
+Status CompactOnDeletionCollector::Finish(
+ UserCollectedProperties* /*properties*/) {
+ if (!need_compaction_ && deletion_ratio_enabled_ && total_entries_ > 0) {
+ double ratio = static_cast<double>(deletion_entries_) / total_entries_;
+ need_compaction_ = ratio >= deletion_ratio_;
+ }
+ finished_ = true;
+ return Status::OK();
+}
+static std::unordered_map<std::string, OptionTypeInfo>
+ on_deletion_collector_type_info = {
+#ifndef ROCKSDB_LITE
+ {"window_size",
+ {0, OptionType::kUnknown, OptionVerificationType::kNormal,
+ OptionTypeFlags::kCompareNever | OptionTypeFlags::kMutable,
+ [](const ConfigOptions&, const std::string&, const std::string& value,
+ void* addr) {
+ auto* factory =
+ static_cast<CompactOnDeletionCollectorFactory*>(addr);
+ factory->SetWindowSize(ParseSizeT(value));
+ return Status::OK();
+ },
+ [](const ConfigOptions&, const std::string&, const void* addr,
+ std::string* value) {
+ const auto* factory =
+ static_cast<const CompactOnDeletionCollectorFactory*>(addr);
+ *value = std::to_string(factory->GetWindowSize());
+ return Status::OK();
+ },
+ nullptr}},
+ {"deletion_trigger",
+ {0, OptionType::kUnknown, OptionVerificationType::kNormal,
+ OptionTypeFlags::kCompareNever | OptionTypeFlags::kMutable,
+ [](const ConfigOptions&, const std::string&, const std::string& value,
+ void* addr) {
+ auto* factory =
+ static_cast<CompactOnDeletionCollectorFactory*>(addr);
+ factory->SetDeletionTrigger(ParseSizeT(value));
+ return Status::OK();
+ },
+ [](const ConfigOptions&, const std::string&, const void* addr,
+ std::string* value) {
+ const auto* factory =
+ static_cast<const CompactOnDeletionCollectorFactory*>(addr);
+ *value = std::to_string(factory->GetDeletionTrigger());
+ return Status::OK();
+ },
+ nullptr}},
+ {"deletion_ratio",
+ {0, OptionType::kUnknown, OptionVerificationType::kNormal,
+ OptionTypeFlags::kCompareNever | OptionTypeFlags::kMutable,
+ [](const ConfigOptions&, const std::string&, const std::string& value,
+ void* addr) {
+ auto* factory =
+ static_cast<CompactOnDeletionCollectorFactory*>(addr);
+ factory->SetDeletionRatio(ParseDouble(value));
+ return Status::OK();
+ },
+ [](const ConfigOptions&, const std::string&, const void* addr,
+ std::string* value) {
+ const auto* factory =
+ static_cast<const CompactOnDeletionCollectorFactory*>(addr);
+ *value = std::to_string(factory->GetDeletionRatio());
+ return Status::OK();
+ },
+ nullptr}},
+
+#endif // ROCKSDB_LITE
+};
+
+CompactOnDeletionCollectorFactory::CompactOnDeletionCollectorFactory(
+ size_t sliding_window_size, size_t deletion_trigger, double deletion_ratio)
+ : sliding_window_size_(sliding_window_size),
+ deletion_trigger_(deletion_trigger),
+ deletion_ratio_(deletion_ratio) {
+ RegisterOptions("", this, &on_deletion_collector_type_info);
+}
+
+TablePropertiesCollector*
+CompactOnDeletionCollectorFactory::CreateTablePropertiesCollector(
+ TablePropertiesCollectorFactory::Context /*context*/) {
+ return new CompactOnDeletionCollector(sliding_window_size_.load(),
+ deletion_trigger_.load(),
+ deletion_ratio_.load());
+}
+
+std::string CompactOnDeletionCollectorFactory::ToString() const {
+ std::ostringstream cfg;
+ cfg << Name() << " (Sliding window size = " << sliding_window_size_.load()
+ << " Deletion trigger = " << deletion_trigger_.load()
+ << " Deletion ratio = " << deletion_ratio_.load() << ')';
+ return cfg.str();
+}
+
+std::shared_ptr<CompactOnDeletionCollectorFactory>
+NewCompactOnDeletionCollectorFactory(size_t sliding_window_size,
+ size_t deletion_trigger,
+ double deletion_ratio) {
+ return std::shared_ptr<CompactOnDeletionCollectorFactory>(
+ new CompactOnDeletionCollectorFactory(sliding_window_size,
+ deletion_trigger, deletion_ratio));
+}
+namespace {
+static int RegisterTablePropertiesCollectorFactories(
+ ObjectLibrary& library, const std::string& /*arg*/) {
+ library.AddFactory<TablePropertiesCollectorFactory>(
+ CompactOnDeletionCollectorFactory::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<TablePropertiesCollectorFactory>* guard,
+ std::string* /* errmsg */) {
+ // By default, create a CompactionOnDeletionCollector that is disabled.
+ // Users will need to provide configuration parameters or call the
+ // corresponding Setter to enable the factory.
+ guard->reset(new CompactOnDeletionCollectorFactory(0, 0, 0));
+ return guard->get();
+ });
+ return 1;
+}
+} // namespace
+#endif // !ROCKSDB_LITE
+
+Status TablePropertiesCollectorFactory::CreateFromString(
+ const ConfigOptions& options, const std::string& value,
+ std::shared_ptr<TablePropertiesCollectorFactory>* result) {
+#ifndef ROCKSDB_LITE
+ static std::once_flag once;
+ std::call_once(once, [&]() {
+ RegisterTablePropertiesCollectorFactories(*(ObjectLibrary::Default().get()),
+ "");
+ });
+#endif // ROCKSDB_LITE
+ return LoadSharedObject<TablePropertiesCollectorFactory>(options, value,
+ nullptr, result);
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.h b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.h
new file mode 100644
index 000000000..2f7dc4f1b
--- /dev/null
+++ b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.h
@@ -0,0 +1,70 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+#include "rocksdb/utilities/table_properties_collectors.h"
+namespace ROCKSDB_NAMESPACE {
+
+class CompactOnDeletionCollector : public TablePropertiesCollector {
+ public:
+ CompactOnDeletionCollector(size_t sliding_window_size,
+ size_t deletion_trigger, double deletion_raatio);
+
+ // AddUserKey() will be called when a new key/value pair is inserted into the
+ // table.
+ // @params key the user key that is inserted into the table.
+ // @params value the value that is inserted into the table.
+ // @params file_size file size up to now
+ virtual Status AddUserKey(const Slice& key, const Slice& value,
+ EntryType type, SequenceNumber seq,
+ uint64_t file_size) override;
+
+ // Finish() will be called when a table has already been built and is ready
+ // for writing the properties block.
+ // @params properties User will add their collected statistics to
+ // `properties`.
+ virtual Status Finish(UserCollectedProperties* /*properties*/) override;
+
+ // Return the human-readable properties, where the key is property name and
+ // the value is the human-readable form of value.
+ virtual UserCollectedProperties GetReadableProperties() const override {
+ return UserCollectedProperties();
+ }
+
+ // The name of the properties collector can be used for debugging purpose.
+ virtual const char* Name() const override {
+ return "CompactOnDeletionCollector";
+ }
+
+ // EXPERIMENTAL Return whether the output file should be further compacted
+ virtual bool NeedCompact() const override { return need_compaction_; }
+
+ static const int kNumBuckets = 128;
+
+ private:
+ void Reset();
+
+ // A ring buffer that used to count the number of deletion entries for every
+ // "bucket_size_" keys.
+ size_t num_deletions_in_buckets_[kNumBuckets];
+ // the number of keys in a bucket
+ size_t bucket_size_;
+
+ size_t current_bucket_;
+ size_t num_keys_in_current_bucket_;
+ size_t num_deletions_in_observation_window_;
+ size_t deletion_trigger_;
+ const double deletion_ratio_;
+ const bool deletion_ratio_enabled_;
+ size_t total_entries_ = 0;
+ size_t deletion_entries_ = 0;
+ // true if the current SST file needs to be compacted.
+ bool need_compaction_;
+ bool finished_;
+};
+} // namespace ROCKSDB_NAMESPACE
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc
new file mode 100644
index 000000000..88aeb8d5c
--- /dev/null
+++ b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc
@@ -0,0 +1,245 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include <stdio.h>
+
+#ifndef ROCKSDB_LITE
+#include <algorithm>
+#include <cmath>
+#include <vector>
+
+#include "port/stack_trace.h"
+#include "rocksdb/table.h"
+#include "rocksdb/table_properties.h"
+#include "rocksdb/utilities/table_properties_collectors.h"
+#include "test_util/testharness.h"
+#include "util/random.h"
+#include "utilities/table_properties_collectors/compact_on_deletion_collector.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+TEST(CompactOnDeletionCollector, DeletionRatio) {
+ TablePropertiesCollectorFactory::Context context;
+ context.column_family_id =
+ TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
+ const size_t kTotalEntries = 100;
+
+ {
+ // Disable deletion ratio.
+ for (double deletion_ratio : {-1.5, -1.0, 0.0, 1.5, 2.0}) {
+ auto factory = NewCompactOnDeletionCollectorFactory(0, 0, deletion_ratio);
+ std::unique_ptr<TablePropertiesCollector> collector(
+ factory->CreateTablePropertiesCollector(context));
+ for (size_t i = 0; i < kTotalEntries; i++) {
+ // All entries are deletion entries.
+ ASSERT_OK(
+ collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0));
+ ASSERT_FALSE(collector->NeedCompact());
+ }
+ ASSERT_OK(collector->Finish(nullptr));
+ ASSERT_FALSE(collector->NeedCompact());
+ }
+ }
+
+ {
+ for (double deletion_ratio : {0.3, 0.5, 0.8, 1.0}) {
+ auto factory = NewCompactOnDeletionCollectorFactory(0, 0, deletion_ratio);
+ const size_t deletion_entries_trigger =
+ static_cast<size_t>(deletion_ratio * kTotalEntries);
+ for (int delta : {-1, 0, 1}) {
+ // Actual deletion entry ratio <, =, > deletion_ratio
+ size_t actual_deletion_entries = deletion_entries_trigger + delta;
+ std::unique_ptr<TablePropertiesCollector> collector(
+ factory->CreateTablePropertiesCollector(context));
+ for (size_t i = 0; i < kTotalEntries; i++) {
+ if (i < actual_deletion_entries) {
+ ASSERT_OK(
+ collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0));
+ } else {
+ ASSERT_OK(
+ collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0));
+ }
+ ASSERT_FALSE(collector->NeedCompact());
+ }
+ ASSERT_OK(collector->Finish(nullptr));
+ if (delta >= 0) {
+ // >= deletion_ratio
+ ASSERT_TRUE(collector->NeedCompact());
+ } else {
+ ASSERT_FALSE(collector->NeedCompact());
+ }
+ }
+ }
+ }
+}
+
+TEST(CompactOnDeletionCollector, SlidingWindow) {
+ const int kWindowSizes[] = {1000, 10000, 10000, 127, 128, 129,
+ 255, 256, 257, 2, 10000};
+ const int kDeletionTriggers[] = {500, 9500, 4323, 47, 61, 128,
+ 250, 250, 250, 2, 2};
+ TablePropertiesCollectorFactory::Context context;
+ context.column_family_id =
+ TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
+
+ std::vector<int> window_sizes;
+ std::vector<int> deletion_triggers;
+ // deterministic tests
+ for (int test = 0; test < 9; ++test) {
+ window_sizes.emplace_back(kWindowSizes[test]);
+ deletion_triggers.emplace_back(kDeletionTriggers[test]);
+ }
+
+ // randomize tests
+ Random rnd(301);
+ const int kMaxTestSize = 100000l;
+ for (int random_test = 0; random_test < 10; random_test++) {
+ int window_size = rnd.Uniform(kMaxTestSize) + 1;
+ int deletion_trigger = rnd.Uniform(window_size);
+ window_sizes.emplace_back(window_size);
+ deletion_triggers.emplace_back(deletion_trigger);
+ }
+
+ assert(window_sizes.size() == deletion_triggers.size());
+
+ for (size_t test = 0; test < window_sizes.size(); ++test) {
+ const int kBucketSize = 128;
+ const int kWindowSize = window_sizes[test];
+ const int kPaddedWindowSize =
+ kBucketSize * ((window_sizes[test] + kBucketSize - 1) / kBucketSize);
+ const int kNumDeletionTrigger = deletion_triggers[test];
+ const int kBias = (kNumDeletionTrigger + kBucketSize - 1) / kBucketSize;
+ // Simple test
+ {
+ auto factory = NewCompactOnDeletionCollectorFactory(kWindowSize,
+ kNumDeletionTrigger);
+ const int kSample = 10;
+ for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) {
+ std::unique_ptr<TablePropertiesCollector> collector(
+ factory->CreateTablePropertiesCollector(context));
+ int deletions = 0;
+ for (int i = 0; i < kPaddedWindowSize; ++i) {
+ if (i % kSample < delete_rate) {
+ ASSERT_OK(
+ collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0));
+ deletions++;
+ } else {
+ ASSERT_OK(
+ collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0));
+ }
+ }
+ if (collector->NeedCompact() != (deletions >= kNumDeletionTrigger) &&
+ std::abs(deletions - kNumDeletionTrigger) > kBias) {
+ fprintf(stderr,
+ "[Error] collector->NeedCompact() != (%d >= %d)"
+ " with kWindowSize = %d and kNumDeletionTrigger = %d\n",
+ deletions, kNumDeletionTrigger, kWindowSize,
+ kNumDeletionTrigger);
+ ASSERT_TRUE(false);
+ }
+ ASSERT_OK(collector->Finish(nullptr));
+ }
+ }
+
+ // Only one section of a file satisfies the compaction trigger
+ {
+ auto factory = NewCompactOnDeletionCollectorFactory(kWindowSize,
+ kNumDeletionTrigger);
+ const int kSample = 10;
+ for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) {
+ std::unique_ptr<TablePropertiesCollector> collector(
+ factory->CreateTablePropertiesCollector(context));
+ int deletions = 0;
+ for (int section = 0; section < 5; ++section) {
+ int initial_entries = rnd.Uniform(kWindowSize) + kWindowSize;
+ for (int i = 0; i < initial_entries; ++i) {
+ ASSERT_OK(
+ collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0));
+ }
+ }
+ for (int i = 0; i < kPaddedWindowSize; ++i) {
+ if (i % kSample < delete_rate) {
+ ASSERT_OK(
+ collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0));
+ deletions++;
+ } else {
+ ASSERT_OK(
+ collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0));
+ }
+ }
+ for (int section = 0; section < 5; ++section) {
+ int ending_entries = rnd.Uniform(kWindowSize) + kWindowSize;
+ for (int i = 0; i < ending_entries; ++i) {
+ ASSERT_OK(
+ collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0));
+ }
+ }
+ if (collector->NeedCompact() != (deletions >= kNumDeletionTrigger) &&
+ std::abs(deletions - kNumDeletionTrigger) > kBias) {
+ fprintf(stderr,
+ "[Error] collector->NeedCompact() %d != (%d >= %d)"
+ " with kWindowSize = %d, kNumDeletionTrigger = %d\n",
+ collector->NeedCompact(), deletions, kNumDeletionTrigger,
+ kWindowSize, kNumDeletionTrigger);
+ ASSERT_TRUE(false);
+ }
+ ASSERT_OK(collector->Finish(nullptr));
+ }
+ }
+
+ // TEST 3: Issues a lots of deletes, but their density is not
+ // high enough to trigger compaction.
+ {
+ std::unique_ptr<TablePropertiesCollector> collector;
+ auto factory = NewCompactOnDeletionCollectorFactory(kWindowSize,
+ kNumDeletionTrigger);
+ collector.reset(factory->CreateTablePropertiesCollector(context));
+ assert(collector->NeedCompact() == false);
+ // Insert "kNumDeletionTrigger * 0.95" deletions for every
+ // "kWindowSize" and verify compaction is not needed.
+ const int kDeletionsPerSection = kNumDeletionTrigger * 95 / 100;
+ if (kDeletionsPerSection >= 0) {
+ for (int section = 0; section < 200; ++section) {
+ for (int i = 0; i < kPaddedWindowSize; ++i) {
+ if (i < kDeletionsPerSection) {
+ ASSERT_OK(collector->AddUserKey("hello", "rocksdb", kEntryDelete,
+ 0, 0));
+ } else {
+ ASSERT_OK(
+ collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0));
+ }
+ }
+ }
+ if (collector->NeedCompact() &&
+ std::abs(kDeletionsPerSection - kNumDeletionTrigger) > kBias) {
+ fprintf(stderr,
+ "[Error] collector->NeedCompact() != false"
+ " with kWindowSize = %d and kNumDeletionTrigger = %d\n",
+ kWindowSize, kNumDeletionTrigger);
+ ASSERT_TRUE(false);
+ }
+ ASSERT_OK(collector->Finish(nullptr));
+ }
+ }
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+#else
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr, "SKIPPED as RocksDBLite does not include utilities.\n");
+ return 0;
+}
+#endif // !ROCKSDB_LITE