From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 src/rocksdb/db/blob/db_blob_basic_test.cc | 1789 +++++++++++++++++++++++++++++
 1 file changed, 1789 insertions(+)
 create mode 100644 src/rocksdb/db/blob/db_blob_basic_test.cc

diff --git a/src/rocksdb/db/blob/db_blob_basic_test.cc b/src/rocksdb/db/blob/db_blob_basic_test.cc
new file mode 100644
index 000000000..e6832a2ae
--- /dev/null
+++ b/src/rocksdb/db/blob/db_blob_basic_test.cc
@@ -0,0 +1,1789 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <array>
+#include <sstream>
+#include <string>
+
+#include "cache/compressed_secondary_cache.h"
+#include "db/blob/blob_index.h"
+#include "db/blob/blob_log_format.h"
+#include "db/db_test_util.h"
+#include "port/stack_trace.h"
+#include "test_util/sync_point.h"
+#include "utilities/fault_injection_env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class DBBlobBasicTest : public DBTestBase {
+ protected:
+  DBBlobBasicTest()
+      : DBTestBase("db_blob_basic_test", /* env_do_fsync */ false) {}
+};
+
+TEST_F(DBBlobBasicTest, GetBlob) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr char key[] = "key";
+  constexpr char blob_value[] = "blob_value";
+
+  ASSERT_OK(Put(key, blob_value));
+
+  ASSERT_OK(Flush());
+
+  ASSERT_EQ(Get(key), blob_value);
+
+  // Try again with no I/O allowed. The table and the necessary blocks should
+  // already be in their respective caches; however, the blob itself can only be
+  // read from the blob file, so the read should return Incomplete.
+  ReadOptions read_options;
+  read_options.read_tier = kBlockCacheTier;
+
+  PinnableSlice result;
+  ASSERT_TRUE(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result)
+                  .IsIncomplete());
+}
+
+TEST_F(DBBlobBasicTest, GetBlobFromCache) {
+  Options options = GetDefaultOptions();
+
+  LRUCacheOptions co;
+  co.capacity = 2 << 20;  // 2MB
+  co.num_shard_bits = 2;
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  auto backing_cache = NewLRUCache(co);
+
+  options.enable_blob_files = true;
+  options.blob_cache = backing_cache;
+
+  BlockBasedTableOptions block_based_options;
+  block_based_options.no_block_cache = false;
+  block_based_options.block_cache = backing_cache;
+  block_based_options.cache_index_and_filter_blocks = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
+
+  Reopen(options);
+
+  constexpr char key[] = "key";
+  constexpr char blob_value[] = "blob_value";
+
+  ASSERT_OK(Put(key, blob_value));
+
+  ASSERT_OK(Flush());
+
+  ReadOptions read_options;
+
+  read_options.fill_cache = false;
+
+  {
+    PinnableSlice result;
+
+    read_options.read_tier = kReadAllTier;
+    ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result));
+    ASSERT_EQ(result, blob_value);
+
+    result.Reset();
+    read_options.read_tier = kBlockCacheTier;
+
+    // Try again with no I/O allowed. Since we didn't re-fill the cache, the
+    // blob itself can only be read from the blob file, so the read should
+    // return Incomplete.
+    ASSERT_TRUE(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result)
+                    .IsIncomplete());
+    ASSERT_TRUE(result.empty());
+  }
+
+  read_options.fill_cache = true;
+
+  {
+    PinnableSlice result;
+
+    read_options.read_tier = kReadAllTier;
+    ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result));
+    ASSERT_EQ(result, blob_value);
+
+    result.Reset();
+    read_options.read_tier = kBlockCacheTier;
+
+    // Try again with no I/O allowed. The table and the necessary blocks/blobs
+    // should already be in their respective caches.
+    ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result));
+    ASSERT_EQ(result, blob_value);
+  }
+}
+
+TEST_F(DBBlobBasicTest, IterateBlobsFromCache) {
+  Options options = GetDefaultOptions();
+
+  LRUCacheOptions co;
+  co.capacity = 2 << 20;  // 2MB
+  co.num_shard_bits = 2;
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  auto backing_cache = NewLRUCache(co);
+
+  options.enable_blob_files = true;
+  options.blob_cache = backing_cache;
+
+  BlockBasedTableOptions block_based_options;
+  block_based_options.no_block_cache = false;
+  block_based_options.block_cache = backing_cache;
+  block_based_options.cache_index_and_filter_blocks = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
+
+  options.statistics = CreateDBStatistics();
+
+  Reopen(options);
+
+  int num_blobs = 5;
+  std::vector<std::string> keys;
+  std::vector<std::string> blobs;
+
+  for (int i = 0; i < num_blobs; ++i) {
+    keys.push_back("key" + std::to_string(i));
+    blobs.push_back("blob" + std::to_string(i));
+    ASSERT_OK(Put(keys[i], blobs[i]));
+  }
+  ASSERT_OK(Flush());
+
+  ReadOptions read_options;
+
+  {
+    read_options.fill_cache = false;
+    read_options.read_tier = kReadAllTier;
+
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
+    ASSERT_OK(iter->status());
+
+    int i = 0;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      ASSERT_OK(iter->status());
+      ASSERT_EQ(iter->key().ToString(), keys[i]);
+      ASSERT_EQ(iter->value().ToString(), blobs[i]);
+      ++i;
+    }
+    ASSERT_EQ(i, num_blobs);
+    ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 0);
+  }
+
+  {
+    read_options.fill_cache = false;
+    read_options.read_tier = kBlockCacheTier;
+
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
+    ASSERT_OK(iter->status());
+
+    // Try again with no I/O allowed. Since we didn't re-fill the cache,
+    // the blob itself can only be read from the blob file, so iter->Valid()
+    // should be false.
+    iter->SeekToFirst();
+    ASSERT_NOK(iter->status());
+    ASSERT_FALSE(iter->Valid());
+    ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 0);
+  }
+
+  {
+    read_options.fill_cache = true;
+    read_options.read_tier = kReadAllTier;
+
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
+    ASSERT_OK(iter->status());
+
+    // Read blobs from the file and refill the cache.
+    int i = 0;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      ASSERT_OK(iter->status());
+      ASSERT_EQ(iter->key().ToString(), keys[i]);
+      ASSERT_EQ(iter->value().ToString(), blobs[i]);
+      ++i;
+    }
+    ASSERT_EQ(i, num_blobs);
+    ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD),
+              num_blobs);
+  }
+
+  {
+    read_options.fill_cache = false;
+    read_options.read_tier = kBlockCacheTier;
+
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
+    ASSERT_OK(iter->status());
+
+    // Try again with no I/O allowed. The table and the necessary blocks/blobs
+    // should already be in their respective caches.
+    int i = 0;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      ASSERT_OK(iter->status());
+      ASSERT_EQ(iter->key().ToString(), keys[i]);
+      ASSERT_EQ(iter->value().ToString(), blobs[i]);
+      ++i;
+    }
+    ASSERT_EQ(i, num_blobs);
+    ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 0);
+  }
+}
+
+TEST_F(DBBlobBasicTest, IterateBlobsFromCachePinning) {
+  constexpr size_t min_blob_size = 6;
+
+  Options options = GetDefaultOptions();
+
+  LRUCacheOptions cache_options;
+  cache_options.capacity = 2048;
+  cache_options.num_shard_bits = 0;
+  cache_options.metadata_charge_policy = kDontChargeCacheMetadata;
+
+  options.blob_cache = NewLRUCache(cache_options);
+  options.enable_blob_files = true;
+  options.min_blob_size = min_blob_size;
+
+  Reopen(options);
+
+  // Put then iterate over three key-values. The second value is below the size
+  // limit and is thus stored inline; the other two are stored separately as
+  // blobs. We expect to have something pinned in the cache iff we are
+  // positioned on a blob.
+
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "long_value";
+  static_assert(sizeof(first_value) - 1 >= min_blob_size,
+                "first_value too short to be stored as blob");
+
+  ASSERT_OK(Put(first_key, first_value));
+
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "short";
+  static_assert(sizeof(second_value) - 1 < min_blob_size,
+                "second_value too long to be inlined");
+
+  ASSERT_OK(Put(second_key, second_value));
+
+  constexpr char third_key[] = "third_key";
+  constexpr char third_value[] = "other_long_value";
+  static_assert(sizeof(third_value) - 1 >= min_blob_size,
+                "third_value too short to be stored as blob");
+
+  ASSERT_OK(Put(third_key, third_value));
+
+  ASSERT_OK(Flush());
+
+  {
+    ReadOptions read_options;
+    read_options.fill_cache = true;
+
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
+
+    iter->SeekToFirst();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_OK(iter->status());
+    ASSERT_EQ(iter->key(), first_key);
+    ASSERT_EQ(iter->value(), first_value);
+
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_OK(iter->status());
+    ASSERT_EQ(iter->key(), second_key);
+    ASSERT_EQ(iter->value(), second_value);
+
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_OK(iter->status());
+    ASSERT_EQ(iter->key(), third_key);
+    ASSERT_EQ(iter->value(), third_value);
+
+    iter->Next();
+    ASSERT_FALSE(iter->Valid());
+    ASSERT_OK(iter->status());
+  }
+
+  {
+    ReadOptions read_options;
+    read_options.fill_cache = false;
+    read_options.read_tier = kBlockCacheTier;
+
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
+
+    iter->SeekToFirst();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_OK(iter->status());
+    ASSERT_EQ(iter->key(), first_key);
+    ASSERT_EQ(iter->value(), first_value);
+    ASSERT_GT(options.blob_cache->GetPinnedUsage(), 0);
+
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_OK(iter->status());
+    ASSERT_EQ(iter->key(), second_key);
+    ASSERT_EQ(iter->value(), second_value);
+    ASSERT_EQ(options.blob_cache->GetPinnedUsage(), 0);
+
+    iter->Next();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_OK(iter->status());
+    ASSERT_EQ(iter->key(), third_key);
+    ASSERT_EQ(iter->value(), third_value);
+    ASSERT_GT(options.blob_cache->GetPinnedUsage(), 0);
+
+    iter->Next();
+    ASSERT_FALSE(iter->Valid());
+    ASSERT_OK(iter->status());
+    ASSERT_EQ(options.blob_cache->GetPinnedUsage(), 0);
+  }
+
+  {
+    ReadOptions read_options;
+    read_options.fill_cache = false;
+    read_options.read_tier = kBlockCacheTier;
+
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
+
+    iter->SeekToLast();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_OK(iter->status());
+    ASSERT_EQ(iter->key(), third_key);
+    ASSERT_EQ(iter->value(), third_value);
+    ASSERT_GT(options.blob_cache->GetPinnedUsage(), 0);
+
+    iter->Prev();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_OK(iter->status());
+    ASSERT_EQ(iter->key(), second_key);
+    ASSERT_EQ(iter->value(), second_value);
+    ASSERT_EQ(options.blob_cache->GetPinnedUsage(), 0);
+
+    iter->Prev();
+    ASSERT_TRUE(iter->Valid());
+    ASSERT_OK(iter->status());
+    ASSERT_EQ(iter->key(), first_key);
+    ASSERT_EQ(iter->value(), first_value);
+    ASSERT_GT(options.blob_cache->GetPinnedUsage(), 0);
+
+    iter->Prev();
+    ASSERT_FALSE(iter->Valid());
+    ASSERT_OK(iter->status());
+    ASSERT_EQ(options.blob_cache->GetPinnedUsage(), 0);
+  }
+}
+
+TEST_F(DBBlobBasicTest, MultiGetBlobs) {
+  constexpr size_t min_blob_size = 6;
+
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = min_blob_size;
+
+  Reopen(options);
+
+  // Put then retrieve three key-values. The first value is below the size limit
+  // and is thus stored inline; the other two are stored separately as blobs.
+  constexpr size_t num_keys = 3;
+
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "short";
+  static_assert(sizeof(first_value) - 1 < min_blob_size,
+                "first_value too long to be inlined");
+
+  ASSERT_OK(Put(first_key, first_value));
+
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "long_value";
+  static_assert(sizeof(second_value) - 1 >= min_blob_size,
+                "second_value too short to be stored as blob");
+
+  ASSERT_OK(Put(second_key, second_value));
+
+  constexpr char third_key[] = "third_key";
+  constexpr char third_value[] = "other_long_value";
+  static_assert(sizeof(third_value) - 1 >= min_blob_size,
+                "third_value too short to be stored as blob");
+
+  ASSERT_OK(Put(third_key, third_value));
+
+  ASSERT_OK(Flush());
+
+  ReadOptions read_options;
+
+  std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_OK(statuses[1]);
+    ASSERT_EQ(values[1], second_value);
+
+    ASSERT_OK(statuses[2]);
+    ASSERT_EQ(values[2], third_value);
+  }
+
+  // Try again with no I/O allowed. The table and the necessary blocks should
+  // already be in their respective caches. The first (inlined) value should be
+  // successfully read; however, the two blob values could only be read from the
+  // blob file, so for those the read should return Incomplete.
+  read_options.read_tier = kBlockCacheTier;
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_TRUE(statuses[1].IsIncomplete());
+
+    ASSERT_TRUE(statuses[2].IsIncomplete());
+  }
+}
+
+TEST_F(DBBlobBasicTest, MultiGetBlobsFromCache) {
+  Options options = GetDefaultOptions();
+
+  LRUCacheOptions co;
+  co.capacity = 2 << 20;  // 2MB
+  co.num_shard_bits = 2;
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  auto backing_cache = NewLRUCache(co);
+
+  constexpr size_t min_blob_size = 6;
+  options.min_blob_size = min_blob_size;
+  options.create_if_missing = true;
+  options.enable_blob_files = true;
+  options.blob_cache = backing_cache;
+
+  BlockBasedTableOptions block_based_options;
+  block_based_options.no_block_cache = false;
+  block_based_options.block_cache = backing_cache;
+  block_based_options.cache_index_and_filter_blocks = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
+
+  DestroyAndReopen(options);
+
+  // Put then retrieve three key-values. The first value is below the size limit
+  // and is thus stored inline; the other two are stored separately as blobs.
+  constexpr size_t num_keys = 3;
+
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "short";
+  static_assert(sizeof(first_value) - 1 < min_blob_size,
+                "first_value too long to be inlined");
+
+  ASSERT_OK(Put(first_key, first_value));
+
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "long_value";
+  static_assert(sizeof(second_value) - 1 >= min_blob_size,
+                "second_value too short to be stored as blob");
+
+  ASSERT_OK(Put(second_key, second_value));
+
+  constexpr char third_key[] = "third_key";
+  constexpr char third_value[] = "other_long_value";
+  static_assert(sizeof(third_value) - 1 >= min_blob_size,
+                "third_value too short to be stored as blob");
+
+  ASSERT_OK(Put(third_key, third_value));
+
+  ASSERT_OK(Flush());
+
+  ReadOptions read_options;
+  read_options.fill_cache = false;
+
+  std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_OK(statuses[1]);
+    ASSERT_EQ(values[1], second_value);
+
+    ASSERT_OK(statuses[2]);
+    ASSERT_EQ(values[2], third_value);
+  }
+
+  // Try again with no I/O allowed. The first (inlined) value should be
+  // successfully read; however, the two blob values could only be read from the
+  // blob file, so for those the read should return Incomplete.
+  read_options.read_tier = kBlockCacheTier;
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_TRUE(statuses[1].IsIncomplete());
+
+    ASSERT_TRUE(statuses[2].IsIncomplete());
+  }
+
+  // Fill the cache when reading blobs from the blob file.
+  read_options.read_tier = kReadAllTier;
+  read_options.fill_cache = true;
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_OK(statuses[1]);
+    ASSERT_EQ(values[1], second_value);
+
+    ASSERT_OK(statuses[2]);
+    ASSERT_EQ(values[2], third_value);
+  }
+
+  // Try again with no I/O allowed. All blobs should be successfully read from
+  // the cache.
+  read_options.read_tier = kBlockCacheTier;
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_OK(statuses[1]);
+    ASSERT_EQ(values[1], second_value);
+
+    ASSERT_OK(statuses[2]);
+    ASSERT_EQ(values[2], third_value);
+  }
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBBlobBasicTest, MultiGetWithDirectIO) {
+  Options options = GetDefaultOptions();
+
+  // First, create an external SST file ["b"].
+  const std::string file_path = dbname_ + "/test.sst";
+  {
+    SstFileWriter sst_file_writer(EnvOptions(), GetDefaultOptions());
+    Status s = sst_file_writer.Open(file_path);
+    ASSERT_OK(s);
+    ASSERT_OK(sst_file_writer.Put("b", "b_value"));
+    ASSERT_OK(sst_file_writer.Finish());
+  }
+
+  options.enable_blob_files = true;
+  options.min_blob_size = 1000;
+  options.use_direct_reads = true;
+  options.allow_ingest_behind = true;
+
+  // Open DB with fixed-prefix sst-partitioner so that compaction will cut
+  // new table file when encountering a new key whose 1-byte prefix changes.
+  constexpr size_t key_len = 1;
+  options.sst_partitioner_factory =
+      NewSstPartitionerFixedPrefixFactory(key_len);
+
+  Status s = TryReopen(options);
+  if (s.IsInvalidArgument()) {
+    ROCKSDB_GTEST_SKIP("This test requires direct IO support");
+    return;
+  }
+  ASSERT_OK(s);
+
+  constexpr size_t num_keys = 3;
+  constexpr size_t blob_size = 3000;
+
+  constexpr char first_key[] = "a";
+  const std::string first_blob(blob_size, 'a');
+  ASSERT_OK(Put(first_key, first_blob));
+
+  constexpr char second_key[] = "b";
+  const std::string second_blob(2 * blob_size, 'b');
+  ASSERT_OK(Put(second_key, second_blob));
+
+  constexpr char third_key[] = "d";
+  const std::string third_blob(blob_size, 'd');
+  ASSERT_OK(Put(third_key, third_blob));
+
+  // first_blob, second_blob and third_blob in the same blob file.
+  //       SST                       Blob file
+  // L0 ["a",    "b",    "d"]   |'aaaa', 'bbbb', 'dddd'|
+  //       |      |       |        ^       ^       ^
+  //       |      |       |        |       |       |
+  //       |      |       +--------|-------|-------+
+  //       |      +----------------|-------+
+  //       +-----------------------+
+  ASSERT_OK(Flush());
+
+  constexpr char fourth_key[] = "c";
+  const std::string fourth_blob(blob_size, 'c');
+  ASSERT_OK(Put(fourth_key, fourth_blob));
+  // fourth_blob in another blob file.
+  //       SST                       Blob file              SST      Blob file
+  // L0 ["a",    "b",    "d"]   |'aaaa', 'bbbb', 'dddd'|   ["c"]     |'cccc'|
+  //       |      |       |        ^       ^       ^         |          ^
+  //       |      |       |        |       |       |         |          |
+  //       |      |       +--------|-------|-------+         +----------+
+  //       |      +----------------|-------+
+  //       +-----------------------+
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+                              /*end=*/nullptr));
+
+  // Due to the above sst partitioner, we get 4 L1 files. The blob files are
+  // unchanged.
+  //                        |'aaaa', 'bbbb', 'dddd'|  |'cccc'|
+  //                           ^       ^       ^          ^
+  //                           |       |       |          |
+  // L0                        |       |       |          |
+  // L1 ["a"]  ["b"]  ["c"]    |       |     ["d"]        |
+  //      |      |      |      |       |                  |
+  //      |      |      +------|-------|------------------+
+  //      |      +-------------|-------+
+  //      +--------------------+
+  ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
+
+  {
+    // Ingest the external SST file into bottommost level.
+    std::vector<std::string> ext_files{file_path};
+    IngestExternalFileOptions opts;
+    opts.ingest_behind = true;
+    ASSERT_OK(
+        db_->IngestExternalFile(db_->DefaultColumnFamily(), ext_files, opts));
+  }
+
+  // Now the database becomes as follows.
+  //                        |'aaaa', 'bbbb', 'dddd'|  |'cccc'|
+  //                           ^       ^       ^          ^
+  //                           |       |       |          |
+  // L0                        |       |       |          |
+  // L1 ["a"]  ["b"]  ["c"]    |       |     ["d"]        |
+  //      |      |      |      |       |                  |
+  //      |      |      +------|-------|------------------+
+  //      |      +-------------|-------+
+  //      +--------------------+
+  //
+  // L6 ["b"]
+
+  {
+    // Compact ["b"] to bottommost level.
+    Slice begin = Slice(second_key);
+    Slice end = Slice(second_key);
+    CompactRangeOptions cro;
+    cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+    ASSERT_OK(db_->CompactRange(cro, &begin, &end));
+  }
+
+  //                        |'aaaa', 'bbbb', 'dddd'|  |'cccc'|
+  //                           ^       ^       ^          ^
+  //                           |       |       |          |
+  // L0                        |       |       |          |
+  // L1 ["a"]         ["c"]    |       |     ["d"]        |
+  //      |             |      |       |                  |
+  //      |             +------|-------|------------------+
+  //      |      +-------------|-------+
+  //      +------|-------------+
+  //             |
+  // L6        ["b"]
+  ASSERT_EQ(3, NumTableFilesAtLevel(/*level=*/1));
+  ASSERT_EQ(1, NumTableFilesAtLevel(/*level=*/6));
+
+  bool called = false;
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "RandomAccessFileReader::MultiRead:AlignedReqs", [&](void* arg) {
+        auto* aligned_reqs = static_cast<std::vector<FSReadRequest>*>(arg);
+        assert(aligned_reqs);
+        ASSERT_EQ(1, aligned_reqs->size());
+        called = true;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  std::array<Slice, num_keys> keys{{first_key, third_key, second_key}};
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    // The MultiGet(), when constructing the KeyContexts, will process the keys
+    // in such order: a, d, b. The reason is that ["a"] and ["d"] are in L1,
+    // while ["b"] resides in L6.
+    // Consequently, the original FSReadRequest list prepared by
+    // Version::MultiGetBlob() will be for "a", "d" and "b". It is unsorted as
+    // follows:
+    //
+    // ["a", offset=30, len=3033],
+    // ["d", offset=9096, len=3033],
+    // ["b", offset=3063, len=6033]
+    //
+    // If we do not sort them before calling MultiRead() in DirectIO, then the
+    // underlying IO merging logic will yield two requests.
+    //
+    // [offset=0, len=4096] (for "a")
+    // [offset=0, len=12288] (result of merging the request for "d" and "b")
+    //
+    // We need to sort them in Version::MultiGetBlob() so that the underlying
+    // IO merging logic in DirectIO mode works as expected. The correct
+    // behavior will be one aligned request:
+    //
+    // [offset=0, len=12288]
+
+    db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+
+    ASSERT_TRUE(called);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_blob);
+
+    ASSERT_OK(statuses[1]);
+    ASSERT_EQ(values[1], third_blob);
+
+    ASSERT_OK(statuses[2]);
+    ASSERT_EQ(values[2], second_blob);
+  }
+}
+#endif  // !ROCKSDB_LITE
+
+TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
+  Options options = GetDefaultOptions();
+
+  LRUCacheOptions co;
+  co.capacity = 2 << 20;  // 2MB
+  co.num_shard_bits = 2;
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  auto backing_cache = NewLRUCache(co);
+
+  options.min_blob_size = 0;
+  options.create_if_missing = true;
+  options.enable_blob_files = true;
+  options.blob_cache = backing_cache;
+
+  BlockBasedTableOptions block_based_options;
+  block_based_options.no_block_cache = false;
+  block_based_options.block_cache = backing_cache;
+  block_based_options.cache_index_and_filter_blocks = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
+
+  Reopen(options);
+
+  constexpr size_t kNumBlobFiles = 3;
+  constexpr size_t kNumBlobsPerFile = 3;
+  constexpr size_t kNumKeys = kNumBlobsPerFile * kNumBlobFiles;
+
+  std::vector<std::string> key_strs;
+  std::vector<std::string> value_strs;
+  for (size_t i = 0; i < kNumBlobFiles; ++i) {
+    for (size_t j = 0; j < kNumBlobsPerFile; ++j) {
+      std::string key = "key" + std::to_string(i) + "_" + std::to_string(j);
+      std::string value =
+          "value_as_blob" + std::to_string(i) + "_" + std::to_string(j);
+      ASSERT_OK(Put(key, value));
+      key_strs.push_back(key);
+      value_strs.push_back(value);
+    }
+    ASSERT_OK(Flush());
+  }
+  assert(key_strs.size() == kNumKeys);
+  std::array<Slice, kNumKeys> keys;
+  for (size_t i = 0; i < keys.size(); ++i) {
+    keys[i] = key_strs[i];
+  }
+
+  ReadOptions read_options;
+  read_options.read_tier = kReadAllTier;
+  read_options.fill_cache = false;
+
+  {
+    std::array<PinnableSlice, kNumKeys> values;
+    std::array<Status, kNumKeys> statuses;
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    for (size_t i = 0; i < kNumKeys; ++i) {
+      ASSERT_OK(statuses[i]);
+      ASSERT_EQ(value_strs[i], values[i]);
+    }
+  }
+
+  read_options.read_tier = kBlockCacheTier;
+
+  {
+    std::array<PinnableSlice, kNumKeys> values;
+    std::array<Status, kNumKeys> statuses;
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    for (size_t i = 0; i < kNumKeys; ++i) {
+      ASSERT_TRUE(statuses[i].IsIncomplete());
+      ASSERT_TRUE(values[i].empty());
+    }
+  }
+
+  read_options.read_tier = kReadAllTier;
+  read_options.fill_cache = true;
+
+  {
+    std::array<PinnableSlice, kNumKeys> values;
+    std::array<Status, kNumKeys> statuses;
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    for (size_t i = 0; i < kNumKeys; ++i) {
+      ASSERT_OK(statuses[i]);
+      ASSERT_EQ(value_strs[i], values[i]);
+    }
+  }
+
+  read_options.read_tier = kBlockCacheTier;
+
+  {
+    std::array<PinnableSlice, kNumKeys> values;
+    std::array<Status, kNumKeys> statuses;
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    for (size_t i = 0; i < kNumKeys; ++i) {
+      ASSERT_OK(statuses[i]);
+      ASSERT_EQ(value_strs[i], values[i]);
+    }
+  }
+}
+
+TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr char key[] = "key";
+  constexpr char blob[] = "blob";
+
+  ASSERT_OK(Put(key, blob));
+  ASSERT_OK(Flush());
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "Version::Get::TamperWithBlobIndex", [](void* arg) {
+        Slice* const blob_index = static_cast<Slice*>(arg);
+        assert(blob_index);
+        assert(!blob_index->empty());
+        blob_index->remove_prefix(1);
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  PinnableSlice result;
+  ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
+                  .IsCorruption());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(DBBlobBasicTest, MultiGetBlob_CorruptIndex) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.create_if_missing = true;
+
+  DestroyAndReopen(options);
+
+  constexpr size_t kNumOfKeys = 3;
+  std::array<std::string, kNumOfKeys> key_strs;
+  std::array<std::string, kNumOfKeys> value_strs;
+  std::array<Slice, kNumOfKeys + 1> keys;
+  for (size_t i = 0; i < kNumOfKeys; ++i) {
+    key_strs[i] = "foo" + std::to_string(i);
+    value_strs[i] = "blob_value" + std::to_string(i);
+    ASSERT_OK(Put(key_strs[i], value_strs[i]));
+    keys[i] = key_strs[i];
+  }
+
+  constexpr char key[] = "key";
+  constexpr char blob[] = "blob";
+  ASSERT_OK(Put(key, blob));
+  keys[kNumOfKeys] = key;
+
+  ASSERT_OK(Flush());
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "Version::MultiGet::TamperWithBlobIndex", [&key](void* arg) {
+        KeyContext* const key_context = static_cast<KeyContext*>(arg);
+        assert(key_context);
+        assert(key_context->key);
+
+        if (*(key_context->key) == key) {
+          Slice* const blob_index = key_context->value;
+          assert(blob_index);
+          assert(!blob_index->empty());
+          blob_index->remove_prefix(1);
+        }
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  std::array<PinnableSlice, kNumOfKeys + 1> values;
+  std::array<Status, kNumOfKeys + 1> statuses;
+  db_->MultiGet(ReadOptions(), dbfull()->DefaultColumnFamily(), kNumOfKeys + 1,
+                keys.data(), values.data(), statuses.data(),
+                /*sorted_input=*/false);
+  for (size_t i = 0; i < kNumOfKeys + 1; ++i) {
+    if (i != kNumOfKeys) {
+      ASSERT_OK(statuses[i]);
+      ASSERT_EQ("blob_value" + std::to_string(i), values[i]);
+    } else {
+      ASSERT_TRUE(statuses[i].IsCorruption());
+    }
+  }
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(DBBlobBasicTest, MultiGetBlob_ExceedSoftLimit) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr size_t kNumOfKeys = 3;
+  std::array<std::string, kNumOfKeys> key_bufs;
+  std::array<std::string, kNumOfKeys> value_bufs;
+  std::array<Slice, kNumOfKeys> keys;
+  for (size_t i = 0; i < kNumOfKeys; ++i) {
+    key_bufs[i] = "foo" + std::to_string(i);
+    value_bufs[i] = "blob_value" + std::to_string(i);
+    ASSERT_OK(Put(key_bufs[i], value_bufs[i]));
+    keys[i] = key_bufs[i];
+  }
+  ASSERT_OK(Flush());
+
+  std::array<PinnableSlice, kNumOfKeys> values;
+  std::array<Status, kNumOfKeys> statuses;
+  ReadOptions read_opts;
+  read_opts.value_size_soft_limit = 1;
+  db_->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), kNumOfKeys,
+                keys.data(), values.data(), statuses.data(),
+                /*sorted_input=*/true);
+  for (const auto& s : statuses) {
+    ASSERT_TRUE(s.IsAborted());
+  }
+}
+
+TEST_F(DBBlobBasicTest, GetBlob_InlinedTTLIndex) {
+  constexpr uint64_t min_blob_size = 10;
+
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = min_blob_size;
+
+  Reopen(options);
+
+  constexpr char key[] = "key";
+  constexpr char blob[] = "short";
+  static_assert(sizeof(blob) - 1 < min_blob_size,
+                "Blob too long to be inlined");
+
+  // Fake an inlined TTL blob index.
+  std::string blob_index;
+
+  constexpr uint64_t expiration = 1234567890;
+
+  BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
+
+  WriteBatch batch;
+  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+  ASSERT_OK(Flush());
+
+  PinnableSlice result;
+  ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
+                  .IsCorruption());
+}
+
+TEST_F(DBBlobBasicTest, GetBlob_IndexWithInvalidFileNumber) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr char key[] = "key";
+
+  // Fake a blob index referencing a non-existent blob file.
+  std::string blob_index;
+
+  constexpr uint64_t blob_file_number = 1000;
+  constexpr uint64_t offset = 1234;
+  constexpr uint64_t size = 5678;
+
+  BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
+                        kNoCompression);
+
+  WriteBatch batch;
+  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+  ASSERT_OK(Flush());
+
+  PinnableSlice result;
+  ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
+                  .IsCorruption());
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBBlobBasicTest, GenerateIOTracing) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  std::string trace_file = dbname_ + "/io_trace_file";
+
+  Reopen(options);
+  {
+    // Create IO trace file
+    std::unique_ptr<TraceWriter> trace_writer;
+    ASSERT_OK(
+        NewFileTraceWriter(env_, EnvOptions(), trace_file, &trace_writer));
+    ASSERT_OK(db_->StartIOTrace(TraceOptions(), std::move(trace_writer)));
+
+    constexpr char key[] = "key";
+    constexpr char blob_value[] = "blob_value";
+
+    ASSERT_OK(Put(key, blob_value));
+    ASSERT_OK(Flush());
+    ASSERT_EQ(Get(key), blob_value);
+
+    ASSERT_OK(db_->EndIOTrace());
+    ASSERT_OK(env_->FileExists(trace_file));
+  }
+  {
+    // Parse trace file to check file operations related to blob files are
+    // recorded.
+    std::unique_ptr<TraceReader> trace_reader;
+    ASSERT_OK(
+        NewFileTraceReader(env_, EnvOptions(), trace_file, &trace_reader));
+    IOTraceReader reader(std::move(trace_reader));
+
+    IOTraceHeader header;
+    ASSERT_OK(reader.ReadHeader(&header));
+    ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
+    ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
+
+    // Read records.
+    int blob_files_op_count = 0;
+    Status status;
+    while (true) {
+      IOTraceRecord record;
+      status = reader.ReadIOOp(&record);
+      if (!status.ok()) {
+        break;
+      }
+      if (record.file_name.find("blob") != std::string::npos) {
+        blob_files_op_count++;
+      }
+    }
+    // Assuming blob files will have Append, Close and then Read operations.
+    ASSERT_GT(blob_files_op_count, 2);
+  }
+}
+#endif  // !ROCKSDB_LITE
+
+TEST_F(DBBlobBasicTest, BestEffortsRecovery_MissingNewestBlobFile) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.create_if_missing = true;
+  Reopen(options);
+
+  ASSERT_OK(dbfull()->DisableFileDeletions());
+  constexpr int kNumTableFiles = 2;
+  for (int i = 0; i < kNumTableFiles; ++i) {
+    for (char ch = 'a'; ch != 'c'; ++ch) {
+      std::string key(1, ch);
+      ASSERT_OK(Put(key, "value" + std::to_string(i)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  Close();
+
+  std::vector<std::string> files;
+  ASSERT_OK(env_->GetChildren(dbname_, &files));
+  std::string blob_file_path;
+  uint64_t max_blob_file_num = kInvalidBlobFileNumber;
+  for (const auto& fname : files) {
+    uint64_t file_num = 0;
+    FileType type;
+    if (ParseFileName(fname, &file_num, /*info_log_name_prefix=*/"", &type) &&
+        type == kBlobFile) {
+      if (file_num > max_blob_file_num) {
+        max_blob_file_num = file_num;
+        blob_file_path = dbname_ + "/" + fname;
+      }
+    }
+  }
+  ASSERT_OK(env_->DeleteFile(blob_file_path));
+
+  options.best_efforts_recovery = true;
+  Reopen(options);
+  std::string value;
+  ASSERT_OK(db_->Get(ReadOptions(), "a", &value));
+  ASSERT_EQ("value" + std::to_string(kNumTableFiles - 2), value);
+}
+
+TEST_F(DBBlobBasicTest, GetMergeBlobWithPut) {
+  Options options = GetDefaultOptions();
+  options.merge_operator = MergeOperators::CreateStringAppendOperator();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  ASSERT_OK(Put("Key1", "v1"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("Key1", "v2"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("Key1", "v3"));
+  ASSERT_OK(Flush());
+
+  std::string value;
+  ASSERT_OK(db_->Get(ReadOptions(), "Key1", &value));
+  ASSERT_EQ(Get("Key1"), "v1,v2,v3");
+}
+
+TEST_F(DBBlobBasicTest, MultiGetMergeBlobWithPut) {
+  constexpr size_t num_keys = 3;
+
+  Options options = GetDefaultOptions();
+  options.merge_operator = MergeOperators::CreateStringAppendOperator();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  ASSERT_OK(Put("Key0", "v0_0"));
+  ASSERT_OK(Put("Key1", "v1_0"));
+  ASSERT_OK(Put("Key2", "v2_0"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("Key0", "v0_1"));
+  ASSERT_OK(Merge("Key1", "v1_1"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Merge("Key0", "v0_2"));
+  ASSERT_OK(Flush());
+
+  std::array<Slice, num_keys> keys{{"Key0", "Key1", "Key2"}};
+  std::array<PinnableSlice, num_keys> values;
+  std::array<Status, num_keys> statuses;
+
+  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
+                &values[0], &statuses[0]);
+
+  ASSERT_OK(statuses[0]);
+  ASSERT_EQ(values[0], "v0_0,v0_1,v0_2");
+
+  ASSERT_OK(statuses[1]);
+  ASSERT_EQ(values[1], "v1_0,v1_1");
+
+  ASSERT_OK(statuses[2]);
+  ASSERT_EQ(values[2], "v2_0");
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBBlobBasicTest, Properties) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr char key1[] = "key1";
+  constexpr size_t key1_size = sizeof(key1) - 1;
+
+  constexpr char key2[] = "key2";
+  constexpr size_t key2_size = sizeof(key2) - 1;
+
+  constexpr char key3[] = "key3";
+  constexpr size_t key3_size = sizeof(key3) - 1;
+
+  constexpr char blob[] = "00000000000000";
+  constexpr size_t blob_size = sizeof(blob) - 1;
+
+  constexpr char longer_blob[] = "00000000000000000000";
+  constexpr size_t longer_blob_size = sizeof(longer_blob) - 1;
+
+  ASSERT_OK(Put(key1, blob));
+  ASSERT_OK(Put(key2, longer_blob));
+  ASSERT_OK(Flush());
+
+  constexpr size_t first_blob_file_expected_size =
+      BlobLogHeader::kSize +
+      BlobLogRecord::CalculateAdjustmentForRecordHeader(key1_size) + blob_size +
+      BlobLogRecord::CalculateAdjustmentForRecordHeader(key2_size) +
+      longer_blob_size + BlobLogFooter::kSize;
+
+  ASSERT_OK(Put(key3, blob));
+  ASSERT_OK(Flush());
+
+  constexpr size_t second_blob_file_expected_size =
+      BlobLogHeader::kSize +
+      BlobLogRecord::CalculateAdjustmentForRecordHeader(key3_size) + blob_size +
+      BlobLogFooter::kSize;
+
+  constexpr size_t total_expected_size =
+      first_blob_file_expected_size + second_blob_file_expected_size;
+
+  // Number of blob files
+  uint64_t num_blob_files = 0;
+  ASSERT_TRUE(
+      db_->GetIntProperty(DB::Properties::kNumBlobFiles, &num_blob_files));
+  ASSERT_EQ(num_blob_files, 2);
+
+  // Total size of live blob files
+  uint64_t live_blob_file_size = 0;
+  ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kLiveBlobFileSize,
+                                  &live_blob_file_size));
+  ASSERT_EQ(live_blob_file_size, total_expected_size);
+
+  // Total amount of garbage in live blob files
+  {
+    uint64_t live_blob_file_garbage_size = 0;
+    ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kLiveBlobFileGarbageSize,
+                                    &live_blob_file_garbage_size));
+    ASSERT_EQ(live_blob_file_garbage_size, 0);
+  }
+
+  // Total size of all blob files across all versions
+  // Note: this should be the same as above since we only have one
+  // version at this point.
+  uint64_t total_blob_file_size = 0;
+  ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kTotalBlobFileSize,
+                                  &total_blob_file_size));
+  ASSERT_EQ(total_blob_file_size, total_expected_size);
+
+  // Delete key2 to create some garbage
+  ASSERT_OK(Delete(key2));
+  ASSERT_OK(Flush());
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+
+  constexpr size_t expected_garbage_size =
+      BlobLogRecord::CalculateAdjustmentForRecordHeader(key2_size) +
+      longer_blob_size;
+
+  constexpr double expected_space_amp =
+      static_cast<double>(total_expected_size) /
+      (total_expected_size - expected_garbage_size);
+
+  // Blob file stats
+  std::string blob_stats;
+  ASSERT_TRUE(db_->GetProperty(DB::Properties::kBlobStats, &blob_stats));
+
+  std::ostringstream oss;
+  oss << "Number of blob files: 2\nTotal size of blob files: "
+      << total_expected_size
+      << "\nTotal size of garbage in blob files: " << expected_garbage_size
+      << "\nBlob file space amplification: " << expected_space_amp << '\n';
+
+  ASSERT_EQ(blob_stats, oss.str());
+
+  // Total amount of garbage in live blob files
+  {
+    uint64_t live_blob_file_garbage_size = 0;
+    ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kLiveBlobFileGarbageSize,
+                                    &live_blob_file_garbage_size));
+    ASSERT_EQ(live_blob_file_garbage_size, expected_garbage_size);
+  }
+}
+
+TEST_F(DBBlobBasicTest, PropertiesMultiVersion) {
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr char key1[] = "key1";
+  constexpr char key2[] = "key2";
+  constexpr char key3[] = "key3";
+
+  constexpr size_t key_size = sizeof(key1) - 1;
+  static_assert(sizeof(key2) - 1 == key_size, "unexpected size: key2");
+  static_assert(sizeof(key3) - 1 == key_size, "unexpected size: key3");
+
+  constexpr char blob[] = "0000000000";
+  constexpr size_t blob_size = sizeof(blob) - 1;
+
+  ASSERT_OK(Put(key1, blob));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put(key2, blob));
+  ASSERT_OK(Flush());
+
+  // Create an iterator to keep the current version alive
+  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
+  ASSERT_OK(iter->status());
+
+  // Note: the Delete and subsequent compaction results in the first blob file
+  // not making it to the final version. (It is still part of the previous
+  // version kept alive by the iterator though.) On the other hand, the Put
+  // results in a third blob file.
+  ASSERT_OK(Delete(key1));
+  ASSERT_OK(Put(key3, blob));
+  ASSERT_OK(Flush());
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+
+  // Total size of all blob files across all versions: between the two versions,
+  // we should have three blob files of the same size with one blob each.
+  // The version kept alive by the iterator contains the first and the second
+  // blob file, while the final version contains the second and the third blob
+  // file. (The second blob file is thus shared by the two versions but should
+  // be counted only once.)
+  uint64_t total_blob_file_size = 0;
+  ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kTotalBlobFileSize,
+                                  &total_blob_file_size));
+  ASSERT_EQ(total_blob_file_size,
+            3 * (BlobLogHeader::kSize +
+                 BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
+                 blob_size + BlobLogFooter::kSize));
+}
+#endif  // !ROCKSDB_LITE
+
+class DBBlobBasicIOErrorTest : public DBBlobBasicTest,
+                               public testing::WithParamInterface<std::string> {
+ protected:
+  DBBlobBasicIOErrorTest() : sync_point_(GetParam()) {
+    fault_injection_env_.reset(new FaultInjectionTestEnv(env_));
+  }
+  ~DBBlobBasicIOErrorTest() { Close(); }
+
+  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
+  std::string sync_point_;
+};
+
+class DBBlobBasicIOErrorMultiGetTest : public DBBlobBasicIOErrorTest {
+ public:
+  DBBlobBasicIOErrorMultiGetTest() : DBBlobBasicIOErrorTest() {}
+};
+
+INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorTest,
+                        ::testing::ValuesIn(std::vector<std::string>{
+                            "BlobFileReader::OpenFile:NewRandomAccessFile",
+                            "BlobFileReader::GetBlob:ReadFromFile"}));
+
+INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorMultiGetTest,
+                        ::testing::ValuesIn(std::vector<std::string>{
+                            "BlobFileReader::OpenFile:NewRandomAccessFile",
+                            "BlobFileReader::MultiGetBlob:ReadFromFile"}));
+
+TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
+  Options options;
+  options.env = fault_injection_env_.get();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr char key[] = "key";
+  constexpr char blob_value[] = "blob_value";
+
+  ASSERT_OK(Put(key, blob_value));
+
+  ASSERT_OK(Flush());
+
+  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
+    fault_injection_env_->SetFilesystemActive(false,
+                                              Status::IOError(sync_point_));
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  PinnableSlice result;
+  ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
+                  .IsIOError());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(DBBlobBasicIOErrorMultiGetTest, MultiGetBlobs_IOError) {
+  Options options = GetDefaultOptions();
+  options.env = fault_injection_env_.get();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr size_t num_keys = 2;
+
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "first_value";
+
+  ASSERT_OK(Put(first_key, first_value));
+
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "second_value";
+
+  ASSERT_OK(Put(second_key, second_value));
+
+  ASSERT_OK(Flush());
+
+  std::array<Slice, num_keys> keys{{first_key, second_key}};
+  std::array<PinnableSlice, num_keys> values;
+  std::array<Status, num_keys> statuses;
+
+  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
+    fault_injection_env_->SetFilesystemActive(false,
+                                              Status::IOError(sync_point_));
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
+                &values[0], &statuses[0]);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  ASSERT_TRUE(statuses[0].IsIOError());
+  ASSERT_TRUE(statuses[1].IsIOError());
+}
+
+TEST_P(DBBlobBasicIOErrorMultiGetTest, MultipleBlobFiles) {
+  Options options = GetDefaultOptions();
+  options.env = fault_injection_env_.get();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr size_t num_keys = 2;
+
+  constexpr char key1[] = "key1";
+  constexpr char value1[] = "blob1";
+
+  ASSERT_OK(Put(key1, value1));
+  ASSERT_OK(Flush());
+
+  constexpr char key2[] = "key2";
+  constexpr char value2[] = "blob2";
+
+  ASSERT_OK(Put(key2, value2));
+  ASSERT_OK(Flush());
+
+  std::array<Slice, num_keys> keys{{key1, key2}};
+  std::array<PinnableSlice, num_keys> values;
+  std::array<Status, num_keys> statuses;
+
+  bool first_blob_file = true;
+  SyncPoint::GetInstance()->SetCallBack(
+      sync_point_, [&first_blob_file, this](void* /* arg */) {
+        if (first_blob_file) {
+          first_blob_file = false;
+          return;
+        }
+        fault_injection_env_->SetFilesystemActive(false,
+                                                  Status::IOError(sync_point_));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
+                keys.data(), values.data(), statuses.data());
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  ASSERT_OK(statuses[0]);
+  ASSERT_EQ(value1, values[0]);
+  ASSERT_TRUE(statuses[1].IsIOError());
+}
+
+namespace {
+
+class ReadBlobCompactionFilter : public CompactionFilter {
+ public:
+  ReadBlobCompactionFilter() = default;
+  const char* Name() const override {
+    return "rocksdb.compaction.filter.read.blob";
+  }
+  CompactionFilter::Decision FilterV2(
+      int /*level*/, const Slice& /*key*/, ValueType value_type,
+      const Slice& existing_value, std::string* new_value,
+      std::string* /*skip_until*/) const override {
+    if (value_type != CompactionFilter::ValueType::kValue) {
+      return CompactionFilter::Decision::kKeep;
+    }
+    assert(new_value);
+    new_value->assign(existing_value.data(), existing_value.size());
+    return CompactionFilter::Decision::kChangeValue;
+  }
+};
+
+}  // anonymous namespace
+
+TEST_P(DBBlobBasicIOErrorTest, CompactionFilterReadBlob_IOError) {
+  Options options = GetDefaultOptions();
+  options.env = fault_injection_env_.get();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.create_if_missing = true;
+  std::unique_ptr<CompactionFilter> compaction_filter_guard(
+      new ReadBlobCompactionFilter);
+  options.compaction_filter = compaction_filter_guard.get();
+
+  DestroyAndReopen(options);
+  constexpr char key[] = "foo";
+  constexpr char blob_value[] = "foo_blob_value";
+  ASSERT_OK(Put(key, blob_value));
+  ASSERT_OK(Flush());
+
+  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
+    fault_injection_env_->SetFilesystemActive(false,
+                                              Status::IOError(sync_point_));
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+                                /*end=*/nullptr)
+                  .IsIOError());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(DBBlobBasicTest, WarmCacheWithBlobsDuringFlush) {
+  Options options = GetDefaultOptions();
+
+  LRUCacheOptions co;
+  co.capacity = 1 << 25;
+  co.num_shard_bits = 2;
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  auto backing_cache = NewLRUCache(co);
+
+  options.blob_cache = backing_cache;
+
+  BlockBasedTableOptions block_based_options;
+  block_based_options.no_block_cache = false;
+  block_based_options.block_cache = backing_cache;
+  block_based_options.cache_index_and_filter_blocks = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
+
+  options.enable_blob_files = true;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 1.0;
+  options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+
+  DestroyAndReopen(options);
+
+  constexpr size_t kNumBlobs = 10;
+  constexpr size_t kValueSize = 100;
+
+  std::string value(kValueSize, 'a');
+
+  for (size_t i = 1; i <= kNumBlobs; i++) {
+    ASSERT_OK(Put(std::to_string(i), value));
+    ASSERT_OK(Put(std::to_string(i + kNumBlobs), value));  // Add some overlap
+    ASSERT_OK(Flush());
+    ASSERT_EQ(i * 2, options.statistics->getTickerCount(BLOB_DB_CACHE_ADD));
+    ASSERT_EQ(value, Get(std::to_string(i)));
+    ASSERT_EQ(value, Get(std::to_string(i + kNumBlobs)));
+    ASSERT_EQ(0, options.statistics->getTickerCount(BLOB_DB_CACHE_MISS));
+    ASSERT_EQ(i * 2, options.statistics->getTickerCount(BLOB_DB_CACHE_HIT));
+  }
+
+  // Verify compaction not counted
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+                              /*end=*/nullptr));
+  EXPECT_EQ(kNumBlobs * 2,
+            options.statistics->getTickerCount(BLOB_DB_CACHE_ADD));
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBBlobBasicTest, DynamicallyWarmCacheDuringFlush) {
+  Options options = GetDefaultOptions();
+
+  LRUCacheOptions co;
+  co.capacity = 1 << 25;
+  co.num_shard_bits = 2;
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  auto backing_cache = NewLRUCache(co);
+
+  options.blob_cache = backing_cache;
+
+  BlockBasedTableOptions block_based_options;
+  block_based_options.no_block_cache = false;
+  block_based_options.block_cache = backing_cache;
+  block_based_options.cache_index_and_filter_blocks = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
+
+  options.enable_blob_files = true;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 1.0;
+  options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+
+  DestroyAndReopen(options);
+
+  constexpr size_t kNumBlobs = 10;
+  constexpr size_t kValueSize = 100;
+
+  std::string value(kValueSize, 'a');
+
+  for (size_t i = 1; i <= 5; i++) {
+    ASSERT_OK(Put(std::to_string(i), value));
+    ASSERT_OK(Put(std::to_string(i + kNumBlobs), value));  // Add some overlap
+    ASSERT_OK(Flush());
+    ASSERT_EQ(2, options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD));
+
+    ASSERT_EQ(value, Get(std::to_string(i)));
+    ASSERT_EQ(value, Get(std::to_string(i + kNumBlobs)));
+    ASSERT_EQ(0, options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD));
+    ASSERT_EQ(0,
+              options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS));
+    ASSERT_EQ(2,
+              options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT));
+  }
+
+  ASSERT_OK(dbfull()->SetOptions({{"prepopulate_blob_cache", "kDisable"}}));
+
+  for (size_t i = 6; i <= kNumBlobs; i++) {
+    ASSERT_OK(Put(std::to_string(i), value));
+    ASSERT_OK(Put(std::to_string(i + kNumBlobs), value));  // Add some overlap
+    ASSERT_OK(Flush());
+    ASSERT_EQ(0, options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD));
+
+    ASSERT_EQ(value, Get(std::to_string(i)));
+    ASSERT_EQ(value, Get(std::to_string(i + kNumBlobs)));
+    ASSERT_EQ(2, options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD));
+    ASSERT_EQ(2,
+              options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS));
+    ASSERT_EQ(0, options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT));
+  }
+
+  // Verify compaction not counted
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
+                              /*end=*/nullptr));
+  EXPECT_EQ(0, options.statistics->getTickerCount(BLOB_DB_CACHE_ADD));
+}
+#endif  // !ROCKSDB_LITE
+
+TEST_F(DBBlobBasicTest, WarmCacheWithBlobsSecondary) {
+  CompressedSecondaryCacheOptions secondary_cache_opts;
+  secondary_cache_opts.capacity = 1 << 20;
+  secondary_cache_opts.num_shard_bits = 0;
+  secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
+  secondary_cache_opts.compression_type = kNoCompression;
+
+  LRUCacheOptions primary_cache_opts;
+  primary_cache_opts.capacity = 1024;
+  primary_cache_opts.num_shard_bits = 0;
+  primary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
+  primary_cache_opts.secondary_cache =
+      NewCompressedSecondaryCache(secondary_cache_opts);
+
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  options.statistics = CreateDBStatistics();
+  options.enable_blob_files = true;
+  options.blob_cache = NewLRUCache(primary_cache_opts);
+  options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly;
+
+  DestroyAndReopen(options);
+
+  // Note: only one of the two blobs fit in the primary cache at any given
+  // time.
+  constexpr char first_key[] = "foo";
+  constexpr size_t first_blob_size = 512;
+  const std::string first_blob(first_blob_size, 'a');
+
+  constexpr char second_key[] = "bar";
+  constexpr size_t second_blob_size = 768;
+  const std::string second_blob(second_blob_size, 'b');
+
+  // First blob is inserted into primary cache during flush.
+  ASSERT_OK(Put(first_key, first_blob));
+  ASSERT_OK(Flush());
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 1);
+
+  // Second blob is inserted into primary cache during flush,
+  // First blob is evicted but only a dummy handle is inserted into secondary
+  // cache.
+  ASSERT_OK(Put(second_key, second_blob));
+  ASSERT_OK(Flush());
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 1);
+
+  // First blob is inserted into primary cache.
+  // Second blob is evicted but only a dummy handle is inserted into secondary
+  // cache.
+  ASSERT_EQ(Get(first_key), first_blob);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 1);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 0);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS),
+            0);
+  // Second blob is inserted into primary cache,
+  // First blob is evicted and is inserted into secondary cache.
+  ASSERT_EQ(Get(second_key), second_blob);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 1);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 0);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS),
+            0);
+
+  // First blob's dummy item is inserted into primary cache b/c of lookup.
+  // Second blob is still in primary cache.
+  ASSERT_EQ(Get(first_key), first_blob);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 0);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 1);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS),
+            1);
+
+  // First blob's item is inserted into primary cache b/c of lookup.
+  // Second blob is evicted and inserted into secondary cache.
+  ASSERT_EQ(Get(first_key), first_blob);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 0);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 1);
+  ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS),
+            1);
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  RegisterCustomObjects(argc, argv);
+  return RUN_ALL_TESTS();
+}
-- 
cgit v1.2.3