summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/db_basic_test.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/db_basic_test.cc
parentInitial commit. (diff)
downloadceph-upstream/18.2.2.tar.xz
ceph-upstream/18.2.2.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/db_basic_test.cc')
-rw-r--r--src/rocksdb/db/db_basic_test.cc4643
1 files changed, 4643 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_basic_test.cc b/src/rocksdb/db/db_basic_test.cc
new file mode 100644
index 000000000..a28ac2b88
--- /dev/null
+++ b/src/rocksdb/db/db_basic_test.cc
@@ -0,0 +1,4643 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include <cstring>
+
+#include "db/db_test_util.h"
+#include "options/options_helper.h"
+#include "port/stack_trace.h"
+#include "rocksdb/filter_policy.h"
+#include "rocksdb/flush_block_policy.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/perf_context.h"
+#include "rocksdb/table.h"
+#include "rocksdb/utilities/debug.h"
+#include "table/block_based/block_based_table_reader.h"
+#include "table/block_based/block_builder.h"
+#if !defined(ROCKSDB_LITE)
+#include "test_util/sync_point.h"
+#endif
+#include "util/file_checksum_helper.h"
+#include "util/random.h"
+#include "utilities/counted_fs.h"
+#include "utilities/fault_injection_env.h"
+#include "utilities/merge_operators.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Basic DB test fixture; disables fsync in the test Env to speed up runs.
+class DBBasicTest : public DBTestBase {
+ public:
+  DBBasicTest() : DBTestBase("db_basic_test", /*env_do_fsync=*/false) {}
+};
+
+// Opening a DB that is already open must fail with kIOError (no subcode)
+// and a state message mentioning the lock file.
+TEST_F(DBBasicTest, OpenWhenOpen) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  DB* db2 = nullptr;
+  Status s = DB::Open(options, dbname_, &db2);
+  // If the open unexpectedly succeeds, the lambda deletes db2 before the
+  // failure message is reported, so nothing leaks.
+  ASSERT_NOK(s) << [db2]() {
+    delete db2;
+    return "db2 open: ok";
+  }();
+  ASSERT_EQ(Status::Code::kIOError, s.code());
+  ASSERT_EQ(Status::SubCode::kNone, s.subcode());
+  ASSERT_TRUE(strstr(s.getState(), "lock ") != nullptr);
+
+  delete db2;
+}
+
+// Direct I/O for flush/compaction with a zero writable-file buffer is an
+// invalid combination, both at open time and via SetDBOptions.
+TEST_F(DBBasicTest, EnableDirectIOWithZeroBuf) {
+  if (!IsDirectIOSupported()) {
+    ROCKSDB_GTEST_BYPASS("Direct IO not supported");
+    return;
+  }
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  options.use_direct_io_for_flush_and_compaction = true;
+  options.writable_file_max_buffer_size = 0;
+  ASSERT_TRUE(TryReopen(options).IsInvalidArgument());
+
+  // A non-zero buffer opens fine; dynamically setting it back to 0 must fail.
+  options.writable_file_max_buffer_size = 1024;
+  Reopen(options);
+  const std::unordered_map<std::string, std::string> new_db_opts = {
+      {"writable_file_max_buffer_size", "0"}};
+  ASSERT_TRUE(db_->SetDBOptions(new_db_opts).IsInvalidArgument());
+}
+
+// A new session id is generated on every (re)open — including read-only
+// opens — and is stable for the lifetime of one open instance.
+TEST_F(DBBasicTest, UniqueSession) {
+  Options options = CurrentOptions();
+  std::string sid1, sid2, sid3, sid4;
+
+  ASSERT_OK(db_->GetDbSessionId(sid1));
+  Reopen(options);
+  ASSERT_OK(db_->GetDbSessionId(sid2));
+  ASSERT_OK(Put("foo", "v1"));
+  ASSERT_OK(db_->GetDbSessionId(sid4));
+  Reopen(options);
+  ASSERT_OK(db_->GetDbSessionId(sid3));
+
+  // Each reopen yields a distinct id.
+  ASSERT_NE(sid1, sid2);
+  ASSERT_NE(sid1, sid3);
+  ASSERT_NE(sid2, sid3);
+
+  // Writes do not change the id within one open instance.
+  ASSERT_EQ(sid2, sid4);
+
+  // Expected compact format for session ids (see notes in implementation)
+  TestRegex expected("[0-9A-Z]{20}");
+  EXPECT_MATCHES_REGEX(sid1, expected);
+  EXPECT_MATCHES_REGEX(sid2, expected);
+  EXPECT_MATCHES_REGEX(sid3, expected);
+
+#ifndef ROCKSDB_LITE
+  Close();
+  ASSERT_OK(ReadOnlyReopen(options));
+  ASSERT_OK(db_->GetDbSessionId(sid1));
+  // Test uniqueness between readonly open (sid1) and regular open (sid3)
+  ASSERT_NE(sid1, sid3);
+  Close();
+  ASSERT_OK(ReadOnlyReopen(options));
+  ASSERT_OK(db_->GetDbSessionId(sid2));
+  ASSERT_EQ("v1", Get("foo"));
+  ASSERT_OK(db_->GetDbSessionId(sid3));
+
+  // Uniqueness across two consecutive read-only opens.
+  ASSERT_NE(sid1, sid2);
+
+  // Reads do not change the id within one read-only instance.
+  ASSERT_EQ(sid2, sid3);
+#endif  // ROCKSDB_LITE
+
+  // Same invariants hold with multiple column families.
+  CreateAndReopenWithCF({"goku"}, options);
+  ASSERT_OK(db_->GetDbSessionId(sid1));
+  ASSERT_OK(Put("bar", "e1"));
+  ASSERT_OK(db_->GetDbSessionId(sid2));
+  ASSERT_EQ("e1", Get("bar"));
+  ASSERT_OK(db_->GetDbSessionId(sid3));
+  ReopenWithColumnFamilies({"default", "goku"}, options);
+  ASSERT_OK(db_->GetDbSessionId(sid4));
+
+  ASSERT_EQ(sid1, sid2);
+  ASSERT_EQ(sid2, sid3);
+
+  ASSERT_NE(sid1, sid4);
+}
+
+#ifndef ROCKSDB_LITE
+// Read-only opens serve correct point lookups and iterators, both with data
+// only in the WAL/memtable and after a flush; SyncWAL is NotSupported.
+TEST_F(DBBasicTest, ReadOnlyDB) {
+  ASSERT_OK(Put("foo", "v1"));
+  ASSERT_OK(Put("bar", "v2"));
+  ASSERT_OK(Put("foo", "v3"));
+  Close();
+
+  auto verify_one_iter = [&](Iterator* iter) {
+    int count = 0;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      ASSERT_OK(iter->status());
+      ++count;
+    }
+    // Always expect two keys: "foo" and "bar"
+    ASSERT_EQ(count, 2);
+  };
+
+  // Checks both the single-iterator and batch NewIterators paths.
+  auto verify_all_iters = [&]() {
+    Iterator* iter = db_->NewIterator(ReadOptions());
+    verify_one_iter(iter);
+    delete iter;
+
+    std::vector<Iterator*> iters;
+    ASSERT_OK(db_->NewIterators(ReadOptions(),
+                                {dbfull()->DefaultColumnFamily()}, &iters));
+    ASSERT_EQ(static_cast<uint64_t>(1), iters.size());
+    verify_one_iter(iters[0]);
+    delete iters[0];
+  };
+
+  auto options = CurrentOptions();
+  assert(options.env == env_);
+  ASSERT_OK(ReadOnlyReopen(options));
+  ASSERT_EQ("v3", Get("foo"));
+  ASSERT_EQ("v2", Get("bar"));
+  verify_all_iters();
+  Close();
+
+  // Reopen and flush memtable.
+  Reopen(options);
+  ASSERT_OK(Flush());
+  Close();
+  // Now check keys in read only mode.
+  ASSERT_OK(ReadOnlyReopen(options));
+  ASSERT_EQ("v3", Get("foo"));
+  ASSERT_EQ("v2", Get("bar"));
+  verify_all_iters();
+  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
+}
+
+// TODO akanksha: Update the test to check that combination
+// does not actually write to FS (use open read-only with
+// CompositeEnvWrapper+ReadOnlyFileSystem).
+// With write_dbid_to_manifest set, the DB identity must be stable across
+// read-only reopens. Disabled pending the TODO above.
+TEST_F(DBBasicTest, DISABLED_ReadOnlyDBWithWriteDBIdToManifestSet) {
+  ASSERT_OK(Put("foo", "v1"));
+  ASSERT_OK(Put("bar", "v2"));
+  ASSERT_OK(Put("foo", "v3"));
+  Close();
+
+  auto options = CurrentOptions();
+  options.write_dbid_to_manifest = true;
+  assert(options.env == env_);
+  ASSERT_OK(ReadOnlyReopen(options));
+  std::string db_id1;
+  ASSERT_OK(db_->GetDbIdentity(db_id1));
+  ASSERT_EQ("v3", Get("foo"));
+  ASSERT_EQ("v2", Get("bar"));
+  Iterator* iter = db_->NewIterator(ReadOptions());
+  int count = 0;
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    ASSERT_OK(iter->status());
+    ++count;
+  }
+  // Two live keys: "foo" and "bar".
+  ASSERT_EQ(count, 2);
+  delete iter;
+  Close();
+
+  // Reopen and flush memtable.
+  Reopen(options);
+  ASSERT_OK(Flush());
+  Close();
+  // Now check keys in read only mode.
+  ASSERT_OK(ReadOnlyReopen(options));
+  ASSERT_EQ("v3", Get("foo"));
+  ASSERT_EQ("v2", Get("bar"));
+  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
+  std::string db_id2;
+  ASSERT_OK(db_->GetDbIdentity(db_id2));
+  // Identity must survive the reopen cycle.
+  ASSERT_EQ(db_id1, db_id2);
+}
+
+// Exercises the three read-only open modes: plain ReadOnlyDB (multiple L0
+// files or max_open_files != -1), and CompactedDB (single L0 file or a fully
+// compacted single level, with max_open_files == -1). Each mode rejects
+// writes with its own NotImplemented message.
+TEST_F(DBBasicTest, CompactedDB) {
+  const uint64_t kFileSize = 1 << 20;
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.write_buffer_size = kFileSize;
+  options.target_file_size_base = kFileSize;
+  options.max_bytes_for_level_base = 1 << 30;
+  options.compression = kNoCompression;
+  Reopen(options);
+  // 1 L0 file, use CompactedDB if max_open_files = -1
+  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
+  ASSERT_OK(Flush());
+  Close();
+  ASSERT_OK(ReadOnlyReopen(options));
+  Status s = Put("new", "value");
+  ASSERT_EQ(s.ToString(),
+            "Not implemented: Not supported operation in read only mode.");
+  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
+  Close();
+  // max_open_files == -1 selects the CompactedDB implementation.
+  options.max_open_files = -1;
+  ASSERT_OK(ReadOnlyReopen(options));
+  s = Put("new", "value");
+  ASSERT_EQ(s.ToString(),
+            "Not implemented: Not supported in compacted db mode.");
+  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
+  Close();
+  Reopen(options);
+  // Add more L0 files
+  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
+  ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("something_not_flushed", "x"));
+  Close();
+
+  ASSERT_OK(ReadOnlyReopen(options));
+  // Fallback to read-only DB
+  s = Put("new", "value");
+  ASSERT_EQ(s.ToString(),
+            "Not implemented: Not supported operation in read only mode.");
+
+  // TODO: validate that other write ops return NotImplemented
+  // (DBImplReadOnly is missing some overrides)
+
+  // Ensure no deadlock on flush triggered by another API function
+  // (Old deadlock bug depends on something_not_flushed above.)
+  std::vector<std::string> files;
+  uint64_t manifest_file_size;
+  ASSERT_OK(db_->GetLiveFiles(files, &manifest_file_size, /*flush*/ true));
+  LiveFilesStorageInfoOptions lfsi_opts;
+  lfsi_opts.wal_size_for_flush = 0;  // always
+  std::vector<LiveFileStorageInfo> files2;
+  ASSERT_OK(db_->GetLiveFilesStorageInfo(lfsi_opts, &files2));
+
+  Close();
+
+  // Full compaction
+  Reopen(options);
+  // Add more keys
+  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
+  ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
+  ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
+  ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_EQ(3, NumTableFilesAtLevel(1));
+  Close();
+
+  // CompactedDB
+  ASSERT_OK(ReadOnlyReopen(options));
+  s = Put("new", "value");
+  ASSERT_EQ(s.ToString(),
+            "Not implemented: Not supported in compacted db mode.");
+  // Point lookups against the fully compacted level, hits and misses.
+  ASSERT_EQ("NOT_FOUND", Get("abc"));
+  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
+  ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
+  ASSERT_EQ("NOT_FOUND", Get("ccc"));
+  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
+  ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
+  ASSERT_EQ("NOT_FOUND", Get("ggg"));
+  ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
+  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
+  ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
+  ASSERT_EQ("NOT_FOUND", Get("kkk"));
+
+  // TODO: validate that other write ops return NotImplemented
+  // (CompactedDB is missing some overrides)
+
+  // Ensure no deadlock on flush triggered by another API function
+  ASSERT_OK(db_->GetLiveFiles(files, &manifest_file_size, /*flush*/ true));
+  ASSERT_OK(db_->GetLiveFilesStorageInfo(lfsi_opts, &files2));
+
+  // MultiGet
+  std::vector<std::string> values;
+  std::vector<Status> status_list = dbfull()->MultiGet(
+      ReadOptions(),
+      std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
+                          Slice("ggg"), Slice("iii"), Slice("kkk")}),
+      &values);
+  ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
+  ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
+  ASSERT_OK(status_list[0]);
+  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
+  ASSERT_TRUE(status_list[1].IsNotFound());
+  ASSERT_OK(status_list[2]);
+  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
+  ASSERT_TRUE(status_list[3].IsNotFound());
+  ASSERT_OK(status_list[4]);
+  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
+  ASSERT_TRUE(status_list[5].IsNotFound());
+
+  // An unflushed write forces fallback to plain read-only mode again.
+  Reopen(options);
+  // Add a key
+  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
+  Close();
+  ASSERT_OK(ReadOnlyReopen(options));
+  s = Put("new", "value");
+  ASSERT_EQ(s.ToString(),
+            "Not implemented: Not supported operation in read only mode.");
+}
+
+// Reopening with fewer levels than the DB already uses must fail with
+// InvalidArgument; raising num_levels again succeeds.
+TEST_F(DBBasicTest, LevelLimitReopen) {
+  Options options = CurrentOptions();
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  // Write until compaction pushes at least one file to L2 of cf "pikachu".
+  const std::string value(1024 * 1024, ' ');
+  int i = 0;
+  while (NumTableFilesAtLevel(2, 1) == 0) {
+    ASSERT_OK(Put(1, Key(i++), value));
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  }
+
+  options.num_levels = 1;
+  options.max_bytes_for_level_multiplier_additional.resize(1, 1);
+  Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
+  ASSERT_EQ(s.IsInvalidArgument(), true);
+  ASSERT_EQ(s.ToString(),
+            "Invalid argument: db has more levels than options.num_levels");
+
+  options.num_levels = 10;
+  options.max_bytes_for_level_multiplier_additional.resize(10, 1);
+  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
+}
+#endif // ROCKSDB_LITE
+
+// Put / overwrite / Delete round-trip on a non-default column family,
+// repeated across all option configurations.
+TEST_F(DBBasicTest, PutDeleteGet) {
+  do {
+    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+    ASSERT_OK(Put(1, "foo", "v1"));
+    ASSERT_EQ("v1", Get(1, "foo"));
+    ASSERT_OK(Put(1, "foo", "v2"));
+    ASSERT_EQ("v2", Get(1, "foo"));
+    ASSERT_OK(Delete(1, "foo"));
+    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
+  } while (ChangeOptions());
+}
+
+// SingleDelete removes a key that was written exactly once, leaving an
+// unrelated key intact.
+TEST_F(DBBasicTest, PutSingleDeleteGet) {
+  do {
+    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+    ASSERT_OK(Put(1, "foo", "v1"));
+    ASSERT_EQ("v1", Get(1, "foo"));
+    ASSERT_OK(Put(1, "foo2", "v2"));
+    ASSERT_EQ("v2", Get(1, "foo2"));
+    ASSERT_OK(SingleDelete(1, "foo"));
+    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
+    // Skip FIFO and universal compaction because they do not apply to the
+    // test case. Skip MergePut because single delete does not get removed
+    // when it encounters a merge.
+  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
+                         kSkipMergePut));
+}
+
+TEST_F(DBBasicTest, EmptyFlush) {
+  // It is possible to produce empty flushes when using single deletes. Tests
+  // whether empty flushes cause issues.
+  do {
+    Random rnd(301);
+
+    Options options = CurrentOptions();
+    options.disable_auto_compactions = true;
+    CreateAndReopenWithCF({"pikachu"}, options);
+
+    // Put + SingleDelete cancel out entirely, so the flush writes nothing.
+    ASSERT_OK(Put(1, "a", Slice()));
+    ASSERT_OK(SingleDelete(1, "a"));
+    ASSERT_OK(Flush(1));
+
+    ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
+    // Skip FIFO and universal compaction as they do not apply to the test
+    // case. Skip MergePut because merges cannot be combined with single
+    // deletions.
+  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
+                         kSkipMergePut));
+}
+
+// After a flush, a key is served from the SST (Version) of its own column
+// family and is invisible to other column families.
+TEST_F(DBBasicTest, GetFromVersions) {
+  do {
+    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+    ASSERT_OK(Put(1, "foo", "v1"));
+    ASSERT_OK(Flush(1));
+    ASSERT_EQ("v1", Get(1, "foo"));
+    ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
+  } while (ChangeOptions());
+}
+
+#ifndef ROCKSDB_LITE
+// A snapshot pins the pre-snapshot value of a key, both while the data is in
+// the memtable and after it has been flushed.
+TEST_F(DBBasicTest, GetSnapshot) {
+  anon::OptionsOverride options_override;
+  options_override.skip_policy = kSkipNoSnapshot;
+  do {
+    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
+    // Try with both a short key and a long key
+    for (int i = 0; i < 2; i++) {
+      std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
+      ASSERT_OK(Put(1, key, "v1"));
+      const Snapshot* s1 = db_->GetSnapshot();
+      ASSERT_OK(Put(1, key, "v2"));
+      ASSERT_EQ("v2", Get(1, key));
+      ASSERT_EQ("v1", Get(1, key, s1));
+      ASSERT_OK(Flush(1));
+      ASSERT_EQ("v2", Get(1, key));
+      ASSERT_EQ("v1", Get(1, key, s1));
+      db_->ReleaseSnapshot(s1);
+    }
+  } while (ChangeOptions());
+}
+#endif // ROCKSDB_LITE
+
+// The file lock prevents a second concurrent open of the same DB directory.
+TEST_F(DBBasicTest, CheckLock) {
+  do {
+    DB* localdb = nullptr;
+    Options options = CurrentOptions();
+    ASSERT_OK(TryReopen(options));
+
+    // second open should fail
+    Status s = DB::Open(options, dbname_, &localdb);
+    // On unexpected success, the lambda frees localdb before reporting.
+    ASSERT_NOK(s) << [localdb]() {
+      delete localdb;
+      return "localdb open: ok";
+    }();
+#ifdef OS_LINUX
+    ASSERT_TRUE(s.ToString().find("lock ") != std::string::npos);
+#endif  // OS_LINUX
+  } while (ChangeCompactOptions());
+}
+
+// With up to 4 write buffers and min_write_buffer_number_to_merge = 3,
+// reads must see data spread across flushed and unflushed memtables.
+TEST_F(DBBasicTest, FlushMultipleMemtable) {
+  do {
+    Options options = CurrentOptions();
+    WriteOptions writeOpt = WriteOptions();
+    writeOpt.disableWAL = true;
+    options.max_write_buffer_number = 4;
+    options.min_write_buffer_number_to_merge = 3;
+    options.max_write_buffer_size_to_maintain = -1;
+    CreateAndReopenWithCF({"pikachu"}, options);
+    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
+    ASSERT_OK(Flush(1));
+    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
+
+    ASSERT_EQ("v1", Get(1, "foo"));
+    ASSERT_EQ("v1", Get(1, "bar"));
+    ASSERT_OK(Flush(1));
+  } while (ChangeCompactOptions());
+}
+
+// Flushing an empty column family must succeed even while both background
+// thread pools are blocked, and writes must still go through.
+TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
+  // Block flush thread and disable compaction thread
+  env_->SetBackgroundThreads(1, Env::HIGH);
+  env_->SetBackgroundThreads(1, Env::LOW);
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
+  test::SleepingBackgroundTask sleeping_task_high;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
+                 &sleeping_task_high, Env::Priority::HIGH);
+
+  Options options = CurrentOptions();
+  // disable compaction
+  options.disable_auto_compactions = true;
+  WriteOptions writeOpt = WriteOptions();
+  writeOpt.disableWAL = true;
+  options.max_write_buffer_number = 2;
+  options.min_write_buffer_number_to_merge = 1;
+  options.max_write_buffer_size_to_maintain =
+      static_cast<int64_t>(options.write_buffer_size);
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  // Compaction can still go through even if no thread can flush the
+  // mem table.
+  ASSERT_OK(Flush(0));
+  ASSERT_OK(Flush(1));
+
+  // Insert can go through
+  ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
+  ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
+
+  ASSERT_EQ("v1", Get(0, "foo"));
+  ASSERT_EQ("v1", Get(1, "bar"));
+
+  // Unblock the high-priority (flush) pool first.
+  sleeping_task_high.WakeUp();
+  sleeping_task_high.WaitUntilDone();
+
+  // Flush can still go through.
+  ASSERT_OK(Flush(0));
+  ASSERT_OK(Flush(1));
+
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+}
+
+// Data written with WAL disabled survives only via explicit Flush; the perf
+// context confirms post-flush reads are served from output files.
+TEST_F(DBBasicTest, Flush) {
+  do {
+    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+    WriteOptions writeOpt = WriteOptions();
+    writeOpt.disableWAL = true;
+    SetPerfLevel(kEnableTime);
+    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
+    // this will now also flush the last 2 writes
+    ASSERT_OK(Flush(1));
+    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
+
+    get_perf_context()->Reset();
+    Get(1, "foo");
+    // "foo" was flushed, so the read must touch an SST file.
+    ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
+    ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);
+
+    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
+    ASSERT_EQ("v1", Get(1, "foo"));
+    ASSERT_EQ("v1", Get(1, "bar"));
+
+    writeOpt.disableWAL = true;
+    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
+    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
+    ASSERT_OK(Flush(1));
+
+    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
+    ASSERT_EQ("v2", Get(1, "bar"));
+    get_perf_context()->Reset();
+    ASSERT_EQ("v2", Get(1, "foo"));
+    ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
+
+    // With the WAL enabled, data survives reopen even without a flush.
+    writeOpt.disableWAL = false;
+    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
+    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
+    ASSERT_OK(Flush(1));
+
+    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
+    // 'foo' should be there because its put
+    // has WAL enabled.
+    ASSERT_EQ("v3", Get(1, "foo"));
+    ASSERT_EQ("v3", Get(1, "bar"));
+
+    SetPerfLevel(kDisable);
+  } while (ChangeCompactOptions());
+}
+
+// A tiny max_manifest_file_size forces the manifest to roll over on flush and
+// on reopen; data must remain readable after each rollover.
+TEST_F(DBBasicTest, ManifestRollOver) {
+  do {
+    Options options;
+    options.max_manifest_file_size = 10;  // 10 bytes
+    options = CurrentOptions(options);
+    CreateAndReopenWithCF({"pikachu"}, options);
+    {
+      ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
+      ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2')));
+      ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3')));
+      uint64_t manifest_before_flush = dbfull()->TEST_Current_Manifest_FileNo();
+      ASSERT_OK(Flush(1));  // This should trigger LogAndApply.
+      uint64_t manifest_after_flush = dbfull()->TEST_Current_Manifest_FileNo();
+      ASSERT_GT(manifest_after_flush, manifest_before_flush);
+      ReopenWithColumnFamilies({"default", "pikachu"}, options);
+      ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush);
+      // check if a new manifest file got inserted or not.
+      ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1"));
+      ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2"));
+      ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3"));
+    }
+  } while (ChangeCompactOptions());
+}
+
+// DB identity must be stable across restarts, and recoverable from the
+// manifest (when write_dbid_to_manifest is set) after the IDENTITY file is
+// deleted, truncated, or overwritten.
+TEST_F(DBBasicTest, IdentityAcrossRestarts) {
+  constexpr size_t kMinIdSize = 10;
+  do {
+    for (bool with_manifest : {false, true}) {
+      std::string idfilename = IdentityFileName(dbname_);
+      std::string id1, tmp;
+      ASSERT_OK(db_->GetDbIdentity(id1));
+      ASSERT_GE(id1.size(), kMinIdSize);
+
+      Options options = CurrentOptions();
+      options.write_dbid_to_manifest = with_manifest;
+      Reopen(options);
+      std::string id2;
+      ASSERT_OK(db_->GetDbIdentity(id2));
+      // id2 should match id1 because identity was not regenerated
+      ASSERT_EQ(id1, id2);
+      ASSERT_OK(ReadFileToString(env_, idfilename, &tmp));
+      ASSERT_EQ(tmp, id2);
+
+      // Recover from deleted/missing IDENTITY
+      ASSERT_OK(env_->DeleteFile(idfilename));
+      Reopen(options);
+      std::string id3;
+      ASSERT_OK(db_->GetDbIdentity(id3));
+      if (with_manifest) {
+        // id3 should match id1 because identity was restored from manifest
+        ASSERT_EQ(id1, id3);
+      } else {
+        // id3 should NOT match id1 because identity was regenerated
+        ASSERT_NE(id1, id3);
+        ASSERT_GE(id3.size(), kMinIdSize);
+      }
+      ASSERT_OK(ReadFileToString(env_, idfilename, &tmp));
+      ASSERT_EQ(tmp, id3);
+
+      // Recover from truncated IDENTITY
+      {
+        std::unique_ptr<WritableFile> w;
+        ASSERT_OK(env_->NewWritableFile(idfilename, &w, EnvOptions()));
+        ASSERT_OK(w->Close());
+      }
+      Reopen(options);
+      std::string id4;
+      ASSERT_OK(db_->GetDbIdentity(id4));
+      if (with_manifest) {
+        // id4 should match id1 because identity was restored from manifest
+        ASSERT_EQ(id1, id4);
+      } else {
+        // id4 should NOT match id1 because identity was regenerated
+        ASSERT_NE(id1, id4);
+        ASSERT_GE(id4.size(), kMinIdSize);
+      }
+      ASSERT_OK(ReadFileToString(env_, idfilename, &tmp));
+      ASSERT_EQ(tmp, id4);
+
+      // Recover from overwritten IDENTITY
+      std::string silly_id = "asdf123456789";
+      {
+        std::unique_ptr<WritableFile> w;
+        ASSERT_OK(env_->NewWritableFile(idfilename, &w, EnvOptions()));
+        ASSERT_OK(w->Append(silly_id));
+        ASSERT_OK(w->Close());
+      }
+      Reopen(options);
+      std::string id5;
+      ASSERT_OK(db_->GetDbIdentity(id5));
+      if (with_manifest) {
+        // id5 should match id1 because identity was restored from manifest
+        ASSERT_EQ(id1, id5);
+      } else {
+        // Without a manifest copy, the bogus file contents are accepted.
+        ASSERT_EQ(id5, silly_id);
+      }
+      ASSERT_OK(ReadFileToString(env_, idfilename, &tmp));
+      ASSERT_EQ(tmp, id5);
+    }
+  } while (ChangeCompactOptions());
+}
+
+#ifndef ROCKSDB_LITE
+// Multiple snapshots across two column families: each snapshot pins its own
+// version, and the oldest-snapshot time/sequence bookkeeping updates as
+// snapshots are taken and released.
+TEST_F(DBBasicTest, Snapshot) {
+  env_->SetMockSleep();
+  anon::OptionsOverride options_override;
+  options_override.skip_policy = kSkipNoSnapshot;
+  do {
+    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
+    ASSERT_OK(Put(0, "foo", "0v1"));
+    ASSERT_OK(Put(1, "foo", "1v1"));
+
+    const Snapshot* s1 = db_->GetSnapshot();
+    ASSERT_EQ(1U, GetNumSnapshots());
+    uint64_t time_snap1 = GetTimeOldestSnapshots();
+    ASSERT_GT(time_snap1, 0U);
+    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
+    ASSERT_OK(Put(0, "foo", "0v2"));
+    ASSERT_OK(Put(1, "foo", "1v2"));
+
+    env_->MockSleepForSeconds(1);
+
+    const Snapshot* s2 = db_->GetSnapshot();
+    ASSERT_EQ(2U, GetNumSnapshots());
+    // s1 is still the oldest snapshot, so its time/sequence are reported.
+    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
+    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
+    ASSERT_OK(Put(0, "foo", "0v3"));
+    ASSERT_OK(Put(1, "foo", "1v3"));
+
+    {
+      // s3 is scoped; ManagedSnapshot releases it at block exit.
+      ManagedSnapshot s3(db_);
+      ASSERT_EQ(3U, GetNumSnapshots());
+      ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
+      ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
+
+      ASSERT_OK(Put(0, "foo", "0v4"));
+      ASSERT_OK(Put(1, "foo", "1v4"));
+      ASSERT_EQ("0v1", Get(0, "foo", s1));
+      ASSERT_EQ("1v1", Get(1, "foo", s1));
+      ASSERT_EQ("0v2", Get(0, "foo", s2));
+      ASSERT_EQ("1v2", Get(1, "foo", s2));
+      ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
+      ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
+      ASSERT_EQ("0v4", Get(0, "foo"));
+      ASSERT_EQ("1v4", Get(1, "foo"));
+    }
+
+    ASSERT_EQ(2U, GetNumSnapshots());
+    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
+    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
+    ASSERT_EQ("0v1", Get(0, "foo", s1));
+    ASSERT_EQ("1v1", Get(1, "foo", s1));
+    ASSERT_EQ("0v2", Get(0, "foo", s2));
+    ASSERT_EQ("1v2", Get(1, "foo", s2));
+    ASSERT_EQ("0v4", Get(0, "foo"));
+    ASSERT_EQ("1v4", Get(1, "foo"));
+
+    db_->ReleaseSnapshot(s1);
+    ASSERT_EQ("0v2", Get(0, "foo", s2));
+    ASSERT_EQ("1v2", Get(1, "foo", s2));
+    ASSERT_EQ("0v4", Get(0, "foo"));
+    ASSERT_EQ("1v4", Get(1, "foo"));
+    ASSERT_EQ(1U, GetNumSnapshots());
+    // s2 is now the oldest; its later timestamp/sequence take over.
+    ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
+    ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());
+
+    db_->ReleaseSnapshot(s2);
+    ASSERT_EQ(0U, GetNumSnapshots());
+    ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
+    ASSERT_EQ("0v4", Get(0, "foo"));
+    ASSERT_EQ("1v4", Get(1, "foo"));
+  } while (ChangeOptions());
+}
+
+#endif // ROCKSDB_LITE
+
+// Parameterized fixture that runs a test once per option configuration,
+// excluding FIFO compaction configs.
+class DBBasicMultiConfigs : public DBBasicTest,
+                            public ::testing::WithParamInterface<int> {
+ public:
+  DBBasicMultiConfigs() { option_config_ = GetParam(); }
+
+  // Enumerates every option_config in [kDefault, kEnd) that is not skipped
+  // by the kSkipFIFOCompaction mask.
+  static std::vector<int> GenerateOptionConfigs() {
+    std::vector<int> option_configs;
+    for (int option_config = kDefault; option_config < kEnd; ++option_config) {
+      if (!ShouldSkipOptions(option_config, kSkipFIFOCompaction)) {
+        option_configs.push_back(option_config);
+      }
+    }
+    return option_configs;
+  }
+};
+
+// Flush and compaction keep exactly the versions visible to live snapshots
+// and drop obsolete ones as each snapshot is released.
+TEST_P(DBBasicMultiConfigs, CompactBetweenSnapshots) {
+  anon::OptionsOverride options_override;
+  options_override.skip_policy = kSkipNoSnapshot;
+  Options options = CurrentOptions(options_override);
+  options.disable_auto_compactions = true;
+  DestroyAndReopen(options);
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Random rnd(301);
+  FillLevels("a", "z", 1);
+
+  ASSERT_OK(Put(1, "foo", "first"));
+  const Snapshot* snapshot1 = db_->GetSnapshot();
+  ASSERT_OK(Put(1, "foo", "second"));
+  ASSERT_OK(Put(1, "foo", "third"));
+  ASSERT_OK(Put(1, "foo", "fourth"));
+  const Snapshot* snapshot2 = db_->GetSnapshot();
+  ASSERT_OK(Put(1, "foo", "fifth"));
+  ASSERT_OK(Put(1, "foo", "sixth"));
+
+  // All entries (including duplicates) exist
+  // before any compaction or flush is triggered.
+  ASSERT_EQ(AllEntriesFor("foo", 1),
+            "[ sixth, fifth, fourth, third, second, first ]");
+  ASSERT_EQ("sixth", Get(1, "foo"));
+  ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
+  ASSERT_EQ("first", Get(1, "foo", snapshot1));
+
+  // After a flush, "second", "third" and "fifth" should
+  // be removed
+  ASSERT_OK(Flush(1));
+  ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");
+
+  // after we release the snapshot1, only two values left
+  db_->ReleaseSnapshot(snapshot1);
+  FillLevels("a", "z", 1);
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
+                                   nullptr));
+
+  // We have only one valid snapshot snapshot2. Since snapshot1 is
+  // not valid anymore, "first" should be removed by a compaction.
+  ASSERT_EQ("sixth", Get(1, "foo"));
+  ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
+  ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");
+
+  // after we release the snapshot2, only one value should be left
+  db_->ReleaseSnapshot(snapshot2);
+  FillLevels("a", "z", 1);
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
+                                   nullptr));
+  ASSERT_EQ("sixth", Get(1, "foo"));
+  ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
+}
+
+// Instantiate the parameterized suite over all generated option configs.
+INSTANTIATE_TEST_CASE_P(
+    DBBasicMultiConfigs, DBBasicMultiConfigs,
+    ::testing::ValuesIn(DBBasicMultiConfigs::GenerateOptionConfigs()));
+
+// DB::Open behavior for the four create_if_missing / error_if_exists
+// combinations on a missing and an existing DB directory.
+TEST_F(DBBasicTest, DBOpen_Options) {
+  Options options = CurrentOptions();
+  Close();
+  Destroy(options);
+
+  // Does not exist, and create_if_missing == false: error
+  DB* db = nullptr;
+  options.create_if_missing = false;
+  Status s = DB::Open(options, dbname_, &db);
+  ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr);
+  ASSERT_TRUE(db == nullptr);
+
+  // Does not exist, and create_if_missing == true: OK
+  options.create_if_missing = true;
+  s = DB::Open(options, dbname_, &db);
+  ASSERT_OK(s);
+  ASSERT_TRUE(db != nullptr);
+
+  delete db;
+  db = nullptr;
+
+  // Does exist, and error_if_exists == true: error
+  options.create_if_missing = false;
+  options.error_if_exists = true;
+  s = DB::Open(options, dbname_, &db);
+  ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr);
+  ASSERT_TRUE(db == nullptr);
+
+  // Does exist, and error_if_exists == false: OK
+  options.create_if_missing = true;
+  options.error_if_exists = false;
+  s = DB::Open(options, dbname_, &db);
+  ASSERT_OK(s);
+  ASSERT_TRUE(db != nullptr);
+
+  delete db;
+  db = nullptr;
+}
+
+// Verifies which entry combinations (put/delete sequences, with and without
+// snapshots) are collapsed by flush versus full compaction.
+TEST_F(DBBasicTest, CompactOnFlush) {
+  anon::OptionsOverride options_override;
+  options_override.skip_policy = kSkipNoSnapshot;
+  do {
+    Options options = CurrentOptions(options_override);
+    options.disable_auto_compactions = true;
+    CreateAndReopenWithCF({"pikachu"}, options);
+
+    ASSERT_OK(Put(1, "foo", "v1"));
+    ASSERT_OK(Flush(1));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");
+
+    // Write two new keys
+    ASSERT_OK(Put(1, "a", "begin"));
+    ASSERT_OK(Put(1, "z", "end"));
+    ASSERT_OK(Flush(1));
+
+    // Case1: Delete followed by a put
+    ASSERT_OK(Delete(1, "foo"));
+    ASSERT_OK(Put(1, "foo", "v2"));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
+
+    // After the current memtable is flushed, the DEL should
+    // have been removed
+    ASSERT_OK(Flush(1));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
+
+    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
+                                     nullptr, nullptr));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
+
+    // Case 2: Delete followed by another delete
+    ASSERT_OK(Delete(1, "foo"));
+    ASSERT_OK(Delete(1, "foo"));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
+    ASSERT_OK(Flush(1));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
+    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
+                                     nullptr, nullptr));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
+
+    // Case 3: Put followed by a delete
+    ASSERT_OK(Put(1, "foo", "v3"));
+    ASSERT_OK(Delete(1, "foo"));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
+    ASSERT_OK(Flush(1));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
+    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
+                                     nullptr, nullptr));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
+
+    // Case 4: Put followed by another Put
+    ASSERT_OK(Put(1, "foo", "v4"));
+    ASSERT_OK(Put(1, "foo", "v5"));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
+    ASSERT_OK(Flush(1));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
+    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
+                                     nullptr, nullptr));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
+
+    // clear database
+    ASSERT_OK(Delete(1, "foo"));
+    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
+                                     nullptr, nullptr));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
+
+    // Case 5: Put followed by snapshot followed by another Put
+    // Both puts should remain.
+    ASSERT_OK(Put(1, "foo", "v6"));
+    const Snapshot* snapshot = db_->GetSnapshot();
+    ASSERT_OK(Put(1, "foo", "v7"));
+    ASSERT_OK(Flush(1));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
+    db_->ReleaseSnapshot(snapshot);
+
+    // clear database
+    ASSERT_OK(Delete(1, "foo"));
+    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
+                                     nullptr, nullptr));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
+
+    // Case 6: snapshot followed by a put followed by another Put
+    // Only the last put should remain.
+    const Snapshot* snapshot1 = db_->GetSnapshot();
+    ASSERT_OK(Put(1, "foo", "v8"));
+    ASSERT_OK(Put(1, "foo", "v9"));
+    ASSERT_OK(Flush(1));
+    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
+    db_->ReleaseSnapshot(snapshot1);
+  } while (ChangeCompactOptions());
+}
+
+// Verifies that flushing one column family at a time produces exactly one
+// new SST file per flush, across the default CF plus seven created CFs.
+TEST_F(DBBasicTest, FlushOneColumnFamily) {
+ Options options = CurrentOptions();
+ CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
+ "alyosha", "popovich"},
+ options);
+
+ ASSERT_OK(Put(0, "Default", "Default"));
+ ASSERT_OK(Put(1, "pikachu", "pikachu"));
+ ASSERT_OK(Put(2, "ilya", "ilya"));
+ ASSERT_OK(Put(3, "muromec", "muromec"));
+ ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
+ ASSERT_OK(Put(5, "nikitich", "nikitich"));
+ ASSERT_OK(Put(6, "alyosha", "alyosha"));
+ ASSERT_OK(Put(7, "popovich", "popovich"));
+
+ for (int i = 0; i < 8; ++i) {
+ ASSERT_OK(Flush(i));
+ // Each per-CF flush should add exactly one table file to the DB dir.
+ auto tables = ListTableFiles(env_, dbname_);
+ ASSERT_EQ(tables.size(), i + 1U);
+ }
+}
+
+// Exercises the vector-based (unbatched) MultiGet API on one column family
+// with a mix of live keys, a deleted key, and a never-written key, and
+// checks the multiget_read_bytes perf counter.
+TEST_F(DBBasicTest, MultiGetSimple) {
+ do {
+ CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+ SetPerfLevel(kEnableCount);
+ ASSERT_OK(Put(1, "k1", "v1"));
+ ASSERT_OK(Put(1, "k2", "v2"));
+ ASSERT_OK(Put(1, "k3", "v3"));
+ ASSERT_OK(Put(1, "k4", "v4"));
+ ASSERT_OK(Delete(1, "k4"));
+ ASSERT_OK(Put(1, "k5", "v5"));
+ ASSERT_OK(Delete(1, "no_key"));
+
+ std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
+
+ // Oversized and pre-filled to confirm MultiGet resizes/overwrites it.
+ std::vector<std::string> values(20, "Temporary data to be overwritten");
+ std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
+
+ get_perf_context()->Reset();
+ std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
+ ASSERT_EQ(values.size(), keys.size());
+ ASSERT_EQ(values[0], "v1");
+ ASSERT_EQ(values[1], "v2");
+ ASSERT_EQ(values[2], "v3");
+ ASSERT_EQ(values[4], "v5");
+ // four kv pairs * two bytes per value
+ ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
+
+ ASSERT_OK(s[0]);
+ ASSERT_OK(s[1]);
+ ASSERT_OK(s[2]);
+ ASSERT_TRUE(s[3].IsNotFound());
+ ASSERT_OK(s[4]);
+ ASSERT_TRUE(s[5].IsNotFound());
+ SetPerfLevel(kDisable);
+ } while (ChangeCompactOptions());
+}
+
+// MultiGet edge cases: empty key set (populated and freshly-destroyed DB)
+// and lookups of absent keys in an empty database.
+TEST_F(DBBasicTest, MultiGetEmpty) {
+ do {
+ CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+ // Empty Key Set
+ std::vector<Slice> keys;
+ std::vector<std::string> values;
+ std::vector<ColumnFamilyHandle*> cfs;
+ std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
+ ASSERT_EQ(s.size(), 0U);
+
+ // Empty Database, Empty Key Set
+ Options options = CurrentOptions();
+ options.create_if_missing = true;
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({"pikachu"}, options);
+ s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
+ ASSERT_EQ(s.size(), 0U);
+
+ // Empty Database, Search for Keys
+ keys.resize(2);
+ keys[0] = "a";
+ keys[1] = "b";
+ cfs.push_back(handles_[0]);
+ cfs.push_back(handles_[1]);
+ s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
+ ASSERT_EQ(static_cast<int>(s.size()), 2);
+ ASSERT_TRUE(s[0].IsNotFound() && s[1].IsNotFound());
+ } while (ChangeCompactOptions());
+}
+
+// Parameterized on the BlockBasedTable footer format_version so block
+// checksum handling is exercised against every supported footer format.
+class DBBlockChecksumTest : public DBBasicTest,
+ public testing::WithParamInterface<uint32_t> {};
+
+INSTANTIATE_TEST_CASE_P(FormatVersions, DBBlockChecksumTest,
+ testing::ValuesIn(test::kFooterFormatVersionsToTest));
+
+// Writes one SST file per supported checksum type, then confirms every file
+// is readable no matter which checksum type the reader is configured with,
+// and that an unknown checksum type is rejected at open time.
+TEST_P(DBBlockChecksumTest, BlockChecksumTest) {
+ BlockBasedTableOptions table_options;
+ table_options.format_version = GetParam();
+ Options options = CurrentOptions();
+ const int kNumPerFile = 2;
+
+ const auto algs = GetSupportedChecksums();
+ const int algs_size = static_cast<int>(algs.size());
+
+ // generate one table with each type of checksum
+ for (int i = 0; i < algs_size; ++i) {
+ table_options.checksum = algs[i];
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ Reopen(options);
+ for (int j = 0; j < kNumPerFile; ++j) {
+ ASSERT_OK(Put(Key(i * kNumPerFile + j), Key(i * kNumPerFile + j)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ // with each valid checksum type setting...
+ for (int i = 0; i < algs_size; ++i) {
+ table_options.checksum = algs[i];
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ Reopen(options);
+ // verify every type of checksum (should be regardless of that setting)
+ for (int j = 0; j < algs_size * kNumPerFile; ++j) {
+ ASSERT_EQ(Key(j), Get(Key(j)));
+ }
+ }
+
+ // Now test invalid checksum type
+ table_options.checksum = static_cast<ChecksumType>(123);
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ ASSERT_TRUE(TryReopen(options).IsInvalidArgument());
+}
+
+// On Windows you can have either memory mapped file or a file
+// with unbuffered access. So this asserts and does not make
+// sense to run
+#ifndef OS_WIN
+// use_direct_reads and allow_mmap_reads are mutually exclusive; every other
+// combination of the two flags must open successfully.
+TEST_F(DBBasicTest, MmapAndBufferOptions) {
+ if (!IsMemoryMappedAccessSupported()) {
+ return;
+ }
+ Options options = CurrentOptions();
+
+ options.use_direct_reads = true;
+ options.allow_mmap_reads = true;
+ ASSERT_NOK(TryReopen(options));
+
+ // All other combinations are acceptable
+ options.use_direct_reads = false;
+ ASSERT_OK(TryReopen(options));
+
+ if (IsDirectIOSupported()) {
+ options.use_direct_reads = true;
+ options.allow_mmap_reads = false;
+ ASSERT_OK(TryReopen(options));
+ }
+
+ options.use_direct_reads = false;
+ ASSERT_OK(TryReopen(options));
+}
+#endif
+
+// Env wrapper whose loggers count how many times they are closed, and whose
+// Close() deliberately fails with IOError so tests can observe how DB close
+// paths propagate logger-close errors. Not thread-safe; intended for
+// single-threaded test use only.
+class TestEnv : public EnvWrapper {
+ public:
+ explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
+ static const char* kClassName() { return "TestEnv"; }
+ const char* Name() const override { return kClassName(); }
+
+ // Logger that bumps the owning TestEnv's close counter on close (whether
+ // via explicit Close() or the destructor) and always reports IOError.
+ class TestLogger : public Logger {
+ public:
+ using Logger::Logv;
+ explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
+ ~TestLogger() override {
+ if (!closed_) {
+ // Destructor must not throw/propagate; swallow the injected error.
+ CloseHelper().PermitUncheckedError();
+ }
+ }
+ void Logv(const char* /*format*/, va_list /*ap*/) override {}
+
+ protected:
+ Status CloseImpl() override { return CloseHelper(); }
+
+ private:
+ Status CloseHelper() {
+ env->CloseCountInc();
+ // Intentionally fail so callers' error handling is exercised.
+ return Status::IOError();
+ }
+ TestEnv* env;
+ };
+
+ void CloseCountInc() { close_count++; }
+
+ int GetCloseCount() { return close_count; }
+
+ Status NewLogger(const std::string& /*fname*/,
+ std::shared_ptr<Logger>* result) override {
+ result->reset(new TestLogger(this));
+ return Status::OK();
+ }
+
+ private:
+ int close_count;
+};
+
+// Checks that DB::Close() closes the info logger exactly once (surfacing the
+// logger's injected IOError), that deleting the DB without Close() still
+// closes the logger, and that a user-supplied logger is NOT closed by the DB.
+TEST_F(DBBasicTest, DBClose) {
+ Options options = GetDefaultOptions();
+ std::string dbname = test::PerThreadDBPath("db_close_test");
+ ASSERT_OK(DestroyDB(dbname, options));
+
+ DB* db = nullptr;
+ TestEnv* env = new TestEnv(env_);
+ std::unique_ptr<TestEnv> local_env_guard(env);
+ options.create_if_missing = true;
+ options.env = env;
+ Status s = DB::Open(options, dbname, &db);
+ ASSERT_OK(s);
+ ASSERT_TRUE(db != nullptr);
+
+ s = db->Close();
+ ASSERT_EQ(env->GetCloseCount(), 1);
+ // TestLogger::Close always returns IOError, which Close() propagates.
+ ASSERT_EQ(s, Status::IOError());
+
+ delete db;
+ ASSERT_EQ(env->GetCloseCount(), 1);
+
+ // Do not call DB::Close() and ensure our logger Close() still gets called
+ s = DB::Open(options, dbname, &db);
+ ASSERT_OK(s);
+ ASSERT_TRUE(db != nullptr);
+ delete db;
+ ASSERT_EQ(env->GetCloseCount(), 2);
+
+ // Provide our own logger and ensure DB::Close() does not close it
+ options.info_log.reset(new TestEnv::TestLogger(env));
+ options.create_if_missing = false;
+ s = DB::Open(options, dbname, &db);
+ ASSERT_OK(s);
+ ASSERT_TRUE(db != nullptr);
+
+ s = db->Close();
+ ASSERT_EQ(s, Status::OK());
+ delete db;
+ ASSERT_EQ(env->GetCloseCount(), 2);
+ // Dropping the last reference destroys the logger, which closes itself.
+ options.info_log.reset();
+ ASSERT_EQ(env->GetCloseCount(), 3);
+}
+
+// Ensures DB::Close() releases every directory file descriptor it opened,
+// including the WAL dir and all configured db_paths, by comparing the
+// CountedFileSystem's dir open/close counters.
+TEST_F(DBBasicTest, DBCloseAllDirectoryFDs) {
+ Options options = GetDefaultOptions();
+ std::string dbname = test::PerThreadDBPath("db_close_all_dir_fds_test");
+ // Configure a specific WAL directory
+ options.wal_dir = dbname + "_wal_dir";
+ // Configure 3 different data directories
+ options.db_paths.emplace_back(dbname + "_1", 512 * 1024);
+ options.db_paths.emplace_back(dbname + "_2", 4 * 1024 * 1024);
+ options.db_paths.emplace_back(dbname + "_3", 1024 * 1024 * 1024);
+
+ ASSERT_OK(DestroyDB(dbname, options));
+
+ DB* db = nullptr;
+ std::unique_ptr<Env> env = NewCompositeEnv(
+ std::make_shared<CountedFileSystem>(FileSystem::Default()));
+ options.create_if_missing = true;
+ options.env = env.get();
+ Status s = DB::Open(options, dbname, &db);
+ ASSERT_OK(s);
+ ASSERT_TRUE(db != nullptr);
+
+ // Explicitly close the database to ensure the open and close counter for
+ // directories are equivalent
+ s = db->Close();
+ auto* counted_fs =
+ options.env->GetFileSystem()->CheckedCast<CountedFileSystem>();
+ ASSERT_TRUE(counted_fs != nullptr);
+ ASSERT_EQ(counted_fs->counters()->dir_opens,
+ counted_fs->counters()->dir_closes);
+ ASSERT_OK(s);
+ delete db;
+}
+
+// Close() must fail when the final memtable flush hits an injected filesystem
+// error, and repeated Close() calls keep returning failure — even after the
+// filesystem recovers, since Close() is not retry-able (see issue #9029).
+TEST_F(DBBasicTest, DBCloseFlushError) {
+ std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
+ new FaultInjectionTestEnv(env_));
+ Options options = GetDefaultOptions();
+ options.create_if_missing = true;
+ options.manual_wal_flush = true;
+ options.write_buffer_size = 100;
+ options.env = fault_injection_env.get();
+
+ Reopen(options);
+ ASSERT_OK(Put("key1", "value1"));
+ ASSERT_OK(Put("key2", "value2"));
+ ASSERT_OK(dbfull()->TEST_SwitchMemtable());
+ ASSERT_OK(Put("key3", "value3"));
+ fault_injection_env->SetFilesystemActive(false);
+ Status s = dbfull()->Close();
+ ASSERT_NE(s, Status::OK());
+ // retry should return the same error
+ s = dbfull()->Close();
+ ASSERT_NE(s, Status::OK());
+ fault_injection_env->SetFilesystemActive(true);
+ // retry close() is no-op even the system is back. Could be improved if
+ // Close() is retry-able: #9029
+ s = dbfull()->Close();
+ ASSERT_NE(s, Status::OK());
+ Destroy(options);
+}
+
+// Parameters: <0> = use the batched MultiGet API, <1> = use async_io.
+class DBMultiGetTestWithParam
+ : public DBBasicTest,
+ public testing::WithParamInterface<std::tuple<bool, bool>> {};
+
+// MultiGet across eight column families while a SyncPoint callback flushes
+// the CFs and overwrites every key mid-call, forcing MultiGet to re-acquire
+// SuperVersion references; verifies it eventually pins SVs via the mutex
+// path (kSVInUse) and leaves thread-local SVs in a sane state afterwards.
+TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
+#ifndef USE_COROUTINES
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ Options options = CurrentOptions();
+ CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
+ "alyosha", "popovich"},
+ options);
+ // <CF, key, value> tuples
+ std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
+ static const int num_keys = 24;
+ cf_kv_vec.reserve(num_keys);
+
+ for (int i = 0; i < num_keys; ++i) {
+ int cf = i / 3;
+ // Three distinct keys per CF. (Was "1 % 3", a typo that made all three
+ // keys of a CF identical and weakened the coverage.)
+ int cf_key = i % 3;
+ cf_kv_vec.emplace_back(std::make_tuple(
+ cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
+ "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key)));
+ ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
+ std::get<2>(cf_kv_vec[i])));
+ }
+
+ int get_sv_count = 0;
+ ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
+ if (++get_sv_count == 2) {
+ // After MultiGet refs a couple of CFs, flush all CFs so MultiGet
+ // is forced to repeat the process
+ for (int i = 0; i < num_keys; ++i) {
+ int cf = i / 3;
+ int cf_key = i % 8;
+ if (cf_key == 0) {
+ ASSERT_OK(Flush(cf));
+ }
+ ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
+ std::get<2>(cf_kv_vec[i]) + "_2"));
+ }
+ }
+ if (get_sv_count == 11) {
+ // By now MultiGet should have fallen back to pinning SuperVersions
+ // under the DB mutex for every CF.
+ for (int i = 0; i < 8; ++i) {
+ auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
+ db->GetColumnFamilyHandle(i))
+ ->cfd();
+ ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
+ }
+ }
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::vector<int> cfs;
+ std::vector<std::string> keys;
+ std::vector<std::string> values;
+
+ for (int i = 0; i < num_keys; ++i) {
+ cfs.push_back(std::get<0>(cf_kv_vec[i]));
+ keys.push_back(std::get<1>(cf_kv_vec[i]));
+ }
+
+ values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()),
+ std::get<1>(GetParam()));
+ ASSERT_EQ(values.size(), num_keys);
+ for (unsigned int j = 0; j < values.size(); ++j) {
+ ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
+ }
+
+ keys.clear();
+ cfs.clear();
+ cfs.push_back(std::get<0>(cf_kv_vec[0]));
+ keys.push_back(std::get<1>(cf_kv_vec[0]));
+ cfs.push_back(std::get<0>(cf_kv_vec[3]));
+ keys.push_back(std::get<1>(cf_kv_vec[3]));
+ cfs.push_back(std::get<0>(cf_kv_vec[4]));
+ keys.push_back(std::get<1>(cf_kv_vec[4]));
+ values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()),
+ std::get<1>(GetParam()));
+ ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
+ ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
+ ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");
+
+ keys.clear();
+ cfs.clear();
+ cfs.push_back(std::get<0>(cf_kv_vec[7]));
+ keys.push_back(std::get<1>(cf_kv_vec[7]));
+ cfs.push_back(std::get<0>(cf_kv_vec[6]));
+ keys.push_back(std::get<1>(cf_kv_vec[6]));
+ cfs.push_back(std::get<0>(cf_kv_vec[1]));
+ keys.push_back(std::get<1>(cf_kv_vec[1]));
+ values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()),
+ std::get<1>(GetParam()));
+ ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
+ ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
+ ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");
+
+ for (int cf = 0; cf < 8; ++cf) {
+ auto* cfd =
+ static_cast_with_check<ColumnFamilyHandleImpl>(
+ static_cast_with_check<DBImpl>(db_)->GetColumnFamilyHandle(cf))
+ ->cfd();
+ ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
+ ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
+ }
+}
+
+// Repeatedly invalidates SuperVersions during MultiGet so the lock-free SV
+// acquisition never succeeds; verifies MultiGet reaches its "last try"
+// mutex-protected path and still returns the latest values.
+TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
+#ifndef USE_COROUTINES
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ Options options = CurrentOptions();
+ CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
+ "alyosha", "popovich"},
+ options);
+
+ for (int i = 0; i < 8; ++i) {
+ ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val"));
+ }
+
+ int get_sv_count = 0;
+ int retries = 0;
+ bool last_try = false;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
+ last_try = true;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
+ if (last_try) {
+ return;
+ }
+ // Every second SV acquisition, flush and overwrite all CFs so the
+ // acquired SuperVersions are immediately stale.
+ if (++get_sv_count == 2) {
+ ++retries;
+ get_sv_count = 0;
+ for (int i = 0; i < 8; ++i) {
+ ASSERT_OK(Flush(i));
+ ASSERT_OK(Put(
+ i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val" + std::to_string(retries)));
+ }
+ }
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::vector<int> cfs;
+ std::vector<std::string> keys;
+ std::vector<std::string> values;
+
+ for (int i = 0; i < 8; ++i) {
+ cfs.push_back(i);
+ keys.push_back("cf" + std::to_string(i) + "_key");
+ }
+
+ values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()),
+ std::get<1>(GetParam()));
+ ASSERT_TRUE(last_try);
+ ASSERT_EQ(values.size(), 8);
+ for (unsigned int j = 0; j < values.size(); ++j) {
+ ASSERT_EQ(values[j],
+ "cf" + std::to_string(j) + "_val" + std::to_string(retries));
+ }
+ for (int i = 0; i < 8; ++i) {
+ auto* cfd =
+ static_cast_with_check<ColumnFamilyHandleImpl>(
+ static_cast_with_check<DBImpl>(db_)->GetColumnFamilyHandle(i))
+ ->cfd();
+ ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
+ }
+}
+
+// With an explicit snapshot, MultiGet must return the pre-overwrite values
+// even though a SyncPoint callback flushes and updates every CF mid-call;
+// snapshot reads do not need consistent SVs across CFs.
+TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
+#ifndef USE_COROUTINES
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ Options options = CurrentOptions();
+ CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
+ "alyosha", "popovich"},
+ options);
+
+ for (int i = 0; i < 8; ++i) {
+ ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val"));
+ }
+
+ int get_sv_count = 0;
+ ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
+ if (++get_sv_count == 2) {
+ for (int i = 0; i < 8; ++i) {
+ ASSERT_OK(Flush(i));
+ ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
+ "cf" + std::to_string(i) + "_val2"));
+ }
+ }
+ if (get_sv_count == 8) {
+ for (int i = 0; i < 8; ++i) {
+ auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
+ db->GetColumnFamilyHandle(i))
+ ->cfd();
+ ASSERT_TRUE(
+ (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
+ (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
+ }
+ }
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::vector<int> cfs;
+ std::vector<std::string> keys;
+ std::vector<std::string> values;
+
+ for (int i = 0; i < 8; ++i) {
+ cfs.push_back(i);
+ keys.push_back("cf" + std::to_string(i) + "_key");
+ }
+
+ const Snapshot* snapshot = db_->GetSnapshot();
+ values = MultiGet(cfs, keys, snapshot, std::get<0>(GetParam()),
+ std::get<1>(GetParam()));
+ db_->ReleaseSnapshot(snapshot);
+ ASSERT_EQ(values.size(), 8);
+ // Snapshot isolation: the "_val2" overwrites must not be visible.
+ for (unsigned int j = 0; j < values.size(); ++j) {
+ ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
+ }
+ for (int i = 0; i < 8; ++i) {
+ auto* cfd =
+ static_cast_with_check<ColumnFamilyHandleImpl>(
+ static_cast_with_check<DBImpl>(db_)->GetColumnFamilyHandle(i))
+ ->cfd();
+ ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
+ }
+}
+
+// MultiGet must handle interleaved CFs where keys belonging to the same CF
+// are not adjacent in the request vector.
+TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFUnsorted) {
+#ifndef USE_COROUTINES
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ Options options = CurrentOptions();
+ CreateAndReopenWithCF({"one", "two"}, options);
+
+ ASSERT_OK(Put(1, "foo", "bar"));
+ ASSERT_OK(Put(2, "baz", "xyz"));
+ ASSERT_OK(Put(1, "abc", "def"));
+
+ // Note: keys for the same CF do not form a consecutive range
+ std::vector<int> cfs{1, 2, 1};
+ std::vector<std::string> keys{"foo", "baz", "abc"};
+ std::vector<std::string> values;
+
+ values = MultiGet(cfs, keys, /* snapshot */ nullptr,
+ /* batched */ std::get<0>(GetParam()),
+ /* async */ std::get<1>(GetParam()));
+
+ ASSERT_EQ(values.size(), 3);
+ ASSERT_EQ(values[0], "bar");
+ ASSERT_EQ(values[1], "xyz");
+ ASSERT_EQ(values[2], "def");
+}
+
+// Batched MultiGet with sorted_input=false: keys are supplied in reverse
+// order and results must come back in the caller's order.
+TEST_P(DBMultiGetTestWithParam, MultiGetBatchedSimpleUnsorted) {
+#ifndef USE_COROUTINES
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ // Skip for unbatched MultiGet
+ if (!std::get<0>(GetParam())) {
+ ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
+ return;
+ }
+ do {
+ CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+ SetPerfLevel(kEnableCount);
+ ASSERT_OK(Put(1, "k1", "v1"));
+ ASSERT_OK(Put(1, "k2", "v2"));
+ ASSERT_OK(Put(1, "k3", "v3"));
+ ASSERT_OK(Put(1, "k4", "v4"));
+ ASSERT_OK(Delete(1, "k4"));
+ ASSERT_OK(Put(1, "k5", "v5"));
+ ASSERT_OK(Delete(1, "no_key"));
+
+ get_perf_context()->Reset();
+
+ std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
+ std::vector<PinnableSlice> values(keys.size());
+ std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
+ std::vector<Status> s(keys.size());
+
+ ReadOptions ro;
+ ro.async_io = std::get<1>(GetParam());
+ // Final "false" = input not sorted; MultiGet must sort internally.
+ db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
+ s.data(), false);
+
+ ASSERT_EQ(values.size(), keys.size());
+ ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
+ ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v2");
+ ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
+ ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
+ // four kv pairs * two bytes per value
+ ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
+
+ ASSERT_TRUE(s[0].IsNotFound());
+ ASSERT_OK(s[1]);
+ ASSERT_TRUE(s[2].IsNotFound());
+ ASSERT_OK(s[3]);
+ ASSERT_OK(s[4]);
+ ASSERT_OK(s[5]);
+
+ SetPerfLevel(kDisable);
+ } while (ChangeCompactOptions());
+}
+
+// Batched MultiGet with sorted_input=true over data spread across two SST
+// files plus the memtable, including a deleted key and a missing key.
+TEST_P(DBMultiGetTestWithParam, MultiGetBatchedSortedMultiFile) {
+#ifndef USE_COROUTINES
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ // Skip for unbatched MultiGet
+ if (!std::get<0>(GetParam())) {
+ ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
+ return;
+ }
+ do {
+ CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+ SetPerfLevel(kEnableCount);
+ // To expand the power of this test, generate > 1 table file and
+ // mix with memtable
+ ASSERT_OK(Put(1, "k1", "v1"));
+ ASSERT_OK(Put(1, "k2", "v2"));
+ ASSERT_OK(Flush(1));
+ ASSERT_OK(Put(1, "k3", "v3"));
+ ASSERT_OK(Put(1, "k4", "v4"));
+ ASSERT_OK(Flush(1));
+ ASSERT_OK(Delete(1, "k4"));
+ ASSERT_OK(Put(1, "k5", "v5"));
+ ASSERT_OK(Delete(1, "no_key"));
+
+ get_perf_context()->Reset();
+
+ std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
+ std::vector<PinnableSlice> values(keys.size());
+ std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
+ std::vector<Status> s(keys.size());
+
+ ReadOptions ro;
+ ro.async_io = std::get<1>(GetParam());
+ // Final "true" = caller guarantees keys are already sorted.
+ db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
+ s.data(), true);
+
+ ASSERT_EQ(values.size(), keys.size());
+ ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1");
+ ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v2");
+ ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
+ ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v5");
+ // four kv pairs * two bytes per value
+ ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
+
+ ASSERT_OK(s[0]);
+ ASSERT_OK(s[1]);
+ ASSERT_OK(s[2]);
+ ASSERT_TRUE(s[3].IsNotFound());
+ ASSERT_OK(s[4]);
+ ASSERT_TRUE(s[5].IsNotFound());
+
+ SetPerfLevel(kDisable);
+ } while (ChangeOptions());
+}
+
+// Batched MultiGet where the request contains duplicate keys (unsorted),
+// with merge operands spread across several L2 files; duplicates must each
+// get the fully merged value, and read-bytes are counted once per lookup.
+TEST_P(DBMultiGetTestWithParam, MultiGetBatchedDuplicateKeys) {
+#ifndef USE_COROUTINES
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ // Skip for unbatched MultiGet
+ if (!std::get<0>(GetParam())) {
+ ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
+ return;
+ }
+ Options opts = CurrentOptions();
+ opts.merge_operator = MergeOperators::CreateStringAppendOperator();
+ CreateAndReopenWithCF({"pikachu"}, opts);
+ SetPerfLevel(kEnableCount);
+ // To expand the power of this test, generate > 1 table file and
+ // mix with memtable
+ ASSERT_OK(Merge(1, "k1", "v1"));
+ ASSERT_OK(Merge(1, "k2", "v2"));
+ ASSERT_OK(Flush(1));
+ MoveFilesToLevel(2, 1);
+ ASSERT_OK(Merge(1, "k3", "v3"));
+ ASSERT_OK(Merge(1, "k4", "v4"));
+ ASSERT_OK(Flush(1));
+ MoveFilesToLevel(2, 1);
+ ASSERT_OK(Merge(1, "k4", "v4_2"));
+ ASSERT_OK(Merge(1, "k6", "v6"));
+ ASSERT_OK(Flush(1));
+ MoveFilesToLevel(2, 1);
+ ASSERT_OK(Merge(1, "k7", "v7"));
+ ASSERT_OK(Merge(1, "k8", "v8"));
+ ASSERT_OK(Flush(1));
+ MoveFilesToLevel(2, 1);
+
+ get_perf_context()->Reset();
+
+ std::vector<Slice> keys({"k8", "k8", "k8", "k4", "k4", "k1", "k3"});
+ std::vector<PinnableSlice> values(keys.size());
+ std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
+ std::vector<Status> s(keys.size());
+
+ ReadOptions ro;
+ ro.async_io = std::get<1>(GetParam());
+ db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
+ s.data(), false);
+
+ ASSERT_EQ(values.size(), keys.size());
+ ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v8");
+ ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v8");
+ ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v8");
+ // k4 has two merge operands, combined by the string-append operator.
+ ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v4,v4_2");
+ ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v4,v4_2");
+ ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
+ ASSERT_EQ(std::string(values[6].data(), values[6].size()), "v3");
+ ASSERT_EQ(24, (int)get_perf_context()->multiget_read_bytes);
+
+ for (Status& status : s) {
+ ASSERT_OK(status);
+ }
+
+ SetPerfLevel(kDisable);
+}
+
+// Batched MultiGet across the full read path: keys in L2 (all), L1 (every
+// 3rd), L0 (every 5th), and the memtable (every 9th). The freshest copy of
+// each key must win.
+TEST_P(DBMultiGetTestWithParam, MultiGetBatchedMultiLevel) {
+#ifndef USE_COROUTINES
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ // Skip for unbatched MultiGet
+ if (!std::get<0>(GetParam())) {
+ ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
+ return;
+ }
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ Reopen(options);
+ int num_keys = 0;
+
+ for (int i = 0; i < 128; ++i) {
+ ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ MoveFilesToLevel(2);
+
+ for (int i = 0; i < 128; i += 3) {
+ ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ MoveFilesToLevel(1);
+
+ for (int i = 0; i < 128; i += 5) {
+ ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ ASSERT_EQ(0, num_keys);
+
+ // These remain in the memtable (no flush afterwards).
+ for (int i = 0; i < 128; i += 9) {
+ ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
+ }
+
+ std::vector<std::string> keys;
+ std::vector<std::string> values;
+
+ for (int i = 64; i < 80; ++i) {
+ keys.push_back("key_" + std::to_string(i));
+ }
+
+ values = MultiGet(keys, nullptr, std::get<1>(GetParam()));
+ ASSERT_EQ(values.size(), 16);
+ // Checks run from newest location (memtable) to oldest (L2).
+ for (unsigned int j = 0; j < values.size(); ++j) {
+ int key = j + 64;
+ if (key % 9 == 0) {
+ ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
+ } else if (key % 5 == 0) {
+ ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
+ } else if (key % 3 == 0) {
+ ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
+ } else {
+ ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
+ }
+ }
+}
+
+// Same multi-level layout as MultiGetBatchedMultiLevel, but upper levels
+// hold merge operands (string-append); MultiGet must concatenate operands
+// from L2 base values through L1, L0 and the memtable, in that order.
+TEST_P(DBMultiGetTestWithParam, MultiGetBatchedMultiLevelMerge) {
+#ifndef USE_COROUTINES
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ // Skip for unbatched MultiGet
+ if (!std::get<0>(GetParam())) {
+ ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
+ return;
+ }
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ options.merge_operator = MergeOperators::CreateStringAppendOperator();
+ BlockBasedTableOptions bbto;
+ bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
+ options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ Reopen(options);
+ int num_keys = 0;
+
+ for (int i = 0; i < 128; ++i) {
+ ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ MoveFilesToLevel(2);
+
+ for (int i = 0; i < 128; i += 3) {
+ ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ MoveFilesToLevel(1);
+
+ for (int i = 0; i < 128; i += 5) {
+ ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ ASSERT_EQ(0, num_keys);
+
+ for (int i = 0; i < 128; i += 9) {
+ ASSERT_OK(
+ Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
+ }
+
+ std::vector<std::string> keys;
+ std::vector<std::string> values;
+
+ for (int i = 32; i < 80; ++i) {
+ keys.push_back("key_" + std::to_string(i));
+ }
+
+ values = MultiGet(keys, nullptr, std::get<1>(GetParam()));
+ ASSERT_EQ(values.size(), keys.size());
+ for (unsigned int j = 0; j < 48; ++j) {
+ int key = j + 32;
+ // Expected value: L2 base, then operands appended oldest to newest.
+ std::string value;
+ value.append("val_l2_" + std::to_string(key));
+ if (key % 3 == 0) {
+ value.append(",");
+ value.append("val_l1_" + std::to_string(key));
+ }
+ if (key % 5 == 0) {
+ value.append(",");
+ value.append("val_l0_" + std::to_string(key));
+ }
+ if (key % 9 == 0) {
+ value.append(",");
+ value.append("val_mem_" + std::to_string(key));
+ }
+ ASSERT_EQ(values[j], value);
+ }
+}
+
+// ReadOptions::value_size_soft_limit on a purely in-memory dataset: once the
+// cumulative value bytes exceed the limit, remaining keys return Aborted.
+TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeInMemory) {
+#ifndef USE_COROUTINES
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ // Skip for unbatched MultiGet
+ if (!std::get<0>(GetParam())) {
+ ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
+ return;
+ }
+ CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+ SetPerfLevel(kEnableCount);
+ ASSERT_OK(Put(1, "k1", "v_1"));
+ ASSERT_OK(Put(1, "k2", "v_2"));
+ ASSERT_OK(Put(1, "k3", "v_3"));
+ ASSERT_OK(Put(1, "k4", "v_4"));
+ ASSERT_OK(Put(1, "k5", "v_5"));
+ ASSERT_OK(Put(1, "k6", "v_6"));
+ std::vector<Slice> keys = {"k1", "k2", "k3", "k4", "k5", "k6"};
+ std::vector<PinnableSlice> values(keys.size());
+ std::vector<Status> s(keys.size());
+ std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
+
+ get_perf_context()->Reset();
+ ReadOptions ro;
+ // 11 bytes allows four 3-byte values (12 bytes trips the soft limit).
+ ro.value_size_soft_limit = 11;
+ ro.async_io = std::get<1>(GetParam());
+ db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
+ s.data(), false);
+
+ ASSERT_EQ(values.size(), keys.size());
+ for (unsigned int i = 0; i < 4; i++) {
+ ASSERT_EQ(std::string(values[i].data(), values[i].size()),
+ "v_" + std::to_string(i + 1));
+ }
+
+ for (unsigned int i = 4; i < 6; i++) {
+ ASSERT_TRUE(s[i].IsAborted());
+ }
+
+ ASSERT_EQ(12, (int)get_perf_context()->multiget_read_bytes);
+ SetPerfLevel(kDisable);
+}
+
+// value_size_soft_limit with data split between the memtable and two SST
+// files: memtable hits are collected first, then SST lookups proceed until
+// the cumulative value size exceeds the limit, after which the remaining
+// keys are Aborted.
+TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSize) {
+#ifndef USE_COROUTINES
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ // Skip for unbatched MultiGet
+ if (!std::get<0>(GetParam())) {
+ return;
+ }
+ do {
+ CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+ SetPerfLevel(kEnableCount);
+
+ ASSERT_OK(Put(1, "k6", "v6"));
+ ASSERT_OK(Put(1, "k7", "v7_"));
+ ASSERT_OK(Put(1, "k3", "v3_"));
+ ASSERT_OK(Put(1, "k4", "v4"));
+ ASSERT_OK(Flush(1));
+ ASSERT_OK(Delete(1, "k4"));
+ ASSERT_OK(Put(1, "k11", "v11"));
+ ASSERT_OK(Delete(1, "no_key"));
+ ASSERT_OK(Put(1, "k8", "v8_"));
+ ASSERT_OK(Put(1, "k13", "v13"));
+ ASSERT_OK(Put(1, "k14", "v14"));
+ ASSERT_OK(Put(1, "k15", "v15"));
+ ASSERT_OK(Put(1, "k16", "v16"));
+ ASSERT_OK(Put(1, "k17", "v17"));
+ ASSERT_OK(Flush(1));
+
+ ASSERT_OK(Put(1, "k1", "v1_"));
+ ASSERT_OK(Put(1, "k2", "v2_"));
+ ASSERT_OK(Put(1, "k5", "v5_"));
+ ASSERT_OK(Put(1, "k9", "v9_"));
+ ASSERT_OK(Put(1, "k10", "v10"));
+ ASSERT_OK(Delete(1, "k2"));
+ ASSERT_OK(Delete(1, "k6"));
+
+ get_perf_context()->Reset();
+
+ std::vector<Slice> keys({"k1", "k10", "k11", "k12", "k13", "k14", "k15",
+ "k16", "k17", "k2", "k3", "k4", "k5", "k6", "k7",
+ "k8", "k9", "no_key"});
+ std::vector<PinnableSlice> values(keys.size());
+ std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
+ std::vector<Status> s(keys.size());
+
+ ReadOptions ro;
+ ro.value_size_soft_limit = 20;
+ ro.async_io = std::get<1>(GetParam());
+ db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
+ s.data(), false);
+
+ ASSERT_EQ(values.size(), keys.size());
+
+ // In memory keys
+ ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1_");
+ ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v10");
+ ASSERT_TRUE(s[9].IsNotFound()); // k2
+ ASSERT_EQ(std::string(values[12].data(), values[12].size()), "v5_");
+ ASSERT_TRUE(s[13].IsNotFound()); // k6
+ ASSERT_EQ(std::string(values[16].data(), values[16].size()), "v9_");
+
+ // In sst files
+ // (Was "values[1].size()" for values[2] — a copy-paste slip; the sizes
+ // happened to match, but the assertion now checks the right slice.)
+ ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v11");
+ ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v13");
+ ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v14");
+
+ // Remaining aborted after value_size exceeds.
+ ASSERT_TRUE(s[3].IsAborted());
+ ASSERT_TRUE(s[6].IsAborted());
+ ASSERT_TRUE(s[7].IsAborted());
+ ASSERT_TRUE(s[8].IsAborted());
+ ASSERT_TRUE(s[10].IsAborted());
+ ASSERT_TRUE(s[11].IsAborted());
+ ASSERT_TRUE(s[14].IsAborted());
+ ASSERT_TRUE(s[15].IsAborted());
+ ASSERT_TRUE(s[17].IsAborted());
+
+ // 7 kv pairs * 3 bytes per value (i.e. 21)
+ ASSERT_EQ(21, (int)get_perf_context()->multiget_read_bytes);
+ SetPerfLevel(kDisable);
+ } while (ChangeCompactOptions());
+}
+
+// Builds merge chains spanning L2 (base value), L1, L0 and the memtable
+// with the string-append merge operator, then verifies that
+// value_size_soft_limit aborts the batch once the merged values returned
+// so far exceed the limit.
+TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeMultiLevelMerge) {
+ if (std::get<1>(GetParam())) {
+ ROCKSDB_GTEST_BYPASS("This test needs to be fixed for async IO");
+ return;
+ }
+ // Skip for unbatched MultiGet
+ if (!std::get<0>(GetParam())) {
+ ROCKSDB_GTEST_BYPASS("This test is only for batched MultiGet");
+ return;
+ }
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ options.merge_operator = MergeOperators::CreateStringAppendOperator();
+ BlockBasedTableOptions bbto;
+ bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
+ options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ Reopen(options);
+ int num_keys = 0;
+
+ // Base values for all 64 keys, flushed in batches of 8, moved to L2
+ for (int i = 0; i < 64; ++i) {
+ ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ MoveFilesToLevel(2);
+
+ // Merge operands in L1 for every 3rd key
+ for (int i = 0; i < 64; i += 3) {
+ ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ MoveFilesToLevel(1);
+
+ // Merge operands in L0 for every 5th key
+ for (int i = 0; i < 64; i += 5) {
+ ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ ASSERT_OK(Flush());
+ num_keys = 0;
+ }
+ ASSERT_EQ(0, num_keys);
+
+ // Merge operands in the memtable for every 9th key
+ for (int i = 0; i < 64; i += 9) {
+ ASSERT_OK(
+ Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
+ }
+
+ // Look up keys 10..49 in order
+ std::vector<std::string> keys_str;
+ for (int i = 10; i < 50; ++i) {
+ keys_str.push_back("key_" + std::to_string(i));
+ }
+
+ std::vector<Slice> keys(keys_str.size());
+ for (int i = 0; i < 40; i++) {
+ keys[i] = Slice(keys_str[i]);
+ }
+
+ std::vector<PinnableSlice> values(keys_str.size());
+ std::vector<Status> statuses(keys_str.size());
+ ReadOptions read_options;
+ read_options.verify_checksums = true;
+ read_options.value_size_soft_limit = 380;
+ read_options.async_io = std::get<1>(GetParam());
+ db_->MultiGet(read_options, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data());
+
+ ASSERT_EQ(values.size(), keys.size());
+
+ // The first 26 keys fit under the soft limit; each expected value is the
+ // comma-joined merge chain from L2 up through the memtable
+ for (unsigned int j = 0; j < 26; ++j) {
+ int key = j + 10;
+ std::string value;
+ value.append("val_l2_" + std::to_string(key));
+ if (key % 3 == 0) {
+ value.append(",");
+ value.append("val_l1_" + std::to_string(key));
+ }
+ if (key % 5 == 0) {
+ value.append(",");
+ value.append("val_l0_" + std::to_string(key));
+ }
+ if (key % 9 == 0) {
+ value.append(",");
+ value.append("val_mem_" + std::to_string(key));
+ }
+ ASSERT_EQ(values[j], value);
+ ASSERT_OK(statuses[j]);
+ }
+
+ // All remaining keys' statuses are set to Status::Aborted() once the
+ // value_size_soft_limit is exceeded
+ for (unsigned int j = 26; j < 40; j++) {
+ ASSERT_TRUE(statuses[j].IsAborted());
+ }
+}
+
+// Run with all (batched, async_io) combinations: {false,true} x {false,true}
+INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
+ testing::Combine(testing::Bool(), testing::Bool()));
+
+#if USE_COROUTINES
+// Fixture for async-IO MultiGet tests. The constructor builds a fixed LSM
+// shape that the tests rely on:
+//   L2: all 256 keys with values "val_l2_<i>"
+//   L1: overwrites for keys i % 3 == 0, i < 128; range deletes over
+//       [128+32k, 128+32k+16) plus some puts to force bloom filters
+//   L0: overwrites for keys i % 5 == 0, i < 128
+// The bool test parameter is ReadOptions::optimize_multiget_for_io.
+class DBMultiGetAsyncIOTest : public DBBasicTest,
+ public ::testing::WithParamInterface<bool> {
+ public:
+ DBMultiGetAsyncIOTest()
+ : DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) {
+ BlockBasedTableOptions bbto;
+ bbto.filter_policy.reset(NewBloomFilterPolicy(10));
+ options_ = CurrentOptions();
+ options_.disable_auto_compactions = true;
+ options_.statistics = statistics_;
+ options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ Reopen(options_);
+ int num_keys = 0;
+
+ // Put all keys in the bottommost level, and overwrite some keys
+ // in L0 and L1
+ for (int i = 0; i < 256; ++i) {
+ EXPECT_OK(Put(Key(i), "val_l2_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ EXPECT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ EXPECT_OK(Flush());
+ num_keys = 0;
+ }
+ MoveFilesToLevel(2);
+
+ // Overwrites for every 3rd key below 128, flushed in batches of 8
+ for (int i = 0; i < 128; i += 3) {
+ EXPECT_OK(Put(Key(i), "val_l1_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ EXPECT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ EXPECT_OK(Flush());
+ num_keys = 0;
+ }
+ // Put some range deletes in L1
+ for (int i = 128; i < 256; i += 32) {
+ std::string range_begin = Key(i);
+ std::string range_end = Key(i + 16);
+ EXPECT_OK(dbfull()->DeleteRange(WriteOptions(),
+ dbfull()->DefaultColumnFamily(),
+ range_begin, range_end));
+ // Also do some Puts to force creation of bloom filter
+ for (int j = i + 16; j < i + 32; ++j) {
+ if (j % 3 == 0) {
+ EXPECT_OK(Put(Key(j), "val_l1_" + std::to_string(j)));
+ }
+ }
+ EXPECT_OK(Flush());
+ }
+ MoveFilesToLevel(1);
+
+ // Overwrites for every 5th key below 128, left in L0
+ for (int i = 0; i < 128; i += 5) {
+ EXPECT_OK(Put(Key(i), "val_l0_" + std::to_string(i)));
+ num_keys++;
+ if (num_keys == 8) {
+ EXPECT_OK(Flush());
+ num_keys = 0;
+ }
+ }
+ if (num_keys > 0) {
+ EXPECT_OK(Flush());
+ num_keys = 0;
+ }
+ EXPECT_EQ(0, num_keys);
+ }
+
+ const std::shared_ptr<Statistics>& statistics() { return statistics_; }
+
+ protected:
+ // Reopen the DB with the same options (e.g. after installing sync points)
+ void ReopenDB() { Reopen(options_); }
+
+ private:
+ std::shared_ptr<Statistics> statistics_;
+ Options options_;
+};
+
+TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
+ // All three keys live in L0. Without optimize_multiget_for_io the L0
+ // files are read serially; with it, the reads are issued in parallel.
+ const std::vector<int> key_ids{0, 40, 80};
+ std::vector<std::string> key_strs;
+ for (int id : key_ids) {
+ key_strs.push_back(Key(id));
+ }
+ std::vector<Slice> keys(key_strs.begin(), key_strs.end());
+ std::vector<PinnableSlice> values(keys.size());
+ std::vector<Status> statuses(keys.size());
+
+ ReadOptions read_opts;
+ read_opts.async_io = true;
+ read_opts.optimize_multiget_for_io = GetParam();
+ dbfull()->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data());
+ ASSERT_EQ(values.size(), 3);
+ for (size_t i = 0; i < key_ids.size(); ++i) {
+ ASSERT_OK(statuses[i]);
+ ASSERT_EQ(values[i], "val_l0_" + std::to_string(key_ids[i]));
+ }
+
+ HistogramData io_batch_hist;
+ statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &io_batch_hist);
+
+ if (GetParam()) {
+ // With async IO, lookups happen in parallel for each key: one batch
+ // of 3 coroutine reads
+ ASSERT_EQ(io_batch_hist.count, 1);
+ ASSERT_EQ(io_batch_hist.max, 3);
+ ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
+ } else {
+ // Without async IO, MultiGet calls MultiRead 3 times, once per L0 file
+ ASSERT_EQ(io_batch_hist.count, 3);
+ }
+}
+
+TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
+ // Keys 33, 54 and 102 were all overwritten in L1, each in a separate
+ // file, so one batch of three parallel async reads is expected.
+ const std::vector<int> key_ids{33, 54, 102};
+ std::vector<std::string> key_strs;
+ for (int id : key_ids) {
+ key_strs.push_back(Key(id));
+ }
+ std::vector<Slice> keys(key_strs.begin(), key_strs.end());
+ std::vector<PinnableSlice> values(keys.size());
+ std::vector<Status> statuses(keys.size());
+
+ ReadOptions read_opts;
+ read_opts.async_io = true;
+ read_opts.optimize_multiget_for_io = GetParam();
+ dbfull()->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data());
+ ASSERT_EQ(values.size(), 3);
+ for (size_t i = 0; i < key_ids.size(); ++i) {
+ ASSERT_EQ(statuses[i], Status::OK());
+ ASSERT_EQ(values[i], "val_l1_" + std::to_string(key_ids[i]));
+ }
+
+ // A batch of 3 async IOs is expected, one for each overlapping L1 file
+ HistogramData io_batch_hist;
+ statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &io_batch_hist);
+ ASSERT_EQ(io_batch_hist.count, 1);
+ ASSERT_EQ(io_batch_hist.max, 3);
+ ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
+}
+
+// Inject an IOError when the 6th SST file is opened (the last of the 3 L0
+// + 3 L1 files covering the keys) and verify that only the corresponding
+// key fails while the rest of the batch succeeds.
+TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
+ std::vector<std::string> key_strs;
+ std::vector<Slice> keys;
+ std::vector<PinnableSlice> values;
+ std::vector<Status> statuses;
+
+ key_strs.push_back(Key(33));
+ key_strs.push_back(Key(54));
+ key_strs.push_back(Key(102));
+ keys.push_back(key_strs[0]);
+ keys.push_back(key_strs[1]);
+ keys.push_back(key_strs[2]);
+ values.resize(keys.size());
+ statuses.resize(keys.size());
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "TableCache::GetTableReader:BeforeOpenFile", [&](void* status) {
+ static int count = 0;
+ count++;
+ // Fail the last table reader open, which is the 6th SST file
+ // since 3 overlapping L0 files + 3 L1 files containing the keys
+ if (count == 6) {
+ Status* s = static_cast<Status*>(status);
+ *s = Status::IOError();
+ }
+ });
+ // DB open will create table readers unless we reduce the table cache
+ // capacity.
+ // SanitizeOptions will set max_open_files to minimum of 20. Table cache
+ // is allocated with max_open_files - 10 as capacity. So override
+ // max_open_files to 11 so table cache capacity will become 1. This will
+ // prevent file open during DB open and force the file to be opened
+ // during MultiGet
+ SyncPoint::GetInstance()->SetCallBack(
+ "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
+ int* max_open_files = (int*)arg;
+ *max_open_files = 11;
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ // Reopen so the reduced table cache capacity takes effect
+ ReopenDB();
+
+ ReadOptions ro;
+ ro.async_io = true;
+ ro.optimize_multiget_for_io = GetParam();
+ dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data());
+ SyncPoint::GetInstance()->DisableProcessing();
+ ASSERT_EQ(values.size(), 3);
+ ASSERT_EQ(statuses[0], Status::OK());
+ ASSERT_EQ(statuses[1], Status::OK());
+ // The injected error surfaces only on the last key
+ ASSERT_EQ(statuses[2], Status::IOError());
+
+ HistogramData multiget_io_batch_size;
+
+ statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+ // A batch of 3 async IOs is expected, one for each overlapping file in L1
+ ASSERT_EQ(multiget_io_batch_size.count, 1);
+ ASSERT_EQ(multiget_io_batch_size.max, 2);
+ ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
+}
+
+// Verify IO batching when the first key of the batch is the last key of an
+// SST file: that file is looked up first, splitting the reads into two
+// batches instead of one.
+TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
+ std::vector<std::string> key_strs;
+ std::vector<Slice> keys;
+ std::vector<PinnableSlice> values;
+ std::vector<Status> statuses;
+
+ // 21 is the last key in the first L1 file
+ key_strs.push_back(Key(21));
+ key_strs.push_back(Key(54));
+ key_strs.push_back(Key(102));
+ keys.push_back(key_strs[0]);
+ keys.push_back(key_strs[1]);
+ keys.push_back(key_strs[2]);
+ values.resize(keys.size());
+ statuses.resize(keys.size());
+
+ ReadOptions ro;
+ ro.async_io = true;
+ ro.optimize_multiget_for_io = GetParam();
+ dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data());
+ ASSERT_EQ(values.size(), 3);
+ ASSERT_EQ(statuses[0], Status::OK());
+ ASSERT_EQ(statuses[1], Status::OK());
+ ASSERT_EQ(statuses[2], Status::OK());
+ ASSERT_EQ(values[0], "val_l1_" + std::to_string(21));
+ ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
+ ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
+
+ HistogramData multiget_io_batch_size;
+
+ statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+ // Since the first MultiGet key is the last key in a file, the MultiGet is
+ // expected to lookup in that file first, before moving on to other files.
+ // So the first file lookup will issue one async read, and the next lookup
+ // will lookup 2 files in parallel and issue 2 async reads
+ ASSERT_EQ(multiget_io_batch_size.count, 2);
+ ASSERT_EQ(multiget_io_batch_size.max, 2);
+}
+
+// Verify IO batching when the batch spans two levels (L1 and L2).
+TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
+ std::vector<std::string> key_strs;
+ std::vector<Slice> keys;
+ std::vector<PinnableSlice> values;
+ std::vector<Status> statuses;
+
+ // 33 and 102 are in L1, and 56 is in L2
+ key_strs.push_back(Key(33));
+ key_strs.push_back(Key(56));
+ key_strs.push_back(Key(102));
+ keys.push_back(key_strs[0]);
+ keys.push_back(key_strs[1]);
+ keys.push_back(key_strs[2]);
+ values.resize(keys.size());
+ statuses.resize(keys.size());
+
+ ReadOptions ro;
+ ro.async_io = true;
+ ro.optimize_multiget_for_io = GetParam();
+ dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data());
+ ASSERT_EQ(values.size(), 3);
+ ASSERT_EQ(statuses[0], Status::OK());
+ ASSERT_EQ(statuses[1], Status::OK());
+ ASSERT_EQ(statuses[2], Status::OK());
+ ASSERT_EQ(values[0], "val_l1_" + std::to_string(33));
+ ASSERT_EQ(values[1], "val_l2_" + std::to_string(56));
+ ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
+
+ HistogramData multiget_io_batch_size;
+
+ statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+ // There are 2 keys in L1 in two separate files, and 1 in L2. With
+ // optimize_multiget_for_io, all three lookups will happen in parallel.
+ // Otherwise, the L2 lookup will happen after L1.
+ ASSERT_EQ(multiget_io_batch_size.count, GetParam() ? 1 : 2);
+ ASSERT_EQ(multiget_io_batch_size.max, GetParam() ? 3 : 2);
+}
+
+TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
+ // Keys 19 and 26 only exist in L2, but their key range overlaps files
+ // in L0 and L1, so those levels are probed first.
+ const std::vector<int> key_ids{19, 26};
+ std::vector<std::string> key_strs;
+ for (int id : key_ids) {
+ key_strs.push_back(Key(id));
+ }
+ std::vector<Slice> keys(key_strs.begin(), key_strs.end());
+ std::vector<PinnableSlice> values(keys.size());
+ std::vector<Status> statuses(keys.size());
+
+ ReadOptions read_opts;
+ read_opts.async_io = true;
+ read_opts.optimize_multiget_for_io = GetParam();
+ dbfull()->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data());
+ ASSERT_EQ(values.size(), 2);
+ for (size_t i = 0; i < key_ids.size(); ++i) {
+ ASSERT_EQ(statuses[i], Status::OK());
+ ASSERT_EQ(values[i], "val_l2_" + std::to_string(key_ids[i]));
+ }
+
+ // Bloom filters in L0/L1 will avoid the coroutine calls in those levels,
+ // leaving only the 2 L2 reads
+ ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
+}
+
+TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) {
+ // Keys 139 and 163 exist in L2 but are covered by range deletions in
+ // L1, so both lookups must come back NotFound.
+ const std::vector<int> key_ids{139, 163};
+ std::vector<std::string> key_strs;
+ for (int id : key_ids) {
+ key_strs.push_back(Key(id));
+ }
+ std::vector<Slice> keys(key_strs.begin(), key_strs.end());
+ std::vector<PinnableSlice> values(keys.size());
+ std::vector<Status> statuses(keys.size());
+
+ ReadOptions read_opts;
+ read_opts.async_io = true;
+ read_opts.optimize_multiget_for_io = GetParam();
+ dbfull()->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data());
+ ASSERT_EQ(values.size(), 2);
+ for (size_t i = 0; i < key_ids.size(); ++i) {
+ ASSERT_EQ(statuses[i], Status::NotFound());
+ }
+
+ // Bloom filters in L0/L1 will avoid the coroutine calls in those levels
+ ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
+}
+
+// Mix of range-deleted and live keys in the same batch.
+TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) {
+ std::vector<std::string> key_strs;
+ std::vector<Slice> keys;
+ std::vector<PinnableSlice> values;
+ std::vector<Status> statuses;
+
+ // 139 and 163 are in L2 but covered by range deletes in L1; 144 is a
+ // live key written to L1
+ key_strs.push_back(Key(139));
+ key_strs.push_back(Key(144));
+ key_strs.push_back(Key(163));
+ keys.push_back(key_strs[0]);
+ keys.push_back(key_strs[1]);
+ keys.push_back(key_strs[2]);
+ values.resize(keys.size());
+ statuses.resize(keys.size());
+
+ ReadOptions ro;
+ ro.async_io = true;
+ ro.optimize_multiget_for_io = GetParam();
+ dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data());
+ ASSERT_EQ(values.size(), keys.size());
+ ASSERT_EQ(statuses[0], Status::NotFound());
+ ASSERT_EQ(statuses[1], Status::OK());
+ ASSERT_EQ(values[1], "val_l1_" + std::to_string(144));
+ ASSERT_EQ(statuses[2], Status::NotFound());
+
+ // Bloom filters in L0/L1 will avoid the coroutine calls in those levels
+ ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
+}
+
+// Run with optimize_multiget_for_io both enabled and disabled
+INSTANTIATE_TEST_CASE_P(DBMultiGetAsyncIOTest, DBMultiGetAsyncIOTest,
+ testing::Bool());
+#endif // USE_COROUTINES
+
+// Verifies the per-MultiGet statistics histograms: number of levels read,
+// index/filter blocks read per level, and SST files read per level.
+TEST_F(DBBasicTest, MultiGetStats) {
+ Options options;
+ options.create_if_missing = true;
+ options.disable_auto_compactions = true;
+ options.env = env_;
+ options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+ BlockBasedTableOptions table_options;
+ table_options.block_size = 1;
+ table_options.index_type =
+ BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+ table_options.partition_filters = true;
+ table_options.no_block_cache = true;
+ table_options.cache_index_and_filter_blocks = false;
+ table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ int total_keys = 2000;
+ std::vector<std::string> keys_str(total_keys);
+ std::vector<Slice> keys(total_keys);
+ static size_t kMultiGetBatchSize = 100;
+ std::vector<PinnableSlice> values(kMultiGetBatchSize);
+ std::vector<Status> s(kMultiGetBatchSize);
+ ReadOptions read_opts;
+
+ Random rnd(309);
+ // Create Multiple SST files at multiple levels.
+ for (int i = 0; i < 500; ++i) {
+ keys_str[i] = "k" + std::to_string(i);
+ keys[i] = Slice(keys_str[i]);
+ ASSERT_OK(Put(1, "k" + std::to_string(i), rnd.RandomString(1000)));
+ if (i % 100 == 0) {
+ ASSERT_OK(Flush(1));
+ }
+ }
+ ASSERT_OK(Flush(1));
+ MoveFilesToLevel(2, 1);
+
+ // NOTE(review): this loop starts at 501, leaving keys[500] (and keys[1000]
+ // below) as empty slices; the batch at &keys[950] further down therefore
+ // includes one empty key. Presumably benign -- confirm intent.
+ for (int i = 501; i < 1000; ++i) {
+ keys_str[i] = "k" + std::to_string(i);
+ keys[i] = Slice(keys_str[i]);
+ ASSERT_OK(Put(1, "k" + std::to_string(i), rnd.RandomString(1000)));
+ if (i % 100 == 0) {
+ ASSERT_OK(Flush(1));
+ }
+ }
+
+ ASSERT_OK(Flush(1));
+ MoveFilesToLevel(2, 1);
+
+ for (int i = 1001; i < total_keys; ++i) {
+ keys_str[i] = "k" + std::to_string(i);
+ keys[i] = Slice(keys_str[i]);
+ ASSERT_OK(Put(1, "k" + std::to_string(i), rnd.RandomString(1000)));
+ if (i % 100 == 0) {
+ ASSERT_OK(Flush(1));
+ }
+ }
+ ASSERT_OK(Flush(1));
+ MoveFilesToLevel(1, 1);
+ Close();
+
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+ ASSERT_OK(options.statistics->Reset());
+
+ // First batch: keys entirely within L1 (k1250..k1349)
+ db_->MultiGet(read_opts, handles_[1], kMultiGetBatchSize, &keys[1250],
+ values.data(), s.data(), false);
+
+ ASSERT_EQ(values.size(), kMultiGetBatchSize);
+ HistogramData hist_level;
+ HistogramData hist_index_and_filter_blocks;
+ HistogramData hist_sst;
+
+ options.statistics->histogramData(NUM_LEVEL_READ_PER_MULTIGET, &hist_level);
+ options.statistics->histogramData(NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
+ &hist_index_and_filter_blocks);
+ options.statistics->histogramData(NUM_SST_READ_PER_LEVEL, &hist_sst);
+
+ // Maximum number of blocks read from a file system in a level.
+ ASSERT_EQ(hist_level.max, 1);
+ ASSERT_GT(hist_index_and_filter_blocks.max, 0);
+ // Maximum number of sst files read from file system in a level.
+ ASSERT_EQ(hist_sst.max, 2);
+
+ // Minimum number of blocks read in a level.
+ ASSERT_EQ(hist_level.min, 1);
+ ASSERT_GT(hist_index_and_filter_blocks.min, 0);
+ // Minimum number of sst files read in a level.
+ ASSERT_EQ(hist_sst.min, 1);
+
+ for (PinnableSlice& value : values) {
+ value.Reset();
+ }
+ for (Status& status : s) {
+ status = Status::OK();
+ }
+ // Second batch: keys spanning L1 and L2 (k950..k1049)
+ db_->MultiGet(read_opts, handles_[1], kMultiGetBatchSize, &keys[950],
+ values.data(), s.data(), false);
+ options.statistics->histogramData(NUM_LEVEL_READ_PER_MULTIGET, &hist_level);
+ ASSERT_EQ(hist_level.max, 2);
+}
+
+// Test class for batched MultiGet with prefix extractor
+// Param bool - If true, use partitioned filters
+// If false, use full filter block
+class MultiGetPrefixExtractorTest : public DBBasicTest,
+ public ::testing::WithParamInterface<bool> {
+};
+
+// Verifies prefix bloom filter hit/miss perf counters for batched MultiGet
+// (and plain Get) in both the memtable and SST paths, with whole-key
+// filtering disabled.
+TEST_P(MultiGetPrefixExtractorTest, Batched) {
+ Options options = CurrentOptions();
+ options.prefix_extractor.reset(NewFixedPrefixTransform(2));
+ options.memtable_prefix_bloom_size_ratio = 10;
+ BlockBasedTableOptions bbto;
+ if (GetParam()) {
+ bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+ bbto.partition_filters = true;
+ }
+ bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
+ bbto.whole_key_filtering = false;
+ bbto.cache_index_and_filter_blocks = false;
+ options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ Reopen(options);
+
+ SetPerfLevel(kEnableCount);
+ get_perf_context()->Reset();
+
+ ASSERT_OK(Put("k", "v0"));
+ ASSERT_OK(Put("kk1", "v1"));
+ ASSERT_OK(Put("kk2", "v2"));
+ ASSERT_OK(Put("kk3", "v3"));
+ ASSERT_OK(Put("kk4", "v4"));
+ std::vector<std::string> keys(
+ {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
+ std::vector<std::string> expected(
+ {"v0", "v1", "v2", "v3", "v4", "NOT_FOUND", "NOT_FOUND"});
+ std::vector<std::string> values;
+ values = MultiGet(keys, nullptr);
+ ASSERT_EQ(values, expected);
+ // One key ("k") is not queried against the filter because it is outside
+ // the prefix_extractor domain, leaving 6 keys with queried prefixes.
+ // "rofl"/"lmho" prefixes miss; "kk*" prefixes hit.
+ ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
+ ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 4);
+ ASSERT_OK(Flush());
+
+ // Same lookups again, now served from the SST file
+ get_perf_context()->Reset();
+ values = MultiGet(keys, nullptr);
+ ASSERT_EQ(values, expected);
+ ASSERT_EQ(get_perf_context()->bloom_sst_miss_count, 2);
+ ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
+
+ // Also check Get stat
+ get_perf_context()->Reset();
+ for (size_t i = 0; i < keys.size(); ++i) {
+ values[i] = Get(keys[i]);
+ }
+ ASSERT_EQ(values, expected);
+ ASSERT_EQ(get_perf_context()->bloom_sst_miss_count, 2);
+ ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
+}
+
+// Run with partitioned filters enabled and disabled
+INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest,
+ ::testing::Bool());
+
+#ifndef ROCKSDB_LITE
+// Batched MultiGet with the row cache enabled.
+// Param bool - If true, read through snapshots; if false, read latest data.
+class DBMultiGetRowCacheTest : public DBBasicTest,
+ public ::testing::WithParamInterface<bool> {};
+
+// Verifies that batched MultiGet populates the row cache and that a second
+// overlapping batch is served from it (ROW_CACHE_HIT), with and without
+// snapshots.
+TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
+ do {
+ option_config_ = kRowCache;
+ Options options = CurrentOptions();
+ options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+ CreateAndReopenWithCF({"pikachu"}, options);
+ SetPerfLevel(kEnableCount);
+ ASSERT_OK(Put(1, "k1", "v1"));
+ ASSERT_OK(Put(1, "k2", "v2"));
+ ASSERT_OK(Put(1, "k3", "v3"));
+ ASSERT_OK(Put(1, "k4", "v4"));
+ ASSERT_OK(Flush(1));
+ ASSERT_OK(Put(1, "k5", "v5"));
+ // snap1: before k4 is deleted, k5 still in memtable
+ const Snapshot* snap1 = dbfull()->GetSnapshot();
+ ASSERT_OK(Delete(1, "k4"));
+ ASSERT_OK(Flush(1));
+ // snap2: after the deletion of k4
+ const Snapshot* snap2 = dbfull()->GetSnapshot();
+
+ get_perf_context()->Reset();
+
+ std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
+ std::vector<PinnableSlice> values(keys.size());
+ std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
+ std::vector<Status> s(keys.size());
+
+ ReadOptions ro;
+ bool use_snapshots = GetParam();
+ if (use_snapshots) {
+ ro.snapshot = snap2;
+ }
+ db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
+ s.data(), false);
+
+ ASSERT_EQ(values.size(), keys.size());
+ ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1");
+ ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
+ ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
+ // three values returned ("v1", "v3", "v5"), two bytes each
+ ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
+
+ ASSERT_TRUE(s[0].IsNotFound());
+ ASSERT_OK(s[1]);
+ ASSERT_TRUE(s[2].IsNotFound());
+ ASSERT_OK(s[3]);
+ ASSERT_OK(s[4]);
+
+ // Call MultiGet() again with some intersection with the previous set of
+ // keys. Those should already be in the row cache.
+ keys.assign({"no_key", "k5", "k3", "k2"});
+ for (size_t i = 0; i < keys.size(); ++i) {
+ values[i].Reset();
+ s[i] = Status::OK();
+ }
+ get_perf_context()->Reset();
+
+ // NOTE(review): snap1 is assigned to ro here, but the MultiGet below
+ // passes a fresh ReadOptions(), so the snapshot appears to be unused --
+ // confirm whether `ro` was intended.
+ if (use_snapshots) {
+ ro.snapshot = snap1;
+ }
+ db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
+ values.data(), s.data(), false);
+
+ ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2");
+ ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
+ ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
+ // three values returned ("v2", "v3", "v5"), two bytes each
+ ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
+
+ ASSERT_TRUE(s[0].IsNotFound());
+ ASSERT_OK(s[1]);
+ ASSERT_OK(s[2]);
+ ASSERT_OK(s[3]);
+ if (use_snapshots) {
+ // Only reads from the first SST file would have been cached, since
+ // snapshot seq no is > fd.largest_seqno
+ ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
+ } else {
+ ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
+ }
+
+ SetPerfLevel(kDisable);
+ dbfull()->ReleaseSnapshot(snap1);
+ dbfull()->ReleaseSnapshot(snap2);
+ } while (ChangeCompactOptions());
+}
+
+// Run with and without snapshots
+INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
+ testing::Values(true, false));
+
+// Exercises the GetAllKeyVersions() debug utility on both the default and
+// a non-default column family. Each of 4 keys gets an insert, an update and
+// a deletion, so 12 versions total are expected in the default CF.
+TEST_F(DBBasicTest, GetAllKeyVersions) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.create_if_missing = true;
+ options.disable_auto_compactions = true;
+ CreateAndReopenWithCF({"pikachu"}, options);
+ ASSERT_EQ(2, handles_.size());
+ const size_t kNumInserts = 4;
+ const size_t kNumDeletes = 4;
+ const size_t kNumUpdates = 4;
+
+ // Check default column family
+ for (size_t i = 0; i != kNumInserts; ++i) {
+ ASSERT_OK(Put(std::to_string(i), "value"));
+ }
+ for (size_t i = 0; i != kNumUpdates; ++i) {
+ ASSERT_OK(Put(std::to_string(i), "value1"));
+ }
+ for (size_t i = 0; i != kNumDeletes; ++i) {
+ ASSERT_OK(Delete(std::to_string(i)));
+ }
+ std::vector<KeyVersion> key_versions;
+ ASSERT_OK(GetAllKeyVersions(db_, Slice(), Slice(),
+ std::numeric_limits<size_t>::max(),
+ &key_versions));
+ ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
+ // Versions come back grouped by key, newest first, so each key's three
+ // versions appear as deletion, update, insert: every third entry is the
+ // deletion.
+ for (size_t i = 0; i < kNumInserts + kNumDeletes + kNumUpdates; i++) {
+ if (i % 3 == 0) {
+ ASSERT_EQ(key_versions[i].GetTypeName(), "TypeDeletion");
+ } else {
+ ASSERT_EQ(key_versions[i].GetTypeName(), "TypeValue");
+ }
+ }
+ // Same query through the explicit default-CF handle overload
+ ASSERT_OK(GetAllKeyVersions(db_, handles_[0], Slice(), Slice(),
+ std::numeric_limits<size_t>::max(),
+ &key_versions));
+ ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
+
+ // Check non-default column family: each loop runs one fewer iteration,
+ // so 3 keys x 3 versions = (12 - 3) versions are expected
+ for (size_t i = 0; i + 1 != kNumInserts; ++i) {
+ ASSERT_OK(Put(1, std::to_string(i), "value"));
+ }
+ for (size_t i = 0; i + 1 != kNumUpdates; ++i) {
+ ASSERT_OK(Put(1, std::to_string(i), "value1"));
+ }
+ for (size_t i = 0; i + 1 != kNumDeletes; ++i) {
+ ASSERT_OK(Delete(1, std::to_string(i)));
+ }
+ ASSERT_OK(GetAllKeyVersions(db_, handles_[1], Slice(), Slice(),
+ std::numeric_limits<size_t>::max(),
+ &key_versions));
+ ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates - 3, key_versions.size());
+}
+
+TEST_F(DBBasicTest, ValueTypeString) {
+ // Every valid value type must map to a readable name.
+ // when adding new type, please also update `value_type_string_map`
+ KeyVersion kv;
+ for (unsigned char t = ValueType::kTypeDeletion; t < ValueType::kTypeMaxValid;
+ ++t) {
+ kv.type = t;
+ ASSERT_TRUE(kv.GetTypeName() != "Invalid");
+ }
+}
+#endif // !ROCKSDB_LITE
+
+// Regression test: MultiGet reads of compressed blocks larger than the
+// fixed stack buffer (kMultiGetReadStackBufSize) must not overrun it.
+// The interesting check is the absence of memory errors (e.g. under ASAN),
+// not the returned values.
+TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
+ Options options = CurrentOptions();
+ Random rnd(301);
+ BlockBasedTableOptions table_options;
+ table_options.pin_l0_filter_and_index_blocks_in_cache = true;
+ table_options.block_size = 16 * 1024;
+ ASSERT_TRUE(table_options.block_size >
+ BlockBasedTable::kMultiGetReadStackBufSize);
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ Reopen(options);
+
+ std::string zero_str(128, '\0');
+ for (int i = 0; i < 100; ++i) {
+ // Make the value compressible. A purely random string doesn't compress
+ // and the resultant data block will not be compressed
+ std::string value(rnd.RandomString(128) + zero_str);
+ // Use ASSERT_OK rather than assert(): assert() is compiled out under
+ // NDEBUG, which would leave the Put unverified and its Status unchecked
+ ASSERT_OK(Put(Key(i), value));
+ }
+ ASSERT_OK(Flush());
+
+ std::vector<std::string> key_data(10);
+ std::vector<Slice> keys;
+ // We cannot resize a PinnableSlice vector, so just set initial size to
+ // largest we think we will need
+ std::vector<PinnableSlice> values(10);
+ std::vector<Status> statuses;
+ ReadOptions ro;
+
+ // Warm up the cache first
+ key_data.emplace_back(Key(0));
+ keys.emplace_back(Slice(key_data.back()));
+ key_data.emplace_back(Key(50));
+ keys.emplace_back(Slice(key_data.back()));
+ statuses.resize(keys.size());
+
+ dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data(), true);
+}
+
+// With the WAL disabled, all data is persisted only via flushes; reopening
+// with best_efforts_recovery must still recover every flushed key in every
+// column family.
+TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
+ Options options = CurrentOptions();
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({"pikachu", "eevee"}, options);
+ size_t num_cfs = handles_.size();
+ ASSERT_EQ(3, num_cfs);
+ WriteOptions write_opts;
+ write_opts.disableWAL = true;
+ // Write 10k keys per CF, flushing every 1000 keys to create many SSTs
+ for (size_t cf = 0; cf != num_cfs; ++cf) {
+ for (size_t i = 0; i != 10000; ++i) {
+ std::string key_str = Key(static_cast<int>(i));
+ std::string value_str = std::to_string(cf) + "_" + std::to_string(i);
+
+ ASSERT_OK(Put(static_cast<int>(cf), key_str, value_str));
+ if (0 == (i % 1000)) {
+ ASSERT_OK(Flush(static_cast<int>(cf)));
+ }
+ }
+ }
+ // Final flush so nothing is left in the memtables
+ for (size_t cf = 0; cf != num_cfs; ++cf) {
+ ASSERT_OK(Flush(static_cast<int>(cf)));
+ }
+ Close();
+ options.best_efforts_recovery = true;
+ ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
+ options);
+ num_cfs = handles_.size();
+ ASSERT_EQ(3, num_cfs);
+ // Every key written above must be recovered intact
+ for (size_t cf = 0; cf != num_cfs; ++cf) {
+ for (int i = 0; i != 10000; ++i) {
+ std::string key_str = Key(static_cast<int>(i));
+ std::string expected_value_str =
+ std::to_string(cf) + "_" + std::to_string(i);
+ ASSERT_EQ(expected_value_str, Get(static_cast<int>(cf), key_str));
+ }
+ }
+}
+
+TEST_F(DBBasicTest, BestEffortsRecoveryWithVersionBuildingFailure) {
+ // A corruption injected while the Version is rebuilt should surface as a
+ // Corruption status from DB open, even with best_efforts_recovery.
+ Options options = CurrentOptions();
+ DestroyAndReopen(options);
+ ASSERT_OK(Put("foo", "value"));
+ ASSERT_OK(Flush());
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
+ ASSERT_NE(nullptr, arg);
+ Status* ret = static_cast<Status*>(arg);
+ *ret = Status::Corruption("Inject corruption");
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ options.best_efforts_recovery = true;
+ const Status s = TryReopen(options);
+ ASSERT_TRUE(s.IsCorruption());
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+#ifndef ROCKSDB_LITE
+namespace {
+// EventListener that records the path of every table file created, keyed
+// by column family name. Recording is guarded by mutex_.
+class TableFileListener : public EventListener {
+ public:
+ void OnTableFileCreated(const TableFileCreationInfo& info) override {
+ InstrumentedMutexLock lock(&mutex_);
+ cf_to_paths_[info.cf_name].push_back(info.file_path);
+ }
+ // Returns a reference to the recorded paths for `cf_name`, inserting an
+ // empty vector if none exist yet. NOTE(review): the reference escapes
+ // the lock, so callers must not race with concurrent table creation.
+ std::vector<std::string>& GetFiles(const std::string& cf_name) {
+ InstrumentedMutexLock lock(&mutex_);
+ return cf_to_paths_[cf_name];
+ }
+
+ private:
+ InstrumentedMutex mutex_;
+ std::unordered_map<std::string, std::vector<std::string>> cf_to_paths_;
+};
+} // anonymous namespace
+
+TEST_F(DBBasicTest, LastSstFileNotInManifest) {
+ // If the last sst file is not tracked in MANIFEST,
+ // or the VersionEdit for the last sst file is not synced,
+ // on recovery, the last sst file should be deleted,
+ // and new sst files shouldn't reuse its file number.
+ Options options = CurrentOptions();
+ DestroyAndReopen(options);
+ Close();
+
+ // Manually add a sst file.
+ constexpr uint64_t kSstFileNumber = 100;
+ const std::string kSstFile = MakeTableFileName(dbname_, kSstFileNumber);
+ ASSERT_OK(WriteStringToFile(env_, /* data = */ "bad sst file content",
+ /* fname = */ kSstFile,
+ /* should_sync = */ true));
+ ASSERT_OK(env_->FileExists(kSstFile));
+
+ TableFileListener* listener = new TableFileListener();
+ options.listeners.emplace_back(listener);
+ Reopen(options);
+ // kSstFile should already be deleted.
+ ASSERT_TRUE(env_->FileExists(kSstFile).IsNotFound());
+
+ ASSERT_OK(Put("k", "v"));
+ ASSERT_OK(Flush());
+ // New sst file should have file number > kSstFileNumber.
+ std::vector<std::string>& files =
+ listener->GetFiles(kDefaultColumnFamilyName);
+ ASSERT_EQ(files.size(), 1);
+ const std::string fname = files[0].erase(0, (dbname_ + "/").size());
+ uint64_t number = 0;
+ FileType type = kTableFile;
+ ASSERT_TRUE(ParseFileName(fname, &number, &type));
+ ASSERT_EQ(type, kTableFile);
+ ASSERT_GT(number, kSstFileNumber);
+}
+
+TEST_F(DBBasicTest, RecoverWithMissingFiles) {
+ Options options = CurrentOptions();
+ DestroyAndReopen(options);
+ TableFileListener* listener = new TableFileListener();
+ // Disable auto compaction to simplify SST file name tracking.
+ options.disable_auto_compactions = true;
+ options.listeners.emplace_back(listener);
+ CreateAndReopenWithCF({"pikachu", "eevee"}, options);
+ std::vector<std::string> all_cf_names = {kDefaultColumnFamilyName, "pikachu",
+ "eevee"};
+ size_t num_cfs = handles_.size();
+ ASSERT_EQ(3, num_cfs);
+ for (size_t cf = 0; cf != num_cfs; ++cf) {
+ ASSERT_OK(Put(static_cast<int>(cf), "a", "0_value"));
+ ASSERT_OK(Flush(static_cast<int>(cf)));
+ ASSERT_OK(Put(static_cast<int>(cf), "b", "0_value"));
+ ASSERT_OK(Flush(static_cast<int>(cf)));
+ ASSERT_OK(Put(static_cast<int>(cf), "c", "0_value"));
+ ASSERT_OK(Flush(static_cast<int>(cf)));
+ }
+
+ // Delete and corrupt files
+ for (size_t i = 0; i < all_cf_names.size(); ++i) {
+ std::vector<std::string>& files = listener->GetFiles(all_cf_names[i]);
+ ASSERT_EQ(3, files.size());
+ std::string corrupted_data;
+ ASSERT_OK(ReadFileToString(env_, files[files.size() - 1], &corrupted_data));
+ ASSERT_OK(WriteStringToFile(
+ env_, corrupted_data.substr(0, corrupted_data.size() - 2),
+ files[files.size() - 1], /*should_sync=*/true));
+ for (int j = static_cast<int>(files.size() - 2); j >= static_cast<int>(i);
+ --j) {
+ ASSERT_OK(env_->DeleteFile(files[j]));
+ }
+ }
+ options.best_efforts_recovery = true;
+ ReopenWithColumnFamilies(all_cf_names, options);
+ // Verify data
+ ReadOptions read_opts;
+ read_opts.total_order_seek = true;
+ {
+ std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
+ iter->SeekToFirst();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+ iter.reset(db_->NewIterator(read_opts, handles_[1]));
+ iter->SeekToFirst();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a", iter->key());
+ iter->Next();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+ iter.reset(db_->NewIterator(read_opts, handles_[2]));
+ iter->SeekToFirst();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a", iter->key());
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("b", iter->key());
+ iter->Next();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+ }
+}
+
+TEST_F(DBBasicTest, BestEffortsRecoveryTryMultipleManifests) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ DestroyAndReopen(options);
+ ASSERT_OK(Put("foo", "value0"));
+ ASSERT_OK(Flush());
+ Close();
+ {
+ // Hack by adding a new MANIFEST with high file number
+ std::string garbage(10, '\0');
+ ASSERT_OK(WriteStringToFile(env_, garbage, dbname_ + "/MANIFEST-001000",
+ /*should_sync=*/true));
+ }
+ {
+ // Hack by adding a corrupted SST not referenced by any MANIFEST
+ std::string garbage(10, '\0');
+ ASSERT_OK(WriteStringToFile(env_, garbage, dbname_ + "/001001.sst",
+ /*should_sync=*/true));
+ }
+
+ options.best_efforts_recovery = true;
+
+ Reopen(options);
+ ASSERT_OK(Put("bar", "value"));
+}
+
+TEST_F(DBBasicTest, RecoverWithNoCurrentFile) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({"pikachu"}, options);
+ options.best_efforts_recovery = true;
+ ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
+ ASSERT_EQ(2, handles_.size());
+ ASSERT_OK(Put("foo", "value"));
+ ASSERT_OK(Put(1, "bar", "value"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(Flush(1));
+ Close();
+ ASSERT_OK(env_->DeleteFile(CurrentFileName(dbname_)));
+ ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
+ std::vector<std::string> cf_names;
+ ASSERT_OK(DB::ListColumnFamilies(DBOptions(options), dbname_, &cf_names));
+ ASSERT_EQ(2, cf_names.size());
+ for (const auto& name : cf_names) {
+ ASSERT_TRUE(name == kDefaultColumnFamilyName || name == "pikachu");
+ }
+}
+
+TEST_F(DBBasicTest, RecoverWithNoManifest) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ DestroyAndReopen(options);
+ ASSERT_OK(Put("foo", "value"));
+ ASSERT_OK(Flush());
+ Close();
+ {
+ // Delete all MANIFEST.
+ std::vector<std::string> files;
+ ASSERT_OK(env_->GetChildren(dbname_, &files));
+ for (const auto& file : files) {
+ uint64_t number = 0;
+ FileType type = kWalFile;
+ if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
+ ASSERT_OK(env_->DeleteFile(dbname_ + "/" + file));
+ }
+ }
+ }
+ options.best_efforts_recovery = true;
+ options.create_if_missing = false;
+ Status s = TryReopen(options);
+ ASSERT_TRUE(s.IsInvalidArgument());
+ options.create_if_missing = true;
+ Reopen(options);
+ // Since no MANIFEST exists, best-efforts recovery creates a new, empty db.
+ ASSERT_EQ("NOT_FOUND", Get("foo"));
+}
+
+TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
+ Options options = CurrentOptions();
+ DestroyAndReopen(options);
+ TableFileListener* listener = new TableFileListener();
+ options.listeners.emplace_back(listener);
+ CreateAndReopenWithCF({"pikachu"}, options);
+ std::vector<std::string> kAllCfNames = {kDefaultColumnFamilyName, "pikachu"};
+ size_t num_cfs = handles_.size();
+ ASSERT_EQ(2, num_cfs);
+ for (int cf = 0; cf < static_cast<int>(kAllCfNames.size()); ++cf) {
+ ASSERT_OK(Put(cf, "a", "0_value"));
+ ASSERT_OK(Flush(cf));
+ ASSERT_OK(Put(cf, "b", "0_value"));
+ }
+ // Delete files
+ for (size_t i = 0; i < kAllCfNames.size(); ++i) {
+ std::vector<std::string>& files = listener->GetFiles(kAllCfNames[i]);
+ ASSERT_EQ(1, files.size());
+ for (int j = static_cast<int>(files.size() - 1); j >= static_cast<int>(i);
+ --j) {
+ ASSERT_OK(env_->DeleteFile(files[j]));
+ }
+ }
+ options.best_efforts_recovery = true;
+ ReopenWithColumnFamilies(kAllCfNames, options);
+ // Verify WAL is not applied
+ ReadOptions read_opts;
+ read_opts.total_order_seek = true;
+ std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
+ iter->SeekToFirst();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+ iter.reset(db_->NewIterator(read_opts, handles_[1]));
+ iter->SeekToFirst();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a", iter->key());
+ iter->Next();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+}
+
+TEST_F(DBBasicTest, DisableTrackWal) {
+ // If WAL tracking was enabled, and then disabled during reopen,
+ // the previously tracked WALs should be removed from MANIFEST.
+
+ Options options = CurrentOptions();
+ options.track_and_verify_wals_in_manifest = true;
+ // extremely small write buffer size,
+ // so that new WALs are created more frequently.
+ options.write_buffer_size = 100;
+ options.env = env_;
+ DestroyAndReopen(options);
+ for (int i = 0; i < 100; i++) {
+ ASSERT_OK(Put("foo" + std::to_string(i), "value" + std::to_string(i)));
+ }
+ ASSERT_OK(dbfull()->TEST_SwitchMemtable());
+ ASSERT_OK(db_->SyncWAL());
+ // Some WALs are tracked.
+ ASSERT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());
+ Close();
+
+ // Disable WAL tracking.
+ options.track_and_verify_wals_in_manifest = false;
+ options.create_if_missing = false;
+ ASSERT_OK(TryReopen(options));
+ // Previously tracked WALs are cleared.
+ ASSERT_TRUE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());
+ Close();
+
+ // Re-enable WAL tracking again.
+ options.track_and_verify_wals_in_manifest = true;
+ options.create_if_missing = false;
+ ASSERT_OK(TryReopen(options));
+ ASSERT_TRUE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());
+ Close();
+}
+#endif // !ROCKSDB_LITE
+
+TEST_F(DBBasicTest, ManifestChecksumMismatch) {
+ Options options = CurrentOptions();
+ DestroyAndReopen(options);
+ ASSERT_OK(Put("bar", "value"));
+ ASSERT_OK(Flush());
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) {
+ auto* crc = reinterpret_cast<uint32_t*>(arg);
+ *crc = *crc + 1;
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ WriteOptions write_opts;
+ write_opts.disableWAL = true;
+ Status s = db_->Put(write_opts, "foo", "value");
+ ASSERT_OK(s);
+ ASSERT_OK(Flush());
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ ASSERT_OK(Put("foo", "value1"));
+ ASSERT_OK(Flush());
+ s = TryReopen(options);
+ ASSERT_TRUE(s.IsCorruption());
+}
+
+TEST_F(DBBasicTest, ConcurrentlyCloseDB) {
+ Options options = CurrentOptions();
+ DestroyAndReopen(options);
+ std::vector<std::thread> workers;
+ for (int i = 0; i < 10; i++) {
+ workers.push_back(std::thread([&]() {
+ auto s = db_->Close();
+ ASSERT_OK(s);
+ }));
+ }
+ for (auto& w : workers) {
+ w.join();
+ }
+}
+
+#ifndef ROCKSDB_LITE
+class DBBasicTestTrackWal : public DBTestBase,
+ public testing::WithParamInterface<bool> {
+ public:
+ DBBasicTestTrackWal()
+ : DBTestBase("db_basic_test_track_wal", /*env_do_fsync=*/false) {}
+
+ int CountWalFiles() {
+ VectorLogPtr log_files;
+ EXPECT_OK(dbfull()->GetSortedWalFiles(log_files));
+ return static_cast<int>(log_files.size());
+ };
+};
+
+TEST_P(DBBasicTestTrackWal, DoNotTrackObsoleteWal) {
+ // If a WAL becomes obsolete after flushing, but is not deleted from disk yet,
+ // then if SyncWAL is called afterwards, the obsolete WAL should not be
+ // tracked in MANIFEST.
+
+ Options options = CurrentOptions();
+ options.create_if_missing = true;
+ options.track_and_verify_wals_in_manifest = true;
+ options.atomic_flush = GetParam();
+
+ DestroyAndReopen(options);
+ CreateAndReopenWithCF({"cf"}, options);
+ ASSERT_EQ(handles_.size(), 2); // default, cf
+ // Do not delete WALs.
+ ASSERT_OK(db_->DisableFileDeletions());
+ constexpr int n = 10;
+ std::vector<std::unique_ptr<LogFile>> wals(n);
+ for (size_t i = 0; i < n; i++) {
+ // Generate a new WAL for each key-value.
+ const int cf = i % 2;
+ ASSERT_OK(db_->GetCurrentWalFile(&wals[i]));
+ ASSERT_OK(Put(cf, "k" + std::to_string(i), "v" + std::to_string(i)));
+ ASSERT_OK(Flush({0, 1}));
+ }
+ ASSERT_EQ(CountWalFiles(), n);
+ // Since all WALs are obsolete, no WAL should be tracked in MANIFEST.
+ ASSERT_OK(db_->SyncWAL());
+
+ // Manually delete all WALs.
+ Close();
+ for (const auto& wal : wals) {
+ ASSERT_OK(env_->DeleteFile(LogFileName(dbname_, wal->LogNumber())));
+ }
+
+ // If SyncWAL tracks the obsolete WALs in MANIFEST,
+ // reopen will fail because the WALs are missing from disk.
+ ASSERT_OK(TryReopenWithColumnFamilies({"default", "cf"}, options));
+ Destroy(options);
+}
+
// Run DoNotTrackObsoleteWal with atomic_flush disabled and enabled.
INSTANTIATE_TEST_CASE_P(DBBasicTestTrackWal, DBBasicTestTrackWal,
                        testing::Bool());
