diff options
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/db/version_set_test.cc | 1287 |
1 files changed, 1287 insertions, 0 deletions
diff --git a/src/rocksdb/db/version_set_test.cc b/src/rocksdb/db/version_set_test.cc new file mode 100644 index 000000000..03e0e26d2 --- /dev/null +++ b/src/rocksdb/db/version_set_test.cc @@ -0,0 +1,1287 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/version_set.h" +#include "db/db_impl/db_impl.h" +#include "db/log_writer.h" +#include "logging/logging.h" +#include "table/mock_table.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +class GenerateLevelFilesBriefTest : public testing::Test { + public: + std::vector<FileMetaData*> files_; + LevelFilesBrief file_level_; + Arena arena_; + + GenerateLevelFilesBriefTest() { } + + ~GenerateLevelFilesBriefTest() override { + for (size_t i = 0; i < files_.size(); i++) { + delete files_[i]; + } + } + + void Add(const char* smallest, const char* largest, + SequenceNumber smallest_seq = 100, + SequenceNumber largest_seq = 100) { + FileMetaData* f = new FileMetaData( + files_.size() + 1, 0, 0, + InternalKey(smallest, smallest_seq, kTypeValue), + InternalKey(largest, largest_seq, kTypeValue), smallest_seq, + largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber, + kUnknownOldestAncesterTime, kUnknownFileCreationTime, + kUnknownFileChecksum, kUnknownFileChecksumFuncName); + files_.push_back(f); + } + + int Compare() { + int diff = 0; + for (size_t i = 0; i < files_.size(); i++) { + if (file_level_.files[i].fd.GetNumber() != files_[i]->fd.GetNumber()) { + diff++; + } + } + return diff; + } +}; + +TEST_F(GenerateLevelFilesBriefTest, Empty) { + DoGenerateLevelFilesBrief(&file_level_, files_, &arena_); + ASSERT_EQ(0u, file_level_.num_files); + ASSERT_EQ(0, Compare()); +} + +TEST_F(GenerateLevelFilesBriefTest, Single) { + Add("p", "q"); + DoGenerateLevelFilesBrief(&file_level_, files_, &arena_); + ASSERT_EQ(1u, file_level_.num_files); + ASSERT_EQ(0, Compare()); +} + +TEST_F(GenerateLevelFilesBriefTest, Multiple) { + Add("150", "200"); + Add("200", "250"); + Add("300", "350"); + Add("400", "450"); + DoGenerateLevelFilesBrief(&file_level_, files_, &arena_); + ASSERT_EQ(4u, file_level_.num_files); + ASSERT_EQ(0, Compare()); +} + +class CountingLogger : public Logger { + public: + CountingLogger() : log_count(0) {} + using Logger::Logv; + void Logv(const char* /*format*/, va_list /*ap*/) override { log_count++; } + int log_count; +}; + +Options GetOptionsWithNumLevels(int num_levels, + std::shared_ptr<CountingLogger> logger) { + Options opt; + opt.num_levels = num_levels; + opt.info_log = logger; + return opt; +} + +class VersionStorageInfoTest : public testing::Test { + public: + const Comparator* ucmp_; + InternalKeyComparator icmp_; + std::shared_ptr<CountingLogger> logger_; + Options options_; + ImmutableCFOptions ioptions_; + MutableCFOptions mutable_cf_options_; + VersionStorageInfo vstorage_; + + InternalKey GetInternalKey(const char* ukey, + SequenceNumber smallest_seq = 100) { + return InternalKey(ukey, smallest_seq, kTypeValue); + } + + VersionStorageInfoTest() + : ucmp_(BytewiseComparator()), + icmp_(ucmp_), + logger_(new CountingLogger()), + options_(GetOptionsWithNumLevels(6, logger_)), + ioptions_(options_), + mutable_cf_options_(options_), + vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {} + + ~VersionStorageInfoTest() override { + for (int i = 0; i < vstorage_.num_levels(); i++) { + for (auto* f : vstorage_.LevelFiles(i)) { + if (--f->refs == 0) { + delete f; + } + } + } + } + + void Add(int level, uint32_t file_number, const char* smallest, + const char* largest, uint64_t file_size = 0) { + assert(level < vstorage_.num_levels()); + FileMetaData* f = new FileMetaData( + file_number, 0, file_size, GetInternalKey(smallest, 0), + GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0, + /* marked_for_compact */ false, kInvalidBlobFileNumber, + kUnknownOldestAncesterTime, kUnknownFileCreationTime, + kUnknownFileChecksum, kUnknownFileChecksumFuncName); + f->compensated_file_size = file_size; + vstorage_.AddFile(level, f); + } + + void Add(int level, uint32_t file_number, const InternalKey& smallest, + const InternalKey& largest, uint64_t file_size = 0) { + assert(level < vstorage_.num_levels()); + FileMetaData* f = new FileMetaData( + file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0, + /* largest_seq */ 0, /* marked_for_compact */ false, + kInvalidBlobFileNumber, kUnknownOldestAncesterTime, + kUnknownFileCreationTime, kUnknownFileChecksum, + kUnknownFileChecksumFuncName); + f->compensated_file_size = file_size; + vstorage_.AddFile(level, f); + } + + std::string GetOverlappingFiles(int level, const InternalKey& begin, + const InternalKey& end) { + std::vector<FileMetaData*> inputs; + vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs); + + std::string result; + for (size_t i = 0; i < inputs.size(); ++i) { + if (i > 0) { + result += ","; + } + AppendNumberTo(&result, inputs[i]->fd.GetNumber()); + } + return result; + } +}; + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) { + ioptions_.level_compaction_dynamic_level_bytes = false; + mutable_cf_options_.max_bytes_for_level_base = 10; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + Add(4, 100U, "1", "2"); + Add(5, 101U, "1", "2"); + + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 10U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 50U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 250U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1250U); + + ASSERT_EQ(0, logger_->log_count); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 1000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + Add(5, 1U, "1", "2", 500U); + + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(vstorage_.base_level(), 5); + + Add(5, 2U, "3", "4", 550U); + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U); + ASSERT_EQ(vstorage_.base_level(), 4); + + Add(4, 3U, "3", "4", 550U); + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U); + ASSERT_EQ(vstorage_.base_level(), 4); + + Add(3, 4U, "3", "4", 250U); + Add(3, 5U, "5", "7", 300U); + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(1, logger_->log_count); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 1000U); + ASSERT_EQ(vstorage_.base_level(), 3); + + Add(1, 6U, "3", "4", 5U); + Add(1, 7U, "8", "9", 5U); + logger_->log_count = 0; + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(1, logger_->log_count); + ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U); + ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 1000U); + ASSERT_EQ(vstorage_.base_level(), 1); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 100; + mutable_cf_options_.max_bytes_for_level_multiplier = 2; + Add(0, 1U, "1", "2", 50U); + Add(1, 2U, "1", "2", 50U); + Add(2, 3U, "1", "2", 500U); + Add(3, 4U, "1", "2", 500U); + Add(4, 5U, "1", "2", 1700U); + Add(5, 6U, "1", "2", 500U); + + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 800U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 400U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 200U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 100U); + ASSERT_EQ(vstorage_.base_level(), 1); + ASSERT_EQ(0, logger_->log_count); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLargeLevel) { + uint64_t kOneGB = 1000U * 1000U * 1000U; + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 10U * kOneGB; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + Add(0, 1U, "1", "2", 50U); + Add(3, 4U, "1", "2", 32U * kOneGB); + Add(4, 5U, "1", "2", 500U * kOneGB); + Add(5, 6U, "1", "2", 3000U * kOneGB); + + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(vstorage_.MaxBytesForLevel(5), 3000U * kOneGB); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 300U * kOneGB); + ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 30U * kOneGB); + ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 10U * kOneGB); + ASSERT_EQ(vstorage_.base_level(), 2); + ASSERT_EQ(0, logger_->log_count); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_1) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 40000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + + Add(0, 1U, "1", "2", 10000U); + Add(0, 2U, "1", "2", 10000U); + Add(0, 3U, "1", "2", 10000U); + + Add(5, 4U, "1", "2", 1286250U); + Add(4, 5U, "1", "2", 200000U); + Add(3, 6U, "1", "2", 40000U); + Add(2, 7U, "1", "2", 8000U); + + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(2, vstorage_.base_level()); + // level multiplier should be 3.5 + ASSERT_EQ(vstorage_.level_multiplier(), 5.0); + // Level size should be around 30,000, 105,000, 367,500 + ASSERT_EQ(40000U, vstorage_.MaxBytesForLevel(2)); + ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3)); + ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4)); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_2) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 10000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + + Add(0, 11U, "1", "2", 10000U); + Add(0, 12U, "1", "2", 10000U); + Add(0, 13U, "1", "2", 10000U); + + Add(5, 4U, "1", "2", 1286250U); + Add(4, 5U, "1", "2", 200000U); + Add(3, 6U, "1", "2", 40000U); + Add(2, 7U, "1", "2", 8000U); + + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(2, vstorage_.base_level()); + // level multiplier should be 3.5 + ASSERT_LT(vstorage_.level_multiplier(), 3.6); + ASSERT_GT(vstorage_.level_multiplier(), 3.4); + // Level size should be around 30,000, 105,000, 367,500 + ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2)); + ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U); + ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U); + ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U); + ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 10000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + + Add(0, 11U, "1", "2", 5000U); + Add(0, 12U, "1", "2", 5000U); + Add(0, 13U, "1", "2", 5000U); + Add(0, 14U, "1", "2", 5000U); + Add(0, 15U, "1", "2", 5000U); + Add(0, 16U, "1", "2", 5000U); + + Add(5, 4U, "1", "2", 1286250U); + Add(4, 5U, "1", "2", 200000U); + Add(3, 6U, "1", "2", 40000U); + Add(2, 7U, "1", "2", 8000U); + + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(2, vstorage_.base_level()); + // level multiplier should be 3.5 + ASSERT_LT(vstorage_.level_multiplier(), 3.6); + ASSERT_GT(vstorage_.level_multiplier(), 3.4); + // Level size should be around 30,000, 105,000, 367,500 + ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2)); + ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U); + ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U); + ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U); + ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U); +} + +TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) { + // Test whether the overlaps are detected as expected + Add(1, 1U, "4", "7", 1U); // Perfect overlap with last level + Add(2, 2U, "3", "5", 1U); // Partial overlap with last level + Add(2, 3U, "6", "8", 1U); // Partial overlap with last level + Add(3, 4U, "1", "9", 1U); // Contains range of last level + Add(4, 5U, "4", "5", 1U); // Inside range of last level + Add(4, 5U, "6", "7", 1U); // Inside range of last level + Add(5, 6U, "4", "7", 10U); + ASSERT_EQ(10U, vstorage_.EstimateLiveDataSize()); +} + +TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) { + Add(0, 1U, "9", "9", 1U); // Level 0 is not ordered + Add(0, 1U, "5", "6", 1U); // Ignored because of [5,6] in l1 + Add(1, 1U, "1", "2", 1U); // Ignored because of [2,3] in l2 + Add(1, 2U, "3", "4", 1U); // Ignored because of [2,3] in l2 + Add(1, 3U, "5", "6", 1U); + Add(2, 4U, "2", "3", 1U); + Add(3, 5U, "7", "8", 1U); + ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize()); +} + +TEST_F(VersionStorageInfoTest, GetOverlappingInputs) { + // Two files that overlap at the range deletion tombstone sentinel. + Add(1, 1U, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion}, 1); + Add(1, 2U, {"b", 0, kTypeValue}, {"c", 0, kTypeValue}, 1); + // Two files that overlap at the same user key. + Add(1, 3U, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeValue}, 1); + Add(1, 4U, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}, 1); + // Two files that do not overlap. + Add(1, 5U, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}, 1); + Add(1, 6U, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}, 1); + vstorage_.UpdateNumNonEmptyLevels(); + vstorage_.GenerateLevelFilesBrief(); + + ASSERT_EQ("1,2", GetOverlappingFiles( + 1, {"a", 0, kTypeValue}, {"b", 0, kTypeValue})); + ASSERT_EQ("1", GetOverlappingFiles( + 1, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion})); + ASSERT_EQ("2", GetOverlappingFiles( + 1, {"b", kMaxSequenceNumber, kTypeValue}, {"c", 0, kTypeValue})); + ASSERT_EQ("3,4", GetOverlappingFiles( + 1, {"d", 0, kTypeValue}, {"e", 0, kTypeValue})); + ASSERT_EQ("3", GetOverlappingFiles( + 1, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeRangeDeletion})); + ASSERT_EQ("3,4", GetOverlappingFiles( + 1, {"e", kMaxSequenceNumber, kTypeValue}, {"f", 0, kTypeValue})); + ASSERT_EQ("3,4", GetOverlappingFiles( + 1, {"e", 0, kTypeValue}, {"f", 0, kTypeValue})); + ASSERT_EQ("5", GetOverlappingFiles( + 1, {"g", 0, kTypeValue}, {"h", 0, kTypeValue})); + ASSERT_EQ("6", GetOverlappingFiles( + 1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue})); +} + + +class FindLevelFileTest : public testing::Test { + public: + LevelFilesBrief file_level_; + bool disjoint_sorted_files_; + Arena arena_; + + FindLevelFileTest() : disjoint_sorted_files_(true) { } + + ~FindLevelFileTest() override {} + + void LevelFileInit(size_t num = 0) { + char* mem = arena_.AllocateAligned(num * sizeof(FdWithKeyRange)); + file_level_.files = new (mem)FdWithKeyRange[num]; + file_level_.num_files = 0; + } + + void Add(const char* smallest, const char* largest, + SequenceNumber smallest_seq = 100, + SequenceNumber largest_seq = 100) { + InternalKey smallest_key = InternalKey(smallest, smallest_seq, kTypeValue); + InternalKey largest_key = InternalKey(largest, largest_seq, kTypeValue); + + Slice smallest_slice = smallest_key.Encode(); + Slice largest_slice = largest_key.Encode(); + + char* mem = arena_.AllocateAligned( + smallest_slice.size() + largest_slice.size()); + memcpy(mem, smallest_slice.data(), smallest_slice.size()); + memcpy(mem + smallest_slice.size(), largest_slice.data(), + largest_slice.size()); + + // add to file_level_ + size_t num = file_level_.num_files; + auto& file = file_level_.files[num]; + file.fd = FileDescriptor(num + 1, 0, 0); + file.smallest_key = Slice(mem, smallest_slice.size()); + file.largest_key = Slice(mem + smallest_slice.size(), + largest_slice.size()); + file_level_.num_files++; + } + + int Find(const char* key) { + InternalKey target(key, 100, kTypeValue); + InternalKeyComparator cmp(BytewiseComparator()); + return FindFile(cmp, file_level_, target.Encode()); + } + + bool Overlaps(const char* smallest, const char* largest) { + InternalKeyComparator cmp(BytewiseComparator()); + Slice s(smallest != nullptr ? smallest : ""); + Slice l(largest != nullptr ? largest : ""); + return SomeFileOverlapsRange(cmp, disjoint_sorted_files_, file_level_, + (smallest != nullptr ? &s : nullptr), + (largest != nullptr ? &l : nullptr)); + } +}; + +TEST_F(FindLevelFileTest, LevelEmpty) { + LevelFileInit(0); + + ASSERT_EQ(0, Find("foo")); + ASSERT_TRUE(! Overlaps("a", "z")); + ASSERT_TRUE(! Overlaps(nullptr, "z")); + ASSERT_TRUE(! Overlaps("a", nullptr)); + ASSERT_TRUE(! Overlaps(nullptr, nullptr)); +} + +TEST_F(FindLevelFileTest, LevelSingle) { + LevelFileInit(1); + + Add("p", "q"); + ASSERT_EQ(0, Find("a")); + ASSERT_EQ(0, Find("p")); + ASSERT_EQ(0, Find("p1")); + ASSERT_EQ(0, Find("q")); + ASSERT_EQ(1, Find("q1")); + ASSERT_EQ(1, Find("z")); + + ASSERT_TRUE(! Overlaps("a", "b")); + ASSERT_TRUE(! Overlaps("z1", "z2")); + ASSERT_TRUE(Overlaps("a", "p")); + ASSERT_TRUE(Overlaps("a", "q")); + ASSERT_TRUE(Overlaps("a", "z")); + ASSERT_TRUE(Overlaps("p", "p1")); + ASSERT_TRUE(Overlaps("p", "q")); + ASSERT_TRUE(Overlaps("p", "z")); + ASSERT_TRUE(Overlaps("p1", "p2")); + ASSERT_TRUE(Overlaps("p1", "z")); + ASSERT_TRUE(Overlaps("q", "q")); + ASSERT_TRUE(Overlaps("q", "q1")); + + ASSERT_TRUE(! Overlaps(nullptr, "j")); + ASSERT_TRUE(! Overlaps("r", nullptr)); + ASSERT_TRUE(Overlaps(nullptr, "p")); + ASSERT_TRUE(Overlaps(nullptr, "p1")); + ASSERT_TRUE(Overlaps("q", nullptr)); + ASSERT_TRUE(Overlaps(nullptr, nullptr)); +} + +TEST_F(FindLevelFileTest, LevelMultiple) { + LevelFileInit(4); + + Add("150", "200"); + Add("200", "250"); + Add("300", "350"); + Add("400", "450"); + ASSERT_EQ(0, Find("100")); + ASSERT_EQ(0, Find("150")); + ASSERT_EQ(0, Find("151")); + ASSERT_EQ(0, Find("199")); + ASSERT_EQ(0, Find("200")); + ASSERT_EQ(1, Find("201")); + ASSERT_EQ(1, Find("249")); + ASSERT_EQ(1, Find("250")); + ASSERT_EQ(2, Find("251")); + ASSERT_EQ(2, Find("299")); + ASSERT_EQ(2, Find("300")); + ASSERT_EQ(2, Find("349")); + ASSERT_EQ(2, Find("350")); + ASSERT_EQ(3, Find("351")); + ASSERT_EQ(3, Find("400")); + ASSERT_EQ(3, Find("450")); + ASSERT_EQ(4, Find("451")); + + ASSERT_TRUE(! Overlaps("100", "149")); + ASSERT_TRUE(! Overlaps("251", "299")); + ASSERT_TRUE(! Overlaps("451", "500")); + ASSERT_TRUE(! Overlaps("351", "399")); + + ASSERT_TRUE(Overlaps("100", "150")); + ASSERT_TRUE(Overlaps("100", "200")); + ASSERT_TRUE(Overlaps("100", "300")); + ASSERT_TRUE(Overlaps("100", "400")); + ASSERT_TRUE(Overlaps("100", "500")); + ASSERT_TRUE(Overlaps("375", "400")); + ASSERT_TRUE(Overlaps("450", "450")); + ASSERT_TRUE(Overlaps("450", "500")); +} + +TEST_F(FindLevelFileTest, LevelMultipleNullBoundaries) { + LevelFileInit(4); + + Add("150", "200"); + Add("200", "250"); + Add("300", "350"); + Add("400", "450"); + ASSERT_TRUE(! Overlaps(nullptr, "149")); + ASSERT_TRUE(! Overlaps("451", nullptr)); + ASSERT_TRUE(Overlaps(nullptr, nullptr)); + ASSERT_TRUE(Overlaps(nullptr, "150")); + ASSERT_TRUE(Overlaps(nullptr, "199")); + ASSERT_TRUE(Overlaps(nullptr, "200")); + ASSERT_TRUE(Overlaps(nullptr, "201")); + ASSERT_TRUE(Overlaps(nullptr, "400")); + ASSERT_TRUE(Overlaps(nullptr, "800")); + ASSERT_TRUE(Overlaps("100", nullptr)); + ASSERT_TRUE(Overlaps("200", nullptr)); + ASSERT_TRUE(Overlaps("449", nullptr)); + ASSERT_TRUE(Overlaps("450", nullptr)); +} + +TEST_F(FindLevelFileTest, LevelOverlapSequenceChecks) { + LevelFileInit(1); + + Add("200", "200", 5000, 3000); + ASSERT_TRUE(! Overlaps("199", "199")); + ASSERT_TRUE(! Overlaps("201", "300")); + ASSERT_TRUE(Overlaps("200", "200")); + ASSERT_TRUE(Overlaps("190", "200")); + ASSERT_TRUE(Overlaps("200", "210")); +} + +TEST_F(FindLevelFileTest, LevelOverlappingFiles) { + LevelFileInit(2); + + Add("150", "600"); + Add("400", "500"); + disjoint_sorted_files_ = false; + ASSERT_TRUE(! Overlaps("100", "149")); + ASSERT_TRUE(! Overlaps("601", "700")); + ASSERT_TRUE(Overlaps("100", "150")); + ASSERT_TRUE(Overlaps("100", "200")); + ASSERT_TRUE(Overlaps("100", "300")); + ASSERT_TRUE(Overlaps("100", "400")); + ASSERT_TRUE(Overlaps("100", "500")); + ASSERT_TRUE(Overlaps("375", "400")); + ASSERT_TRUE(Overlaps("450", "450")); + ASSERT_TRUE(Overlaps("450", "500")); + ASSERT_TRUE(Overlaps("450", "700")); + ASSERT_TRUE(Overlaps("600", "700")); +} + +class VersionSetTestBase { + public: + const static std::string kColumnFamilyName1; + const static std::string kColumnFamilyName2; + const static std::string kColumnFamilyName3; + int num_initial_edits_; + + VersionSetTestBase() + : env_(Env::Default()), + fs_(std::make_shared<LegacyFileSystemWrapper>(env_)), + dbname_(test::PerThreadDBPath("version_set_test")), + db_options_(), + mutable_cf_options_(cf_options_), + table_cache_(NewLRUCache(50000, 16)), + write_buffer_manager_(db_options_.db_write_buffer_size), + shutting_down_(false), + mock_table_factory_(std::make_shared<mock::MockTableFactory>()) { + EXPECT_OK(env_->CreateDirIfMissing(dbname_)); + + db_options_.env = env_; + db_options_.fs = fs_; + versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, + table_cache_.get(), &write_buffer_manager_, + &write_controller_, + /*block_cache_tracer=*/nullptr)), + reactive_versions_ = std::make_shared<ReactiveVersionSet>( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_); + db_options_.db_paths.emplace_back(dbname_, + std::numeric_limits<uint64_t>::max()); + } + + void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families, + SequenceNumber* last_seqno, + std::unique_ptr<log::Writer>* log_writer) { + assert(column_families != nullptr); + assert(last_seqno != nullptr); + assert(log_writer != nullptr); + VersionEdit new_db; + if (db_options_.write_dbid_to_manifest) { + DBImpl* impl = new DBImpl(DBOptions(), dbname_); + std::string db_id; + impl->GetDbIdentityFromIdentityFile(&db_id); + new_db.SetDBId(db_id); + } + new_db.SetLogNumber(0); + new_db.SetNextFile(2); + new_db.SetLastSequence(0); + + const std::vector<std::string> cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + const int kInitialNumOfCfs = static_cast<int>(cf_names.size()); + autovector<VersionEdit> new_cfs; + uint64_t last_seq = 1; + uint32_t cf_id = 1; + for (int i = 1; i != kInitialNumOfCfs; ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + new_cf.SetLogNumber(0); + new_cf.SetNextFile(2); + new_cf.SetLastSequence(last_seq++); + new_cfs.emplace_back(new_cf); + } + *last_seqno = last_seq; + num_initial_edits_ = static_cast<int>(new_cfs.size() + 1); + const std::string manifest = DescriptorFileName(dbname_, 1); + std::unique_ptr<WritableFile> file; + Status s = env_->NewWritableFile( + manifest, &file, env_->OptimizeForManifestWrite(env_options_)); + ASSERT_OK(s); + std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( + NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_)); + { + log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); + std::string record; + new_db.EncodeTo(&record); + s = (*log_writer)->AddRecord(record); + for (const auto& e : new_cfs) { + record.clear(); + e.EncodeTo(&record); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + } + ASSERT_OK(s); + + cf_options_.table_factory = mock_table_factory_; + for (const auto& cf_name : cf_names) { + column_families->emplace_back(cf_name, cf_options_); + } + } + + // Create DB with 3 column families. + void NewDB() { + std::vector<ColumnFamilyDescriptor> column_families; + SequenceNumber last_seqno; + std::unique_ptr<log::Writer> log_writer; + SetIdentityFile(env_, dbname_); + PrepareManifest(&column_families, &last_seqno, &log_writer); + log_writer.reset(); + // Make "CURRENT" file point to the new manifest file. + Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + + EXPECT_OK(versions_->Recover(column_families, false)); + EXPECT_EQ(column_families.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + } + + Env* env_; + std::shared_ptr<FileSystem> fs_; + const std::string dbname_; + EnvOptions env_options_; + ImmutableDBOptions db_options_; + ColumnFamilyOptions cf_options_; + MutableCFOptions mutable_cf_options_; + std::shared_ptr<Cache> table_cache_; + WriteController write_controller_; + WriteBufferManager write_buffer_manager_; + std::shared_ptr<VersionSet> versions_; + std::shared_ptr<ReactiveVersionSet> reactive_versions_; + InstrumentedMutex mutex_; + std::atomic<bool> shutting_down_; + std::shared_ptr<mock::MockTableFactory> mock_table_factory_; +}; + +const std::string VersionSetTestBase::kColumnFamilyName1 = "alice"; +const std::string VersionSetTestBase::kColumnFamilyName2 = "bob"; +const std::string VersionSetTestBase::kColumnFamilyName3 = "charles"; + +class VersionSetTest : public VersionSetTestBase, public testing::Test { + public: + VersionSetTest() : VersionSetTestBase() {} +}; + +TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) { + NewDB(); + const int kGroupSize = 5; + autovector<VersionEdit> edits; + for (int i = 0; i != kGroupSize; ++i) { + edits.emplace_back(VersionEdit()); + } + autovector<ColumnFamilyData*> cfds; + autovector<const MutableCFOptions*> all_mutable_cf_options; + autovector<autovector<VersionEdit*>> edit_lists; + for (int i = 0; i != kGroupSize; ++i) { + cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault()); + all_mutable_cf_options.emplace_back(&mutable_cf_options_); + autovector<VersionEdit*> edit_list; + edit_list.emplace_back(&edits[i]); + edit_lists.emplace_back(edit_list); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + int count = 0; + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) { + uint32_t* cf_id = reinterpret_cast<uint32_t*>(arg); + EXPECT_EQ(0u, *cf_id); + ++count; + }); + SyncPoint::GetInstance()->EnableProcessing(); + mutex_.Lock(); + Status s = + versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, &mutex_); + mutex_.Unlock(); + EXPECT_OK(s); + EXPECT_EQ(kGroupSize - 1, count); +} + +class VersionSetAtomicGroupTest : public VersionSetTestBase, + public testing::Test { + public: + VersionSetAtomicGroupTest() : VersionSetTestBase() {} + + void SetUp() override { + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + SetupTestSyncPoints(); + } + + void SetupValidAtomicGroup(int atomic_group_size) { + edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + edits_[i].MarkAtomicGroup(--remaining); + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + } + + void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) { + edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + edits_[i].MarkAtomicGroup(--remaining); + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + } + + void SetupCorruptedAtomicGroup(int atomic_group_size) { + edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + if (i != ((size_t)atomic_group_size / 2)) { + edits_[i].MarkAtomicGroup(--remaining); + } + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + } + + void SetupIncorrectAtomicGroup(int atomic_group_size) { + edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + if (i != 1) { + edits_[i].MarkAtomicGroup(--remaining); + } else { + edits_[i].MarkAtomicGroup(remaining--); + } + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + } + + void SetupTestSyncPoints() { + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) { + VersionEdit* e = reinterpret_cast<VersionEdit*>(arg); + EXPECT_EQ(edits_.front().DebugString(), + e->DebugString()); // compare based on value + first_in_atomic_group_ = true; + }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) { + VersionEdit* e = reinterpret_cast<VersionEdit*>(arg); + EXPECT_EQ(edits_.back().DebugString(), + e->DebugString()); // compare based on value + EXPECT_TRUE(first_in_atomic_group_); + last_in_atomic_group_ = true; + }); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) { + num_recovered_edits_ = *reinterpret_cast<int*>(arg); + }); + SyncPoint::GetInstance()->SetCallBack( + "ReactiveVersionSet::ReadAndApply:AppliedEdits", + [&](void* arg) { num_applied_edits_ = *reinterpret_cast<int*>(arg); }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:AtomicGroup", + [&](void* /* arg */) { ++num_edits_in_atomic_group_; }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", + [&](void* arg) { + corrupted_edit_ = *reinterpret_cast<VersionEdit*>(arg); + }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", + [&](void* arg) { + edit_with_incorrect_group_size_ = + *reinterpret_cast<VersionEdit*>(arg); + }); + SyncPoint::GetInstance()->EnableProcessing(); + } + + void AddNewEditsToLog(int num_edits) { + for (int i = 0; i < num_edits; i++) { + std::string record; + edits_[i].EncodeTo(&record); + ASSERT_OK(log_writer_->AddRecord(record)); + } + } + + void TearDown() override { + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + log_writer_.reset(); + } + + protected: + std::vector<ColumnFamilyDescriptor> column_families_; + SequenceNumber last_seqno_; + std::vector<VersionEdit> edits_; + bool first_in_atomic_group_ = false; + bool last_in_atomic_group_ = false; + int num_edits_in_atomic_group_ = 0; + int num_recovered_edits_ = 0; + int num_applied_edits_ = 0; + VersionEdit corrupted_edit_; + VersionEdit edit_with_incorrect_group_size_; + std::unique_ptr<log::Writer> log_writer_; +}; + +TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) { + const int kAtomicGroupSize = 3; + SetupValidAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + EXPECT_OK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_TRUE(last_in_atomic_group_); + EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_); + EXPECT_EQ(0, num_applied_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleValidAtomicGroupWithReactiveVersionSetRecover) { + const int kAtomicGroupSize = 3; + SetupValidAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_TRUE(last_in_atomic_group_); + // The recover should clean up the replay buffer. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); + EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_); + EXPECT_EQ(0, num_applied_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) { + const int kAtomicGroupSize = 3; + SetupValidAtomicGroup(kAtomicGroupSize); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + AddNewEditsToLog(kAtomicGroupSize); + InstrumentedMutex mu; + std::unordered_set<ColumnFamilyData*> cfds_changed; + mu.Lock(); + EXPECT_OK( + reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + mu.Unlock(); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_TRUE(last_in_atomic_group_); + // The recover should clean up the replay buffer. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); + EXPECT_EQ(kAtomicGroupSize, num_applied_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) { + const int kAtomicGroupSize = 4; + const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1; + SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kNumberOfPersistedVersionEdits); + EXPECT_OK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_FALSE(last_in_atomic_group_); + EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); + EXPECT_EQ(0, num_applied_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) { + const int kAtomicGroupSize = 4; + const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1; + SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kNumberOfPersistedVersionEdits); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_FALSE(last_in_atomic_group_); + EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_); + // Reactive version set should store the edits in the replay buffer. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == + kNumberOfPersistedVersionEdits); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize); + // Write the last record. The reactive version set should now apply all + // edits. + std::string last_record; + edits_[kAtomicGroupSize - 1].EncodeTo(&last_record); + EXPECT_OK(log_writer_->AddRecord(last_record)); + InstrumentedMutex mu; + std::unordered_set<ColumnFamilyData*> cfds_changed; + mu.Lock(); + EXPECT_OK( + reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + mu.Unlock(); + // Reactive version set should be empty now. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); + EXPECT_EQ(kAtomicGroupSize, num_applied_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) { + const int kAtomicGroupSize = 4; + const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1; + SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + // No edits in an atomic group. + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + // Write a few edits in an atomic group. + AddNewEditsToLog(kNumberOfPersistedVersionEdits); + InstrumentedMutex mu; + std::unordered_set<ColumnFamilyData*> cfds_changed; + mu.Lock(); + EXPECT_OK( + reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + mu.Unlock(); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_FALSE(last_in_atomic_group_); + EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_); + // Reactive version set should store the edits in the replay buffer. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == + kNumberOfPersistedVersionEdits); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); + EXPECT_EQ(0, num_applied_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleCorruptedAtomicGroupWithVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupCorruptedAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + EXPECT_NOK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), + corrupted_edit_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupCorruptedAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), + corrupted_edit_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) { + const int kAtomicGroupSize = 4; + SetupCorruptedAtomicGroup(kAtomicGroupSize); + InstrumentedMutex mu; + std::unordered_set<ColumnFamilyData*> cfds_changed; + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + // Write the corrupted edits. + AddNewEditsToLog(kAtomicGroupSize); + mu.Lock(); + EXPECT_OK( + reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + mu.Unlock(); + EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), + corrupted_edit_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncorrectAtomicGroupSizeWithVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupIncorrectAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + EXPECT_NOK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(edits_[1].DebugString(), + edit_with_incorrect_group_size_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupIncorrectAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(edits_[1].DebugString(), + edit_with_incorrect_group_size_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) { + const int kAtomicGroupSize = 4; + SetupIncorrectAtomicGroup(kAtomicGroupSize); + InstrumentedMutex mu; + std::unordered_set<ColumnFamilyData*> cfds_changed; + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + AddNewEditsToLog(kAtomicGroupSize); + mu.Lock(); + EXPECT_OK( + reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + mu.Unlock(); + EXPECT_EQ(edits_[1].DebugString(), + edit_with_incorrect_group_size_.DebugString()); +} + +class VersionSetTestDropOneCF : public VersionSetTestBase, + public testing::TestWithParam<std::string> { + public: + VersionSetTestDropOneCF() : VersionSetTestBase() {} +}; + +// This test simulates the following execution sequence +// Time thread1 bg_flush_thr +// | Prepare version edits (e1,e2,e3) for atomic +// | flush cf1, cf2, cf3 +// | Enqueue e to drop cfi +// | to manifest_writers_ +// | Enqueue (e1,e2,e3) to manifest_writers_ +// | +// | Apply e, +// | cfi.IsDropped() is true +// | Apply (e1,e2,e3), +// | since cfi.IsDropped() == true, we need to +// | drop ei and write the rest to MANIFEST. +// V +// +// Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and +// last column family in an atomic group. +TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) { + std::vector<ColumnFamilyDescriptor> column_families; + SequenceNumber last_seqno; + std::unique_ptr<log::Writer> log_writer; + PrepareManifest(&column_families, &last_seqno, &log_writer); + Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + ASSERT_OK(s); + + EXPECT_OK(versions_->Recover(column_families, false /* read_only */)); + EXPECT_EQ(column_families.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + + const int kAtomicGroupSize = 3; + const std::vector<std::string> non_default_cf_names = { + kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3}; + + // Drop one column family + VersionEdit drop_cf_edit; + drop_cf_edit.DropColumnFamily(); + const std::string cf_to_drop_name(GetParam()); + auto cfd_to_drop = + versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name); + ASSERT_NE(nullptr, cfd_to_drop); + // Increase its refcount because cfd_to_drop is used later, and we need to + // prevent it from being deleted. + cfd_to_drop->Ref(); + drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID()); + mutex_.Lock(); + s = versions_->LogAndApply(cfd_to_drop, + *cfd_to_drop->GetLatestMutableCFOptions(), + &drop_cf_edit, &mutex_); + mutex_.Unlock(); + ASSERT_OK(s); + + std::vector<VersionEdit> edits(kAtomicGroupSize); + uint32_t remaining = kAtomicGroupSize; + size_t i = 0; + autovector<ColumnFamilyData*> cfds; + autovector<const MutableCFOptions*> mutable_cf_options_list; + autovector<autovector<VersionEdit*>> edit_lists; + for (const auto& cf_name : non_default_cf_names) { + auto cfd = (cf_name != cf_to_drop_name) + ? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name) + : cfd_to_drop; + ASSERT_NE(nullptr, cfd); + cfds.push_back(cfd); + mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions()); + edits[i].SetColumnFamily(cfd->GetID()); + edits[i].SetLogNumber(0); + edits[i].SetNextFile(2); + edits[i].MarkAtomicGroup(--remaining); + edits[i].SetLastSequence(last_seqno++); + autovector<VersionEdit*> tmp_edits; + tmp_edits.push_back(&edits[i]); + edit_lists.emplace_back(tmp_edits); + ++i; + } + int called = 0; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) { + std::vector<VersionEdit*>* tmp_edits = + reinterpret_cast<std::vector<VersionEdit*>*>(arg); + EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size()); + for (const auto e : *tmp_edits) { + bool found = false; + for (const auto& e2 : edits) { + if (&e2 == e) { + found = true; + break; + } + } + ASSERT_TRUE(found); + } + ++called; + }); + SyncPoint::GetInstance()->EnableProcessing(); + mutex_.Lock(); + s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists, + &mutex_); + mutex_.Unlock(); + ASSERT_OK(s); + ASSERT_EQ(1, called); + if (cfd_to_drop->Unref()) { + delete cfd_to_drop; + cfd_to_drop = nullptr; + } +} + +INSTANTIATE_TEST_CASE_P( + AtomicGroup, VersionSetTestDropOneCF, + testing::Values(VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3)); +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |