diff options
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc | 48 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h | 42 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/cassandra_format_test.cc | 367 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/cassandra_functional_test.cc | 311 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc | 112 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc | 188 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/format.cc | 386 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/format.h | 197 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/merge_operator.cc | 67 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/merge_operator.h | 44 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/serialize.h | 75 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/test_utils.cc | 75 | ||||
-rw-r--r-- | src/rocksdb/utilities/cassandra/test_utils.h | 46 |
13 files changed, 1958 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc new file mode 100644 index 00000000..1b99d3a8 --- /dev/null +++ b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc @@ -0,0 +1,48 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "utilities/cassandra/cassandra_compaction_filter.h" +#include <string> +#include "rocksdb/slice.h" +#include "utilities/cassandra/format.h" + + +namespace rocksdb { +namespace cassandra { + +const char* CassandraCompactionFilter::Name() const { + return "CassandraCompactionFilter"; +} + +CompactionFilter::Decision CassandraCompactionFilter::FilterV2( + int /*level*/, const Slice& /*key*/, ValueType value_type, + const Slice& existing_value, std::string* new_value, + std::string* /*skip_until*/) const { + bool value_changed = false; + RowValue row_value = RowValue::Deserialize( + existing_value.data(), existing_value.size()); + RowValue compacted = + purge_ttl_on_expiration_ + ? row_value.RemoveExpiredColumns(&value_changed) + : row_value.ConvertExpiredColumnsToTombstones(&value_changed); + + if (value_type == ValueType::kValue) { + compacted = compacted.RemoveTombstones(gc_grace_period_in_seconds_); + } + + if(compacted.Empty()) { + return Decision::kRemove; + } + + if (value_changed) { + compacted.Serialize(new_value); + return Decision::kChangeValue; + } + + return Decision::kKeep; +} + +} // namespace cassandra +} // namespace rocksdb diff --git a/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h new file mode 100644 index 00000000..4ee2445a --- /dev/null +++ b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h @@ -0,0 +1,42 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include <string> +#include "rocksdb/compaction_filter.h" +#include "rocksdb/slice.h" + +namespace rocksdb { +namespace cassandra { + +/** + * Compaction filter for removing expired Cassandra data with ttl. + * If option `purge_ttl_on_expiration` is set to true, expired data + * will be directly purged. Otherwise expired data will be converted + * tombstones first, then be eventally removed after gc grace period. + * `purge_ttl_on_expiration` should only be on in the case all the + * writes have same ttl setting, otherwise it could bring old data back. + * + * Compaction filter is also in charge of removing tombstone that has been + * promoted to kValue type after serials of merging in compaction. + */ +class CassandraCompactionFilter : public CompactionFilter { +public: + explicit CassandraCompactionFilter(bool purge_ttl_on_expiration, + int32_t gc_grace_period_in_seconds) + : purge_ttl_on_expiration_(purge_ttl_on_expiration), + gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {} + + const char* Name() const override; + virtual Decision FilterV2(int level, const Slice& key, ValueType value_type, + const Slice& existing_value, std::string* new_value, + std::string* skip_until) const override; + +private: + bool purge_ttl_on_expiration_; + int32_t gc_grace_period_in_seconds_; +}; +} // namespace cassandra +} // namespace rocksdb diff --git a/src/rocksdb/utilities/cassandra/cassandra_format_test.cc b/src/rocksdb/utilities/cassandra/cassandra_format_test.cc new file mode 100644 index 00000000..8f9baa72 --- /dev/null +++ b/src/rocksdb/utilities/cassandra/cassandra_format_test.cc @@ -0,0 +1,367 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <cstring> +#include <memory> +#include "util/testharness.h" +#include "utilities/cassandra/format.h" +#include "utilities/cassandra/serialize.h" +#include "utilities/cassandra/test_utils.h" + +using namespace rocksdb::cassandra; + +namespace rocksdb { +namespace cassandra { + +TEST(ColumnTest, Column) { + char data[4] = {'d', 'a', 't', 'a'}; + int8_t mask = 0; + int8_t index = 1; + int64_t timestamp = 1494022807044; + Column c = Column(mask, index, timestamp, sizeof(data), data); + + EXPECT_EQ(c.Index(), index); + EXPECT_EQ(c.Timestamp(), timestamp); + EXPECT_EQ(c.Size(), 14 + sizeof(data)); + + // Verify the serialization. + std::string dest; + dest.reserve(c.Size() * 2); + c.Serialize(&dest); + + EXPECT_EQ(dest.size(), c.Size()); + std::size_t offset = 0; + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), timestamp); + offset += sizeof(int64_t); + EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(data)); + offset += sizeof(int32_t); + EXPECT_TRUE(std::memcmp(data, dest.c_str() + offset, sizeof(data)) == 0); + + // Verify the deserialization. + std::string saved_dest = dest; + std::shared_ptr<Column> c1 = Column::Deserialize(saved_dest.c_str(), 0); + EXPECT_EQ(c1->Index(), index); + EXPECT_EQ(c1->Timestamp(), timestamp); + EXPECT_EQ(c1->Size(), 14 + sizeof(data)); + + c1->Serialize(&dest); + EXPECT_EQ(dest.size(), 2 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0); + + // Verify the ColumnBase::Deserialization. + saved_dest = dest; + std::shared_ptr<ColumnBase> c2 = + ColumnBase::Deserialize(saved_dest.c_str(), c.Size()); + c2->Serialize(&dest); + EXPECT_EQ(dest.size(), 3 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size()) + == 0); +} + +TEST(ExpiringColumnTest, ExpiringColumn) { + char data[4] = {'d', 'a', 't', 'a'}; + int8_t mask = ColumnTypeMask::EXPIRATION_MASK; + int8_t index = 3; + int64_t timestamp = 1494022807044; + int32_t ttl = 3600; + ExpiringColumn c = ExpiringColumn(mask, index, timestamp, + sizeof(data), data, ttl); + + EXPECT_EQ(c.Index(), index); + EXPECT_EQ(c.Timestamp(), timestamp); + EXPECT_EQ(c.Size(), 18 + sizeof(data)); + + // Verify the serialization. + std::string dest; + dest.reserve(c.Size() * 2); + c.Serialize(&dest); + + EXPECT_EQ(dest.size(), c.Size()); + std::size_t offset = 0; + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), timestamp); + offset += sizeof(int64_t); + EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(data)); + offset += sizeof(int32_t); + EXPECT_TRUE(std::memcmp(data, dest.c_str() + offset, sizeof(data)) == 0); + offset += sizeof(data); + EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), ttl); + + // Verify the deserialization. + std::string saved_dest = dest; + std::shared_ptr<ExpiringColumn> c1 = + ExpiringColumn::Deserialize(saved_dest.c_str(), 0); + EXPECT_EQ(c1->Index(), index); + EXPECT_EQ(c1->Timestamp(), timestamp); + EXPECT_EQ(c1->Size(), 18 + sizeof(data)); + + c1->Serialize(&dest); + EXPECT_EQ(dest.size(), 2 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0); + + // Verify the ColumnBase::Deserialization. + saved_dest = dest; + std::shared_ptr<ColumnBase> c2 = + ColumnBase::Deserialize(saved_dest.c_str(), c.Size()); + c2->Serialize(&dest); + EXPECT_EQ(dest.size(), 3 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size()) + == 0); +} + +TEST(TombstoneTest, TombstoneCollectable) { + int32_t now = (int32_t)time(nullptr); + int32_t gc_grace_seconds = 16440; + int32_t time_delta_seconds = 10; + EXPECT_TRUE(Tombstone(ColumnTypeMask::DELETION_MASK, 0, + now - gc_grace_seconds - time_delta_seconds, + ToMicroSeconds(now - gc_grace_seconds - time_delta_seconds)) + .Collectable(gc_grace_seconds)); + EXPECT_FALSE(Tombstone(ColumnTypeMask::DELETION_MASK, 0, + now - gc_grace_seconds + time_delta_seconds, + ToMicroSeconds(now - gc_grace_seconds + time_delta_seconds)) + .Collectable(gc_grace_seconds)); +} + +TEST(TombstoneTest, Tombstone) { + int8_t mask = ColumnTypeMask::DELETION_MASK; + int8_t index = 2; + int32_t local_deletion_time = 1494022807; + int64_t marked_for_delete_at = 1494022807044; + Tombstone c = Tombstone(mask, index, local_deletion_time, + marked_for_delete_at); + + EXPECT_EQ(c.Index(), index); + EXPECT_EQ(c.Timestamp(), marked_for_delete_at); + EXPECT_EQ(c.Size(), 14); + + // Verify the serialization. + std::string dest; + dest.reserve(c.Size() * 2); + c.Serialize(&dest); + + EXPECT_EQ(dest.size(), c.Size()); + std::size_t offset = 0; + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), local_deletion_time); + offset += sizeof(int32_t); + EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), marked_for_delete_at); + + // Verify the deserialization. + std::shared_ptr<Tombstone> c1 = Tombstone::Deserialize(dest.c_str(), 0); + EXPECT_EQ(c1->Index(), index); + EXPECT_EQ(c1->Timestamp(), marked_for_delete_at); + EXPECT_EQ(c1->Size(), 14); + + c1->Serialize(&dest); + EXPECT_EQ(dest.size(), 2 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0); + + // Verify the ColumnBase::Deserialization. + std::shared_ptr<ColumnBase> c2 = + ColumnBase::Deserialize(dest.c_str(), c.Size()); + c2->Serialize(&dest); + EXPECT_EQ(dest.size(), 3 * c.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size()) + == 0); +} + +TEST(RowValueTest, RowTombstone) { + int32_t local_deletion_time = 1494022807; + int64_t marked_for_delete_at = 1494022807044; + RowValue r = RowValue(local_deletion_time, marked_for_delete_at); + + EXPECT_EQ(r.Size(), 12); + EXPECT_EQ(r.IsTombstone(), true); + EXPECT_EQ(r.LastModifiedTime(), marked_for_delete_at); + + // Verify the serialization. + std::string dest; + dest.reserve(r.Size() * 2); + r.Serialize(&dest); + + EXPECT_EQ(dest.size(), r.Size()); + std::size_t offset = 0; + EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), local_deletion_time); + offset += sizeof(int32_t); + EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), marked_for_delete_at); + + // Verify the deserialization. + RowValue r1 = RowValue::Deserialize(dest.c_str(), r.Size()); + EXPECT_EQ(r1.Size(), 12); + EXPECT_EQ(r1.IsTombstone(), true); + EXPECT_EQ(r1.LastModifiedTime(), marked_for_delete_at); + + r1.Serialize(&dest); + EXPECT_EQ(dest.size(), 2 * r.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0); +} + +TEST(RowValueTest, RowWithColumns) { + std::vector<std::shared_ptr<ColumnBase>> columns; + int64_t last_modified_time = 1494022807048; + std::size_t columns_data_size = 0; + + char e_data[5] = {'e', 'd', 'a', 't', 'a'}; + int8_t e_index = 0; + int64_t e_timestamp = 1494022807044; + int32_t e_ttl = 3600; + columns.push_back(std::shared_ptr<ExpiringColumn>( + new ExpiringColumn(ColumnTypeMask::EXPIRATION_MASK, e_index, + e_timestamp, sizeof(e_data), e_data, e_ttl))); + columns_data_size += columns[0]->Size(); + + char c_data[4] = {'d', 'a', 't', 'a'}; + int8_t c_index = 1; + int64_t c_timestamp = 1494022807048; + columns.push_back(std::shared_ptr<Column>( + new Column(0, c_index, c_timestamp, sizeof(c_data), c_data))); + columns_data_size += columns[1]->Size(); + + int8_t t_index = 2; + int32_t t_local_deletion_time = 1494022801; + int64_t t_marked_for_delete_at = 1494022807043; + columns.push_back(std::shared_ptr<Tombstone>( + new Tombstone(ColumnTypeMask::DELETION_MASK, + t_index, t_local_deletion_time, t_marked_for_delete_at))); + columns_data_size += columns[2]->Size(); + + RowValue r = RowValue(std::move(columns), last_modified_time); + + EXPECT_EQ(r.Size(), columns_data_size + 12); + EXPECT_EQ(r.IsTombstone(), false); + EXPECT_EQ(r.LastModifiedTime(), last_modified_time); + + // Verify the serialization. + std::string dest; + dest.reserve(r.Size() * 2); + r.Serialize(&dest); + + EXPECT_EQ(dest.size(), r.Size()); + std::size_t offset = 0; + EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), + std::numeric_limits<int32_t>::max()); + offset += sizeof(int32_t); + EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), + std::numeric_limits<int64_t>::min()); + offset += sizeof(int64_t); + + // Column0: ExpiringColumn + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), + ColumnTypeMask::EXPIRATION_MASK); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), e_index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), e_timestamp); + offset += sizeof(int64_t); + EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(e_data)); + offset += sizeof(int32_t); + EXPECT_TRUE(std::memcmp(e_data, dest.c_str() + offset, sizeof(e_data)) == 0); + offset += sizeof(e_data); + EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), e_ttl); + offset += sizeof(int32_t); + + // Column1: Column + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), 0); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), c_index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), c_timestamp); + offset += sizeof(int64_t); + EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(c_data)); + offset += sizeof(int32_t); + EXPECT_TRUE(std::memcmp(c_data, dest.c_str() + offset, sizeof(c_data)) == 0); + offset += sizeof(c_data); + + // Column2: Tombstone + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), + ColumnTypeMask::DELETION_MASK); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), t_index); + offset += sizeof(int8_t); + EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), t_local_deletion_time); + offset += sizeof(int32_t); + EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), t_marked_for_delete_at); + + // Verify the deserialization. + RowValue r1 = RowValue::Deserialize(dest.c_str(), r.Size()); + EXPECT_EQ(r1.Size(), columns_data_size + 12); + EXPECT_EQ(r1.IsTombstone(), false); + EXPECT_EQ(r1.LastModifiedTime(), last_modified_time); + + r1.Serialize(&dest); + EXPECT_EQ(dest.size(), 2 * r.Size()); + EXPECT_TRUE( + std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0); +} + +TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired) { + int64_t now = time(nullptr); + + auto row_value = CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)), + CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), //expired + CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired + CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) + }); + + bool changed = false; + auto purged = row_value.RemoveExpiredColumns(&changed); + EXPECT_TRUE(changed); + EXPECT_EQ(purged.columns_.size(), 3); + VerifyRowValueColumns(purged.columns_, 0, kColumn, 0, ToMicroSeconds(now)); + VerifyRowValueColumns(purged.columns_, 1, kExpiringColumn, 2, ToMicroSeconds(now)); + VerifyRowValueColumns(purged.columns_, 2, kTombstone, 3, ToMicroSeconds(now)); + + purged.RemoveExpiredColumns(&changed); + EXPECT_FALSE(changed); +} + +TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) { + int64_t now = time(nullptr); + + auto row_value = CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)), + CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), //expired + CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired + CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) + }); + + bool changed = false; + auto compacted = row_value.ConvertExpiredColumnsToTombstones(&changed); + EXPECT_TRUE(changed); + EXPECT_EQ(compacted.columns_.size(), 4); + VerifyRowValueColumns(compacted.columns_, 0, kColumn, 0, ToMicroSeconds(now)); + VerifyRowValueColumns(compacted.columns_, 1, kTombstone, 1, ToMicroSeconds(now - 10)); + VerifyRowValueColumns(compacted.columns_, 2, kExpiringColumn, 2, ToMicroSeconds(now)); + VerifyRowValueColumns(compacted.columns_, 3, kTombstone, 3, ToMicroSeconds(now)); + + compacted.ConvertExpiredColumnsToTombstones(&changed); + EXPECT_FALSE(changed); +} +} // namespace cassandra +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc b/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc new file mode 100644 index 00000000..dacc6f03 --- /dev/null +++ b/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc @@ -0,0 +1,311 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <iostream> +#include "rocksdb/db.h" +#include "db/db_impl.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/utilities/db_ttl.h" +#include "util/testharness.h" +#include "util/random.h" +#include "utilities/merge_operators.h" +#include "utilities/cassandra/cassandra_compaction_filter.h" +#include "utilities/cassandra/merge_operator.h" +#include "utilities/cassandra/test_utils.h" + +using namespace rocksdb; + +namespace rocksdb { +namespace cassandra { + +// Path to the database on file system +const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test"); + +class CassandraStore { + public: + explicit CassandraStore(std::shared_ptr<DB> db) + : db_(db), write_option_(), get_option_() { + assert(db); + } + + bool Append(const std::string& key, const RowValue& val){ + std::string result; + val.Serialize(&result); + Slice valSlice(result.data(), result.size()); + auto s = db_->Merge(write_option_, key, valSlice); + + if (s.ok()) { + return true; + } else { + std::cerr << "ERROR " << s.ToString() << std::endl; + return false; + } + } + + bool Put(const std::string& key, const RowValue& val) { + std::string result; + val.Serialize(&result); + Slice valSlice(result.data(), result.size()); + auto s = db_->Put(write_option_, key, valSlice); + if (s.ok()) { + return true; + } else { + std::cerr << "ERROR " << s.ToString() << std::endl; + return false; + } + } + + void Flush() { + dbfull()->TEST_FlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + + void Compact() { + dbfull()->TEST_CompactRange( + 0, nullptr, nullptr, db_->DefaultColumnFamily()); + } + + std::tuple<bool, RowValue> Get(const std::string& key){ + std::string result; + auto s = db_->Get(get_option_, key, &result); + + if (s.ok()) { + return std::make_tuple(true, + RowValue::Deserialize(result.data(), + result.size())); + } + + if (!s.IsNotFound()) { + std::cerr << "ERROR " << s.ToString() << std::endl; + } + + return std::make_tuple(false, RowValue(0, 0)); + } + + private: + std::shared_ptr<DB> db_; + WriteOptions write_option_; + ReadOptions get_option_; + + DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_.get()); } +}; + +class TestCompactionFilterFactory : public CompactionFilterFactory { +public: + explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration, + int32_t gc_grace_period_in_seconds) + : purge_ttl_on_expiration_(purge_ttl_on_expiration), + gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {} + + std::unique_ptr<CompactionFilter> CreateCompactionFilter( + const CompactionFilter::Context& /*context*/) override { + return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter( + purge_ttl_on_expiration_, gc_grace_period_in_seconds_)); + } + + const char* Name() const override { return "TestCompactionFilterFactory"; } + +private: + bool purge_ttl_on_expiration_; + int32_t gc_grace_period_in_seconds_; +}; + + +// The class for unit-testing +class CassandraFunctionalTest : public testing::Test { +public: + CassandraFunctionalTest() { + DestroyDB(kDbName, Options()); // Start each test with a fresh DB + } + + std::shared_ptr<DB> OpenDb() { + DB* db; + Options options; + options.create_if_missing = true; + options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_)); + auto* cf_factory = new TestCompactionFilterFactory( + purge_ttl_on_expiration_, gc_grace_period_in_seconds_); + options.compaction_filter_factory.reset(cf_factory); + EXPECT_OK(DB::Open(options, kDbName, &db)); + return std::shared_ptr<DB>(db); + } + + bool purge_ttl_on_expiration_ = false; + int32_t gc_grace_period_in_seconds_ = 100; +}; + +// THE TEST CASES BEGIN HERE + +TEST_F(CassandraFunctionalTest, SimpleMergeTest) { + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append("k1", CreateTestRowValue({ + CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)), + CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)), + CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)), + })); + store.Append("k1",CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)), + CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)), + CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)), + CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)), + })); + store.Append("k1", CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)), + CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)), + CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)), + CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)), + })); + + auto ret = store.Get("k1"); + + ASSERT_TRUE(std::get<0>(ret)); + RowValue& merged = std::get<1>(ret); + EXPECT_EQ(merged.columns_.size(), 5); + VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, ToMicroSeconds(now + 6)); + VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, ToMicroSeconds(now + 8)); + VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, ToMicroSeconds(now + 7)); + VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, ToMicroSeconds(now + 17)); + VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, ToMicroSeconds(now + 11)); +} + +TEST_F(CassandraFunctionalTest, + CompactionShouldConvertExpiredColumnsToTombstone) { + CassandraStore store(OpenDb()); + int64_t now= time(nullptr); + + store.Append("k1", CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired + CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired + CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) + })); + + store.Flush(); + + store.Append("k1",CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired + CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now)) + })); + + store.Flush(); + store.Compact(); + + auto ret = store.Get("k1"); + ASSERT_TRUE(std::get<0>(ret)); + RowValue& merged = std::get<1>(ret); + EXPECT_EQ(merged.columns_.size(), 4); + VerifyRowValueColumns(merged.columns_, 0, kTombstone, 0, ToMicroSeconds(now - 10)); + VerifyRowValueColumns(merged.columns_, 1, kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)); + VerifyRowValueColumns(merged.columns_, 2, kColumn, 2, ToMicroSeconds(now)); + VerifyRowValueColumns(merged.columns_, 3, kTombstone, 3, ToMicroSeconds(now)); +} + + +TEST_F(CassandraFunctionalTest, + CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) { + purge_ttl_on_expiration_ = true; + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append("k1", CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired + CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired + CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now)) + })); + + store.Flush(); + + store.Append("k1",CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired + CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now)) + })); + + store.Flush(); + store.Compact(); + + auto ret = store.Get("k1"); + ASSERT_TRUE(std::get<0>(ret)); + RowValue& merged = std::get<1>(ret); + EXPECT_EQ(merged.columns_.size(), 3); + VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 1, ToMicroSeconds(now)); + VerifyRowValueColumns(merged.columns_, 1, kColumn, 2, ToMicroSeconds(now)); + VerifyRowValueColumns(merged.columns_, 2, kTombstone, 3, ToMicroSeconds(now)); +} + +TEST_F(CassandraFunctionalTest, + CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) { + purge_ttl_on_expiration_ = true; + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append("k1", CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), + CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)), + })); + + store.Flush(); + + store.Append("k1",CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), + })); + + store.Flush(); + store.Compact(); + ASSERT_FALSE(std::get<0>(store.Get("k1"))); +} + +TEST_F(CassandraFunctionalTest, + CompactionShouldRemoveTombstoneExceedingGCGracePeriod) { + purge_ttl_on_expiration_ = true; + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append("k1", CreateTestRowValue({ + CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), + CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)) + })); + + store.Append("k2", CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)) + })); + + store.Flush(); + + store.Append("k1",CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)), + })); + + store.Flush(); + store.Compact(); + + auto ret = store.Get("k1"); + ASSERT_TRUE(std::get<0>(ret)); + RowValue& gced = std::get<1>(ret); + EXPECT_EQ(gced.columns_.size(), 1); + VerifyRowValueColumns(gced.columns_, 0, kColumn, 1, ToMicroSeconds(now)); +} + +TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) { + purge_ttl_on_expiration_ = true; + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Put("k1", CreateTestRowValue({ + CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), + })); + + store.Flush(); + store.Compact(); + ASSERT_FALSE(std::get<0>(store.Get("k1"))); +} + +} // namespace cassandra +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc b/src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc new file mode 100644 index 00000000..8d6dc10d --- /dev/null +++ b/src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc @@ -0,0 +1,112 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <memory> +#include "util/testharness.h" +#include "utilities/cassandra/format.h" +#include "utilities/cassandra/test_utils.h" + +namespace rocksdb { +namespace cassandra { + +TEST(RowValueMergeTest, Merge) { + std::vector<RowValue> row_values; + row_values.push_back( + CreateTestRowValue({ + CreateTestColumnSpec(kTombstone, 0, 5), + CreateTestColumnSpec(kColumn, 1, 8), + CreateTestColumnSpec(kExpiringColumn, 2, 5), + }) + ); + + row_values.push_back( + CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 0, 2), + CreateTestColumnSpec(kExpiringColumn, 1, 5), + CreateTestColumnSpec(kTombstone, 2, 7), + CreateTestColumnSpec(kExpiringColumn, 7, 17), + }) + ); + + row_values.push_back( + CreateTestRowValue({ + CreateTestColumnSpec(kExpiringColumn, 0, 6), + CreateTestColumnSpec(kTombstone, 1, 5), + CreateTestColumnSpec(kColumn, 2, 4), + CreateTestColumnSpec(kTombstone, 11, 11), + }) + ); + + RowValue merged = RowValue::Merge(std::move(row_values)); + EXPECT_FALSE(merged.IsTombstone()); + EXPECT_EQ(merged.columns_.size(), 5); + VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6); + VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8); + VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7); + VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17); + VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11); +} + +TEST(RowValueMergeTest, MergeWithRowTombstone) { + std::vector<RowValue> row_values; + + // A row tombstone. + row_values.push_back( + CreateRowTombstone(11) + ); + + // This row's timestamp is smaller than tombstone. + row_values.push_back( + CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 0, 5), + CreateTestColumnSpec(kColumn, 1, 6), + }) + ); + + // Some of the column's row is smaller, some is larger. + row_values.push_back( + CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 2, 10), + CreateTestColumnSpec(kColumn, 3, 12), + }) + ); + + // All of the column's rows are larger than tombstone. + row_values.push_back( + CreateTestRowValue({ + CreateTestColumnSpec(kColumn, 4, 13), + CreateTestColumnSpec(kColumn, 5, 14), + }) + ); + + RowValue merged = RowValue::Merge(std::move(row_values)); + EXPECT_FALSE(merged.IsTombstone()); + EXPECT_EQ(merged.columns_.size(), 3); + VerifyRowValueColumns(merged.columns_, 0, kColumn, 3, 12); + VerifyRowValueColumns(merged.columns_, 1, kColumn, 4, 13); + VerifyRowValueColumns(merged.columns_, 2, kColumn, 5, 14); + + // If the tombstone's timestamp is the latest, then it returns a + // row tombstone. + row_values.push_back( + CreateRowTombstone(15) + ); + + row_values.push_back( + CreateRowTombstone(17) + ); + + merged = RowValue::Merge(std::move(row_values)); + EXPECT_TRUE(merged.IsTombstone()); + EXPECT_EQ(merged.LastModifiedTime(), 17); +} + +} // namespace cassandra +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc b/src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc new file mode 100644 index 00000000..68d2c163 --- /dev/null +++ b/src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc @@ -0,0 +1,188 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/testharness.h" +#include "utilities/cassandra/serialize.h" + +using namespace rocksdb::cassandra; + +namespace rocksdb { +namespace cassandra { + +TEST(SerializeTest, SerializeI64) { + std::string dest; + Serialize<int64_t>(0, &dest); + EXPECT_EQ( + std::string( + {'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00'}), + dest); + + dest.clear(); + Serialize<int64_t>(1, &dest); + EXPECT_EQ( + std::string( + {'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x01'}), + dest); + + + dest.clear(); + Serialize<int64_t>(-1, &dest); + EXPECT_EQ( + std::string( + {'\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff'}), + dest); + + dest.clear(); + Serialize<int64_t>(9223372036854775807, &dest); + EXPECT_EQ( + std::string( + {'\x7f', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff'}), + dest); + + dest.clear(); + Serialize<int64_t>(-9223372036854775807, &dest); + EXPECT_EQ( + std::string( + {'\x80', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x01'}), + dest); +} + +TEST(SerializeTest, DeserializeI64) { + std::string dest; + std::size_t offset = dest.size(); + Serialize<int64_t>(0, &dest); + EXPECT_EQ(0, Deserialize<int64_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int64_t>(1, &dest); + EXPECT_EQ(1, Deserialize<int64_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int64_t>(-1, &dest); + EXPECT_EQ(-1, Deserialize<int64_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int64_t>(-9223372036854775807, &dest); + EXPECT_EQ(-9223372036854775807, Deserialize<int64_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int64_t>(9223372036854775807, &dest); + EXPECT_EQ(9223372036854775807, Deserialize<int64_t>(dest.c_str(), offset)); +} + +TEST(SerializeTest, SerializeI32) { + std::string dest; + Serialize<int32_t>(0, &dest); + EXPECT_EQ( + std::string( + {'\x00', '\x00', '\x00', '\x00'}), + dest); + + dest.clear(); + Serialize<int32_t>(1, &dest); + EXPECT_EQ( + std::string( + {'\x00', '\x00', '\x00', '\x01'}), + dest); + + + dest.clear(); + Serialize<int32_t>(-1, &dest); + EXPECT_EQ( + std::string( + {'\xff', '\xff', '\xff', '\xff'}), + dest); + + dest.clear(); + Serialize<int32_t>(2147483647, &dest); + EXPECT_EQ( + std::string( + {'\x7f', '\xff', '\xff', '\xff'}), + dest); + + dest.clear(); + Serialize<int32_t>(-2147483648LL, &dest); + EXPECT_EQ( + std::string( + {'\x80', '\x00', '\x00', '\x00'}), + dest); +} + +TEST(SerializeTest, DeserializeI32) { + std::string dest; + std::size_t offset = dest.size(); + Serialize<int32_t>(0, &dest); + EXPECT_EQ(0, Deserialize<int32_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int32_t>(1, &dest); + EXPECT_EQ(1, Deserialize<int32_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int32_t>(-1, &dest); + EXPECT_EQ(-1, Deserialize<int32_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int32_t>(2147483647, &dest); + EXPECT_EQ(2147483647, Deserialize<int32_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int32_t>(-2147483648LL, &dest); + EXPECT_EQ(-2147483648LL, Deserialize<int32_t>(dest.c_str(), offset)); +} + +TEST(SerializeTest, SerializeI8) { + std::string dest; + Serialize<int8_t>(0, &dest); + EXPECT_EQ(std::string({'\x00'}), dest); + + dest.clear(); + Serialize<int8_t>(1, &dest); + EXPECT_EQ(std::string({'\x01'}), dest); + + + dest.clear(); + Serialize<int8_t>(-1, &dest); + EXPECT_EQ(std::string({'\xff'}), dest); + + dest.clear(); + Serialize<int8_t>(127, &dest); + EXPECT_EQ(std::string({'\x7f'}), dest); + + dest.clear(); + Serialize<int8_t>(-128, &dest); + EXPECT_EQ(std::string({'\x80'}), dest); +} + +TEST(SerializeTest, DeserializeI8) { + std::string dest; + std::size_t offset = dest.size(); + Serialize<int8_t>(0, &dest); + EXPECT_EQ(0, Deserialize<int8_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int8_t>(1, &dest); + EXPECT_EQ(1, Deserialize<int8_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int8_t>(-1, &dest); + EXPECT_EQ(-1, Deserialize<int8_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int8_t>(127, &dest); + EXPECT_EQ(127, Deserialize<int8_t>(dest.c_str(), offset)); + + offset = dest.size(); + Serialize<int8_t>(-128, &dest); + EXPECT_EQ(-128, Deserialize<int8_t>(dest.c_str(), offset)); +} + +} // namespace cassandra +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/utilities/cassandra/format.cc b/src/rocksdb/utilities/cassandra/format.cc new file mode 100644 index 00000000..42cd7206 --- /dev/null +++ b/src/rocksdb/utilities/cassandra/format.cc @@ -0,0 +1,386 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "format.h" + +#include <algorithm> +#include <map> +#include <memory> + +#include "utilities/cassandra/serialize.h" + +namespace rocksdb { +namespace cassandra { +namespace { +const int32_t kDefaultLocalDeletionTime = + std::numeric_limits<int32_t>::max(); +const int64_t kDefaultMarkedForDeleteAt = + std::numeric_limits<int64_t>::min(); +} + +ColumnBase::ColumnBase(int8_t mask, int8_t index) + : mask_(mask), index_(index) {} + +std::size_t ColumnBase::Size() const { + return sizeof(mask_) + sizeof(index_); +} + +int8_t ColumnBase::Mask() const { + return mask_; +} + +int8_t ColumnBase::Index() const { + return index_; +} + +void ColumnBase::Serialize(std::string* dest) const { + rocksdb::cassandra::Serialize<int8_t>(mask_, dest); + rocksdb::cassandra::Serialize<int8_t>(index_, dest); +} + +std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src, + std::size_t offset) { + int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset); + if ((mask & ColumnTypeMask::DELETION_MASK) != 0) { + return Tombstone::Deserialize(src, offset); + } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) { + return ExpiringColumn::Deserialize(src, offset); + } else { + return Column::Deserialize(src, offset); + } +} + +Column::Column( + int8_t mask, + int8_t index, + int64_t timestamp, + int32_t value_size, + const char* value +) : ColumnBase(mask, index), timestamp_(timestamp), + value_size_(value_size), value_(value) {} + +int64_t Column::Timestamp() const { + return timestamp_; +} + +std::size_t Column::Size() const { + return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_) + + value_size_; +} + +void Column::Serialize(std::string* dest) const { + ColumnBase::Serialize(dest); + rocksdb::cassandra::Serialize<int64_t>(timestamp_, dest); + rocksdb::cassandra::Serialize<int32_t>(value_size_, dest); + dest->append(value_, value_size_); +} + +std::shared_ptr<Column> Column::Deserialize(const char *src, + std::size_t offset) { + int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset); + offset += sizeof(mask); + int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset); + offset += sizeof(index); + int64_t timestamp = rocksdb::cassandra::Deserialize<int64_t>(src, offset); + offset += sizeof(timestamp); + int32_t value_size = rocksdb::cassandra::Deserialize<int32_t>(src, offset); + offset += sizeof(value_size); + return std::make_shared<Column>( + mask, index, timestamp, value_size, src + offset); +} + +ExpiringColumn::ExpiringColumn( + int8_t mask, + int8_t index, + int64_t timestamp, + int32_t value_size, + const char* value, + int32_t ttl +) : Column(mask, index, timestamp, value_size, value), + ttl_(ttl) {} + +std::size_t ExpiringColumn::Size() const { + return Column::Size() + sizeof(ttl_); +} + +void ExpiringColumn::Serialize(std::string* dest) const { + Column::Serialize(dest); + rocksdb::cassandra::Serialize<int32_t>(ttl_, dest); +} + +std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint() const { + return std::chrono::time_point<std::chrono::system_clock>(std::chrono::microseconds(Timestamp())); +} + +std::chrono::seconds ExpiringColumn::Ttl() const { + return std::chrono::seconds(ttl_); +} + +bool ExpiringColumn::Expired() const { + return TimePoint() + Ttl() < std::chrono::system_clock::now(); +} + +std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const { + auto expired_at = (TimePoint() + Ttl()).time_since_epoch(); + int32_t local_deletion_time = static_cast<int32_t>( + std::chrono::duration_cast<std::chrono::seconds>(expired_at).count()); + int64_t marked_for_delete_at = + std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count(); + return std::make_shared<Tombstone>( + static_cast<int8_t>(ColumnTypeMask::DELETION_MASK), + Index(), + local_deletion_time, + marked_for_delete_at); +} + +std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize( + const char *src, + std::size_t offset) { + int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset); + offset += sizeof(mask); + int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset); + offset += sizeof(index); + int64_t timestamp = rocksdb::cassandra::Deserialize<int64_t>(src, offset); + offset += sizeof(timestamp); + int32_t value_size = rocksdb::cassandra::Deserialize<int32_t>(src, offset); + offset += sizeof(value_size); + const char* value = src + offset; + offset += value_size; + int32_t ttl = rocksdb::cassandra::Deserialize<int32_t>(src, offset); + return std::make_shared<ExpiringColumn>( + mask, index, timestamp, value_size, value, ttl); +} + +Tombstone::Tombstone( + int8_t mask, + int8_t index, + int32_t local_deletion_time, + int64_t marked_for_delete_at +) : ColumnBase(mask, index), local_deletion_time_(local_deletion_time), + marked_for_delete_at_(marked_for_delete_at) {} + +int64_t Tombstone::Timestamp() const { + return marked_for_delete_at_; +} + +std::size_t Tombstone::Size() const { + return ColumnBase::Size() + sizeof(local_deletion_time_) + + sizeof(marked_for_delete_at_); +} + +void Tombstone::Serialize(std::string* dest) const { + ColumnBase::Serialize(dest); + rocksdb::cassandra::Serialize<int32_t>(local_deletion_time_, dest); + rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest); +} + +bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const { + auto local_deleted_at = std::chrono::time_point<std::chrono::system_clock>( + std::chrono::seconds(local_deletion_time_)); + auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds); + return local_deleted_at + gc_grace_period < std::chrono::system_clock::now(); +} + +std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src, + std::size_t offset) { + int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset); + offset += sizeof(mask); + int8_t index = rocksdb::cassandra::Deserialize<int8_t>(src, offset); + offset += sizeof(index); + int32_t local_deletion_time = + rocksdb::cassandra::Deserialize<int32_t>(src, offset); + offset += sizeof(int32_t); + int64_t marked_for_delete_at = + rocksdb::cassandra::Deserialize<int64_t>(src, offset); + return std::make_shared<Tombstone>( + mask, index, local_deletion_time, marked_for_delete_at); +} + +RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at) + : local_deletion_time_(local_deletion_time), + marked_for_delete_at_(marked_for_delete_at), columns_(), + last_modified_time_(0) {} + +RowValue::RowValue(Columns columns, + int64_t last_modified_time) + : local_deletion_time_(kDefaultLocalDeletionTime), + marked_for_delete_at_(kDefaultMarkedForDeleteAt), + columns_(std::move(columns)), last_modified_time_(last_modified_time) {} + +std::size_t RowValue::Size() const { + std::size_t size = sizeof(local_deletion_time_) + + sizeof(marked_for_delete_at_); + for (const auto& column : columns_) { + size += column -> Size(); + } + return size; +} + +int64_t RowValue::LastModifiedTime() const { + if (IsTombstone()) { + return marked_for_delete_at_; + } else { + return last_modified_time_; + } +} + +bool RowValue::IsTombstone() const { + return marked_for_delete_at_ > kDefaultMarkedForDeleteAt; +} + +void RowValue::Serialize(std::string* dest) const { + rocksdb::cassandra::Serialize<int32_t>(local_deletion_time_, dest); + rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest); + for (const auto& column : columns_) { + column -> Serialize(dest); + } +} + +RowValue RowValue::RemoveExpiredColumns(bool* changed) const { + *changed = false; + Columns new_columns; + for (auto& column : columns_) { + if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) { + std::shared_ptr<ExpiringColumn> expiring_column = + std::static_pointer_cast<ExpiringColumn>(column); + + if(expiring_column->Expired()){ + *changed = true; + continue; + } + } + + new_columns.push_back(column); + } + return RowValue(std::move(new_columns), last_modified_time_); +} + +RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const { + *changed = false; + Columns new_columns; + for (auto& column : columns_) { + if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) { + std::shared_ptr<ExpiringColumn> expiring_column = + std::static_pointer_cast<ExpiringColumn>(column); + + if(expiring_column->Expired()) { + std::shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone(); + new_columns.push_back(tombstone); + *changed = true; + continue; + } + } + new_columns.push_back(column); + } + return RowValue(std::move(new_columns), last_modified_time_); +} + +RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const { + Columns new_columns; + for (auto& column : columns_) { + if (column->Mask() == ColumnTypeMask::DELETION_MASK) { + std::shared_ptr<Tombstone> tombstone = + std::static_pointer_cast<Tombstone>(column); + + if (tombstone->Collectable(gc_grace_period)) { + continue; + } + } + + new_columns.push_back(column); + } + return RowValue(std::move(new_columns), last_modified_time_); +} + +bool RowValue::Empty() const { + return columns_.empty(); +} + +RowValue RowValue::Deserialize(const char *src, std::size_t size) { + std::size_t offset = 0; + assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_)); + int32_t local_deletion_time = + rocksdb::cassandra::Deserialize<int32_t>(src, offset); + offset += sizeof(int32_t); + int64_t marked_for_delete_at = + rocksdb::cassandra::Deserialize<int64_t>(src, offset); + offset += sizeof(int64_t); + if (offset == size) { + return RowValue(local_deletion_time, marked_for_delete_at); + } + + assert(local_deletion_time == kDefaultLocalDeletionTime); + assert(marked_for_delete_at == kDefaultMarkedForDeleteAt); + Columns columns; + int64_t last_modified_time = 0; + while (offset < size) { + auto c = ColumnBase::Deserialize(src, offset); + offset += c -> Size(); + assert(offset <= size); + last_modified_time = std::max(last_modified_time, c -> Timestamp()); + columns.push_back(std::move(c)); + } + + return RowValue(std::move(columns), last_modified_time); +} + +// Merge multiple row values into one. +// For each column in rows with same index, we pick the one with latest +// timestamp. And we also take row tombstone into consideration, by iterating +// each row from reverse timestamp order, and stop once we hit the first +// row tombstone. +RowValue RowValue::Merge(std::vector<RowValue>&& values) { + assert(values.size() > 0); + if (values.size() == 1) { + return std::move(values[0]); + } + + // Merge columns by their last modified time, and skip once we hit + // a row tombstone. + std::sort(values.begin(), values.end(), + [](const RowValue& r1, const RowValue& r2) { + return r1.LastModifiedTime() > r2.LastModifiedTime(); + }); + + std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns; + int64_t tombstone_timestamp = 0; + + for (auto& value : values) { + if (value.IsTombstone()) { + if (merged_columns.size() == 0) { + return std::move(value); + } + tombstone_timestamp = value.LastModifiedTime(); + break; + } + for (auto& column : value.columns_) { + int8_t index = column->Index(); + if (merged_columns.find(index) == merged_columns.end()) { + merged_columns[index] = column; + } else { + if (column->Timestamp() > merged_columns[index]->Timestamp()) { + merged_columns[index] = column; + } + } + } + } + + int64_t last_modified_time = 0; + Columns columns; + for (auto& pair: merged_columns) { + // For some row, its last_modified_time > row tombstone_timestamp, but + // it might have rows whose timestamp is ealier than tombstone, so we + // ned to filter these rows. + if (pair.second->Timestamp() <= tombstone_timestamp) { + continue; + } + last_modified_time = std::max(last_modified_time, pair.second->Timestamp()); + columns.push_back(std::move(pair.second)); + } + return RowValue(std::move(columns), last_modified_time); +} + +} // namepsace cassandrda +} // namespace rocksdb diff --git a/src/rocksdb/utilities/cassandra/format.h b/src/rocksdb/utilities/cassandra/format.h new file mode 100644 index 00000000..09a49235 --- /dev/null +++ b/src/rocksdb/utilities/cassandra/format.h @@ -0,0 +1,197 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +/** + * The encoding of Cassandra Row Value. + * + * A Cassandra Row Value could either be a row tombstone, + * or contains multiple columns, it has following fields: + * + * struct row_value { + * int32_t local_deletion_time; // Time in second when the row is deleted, + * // only used for Cassandra tombstone gc. + * int64_t marked_for_delete_at; // Ms that marked this row is deleted. + * struct column_base columns[]; // For non tombstone row, all columns + * // are stored here. + * } + * + * If the local_deletion_time and marked_for_delete_at is set, then this is + * a tombstone, otherwise it contains multiple columns. + * + * There are three type of Columns: Normal Column, Expiring Column and Column + * Tombstone, which have following fields: + * + * // Identify the type of the column. + * enum mask { + * DELETION_MASK = 0x01, + * EXPIRATION_MASK = 0x02, + * }; + * + * struct column { + * int8_t mask = 0; + * int8_t index; + * int64_t timestamp; + * int32_t value_length; + * char value[value_length]; + * } + * + * struct expiring_column { + * int8_t mask = mask.EXPIRATION_MASK; + * int8_t index; + * int64_t timestamp; + * int32_t value_length; + * char value[value_length]; + * int32_t ttl; + * } + * + * struct tombstone_column { + * int8_t mask = mask.DELETION_MASK; + * int8_t index; + * int32_t local_deletion_time; // Similar to row_value's field. + * int64_t marked_for_delete_at; + * } + */ + +#pragma once +#include <chrono> +#include <vector> +#include <memory> +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "util/testharness.h" + +namespace rocksdb { +namespace cassandra { + +// Identify the type of the column. +enum ColumnTypeMask { + DELETION_MASK = 0x01, + EXPIRATION_MASK = 0x02, +}; + + +class ColumnBase { +public: + ColumnBase(int8_t mask, int8_t index); + virtual ~ColumnBase() = default; + + virtual int64_t Timestamp() const = 0; + virtual int8_t Mask() const; + virtual int8_t Index() const; + virtual std::size_t Size() const; + virtual void Serialize(std::string* dest) const; + static std::shared_ptr<ColumnBase> Deserialize(const char* src, + std::size_t offset); + +private: + int8_t mask_; + int8_t index_; +}; + +class Column : public ColumnBase { +public: + Column(int8_t mask, int8_t index, int64_t timestamp, + int32_t value_size, const char* value); + + virtual int64_t Timestamp() const override; + virtual std::size_t Size() const override; + virtual void Serialize(std::string* dest) const override; + static std::shared_ptr<Column> Deserialize(const char* src, + std::size_t offset); + +private: + int64_t timestamp_; + int32_t value_size_; + const char* value_; +}; + +class Tombstone : public ColumnBase { +public: + Tombstone(int8_t mask, int8_t index, + int32_t local_deletion_time, int64_t marked_for_delete_at); + + virtual int64_t Timestamp() const override; + virtual std::size_t Size() const override; + virtual void Serialize(std::string* dest) const override; + bool Collectable(int32_t gc_grace_period) const; + static std::shared_ptr<Tombstone> Deserialize(const char* src, + std::size_t offset); + +private: + int32_t local_deletion_time_; + int64_t marked_for_delete_at_; +}; + +class ExpiringColumn : public Column { +public: + ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp, + int32_t value_size, const char* value, int32_t ttl); + + virtual std::size_t Size() const override; + virtual void Serialize(std::string* dest) const override; + bool Expired() const; + std::shared_ptr<Tombstone> ToTombstone() const; + + static std::shared_ptr<ExpiringColumn> Deserialize(const char* src, + std::size_t offset); + +private: + int32_t ttl_; + std::chrono::time_point<std::chrono::system_clock> TimePoint() const; + std::chrono::seconds Ttl() const; +}; + +typedef std::vector<std::shared_ptr<ColumnBase>> Columns; + +class RowValue { +public: + // Create a Row Tombstone. + RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at); + // Create a Row containing columns. + RowValue(Columns columns, + int64_t last_modified_time); + RowValue(const RowValue& /*that*/) = delete; + RowValue(RowValue&& /*that*/) noexcept = default; + RowValue& operator=(const RowValue& /*that*/) = delete; + RowValue& operator=(RowValue&& /*that*/) = default; + + std::size_t Size() const;; + bool IsTombstone() const; + // For Tombstone this returns the marked_for_delete_at_, + // otherwise it returns the max timestamp of containing columns. + int64_t LastModifiedTime() const; + void Serialize(std::string* dest) const; + RowValue RemoveExpiredColumns(bool* changed) const; + RowValue ConvertExpiredColumnsToTombstones(bool* changed) const; + RowValue RemoveTombstones(int32_t gc_grace_period) const; + bool Empty() const; + + static RowValue Deserialize(const char* src, std::size_t size); + // Merge multiple rows according to their timestamp. + static RowValue Merge(std::vector<RowValue>&& values); + +private: + int32_t local_deletion_time_; + int64_t marked_for_delete_at_; + Columns columns_; + int64_t last_modified_time_; + + FRIEND_TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired); + FRIEND_TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones); + FRIEND_TEST(RowValueMergeTest, Merge); + FRIEND_TEST(RowValueMergeTest, MergeWithRowTombstone); + FRIEND_TEST(CassandraFunctionalTest, SimpleMergeTest); + FRIEND_TEST( + CassandraFunctionalTest, CompactionShouldConvertExpiredColumnsToTombstone); + FRIEND_TEST( + CassandraFunctionalTest, CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn); + FRIEND_TEST( + CassandraFunctionalTest, CompactionShouldRemoveRowWhenAllColumnExpiredIfPurgeTtlIsOn); + FRIEND_TEST(CassandraFunctionalTest, + CompactionShouldRemoveTombstoneExceedingGCGracePeriod); +}; + +} // namepsace cassandrda +} // namespace rocksdb diff --git a/src/rocksdb/utilities/cassandra/merge_operator.cc b/src/rocksdb/utilities/cassandra/merge_operator.cc new file mode 100644 index 00000000..4e529a6f --- /dev/null +++ b/src/rocksdb/utilities/cassandra/merge_operator.cc @@ -0,0 +1,67 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "merge_operator.h" + +#include <memory> +#include <assert.h> + +#include "rocksdb/slice.h" +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators.h" +#include "utilities/cassandra/format.h" + +namespace rocksdb { +namespace cassandra { + +// Implementation for the merge operation (merges two Cassandra values) +bool CassandraValueMergeOperator::FullMergeV2( + const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { + // Clear the *new_value for writing. + merge_out->new_value.clear(); + std::vector<RowValue> row_values; + if (merge_in.existing_value) { + row_values.push_back( + RowValue::Deserialize(merge_in.existing_value->data(), + merge_in.existing_value->size())); + } + + for (auto& operand : merge_in.operand_list) { + row_values.push_back(RowValue::Deserialize(operand.data(), operand.size())); + } + + RowValue merged = RowValue::Merge(std::move(row_values)); + merged = merged.RemoveTombstones(gc_grace_period_in_seconds_); + merge_out->new_value.reserve(merged.Size()); + merged.Serialize(&(merge_out->new_value)); + + return true; +} + +bool CassandraValueMergeOperator::PartialMergeMulti( + const Slice& /*key*/, const std::deque<Slice>& operand_list, + std::string* new_value, Logger* /*logger*/) const { + // Clear the *new_value for writing. + assert(new_value); + new_value->clear(); + + std::vector<RowValue> row_values; + for (auto& operand : operand_list) { + row_values.push_back(RowValue::Deserialize(operand.data(), operand.size())); + } + RowValue merged = RowValue::Merge(std::move(row_values)); + new_value->reserve(merged.Size()); + merged.Serialize(new_value); + return true; +} + +const char* CassandraValueMergeOperator::Name() const { + return "CassandraValueMergeOperator"; +} + +} // namespace cassandra + +} // namespace rocksdb diff --git a/src/rocksdb/utilities/cassandra/merge_operator.h b/src/rocksdb/utilities/cassandra/merge_operator.h new file mode 100644 index 00000000..4d02c26d --- /dev/null +++ b/src/rocksdb/utilities/cassandra/merge_operator.h @@ -0,0 +1,44 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" + +namespace rocksdb { +namespace cassandra { + +/** + * A MergeOperator for rocksdb that implements Cassandra row value merge. + */ +class CassandraValueMergeOperator : public MergeOperator { +public: + explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds, + size_t operands_limit = 0) + : gc_grace_period_in_seconds_(gc_grace_period_in_seconds), + operands_limit_(operands_limit) {} + + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; + + virtual bool PartialMergeMulti(const Slice& key, + const std::deque<Slice>& operand_list, + std::string* new_value, + Logger* logger) const override; + + virtual const char* Name() const override; + + virtual bool AllowSingleOperand() const override { return true; } + + virtual bool ShouldMerge(const std::vector<Slice>& operands) const override { + return operands_limit_ > 0 && operands.size() >= operands_limit_; + } + +private: + int32_t gc_grace_period_in_seconds_; + size_t operands_limit_; +}; +} // namespace cassandra +} // namespace rocksdb diff --git a/src/rocksdb/utilities/cassandra/serialize.h b/src/rocksdb/utilities/cassandra/serialize.h new file mode 100644 index 00000000..64ccd4c2 --- /dev/null +++ b/src/rocksdb/utilities/cassandra/serialize.h @@ -0,0 +1,75 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +/** + * Helper functions which serialize and deserialize integers + * into bytes in big endian. + */ + +#pragma once + +namespace rocksdb { +namespace cassandra { +namespace { +const int64_t kCharMask = 0xFFLL; +const int32_t kBitsPerByte = 8; +} + +template<typename T> +void Serialize(T val, std::string* dest); + +template<typename T> +T Deserialize(const char* src, std::size_t offset=0); + +// Specializations +template<> +inline void Serialize<int8_t>(int8_t t, std::string* dest) { + dest->append(1, static_cast<char>(t & kCharMask)); +} + +template<> +inline void Serialize<int32_t>(int32_t t, std::string* dest) { + for (unsigned long i = 0; i < sizeof(int32_t); i++) { + dest->append(1, static_cast<char>( + (t >> (sizeof(int32_t) - 1 - i) * kBitsPerByte) & kCharMask)); + } +} + +template<> +inline void Serialize<int64_t>(int64_t t, std::string* dest) { + for (unsigned long i = 0; i < sizeof(int64_t); i++) { + dest->append( + 1, static_cast<char>( + (t >> (sizeof(int64_t) - 1 - i) * kBitsPerByte) & kCharMask)); + } +} + +template<> +inline int8_t Deserialize<int8_t>(const char* src, std::size_t offset) { + return static_cast<int8_t>(src[offset]); +} + +template<> +inline int32_t Deserialize<int32_t>(const char* src, std::size_t offset) { + int32_t result = 0; + for (unsigned long i = 0; i < sizeof(int32_t); i++) { + result |= static_cast<int32_t>(static_cast<unsigned char>(src[offset + i])) + << ((sizeof(int32_t) - 1 - i) * kBitsPerByte); + } + return result; +} + +template<> +inline int64_t Deserialize<int64_t>(const char* src, std::size_t offset) { + int64_t result = 0; + for (unsigned long i = 0; i < sizeof(int64_t); i++) { + result |= static_cast<int64_t>(static_cast<unsigned char>(src[offset + i])) + << ((sizeof(int64_t) - 1 - i) * kBitsPerByte); + } + return result; +} + +} // namepsace cassandrda +} // namespace rocksdb diff --git a/src/rocksdb/utilities/cassandra/test_utils.cc b/src/rocksdb/utilities/cassandra/test_utils.cc new file mode 100644 index 00000000..d4dc8f00 --- /dev/null +++ b/src/rocksdb/utilities/cassandra/test_utils.cc @@ -0,0 +1,75 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "test_utils.h" + +namespace rocksdb { +namespace cassandra { +const char kData[] = {'d', 'a', 't', 'a'}; +const char kExpiringData[] = {'e', 'd', 'a', 't', 'a'}; +const int32_t kTtl = 86400; +const int8_t kColumn = 0; +const int8_t kTombstone = 1; +const int8_t kExpiringColumn = 2; + +std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask, + int8_t index, + int64_t timestamp) { + if ((mask & ColumnTypeMask::DELETION_MASK) != 0) { + return std::shared_ptr<Tombstone>( + new Tombstone(mask, index, ToSeconds(timestamp), timestamp)); + } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) { + return std::shared_ptr<ExpiringColumn>(new ExpiringColumn( + mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl)); + } else { + return std::shared_ptr<Column>( + new Column(mask, index, timestamp, sizeof(kData), kData)); + } +} + +std::tuple<int8_t, int8_t, int64_t> CreateTestColumnSpec(int8_t mask, + int8_t index, + int64_t timestamp) { + return std::make_tuple(mask, index, timestamp); +} + +RowValue CreateTestRowValue( + std::vector<std::tuple<int8_t, int8_t, int64_t>> column_specs) { + std::vector<std::shared_ptr<ColumnBase>> columns; + int64_t last_modified_time = 0; + for (auto spec: column_specs) { + auto c = CreateTestColumn(std::get<0>(spec), std::get<1>(spec), + std::get<2>(spec)); + last_modified_time = std::max(last_modified_time, c -> Timestamp()); + columns.push_back(std::move(c)); + } + return RowValue(std::move(columns), last_modified_time); +} + +RowValue CreateRowTombstone(int64_t timestamp) { + return RowValue(ToSeconds(timestamp), timestamp); +} + +void VerifyRowValueColumns( + std::vector<std::shared_ptr<ColumnBase>> &columns, + std::size_t index_of_vector, + int8_t expected_mask, + int8_t expected_index, + int64_t expected_timestamp +) { + EXPECT_EQ(expected_timestamp, columns[index_of_vector]->Timestamp()); + EXPECT_EQ(expected_mask, columns[index_of_vector]->Mask()); + EXPECT_EQ(expected_index, columns[index_of_vector]->Index()); +} + +int64_t ToMicroSeconds(int64_t seconds) { + return seconds * (int64_t)1000000; +} + +int32_t ToSeconds(int64_t microseconds) { + return (int32_t)(microseconds / (int64_t)1000000); +} +} +} diff --git a/src/rocksdb/utilities/cassandra/test_utils.h b/src/rocksdb/utilities/cassandra/test_utils.h new file mode 100644 index 00000000..80374b0c --- /dev/null +++ b/src/rocksdb/utilities/cassandra/test_utils.h @@ -0,0 +1,46 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include <memory> +#include "util/testharness.h" +#include "utilities/cassandra/format.h" +#include "utilities/cassandra/serialize.h" + +namespace rocksdb { +namespace cassandra { +extern const char kData[]; +extern const char kExpiringData[]; +extern const int32_t kTtl; +extern const int8_t kColumn; +extern const int8_t kTombstone; +extern const int8_t kExpiringColumn; + + +std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask, + int8_t index, + int64_t timestamp); + +std::tuple<int8_t, int8_t, int64_t> CreateTestColumnSpec(int8_t mask, + int8_t index, + int64_t timestamp); + +RowValue CreateTestRowValue( + std::vector<std::tuple<int8_t, int8_t, int64_t>> column_specs); + +RowValue CreateRowTombstone(int64_t timestamp); + +void VerifyRowValueColumns( + std::vector<std::shared_ptr<ColumnBase>> &columns, + std::size_t index_of_vector, + int8_t expected_mask, + int8_t expected_index, + int64_t expected_timestamp +); + +int64_t ToMicroSeconds(int64_t seconds); +int32_t ToSeconds(int64_t microseconds); +} +} |