+#endif // ROCKSDB_LITE
+
// Fixture that builds a DB whose data blocks hold at most 10 keys each (via
// MyFlushBlockPolicy below) and whose block caches and Env are instrumented,
// so MultiGet tests can assert on cache lookups/inserts and disk reads.
class DBBasicTestMultiGet : public DBTestBase {
 public:
  // test_dir: DBTestBase directory name.
  // num_cfs: number of column families to populate (1 = default CF only).
  // compressed_cache / uncompressed_cache: whether to attach the
  //     instrumented MyBlockCache instances.
  // _compression_enabled: request block compression; downgraded to off below
  //     when no compression library is available on this platform.
  // _fill_cache: the value tests should use for ReadOptions::fill_cache.
  // compression_parallel_threads: CompressionOptions::parallel_threads.
  DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
                      bool uncompressed_cache, bool _compression_enabled,
                      bool _fill_cache, uint32_t compression_parallel_threads)
      : DBTestBase(test_dir, /*env_do_fsync=*/false) {
    compression_enabled_ = _compression_enabled;
    fill_cache_ = _fill_cache;

    if (compressed_cache) {
      std::shared_ptr<Cache> cache = NewLRUCache(1048576);
      compressed_cache_ = std::make_shared<MyBlockCache>(cache);
    }
    if (uncompressed_cache) {
      std::shared_ptr<Cache> cache = NewLRUCache(1048576);
      uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
    }

    // Count disk reads so tests can verify how many block reads miss cache.
    env_->count_random_reads_ = true;

    Options options = CurrentOptions();
    Random rnd(301);
    BlockBasedTableOptions table_options;

#ifndef ROCKSDB_LITE
    if (compression_enabled_) {
      std::vector<CompressionType> compression_types;
      compression_types = GetSupportedCompressions();
      // Not every platform may have compression libraries available, so
      // dynamically pick based on what's available
      CompressionType tmp_type = kNoCompression;
      for (auto c_type : compression_types) {
        if (c_type != kNoCompression) {
          tmp_type = c_type;
          break;
        }
      }
      if (tmp_type != kNoCompression) {
        options.compression = tmp_type;
      } else {
        // No usable compression library: fall back to uncompressed.
        compression_enabled_ = false;
      }
    }
#else
    // GetSupportedCompressions() is not available in LITE build
    if (!Snappy_Supported()) {
      compression_enabled_ = false;
    }
#endif  // ROCKSDB_LITE

    table_options.block_cache = uncompressed_cache_;
    if (table_options.block_cache == nullptr) {
      table_options.no_block_cache = true;
    } else {
      table_options.pin_l0_filter_and_index_blocks_in_cache = true;
    }
    table_options.block_cache_compressed = compressed_cache_;
    // Cut data blocks every 10 keys so MultiGet batches span several blocks.
    table_options.flush_block_policy_factory.reset(
        new MyFlushBlockPolicyFactory());
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    if (!compression_enabled_) {
      options.compression = kNoCompression;
    } else {
      options.compression_opts.parallel_threads = compression_parallel_threads;
    }
    options_ = options;
    Reopen(options);

    if (num_cfs > 1) {
      for (int cf = 0; cf < num_cfs; ++cf) {
        cf_names_.emplace_back("cf" + std::to_string(cf));
      }
      CreateColumnFamilies(cf_names_, options);
      cf_names_.emplace_back("default");
    }

    std::string zero_str(128, '\0');
    for (int cf = 0; cf < num_cfs; ++cf) {
      // First SST per CF: keys Key(0)..Key(99) with compressible values.
      for (int i = 0; i < 100; ++i) {
        // Make the value compressible. A purely random string doesn't compress
        // and the resultant data block will not be compressed
        values_.emplace_back(rnd.RandomString(128) + zero_str);
        assert(((num_cfs == 1) ? Put(Key(i), values_[i])
                               : Put(cf, Key(i), values_[i])) == Status::OK());
      }
      if (num_cfs == 1) {
        EXPECT_OK(Flush());
      } else {
        EXPECT_OK(dbfull()->Flush(FlushOptions(), handles_[cf]));
      }

      // Second SST per CF: keys "a"+Key(0).."a"+Key(99) with values that
      // cannot be shrunk by compression.
      for (int i = 0; i < 100; ++i) {
        // block cannot gain space by compression
        uncompressable_values_.emplace_back(rnd.RandomString(256) + '\0');
        std::string tmp_key = "a" + Key(i);
        assert(((num_cfs == 1) ? Put(tmp_key, uncompressable_values_[i])
                               : Put(cf, tmp_key, uncompressable_values_[i])) ==
               Status::OK());
      }
      if (num_cfs == 1) {
        EXPECT_OK(Flush());
      } else {
        EXPECT_OK(dbfull()->Flush(FlushOptions(), handles_[cf]));
      }
    }
    // Clear compressed cache, which is always pre-populated
    if (compressed_cache_) {
      compressed_cache_->SetCapacity(0);
      compressed_cache_->SetCapacity(1048576);
    }
  }

  // True iff `value` equals the compressible value stored under Key(i).
  bool CheckValue(int i, const std::string& value) {
    if (values_[i].compare(value) == 0) {
      return true;
    }
    return false;
  }

  // True iff `value` equals the incompressible value stored under "a"+Key(i).
  bool CheckUncompressableValue(int i, const std::string& value) {
    if (uncompressable_values_[i].compare(value) == 0) {
      return true;
    }
    return false;
  }

  const std::vector<std::string>& GetCFNames() const { return cf_names_; }

  // Instrumentation counters for the uncompressed block cache.
  int num_lookups() { return uncompressed_cache_->num_lookups(); }
  int num_found() { return uncompressed_cache_->num_found(); }
  int num_inserts() { return uncompressed_cache_->num_inserts(); }

  // Instrumentation counters for the compressed block cache.
  int num_lookups_compressed() { return compressed_cache_->num_lookups(); }
  int num_found_compressed() { return compressed_cache_->num_found(); }
  int num_inserts_compressed() { return compressed_cache_->num_inserts(); }

  bool fill_cache() { return fill_cache_; }
  bool compression_enabled() { return compression_enabled_; }
  bool has_compressed_cache() { return compressed_cache_ != nullptr; }
  bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
  Options get_options() { return options_; }

  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

 protected:
  // Factory producing MyFlushBlockPolicy (below) for table builds.
  class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
   public:
    MyFlushBlockPolicyFactory() {}

    virtual const char* Name() const override {
      return "MyFlushBlockPolicyFactory";
    }

    virtual FlushBlockPolicy* NewFlushBlockPolicy(
        const BlockBasedTableOptions& /*table_options*/,
        const BlockBuilder& data_block_builder) const override {
      return new MyFlushBlockPolicy(data_block_builder);
    }
  };

  // Flush policy that cuts a data block after every 10 keys, giving tests a
  // predictable key-to-block layout.
  class MyFlushBlockPolicy : public FlushBlockPolicy {
   public:
    explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
        : num_keys_(0), data_block_builder_(data_block_builder) {}

    bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
      if (data_block_builder_.empty()) {
        // First key in this block
        num_keys_ = 1;
        return false;
      }
      // Flush every 10 keys
      if (num_keys_ == 10) {
        num_keys_ = 1;
        return true;
      }
      num_keys_++;
      return false;
    }

   private:
    int num_keys_;
    const BlockBuilder& data_block_builder_;
  };

  // Cache wrapper counting lookups, hits (found), and inserts.
  class MyBlockCache : public CacheWrapper {
   public:
    explicit MyBlockCache(std::shared_ptr<Cache> target)
        : CacheWrapper(target),
          num_lookups_(0),
          num_found_(0),
          num_inserts_(0) {}

    const char* Name() const override { return "MyBlockCache"; }

    using Cache::Insert;
    Status Insert(const Slice& key, void* value, size_t charge,
                  void (*deleter)(const Slice& key, void* value),
                  Handle** handle = nullptr,
                  Priority priority = Priority::LOW) override {
      num_inserts_++;
      return target_->Insert(key, value, charge, deleter, handle, priority);
    }

    using Cache::Lookup;
    Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
      num_lookups_++;
      Handle* handle = target_->Lookup(key, stats);
      if (handle != nullptr) {
        num_found_++;
      }
      return handle;
    }
    int num_lookups() { return num_lookups_; }

    int num_found() { return num_found_; }

    int num_inserts() { return num_inserts_; }

   private:
    int num_lookups_;
    int num_found_;
    int num_inserts_;
  };

  std::shared_ptr<MyBlockCache> compressed_cache_;
  std::shared_ptr<MyBlockCache> uncompressed_cache_;
  Options options_;
  bool compression_enabled_;
  // values_[i] belongs to Key(i); uncompressable_values_[i] to "a"+Key(i).
  std::vector<std::string> values_;
  std::vector<std::string> uncompressable_values_;
  bool fill_cache_;
  std::vector<std::string> cf_names_;
};
+
// Runs DBBasicTestMultiGet over a single (default) column family.
// Tuple params: <compressed cache, uncompressed cache, compression enabled,
// fill_cache, compression parallel threads>; see the INSTANTIATE_TEST_CASE_P
// for this suite.
class DBBasicTestWithParallelIO
    : public DBBasicTestMultiGet,
      public testing::WithParamInterface<
          std::tuple<bool, bool, bool, bool, uint32_t>> {
 public:
  DBBasicTestWithParallelIO()
      : DBBasicTestMultiGet("/db_basic_test_with_parallel_io", 1,
                            std::get<0>(GetParam()), std::get<1>(GetParam()),
                            std::get<2>(GetParam()), std::get<3>(GetParam()),
                            std::get<4>(GetParam())) {}
};
+
// Issues batched MultiGets and checks the exact number of disk reads for
// each cache configuration. Fixture layout: 10 keys per data block.
TEST_P(DBBasicTestWithParallelIO, MultiGet) {
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Warm up the cache first
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  ASSERT_TRUE(CheckValue(50, values[1].ToString()));

  // Keys 1 and 51 share blocks with the warmed-up keys 0 and 50 (10 keys
  // per block), so with a usable cache no new disk reads are expected.
  int random_reads = env_->random_read_counter_.Read();
  key_data[0] = Key(1);
  key_data[1] = Key(51);
  keys[0] = Slice(key_data[0]);
  keys[1] = Slice(key_data[1]);
  values[0].Reset();
  values[1].Reset();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(1, values[0].ToString()));
  ASSERT_TRUE(CheckValue(51, values[1].ToString()));

  // Cache hits are only possible if the warm-up filled a cache, and a
  // compressed-only cache helps only when blocks were actually compressed.
  bool read_from_cache = false;
  if (fill_cache()) {
    if (has_uncompressed_cache()) {
      read_from_cache = true;
    } else if (has_compressed_cache() && compression_enabled()) {
      read_from_cache = true;
    }
  }

  int expected_reads = random_reads + (read_from_cache ? 0 : 2);
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Batch of 10 compressible keys spanning 4 distinct data blocks; the
  // blocks holding keys 1 and 55 may already be cached from above.
  keys.resize(10);
  statuses.resize(10);
  std::vector<int> key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  for (size_t i = 0; i < key_ints.size(); ++i) {
    key_data[i] = Key(key_ints[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_ints.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 2 : 3);
  } else {
    expected_reads += (read_from_cache ? 2 : 4);
  }
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Same batch pattern against the incompressible ("a"-prefixed) keys.
  keys.resize(10);
  statuses.resize(10);
  std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  for (size_t i = 0; i < key_uncmp.size(); ++i) {
    key_data[i] = "a" + Key(key_uncmp[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_uncmp.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 3 : 3);
  } else {
    expected_reads += (read_from_cache ? 4 : 4);
  }
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Smaller batch fully contained in blocks touched just above.
  keys.resize(5);
  statuses.resize(5);
  std::vector<int> key_tr{1, 2, 15, 16, 55};
  for (size_t i = 0; i < key_tr.size(); ++i) {
    key_data[i] = "a" + Key(key_tr[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_tr.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 0 : 2);
    ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  } else {
    if (has_uncompressed_cache()) {
      expected_reads += (read_from_cache ? 0 : 3);
      ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
    } else {
      // A rare case, even we enable the block compression but some of data
      // blocks are not compressed due to content. If user only enable the
      // compressed cache, the uncompressed blocks will not tbe cached, and
      // block reads will be triggered. The number of reads is related to
      // the compression algorithm.
      ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
    }
  }
}
+
+#ifndef ROCKSDB_LITE
// Same read-count verification as MultiGet, but through an Env whose files
// claim direct I/O support (with alignment 1) while the underlying reads are
// actually buffered.
TEST_P(DBBasicTestWithParallelIO, MultiGetDirectIO) {
  class FakeDirectIOEnv : public EnvWrapper {
    class FakeDirectIOSequentialFile;
    class FakeDirectIORandomAccessFile;

   public:
    FakeDirectIOEnv(Env* env) : EnvWrapper(env) {}
    static const char* kClassName() { return "FakeDirectIOEnv"; }
    const char* Name() const override { return kClassName(); }

    Status NewRandomAccessFile(const std::string& fname,
                               std::unique_ptr<RandomAccessFile>* result,
                               const EnvOptions& options) override {
      std::unique_ptr<RandomAccessFile> file;
      assert(options.use_direct_reads);
      // Open the real file with buffered reads, then wrap it so it reports
      // use_direct_io() == true.
      EnvOptions opts = options;
      opts.use_direct_reads = false;
      Status s = target()->NewRandomAccessFile(fname, &file, opts);
      if (!s.ok()) {
        return s;
      }
      result->reset(new FakeDirectIORandomAccessFile(std::move(file)));
      return s;
    }

   private:
    class FakeDirectIOSequentialFile : public SequentialFileWrapper {
     public:
      FakeDirectIOSequentialFile(std::unique_ptr<SequentialFile>&& file)
          : SequentialFileWrapper(file.get()), file_(std::move(file)) {}
      ~FakeDirectIOSequentialFile() {}

      bool use_direct_io() const override { return true; }
      size_t GetRequiredBufferAlignment() const override { return 1; }

     private:
      std::unique_ptr<SequentialFile> file_;
    };

    class FakeDirectIORandomAccessFile : public RandomAccessFileWrapper {
     public:
      FakeDirectIORandomAccessFile(std::unique_ptr<RandomAccessFile>&& file)
          : RandomAccessFileWrapper(file.get()), file_(std::move(file)) {}
      ~FakeDirectIORandomAccessFile() {}

      bool use_direct_io() const override { return true; }
      size_t GetRequiredBufferAlignment() const override { return 1; }

     private:
      std::unique_ptr<RandomAccessFile> file_;
    };
  };

  // Reopen with the fake direct-I/O env.
  std::unique_ptr<FakeDirectIOEnv> env(new FakeDirectIOEnv(env_));
  Options opts = get_options();
  opts.env = env.get();
  opts.use_direct_reads = true;
  Reopen(opts);

  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Warm up the cache first
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  ASSERT_TRUE(CheckValue(50, values[1].ToString()));

  int random_reads = env_->random_read_counter_.Read();
  key_data[0] = Key(1);
  key_data[1] = Key(51);
  keys[0] = Slice(key_data[0]);
  keys[1] = Slice(key_data[1]);
  values[0].Reset();
  values[1].Reset();
  // Drop everything from the uncompressed cache so the next MultiGet must
  // hit storage (or the compressed cache).
  if (uncompressed_cache_) {
    uncompressed_cache_->SetCapacity(0);
    uncompressed_cache_->SetCapacity(1048576);
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(1, values[0].ToString()));
  ASSERT_TRUE(CheckValue(51, values[1].ToString()));

  bool read_from_cache = false;
  if (fill_cache()) {
    if (has_uncompressed_cache()) {
      read_from_cache = true;
    } else if (has_compressed_cache() && compression_enabled()) {
      read_from_cache = true;
    }
  }

  int expected_reads = random_reads;
  if (!compression_enabled() || !has_compressed_cache()) {
    expected_reads += 2;
  } else {
    expected_reads += (read_from_cache ? 0 : 2);
  }
  // Guard keeps ASSERT_EQ from firing redundantly; the ASSERT reports the
  // actual vs expected counts on mismatch.
  if (env_->random_read_counter_.Read() != expected_reads) {
    ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  }
  // Close before `env` goes out of scope.
  Close();
}
+#endif // ROCKSDB_LITE
+
+TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
+ std::vector<std::string> key_data(10);
+ std::vector<Slice> keys;
+ // We cannot resize a PinnableSlice vector, so just set initial size to
+ // largest we think we will need
+ std::vector<PinnableSlice> values(10);
+ std::vector<Status> statuses;
+ int read_count = 0;
+ ReadOptions ro;
+ ro.fill_cache = fill_cache();
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) {
+ Status* s = static_cast<Status*>(status);
+ read_count++;
+ if (read_count == 2) {
+ *s = Status::Corruption();
+ }
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ // Warm up the cache first
+ key_data.emplace_back(Key(0));
+ keys.emplace_back(Slice(key_data.back()));
+ key_data.emplace_back(Key(50));
+ keys.emplace_back(Slice(key_data.back()));
+ statuses.resize(keys.size());
+
+ dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data(), true);
+ ASSERT_TRUE(CheckValue(0, values[0].ToString()));
+ // ASSERT_TRUE(CheckValue(50, values[1].ToString()));
+ ASSERT_EQ(statuses[0], Status::OK());
+ ASSERT_EQ(statuses[1], Status::Corruption());
+
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
+ std::vector<std::string> key_data(10);
+ std::vector<Slice> keys;
+ // We cannot resize a PinnableSlice vector, so just set initial size to
+ // largest we think we will need
+ std::vector<PinnableSlice> values(10);
+ std::vector<Status> statuses;
+ ReadOptions ro;
+ ro.fill_cache = fill_cache();
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "TableCache::MultiGet:FindTable", [&](void* status) {
+ Status* s = static_cast<Status*>(status);
+ *s = Status::IOError();
+ });
+ // DB open will create table readers unless we reduce the table cache
+ // capacity.
+ // SanitizeOptions will set max_open_files to minimum of 20. Table cache
+ // is allocated with max_open_files - 10 as capacity. So override
+ // max_open_files to 11 so table cache capacity will become 1. This will
+ // prevent file open during DB open and force the file to be opened
+ // during MultiGet
+ SyncPoint::GetInstance()->SetCallBack(
+ "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
+ int* max_open_files = (int*)arg;
+ *max_open_files = 11;
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Reopen(CurrentOptions());
+
+ // Warm up the cache first
+ key_data.emplace_back(Key(0));
+ keys.emplace_back(Slice(key_data.back()));
+ key_data.emplace_back(Key(50));
+ keys.emplace_back(Slice(key_data.back()));
+ statuses.resize(keys.size());
+
+ dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+ keys.data(), values.data(), statuses.data(), true);
+ ASSERT_EQ(statuses[0], Status::IOError());
+ ASSERT_EQ(statuses[1], Status::IOError());
+
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
// Instantiate over every combination of cache configuration, compression,
// fill_cache, and parallel compression threads (1 and 4).
INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO,
                        // Params are as follows -
                        // Param 0 - Compressed cache enabled
                        // Param 1 - Uncompressed cache enabled
                        // Param 2 - Data compression enabled
                        // Param 3 - ReadOptions::fill_cache
                        // Param 4 - CompressionOptions::parallel_threads
                        ::testing::Combine(::testing::Bool(), ::testing::Bool(),
                                           ::testing::Bool(), ::testing::Bool(),
                                           ::testing::Values(1, 4)));
