diff options
Diffstat (limited to '')
3 files changed, 542 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.cc b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.cc new file mode 100644 index 000000000..16f33934d --- /dev/null +++ b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.cc @@ -0,0 +1,227 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "utilities/table_properties_collectors/compact_on_deletion_collector.h" + +#include <memory> +#include <sstream> + +#include "rocksdb/utilities/customizable_util.h" +#include "rocksdb/utilities/object_registry.h" +#include "rocksdb/utilities/options_type.h" +#include "rocksdb/utilities/table_properties_collectors.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { +#ifndef ROCKSDB_LITE + +CompactOnDeletionCollector::CompactOnDeletionCollector( + size_t sliding_window_size, size_t deletion_trigger, double deletion_ratio) + : bucket_size_((sliding_window_size + kNumBuckets - 1) / kNumBuckets), + current_bucket_(0), + num_keys_in_current_bucket_(0), + num_deletions_in_observation_window_(0), + deletion_trigger_(deletion_trigger), + deletion_ratio_(deletion_ratio), + deletion_ratio_enabled_(deletion_ratio > 0 && deletion_ratio <= 1), + need_compaction_(false), + finished_(false) { + memset(num_deletions_in_buckets_, 0, sizeof(size_t) * kNumBuckets); +} + +// AddUserKey() will be called when a new key/value pair is inserted into the +// table. +// @params key the user key that is inserted into the table. +// @params value the value that is inserted into the table. +// @params file_size file size up to now +Status CompactOnDeletionCollector::AddUserKey(const Slice& /*key*/, + const Slice& /*value*/, + EntryType type, + SequenceNumber /*seq*/, + uint64_t /*file_size*/) { + assert(!finished_); + if (!bucket_size_ && !deletion_ratio_enabled_) { + // This collector is effectively disabled + return Status::OK(); + } + + if (need_compaction_) { + // If the output file already needs to be compacted, skip the check. + return Status::OK(); + } + + if (deletion_ratio_enabled_) { + total_entries_++; + if (type == kEntryDelete) { + deletion_entries_++; + } + } + + if (bucket_size_) { + if (num_keys_in_current_bucket_ == bucket_size_) { + // When the current bucket is full, advance the cursor of the + // ring buffer to the next bucket. + current_bucket_ = (current_bucket_ + 1) % kNumBuckets; + + // Update the current count of observed deletion keys by excluding + // the number of deletion keys in the oldest bucket in the + // observation window. + assert(num_deletions_in_observation_window_ >= + num_deletions_in_buckets_[current_bucket_]); + num_deletions_in_observation_window_ -= + num_deletions_in_buckets_[current_bucket_]; + num_deletions_in_buckets_[current_bucket_] = 0; + num_keys_in_current_bucket_ = 0; + } + + num_keys_in_current_bucket_++; + if (type == kEntryDelete) { + num_deletions_in_observation_window_++; + num_deletions_in_buckets_[current_bucket_]++; + if (num_deletions_in_observation_window_ >= deletion_trigger_) { + need_compaction_ = true; + } + } + } + + return Status::OK(); +} + +Status CompactOnDeletionCollector::Finish( + UserCollectedProperties* /*properties*/) { + if (!need_compaction_ && deletion_ratio_enabled_ && total_entries_ > 0) { + double ratio = static_cast<double>(deletion_entries_) / total_entries_; + need_compaction_ = ratio >= deletion_ratio_; + } + finished_ = true; + return Status::OK(); +} +static std::unordered_map<std::string, OptionTypeInfo> + on_deletion_collector_type_info = { +#ifndef ROCKSDB_LITE + {"window_size", + {0, OptionType::kUnknown, OptionVerificationType::kNormal, + OptionTypeFlags::kCompareNever | OptionTypeFlags::kMutable, + [](const ConfigOptions&, const std::string&, const std::string& value, + void* addr) { + auto* factory = + static_cast<CompactOnDeletionCollectorFactory*>(addr); + factory->SetWindowSize(ParseSizeT(value)); + return Status::OK(); + }, + [](const ConfigOptions&, const std::string&, const void* addr, + std::string* value) { + const auto* factory = + static_cast<const CompactOnDeletionCollectorFactory*>(addr); + *value = std::to_string(factory->GetWindowSize()); + return Status::OK(); + }, + nullptr}}, + {"deletion_trigger", + {0, OptionType::kUnknown, OptionVerificationType::kNormal, + OptionTypeFlags::kCompareNever | OptionTypeFlags::kMutable, + [](const ConfigOptions&, const std::string&, const std::string& value, + void* addr) { + auto* factory = + static_cast<CompactOnDeletionCollectorFactory*>(addr); + factory->SetDeletionTrigger(ParseSizeT(value)); + return Status::OK(); + }, + [](const ConfigOptions&, const std::string&, const void* addr, + std::string* value) { + const auto* factory = + static_cast<const CompactOnDeletionCollectorFactory*>(addr); + *value = std::to_string(factory->GetDeletionTrigger()); + return Status::OK(); + }, + nullptr}}, + {"deletion_ratio", + {0, OptionType::kUnknown, OptionVerificationType::kNormal, + OptionTypeFlags::kCompareNever | OptionTypeFlags::kMutable, + [](const ConfigOptions&, const std::string&, const std::string& value, + void* addr) { + auto* factory = + static_cast<CompactOnDeletionCollectorFactory*>(addr); + factory->SetDeletionRatio(ParseDouble(value)); + return Status::OK(); + }, + [](const ConfigOptions&, const std::string&, const void* addr, + std::string* value) { + const auto* factory = + static_cast<const CompactOnDeletionCollectorFactory*>(addr); + *value = std::to_string(factory->GetDeletionRatio()); + return Status::OK(); + }, + nullptr}}, + +#endif // ROCKSDB_LITE +}; + +CompactOnDeletionCollectorFactory::CompactOnDeletionCollectorFactory( + size_t sliding_window_size, size_t deletion_trigger, double deletion_ratio) + : sliding_window_size_(sliding_window_size), + deletion_trigger_(deletion_trigger), + deletion_ratio_(deletion_ratio) { + RegisterOptions("", this, &on_deletion_collector_type_info); +} + +TablePropertiesCollector* +CompactOnDeletionCollectorFactory::CreateTablePropertiesCollector( + TablePropertiesCollectorFactory::Context /*context*/) { + return new CompactOnDeletionCollector(sliding_window_size_.load(), + deletion_trigger_.load(), + deletion_ratio_.load()); +} + +std::string CompactOnDeletionCollectorFactory::ToString() const { + std::ostringstream cfg; + cfg << Name() << " (Sliding window size = " << sliding_window_size_.load() + << " Deletion trigger = " << deletion_trigger_.load() + << " Deletion ratio = " << deletion_ratio_.load() << ')'; + return cfg.str(); +} + +std::shared_ptr<CompactOnDeletionCollectorFactory> +NewCompactOnDeletionCollectorFactory(size_t sliding_window_size, + size_t deletion_trigger, + double deletion_ratio) { + return std::shared_ptr<CompactOnDeletionCollectorFactory>( + new CompactOnDeletionCollectorFactory(sliding_window_size, + deletion_trigger, deletion_ratio)); +} +namespace { +static int RegisterTablePropertiesCollectorFactories( + ObjectLibrary& library, const std::string& /*arg*/) { + library.AddFactory<TablePropertiesCollectorFactory>( + CompactOnDeletionCollectorFactory::kClassName(), + [](const std::string& /*uri*/, + std::unique_ptr<TablePropertiesCollectorFactory>* guard, + std::string* /* errmsg */) { + // By default, create a CompactionOnDeletionCollector that is disabled. + // Users will need to provide configuration parameters or call the + // corresponding Setter to enable the factory. + guard->reset(new CompactOnDeletionCollectorFactory(0, 0, 0)); + return guard->get(); + }); + return 1; +} +} // namespace +#endif // !ROCKSDB_LITE + +Status TablePropertiesCollectorFactory::CreateFromString( + const ConfigOptions& options, const std::string& value, + std::shared_ptr<TablePropertiesCollectorFactory>* result) { +#ifndef ROCKSDB_LITE + static std::once_flag once; + std::call_once(once, [&]() { + RegisterTablePropertiesCollectorFactories(*(ObjectLibrary::Default().get()), + ""); + }); +#endif // ROCKSDB_LITE + return LoadSharedObject<TablePropertiesCollectorFactory>(options, value, + nullptr, result); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.h b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.h new file mode 100644 index 000000000..2f7dc4f1b --- /dev/null +++ b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector.h @@ -0,0 +1,70 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE +#include "rocksdb/utilities/table_properties_collectors.h" +namespace ROCKSDB_NAMESPACE { + +class CompactOnDeletionCollector : public TablePropertiesCollector { + public: + CompactOnDeletionCollector(size_t sliding_window_size, + size_t deletion_trigger, double deletion_raatio); + + // AddUserKey() will be called when a new key/value pair is inserted into the + // table. + // @params key the user key that is inserted into the table. + // @params value the value that is inserted into the table. + // @params file_size file size up to now + virtual Status AddUserKey(const Slice& key, const Slice& value, + EntryType type, SequenceNumber seq, + uint64_t file_size) override; + + // Finish() will be called when a table has already been built and is ready + // for writing the properties block. + // @params properties User will add their collected statistics to + // `properties`. + virtual Status Finish(UserCollectedProperties* /*properties*/) override; + + // Return the human-readable properties, where the key is property name and + // the value is the human-readable form of value. + virtual UserCollectedProperties GetReadableProperties() const override { + return UserCollectedProperties(); + } + + // The name of the properties collector can be used for debugging purpose. + virtual const char* Name() const override { + return "CompactOnDeletionCollector"; + } + + // EXPERIMENTAL Return whether the output file should be further compacted + virtual bool NeedCompact() const override { return need_compaction_; } + + static const int kNumBuckets = 128; + + private: + void Reset(); + + // A ring buffer that used to count the number of deletion entries for every + // "bucket_size_" keys. + size_t num_deletions_in_buckets_[kNumBuckets]; + // the number of keys in a bucket + size_t bucket_size_; + + size_t current_bucket_; + size_t num_keys_in_current_bucket_; + size_t num_deletions_in_observation_window_; + size_t deletion_trigger_; + const double deletion_ratio_; + const bool deletion_ratio_enabled_; + size_t total_entries_ = 0; + size_t deletion_entries_ = 0; + // true if the current SST file needs to be compacted. + bool need_compaction_; + bool finished_; +}; +} // namespace ROCKSDB_NAMESPACE +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc new file mode 100644 index 000000000..88aeb8d5c --- /dev/null +++ b/src/rocksdb/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc @@ -0,0 +1,245 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include <stdio.h> + +#ifndef ROCKSDB_LITE +#include <algorithm> +#include <cmath> +#include <vector> + +#include "port/stack_trace.h" +#include "rocksdb/table.h" +#include "rocksdb/table_properties.h" +#include "rocksdb/utilities/table_properties_collectors.h" +#include "test_util/testharness.h" +#include "util/random.h" +#include "utilities/table_properties_collectors/compact_on_deletion_collector.h" + +namespace ROCKSDB_NAMESPACE { + +TEST(CompactOnDeletionCollector, DeletionRatio) { + TablePropertiesCollectorFactory::Context context; + context.column_family_id = + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; + const size_t kTotalEntries = 100; + + { + // Disable deletion ratio. + for (double deletion_ratio : {-1.5, -1.0, 0.0, 1.5, 2.0}) { + auto factory = NewCompactOnDeletionCollectorFactory(0, 0, deletion_ratio); + std::unique_ptr<TablePropertiesCollector> collector( + factory->CreateTablePropertiesCollector(context)); + for (size_t i = 0; i < kTotalEntries; i++) { + // All entries are deletion entries. + ASSERT_OK( + collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0)); + ASSERT_FALSE(collector->NeedCompact()); + } + ASSERT_OK(collector->Finish(nullptr)); + ASSERT_FALSE(collector->NeedCompact()); + } + } + + { + for (double deletion_ratio : {0.3, 0.5, 0.8, 1.0}) { + auto factory = NewCompactOnDeletionCollectorFactory(0, 0, deletion_ratio); + const size_t deletion_entries_trigger = + static_cast<size_t>(deletion_ratio * kTotalEntries); + for (int delta : {-1, 0, 1}) { + // Actual deletion entry ratio <, =, > deletion_ratio + size_t actual_deletion_entries = deletion_entries_trigger + delta; + std::unique_ptr<TablePropertiesCollector> collector( + factory->CreateTablePropertiesCollector(context)); + for (size_t i = 0; i < kTotalEntries; i++) { + if (i < actual_deletion_entries) { + ASSERT_OK( + collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0)); + } else { + ASSERT_OK( + collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0)); + } + ASSERT_FALSE(collector->NeedCompact()); + } + ASSERT_OK(collector->Finish(nullptr)); + if (delta >= 0) { + // >= deletion_ratio + ASSERT_TRUE(collector->NeedCompact()); + } else { + ASSERT_FALSE(collector->NeedCompact()); + } + } + } + } +} + +TEST(CompactOnDeletionCollector, SlidingWindow) { + const int kWindowSizes[] = {1000, 10000, 10000, 127, 128, 129, + 255, 256, 257, 2, 10000}; + const int kDeletionTriggers[] = {500, 9500, 4323, 47, 61, 128, + 250, 250, 250, 2, 2}; + TablePropertiesCollectorFactory::Context context; + context.column_family_id = + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; + + std::vector<int> window_sizes; + std::vector<int> deletion_triggers; + // deterministic tests + for (int test = 0; test < 9; ++test) { + window_sizes.emplace_back(kWindowSizes[test]); + deletion_triggers.emplace_back(kDeletionTriggers[test]); + } + + // randomize tests + Random rnd(301); + const int kMaxTestSize = 100000l; + for (int random_test = 0; random_test < 10; random_test++) { + int window_size = rnd.Uniform(kMaxTestSize) + 1; + int deletion_trigger = rnd.Uniform(window_size); + window_sizes.emplace_back(window_size); + deletion_triggers.emplace_back(deletion_trigger); + } + + assert(window_sizes.size() == deletion_triggers.size()); + + for (size_t test = 0; test < window_sizes.size(); ++test) { + const int kBucketSize = 128; + const int kWindowSize = window_sizes[test]; + const int kPaddedWindowSize = + kBucketSize * ((window_sizes[test] + kBucketSize - 1) / kBucketSize); + const int kNumDeletionTrigger = deletion_triggers[test]; + const int kBias = (kNumDeletionTrigger + kBucketSize - 1) / kBucketSize; + // Simple test + { + auto factory = NewCompactOnDeletionCollectorFactory(kWindowSize, + kNumDeletionTrigger); + const int kSample = 10; + for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) { + std::unique_ptr<TablePropertiesCollector> collector( + factory->CreateTablePropertiesCollector(context)); + int deletions = 0; + for (int i = 0; i < kPaddedWindowSize; ++i) { + if (i % kSample < delete_rate) { + ASSERT_OK( + collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0)); + deletions++; + } else { + ASSERT_OK( + collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0)); + } + } + if (collector->NeedCompact() != (deletions >= kNumDeletionTrigger) && + std::abs(deletions - kNumDeletionTrigger) > kBias) { + fprintf(stderr, + "[Error] collector->NeedCompact() != (%d >= %d)" + " with kWindowSize = %d and kNumDeletionTrigger = %d\n", + deletions, kNumDeletionTrigger, kWindowSize, + kNumDeletionTrigger); + ASSERT_TRUE(false); + } + ASSERT_OK(collector->Finish(nullptr)); + } + } + + // Only one section of a file satisfies the compaction trigger + { + auto factory = NewCompactOnDeletionCollectorFactory(kWindowSize, + kNumDeletionTrigger); + const int kSample = 10; + for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) { + std::unique_ptr<TablePropertiesCollector> collector( + factory->CreateTablePropertiesCollector(context)); + int deletions = 0; + for (int section = 0; section < 5; ++section) { + int initial_entries = rnd.Uniform(kWindowSize) + kWindowSize; + for (int i = 0; i < initial_entries; ++i) { + ASSERT_OK( + collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0)); + } + } + for (int i = 0; i < kPaddedWindowSize; ++i) { + if (i % kSample < delete_rate) { + ASSERT_OK( + collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0)); + deletions++; + } else { + ASSERT_OK( + collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0)); + } + } + for (int section = 0; section < 5; ++section) { + int ending_entries = rnd.Uniform(kWindowSize) + kWindowSize; + for (int i = 0; i < ending_entries; ++i) { + ASSERT_OK( + collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0)); + } + } + if (collector->NeedCompact() != (deletions >= kNumDeletionTrigger) && + std::abs(deletions - kNumDeletionTrigger) > kBias) { + fprintf(stderr, + "[Error] collector->NeedCompact() %d != (%d >= %d)" + " with kWindowSize = %d, kNumDeletionTrigger = %d\n", + collector->NeedCompact(), deletions, kNumDeletionTrigger, + kWindowSize, kNumDeletionTrigger); + ASSERT_TRUE(false); + } + ASSERT_OK(collector->Finish(nullptr)); + } + } + + // TEST 3: Issues a lots of deletes, but their density is not + // high enough to trigger compaction. + { + std::unique_ptr<TablePropertiesCollector> collector; + auto factory = NewCompactOnDeletionCollectorFactory(kWindowSize, + kNumDeletionTrigger); + collector.reset(factory->CreateTablePropertiesCollector(context)); + assert(collector->NeedCompact() == false); + // Insert "kNumDeletionTrigger * 0.95" deletions for every + // "kWindowSize" and verify compaction is not needed. + const int kDeletionsPerSection = kNumDeletionTrigger * 95 / 100; + if (kDeletionsPerSection >= 0) { + for (int section = 0; section < 200; ++section) { + for (int i = 0; i < kPaddedWindowSize; ++i) { + if (i < kDeletionsPerSection) { + ASSERT_OK(collector->AddUserKey("hello", "rocksdb", kEntryDelete, + 0, 0)); + } else { + ASSERT_OK( + collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0)); + } + } + } + if (collector->NeedCompact() && + std::abs(kDeletionsPerSection - kNumDeletionTrigger) > kBias) { + fprintf(stderr, + "[Error] collector->NeedCompact() != false" + " with kWindowSize = %d and kNumDeletionTrigger = %d\n", + kWindowSize, kNumDeletionTrigger); + ASSERT_TRUE(false); + } + ASSERT_OK(collector->Finish(nullptr)); + } + } + } +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} +#else +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as RocksDBLite does not include utilities.\n"); + return 0; +} +#endif // !ROCKSDB_LITE |