summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/db_test2.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/db/db_test2.cc
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/db_test2.cc')
-rw-r--r--src/rocksdb/db/db_test2.cc3712
1 files changed, 3712 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_test2.cc b/src/rocksdb/db/db_test2.cc
new file mode 100644
index 00000000..6a00300e
--- /dev/null
+++ b/src/rocksdb/db/db_test2.cc
@@ -0,0 +1,3712 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include <atomic>
+#include <cstdlib>
+#include <functional>
+
+#include "db/db_test_util.h"
+#include "db/read_callback.h"
+#include "port/port.h"
+#include "port/stack_trace.h"
+#include "rocksdb/persistent_cache.h"
+#include "rocksdb/wal_filter.h"
+
+namespace rocksdb {
+
+class DBTest2 : public DBTestBase {
+ public:
+ DBTest2() : DBTestBase("/db_test2") {}
+};
+
+class PrefixFullBloomWithReverseComparator
+ : public DBTestBase,
+ public ::testing::WithParamInterface<bool> {
+ public:
+ PrefixFullBloomWithReverseComparator()
+ : DBTestBase("/prefix_bloom_reverse") {}
+ void SetUp() override { if_cache_filter_ = GetParam(); }
+ bool if_cache_filter_;
+};
+
+TEST_P(PrefixFullBloomWithReverseComparator,
+ PrefixFullBloomWithReverseComparator) {
+ Options options = last_options_;
+ options.comparator = ReverseBytewiseComparator();
+ options.prefix_extractor.reset(NewCappedPrefixTransform(3));
+ options.statistics = rocksdb::CreateDBStatistics();
+ BlockBasedTableOptions bbto;
+ if (if_cache_filter_) {
+ bbto.no_block_cache = false;
+ bbto.cache_index_and_filter_blocks = true;
+ bbto.block_cache = NewLRUCache(1);
+ }
+ bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
+ bbto.whole_key_filtering = false;
+ options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ DestroyAndReopen(options);
+
+ ASSERT_OK(dbfull()->Put(WriteOptions(), "bar123", "foo"));
+ ASSERT_OK(dbfull()->Put(WriteOptions(), "bar234", "foo2"));
+ ASSERT_OK(dbfull()->Put(WriteOptions(), "foo123", "foo3"));
+
+ dbfull()->Flush(FlushOptions());
+
+ if (bbto.block_cache) {
+ bbto.block_cache->EraseUnRefEntries();
+ }
+
+ std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
+ iter->Seek("bar345");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("bar234", iter->key().ToString());
+ ASSERT_EQ("foo2", iter->value().ToString());
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("bar123", iter->key().ToString());
+ ASSERT_EQ("foo", iter->value().ToString());
+
+ iter->Seek("foo234");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("foo123", iter->key().ToString());
+ ASSERT_EQ("foo3", iter->value().ToString());
+
+ iter->Seek("bar");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+}
+
+INSTANTIATE_TEST_CASE_P(PrefixFullBloomWithReverseComparator,
+ PrefixFullBloomWithReverseComparator, testing::Bool());
+
+TEST_F(DBTest2, IteratorPropertyVersionNumber) {
+ Put("", "");
+ Iterator* iter1 = db_->NewIterator(ReadOptions());
+ std::string prop_value;
+ ASSERT_OK(
+ iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
+ uint64_t version_number1 =
+ static_cast<uint64_t>(std::atoi(prop_value.c_str()));
+
+ Put("", "");
+ Flush();
+
+ Iterator* iter2 = db_->NewIterator(ReadOptions());
+ ASSERT_OK(
+ iter2->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
+ uint64_t version_number2 =
+ static_cast<uint64_t>(std::atoi(prop_value.c_str()));
+
+ ASSERT_GT(version_number2, version_number1);
+
+ Put("", "");
+
+ Iterator* iter3 = db_->NewIterator(ReadOptions());
+ ASSERT_OK(
+ iter3->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
+ uint64_t version_number3 =
+ static_cast<uint64_t>(std::atoi(prop_value.c_str()));
+
+ ASSERT_EQ(version_number2, version_number3);
+
+ iter1->SeekToFirst();
+ ASSERT_OK(
+ iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
+ uint64_t version_number1_new =
+ static_cast<uint64_t>(std::atoi(prop_value.c_str()));
+ ASSERT_EQ(version_number1, version_number1_new);
+
+ delete iter1;
+ delete iter2;
+ delete iter3;
+}
+
+TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) {
+ Options options = CurrentOptions();
+ options.create_if_missing = true;
+ options.statistics = rocksdb::CreateDBStatistics();
+ BlockBasedTableOptions table_options;
+ table_options.cache_index_and_filter_blocks = true;
+ table_options.filter_policy.reset(NewBloomFilterPolicy(20));
+ options.table_factory.reset(new BlockBasedTableFactory(table_options));
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ Put(1, "a", "begin");
+ Put(1, "z", "end");
+ ASSERT_OK(Flush(1));
+ TryReopenWithColumnFamilies({"default", "pikachu"}, options);
+
+ std::string value;
+ value = Get(1, "a");
+}
+
+TEST_F(DBTest2, MaxSuccessiveMergesChangeWithDBRecovery) {
+ Options options = CurrentOptions();
+ options.create_if_missing = true;
+ options.statistics = rocksdb::CreateDBStatistics();
+ options.max_successive_merges = 3;
+ options.merge_operator = MergeOperators::CreatePutOperator();
+ options.disable_auto_compactions = true;
+ DestroyAndReopen(options);
+ Put("poi", "Finch");
+ db_->Merge(WriteOptions(), "poi", "Reese");
+ db_->Merge(WriteOptions(), "poi", "Shaw");
+ db_->Merge(WriteOptions(), "poi", "Root");
+ options.max_successive_merges = 2;
+ Reopen(options);
+}
+
+#ifndef ROCKSDB_LITE
+class DBTestSharedWriteBufferAcrossCFs
+ : public DBTestBase,
+ public testing::WithParamInterface<std::tuple<bool, bool>> {
+ public:
+ DBTestSharedWriteBufferAcrossCFs()
+ : DBTestBase("/db_test_shared_write_buffer") {}
+ void SetUp() override {
+ use_old_interface_ = std::get<0>(GetParam());
+ cost_cache_ = std::get<1>(GetParam());
+ }
+ bool use_old_interface_;
+ bool cost_cache_;
+};
+
+TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
+ Options options = CurrentOptions();
+ options.arena_block_size = 4096;
+
+ // Avoid undeterministic value by malloc_usable_size();
+ // Force arena block size to 1
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "Arena::Arena:0", [&](void* arg) {
+ size_t* block_size = static_cast<size_t*>(arg);
+ *block_size = 1;
+ });
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "Arena::AllocateNewBlock:0", [&](void* arg) {
+ std::pair<size_t*, size_t*>* pair =
+ static_cast<std::pair<size_t*, size_t*>*>(arg);
+ *std::get<0>(*pair) = *std::get<1>(*pair);
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ // The total soft write buffer size is about 105000
+ std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
+ ASSERT_LT(cache->GetUsage(), 1024 * 1024);
+
+ if (use_old_interface_) {
+ options.db_write_buffer_size = 120000; // this is the real limit
+ } else if (!cost_cache_) {
+ options.write_buffer_manager.reset(new WriteBufferManager(114285));
+ } else {
+ options.write_buffer_manager.reset(new WriteBufferManager(114285, cache));
+ }
+ options.write_buffer_size = 500000; // this is never hit
+ CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
+
+ WriteOptions wo;
+ wo.disableWAL = true;
+
+ std::function<void()> wait_flush = [&]() {
+ dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
+ dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+ dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
+ dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
+ };
+
+ // Create some data and flush "default" and "nikitich" so that they
+ // are newer CFs created.
+ ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
+ Flush(3);
+ ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
+ ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
+ Flush(0);
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
+ static_cast<uint64_t>(1));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
+ static_cast<uint64_t>(1));
+
+ ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
+ if (cost_cache_) {
+ ASSERT_GE(cache->GetUsage(), 1024 * 1024);
+ ASSERT_LE(cache->GetUsage(), 2 * 1024 * 1024);
+ }
+ wait_flush();
+ ASSERT_OK(Put(0, Key(1), DummyString(60000), wo));
+ if (cost_cache_) {
+ ASSERT_GE(cache->GetUsage(), 1024 * 1024);
+ ASSERT_LE(cache->GetUsage(), 2 * 1024 * 1024);
+ }
+ wait_flush();
+ ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
+ // No flush should trigger
+ wait_flush();
+ {
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
+ static_cast<uint64_t>(1));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
+ static_cast<uint64_t>(1));
+ }
+
+ // Trigger a flush. Flushing "nikitich".
+ ASSERT_OK(Put(3, Key(2), DummyString(30000), wo));
+ wait_flush();
+ ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
+ wait_flush();
+ {
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
+ static_cast<uint64_t>(1));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
+ static_cast<uint64_t>(2));
+ }
+
+ // Without hitting the threshold, no flush should trigger.
+ ASSERT_OK(Put(2, Key(1), DummyString(30000), wo));
+ wait_flush();
+ ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
+ wait_flush();
+ ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
+ wait_flush();
+ {
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
+ static_cast<uint64_t>(1));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
+ static_cast<uint64_t>(2));
+ }
+
+ // Hit the write buffer limit again. "default"
+ // will have been flushed.
+ ASSERT_OK(Put(2, Key(2), DummyString(10000), wo));
+ wait_flush();
+ ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
+ wait_flush();
+ ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
+ wait_flush();
+ ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
+ wait_flush();
+ ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
+ wait_flush();
+ {
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
+ static_cast<uint64_t>(2));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
+ static_cast<uint64_t>(2));
+ }
+
+ // Trigger another flush. This time "dobrynia". "pikachu" should not
+ // be flushed, althrough it was never flushed.
+ ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
+ wait_flush();
+ ASSERT_OK(Put(2, Key(1), DummyString(80000), wo));
+ wait_flush();
+ ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
+ wait_flush();
+ ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
+ wait_flush();
+
+ {
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
+ static_cast<uint64_t>(2));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
+ static_cast<uint64_t>(1));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
+ static_cast<uint64_t>(2));
+ }
+ if (cost_cache_) {
+ ASSERT_GE(cache->GetUsage(), 1024 * 1024);
+ Close();
+ options.write_buffer_manager.reset();
+ last_options_.write_buffer_manager.reset();
+ ASSERT_LT(cache->GetUsage(), 1024 * 1024);
+ }
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs,
+ DBTestSharedWriteBufferAcrossCFs,
+ ::testing::Values(std::make_tuple(true, false),
+ std::make_tuple(false, false),
+ std::make_tuple(false, true)));
+
+TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
+ std::string dbname2 = test::PerThreadDBPath("db_shared_wb_db2");
+ Options options = CurrentOptions();
+ options.arena_block_size = 4096;
+ // Avoid undeterministic value by malloc_usable_size();
+ // Force arena block size to 1
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "Arena::Arena:0", [&](void* arg) {
+ size_t* block_size = static_cast<size_t*>(arg);
+ *block_size = 1;
+ });
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "Arena::AllocateNewBlock:0", [&](void* arg) {
+ std::pair<size_t*, size_t*>* pair =
+ static_cast<std::pair<size_t*, size_t*>*>(arg);
+ *std::get<0>(*pair) = *std::get<1>(*pair);
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ options.write_buffer_size = 500000; // this is never hit
+ // Use a write buffer total size so that the soft limit is about
+ // 105000.
+ options.write_buffer_manager.reset(new WriteBufferManager(120000));
+ CreateAndReopenWithCF({"cf1", "cf2"}, options);
+
+ ASSERT_OK(DestroyDB(dbname2, options));
+ DB* db2 = nullptr;
+ ASSERT_OK(DB::Open(options, dbname2, &db2));
+
+ WriteOptions wo;
+ wo.disableWAL = true;
+
+ std::function<void()> wait_flush = [&]() {
+ dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
+ dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+ dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
+ static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
+ };
+
+ // Trigger a flush on cf2
+ ASSERT_OK(Put(2, Key(1), DummyString(70000), wo));
+ wait_flush();
+ ASSERT_OK(Put(0, Key(1), DummyString(20000), wo));
+ wait_flush();
+
+ // Insert to DB2
+ ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000)));
+ wait_flush();
+
+ ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
+ wait_flush();
+ static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
+ {
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default") +
+ GetNumberOfSstFilesForColumnFamily(db_, "cf1") +
+ GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
+ static_cast<uint64_t>(1));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
+ static_cast<uint64_t>(0));
+ }
+
+ // Triggering to flush another CF in DB1
+ ASSERT_OK(db2->Put(wo, Key(2), DummyString(70000)));
+ wait_flush();
+ ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
+ wait_flush();
+ {
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
+ static_cast<uint64_t>(1));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
+ static_cast<uint64_t>(1));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
+ static_cast<uint64_t>(0));
+ }
+
+ // Triggering flush in DB2.
+ ASSERT_OK(db2->Put(wo, Key(3), DummyString(40000)));
+ wait_flush();
+ ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
+ wait_flush();
+ static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
+ {
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
+ static_cast<uint64_t>(1));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
+ static_cast<uint64_t>(1));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
+ static_cast<uint64_t>(1));
+ }
+
+ delete db2;
+ ASSERT_OK(DestroyDB(dbname2, options));
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, TestWriteBufferNoLimitWithCache) {
+ Options options = CurrentOptions();
+ options.arena_block_size = 4096;
+ std::shared_ptr<Cache> cache =
+ NewLRUCache(LRUCacheOptions(10000000, 1, false, 0.0));
+ options.write_buffer_size = 50000; // this is never hit
+ // Use a write buffer total size so that the soft limit is about
+ // 105000.
+ options.write_buffer_manager.reset(new WriteBufferManager(0, cache));
+ Reopen(options);
+
+ ASSERT_OK(Put("foo", "bar"));
+ // One dummy entry is 1MB.
+ ASSERT_GT(cache->GetUsage(), 500000);
+}
+
+namespace {
+ void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
+ const std::vector<Slice>& keys_must_not_exist) {
+ // Ensure that expected keys exist
+ std::vector<std::string> values;
+ if (keys_must_exist.size() > 0) {
+ std::vector<Status> status_list =
+ db->MultiGet(ReadOptions(), keys_must_exist, &values);
+ for (size_t i = 0; i < keys_must_exist.size(); i++) {
+ ASSERT_OK(status_list[i]);
+ }
+ }
+
+ // Ensure that given keys don't exist
+ if (keys_must_not_exist.size() > 0) {
+ std::vector<Status> status_list =
+ db->MultiGet(ReadOptions(), keys_must_not_exist, &values);
+ for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
+ ASSERT_TRUE(status_list[i].IsNotFound());
+ }
+ }
+ }
+
+} // namespace
+
+TEST_F(DBTest2, WalFilterTest) {
+ class TestWalFilter : public WalFilter {
+ private:
+ // Processing option that is requested to be applied at the given index
+ WalFilter::WalProcessingOption wal_processing_option_;
+ // Index at which to apply wal_processing_option_
+ // At other indexes default wal_processing_option::kContinueProcessing is
+ // returned.
+ size_t apply_option_at_record_index_;
+ // Current record index, incremented with each record encountered.
+ size_t current_record_index_;
+
+ public:
+ TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
+ size_t apply_option_for_record_index)
+ : wal_processing_option_(wal_processing_option),
+ apply_option_at_record_index_(apply_option_for_record_index),
+ current_record_index_(0) {}
+
+ WalProcessingOption LogRecord(const WriteBatch& /*batch*/,
+ WriteBatch* /*new_batch*/,
+ bool* /*batch_changed*/) const override {
+ WalFilter::WalProcessingOption option_to_return;
+
+ if (current_record_index_ == apply_option_at_record_index_) {
+ option_to_return = wal_processing_option_;
+ }
+ else {
+ option_to_return = WalProcessingOption::kContinueProcessing;
+ }
+
+ // Filter is passed as a const object for RocksDB to not modify the
+ // object, however we modify it for our own purpose here and hence
+ // cast the constness away.
+ (const_cast<TestWalFilter*>(this)->current_record_index_)++;
+
+ return option_to_return;
+ }
+
+ const char* Name() const override { return "TestWalFilter"; }
+ };
+
+ // Create 3 batches with two keys each
+ std::vector<std::vector<std::string>> batch_keys(3);
+
+ batch_keys[0].push_back("key1");
+ batch_keys[0].push_back("key2");
+ batch_keys[1].push_back("key3");
+ batch_keys[1].push_back("key4");
+ batch_keys[2].push_back("key5");
+ batch_keys[2].push_back("key6");
+
+ // Test with all WAL processing options
+ for (int option = 0;
+ option < static_cast<int>(
+ WalFilter::WalProcessingOption::kWalProcessingOptionMax);
+ option++) {
+ Options options = OptionsForLogIterTest();
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({ "pikachu" }, options);
+
+ // Write given keys in given batches
+ for (size_t i = 0; i < batch_keys.size(); i++) {
+ WriteBatch batch;
+ for (size_t j = 0; j < batch_keys[i].size(); j++) {
+ batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
+ }
+ dbfull()->Write(WriteOptions(), &batch);
+ }
+
+ WalFilter::WalProcessingOption wal_processing_option =
+ static_cast<WalFilter::WalProcessingOption>(option);
+
+ // Create a test filter that would apply wal_processing_option at the first
+ // record
+ size_t apply_option_for_record_index = 1;
+ TestWalFilter test_wal_filter(wal_processing_option,
+ apply_option_for_record_index);
+
+ // Reopen database with option to use WAL filter
+ options = OptionsForLogIterTest();
+ options.wal_filter = &test_wal_filter;
+ Status status =
+ TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
+ if (wal_processing_option ==
+ WalFilter::WalProcessingOption::kCorruptedRecord) {
+ assert(!status.ok());
+ // In case of corruption we can turn off paranoid_checks to reopen
+ // databse
+ options.paranoid_checks = false;
+ ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+ }
+ else {
+ assert(status.ok());
+ }
+
+ // Compute which keys we expect to be found
+ // and which we expect not to be found after recovery.
+ std::vector<Slice> keys_must_exist;
+ std::vector<Slice> keys_must_not_exist;
+ switch (wal_processing_option) {
+ case WalFilter::WalProcessingOption::kCorruptedRecord:
+ case WalFilter::WalProcessingOption::kContinueProcessing: {
+ fprintf(stderr, "Testing with complete WAL processing\n");
+ // we expect all records to be processed
+ for (size_t i = 0; i < batch_keys.size(); i++) {
+ for (size_t j = 0; j < batch_keys[i].size(); j++) {
+ keys_must_exist.push_back(Slice(batch_keys[i][j]));
+ }
+ }
+ break;
+ }
+ case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
+ fprintf(stderr,
+ "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
+ apply_option_for_record_index);
+ // We expect the record with apply_option_for_record_index to be not
+ // found.
+ for (size_t i = 0; i < batch_keys.size(); i++) {
+ for (size_t j = 0; j < batch_keys[i].size(); j++) {
+ if (i == apply_option_for_record_index) {
+ keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
+ }
+ else {
+ keys_must_exist.push_back(Slice(batch_keys[i][j]));
+ }
+ }
+ }
+ break;
+ }
+ case WalFilter::WalProcessingOption::kStopReplay: {
+ fprintf(stderr,
+ "Testing with stopping replay from record %" ROCKSDB_PRIszt
+ "\n",
+ apply_option_for_record_index);
+ // We expect records beyond apply_option_for_record_index to be not
+ // found.
+ for (size_t i = 0; i < batch_keys.size(); i++) {
+ for (size_t j = 0; j < batch_keys[i].size(); j++) {
+ if (i >= apply_option_for_record_index) {
+ keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
+ }
+ else {
+ keys_must_exist.push_back(Slice(batch_keys[i][j]));
+ }
+ }
+ }
+ break;
+ }
+ default:
+ assert(false); // unhandled case
+ }
+
+ bool checked_after_reopen = false;
+
+ while (true) {
+ // Ensure that expected keys exists
+ // and not expected keys don't exist after recovery
+ ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
+
+ if (checked_after_reopen) {
+ break;
+ }
+
+ // reopen database again to make sure previous log(s) are not used
+ //(even if they were skipped)
+ // reopn database with option to use WAL filter
+ options = OptionsForLogIterTest();
+ ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+
+ checked_after_reopen = true;
+ }
+ }
+}
+
+TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
+ class ChangeBatchHandler : public WriteBatch::Handler {
+ private:
+ // Batch to insert keys in
+ WriteBatch* new_write_batch_;
+ // Number of keys to add in the new batch
+ size_t num_keys_to_add_in_new_batch_;
+ // Number of keys added to new batch
+ size_t num_keys_added_;
+
+ public:
+ ChangeBatchHandler(WriteBatch* new_write_batch,
+ size_t num_keys_to_add_in_new_batch)
+ : new_write_batch_(new_write_batch),
+ num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
+ num_keys_added_(0) {}
+ void Put(const Slice& key, const Slice& value) override {
+ if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
+ new_write_batch_->Put(key, value);
+ ++num_keys_added_;
+ }
+ }
+ };
+
+ class TestWalFilterWithChangeBatch : public WalFilter {
+ private:
+ // Index at which to start changing records
+ size_t change_records_from_index_;
+ // Number of keys to add in the new batch
+ size_t num_keys_to_add_in_new_batch_;
+ // Current record index, incremented with each record encountered.
+ size_t current_record_index_;
+
+ public:
+ TestWalFilterWithChangeBatch(size_t change_records_from_index,
+ size_t num_keys_to_add_in_new_batch)
+ : change_records_from_index_(change_records_from_index),
+ num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
+ current_record_index_(0) {}
+
+ WalProcessingOption LogRecord(const WriteBatch& batch,
+ WriteBatch* new_batch,
+ bool* batch_changed) const override {
+ if (current_record_index_ >= change_records_from_index_) {
+ ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
+ batch.Iterate(&handler);
+ *batch_changed = true;
+ }
+
+ // Filter is passed as a const object for RocksDB to not modify the
+ // object, however we modify it for our own purpose here and hence
+ // cast the constness away.
+ (const_cast<TestWalFilterWithChangeBatch*>(this)
+ ->current_record_index_)++;
+
+ return WalProcessingOption::kContinueProcessing;
+ }
+
+ const char* Name() const override { return "TestWalFilterWithChangeBatch"; }
+ };
+
+ std::vector<std::vector<std::string>> batch_keys(3);
+
+ batch_keys[0].push_back("key1");
+ batch_keys[0].push_back("key2");
+ batch_keys[1].push_back("key3");
+ batch_keys[1].push_back("key4");
+ batch_keys[2].push_back("key5");
+ batch_keys[2].push_back("key6");
+
+ Options options = OptionsForLogIterTest();
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({ "pikachu" }, options);
+
+ // Write given keys in given batches
+ for (size_t i = 0; i < batch_keys.size(); i++) {
+ WriteBatch batch;
+ for (size_t j = 0; j < batch_keys[i].size(); j++) {
+ batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
+ }
+ dbfull()->Write(WriteOptions(), &batch);
+ }
+
+ // Create a test filter that would apply wal_processing_option at the first
+ // record
+ size_t change_records_from_index = 1;
+ size_t num_keys_to_add_in_new_batch = 1;
+ TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
+ change_records_from_index, num_keys_to_add_in_new_batch);
+
+ // Reopen database with option to use WAL filter
+ options = OptionsForLogIterTest();
+ options.wal_filter = &test_wal_filter_with_change_batch;
+ ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+
+ // Ensure that all keys exist before change_records_from_index_
+ // And after that index only single key exists
+ // as our filter adds only single key for each batch
+ std::vector<Slice> keys_must_exist;
+ std::vector<Slice> keys_must_not_exist;
+
+ for (size_t i = 0; i < batch_keys.size(); i++) {
+ for (size_t j = 0; j < batch_keys[i].size(); j++) {
+ if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
+ keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
+ }
+ else {
+ keys_must_exist.push_back(Slice(batch_keys[i][j]));
+ }
+ }
+ }
+
+ bool checked_after_reopen = false;
+
+ while (true) {
+ // Ensure that expected keys exists
+ // and not expected keys don't exist after recovery
+ ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
+
+ if (checked_after_reopen) {
+ break;
+ }
+
+ // reopen database again to make sure previous log(s) are not used
+ //(even if they were skipped)
+ // reopn database with option to use WAL filter
+ options = OptionsForLogIterTest();
+ ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+
+ checked_after_reopen = true;
+ }
+}
+
+TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
+ class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
+ public:
+ WalProcessingOption LogRecord(const WriteBatch& batch, WriteBatch* new_batch,
+ bool* batch_changed) const override {
+ *new_batch = batch;
+ new_batch->Put("key_extra", "value_extra");
+ *batch_changed = true;
+ return WalProcessingOption::kContinueProcessing;
+ }
+
+ const char* Name() const override {
+ return "WalFilterTestWithChangeBatchExtraKeys";
+ }
+ };
+
+ std::vector<std::vector<std::string>> batch_keys(3);
+
+ batch_keys[0].push_back("key1");
+ batch_keys[0].push_back("key2");
+ batch_keys[1].push_back("key3");
+ batch_keys[1].push_back("key4");
+ batch_keys[2].push_back("key5");
+ batch_keys[2].push_back("key6");
+
+ Options options = OptionsForLogIterTest();
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({ "pikachu" }, options);
+
+ // Write given keys in given batches
+ for (size_t i = 0; i < batch_keys.size(); i++) {
+ WriteBatch batch;
+ for (size_t j = 0; j < batch_keys[i].size(); j++) {
+ batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
+ }
+ dbfull()->Write(WriteOptions(), &batch);
+ }
+
+ // Create a test filter that would add extra keys
+ TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys;
+
+ // Reopen database with option to use WAL filter
+ options = OptionsForLogIterTest();
+ options.wal_filter = &test_wal_filter_extra_keys;
+ Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
+ ASSERT_TRUE(status.IsNotSupported());
+
+ // Reopen without filter, now reopen should succeed - previous
+ // attempt to open must not have altered the db.
+ options = OptionsForLogIterTest();
+ ReopenWithColumnFamilies({ "default", "pikachu" }, options);
+
+ std::vector<Slice> keys_must_exist;
+ std::vector<Slice> keys_must_not_exist; // empty vector
+
+ for (size_t i = 0; i < batch_keys.size(); i++) {
+ for (size_t j = 0; j < batch_keys[i].size(); j++) {
+ keys_must_exist.push_back(Slice(batch_keys[i][j]));
+ }
+ }
+
+ ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
+}
+
+TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
+ class TestWalFilterWithColumnFamilies : public WalFilter {
+ private:
+ // column_family_id -> log_number map (provided to WALFilter)
+ std::map<uint32_t, uint64_t> cf_log_number_map_;
+ // column_family_name -> column_family_id map (provided to WALFilter)
+ std::map<std::string, uint32_t> cf_name_id_map_;
+ // column_family_name -> keys_found_in_wal map
+ // We store keys that are applicable to the column_family
+ // during recovery (i.e. aren't already flushed to SST file(s))
+ // for verification against the keys we expect.
+ std::map<uint32_t, std::vector<std::string>> cf_wal_keys_;
+ public:
+ void ColumnFamilyLogNumberMap(
+ const std::map<uint32_t, uint64_t>& cf_lognumber_map,
+ const std::map<std::string, uint32_t>& cf_name_id_map) override {
+ cf_log_number_map_ = cf_lognumber_map;
+ cf_name_id_map_ = cf_name_id_map;
+ }
+
+ WalProcessingOption LogRecordFound(unsigned long long log_number,
+ const std::string& /*log_file_name*/,
+ const WriteBatch& batch,
+ WriteBatch* /*new_batch*/,
+ bool* /*batch_changed*/) override {
+ class LogRecordBatchHandler : public WriteBatch::Handler {
+ private:
+ const std::map<uint32_t, uint64_t> & cf_log_number_map_;
+ std::map<uint32_t, std::vector<std::string>> & cf_wal_keys_;
+ unsigned long long log_number_;
+ public:
+ LogRecordBatchHandler(unsigned long long current_log_number,
+ const std::map<uint32_t, uint64_t> & cf_log_number_map,
+ std::map<uint32_t, std::vector<std::string>> & cf_wal_keys) :
+ cf_log_number_map_(cf_log_number_map),
+ cf_wal_keys_(cf_wal_keys),
+ log_number_(current_log_number){}
+
+ Status PutCF(uint32_t column_family_id, const Slice& key,
+ const Slice& /*value*/) override {
+ auto it = cf_log_number_map_.find(column_family_id);
+ assert(it != cf_log_number_map_.end());
+ unsigned long long log_number_for_cf = it->second;
+ // If the current record is applicable for column_family_id
+ // (i.e. isn't flushed to SST file(s) for column_family_id)
+ // add it to the cf_wal_keys_ map for verification.
+ if (log_number_ >= log_number_for_cf) {
+ cf_wal_keys_[column_family_id].push_back(std::string(key.data(),
+ key.size()));
+ }
+ return Status::OK();
+ }
+ } handler(log_number, cf_log_number_map_, cf_wal_keys_);
+
+ batch.Iterate(&handler);
+
+ return WalProcessingOption::kContinueProcessing;
+ }
+
+ const char* Name() const override {
+ return "WalFilterTestWithColumnFamilies";
+ }
+
+ const std::map<uint32_t, std::vector<std::string>>& GetColumnFamilyKeys() {
+ return cf_wal_keys_;
+ }
+
+ const std::map<std::string, uint32_t> & GetColumnFamilyNameIdMap() {
+ return cf_name_id_map_;
+ }
+ };
+
+ std::vector<std::vector<std::string>> batch_keys_pre_flush(3);
+
+ batch_keys_pre_flush[0].push_back("key1");
+ batch_keys_pre_flush[0].push_back("key2");
+ batch_keys_pre_flush[1].push_back("key3");
+ batch_keys_pre_flush[1].push_back("key4");
+ batch_keys_pre_flush[2].push_back("key5");
+ batch_keys_pre_flush[2].push_back("key6");
+
+ Options options = OptionsForLogIterTest();
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({ "pikachu" }, options);
+
+ // Write given keys in given batches
+ for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
+ WriteBatch batch;
+ for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
+ batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024));
+ batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024));
+ }
+ dbfull()->Write(WriteOptions(), &batch);
+ }
+
+ //Flush default column-family
+ db_->Flush(FlushOptions(), handles_[0]);
+
+ // Do some more writes
+ std::vector<std::vector<std::string>> batch_keys_post_flush(3);
+
+ batch_keys_post_flush[0].push_back("key7");
+ batch_keys_post_flush[0].push_back("key8");
+ batch_keys_post_flush[1].push_back("key9");
+ batch_keys_post_flush[1].push_back("key10");
+ batch_keys_post_flush[2].push_back("key11");
+ batch_keys_post_flush[2].push_back("key12");
+
+ // Write given keys in given batches
+ for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
+ WriteBatch batch;
+ for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
+ batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024));
+ batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024));
+ }
+ dbfull()->Write(WriteOptions(), &batch);
+ }
+
+ // On Recovery we should only find the second batch applicable to default CF
+ // But both batches applicable to pikachu CF
+
+ // Create a test filter that would add extra keys
+ TestWalFilterWithColumnFamilies test_wal_filter_column_families;
+
+ // Reopen database with option to use WAL filter
+ options = OptionsForLogIterTest();
+ options.wal_filter = &test_wal_filter_column_families;
+ Status status =
+ TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
+ ASSERT_TRUE(status.ok());
+
+ // verify that handles_[0] only has post_flush keys
+ // while handles_[1] has pre and post flush keys
+ auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys();
+ auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap();
+ size_t index = 0;
+ auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]];
+ //default column-family, only post_flush keys are expected
+ for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
+ for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
+ Slice key_from_the_log(keys_cf[index++]);
+ Slice batch_key(batch_keys_post_flush[i][j]);
+ ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
+ }
+ }
+ ASSERT_TRUE(index == keys_cf.size());
+
+ index = 0;
+ keys_cf = cf_wal_keys[name_id_map["pikachu"]];
+ //pikachu column-family, all keys are expected
+ for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
+ for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
+ Slice key_from_the_log(keys_cf[index++]);
+ Slice batch_key(batch_keys_pre_flush[i][j]);
+ ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
+ }
+ }
+
+ for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
+ for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
+ Slice key_from_the_log(keys_cf[index++]);
+ Slice batch_key(batch_keys_post_flush[i][j]);
+ ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
+ }
+ }
+ ASSERT_TRUE(index == keys_cf.size());
+}
+
+// Temporarily disable it because the test is flaky.
+TEST_F(DBTest2, DISABLED_PresetCompressionDict) {
+ // Verifies that compression ratio improves when dictionary is enabled, and
+ // improves even further when the dictionary is trained by ZSTD.
+ const size_t kBlockSizeBytes = 4 << 10;
+ const size_t kL0FileBytes = 128 << 10;
+ const size_t kApproxPerBlockOverheadBytes = 50;
+ const int kNumL0Files = 5;
+
+ Options options;
+ options.env = CurrentOptions().env; // Make sure to use any custom env that the test is configured with.
+ options.allow_concurrent_memtable_write = false;
+ options.arena_block_size = kBlockSizeBytes;
+ options.create_if_missing = true;
+ options.disable_auto_compactions = true;
+ options.level0_file_num_compaction_trigger = kNumL0Files;
+ options.memtable_factory.reset(
+ new SpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
+ options.num_levels = 2;
+ options.target_file_size_base = kL0FileBytes;
+ options.target_file_size_multiplier = 2;
+ options.write_buffer_size = kL0FileBytes;
+ BlockBasedTableOptions table_options;
+ table_options.block_size = kBlockSizeBytes;
+ std::vector<CompressionType> compression_types;
+ if (Zlib_Supported()) {
+ compression_types.push_back(kZlibCompression);
+ }
+#if LZ4_VERSION_NUMBER >= 10400 // r124+
+ compression_types.push_back(kLZ4Compression);
+ compression_types.push_back(kLZ4HCCompression);
+#endif // LZ4_VERSION_NUMBER >= 10400
+ if (ZSTD_Supported()) {
+ compression_types.push_back(kZSTD);
+ }
+
+ for (auto compression_type : compression_types) {
+ options.compression = compression_type;
+ size_t prev_out_bytes;
+ for (int i = 0; i < 3; ++i) {
+ // First iteration: compress without preset dictionary
+ // Second iteration: compress with preset dictionary
+ // Third iteration (zstd only): compress with zstd-trained dictionary
+ //
+ // To make sure the compression dictionary has the intended effect, we
+ // verify the compressed size is smaller in successive iterations. Also in
+ // the non-first iterations, verify the data we get out is the same data
+ // we put in.
+ switch (i) {
+ case 0:
+ options.compression_opts.max_dict_bytes = 0;
+ options.compression_opts.zstd_max_train_bytes = 0;
+ break;
+ case 1:
+ options.compression_opts.max_dict_bytes = 4 * kBlockSizeBytes;
+ options.compression_opts.zstd_max_train_bytes = 0;
+ break;
+ case 2:
+ if (compression_type != kZSTD) {
+ continue;
+ }
+ options.compression_opts.max_dict_bytes = 4 * kBlockSizeBytes;
+ options.compression_opts.zstd_max_train_bytes = kL0FileBytes;
+ break;
+ default:
+ assert(false);
+ }
+
+ options.statistics = rocksdb::CreateDBStatistics();
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ CreateAndReopenWithCF({"pikachu"}, options);
+ Random rnd(301);
+ std::string seq_datas[10];
+ for (int j = 0; j < 10; ++j) {
+ seq_datas[j] =
+ RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes);
+ }
+
+ ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
+ for (int j = 0; j < kNumL0Files; ++j) {
+ for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
+ auto key_num = j * (kL0FileBytes / kBlockSizeBytes) + k;
+ ASSERT_OK(Put(1, Key(static_cast<int>(key_num)),
+ seq_datas[(key_num / 10) % 10]));
+ }
+ dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+ ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
+ }
+ dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
+ true /* disallow_trivial_move */);
+ ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
+ ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
+
+ size_t out_bytes = 0;
+ std::vector<std::string> files;
+ GetSstFiles(env_, dbname_, &files);
+ for (const auto& file : files) {
+ uint64_t curr_bytes;
+ env_->GetFileSize(dbname_ + "/" + file, &curr_bytes);
+ out_bytes += static_cast<size_t>(curr_bytes);
+ }
+
+ for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
+ j++) {
+ ASSERT_EQ(seq_datas[(j / 10) % 10], Get(1, Key(static_cast<int>(j))));
+ }
+ if (i) {
+ ASSERT_GT(prev_out_bytes, out_bytes);
+ }
+ prev_out_bytes = out_bytes;
+ DestroyAndReopen(options);
+ }
+ }
+}
+
+TEST_F(DBTest2, PresetCompressionDictLocality) {
+ if (!ZSTD_Supported()) {
+ return;
+ }
+ // Verifies that compression dictionary is generated from local data. The
+ // verification simply checks all output SSTs have different compression
+ // dictionaries. We do not verify effectiveness as that'd likely be flaky in
+ // the future.
+ const int kNumEntriesPerFile = 1 << 10; // 1KB
+ const int kNumBytesPerEntry = 1 << 10; // 1KB
+ const int kNumFiles = 4;
+ Options options = CurrentOptions();
+ options.compression = kZSTD;
+ options.compression_opts.max_dict_bytes = 1 << 14; // 16KB
+ options.compression_opts.zstd_max_train_bytes = 1 << 18; // 256KB
+ options.statistics = rocksdb::CreateDBStatistics();
+ options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry;
+ BlockBasedTableOptions table_options;
+ table_options.cache_index_and_filter_blocks = true;
+ options.table_factory.reset(new BlockBasedTableFactory(table_options));
+ Reopen(options);
+
+ Random rnd(301);
+ for (int i = 0; i < kNumFiles; ++i) {
+ for (int j = 0; j < kNumEntriesPerFile; ++j) {
+ ASSERT_OK(Put(Key(i * kNumEntriesPerFile + j),
+ RandomString(&rnd, kNumBytesPerEntry)));
+ }
+ ASSERT_OK(Flush());
+ MoveFilesToLevel(1);
+ ASSERT_EQ(NumTableFilesAtLevel(1), i + 1);
+ }
+
+ // Store all the dictionaries generated during a full compaction.
+ std::vector<std::string> compression_dicts;
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
+ [&](void* arg) {
+ compression_dicts.emplace_back(static_cast<Slice*>(arg)->ToString());
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ CompactRangeOptions compact_range_opts;
+ compact_range_opts.bottommost_level_compaction =
+ BottommostLevelCompaction::kForce;
+ ASSERT_OK(db_->CompactRange(compact_range_opts, nullptr, nullptr));
+
+ // Dictionary compression should not be so good as to compress four totally
+ // random files into one. If it does then there's probably something wrong
+ // with the test.
+ ASSERT_GT(NumTableFilesAtLevel(1), 1);
+
+ // Furthermore, there should be one compression dictionary generated per file.
+ // And they should all be different from each other.
+ ASSERT_EQ(NumTableFilesAtLevel(1),
+ static_cast<int>(compression_dicts.size()));
+ for (size_t i = 1; i < compression_dicts.size(); ++i) {
+ std::string& a = compression_dicts[i - 1];
+ std::string& b = compression_dicts[i];
+ size_t alen = a.size();
+ size_t blen = b.size();
+ ASSERT_TRUE(alen != blen || memcmp(a.data(), b.data(), alen) != 0);
+ }
+}
+
+class CompactionCompressionListener : public EventListener {
+ public:
+ explicit CompactionCompressionListener(Options* db_options)
+ : db_options_(db_options) {}
+
+ void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
+ // Figure out last level with files
+ int bottommost_level = 0;
+ for (int level = 0; level < db->NumberLevels(); level++) {
+ std::string files_at_level;
+ ASSERT_TRUE(
+ db->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
+ &files_at_level));
+ if (files_at_level != "0") {
+ bottommost_level = level;
+ }
+ }
+
+ if (db_options_->bottommost_compression != kDisableCompressionOption &&
+ ci.output_level == bottommost_level) {
+ ASSERT_EQ(ci.compression, db_options_->bottommost_compression);
+ } else if (db_options_->compression_per_level.size() != 0) {
+ ASSERT_EQ(ci.compression,
+ db_options_->compression_per_level[ci.output_level]);
+ } else {
+ ASSERT_EQ(ci.compression, db_options_->compression);
+ }
+ max_level_checked = std::max(max_level_checked, ci.output_level);
+ }
+
+ int max_level_checked = 0;
+ const Options* db_options_;
+};
+
+TEST_F(DBTest2, CompressionOptions) {
+ if (!Zlib_Supported() || !Snappy_Supported()) {
+ return;
+ }
+
+ Options options = CurrentOptions();
+ options.level0_file_num_compaction_trigger = 2;
+ options.max_bytes_for_level_base = 100;
+ options.max_bytes_for_level_multiplier = 2;
+ options.num_levels = 7;
+ options.max_background_compactions = 1;
+
+ CompactionCompressionListener* listener =
+ new CompactionCompressionListener(&options);
+ options.listeners.emplace_back(listener);
+
+ const int kKeySize = 5;
+ const int kValSize = 20;
+ Random rnd(301);
+
+ for (int iter = 0; iter <= 2; iter++) {
+ listener->max_level_checked = 0;
+
+ if (iter == 0) {
+ // Use different compression algorithms for different levels but
+ // always use Zlib for bottommost level
+ options.compression_per_level = {kNoCompression, kNoCompression,
+ kNoCompression, kSnappyCompression,
+ kSnappyCompression, kSnappyCompression,
+ kZlibCompression};
+ options.compression = kNoCompression;
+ options.bottommost_compression = kZlibCompression;
+ } else if (iter == 1) {
+ // Use Snappy except for bottommost level use ZLib
+ options.compression_per_level = {};
+ options.compression = kSnappyCompression;
+ options.bottommost_compression = kZlibCompression;
+ } else if (iter == 2) {
+ // Use Snappy everywhere
+ options.compression_per_level = {};
+ options.compression = kSnappyCompression;
+ options.bottommost_compression = kDisableCompressionOption;
+ }
+
+ DestroyAndReopen(options);
+ // Write 10 random files
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 5; j++) {
+ ASSERT_OK(
+ Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValSize)));
+ }
+ ASSERT_OK(Flush());
+ dbfull()->TEST_WaitForCompact();
+ }
+
+ // Make sure that we wrote enough to check all 7 levels
+ ASSERT_EQ(listener->max_level_checked, 6);
+ }
+}
+
+class CompactionStallTestListener : public EventListener {
+ public:
+ CompactionStallTestListener() : compacting_files_cnt_(0), compacted_files_cnt_(0) {}
+
+ void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
+ ASSERT_EQ(ci.cf_name, "default");
+ ASSERT_EQ(ci.base_input_level, 0);
+ ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
+ compacting_files_cnt_ += ci.input_files.size();
+ }
+
+ void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
+ ASSERT_EQ(ci.cf_name, "default");
+ ASSERT_EQ(ci.base_input_level, 0);
+ ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
+ compacted_files_cnt_ += ci.input_files.size();
+ }
+
+ std::atomic<size_t> compacting_files_cnt_;
+ std::atomic<size_t> compacted_files_cnt_;
+};
+
+TEST_F(DBTest2, CompactionStall) {
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"},
+ {"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"},
+ {"DBTest2::CompactionStall:2",
+ "DBImpl::NotifyOnCompactionBegin::UnlockMutex"},
+ {"DBTest2::CompactionStall:3",
+ "DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}});
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ Options options = CurrentOptions();
+ options.level0_file_num_compaction_trigger = 4;
+ options.max_background_compactions = 40;
+ CompactionStallTestListener* listener = new CompactionStallTestListener();
+ options.listeners.emplace_back(listener);
+ DestroyAndReopen(options);
+ // make sure all background compaction jobs can be scheduled
+ auto stop_token =
+ dbfull()->TEST_write_controler().GetCompactionPressureToken();
+
+ Random rnd(301);
+
+ // 4 Files in L0
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < 10; j++) {
+ ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ // Wait for compaction to be triggered
+ TEST_SYNC_POINT("DBTest2::CompactionStall:0");
+
+ // Clear "DBImpl::BGWorkCompaction" SYNC_POINT since we want to hold it again
+ // at DBTest2::CompactionStall::1
+ rocksdb::SyncPoint::GetInstance()->ClearTrace();
+
+ // Another 6 L0 files to trigger compaction again
+ for (int i = 0; i < 6; i++) {
+ for (int j = 0; j < 10; j++) {
+ ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ // Wait for another compaction to be triggered
+ TEST_SYNC_POINT("DBTest2::CompactionStall:1");
+
+ // Hold NotifyOnCompactionBegin in the unlock mutex section
+ TEST_SYNC_POINT("DBTest2::CompactionStall:2");
+
+ // Hold NotifyOnCompactionCompleted in the unlock mutex section
+ TEST_SYNC_POINT("DBTest2::CompactionStall:3");
+
+ dbfull()->TEST_WaitForCompact();
+ ASSERT_LT(NumTableFilesAtLevel(0),
+ options.level0_file_num_compaction_trigger);
+ ASSERT_GT(listener->compacted_files_cnt_.load(),
+ 10 - options.level0_file_num_compaction_trigger);
+ ASSERT_EQ(listener->compacting_files_cnt_.load(), listener->compacted_files_cnt_.load());
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+#endif // ROCKSDB_LITE
+
+TEST_F(DBTest2, FirstSnapshotTest) {
+ Options options;
+ options.write_buffer_size = 100000; // Small write buffer
+ options = CurrentOptions(options);
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ // This snapshot will have sequence number 0 what is expected behaviour.
+ const Snapshot* s1 = db_->GetSnapshot();
+
+ Put(1, "k1", std::string(100000, 'x')); // Fill memtable
+ Put(1, "k2", std::string(100000, 'y')); // Trigger flush
+
+ db_->ReleaseSnapshot(s1);
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBTest2, DuplicateSnapshot) {
+ Options options;
+ options = CurrentOptions(options);
+ std::vector<const Snapshot*> snapshots;
+ DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
+ SequenceNumber oldest_ww_snap, first_ww_snap;
+
+ Put("k", "v"); // inc seq
+ snapshots.push_back(db_->GetSnapshot());
+ snapshots.push_back(db_->GetSnapshot());
+ Put("k", "v"); // inc seq
+ snapshots.push_back(db_->GetSnapshot());
+ snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
+ first_ww_snap = snapshots.back()->GetSequenceNumber();
+ Put("k", "v"); // inc seq
+ snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
+ snapshots.push_back(db_->GetSnapshot());
+ Put("k", "v"); // inc seq
+ snapshots.push_back(db_->GetSnapshot());
+
+ {
+ InstrumentedMutexLock l(dbi->mutex());
+ auto seqs = dbi->snapshots().GetAll(&oldest_ww_snap);
+ ASSERT_EQ(seqs.size(), 4); // duplicates are not counted
+ ASSERT_EQ(oldest_ww_snap, first_ww_snap);
+ }
+
+ for (auto s : snapshots) {
+ db_->ReleaseSnapshot(s);
+ }
+}
+#endif // ROCKSDB_LITE
+
+class PinL0IndexAndFilterBlocksTest
+ : public DBTestBase,
+ public testing::WithParamInterface<std::tuple<bool, bool>> {
+ public:
+ PinL0IndexAndFilterBlocksTest() : DBTestBase("/db_pin_l0_index_bloom_test") {}
+ void SetUp() override {
+ infinite_max_files_ = std::get<0>(GetParam());
+ disallow_preload_ = std::get<1>(GetParam());
+ }
+
+ void CreateTwoLevels(Options* options, bool close_afterwards) {
+ if (infinite_max_files_) {
+ options->max_open_files = -1;
+ }
+ options->create_if_missing = true;
+ options->statistics = rocksdb::CreateDBStatistics();
+ BlockBasedTableOptions table_options;
+ table_options.cache_index_and_filter_blocks = true;
+ table_options.pin_l0_filter_and_index_blocks_in_cache = true;
+ table_options.filter_policy.reset(NewBloomFilterPolicy(20));
+ options->table_factory.reset(new BlockBasedTableFactory(table_options));
+ CreateAndReopenWithCF({"pikachu"}, *options);
+
+ Put(1, "a", "begin");
+ Put(1, "z", "end");
+ ASSERT_OK(Flush(1));
+ // move this table to L1
+ dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+
+ // reset block cache
+ table_options.block_cache = NewLRUCache(64 * 1024);
+ options->table_factory.reset(NewBlockBasedTableFactory(table_options));
+ TryReopenWithColumnFamilies({"default", "pikachu"}, *options);
+ // create new table at L0
+ Put(1, "a2", "begin2");
+ Put(1, "z2", "end2");
+ ASSERT_OK(Flush(1));
+
+ if (close_afterwards) {
+ Close(); // This ensures that there is no ref to block cache entries
+ }
+ table_options.block_cache->EraseUnRefEntries();
+ }
+
+ bool infinite_max_files_;
+ bool disallow_preload_;
+};
+
+TEST_P(PinL0IndexAndFilterBlocksTest,
+ IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) {
+ Options options = CurrentOptions();
+ if (infinite_max_files_) {
+ options.max_open_files = -1;
+ }
+ options.create_if_missing = true;
+ options.statistics = rocksdb::CreateDBStatistics();
+ BlockBasedTableOptions table_options;
+ table_options.cache_index_and_filter_blocks = true;
+ table_options.pin_l0_filter_and_index_blocks_in_cache = true;
+ table_options.filter_policy.reset(NewBloomFilterPolicy(20));
+ options.table_factory.reset(new BlockBasedTableFactory(table_options));
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ ASSERT_OK(Put(1, "key", "val"));
+ // Create a new table.
+ ASSERT_OK(Flush(1));
+
+ // index/filter blocks added to block cache right after table creation.
+ ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+
+ // only index/filter were added
+ ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD));
+ ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS));
+
+ std::string value;
+ // Miss and hit count should remain the same, they're all pinned.
+ db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value);
+ ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+
+ // Miss and hit count should remain the same, they're all pinned.
+ value = Get(1, "key");
+ ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+}
+
+TEST_P(PinL0IndexAndFilterBlocksTest,
+ MultiLevelIndexAndFilterBlocksCachedWithPinning) {
+ Options options = CurrentOptions();
+ PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, false);
+ // get base cache values
+ uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
+ uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
+ uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
+ uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
+
+ std::string value;
+ // this should be read from L0
+ // so cache values don't change
+ value = Get(1, "a2");
+ ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+
+ // this should be read from L1
+ // the file is opened, prefetching results in a cache filter miss
+ // the block is loaded and added to the cache,
+ // then the get results in a cache hit for L1
+ // When we have inifinite max_files, there is still cache miss because we have
+ // reset the block cache
+ value = Get(1, "a");
+ ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+}
+
+TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) {
+ Options options = CurrentOptions();
+ // This ensures that db does not ref anything in the block cache, so
+ // EraseUnRefEntries could clear them up.
+ bool close_afterwards = true;
+ PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, close_afterwards);
+
+ // Get base cache values
+ uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
+ uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
+ uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
+ uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
+
+ if (disallow_preload_) {
+ // Now we have two files. We narrow the max open files to allow 3 entries
+ // so that preloading SST files won't happen.
+ options.max_open_files = 13;
+ // RocksDB sanitize max open files to at least 20. Modify it back.
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
+ int* max_open_files = static_cast<int*>(arg);
+ *max_open_files = 13;
+ });
+ }
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ // Reopen database. If max_open_files is set as -1, table readers will be
+ // preloaded. This will trigger a BlockBasedTable::Open() and prefetch
+ // L0 index and filter. Level 1's prefetching is disabled in DB::Open()
+ TryReopenWithColumnFamilies({"default", "pikachu"}, options);
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+ if (!disallow_preload_) {
+ // After reopen, cache miss are increased by one because we read (and only
+ // read) filter and index on L0
+ ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+ } else {
+ // If max_open_files is not -1, we do not preload table readers, so there is
+ // no change.
+ ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+ }
+ std::string value;
+ // this should be read from L0
+ value = Get(1, "a2");
+ // If max_open_files is -1, we have pinned index and filter in Rep, so there
+ // will not be changes in index and filter misses or hits. If max_open_files
+ // is not -1, Get() will open a TableReader and prefetch index and filter.
+ ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+
+ // this should be read from L1
+ value = Get(1, "a");
+ if (!disallow_preload_) {
+ // In inifinite max files case, there's a cache miss in executing Get()
+ // because index and filter are not prefetched before.
+ ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+ } else {
+ // In this case, cache miss will be increased by one in
+ // BlockBasedTable::Open() because this is not in DB::Open() code path so we
+ // will prefetch L1's index and filter. Cache hit will also be increased by
+ // one because Get() will read index and filter from the block cache
+ // prefetched in previous Open() call.
+ ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(ih + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+ }
+
+ // Force a full compaction to one single file. There will be a block
+ // cache read for both of index and filter. If prefetch doesn't explicitly
+ // happen, it will happen when verifying the file.
+ Compact(1, "a", "zzzzz");
+ dbfull()->TEST_WaitForCompact();
+
+ if (!disallow_preload_) {
+ ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(ih + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+ } else {
+ ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+ }
+
+ // Bloom and index hit will happen when a Get() happens.
+ value = Get(1, "a");
+ if (!disallow_preload_) {
+ ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(ih + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+ } else {
+ ASSERT_EQ(fm + 3, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
+ ASSERT_EQ(fh + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
+ ASSERT_EQ(im + 3, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
+ ASSERT_EQ(ih + 4, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
+ }
+}
+
+INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest,
+ PinL0IndexAndFilterBlocksTest,
+ ::testing::Values(std::make_tuple(true, false),
+ std::make_tuple(false, false),
+ std::make_tuple(false, true)));
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBTest2, MaxCompactionBytesTest) {
+ Options options = CurrentOptions();
+ options.memtable_factory.reset(
+ new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
+ options.compaction_style = kCompactionStyleLevel;
+ options.write_buffer_size = 200 << 10;
+ options.arena_block_size = 4 << 10;
+ options.level0_file_num_compaction_trigger = 4;
+ options.num_levels = 4;
+ options.compression = kNoCompression;
+ options.max_bytes_for_level_base = 450 << 10;
+ options.target_file_size_base = 100 << 10;
+ // Infinite for full compaction.
+ options.max_compaction_bytes = options.target_file_size_base * 100;
+
+ Reopen(options);
+
+ Random rnd(301);
+
+ for (int num = 0; num < 8; num++) {
+ GenerateNewRandomFile(&rnd);
+ }
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ ASSERT_EQ("0,0,8", FilesPerLevel(0));
+
+ // When compact from Ln -> Ln+1, cut a file if the file overlaps with
+ // more than three files in Ln+1.
+ options.max_compaction_bytes = options.target_file_size_base * 3;
+ Reopen(options);
+
+ GenerateNewRandomFile(&rnd);
+ // Add three more small files that overlap with the previous file
+ for (int i = 0; i < 3; i++) {
+ Put("a", "z");
+ ASSERT_OK(Flush());
+ }
+ dbfull()->TEST_WaitForCompact();
+
+ // Output files to L1 are cut to three pieces, according to
+ // options.max_compaction_bytes
+ ASSERT_EQ("0,3,8", FilesPerLevel(0));
+}
+
+static void UniqueIdCallback(void* arg) {
+ int* result = reinterpret_cast<int*>(arg);
+ if (*result == -1) {
+ *result = 0;
+ }
+
+ rocksdb::SyncPoint::GetInstance()->ClearTrace();
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
+}
+
+class MockPersistentCache : public PersistentCache {
+ public:
+ explicit MockPersistentCache(const bool is_compressed, const size_t max_size)
+ : is_compressed_(is_compressed), max_size_(max_size) {
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
+ }
+
+ ~MockPersistentCache() override {}
+
+ PersistentCache::StatsType Stats() override {
+ return PersistentCache::StatsType();
+ }
+
+ Status Insert(const Slice& page_key, const char* data,
+ const size_t size) override {
+ MutexLock _(&lock_);
+
+ if (size_ > max_size_) {
+ size_ -= data_.begin()->second.size();
+ data_.erase(data_.begin());
+ }
+
+ data_.insert(std::make_pair(page_key.ToString(), std::string(data, size)));
+ size_ += size;
+ return Status::OK();
+ }
+
+ Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
+ size_t* size) override {
+ MutexLock _(&lock_);
+ auto it = data_.find(page_key.ToString());
+ if (it == data_.end()) {
+ return Status::NotFound();
+ }
+
+ assert(page_key.ToString() == it->first);
+ data->reset(new char[it->second.size()]);
+ memcpy(data->get(), it->second.c_str(), it->second.size());
+ *size = it->second.size();
+ return Status::OK();
+ }
+
+ bool IsCompressed() override { return is_compressed_; }
+
+ std::string GetPrintableOptions() const override {
+ return "MockPersistentCache";
+ }
+
+ port::Mutex lock_;
+ std::map<std::string, std::string> data_;
+ const bool is_compressed_ = true;
+ size_t size_ = 0;
+ const size_t max_size_ = 10 * 1024; // 10KiB
+};
+
+#ifdef OS_LINUX
+// Make sure that in CPU time perf context counters, Env::NowCPUNanos()
+// is used, rather than Env::CPUNanos();
+TEST_F(DBTest2, TestPerfContextGetCpuTime) {
+ // force resizing table cache so table handle is not preloaded so that
+ // we can measure find_table_nanos during Get().
+ dbfull()->TEST_table_cache()->SetCapacity(0);
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Flush());
+ env_->now_cpu_count_.store(0);
+
+ // CPU timing is not enabled with kEnableTimeExceptForMutex
+ SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
+ ASSERT_EQ("bar", Get("foo"));
+ ASSERT_EQ(0, get_perf_context()->get_cpu_nanos);
+ ASSERT_EQ(0, env_->now_cpu_count_.load());
+
+ uint64_t kDummyAddonTime = uint64_t{1000000000000};
+
+ // Add time to NowNanos() reading.
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "TableCache::FindTable:0",
+ [&](void* /*arg*/) { env_->addon_time_.fetch_add(kDummyAddonTime); });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
+ ASSERT_EQ("bar", Get("foo"));
+ ASSERT_GT(env_->now_cpu_count_.load(), 2);
+ ASSERT_LT(get_perf_context()->get_cpu_nanos, kDummyAddonTime);
+ ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime);
+
+ SetPerfLevel(PerfLevel::kDisable);
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, TestPerfContextIterCpuTime) {
+ DestroyAndReopen(CurrentOptions());
+ // force resizing table cache so table handle is not preloaded so that
+ // we can measure find_table_nanos during iteration
+ dbfull()->TEST_table_cache()->SetCapacity(0);
+
+ const size_t kNumEntries = 10;
+ for (size_t i = 0; i < kNumEntries; ++i) {
+ ASSERT_OK(Put("k" + ToString(i), "v" + ToString(i)));
+ }
+ ASSERT_OK(Flush());
+ for (size_t i = 0; i < kNumEntries; ++i) {
+ ASSERT_EQ("v" + ToString(i), Get("k" + ToString(i)));
+ }
+ std::string last_key = "k" + ToString(kNumEntries - 1);
+ std::string last_value = "v" + ToString(kNumEntries - 1);
+ env_->now_cpu_count_.store(0);
+
+ // CPU timing is not enabled with kEnableTimeExceptForMutex
+ SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
+ Iterator* iter = db_->NewIterator(ReadOptions());
+ iter->Seek("k0");
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("v0", iter->value().ToString());
+ iter->SeekForPrev(last_key);
+ ASSERT_TRUE(iter->Valid());
+ iter->SeekToLast();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(last_value, iter->value().ToString());
+ iter->SeekToFirst();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("v0", iter->value().ToString());
+ ASSERT_EQ(0, get_perf_context()->iter_seek_cpu_nanos);
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("v1", iter->value().ToString());
+ ASSERT_EQ(0, get_perf_context()->iter_next_cpu_nanos);
+ iter->Prev();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("v0", iter->value().ToString());
+ ASSERT_EQ(0, get_perf_context()->iter_prev_cpu_nanos);
+ ASSERT_EQ(0, env_->now_cpu_count_.load());
+ delete iter;
+
+ uint64_t kDummyAddonTime = uint64_t{1000000000000};
+
+ // Add time to NowNanos() reading.
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "TableCache::FindTable:0",
+ [&](void* /*arg*/) { env_->addon_time_.fetch_add(kDummyAddonTime); });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
+ iter = db_->NewIterator(ReadOptions());
+ iter->Seek("k0");
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("v0", iter->value().ToString());
+ iter->SeekForPrev(last_key);
+ ASSERT_TRUE(iter->Valid());
+ iter->SeekToLast();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(last_value, iter->value().ToString());
+ iter->SeekToFirst();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("v0", iter->value().ToString());
+ ASSERT_GT(get_perf_context()->iter_seek_cpu_nanos, 0);
+ ASSERT_LT(get_perf_context()->iter_seek_cpu_nanos, kDummyAddonTime);
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("v1", iter->value().ToString());
+ ASSERT_GT(get_perf_context()->iter_next_cpu_nanos, 0);
+ ASSERT_LT(get_perf_context()->iter_next_cpu_nanos, kDummyAddonTime);
+ iter->Prev();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("v0", iter->value().ToString());
+ ASSERT_GT(get_perf_context()->iter_prev_cpu_nanos, 0);
+ ASSERT_LT(get_perf_context()->iter_prev_cpu_nanos, kDummyAddonTime);
+ ASSERT_GE(env_->now_cpu_count_.load(), 12);
+ ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime);
+
+ SetPerfLevel(PerfLevel::kDisable);
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ delete iter;
+}
+#endif // OS_LINUX
+
+#ifndef OS_SOLARIS // GetUniqueIdFromFile is not implemented
+TEST_F(DBTest2, PersistentCache) {
+ int num_iter = 80;
+
+ Options options;
+ options.write_buffer_size = 64 * 1024; // small write buffer
+ options.statistics = rocksdb::CreateDBStatistics();
+ options = CurrentOptions(options);
+
+ auto bsizes = {/*no block cache*/ 0, /*1M*/ 1 * 1024 * 1024};
+ auto types = {/*compressed*/ 1, /*uncompressed*/ 0};
+ for (auto bsize : bsizes) {
+ for (auto type : types) {
+ BlockBasedTableOptions table_options;
+ table_options.persistent_cache.reset(
+ new MockPersistentCache(type, 10 * 1024));
+ table_options.no_block_cache = true;
+ table_options.block_cache = bsize ? NewLRUCache(bsize) : nullptr;
+ table_options.block_cache_compressed = nullptr;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({"pikachu"}, options);
+ // default column family doesn't have block cache
+ Options no_block_cache_opts;
+ no_block_cache_opts.statistics = options.statistics;
+ no_block_cache_opts = CurrentOptions(no_block_cache_opts);
+ BlockBasedTableOptions table_options_no_bc;
+ table_options_no_bc.no_block_cache = true;
+ no_block_cache_opts.table_factory.reset(
+ NewBlockBasedTableFactory(table_options_no_bc));
+ ReopenWithColumnFamilies(
+ {"default", "pikachu"},
+ std::vector<Options>({no_block_cache_opts, options}));
+
+ Random rnd(301);
+
+ // Write 8MB (80 values, each 100K)
+ ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
+ std::vector<std::string> values;
+ std::string str;
+ for (int i = 0; i < num_iter; i++) {
+ if (i % 4 == 0) { // high compression ratio
+ str = RandomString(&rnd, 1000);
+ }
+ values.push_back(str);
+ ASSERT_OK(Put(1, Key(i), values[i]));
+ }
+
+ // flush all data from memtable so that reads are from block cache
+ ASSERT_OK(Flush(1));
+
+ for (int i = 0; i < num_iter; i++) {
+ ASSERT_EQ(Get(1, Key(i)), values[i]);
+ }
+
+ auto hit = options.statistics->getTickerCount(PERSISTENT_CACHE_HIT);
+ auto miss = options.statistics->getTickerCount(PERSISTENT_CACHE_MISS);
+
+ ASSERT_GT(hit, 0);
+ ASSERT_GT(miss, 0);
+ }
+ }
+}
+#endif // !OS_SOLARIS
+
+namespace {
+void CountSyncPoint() {
+ TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr /* arg */);
+}
+} // namespace
+
+TEST_F(DBTest2, SyncPointMarker) {
+ std::atomic<int> sync_point_called(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBTest2::MarkedPoint",
+ [&](void* /*arg*/) { sync_point_called.fetch_add(1); });
+
+ // The first dependency enforces Marker can be loaded before MarkedPoint.
+ // The second checks that thread 1's MarkedPoint should be disabled here.
+ // Execution order:
+ // | Thread 1 | Thread 2 |
+ // | | Marker |
+ // | MarkedPoint | |
+ // | Thread1First | |
+ // | | MarkedPoint |
+ rocksdb::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
+ {{"DBTest2::SyncPointMarker:Thread1First", "DBTest2::MarkedPoint"}},
+ {{"DBTest2::SyncPointMarker:Marker", "DBTest2::MarkedPoint"}});
+
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::function<void()> func1 = [&]() {
+ CountSyncPoint();
+ TEST_SYNC_POINT("DBTest2::SyncPointMarker:Thread1First");
+ };
+
+ std::function<void()> func2 = [&]() {
+ TEST_SYNC_POINT("DBTest2::SyncPointMarker:Marker");
+ CountSyncPoint();
+ };
+
+ auto thread1 = port::Thread(func1);
+ auto thread2 = port::Thread(func2);
+ thread1.join();
+ thread2.join();
+
+ // Callback is only executed once
+ ASSERT_EQ(sync_point_called.load(), 1);
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+#endif
+
+size_t GetEncodedEntrySize(size_t key_size, size_t value_size) {
+ std::string buffer;
+
+ PutVarint32(&buffer, static_cast<uint32_t>(0));
+ PutVarint32(&buffer, static_cast<uint32_t>(key_size));
+ PutVarint32(&buffer, static_cast<uint32_t>(value_size));
+
+ return buffer.size() + key_size + value_size;
+}
+
+TEST_F(DBTest2, ReadAmpBitmap) {
+ Options options = CurrentOptions();
+ BlockBasedTableOptions bbto;
+ uint32_t bytes_per_bit[2] = {1, 16};
+ for (size_t k = 0; k < 2; k++) {
+ // Disable delta encoding to make it easier to calculate read amplification
+ bbto.use_delta_encoding = false;
+ // Huge block cache to make it easier to calculate read amplification
+ bbto.block_cache = NewLRUCache(1024 * 1024 * 1024);
+ bbto.read_amp_bytes_per_bit = bytes_per_bit[k];
+ options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ options.statistics = rocksdb::CreateDBStatistics();
+ DestroyAndReopen(options);
+
+ const size_t kNumEntries = 10000;
+
+ Random rnd(301);
+ for (size_t i = 0; i < kNumEntries; i++) {
+ ASSERT_OK(Put(Key(static_cast<int>(i)), RandomString(&rnd, 100)));
+ }
+ ASSERT_OK(Flush());
+
+ Close();
+ Reopen(options);
+
+ // Read keys/values randomly and verify that reported read amp error
+ // is less than 2%
+ uint64_t total_useful_bytes = 0;
+ std::set<int> read_keys;
+ std::string value;
+ for (size_t i = 0; i < kNumEntries * 5; i++) {
+ int key_idx = rnd.Next() % kNumEntries;
+ std::string key = Key(key_idx);
+ ASSERT_OK(db_->Get(ReadOptions(), key, &value));
+
+ if (read_keys.find(key_idx) == read_keys.end()) {
+ auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
+ total_useful_bytes +=
+ GetEncodedEntrySize(internal_key.size(), value.size());
+ read_keys.insert(key_idx);
+ }
+
+ double expected_read_amp =
+ static_cast<double>(total_useful_bytes) /
+ options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
+
+ double read_amp =
+ static_cast<double>(options.statistics->getTickerCount(
+ READ_AMP_ESTIMATE_USEFUL_BYTES)) /
+ options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
+
+ double error_pct = fabs(expected_read_amp - read_amp) * 100;
+ // Error between reported read amp and real read amp should be less than
+ // 2%
+ EXPECT_LE(error_pct, 2);
+ }
+
+ // Make sure we read every thing in the DB (which is smaller than our cache)
+ Iterator* iter = db_->NewIterator(ReadOptions());
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ ASSERT_EQ(iter->value().ToString(), Get(iter->key().ToString()));
+ }
+ delete iter;
+
+ // Read amp is on average 100% since we read all what we loaded in memory
+ if (k == 0) {
+ ASSERT_EQ(
+ options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES),
+ options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES));
+ } else {
+ ASSERT_NEAR(
+ options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES) *
+ 1.0f /
+ options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES),
+ 1, .01);
+ }
+ }
+}
+
+#ifndef OS_SOLARIS // GetUniqueIdFromFile is not implemented
+TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
+ {
+ const int kIdBufLen = 100;
+ char id_buf[kIdBufLen];
+#ifndef OS_WIN
+ // You can't open a directory on windows using random access file
+ std::unique_ptr<RandomAccessFile> file;
+ ASSERT_OK(env_->NewRandomAccessFile(dbname_, &file, EnvOptions()));
+ if (file->GetUniqueId(id_buf, kIdBufLen) == 0) {
+ // fs holding db directory doesn't support getting a unique file id,
+ // this means that running this test will fail because lru_cache will load
+ // the blocks again regardless of them being already in the cache
+ return;
+ }
+#else
+ std::unique_ptr<Directory> dir;
+ ASSERT_OK(env_->NewDirectory(dbname_, &dir));
+ if (dir->GetUniqueId(id_buf, kIdBufLen) == 0) {
+ // fs holding db directory doesn't support getting a unique file id,
+ // this means that running this test will fail because lru_cache will load
+ // the blocks again regardless of them being already in the cache
+ return;
+ }
+#endif
+ }
+ uint32_t bytes_per_bit[2] = {1, 16};
+ for (size_t k = 0; k < 2; k++) {
+ std::shared_ptr<Cache> lru_cache = NewLRUCache(1024 * 1024 * 1024);
+ std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
+
+ Options options = CurrentOptions();
+ BlockBasedTableOptions bbto;
+ // Disable delta encoding to make it easier to calculate read amplification
+ bbto.use_delta_encoding = false;
+ // Huge block cache to make it easier to calculate read amplification
+ bbto.block_cache = lru_cache;
+ bbto.read_amp_bytes_per_bit = bytes_per_bit[k];
+ options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ options.statistics = stats;
+ DestroyAndReopen(options);
+
+ const int kNumEntries = 10000;
+
+ Random rnd(301);
+ for (int i = 0; i < kNumEntries; i++) {
+ ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
+ }
+ ASSERT_OK(Flush());
+
+ Close();
+ Reopen(options);
+
+ uint64_t total_useful_bytes = 0;
+ std::set<int> read_keys;
+ std::string value;
+ // Iter1: Read half the DB, Read even keys
+ // Key(0), Key(2), Key(4), Key(6), Key(8), ...
+ for (int i = 0; i < kNumEntries; i += 2) {
+ std::string key = Key(i);
+ ASSERT_OK(db_->Get(ReadOptions(), key, &value));
+
+ if (read_keys.find(i) == read_keys.end()) {
+ auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
+ total_useful_bytes +=
+ GetEncodedEntrySize(internal_key.size(), value.size());
+ read_keys.insert(i);
+ }
+ }
+
+ size_t total_useful_bytes_iter1 =
+ options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
+ size_t total_loaded_bytes_iter1 =
+ options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
+
+ Close();
+ std::shared_ptr<Statistics> new_statistics = rocksdb::CreateDBStatistics();
+ // Destroy old statistics obj that the blocks in lru_cache are pointing to
+ options.statistics.reset();
+ // Use the statistics object that we just created
+ options.statistics = new_statistics;
+ Reopen(options);
+
+ // Iter2: Read half the DB, Read odd keys
+ // Key(1), Key(3), Key(5), Key(7), Key(9), ...
+ for (int i = 1; i < kNumEntries; i += 2) {
+ std::string key = Key(i);
+ ASSERT_OK(db_->Get(ReadOptions(), key, &value));
+
+ if (read_keys.find(i) == read_keys.end()) {
+ auto internal_key = InternalKey(key, 0, ValueType::kTypeValue);
+ total_useful_bytes +=
+ GetEncodedEntrySize(internal_key.size(), value.size());
+ read_keys.insert(i);
+ }
+ }
+
+ size_t total_useful_bytes_iter2 =
+ options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
+ size_t total_loaded_bytes_iter2 =
+ options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
+
+
+ // Read amp is on average 100% since we read all what we loaded in memory
+ if (k == 0) {
+ ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2,
+ total_loaded_bytes_iter1 + total_loaded_bytes_iter2);
+ } else {
+ ASSERT_NEAR((total_useful_bytes_iter1 + total_useful_bytes_iter2) * 1.0f /
+ (total_loaded_bytes_iter1 + total_loaded_bytes_iter2),
+ 1, .01);
+ }
+ }
+}
+#endif // !OS_SOLARIS
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBTest2, AutomaticCompactionOverlapManualCompaction) {
+ Options options = CurrentOptions();
+ options.num_levels = 3;
+ options.IncreaseParallelism(20);
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put(Key(0), "a"));
+ ASSERT_OK(Put(Key(5), "a"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put(Key(10), "a"));
+ ASSERT_OK(Put(Key(15), "a"));
+ ASSERT_OK(Flush());
+
+ CompactRangeOptions cro;
+ cro.change_level = true;
+ cro.target_level = 2;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+
+ auto get_stat = [](std::string level_str, LevelStatType type,
+ std::map<std::string, std::string> props) {
+ auto prop_str =
+ "compaction." + level_str + "." +
+ InternalStats::compaction_level_stats.at(type).property_name.c_str();
+ auto prop_item = props.find(prop_str);
+ return prop_item == props.end() ? 0 : std::stod(prop_item->second);
+ };
+
+ // Trivial move 2 files to L2
+ ASSERT_EQ("0,0,2", FilesPerLevel());
+ // Also test that the stats GetMapProperty API reporting the same result
+ {
+ std::map<std::string, std::string> prop;
+ ASSERT_TRUE(dbfull()->GetMapProperty("rocksdb.cfstats", &prop));
+ ASSERT_EQ(0, get_stat("L0", LevelStatType::NUM_FILES, prop));
+ ASSERT_EQ(0, get_stat("L1", LevelStatType::NUM_FILES, prop));
+ ASSERT_EQ(2, get_stat("L2", LevelStatType::NUM_FILES, prop));
+ ASSERT_EQ(2, get_stat("Sum", LevelStatType::NUM_FILES, prop));
+ }
+
+ // While the compaction is running, we will create 2 new files that
+ // can fit in L2, these 2 files will be moved to L2 and overlap with
+ // the running compaction and break the LSM consistency.
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "CompactionJob::Run():Start", [&](void* /*arg*/) {
+ ASSERT_OK(
+ dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"},
+ {"max_bytes_for_level_base", "1"}}));
+ ASSERT_OK(Put(Key(6), "a"));
+ ASSERT_OK(Put(Key(7), "a"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put(Key(8), "a"));
+ ASSERT_OK(Put(Key(9), "a"));
+ ASSERT_OK(Flush());
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ // Run a manual compaction that will compact the 2 files in L2
+ // into 1 file in L2
+ cro.exclusive_manual_compaction = false;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+ // Test that the stats GetMapProperty API reporting 1 file in L2
+ {
+ std::map<std::string, std::string> prop;
+ ASSERT_TRUE(dbfull()->GetMapProperty("rocksdb.cfstats", &prop));
+ ASSERT_EQ(1, get_stat("L2", LevelStatType::NUM_FILES, prop));
+ }
+}
+
+TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
+ Options options = CurrentOptions();
+ options.num_levels = 2;
+ options.IncreaseParallelism(20);
+ options.disable_auto_compactions = true;
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put(Key(0), "a"));
+ ASSERT_OK(Put(Key(5), "a"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put(Key(10), "a"));
+ ASSERT_OK(Put(Key(15), "a"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+ // Trivial move 2 files to L1
+ ASSERT_EQ("0,2", FilesPerLevel());
+
+ std::function<void()> bg_manual_compact = [&]() {
+ std::string k1 = Key(6);
+ std::string k2 = Key(9);
+ Slice k1s(k1);
+ Slice k2s(k2);
+ CompactRangeOptions cro;
+ cro.exclusive_manual_compaction = false;
+ ASSERT_OK(db_->CompactRange(cro, &k1s, &k2s));
+ };
+ rocksdb::port::Thread bg_thread;
+
+ // While the compaction is running, we will create 2 new files that
+ // can fit in L1, these 2 files will be moved to L1 and overlap with
+ // the running compaction and break the LSM consistency.
+ std::atomic<bool> flag(false);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "CompactionJob::Run():Start", [&](void* /*arg*/) {
+ if (flag.exchange(true)) {
+ // We want to make sure to call this callback only once
+ return;
+ }
+ ASSERT_OK(Put(Key(6), "a"));
+ ASSERT_OK(Put(Key(7), "a"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put(Key(8), "a"));
+ ASSERT_OK(Put(Key(9), "a"));
+ ASSERT_OK(Flush());
+
+ // Start a non-exclusive manual compaction in a bg thread
+ bg_thread = port::Thread(bg_manual_compact);
+ // This manual compaction conflict with the other manual compaction
+ // so it should wait until the first compaction finish
+ env_->SleepForMicroseconds(1000000);
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ // Run a manual compaction that will compact the 2 files in L1
+ // into 1 file in L1
+ CompactRangeOptions cro;
+ cro.exclusive_manual_compaction = false;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ bg_thread.join();
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, OptimizeForPointLookup) {
+ Options options = CurrentOptions();
+ Close();
+ options.OptimizeForPointLookup(2);
+ ASSERT_OK(DB::Open(options, dbname_, &db_));
+
+ ASSERT_OK(Put("foo", "v1"));
+ ASSERT_EQ("v1", Get("foo"));
+ Flush();
+ ASSERT_EQ("v1", Get("foo"));
+}
+
+#endif // ROCKSDB_LITE
+
+TEST_F(DBTest2, GetRaceFlush1) {
+ ASSERT_OK(Put("foo", "v1"));
+
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::GetImpl:1", "DBTest2::GetRaceFlush:1"},
+ {"DBTest2::GetRaceFlush:2", "DBImpl::GetImpl:2"}});
+
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ rocksdb::port::Thread t1([&] {
+ TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
+ ASSERT_OK(Put("foo", "v2"));
+ Flush();
+ TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
+ });
+
+ // Get() is issued after the first Put(), so it should see either
+ // "v1" or "v2".
+ ASSERT_NE("NOT_FOUND", Get("foo"));
+ t1.join();
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, GetRaceFlush2) {
+ ASSERT_OK(Put("foo", "v1"));
+
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::GetImpl:3", "DBTest2::GetRaceFlush:1"},
+ {"DBTest2::GetRaceFlush:2", "DBImpl::GetImpl:4"}});
+
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ port::Thread t1([&] {
+ TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
+ ASSERT_OK(Put("foo", "v2"));
+ Flush();
+ TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
+ });
+
+ // Get() is issued after the first Put(), so it should see either
+ // "v1" or "v2".
+ ASSERT_NE("NOT_FOUND", Get("foo"));
+ t1.join();
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, DirectIO) {
+ if (!IsDirectIOSupported()) {
+ return;
+ }
+ Options options = CurrentOptions();
+ options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
+ true;
+ options.allow_mmap_reads = options.allow_mmap_writes = false;
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put(Key(0), "a"));
+ ASSERT_OK(Put(Key(5), "a"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put(Key(10), "a"));
+ ASSERT_OK(Put(Key(15), "a"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ Reopen(options);
+}
+
+TEST_F(DBTest2, MemtableOnlyIterator) {
+ Options options = CurrentOptions();
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ ASSERT_OK(Put(1, "foo", "first"));
+ ASSERT_OK(Put(1, "bar", "second"));
+
+ ReadOptions ropt;
+ ropt.read_tier = kMemtableTier;
+ std::string value;
+ Iterator* it = nullptr;
+
+ // Before flushing
+ // point lookups
+ ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
+ ASSERT_EQ("first", value);
+ ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
+ ASSERT_EQ("second", value);
+
+ // Memtable-only iterator (read_tier=kMemtableTier); data not flushed yet.
+ it = db_->NewIterator(ropt, handles_[1]);
+ int count = 0;
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ ASSERT_TRUE(it->Valid());
+ count++;
+ }
+ ASSERT_TRUE(!it->Valid());
+ ASSERT_EQ(2, count);
+ delete it;
+
+ Flush(1);
+
+ // After flushing
+ // point lookups
+ ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value));
+ ASSERT_EQ("first", value);
+ ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value));
+ ASSERT_EQ("second", value);
+ // nothing should be returned using memtable-only iterator after flushing.
+ it = db_->NewIterator(ropt, handles_[1]);
+ count = 0;
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ ASSERT_TRUE(it->Valid());
+ count++;
+ }
+ ASSERT_TRUE(!it->Valid());
+ ASSERT_EQ(0, count);
+ delete it;
+
+ // Add a key to memtable
+ ASSERT_OK(Put(1, "foobar", "third"));
+ it = db_->NewIterator(ropt, handles_[1]);
+ count = 0;
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ ASSERT_TRUE(it->Valid());
+ ASSERT_EQ("foobar", it->key().ToString());
+ ASSERT_EQ("third", it->value().ToString());
+ count++;
+ }
+ ASSERT_TRUE(!it->Valid());
+ ASSERT_EQ(1, count);
+ delete it;
+}
+
+TEST_F(DBTest2, LowPriWrite) {
+ Options options = CurrentOptions();
+ // Compaction pressure should trigger since 6 files
+ options.level0_file_num_compaction_trigger = 4;
+ options.level0_slowdown_writes_trigger = 12;
+ options.level0_stop_writes_trigger = 30;
+ options.delayed_write_rate = 8 * 1024 * 1024;
+ Reopen(options);
+
+ std::atomic<int> rate_limit_count(0);
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "GenericRateLimiter::Request:1", [&](void* arg) {
+ rate_limit_count.fetch_add(1);
+ int64_t* rate_bytes_per_sec = static_cast<int64_t*>(arg);
+ ASSERT_EQ(1024 * 1024, *rate_bytes_per_sec);
+ });
+ // Block compaction
+ rocksdb::SyncPoint::GetInstance()->LoadDependency({
+ {"DBTest.LowPriWrite:0", "DBImpl::BGWorkCompaction"},
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ WriteOptions wo;
+ for (int i = 0; i < 6; i++) {
+ wo.low_pri = false;
+ Put("", "", wo);
+ wo.low_pri = true;
+ Put("", "", wo);
+ Flush();
+ }
+ ASSERT_EQ(0, rate_limit_count.load());
+ wo.low_pri = true;
+ Put("", "", wo);
+ ASSERT_EQ(1, rate_limit_count.load());
+ wo.low_pri = false;
+ Put("", "", wo);
+ ASSERT_EQ(1, rate_limit_count.load());
+
+ TEST_SYNC_POINT("DBTest.LowPriWrite:0");
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+ dbfull()->TEST_WaitForCompact();
+ wo.low_pri = true;
+ Put("", "", wo);
+ ASSERT_EQ(1, rate_limit_count.load());
+ wo.low_pri = false;
+ Put("", "", wo);
+ ASSERT_EQ(1, rate_limit_count.load());
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBTest2, RateLimitedCompactionReads) {
+ // compaction input has 512KB data
+ const int kNumKeysPerFile = 128;
+ const int kBytesPerKey = 1024;
+ const int kNumL0Files = 4;
+
+ for (auto use_direct_io : {false, true}) {
+ if (use_direct_io && !IsDirectIOSupported()) {
+ continue;
+ }
+ Options options = CurrentOptions();
+ options.compression = kNoCompression;
+ options.level0_file_num_compaction_trigger = kNumL0Files;
+ options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+ options.new_table_reader_for_compaction_inputs = true;
+ // takes roughly one second, split into 100 x 10ms intervals. Each interval
+ // permits 5.12KB, which is smaller than the block size, so this test
+ // exercises the code for chunking reads.
+ options.rate_limiter.reset(NewGenericRateLimiter(
+ static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
+ kBytesPerKey) /* rate_bytes_per_sec */,
+ 10 * 1000 /* refill_period_us */, 10 /* fairness */,
+ RateLimiter::Mode::kReadsOnly));
+ options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
+ use_direct_io;
+ BlockBasedTableOptions bbto;
+ bbto.block_size = 16384;
+ bbto.no_block_cache = true;
+ options.table_factory.reset(new BlockBasedTableFactory(bbto));
+ DestroyAndReopen(options);
+
+ for (int i = 0; i < kNumL0Files; ++i) {
+ for (int j = 0; j <= kNumKeysPerFile; ++j) {
+ ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
+ }
+ dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
+ }
+ dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(0, NumTableFilesAtLevel(0));
+
+ ASSERT_EQ(0, options.rate_limiter->GetTotalBytesThrough(Env::IO_HIGH));
+ // should be slightly above 512KB due to non-data blocks read. Arbitrarily
+ // chose 1MB as the upper bound on the total bytes read.
+ size_t rate_limited_bytes =
+ options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
+ // Include the explicit prefetch of the footer in direct I/O case.
+ size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
+ ASSERT_GE(
+ rate_limited_bytes,
+ static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
+ ASSERT_LT(
+ rate_limited_bytes,
+ static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
+ direct_io_extra));
+
+ Iterator* iter = db_->NewIterator(ReadOptions());
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
+ }
+ delete iter;
+ // bytes read for user iterator shouldn't count against the rate limit.
+ ASSERT_EQ(rate_limited_bytes,
+ static_cast<size_t>(
+ options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
+ }
+}
+#endif // ROCKSDB_LITE
+
+// Make sure DB can be reopen with reduced number of levels, given no file
+// is on levels higher than the new num_levels.
+TEST_F(DBTest2, ReduceLevel) {
+ Options options;
+ options.disable_auto_compactions = true;
+ options.num_levels = 7;
+ Reopen(options);
+ Put("foo", "bar");
+ Flush();
+ MoveFilesToLevel(6);
+#ifndef ROCKSDB_LITE
+ ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
+#endif // !ROCKSDB_LITE
+ CompactRangeOptions compact_options;
+ compact_options.change_level = true;
+ compact_options.target_level = 1;
+ dbfull()->CompactRange(compact_options, nullptr, nullptr);
+#ifndef ROCKSDB_LITE
+ ASSERT_EQ("0,1", FilesPerLevel());
+#endif // !ROCKSDB_LITE
+ options.num_levels = 3;
+ Reopen(options);
+#ifndef ROCKSDB_LITE
+ ASSERT_EQ("0,1", FilesPerLevel());
+#endif // !ROCKSDB_LITE
+}
+
+// Test that ReadCallback is actually used in both memtbale and sst tables
+TEST_F(DBTest2, ReadCallbackTest) {
+ Options options;
+ options.disable_auto_compactions = true;
+ options.num_levels = 7;
+ Reopen(options);
+ std::vector<const Snapshot*> snapshots;
+ // Try to create a db with multiple layers and a memtable
+ const std::string key = "foo";
+ const std::string value = "bar";
+ // This test assumes that the seq start with 1 and increased by 1 after each
+ // write batch of size 1. If that behavior changes, the test needs to be
+ // updated as well.
+ // TODO(myabandeh): update this test to use the seq number that is returned by
+ // the DB instead of assuming what seq the DB used.
+ int i = 1;
+ for (; i < 10; i++) {
+ Put(key, value + std::to_string(i));
+ // Take a snapshot to avoid the value being removed during compaction
+ auto snapshot = dbfull()->GetSnapshot();
+ snapshots.push_back(snapshot);
+ }
+ Flush();
+ for (; i < 20; i++) {
+ Put(key, value + std::to_string(i));
+ // Take a snapshot to avoid the value being removed during compaction
+ auto snapshot = dbfull()->GetSnapshot();
+ snapshots.push_back(snapshot);
+ }
+ Flush();
+ MoveFilesToLevel(6);
+#ifndef ROCKSDB_LITE
+ ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
+#endif // !ROCKSDB_LITE
+ for (; i < 30; i++) {
+ Put(key, value + std::to_string(i));
+ auto snapshot = dbfull()->GetSnapshot();
+ snapshots.push_back(snapshot);
+ }
+ Flush();
+#ifndef ROCKSDB_LITE
+ ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel());
+#endif // !ROCKSDB_LITE
+ // And also add some values to the memtable
+ for (; i < 40; i++) {
+ Put(key, value + std::to_string(i));
+ auto snapshot = dbfull()->GetSnapshot();
+ snapshots.push_back(snapshot);
+ }
+
+ class TestReadCallback : public ReadCallback {
+ public:
+ explicit TestReadCallback(SequenceNumber snapshot)
+ : ReadCallback(snapshot), snapshot_(snapshot) {}
+ bool IsVisibleFullCheck(SequenceNumber seq) override {
+ return seq <= snapshot_;
+ }
+
+ private:
+ SequenceNumber snapshot_;
+ };
+
+ for (int seq = 1; seq < i; seq++) {
+ PinnableSlice pinnable_val;
+ ReadOptions roptions;
+ TestReadCallback callback(seq);
+ bool dont_care = true;
+ Status s = dbfull()->GetImpl(roptions, dbfull()->DefaultColumnFamily(), key,
+ &pinnable_val, &dont_care, &callback);
+ ASSERT_TRUE(s.ok());
+ // Assuming that after each Put the DB increased seq by one, the value and
+ // seq number must be equal since we also inc value by 1 after each Put.
+ ASSERT_EQ(value + std::to_string(seq), pinnable_val.ToString());
+ }
+
+ for (auto snapshot : snapshots) {
+ dbfull()->ReleaseSnapshot(snapshot);
+ }
+}
+
+#ifndef ROCKSDB_LITE
+
+TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) {
+ // Regression test for race condition where an obsolete file is returned to
+ // user as a "live file" but then deleted, all while file deletions are
+ // disabled.
+ //
+ // It happened like this:
+ //
+ // 1. [flush thread] Log file "x.log" found by FindObsoleteFiles
+ // 2. [user thread] DisableFileDeletions, GetSortedWalFiles are called and the
+ // latter returned "x.log"
+ // 3. [flush thread] PurgeObsoleteFiles deleted "x.log"
+ // 4. [user thread] Reading "x.log" failed
+ //
+ // Unfortunately the only regression test I can come up with involves sleep.
+ // We cannot set SyncPoints to repro since, once the fix is applied, the
+ // SyncPoints would cause a deadlock as the repro's sequence of events is now
+ // prohibited.
+ //
+ // Instead, if we sleep for a second between Find and Purge, and ensure the
+ // read attempt happens after purge, then the sequence of events will almost
+ // certainly happen on the old code.
+ rocksdb::SyncPoint::GetInstance()->LoadDependency({
+ {"DBImpl::BackgroundCallFlush:FilesFound",
+ "DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered"},
+ {"DBImpl::PurgeObsoleteFiles:End",
+ "DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured"},
+ });
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::PurgeObsoleteFiles:Begin",
+ [&](void* /*arg*/) { env_->SleepForMicroseconds(1000000); });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ Put("key", "val");
+ FlushOptions flush_opts;
+ flush_opts.wait = false;
+ db_->Flush(flush_opts);
+ TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered");
+
+ db_->DisableFileDeletions();
+ VectorLogPtr log_files;
+ db_->GetSortedWalFiles(log_files);
+ TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured");
+ for (const auto& log_file : log_files) {
+ ASSERT_OK(env_->FileExists(LogFileName(dbname_, log_file->LogNumber())));
+ }
+
+ db_->EnableFileDeletions();
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, TestNumPread) {
+ Options options = CurrentOptions();
+ // disable block cache
+ BlockBasedTableOptions table_options;
+ table_options.no_block_cache = true;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ Reopen(options);
+ env_->count_random_reads_ = true;
+
+ env_->random_file_open_counter_.store(0);
+ ASSERT_OK(Put("bar", "foo"));
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Flush());
+ // After flush, we'll open the file and read footer, meta block,
+ // property block and index block.
+ ASSERT_EQ(4, env_->random_read_counter_.Read());
+ ASSERT_EQ(1, env_->random_file_open_counter_.load());
+
+ // One pread per a normal data block read
+ env_->random_file_open_counter_.store(0);
+ env_->random_read_counter_.Reset();
+ ASSERT_EQ("bar", Get("foo"));
+ ASSERT_EQ(1, env_->random_read_counter_.Read());
+ // All files are already opened.
+ ASSERT_EQ(0, env_->random_file_open_counter_.load());
+
+ env_->random_file_open_counter_.store(0);
+ env_->random_read_counter_.Reset();
+ ASSERT_OK(Put("bar2", "foo2"));
+ ASSERT_OK(Put("foo2", "bar2"));
+ ASSERT_OK(Flush());
+ // After flush, we'll open the file and read footer, meta block,
+ // property block and index block.
+ ASSERT_EQ(4, env_->random_read_counter_.Read());
+ ASSERT_EQ(1, env_->random_file_open_counter_.load());
+
+ // Compaction needs two input blocks, which requires 2 preads, and
+ // generate a new SST file which needs 4 preads (footer, meta block,
+ // property block and index block). In total 6.
+ env_->random_file_open_counter_.store(0);
+ env_->random_read_counter_.Reset();
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_EQ(6, env_->random_read_counter_.Read());
+ // All compactin input files should have already been opened.
+ ASSERT_EQ(1, env_->random_file_open_counter_.load());
+
+ // One pread per a normal data block read
+ env_->random_file_open_counter_.store(0);
+ env_->random_read_counter_.Reset();
+ ASSERT_EQ("foo2", Get("bar2"));
+ ASSERT_EQ(1, env_->random_read_counter_.Read());
+ // SST files are already opened.
+ ASSERT_EQ(0, env_->random_file_open_counter_.load());
+}
+
+TEST_F(DBTest2, TraceAndReplay) {
+ Options options = CurrentOptions();
+ options.merge_operator = MergeOperators::CreatePutOperator();
+ ReadOptions ro;
+ WriteOptions wo;
+ TraceOptions trace_opts;
+ EnvOptions env_opts;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ Random rnd(301);
+ Iterator* single_iter = nullptr;
+
+ ASSERT_TRUE(db_->EndTrace().IsIOError());
+
+ std::string trace_filename = dbname_ + "/rocksdb.trace";
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
+ ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
+
+ ASSERT_OK(Put(0, "a", "1"));
+ ASSERT_OK(Merge(0, "b", "2"));
+ ASSERT_OK(Delete(0, "c"));
+ ASSERT_OK(SingleDelete(0, "d"));
+ ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
+
+ WriteBatch batch;
+ ASSERT_OK(batch.Put("f", "11"));
+ ASSERT_OK(batch.Merge("g", "12"));
+ ASSERT_OK(batch.Delete("h"));
+ ASSERT_OK(batch.SingleDelete("i"));
+ ASSERT_OK(batch.DeleteRange("j", "k"));
+ ASSERT_OK(db_->Write(wo, &batch));
+
+ single_iter = db_->NewIterator(ro);
+ single_iter->Seek("f");
+ single_iter->SeekForPrev("g");
+ delete single_iter;
+
+ ASSERT_EQ("1", Get(0, "a"));
+ ASSERT_EQ("12", Get(0, "g"));
+
+ ASSERT_OK(Put(1, "foo", "bar"));
+ ASSERT_OK(Put(1, "rocksdb", "rocks"));
+ ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
+
+ ASSERT_OK(db_->EndTrace());
+ // These should not get into the trace file as it is after EndTrace.
+ Put("hello", "world");
+ Merge("foo", "bar");
+
+ // Open another db, replay, and verify the data
+ std::string value;
+ std::string dbname2 = test::TmpDir(env_) + "/db_replay";
+ ASSERT_OK(DestroyDB(dbname2, options));
+
+ // Using a different name than db2, to pacify infer's use-after-lifetime
+ // warnings (http://fbinfer.com).
+ DB* db2_init = nullptr;
+ options.create_if_missing = true;
+ ASSERT_OK(DB::Open(options, dbname2, &db2_init));
+ ColumnFamilyHandle* cf;
+ ASSERT_OK(
+ db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
+ delete cf;
+ delete db2_init;
+
+ DB* db2 = nullptr;
+ std::vector<ColumnFamilyDescriptor> column_families;
+ ColumnFamilyOptions cf_options;
+ cf_options.merge_operator = MergeOperators::CreatePutOperator();
+ column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
+ column_families.push_back(
+ ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
+ std::vector<ColumnFamilyHandle*> handles;
+ ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
+
+ env_->SleepForMicroseconds(100);
+ // Verify that the keys don't already exist
+ ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
+
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
+ Replayer replayer(db2, handles_, std::move(trace_reader));
+ ASSERT_OK(replayer.Replay());
+
+ ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
+ ASSERT_EQ("1", value);
+ ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
+ ASSERT_EQ("12", value);
+ ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
+
+ ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
+ ASSERT_EQ("bar", value);
+ ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
+ ASSERT_EQ("rocks", value);
+
+ for (auto handle : handles) {
+ delete handle;
+ }
+ delete db2;
+ ASSERT_OK(DestroyDB(dbname2, options));
+}
+
+TEST_F(DBTest2, TraceWithLimit) {
+ Options options = CurrentOptions();
+ options.merge_operator = MergeOperators::CreatePutOperator();
+ ReadOptions ro;
+ WriteOptions wo;
+ TraceOptions trace_opts;
+ EnvOptions env_opts;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ Random rnd(301);
+
+ // test the max trace file size options
+ trace_opts.max_trace_file_size = 5;
+ std::string trace_filename = dbname_ + "/rocksdb.trace1";
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
+ ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
+ ASSERT_OK(Put(0, "a", "1"));
+ ASSERT_OK(Put(0, "b", "1"));
+ ASSERT_OK(Put(0, "c", "1"));
+ ASSERT_OK(db_->EndTrace());
+
+ std::string dbname2 = test::TmpDir(env_) + "/db_replay2";
+ std::string value;
+ ASSERT_OK(DestroyDB(dbname2, options));
+
+ // Using a different name than db2, to pacify infer's use-after-lifetime
+ // warnings (http://fbinfer.com).
+ DB* db2_init = nullptr;
+ options.create_if_missing = true;
+ ASSERT_OK(DB::Open(options, dbname2, &db2_init));
+ ColumnFamilyHandle* cf;
+ ASSERT_OK(
+ db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
+ delete cf;
+ delete db2_init;
+
+ DB* db2 = nullptr;
+ std::vector<ColumnFamilyDescriptor> column_families;
+ ColumnFamilyOptions cf_options;
+ cf_options.merge_operator = MergeOperators::CreatePutOperator();
+ column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
+ column_families.push_back(
+ ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
+ std::vector<ColumnFamilyHandle*> handles;
+ ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
+
+ env_->SleepForMicroseconds(100);
+ // Verify that the keys don't already exist
+ ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
+
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
+ Replayer replayer(db2, handles_, std::move(trace_reader));
+ ASSERT_OK(replayer.Replay());
+
+ ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
+
+ for (auto handle : handles) {
+ delete handle;
+ }
+ delete db2;
+ ASSERT_OK(DestroyDB(dbname2, options));
+}
+
+TEST_F(DBTest2, TraceWithSampling) {
+ Options options = CurrentOptions();
+ ReadOptions ro;
+ WriteOptions wo;
+ TraceOptions trace_opts;
+ EnvOptions env_opts;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ Random rnd(301);
+
+ // test the trace file sampling options
+ trace_opts.sampling_frequency = 2;
+ std::string trace_filename = dbname_ + "/rocksdb.trace_sampling";
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
+ ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
+ ASSERT_OK(Put(0, "a", "1"));
+ ASSERT_OK(Put(0, "b", "2"));
+ ASSERT_OK(Put(0, "c", "3"));
+ ASSERT_OK(Put(0, "d", "4"));
+ ASSERT_OK(Put(0, "e", "5"));
+ ASSERT_OK(db_->EndTrace());
+
+ std::string dbname2 = test::TmpDir(env_) + "/db_replay_sampling";
+ std::string value;
+ ASSERT_OK(DestroyDB(dbname2, options));
+
+ // Using a different name than db2, to pacify infer's use-after-lifetime
+ // warnings (http://fbinfer.com).
+ DB* db2_init = nullptr;
+ options.create_if_missing = true;
+ ASSERT_OK(DB::Open(options, dbname2, &db2_init));
+ ColumnFamilyHandle* cf;
+ ASSERT_OK(
+ db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
+ delete cf;
+ delete db2_init;
+
+ DB* db2 = nullptr;
+ std::vector<ColumnFamilyDescriptor> column_families;
+ ColumnFamilyOptions cf_options;
+ column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
+ column_families.push_back(
+ ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
+ std::vector<ColumnFamilyHandle*> handles;
+ ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
+
+ env_->SleepForMicroseconds(100);
+ ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "d", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "e", &value).IsNotFound());
+
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
+ Replayer replayer(db2, handles_, std::move(trace_reader));
+ ASSERT_OK(replayer.Replay());
+
+ ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+ ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
+ ASSERT_FALSE(db2->Get(ro, handles[0], "d", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "e", &value).IsNotFound());
+
+ for (auto handle : handles) {
+ delete handle;
+ }
+ delete db2;
+ ASSERT_OK(DestroyDB(dbname2, options));
+}
+
+TEST_F(DBTest2, TraceWithFilter) {
+ Options options = CurrentOptions();
+ options.merge_operator = MergeOperators::CreatePutOperator();
+ ReadOptions ro;
+ WriteOptions wo;
+ TraceOptions trace_opts;
+ EnvOptions env_opts;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ Random rnd(301);
+ Iterator* single_iter = nullptr;
+
+ trace_opts.filter = TraceFilterType::kTraceFilterWrite;
+
+ std::string trace_filename = dbname_ + "/rocksdb.trace";
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
+ ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
+
+ ASSERT_OK(Put(0, "a", "1"));
+ ASSERT_OK(Merge(0, "b", "2"));
+ ASSERT_OK(Delete(0, "c"));
+ ASSERT_OK(SingleDelete(0, "d"));
+ ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
+
+ WriteBatch batch;
+ ASSERT_OK(batch.Put("f", "11"));
+ ASSERT_OK(batch.Merge("g", "12"));
+ ASSERT_OK(batch.Delete("h"));
+ ASSERT_OK(batch.SingleDelete("i"));
+ ASSERT_OK(batch.DeleteRange("j", "k"));
+ ASSERT_OK(db_->Write(wo, &batch));
+
+ single_iter = db_->NewIterator(ro);
+ single_iter->Seek("f");
+ single_iter->SeekForPrev("g");
+ delete single_iter;
+
+ ASSERT_EQ("1", Get(0, "a"));
+ ASSERT_EQ("12", Get(0, "g"));
+
+ ASSERT_OK(Put(1, "foo", "bar"));
+ ASSERT_OK(Put(1, "rocksdb", "rocks"));
+ ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
+
+ ASSERT_OK(db_->EndTrace());
+ // These should not get into the trace file as it is after EndTrace.
+ Put("hello", "world");
+ Merge("foo", "bar");
+
+ // Open another db, replay, and verify the data
+ std::string value;
+ std::string dbname2 = test::TmpDir(env_) + "/db_replay";
+ ASSERT_OK(DestroyDB(dbname2, options));
+
+ // Using a different name than db2, to pacify infer's use-after-lifetime
+ // warnings (http://fbinfer.com).
+ DB* db2_init = nullptr;
+ options.create_if_missing = true;
+ ASSERT_OK(DB::Open(options, dbname2, &db2_init));
+ ColumnFamilyHandle* cf;
+ ASSERT_OK(
+ db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
+ delete cf;
+ delete db2_init;
+
+ DB* db2 = nullptr;
+ std::vector<ColumnFamilyDescriptor> column_families;
+ ColumnFamilyOptions cf_options;
+ cf_options.merge_operator = MergeOperators::CreatePutOperator();
+ column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
+ column_families.push_back(
+ ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
+ std::vector<ColumnFamilyHandle*> handles;
+ ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
+
+ env_->SleepForMicroseconds(100);
+ // Verify that the keys don't already exist
+ ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
+
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
+ Replayer replayer(db2, handles_, std::move(trace_reader));
+ ASSERT_OK(replayer.Replay());
+
+ // All the key-values should not present since we filter out the WRITE ops.
+ ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "foo", &value).IsNotFound());
+ ASSERT_TRUE(db2->Get(ro, handles[0], "rocksdb", &value).IsNotFound());
+
+ for (auto handle : handles) {
+ delete handle;
+ }
+ delete db2;
+ ASSERT_OK(DestroyDB(dbname2, options));
+
+ // Set up a new db.
+ std::string dbname3 = test::TmpDir(env_) + "/db_not_trace_read";
+ ASSERT_OK(DestroyDB(dbname3, options));
+
+ DB* db3_init = nullptr;
+ options.create_if_missing = true;
+ ColumnFamilyHandle* cf3;
+ ASSERT_OK(DB::Open(options, dbname3, &db3_init));
+ ASSERT_OK(
+ db3_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf3));
+ delete cf3;
+ delete db3_init;
+
+ column_families.clear();
+ column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
+ column_families.push_back(
+ ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
+ handles.clear();
+
+ DB* db3 = nullptr;
+ ASSERT_OK(DB::Open(DBOptions(), dbname3, column_families, &handles, &db3));
+
+ env_->SleepForMicroseconds(100);
+ // Verify that the keys don't already exist
+ ASSERT_TRUE(db3->Get(ro, handles[0], "a", &value).IsNotFound());
+ ASSERT_TRUE(db3->Get(ro, handles[0], "g", &value).IsNotFound());
+
+ //The tracer will not record the READ ops.
+ trace_opts.filter = TraceFilterType::kTraceFilterGet;
+ std::string trace_filename3 = dbname_ + "/rocksdb.trace_3";
+ std::unique_ptr<TraceWriter> trace_writer3;
+ ASSERT_OK(
+ NewFileTraceWriter(env_, env_opts, trace_filename3, &trace_writer3));
+ ASSERT_OK(db3->StartTrace(trace_opts, std::move(trace_writer3)));
+
+ ASSERT_OK(db3->Put(wo, handles[0], "a", "1"));
+ ASSERT_OK(db3->Merge(wo, handles[0], "b", "2"));
+ ASSERT_OK(db3->Delete(wo, handles[0], "c"));
+ ASSERT_OK(db3->SingleDelete(wo, handles[0], "d"));
+
+ ASSERT_OK(db3->Get(ro, handles[0], "a", &value));
+ ASSERT_EQ(value, "1");
+ ASSERT_TRUE(db3->Get(ro, handles[0], "c", &value).IsNotFound());
+
+ ASSERT_OK(db3->EndTrace());
+
+ for (auto handle : handles) {
+ delete handle;
+ }
+ delete db3;
+ ASSERT_OK(DestroyDB(dbname3, options));
+
+ std::unique_ptr<TraceReader> trace_reader3;
+ ASSERT_OK(
+ NewFileTraceReader(env_, env_opts, trace_filename3, &trace_reader3));
+
+ // Count the number of records in the trace file;
+ int count = 0;
+ std::string data;
+ Status s;
+ while (true) {
+ s = trace_reader3->Read(&data);
+ if (!s.ok()) {
+ break;
+ }
+ count += 1;
+ }
+ // We also need to count the header and footer
+ // 4 WRITE + HEADER + FOOTER = 6
+ ASSERT_EQ(count, 6);
+}
+
+#endif // ROCKSDB_LITE
+
+TEST_F(DBTest2, PinnableSliceAndMmapReads) {
+ Options options = CurrentOptions();
+ options.allow_mmap_reads = true;
+ options.max_open_files = 100;
+ options.compression = kNoCompression;
+ Reopen(options);
+
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Flush());
+
+ PinnableSlice pinned_value;
+ ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
+ // It is not safe to pin mmap files as they might disappear by compaction
+ ASSERT_FALSE(pinned_value.IsPinned());
+ ASSERT_EQ(pinned_value.ToString(), "bar");
+
+ dbfull()->TEST_CompactRange(0 /* level */, nullptr /* begin */,
+ nullptr /* end */, nullptr /* column_family */,
+ true /* disallow_trivial_move */);
+
+ // Ensure pinned_value doesn't rely on memory munmap'd by the above
+ // compaction. It crashes if it does.
+ ASSERT_EQ(pinned_value.ToString(), "bar");
+
+#ifndef ROCKSDB_LITE
+ pinned_value.Reset();
+ // Unsafe to pin mmap files when they could be kicked out of table cache
+ Close();
+ ASSERT_OK(ReadOnlyReopen(options));
+ ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
+ ASSERT_FALSE(pinned_value.IsPinned());
+ ASSERT_EQ(pinned_value.ToString(), "bar");
+
+ pinned_value.Reset();
+ // In read-only mode with infinite capacity on table cache it should pin the
+ // value and avoid the memcpy
+ Close();
+ options.max_open_files = -1;
+ ASSERT_OK(ReadOnlyReopen(options));
+ ASSERT_EQ(Get("foo", &pinned_value), Status::OK());
+ ASSERT_TRUE(pinned_value.IsPinned());
+ ASSERT_EQ(pinned_value.ToString(), "bar");
+#endif
+}
+
+TEST_F(DBTest2, DISABLED_IteratorPinnedMemory) {
+ Options options = CurrentOptions();
+ options.create_if_missing = true;
+ options.statistics = rocksdb::CreateDBStatistics();
+ BlockBasedTableOptions bbto;
+ bbto.no_block_cache = false;
+ bbto.cache_index_and_filter_blocks = false;
+ bbto.block_cache = NewLRUCache(100000);
+ bbto.block_size = 400; // small block size
+ options.table_factory.reset(new BlockBasedTableFactory(bbto));
+ Reopen(options);
+
+ Random rnd(301);
+ std::string v = RandomString(&rnd, 400);
+
+ // Since v is the size of a block, each key should take a block
+ // of 400+ bytes.
+ Put("1", v);
+ Put("3", v);
+ Put("5", v);
+ Put("7", v);
+ ASSERT_OK(Flush());
+
+ ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
+
+ // Verify that iterators don't pin more than one data block in block cache
+ // at each time.
+ {
+ std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
+ iter->SeekToFirst();
+
+ for (int i = 0; i < 4; i++) {
+ ASSERT_TRUE(iter->Valid());
+ // Block cache should contain exactly one block.
+ ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
+ ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
+ iter->Next();
+ }
+ ASSERT_FALSE(iter->Valid());
+
+ iter->Seek("4");
+ ASSERT_TRUE(iter->Valid());
+
+ ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
+ ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
+
+ iter->Seek("3");
+ ASSERT_TRUE(iter->Valid());
+
+ ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
+ ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
+ }
+ ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
+
+ // Test compaction case
+ Put("2", v);
+ Put("5", v);
+ Put("6", v);
+ Put("8", v);
+ ASSERT_OK(Flush());
+
+ // Clear existing data in block cache
+ bbto.block_cache->SetCapacity(0);
+ bbto.block_cache->SetCapacity(100000);
+
+ // Verify compaction input iterators don't hold more than one data blocks at
+ // one time.
+ std::atomic<bool> finished(false);
+ std::atomic<int> block_newed(0);
+ std::atomic<int> block_destroyed(0);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "Block::Block:0", [&](void* /*arg*/) {
+ if (finished) {
+ return;
+ }
+ // Two iterators. At most 2 outstanding blocks.
+ EXPECT_GE(block_newed.load(), block_destroyed.load());
+ EXPECT_LE(block_newed.load(), block_destroyed.load() + 1);
+ block_newed.fetch_add(1);
+ });
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "Block::~Block", [&](void* /*arg*/) {
+ if (finished) {
+ return;
+ }
+ // Two iterators. At most 2 outstanding blocks.
+ EXPECT_GE(block_newed.load(), block_destroyed.load() + 1);
+ EXPECT_LE(block_newed.load(), block_destroyed.load() + 2);
+ block_destroyed.fetch_add(1);
+ });
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "CompactionJob::Run:BeforeVerify",
+ [&](void* /*arg*/) { finished = true; });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+ // Two input files. Each of them has 4 data blocks.
+ ASSERT_EQ(8, block_newed.load());
+ ASSERT_EQ(8, block_destroyed.load());
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest2, TestBBTTailPrefetch) {
+ std::atomic<bool> called(false);
+ size_t expected_lower_bound = 512 * 1024;
+ size_t expected_higher_bound = 512 * 1024;
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
+ size_t* prefetch_size = static_cast<size_t*>(arg);
+ EXPECT_LE(expected_lower_bound, *prefetch_size);
+ EXPECT_GE(expected_higher_bound, *prefetch_size);
+ called = true;
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ Put("1", "1");
+ Put("9", "1");
+ Flush();
+
+ expected_lower_bound = 0;
+ expected_higher_bound = 8 * 1024;
+
+ Put("1", "1");
+ Put("9", "1");
+ Flush();
+
+ Put("1", "1");
+ Put("9", "1");
+ Flush();
+
+ // Full compaction to make sure there is no L0 file after the open.
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+ ASSERT_TRUE(called.load());
+ called = false;
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ std::atomic<bool> first_call(true);
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
+ size_t* prefetch_size = static_cast<size_t*>(arg);
+ if (first_call) {
+ EXPECT_EQ(4 * 1024, *prefetch_size);
+ first_call = false;
+ } else {
+ EXPECT_GE(4 * 1024, *prefetch_size);
+ }
+ called = true;
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+ Options options = CurrentOptions();
+ options.max_file_opening_threads = 1; // one thread
+ BlockBasedTableOptions table_options;
+ table_options.cache_index_and_filter_blocks = true;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ options.max_open_files = -1;
+ Reopen(options);
+
+ Put("1", "1");
+ Put("9", "1");
+ Flush();
+
+ Put("1", "1");
+ Put("9", "1");
+ Flush();
+
+ ASSERT_TRUE(called.load());
+ called = false;
+
+ // Parallel loading SST files
+ options.max_file_opening_threads = 16;
+ Reopen(options);
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+ ASSERT_TRUE(called.load());
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(DBTest2, TestGetColumnFamilyHandleUnlocked) {
+ // Setup sync point dependency to reproduce the race condition of
+ // DBImpl::GetColumnFamilyHandleUnlocked
+ rocksdb::SyncPoint::GetInstance()->LoadDependency(
+ { {"TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1",
+ "TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2"},
+ {"TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2",
+ "TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1"},
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ CreateColumnFamilies({"test1", "test2"}, Options());
+ ASSERT_EQ(handles_.size(), 2);
+
+ DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
+ port::Thread user_thread1([&]() {
+ auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[0]->GetID());
+ ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
+ TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked1");
+ TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::ReadColumnFamilyHandle1");
+ ASSERT_EQ(cfh->GetID(), handles_[0]->GetID());
+ });
+
+ port::Thread user_thread2([&]() {
+ TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::PreGetColumnFamilyHandleUnlocked2");
+ auto cfh = dbi->GetColumnFamilyHandleUnlocked(handles_[1]->GetID());
+ ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
+ TEST_SYNC_POINT("TestGetColumnFamilyHandleUnlocked::GetColumnFamilyHandleUnlocked2");
+ ASSERT_EQ(cfh->GetID(), handles_[1]->GetID());
+ });
+
+ user_thread1.join();
+ user_thread2.join();
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBTest2, TestCompactFiles) {
+ // Setup sync point dependency to reproduce the race condition of
+ // DBImpl::GetColumnFamilyHandleUnlocked
+ rocksdb::SyncPoint::GetInstance()->LoadDependency({
+ {"TestCompactFiles::IngestExternalFile1",
+ "TestCompactFiles::IngestExternalFile2"},
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Options options;
+ options.num_levels = 2;
+ options.disable_auto_compactions = true;
+ Reopen(options);
+ auto* handle = db_->DefaultColumnFamily();
+ ASSERT_EQ(db_->NumberLevels(handle), 2);
+
+ rocksdb::SstFileWriter sst_file_writer{rocksdb::EnvOptions(), options};
+ std::string external_file1 = dbname_ + "/test_compact_files1.sst_t";
+ std::string external_file2 = dbname_ + "/test_compact_files2.sst_t";
+ std::string external_file3 = dbname_ + "/test_compact_files3.sst_t";
+
+ ASSERT_OK(sst_file_writer.Open(external_file1));
+ ASSERT_OK(sst_file_writer.Put("1", "1"));
+ ASSERT_OK(sst_file_writer.Put("2", "2"));
+ ASSERT_OK(sst_file_writer.Finish());
+
+ ASSERT_OK(sst_file_writer.Open(external_file2));
+ ASSERT_OK(sst_file_writer.Put("3", "3"));
+ ASSERT_OK(sst_file_writer.Put("4", "4"));
+ ASSERT_OK(sst_file_writer.Finish());
+
+ ASSERT_OK(sst_file_writer.Open(external_file3));
+ ASSERT_OK(sst_file_writer.Put("5", "5"));
+ ASSERT_OK(sst_file_writer.Put("6", "6"));
+ ASSERT_OK(sst_file_writer.Finish());
+
+ ASSERT_OK(db_->IngestExternalFile(handle, {external_file1, external_file3},
+ IngestExternalFileOptions()));
+ ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);
+ std::vector<std::string> files;
+ GetSstFiles(env_, dbname_, &files);
+ ASSERT_EQ(files.size(), 2);
+
+ port::Thread user_thread1(
+ [&]() { db_->CompactFiles(CompactionOptions(), handle, files, 1); });
+
+ port::Thread user_thread2([&]() {
+ ASSERT_OK(db_->IngestExternalFile(handle, {external_file2},
+ IngestExternalFileOptions()));
+ TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1");
+ });
+
+ user_thread1.join();
+ user_thread2.join();
+
+ rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+#endif // ROCKSDB_LITE
+
+// TODO: figure out why this test fails in appveyor
+#ifndef OS_WIN
+TEST_F(DBTest2, MultiDBParallelOpenTest) {
+ const int kNumDbs = 2;
+ Options options = CurrentOptions();
+ std::vector<std::string> dbnames;
+ for (int i = 0; i < kNumDbs; ++i) {
+ dbnames.emplace_back(test::TmpDir(env_) + "/db" + ToString(i));
+ ASSERT_OK(DestroyDB(dbnames.back(), options));
+ }
+
+ // Verify empty DBs can be created in parallel
+ std::vector<std::thread> open_threads;
+ std::vector<DB*> dbs{static_cast<unsigned int>(kNumDbs), nullptr};
+ options.create_if_missing = true;
+ for (int i = 0; i < kNumDbs; ++i) {
+ open_threads.emplace_back(
+ [&](int dbnum) {
+ ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum]));
+ },
+ i);
+ }
+
+ // Now add some data and close, so next we can verify non-empty DBs can be
+ // recovered in parallel
+ for (int i = 0; i < kNumDbs; ++i) {
+ open_threads[i].join();
+ ASSERT_OK(dbs[i]->Put(WriteOptions(), "xi", "gua"));
+ delete dbs[i];
+ }
+
+ // Verify non-empty DBs can be recovered in parallel
+ dbs.clear();
+ open_threads.clear();
+ for (int i = 0; i < kNumDbs; ++i) {
+ open_threads.emplace_back(
+ [&](int dbnum) {
+ ASSERT_OK(DB::Open(options, dbnames[dbnum], &dbs[dbnum]));
+ },
+ i);
+ }
+
+ // Wait and cleanup
+ for (int i = 0; i < kNumDbs; ++i) {
+ open_threads[i].join();
+ delete dbs[i];
+ ASSERT_OK(DestroyDB(dbnames[i], options));
+ }
+}
+#endif // OS_WIN
+
+namespace {
+class DummyOldStats : public Statistics {
+ public:
+ uint64_t getTickerCount(uint32_t /*ticker_type*/) const override { return 0; }
+ void recordTick(uint32_t /* ticker_type */, uint64_t /* count */) override {
+ num_rt++;
+ }
+ void setTickerCount(uint32_t /*ticker_type*/, uint64_t /*count*/) override {}
+ uint64_t getAndResetTickerCount(uint32_t /*ticker_type*/) override {
+ return 0;
+ }
+ void measureTime(uint32_t /*histogram_type*/, uint64_t /*count*/) override {
+ num_mt++;
+ }
+ void histogramData(uint32_t /*histogram_type*/,
+ rocksdb::HistogramData* const /*data*/) const override {}
+ std::string getHistogramString(uint32_t /*type*/) const override {
+ return "";
+ }
+ bool HistEnabledForType(uint32_t /*type*/) const override { return false; }
+ std::string ToString() const override { return ""; }
+ int num_rt = 0;
+ int num_mt = 0;
+};
+} // namespace
+
+TEST_F(DBTest2, OldStatsInterface) {
+ DummyOldStats* dos = new DummyOldStats();
+ std::shared_ptr<Statistics> stats(dos);
+ Options options = CurrentOptions();
+ options.create_if_missing = true;
+ options.statistics = stats;
+ Reopen(options);
+
+ Put("foo", "bar");
+ ASSERT_EQ("bar", Get("foo"));
+ ASSERT_OK(Flush());
+ ASSERT_EQ("bar", Get("foo"));
+
+ ASSERT_GT(dos->num_rt, 0);
+ ASSERT_GT(dos->num_mt, 0);
+}
+} // namespace rocksdb
+
+int main(int argc, char** argv) {
+ rocksdb::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}