+
+// Forward declaration
+class DeadlineFS;
+
// Random-access file wrapper that consults the owning DeadlineFS to decide
// whether a read should be delayed and/or fail, for read-deadline tests.
// The Read/MultiRead/ReadAsync overrides are defined out-of-line below.
class DeadlineRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
 public:
  DeadlineRandomAccessFile(DeadlineFS& fs,
                           std::unique_ptr<FSRandomAccessFile>& file)
      : FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {}

  IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
                Slice* result, char* scratch,
                IODebugContext* dbg) const override;

  IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
                     const IOOptions& options, IODebugContext* dbg) override;

  IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
                     std::function<void(const FSReadRequest&, void*)> cb,
                     void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
                     IODebugContext* dbg) override;

 private:
  DeadlineFS& fs_;
  // NOTE(review): `file_` is never assigned here — the constructor moves the
  // file into the FSRandomAccessFileOwnerWrapper base, so this member stays
  // null. The out-of-line member definitions are not visible in this chunk;
  // confirm it is unused before removing.
  std::unique_ptr<FSRandomAccessFile> file_;
};
+
+class DeadlineFS : public FileSystemWrapper {
+ public:
+ // The error_on_delay parameter specifies whether a IOStatus::TimedOut()
+ // status should be returned after delaying the IO to exceed the timeout,
+ // or to simply delay but return success anyway. The latter mimics the
+ // behavior of PosixFileSystem, which does not enforce any timeout
+ explicit DeadlineFS(SpecialEnv* env, bool error_on_delay)
+ : FileSystemWrapper(env->GetFileSystem()),
+ deadline_(std::chrono::microseconds::zero()),
+ io_timeout_(std::chrono::microseconds::zero()),
+ env_(env),
+ timedout_(false),
+ ignore_deadline_(false),
+ error_on_delay_(error_on_delay) {}
+
+ static const char* kClassName() { return "DeadlineFileSystem"; }
+ const char* Name() const override { return kClassName(); }
+
+ IOStatus NewRandomAccessFile(const std::string& fname,
+ const FileOptions& opts,
+ std::unique_ptr<FSRandomAccessFile>* result,
+ IODebugContext* dbg) override {
+ std::unique_ptr<FSRandomAccessFile> file;
+ IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
+ EXPECT_OK(s);
+ result->reset(new DeadlineRandomAccessFile(*this, file));
+
+ const std::chrono::microseconds deadline = GetDeadline();
+ const std::chrono::microseconds io_timeout = GetIOTimeout();
+ if (deadline.count() || io_timeout.count()) {
+ AssertDeadline(deadline, io_timeout, opts.io_options);
+ }
+ return ShouldDelay(opts.io_options);
+ }
+
+ // Set a vector of {IO counter, delay in microseconds, return status} tuples
+ // that control when to inject a delay and duration of the delay
+ void SetDelayTrigger(const std::chrono::microseconds deadline,
+ const std::chrono::microseconds io_timeout,
+ const int trigger) {
+ delay_trigger_ = trigger;
+ io_count_ = 0;
+ deadline_ = deadline;
+ io_timeout_ = io_timeout;
+ timedout_ = false;
+ }
+
+ // Increment the IO counter and return a delay in microseconds
+ IOStatus ShouldDelay(const IOOptions& opts) {
+ if (timedout_) {
+ return IOStatus::TimedOut();
+ } else if (!deadline_.count() && !io_timeout_.count()) {
+ return IOStatus::OK();
+ }
+ if (!ignore_deadline_ && delay_trigger_ == io_count_++) {
+ env_->SleepForMicroseconds(static_cast<int>(opts.timeout.count() + 1));
+ timedout_ = true;
+ if (error_on_delay_) {
+ return IOStatus::TimedOut();
+ }
+ }
+ return IOStatus::OK();
+ }
+
+ const std::chrono::microseconds GetDeadline() {
+ return ignore_deadline_ ? std::chrono::microseconds::zero() : deadline_;
+ }
+
+ const std::chrono::microseconds GetIOTimeout() {
+ return ignore_deadline_ ? std::chrono::microseconds::zero() : io_timeout_;
+ }
+
+ bool TimedOut() { return timedout_; }
+
+ void IgnoreDeadline(bool ignore) { ignore_deadline_ = ignore; }
+
+ void AssertDeadline(const std::chrono::microseconds deadline,
+ const std::chrono::microseconds io_timeout,
+ const IOOptions& opts) const {
+ // Give a leeway of +- 10us as it can take some time for the Get/
+ // MultiGet call to reach here, in order to avoid false alarms
+ std::chrono::microseconds now =
+ std::chrono::microseconds(env_->NowMicros());
+ std::chrono::microseconds timeout;
+ if (deadline.count()) {
+ timeout = deadline - now;
+ if (io_timeout.count()) {
+ timeout = std::min(timeout, io_timeout);
+ }
+ } else {
+ timeout = io_timeout;
+ }
+ if (opts.timeout != timeout) {
+ ASSERT_EQ(timeout, opts.timeout);
+ }
+ }
+
+ private:
+ // The number of IOs to trigger the delay after
+ int delay_trigger_;
+ // Current IO count
+ int io_count_;
+ // ReadOptions deadline for the Get/MultiGet/Iterator
+ std::chrono::microseconds deadline_;
+ // ReadOptions io_timeout for the Get/MultiGet/Iterator
+ std::chrono::microseconds io_timeout_;
+ SpecialEnv* env_;
+ // Flag to indicate whether we injected a delay
+ bool timedout_;
+ // Temporarily ignore deadlines/timeouts
+ bool ignore_deadline_;
+ // Return IOStatus::TimedOut() or IOStatus::OK()
+ bool error_on_delay_;
+};
+
+IOStatus DeadlineRandomAccessFile::Read(uint64_t offset, size_t len,
+ const IOOptions& opts, Slice* result,
+ char* scratch,
+ IODebugContext* dbg) const {
+ const std::chrono::microseconds deadline = fs_.GetDeadline();
+ const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();
+ IOStatus s;
+ if (deadline.count() || io_timeout.count()) {
+ fs_.AssertDeadline(deadline, io_timeout, opts);
+ }
+ if (s.ok()) {
+ s = FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch,
+ dbg);
+ }
+ if (s.ok()) {
+ s = fs_.ShouldDelay(opts);
+ }
+ return s;
+}
+
+IOStatus DeadlineRandomAccessFile::ReadAsync(
+ FSReadRequest& req, const IOOptions& opts,
+ std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
+ void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) {
+ const std::chrono::microseconds deadline = fs_.GetDeadline();
+ const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();
+ IOStatus s;
+ if (deadline.count() || io_timeout.count()) {
+ fs_.AssertDeadline(deadline, io_timeout, opts);
+ }
+ if (s.ok()) {
+ s = FSRandomAccessFileWrapper::ReadAsync(req, opts, cb, cb_arg, io_handle,
+ del_fn, dbg);
+ }
+ if (s.ok()) {
+ s = fs_.ShouldDelay(opts);
+ }
+ return s;
+}
+
+IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs,
+ size_t num_reqs,
+ const IOOptions& options,
+ IODebugContext* dbg) {
+ const std::chrono::microseconds deadline = fs_.GetDeadline();
+ const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();
+ IOStatus s;
+ if (deadline.count() || io_timeout.count()) {
+ fs_.AssertDeadline(deadline, io_timeout, options);
+ }
+ if (s.ok()) {
+ s = FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg);
+ }
+ if (s.ok()) {
+ s = fs_.ShouldDelay(options);
+ }
+ return s;
+}
+
+// A test class for intercepting random reads and injecting artificial
+// delays. Used for testing the MultiGet deadline feature
+class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet,
+ public testing::WithParamInterface<bool> {
+ public:
+ DBBasicTestMultiGetDeadline()
+ : DBBasicTestMultiGet(
+ "db_basic_test_multiget_deadline" /*Test dir*/,
+ 10 /*# of column families*/, false /*compressed cache enabled*/,
+ true /*uncompressed cache enabled*/, true /*compression enabled*/,
+ true /*ReadOptions.fill_cache*/,
+ 1 /*# of parallel compression threads*/) {}
+
+ inline void CheckStatus(std::vector<Status>& statuses, size_t num_ok) {
+ for (size_t i = 0; i < statuses.size(); ++i) {
+ if (i < num_ok) {
+ EXPECT_OK(statuses[i]);
+ } else {
+ if (statuses[i] != Status::TimedOut()) {
+ EXPECT_EQ(statuses[i], Status::TimedOut());
+ }
+ }
+ }
+ }
+};
+
+TEST_P(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
+#ifndef USE_COROUTINES
+ if (GetParam()) {
+ ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+ return;
+ }
+#endif // USE_COROUTINES
+ std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, false);
+ std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
+ Options options = CurrentOptions();
+
+ std::shared_ptr<Cache> cache = NewLRUCache(1048576);
+ BlockBasedTableOptions table_options;
+ table_options.block_cache = cache;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ options.env = env.get();
+ SetTimeElapseOnlySleepOnReopen(&options);
+ ReopenWithColumnFamilies(GetCFNames(), options);
+
+ // Test the non-batched version of MultiGet with multiple column
+ // families
+ std::vector<std::string> key_str;
+ size_t i;
+ for (i = 0; i < 5; ++i) {
+ key_str.emplace_back(Key(static_cast<int>(i)));
+ }
+ std::vector<ColumnFamilyHandle*> cfs(key_str.size());
+ ;
+ std::vector<Slice> keys(key_str.size());
+ std::vector<std::string> values(key_str.size());
+ for (i = 0; i < key_str.size(); ++i) {
+ cfs[i] = handles_[i];
+ keys[i] = Slice(key_str[i].data(), key_str[i].size());
+ }
+
+ ReadOptions ro;
+ ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+ ro.async_io = GetParam();
+ // Delay the first IO
+ fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0);
+
+ std::vector<Status> statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
+ // The first key is successful because we check after the lookup, but
+ // subsequent keys fail due to deadline exceeded
+ CheckStatus(statuses, 1);
+
+ // Clear the cache
+ cache->SetCapacity(0);
+ cache->SetCapacity(1048576);
+ // Test non-batched Multiget with multiple column families and
+ // introducing an IO delay in one of the middle CFs
+ key_str.clear();
+ for (i = 0; i < 10; ++i) {
+ key_str.emplace_back(Key(static_cast<int>(i)));
+ }
+ cfs.resize(key_str.size());
+ keys.resize(key_str.size());
+ values.resize(key_str.size());
+ for (i = 0; i < key_str.size(); ++i) {
+ // 2 keys per CF
+ cfs[i] = handles_[i / 2];
+ keys[i] = Slice(key_str[i].data(), key_str[i].size());
+ }
+ ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+ fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 1);
+ statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
+ CheckStatus(statuses, 3);
+
+ // Test batched MultiGet with an IO delay in the first data block read.
+ // Both keys in the first CF should succeed as they're in the same data
+ // block and would form one batch, and we check for deadline between
+ // batches.
+ std::vector<PinnableSlice> pin_values(keys.size());
+ cache->SetCapacity(0);
+ cache->SetCapacity(1048576);
+ statuses.clear();
+ statuses.resize(keys.size());
+ ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+ fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0);
+ dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
+ pin_values.data(), statuses.data());
+ CheckStatus(statuses, 2);
+
+ // Similar to the previous one, but an IO delay in the third CF data block
+ // read
+ for (PinnableSlice& value : pin_values) {
+ value.Reset();
+ }
+ cache->SetCapacity(0);
+ cache->SetCapacity(1048576);
+ statuses.clear();
+ statuses.resize(keys.size());
+ ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+ fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 2);
+ dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
+ pin_values.data(), statuses.data());
+ CheckStatus(statuses, 6);
+
+ // Similar to the previous one, but an IO delay in the last but one CF
+ for (PinnableSlice& value : pin_values) {
+ value.Reset();
+ }
+ cache->SetCapacity(0);
+ cache->SetCapacity(1048576);
+ statuses.clear();
+ statuses.resize(keys.size());
+ ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+ fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 3);
+ dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
+ pin_values.data(), statuses.data());
+ CheckStatus(statuses, 8);
+
+ // Test batched MultiGet with single CF and lots of keys. Inject delay
+ // into the second batch of keys. As each batch is 32, the first 64 keys,
+ // i.e first two batches, should succeed and the rest should time out
+ for (PinnableSlice& value : pin_values) {
+ value.Reset();
+ }
+ cache->SetCapacity(0);
+ cache->SetCapacity(1048576);
+ key_str.clear();
+ for (i = 0; i < 100; ++i) {
+ key_str.emplace_back(Key(static_cast<int>(i)));
+ }
+ keys.resize(key_str.size());
+ pin_values.clear();
+ pin_values.resize(key_str.size());
+ for (i = 0; i < key_str.size(); ++i) {
+ keys[i] = Slice(key_str[i].data(), key_str[i].size());
+ }
+ statuses.clear();
+ statuses.resize(keys.size());
+ ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+ fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 1);
+ dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(),
+ pin_values.data(), statuses.data());
+ CheckStatus(statuses, 64);
+ Close();
+}
+
// Param - If true, set ReadOptions::async_io (requires coroutine support)
INSTANTIATE_TEST_CASE_P(DeadlineIO, DBBasicTestMultiGetDeadline,
                        ::testing::Bool());
+
// Verify that an injected MANIFEST write failure fails the flush that
// triggered it, and that the DB can still be reopened afterwards.
TEST_F(DBBasicTest, ManifestWriteFailure) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.env = env_;
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
        ASSERT_NE(nullptr, arg);
        auto* s = reinterpret_cast<Status*>(arg);
        ASSERT_OK(*s);
        // Manually overwrite return status
        *s = Status::IOError();
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put("key", "value"));
  // The flush fails because the manifest sync result was overwritten
  ASSERT_NOK(Flush());
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->EnableProcessing();
  // The DB should recover cleanly despite the failed manifest write
  Reopen(options);
}
+
// Verify that user-owned column family handles can be destroyed, but the
// internally-owned DefaultColumnFamily() handle cannot.
TEST_F(DBBasicTest, DestroyDefaultCfHandle) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  for (const auto* h : handles_) {
    ASSERT_NE(db_->DefaultColumnFamily(), h);
  }

  // We have two handles to the default column family. The two handles point to
  // different ColumnFamilyHandle objects.
  assert(db_->DefaultColumnFamily());
  ASSERT_EQ(0U, db_->DefaultColumnFamily()->GetID());
  assert(handles_[0]);
  ASSERT_EQ(0U, handles_[0]->GetID());

  // You can destroy handles_[...].
  for (auto* h : handles_) {
    ASSERT_OK(db_->DestroyColumnFamilyHandle(h));
  }
  handles_.clear();

  // But you should not destroy db_->DefaultColumnFamily(), since it's going to
  // be deleted in `DBImpl::CloseHelper()`. Before that, it may be used
  // elsewhere internally too.
  ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
  ASSERT_TRUE(db_->DestroyColumnFamilyHandle(default_cf).IsInvalidArgument());
}
+
// Verify that a failure to create the info logger fails DB::Open and does
// not leave a logger set in the options.
TEST_F(DBBasicTest, FailOpenIfLoggerCreationFail) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "rocksdb::CreateLoggerFromOptions:AfterGetPath", [&](void* arg) {
        auto* s = reinterpret_cast<Status*>(arg);
        assert(s);
        // Inject a logger creation failure
        *s = Status::IOError("Injected");
      });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  // Open must propagate the injected error and leave info_log unset
  ASSERT_EQ(nullptr, options.info_log);
  ASSERT_TRUE(s.IsIOError());

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
+
+#ifndef ROCKSDB_LITE
+TEST_F(DBBasicTest, VerifyFileChecksums) {
+ Options options = GetDefaultOptions();
+ options.create_if_missing = true;
+ options.env = env_;
+ DestroyAndReopen(options);
+ ASSERT_OK(Put("a", "value"));
+ ASSERT_OK(Flush());
+ ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());
+
+ options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
+ Reopen(options);
+ ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));
+
+ // Write an L0 with checksum computed.
+ ASSERT_OK(Put("b", "value"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));
+
+ // Does the right thing but with the wrong name -- using it should lead to an
+ // error.
+ class MisnamedFileChecksumGenerator : public FileChecksumGenCrc32c {
+ public:
+ MisnamedFileChecksumGenerator(const FileChecksumGenContext& context)
+ : FileChecksumGenCrc32c(context) {}
+
+ const char* Name() const override { return "sha1"; }
+ };
+
+ class MisnamedFileChecksumGenFactory : public FileChecksumGenCrc32cFactory {
+ public:
+ std::unique_ptr<FileChecksumGenerator> CreateFileChecksumGenerator(
+ const FileChecksumGenContext& context) override {
+ return std::unique_ptr<FileChecksumGenerator>(
+ new MisnamedFileChecksumGenerator(context));
+ }
+ };
+
+ options.file_checksum_gen_factory.reset(new MisnamedFileChecksumGenFactory());
+ Reopen(options);
+ ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());
+}
+
// TODO: re-enable after we provide finer-grained control for WAL tracking to
// meet the needs of different use cases, durability levels and recovery modes.
TEST_F(DBBasicTest, DISABLED_ManualWalSync) {
  Options options = CurrentOptions();
  options.track_and_verify_wals_in_manifest = true;
  options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
  DestroyAndReopen(options);

  ASSERT_OK(Put("x", "y"));
  // This does not create a new WAL.
  ASSERT_OK(db_->SyncWAL());
  EXPECT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());

  std::unique_ptr<LogFile> wal;
  Status s = db_->GetCurrentWalFile(&wal);
  ASSERT_OK(s);
  Close();

  // Delete the synced WAL out from under the DB
  EXPECT_OK(env_->DeleteFile(LogFileName(dbname_, wal->LogNumber())));

  // With the WAL tracked in the manifest, its absence must be detected
  ASSERT_TRUE(TryReopen(options).IsCorruption());
}
+#endif // !ROCKSDB_LITE
+
// A test fixture for the ReadOptions deadline/io_timeout feature. It relies
// on DeadlineFS to intercept random reads and inject artificial delays.
// Param 0 - If true, set ReadOptions::deadline
// Param 1 - If true, set ReadOptions::io_timeout
class DBBasicTestDeadline
    : public DBBasicTest,
      public testing::WithParamInterface<std::tuple<bool, bool>> {};
+
// Verify that a Get() fails with TimedOut when any file read exceeds the
// deadline/io_timeout, for every applicable option configuration.
TEST_P(DBBasicTestDeadline, PointLookupDeadline) {
  std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, true);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  bool set_deadline = std::get<0>(GetParam());
  bool set_timeout = std::get<1>(GetParam());

  for (int option_config = kDefault; option_config < kEnd; ++option_config) {
    if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) {
      continue;
    }
    option_config_ = option_config;
    Options options = CurrentOptions();
    if (options.use_direct_reads) {
      continue;
    }
    options.env = env.get();
    options.disable_auto_compactions = true;
    Cache* block_cache = nullptr;
    // Filter block reads currently don't cause the request to get
    // aborted on a read timeout, so it's possible those block reads
    // may get issued even if the deadline is past
    SyncPoint::GetInstance()->SetCallBack(
        "BlockBasedTable::Get:BeforeFilterMatch",
        [&](void* /*arg*/) { fs->IgnoreDeadline(true); });
    SyncPoint::GetInstance()->SetCallBack(
        "BlockBasedTable::Get:AfterFilterMatch",
        [&](void* /*arg*/) { fs->IgnoreDeadline(false); });
    // DB open will create table readers unless we reduce the table cache
    // capacity.
    // SanitizeOptions will set max_open_files to minimum of 20. Table cache
    // is allocated with max_open_files - 10 as capacity. So override
    // max_open_files to 11 so table cache capacity will become 1. This will
    // prevent file open during DB open and force the file to be opened
    // during the Get below
    SyncPoint::GetInstance()->SetCallBack(
        "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
          int* max_open_files = (int*)arg;
          *max_open_files = 11;
        });
    SyncPoint::GetInstance()->EnableProcessing();

    SetTimeElapseOnlySleepOnReopen(&options);
    Reopen(options);

    if (options.table_factory) {
      block_cache = options.table_factory->GetOptions<Cache>(
          TableFactory::kBlockCacheOpts());
    }

    Random rnd(301);
    for (int i = 0; i < 400; ++i) {
      std::string key = "k" + std::to_string(i);
      ASSERT_OK(Put(key, rnd.RandomString(100)));
    }
    ASSERT_OK(Flush());

    bool timedout = true;
    // A timeout will be forced when the IO counter reaches this value
    int io_deadline_trigger = 0;
    // Keep incrementing io_deadline_trigger and call Get() until there is an
    // iteration that doesn't cause a timeout. This ensures that we cover
    // all file reads in the point lookup path that can potentially timeout
    // and cause the Get() to fail.
    while (timedout) {
      ReadOptions ro;
      if (set_deadline) {
        ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
      }
      if (set_timeout) {
        ro.io_timeout = std::chrono::microseconds{5000};
      }
      fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger);

      // Clear the block cache so every lookup does real IO
      block_cache->SetCapacity(0);
      block_cache->SetCapacity(1048576);

      std::string value;
      Status s = dbfull()->Get(ro, "k50", &value);
      if (fs->TimedOut()) {
        ASSERT_EQ(s, Status::TimedOut());
      } else {
        timedout = false;
        ASSERT_OK(s);
      }
      io_deadline_trigger++;
    }
    // Reset the delay sequence in order to avoid false alarms during Reopen
    fs->SetDelayTrigger(std::chrono::microseconds::zero(),
                        std::chrono::microseconds::zero(), 0);
  }
  Close();
}
+
+TEST_P(DBBasicTestDeadline, IteratorDeadline) {
+ std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, true);
+ std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
+ bool set_deadline = std::get<0>(GetParam());
+ bool set_timeout = std::get<1>(GetParam());
+
+ for (int option_config = kDefault; option_config < kEnd; ++option_config) {
+ if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) {
+ continue;
+ }
+ Options options = CurrentOptions();
+ if (options.use_direct_reads) {
+ continue;
+ }
+ options.env = env.get();
+ options.disable_auto_compactions = true;
+ Cache* block_cache = nullptr;
+ // DB open will create table readers unless we reduce the table cache
+ // capacity.
+ // SanitizeOptions will set max_open_files to minimum of 20. Table cache
+ // is allocated with max_open_files - 10 as capacity. So override
+ // max_open_files to 11 so table cache capacity will become 1. This will
+ // prevent file open during DB open and force the file to be opened
+ // during MultiGet
+ SyncPoint::GetInstance()->SetCallBack(
+ "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
+ int* max_open_files = (int*)arg;
+ *max_open_files = 11;
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ SetTimeElapseOnlySleepOnReopen(&options);
+ Reopen(options);
+
+ if (options.table_factory) {
+ block_cache = options.table_factory->GetOptions<Cache>(
+ TableFactory::kBlockCacheOpts());
+ }
+
+ Random rnd(301);
+ for (int i = 0; i < 400; ++i) {
+ std::string key = "k" + std::to_string(i);
+ ASSERT_OK(Put(key, rnd.RandomString(100)));
+ }
+ ASSERT_OK(Flush());
+
+ bool timedout = true;
+ // A timeout will be forced when the IO counter reaches this value
+ int io_deadline_trigger = 0;
+ // Keep incrementing io_deadline_trigger and call Get() until there is an
+ // iteration that doesn't cause a timeout. This ensures that we cover
+ // all file reads in the point lookup path that can potentially timeout
+ while (timedout) {
+ ReadOptions ro;
+ if (set_deadline) {
+ ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+ }
+ if (set_timeout) {
+ ro.io_timeout = std::chrono::microseconds{5000};
+ }
+ fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger);
+
+ block_cache->SetCapacity(0);
+ block_cache->SetCapacity(1048576);
+
+ Iterator* iter = dbfull()->NewIterator(ro);
+ int count = 0;
+ iter->Seek("k50");
+ while (iter->Valid() && count++ < 100) {
+ iter->Next();
+ }
+ if (fs->TimedOut()) {
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_EQ(iter->status(), Status::TimedOut());
+ } else {
+ timedout = false;
+ ASSERT_OK(iter->status());
+ }
+ delete iter;
+ io_deadline_trigger++;
+ }
+ // Reset the delay sequence in order to avoid false alarms during Reopen
+ fs->SetDelayTrigger(std::chrono::microseconds::zero(),
+ std::chrono::microseconds::zero(), 0);
+ }
+ Close();
+}
+
// Param 0: If true, set read_options.deadline
// Param 1: If true, set read_options.io_timeout
// (false, false) is omitted since no timeout would ever be enforced
INSTANTIATE_TEST_CASE_P(DBBasicTestDeadline, DBBasicTestDeadline,
                        ::testing::Values(std::make_tuple(true, false),
                                          std::make_tuple(false, true),
                                          std::make_tuple(true, true)));
+} // namespace ROCKSDB_NAMESPACE
+
int main(int argc, char** argv) {
  // Print a stack trace on crashes to aid debugging test failures
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}