diff options
Diffstat (limited to 'src/rocksdb/db/memtable_list_test.cc')
-rw-r--r-- | src/rocksdb/db/memtable_list_test.cc | 922 |
1 files changed, 922 insertions, 0 deletions
diff --git a/src/rocksdb/db/memtable_list_test.cc b/src/rocksdb/db/memtable_list_test.cc new file mode 100644 index 000000000..a92bc6c79 --- /dev/null +++ b/src/rocksdb/db/memtable_list_test.cc @@ -0,0 +1,922 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/memtable_list.h" +#include <algorithm> +#include <string> +#include <vector> +#include "db/merge_context.h" +#include "db/version_set.h" +#include "db/write_controller.h" +#include "rocksdb/db.h" +#include "rocksdb/status.h" +#include "rocksdb/write_buffer_manager.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +class MemTableListTest : public testing::Test { + public: + std::string dbname; + DB* db; + Options options; + std::vector<ColumnFamilyHandle*> handles; + std::atomic<uint64_t> file_number; + + MemTableListTest() : db(nullptr), file_number(1) { + dbname = test::PerThreadDBPath("memtable_list_test"); + options.create_if_missing = true; + DestroyDB(dbname, options); + } + + // Create a test db if not yet created + void CreateDB() { + if (db == nullptr) { + options.create_if_missing = true; + DestroyDB(dbname, options); + // Open DB only with default column family + ColumnFamilyOptions cf_options; + std::vector<ColumnFamilyDescriptor> cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options); + Status s = DB::Open(options, dbname, cf_descs, &handles, &db); + EXPECT_OK(s); + + ColumnFamilyOptions cf_opt1, cf_opt2; + cf_opt1.cf_paths.emplace_back(dbname + "_one_1", + std::numeric_limits<uint64_t>::max()); + cf_opt2.cf_paths.emplace_back(dbname + "_two_1", + std::numeric_limits<uint64_t>::max()); + int sz = static_cast<int>(handles.size()); + handles.resize(sz + 2); + s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]); + EXPECT_OK(s); + s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]); + EXPECT_OK(s); + + cf_descs.emplace_back("one", cf_options); + cf_descs.emplace_back("two", cf_options); + } + } + + ~MemTableListTest() override { + if (db) { + std::vector<ColumnFamilyDescriptor> cf_descs(handles.size()); + for (int i = 0; i != static_cast<int>(handles.size()); ++i) { + handles[i]->GetDescriptor(&cf_descs[i]); + } + for (auto h : handles) { + if (h) { + db->DestroyColumnFamilyHandle(h); + } + } + handles.clear(); + delete db; + db = nullptr; + DestroyDB(dbname, options, cf_descs); + } + } + + // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all + // structures needed to call this function. + Status Mock_InstallMemtableFlushResults( + MemTableList* list, const MutableCFOptions& mutable_cf_options, + const autovector<MemTable*>& m, autovector<MemTable*>* to_delete) { + // Create a mock Logger + test::NullLogger logger; + LogBuffer log_buffer(DEBUG_LEVEL, &logger); + + CreateDB(); + // Create a mock VersionSet + DBOptions db_options; + db_options.file_system = FileSystem::Default(); + ImmutableDBOptions immutable_db_options(db_options); + EnvOptions env_options; + std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16)); + WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); + WriteController write_controller(10000000u); + + VersionSet versions(dbname, &immutable_db_options, env_options, + table_cache.get(), &write_buffer_manager, + &write_controller, /*block_cache_tracer=*/nullptr); + std::vector<ColumnFamilyDescriptor> cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); + cf_descs.emplace_back("one", ColumnFamilyOptions()); + cf_descs.emplace_back("two", ColumnFamilyOptions()); + + EXPECT_OK(versions.Recover(cf_descs, false)); + + // Create mock default ColumnFamilyData + auto column_family_set = versions.GetColumnFamilySet(); + LogsWithPrepTracker dummy_prep_tracker; + auto cfd = column_family_set->GetDefault(); + EXPECT_TRUE(nullptr != cfd); + uint64_t file_num = file_number.fetch_add(1); + // Create dummy mutex. + InstrumentedMutex mutex; + InstrumentedMutexLock l(&mutex); + std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info; + Status s = list->TryInstallMemtableFlushResults( + cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, + file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info); + return s; + } + + // Calls MemTableList::InstallMemtableFlushResults() and sets up all + // structures needed to call this function. + Status Mock_InstallMemtableAtomicFlushResults( + autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids, + const autovector<const MutableCFOptions*>& mutable_cf_options_list, + const autovector<const autovector<MemTable*>*>& mems_list, + autovector<MemTable*>* to_delete) { + // Create a mock Logger + test::NullLogger logger; + LogBuffer log_buffer(DEBUG_LEVEL, &logger); + + CreateDB(); + // Create a mock VersionSet + DBOptions db_options; + db_options.file_system.reset(new LegacyFileSystemWrapper(db_options.env)); + + ImmutableDBOptions immutable_db_options(db_options); + EnvOptions env_options; + std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16)); + WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); + WriteController write_controller(10000000u); + + VersionSet versions(dbname, &immutable_db_options, env_options, + table_cache.get(), &write_buffer_manager, + &write_controller, /*block_cache_tracer=*/nullptr); + std::vector<ColumnFamilyDescriptor> cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); + cf_descs.emplace_back("one", ColumnFamilyOptions()); + cf_descs.emplace_back("two", ColumnFamilyOptions()); + EXPECT_OK(versions.Recover(cf_descs, false)); + + // Create mock default ColumnFamilyData + + auto column_family_set = versions.GetColumnFamilySet(); + + LogsWithPrepTracker dummy_prep_tracker; + autovector<ColumnFamilyData*> cfds; + for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) { + cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i])); + EXPECT_NE(nullptr, cfds[i]); + } + std::vector<FileMetaData> file_metas; + file_metas.reserve(cf_ids.size()); + for (size_t i = 0; i != cf_ids.size(); ++i) { + FileMetaData meta; + uint64_t file_num = file_number.fetch_add(1); + meta.fd = FileDescriptor(file_num, 0, 0); + file_metas.emplace_back(meta); + } + autovector<FileMetaData*> file_meta_ptrs; + for (auto& meta : file_metas) { + file_meta_ptrs.push_back(&meta); + } + InstrumentedMutex mutex; + InstrumentedMutexLock l(&mutex); + return InstallMemtableAtomicFlushResults( + &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex, + file_meta_ptrs, to_delete, nullptr, &log_buffer); + } +}; + +TEST_F(MemTableListTest, Empty) { + // Create an empty MemTableList and validate basic functions. + MemTableList list(1, 0, 0); + + ASSERT_EQ(0, list.NumNotFlushed()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + ASSERT_FALSE(list.IsFlushPending()); + + autovector<MemTable*> mems; + list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems); + ASSERT_EQ(0, mems.size()); + + autovector<MemTable*> to_delete; + list.current()->Unref(&to_delete); + ASSERT_EQ(0, to_delete.size()); +} + +TEST_F(MemTableListTest, GetTest) { + // Create MemTableList + int min_write_buffer_number_to_merge = 2; + int max_write_buffer_number_to_maintain = 0; + int64_t max_write_buffer_size_to_maintain = 0; + MemTableList list(min_write_buffer_number_to_merge, + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain); + + SequenceNumber seq = 1; + std::string value; + Status s; + MergeContext merge_context; + InternalKeyComparator ikey_cmp(options.comparator); + SequenceNumber max_covering_tombstone_seq = 0; + autovector<MemTable*> to_delete; + + LookupKey lkey("key1", seq); + bool found = list.current()->Get(lkey, &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_FALSE(found); + + // Create a MemTable + InternalKeyComparator cmp(BytewiseComparator()); + auto factory = std::make_shared<SkipListFactory>(); + options.memtable_factory = factory; + ImmutableCFOptions ioptions(options); + + WriteBufferManager wb(options.db_write_buffer_size); + MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + mem->Ref(); + + // Write some keys to this memtable. + mem->Add(++seq, kTypeDeletion, "key1", ""); + mem->Add(++seq, kTypeValue, "key2", "value2"); + mem->Add(++seq, kTypeValue, "key1", "value1"); + mem->Add(++seq, kTypeValue, "key2", "value2.2"); + + // Fetch the newly written keys + merge_context.Clear(); + found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(s.ok() && found); + ASSERT_EQ(value, "value1"); + + merge_context.Clear(); + found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + // MemTable found out that this key is *not* found (at this sequence#) + ASSERT_TRUE(found && s.IsNotFound()); + + merge_context.Clear(); + found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(s.ok() && found); + ASSERT_EQ(value, "value2.2"); + + ASSERT_EQ(4, mem->num_entries()); + ASSERT_EQ(1, mem->num_deletes()); + + // Add memtable to list + list.Add(mem, &to_delete); + + SequenceNumber saved_seq = seq; + + // Create another memtable and write some keys to it + WriteBufferManager wb2(options.db_write_buffer_size); + MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2, + kMaxSequenceNumber, 0 /* column_family_id */); + mem2->Ref(); + + mem2->Add(++seq, kTypeDeletion, "key1", ""); + mem2->Add(++seq, kTypeValue, "key2", "value2.3"); + + // Add second memtable to list + list.Add(mem2, &to_delete); + + // Fetch keys via MemTableList + merge_context.Clear(); + found = + list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(found && s.IsNotFound()); + + merge_context.Clear(); + found = list.current()->Get(LookupKey("key1", saved_seq), &value, &s, + &merge_context, &max_covering_tombstone_seq, + ReadOptions()); + ASSERT_TRUE(s.ok() && found); + ASSERT_EQ("value1", value); + + merge_context.Clear(); + found = + list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(s.ok() && found); + ASSERT_EQ(value, "value2.3"); + + merge_context.Clear(); + found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_FALSE(found); + + ASSERT_EQ(2, list.NumNotFlushed()); + + list.current()->Unref(&to_delete); + for (MemTable* m : to_delete) { + delete m; + } +} + +TEST_F(MemTableListTest, GetFromHistoryTest) { + // Create MemTableList + int min_write_buffer_number_to_merge = 2; + int max_write_buffer_number_to_maintain = 2; + int64_t max_write_buffer_size_to_maintain = 2000; + MemTableList list(min_write_buffer_number_to_merge, + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain); + + SequenceNumber seq = 1; + std::string value; + Status s; + MergeContext merge_context; + InternalKeyComparator ikey_cmp(options.comparator); + SequenceNumber max_covering_tombstone_seq = 0; + autovector<MemTable*> to_delete; + + LookupKey lkey("key1", seq); + bool found = list.current()->Get(lkey, &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_FALSE(found); + + // Create a MemTable + InternalKeyComparator cmp(BytewiseComparator()); + auto factory = std::make_shared<SkipListFactory>(); + options.memtable_factory = factory; + ImmutableCFOptions ioptions(options); + + WriteBufferManager wb(options.db_write_buffer_size); + MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + mem->Ref(); + + // Write some keys to this memtable. + mem->Add(++seq, kTypeDeletion, "key1", ""); + mem->Add(++seq, kTypeValue, "key2", "value2"); + mem->Add(++seq, kTypeValue, "key2", "value2.2"); + + // Fetch the newly written keys + merge_context.Clear(); + found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + // MemTable found out that this key is *not* found (at this sequence#) + ASSERT_TRUE(found && s.IsNotFound()); + + merge_context.Clear(); + found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(s.ok() && found); + ASSERT_EQ(value, "value2.2"); + + // Add memtable to list + list.Add(mem, &to_delete); + ASSERT_EQ(0, to_delete.size()); + + // Fetch keys via MemTableList + merge_context.Clear(); + found = + list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(found && s.IsNotFound()); + + merge_context.Clear(); + found = + list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(s.ok() && found); + ASSERT_EQ("value2.2", value); + + // Flush this memtable from the list. + // (It will then be a part of the memtable history). + autovector<MemTable*> to_flush; + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + ASSERT_EQ(1, to_flush.size()); + + MutableCFOptions mutable_cf_options(options); + s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush, + &to_delete); + ASSERT_OK(s); + ASSERT_EQ(0, list.NumNotFlushed()); + ASSERT_EQ(1, list.NumFlushed()); + ASSERT_EQ(0, to_delete.size()); + + // Verify keys are no longer in MemTableList + merge_context.Clear(); + found = + list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_FALSE(found); + + merge_context.Clear(); + found = + list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_FALSE(found); + + // Verify keys are present in history + merge_context.Clear(); + found = list.current()->GetFromHistory( + LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(found && s.IsNotFound()); + + merge_context.Clear(); + found = list.current()->GetFromHistory( + LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(found); + ASSERT_EQ("value2.2", value); + + // Create another memtable and write some keys to it + WriteBufferManager wb2(options.db_write_buffer_size); + MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2, + kMaxSequenceNumber, 0 /* column_family_id */); + mem2->Ref(); + + mem2->Add(++seq, kTypeDeletion, "key1", ""); + mem2->Add(++seq, kTypeValue, "key3", "value3"); + + // Add second memtable to list + list.Add(mem2, &to_delete); + ASSERT_EQ(0, to_delete.size()); + + to_flush.clear(); + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + ASSERT_EQ(1, to_flush.size()); + + // Flush second memtable + s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush, + &to_delete); + ASSERT_OK(s); + ASSERT_EQ(0, list.NumNotFlushed()); + ASSERT_EQ(2, list.NumFlushed()); + ASSERT_EQ(0, to_delete.size()); + + // Add a third memtable to push the first memtable out of the history + WriteBufferManager wb3(options.db_write_buffer_size); + MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3, + kMaxSequenceNumber, 0 /* column_family_id */); + mem3->Ref(); + list.Add(mem3, &to_delete); + ASSERT_EQ(1, list.NumNotFlushed()); + ASSERT_EQ(1, list.NumFlushed()); + ASSERT_EQ(1, to_delete.size()); + + // Verify keys are no longer in MemTableList + merge_context.Clear(); + found = + list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_FALSE(found); + + merge_context.Clear(); + found = + list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_FALSE(found); + + merge_context.Clear(); + found = + list.current()->Get(LookupKey("key3", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_FALSE(found); + + // Verify that the second memtable's keys are in the history + merge_context.Clear(); + found = list.current()->GetFromHistory( + LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(found && s.IsNotFound()); + + merge_context.Clear(); + found = list.current()->GetFromHistory( + LookupKey("key3", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_TRUE(found); + ASSERT_EQ("value3", value); + + // Verify that key2 from the first memtable is no longer in the history + merge_context.Clear(); + found = + list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); + ASSERT_FALSE(found); + + // Cleanup + list.current()->Unref(&to_delete); + ASSERT_EQ(3, to_delete.size()); + for (MemTable* m : to_delete) { + delete m; + } +} + +TEST_F(MemTableListTest, FlushPendingTest) { + const int num_tables = 6; + SequenceNumber seq = 1; + Status s; + + auto factory = std::make_shared<SkipListFactory>(); + options.memtable_factory = factory; + ImmutableCFOptions ioptions(options); + InternalKeyComparator cmp(BytewiseComparator()); + WriteBufferManager wb(options.db_write_buffer_size); + autovector<MemTable*> to_delete; + + // Create MemTableList + int min_write_buffer_number_to_merge = 3; + int max_write_buffer_number_to_maintain = 7; + int64_t max_write_buffer_size_to_maintain = + 7 * static_cast<int>(options.write_buffer_size); + MemTableList list(min_write_buffer_number_to_merge, + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain); + + // Create some MemTables + uint64_t memtable_id = 0; + std::vector<MemTable*> tables; + MutableCFOptions mutable_cf_options(options); + for (int i = 0; i < num_tables; i++) { + MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + mem->SetID(memtable_id++); + mem->Ref(); + + std::string value; + MergeContext merge_context; + + mem->Add(++seq, kTypeValue, "key1", ToString(i)); + mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN"); + mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value"); + mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM"); + mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), ""); + + tables.push_back(mem); + } + + // Nothing to flush + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + autovector<MemTable*> to_flush; + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + ASSERT_EQ(0, to_flush.size()); + + // Request a flush even though there is nothing to flush + list.FlushRequested(); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Attempt to 'flush' to clear request for flush + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + ASSERT_EQ(0, to_flush.size()); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Request a flush again + list.FlushRequested(); + // No flush pending since the list is empty. + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Add 2 tables + list.Add(tables[0], &to_delete); + list.Add(tables[1], &to_delete); + ASSERT_EQ(2, list.NumNotFlushed()); + ASSERT_EQ(0, to_delete.size()); + + // Even though we have less than the minimum to flush, a flush is + // pending since we had previously requested a flush and never called + // PickMemtablesToFlush() to clear the flush. + ASSERT_TRUE(list.IsFlushPending()); + ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Pick tables to flush + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + ASSERT_EQ(2, to_flush.size()); + ASSERT_EQ(2, list.NumNotFlushed()); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Revert flush + list.RollbackMemtableFlush(to_flush, 0); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); + to_flush.clear(); + + // Add another table + list.Add(tables[2], &to_delete); + // We now have the minimum to flush regardles of whether FlushRequested() + // was called. + ASSERT_TRUE(list.IsFlushPending()); + ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); + ASSERT_EQ(0, to_delete.size()); + + // Pick tables to flush + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + ASSERT_EQ(3, to_flush.size()); + ASSERT_EQ(3, list.NumNotFlushed()); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Pick tables to flush again + autovector<MemTable*> to_flush2; + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2); + ASSERT_EQ(0, to_flush2.size()); + ASSERT_EQ(3, list.NumNotFlushed()); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Add another table + list.Add(tables[3], &to_delete); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); + ASSERT_EQ(0, to_delete.size()); + + // Request a flush again + list.FlushRequested(); + ASSERT_TRUE(list.IsFlushPending()); + ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Pick tables to flush again + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2); + ASSERT_EQ(1, to_flush2.size()); + ASSERT_EQ(4, list.NumNotFlushed()); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Rollback first pick of tables + list.RollbackMemtableFlush(to_flush, 0); + ASSERT_TRUE(list.IsFlushPending()); + ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); + to_flush.clear(); + + // Add another tables + list.Add(tables[4], &to_delete); + ASSERT_EQ(5, list.NumNotFlushed()); + // We now have the minimum to flush regardles of whether FlushRequested() + ASSERT_TRUE(list.IsFlushPending()); + ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); + ASSERT_EQ(0, to_delete.size()); + + // Pick tables to flush + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + // Should pick 4 of 5 since 1 table has been picked in to_flush2 + ASSERT_EQ(4, to_flush.size()); + ASSERT_EQ(5, list.NumNotFlushed()); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Pick tables to flush again + autovector<MemTable*> to_flush3; + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3); + ASSERT_EQ(0, to_flush3.size()); // nothing not in progress of being flushed + ASSERT_EQ(5, list.NumNotFlushed()); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Flush the 4 memtables that were picked in to_flush + s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush, + &to_delete); + ASSERT_OK(s); + + // Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains + // tables[3]. + // Current implementation will only commit memtables in the order they were + // created. So TryInstallMemtableFlushResults will install the first 3 tables + // in to_flush and stop when it encounters a table not yet flushed. + ASSERT_EQ(2, list.NumNotFlushed()); + int num_in_history = + std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) / + static_cast<int>(options.write_buffer_size)); + ASSERT_EQ(num_in_history, list.NumFlushed()); + ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size()); + + // Request a flush again. Should be nothing to flush + list.FlushRequested(); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + + // Flush the 1 memtable that was picked in to_flush2 + s = MemTableListTest::Mock_InstallMemtableFlushResults( + &list, mutable_cf_options, to_flush2, &to_delete); + ASSERT_OK(s); + + // This will actually install 2 tables. The 1 we told it to flush, and also + // tables[4] which has been waiting for tables[3] to commit. + ASSERT_EQ(0, list.NumNotFlushed()); + num_in_history = + std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) / + static_cast<int>(options.write_buffer_size)); + ASSERT_EQ(num_in_history, list.NumFlushed()); + ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size()); + + for (const auto& m : to_delete) { + // Refcount should be 0 after calling TryInstallMemtableFlushResults. + // Verify this, by Ref'ing then UnRef'ing: + m->Ref(); + ASSERT_EQ(m, m->Unref()); + delete m; + } + to_delete.clear(); + + // Add another table + list.Add(tables[5], &to_delete); + ASSERT_EQ(1, list.NumNotFlushed()); + ASSERT_EQ(5, list.GetLatestMemTableID()); + memtable_id = 4; + // Pick tables to flush. The tables to pick must have ID smaller than or + // equal to 4. Therefore, no table will be selected in this case. + autovector<MemTable*> to_flush4; + list.FlushRequested(); + ASSERT_TRUE(list.HasFlushRequested()); + list.PickMemtablesToFlush(&memtable_id, &to_flush4); + ASSERT_TRUE(to_flush4.empty()); + ASSERT_EQ(1, list.NumNotFlushed()); + ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.HasFlushRequested()); + + // Pick tables to flush. The tables to pick must have ID smaller than or + // equal to 5. Therefore, only tables[5] will be selected. + memtable_id = 5; + list.FlushRequested(); + list.PickMemtablesToFlush(&memtable_id, &to_flush4); + ASSERT_EQ(1, static_cast<int>(to_flush4.size())); + ASSERT_EQ(1, list.NumNotFlushed()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + ASSERT_FALSE(list.IsFlushPending()); + to_delete.clear(); + + list.current()->Unref(&to_delete); + int to_delete_size = + std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) / + static_cast<int>(options.write_buffer_size)); + ASSERT_EQ(to_delete_size, to_delete.size()); + + for (const auto& m : to_delete) { + // Refcount should be 0 after calling TryInstallMemtableFlushResults. + // Verify this, by Ref'ing then UnRef'ing: + m->Ref(); + ASSERT_EQ(m, m->Unref()); + delete m; + } + to_delete.clear(); +} + +TEST_F(MemTableListTest, EmptyAtomicFlusTest) { + autovector<MemTableList*> lists; + autovector<uint32_t> cf_ids; + autovector<const MutableCFOptions*> options_list; + autovector<const autovector<MemTable*>*> to_flush; + autovector<MemTable*> to_delete; + Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list, + to_flush, &to_delete); + ASSERT_OK(s); + ASSERT_TRUE(to_delete.empty()); +} + +TEST_F(MemTableListTest, AtomicFlusTest) { + const int num_cfs = 3; + const int num_tables_per_cf = 2; + SequenceNumber seq = 1; + + auto factory = std::make_shared<SkipListFactory>(); + options.memtable_factory = factory; + ImmutableCFOptions ioptions(options); + InternalKeyComparator cmp(BytewiseComparator()); + WriteBufferManager wb(options.db_write_buffer_size); + + // Create MemTableLists + int min_write_buffer_number_to_merge = 3; + int max_write_buffer_number_to_maintain = 7; + int64_t max_write_buffer_size_to_maintain = + 7 * static_cast<int64_t>(options.write_buffer_size); + autovector<MemTableList*> lists; + for (int i = 0; i != num_cfs; ++i) { + lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge, + max_write_buffer_number_to_maintain, + max_write_buffer_size_to_maintain)); + } + + autovector<uint32_t> cf_ids; + std::vector<std::vector<MemTable*>> tables(num_cfs); + autovector<const MutableCFOptions*> mutable_cf_options_list; + uint32_t cf_id = 0; + for (auto& elem : tables) { + mutable_cf_options_list.emplace_back(new MutableCFOptions(options)); + uint64_t memtable_id = 0; + for (int i = 0; i != num_tables_per_cf; ++i) { + MemTable* mem = + new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb, + kMaxSequenceNumber, cf_id); + mem->SetID(memtable_id++); + mem->Ref(); + + std::string value; + + mem->Add(++seq, kTypeValue, "key1", ToString(i)); + mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN"); + mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value"); + mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM"); + mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), ""); + + elem.push_back(mem); + } + cf_ids.push_back(cf_id++); + } + + std::vector<autovector<MemTable*>> flush_candidates(num_cfs); + + // Nothing to flush + for (auto i = 0; i != num_cfs; ++i) { + auto* list = lists[i]; + ASSERT_FALSE(list->IsFlushPending()); + ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire)); + list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]); + ASSERT_EQ(0, flush_candidates[i].size()); + } + // Request flush even though there is nothing to flush + for (auto i = 0; i != num_cfs; ++i) { + auto* list = lists[i]; + list->FlushRequested(); + ASSERT_FALSE(list->IsFlushPending()); + ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire)); + } + autovector<MemTable*> to_delete; + // Add tables to the immutable memtalbe lists associated with column families + for (auto i = 0; i != num_cfs; ++i) { + for (auto j = 0; j != num_tables_per_cf; ++j) { + lists[i]->Add(tables[i][j], &to_delete); + } + ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed()); + ASSERT_TRUE(lists[i]->IsFlushPending()); + ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire)); + } + std::vector<uint64_t> flush_memtable_ids = {1, 1, 0}; + // +----+ + // list[0]: |0 1| + // list[1]: |0 1| + // | +--+ + // list[2]: |0| 1 + // +-+ + // Pick memtables to flush + for (auto i = 0; i != num_cfs; ++i) { + flush_candidates[i].clear(); + lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i], + &flush_candidates[i]); + ASSERT_EQ(flush_memtable_ids[i] - 0 + 1, + static_cast<uint64_t>(flush_candidates[i].size())); + } + autovector<MemTableList*> tmp_lists; + autovector<uint32_t> tmp_cf_ids; + autovector<const MutableCFOptions*> tmp_options_list; + autovector<const autovector<MemTable*>*> to_flush; + for (auto i = 0; i != num_cfs; ++i) { + if (!flush_candidates[i].empty()) { + to_flush.push_back(&flush_candidates[i]); + tmp_lists.push_back(lists[i]); + tmp_cf_ids.push_back(i); + tmp_options_list.push_back(mutable_cf_options_list[i]); + } + } + Status s = Mock_InstallMemtableAtomicFlushResults( + tmp_lists, tmp_cf_ids, tmp_options_list, to_flush, &to_delete); + ASSERT_OK(s); + + for (auto i = 0; i != num_cfs; ++i) { + for (auto j = 0; j != num_tables_per_cf; ++j) { + if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) { + ASSERT_LT(0, tables[i][j]->GetFileNumber()); + } + } + ASSERT_EQ( + static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(), + lists[i]->NumNotFlushed()); + } + + to_delete.clear(); + for (auto list : lists) { + list->current()->Unref(&to_delete); + delete list; + } + for (auto& mutable_cf_options : mutable_cf_options_list) { + if (mutable_cf_options != nullptr) { + delete mutable_cf_options; + mutable_cf_options = nullptr; + } + } + // All memtables in tables array must have been flushed, thus ready to be + // deleted. + ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size()); + for (const auto& m : to_delete) { + // Refcount should be 0 after calling InstallMemtableFlushResults. + // Verify this by Ref'ing and then Unref'ing. + m->Ref(); + ASSERT_EQ(m, m->Unref()); + delete m; + } +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |