summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/db_compaction_filter_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/db/db_compaction_filter_test.cc')
-rw-r--r--src/rocksdb/db/db_compaction_filter_test.cc872
1 files changed, 872 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_compaction_filter_test.cc b/src/rocksdb/db/db_compaction_filter_test.cc
new file mode 100644
index 000000000..a708c0b1a
--- /dev/null
+++ b/src/rocksdb/db/db_compaction_filter_test.cc
@@ -0,0 +1,872 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/db_test_util.h"
+#include "port/stack_trace.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+static int cfilter_count = 0;
+static int cfilter_skips = 0;
+
+// This is a static filter used for filtering
+// kvs during the compaction process.
+static std::string NEW_VALUE = "NewValue";
+
+class DBTestCompactionFilter : public DBTestBase {
+ public:
+ DBTestCompactionFilter() : DBTestBase("/db_compaction_filter_test") {}
+};
+
+// Param variant of DBTestBase::ChangeCompactOptions
+class DBTestCompactionFilterWithCompactParam
+ : public DBTestCompactionFilter,
+ public ::testing::WithParamInterface<DBTestBase::OptionConfig> {
+ public:
+ DBTestCompactionFilterWithCompactParam() : DBTestCompactionFilter() {
+ option_config_ = GetParam();
+ Destroy(last_options_);
+ auto options = CurrentOptions();
+ if (option_config_ == kDefault || option_config_ == kUniversalCompaction ||
+ option_config_ == kUniversalCompactionMultiLevel) {
+ options.create_if_missing = true;
+ }
+ if (option_config_ == kLevelSubcompactions ||
+ option_config_ == kUniversalSubcompactions) {
+ assert(options.max_subcompactions > 1);
+ }
+ TryReopen(options);
+ }
+};
+
+#ifndef ROCKSDB_VALGRIND_RUN
+INSTANTIATE_TEST_CASE_P(
+ CompactionFilterWithOption, DBTestCompactionFilterWithCompactParam,
+ ::testing::Values(DBTestBase::OptionConfig::kDefault,
+ DBTestBase::OptionConfig::kUniversalCompaction,
+ DBTestBase::OptionConfig::kUniversalCompactionMultiLevel,
+ DBTestBase::OptionConfig::kLevelSubcompactions,
+ DBTestBase::OptionConfig::kUniversalSubcompactions));
+#else
+// Run fewer cases in valgrind
+INSTANTIATE_TEST_CASE_P(CompactionFilterWithOption,
+ DBTestCompactionFilterWithCompactParam,
+ ::testing::Values(DBTestBase::OptionConfig::kDefault));
+#endif // ROCKSDB_VALGRIND_RUN
+
+class KeepFilter : public CompactionFilter {
+ public:
+ bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
+ std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ cfilter_count++;
+ return false;
+ }
+
+ const char* Name() const override { return "KeepFilter"; }
+};
+
+class DeleteFilter : public CompactionFilter {
+ public:
+ bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
+ std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ cfilter_count++;
+ return true;
+ }
+
+ const char* Name() const override { return "DeleteFilter"; }
+};
+
+class DeleteISFilter : public CompactionFilter {
+ public:
+ bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/,
+ std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ cfilter_count++;
+ int i = std::stoi(key.ToString());
+ if (i > 5 && i <= 105) {
+ return true;
+ }
+ return false;
+ }
+
+ bool IgnoreSnapshots() const override { return true; }
+
+ const char* Name() const override { return "DeleteFilter"; }
+};
+
+// Skip x if floor(x/10) is even, use range skips. Requires that keys are
+// zero-padded to length 10.
+class SkipEvenFilter : public CompactionFilter {
+ public:
+ Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/,
+ const Slice& /*existing_value*/, std::string* /*new_value*/,
+ std::string* skip_until) const override {
+ cfilter_count++;
+ int i = std::stoi(key.ToString());
+ if (i / 10 % 2 == 0) {
+ char key_str[100];
+ snprintf(key_str, sizeof(key_str), "%010d", i / 10 * 10 + 10);
+ *skip_until = key_str;
+ ++cfilter_skips;
+ return Decision::kRemoveAndSkipUntil;
+ }
+ return Decision::kKeep;
+ }
+
+ bool IgnoreSnapshots() const override { return true; }
+
+ const char* Name() const override { return "DeleteFilter"; }
+};
+
+class DelayFilter : public CompactionFilter {
+ public:
+ explicit DelayFilter(DBTestBase* d) : db_test(d) {}
+ bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
+ std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ db_test->env_->addon_time_.fetch_add(1000);
+ return true;
+ }
+
+ const char* Name() const override { return "DelayFilter"; }
+
+ private:
+ DBTestBase* db_test;
+};
+
+class ConditionalFilter : public CompactionFilter {
+ public:
+ explicit ConditionalFilter(const std::string* filtered_value)
+ : filtered_value_(filtered_value) {}
+ bool Filter(int /*level*/, const Slice& /*key*/, const Slice& value,
+ std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ return value.ToString() == *filtered_value_;
+ }
+
+ const char* Name() const override { return "ConditionalFilter"; }
+
+ private:
+ const std::string* filtered_value_;
+};
+
+class ChangeFilter : public CompactionFilter {
+ public:
+ explicit ChangeFilter() {}
+
+ bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
+ std::string* new_value, bool* value_changed) const override {
+ assert(new_value != nullptr);
+ *new_value = NEW_VALUE;
+ *value_changed = true;
+ return false;
+ }
+
+ const char* Name() const override { return "ChangeFilter"; }
+};
+
+class KeepFilterFactory : public CompactionFilterFactory {
+ public:
+ explicit KeepFilterFactory(bool check_context = false,
+ bool check_context_cf_id = false)
+ : check_context_(check_context),
+ check_context_cf_id_(check_context_cf_id),
+ compaction_filter_created_(false) {}
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& context) override {
+ if (check_context_) {
+ EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
+ EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
+ }
+ if (check_context_cf_id_) {
+ EXPECT_EQ(expect_cf_id_.load(), context.column_family_id);
+ }
+ compaction_filter_created_ = true;
+ return std::unique_ptr<CompactionFilter>(new KeepFilter());
+ }
+
+ bool compaction_filter_created() const { return compaction_filter_created_; }
+
+ const char* Name() const override { return "KeepFilterFactory"; }
+ bool check_context_;
+ bool check_context_cf_id_;
+ std::atomic_bool expect_full_compaction_;
+ std::atomic_bool expect_manual_compaction_;
+ std::atomic<uint32_t> expect_cf_id_;
+ bool compaction_filter_created_;
+};
+
+class DeleteFilterFactory : public CompactionFilterFactory {
+ public:
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& context) override {
+ if (context.is_manual_compaction) {
+ return std::unique_ptr<CompactionFilter>(new DeleteFilter());
+ } else {
+ return std::unique_ptr<CompactionFilter>(nullptr);
+ }
+ }
+
+ const char* Name() const override { return "DeleteFilterFactory"; }
+};
+
+// Delete Filter Factory which ignores snapshots
+class DeleteISFilterFactory : public CompactionFilterFactory {
+ public:
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& context) override {
+ if (context.is_manual_compaction) {
+ return std::unique_ptr<CompactionFilter>(new DeleteISFilter());
+ } else {
+ return std::unique_ptr<CompactionFilter>(nullptr);
+ }
+ }
+
+ const char* Name() const override { return "DeleteFilterFactory"; }
+};
+
+class SkipEvenFilterFactory : public CompactionFilterFactory {
+ public:
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& context) override {
+ if (context.is_manual_compaction) {
+ return std::unique_ptr<CompactionFilter>(new SkipEvenFilter());
+ } else {
+ return std::unique_ptr<CompactionFilter>(nullptr);
+ }
+ }
+
+ const char* Name() const override { return "SkipEvenFilterFactory"; }
+};
+
+class DelayFilterFactory : public CompactionFilterFactory {
+ public:
+ explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) override {
+ return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
+ }
+
+ const char* Name() const override { return "DelayFilterFactory"; }
+
+ private:
+ DBTestBase* db_test;
+};
+
+class ConditionalFilterFactory : public CompactionFilterFactory {
+ public:
+ explicit ConditionalFilterFactory(const Slice& filtered_value)
+ : filtered_value_(filtered_value.ToString()) {}
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) override {
+ return std::unique_ptr<CompactionFilter>(
+ new ConditionalFilter(&filtered_value_));
+ }
+
+ const char* Name() const override { return "ConditionalFilterFactory"; }
+
+ private:
+ std::string filtered_value_;
+};
+
+class ChangeFilterFactory : public CompactionFilterFactory {
+ public:
+ explicit ChangeFilterFactory() {}
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) override {
+ return std::unique_ptr<CompactionFilter>(new ChangeFilter());
+ }
+
+ const char* Name() const override { return "ChangeFilterFactory"; }
+};
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBTestCompactionFilter, CompactionFilter) {
+ Options options = CurrentOptions();
+ options.max_open_files = -1;
+ options.num_levels = 3;
+ options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
+ options = CurrentOptions(options);
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ // Write 100K keys, these are written to a few files in L0.
+ const std::string value(10, 'x');
+ for (int i = 0; i < 100000; i++) {
+ char key[100];
+ snprintf(key, sizeof(key), "B%010d", i);
+ Put(1, key, value);
+ }
+ ASSERT_OK(Flush(1));
+
+ // Push all files to the highest level L2. Verify that
+ // the compaction is each level invokes the filter for
+ // all the keys in that level.
+ cfilter_count = 0;
+ dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+ ASSERT_EQ(cfilter_count, 100000);
+ cfilter_count = 0;
+ dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
+ ASSERT_EQ(cfilter_count, 100000);
+
+ ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
+ ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
+ ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
+ cfilter_count = 0;
+
+ // All the files are in the lowest level.
+ // Verify that all but the 100001st record
+ // has sequence number zero. The 100001st record
+ // is at the tip of this snapshot and cannot
+ // be zeroed out.
+ int count = 0;
+ int total = 0;
+ Arena arena;
+ {
+ InternalKeyComparator icmp(options.comparator);
+ ReadRangeDelAggregator range_del_agg(&icmp,
+ kMaxSequenceNumber /* upper_bound */);
+ ScopedArenaIterator iter(dbfull()->NewInternalIterator(
+ &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
+ iter->SeekToFirst();
+ ASSERT_OK(iter->status());
+ while (iter->Valid()) {
+ ParsedInternalKey ikey(Slice(), 0, kTypeValue);
+ ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
+ total++;
+ if (ikey.sequence != 0) {
+ count++;
+ }
+ iter->Next();
+ }
+ }
+ ASSERT_EQ(total, 100000);
+ ASSERT_EQ(count, 0);
+
+ // overwrite all the 100K keys once again.
+ for (int i = 0; i < 100000; i++) {
+ char key[100];
+ snprintf(key, sizeof(key), "B%010d", i);
+ ASSERT_OK(Put(1, key, value));
+ }
+ ASSERT_OK(Flush(1));
+
+ // push all files to the highest level L2. This
+ // means that all keys should pass at least once
+ // via the compaction filter
+ cfilter_count = 0;
+ dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+ ASSERT_EQ(cfilter_count, 100000);
+ cfilter_count = 0;
+ dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
+ ASSERT_EQ(cfilter_count, 100000);
+ ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
+ ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
+ ASSERT_NE(NumTableFilesAtLevel(2, 1), 0);
+
+ // create a new database with the compaction
+ // filter in such a way that it deletes all keys
+ options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ // write all the keys once again.
+ for (int i = 0; i < 100000; i++) {
+ char key[100];
+ snprintf(key, sizeof(key), "B%010d", i);
+ ASSERT_OK(Put(1, key, value));
+ }
+ ASSERT_OK(Flush(1));
+ ASSERT_NE(NumTableFilesAtLevel(0, 1), 0);
+ ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
+ ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0);
+
+ // Push all files to the highest level L2. This
+ // triggers the compaction filter to delete all keys,
+ // verify that at the end of the compaction process,
+ // nothing is left.
+ cfilter_count = 0;
+ dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+ ASSERT_EQ(cfilter_count, 100000);
+ cfilter_count = 0;
+ dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
+ ASSERT_EQ(cfilter_count, 0);
+ ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
+ ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0);
+
+ {
+ // Scan the entire database to ensure that nothing is left
+ std::unique_ptr<Iterator> iter(
+ db_->NewIterator(ReadOptions(), handles_[1]));
+ iter->SeekToFirst();
+ count = 0;
+ while (iter->Valid()) {
+ count++;
+ iter->Next();
+ }
+ ASSERT_EQ(count, 0);
+ }
+
+ // The sequence number of the remaining record
+ // is not zeroed out even though it is at the
+ // level Lmax because this record is at the tip
+ count = 0;
+ {
+ InternalKeyComparator icmp(options.comparator);
+ ReadRangeDelAggregator range_del_agg(&icmp,
+ kMaxSequenceNumber /* upper_bound */);
+ ScopedArenaIterator iter(dbfull()->NewInternalIterator(
+ &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
+ iter->SeekToFirst();
+ ASSERT_OK(iter->status());
+ while (iter->Valid()) {
+ ParsedInternalKey ikey(Slice(), 0, kTypeValue);
+ ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
+ ASSERT_NE(ikey.sequence, (unsigned)0);
+ count++;
+ iter->Next();
+ }
+ ASSERT_EQ(count, 0);
+ }
+}
+
+// Tests the edge case where compaction does not produce any output -- all
+// entries are deleted. The compaction should create bunch of 'DeleteFile'
+// entries in VersionEdit, but none of the 'AddFile's.
+TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) {
+ Options options = CurrentOptions();
+ options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>();
+ options.disable_auto_compactions = true;
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+
+ // put some data
+ for (int table = 0; table < 4; ++table) {
+ for (int i = 0; i < 10 + table; ++i) {
+ Put(ToString(table * 100 + i), "val");
+ }
+ Flush();
+ }
+
+ // this will produce empty file (delete compaction filter)
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_EQ(0U, CountLiveFiles());
+
+ Reopen(options);
+
+ Iterator* itr = db_->NewIterator(ReadOptions());
+ itr->SeekToFirst();
+ // empty db
+ ASSERT_TRUE(!itr->Valid());
+
+ delete itr;
+}
+#endif // ROCKSDB_LITE
+
+TEST_P(DBTestCompactionFilterWithCompactParam,
+ CompactionFilterWithValueChange) {
+ Options options = CurrentOptions();
+ options.num_levels = 3;
+ options.compaction_filter_factory = std::make_shared<ChangeFilterFactory>();
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ // Write 100K+1 keys, these are written to a few files
+ // in L0. We do this so that the current snapshot points
+ // to the 100001 key.The compaction filter is not invoked
+ // on keys that are visible via a snapshot because we
+ // anyways cannot delete it.
+ const std::string value(10, 'x');
+ for (int i = 0; i < 100001; i++) {
+ char key[100];
+ snprintf(key, sizeof(key), "B%010d", i);
+ Put(1, key, value);
+ }
+
+ // push all files to lower levels
+ ASSERT_OK(Flush(1));
+ if (option_config_ != kUniversalCompactionMultiLevel &&
+ option_config_ != kUniversalSubcompactions) {
+ dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+ dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
+ } else {
+ dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
+ nullptr);
+ }
+
+ // re-write all data again
+ for (int i = 0; i < 100001; i++) {
+ char key[100];
+ snprintf(key, sizeof(key), "B%010d", i);
+ Put(1, key, value);
+ }
+
+ // push all files to lower levels. This should
+ // invoke the compaction filter for all 100000 keys.
+ ASSERT_OK(Flush(1));
+ if (option_config_ != kUniversalCompactionMultiLevel &&
+ option_config_ != kUniversalSubcompactions) {
+ dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+ dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
+ } else {
+ dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
+ nullptr);
+ }
+
+ // verify that all keys now have the new value that
+ // was set by the compaction process.
+ for (int i = 0; i < 100001; i++) {
+ char key[100];
+ snprintf(key, sizeof(key), "B%010d", i);
+ std::string newvalue = Get(1, key);
+ ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
+ }
+}
+
+TEST_F(DBTestCompactionFilter, CompactionFilterWithMergeOperator) {
+ std::string one, two, three, four;
+ PutFixed64(&one, 1);
+ PutFixed64(&two, 2);
+ PutFixed64(&three, 3);
+ PutFixed64(&four, 4);
+
+ Options options = CurrentOptions();
+ options.create_if_missing = true;
+ options.merge_operator = MergeOperators::CreateUInt64AddOperator();
+ options.num_levels = 3;
+ // Filter out keys with value is 2.
+ options.compaction_filter_factory =
+ std::make_shared<ConditionalFilterFactory>(two);
+ DestroyAndReopen(options);
+
+ // In the same compaction, a value type needs to be deleted based on
+ // compaction filter, and there is a merge type for the key. compaction
+ // filter result is ignored.
+ ASSERT_OK(db_->Put(WriteOptions(), "foo", two));
+ ASSERT_OK(Flush());
+ ASSERT_OK(db_->Merge(WriteOptions(), "foo", one));
+ ASSERT_OK(Flush());
+ std::string newvalue = Get("foo");
+ ASSERT_EQ(newvalue, three);
+ dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ newvalue = Get("foo");
+ ASSERT_EQ(newvalue, three);
+
+ // value key can be deleted based on compaction filter, leaving only
+ // merge keys.
+ ASSERT_OK(db_->Put(WriteOptions(), "bar", two));
+ ASSERT_OK(Flush());
+ dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ newvalue = Get("bar");
+ ASSERT_EQ("NOT_FOUND", newvalue);
+ ASSERT_OK(db_->Merge(WriteOptions(), "bar", two));
+ ASSERT_OK(Flush());
+ dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ newvalue = Get("bar");
+ ASSERT_EQ(two, two);
+
+ // Compaction filter never applies to merge keys.
+ ASSERT_OK(db_->Put(WriteOptions(), "foobar", one));
+ ASSERT_OK(Flush());
+ ASSERT_OK(db_->Merge(WriteOptions(), "foobar", two));
+ ASSERT_OK(Flush());
+ newvalue = Get("foobar");
+ ASSERT_EQ(newvalue, three);
+ dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ newvalue = Get("foobar");
+ ASSERT_EQ(newvalue, three);
+
+ // In the same compaction, both of value type and merge type keys need to be
+ // deleted based on compaction filter, and there is a merge type for the key.
+ // For both keys, compaction filter results are ignored.
+ ASSERT_OK(db_->Put(WriteOptions(), "barfoo", two));
+ ASSERT_OK(Flush());
+ ASSERT_OK(db_->Merge(WriteOptions(), "barfoo", two));
+ ASSERT_OK(Flush());
+ newvalue = Get("barfoo");
+ ASSERT_EQ(newvalue, four);
+ dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ newvalue = Get("barfoo");
+ ASSERT_EQ(newvalue, four);
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
+ KeepFilterFactory* filter = new KeepFilterFactory(true, true);
+
+ Options options = CurrentOptions();
+ options.compaction_style = kCompactionStyleUniversal;
+ options.compaction_filter_factory.reset(filter);
+ options.compression = kNoCompression;
+ options.level0_file_num_compaction_trigger = 8;
+ Reopen(options);
+ int num_keys_per_file = 400;
+ for (int j = 0; j < 3; j++) {
+ // Write several keys.
+ const std::string value(10, 'x');
+ for (int i = 0; i < num_keys_per_file; i++) {
+ char key[100];
+ snprintf(key, sizeof(key), "B%08d%02d", i, j);
+ Put(key, value);
+ }
+ dbfull()->TEST_FlushMemTable();
+ // Make sure next file is much smaller so automatic compaction will not
+ // be triggered.
+ num_keys_per_file /= 2;
+ }
+ dbfull()->TEST_WaitForCompact();
+
+ // Force a manual compaction
+ cfilter_count = 0;
+ filter->expect_manual_compaction_.store(true);
+ filter->expect_full_compaction_.store(true);
+ filter->expect_cf_id_.store(0);
+ dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_EQ(cfilter_count, 700);
+ ASSERT_EQ(NumSortedRuns(0), 1);
+ ASSERT_TRUE(filter->compaction_filter_created());
+
+ // Verify total number of keys is correct after manual compaction.
+ {
+ int count = 0;
+ int total = 0;
+ Arena arena;
+ InternalKeyComparator icmp(options.comparator);
+ ReadRangeDelAggregator range_del_agg(&icmp,
+ kMaxSequenceNumber /* snapshots */);
+ ScopedArenaIterator iter(dbfull()->NewInternalIterator(
+ &arena, &range_del_agg, kMaxSequenceNumber));
+ iter->SeekToFirst();
+ ASSERT_OK(iter->status());
+ while (iter->Valid()) {
+ ParsedInternalKey ikey(Slice(), 0, kTypeValue);
+ ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
+ total++;
+ if (ikey.sequence != 0) {
+ count++;
+ }
+ iter->Next();
+ }
+ ASSERT_EQ(total, 700);
+ ASSERT_EQ(count, 0);
+ }
+}
+#endif // ROCKSDB_LITE
+
+TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) {
+ KeepFilterFactory* filter = new KeepFilterFactory(false, true);
+ filter->expect_cf_id_.store(1);
+
+ Options options = CurrentOptions();
+ options.compaction_filter_factory.reset(filter);
+ options.compression = kNoCompression;
+ options.level0_file_num_compaction_trigger = 2;
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ int num_keys_per_file = 400;
+ for (int j = 0; j < 3; j++) {
+ // Write several keys.
+ const std::string value(10, 'x');
+ for (int i = 0; i < num_keys_per_file; i++) {
+ char key[100];
+ snprintf(key, sizeof(key), "B%08d%02d", i, j);
+ Put(1, key, value);
+ }
+ Flush(1);
+ // Make sure next file is much smaller so automatic compaction will not
+ // be triggered.
+ num_keys_per_file /= 2;
+ }
+ dbfull()->TEST_WaitForCompact();
+
+ ASSERT_TRUE(filter->compaction_filter_created());
+}
+
+#ifndef ROCKSDB_LITE
+// Compaction filters aplies to all records, regardless snapshots.
+TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
+ std::string five = ToString(5);
+ Options options = CurrentOptions();
+ options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>();
+ options.disable_auto_compactions = true;
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+
+ // Put some data.
+ const Snapshot* snapshot = nullptr;
+ for (int table = 0; table < 4; ++table) {
+ for (int i = 0; i < 10; ++i) {
+ Put(ToString(table * 100 + i), "val");
+ }
+ Flush();
+
+ if (table == 0) {
+ snapshot = db_->GetSnapshot();
+ }
+ }
+ assert(snapshot != nullptr);
+
+ cfilter_count = 0;
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ // The filter should delete 40 records.
+ ASSERT_EQ(40, cfilter_count);
+
+ {
+ // Scan the entire database as of the snapshot to ensure
+ // that nothing is left
+ ReadOptions read_options;
+ read_options.snapshot = snapshot;
+ std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
+ iter->SeekToFirst();
+ int count = 0;
+ while (iter->Valid()) {
+ count++;
+ iter->Next();
+ }
+ ASSERT_EQ(count, 6);
+ read_options.snapshot = nullptr;
+ std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options));
+ iter1->SeekToFirst();
+ count = 0;
+ while (iter1->Valid()) {
+ count++;
+ iter1->Next();
+ }
+ // We have deleted 10 keys from 40 using the compaction filter
+ // Keys 6-9 before the snapshot and 100-105 after the snapshot
+ ASSERT_EQ(count, 30);
+ }
+
+ // Release the snapshot and compact again -> now all records should be
+ // removed.
+ db_->ReleaseSnapshot(snapshot);
+}
+#endif // ROCKSDB_LITE
+
+TEST_F(DBTestCompactionFilter, SkipUntil) {
+ Options options = CurrentOptions();
+ options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
+ options.disable_auto_compactions = true;
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+
+ // Write 100K keys, these are written to a few files in L0.
+ for (int table = 0; table < 4; ++table) {
+ // Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371].
+ for (int i = table * 6; i < 39 + table * 11; ++i) {
+ char key[100];
+ snprintf(key, sizeof(key), "%010d", table * 100 + i);
+ Put(key, std::to_string(table * 1000 + i));
+ }
+ Flush();
+ }
+
+ cfilter_skips = 0;
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ // Number of skips in tables: 2, 3, 3, 3.
+ ASSERT_EQ(11, cfilter_skips);
+
+ for (int table = 0; table < 4; ++table) {
+ for (int i = table * 6; i < 39 + table * 11; ++i) {
+ int k = table * 100 + i;
+ char key[100];
+ snprintf(key, sizeof(key), "%010d", table * 100 + i);
+ auto expected = std::to_string(table * 1000 + i);
+ std::string val;
+ Status s = db_->Get(ReadOptions(), key, &val);
+ if (k / 10 % 2 == 0) {
+ ASSERT_TRUE(s.IsNotFound());
+ } else {
+ ASSERT_OK(s);
+ ASSERT_EQ(expected, val);
+ }
+ }
+ }
+}
+
+TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) {
+ BlockBasedTableOptions table_options;
+ table_options.whole_key_filtering = false;
+ table_options.filter_policy.reset(NewBloomFilterPolicy(100, false));
+
+ Options options = CurrentOptions();
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ options.prefix_extractor.reset(NewCappedPrefixTransform(9));
+ options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
+ options.disable_auto_compactions = true;
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+
+ Put("0000000010", "v10");
+ Put("0000000020", "v20"); // skipped
+ Put("0000000050", "v50");
+ Flush();
+
+ cfilter_skips = 0;
+ EXPECT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ EXPECT_EQ(1, cfilter_skips);
+
+ Status s;
+ std::string val;
+
+ s = db_->Get(ReadOptions(), "0000000010", &val);
+ ASSERT_OK(s);
+ EXPECT_EQ("v10", val);
+
+ s = db_->Get(ReadOptions(), "0000000020", &val);
+ EXPECT_TRUE(s.IsNotFound());
+
+ s = db_->Get(ReadOptions(), "0000000050", &val);
+ ASSERT_OK(s);
+ EXPECT_EQ("v50", val);
+}
+
+class TestNotSupportedFilter : public CompactionFilter {
+ public:
+ bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
+ std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ return true;
+ }
+
+ const char* Name() const override { return "NotSupported"; }
+ bool IgnoreSnapshots() const override { return false; }
+};
+
+TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalse) {
+ Options options = CurrentOptions();
+ options.compaction_filter = new TestNotSupportedFilter();
+ DestroyAndReopen(options);
+
+ Put("a", "v10");
+ Put("z", "v20");
+ Flush();
+
+ Put("a", "v10");
+ Put("z", "v20");
+ Flush();
+
+ // Comapction should fail because IgnoreSnapshots() = false
+ EXPECT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+ .IsNotSupported());
+
+ delete options.compaction_filter;
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}