diff options
Diffstat (limited to 'src/rocksdb/db/db_compaction_filter_test.cc')
-rw-r--r-- | src/rocksdb/db/db_compaction_filter_test.cc | 1036 |
1 files changed, 1036 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_compaction_filter_test.cc b/src/rocksdb/db/db_compaction_filter_test.cc new file mode 100644 index 000000000..be863d4f6 --- /dev/null +++ b/src/rocksdb/db/db_compaction_filter_test.cc @@ -0,0 +1,1036 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_test_util.h" +#include "port/stack_trace.h" + +namespace ROCKSDB_NAMESPACE { + +static int cfilter_count = 0; +static int cfilter_skips = 0; + +// This is a static filter used for filtering +// kvs during the compaction process. +static std::string NEW_VALUE = "NewValue"; + +class DBTestCompactionFilter : public DBTestBase { + public: + DBTestCompactionFilter() + : DBTestBase("db_compaction_filter_test", /*env_do_fsync=*/true) {} +}; + +// Param variant of DBTestBase::ChangeCompactOptions +class DBTestCompactionFilterWithCompactParam + : public DBTestCompactionFilter, + public ::testing::WithParamInterface<DBTestBase::OptionConfig> { + public: + DBTestCompactionFilterWithCompactParam() : DBTestCompactionFilter() { + option_config_ = GetParam(); + Destroy(last_options_); + auto options = CurrentOptions(); + if (option_config_ == kDefault || option_config_ == kUniversalCompaction || + option_config_ == kUniversalCompactionMultiLevel) { + options.create_if_missing = true; + } + if (option_config_ == kLevelSubcompactions || + option_config_ == kUniversalSubcompactions) { + assert(options.max_subcompactions > 1); + } + Reopen(options); + } +}; + +#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) +INSTANTIATE_TEST_CASE_P( + CompactionFilterWithOption, DBTestCompactionFilterWithCompactParam, + ::testing::Values(DBTestBase::OptionConfig::kDefault, + DBTestBase::OptionConfig::kUniversalCompaction, + DBTestBase::OptionConfig::kUniversalCompactionMultiLevel, + DBTestBase::OptionConfig::kLevelSubcompactions, + DBTestBase::OptionConfig::kUniversalSubcompactions)); +#else +// Run fewer cases in non-full valgrind to save time. +INSTANTIATE_TEST_CASE_P(CompactionFilterWithOption, + DBTestCompactionFilterWithCompactParam, + ::testing::Values(DBTestBase::OptionConfig::kDefault)); +#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) + +class KeepFilter : public CompactionFilter { + public: + bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, + std::string* /*new_value*/, + bool* /*value_changed*/) const override { + cfilter_count++; + return false; + } + + const char* Name() const override { return "KeepFilter"; } +}; + +class DeleteFilter : public CompactionFilter { + public: + bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, + std::string* /*new_value*/, + bool* /*value_changed*/) const override { + cfilter_count++; + return true; + } + + bool FilterMergeOperand(int /*level*/, const Slice& /*key*/, + const Slice& /*operand*/) const override { + return true; + } + + const char* Name() const override { return "DeleteFilter"; } +}; + +class DeleteISFilter : public CompactionFilter { + public: + bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/, + std::string* /*new_value*/, + bool* /*value_changed*/) const override { + cfilter_count++; + int i = std::stoi(key.ToString()); + if (i > 5 && i <= 105) { + return true; + } + return false; + } + + bool IgnoreSnapshots() const override { return true; } + + const char* Name() const override { return "DeleteFilter"; } +}; + +// Skip x if floor(x/10) is even, use range skips. Requires that keys are +// zero-padded to length 10. +class SkipEvenFilter : public CompactionFilter { + public: + Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/, + const Slice& /*existing_value*/, std::string* /*new_value*/, + std::string* skip_until) const override { + cfilter_count++; + int i = std::stoi(key.ToString()); + if (i / 10 % 2 == 0) { + char key_str[100]; + snprintf(key_str, sizeof(key_str), "%010d", i / 10 * 10 + 10); + *skip_until = key_str; + ++cfilter_skips; + return Decision::kRemoveAndSkipUntil; + } + return Decision::kKeep; + } + + bool IgnoreSnapshots() const override { return true; } + + const char* Name() const override { return "DeleteFilter"; } +}; + +class ConditionalFilter : public CompactionFilter { + public: + explicit ConditionalFilter(const std::string* filtered_value) + : filtered_value_(filtered_value) {} + bool Filter(int /*level*/, const Slice& /*key*/, const Slice& value, + std::string* /*new_value*/, + bool* /*value_changed*/) const override { + return value.ToString() == *filtered_value_; + } + + const char* Name() const override { return "ConditionalFilter"; } + + private: + const std::string* filtered_value_; +}; + +class ChangeFilter : public CompactionFilter { + public: + explicit ChangeFilter() {} + + bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, + std::string* new_value, bool* value_changed) const override { + assert(new_value != nullptr); + *new_value = NEW_VALUE; + *value_changed = true; + return false; + } + + const char* Name() const override { return "ChangeFilter"; } +}; + +class KeepFilterFactory : public CompactionFilterFactory { + public: + explicit KeepFilterFactory(bool check_context = false, + bool check_context_cf_id = false) + : check_context_(check_context), + check_context_cf_id_(check_context_cf_id), + compaction_filter_created_(false) {} + + std::unique_ptr<CompactionFilter> CreateCompactionFilter( + const CompactionFilter::Context& context) override { + if (check_context_) { + EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction); + EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction); + } + if (check_context_cf_id_) { + EXPECT_EQ(expect_cf_id_.load(), context.column_family_id); + } + compaction_filter_created_ = true; + return std::unique_ptr<CompactionFilter>(new KeepFilter()); + } + + bool compaction_filter_created() const { return compaction_filter_created_; } + + const char* Name() const override { return "KeepFilterFactory"; } + bool check_context_; + bool check_context_cf_id_; + std::atomic_bool expect_full_compaction_; + std::atomic_bool expect_manual_compaction_; + std::atomic<uint32_t> expect_cf_id_; + bool compaction_filter_created_; +}; + +// This filter factory is configured with a `TableFileCreationReason`. Only +// table files created for that reason will undergo filtering. This +// configurability makes it useful to tests for filtering non-compaction table +// files, such as "CompactionFilterFlush" and "CompactionFilterRecovery". +class DeleteFilterFactory : public CompactionFilterFactory { + public: + explicit DeleteFilterFactory(TableFileCreationReason reason) + : reason_(reason) {} + + std::unique_ptr<CompactionFilter> CreateCompactionFilter( + const CompactionFilter::Context& context) override { + EXPECT_EQ(reason_, context.reason); + if (context.reason == TableFileCreationReason::kCompaction && + !context.is_manual_compaction) { + // Table files created by automatic compaction do not undergo filtering. + // Presumably some tests rely on this. + return std::unique_ptr<CompactionFilter>(nullptr); + } + return std::unique_ptr<CompactionFilter>(new DeleteFilter()); + } + + bool ShouldFilterTableFileCreation( + TableFileCreationReason reason) const override { + return reason_ == reason; + } + + const char* Name() const override { return "DeleteFilterFactory"; } + + private: + const TableFileCreationReason reason_; +}; + +// Delete Filter Factory which ignores snapshots +class DeleteISFilterFactory : public CompactionFilterFactory { + public: + std::unique_ptr<CompactionFilter> CreateCompactionFilter( + const CompactionFilter::Context& context) override { + if (context.is_manual_compaction) { + return std::unique_ptr<CompactionFilter>(new DeleteISFilter()); + } else { + return std::unique_ptr<CompactionFilter>(nullptr); + } + } + + const char* Name() const override { return "DeleteFilterFactory"; } +}; + +class SkipEvenFilterFactory : public CompactionFilterFactory { + public: + std::unique_ptr<CompactionFilter> CreateCompactionFilter( + const CompactionFilter::Context& context) override { + if (context.is_manual_compaction) { + return std::unique_ptr<CompactionFilter>(new SkipEvenFilter()); + } else { + return std::unique_ptr<CompactionFilter>(nullptr); + } + } + + const char* Name() const override { return "SkipEvenFilterFactory"; } +}; + +class ConditionalFilterFactory : public CompactionFilterFactory { + public: + explicit ConditionalFilterFactory(const Slice& filtered_value) + : filtered_value_(filtered_value.ToString()) {} + + std::unique_ptr<CompactionFilter> CreateCompactionFilter( + const CompactionFilter::Context& /*context*/) override { + return std::unique_ptr<CompactionFilter>( + new ConditionalFilter(&filtered_value_)); + } + + const char* Name() const override { return "ConditionalFilterFactory"; } + + private: + std::string filtered_value_; +}; + +class ChangeFilterFactory : public CompactionFilterFactory { + public: + explicit ChangeFilterFactory() {} + + std::unique_ptr<CompactionFilter> CreateCompactionFilter( + const CompactionFilter::Context& /*context*/) override { + return std::unique_ptr<CompactionFilter>(new ChangeFilter()); + } + + const char* Name() const override { return "ChangeFilterFactory"; } +}; + +#ifndef ROCKSDB_LITE +TEST_F(DBTestCompactionFilter, CompactionFilter) { + Options options = CurrentOptions(); + options.max_open_files = -1; + options.num_levels = 3; + options.compaction_filter_factory = std::make_shared<KeepFilterFactory>(); + options = CurrentOptions(options); + CreateAndReopenWithCF({"pikachu"}, options); + + // Write 100K keys, these are written to a few files in L0. + const std::string value(10, 'x'); + for (int i = 0; i < 100000; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + ASSERT_OK(Put(1, key, value)); + } + ASSERT_OK(Flush(1)); + + // Push all files to the highest level L2. Verify that + // the compaction is each level invokes the filter for + // all the keys in that level. + cfilter_count = 0; + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1])); + ASSERT_EQ(cfilter_count, 100000); + cfilter_count = 0; + ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1])); + ASSERT_EQ(cfilter_count, 100000); + + ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); + ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); + ASSERT_NE(NumTableFilesAtLevel(2, 1), 0); + cfilter_count = 0; + + // All the files are in the lowest level. + // Verify that all but the 100001st record + // has sequence number zero. The 100001st record + // is at the tip of this snapshot and cannot + // be zeroed out. + int count = 0; + int total = 0; + Arena arena; + { + InternalKeyComparator icmp(options.comparator); + ReadOptions read_options; + ScopedArenaIterator iter(dbfull()->NewInternalIterator( + read_options, &arena, kMaxSequenceNumber, handles_[1])); + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + while (iter->Valid()) { + ParsedInternalKey ikey(Slice(), 0, kTypeValue); + ASSERT_OK(ParseInternalKey(iter->key(), &ikey, true /* log_err_key */)); + total++; + if (ikey.sequence != 0) { + count++; + } + iter->Next(); + } + ASSERT_OK(iter->status()); + } + ASSERT_EQ(total, 100000); + ASSERT_EQ(count, 0); + + // overwrite all the 100K keys once again. + for (int i = 0; i < 100000; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + ASSERT_OK(Put(1, key, value)); + } + ASSERT_OK(Flush(1)); + + // push all files to the highest level L2. This + // means that all keys should pass at least once + // via the compaction filter + cfilter_count = 0; + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1])); + ASSERT_EQ(cfilter_count, 100000); + cfilter_count = 0; + ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1])); + ASSERT_EQ(cfilter_count, 100000); + ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); + ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); + ASSERT_NE(NumTableFilesAtLevel(2, 1), 0); + + // create a new database with the compaction + // filter in such a way that it deletes all keys + options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>( + TableFileCreationReason::kCompaction); + options.create_if_missing = true; + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + // write all the keys once again. + for (int i = 0; i < 100000; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + ASSERT_OK(Put(1, key, value)); + } + ASSERT_OK(Flush(1)); + ASSERT_NE(NumTableFilesAtLevel(0, 1), 0); + ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); + ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0); + + // Push all files to the highest level L2. This + // triggers the compaction filter to delete all keys, + // verify that at the end of the compaction process, + // nothing is left. + cfilter_count = 0; + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1])); + ASSERT_EQ(cfilter_count, 100000); + cfilter_count = 0; + ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1])); + ASSERT_EQ(cfilter_count, 0); + ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); + ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); + + { + // Scan the entire database to ensure that nothing is left + std::unique_ptr<Iterator> iter( + db_->NewIterator(ReadOptions(), handles_[1])); + iter->SeekToFirst(); + count = 0; + while (iter->Valid()) { + count++; + iter->Next(); + } + ASSERT_OK(iter->status()); + ASSERT_EQ(count, 0); + } + + // The sequence number of the remaining record + // is not zeroed out even though it is at the + // level Lmax because this record is at the tip + count = 0; + { + InternalKeyComparator icmp(options.comparator); + ReadOptions read_options; + ScopedArenaIterator iter(dbfull()->NewInternalIterator( + read_options, &arena, kMaxSequenceNumber, handles_[1])); + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + while (iter->Valid()) { + ParsedInternalKey ikey(Slice(), 0, kTypeValue); + ASSERT_OK(ParseInternalKey(iter->key(), &ikey, true /* log_err_key */)); + ASSERT_NE(ikey.sequence, (unsigned)0); + count++; + iter->Next(); + } + ASSERT_EQ(count, 0); + } +} + +// Tests the edge case where compaction does not produce any output -- all +// entries are deleted. The compaction should create bunch of 'DeleteFile' +// entries in VersionEdit, but none of the 'AddFile's. +TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) { + Options options = CurrentOptions(); + options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>( + TableFileCreationReason::kCompaction); + options.disable_auto_compactions = true; + options.create_if_missing = true; + DestroyAndReopen(options); + + // put some data + for (int table = 0; table < 4; ++table) { + for (int i = 0; i < 10 + table; ++i) { + ASSERT_OK(Put(std::to_string(table * 100 + i), "val")); + } + ASSERT_OK(Flush()); + } + + // this will produce empty file (delete compaction filter) + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ(0U, CountLiveFiles()); + + Reopen(options); + + Iterator* itr = db_->NewIterator(ReadOptions()); + itr->SeekToFirst(); + ASSERT_OK(itr->status()); + // empty db + ASSERT_TRUE(!itr->Valid()); + + delete itr; +} +#endif // ROCKSDB_LITE + +TEST_F(DBTestCompactionFilter, CompactionFilterFlush) { + // Tests a `CompactionFilterFactory` that filters when table file is created + // by flush. + Options options = CurrentOptions(); + options.compaction_filter_factory = + std::make_shared<DeleteFilterFactory>(TableFileCreationReason::kFlush); + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + Reopen(options); + + // Puts and Merges are purged in flush. + ASSERT_OK(Put("a", "v")); + ASSERT_OK(Merge("b", "v")); + ASSERT_OK(Flush()); + ASSERT_EQ("NOT_FOUND", Get("a")); + ASSERT_EQ("NOT_FOUND", Get("b")); + + // However, Puts and Merges are preserved by recovery. + ASSERT_OK(Put("a", "v")); + ASSERT_OK(Merge("b", "v")); + Reopen(options); + ASSERT_EQ("v", Get("a")); + ASSERT_EQ("v", Get("b")); + + // Likewise, compaction does not apply filtering. + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("v", Get("a")); + ASSERT_EQ("v", Get("b")); +} + +TEST_F(DBTestCompactionFilter, CompactionFilterRecovery) { + // Tests a `CompactionFilterFactory` that filters when table file is created + // by recovery. + Options options = CurrentOptions(); + options.compaction_filter_factory = + std::make_shared<DeleteFilterFactory>(TableFileCreationReason::kRecovery); + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + Reopen(options); + + // Puts and Merges are purged in recovery. + ASSERT_OK(Put("a", "v")); + ASSERT_OK(Merge("b", "v")); + Reopen(options); + ASSERT_EQ("NOT_FOUND", Get("a")); + ASSERT_EQ("NOT_FOUND", Get("b")); + + // However, Puts and Merges are preserved by flush. + ASSERT_OK(Put("a", "v")); + ASSERT_OK(Merge("b", "v")); + ASSERT_OK(Flush()); + ASSERT_EQ("v", Get("a")); + ASSERT_EQ("v", Get("b")); + + // Likewise, compaction does not apply filtering. + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("v", Get("a")); + ASSERT_EQ("v", Get("b")); +} + +TEST_P(DBTestCompactionFilterWithCompactParam, + CompactionFilterWithValueChange) { + Options options = CurrentOptions(); + options.num_levels = 3; + options.compaction_filter_factory = std::make_shared<ChangeFilterFactory>(); + CreateAndReopenWithCF({"pikachu"}, options); + + // Write 100K+1 keys, these are written to a few files + // in L0. We do this so that the current snapshot points + // to the 100001 key.The compaction filter is not invoked + // on keys that are visible via a snapshot because we + // anyways cannot delete it. + const std::string value(10, 'x'); + for (int i = 0; i < 100001; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + ASSERT_OK(Put(1, key, value)); + } + + // push all files to lower levels + ASSERT_OK(Flush(1)); + if (option_config_ != kUniversalCompactionMultiLevel && + option_config_ != kUniversalSubcompactions) { + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1])); + ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1])); + } else { + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], + nullptr, nullptr)); + } + + // re-write all data again + for (int i = 0; i < 100001; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + ASSERT_OK(Put(1, key, value)); + } + + // push all files to lower levels. This should + // invoke the compaction filter for all 100000 keys. + ASSERT_OK(Flush(1)); + if (option_config_ != kUniversalCompactionMultiLevel && + option_config_ != kUniversalSubcompactions) { + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1])); + ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1])); + } else { + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], + nullptr, nullptr)); + } + + // verify that all keys now have the new value that + // was set by the compaction process. + for (int i = 0; i < 100001; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + std::string newvalue = Get(1, key); + ASSERT_EQ(newvalue.compare(NEW_VALUE), 0); + } +} + +TEST_F(DBTestCompactionFilter, CompactionFilterWithMergeOperator) { + std::string one, two, three, four; + PutFixed64(&one, 1); + PutFixed64(&two, 2); + PutFixed64(&three, 3); + PutFixed64(&four, 4); + + Options options = CurrentOptions(); + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreateUInt64AddOperator(); + options.num_levels = 3; + // Filter out keys with value is 2. + options.compaction_filter_factory = + std::make_shared<ConditionalFilterFactory>(two); + DestroyAndReopen(options); + + // In the same compaction, a value type needs to be deleted based on + // compaction filter, and there is a merge type for the key. compaction + // filter result is ignored. + ASSERT_OK(db_->Put(WriteOptions(), "foo", two)); + ASSERT_OK(Flush()); + ASSERT_OK(db_->Merge(WriteOptions(), "foo", one)); + ASSERT_OK(Flush()); + std::string newvalue = Get("foo"); + ASSERT_EQ(newvalue, three); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + newvalue = Get("foo"); + ASSERT_EQ(newvalue, three); + + // value key can be deleted based on compaction filter, leaving only + // merge keys. + ASSERT_OK(db_->Put(WriteOptions(), "bar", two)); + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + newvalue = Get("bar"); + ASSERT_EQ("NOT_FOUND", newvalue); + ASSERT_OK(db_->Merge(WriteOptions(), "bar", two)); + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + newvalue = Get("bar"); + ASSERT_EQ(two, two); + + // Compaction filter never applies to merge keys. + ASSERT_OK(db_->Put(WriteOptions(), "foobar", one)); + ASSERT_OK(Flush()); + ASSERT_OK(db_->Merge(WriteOptions(), "foobar", two)); + ASSERT_OK(Flush()); + newvalue = Get("foobar"); + ASSERT_EQ(newvalue, three); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + newvalue = Get("foobar"); + ASSERT_EQ(newvalue, three); + + // In the same compaction, both of value type and merge type keys need to be + // deleted based on compaction filter, and there is a merge type for the key. + // For both keys, compaction filter results are ignored. + ASSERT_OK(db_->Put(WriteOptions(), "barfoo", two)); + ASSERT_OK(Flush()); + ASSERT_OK(db_->Merge(WriteOptions(), "barfoo", two)); + ASSERT_OK(Flush()); + newvalue = Get("barfoo"); + ASSERT_EQ(newvalue, four); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + newvalue = Get("barfoo"); + ASSERT_EQ(newvalue, four); +} + +#ifndef ROCKSDB_LITE +TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) { + KeepFilterFactory* filter = new KeepFilterFactory(true, true); + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.compaction_filter_factory.reset(filter); + options.compression = kNoCompression; + options.level0_file_num_compaction_trigger = 8; + Reopen(options); + int num_keys_per_file = 400; + for (int j = 0; j < 3; j++) { + // Write several keys. + const std::string value(10, 'x'); + for (int i = 0; i < num_keys_per_file; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%08d%02d", i, j); + ASSERT_OK(Put(key, value)); + } + ASSERT_OK(dbfull()->TEST_FlushMemTable()); + // Make sure next file is much smaller so automatic compaction will not + // be triggered. + num_keys_per_file /= 2; + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + // Force a manual compaction + cfilter_count = 0; + filter->expect_manual_compaction_.store(true); + filter->expect_full_compaction_.store(true); + filter->expect_cf_id_.store(0); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ(cfilter_count, 700); + ASSERT_EQ(NumSortedRuns(0), 1); + ASSERT_TRUE(filter->compaction_filter_created()); + + // Verify total number of keys is correct after manual compaction. + { + int count = 0; + int total = 0; + Arena arena; + InternalKeyComparator icmp(options.comparator); + ReadOptions read_options; + ScopedArenaIterator iter(dbfull()->NewInternalIterator(read_options, &arena, + kMaxSequenceNumber)); + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + while (iter->Valid()) { + ParsedInternalKey ikey(Slice(), 0, kTypeValue); + ASSERT_OK(ParseInternalKey(iter->key(), &ikey, true /* log_err_key */)); + total++; + if (ikey.sequence != 0) { + count++; + } + iter->Next(); + } + ASSERT_EQ(total, 700); + ASSERT_EQ(count, 0); + } +} +#endif // ROCKSDB_LITE + +TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) { + KeepFilterFactory* filter = new KeepFilterFactory(false, true); + filter->expect_cf_id_.store(1); + + Options options = CurrentOptions(); + options.compaction_filter_factory.reset(filter); + options.compression = kNoCompression; + options.level0_file_num_compaction_trigger = 2; + CreateAndReopenWithCF({"pikachu"}, options); + + int num_keys_per_file = 400; + for (int j = 0; j < 3; j++) { + // Write several keys. + const std::string value(10, 'x'); + for (int i = 0; i < num_keys_per_file; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%08d%02d", i, j); + ASSERT_OK(Put(1, key, value)); + } + ASSERT_OK(Flush(1)); + // Make sure next file is much smaller so automatic compaction will not + // be triggered. + num_keys_per_file /= 2; + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ASSERT_TRUE(filter->compaction_filter_created()); +} + +#ifndef ROCKSDB_LITE +// Compaction filters aplies to all records, regardless snapshots. +TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) { + std::string five = std::to_string(5); + Options options = CurrentOptions(); + options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>(); + options.disable_auto_compactions = true; + options.create_if_missing = true; + DestroyAndReopen(options); + + // Put some data. + const Snapshot* snapshot = nullptr; + for (int table = 0; table < 4; ++table) { + for (int i = 0; i < 10; ++i) { + ASSERT_OK(Put(std::to_string(table * 100 + i), "val")); + } + ASSERT_OK(Flush()); + + if (table == 0) { + snapshot = db_->GetSnapshot(); + } + } + assert(snapshot != nullptr); + + cfilter_count = 0; + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + // The filter should delete 40 records. + ASSERT_EQ(40, cfilter_count); + + { + // Scan the entire database as of the snapshot to ensure + // that nothing is left + ReadOptions read_options; + read_options.snapshot = snapshot; + std::unique_ptr<Iterator> iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + int count = 0; + while (iter->Valid()) { + count++; + iter->Next(); + } + ASSERT_EQ(count, 6); + read_options.snapshot = nullptr; + std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options)); + ASSERT_OK(iter1->status()); + iter1->SeekToFirst(); + count = 0; + while (iter1->Valid()) { + count++; + iter1->Next(); + } + // We have deleted 10 keys from 40 using the compaction filter + // Keys 6-9 before the snapshot and 100-105 after the snapshot + ASSERT_EQ(count, 30); + } + + // Release the snapshot and compact again -> now all records should be + // removed. + db_->ReleaseSnapshot(snapshot); +} +#endif // ROCKSDB_LITE + +TEST_F(DBTestCompactionFilter, SkipUntil) { + Options options = CurrentOptions(); + options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>(); + options.disable_auto_compactions = true; + options.create_if_missing = true; + DestroyAndReopen(options); + + // Write 100K keys, these are written to a few files in L0. + for (int table = 0; table < 4; ++table) { + // Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371]. + for (int i = table * 6; i < 39 + table * 11; ++i) { + char key[100]; + snprintf(key, sizeof(key), "%010d", table * 100 + i); + ASSERT_OK(Put(key, std::to_string(table * 1000 + i))); + } + ASSERT_OK(Flush()); + } + + cfilter_skips = 0; + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + // Number of skips in tables: 2, 3, 3, 3. + ASSERT_EQ(11, cfilter_skips); + + for (int table = 0; table < 4; ++table) { + for (int i = table * 6; i < 39 + table * 11; ++i) { + int k = table * 100 + i; + char key[100]; + snprintf(key, sizeof(key), "%010d", table * 100 + i); + auto expected = std::to_string(table * 1000 + i); + std::string val; + Status s = db_->Get(ReadOptions(), key, &val); + if (k / 10 % 2 == 0) { + ASSERT_TRUE(s.IsNotFound()); + } else { + ASSERT_OK(s); + ASSERT_EQ(expected, val); + } + } + } +} + +TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) { + BlockBasedTableOptions table_options; + table_options.whole_key_filtering = false; + table_options.filter_policy.reset(NewBloomFilterPolicy(100, false)); + + Options options = CurrentOptions(); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.prefix_extractor.reset(NewCappedPrefixTransform(9)); + options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>(); + options.disable_auto_compactions = true; + options.create_if_missing = true; + DestroyAndReopen(options); + + ASSERT_OK(Put("0000000010", "v10")); + ASSERT_OK(Put("0000000020", "v20")); // skipped + ASSERT_OK(Put("0000000050", "v50")); + ASSERT_OK(Flush()); + + cfilter_skips = 0; + EXPECT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + EXPECT_EQ(1, cfilter_skips); + + Status s; + std::string val; + + s = db_->Get(ReadOptions(), "0000000010", &val); + ASSERT_OK(s); + EXPECT_EQ("v10", val); + + s = db_->Get(ReadOptions(), "0000000020", &val); + EXPECT_TRUE(s.IsNotFound()); + + s = db_->Get(ReadOptions(), "0000000050", &val); + ASSERT_OK(s); + EXPECT_EQ("v50", val); +} + +class TestNotSupportedFilter : public CompactionFilter { + public: + bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, + std::string* /*new_value*/, + bool* /*value_changed*/) const override { + return true; + } + + const char* Name() const override { return "NotSupported"; } + bool IgnoreSnapshots() const override { return false; } +}; + +TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalse) { + Options options = CurrentOptions(); + options.compaction_filter = new TestNotSupportedFilter(); + DestroyAndReopen(options); + + ASSERT_OK(Put("a", "v10")); + ASSERT_OK(Put("z", "v20")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("a", "v10")); + ASSERT_OK(Put("z", "v20")); + ASSERT_OK(Flush()); + + // Comapction should fail because IgnoreSnapshots() = false + EXPECT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr) + .IsNotSupported()); + + delete options.compaction_filter; +} + +class TestNotSupportedFilterFactory : public CompactionFilterFactory { + public: + explicit TestNotSupportedFilterFactory(TableFileCreationReason reason) + : reason_(reason) {} + + bool ShouldFilterTableFileCreation( + TableFileCreationReason reason) const override { + return reason_ == reason; + } + + std::unique_ptr<CompactionFilter> CreateCompactionFilter( + const CompactionFilter::Context& /* context */) override { + return std::unique_ptr<CompactionFilter>(new TestNotSupportedFilter()); + } + + const char* Name() const override { return "TestNotSupportedFilterFactory"; } + + private: + const TableFileCreationReason reason_; +}; + +TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalseDuringFlush) { + Options options = CurrentOptions(); + options.compaction_filter_factory = + std::make_shared<TestNotSupportedFilterFactory>( + TableFileCreationReason::kFlush); + Reopen(options); + + ASSERT_OK(Put("a", "v10")); + ASSERT_TRUE(Flush().IsNotSupported()); +} + +TEST_F(DBTestCompactionFilter, IgnoreSnapshotsFalseRecovery) { + Options options = CurrentOptions(); + options.compaction_filter_factory = + std::make_shared<TestNotSupportedFilterFactory>( + TableFileCreationReason::kRecovery); + Reopen(options); + + ASSERT_OK(Put("a", "v10")); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); +} + +TEST_F(DBTestCompactionFilter, DropKeyWithSingleDelete) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + + Reopen(options); + + ASSERT_OK(Put("a", "v0")); + ASSERT_OK(Put("b", "v0")); + const Snapshot* snapshot = db_->GetSnapshot(); + + ASSERT_OK(SingleDelete("b")); + ASSERT_OK(Flush()); + + { + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = options.num_levels - 1; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + } + + db_->ReleaseSnapshot(snapshot); + Close(); + + class DeleteFilterV2 : public CompactionFilter { + public: + Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/, + const Slice& /*existing_value*/, + std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + if (key.starts_with("b")) { + return Decision::kPurge; + } + return Decision::kRemove; + } + + const char* Name() const override { return "DeleteFilterV2"; } + } delete_filter_v2; + + options.compaction_filter = &delete_filter_v2; + options.level0_file_num_compaction_trigger = 2; + Reopen(options); + + ASSERT_OK(Put("b", "v1")); + ASSERT_OK(Put("x", "v1")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("r", "v1")); + ASSERT_OK(Put("z", "v1")); + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + Close(); + + options.compaction_filter = nullptr; + Reopen(options); + ASSERT_OK(SingleDelete("b")); + ASSERT_OK(Flush()); + { + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + } +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |