From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- .../cassandra/cassandra_functional_test.cc | 446 +++++++++++++++++++++ 1 file changed, 446 insertions(+) create mode 100644 src/rocksdb/utilities/cassandra/cassandra_functional_test.cc (limited to 'src/rocksdb/utilities/cassandra/cassandra_functional_test.cc') diff --git a/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc b/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc new file mode 100644 index 000000000..c5be836e8 --- /dev/null +++ b/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc @@ -0,0 +1,446 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include + +#include "db/db_impl/db_impl.h" +#include "rocksdb/convenience.h" +#include "rocksdb/db.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/utilities/object_registry.h" +#include "test_util/testharness.h" +#include "util/cast_util.h" +#include "util/random.h" +#include "utilities/cassandra/cassandra_compaction_filter.h" +#include "utilities/cassandra/merge_operator.h" +#include "utilities/cassandra/test_utils.h" +#include "utilities/merge_operators.h" + +namespace ROCKSDB_NAMESPACE { +namespace cassandra { + +// Path to the database on file system +const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test"); + +class CassandraStore { + public: + explicit CassandraStore(std::shared_ptr db) + : db_(db), write_option_(), get_option_() { + assert(db); + } + + bool Append(const std::string& key, const RowValue& val) { + std::string result; + val.Serialize(&result); + Slice valSlice(result.data(), result.size()); + auto s = db_->Merge(write_option_, key, valSlice); + + if (s.ok()) { + return true; + } else { + std::cerr << "ERROR " << s.ToString() << std::endl; + return false; + } + } + + bool Put(const std::string& key, const RowValue& val) { + std::string result; + val.Serialize(&result); + Slice valSlice(result.data(), result.size()); + auto s = db_->Put(write_option_, key, valSlice); + if (s.ok()) { + return true; + } else { + std::cerr << "ERROR " << s.ToString() << std::endl; + return false; + } + } + + Status Flush() { + Status s = dbfull()->TEST_FlushMemTable(); + if (s.ok()) { + s = dbfull()->TEST_WaitForCompact(); + } + return s; + } + + Status Compact() { + return dbfull()->TEST_CompactRange(0, nullptr, nullptr, + db_->DefaultColumnFamily()); + } + + std::tuple Get(const std::string& key) { + std::string result; + auto s = db_->Get(get_option_, key, &result); + + if (s.ok()) { + return std::make_tuple( + true, RowValue::Deserialize(result.data(), result.size())); + } + + if (!s.IsNotFound()) { + std::cerr << "ERROR " << s.ToString() << std::endl; + } + + return std::make_tuple(false, RowValue(0, 0)); + } + + private: + std::shared_ptr db_; + WriteOptions write_option_; + ReadOptions get_option_; + + DBImpl* dbfull() { return static_cast_with_check(db_.get()); } +}; + +class TestCompactionFilterFactory : public CompactionFilterFactory { + public: + explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration, + int32_t gc_grace_period_in_seconds) + : purge_ttl_on_expiration_(purge_ttl_on_expiration), + gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {} + + std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& /*context*/) override { + return std::unique_ptr(new CassandraCompactionFilter( + purge_ttl_on_expiration_, gc_grace_period_in_seconds_)); + } + + const char* Name() const override { return "TestCompactionFilterFactory"; } + + private: + bool purge_ttl_on_expiration_; + int32_t gc_grace_period_in_seconds_; +}; + +// The class for unit-testing +class CassandraFunctionalTest : public testing::Test { + public: + CassandraFunctionalTest() { + EXPECT_OK( + DestroyDB(kDbName, Options())); // Start each test with a fresh DB + } + + std::shared_ptr OpenDb() { + DB* db; + Options options; + options.create_if_missing = true; + options.merge_operator.reset( + new CassandraValueMergeOperator(gc_grace_period_in_seconds_)); + auto* cf_factory = new TestCompactionFilterFactory( + purge_ttl_on_expiration_, gc_grace_period_in_seconds_); + options.compaction_filter_factory.reset(cf_factory); + EXPECT_OK(DB::Open(options, kDbName, &db)); + return std::shared_ptr(db); + } + + bool purge_ttl_on_expiration_ = false; + int32_t gc_grace_period_in_seconds_ = 100; +}; + +// THE TEST CASES BEGIN HERE + +TEST_F(CassandraFunctionalTest, SimpleMergeTest) { + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append( + "k1", + CreateTestRowValue({ + CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)), + CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)), + CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)), + })); + store.Append( + "k1", + CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)), + CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)), + CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)), + CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)), + })); + store.Append( + "k1", + CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)), + CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)), + CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)), + CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)), + })); + + auto ret = store.Get("k1"); + + ASSERT_TRUE(std::get<0>(ret)); + RowValue& merged = std::get<1>(ret); + EXPECT_EQ(merged.get_columns().size(), 5); + VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 0, + ToMicroSeconds(now + 6)); + VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 1, + ToMicroSeconds(now + 8)); + VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 2, + ToMicroSeconds(now + 7)); + VerifyRowValueColumns(merged.get_columns(), 3, kExpiringColumn, 7, + ToMicroSeconds(now + 17)); + VerifyRowValueColumns(merged.get_columns(), 4, kTombstone, 11, + ToMicroSeconds(now + 11)); +} + +constexpr int64_t kTestTimeoutSecs = 600; + +TEST_F(CassandraFunctionalTest, + CompactionShouldConvertExpiredColumnsToTombstone) { + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append( + "k1", + CreateTestRowValue( + {CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 20)), // expired + CreateTestColumnSpec( + kExpiringColumn, 1, + ToMicroSeconds(now - kTtl + kTestTimeoutSecs)), // not expired + CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))})); + + ASSERT_OK(store.Flush()); + + store.Append( + "k1", + CreateTestRowValue( + {CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 10)), // expired + CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))})); + + ASSERT_OK(store.Flush()); + ASSERT_OK(store.Compact()); + + auto ret = store.Get("k1"); + ASSERT_TRUE(std::get<0>(ret)); + RowValue& merged = std::get<1>(ret); + EXPECT_EQ(merged.get_columns().size(), 4); + VerifyRowValueColumns(merged.get_columns(), 0, kTombstone, 0, + ToMicroSeconds(now - 10)); + VerifyRowValueColumns(merged.get_columns(), 1, kExpiringColumn, 1, + ToMicroSeconds(now - kTtl + kTestTimeoutSecs)); + VerifyRowValueColumns(merged.get_columns(), 2, kColumn, 2, + ToMicroSeconds(now)); + VerifyRowValueColumns(merged.get_columns(), 3, kTombstone, 3, + ToMicroSeconds(now)); +} + +TEST_F(CassandraFunctionalTest, + CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) { + purge_ttl_on_expiration_ = true; + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append( + "k1", + CreateTestRowValue( + {CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 20)), // expired + CreateTestColumnSpec(kExpiringColumn, 1, + ToMicroSeconds(now)), // not expired + CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))})); + + ASSERT_OK(store.Flush()); + + store.Append( + "k1", + CreateTestRowValue( + {CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 10)), // expired + CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))})); + + ASSERT_OK(store.Flush()); + ASSERT_OK(store.Compact()); + + auto ret = store.Get("k1"); + ASSERT_TRUE(std::get<0>(ret)); + RowValue& merged = std::get<1>(ret); + EXPECT_EQ(merged.get_columns().size(), 3); + VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 1, + ToMicroSeconds(now)); + VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 2, + ToMicroSeconds(now)); + VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 3, + ToMicroSeconds(now)); +} + +TEST_F(CassandraFunctionalTest, + CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) { + purge_ttl_on_expiration_ = true; + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append("k1", CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 20)), + CreateTestColumnSpec(kExpiringColumn, 1, + ToMicroSeconds(now - kTtl - 20)), + })); + + ASSERT_OK(store.Flush()); + + store.Append("k1", CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, + ToMicroSeconds(now - kTtl - 10)), + })); + + ASSERT_OK(store.Flush()); + ASSERT_OK(store.Compact()); + ASSERT_FALSE(std::get<0>(store.Get("k1"))); +} + +TEST_F(CassandraFunctionalTest, + CompactionShouldRemoveTombstoneExceedingGCGracePeriod) { + purge_ttl_on_expiration_ = true; + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append("k1", + CreateTestRowValue( + {CreateTestColumnSpec( + kTombstone, 0, + ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), + CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))})); + + store.Append("k2", CreateTestRowValue({CreateTestColumnSpec( + kColumn, 0, ToMicroSeconds(now))})); + + ASSERT_OK(store.Flush()); + + store.Append("k1", CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)), + })); + + ASSERT_OK(store.Flush()); + ASSERT_OK(store.Compact()); + + auto ret = store.Get("k1"); + ASSERT_TRUE(std::get<0>(ret)); + RowValue& gced = std::get<1>(ret); + EXPECT_EQ(gced.get_columns().size(), 1); + VerifyRowValueColumns(gced.get_columns(), 0, kColumn, 1, ToMicroSeconds(now)); +} + +TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) { + purge_ttl_on_expiration_ = true; + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Put("k1", + CreateTestRowValue({ + CreateTestColumnSpec( + kTombstone, 0, + ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), + })); + + ASSERT_OK(store.Flush()); + ASSERT_OK(store.Compact()); + ASSERT_FALSE(std::get<0>(store.Get("k1"))); +} + +#ifndef ROCKSDB_LITE +TEST_F(CassandraFunctionalTest, LoadMergeOperator) { + ConfigOptions config_options; + std::shared_ptr mo; + config_options.ignore_unsupported_options = false; + + ASSERT_NOK(MergeOperator::CreateFromString( + config_options, CassandraValueMergeOperator::kClassName(), &mo)); + + config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects, + "cassandra"); + + ASSERT_OK(MergeOperator::CreateFromString( + config_options, CassandraValueMergeOperator::kClassName(), &mo)); + ASSERT_NE(mo, nullptr); + ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName()); + mo.reset(); + ASSERT_OK(MergeOperator::CreateFromString( + config_options, + std::string("operands_limit=20;gc_grace_period_in_seconds=42;id=") + + CassandraValueMergeOperator::kClassName(), + &mo)); + ASSERT_NE(mo, nullptr); + ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName()); + const auto* opts = mo->GetOptions(); + ASSERT_NE(opts, nullptr); + ASSERT_EQ(opts->gc_grace_period_in_seconds, 42); + ASSERT_EQ(opts->operands_limit, 20); +} + +TEST_F(CassandraFunctionalTest, LoadCompactionFilter) { + ConfigOptions config_options; + const CompactionFilter* filter = nullptr; + config_options.ignore_unsupported_options = false; + + ASSERT_NOK(CompactionFilter::CreateFromString( + config_options, CassandraCompactionFilter::kClassName(), &filter)); + config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects, + "cassandra"); + + ASSERT_OK(CompactionFilter::CreateFromString( + config_options, CassandraCompactionFilter::kClassName(), &filter)); + ASSERT_NE(filter, nullptr); + ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName()); + delete filter; + filter = nullptr; + ASSERT_OK(CompactionFilter::CreateFromString( + config_options, + std::string( + "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") + + CassandraCompactionFilter::kClassName(), + &filter)); + ASSERT_NE(filter, nullptr); + ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName()); + const auto* opts = filter->GetOptions(); + ASSERT_NE(opts, nullptr); + ASSERT_EQ(opts->gc_grace_period_in_seconds, 42); + ASSERT_TRUE(opts->purge_ttl_on_expiration); + delete filter; +} + +TEST_F(CassandraFunctionalTest, LoadCompactionFilterFactory) { + ConfigOptions config_options; + std::shared_ptr factory; + + config_options.ignore_unsupported_options = false; + ASSERT_NOK(CompactionFilterFactory::CreateFromString( + config_options, CassandraCompactionFilterFactory::kClassName(), + &factory)); + config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects, + "cassandra"); + + ASSERT_OK(CompactionFilterFactory::CreateFromString( + config_options, CassandraCompactionFilterFactory::kClassName(), + &factory)); + ASSERT_NE(factory, nullptr); + ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName()); + factory.reset(); + ASSERT_OK(CompactionFilterFactory::CreateFromString( + config_options, + std::string( + "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") + + CassandraCompactionFilterFactory::kClassName(), + &factory)); + ASSERT_NE(factory, nullptr); + ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName()); + const auto* opts = factory->GetOptions(); + ASSERT_NE(opts, nullptr); + ASSERT_EQ(opts->gc_grace_period_in_seconds, 42); + ASSERT_TRUE(opts->purge_ttl_on_expiration); +} +#endif // ROCKSDB_LITE + +} // namespace cassandra +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} -- cgit v1.2.3