From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 src/rocksdb/db/listener_test.cc | 1595 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 1595 insertions(+)
 create mode 100644 src/rocksdb/db/listener_test.cc

diff --git a/src/rocksdb/db/listener_test.cc b/src/rocksdb/db/listener_test.cc
new file mode 100644
index 000000000..160866bb7
--- /dev/null
+++ b/src/rocksdb/db/listener_test.cc
@@ -0,0 +1,1595 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/blob/blob_index.h"
+#include "db/db_impl/db_impl.h"
+#include "db/db_test_util.h"
+#include "db/dbformat.h"
+#include "db/version_set.h"
+#include "db/write_batch_internal.h"
+#include "file/filename.h"
+#include "monitoring/statistics.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/filter_policy.h"
+#include "rocksdb/options.h"
+#include "rocksdb/perf_context.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/table.h"
+#include "rocksdb/table_properties.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+#include "util/hash.h"
+#include "util/mutexlock.h"
+#include "util/rate_limiter.h"
+#include "util/string_util.h"
+#include "utilities/merge_operators.h"
+
+#ifndef ROCKSDB_LITE
+
+namespace ROCKSDB_NAMESPACE {
+
+class EventListenerTest : public DBTestBase {
+ public:
+  EventListenerTest() : DBTestBase("listener_test", /*env_do_fsync=*/true) {}
+
+  static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
+                             uint64_t size) {
+    std::string blob_index;
+    BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
+                          kNoCompression);
+    return blob_index;
+  }
+
+  const size_t k110KB = 110 << 10;
+};
+
+struct TestPropertiesCollector
+    : public ROCKSDB_NAMESPACE::TablePropertiesCollector {
+  ROCKSDB_NAMESPACE::Status AddUserKey(
+      const ROCKSDB_NAMESPACE::Slice& /*key*/,
+      const ROCKSDB_NAMESPACE::Slice& /*value*/,
+      ROCKSDB_NAMESPACE::EntryType /*type*/,
+      ROCKSDB_NAMESPACE::SequenceNumber /*seq*/,
+      uint64_t /*file_size*/) override {
+    return Status::OK();
+  }
+  ROCKSDB_NAMESPACE::Status Finish(
+      ROCKSDB_NAMESPACE::UserCollectedProperties* properties) override {
+    properties->insert({"0", "1"});
+    return Status::OK();
+  }
+
+  const char* Name() const override { return "TestTablePropertiesCollector"; }
+
+  ROCKSDB_NAMESPACE::UserCollectedProperties GetReadableProperties()
+      const override {
+    ROCKSDB_NAMESPACE::UserCollectedProperties ret;
+    ret["2"] = "3";
+    return ret;
+  }
+};
+
+class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
+ public:
+  TablePropertiesCollector* CreateTablePropertiesCollector(
+      TablePropertiesCollectorFactory::Context /*context*/) override {
+    return new TestPropertiesCollector;
+  }
+  const char* Name() const override { return "TestTablePropertiesCollector"; }
+};
+
+class TestCompactionListener : public EventListener {
+ public:
+  explicit TestCompactionListener(EventListenerTest* test) : test_(test) {}
+
+  void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
+    std::lock_guard<std::mutex> lock(mutex_);
+    compacted_dbs_.push_back(db);
+    ASSERT_GT(ci.input_files.size(), 0U);
+    ASSERT_EQ(ci.input_files.size(), ci.input_file_infos.size());
+
+    for (size_t i = 0; i < ci.input_file_infos.size(); ++i) {
+      ASSERT_EQ(ci.input_file_infos[i].level, ci.base_input_level);
+      ASSERT_EQ(ci.input_file_infos[i].file_number,
+                TableFileNameToNumber(ci.input_files[i]));
+    }
+
+    ASSERT_GT(ci.output_files.size(), 0U);
+    ASSERT_EQ(ci.output_files.size(), ci.output_file_infos.size());
+
+    ASSERT_TRUE(test_);
+    ASSERT_EQ(test_->db_, db);
+
+    std::vector<std::vector<FileMetaData>> files_by_level;
+    test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[ci.cf_id],
+                                           &files_by_level);
+    ASSERT_GT(files_by_level.size(), ci.output_level);
+
+    for (size_t i = 0; i < ci.output_file_infos.size(); ++i) {
+      ASSERT_EQ(ci.output_file_infos[i].level, ci.output_level);
+      ASSERT_EQ(ci.output_file_infos[i].file_number,
+                TableFileNameToNumber(ci.output_files[i]));
+
+      auto it = std::find_if(
+          files_by_level[ci.output_level].begin(),
+          files_by_level[ci.output_level].end(),
+          [&](const FileMetaData& meta) {
+            return meta.fd.GetNumber() == ci.output_file_infos[i].file_number;
+          });
+      ASSERT_NE(it, files_by_level[ci.output_level].end());
+
+      ASSERT_EQ(ci.output_file_infos[i].oldest_blob_file_number,
+                it->oldest_blob_file_number);
+    }
+
+    ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id);
+    ASSERT_GT(ci.thread_id, 0U);
+
+    for (auto fl : {ci.input_files, ci.output_files}) {
+      for (auto fn : fl) {
+        auto it = ci.table_properties.find(fn);
+        ASSERT_NE(it, ci.table_properties.end());
+        auto tp = it->second;
+        ASSERT_TRUE(tp != nullptr);
+        ASSERT_EQ(tp->user_collected_properties.find("0")->second, "1");
+      }
+    }
+  }
+
+  EventListenerTest* test_;
+  std::vector<DB*> compacted_dbs_;
+  std::mutex mutex_;
+};
+
+TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
+  const int kTestKeySize = 16;
+  const int kTestValueSize = 984;
+  const int kEntrySize = kTestKeySize + kTestValueSize;
+  const int kEntriesPerBuffer = 100;
+  const int kNumL0Files = 4;
+
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
+  options.compaction_style = kCompactionStyleLevel;
+  options.target_file_size_base = options.write_buffer_size;
+  options.max_bytes_for_level_base = options.target_file_size_base * 2;
+  options.max_bytes_for_level_multiplier = 2;
+  options.compression = kNoCompression;
+#ifdef ROCKSDB_USING_THREAD_STATUS
+  options.enable_thread_tracking = true;
+#endif  // ROCKSDB_USING_THREAD_STATUS
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  options.table_properties_collector_factories.push_back(
+      std::make_shared<TestPropertiesCollectorFactory>());
+
+  TestCompactionListener* listener = new TestCompactionListener(this);
+  options.listeners.emplace_back(listener);
+  std::vector<std::string> cf_names = {"pikachu",  "ilya",     "muromec",
+                                       "dobrynia", "nikitich", "alyosha",
+                                       "popovich"};
+  CreateAndReopenWithCF(cf_names, options);
+  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
+
+  WriteBatch batch;
+  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 1, "ditto",
+                                             BlobStr(123, 0, 1 << 10)));
+  ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
+
+  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
+  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
+  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
+  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
+  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p'))); + for (int i = 1; i < 8; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i], + nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + } + + ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size()); + for (size_t i = 0; i < cf_names.size(); ++i) { + ASSERT_EQ(listener->compacted_dbs_[i], db_); + } +} + +// This simple Listener can only handle one flush at a time. +class TestFlushListener : public EventListener { + public: + TestFlushListener(Env* env, EventListenerTest* test) + : slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) { + db_closed = false; + } + + virtual ~TestFlushListener() { + prev_fc_info_.status.PermitUncheckedError(); // Ignore the status + } + void OnTableFileCreated(const TableFileCreationInfo& info) override { + // remember the info for later checking the FlushJobInfo. + prev_fc_info_ = info; + ASSERT_GT(info.db_name.size(), 0U); + ASSERT_GT(info.cf_name.size(), 0U); + ASSERT_GT(info.file_path.size(), 0U); + ASSERT_GT(info.job_id, 0); + ASSERT_GT(info.table_properties.data_size, 0U); + ASSERT_GT(info.table_properties.raw_key_size, 0U); + ASSERT_GT(info.table_properties.raw_value_size, 0U); + ASSERT_GT(info.table_properties.num_data_blocks, 0U); + ASSERT_GT(info.table_properties.num_entries, 0U); + ASSERT_EQ(info.file_checksum, kUnknownFileChecksum); + ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName); + +#ifdef ROCKSDB_USING_THREAD_STATUS + // Verify the id of the current thread that created this table + // file matches the id of any active flush or compaction thread. + uint64_t thread_id = env_->GetThreadID(); + std::vector thread_list; + ASSERT_OK(env_->GetThreadList(&thread_list)); + bool found_match = false; + for (auto thread_status : thread_list) { + if (thread_status.operation_type == ThreadStatus::OP_FLUSH || + thread_status.operation_type == ThreadStatus::OP_COMPACTION) { + if (thread_id == thread_status.thread_id) { + found_match = true; + break; + } + } + } + ASSERT_TRUE(found_match); +#endif // ROCKSDB_USING_THREAD_STATUS + } + + void OnFlushCompleted(DB* db, const FlushJobInfo& info) override { + flushed_dbs_.push_back(db); + flushed_column_family_names_.push_back(info.cf_name); + if (info.triggered_writes_slowdown) { + slowdown_count++; + } + if (info.triggered_writes_stop) { + stop_count++; + } + // verify whether the previously created file matches the flushed file. + ASSERT_EQ(prev_fc_info_.db_name, db->GetName()); + ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name); + ASSERT_EQ(prev_fc_info_.job_id, info.job_id); + ASSERT_EQ(prev_fc_info_.file_path, info.file_path); + ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number); + + // Note: the following chunk relies on the notification pertaining to the + // database pointed to by DBTestBase::db_, and is thus bypassed when + // that assumption does not hold (see the test case MultiDBMultiListeners + // below). 
+    ASSERT_TRUE(test_);
+    if (db == test_->db_) {
+      std::vector<std::vector<FileMetaData>> files_by_level;
+      ASSERT_LT(info.cf_id, test_->handles_.size());
+      ASSERT_GE(info.cf_id, 0u);
+      ASSERT_NE(test_->handles_[info.cf_id], nullptr);
+      test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[info.cf_id],
+                                             &files_by_level);
+
+      ASSERT_FALSE(files_by_level.empty());
+      auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
+                             [&](const FileMetaData& meta) {
+                               return meta.fd.GetNumber() == info.file_number;
+                             });
+      ASSERT_NE(it, files_by_level[0].end());
+      ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
+    }
+
+    ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
+    ASSERT_GT(info.thread_id, 0U);
+    ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second,
+              "1");
+  }
+
+  std::vector<std::string> flushed_column_family_names_;
+  std::vector<DB*> flushed_dbs_;
+  int slowdown_count;
+  int stop_count;
+  bool db_closing;
+  std::atomic_bool db_closed;
+  TableFileCreationInfo prev_fc_info_;
+
+ protected:
+  Env* env_;
+  EventListenerTest* test_;
+};
+
+TEST_F(EventListenerTest, OnSingleDBFlushTest) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.write_buffer_size = k110KB;
+#ifdef ROCKSDB_USING_THREAD_STATUS
+  options.enable_thread_tracking = true;
+#endif  // ROCKSDB_USING_THREAD_STATUS
+  TestFlushListener* listener = new TestFlushListener(options.env, this);
+  options.listeners.emplace_back(listener);
+  std::vector<std::string> cf_names = {"pikachu",  "ilya",     "muromec",
+                                       "dobrynia", "nikitich", "alyosha",
+                                       "popovich"};
+  options.table_properties_collector_factories.push_back(
+      std::make_shared<TestPropertiesCollectorFactory>());
+  CreateAndReopenWithCF(cf_names, options);
+
+  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
+
+  WriteBatch batch;
+  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 1, "ditto",
+                                             BlobStr(456, 0, 1 << 10)));
+  ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
+
+  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
+  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
+  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
+  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
+  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
+  ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
+  for (int i = 1; i < 8; ++i) {
+    ASSERT_OK(Flush(i));
+    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    // Ensure background work is fully finished including listener callbacks
+    // before accessing listener state.
+    ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
+    ASSERT_EQ(listener->flushed_dbs_.size(), i);
+    ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
+  }
+
+  // make sure callback functions are called in the right order
+  for (size_t i = 0; i < cf_names.size(); ++i) {
+    ASSERT_EQ(listener->flushed_dbs_[i], db_);
+    ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
+  }
+}
+
+TEST_F(EventListenerTest, MultiCF) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.write_buffer_size = k110KB;
+#ifdef ROCKSDB_USING_THREAD_STATUS
+  options.enable_thread_tracking = true;
+#endif  // ROCKSDB_USING_THREAD_STATUS
+  for (auto atomic_flush : {false, true}) {
+    options.atomic_flush = atomic_flush;
+    options.create_if_missing = true;
+    DestroyAndReopen(options);
+    TestFlushListener* listener = new TestFlushListener(options.env, this);
+    options.listeners.emplace_back(listener);
+    options.table_properties_collector_factories.push_back(
+        std::make_shared<TestPropertiesCollectorFactory>());
+    std::vector<std::string> cf_names = {"pikachu",  "ilya",     "muromec",
+                                         "dobrynia", "nikitich", "alyosha",
+                                         "popovich"};
+    CreateAndReopenWithCF(cf_names, options);
+
+    ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
+    ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
+    ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
+    ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
+    ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
+    ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
+    ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
+
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+    for (int i = 1; i < 8; ++i) {
+      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+          {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted",
+            "EventListenerTest.MultiCF:PreVerifyListener"}});
+      ASSERT_OK(Flush(i));
+      TEST_SYNC_POINT("EventListenerTest.MultiCF:PreVerifyListener");
+      ASSERT_EQ(listener->flushed_dbs_.size(), i);
+      ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
+      // make sure callback functions are called in the right order
+      if (i == 7) {
+        for (size_t j = 0; j < cf_names.size(); j++) {
+          ASSERT_EQ(listener->flushed_dbs_[j], db_);
+          ASSERT_EQ(listener->flushed_column_family_names_[j], cf_names[j]);
+        }
+      }
+    }
+
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+    Close();
+  }
+}
+
+TEST_F(EventListenerTest, MultiDBMultiListeners) {
+  Options options;
+  options.env = CurrentOptions().env;
+#ifdef ROCKSDB_USING_THREAD_STATUS
+  options.enable_thread_tracking = true;
+#endif  // ROCKSDB_USING_THREAD_STATUS
+  options.table_properties_collector_factories.push_back(
+      std::make_shared<TestPropertiesCollectorFactory>());
+  std::vector<TestFlushListener*> listeners;
+  const int kNumDBs = 5;
+  const int kNumListeners = 10;
+  for (int i = 0; i < kNumListeners; ++i) {
+    listeners.emplace_back(new TestFlushListener(options.env, this));
+  }
+
+  std::vector<std::string> cf_names = {"pikachu",  "ilya",     "muromec",
+                                       "dobrynia", "nikitich", "alyosha",
+                                       "popovich"};
+
+  options.create_if_missing = true;
+  for (int i = 0; i < kNumListeners; ++i) {
+    options.listeners.emplace_back(listeners[i]);
+  }
+  DBOptions db_opts(options);
+  ColumnFamilyOptions cf_opts(options);
+
+  std::vector<DB*> dbs;
+  std::vector<std::vector<ColumnFamilyHandle*>> vec_handles;
+
+  for (int d = 0; d < kNumDBs; ++d) {
+    ASSERT_OK(DestroyDB(dbname_ + std::to_string(d), options));
+    DB* db;
+    std::vector<ColumnFamilyHandle*> handles;
+    ASSERT_OK(DB::Open(options, dbname_ + std::to_string(d), &db));
+    for (size_t c = 0; c < cf_names.size(); ++c) {
+      ColumnFamilyHandle* handle;
+      ASSERT_OK(db->CreateColumnFamily(cf_opts, cf_names[c], &handle));
+      handles.push_back(handle);
+    }
+
+    vec_handles.push_back(std::move(handles));
+    dbs.push_back(db);
+  }
+
+  for (int d = 0; d < kNumDBs; ++d) {
+    for (size_t c = 0; c < cf_names.size(); ++c) {
+      ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c], cf_names[c],
+                            cf_names[c]));
+    }
+  }
+
+  for (size_t c = 0; c < cf_names.size(); ++c) {
+    for (int d = 0; d < kNumDBs; ++d) {
+      ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c]));
+      ASSERT_OK(
+          static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForFlushMemTable());
+    }
+  }
+
+  for (int d = 0; d < kNumDBs; ++d) {
+    // Ensure background work is fully finished including listener callbacks
+    // before accessing listener state.
+    ASSERT_OK(
+        static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForBackgroundWork());
+  }
+
+  for (auto* listener : listeners) {
+    int pos = 0;
+    for (size_t c = 0; c < cf_names.size(); ++c) {
+      for (int d = 0; d < kNumDBs; ++d) {
+        ASSERT_EQ(listener->flushed_dbs_[pos], dbs[d]);
+        ASSERT_EQ(listener->flushed_column_family_names_[pos], cf_names[c]);
+        pos++;
+      }
+    }
+  }
+
+  for (auto handles : vec_handles) {
+    for (auto h : handles) {
+      delete h;
+    }
+    handles.clear();
+  }
+  vec_handles.clear();
+
+  for (auto db : dbs) {
+    delete db;
+  }
+}
+
+TEST_F(EventListenerTest, DisableBGCompaction) {
+  Options options;
+  options.env = CurrentOptions().env;
+#ifdef ROCKSDB_USING_THREAD_STATUS
+  options.enable_thread_tracking = true;
+#endif  // ROCKSDB_USING_THREAD_STATUS
+  TestFlushListener* listener = new TestFlushListener(options.env, this);
+  const int kCompactionTrigger = 1;
+  const int kSlowdownTrigger = 5;
+  const int kStopTrigger = 100;
+  options.level0_file_num_compaction_trigger = kCompactionTrigger;
+  options.level0_slowdown_writes_trigger = kSlowdownTrigger;
+  options.level0_stop_writes_trigger = kStopTrigger;
+  options.max_write_buffer_number = 10;
+  options.listeners.emplace_back(listener);
+  // BG compaction is disabled. The number of L0 files will simply keep
+  // increasing in this test.
+  options.compaction_style = kCompactionStyleNone;
+  options.compression = kNoCompression;
+  options.write_buffer_size = 100000;  // Small write buffer
+  options.table_properties_collector_factories.push_back(
+      std::make_shared<TestPropertiesCollectorFactory>());
+
+  CreateAndReopenWithCF({"pikachu"}, options);
+  ColumnFamilyMetaData cf_meta;
+  db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
+
+  // keep writing until writes are forced to stop.
+  for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
+       ++i) {
+    ASSERT_OK(
+        Put(1, std::to_string(i), std::string(10000, 'x'), WriteOptions()));
+    FlushOptions fo;
+    fo.allow_write_stall = true;
+    ASSERT_OK(db_->Flush(fo, handles_[1]));
+    db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
+  }
+  // Ensure background work is fully finished including listener callbacks
+  // before accessing listener state.
+  ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
+  ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
+}
+
+class TestCompactionReasonListener : public EventListener {
+ public:
+  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
+    std::lock_guard<std::mutex> lock(mutex_);
+    compaction_reasons_.push_back(ci.compaction_reason);
+  }
+
+  std::vector<CompactionReason> compaction_reasons_;
+  std::mutex mutex_;
+};
+
+TEST_F(EventListenerTest, CompactionReasonLevel) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(
+      DBTestBase::kNumKeysByGenerateNewRandomFile));
+
+  TestCompactionReasonListener* listener = new TestCompactionReasonListener();
+  options.listeners.emplace_back(listener);
+
+  options.level0_file_num_compaction_trigger = 4;
+  options.compaction_style = kCompactionStyleLevel;
+
+  DestroyAndReopen(options);
+  Random rnd(301);
+
+  // Write 4 files in L0
+  for (int i = 0; i < 4; i++) {
+    GenerateNewRandomFile(&rnd);
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_EQ(listener->compaction_reasons_.size(), 1);
+  ASSERT_EQ(listener->compaction_reasons_[0],
+            CompactionReason::kLevelL0FilesNum);
+
+  DestroyAndReopen(options);
+
+  // Write 3 non-overlapping files in L0
+  for (int k = 1; k <= 30; k++) {
+    ASSERT_OK(Put(Key(k), Key(k)));
+    if (k % 10 == 0) {
+      Flush();
+    }
+  }
+
+  // Do a trivial move from L0 -> L1
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  options.max_bytes_for_level_base = 1;
+  Close();
+  listener->compaction_reasons_.clear();
+  Reopen(options);
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_GT(listener->compaction_reasons_.size(), 1);
+
+  for (auto compaction_reason : listener->compaction_reasons_) {
+    ASSERT_EQ(compaction_reason, CompactionReason::kLevelMaxLevelSize);
+  }
+
+  options.disable_auto_compactions = true;
+  Close();
+  listener->compaction_reasons_.clear();
+  Reopen(options);
+
+  ASSERT_OK(Put("key", "value"));
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  ASSERT_GT(listener->compaction_reasons_.size(), 0);
+  for (auto compaction_reason : listener->compaction_reasons_) {
+    ASSERT_EQ(compaction_reason, CompactionReason::kManualCompaction);
+  }
+}
+
+TEST_F(EventListenerTest, CompactionReasonUniversal) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(
+      DBTestBase::kNumKeysByGenerateNewRandomFile));
+
+  TestCompactionReasonListener* listener = new TestCompactionReasonListener();
+  options.listeners.emplace_back(listener);
+
+  options.compaction_style = kCompactionStyleUniversal;
+
+  Random rnd(301);
+
+  options.level0_file_num_compaction_trigger = 8;
+  options.compaction_options_universal.max_size_amplification_percent = 100000;
+  options.compaction_options_universal.size_ratio = 100000;
+  DestroyAndReopen(options);
+  listener->compaction_reasons_.clear();
+
+  // Write 8 files in L0
+  for (int i = 0; i < 8; i++) {
+    GenerateNewRandomFile(&rnd);
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_GT(listener->compaction_reasons_.size(), 0);
+  for (auto compaction_reason : listener->compaction_reasons_) {
+    ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeRatio);
+  }
+
+  options.level0_file_num_compaction_trigger = 8;
+  options.compaction_options_universal.max_size_amplification_percent = 1;
+  options.compaction_options_universal.size_ratio = 100000;
+
+  DestroyAndReopen(options);
+  listener->compaction_reasons_.clear();
+
+  // Write 8 files in L0
+  for (int i = 0; i < 8; i++) {
+    GenerateNewRandomFile(&rnd);
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_GT(listener->compaction_reasons_.size(), 0);
+  for (auto compaction_reason : listener->compaction_reasons_) {
+    ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeAmplification);
+  }
+
+  options.disable_auto_compactions = true;
+  Close();
+  listener->compaction_reasons_.clear();
+  Reopen(options);
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  ASSERT_GT(listener->compaction_reasons_.size(), 0);
+  for (auto compaction_reason : listener->compaction_reasons_) {
+    ASSERT_EQ(compaction_reason, CompactionReason::kManualCompaction);
+  }
+}
+
+TEST_F(EventListenerTest, CompactionReasonFIFO) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(
+      DBTestBase::kNumKeysByGenerateNewRandomFile));
+
+  TestCompactionReasonListener* listener = new TestCompactionReasonListener();
+  options.listeners.emplace_back(listener);
+
+  options.level0_file_num_compaction_trigger = 4;
+  options.compaction_style = kCompactionStyleFIFO;
+  options.compaction_options_fifo.max_table_files_size = 1;
+
+  DestroyAndReopen(options);
+  Random rnd(301);
+
+  // Write 4 files in L0
+  for (int i = 0; i < 4; i++) {
+    GenerateNewRandomFile(&rnd);
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ASSERT_GT(listener->compaction_reasons_.size(), 0);
+  for (auto compaction_reason : listener->compaction_reasons_) {
+    ASSERT_EQ(compaction_reason, CompactionReason::kFIFOMaxSize);
+  }
+}
+
+class TableFileCreationListener : public EventListener {
+ public:
+  class TestEnv : public EnvWrapper {
+   public:
+    explicit TestEnv(Env* t) : EnvWrapper(t) {}
+    static const char* kClassName() { return "TestEnv"; }
+    const char* Name() const override { return kClassName(); }
+
+    void SetStatus(Status s) { status_ = s; }
+
+    Status NewWritableFile(const std::string& fname,
+                           std::unique_ptr<WritableFile>* result,
+                           const EnvOptions& options) override {
+      if (fname.size() > 4 && fname.substr(fname.size() - 4) == ".sst") {
+        if (!status_.ok()) {
+          return status_;
+        }
+      }
+      return target()->NewWritableFile(fname, result, options);
+    }
+
+   private:
+    Status status_;
+  };
+
+  TableFileCreationListener() {
+    for (int i = 0; i < 2; i++) {
+      started_[i] = finished_[i] = failure_[i] = 0;
+    }
+  }
+
+  int Index(TableFileCreationReason reason) {
+    int idx;
+    switch (reason) {
+      case TableFileCreationReason::kFlush:
+        idx = 0;
+        break;
+      case TableFileCreationReason::kCompaction:
+        idx = 1;
+        break;
+      default:
+        idx = -1;
+    }
+    return idx;
+  }
+
+  void CheckAndResetCounters(int flush_started, int flush_finished,
+                             int flush_failure, int compaction_started,
+                             int compaction_finished, int compaction_failure) {
+    ASSERT_EQ(started_[0], flush_started);
+    ASSERT_EQ(finished_[0], flush_finished);
+    ASSERT_EQ(failure_[0], flush_failure);
+    ASSERT_EQ(started_[1], compaction_started);
+    ASSERT_EQ(finished_[1], compaction_finished);
+    ASSERT_EQ(failure_[1], compaction_failure);
+    for (int i = 0; i < 2; i++) {
+      started_[i] = finished_[i] = failure_[i] = 0;
+    }
+  }
+
+  void OnTableFileCreationStarted(
+      const TableFileCreationBriefInfo& info) override {
+    int idx = Index(info.reason);
+    if (idx >= 0) {
+      started_[idx]++;
+    }
+    ASSERT_GT(info.db_name.size(), 0U);
+    ASSERT_GT(info.cf_name.size(), 0U);
+    ASSERT_GT(info.file_path.size(), 0U);
+    ASSERT_GT(info.job_id, 0);
+  }
+
+  void OnTableFileCreated(const TableFileCreationInfo& info) override {
+    int idx = Index(info.reason);
+    if (idx >= 0) {
+      finished_[idx]++;
+    }
+    ASSERT_GT(info.db_name.size(), 0U);
+    ASSERT_GT(info.cf_name.size(), 0U);
+    ASSERT_GT(info.file_path.size(), 0U);
+    ASSERT_GT(info.job_id, 0);
+    ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
+    ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
+    if (info.status.ok()) {
+      if (info.table_properties.num_range_deletions == 0U) {
+        ASSERT_GT(info.table_properties.data_size, 0U);
+        ASSERT_GT(info.table_properties.raw_key_size, 0U);
+        ASSERT_GT(info.table_properties.raw_value_size, 0U);
+        ASSERT_GT(info.table_properties.num_data_blocks, 0U);
+        ASSERT_GT(info.table_properties.num_entries, 0U);
+      }
+    } else {
+      if (idx >= 0) {
+        failure_[idx]++;
+        last_failure_ = info.status;
+      }
+    }
+  }
+
+  int started_[2];
+  int finished_[2];
+  int failure_[2];
+  Status last_failure_;
+};
+
+TEST_F(EventListenerTest, TableFileCreationListenersTest) {
+  auto listener = std::make_shared<TableFileCreationListener>();
+  Options options;
+  std::unique_ptr<TableFileCreationListener::TestEnv> test_env(
+      new TableFileCreationListener::TestEnv(CurrentOptions().env));
+  options.create_if_missing = true;
+  options.listeners.push_back(listener);
+  options.env = test_env.get();
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("foo", "aaa"));
+  ASSERT_OK(Put("bar", "bbb"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
+  ASSERT_OK(Put("foo", "aaa1"));
+  ASSERT_OK(Put("bar", "bbb1"));
+  test_env->SetStatus(Status::NotSupported("not supported"));
+  ASSERT_NOK(Flush());
+  listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
+  ASSERT_TRUE(listener->last_failure_.IsNotSupported());
+  test_env->SetStatus(Status::OK());
+
+  Reopen(options);
+  ASSERT_OK(Put("foo", "aaa2"));
+  ASSERT_OK(Put("bar", "bbb2"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
+
+  const Slice kRangeStart = "a";
+  const Slice kRangeEnd = "z";
+  ASSERT_OK(
+      dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  listener->CheckAndResetCounters(0, 0, 0, 1, 1, 0);
+
+  ASSERT_OK(Put("foo", "aaa3"));
+  ASSERT_OK(Put("bar", "bbb3"));
+  ASSERT_OK(Flush());
+  test_env->SetStatus(Status::NotSupported("not supported"));
+  ASSERT_NOK(
+      dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd));
+  ASSERT_NOK(dbfull()->TEST_WaitForCompact());
+  listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
+  ASSERT_TRUE(listener->last_failure_.IsNotSupported());
+
+  // Reset
+  test_env->SetStatus(Status::OK());
+  DestroyAndReopen(options);
+
+  // Verify that an empty table file that is immediately deleted gives Aborted
+  // status to listener.
+ ASSERT_OK(Put("baz", "z")); + ASSERT_OK(SingleDelete("baz")); + ASSERT_OK(Flush()); + listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0); + ASSERT_TRUE(listener->last_failure_.IsAborted()); + + // Also in compaction + ASSERT_OK(Put("baz", "z")); + ASSERT_OK(Flush()); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), + kRangeStart, kRangeEnd)); + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + listener->CheckAndResetCounters(2, 2, 0, 1, 1, 1); + ASSERT_TRUE(listener->last_failure_.IsAborted()); + + Close(); // Avoid UAF on listener +} + +class MemTableSealedListener : public EventListener { + private: + SequenceNumber latest_seq_number_; + + public: + MemTableSealedListener() {} + void OnMemTableSealed(const MemTableInfo& info) override { + latest_seq_number_ = info.first_seqno; + } + + void OnFlushCompleted(DB* /*db*/, + const FlushJobInfo& flush_job_info) override { + ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_); + } +}; + +TEST_F(EventListenerTest, MemTableSealedListenerTest) { + auto listener = std::make_shared(); + Options options; + options.env = CurrentOptions().env; + options.create_if_missing = true; + options.listeners.push_back(listener); + DestroyAndReopen(options); + + for (unsigned int i = 0; i < 10; i++) { + std::string tag = std::to_string(i); + ASSERT_OK(Put("foo" + tag, "aaa")); + ASSERT_OK(Put("bar" + tag, "bbb")); + + ASSERT_OK(Flush()); + } +} + +class ColumnFamilyHandleDeletionStartedListener : public EventListener { + private: + std::vector cfs_; + int counter; + + public: + explicit ColumnFamilyHandleDeletionStartedListener( + const std::vector& cfs) + : cfs_(cfs), counter(0) { + cfs_.insert(cfs_.begin(), kDefaultColumnFamilyName); + } + void OnColumnFamilyHandleDeletionStarted( + ColumnFamilyHandle* handle) override { + ASSERT_EQ(cfs_[handle->GetID()], handle->GetName()); + counter++; + } + int getCounter() { return counter; } +}; + +TEST_F(EventListenerTest, ColumnFamilyHandleDeletionStartedListenerTest) { + std::vector cfs{"pikachu", "eevee", "Mewtwo"}; + auto listener = + std::make_shared(cfs); + Options options; + options.env = CurrentOptions().env; + options.create_if_missing = true; + options.listeners.push_back(listener); + CreateAndReopenWithCF(cfs, options); + ASSERT_EQ(handles_.size(), 4); + delete handles_[3]; + delete handles_[2]; + delete handles_[1]; + handles_.resize(1); + ASSERT_EQ(listener->getCounter(), 3); +} + +class BackgroundErrorListener : public EventListener { + private: + SpecialEnv* env_; + int counter_; + + public: + BackgroundErrorListener(SpecialEnv* env) : env_(env), counter_(0) {} + + void OnBackgroundError(BackgroundErrorReason /*reason*/, + Status* bg_error) override { + if (counter_ == 0) { + // suppress the first error and disable write-dropping such that a retry + // can succeed. 
+      *bg_error = Status::OK();
+      env_->drop_writes_.store(false, std::memory_order_release);
+      env_->SetMockSleep(false);
+    }
+    ++counter_;
+  }
+
+  int counter() { return counter_; }
+};
+
+TEST_F(EventListenerTest, BackgroundErrorListenerFailedFlushTest) {
+  auto listener = std::make_shared<BackgroundErrorListener>(env_);
+  Options options;
+  options.create_if_missing = true;
+  options.env = env_;
+  options.listeners.push_back(listener);
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
+  options.paranoid_checks = true;
+  DestroyAndReopen(options);
+
+  // the usual TEST_WaitForFlushMemTable() doesn't work for failed flushes, so
+  // forge a custom one for the failed flush case.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BGWorkFlush:done",
+        "EventListenerTest:BackgroundErrorListenerFailedFlushTest:1"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  env_->drop_writes_.store(true, std::memory_order_release);
+  env_->SetMockSleep();
+
+  ASSERT_OK(Put("key0", "val"));
+  ASSERT_OK(Put("key1", "val"));
+  TEST_SYNC_POINT("EventListenerTest:BackgroundErrorListenerFailedFlushTest:1");
+  ASSERT_EQ(1, listener->counter());
+  ASSERT_OK(Put("key2", "val"));
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_EQ(1, NumTableFilesAtLevel(0));
+}
+
+TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
+  auto listener = std::make_shared<BackgroundErrorListener>(env_);
+  Options options;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.env = env_;
+  options.level0_file_num_compaction_trigger = 2;
+  options.listeners.push_back(listener);
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(2));
+  options.paranoid_checks = true;
+  DestroyAndReopen(options);
+
+  // third iteration triggers the second memtable's flush
+  for (int i = 0; i < 3; ++i) {
+    ASSERT_OK(Put("key0", "val"));
+    if (i > 0) {
+      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    }
+    ASSERT_OK(Put("key1", "val"));
+  }
+  ASSERT_EQ(2, NumTableFilesAtLevel(0));
+
+  env_->drop_writes_.store(true, std::memory_order_release);
+  env_->SetMockSleep();
+  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_EQ(1, listener->counter());
+
+  // trigger flush so compaction is triggered again; this time it succeeds
+  // The previous failed compaction may get retried automatically, so we may
+  // be left with 0 or 1 files in level 1, depending on when the retry gets
+  // scheduled
+  ASSERT_OK(Put("key0", "val"));
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_LE(1, NumTableFilesAtLevel(0));
+}
+
+class TestFileOperationListener : public EventListener {
+ public:
+  TestFileOperationListener() {
+    file_reads_.store(0);
+    file_reads_success_.store(0);
+    file_writes_.store(0);
+    file_writes_success_.store(0);
+    file_flushes_.store(0);
+    file_flushes_success_.store(0);
+    file_closes_.store(0);
+    file_closes_success_.store(0);
+    file_syncs_.store(0);
+    file_syncs_success_.store(0);
+    file_truncates_.store(0);
+    file_truncates_success_.store(0);
+    file_seq_reads_.store(0);
+    blob_file_reads_.store(0);
+    blob_file_writes_.store(0);
+    blob_file_flushes_.store(0);
+    blob_file_closes_.store(0);
+    blob_file_syncs_.store(0);
+    blob_file_truncates_.store(0);
+  }
+
+  void OnFileReadFinish(const FileOperationInfo& info) override {
+    ++file_reads_;
+    if (info.status.ok()) {
+      ++file_reads_success_;
+    }
+    if (info.path.find("MANIFEST") != std::string::npos) {
+      ++file_seq_reads_;
+    }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_reads_;
+    }
+    ReportDuration(info);
+  }
+
+  void OnFileWriteFinish(const FileOperationInfo& info) override {
+    ++file_writes_;
+    if (info.status.ok()) {
+      ++file_writes_success_;
+    }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_writes_;
+    }
+    ReportDuration(info);
+  }
+
+  void OnFileFlushFinish(const FileOperationInfo& info) override {
+    ++file_flushes_;
+    if (info.status.ok()) {
+      ++file_flushes_success_;
+    }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_flushes_;
+    }
+    ReportDuration(info);
+  }
+
+  void OnFileCloseFinish(const FileOperationInfo& info) override {
+    ++file_closes_;
+    if (info.status.ok()) {
+      ++file_closes_success_;
+    }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_closes_;
+    }
+    ReportDuration(info);
+  }
+
+  void OnFileSyncFinish(const FileOperationInfo& info) override {
+    ++file_syncs_;
+    if (info.status.ok()) {
+      ++file_syncs_success_;
+    }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_syncs_;
+    }
+    ReportDuration(info);
+  }
+
+  void OnFileTruncateFinish(const FileOperationInfo& info) override {
+    ++file_truncates_;
+    if (info.status.ok()) {
+      ++file_truncates_success_;
+    }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_truncates_;
+    }
+    ReportDuration(info);
+  }
+
+  bool ShouldBeNotifiedOnFileIO() override { return true; }
+
+  std::atomic<size_t> file_reads_;
+  std::atomic<size_t> file_reads_success_;
+  std::atomic<size_t> file_writes_;
+  std::atomic<size_t> file_writes_success_;
+  std::atomic<size_t> file_flushes_;
+  std::atomic<size_t> file_flushes_success_;
+  std::atomic<size_t> file_closes_;
+  std::atomic<size_t> file_closes_success_;
+  std::atomic<size_t> file_syncs_;
+  std::atomic<size_t> file_syncs_success_;
+  std::atomic<size_t> file_truncates_;
+  std::atomic<size_t> file_truncates_success_;
+  std::atomic<size_t> file_seq_reads_;
+  std::atomic<size_t> blob_file_reads_;
+  std::atomic<size_t> blob_file_writes_;
+  std::atomic<size_t> blob_file_flushes_;
+  std::atomic<size_t> blob_file_closes_;
+  std::atomic<size_t> blob_file_syncs_;
+  std::atomic<size_t> blob_file_truncates_;
+
+ private:
+  void ReportDuration(const FileOperationInfo& info) const {
+    ASSERT_GT(info.duration.count(), 0);
+  }
+};
+
+TEST_F(EventListenerTest, OnFileOperationTest) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+
+  TestFileOperationListener* listener = new TestFileOperationListener();
+  options.listeners.emplace_back(listener);
+
+  options.use_direct_io_for_flush_and_compaction = false;
+  Status s = TryReopen(options);
+  if (s.IsInvalidArgument()) {
+    options.use_direct_io_for_flush_and_compaction = false;
+  } else {
+    ASSERT_OK(s);
+  }
+  DestroyAndReopen(options);
+  ASSERT_OK(Put("foo", "aaa"));
+  ASSERT_OK(dbfull()->Flush(FlushOptions()));
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_GE(listener->file_writes_.load(),
+            listener->file_writes_success_.load());
+  ASSERT_GT(listener->file_writes_.load(), 0);
+  ASSERT_GE(listener->file_flushes_.load(),
+            listener->file_flushes_success_.load());
+  ASSERT_GT(listener->file_flushes_.load(), 0);
+  Close();
+
+  Reopen(options);
+  ASSERT_GE(listener->file_reads_.load(), listener->file_reads_success_.load());
+  ASSERT_GT(listener->file_reads_.load(), 0);
+  ASSERT_GE(listener->file_closes_.load(),
+            listener->file_closes_success_.load());
+  ASSERT_GT(listener->file_closes_.load(), 0);
+  ASSERT_GE(listener->file_syncs_.load(), listener->file_syncs_success_.load());
+  ASSERT_GT(listener->file_syncs_.load(), 0);
+  if (true == options.use_direct_io_for_flush_and_compaction) {
+    ASSERT_GE(listener->file_truncates_.load(),
+              listener->file_truncates_success_.load());
+    ASSERT_GT(listener->file_truncates_.load(), 0);
+  }
+}
+
+TEST_F(EventListenerTest, OnBlobFileOperationTest) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+  TestFileOperationListener* listener = new TestFileOperationListener();
+  options.listeners.emplace_back(listener);
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("Key1", "blob_value1"));
+  ASSERT_OK(Put("Key2", "blob_value2"));
+  ASSERT_OK(Put("Key3", "blob_value3"));
+  ASSERT_OK(Put("Key4", "blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key3", "new_blob_value3"));
+  ASSERT_OK(Put("Key4", "new_blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key5", "blob_value5"));
+  ASSERT_OK(Put("Key6", "blob_value6"));
+  ASSERT_OK(Flush());
+
+  ASSERT_GT(listener->blob_file_writes_.load(), 0U);
+  ASSERT_GT(listener->blob_file_flushes_.load(), 0U);
+  Close();
+
+  Reopen(options);
+  ASSERT_GT(listener->blob_file_closes_.load(), 0U);
+  ASSERT_GT(listener->blob_file_syncs_.load(), 0U);
+  if (true == options.use_direct_io_for_flush_and_compaction) {
+    ASSERT_GT(listener->blob_file_truncates_.load(), 0U);
+  }
+}
+
+TEST_F(EventListenerTest, ReadManifestAndWALOnRecovery) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+
+  TestFileOperationListener* listener = new TestFileOperationListener();
+  options.listeners.emplace_back(listener);
+
+  options.use_direct_io_for_flush_and_compaction = false;
+  Status s = TryReopen(options);
+  if (s.IsInvalidArgument()) {
+    options.use_direct_io_for_flush_and_compaction = false;
+  } else {
+    ASSERT_OK(s);
+  }
+  DestroyAndReopen(options);
+  ASSERT_OK(Put("foo", "aaa"));
+  Close();
+
+  size_t seq_reads = listener->file_seq_reads_.load();
+  Reopen(options);
+  ASSERT_GT(listener->file_seq_reads_.load(), seq_reads);
+}
+
+class BlobDBJobLevelEventListenerTest : public EventListener {
+ public:
+  explicit BlobDBJobLevelEventListenerTest(EventListenerTest* test)
+      : test_(test), call_count_(0) {}
+
+  const VersionStorageInfo* GetVersionStorageInfo() const {
+    VersionSet* const versions = test_->dbfull()->GetVersionSet();
+    assert(versions);
+
+    ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+    EXPECT_NE(cfd, nullptr);
+
+    Version* const current = cfd->current();
+    EXPECT_NE(current, nullptr);
+
+    const VersionStorageInfo* const storage_info = current->storage_info();
+    EXPECT_NE(storage_info, nullptr);
+
+    return storage_info;
+  }
+
+  void CheckBlobFileAdditions(
+      const std::vector<BlobFileAdditionInfo>& blob_file_addition_infos) const {
+    const auto* vstorage = GetVersionStorageInfo();
+
+    EXPECT_FALSE(blob_file_addition_infos.empty());
+
+    for (const auto& blob_file_addition_info : blob_file_addition_infos) {
+      const auto meta = vstorage->GetBlobFileMetaData(
+          blob_file_addition_info.blob_file_number);
+
+      EXPECT_NE(meta, nullptr);
+      EXPECT_EQ(meta->GetBlobFileNumber(),
+                blob_file_addition_info.blob_file_number);
+      EXPECT_EQ(meta->GetTotalBlobBytes(),
+                blob_file_addition_info.total_blob_bytes);
+      EXPECT_EQ(meta->GetTotalBlobCount(),
+                blob_file_addition_info.total_blob_count);
+      EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty());
+    }
+  }
+
+  std::vector<std::string> GetFlushedFiles() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    std::vector<std::string> result;
+    for (const auto& fname : flushed_files_) {
+      result.push_back(fname);
+    }
+    return result;
+  }
+
+  void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
+    call_count_++;
+
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      flushed_files_.push_back(info.file_path);
+    }
+
+    EXPECT_EQ(info.blob_compression_type, kNoCompression);
+
+    CheckBlobFileAdditions(info.blob_file_addition_infos);
+  }
+
+  void OnCompactionCompleted(DB* /*db*/,
+                             const CompactionJobInfo& info) override {
+    call_count_++;
+
+    EXPECT_EQ(info.blob_compression_type, kNoCompression);
+
+    CheckBlobFileAdditions(info.blob_file_addition_infos);
+
+    EXPECT_FALSE(info.blob_file_garbage_infos.empty());
+
+    for (const auto& blob_file_garbage_info : info.blob_file_garbage_infos) {
+      EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U);
+      EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U);
+      EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U);
+      EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty());
+    }
+  }
+
+  EventListenerTest* test_;
+  uint32_t call_count_;
+
+ private:
+  std::vector<std::string> flushed_files_;
+  std::mutex mutex_;
+};
+
+// Test that the OnFlushCompleted EventListener is called for blob files
+TEST_F(EventListenerTest, BlobDBOnFlushCompleted) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.enable_blob_files = true;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+
+  options.min_blob_size = 0;
+  BlobDBJobLevelEventListenerTest* blob_event_listener =
+      new BlobDBJobLevelEventListenerTest(this);
+  options.listeners.emplace_back(blob_event_listener);
+
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("Key1", "blob_value1"));
+  ASSERT_OK(Put("Key2", "blob_value2"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key3", "blob_value3"));
+  ASSERT_OK(Flush());
+
+  ASSERT_EQ(Get("Key1"), "blob_value1");
+  ASSERT_EQ(Get("Key2"), "blob_value2");
+  ASSERT_EQ(Get("Key3"), "blob_value3");
+
+  ASSERT_GT(blob_event_listener->call_count_, 0U);
+}
+
+// Test that the OnCompactionCompleted EventListener is called for blob files
+TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.enable_blob_files = true;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.min_blob_size = 0;
+  BlobDBJobLevelEventListenerTest* blob_event_listener =
+      new BlobDBJobLevelEventListenerTest(this);
+  options.listeners.emplace_back(blob_event_listener);
+
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("Key1", "blob_value1"));
+  ASSERT_OK(Put("Key2", "blob_value2"));
+  ASSERT_OK(Put("Key3", "blob_value3"));
+  ASSERT_OK(Put("Key4", "blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key3", "new_blob_value3"));
+  ASSERT_OK(Put("Key4", "new_blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key5", "blob_value5"));
+  ASSERT_OK(Put("Key6", "blob_value6"));
+  ASSERT_OK(Flush());
+
+  blob_event_listener->call_count_ = 0;
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+
+  // On compaction, because of blob_garbage_collection_age_cutoff, it will
+  // delete the oldest blob file and create a new blob file during compaction.
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+
+  // Make sure OnCompactionCompleted is called.
+  ASSERT_GT(blob_event_listener->call_count_, 0U);
+}
+
+// Test that CompactFiles calls the OnCompactionCompleted EventListener for
+// blob files and populates the blob file info.
+TEST_F(EventListenerTest, BlobDBCompactFiles) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.enable_blob_files = true;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.min_blob_size = 0;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  BlobDBJobLevelEventListenerTest* blob_event_listener =
+      new BlobDBJobLevelEventListenerTest(this);
+  options.listeners.emplace_back(blob_event_listener);
+
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("Key1", "blob_value1"));
+  ASSERT_OK(Put("Key2", "blob_value2"));
+  ASSERT_OK(Put("Key3", "blob_value3"));
+  ASSERT_OK(Put("Key4", "blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key3", "new_blob_value3"));
+  ASSERT_OK(Put("Key4", "new_blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key5", "blob_value5"));
+  ASSERT_OK(Put("Key6", "blob_value6"));
+  ASSERT_OK(Flush());
+
+  std::vector<std::string> output_file_names;
+  CompactionJobInfo compaction_job_info;
+
+  // On compaction, because of blob_garbage_collection_age_cutoff, it will
+  // delete the oldest blob file and create a new blob file during compaction,
+  // which will be populated in output_file_names.
+  ASSERT_OK(dbfull()->CompactFiles(
+      CompactionOptions(), blob_event_listener->GetFlushedFiles(), 1, -1,
+      &output_file_names, &compaction_job_info));
+
+  bool is_blob_in_output = false;
+  for (const auto& file : output_file_names) {
+    if (EndsWith(file, ".blob")) {
+      is_blob_in_output = true;
+    }
+  }
+  ASSERT_TRUE(is_blob_in_output);
+
+  for (const auto& blob_file_addition_info :
+       compaction_job_info.blob_file_addition_infos) {
+    EXPECT_GT(blob_file_addition_info.blob_file_number, 0U);
+    EXPECT_GT(blob_file_addition_info.total_blob_bytes, 0U);
+    EXPECT_GT(blob_file_addition_info.total_blob_count, 0U);
+    EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty());
+  }
+
+  for (const auto& blob_file_garbage_info :
+       compaction_job_info.blob_file_garbage_infos) {
+    EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U);
+    EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U);
+    EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U);
+    EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty());
+  }
+}
+
+class BlobDBFileLevelEventListener : public EventListener {
+ public:
+  void OnBlobFileCreationStarted(
+      const BlobFileCreationBriefInfo& info) override {
+    files_started_++;
+    EXPECT_FALSE(info.db_name.empty());
+    EXPECT_FALSE(info.cf_name.empty());
+    EXPECT_FALSE(info.file_path.empty());
+    EXPECT_GT(info.job_id, 0);
+  }
+
+  void OnBlobFileCreated(const BlobFileCreationInfo& info) override {
+    files_created_++;
+    EXPECT_FALSE(info.db_name.empty());
+    EXPECT_FALSE(info.cf_name.empty());
+    EXPECT_FALSE(info.file_path.empty());
+    EXPECT_GT(info.job_id, 0);
+    EXPECT_GT(info.total_blob_count, 0U);
+    EXPECT_GT(info.total_blob_bytes, 0U);
+    EXPECT_EQ(info.file_checksum, kUnknownFileChecksum);
+    EXPECT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
+    EXPECT_TRUE(info.status.ok());
+  }
+
+  void OnBlobFileDeleted(const BlobFileDeletionInfo& info) override {
+    files_deleted_++;
+    EXPECT_FALSE(info.db_name.empty());
+    EXPECT_FALSE(info.file_path.empty());
+    EXPECT_GT(info.job_id, 0);
+    EXPECT_TRUE(info.status.ok());
+  }
+
+  void CheckCounters() {
+    EXPECT_EQ(files_started_, files_created_);
+    EXPECT_GT(files_started_, 0U);
+    EXPECT_GT(files_deleted_, 0U);
+    EXPECT_LT(files_deleted_, files_created_);
+  }
+
+ private:
+  std::atomic<uint32_t> files_started_{};
+  std::atomic<uint32_t> files_created_{};
+  std::atomic<uint32_t> files_deleted_{};
+};
+
+TEST_F(EventListenerTest, BlobDBFileTest) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.enable_blob_files = true;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.min_blob_size = 0;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  BlobDBFileLevelEventListener* blob_event_listener =
+      new BlobDBFileLevelEventListener();
+  options.listeners.emplace_back(blob_event_listener);
+
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("Key1", "blob_value1"));
+  ASSERT_OK(Put("Key2", "blob_value2"));
+  ASSERT_OK(Put("Key3", "blob_value3"));
+  ASSERT_OK(Put("Key4", "blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key3", "new_blob_value3"));
+  ASSERT_OK(Put("Key4", "new_blob_value4"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key5", "blob_value5"));
+  ASSERT_OK(Put("Key6", "blob_value6"));
+  ASSERT_OK(Flush());
+
+  constexpr Slice* begin = nullptr;
+  constexpr Slice* end = nullptr;
+
+  // On compaction, because of blob_garbage_collection_age_cutoff, it will
+  // delete the oldest blob file and create a new blob file during compaction.
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  blob_event_listener->CheckCounters();
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+
+#endif  // ROCKSDB_LITE
+
+int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
--
cgit v1.2.3