diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/version_set_test.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/version_set_test.cc')
-rw-r--r-- | src/rocksdb/db/version_set_test.cc | 3587 |
1 files changed, 3587 insertions, 0 deletions
diff --git a/src/rocksdb/db/version_set_test.cc b/src/rocksdb/db/version_set_test.cc new file mode 100644 index 000000000..7d17406c1 --- /dev/null +++ b/src/rocksdb/db/version_set_test.cc @@ -0,0 +1,3587 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/version_set.h" + +#include <algorithm> + +#include "db/db_impl/db_impl.h" +#include "db/db_test_util.h" +#include "db/log_writer.h" +#include "rocksdb/advanced_options.h" +#include "rocksdb/convenience.h" +#include "rocksdb/file_system.h" +#include "table/block_based/block_based_table_factory.h" +#include "table/mock_table.h" +#include "table/unique_id_impl.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +class GenerateLevelFilesBriefTest : public testing::Test { + public: + std::vector<FileMetaData*> files_; + LevelFilesBrief file_level_; + Arena arena_; + + GenerateLevelFilesBriefTest() {} + + ~GenerateLevelFilesBriefTest() override { + for (size_t i = 0; i < files_.size(); i++) { + delete files_[i]; + } + } + + void Add(const char* smallest, const char* largest, + SequenceNumber smallest_seq = 100, + SequenceNumber largest_seq = 100) { + FileMetaData* f = new FileMetaData( + files_.size() + 1, 0, 0, + InternalKey(smallest, smallest_seq, kTypeValue), + InternalKey(largest, largest_seq, kTypeValue), smallest_seq, + largest_seq, /* marked_for_compact */ false, Temperature::kUnknown, + kInvalidBlobFileNumber, kUnknownOldestAncesterTime, + kUnknownFileCreationTime, 
kUnknownFileChecksum, + kUnknownFileChecksumFuncName, kNullUniqueId64x2); + files_.push_back(f); + } + + int Compare() { + int diff = 0; + for (size_t i = 0; i < files_.size(); i++) { + if (file_level_.files[i].fd.GetNumber() != files_[i]->fd.GetNumber()) { + diff++; + } + } + return diff; + } +}; + +TEST_F(GenerateLevelFilesBriefTest, Empty) { + DoGenerateLevelFilesBrief(&file_level_, files_, &arena_); + ASSERT_EQ(0u, file_level_.num_files); + ASSERT_EQ(0, Compare()); +} + +TEST_F(GenerateLevelFilesBriefTest, Single) { + Add("p", "q"); + DoGenerateLevelFilesBrief(&file_level_, files_, &arena_); + ASSERT_EQ(1u, file_level_.num_files); + ASSERT_EQ(0, Compare()); +} + +TEST_F(GenerateLevelFilesBriefTest, Multiple) { + Add("150", "200"); + Add("200", "250"); + Add("300", "350"); + Add("400", "450"); + DoGenerateLevelFilesBrief(&file_level_, files_, &arena_); + ASSERT_EQ(4u, file_level_.num_files); + ASSERT_EQ(0, Compare()); +} + +class CountingLogger : public Logger { + public: + CountingLogger() : log_count(0) {} + using Logger::Logv; + void Logv(const char* /*format*/, va_list /*ap*/) override { log_count++; } + int log_count; +}; + +Options GetOptionsWithNumLevels(int num_levels, + std::shared_ptr<CountingLogger> logger) { + Options opt; + opt.num_levels = num_levels; + opt.info_log = logger; + return opt; +} + +class VersionStorageInfoTestBase : public testing::Test { + public: + const Comparator* ucmp_; + InternalKeyComparator icmp_; + std::shared_ptr<CountingLogger> logger_; + Options options_; + ImmutableOptions ioptions_; + MutableCFOptions mutable_cf_options_; + VersionStorageInfo vstorage_; + + InternalKey GetInternalKey(const char* ukey, + SequenceNumber smallest_seq = 100) { + return InternalKey(ukey, smallest_seq, kTypeValue); + } + + explicit VersionStorageInfoTestBase(const Comparator* ucmp) + : ucmp_(ucmp), + icmp_(ucmp_), + logger_(new CountingLogger()), + options_(GetOptionsWithNumLevels(6, logger_)), + ioptions_(options_), + 
mutable_cf_options_(options_), + vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, + /*src_vstorage=*/nullptr, + /*_force_consistency_checks=*/false) {} + + ~VersionStorageInfoTestBase() override { + for (int i = 0; i < vstorage_.num_levels(); ++i) { + for (auto* f : vstorage_.LevelFiles(i)) { + if (--f->refs == 0) { + delete f; + } + } + } + } + + void Add(int level, uint32_t file_number, const char* smallest, + const char* largest, uint64_t file_size = 0, + uint64_t oldest_blob_file_number = kInvalidBlobFileNumber) { + constexpr SequenceNumber dummy_seq = 0; + + Add(level, file_number, GetInternalKey(smallest, dummy_seq), + GetInternalKey(largest, dummy_seq), file_size, oldest_blob_file_number); + } + + void Add(int level, uint32_t file_number, const InternalKey& smallest, + const InternalKey& largest, uint64_t file_size = 0, + uint64_t oldest_blob_file_number = kInvalidBlobFileNumber) { + assert(level < vstorage_.num_levels()); + FileMetaData* f = new FileMetaData( + file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0, + /* largest_seq */ 0, /* marked_for_compact */ false, + Temperature::kUnknown, oldest_blob_file_number, + kUnknownOldestAncesterTime, kUnknownFileCreationTime, + kUnknownFileChecksum, kUnknownFileChecksumFuncName, kNullUniqueId64x2); + f->compensated_file_size = file_size; + vstorage_.AddFile(level, f); + } + + void AddBlob(uint64_t blob_file_number, uint64_t total_blob_count, + uint64_t total_blob_bytes, + BlobFileMetaData::LinkedSsts linked_ssts, + uint64_t garbage_blob_count, uint64_t garbage_blob_bytes) { + auto shared_meta = SharedBlobFileMetaData::Create( + blob_file_number, total_blob_count, total_blob_bytes, + /* checksum_method */ std::string(), + /* checksum_value */ std::string()); + auto meta = + BlobFileMetaData::Create(std::move(shared_meta), std::move(linked_ssts), + garbage_blob_count, garbage_blob_bytes); + + vstorage_.AddBlobFile(std::move(meta)); + } + + void UpdateVersionStorageInfo() { + 
vstorage_.PrepareForVersionAppend(ioptions_, mutable_cf_options_); + vstorage_.SetFinalized(); + } + + std::string GetOverlappingFiles(int level, const InternalKey& begin, + const InternalKey& end) { + std::vector<FileMetaData*> inputs; + vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs); + + std::string result; + for (size_t i = 0; i < inputs.size(); ++i) { + if (i > 0) { + result += ","; + } + AppendNumberTo(&result, inputs[i]->fd.GetNumber()); + } + return result; + } +}; + +class VersionStorageInfoTest : public VersionStorageInfoTestBase { + public: + VersionStorageInfoTest() : VersionStorageInfoTestBase(BytewiseComparator()) {} + + ~VersionStorageInfoTest() override {} +}; + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) { + ioptions_.level_compaction_dynamic_level_bytes = false; + mutable_cf_options_.max_bytes_for_level_base = 10; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + + Add(4, 100U, "1", "2", 100U); + Add(5, 101U, "1", "2", 100U); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 10U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 50U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 250U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1250U); + + ASSERT_EQ(0, logger_->log_count); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic_1) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 1000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + + Add(5, 1U, "1", "2", 500U); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(vstorage_.base_level(), 5); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic_2) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 1000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + + Add(5, 1U, "1", "2", 500U); + Add(5, 2U, "3", "4", 550U); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(0, 
logger_->log_count); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U); + ASSERT_EQ(vstorage_.base_level(), 4); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic_3) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 1000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + + Add(5, 1U, "1", "2", 500U); + Add(5, 2U, "3", "4", 550U); + Add(4, 3U, "3", "4", 550U); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U); + ASSERT_EQ(vstorage_.base_level(), 4); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic_4) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 1000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + + Add(5, 1U, "1", "2", 500U); + Add(5, 2U, "3", "4", 550U); + Add(4, 3U, "3", "4", 550U); + Add(3, 4U, "3", "4", 250U); + Add(3, 5U, "5", "7", 300U); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(1, logger_->log_count); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 1000U); + ASSERT_EQ(vstorage_.base_level(), 3); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic_5) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 1000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + + Add(5, 1U, "1", "2", 500U); + Add(5, 2U, "3", "4", 550U); + Add(4, 3U, "3", "4", 550U); + Add(3, 4U, "3", "4", 250U); + Add(3, 5U, "5", "7", 300U); + Add(1, 6U, "3", "4", 5U); + Add(1, 7U, "8", "9", 5U); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(1, logger_->log_count); + ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U); + ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 1000U); + ASSERT_EQ(vstorage_.base_level(), 1); +} + +TEST_F(VersionStorageInfoTest, 
MaxBytesForLevelDynamicLotsOfData) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 100; + mutable_cf_options_.max_bytes_for_level_multiplier = 2; + + Add(0, 1U, "1", "2", 50U); + Add(1, 2U, "1", "2", 50U); + Add(2, 3U, "1", "2", 500U); + Add(3, 4U, "1", "2", 500U); + Add(4, 5U, "1", "2", 1700U); + Add(5, 6U, "1", "2", 500U); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 800U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 400U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 200U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 100U); + ASSERT_EQ(vstorage_.base_level(), 1); + ASSERT_EQ(0, logger_->log_count); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLargeLevel) { + uint64_t kOneGB = 1000U * 1000U * 1000U; + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 10U * kOneGB; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + + Add(0, 1U, "1", "2", 50U); + Add(3, 4U, "1", "2", 32U * kOneGB); + Add(4, 5U, "1", "2", 500U * kOneGB); + Add(5, 6U, "1", "2", 3000U * kOneGB); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(vstorage_.MaxBytesForLevel(5), 3000U * kOneGB); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 300U * kOneGB); + ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 30U * kOneGB); + ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 10U * kOneGB); + ASSERT_EQ(vstorage_.base_level(), 2); + ASSERT_EQ(0, logger_->log_count); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_1) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 40000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + + Add(0, 1U, "1", "2", 10000U); + Add(0, 2U, "1", "2", 10000U); + Add(0, 3U, "1", "2", 10000U); + + Add(5, 4U, "1", "2", 1286250U); + Add(4, 5U, "1", "2", 200000U); + Add(3, 6U, "1", "2", 40000U); + 
Add(2, 7U, "1", "2", 8000U); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(2, vstorage_.base_level()); + // level multiplier should stay at the configured 5.0 + ASSERT_EQ(vstorage_.level_multiplier(), 5.0); + ASSERT_EQ(40000U, vstorage_.MaxBytesForLevel(2)); + ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3)); + ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4)); + + vstorage_.ComputeCompactionScore(ioptions_, mutable_cf_options_); + // Only L0 hits compaction. + ASSERT_EQ(vstorage_.CompactionScoreLevel(0), 0); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_2) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 10000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + mutable_cf_options_.level0_file_num_compaction_trigger = 4; + + Add(0, 11U, "1", "2", 10000U); + Add(0, 12U, "1", "2", 10000U); + Add(0, 13U, "1", "2", 10000U); + + // Level size should be around 10,000, 10,290, 51,450, 257,250 + Add(5, 4U, "1", "2", 1286250U); + Add(4, 5U, "1", "2", 258000U); // unadjusted score 1.003 + Add(3, 6U, "1", "2", 53000U); // unadjusted score 1.03 + Add(2, 7U, "1", "2", 20000U); // unadjusted score 1.94 + + UpdateVersionStorageInfo(); + + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(1, vstorage_.base_level()); + ASSERT_EQ(10000U, vstorage_.MaxBytesForLevel(1)); + ASSERT_EQ(10290U, vstorage_.MaxBytesForLevel(2)); + ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3)); + ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4)); + + vstorage_.ComputeCompactionScore(ioptions_, mutable_cf_options_); + // Although L2 and L3 have higher unadjusted compaction score, considering + // a relatively large L0 being compacted down soon, L4 is picked up for + // compaction. + // L0 is still picked up for oversizing. 
+ ASSERT_EQ(0, vstorage_.CompactionScoreLevel(0)); + ASSERT_EQ(4, vstorage_.CompactionScoreLevel(1)); +} + +TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 20000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + mutable_cf_options_.level0_file_num_compaction_trigger = 5; + + Add(0, 11U, "1", "2", 2500U); + Add(0, 12U, "1", "2", 2500U); + Add(0, 13U, "1", "2", 2500U); + Add(0, 14U, "1", "2", 2500U); + + // Level size should be around 20,000, 53000, 258000 + Add(5, 4U, "1", "2", 1286250U); + Add(4, 5U, "1", "2", 260000U); // Unadjusted score 1.01, adjusted about 4.3 + Add(3, 6U, "1", "2", 85000U); // Unadjusted score 1.42, adjusted about 11.6 + Add(2, 7U, "1", "2", 30000); // Unadjusted score 1.5, adjusted about 10.0 + + UpdateVersionStorageInfo(); + + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(2, vstorage_.base_level()); + ASSERT_EQ(20000U, vstorage_.MaxBytesForLevel(2)); + + vstorage_.ComputeCompactionScore(ioptions_, mutable_cf_options_); + // Although L2 has higher unadjusted compaction score, considering + // a relatively large L0 being compacted down soon, L3 is picked up for + // compaction. 
+ + ASSERT_EQ(3, vstorage_.CompactionScoreLevel(0)); + ASSERT_EQ(2, vstorage_.CompactionScoreLevel(1)); + ASSERT_EQ(4, vstorage_.CompactionScoreLevel(2)); +} + +TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) { + // Test whether the overlaps are detected as expected + Add(1, 1U, "4", "7", 1U); // Perfect overlap with last level + Add(2, 2U, "3", "5", 1U); // Partial overlap with last level + Add(2, 3U, "6", "8", 1U); // Partial overlap with last level + Add(3, 4U, "1", "9", 1U); // Contains range of last level + Add(4, 5U, "4", "5", 1U); // Inside range of last level + Add(4, 6U, "6", "7", 1U); // Inside range of last level + Add(5, 7U, "4", "7", 10U); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(10U, vstorage_.EstimateLiveDataSize()); +} + +TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) { + Add(0, 1U, "9", "9", 1U); // Level 0 is not ordered + Add(0, 2U, "5", "6", 1U); // Ignored because of [5,6] in l1 + Add(1, 3U, "1", "2", 1U); // Ignored because of [2,3] in l2 + Add(1, 4U, "3", "4", 1U); // Ignored because of [2,3] in l2 + Add(1, 5U, "5", "6", 1U); + Add(2, 6U, "2", "3", 1U); + Add(3, 7U, "7", "8", 1U); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize()); +} + +TEST_F(VersionStorageInfoTest, GetOverlappingInputs) { + // Two files that overlap at the range deletion tombstone sentinel. + Add(1, 1U, {"a", 0, kTypeValue}, + {"b", kMaxSequenceNumber, kTypeRangeDeletion}, 1); + Add(1, 2U, {"b", 0, kTypeValue}, {"c", 0, kTypeValue}, 1); + // Two files that overlap at the same user key. + Add(1, 3U, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeValue}, 1); + Add(1, 4U, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}, 1); + // Two files that do not overlap. 
+ Add(1, 5U, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}, 1); + Add(1, 6U, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}, 1); + + UpdateVersionStorageInfo(); + + ASSERT_EQ("1,2", + GetOverlappingFiles(1, {"a", 0, kTypeValue}, {"b", 0, kTypeValue})); + ASSERT_EQ("1", + GetOverlappingFiles(1, {"a", 0, kTypeValue}, + {"b", kMaxSequenceNumber, kTypeRangeDeletion})); + ASSERT_EQ("2", GetOverlappingFiles(1, {"b", kMaxSequenceNumber, kTypeValue}, + {"c", 0, kTypeValue})); + ASSERT_EQ("3,4", + GetOverlappingFiles(1, {"d", 0, kTypeValue}, {"e", 0, kTypeValue})); + ASSERT_EQ("3", + GetOverlappingFiles(1, {"d", 0, kTypeValue}, + {"e", kMaxSequenceNumber, kTypeRangeDeletion})); + ASSERT_EQ("3,4", GetOverlappingFiles(1, {"e", kMaxSequenceNumber, kTypeValue}, + {"f", 0, kTypeValue})); + ASSERT_EQ("3,4", + GetOverlappingFiles(1, {"e", 0, kTypeValue}, {"f", 0, kTypeValue})); + ASSERT_EQ("5", + GetOverlappingFiles(1, {"g", 0, kTypeValue}, {"h", 0, kTypeValue})); + ASSERT_EQ("6", + GetOverlappingFiles(1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue})); +} + +TEST_F(VersionStorageInfoTest, FileLocationAndMetaDataByNumber) { + Add(0, 11U, "1", "2", 5000U); + Add(0, 12U, "1", "2", 5000U); + + Add(2, 7U, "1", "2", 8000U); + + UpdateVersionStorageInfo(); + + ASSERT_EQ(vstorage_.GetFileLocation(11U), + VersionStorageInfo::FileLocation(0, 0)); + ASSERT_NE(vstorage_.GetFileMetaDataByNumber(11U), nullptr); + + ASSERT_EQ(vstorage_.GetFileLocation(12U), + VersionStorageInfo::FileLocation(0, 1)); + ASSERT_NE(vstorage_.GetFileMetaDataByNumber(12U), nullptr); + + ASSERT_EQ(vstorage_.GetFileLocation(7U), + VersionStorageInfo::FileLocation(2, 0)); + ASSERT_NE(vstorage_.GetFileMetaDataByNumber(7U), nullptr); + + ASSERT_FALSE(vstorage_.GetFileLocation(999U).IsValid()); + ASSERT_EQ(vstorage_.GetFileMetaDataByNumber(999U), nullptr); +} + +TEST_F(VersionStorageInfoTest, ForcedBlobGCEmpty) { + // No SST or blob files in VersionStorageInfo + UpdateVersionStorageInfo(); + + constexpr double age_cutoff = 0.5; + 
constexpr double force_threshold = 0.75; + vstorage_.ComputeFilesMarkedForForcedBlobGC(age_cutoff, force_threshold); + + ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty()); +} + +TEST_F(VersionStorageInfoTest, ForcedBlobGCSingleBatch) { + // Test the edge case when all blob files are part of the oldest batch. + // We have one L0 SST file #1, and four blob files #10, #11, #12, and #13. + // The oldest blob file used by SST #1 is blob file #10. + + constexpr int level = 0; + + constexpr uint64_t sst = 1; + + constexpr uint64_t first_blob = 10; + constexpr uint64_t second_blob = 11; + constexpr uint64_t third_blob = 12; + constexpr uint64_t fourth_blob = 13; + + { + constexpr char smallest[] = "bar1"; + constexpr char largest[] = "foo1"; + constexpr uint64_t file_size = 1000; + + Add(level, sst, smallest, largest, file_size, first_blob); + } + + { + constexpr uint64_t total_blob_count = 10; + constexpr uint64_t total_blob_bytes = 100000; + constexpr uint64_t garbage_blob_count = 2; + constexpr uint64_t garbage_blob_bytes = 15000; + + AddBlob(first_blob, total_blob_count, total_blob_bytes, + BlobFileMetaData::LinkedSsts{sst}, garbage_blob_count, + garbage_blob_bytes); + } + + { + constexpr uint64_t total_blob_count = 4; + constexpr uint64_t total_blob_bytes = 400000; + constexpr uint64_t garbage_blob_count = 3; + constexpr uint64_t garbage_blob_bytes = 235000; + + AddBlob(second_blob, total_blob_count, total_blob_bytes, + BlobFileMetaData::LinkedSsts{}, garbage_blob_count, + garbage_blob_bytes); + } + + { + constexpr uint64_t total_blob_count = 20; + constexpr uint64_t total_blob_bytes = 1000000; + constexpr uint64_t garbage_blob_count = 8; + constexpr uint64_t garbage_blob_bytes = 400000; + + AddBlob(third_blob, total_blob_count, total_blob_bytes, + BlobFileMetaData::LinkedSsts{}, garbage_blob_count, + garbage_blob_bytes); + } + + { + constexpr uint64_t total_blob_count = 128; + constexpr uint64_t total_blob_bytes = 1000000; + constexpr uint64_t 
garbage_blob_count = 67; + constexpr uint64_t garbage_blob_bytes = 600000; + + AddBlob(fourth_blob, total_blob_count, total_blob_bytes, + BlobFileMetaData::LinkedSsts{}, garbage_blob_count, + garbage_blob_bytes); + } + + UpdateVersionStorageInfo(); + + assert(vstorage_.num_levels() > 0); + const auto& level_files = vstorage_.LevelFiles(level); + + assert(level_files.size() == 1); + assert(level_files[0] && level_files[0]->fd.GetNumber() == sst); + + // No blob files eligible for GC due to the age cutoff + + { + constexpr double age_cutoff = 0.1; + constexpr double force_threshold = 0.0; + vstorage_.ComputeFilesMarkedForForcedBlobGC(age_cutoff, force_threshold); + + ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty()); + } + + // Part of the oldest batch of blob files (specifically, #12 and #13) is + // ineligible for GC due to the age cutoff + + { + constexpr double age_cutoff = 0.5; + constexpr double force_threshold = 0.0; + vstorage_.ComputeFilesMarkedForForcedBlobGC(age_cutoff, force_threshold); + + ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty()); + } + + // Oldest batch is eligible based on age cutoff but its overall garbage ratio + // is below threshold + + { + constexpr double age_cutoff = 1.0; + constexpr double force_threshold = 0.6; + vstorage_.ComputeFilesMarkedForForcedBlobGC(age_cutoff, force_threshold); + + ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty()); + } + + // Oldest batch is eligible based on age cutoff and its overall garbage ratio + // meets threshold + + { + constexpr double age_cutoff = 1.0; + constexpr double force_threshold = 0.5; + vstorage_.ComputeFilesMarkedForForcedBlobGC(age_cutoff, force_threshold); + + auto ssts_to_be_compacted = vstorage_.FilesMarkedForForcedBlobGC(); + ASSERT_EQ(ssts_to_be_compacted.size(), 1); + + const autovector<std::pair<int, FileMetaData*>> + expected_ssts_to_be_compacted{{level, level_files[0]}}; + + ASSERT_EQ(ssts_to_be_compacted[0], expected_ssts_to_be_compacted[0]); + } +} 
+ +TEST_F(VersionStorageInfoTest, ForcedBlobGCMultipleBatches) { + // Add three L0 SSTs (1, 2, and 3) and four blob files (10, 11, 12, and 13). + // The first two SSTs have the same oldest blob file, namely, the very oldest + // one (10), while the third SST's oldest blob file reference points to the + // third blob file (12). Thus, the oldest batch of blob files contains the + // first two blob files 10 and 11, and assuming they are eligible for GC based + // on the age cutoff, compacting away the SSTs 1 and 2 will eliminate them. + + constexpr int level = 0; + + constexpr uint64_t first_sst = 1; + constexpr uint64_t second_sst = 2; + constexpr uint64_t third_sst = 3; + + constexpr uint64_t first_blob = 10; + constexpr uint64_t second_blob = 11; + constexpr uint64_t third_blob = 12; + constexpr uint64_t fourth_blob = 13; + + { + constexpr char smallest[] = "bar1"; + constexpr char largest[] = "foo1"; + constexpr uint64_t file_size = 1000; + + Add(level, first_sst, smallest, largest, file_size, first_blob); + } + + { + constexpr char smallest[] = "bar2"; + constexpr char largest[] = "foo2"; + constexpr uint64_t file_size = 2000; + + Add(level, second_sst, smallest, largest, file_size, first_blob); + } + + { + constexpr char smallest[] = "bar3"; + constexpr char largest[] = "foo3"; + constexpr uint64_t file_size = 3000; + + Add(level, third_sst, smallest, largest, file_size, third_blob); + } + + { + constexpr uint64_t total_blob_count = 10; + constexpr uint64_t total_blob_bytes = 100000; + constexpr uint64_t garbage_blob_count = 2; + constexpr uint64_t garbage_blob_bytes = 15000; + + AddBlob(first_blob, total_blob_count, total_blob_bytes, + BlobFileMetaData::LinkedSsts{first_sst, second_sst}, + garbage_blob_count, garbage_blob_bytes); + } + + { + constexpr uint64_t total_blob_count = 4; + constexpr uint64_t total_blob_bytes = 400000; + constexpr uint64_t garbage_blob_count = 3; + constexpr uint64_t garbage_blob_bytes = 235000; + + AddBlob(second_blob, 
total_blob_count, total_blob_bytes, + BlobFileMetaData::LinkedSsts{}, garbage_blob_count, + garbage_blob_bytes); + } + + { + constexpr uint64_t total_blob_count = 20; + constexpr uint64_t total_blob_bytes = 1000000; + constexpr uint64_t garbage_blob_count = 8; + constexpr uint64_t garbage_blob_bytes = 123456; + + AddBlob(third_blob, total_blob_count, total_blob_bytes, + BlobFileMetaData::LinkedSsts{third_sst}, garbage_blob_count, + garbage_blob_bytes); + } + + { + constexpr uint64_t total_blob_count = 128; + constexpr uint64_t total_blob_bytes = 789012345; + constexpr uint64_t garbage_blob_count = 67; + constexpr uint64_t garbage_blob_bytes = 88888888; + + AddBlob(fourth_blob, total_blob_count, total_blob_bytes, + BlobFileMetaData::LinkedSsts{}, garbage_blob_count, + garbage_blob_bytes); + } + + UpdateVersionStorageInfo(); + + assert(vstorage_.num_levels() > 0); + const auto& level_files = vstorage_.LevelFiles(level); + + assert(level_files.size() == 3); + assert(level_files[0] && level_files[0]->fd.GetNumber() == first_sst); + assert(level_files[1] && level_files[1]->fd.GetNumber() == second_sst); + assert(level_files[2] && level_files[2]->fd.GetNumber() == third_sst); + + // No blob files eligible for GC due to the age cutoff + + { + constexpr double age_cutoff = 0.1; + constexpr double force_threshold = 0.0; + vstorage_.ComputeFilesMarkedForForcedBlobGC(age_cutoff, force_threshold); + + ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty()); + } + + // Part of the oldest batch of blob files (specifically, the second file) is + // ineligible for GC due to the age cutoff + + { + constexpr double age_cutoff = 0.25; + constexpr double force_threshold = 0.0; + vstorage_.ComputeFilesMarkedForForcedBlobGC(age_cutoff, force_threshold); + + ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty()); + } + + // Oldest batch is eligible based on age cutoff but its overall garbage ratio + // is below threshold + + { + constexpr double age_cutoff = 0.5; + constexpr 
double force_threshold = 0.6; + vstorage_.ComputeFilesMarkedForForcedBlobGC(age_cutoff, force_threshold); + + ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty()); + } + + // Oldest batch is eligible based on age cutoff and its overall garbage ratio + // meets threshold + + { + constexpr double age_cutoff = 0.5; + constexpr double force_threshold = 0.5; + vstorage_.ComputeFilesMarkedForForcedBlobGC(age_cutoff, force_threshold); + + auto ssts_to_be_compacted = vstorage_.FilesMarkedForForcedBlobGC(); + ASSERT_EQ(ssts_to_be_compacted.size(), 2); + + std::sort(ssts_to_be_compacted.begin(), ssts_to_be_compacted.end(), + [](const std::pair<int, FileMetaData*>& lhs, + const std::pair<int, FileMetaData*>& rhs) { + assert(lhs.second); + assert(rhs.second); + return lhs.second->fd.GetNumber() < rhs.second->fd.GetNumber(); + }); + + const autovector<std::pair<int, FileMetaData*>> + expected_ssts_to_be_compacted{{level, level_files[0]}, + {level, level_files[1]}}; + + ASSERT_EQ(ssts_to_be_compacted[0], expected_ssts_to_be_compacted[0]); + ASSERT_EQ(ssts_to_be_compacted[1], expected_ssts_to_be_compacted[1]); + } + + // Now try the last two cases again with a greater than necessary age cutoff + + // Oldest batch is eligible based on age cutoff but its overall garbage ratio + // is below threshold + + { + constexpr double age_cutoff = 0.75; + constexpr double force_threshold = 0.6; + vstorage_.ComputeFilesMarkedForForcedBlobGC(age_cutoff, force_threshold); + + ASSERT_TRUE(vstorage_.FilesMarkedForForcedBlobGC().empty()); + } + + // Oldest batch is eligible based on age cutoff and its overall garbage ratio + // meets threshold + + { + constexpr double age_cutoff = 0.75; + constexpr double force_threshold = 0.5; + vstorage_.ComputeFilesMarkedForForcedBlobGC(age_cutoff, force_threshold); + + auto ssts_to_be_compacted = vstorage_.FilesMarkedForForcedBlobGC(); + ASSERT_EQ(ssts_to_be_compacted.size(), 2); + + std::sort(ssts_to_be_compacted.begin(), ssts_to_be_compacted.end(), + 
[](const std::pair<int, FileMetaData*>& lhs, + const std::pair<int, FileMetaData*>& rhs) { + assert(lhs.second); + assert(rhs.second); + return lhs.second->fd.GetNumber() < rhs.second->fd.GetNumber(); + }); + + const autovector<std::pair<int, FileMetaData*>> + expected_ssts_to_be_compacted{{level, level_files[0]}, + {level, level_files[1]}}; + + ASSERT_EQ(ssts_to_be_compacted[0], expected_ssts_to_be_compacted[0]); + ASSERT_EQ(ssts_to_be_compacted[1], expected_ssts_to_be_compacted[1]); + } +} + +class VersionStorageInfoTimestampTest : public VersionStorageInfoTestBase { + public: + VersionStorageInfoTimestampTest() + : VersionStorageInfoTestBase(test::BytewiseComparatorWithU64TsWrapper()) { + } + ~VersionStorageInfoTimestampTest() override {} + std::string Timestamp(uint64_t ts) const { + std::string ret; + PutFixed64(&ret, ts); + return ret; + } + std::string PackUserKeyAndTimestamp(const Slice& ukey, uint64_t ts) const { + std::string ret; + ret.assign(ukey.data(), ukey.size()); + PutFixed64(&ret, ts); + return ret; + } +}; + +TEST_F(VersionStorageInfoTimestampTest, GetOverlappingInputs) { + Add(/*level=*/1, /*file_number=*/1, /*smallest=*/ + {PackUserKeyAndTimestamp("a", /*ts=*/9), /*s=*/0, kTypeValue}, + /*largest=*/ + {PackUserKeyAndTimestamp("a", /*ts=*/8), /*s=*/0, kTypeValue}, + /*file_size=*/100); + Add(/*level=*/1, /*file_number=*/2, /*smallest=*/ + {PackUserKeyAndTimestamp("a", /*ts=*/5), /*s=*/0, kTypeValue}, + /*largest=*/ + {PackUserKeyAndTimestamp("b", /*ts=*/10), /*s=*/0, kTypeValue}, + /*file_size=*/100); + Add(/*level=*/1, /*file_number=*/3, /*smallest=*/ + {PackUserKeyAndTimestamp("c", /*ts=*/12), /*s=*/0, kTypeValue}, + /*largest=*/ + {PackUserKeyAndTimestamp("d", /*ts=*/1), /*s=*/0, kTypeValue}, + /*file_size=*/100); + + UpdateVersionStorageInfo(); + + ASSERT_EQ( + "1,2", + GetOverlappingFiles( + /*level=*/1, + {PackUserKeyAndTimestamp("a", /*ts=*/12), /*s=*/0, kTypeValue}, + {PackUserKeyAndTimestamp("a", /*ts=*/11), /*s=*/0, kTypeValue})); + 
ASSERT_EQ("3", + GetOverlappingFiles( + /*level=*/1, + {PackUserKeyAndTimestamp("c", /*ts=*/15), /*s=*/0, kTypeValue}, + {PackUserKeyAndTimestamp("c", /*ts=*/2), /*s=*/0, kTypeValue})); +} + +class FindLevelFileTest : public testing::Test { + public: + LevelFilesBrief file_level_; + bool disjoint_sorted_files_; + Arena arena_; + + FindLevelFileTest() : disjoint_sorted_files_(true) {} + + ~FindLevelFileTest() override {} + + void LevelFileInit(size_t num = 0) { + char* mem = arena_.AllocateAligned(num * sizeof(FdWithKeyRange)); + file_level_.files = new (mem) FdWithKeyRange[num]; + file_level_.num_files = 0; + } + + void Add(const char* smallest, const char* largest, + SequenceNumber smallest_seq = 100, + SequenceNumber largest_seq = 100) { + InternalKey smallest_key = InternalKey(smallest, smallest_seq, kTypeValue); + InternalKey largest_key = InternalKey(largest, largest_seq, kTypeValue); + + Slice smallest_slice = smallest_key.Encode(); + Slice largest_slice = largest_key.Encode(); + + char* mem = + arena_.AllocateAligned(smallest_slice.size() + largest_slice.size()); + memcpy(mem, smallest_slice.data(), smallest_slice.size()); + memcpy(mem + smallest_slice.size(), largest_slice.data(), + largest_slice.size()); + + // add to file_level_ + size_t num = file_level_.num_files; + auto& file = file_level_.files[num]; + file.fd = FileDescriptor(num + 1, 0, 0); + file.smallest_key = Slice(mem, smallest_slice.size()); + file.largest_key = Slice(mem + smallest_slice.size(), largest_slice.size()); + file_level_.num_files++; + } + + int Find(const char* key) { + InternalKey target(key, 100, kTypeValue); + InternalKeyComparator cmp(BytewiseComparator()); + return FindFile(cmp, file_level_, target.Encode()); + } + + bool Overlaps(const char* smallest, const char* largest) { + InternalKeyComparator cmp(BytewiseComparator()); + Slice s(smallest != nullptr ? smallest : ""); + Slice l(largest != nullptr ? 
largest : ""); + return SomeFileOverlapsRange(cmp, disjoint_sorted_files_, file_level_, + (smallest != nullptr ? &s : nullptr), + (largest != nullptr ? &l : nullptr)); + } +}; + +TEST_F(FindLevelFileTest, LevelEmpty) { + LevelFileInit(0); + + ASSERT_EQ(0, Find("foo")); + ASSERT_TRUE(!Overlaps("a", "z")); + ASSERT_TRUE(!Overlaps(nullptr, "z")); + ASSERT_TRUE(!Overlaps("a", nullptr)); + ASSERT_TRUE(!Overlaps(nullptr, nullptr)); +} + +TEST_F(FindLevelFileTest, LevelSingle) { + LevelFileInit(1); + + Add("p", "q"); + ASSERT_EQ(0, Find("a")); + ASSERT_EQ(0, Find("p")); + ASSERT_EQ(0, Find("p1")); + ASSERT_EQ(0, Find("q")); + ASSERT_EQ(1, Find("q1")); + ASSERT_EQ(1, Find("z")); + + ASSERT_TRUE(!Overlaps("a", "b")); + ASSERT_TRUE(!Overlaps("z1", "z2")); + ASSERT_TRUE(Overlaps("a", "p")); + ASSERT_TRUE(Overlaps("a", "q")); + ASSERT_TRUE(Overlaps("a", "z")); + ASSERT_TRUE(Overlaps("p", "p1")); + ASSERT_TRUE(Overlaps("p", "q")); + ASSERT_TRUE(Overlaps("p", "z")); + ASSERT_TRUE(Overlaps("p1", "p2")); + ASSERT_TRUE(Overlaps("p1", "z")); + ASSERT_TRUE(Overlaps("q", "q")); + ASSERT_TRUE(Overlaps("q", "q1")); + + ASSERT_TRUE(!Overlaps(nullptr, "j")); + ASSERT_TRUE(!Overlaps("r", nullptr)); + ASSERT_TRUE(Overlaps(nullptr, "p")); + ASSERT_TRUE(Overlaps(nullptr, "p1")); + ASSERT_TRUE(Overlaps("q", nullptr)); + ASSERT_TRUE(Overlaps(nullptr, nullptr)); +} + +TEST_F(FindLevelFileTest, LevelMultiple) { + LevelFileInit(4); + + Add("150", "200"); + Add("200", "250"); + Add("300", "350"); + Add("400", "450"); + ASSERT_EQ(0, Find("100")); + ASSERT_EQ(0, Find("150")); + ASSERT_EQ(0, Find("151")); + ASSERT_EQ(0, Find("199")); + ASSERT_EQ(0, Find("200")); + ASSERT_EQ(1, Find("201")); + ASSERT_EQ(1, Find("249")); + ASSERT_EQ(1, Find("250")); + ASSERT_EQ(2, Find("251")); + ASSERT_EQ(2, Find("299")); + ASSERT_EQ(2, Find("300")); + ASSERT_EQ(2, Find("349")); + ASSERT_EQ(2, Find("350")); + ASSERT_EQ(3, Find("351")); + ASSERT_EQ(3, Find("400")); + ASSERT_EQ(3, Find("450")); + ASSERT_EQ(4, 
Find("451")); + + ASSERT_TRUE(!Overlaps("100", "149")); + ASSERT_TRUE(!Overlaps("251", "299")); + ASSERT_TRUE(!Overlaps("451", "500")); + ASSERT_TRUE(!Overlaps("351", "399")); + + ASSERT_TRUE(Overlaps("100", "150")); + ASSERT_TRUE(Overlaps("100", "200")); + ASSERT_TRUE(Overlaps("100", "300")); + ASSERT_TRUE(Overlaps("100", "400")); + ASSERT_TRUE(Overlaps("100", "500")); + ASSERT_TRUE(Overlaps("375", "400")); + ASSERT_TRUE(Overlaps("450", "450")); + ASSERT_TRUE(Overlaps("450", "500")); +} + +TEST_F(FindLevelFileTest, LevelMultipleNullBoundaries) { + LevelFileInit(4); + + Add("150", "200"); + Add("200", "250"); + Add("300", "350"); + Add("400", "450"); + ASSERT_TRUE(!Overlaps(nullptr, "149")); + ASSERT_TRUE(!Overlaps("451", nullptr)); + ASSERT_TRUE(Overlaps(nullptr, nullptr)); + ASSERT_TRUE(Overlaps(nullptr, "150")); + ASSERT_TRUE(Overlaps(nullptr, "199")); + ASSERT_TRUE(Overlaps(nullptr, "200")); + ASSERT_TRUE(Overlaps(nullptr, "201")); + ASSERT_TRUE(Overlaps(nullptr, "400")); + ASSERT_TRUE(Overlaps(nullptr, "800")); + ASSERT_TRUE(Overlaps("100", nullptr)); + ASSERT_TRUE(Overlaps("200", nullptr)); + ASSERT_TRUE(Overlaps("449", nullptr)); + ASSERT_TRUE(Overlaps("450", nullptr)); +} + +TEST_F(FindLevelFileTest, LevelOverlapSequenceChecks) { + LevelFileInit(1); + + Add("200", "200", 5000, 3000); + ASSERT_TRUE(!Overlaps("199", "199")); + ASSERT_TRUE(!Overlaps("201", "300")); + ASSERT_TRUE(Overlaps("200", "200")); + ASSERT_TRUE(Overlaps("190", "200")); + ASSERT_TRUE(Overlaps("200", "210")); +} + +TEST_F(FindLevelFileTest, LevelOverlappingFiles) { + LevelFileInit(2); + + Add("150", "600"); + Add("400", "500"); + disjoint_sorted_files_ = false; + ASSERT_TRUE(!Overlaps("100", "149")); + ASSERT_TRUE(!Overlaps("601", "700")); + ASSERT_TRUE(Overlaps("100", "150")); + ASSERT_TRUE(Overlaps("100", "200")); + ASSERT_TRUE(Overlaps("100", "300")); + ASSERT_TRUE(Overlaps("100", "400")); + ASSERT_TRUE(Overlaps("100", "500")); + ASSERT_TRUE(Overlaps("375", "400")); + 
ASSERT_TRUE(Overlaps("450", "450")); + ASSERT_TRUE(Overlaps("450", "500")); + ASSERT_TRUE(Overlaps("450", "700")); + ASSERT_TRUE(Overlaps("600", "700")); +} + +class VersionSetTestBase { + public: + const static std::string kColumnFamilyName1; + const static std::string kColumnFamilyName2; + const static std::string kColumnFamilyName3; + int num_initial_edits_; + + explicit VersionSetTestBase(const std::string& name) + : env_(nullptr), + dbname_(test::PerThreadDBPath(name)), + options_(), + db_options_(options_), + cf_options_(options_), + immutable_options_(db_options_, cf_options_), + mutable_cf_options_(cf_options_), + table_cache_(NewLRUCache(50000, 16)), + write_buffer_manager_(db_options_.db_write_buffer_size), + shutting_down_(false), + mock_table_factory_(std::make_shared<mock::MockTableFactory>()) { + EXPECT_OK(test::CreateEnvFromSystem(ConfigOptions(), &env_, &env_guard_)); + if (env_ == Env::Default() && getenv("MEM_ENV")) { + env_guard_.reset(NewMemEnv(Env::Default())); + env_ = env_guard_.get(); + } + EXPECT_NE(nullptr, env_); + + fs_ = env_->GetFileSystem(); + EXPECT_OK(fs_->CreateDirIfMissing(dbname_, IOOptions(), nullptr)); + + options_.env = env_; + db_options_.env = env_; + db_options_.fs = fs_; + immutable_options_.env = env_; + immutable_options_.fs = fs_; + immutable_options_.clock = env_->GetSystemClock().get(); + + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "")); + reactive_versions_ = std::make_shared<ReactiveVersionSet>( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, nullptr); + db_options_.db_paths.emplace_back(dbname_, + std::numeric_limits<uint64_t>::max()); + } + + virtual ~VersionSetTestBase() { + if (getenv("KEEP_DB")) { + fprintf(stdout, "DB is still at %s\n", dbname_.c_str()); + } else { + 
Options options; + options.env = env_; + EXPECT_OK(DestroyDB(dbname_, options)); + } + } + + protected: + virtual void PrepareManifest( + std::vector<ColumnFamilyDescriptor>* column_families, + SequenceNumber* last_seqno, std::unique_ptr<log::Writer>* log_writer) { + assert(column_families != nullptr); + assert(last_seqno != nullptr); + assert(log_writer != nullptr); + VersionEdit new_db; + if (db_options_.write_dbid_to_manifest) { + DBOptions tmp_db_options; + tmp_db_options.env = env_; + std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_)); + std::string db_id; + impl->GetDbIdentityFromIdentityFile(&db_id); + new_db.SetDBId(db_id); + } + new_db.SetLogNumber(0); + new_db.SetNextFile(2); + new_db.SetLastSequence(0); + + const std::vector<std::string> cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + const int kInitialNumOfCfs = static_cast<int>(cf_names.size()); + autovector<VersionEdit> new_cfs; + uint64_t last_seq = 1; + uint32_t cf_id = 1; + for (int i = 1; i != kInitialNumOfCfs; ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + new_cf.SetLogNumber(0); + new_cf.SetNextFile(2); + new_cf.SetLastSequence(last_seq++); + new_cfs.emplace_back(new_cf); + } + *last_seqno = last_seq; + num_initial_edits_ = static_cast<int>(new_cfs.size() + 1); + std::unique_ptr<WritableFileWriter> file_writer; + const std::string manifest = DescriptorFileName(dbname_, 1); + const auto& fs = env_->GetFileSystem(); + Status s = WritableFileWriter::Create( + fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer, + nullptr); + ASSERT_OK(s); + { + log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); + std::string record; + new_db.EncodeTo(&record); + s = (*log_writer)->AddRecord(record); + for (const auto& e : new_cfs) { + record.clear(); + e.EncodeTo(&record); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + } + 
ASSERT_OK(s); + + cf_options_.table_factory = mock_table_factory_; + for (const auto& cf_name : cf_names) { + column_families->emplace_back(cf_name, cf_options_); + } + } + + // Create DB with 3 column families. + void NewDB() { + SequenceNumber last_seqno; + std::unique_ptr<log::Writer> log_writer; + SetIdentityFile(env_, dbname_); + PrepareManifest(&column_families_, &last_seqno, &log_writer); + log_writer.reset(); + // Make "CURRENT" file point to the new manifest file. + Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); + ASSERT_OK(s); + + EXPECT_OK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + } + + void ReopenDB() { + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "")); + EXPECT_OK(versions_->Recover(column_families_, false)); + } + + void VerifyManifest(std::string* manifest_path) const { + assert(manifest_path != nullptr); + uint64_t manifest_file_number = 0; + Status s = versions_->GetCurrentManifestPath( + dbname_, fs_.get(), manifest_path, &manifest_file_number); + ASSERT_OK(s); + ASSERT_EQ(1, manifest_file_number); + } + + Status LogAndApplyToDefaultCF(VersionEdit& edit) { + mutex_.Lock(); + Status s = + versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), + mutable_cf_options_, &edit, &mutex_, nullptr); + mutex_.Unlock(); + return s; + } + + Status LogAndApplyToDefaultCF( + const autovector<std::unique_ptr<VersionEdit>>& edits) { + autovector<VersionEdit*> vedits; + for (auto& e : edits) { + vedits.push_back(e.get()); + } + mutex_.Lock(); + Status s = + versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), + mutable_cf_options_, vedits, &mutex_, nullptr); + mutex_.Unlock(); + return s; + } + + void CreateNewManifest() { + constexpr 
FSDirectory* db_directory = nullptr; + constexpr bool new_descriptor_log = true; + mutex_.Lock(); + VersionEdit dummy; + ASSERT_OK(versions_->LogAndApply( + versions_->GetColumnFamilySet()->GetDefault(), mutable_cf_options_, + &dummy, &mutex_, db_directory, new_descriptor_log)); + mutex_.Unlock(); + } + + ColumnFamilyData* CreateColumnFamily(const std::string& cf_name, + const ColumnFamilyOptions& cf_options) { + VersionEdit new_cf; + new_cf.AddColumnFamily(cf_name); + uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); + new_cf.SetColumnFamily(new_id); + new_cf.SetLogNumber(0); + new_cf.SetComparatorName(cf_options.comparator->Name()); + Status s; + mutex_.Lock(); + s = versions_->LogAndApply(/*column_family_data=*/nullptr, + MutableCFOptions(cf_options), &new_cf, &mutex_, + /*db_directory=*/nullptr, + /*new_descriptor_log=*/false, &cf_options); + mutex_.Unlock(); + EXPECT_OK(s); + ColumnFamilyData* cfd = + versions_->GetColumnFamilySet()->GetColumnFamily(cf_name); + EXPECT_NE(nullptr, cfd); + return cfd; + } + + Env* mem_env_; + Env* env_; + std::shared_ptr<Env> env_guard_; + std::shared_ptr<FileSystem> fs_; + const std::string dbname_; + EnvOptions env_options_; + Options options_; + ImmutableDBOptions db_options_; + ColumnFamilyOptions cf_options_; + ImmutableOptions immutable_options_; + MutableCFOptions mutable_cf_options_; + std::shared_ptr<Cache> table_cache_; + WriteController write_controller_; + WriteBufferManager write_buffer_manager_; + std::shared_ptr<VersionSet> versions_; + std::shared_ptr<ReactiveVersionSet> reactive_versions_; + InstrumentedMutex mutex_; + std::atomic<bool> shutting_down_; + std::shared_ptr<mock::MockTableFactory> mock_table_factory_; + std::vector<ColumnFamilyDescriptor> column_families_; +}; + +const std::string VersionSetTestBase::kColumnFamilyName1 = "alice"; +const std::string VersionSetTestBase::kColumnFamilyName2 = "bob"; +const std::string VersionSetTestBase::kColumnFamilyName3 = "charles"; + +class 
VersionSetTest : public VersionSetTestBase, public testing::Test { + public: + VersionSetTest() : VersionSetTestBase("version_set_test") {} +}; + +TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) { + NewDB(); + const int kGroupSize = 5; + autovector<VersionEdit> edits; + for (int i = 0; i != kGroupSize; ++i) { + edits.emplace_back(VersionEdit()); + } + autovector<ColumnFamilyData*> cfds; + autovector<const MutableCFOptions*> all_mutable_cf_options; + autovector<autovector<VersionEdit*>> edit_lists; + for (int i = 0; i != kGroupSize; ++i) { + cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault()); + all_mutable_cf_options.emplace_back(&mutable_cf_options_); + autovector<VersionEdit*> edit_list; + edit_list.emplace_back(&edits[i]); + edit_lists.emplace_back(edit_list); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + int count = 0; + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) { + uint32_t* cf_id = reinterpret_cast<uint32_t*>(arg); + EXPECT_EQ(0u, *cf_id); + ++count; + }); + SyncPoint::GetInstance()->EnableProcessing(); + mutex_.Lock(); + Status s = versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, + &mutex_, nullptr); + mutex_.Unlock(); + EXPECT_OK(s); + EXPECT_EQ(kGroupSize - 1, count); +} + +TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) { + // Initialize the database and add a couple of blob files, one with some + // garbage in it, and one without any garbage. 
+ NewDB(); + + assert(versions_); + assert(versions_->GetColumnFamilySet()); + + ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const version = cfd->current(); + assert(version); + + VersionStorageInfo* const storage_info = version->storage_info(); + assert(storage_info); + + { + constexpr uint64_t blob_file_number = 123; + constexpr uint64_t total_blob_count = 456; + constexpr uint64_t total_blob_bytes = 77777777; + constexpr char checksum_method[] = "SHA1"; + constexpr char checksum_value[] = + "\xbd\xb7\xf3\x4a\x59\xdf\xa1\x59\x2c\xe7\xf5\x2e\x99\xf9\x8c\x57\x0c" + "\x52\x5c\xbd"; + + auto shared_meta = SharedBlobFileMetaData::Create( + blob_file_number, total_blob_count, total_blob_bytes, checksum_method, + checksum_value); + + constexpr uint64_t garbage_blob_count = 89; + constexpr uint64_t garbage_blob_bytes = 1000000; + + auto meta = BlobFileMetaData::Create( + std::move(shared_meta), BlobFileMetaData::LinkedSsts(), + garbage_blob_count, garbage_blob_bytes); + + storage_info->AddBlobFile(std::move(meta)); + } + + { + constexpr uint64_t blob_file_number = 234; + constexpr uint64_t total_blob_count = 555; + constexpr uint64_t total_blob_bytes = 66666; + constexpr char checksum_method[] = "CRC32"; + constexpr char checksum_value[] = "\x3d\x87\xff\x57"; + + auto shared_meta = SharedBlobFileMetaData::Create( + blob_file_number, total_blob_count, total_blob_bytes, checksum_method, + checksum_value); + + constexpr uint64_t garbage_blob_count = 0; + constexpr uint64_t garbage_blob_bytes = 0; + + auto meta = BlobFileMetaData::Create( + std::move(shared_meta), BlobFileMetaData::LinkedSsts(), + garbage_blob_count, garbage_blob_bytes); + + storage_info->AddBlobFile(std::move(meta)); + } + + // Force the creation of a new manifest file and make sure metadata for + // the blob files is re-persisted. 
+ size_t addition_encoded = 0; + SyncPoint::GetInstance()->SetCallBack( + "BlobFileAddition::EncodeTo::CustomFields", + [&](void* /* arg */) { ++addition_encoded; }); + + size_t garbage_encoded = 0; + SyncPoint::GetInstance()->SetCallBack( + "BlobFileGarbage::EncodeTo::CustomFields", + [&](void* /* arg */) { ++garbage_encoded; }); + SyncPoint::GetInstance()->EnableProcessing(); + + CreateNewManifest(); + + ASSERT_EQ(addition_encoded, 2); + ASSERT_EQ(garbage_encoded, 1); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(VersionSetTest, AddLiveBlobFiles) { + // Initialize the database and add a blob file. + NewDB(); + + assert(versions_); + assert(versions_->GetColumnFamilySet()); + + ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const first_version = cfd->current(); + assert(first_version); + + VersionStorageInfo* const first_storage_info = first_version->storage_info(); + assert(first_storage_info); + + constexpr uint64_t first_blob_file_number = 234; + constexpr uint64_t first_total_blob_count = 555; + constexpr uint64_t first_total_blob_bytes = 66666; + constexpr char first_checksum_method[] = "CRC32"; + constexpr char first_checksum_value[] = "\x3d\x87\xff\x57"; + + auto first_shared_meta = SharedBlobFileMetaData::Create( + first_blob_file_number, first_total_blob_count, first_total_blob_bytes, + first_checksum_method, first_checksum_value); + + constexpr uint64_t garbage_blob_count = 0; + constexpr uint64_t garbage_blob_bytes = 0; + + auto first_meta = BlobFileMetaData::Create( + std::move(first_shared_meta), BlobFileMetaData::LinkedSsts(), + garbage_blob_count, garbage_blob_bytes); + + first_storage_info->AddBlobFile(first_meta); + + // Reference the version so it stays alive even after the following version + // edit. + first_version->Ref(); + + // Get live files directly from version. 
+ std::vector<uint64_t> version_table_files; + std::vector<uint64_t> version_blob_files; + + first_version->AddLiveFiles(&version_table_files, &version_blob_files); + + ASSERT_EQ(version_blob_files.size(), 1); + ASSERT_EQ(version_blob_files[0], first_blob_file_number); + + // Create a new version containing an additional blob file. + versions_->TEST_CreateAndAppendVersion(cfd); + + Version* const second_version = cfd->current(); + assert(second_version); + assert(second_version != first_version); + + VersionStorageInfo* const second_storage_info = + second_version->storage_info(); + assert(second_storage_info); + + constexpr uint64_t second_blob_file_number = 456; + constexpr uint64_t second_total_blob_count = 100; + constexpr uint64_t second_total_blob_bytes = 2000000; + constexpr char second_checksum_method[] = "CRC32B"; + constexpr char second_checksum_value[] = "\x6d\xbd\xf2\x3a"; + + auto second_shared_meta = SharedBlobFileMetaData::Create( + second_blob_file_number, second_total_blob_count, second_total_blob_bytes, + second_checksum_method, second_checksum_value); + + auto second_meta = BlobFileMetaData::Create( + std::move(second_shared_meta), BlobFileMetaData::LinkedSsts(), + garbage_blob_count, garbage_blob_bytes); + + second_storage_info->AddBlobFile(std::move(first_meta)); + second_storage_info->AddBlobFile(std::move(second_meta)); + + // Get all live files from version set. Note that the result contains + // duplicates. + std::vector<uint64_t> all_table_files; + std::vector<uint64_t> all_blob_files; + + versions_->AddLiveFiles(&all_table_files, &all_blob_files); + + ASSERT_EQ(all_blob_files.size(), 3); + ASSERT_EQ(all_blob_files[0], first_blob_file_number); + ASSERT_EQ(all_blob_files[1], first_blob_file_number); + ASSERT_EQ(all_blob_files[2], second_blob_file_number); + + // Clean up previous version. 
+ first_version->Unref(); +} + +TEST_F(VersionSetTest, ObsoleteBlobFile) { + // Initialize the database and add a blob file that is entirely garbage + // and thus can immediately be marked obsolete. + NewDB(); + + VersionEdit edit; + + constexpr uint64_t blob_file_number = 234; + constexpr uint64_t total_blob_count = 555; + constexpr uint64_t total_blob_bytes = 66666; + constexpr char checksum_method[] = "CRC32"; + constexpr char checksum_value[] = "\x3d\x87\xff\x57"; + + edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes, + checksum_method, checksum_value); + + edit.AddBlobFileGarbage(blob_file_number, total_blob_count, total_blob_bytes); + + mutex_.Lock(); + Status s = + versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), + mutable_cf_options_, &edit, &mutex_, nullptr); + mutex_.Unlock(); + + ASSERT_OK(s); + + // Make sure blob files from the pending number range are not returned + // as obsolete. + { + std::vector<ObsoleteFileInfo> table_files; + std::vector<ObsoleteBlobFileInfo> blob_files; + std::vector<std::string> manifest_files; + constexpr uint64_t min_pending_output = blob_file_number; + + versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files, + min_pending_output); + + ASSERT_TRUE(blob_files.empty()); + } + + // Make sure the blob file is returned as obsolete if it's not in the pending + // range. + { + std::vector<ObsoleteFileInfo> table_files; + std::vector<ObsoleteBlobFileInfo> blob_files; + std::vector<std::string> manifest_files; + constexpr uint64_t min_pending_output = blob_file_number + 1; + + versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files, + min_pending_output); + + ASSERT_EQ(blob_files.size(), 1); + ASSERT_EQ(blob_files[0].GetBlobFileNumber(), blob_file_number); + } + + // Make sure it's not returned a second time. 
+ { + std::vector<ObsoleteFileInfo> table_files; + std::vector<ObsoleteBlobFileInfo> blob_files; + std::vector<std::string> manifest_files; + constexpr uint64_t min_pending_output = blob_file_number + 1; + + versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files, + min_pending_output); + + ASSERT_TRUE(blob_files.empty()); + } +} + +TEST_F(VersionSetTest, WalEditsNotAppliedToVersion) { + NewDB(); + + constexpr uint64_t kNumWals = 5; + + autovector<std::unique_ptr<VersionEdit>> edits; + // Add some WALs. + for (uint64_t i = 1; i <= kNumWals; i++) { + edits.emplace_back(new VersionEdit); + // WAL's size equals its log number. + edits.back()->AddWal(i, WalMetadata(i)); + } + // Delete the first half of the WALs. + edits.emplace_back(new VersionEdit); + edits.back()->DeleteWalsBefore(kNumWals / 2 + 1); + + autovector<Version*> versions; + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:NewVersion", + [&](void* arg) { versions.push_back(reinterpret_cast<Version*>(arg)); }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(LogAndApplyToDefaultCF(edits)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Since the edits are all WAL edits, no version should be created. + ASSERT_EQ(versions.size(), 1); + ASSERT_EQ(versions[0], nullptr); +} + +// Similar to WalEditsNotAppliedToVersion, but contains a non-WAL edit. +TEST_F(VersionSetTest, NonWalEditsAppliedToVersion) { + NewDB(); + + const std::string kDBId = "db_db"; + constexpr uint64_t kNumWals = 5; + + autovector<std::unique_ptr<VersionEdit>> edits; + // Add some WALs. + for (uint64_t i = 1; i <= kNumWals; i++) { + edits.emplace_back(new VersionEdit); + // WAL's size equals its log number. + edits.back()->AddWal(i, WalMetadata(i)); + } + // Delete the first half of the WALs. 
+ edits.emplace_back(new VersionEdit); + edits.back()->DeleteWalsBefore(kNumWals / 2 + 1); + edits.emplace_back(new VersionEdit); + edits.back()->SetDBId(kDBId); + + autovector<Version*> versions; + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:NewVersion", + [&](void* arg) { versions.push_back(reinterpret_cast<Version*>(arg)); }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(LogAndApplyToDefaultCF(edits)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Since the edits are all WAL edits, no version should be created. + ASSERT_EQ(versions.size(), 1); + ASSERT_NE(versions[0], nullptr); +} + +TEST_F(VersionSetTest, WalAddition) { + NewDB(); + + constexpr WalNumber kLogNumber = 10; + constexpr uint64_t kSizeInBytes = 111; + + // A WAL is just created. + { + VersionEdit edit; + edit.AddWal(kLogNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize()); + } + + // The WAL is synced for several times before closing. + { + for (uint64_t size_delta = 100; size_delta > 0; size_delta /= 2) { + uint64_t size = kSizeInBytes - size_delta; + WalMetadata wal(size); + VersionEdit edit; + edit.AddWal(kLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), size); + } + } + + // The WAL is closed. 
+ { + WalMetadata wal(kSizeInBytes); + VersionEdit edit; + edit.AddWal(kLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes); + } + + // Recover a new VersionSet. + { + std::unique_ptr<VersionSet> new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "")); + ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes); + } +} + +TEST_F(VersionSetTest, WalCloseWithoutSync) { + NewDB(); + + constexpr WalNumber kLogNumber = 10; + constexpr uint64_t kSizeInBytes = 111; + constexpr uint64_t kSyncedSizeInBytes = kSizeInBytes / 2; + + // A WAL is just created. + { + VersionEdit edit; + edit.AddWal(kLogNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize()); + } + + // The WAL is synced before closing. 
+ { + WalMetadata wal(kSyncedSizeInBytes); + VersionEdit edit; + edit.AddWal(kLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes); + } + + // A new WAL with larger log number is created, + // implicitly marking the current WAL closed. + { + VersionEdit edit; + edit.AddWal(kLogNumber + 1); + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 2); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes); + ASSERT_TRUE(wals.find(kLogNumber + 1) != wals.end()); + ASSERT_FALSE(wals.at(kLogNumber + 1).HasSyncedSize()); + } + + // Recover a new VersionSet. + { + std::unique_ptr<VersionSet> new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "")); + ASSERT_OK(new_versions->Recover(column_families_, false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 2); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes); + } +} + +TEST_F(VersionSetTest, WalDeletion) { + NewDB(); + + constexpr WalNumber kClosedLogNumber = 10; + constexpr WalNumber kNonClosedLogNumber = 20; + constexpr uint64_t kSizeInBytes = 111; + + // Add a non-closed and a closed WAL. 
+ { + VersionEdit edit; + edit.AddWal(kClosedLogNumber, WalMetadata(kSizeInBytes)); + edit.AddWal(kNonClosedLogNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 2); + ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end()); + ASSERT_TRUE(wals.find(kClosedLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize()); + ASSERT_TRUE(wals.at(kClosedLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kClosedLogNumber).GetSyncedSizeInBytes(), kSizeInBytes); + } + + // Delete the closed WAL. + { + VersionEdit edit; + edit.DeleteWalsBefore(kNonClosedLogNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize()); + } + + // Recover a new VersionSet, only the non-closed WAL should show up. + { + std::unique_ptr<VersionSet> new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "")); + ASSERT_OK(new_versions->Recover(column_families_, false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize()); + } + + // Force the creation of a new MANIFEST file, + // only the non-closed WAL should be written to the new MANIFEST. 
+ { + std::vector<WalAddition> wal_additions; + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::WriteCurrentStateToManifest:SaveWal", [&](void* arg) { + VersionEdit* edit = reinterpret_cast<VersionEdit*>(arg); + ASSERT_TRUE(edit->IsWalAddition()); + for (auto& addition : edit->GetWalAdditions()) { + wal_additions.push_back(addition); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + CreateNewManifest(); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_EQ(wal_additions.size(), 1); + ASSERT_EQ(wal_additions[0].GetLogNumber(), kNonClosedLogNumber); + ASSERT_FALSE(wal_additions[0].GetMetadata().HasSyncedSize()); + } + + // Recover from the new MANIFEST, only the non-closed WAL should show up. + { + std::unique_ptr<VersionSet> new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "")); + ASSERT_OK(new_versions->Recover(column_families_, false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize()); + } +} + +TEST_F(VersionSetTest, WalCreateTwice) { + NewDB(); + + constexpr WalNumber kLogNumber = 10; + + VersionEdit edit; + edit.AddWal(kLogNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + Status s = LogAndApplyToDefaultCF(edit); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("WAL 10 is created more than once") != + std::string::npos) + << s.ToString(); +} + +TEST_F(VersionSetTest, WalCreateAfterClose) { + NewDB(); + + constexpr WalNumber kLogNumber = 10; + constexpr uint64_t kSizeInBytes = 111; + + { + // Add a closed WAL. 
+ VersionEdit edit; + edit.AddWal(kLogNumber); + WalMetadata wal(kSizeInBytes); + edit.AddWal(kLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + + { + // Create the same WAL again. + VersionEdit edit; + edit.AddWal(kLogNumber); + + Status s = LogAndApplyToDefaultCF(edit); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("WAL 10 is created more than once") != + std::string::npos) + << s.ToString(); + } +} + +TEST_F(VersionSetTest, AddWalWithSmallerSize) { + NewDB(); + assert(versions_); + + constexpr WalNumber kLogNumber = 10; + constexpr uint64_t kSizeInBytes = 111; + + { + // Add a closed WAL. + VersionEdit edit; + WalMetadata wal(kSizeInBytes); + edit.AddWal(kLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + // Copy for future comparison. + const std::map<WalNumber, WalMetadata> wals1 = + versions_->GetWalSet().GetWals(); + + { + // Add the same WAL with smaller synced size. + VersionEdit edit; + WalMetadata wal(kSizeInBytes / 2); + edit.AddWal(kLogNumber, wal); + + Status s = LogAndApplyToDefaultCF(edit); + ASSERT_OK(s); + } + const std::map<WalNumber, WalMetadata> wals2 = + versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals1, wals2); +} + +TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) { + NewDB(); + + constexpr WalNumber kLogNumber0 = 10; + constexpr WalNumber kLogNumber1 = 20; + constexpr WalNumber kNonExistingNumber = 15; + constexpr uint64_t kSizeInBytes = 111; + + { + // Add closed WALs. + VersionEdit edit; + WalMetadata wal(kSizeInBytes); + edit.AddWal(kLogNumber0, wal); + edit.AddWal(kLogNumber1, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + + { + // Delete WALs before a non-existing WAL. + VersionEdit edit; + edit.DeleteWalsBefore(kNonExistingNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + + // Recover a new VersionSet, WAL0 is deleted, WAL1 is not. 
+ { + std::unique_ptr<VersionSet> new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "")); + ASSERT_OK(new_versions->Recover(column_families_, false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber1) != wals.end()); + } +} + +TEST_F(VersionSetTest, DeleteAllWals) { + NewDB(); + + constexpr WalNumber kMaxLogNumber = 10; + constexpr uint64_t kSizeInBytes = 111; + + { + // Add a closed WAL. + VersionEdit edit; + WalMetadata wal(kSizeInBytes); + edit.AddWal(kMaxLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + + { + VersionEdit edit; + edit.DeleteWalsBefore(kMaxLogNumber + 10); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + + // Recover a new VersionSet, all WALs are deleted. + { + std::unique_ptr<VersionSet> new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "")); + ASSERT_OK(new_versions->Recover(column_families_, false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 0); + } +} + +// Commits five WAL additions, a DB id edit and a WAL deletion as one atomic +// group, then checks what a freshly recovered VersionSet sees. +TEST_F(VersionSetTest, AtomicGroupWithWalEdits) { + NewDB(); + + constexpr int kAtomicGroupSize = 7; + constexpr uint64_t kNumWals = 5; + const std::string kDBId = "db_db"; + + int remaining = kAtomicGroupSize; + autovector<std::unique_ptr<VersionEdit>> edits; + // Add 5 WALs. + for (uint64_t i = 1; i <= kNumWals; i++) { + edits.emplace_back(new VersionEdit); + // WAL's size equals its log number. + edits.back()->AddWal(i, WalMetadata(i)); + edits.back()->MarkAtomicGroup(--remaining); + } + // One edit with the DB id set. 
+ edits.emplace_back(new VersionEdit); + edits.back()->SetDBId(kDBId); + edits.back()->MarkAtomicGroup(--remaining); + // Delete the first added 4 WALs. + edits.emplace_back(new VersionEdit); + edits.back()->DeleteWalsBefore(kNumWals); + edits.back()->MarkAtomicGroup(--remaining); + ASSERT_EQ(remaining, 0); + + ASSERT_OK(LogAndApplyToDefaultCF(edits)); + + // Recover a new VersionSet, the DB id and the last WAL should be kept. + { + std::unique_ptr<VersionSet> new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "")); + std::string db_id; + ASSERT_OK( + new_versions->Recover(column_families_, /*read_only=*/false, &db_id)); + + ASSERT_EQ(db_id, kDBId); + + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kNumWals) != wals.end()); + ASSERT_TRUE(wals.at(kNumWals).HasSyncedSize()); + ASSERT_EQ(wals.at(kNumWals).GetSyncedSizeInBytes(), kNumWals); + } +} + +// Fixture that creates an extra column family whose comparator comes from +// test::BytewiseComparatorWithU64TsWrapper() and collects the VersionEdits used +// to set full_history_ts_low on it. +class VersionSetWithTimestampTest : public VersionSetTest { + public: + static const std::string kNewCfName; + + explicit VersionSetWithTimestampTest() : VersionSetTest() {} + + void SetUp() override { + NewDB(); + Options options; + options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + cfd_ = CreateColumnFamily(kNewCfName, options); + EXPECT_NE(nullptr, cfd_); + EXPECT_NE(nullptr, cfd_->GetLatestMutableCFOptions()); + column_families_.emplace_back(kNewCfName, options); + } + + void TearDown() override { + for (auto* e : edits_) { + delete e; + } + edits_.clear(); + } + + void GenVersionEditsToSetFullHistoryTsLow( + const std::vector<uint64_t>& ts_lbs) { + for (const auto ts_lb : ts_lbs) { + VersionEdit* edit = new VersionEdit; + edit->SetColumnFamily(cfd_->GetID()); + std::string ts_str = test::EncodeInt(ts_lb); + edit->SetFullHistoryTsLow(ts_str); + 
edits_.emplace_back(edit); + } + } + + void VerifyFullHistoryTsLow(uint64_t expected_ts_low) { + std::unique_ptr<VersionSet> vset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "")); + ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false, + /*db_id=*/nullptr)); + for (auto* cfd : *(vset->GetColumnFamilySet())) { + ASSERT_NE(nullptr, cfd); + if (cfd->GetName() == kNewCfName) { + ASSERT_EQ(test::EncodeInt(expected_ts_low), cfd->GetFullHistoryTsLow()); + } else { + ASSERT_TRUE(cfd->GetFullHistoryTsLow().empty()); + } + } + } + + void DoTest(const std::vector<uint64_t>& ts_lbs) { + if (ts_lbs.empty()) { + return; + } + + GenVersionEditsToSetFullHistoryTsLow(ts_lbs); + + Status s; + mutex_.Lock(); + s = versions_->LogAndApply(cfd_, *(cfd_->GetLatestMutableCFOptions()), + edits_, &mutex_, nullptr); + mutex_.Unlock(); + ASSERT_OK(s); + VerifyFullHistoryTsLow(*std::max_element(ts_lbs.begin(), ts_lbs.end())); + } + + protected: + ColumnFamilyData* cfd_{nullptr}; + // edits_ must contain and own pointers to heap-alloc VersionEdit objects. + autovector<VersionEdit*> edits_; +}; + +const std::string VersionSetWithTimestampTest::kNewCfName("new_cf"); + +TEST_F(VersionSetWithTimestampTest, SetFullHistoryTsLbOnce) { + constexpr uint64_t kTsLow = 100; + DoTest({kTsLow}); +} + +// Simulate the application increasing full_history_ts_low. +TEST_F(VersionSetWithTimestampTest, IncreaseFullHistoryTsLb) { + const std::vector<uint64_t> ts_lbs = {100, 101, 102, 103}; + DoTest(ts_lbs); +} + +// Simulate the application trying to decrease full_history_ts_low +// unsuccessfully. If the application calls public API sequentially to +// decrease the lower bound ts, RocksDB will return an InvalidArgument +// status before involving VersionSet. 
Only when multiple threads trying +// to decrease the lower bound concurrently will this case ever happen. Even +// so, the lower bound cannot be decreased. The application will be notified +// via return value of the API. +TEST_F(VersionSetWithTimestampTest, TryDecreaseFullHistoryTsLb) { + const std::vector<uint64_t> ts_lbs = {103, 102, 101, 100}; + DoTest(ts_lbs); +} + +class VersionSetAtomicGroupTest : public VersionSetTestBase, + public testing::Test { + public: + VersionSetAtomicGroupTest() + : VersionSetTestBase("version_set_atomic_group_test") {} + + void SetUp() override { + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + SetupTestSyncPoints(); + } + + void SetupValidAtomicGroup(int atomic_group_size) { + edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + edits_[i].MarkAtomicGroup(--remaining); + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr)); + } + + void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) { + edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + edits_[i].MarkAtomicGroup(--remaining); + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr)); + } + + void SetupCorruptedAtomicGroup(int atomic_group_size) { + edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + if (i != ((size_t)atomic_group_size / 2)) { + edits_[i].MarkAtomicGroup(--remaining); + } + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr)); + } + + void SetupIncorrectAtomicGroup(int atomic_group_size) { + 
edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + if (i != 1) { + edits_[i].MarkAtomicGroup(--remaining); + } else { + edits_[i].MarkAtomicGroup(remaining--); + } + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr)); + } + + void SetupTestSyncPoints() { + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) { + VersionEdit* e = reinterpret_cast<VersionEdit*>(arg); + EXPECT_EQ(edits_.front().DebugString(), + e->DebugString()); // compare based on value + first_in_atomic_group_ = true; + }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) { + VersionEdit* e = reinterpret_cast<VersionEdit*>(arg); + EXPECT_EQ(edits_.back().DebugString(), + e->DebugString()); // compare based on value + EXPECT_TRUE(first_in_atomic_group_); + last_in_atomic_group_ = true; + }); + SyncPoint::GetInstance()->SetCallBack( + "VersionEditHandlerBase::Iterate:Finish", [&](void* arg) { + num_recovered_edits_ = *reinterpret_cast<size_t*>(arg); + }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:AtomicGroup", + [&](void* /* arg */) { ++num_edits_in_atomic_group_; }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", + [&](void* arg) { + corrupted_edit_ = *reinterpret_cast<VersionEdit*>(arg); + }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", + [&](void* arg) { + edit_with_incorrect_group_size_ = + *reinterpret_cast<VersionEdit*>(arg); + }); + SyncPoint::GetInstance()->EnableProcessing(); + } + + void AddNewEditsToLog(int num_edits) { + for (int i = 0; i < 
num_edits; i++) { + std::string record; + edits_[i].EncodeTo(&record); + ASSERT_OK(log_writer_->AddRecord(record)); + } + } + + void TearDown() override { + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + log_writer_.reset(); + } + + protected: + std::vector<ColumnFamilyDescriptor> column_families_; + SequenceNumber last_seqno_; + std::vector<VersionEdit> edits_; + bool first_in_atomic_group_ = false; + bool last_in_atomic_group_ = false; + int num_edits_in_atomic_group_ = 0; + size_t num_recovered_edits_ = 0; + VersionEdit corrupted_edit_; + VersionEdit edit_with_incorrect_group_size_; + std::unique_ptr<log::Writer> log_writer_; +}; + +TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) { + const int kAtomicGroupSize = 3; + SetupValidAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + EXPECT_OK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_TRUE(last_in_atomic_group_); + EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleValidAtomicGroupWithReactiveVersionSetRecover) { + const int kAtomicGroupSize = 3; + SetupValidAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_TRUE(last_in_atomic_group_); + // The recover should clean up the replay buffer. 
+ EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); + EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) { + const int kAtomicGroupSize = 3; + SetupValidAtomicGroup(kAtomicGroupSize); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); + AddNewEditsToLog(kAtomicGroupSize); + InstrumentedMutex mu; + std::unordered_set<ColumnFamilyData*> cfds_changed; + mu.Lock(); + EXPECT_OK(reactive_versions_->ReadAndApply( + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + mu.Unlock(); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_TRUE(last_in_atomic_group_); + // The recover should clean up the replay buffer. 
+ EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); + EXPECT_EQ(kAtomicGroupSize, num_recovered_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) { + const int kAtomicGroupSize = 4; + const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1; + SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kNumberOfPersistedVersionEdits); + EXPECT_OK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_FALSE(last_in_atomic_group_); + EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) { + const int kAtomicGroupSize = 4; + const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1; + SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kNumberOfPersistedVersionEdits); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_FALSE(last_in_atomic_group_); + EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_); + // Reactive version set should store the edits in the replay buffer. 
+ EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == + kNumberOfPersistedVersionEdits); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize); + // Write the last record. The reactive version set should now apply all + // edits. + std::string last_record; + edits_[kAtomicGroupSize - 1].EncodeTo(&last_record); + EXPECT_OK(log_writer_->AddRecord(last_record)); + InstrumentedMutex mu; + std::unordered_set<ColumnFamilyData*> cfds_changed; + mu.Lock(); + EXPECT_OK(reactive_versions_->ReadAndApply( + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + mu.Unlock(); + // Reactive version set should be empty now. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) { + const int kAtomicGroupSize = 4; + const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1; + SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + // No edits in an atomic group. + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); + // Write a few edits in an atomic group. 
+ AddNewEditsToLog(kNumberOfPersistedVersionEdits); + InstrumentedMutex mu; + std::unordered_set<ColumnFamilyData*> cfds_changed; + mu.Lock(); + EXPECT_OK(reactive_versions_->ReadAndApply( + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + mu.Unlock(); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_FALSE(last_in_atomic_group_); + EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_); + // Reactive version set should store the edits in the replay buffer. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == + kNumberOfPersistedVersionEdits); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleCorruptedAtomicGroupWithVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupCorruptedAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + EXPECT_NOK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), + corrupted_edit_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupCorruptedAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), + corrupted_edit_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) { + const int kAtomicGroupSize = 
4; + SetupCorruptedAtomicGroup(kAtomicGroupSize); + InstrumentedMutex mu; + std::unordered_set<ColumnFamilyData*> cfds_changed; + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + // Write the corrupted edits. + AddNewEditsToLog(kAtomicGroupSize); + mu.Lock(); + EXPECT_NOK(reactive_versions_->ReadAndApply( + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + mu.Unlock(); + EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), + corrupted_edit_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncorrectAtomicGroupSizeWithVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupIncorrectAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + EXPECT_NOK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(edits_[1].DebugString(), + edit_with_incorrect_group_size_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupIncorrectAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(edits_[1].DebugString(), + edit_with_incorrect_group_size_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + 
HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) { + const int kAtomicGroupSize = 4; + SetupIncorrectAtomicGroup(kAtomicGroupSize); + InstrumentedMutex mu; + std::unordered_set<ColumnFamilyData*> cfds_changed; + std::unique_ptr<log::FragmentBufferedReader> manifest_reader; + std::unique_ptr<log::Reader::Reporter> manifest_reporter; + std::unique_ptr<Status> manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + AddNewEditsToLog(kAtomicGroupSize); + mu.Lock(); + EXPECT_NOK(reactive_versions_->ReadAndApply( + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); + mu.Unlock(); + EXPECT_EQ(edits_[1].DebugString(), + edit_with_incorrect_group_size_.DebugString()); +} + +class VersionSetTestDropOneCF : public VersionSetTestBase, + public testing::TestWithParam<std::string> { + public: + VersionSetTestDropOneCF() + : VersionSetTestBase("version_set_test_drop_one_cf") {} +}; + +// This test simulates the following execution sequence +// Time thread1 bg_flush_thr +// | Prepare version edits (e1,e2,e3) for atomic +// | flush cf1, cf2, cf3 +// | Enqueue e to drop cfi +// | to manifest_writers_ +// | Enqueue (e1,e2,e3) to manifest_writers_ +// | +// | Apply e, +// | cfi.IsDropped() is true +// | Apply (e1,e2,e3), +// | since cfi.IsDropped() == true, we need to +// | drop ei and write the rest to MANIFEST. +// V +// +// Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and +// last column family in an atomic group. 
+TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) { + std::vector<ColumnFamilyDescriptor> column_families; + SequenceNumber last_seqno; + std::unique_ptr<log::Writer> log_writer; + PrepareManifest(&column_families, &last_seqno, &log_writer); + Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); + ASSERT_OK(s); + + EXPECT_OK(versions_->Recover(column_families, false /* read_only */)); + EXPECT_EQ(column_families.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + + const int kAtomicGroupSize = 3; + const std::vector<std::string> non_default_cf_names = { + kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3}; + + // Drop one column family + VersionEdit drop_cf_edit; + drop_cf_edit.DropColumnFamily(); + const std::string cf_to_drop_name(GetParam()); + auto cfd_to_drop = + versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name); + ASSERT_NE(nullptr, cfd_to_drop); + // Increase its refcount because cfd_to_drop is used later, and we need to + // prevent it from being deleted. + cfd_to_drop->Ref(); + drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID()); + mutex_.Lock(); + s = versions_->LogAndApply(cfd_to_drop, + *cfd_to_drop->GetLatestMutableCFOptions(), + &drop_cf_edit, &mutex_, nullptr); + mutex_.Unlock(); + ASSERT_OK(s); + + std::vector<VersionEdit> edits(kAtomicGroupSize); + uint32_t remaining = kAtomicGroupSize; + size_t i = 0; + autovector<ColumnFamilyData*> cfds; + autovector<const MutableCFOptions*> mutable_cf_options_list; + autovector<autovector<VersionEdit*>> edit_lists; + for (const auto& cf_name : non_default_cf_names) { + auto cfd = (cf_name != cf_to_drop_name) + ? 
versions_->GetColumnFamilySet()->GetColumnFamily(cf_name) + : cfd_to_drop; + ASSERT_NE(nullptr, cfd); + cfds.push_back(cfd); + mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions()); + edits[i].SetColumnFamily(cfd->GetID()); + edits[i].SetLogNumber(0); + edits[i].SetNextFile(2); + edits[i].MarkAtomicGroup(--remaining); + edits[i].SetLastSequence(last_seqno++); + autovector<VersionEdit*> tmp_edits; + tmp_edits.push_back(&edits[i]); + edit_lists.emplace_back(tmp_edits); + ++i; + } + int called = 0; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) { + std::vector<VersionEdit*>* tmp_edits = + reinterpret_cast<std::vector<VersionEdit*>*>(arg); + EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size()); + for (const auto e : *tmp_edits) { + bool found = false; + for (const auto& e2 : edits) { + if (&e2 == e) { + found = true; + break; + } + } + ASSERT_TRUE(found); + } + ++called; + }); + SyncPoint::GetInstance()->EnableProcessing(); + mutex_.Lock(); + s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists, &mutex_, + nullptr); + mutex_.Unlock(); + ASSERT_OK(s); + ASSERT_EQ(1, called); + cfd_to_drop->UnrefAndTryDelete(); +} + +INSTANTIATE_TEST_CASE_P( + AtomicGroup, VersionSetTestDropOneCF, + testing::Values(VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3)); + +class EmptyDefaultCfNewManifest : public VersionSetTestBase, + public testing::Test { + public: + EmptyDefaultCfNewManifest() : VersionSetTestBase("version_set_new_db_test") {} + // Emulate DBImpl::NewDB() + void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/, + SequenceNumber* /*last_seqno*/, + std::unique_ptr<log::Writer>* log_writer) override { + assert(log_writer != nullptr); + VersionEdit new_db; + new_db.SetLogNumber(0); + 
const std::string manifest_path = DescriptorFileName(dbname_, 1); + const auto& fs = env_->GetFileSystem(); + std::unique_ptr<WritableFileWriter> file_writer; + Status s = WritableFileWriter::Create( + fs, manifest_path, fs->OptimizeForManifestWrite(env_options_), + &file_writer, nullptr); + ASSERT_OK(s); + log_writer->reset(new log::Writer(std::move(file_writer), 0, true)); + std::string record; + ASSERT_TRUE(new_db.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + // Create new column family + VersionEdit new_cf; + new_cf.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1); + new_cf.SetColumnFamily(1); + new_cf.SetLastSequence(2); + new_cf.SetNextFile(2); + record.clear(); + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + + protected: + bool write_dbid_to_manifest_ = false; + std::unique_ptr<log::Writer> log_writer_; +}; + +// Create db, create column family. Cf creation will switch to a new MANIFEST. +// Then reopen db, trying to recover. 
+TEST_F(EmptyDefaultCfNewManifest, Recover) { + PrepareManifest(nullptr, nullptr, &log_writer_); + log_writer_.reset(); + Status s = + SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::vector<ColumnFamilyDescriptor> column_families; + column_families.emplace_back(kDefaultColumnFamilyName, cf_options_); + column_families.emplace_back(VersionSetTestBase::kColumnFamilyName1, + cf_options_); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest( + manifest_path, column_families, false, &db_id, &has_missing_table_file); + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); +} + +class VersionSetTestEmptyDb + : public VersionSetTestBase, + public testing::TestWithParam< + std::tuple<bool, bool, std::vector<std::string>>> { + public: + static const std::string kUnknownColumnFamilyName; + VersionSetTestEmptyDb() : VersionSetTestBase("version_set_test_empty_db") {} + + protected: + void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/, + SequenceNumber* /*last_seqno*/, + std::unique_ptr<log::Writer>* log_writer) override { + assert(nullptr != log_writer); + VersionEdit new_db; + if (db_options_.write_dbid_to_manifest) { + DBOptions tmp_db_options; + tmp_db_options.env = env_; + std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_)); + std::string db_id; + impl->GetDbIdentityFromIdentityFile(&db_id); + new_db.SetDBId(db_id); + } + const std::string manifest_path = DescriptorFileName(dbname_, 1); + const auto& fs = env_->GetFileSystem(); + std::unique_ptr<WritableFileWriter> file_writer; + Status s = WritableFileWriter::Create( + fs, manifest_path, fs->OptimizeForManifestWrite(env_options_), + &file_writer, nullptr); + ASSERT_OK(s); + { + log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); + std::string record; + new_db.EncodeTo(&record); + s = 
(*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + } + + std::unique_ptr<log::Writer> log_writer_; +}; + +const std::string VersionSetTestEmptyDb::kUnknownColumnFamilyName = "unknown"; + +TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + log_writer_.reset(); + Status s = + SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector<std::string> cf_names = std::get<2>(GetParam()); + + std::vector<ColumnFamilyDescriptor> column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_NE(s.ToString().find(manifest_path), std::string::npos); + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest1) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Only a subset of column families in the MANIFEST. 
+ VersionEdit new_cf1; + new_cf1.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1); + new_cf1.SetColumnFamily(1); + Status s; + { + std::string record; + new_cf1.EncodeTo(&record); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector<std::string>& cf_names = std::get<2>(GetParam()); + std::vector<ColumnFamilyDescriptor> column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_NE(s.ToString().find(manifest_path), std::string::npos); + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenFromInCompleteManifest2) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Write all column families but no log_number, next_file_number and + // last_sequence. 
+ const std::vector<std::string> all_cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; + Status s; + for (size_t i = 1; i != all_cf_names.size(); ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(all_cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector<std::string>& cf_names = std::get<2>(GetParam()); + std::vector<ColumnFamilyDescriptor> column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_NE(s.ToString().find(manifest_path), std::string::npos); + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenManifestWithUnknownCF) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Write all column families but no log_number, next_file_number and + // last_sequence. 
+ const std::vector<std::string> all_cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; + Status s; + for (size_t i = 1; i != all_cf_names.size(); ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(all_cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + { + VersionEdit tmp_edit; + tmp_edit.SetColumnFamily(4); + tmp_edit.SetLogNumber(0); + tmp_edit.SetNextFile(2); + tmp_edit.SetLastSequence(0); + std::string record; + ASSERT_TRUE(tmp_edit.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector<std::string>& cf_names = std::get<2>(GetParam()); + std::vector<ColumnFamilyDescriptor> column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_NE(s.ToString().find(manifest_path), std::string::npos); + ASSERT_TRUE(s.IsCorruption()); + } +} + +TEST_P(VersionSetTestEmptyDb, OpenCompleteManifest) { + db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); + PrepareManifest(nullptr, nullptr, &log_writer_); + // Write all column families but no log_number, next_file_number and + // last_sequence. 
+ const std::vector<std::string> all_cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; + Status s; + for (size_t i = 1; i != all_cf_names.size(); ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(all_cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + { + VersionEdit tmp_edit; + tmp_edit.SetLogNumber(0); + tmp_edit.SetNextFile(2); + tmp_edit.SetLastSequence(0); + std::string record; + ASSERT_TRUE(tmp_edit.EncodeTo(&record)); + s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + log_writer_.reset(); + s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); + ASSERT_OK(s); + + std::string manifest_path; + VerifyManifest(&manifest_path); + + bool read_only = std::get<1>(GetParam()); + const std::vector<std::string>& cf_names = std::get<2>(GetParam()); + std::vector<ColumnFamilyDescriptor> column_families; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families, + read_only, &db_id, + &has_missing_table_file); + auto iter = + std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName); + if (iter == cf_names.end()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else if (read_only) { + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + } else if (cf_names.size() == all_cf_names.size()) { + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + } else if (cf_names.size() < all_cf_names.size()) { + ASSERT_TRUE(s.IsInvalidArgument()); + } else { + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetColumnFamily( + kUnknownColumnFamilyName); + ASSERT_EQ(nullptr, cfd); + } +} + +INSTANTIATE_TEST_CASE_P( + 
BestEffortRecovery, VersionSetTestEmptyDb, + testing::Combine( + /*write_dbid_to_manifest=*/testing::Bool(), + /*read_only=*/testing::Bool(), + /*cf_names=*/ + testing::Values( + std::vector<std::string>(), + std::vector<std::string>({kDefaultColumnFamilyName}), + std::vector<std::string>({VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3}), + std::vector<std::string>({kDefaultColumnFamilyName, + VersionSetTestBase::kColumnFamilyName1}), + std::vector<std::string>({kDefaultColumnFamilyName, + VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3}), + std::vector<std::string>( + {kDefaultColumnFamilyName, + VersionSetTestBase::kColumnFamilyName1, + VersionSetTestBase::kColumnFamilyName2, + VersionSetTestBase::kColumnFamilyName3, + VersionSetTestEmptyDb::kUnknownColumnFamilyName})))); + +class VersionSetTestMissingFiles : public VersionSetTestBase, + public testing::Test { + public: + VersionSetTestMissingFiles() + : VersionSetTestBase("version_set_test_missing_files"), + block_based_table_options_(), + table_factory_(std::make_shared<BlockBasedTableFactory>( + block_based_table_options_)), + internal_comparator_( + std::make_shared<InternalKeyComparator>(options_.comparator)) {} + + protected: + void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families, + SequenceNumber* last_seqno, + std::unique_ptr<log::Writer>* log_writer) override { + assert(column_families != nullptr); + assert(last_seqno != nullptr); + assert(log_writer != nullptr); + const std::string manifest = DescriptorFileName(dbname_, 1); + const auto& fs = env_->GetFileSystem(); + std::unique_ptr<WritableFileWriter> file_writer; + Status s = WritableFileWriter::Create( + fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer, + nullptr); + ASSERT_OK(s); + log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); + 
VersionEdit new_db; + if (db_options_.write_dbid_to_manifest) { + DBOptions tmp_db_options; + tmp_db_options.env = env_; + std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_)); + std::string db_id; + impl->GetDbIdentityFromIdentityFile(&db_id); + new_db.SetDBId(db_id); + } + { + std::string record; + ASSERT_TRUE(new_db.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + const std::vector<std::string> cf_names = { + kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2, + kColumnFamilyName3}; + uint32_t cf_id = 1; // default cf id is 0 + cf_options_.table_factory = table_factory_; + for (const auto& cf_name : cf_names) { + column_families->emplace_back(cf_name, cf_options_); + if (cf_name == kDefaultColumnFamilyName) { + continue; + } + VersionEdit new_cf; + new_cf.AddColumnFamily(cf_name); + new_cf.SetColumnFamily(cf_id); + std::string record; + ASSERT_TRUE(new_cf.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + + VersionEdit cf_files; + cf_files.SetColumnFamily(cf_id); + cf_files.SetLogNumber(0); + record.clear(); + ASSERT_TRUE(cf_files.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + ++cf_id; + } + SequenceNumber seq = 2; + { + VersionEdit edit; + edit.SetNextFile(7); + edit.SetLastSequence(seq); + std::string record; + ASSERT_TRUE(edit.EncodeTo(&record)); + s = (*log_writer)->AddRecord(record); + ASSERT_OK(s); + } + *last_seqno = seq + 1; + } + + struct SstInfo { + uint64_t file_number; + std::string column_family; + std::string key; // the only key + int level = 0; + SstInfo(uint64_t file_num, const std::string& cf_name, + const std::string& _key) + : SstInfo(file_num, cf_name, _key, 0) {} + SstInfo(uint64_t file_num, const std::string& cf_name, + const std::string& _key, int lvl) + : file_number(file_num), + column_family(cf_name), + key(_key), + level(lvl) {} + }; + + // Create dummy sst, return their metadata. 
Note that only file name and size + // are used. + void CreateDummyTableFiles(const std::vector<SstInfo>& file_infos, + std::vector<FileMetaData>* file_metas) { + assert(file_metas != nullptr); + for (const auto& info : file_infos) { + uint64_t file_num = info.file_number; + std::string fname = MakeTableFileName(dbname_, file_num); + std::unique_ptr<FSWritableFile> file; + Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr); + ASSERT_OK(s); + std::unique_ptr<WritableFileWriter> fwriter(new WritableFileWriter( + std::move(file), fname, FileOptions(), env_->GetSystemClock().get())); + IntTblPropCollectorFactories int_tbl_prop_collector_factories; + + std::unique_ptr<TableBuilder> builder(table_factory_->NewTableBuilder( + TableBuilderOptions( + immutable_options_, mutable_cf_options_, *internal_comparator_, + &int_tbl_prop_collector_factories, kNoCompression, + CompressionOptions(), + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, + info.column_family, info.level), + fwriter.get())); + InternalKey ikey(info.key, 0, ValueType::kTypeValue); + builder->Add(ikey.Encode(), "value"); + ASSERT_OK(builder->Finish()); + ASSERT_OK(fwriter->Flush()); + uint64_t file_size = 0; + s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr); + ASSERT_OK(s); + ASSERT_NE(0, file_size); + file_metas->emplace_back(file_num, /*file_path_id=*/0, file_size, ikey, + ikey, 0, 0, false, Temperature::kUnknown, 0, 0, + 0, kUnknownFileChecksum, + kUnknownFileChecksumFuncName, kNullUniqueId64x2); + } + } + + // This method updates last_sequence_. 
+ void WriteFileAdditionAndDeletionToManifest( + uint32_t cf, const std::vector<std::pair<int, FileMetaData>>& added_files, + const std::vector<std::pair<int, uint64_t>>& deleted_files) { + VersionEdit edit; + edit.SetColumnFamily(cf); + for (const auto& elem : added_files) { + int level = elem.first; + edit.AddFile(level, elem.second); + } + for (const auto& elem : deleted_files) { + int level = elem.first; + edit.DeleteFile(level, elem.second); + } + edit.SetLastSequence(last_seqno_); + ++last_seqno_; + assert(log_writer_.get() != nullptr); + std::string record; + ASSERT_TRUE(edit.EncodeTo(&record)); + Status s = log_writer_->AddRecord(record); + ASSERT_OK(s); + } + + BlockBasedTableOptions block_based_table_options_; + std::shared_ptr<TableFactory> table_factory_; + std::shared_ptr<InternalKeyComparator> internal_comparator_; + std::vector<ColumnFamilyDescriptor> column_families_; + SequenceNumber last_seqno_; + std::unique_ptr<log::Writer> log_writer_; +}; + +TEST_F(VersionSetTestMissingFiles, ManifestFarBehindSst) { + std::vector<SstInfo> existing_files = { + SstInfo(100, kDefaultColumnFamilyName, "a"), + SstInfo(102, kDefaultColumnFamilyName, "b"), + SstInfo(103, kDefaultColumnFamilyName, "c"), + SstInfo(107, kDefaultColumnFamilyName, "d"), + SstInfo(110, kDefaultColumnFamilyName, "e")}; + std::vector<FileMetaData> file_metas; + CreateDummyTableFiles(existing_files, &file_metas); + + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + std::vector<std::pair<int, FileMetaData>> added_files; + for (uint64_t file_num = 10; file_num < 15; ++file_num) { + std::string smallest_ukey = "a"; + std::string largest_ukey = "b"; + InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue); + InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue); + FileMetaData meta = FileMetaData( + file_num, /*file_path_id=*/0, /*file_size=*/12, smallest_ikey, + largest_ikey, 0, 0, false, Temperature::kUnknown, 0, 0, 0, + kUnknownFileChecksum, 
kUnknownFileChecksumFuncName, kNullUniqueId64x2); + added_files.emplace_back(0, meta); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>()); + std::vector<std::pair<int, uint64_t>> deleted_files; + deleted_files.emplace_back(0, 10); + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deleted_files); + log_writer_.reset(); + Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_, + /*read_only=*/false, &db_id, + &has_missing_table_file); + ASSERT_OK(s); + ASSERT_TRUE(has_missing_table_file); + for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) { + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0); + ASSERT_TRUE(files.empty()); + } +} + +TEST_F(VersionSetTestMissingFiles, ManifestAheadofSst) { + std::vector<SstInfo> existing_files = { + SstInfo(100, kDefaultColumnFamilyName, "a"), + SstInfo(102, kDefaultColumnFamilyName, "b"), + SstInfo(103, kDefaultColumnFamilyName, "c"), + SstInfo(107, kDefaultColumnFamilyName, "d"), + SstInfo(110, kDefaultColumnFamilyName, "e")}; + std::vector<FileMetaData> file_metas; + CreateDummyTableFiles(existing_files, &file_metas); + + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + std::vector<std::pair<int, FileMetaData>> added_files; + for (size_t i = 3; i != 5; ++i) { + added_files.emplace_back(0, file_metas[i]); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>()); + + added_files.clear(); + for (uint64_t file_num = 120; file_num < 130; ++file_num) { + std::string smallest_ukey = "a"; + std::string largest_ukey = "b"; + InternalKey 
smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue); + InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue); + FileMetaData meta = FileMetaData( + file_num, /*file_path_id=*/0, /*file_size=*/12, smallest_ikey, + largest_ikey, 0, 0, false, Temperature::kUnknown, 0, 0, 0, + kUnknownFileChecksum, kUnknownFileChecksumFuncName, kNullUniqueId64x2); + added_files.emplace_back(0, meta); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>()); + log_writer_.reset(); + Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_, + /*read_only=*/false, &db_id, + &has_missing_table_file); + ASSERT_OK(s); + ASSERT_TRUE(has_missing_table_file); + for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) { + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0); + if (cfd->GetName() == kDefaultColumnFamilyName) { + ASSERT_EQ(2, files.size()); + for (const auto* fmeta : files) { + if (fmeta->fd.GetNumber() != 107 && fmeta->fd.GetNumber() != 110) { + ASSERT_FALSE(true); + } + } + } else { + ASSERT_TRUE(files.empty()); + } + } +} + +TEST_F(VersionSetTestMissingFiles, NoFileMissing) { + std::vector<SstInfo> existing_files = { + SstInfo(100, kDefaultColumnFamilyName, "a"), + SstInfo(102, kDefaultColumnFamilyName, "b"), + SstInfo(103, kDefaultColumnFamilyName, "c"), + SstInfo(107, kDefaultColumnFamilyName, "d"), + SstInfo(110, kDefaultColumnFamilyName, "e")}; + std::vector<FileMetaData> file_metas; + CreateDummyTableFiles(existing_files, &file_metas); + + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + std::vector<std::pair<int, FileMetaData>> added_files; + for (const auto& meta : file_metas) { + 
added_files.emplace_back(0, meta); + } + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>()); + std::vector<std::pair<int, uint64_t>> deleted_files; + deleted_files.emplace_back(/*level=*/0, 100); + WriteFileAdditionAndDeletionToManifest( + /*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deleted_files); + log_writer_.reset(); + Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); + ASSERT_OK(s); + std::string manifest_path; + VerifyManifest(&manifest_path); + std::string db_id; + bool has_missing_table_file = false; + s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_, + /*read_only=*/false, &db_id, + &has_missing_table_file); + ASSERT_OK(s); + ASSERT_FALSE(has_missing_table_file); + for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) { + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0); + if (cfd->GetName() == kDefaultColumnFamilyName) { + ASSERT_EQ(existing_files.size() - deleted_files.size(), files.size()); + bool has_deleted_file = false; + for (const auto* fmeta : files) { + if (fmeta->fd.GetNumber() == 100) { + has_deleted_file = true; + break; + } + } + ASSERT_FALSE(has_deleted_file); + } else { + ASSERT_TRUE(files.empty()); + } + } +} + +TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) { + db_options_.allow_2pc = true; + NewDB(); + + SstInfo sst(100, kDefaultColumnFamilyName, "a"); + std::vector<FileMetaData> file_metas; + CreateDummyTableFiles({sst}, &file_metas); + + constexpr WalNumber kMinWalNumberToKeep2PC = 10; + VersionEdit edit; + edit.AddFile(0, file_metas[0]); + edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC); + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC); + + for (int i = 0; i < 3; i++) { + CreateNewManifest(); + ReopenDB(); + ASSERT_EQ(versions_->min_log_number_to_keep(), 
kMinWalNumberToKeep2PC); + } +} + +class ChargeFileMetadataTest : public DBTestBase { + public: + ChargeFileMetadataTest() + : DBTestBase("charge_file_metadata_test", /*env_do_fsync=*/true) {} +}; + +class ChargeFileMetadataTestWithParam + : public ChargeFileMetadataTest, + public testing::WithParamInterface<CacheEntryRoleOptions::Decision> { + public: + ChargeFileMetadataTestWithParam() {} +}; + +#ifndef ROCKSDB_LITE +INSTANTIATE_TEST_CASE_P( + ChargeFileMetadataTestWithParam, ChargeFileMetadataTestWithParam, + ::testing::Values(CacheEntryRoleOptions::Decision::kEnabled, + CacheEntryRoleOptions::Decision::kDisabled)); + +TEST_P(ChargeFileMetadataTestWithParam, Basic) { + Options options; + BlockBasedTableOptions table_options; + CacheEntryRoleOptions::Decision charge_file_metadata = GetParam(); + table_options.cache_usage_options.options_overrides.insert( + {CacheEntryRole::kFileMetadata, {/*.charged = */ charge_file_metadata}}); + std::shared_ptr<TargetCacheChargeTrackingCache<CacheEntryRole::kFileMetadata>> + file_metadata_charge_only_cache = std::make_shared< + TargetCacheChargeTrackingCache<CacheEntryRole::kFileMetadata>>( + NewLRUCache( + 4 * CacheReservationManagerImpl< + CacheEntryRole::kFileMetadata>::GetDummyEntrySize(), + 0 /* num_shard_bits */, true /* strict_capacity_limit */)); + table_options.block_cache = file_metadata_charge_only_cache; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.create_if_missing = true; + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + // Create 128 file metadata, each of which is roughly 1024 bytes. + // This results in 1 * + // CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>::GetDummyEntrySize() + // cache reservation for file metadata. 
+ for (int i = 1; i <= 128; ++i) { + ASSERT_OK(Put(std::string(1024, 'a'), "va")); + ASSERT_OK(Put("b", "vb")); + ASSERT_OK(Flush()); + } + if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) { + EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), + 1 * CacheReservationManagerImpl< + CacheEntryRole::kFileMetadata>::GetDummyEntrySize()); + + } else { + EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), 0); + } + + // Create another 128 file metadata. + // This increases the file metadata cache reservation to 2 * + // CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>::GetDummyEntrySize(). + for (int i = 1; i <= 128; ++i) { + ASSERT_OK(Put(std::string(1024, 'a'), "vva")); + ASSERT_OK(Put("b", "vvb")); + ASSERT_OK(Flush()); + } + if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) { + EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), + 2 * CacheReservationManagerImpl< + CacheEntryRole::kFileMetadata>::GetDummyEntrySize()); + } else { + EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), 0); + } + // Compaction will create 1 new file metadata, obsolete and delete all 256 + // file metadata above. This results in 1 * + // CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>::GetDummyEntrySize() + // cache reservation for file metadata. 
+ SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles", + "ChargeFileMetadataTestWithParam::" + "PreVerifyingCacheReservationRelease"}}); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,1", FilesPerLevel(0)); + TEST_SYNC_POINT( + "ChargeFileMetadataTestWithParam::PreVerifyingCacheReservationRelease"); + if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) { + EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), + 1 * CacheReservationManagerImpl< + CacheEntryRole::kFileMetadata>::GetDummyEntrySize()); + } else { + EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), 0); + } + SyncPoint::GetInstance()->DisableProcessing(); + + // Destroying the db will delete the remaining 1 new file metadata + // This results in no cache reservation for file metadata. + Destroy(options); + EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), + 0 * CacheReservationManagerImpl< + CacheEntryRole::kFileMetadata>::GetDummyEntrySize()); + + // Reopen the db with a smaller cache in order to test failure in allocating + // file metadata due to memory limit based on cache capacity + file_metadata_charge_only_cache = std::make_shared< + TargetCacheChargeTrackingCache<CacheEntryRole::kFileMetadata>>( + NewLRUCache(1 * CacheReservationManagerImpl< + CacheEntryRole::kFileMetadata>::GetDummyEntrySize(), + 0 /* num_shard_bits */, true /* strict_capacity_limit */)); + table_options.block_cache = file_metadata_charge_only_cache; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + Reopen(options); + ASSERT_OK(Put(std::string(1024, 'a'), "va")); + ASSERT_OK(Put("b", "vb")); + Status s = Flush(); + if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) { + EXPECT_TRUE(s.IsMemoryLimit()); + EXPECT_TRUE(s.ToString().find( + kCacheEntryRoleToCamelString[static_cast<std::uint32_t>( + 
CacheEntryRole::kFileMetadata)]) != std::string::npos); + EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") != + std::string::npos); + } else { + EXPECT_TRUE(s.ok()); + } +} +#endif // ROCKSDB_LITE +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |