diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/flush_job_test.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/db/flush_job_test.cc | 745 |
1 files changed, 745 insertions, 0 deletions
diff --git a/src/rocksdb/db/flush_job_test.cc b/src/rocksdb/db/flush_job_test.cc new file mode 100644 index 000000000..f994b4e9b --- /dev/null +++ b/src/rocksdb/db/flush_job_test.cc @@ -0,0 +1,745 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/flush_job.h" + +#include <algorithm> +#include <array> +#include <map> +#include <string> + +#include "db/blob/blob_index.h" +#include "db/column_family.h" +#include "db/db_impl/db_impl.h" +#include "db/version_set.h" +#include "file/writable_file_writer.h" +#include "rocksdb/cache.h" +#include "rocksdb/file_system.h" +#include "rocksdb/write_buffer_manager.h" +#include "table/mock_table.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "util/random.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +// TODO(icanadi) Mock out everything else: +// 1. VersionSet +// 2. 
Memtable +class FlushJobTestBase : public testing::Test { + protected: + FlushJobTestBase(std::string dbname, const Comparator* ucmp) + : env_(Env::Default()), + fs_(env_->GetFileSystem()), + dbname_(std::move(dbname)), + ucmp_(ucmp), + options_(), + db_options_(options_), + column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}), + table_cache_(NewLRUCache(50000, 16)), + write_buffer_manager_(db_options_.db_write_buffer_size), + shutting_down_(false), + mock_table_factory_(new mock::MockTableFactory()) {} + + virtual ~FlushJobTestBase() { + if (getenv("KEEP_DB")) { + fprintf(stdout, "db is still in %s\n", dbname_.c_str()); + } else { + // destroy versions_ to release all file handles + versions_.reset(); + EXPECT_OK(DestroyDir(env_, dbname_)); + } + } + + void NewDB() { + ASSERT_OK(SetIdentityFile(env_, dbname_)); + VersionEdit new_db; + + new_db.SetLogNumber(0); + new_db.SetNextFile(2); + new_db.SetLastSequence(0); + + autovector<VersionEdit> new_cfs; + SequenceNumber last_seq = 1; + uint32_t cf_id = 1; + for (size_t i = 1; i != column_family_names_.size(); ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(column_family_names_[i]); + new_cf.SetColumnFamily(cf_id++); + new_cf.SetComparatorName(ucmp_->Name()); + new_cf.SetLogNumber(0); + new_cf.SetNextFile(2); + new_cf.SetLastSequence(last_seq++); + new_cfs.emplace_back(new_cf); + } + + const std::string manifest = DescriptorFileName(dbname_, 1); + const auto& fs = env_->GetFileSystem(); + std::unique_ptr<WritableFileWriter> file_writer; + Status s = WritableFileWriter::Create( + fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer, + nullptr); + ASSERT_OK(s); + + { + log::Writer log(std::move(file_writer), 0, false); + std::string record; + new_db.EncodeTo(&record); + s = log.AddRecord(record); + ASSERT_OK(s); + + for (const auto& e : new_cfs) { + record.clear(); + e.EncodeTo(&record); + s = log.AddRecord(record); + ASSERT_OK(s); + } + } + ASSERT_OK(s); + // Make "CURRENT" file that 
points to the new manifest file. + s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); + ASSERT_OK(s); + } + + void SetUp() override { + EXPECT_OK(env_->CreateDirIfMissing(dbname_)); + + // TODO(icanadi) Remove this once we mock out VersionSet + NewDB(); + + db_options_.env = env_; + db_options_.fs = fs_; + db_options_.db_paths.emplace_back(dbname_, + std::numeric_limits<uint64_t>::max()); + db_options_.statistics = CreateDBStatistics(); + + cf_options_.comparator = ucmp_; + + std::vector<ColumnFamilyDescriptor> column_families; + cf_options_.table_factory = mock_table_factory_; + for (const auto& cf_name : column_family_names_) { + column_families.emplace_back(cf_name, cf_options_); + } + + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "")); + EXPECT_OK(versions_->Recover(column_families, false)); + } + + Env* env_; + std::shared_ptr<FileSystem> fs_; + std::string dbname_; + const Comparator* const ucmp_; + EnvOptions env_options_; + Options options_; + ImmutableDBOptions db_options_; + const std::vector<std::string> column_family_names_; + std::shared_ptr<Cache> table_cache_; + WriteController write_controller_; + WriteBufferManager write_buffer_manager_; + ColumnFamilyOptions cf_options_; + std::unique_ptr<VersionSet> versions_; + InstrumentedMutex mutex_; + std::atomic<bool> shutting_down_; + std::shared_ptr<mock::MockTableFactory> mock_table_factory_; + + SeqnoToTimeMapping empty_seqno_to_time_mapping_; +}; + +class FlushJobTest : public FlushJobTestBase { + public: + FlushJobTest() + : FlushJobTestBase(test::PerThreadDBPath("flush_job_test"), + BytewiseComparator()) {} +}; + +TEST_F(FlushJobTest, Empty) { + JobContext job_context(0); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + EventLogger event_logger(db_options_.info_log.get()); + SnapshotChecker* 
snapshot_checker = nullptr; // not relevant + FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), + db_options_, *cfd->GetLatestMutableCFOptions(), + std::numeric_limits<uint64_t>::max() /* memtable_id */, + env_options_, versions_.get(), &mutex_, &shutting_down_, + {}, kMaxSequenceNumber, snapshot_checker, &job_context, + nullptr, nullptr, nullptr, kNoCompression, nullptr, + &event_logger, false, true /* sync_output_directory */, + true /* write_manifest */, Env::Priority::USER, + nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); + { + InstrumentedMutexLock l(&mutex_); + flush_job.PickMemTable(); + ASSERT_OK(flush_job.Run()); + } + job_context.Clean(); +} + +TEST_F(FlushJobTest, NonEmpty) { + JobContext job_context(0); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), + kMaxSequenceNumber); + new_mem->Ref(); + auto inserted_keys = mock::MakeMockFile(); + // Test data: + // seqno [ 1, 2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ] + // key [ 1001, 1002 ... 9998, 9999, 0, 1, 2 ... 
999 ] + // range-delete "9995" -> "9999a" at seqno 10000 + // blob references with seqnos 10001..10006 + for (int i = 1; i < 10000; ++i) { + std::string key(std::to_string((i + 1000) % 10000)); + std::string value("value" + key); + ASSERT_OK(new_mem->Add(SequenceNumber(i), kTypeValue, key, value, + nullptr /* kv_prot_info */)); + if ((i + 1000) % 10000 < 9995) { + InternalKey internal_key(key, SequenceNumber(i), kTypeValue); + inserted_keys.push_back({internal_key.Encode().ToString(), value}); + } + } + + { + ASSERT_OK(new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", + "9999a", nullptr /* kv_prot_info */)); + InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion); + inserted_keys.push_back({internal_key.Encode().ToString(), "9999a"}); + } + + // Note: the first two blob references will not be considered when resolving + // the oldest blob file referenced (the first one is inlined TTL, while the + // second one is TTL and thus points to a TTL blob file). 
+ constexpr std::array<uint64_t, 6> blob_file_numbers{ + {kInvalidBlobFileNumber, 5, 103, 17, 102, 101}}; + for (size_t i = 0; i < blob_file_numbers.size(); ++i) { + std::string key(std::to_string(i + 10001)); + std::string blob_index; + if (i == 0) { + BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 1234567890ULL, + "foo"); + } else if (i == 1) { + BlobIndex::EncodeBlobTTL(&blob_index, /* expiration */ 1234567890ULL, + blob_file_numbers[i], /* offset */ i << 10, + /* size */ i << 20, kNoCompression); + } else { + BlobIndex::EncodeBlob(&blob_index, blob_file_numbers[i], + /* offset */ i << 10, /* size */ i << 20, + kNoCompression); + } + + const SequenceNumber seq(i + 10001); + ASSERT_OK(new_mem->Add(seq, kTypeBlobIndex, key, blob_index, + nullptr /* kv_prot_info */)); + + InternalKey internal_key(key, seq, kTypeBlobIndex); + inserted_keys.push_back({internal_key.Encode().ToString(), blob_index}); + } + mock::SortKVVector(&inserted_keys); + + autovector<MemTable*> to_delete; + new_mem->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(new_mem, &to_delete); + for (auto& m : to_delete) { + delete m; + } + + EventLogger event_logger(db_options_.info_log.get()); + SnapshotChecker* snapshot_checker = nullptr; // not relevant + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), + std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_, + versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); + + HistogramData hist; + FileMetaData file_meta; + mutex_.Lock(); + flush_job.PickMemTable(); + ASSERT_OK(flush_job.Run(nullptr, &file_meta)); + mutex_.Unlock(); + 
db_options_.statistics->histogramData(FLUSH_TIME, &hist); + ASSERT_GT(hist.average, 0.0); + + ASSERT_EQ(std::to_string(0), file_meta.smallest.user_key().ToString()); + ASSERT_EQ("9999a", file_meta.largest.user_key().ToString()); + ASSERT_EQ(1, file_meta.fd.smallest_seqno); + ASSERT_EQ(10006, file_meta.fd.largest_seqno); + ASSERT_EQ(17, file_meta.oldest_blob_file_number); + mock_table_factory_->AssertSingleFile(inserted_keys); + job_context.Clean(); +} + +TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) { + const size_t num_mems = 2; + const size_t num_mems_to_flush = 1; + const size_t num_keys_per_table = 100; + JobContext job_context(0); + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); + std::vector<uint64_t> memtable_ids; + std::vector<MemTable*> new_mems; + for (size_t i = 0; i != num_mems; ++i) { + MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), + kMaxSequenceNumber); + mem->SetID(i); + mem->Ref(); + new_mems.emplace_back(mem); + memtable_ids.push_back(mem->GetID()); + + for (size_t j = 0; j < num_keys_per_table; ++j) { + std::string key(std::to_string(j + i * num_keys_per_table)); + std::string value("value" + key); + ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, + key, value, nullptr /* kv_prot_info */)); + } + } + + autovector<MemTable*> to_delete; + for (auto mem : new_mems) { + mem->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(mem, &to_delete); + } + + EventLogger event_logger(db_options_.info_log.get()); + SnapshotChecker* snapshot_checker = nullptr; // not relevant + + assert(memtable_ids.size() == num_mems); + uint64_t smallest_memtable_id = memtable_ids.front(); + uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_, + versions_.get(), &mutex_, &shutting_down_, {}, 
kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); + HistogramData hist; + FileMetaData file_meta; + mutex_.Lock(); + flush_job.PickMemTable(); + ASSERT_OK(flush_job.Run(nullptr /* prep_tracker */, &file_meta)); + mutex_.Unlock(); + db_options_.statistics->histogramData(FLUSH_TIME, &hist); + ASSERT_GT(hist.average, 0.0); + + ASSERT_EQ(std::to_string(0), file_meta.smallest.user_key().ToString()); + ASSERT_EQ("99", file_meta.largest.user_key().ToString()); + ASSERT_EQ(0, file_meta.fd.smallest_seqno); + ASSERT_EQ(SequenceNumber(num_mems_to_flush * num_keys_per_table - 1), + file_meta.fd.largest_seqno); + ASSERT_EQ(kInvalidBlobFileNumber, file_meta.oldest_blob_file_number); + + for (auto m : to_delete) { + delete m; + } + to_delete.clear(); + job_context.Clean(); +} + +TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { + autovector<ColumnFamilyData*> all_cfds; + for (auto cfd : *versions_->GetColumnFamilySet()) { + all_cfds.push_back(cfd); + } + const std::vector<size_t> num_memtables = {2, 1, 3}; + assert(num_memtables.size() == column_family_names_.size()); + const size_t num_keys_per_memtable = 1000; + JobContext job_context(0); + std::vector<uint64_t> memtable_ids; + std::vector<SequenceNumber> smallest_seqs; + std::vector<SequenceNumber> largest_seqs; + autovector<MemTable*> to_delete; + SequenceNumber curr_seqno = 0; + size_t k = 0; + for (auto cfd : all_cfds) { + smallest_seqs.push_back(curr_seqno); + for (size_t i = 0; i != num_memtables[k]; ++i) { + MemTable* mem = cfd->ConstructNewMemtable( + *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); + mem->SetID(i); + mem->Ref(); + + for (size_t j = 0; j != num_keys_per_memtable; ++j) { + std::string key(std::to_string(j + i * num_keys_per_memtable)); + std::string 
value("value" + key); + ASSERT_OK(mem->Add(curr_seqno++, kTypeValue, key, value, + nullptr /* kv_prot_info */)); + } + mem->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(mem, &to_delete); + } + largest_seqs.push_back(curr_seqno - 1); + memtable_ids.push_back(num_memtables[k++] - 1); + } + + EventLogger event_logger(db_options_.info_log.get()); + SnapshotChecker* snapshot_checker = nullptr; // not relevant + std::vector<std::unique_ptr<FlushJob>> flush_jobs; + k = 0; + for (auto cfd : all_cfds) { + std::vector<SequenceNumber> snapshot_seqs; + flush_jobs.emplace_back(new FlushJob( + dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), + memtable_ids[k], env_options_, versions_.get(), &mutex_, + &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, + &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + false /* sync_output_directory */, false /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/, + empty_seqno_to_time_mapping_)); + k++; + } + HistogramData hist; + std::vector<FileMetaData> file_metas; + // Call reserve to avoid auto-resizing + file_metas.reserve(flush_jobs.size()); + mutex_.Lock(); + for (auto& job : flush_jobs) { + job->PickMemTable(); + } + for (auto& job : flush_jobs) { + FileMetaData meta; + // Run will release and re-acquire mutex + ASSERT_OK(job->Run(nullptr /* prep_tracker */, &meta)); + file_metas.emplace_back(meta); + } + autovector<FileMetaData*> file_meta_ptrs; + for (auto& meta : file_metas) { + file_meta_ptrs.push_back(&meta); + } + autovector<const autovector<MemTable*>*> mems_list; + for (size_t i = 0; i != all_cfds.size(); ++i) { + const auto& mems = flush_jobs[i]->GetMemTables(); + mems_list.push_back(&mems); + } + autovector<const MutableCFOptions*> mutable_cf_options_list; + for (auto cfd : all_cfds) { + mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions()); + } + autovector<std::list<std::unique_ptr<FlushJobInfo>>*> + 
committed_flush_jobs_info; +#ifndef ROCKSDB_LITE + for (auto& job : flush_jobs) { + committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo()); + } +#endif // !ROCKSDB_LITE + + Status s = InstallMemtableAtomicFlushResults( + nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list, + versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs, + committed_flush_jobs_info, &job_context.memtables_to_free, + nullptr /* db_directory */, nullptr /* log_buffer */); + ASSERT_OK(s); + + mutex_.Unlock(); + db_options_.statistics->histogramData(FLUSH_TIME, &hist); + ASSERT_GT(hist.average, 0.0); + k = 0; + for (const auto& file_meta : file_metas) { + ASSERT_EQ(std::to_string(0), file_meta.smallest.user_key().ToString()); + ASSERT_EQ("999", file_meta.largest.user_key() + .ToString()); // max key by bytewise comparator + ASSERT_EQ(smallest_seqs[k], file_meta.fd.smallest_seqno); + ASSERT_EQ(largest_seqs[k], file_meta.fd.largest_seqno); + // Verify that imm is empty + ASSERT_EQ(std::numeric_limits<uint64_t>::max(), + all_cfds[k]->imm()->GetEarliestMemTableID()); + ASSERT_EQ(0, all_cfds[k]->imm()->GetLatestMemTableID()); + ++k; + } + + for (auto m : to_delete) { + delete m; + } + to_delete.clear(); + job_context.Clean(); +} + +TEST_F(FlushJobTest, Snapshots) { + JobContext job_context(0); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), + kMaxSequenceNumber); + + std::set<SequenceNumber> snapshots_set; + int keys = 10000; + int max_inserts_per_keys = 8; + + Random rnd(301); + for (int i = 0; i < keys / 2; ++i) { + snapshots_set.insert(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1); + } + // set has already removed the duplicate snapshots + std::vector<SequenceNumber> snapshots(snapshots_set.begin(), + snapshots_set.end()); + + new_mem->Ref(); + SequenceNumber current_seqno = 0; + auto inserted_keys = mock::MakeMockFile(); + for (int i = 1; i < keys; ++i) 
{ + std::string key(std::to_string(i)); + int insertions = rnd.Uniform(max_inserts_per_keys); + for (int j = 0; j < insertions; ++j) { + std::string value(rnd.HumanReadableString(10)); + auto seqno = ++current_seqno; + ASSERT_OK(new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value, + nullptr /* kv_prot_info */)); + // a key is visible only if: + // 1. it's the last one written (j == insertions - 1) + // 2. there's a snapshot pointing at it + bool visible = (j == insertions - 1) || + (snapshots_set.find(seqno) != snapshots_set.end()); + if (visible) { + InternalKey internal_key(key, seqno, kTypeValue); + inserted_keys.push_back({internal_key.Encode().ToString(), value}); + } + } + } + mock::SortKVVector(&inserted_keys); + + autovector<MemTable*> to_delete; + new_mem->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(new_mem, &to_delete); + for (auto& m : to_delete) { + delete m; + } + + EventLogger event_logger(db_options_.info_log.get()); + SnapshotChecker* snapshot_checker = nullptr; // not relevant + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), + std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_, + versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); + mutex_.Lock(); + flush_job.PickMemTable(); + ASSERT_OK(flush_job.Run()); + mutex_.Unlock(); + mock_table_factory_->AssertSingleFile(inserted_keys); + HistogramData hist; + db_options_.statistics->histogramData(FLUSH_TIME, &hist); + ASSERT_GT(hist.average, 0.0); + job_context.Clean(); +} + +TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) { + // Prepare a FlushJob that flushes the MemTables of a single column family. 
+ const size_t num_mems = 2; + const size_t num_mems_to_flush = 1; + const size_t num_keys_per_table = 100; + JobContext job_context(0); + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); + std::vector<uint64_t> memtable_ids; + std::vector<MemTable*> new_mems; + for (size_t i = 0; i != num_mems; ++i) { + MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), + kMaxSequenceNumber); + mem->SetID(i); + mem->Ref(); + new_mems.emplace_back(mem); + memtable_ids.push_back(mem->GetID()); + + for (size_t j = 0; j < num_keys_per_table; ++j) { + std::string key(std::to_string(j + i * num_keys_per_table)); + std::string value("value" + key); + ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, + key, value, nullptr /* kv_prot_info */)); + } + } + + autovector<MemTable*> to_delete; + for (auto mem : new_mems) { + mem->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(mem, &to_delete); + } + + EventLogger event_logger(db_options_.info_log.get()); + SnapshotChecker* snapshot_checker = nullptr; // not relevant + + assert(memtable_ids.size() == num_mems); + uint64_t smallest_memtable_id = memtable_ids.front(); + uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_, + versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); + + // When the state from WriteController is normal. 
+ ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_HIGH); + + WriteController* write_controller = + flush_job.versions_->GetColumnFamilySet()->write_controller(); + + { + // When the state from WriteController is Delayed. + std::unique_ptr<WriteControllerToken> delay_token = + write_controller->GetDelayToken(1000000); + ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER); + } + + { + // When the state from WriteController is Stopped. + std::unique_ptr<WriteControllerToken> stop_token = + write_controller->GetStopToken(); + ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_USER); + } +} + +class FlushJobTimestampTest : public FlushJobTestBase { + public: + FlushJobTimestampTest() + : FlushJobTestBase(test::PerThreadDBPath("flush_job_ts_gc_test"), + test::BytewiseComparatorWithU64TsWrapper()) {} + + void AddKeyValueToMemtable(MemTable* memtable, std::string key, uint64_t ts, + SequenceNumber seq, ValueType value_type, + Slice value) { + std::string key_str(std::move(key)); + PutFixed64(&key_str, ts); + ASSERT_OK(memtable->Add(seq, value_type, key_str, value, + nullptr /* kv_prot_info */)); + } + + protected: + static constexpr uint64_t kStartTs = 10; + static constexpr SequenceNumber kStartSeq = 0; + SequenceNumber curr_seq_{kStartSeq}; + std::atomic<uint64_t> curr_ts_{kStartTs}; +}; + +TEST_F(FlushJobTimestampTest, AllKeysExpired) { + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); + autovector<MemTable*> to_delete; + + { + MemTable* new_mem = cfd->ConstructNewMemtable( + *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); + new_mem->Ref(); + for (int i = 0; i < 100; ++i) { + uint64_t ts = curr_ts_.fetch_add(1); + SequenceNumber seq = (curr_seq_++); + AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, + ValueType::kTypeValue, "0_value"); + } + uint64_t ts = curr_ts_.fetch_add(1); + SequenceNumber seq = (curr_seq_++); + AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, + 
ValueType::kTypeDeletionWithTimestamp, ""); + new_mem->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(new_mem, &to_delete); + } + + std::vector<SequenceNumber> snapshots; + constexpr SnapshotChecker* const snapshot_checker = nullptr; + JobContext job_context(0); + EventLogger event_logger(db_options_.info_log.get()); + std::string full_history_ts_low; + PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max()); + FlushJob flush_job( + dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), + std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_, + versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_, + /*db_id=*/"", + /*db_session_id=*/"", full_history_ts_low); + + FileMetaData fmeta; + mutex_.Lock(); + flush_job.PickMemTable(); + ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta)); + mutex_.Unlock(); + + { + std::string key = test::EncodeInt(0); + key.append(test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1)); + InternalKey ikey(key, curr_seq_ - 1, ValueType::kTypeDeletionWithTimestamp); + ASSERT_EQ(ikey.Encode(), fmeta.smallest.Encode()); + ASSERT_EQ(ikey.Encode(), fmeta.largest.Encode()); + } + + job_context.Clean(); + ASSERT_TRUE(to_delete.empty()); +} + +TEST_F(FlushJobTimestampTest, NoKeyExpired) { + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); + autovector<MemTable*> to_delete; + + { + MemTable* new_mem = cfd->ConstructNewMemtable( + *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); + new_mem->Ref(); + for (int i = 0; i < 100; ++i) { + uint64_t ts = curr_ts_.fetch_add(1); + SequenceNumber seq = (curr_seq_++); + AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, + 
ValueType::kTypeValue, "0_value"); + } + new_mem->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(new_mem, &to_delete); + } + + std::vector<SequenceNumber> snapshots; + SnapshotChecker* const snapshot_checker = nullptr; + JobContext job_context(0); + EventLogger event_logger(db_options_.info_log.get()); + std::string full_history_ts_low; + PutFixed64(&full_history_ts_low, 0); + FlushJob flush_job( + dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), + std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_, + versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_, + /*db_id=*/"", + /*db_session_id=*/"", full_history_ts_low); + + FileMetaData fmeta; + mutex_.Lock(); + flush_job.PickMemTable(); + ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta)); + mutex_.Unlock(); + + { + std::string ukey = test::EncodeInt(0); + std::string smallest_key = + ukey + test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1); + std::string largest_key = ukey + test::EncodeInt(kStartTs); + InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue); + InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue); + ASSERT_EQ(smallest.Encode(), fmeta.smallest.Encode()); + ASSERT_EQ(largest.Encode(), fmeta.largest.Encode()); + } + job_context.Clean(); + ASSERT_TRUE(to_delete.empty()); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |