Diffstat (limited to 'src/rocksdb/db/compaction/compaction_service_test.cc')
 src/rocksdb/db/compaction/compaction_service_test.cc | 966 ++++++++++++++++++
 1 file changed, 966 insertions(+), 0 deletions(-)
diff --git a/src/rocksdb/db/compaction/compaction_service_test.cc b/src/rocksdb/db/compaction/compaction_service_test.cc
new file mode 100644
index 000000000..c475c4e3b
--- /dev/null
+++ b/src/rocksdb/db/compaction/compaction_service_test.cc
@@ -0,0 +1,966 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "db/db_test_util.h"
+#include "port/stack_trace.h"
+#include "table/unique_id_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+
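+// A CompactionService implementation for testing: it runs the "remote"
+// compaction in the same process via DB::OpenAndCompact(). Tests can override
+// the status returned by StartV2()/WaitForCompleteV2(), replace the
+// serialized result, and cancel an in-flight compaction.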
+class MyTestCompactionService : public CompactionService {
+ public:
+ MyTestCompactionService(
+ std::string db_path, Options& options,
+ std::shared_ptr<Statistics>& statistics,
+ std::vector<std::shared_ptr<EventListener>>& listeners,
+ std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
+ table_properties_collector_factories)
+ : db_path_(std::move(db_path)),
+ options_(options),
+ statistics_(statistics),
+ start_info_("na", "na", "na", 0, Env::TOTAL),
+ wait_info_("na", "na", "na", 0, Env::TOTAL),
+ listeners_(listeners),
+ table_properties_collector_factories_(
+ std::move(table_properties_collector_factories)) {}
+
+ static const char* kClassName() { return "MyTestCompactionService"; }
+
+ const char* Name() const override { return kClassName(); }
+
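+  // Record the job info and stash the serialized compaction input keyed by
+  // job id; optionally return an overridden status to simulate failures.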
+ CompactionServiceJobStatus StartV2(
+ const CompactionServiceJobInfo& info,
+ const std::string& compaction_service_input) override {
+ InstrumentedMutexLock l(&mutex_);
+ start_info_ = info;
+ assert(info.db_name == db_path_);
+ jobs_.emplace(info.job_id, compaction_service_input);
+ CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
+ if (is_override_start_status_) {
+ return override_start_status_;
+ }
+ return s;
+ }
+
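+  // Pop the queued input for this job and run the compaction with
+  // DB::OpenAndCompact() against a per-job output directory, forwarding the
+  // primary DB's pluggable options via CompactionServiceOptionsOverride.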
+ CompactionServiceJobStatus WaitForCompleteV2(
+ const CompactionServiceJobInfo& info,
+ std::string* compaction_service_result) override {
+ std::string compaction_input;
+ assert(info.db_name == db_path_);
+ {
+ InstrumentedMutexLock l(&mutex_);
+ wait_info_ = info;
+ auto i = jobs_.find(info.job_id);
+ if (i == jobs_.end()) {
+ return CompactionServiceJobStatus::kFailure;
+ }
+ compaction_input = std::move(i->second);
+ jobs_.erase(i);
+ }
+
+ if (is_override_wait_status_) {
+ return override_wait_status_;
+ }
+
+ CompactionServiceOptionsOverride options_override;
+ options_override.env = options_.env;
+ options_override.file_checksum_gen_factory =
+ options_.file_checksum_gen_factory;
+ options_override.comparator = options_.comparator;
+ options_override.merge_operator = options_.merge_operator;
+ options_override.compaction_filter = options_.compaction_filter;
+ options_override.compaction_filter_factory =
+ options_.compaction_filter_factory;
+ options_override.prefix_extractor = options_.prefix_extractor;
+ options_override.table_factory = options_.table_factory;
+ options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
+ options_override.statistics = statistics_;
+ if (!listeners_.empty()) {
+ options_override.listeners = listeners_;
+ }
+
+ if (!table_properties_collector_factories_.empty()) {
+ options_override.table_properties_collector_factories =
+ table_properties_collector_factories_;
+ }
+
+ OpenAndCompactOptions options;
+ options.canceled = &canceled_;
+
+ Status s = DB::OpenAndCompact(
+ options, db_path_, db_path_ + "/" + std::to_string(info.job_id),
+ compaction_input, compaction_service_result, options_override);
+ if (is_override_wait_result_) {
+ *compaction_service_result = override_wait_result_;
+ }
+ compaction_num_.fetch_add(1);
+ if (s.ok()) {
+ return CompactionServiceJobStatus::kSuccess;
+ } else {
+ return CompactionServiceJobStatus::kFailure;
+ }
+ }
+
+ int GetCompactionNum() { return compaction_num_.load(); }
+
+ CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
+ CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }
+
+ void OverrideStartStatus(CompactionServiceJobStatus s) {
+ is_override_start_status_ = true;
+ override_start_status_ = s;
+ }
+
+ void OverrideWaitStatus(CompactionServiceJobStatus s) {
+ is_override_wait_status_ = true;
+ override_wait_status_ = s;
+ }
+
+ void OverrideWaitResult(std::string str) {
+ is_override_wait_result_ = true;
+ override_wait_result_ = std::move(str);
+ }
+
+ void ResetOverride() {
+ is_override_wait_result_ = false;
+ is_override_start_status_ = false;
+ is_override_wait_status_ = false;
+ }
+
+ void SetCanceled(bool canceled) { canceled_ = canceled; }
+
+ private:
+ InstrumentedMutex mutex_;
+ std::atomic_int compaction_num_{0};
+ std::map<uint64_t, std::string> jobs_;
+ const std::string db_path_;
+ Options options_;
+ std::shared_ptr<Statistics> statistics_;
+ CompactionServiceJobInfo start_info_;
+ CompactionServiceJobInfo wait_info_;
+ bool is_override_start_status_ = false;
+ CompactionServiceJobStatus override_start_status_ =
+ CompactionServiceJobStatus::kFailure;
+ bool is_override_wait_status_ = false;
+ CompactionServiceJobStatus override_wait_status_ =
+ CompactionServiceJobStatus::kFailure;
+ bool is_override_wait_result_ = false;
+ std::string override_wait_result_;
+ std::vector<std::shared_ptr<EventListener>> listeners_;
+ std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
+ table_properties_collector_factories_;
+ std::atomic_bool canceled_{false};
+};
+
+class CompactionServiceTest : public DBTestBase {
+ public:
+ explicit CompactionServiceTest()
+ : DBTestBase("compaction_service_test", true) {}
+
+ protected:
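+  // Install MyTestCompactionService with separate statistics objects for the
+  // primary DB and the remote compactor, then destroy and reopen the DB.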
+ void ReopenWithCompactionService(Options* options) {
+ options->env = env_;
+ primary_statistics_ = CreateDBStatistics();
+ options->statistics = primary_statistics_;
+ compactor_statistics_ = CreateDBStatistics();
+
+ compaction_service_ = std::make_shared<MyTestCompactionService>(
+ dbname_, *options, compactor_statistics_, remote_listeners,
+ remote_table_properties_collector_factories);
+ options->compaction_service = compaction_service_;
+ DestroyAndReopen(*options);
+ }
+
+ Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
+
+ Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }
+
+ MyTestCompactionService* GetCompactionService() {
+ CompactionService* cs = compaction_service_.get();
+ return static_cast_with_check<MyTestCompactionService>(cs);
+ }
+
+ void GenerateTestData() {
+ // Generate 20 files @ L2
+ for (int i = 0; i < 20; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 10 + j;
+ ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+ MoveFilesToLevel(2);
+
+    // Generate 10 files @ L1 that overlap with all 20 files @ L2
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 20 + j * 2;
+ ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+ MoveFilesToLevel(1);
+ ASSERT_EQ(FilesPerLevel(), "0,10,20");
+ }
+
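+  // Even keys were overwritten by the second batch ("value_new<id>"); odd
+  // keys keep their original "value<id>" entries.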
+ void VerifyTestData() {
+ for (int i = 0; i < 200; i++) {
+ auto result = Get(Key(i));
+ if (i % 2) {
+ ASSERT_EQ(result, "value" + std::to_string(i));
+ } else {
+ ASSERT_EQ(result, "value_new" + std::to_string(i));
+ }
+ }
+ }
+
+ std::vector<std::shared_ptr<EventListener>> remote_listeners;
+ std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
+ remote_table_properties_collector_factories;
+
+ private:
+ std::shared_ptr<Statistics> compactor_statistics_;
+ std::shared_ptr<Statistics> primary_statistics_;
+ std::shared_ptr<CompactionService> compaction_service_;
+};
+
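+// Automatic compactions should be offloaded to the remote service; the test
+// also exercises a failed remote compaction and unique id verification on
+// reopen.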
+TEST_F(CompactionServiceTest, BasicCompactions) {
+ Options options = CurrentOptions();
+ ReopenWithCompactionService(&options);
+
+ Statistics* primary_statistics = GetPrimaryStatistics();
+ Statistics* compactor_statistics = GetCompactorStatistics();
+
+ for (int i = 0; i < 20; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 10 + j;
+ ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 20 + j * 2;
+ ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ // verify result
+ for (int i = 0; i < 200; i++) {
+ auto result = Get(Key(i));
+ if (i % 2) {
+ ASSERT_EQ(result, "value" + std::to_string(i));
+ } else {
+ ASSERT_EQ(result, "value_new" + std::to_string(i));
+ }
+ }
+ auto my_cs = GetCompactionService();
+ ASSERT_GE(my_cs->GetCompactionNum(), 1);
+
+  // make sure the compaction statistics are only recorded on the remote side
+ ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1);
+ ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
+ ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0);
+  // even with remote compaction, the primary host still needs to read SST
+  // files for `verify_table()`.
+ ASSERT_GE(primary_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
+  // all the compaction writes happen on the remote side
+ ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
+ compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
+ ASSERT_GE(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 1);
+ ASSERT_GT(primary_statistics->getTickerCount(COMPACT_READ_BYTES),
+ primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES));
+  // the compactor is already the remote side, so it has no remote counters
+ ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
+ ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
+ 0);
+
+ // Test failed compaction
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
+ // override job status
+ auto s = static_cast<Status*>(status);
+ *s = Status::Aborted("MyTestCompactionService failed to compact!");
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Status s;
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 20 + j * 2;
+ s = Put(Key(key_id), "value_new" + std::to_string(key_id));
+ if (s.IsAborted()) {
+ break;
+ }
+ }
+ if (s.IsAborted()) {
+ break;
+ }
+ s = Flush();
+ if (s.IsAborted()) {
+ break;
+ }
+ s = dbfull()->TEST_WaitForCompact();
+ if (s.IsAborted()) {
+ break;
+ }
+ }
+ ASSERT_TRUE(s.IsAborted());
+
+ // Test re-open and successful unique id verification
+ std::atomic_int verify_passed{0};
+ SyncPoint::GetInstance()->SetCallBack(
+ "BlockBasedTable::Open::PassedVerifyUniqueId", [&](void* arg) {
+        // count tables that pass unique id verification
+ auto id = static_cast<UniqueId64x2*>(arg);
+ assert(*id != kNullUniqueId64x2);
+ verify_passed++;
+ });
+ Reopen(options);
+ ASSERT_GT(verify_passed, 0);
+ Close();
+}
+
+TEST_F(CompactionServiceTest, ManualCompaction) {
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ ReopenWithCompactionService(&options);
+ GenerateTestData();
+
+ auto my_cs = GetCompactionService();
+
+ std::string start_str = Key(15);
+ std::string end_str = Key(45);
+ Slice start(start_str);
+ Slice end(end_str);
+ uint64_t comp_num = my_cs->GetCompactionNum();
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
+ ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+ VerifyTestData();
+
+ start_str = Key(120);
+ start = start_str;
+ comp_num = my_cs->GetCompactionNum();
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
+ ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+ VerifyTestData();
+
+ end_str = Key(92);
+ end = end_str;
+ comp_num = my_cs->GetCompactionNum();
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end));
+ ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+ VerifyTestData();
+
+ comp_num = my_cs->GetCompactionNum();
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+ VerifyTestData();
+}
+
+TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ ReopenWithCompactionService(&options);
+ GenerateTestData();
+
+ auto my_cs = GetCompactionService();
+
+ std::string start_str = Key(15);
+ std::string end_str = Key(45);
+ Slice start(start_str);
+ Slice end(end_str);
+ uint64_t comp_num = my_cs->GetCompactionNum();
+
+ // Test cancel compaction at the beginning
+ my_cs->SetCanceled(true);
+ auto s = db_->CompactRange(CompactRangeOptions(), &start, &end);
+ ASSERT_TRUE(s.IsIncomplete());
+ // compaction number is not increased
+ ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
+ VerifyTestData();
+
+ // Test cancel compaction in progress
+ ReopenWithCompactionService(&options);
+ GenerateTestData();
+ my_cs = GetCompactionService();
+ my_cs->SetCanceled(false);
+
+ std::atomic_bool cancel_issued{false};
+ SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Inprogress",
+ [&](void* /*arg*/) {
+ cancel_issued = true;
+ my_cs->SetCanceled(true);
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ s = db_->CompactRange(CompactRangeOptions(), &start, &end);
+ ASSERT_TRUE(s.IsIncomplete());
+ ASSERT_TRUE(cancel_issued);
+ // compaction number is not increased
+ ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
+ VerifyTestData();
+}
+
+TEST_F(CompactionServiceTest, FailedToStart) {
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ ReopenWithCompactionService(&options);
+
+ GenerateTestData();
+
+ auto my_cs = GetCompactionService();
+ my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure);
+
+ std::string start_str = Key(15);
+ std::string end_str = Key(45);
+ Slice start(start_str);
+ Slice end(end_str);
+ Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
+ ASSERT_TRUE(s.IsIncomplete());
+}
+
+TEST_F(CompactionServiceTest, InvalidResult) {
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ ReopenWithCompactionService(&options);
+
+ GenerateTestData();
+
+ auto my_cs = GetCompactionService();
+ my_cs->OverrideWaitResult("Invalid Str");
+
+ std::string start_str = Key(15);
+ std::string end_str = Key(45);
+ Slice start(start_str);
+ Slice end(end_str);
+ Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
+ ASSERT_FALSE(s.ok());
+}
+
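+// Each subcompaction is dispatched to the service as its own remote job, so a
+// single CompactRange with max_subcompactions > 1 should increase the remote
+// compaction count by more than one.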
+TEST_F(CompactionServiceTest, SubCompaction) {
+ Options options = CurrentOptions();
+ options.max_subcompactions = 10;
+ options.target_file_size_base = 1 << 10; // 1KB
+ options.disable_auto_compactions = true;
+ ReopenWithCompactionService(&options);
+
+ GenerateTestData();
+ VerifyTestData();
+
+ auto my_cs = GetCompactionService();
+ int compaction_num_before = my_cs->GetCompactionNum();
+
+ auto cro = CompactRangeOptions();
+ cro.max_subcompactions = 10;
+ Status s = db_->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(s);
+ VerifyTestData();
+ int compaction_num = my_cs->GetCompactionNum() - compaction_num_before;
+  // make sure sub-compactions ran by checking the remote compaction count
+ ASSERT_GE(compaction_num, 2);
+}
+
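+// Compaction filter that drops keys whose numeric id falls in (5, 105].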
+class PartialDeleteCompactionFilter : public CompactionFilter {
+ public:
+ CompactionFilter::Decision FilterV2(
+ int /*level*/, const Slice& key, ValueType /*value_type*/,
+ const Slice& /*existing_value*/, std::string* /*new_value*/,
+ std::string* /*skip_until*/) const override {
+ int i = std::stoi(key.ToString().substr(3));
+ if (i > 5 && i <= 105) {
+ return CompactionFilter::Decision::kRemove;
+ }
+ return CompactionFilter::Decision::kKeep;
+ }
+
+ const char* Name() const override { return "PartialDeleteCompactionFilter"; }
+};
+
+TEST_F(CompactionServiceTest, CompactionFilter) {
+ Options options = CurrentOptions();
+ std::unique_ptr<CompactionFilter> delete_comp_filter(
+ new PartialDeleteCompactionFilter());
+ options.compaction_filter = delete_comp_filter.get();
+ ReopenWithCompactionService(&options);
+
+ for (int i = 0; i < 20; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 10 + j;
+ ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 20 + j * 2;
+ ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+ // verify result
+ for (int i = 0; i < 200; i++) {
+ auto result = Get(Key(i));
+ if (i > 5 && i <= 105) {
+ ASSERT_EQ(result, "NOT_FOUND");
+ } else if (i % 2) {
+ ASSERT_EQ(result, "value" + std::to_string(i));
+ } else {
+ ASSERT_EQ(result, "value_new" + std::to_string(i));
+ }
+ }
+ auto my_cs = GetCompactionService();
+ ASSERT_GE(my_cs->GetCompactionNum(), 1);
+}
+
+TEST_F(CompactionServiceTest, Snapshot) {
+ Options options = CurrentOptions();
+ ReopenWithCompactionService(&options);
+
+ ASSERT_OK(Put(Key(1), "value1"));
+ ASSERT_OK(Put(Key(2), "value1"));
+ const Snapshot* s1 = db_->GetSnapshot();
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put(Key(1), "value2"));
+ ASSERT_OK(Put(Key(3), "value2"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ auto my_cs = GetCompactionService();
+ ASSERT_GE(my_cs->GetCompactionNum(), 1);
+ ASSERT_EQ("value1", Get(Key(1), s1));
+ ASSERT_EQ("value2", Get(Key(1)));
+ db_->ReleaseSnapshot(s1);
+}
+
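+// Issue CompactFiles() for each of the 10 L1 files from its own thread; every
+// call should run as a separate remote compaction job.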
+TEST_F(CompactionServiceTest, ConcurrentCompaction) {
+ Options options = CurrentOptions();
+ options.level0_file_num_compaction_trigger = 100;
+ options.max_background_jobs = 20;
+ ReopenWithCompactionService(&options);
+ GenerateTestData();
+
+ ColumnFamilyMetaData meta;
+ db_->GetColumnFamilyMetaData(&meta);
+
+ std::vector<std::thread> threads;
+ for (const auto& file : meta.levels[1].files) {
+ threads.emplace_back(std::thread([&]() {
+ std::string fname = file.db_path + "/" + file.name;
+ ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2));
+ }));
+ }
+
+ for (auto& thread : threads) {
+ thread.join();
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ // verify result
+ for (int i = 0; i < 200; i++) {
+ auto result = Get(Key(i));
+ if (i % 2) {
+ ASSERT_EQ(result, "value" + std::to_string(i));
+ } else {
+ ASSERT_EQ(result, "value_new" + std::to_string(i));
+ }
+ }
+ auto my_cs = GetCompactionService();
+ ASSERT_EQ(my_cs->GetCompactionNum(), 10);
+ ASSERT_EQ(FilesPerLevel(), "0,0,10");
+}
+
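+// Verify the CompactionServiceJobInfo passed to StartV2()/WaitForCompleteV2():
+// db name, db id, db session id, and the job priority (LOW, USER, BOTTOM).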
+TEST_F(CompactionServiceTest, CompactionInfo) {
+ Options options = CurrentOptions();
+ ReopenWithCompactionService(&options);
+
+ for (int i = 0; i < 20; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 10 + j;
+ ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 20 + j * 2;
+ ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ auto my_cs =
+ static_cast_with_check<MyTestCompactionService>(GetCompactionService());
+ uint64_t comp_num = my_cs->GetCompactionNum();
+ ASSERT_GE(comp_num, 1);
+
+ CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart();
+ ASSERT_EQ(dbname_, info.db_name);
+ std::string db_id, db_session_id;
+ ASSERT_OK(db_->GetDbIdentity(db_id));
+ ASSERT_EQ(db_id, info.db_id);
+ ASSERT_OK(db_->GetDbSessionId(db_session_id));
+ ASSERT_EQ(db_session_id, info.db_session_id);
+ ASSERT_EQ(Env::LOW, info.priority);
+ info = my_cs->GetCompactionInfoForWait();
+ ASSERT_EQ(dbname_, info.db_name);
+ ASSERT_EQ(db_id, info.db_id);
+ ASSERT_EQ(db_session_id, info.db_session_id);
+ ASSERT_EQ(Env::LOW, info.priority);
+
+ // Test priority USER
+ ColumnFamilyMetaData meta;
+ db_->GetColumnFamilyMetaData(&meta);
+ SstFileMetaData file = meta.levels[1].files[0];
+ ASSERT_OK(db_->CompactFiles(CompactionOptions(),
+ {file.db_path + "/" + file.name}, 2));
+ info = my_cs->GetCompactionInfoForStart();
+ ASSERT_EQ(Env::USER, info.priority);
+ info = my_cs->GetCompactionInfoForWait();
+ ASSERT_EQ(Env::USER, info.priority);
+
+ // Test priority BOTTOM
+ env_->SetBackgroundThreads(1, Env::BOTTOM);
+ options.num_levels = 2;
+ ReopenWithCompactionService(&options);
+ my_cs =
+ static_cast_with_check<MyTestCompactionService>(GetCompactionService());
+
+ for (int i = 0; i < 20; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 10 + j;
+ ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 20 + j * 2;
+ ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ info = my_cs->GetCompactionInfoForStart();
+ ASSERT_EQ(Env::BOTTOM, info.priority);
+ info = my_cs->GetCompactionInfoForWait();
+ ASSERT_EQ(Env::BOTTOM, info.priority);
+}
+
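+// When StartV2() returns kUseLocal, the primary falls back to local
+// compaction and no remote compaction statistics should be recorded.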
+TEST_F(CompactionServiceTest, FallbackLocalAuto) {
+ Options options = CurrentOptions();
+ ReopenWithCompactionService(&options);
+
+ auto my_cs = GetCompactionService();
+ Statistics* compactor_statistics = GetCompactorStatistics();
+ Statistics* primary_statistics = GetPrimaryStatistics();
+ uint64_t compactor_write_bytes =
+ compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
+ uint64_t primary_write_bytes =
+ primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
+
+ my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
+
+ for (int i = 0; i < 20; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 10 + j;
+ ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 20 + j * 2;
+ ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ // verify result
+ for (int i = 0; i < 200; i++) {
+ auto result = Get(Key(i));
+ if (i % 2) {
+ ASSERT_EQ(result, "value" + std::to_string(i));
+ } else {
+ ASSERT_EQ(result, "value_new" + std::to_string(i));
+ }
+ }
+
+ ASSERT_EQ(my_cs->GetCompactionNum(), 0);
+
+  // make sure the compaction statistics are only recorded on the local side
+ ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
+ compactor_write_bytes);
+ ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
+ primary_write_bytes);
+ ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
+ ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0);
+}
+
+TEST_F(CompactionServiceTest, FallbackLocalManual) {
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ ReopenWithCompactionService(&options);
+
+ GenerateTestData();
+ VerifyTestData();
+
+ auto my_cs = GetCompactionService();
+ Statistics* compactor_statistics = GetCompactorStatistics();
+ Statistics* primary_statistics = GetPrimaryStatistics();
+ uint64_t compactor_write_bytes =
+ compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
+ uint64_t primary_write_bytes =
+ primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
+
+ // re-enable remote compaction
+ my_cs->ResetOverride();
+ std::string start_str = Key(15);
+ std::string end_str = Key(45);
+ Slice start(start_str);
+ Slice end(end_str);
+ uint64_t comp_num = my_cs->GetCompactionNum();
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
+ ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
+  // make sure the compaction statistics are only recorded on the remote side
+ ASSERT_GT(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
+ compactor_write_bytes);
+ ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
+ compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
+ ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
+ primary_write_bytes);
+
+  // fall back to local again, this time via the WaitForComplete API
+ my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
+ start_str = Key(120);
+ start = start_str;
+ comp_num = my_cs->GetCompactionNum();
+ compactor_write_bytes =
+ compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
+ primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
+ ASSERT_EQ(my_cs->GetCompactionNum(),
+ comp_num); // no remote compaction is run
+  // make sure the compaction statistics are only recorded on the local side
+ ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
+ compactor_write_bytes);
+ ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
+ primary_write_bytes);
+ ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
+ compactor_write_bytes);
+
+ // verify result after 2 manual compactions
+ VerifyTestData();
+}
+
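+// Listeners registered in remote_listeners are installed on the remote
+// (compactor) side and should observe subcompaction and table file creation
+// events there.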
+TEST_F(CompactionServiceTest, RemoteEventListener) {
+ class RemoteEventListenerTest : public EventListener {
+ public:
+ const char* Name() const override { return "RemoteEventListenerTest"; }
+
+ void OnSubcompactionBegin(const SubcompactionJobInfo& info) override {
+ auto result = on_going_compactions.emplace(info.job_id);
+ ASSERT_TRUE(result.second); // make sure there's no duplication
+ compaction_num++;
+ EventListener::OnSubcompactionBegin(info);
+ }
+ void OnSubcompactionCompleted(const SubcompactionJobInfo& info) override {
+ auto num = on_going_compactions.erase(info.job_id);
+ ASSERT_TRUE(num == 1); // make sure the compaction id exists
+ EventListener::OnSubcompactionCompleted(info);
+ }
+ void OnTableFileCreated(const TableFileCreationInfo& info) override {
+ ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
+ file_created++;
+ EventListener::OnTableFileCreated(info);
+ }
+ void OnTableFileCreationStarted(
+ const TableFileCreationBriefInfo& info) override {
+ ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
+ file_creation_started++;
+ EventListener::OnTableFileCreationStarted(info);
+ }
+
+ bool ShouldBeNotifiedOnFileIO() override {
+ file_io_notified++;
+ return EventListener::ShouldBeNotifiedOnFileIO();
+ }
+
+ std::atomic_uint64_t file_io_notified{0};
+ std::atomic_uint64_t file_creation_started{0};
+ std::atomic_uint64_t file_created{0};
+
+ std::set<int> on_going_compactions; // store the job_id
+ std::atomic_uint64_t compaction_num{0};
+ };
+
+ auto listener = new RemoteEventListenerTest();
+ remote_listeners.emplace_back(listener);
+
+ Options options = CurrentOptions();
+ ReopenWithCompactionService(&options);
+
+ for (int i = 0; i < 20; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 10 + j;
+ ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 20 + j * 2;
+ ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ // check the events are triggered
+ ASSERT_TRUE(listener->file_io_notified > 0);
+ ASSERT_TRUE(listener->file_creation_started > 0);
+ ASSERT_TRUE(listener->file_created > 0);
+ ASSERT_TRUE(listener->compaction_num > 0);
+ ASSERT_TRUE(listener->on_going_compactions.empty());
+
+ // verify result
+ for (int i = 0; i < 200; i++) {
+ auto result = Get(Key(i));
+ if (i % 2) {
+ ASSERT_EQ(result, "value" + std::to_string(i));
+ } else {
+ ASSERT_EQ(result, "value_new" + std::to_string(i));
+ }
+ }
+}
+
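+// Table properties collector factories registered for the remote side apply
+// only to SSTs written by remote compactions, not to locally flushed files.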
+TEST_F(CompactionServiceTest, TablePropertiesCollector) {
+ const static std::string kUserPropertyName = "TestCount";
+
+ class TablePropertiesCollectorTest : public TablePropertiesCollector {
+ public:
+ Status Finish(UserCollectedProperties* properties) override {
+ *properties = UserCollectedProperties{
+ {kUserPropertyName, std::to_string(count_)},
+ };
+ return Status::OK();
+ }
+
+ UserCollectedProperties GetReadableProperties() const override {
+ return UserCollectedProperties();
+ }
+
+ const char* Name() const override { return "TablePropertiesCollectorTest"; }
+
+ Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
+ EntryType /*type*/, SequenceNumber /*seq*/,
+ uint64_t /*file_size*/) override {
+ count_++;
+ return Status::OK();
+ }
+
+ private:
+ uint32_t count_ = 0;
+ };
+
+ class TablePropertiesCollectorFactoryTest
+ : public TablePropertiesCollectorFactory {
+ public:
+ TablePropertiesCollector* CreateTablePropertiesCollector(
+ TablePropertiesCollectorFactory::Context /*context*/) override {
+ return new TablePropertiesCollectorTest();
+ }
+
+ const char* Name() const override {
+ return "TablePropertiesCollectorFactoryTest";
+ }
+ };
+
+ auto factory = new TablePropertiesCollectorFactoryTest();
+ remote_table_properties_collector_factories.emplace_back(factory);
+
+ const int kNumSst = 3;
+ const int kLevel0Trigger = 4;
+ Options options = CurrentOptions();
+ options.level0_file_num_compaction_trigger = kLevel0Trigger;
+ ReopenWithCompactionService(&options);
+
+  // generate a few SSTs locally, which should not have the user property
+ for (int i = 0; i < kNumSst; i++) {
+ for (int j = 0; j < 100; j++) {
+ ASSERT_OK(Put(Key(i * 10 + j), "value"));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ TablePropertiesCollection fname_to_props;
+ ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
+ for (const auto& file_props : fname_to_props) {
+ auto properties = file_props.second->user_collected_properties;
+ auto it = properties.find(kUserPropertyName);
+ ASSERT_EQ(it, properties.end());
+ }
+
+ // trigger compaction
+ for (int i = kNumSst; i < kLevel0Trigger; i++) {
+ for (int j = 0; j < 100; j++) {
+ ASSERT_OK(Put(Key(i * 10 + j), "value"));
+ }
+ ASSERT_OK(Flush());
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+
+ ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
+
+ bool has_user_property = false;
+ for (const auto& file_props : fname_to_props) {
+ auto properties = file_props.second->user_collected_properties;
+ auto it = properties.find(kUserPropertyName);
+ if (it != properties.end()) {
+ has_user_property = true;
+ ASSERT_GT(std::stoi(it->second), 0);
+ }
+ }
+ ASSERT_TRUE(has_user_property);
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ RegisterCustomObjects(argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr,
+ "SKIPPED as CompactionService is not supported in ROCKSDB_LITE\n");
+ return 0;
+}
+
+#endif // ROCKSDB_LITE