diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/file/prefetch_test.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/file/prefetch_test.cc | 2109 |
1 files changed, 2109 insertions, 0 deletions
diff --git a/src/rocksdb/file/prefetch_test.cc b/src/rocksdb/file/prefetch_test.cc new file mode 100644 index 000000000..438286bfc --- /dev/null +++ b/src/rocksdb/file/prefetch_test.cc @@ -0,0 +1,2109 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_test_util.h" +#include "file/file_prefetch_buffer.h" +#include "file/file_util.h" +#include "rocksdb/file_system.h" +#include "test_util/sync_point.h" +#ifdef GFLAGS +#include "tools/io_tracer_parser_tool.h" +#endif +#include "util/random.h" + +namespace ROCKSDB_NAMESPACE { + +class MockFS; + +class MockRandomAccessFile : public FSRandomAccessFileOwnerWrapper { + public: + MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile>& file, + bool support_prefetch, std::atomic_int& prefetch_count) + : FSRandomAccessFileOwnerWrapper(std::move(file)), + support_prefetch_(support_prefetch), + prefetch_count_(prefetch_count) {} + + IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options, + IODebugContext* dbg) override { + if (support_prefetch_) { + prefetch_count_.fetch_add(1); + return target()->Prefetch(offset, n, options, dbg); + } else { + return IOStatus::NotSupported("Prefetch not supported"); + } + } + + private: + const bool support_prefetch_; + std::atomic_int& prefetch_count_; +}; + +class MockFS : public FileSystemWrapper { + public: + explicit MockFS(const std::shared_ptr<FileSystem>& wrapped, + bool support_prefetch) + : FileSystemWrapper(wrapped), support_prefetch_(support_prefetch) {} + + static const char* kClassName() { return "MockFS"; } + const char* Name() const override { return kClassName(); } + + IOStatus NewRandomAccessFile(const std::string& fname, + const FileOptions& opts, + std::unique_ptr<FSRandomAccessFile>* result, + IODebugContext* dbg) override { + std::unique_ptr<FSRandomAccessFile> file; + IOStatus s; + s = target()->NewRandomAccessFile(fname, opts, &file, dbg); + result->reset( + new MockRandomAccessFile(file, support_prefetch_, prefetch_count_)); + return s; + } + + void ClearPrefetchCount() { prefetch_count_ = 0; } + + bool IsPrefetchCalled() { return prefetch_count_ > 0; } + + int GetPrefetchCount() { + return prefetch_count_.load(std::memory_order_relaxed); + } + + private: + const bool support_prefetch_; + std::atomic_int prefetch_count_{0}; +}; + +class PrefetchTest + : public DBTestBase, + public ::testing::WithParamInterface<std::tuple<bool, bool>> { + public: + PrefetchTest() : DBTestBase("prefetch_test", true) {} +}; + +INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest, + ::testing::Combine(::testing::Bool(), + ::testing::Bool())); + +std::string BuildKey(int num, std::string postfix = "") { + return "my_key_" + std::to_string(num) + postfix; +} + +TEST_P(PrefetchTest, Basic) { + // First param is if the mockFS support_prefetch or not + bool support_prefetch = + std::get<0>(GetParam()) && + test::IsPrefetchSupported(env_->GetFileSystem(), dbname_); + + // Second param is if directIO is enabled or not + bool use_direct_io = std::get<1>(GetParam()); + const int kNumKeys = 1100; + std::shared_ptr<MockFS> fs = + std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + + int buff_prefetch_count = 0; + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + // create first key range + WriteBatch batch; + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), "value for range 1 key")); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + // create second key range + batch.Clear(); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i, "key2"), "value for range 2 key")); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + // delete second key range + batch.Clear(); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Delete(BuildKey(i, "key2"))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + // compact database + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + Slice greatest(end_key.data(), end_key.size()); + + // commenting out the line below causes the example to work correctly + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); + + if (support_prefetch && !use_direct_io) { + // If underline file system supports prefetch, and directIO is not enabled + // make sure prefetch() is called and FilePrefetchBuffer is not used. + ASSERT_TRUE(fs->IsPrefetchCalled()); + fs->ClearPrefetchCount(); + ASSERT_EQ(0, buff_prefetch_count); + } else { + // If underline file system doesn't support prefetch, or directIO is + // enabled, make sure prefetch() is not called and FilePrefetchBuffer is + // used. + ASSERT_FALSE(fs->IsPrefetchCalled()); + ASSERT_GT(buff_prefetch_count, 0); + buff_prefetch_count = 0; + } + + // count the keys + { + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + num_keys++; + } + } + + // Make sure prefetch is called only if file system support prefetch. + if (support_prefetch && !use_direct_io) { + ASSERT_TRUE(fs->IsPrefetchCalled()); + fs->ClearPrefetchCount(); + ASSERT_EQ(0, buff_prefetch_count); + } else { + ASSERT_FALSE(fs->IsPrefetchCalled()); + ASSERT_GT(buff_prefetch_count, 0); + buff_prefetch_count = 0; + } + Close(); +} + +#ifndef ROCKSDB_LITE +TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) { + // First param is if the mockFS support_prefetch or not + bool support_prefetch = + std::get<0>(GetParam()) && + test::IsPrefetchSupported(env_->GetFileSystem(), dbname_); + + // Second param is if directIO is enabled or not + bool use_direct_io = std::get<1>(GetParam()); + + std::shared_ptr<MockFS> fs = + std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.disable_auto_compactions = true; + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + table_options.max_auto_readahead_size = 0; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + int buff_prefetch_count = 0; + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + + // DB open will create table readers unless we reduce the table cache + // capacity. SanitizeOptions will set max_open_files to minimum of 20. Table + // cache is allocated with max_open_files - 10 as capacity. So override + // max_open_files to 10 so table cache capacity will become 0. This will + // prevent file open during DB open and force the file to be opened during + // Iteration. + SyncPoint::GetInstance()->SetCallBack( + "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { + int* max_open_files = (int*)arg; + *max_open_files = 11; + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = TryReopen(options); + + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + Random rnd(309); + int key_count = 0; + const int num_keys_per_level = 100; + // Level 0 : Keys in range [0, 99], Level 1:[100, 199], Level 2:[200, 299]. + for (int level = 2; level >= 0; level--) { + key_count = level * num_keys_per_level; + for (int i = 0; i < num_keys_per_level; ++i) { + ASSERT_OK(Put(Key(key_count++), rnd.RandomString(500))); + } + ASSERT_OK(Flush()); + MoveFilesToLevel(level); + } + Close(); + std::vector<int> buff_prefectch_level_count = {0, 0, 0}; + TryReopen(options); + { + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + fs->ClearPrefetchCount(); + buff_prefetch_count = 0; + + for (int level = 2; level >= 0; level--) { + key_count = level * num_keys_per_level; + switch (level) { + case 0: + // max_auto_readahead_size is set 0 so data and index blocks are not + // prefetched. + ASSERT_OK(db_->SetOptions( + {{"block_based_table_factory", "{max_auto_readahead_size=0;}"}})); + break; + case 1: + // max_auto_readahead_size is set less than + // initial_auto_readahead_size. So readahead_size remains equal to + // max_auto_readahead_size. + ASSERT_OK(db_->SetOptions({{"block_based_table_factory", + "{max_auto_readahead_size=4096;}"}})); + break; + case 2: + ASSERT_OK(db_->SetOptions({{"block_based_table_factory", + "{max_auto_readahead_size=65536;}"}})); + break; + default: + assert(false); + } + + for (int i = 0; i < num_keys_per_level; ++i) { + iter->Seek(Key(key_count++)); + iter->Next(); + } + + buff_prefectch_level_count[level] = buff_prefetch_count; + if (support_prefetch && !use_direct_io) { + if (level == 0) { + ASSERT_FALSE(fs->IsPrefetchCalled()); + } else { + ASSERT_TRUE(fs->IsPrefetchCalled()); + } + fs->ClearPrefetchCount(); + } else { + ASSERT_FALSE(fs->IsPrefetchCalled()); + if (level == 0) { + ASSERT_EQ(buff_prefetch_count, 0); + } else { + ASSERT_GT(buff_prefetch_count, 0); + } + buff_prefetch_count = 0; + } + } + } + + if (!support_prefetch) { + ASSERT_GT(buff_prefectch_level_count[1], buff_prefectch_level_count[2]); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + Close(); +} + +TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) { + // First param is if the mockFS support_prefetch or not + bool support_prefetch = + std::get<0>(GetParam()) && + test::IsPrefetchSupported(env_->GetFileSystem(), dbname_); + + // Second param is if directIO is enabled or not + bool use_direct_io = std::get<1>(GetParam()); + + std::shared_ptr<MockFS> fs = + std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.disable_auto_compactions = true; + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + table_options.initial_auto_readahead_size = 0; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + int buff_prefetch_count = 0; + // DB open will create table readers unless we reduce the table cache + // capacity. SanitizeOptions will set max_open_files to minimum of 20. + // Table cache is allocated with max_open_files - 10 as capacity. So + // override max_open_files to 10 so table cache capacity will become 0. + // This will prevent file open during DB open and force the file to be + // opened during Iteration. + SyncPoint::GetInstance()->SetCallBack( + "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { + int* max_open_files = (int*)arg; + *max_open_files = 11; + }); + + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + + SyncPoint::GetInstance()->EnableProcessing(); + + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = TryReopen(options); + + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + Random rnd(309); + int key_count = 0; + const int num_keys_per_level = 100; + // Level 0 : Keys in range [0, 99], Level 1:[100, 199], Level 2:[200, 299]. + for (int level = 2; level >= 0; level--) { + key_count = level * num_keys_per_level; + for (int i = 0; i < num_keys_per_level; ++i) { + ASSERT_OK(Put(Key(key_count++), rnd.RandomString(500))); + } + ASSERT_OK(Flush()); + MoveFilesToLevel(level); + } + Close(); + + TryReopen(options); + { + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + fs->ClearPrefetchCount(); + buff_prefetch_count = 0; + std::vector<int> buff_prefetch_level_count = {0, 0, 0}; + + for (int level = 2; level >= 0; level--) { + key_count = level * num_keys_per_level; + switch (level) { + case 0: + // initial_auto_readahead_size is set 0 so data and index blocks are + // not prefetched. + ASSERT_OK(db_->SetOptions({{"block_based_table_factory", + "{initial_auto_readahead_size=0;}"}})); + break; + case 1: + // intial_auto_readahead_size and max_auto_readahead_size are set same + // so readahead_size remains same. + ASSERT_OK(db_->SetOptions({{"block_based_table_factory", + "{initial_auto_readahead_size=4096;max_" + "auto_readahead_size=4096;}"}})); + break; + case 2: + ASSERT_OK( + db_->SetOptions({{"block_based_table_factory", + "{initial_auto_readahead_size=65536;}"}})); + break; + default: + assert(false); + } + + for (int i = 0; i < num_keys_per_level; ++i) { + iter->Seek(Key(key_count++)); + iter->Next(); + } + + buff_prefetch_level_count[level] = buff_prefetch_count; + if (support_prefetch && !use_direct_io) { + if (level == 0) { + ASSERT_FALSE(fs->IsPrefetchCalled()); + } else { + ASSERT_TRUE(fs->IsPrefetchCalled()); + } + fs->ClearPrefetchCount(); + } else { + ASSERT_FALSE(fs->IsPrefetchCalled()); + if (level == 0) { + ASSERT_EQ(buff_prefetch_count, 0); + } else { + ASSERT_GT(buff_prefetch_count, 0); + } + buff_prefetch_count = 0; + } + } + if (!support_prefetch) { + ASSERT_GT(buff_prefetch_level_count[1], buff_prefetch_level_count[2]); + } + } + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + Close(); +} + +TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) { + // First param is if the mockFS support_prefetch or not + bool support_prefetch = + std::get<0>(GetParam()) && + test::IsPrefetchSupported(env_->GetFileSystem(), dbname_); + + const int kNumKeys = 2000; + std::shared_ptr<MockFS> fs = + std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + // Second param is if directIO is enabled or not + bool use_direct_io = std::get<1>(GetParam()); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + table_options.num_file_reads_for_auto_readahead = 0; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + + int buff_prefetch_count = 0; + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + Slice greatest(end_key.data(), end_key.size()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); + + Close(); + TryReopen(options); + + fs->ClearPrefetchCount(); + buff_prefetch_count = 0; + + { + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + /* + * Reseek keys from sequential Data Blocks within same partitioned + * index. It will prefetch the data block at the first seek since + * num_file_reads_for_auto_readahead = 0. Data Block size is nearly 4076 so + * readahead will fetch 8 * 1024 data more initially (2 more data blocks). + */ + iter->Seek(BuildKey(0)); // Prefetch data + index block since + // num_file_reads_for_auto_readahead = 0. + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1000)); // In buffer + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1004)); // In buffer + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1008)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1011)); // In buffer + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1015)); // In buffer + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1019)); // In buffer + ASSERT_TRUE(iter->Valid()); + // Missed 2 blocks but they are already in buffer so no reset. + iter->Seek(BuildKey(103)); // Already in buffer. + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1033)); // Prefetch Data. + ASSERT_TRUE(iter->Valid()); + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 4); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 4); + buff_prefetch_count = 0; + } + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + Close(); +} +#endif // !ROCKSDB_LITE + +TEST_P(PrefetchTest, PrefetchWhenReseek) { + // First param is if the mockFS support_prefetch or not + bool support_prefetch = + std::get<0>(GetParam()) && + test::IsPrefetchSupported(env_->GetFileSystem(), dbname_); + + const int kNumKeys = 2000; + std::shared_ptr<MockFS> fs = + std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + // Second param is if directIO is enabled or not + bool use_direct_io = std::get<1>(GetParam()); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + + int buff_prefetch_count = 0; + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + Slice greatest(end_key.data(), end_key.size()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); + + fs->ClearPrefetchCount(); + buff_prefetch_count = 0; + + { + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + /* + * Reseek keys from sequential Data Blocks within same partitioned + * index. After 2 sequential reads it will prefetch the data block. + * Data Block size is nearly 4076 so readahead will fetch 8 * 1024 data more + * initially (2 more data blocks). + */ + iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1000)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1004)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1008)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1015)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); + // Missed 2 blocks but they are already in buffer so no reset. + iter->Seek(BuildKey(103)); // Already in buffer. + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1033)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 3); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 3); + buff_prefetch_count = 0; + } + } + { + /* + * Reseek keys from non sequential data blocks within same partitioned + * index. buff_prefetch_count will be 0 in that case. + */ + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1008)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1033)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1048)); + ASSERT_TRUE(iter->Valid()); + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 0); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 0); + buff_prefetch_count = 0; + } + } + { + /* + * Reesek keys from Single Data Block. + */ + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(10)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(100)); + ASSERT_TRUE(iter->Valid()); + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 0); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 0); + buff_prefetch_count = 0; + } + } + { + /* + * Reseek keys from sequential data blocks to set implicit auto readahead + * and prefetch data but after that iterate over different (non sequential) + * data blocks which won't prefetch any data further. So buff_prefetch_count + * will be 1 for the first one. + */ + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1000)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1004)); // This iteration will prefetch buffer + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1008)); + ASSERT_TRUE(iter->Valid()); + iter->Seek( + BuildKey(996)); // Reseek won't prefetch any data and + // readahead_size will be initiallized to 8*1024. + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(992)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(989)); + ASSERT_TRUE(iter->Valid()); + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 1); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 1); + buff_prefetch_count = 0; + } + + // Read sequentially to confirm readahead_size is reset to initial value (2 + // more data blocks) + iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1015)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1019)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1022)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1026)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(103)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 2); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 2); + buff_prefetch_count = 0; + } + } + { + /* Reseek keys from sequential partitioned index block. Since partitioned + * index fetch are sequential, buff_prefetch_count will be 1. + */ + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1167)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1334)); // This iteration will prefetch buffer + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1499)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1667)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1847)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1999)); + ASSERT_TRUE(iter->Valid()); + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 1); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 1); + buff_prefetch_count = 0; + } + } + { + /* + * Reseek over different keys from different blocks. buff_prefetch_count is + * set 0. + */ + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + int i = 0; + int j = 1000; + do { + iter->Seek(BuildKey(i)); + if (!iter->Valid()) { + break; + } + i = i + 100; + iter->Seek(BuildKey(j)); + j = j + 100; + } while (i < 1000 && j < kNumKeys && iter->Valid()); + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 0); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 0); + buff_prefetch_count = 0; + } + } + { + /* Iterates sequentially over all keys. It will prefetch the buffer.*/ + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + } + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 13); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 13); + buff_prefetch_count = 0; + } + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + Close(); +} + +TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) { + // First param is if the mockFS support_prefetch or not + bool support_prefetch = + std::get<0>(GetParam()) && + test::IsPrefetchSupported(env_->GetFileSystem(), dbname_); + + const int kNumKeys = 2000; + std::shared_ptr<MockFS> fs = + std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + // Second param is if directIO is enabled or not + bool use_direct_io = std::get<1>(GetParam()); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + + BlockBasedTableOptions table_options; + std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB + table_options.block_cache = cache; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + + int buff_prefetch_count = 0; + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + Slice greatest(end_key.data(), end_key.size()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); + + fs->ClearPrefetchCount(); + buff_prefetch_count = 0; + + { + /* + * Reseek keys from sequential Data Blocks within same partitioned + * index. After 2 sequential reads it will prefetch the data block. + * Data Block size is nearly 4076 so readahead will fetch 8 * 1024 data more + * initially (2 more data blocks). + */ + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + // Warm up the cache + iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1015)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 1); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 1); + buff_prefetch_count = 0; + } + } + { + // After caching, blocks will be read from cache (Sequential blocks) + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1000)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1004)); // Prefetch data (not in cache). + ASSERT_TRUE(iter->Valid()); + // Missed one sequential block but next is in already in buffer so readahead + // will not be reset. + iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); + // Prefetch data but blocks are in cache so no prefetch and reset. + iter->Seek(BuildKey(1015)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1022)); + ASSERT_TRUE(iter->Valid()); + // Prefetch data with readahead_size = 4 blocks. + iter->Seek(BuildKey(1026)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(103)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1033)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1037)); + ASSERT_TRUE(iter->Valid()); + + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 3); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 2); + buff_prefetch_count = 0; + } + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + Close(); +} + +#ifndef ROCKSDB_LITE +TEST_P(PrefetchTest, DBIterLevelReadAhead) { + const int kNumKeys = 1000; + // Set options + std::shared_ptr<MockFS> fs = + std::make_shared<MockFS>(env_->GetFileSystem(), false); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = std::get<0>(GetParam()); + bool is_adaptive_readahead = std::get<1>(GetParam()); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.statistics = CreateDBStatistics(); + options.env = env.get(); + + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + int total_keys = 0; + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + int buff_prefetch_count = 0; + int buff_async_prefetch_count = 0; + int readahead_carry_over_count = 0; + int num_sst_files = NumTableFilesAtLevel(2); + size_t current_readahead_size = 0; + + // Test - Iterate over the keys sequentially. + { + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_async_prefetch_count++; }); + + // The callback checks, since reads are sequential, readahead_size doesn't + // start from 8KB when iterator moves to next file and its called + // num_sst_files-1 times (excluding for first file). + SyncPoint::GetInstance()->SetCallBack( + "BlockPrefetcher::SetReadaheadState", [&](void* arg) { + readahead_carry_over_count++; + size_t readahead_size = *reinterpret_cast<size_t*>(arg); + if (readahead_carry_over_count) { + ASSERT_GT(readahead_size, 8 * 1024); + } + }); + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) { + current_readahead_size = *reinterpret_cast<size_t*>(arg); + ASSERT_GT(current_readahead_size, 0); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + ReadOptions ro; + if (is_adaptive_readahead) { + ro.adaptive_readahead = true; + ro.async_io = true; + } + + ASSERT_OK(options.statistics->Reset()); + + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; + } + ASSERT_EQ(num_keys, total_keys); + + // For index and data blocks. + if (is_adaptive_readahead) { + ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1)); + ASSERT_GT(buff_async_prefetch_count, 0); + } else { + ASSERT_GT(buff_prefetch_count, 0); + ASSERT_EQ(readahead_carry_over_count, 0); + } + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + if (ro.async_io) { + ASSERT_GT(async_read_bytes.count, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + } + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + } + Close(); +} +#endif //! ROCKSDB_LITE + +class PrefetchTest1 : public DBTestBase, + public ::testing::WithParamInterface<bool> { + public: + PrefetchTest1() : DBTestBase("prefetch_test1", true) {} +}; + +INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool()); + +#ifndef ROCKSDB_LITE +TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) { + const int kNumKeys = 1000; + // Set options + std::shared_ptr<MockFS> fs = + std::make_shared<MockFS>(env_->GetFileSystem(), false); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (GetParam()) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + + int buff_prefetch_count = 0; + int set_readahead = 0; + size_t readahead_size = 0; + + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->SetCallBack( + "BlockPrefetcher::SetReadaheadState", + [&](void* /*arg*/) { set_readahead++; }); + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::TryReadFromCache", + [&](void* arg) { readahead_size = *reinterpret_cast<size_t*>(arg); }); + + SyncPoint::GetInstance()->EnableProcessing(); + + { + // Iterate until prefetch is done. + ReadOptions ro; + ro.adaptive_readahead = true; + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); + + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + + while (iter->Valid() && buff_prefetch_count == 0) { + iter->Next(); + } + + ASSERT_EQ(readahead_size, 8 * 1024); + ASSERT_EQ(buff_prefetch_count, 1); + ASSERT_EQ(set_readahead, 0); + buff_prefetch_count = 0; + + // Move to last file and check readahead size fallbacks to 8KB. So next + // readahead size after prefetch should be 8 * 1024; + iter->Seek(BuildKey(4004)); + ASSERT_TRUE(iter->Valid()); + + while (iter->Valid() && buff_prefetch_count == 0) { + iter->Next(); + } + + ASSERT_EQ(readahead_size, 8 * 1024); + ASSERT_EQ(set_readahead, 0); + ASSERT_EQ(buff_prefetch_count, 1); + } + Close(); +} +#endif //! ROCKSDB_LITE + +TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) { + const int kNumKeys = 2000; + // Set options + std::shared_ptr<MockFS> fs = + std::make_shared<MockFS>(env_->GetFileSystem(), false); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (GetParam()) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + + options.statistics = CreateDBStatistics(); + BlockBasedTableOptions table_options; + std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB + table_options.block_cache = cache; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + Slice greatest(end_key.data(), end_key.size()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); + + int buff_prefetch_count = 0; + size_t current_readahead_size = 0; + size_t expected_current_readahead_size = 8 * 1024; + size_t decrease_readahead_size = 8 * 1024; + + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) { + current_readahead_size = *reinterpret_cast<size_t*>(arg); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + ReadOptions ro; + ro.adaptive_readahead = true; + { + /* + * Reseek keys from sequential Data Blocks within same partitioned + * index. After 2 sequential reads it will prefetch the data block. + * Data Block size is nearly 4076 so readahead will fetch 8 * 1024 data + * more initially (2 more data blocks). + */ + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); + // Warm up the cache + iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1015)); + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); + buff_prefetch_count = 0; + } + + { + ASSERT_OK(options.statistics->Reset()); + // After caching, blocks will be read from cache (Sequential blocks) + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); + iter->Seek( + BuildKey(0)); // In cache so it will decrease the readahead_size. + ASSERT_TRUE(iter->Valid()); + expected_current_readahead_size = std::max( + decrease_readahead_size, + (expected_current_readahead_size >= decrease_readahead_size + ? (expected_current_readahead_size - decrease_readahead_size) + : 0)); + + iter->Seek(BuildKey(1000)); // Won't prefetch the block. + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(current_readahead_size, expected_current_readahead_size); + + iter->Seek(BuildKey(1004)); // Prefetch the block. + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(current_readahead_size, expected_current_readahead_size); + expected_current_readahead_size *= 2; + + iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); + + // Eligible to Prefetch data (not in buffer) but block is in cache so no + // prefetch will happen and will result in decrease in readahead_size. + // readahead_size will be 8 * 1024 + iter->Seek(BuildKey(1015)); + ASSERT_TRUE(iter->Valid()); + expected_current_readahead_size = std::max( + decrease_readahead_size, + (expected_current_readahead_size >= decrease_readahead_size + ? (expected_current_readahead_size - decrease_readahead_size) + : 0)); + + // 1016 is the same block as 1015. So no change in readahead_size. + iter->Seek(BuildKey(1016)); + ASSERT_TRUE(iter->Valid()); + + // Prefetch data (not in buffer) but found in cache. So decrease + // readahead_size. Since it will 0 after decrementing so readahead_size will + // be set to initial value. + iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); + expected_current_readahead_size = std::max( + decrease_readahead_size, + (expected_current_readahead_size >= decrease_readahead_size + ? (expected_current_readahead_size - decrease_readahead_size) + : 0)); + + // Prefetch next sequential data. + iter->Seek(BuildKey(1022)); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(current_readahead_size, expected_current_readahead_size); + ASSERT_EQ(buff_prefetch_count, 2); + + buff_prefetch_count = 0; + } + Close(); +} + +TEST_P(PrefetchTest1, SeekParallelizationTest) { + const int kNumKeys = 2000; + // Set options + std::shared_ptr<MockFS> fs = + std::make_shared<MockFS>(env_->GetFileSystem(), false); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (GetParam()) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + + options.statistics = CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + Slice greatest(end_key.data(), end_key.size()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); + + int buff_prefetch_count = 0; + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); + + SyncPoint::GetInstance()->EnableProcessing(); + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + { + ASSERT_OK(options.statistics->Reset()); + // Each block contains around 4 keys. + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); + iter->Seek(BuildKey(0)); // Prefetch data because of seek parallelization. + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + + // New data block. Since num_file_reads in FilePrefetch after this read is + // 2, it won't go for prefetching. + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + + // Prefetch data. + iter->Next(); + ASSERT_TRUE(iter->Valid()); + + ASSERT_EQ(buff_prefetch_count, 2); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + } + + buff_prefetch_count = 0; + } + Close(); +} + +extern "C" bool RocksDbIOUringEnable() { return true; } + +namespace { +#ifndef ROCKSDB_LITE +#ifdef GFLAGS +const int kMaxArgCount = 100; +const size_t kArgBufferSize = 100000; + +void RunIOTracerParserTool(std::string trace_file) { + std::vector<std::string> params = {"./io_tracer_parser", + "-io_trace_file=" + trace_file}; + + char arg_buffer[kArgBufferSize]; + char* argv[kMaxArgCount]; + int argc = 0; + int cursor = 0; + for (const auto& arg : params) { + ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize); + ASSERT_LE(argc + 1, kMaxArgCount); + + snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str()); + + argv[argc++] = arg_buffer + cursor; + cursor += static_cast<int>(arg.size()) + 1; + } + ASSERT_EQ(0, ROCKSDB_NAMESPACE::io_tracer_parser(argc, argv)); +} +#endif // GFLAGS +#endif // ROCKSDB_LITE +} // namespace + +// Tests the default implementation of ReadAsync API with PosixFileSystem. +TEST_P(PrefetchTest, ReadAsyncWithPosixFS) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + + const int kNumKeys = 1000; + std::shared_ptr<MockFS> fs = std::make_shared<MockFS>( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.statistics = CreateDBStatistics(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + int total_keys = 0; + // Write the keys. + { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + } + + int buff_prefetch_count = 0; + bool read_async_called = false; + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; + } + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); + + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Read the keys. + { + ASSERT_OK(options.statistics->Reset()); + get_perf_context()->Reset(); + + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; + } + + ASSERT_EQ(num_keys, total_keys); + ASSERT_GT(buff_prefetch_count, 0); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + HistogramData prefetched_bytes_discarded; + options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, + &prefetched_bytes_discarded); + + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + } + ASSERT_GT(prefetched_bytes_discarded.count, 0); + } + ASSERT_EQ(get_perf_context()->number_async_seek, 0); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Close(); +} + +TEST_P(PrefetchTest, MultipleSeekWithPosixFS) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + + const int kNumKeys = 1000; + std::shared_ptr<MockFS> fs = std::make_shared<MockFS>( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.statistics = CreateDBStatistics(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + int total_keys = 0; + // Write the keys. + { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + } + + int num_keys_first_batch = 0; + int num_keys_second_batch = 0; + // Calculate number of keys without async_io for correctness validation. + { + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); + // First Seek. + iter->Seek(BuildKey(450)); + while (iter->Valid() && num_keys_first_batch < 100) { + ASSERT_OK(iter->status()); + num_keys_first_batch++; + iter->Next(); + } + ASSERT_OK(iter->status()); + + iter->Seek(BuildKey(942)); + while (iter->Valid()) { + ASSERT_OK(iter->status()); + num_keys_second_batch++; + iter->Next(); + } + ASSERT_OK(iter->status()); + } + + int buff_prefetch_count = 0; + bool read_async_called = false; + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; + } + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); + + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Read the keys using seek. + { + ASSERT_OK(options.statistics->Reset()); + get_perf_context()->Reset(); + + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); + int num_keys = 0; + // First Seek. + { + iter->Seek(BuildKey(450)); + while (iter->Valid() && num_keys < 100) { + ASSERT_OK(iter->status()); + num_keys++; + iter->Next(); + } + ASSERT_OK(iter->status()); + ASSERT_EQ(num_keys, num_keys_first_batch); + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); + } + } + } + + // Second Seek. + { + num_keys = 0; + ASSERT_OK(options.statistics->Reset()); + get_perf_context()->Reset(); + + iter->Seek(BuildKey(942)); + while (iter->Valid()) { + ASSERT_OK(iter->status()); + num_keys++; + iter->Next(); + } + ASSERT_OK(iter->status()); + ASSERT_EQ(num_keys, num_keys_second_batch); + + ASSERT_GT(buff_prefetch_count, 0); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + HistogramData prefetched_bytes_discarded; + options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, + &prefetched_bytes_discarded); + + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); + } + ASSERT_GT(prefetched_bytes_discarded.count, 0); + } + } + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + Close(); +} + +TEST_P(PrefetchTest, SeekParallelizationTest1) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + const int kNumKeys = 2000; + // Set options + std::shared_ptr<MockFS> fs = std::make_shared<MockFS>( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + + options.statistics = CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + Slice greatest(end_key.data(), end_key.size()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); + + int buff_prefetch_count = 0; + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); + + bool read_async_called = false; + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); + + SyncPoint::GetInstance()->EnableProcessing(); + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; + } + + { + ASSERT_OK(options.statistics->Reset()); + // Each block contains around 4 keys. + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); + iter->Seek(BuildKey(0)); // Prefetch data because of seek parallelization. + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + + // New data block. Since num_file_reads in FilePrefetch after this read is + // 2, it won't go for prefetching. + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + + // Prefetch data. + iter->Next(); + ASSERT_TRUE(iter->Valid()); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + if (std::get<1>(GetParam())) { + ASSERT_EQ(buff_prefetch_count, 1); + } else { + ASSERT_EQ(buff_prefetch_count, 2); + } + } else { + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); + ASSERT_EQ(buff_prefetch_count, 1); + } + } + + buff_prefetch_count = 0; + } + Close(); +} + +#ifndef ROCKSDB_LITE +#ifdef GFLAGS +TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + + const int kNumKeys = 1000; + std::shared_ptr<MockFS> fs = std::make_shared<MockFS>( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.statistics = CreateDBStatistics(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + int total_keys = 0; + // Write the keys. + { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + } + + int buff_prefetch_count = 0; + bool read_async_called = false; + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; + } + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); + + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Read the keys. + { + // Start io_tracing. + WriteOptions write_opt; + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + std::string trace_file_path = dbname_ + "/io_trace_file"; + + ASSERT_OK( + NewFileTraceWriter(env_, EnvOptions(), trace_file_path, &trace_writer)); + ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer))); + ASSERT_OK(options.statistics->Reset()); + + auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; + } + + // End the tracing. + ASSERT_OK(db_->EndIOTrace()); + ASSERT_OK(env_->FileExists(trace_file_path)); + + ASSERT_EQ(num_keys, total_keys); + ASSERT_GT(buff_prefetch_count, 0); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + } + } + + // Check the file to see if ReadAsync is logged. + RunIOTracerParserTool(trace_file_path); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + Close(); +} +#endif // GFLAGS + +class FilePrefetchBufferTest : public testing::Test { + public: + void SetUp() override { + SetupSyncPointsToMockDirectIO(); + env_ = Env::Default(); + fs_ = FileSystem::Default(); + test_dir_ = test::PerThreadDBPath("file_prefetch_buffer_test"); + ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr)); + } + + void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); } + + void Write(const std::string& fname, const std::string& content) { + std::unique_ptr<FSWritableFile> f; + ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr)); + ASSERT_OK(f->Append(content, IOOptions(), nullptr)); + ASSERT_OK(f->Close(IOOptions(), nullptr)); + } + + void Read(const std::string& fname, const FileOptions& opts, + std::unique_ptr<RandomAccessFileReader>* reader) { + std::string fpath = Path(fname); + std::unique_ptr<FSRandomAccessFile> f; + ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr)); + reader->reset(new RandomAccessFileReader(std::move(f), fpath, + env_->GetSystemClock().get())); + } + + void AssertResult(const std::string& content, + const std::vector<FSReadRequest>& reqs) { + for (const auto& r : reqs) { + ASSERT_OK(r.status); + ASSERT_EQ(r.len, r.result.size()); + ASSERT_EQ(content.substr(r.offset, r.len), r.result.ToString()); + } + } + + FileSystem* fs() { return fs_.get(); } + + private: + Env* env_; + std::shared_ptr<FileSystem> fs_; + std::string test_dir_; + + std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; } +}; + +TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) { + std::string fname = "seek-with-block-cache-hit"; + Random rand(0); + std::string content = rand.RandomString(32768); + Write(fname, content); + + FileOptions opts; + std::unique_ptr<RandomAccessFileReader> r; + Read(fname, opts, &r); + + FilePrefetchBuffer fpb(16384, 16384, true, false, false, 0, 0, fs()); + Slice result; + // Simulate a seek of 4096 bytes at offset 0. Due to the readahead settings, + // it will do two reads of 4096+8192 and 8192 + Status s = fpb.PrefetchAsync(IOOptions(), r.get(), 0, 4096, &result); + ASSERT_EQ(s, Status::TryAgain()); + // Simulate a block cache hit + fpb.UpdateReadPattern(0, 4096, false); + // Now read some data that straddles the two prefetch buffers - offset 8192 to + // 16384 + ASSERT_TRUE(fpb.TryReadFromCacheAsync(IOOptions(), r.get(), 8192, 8192, + &result, &s, Env::IOPriority::IO_LOW)); +} +#endif // ROCKSDB_LITE +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} |