summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/cassandra
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc110
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h57
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_format_test.cc377
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_functional_test.cc446
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_options.h43
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc98
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc164
-rw-r--r--src/rocksdb/utilities/cassandra/format.cc367
-rw-r--r--src/rocksdb/utilities/cassandra/format.h183
-rw-r--r--src/rocksdb/utilities/cassandra/merge_operator.cc82
-rw-r--r--src/rocksdb/utilities/cassandra/merge_operator.h44
-rw-r--r--src/rocksdb/utilities/cassandra/serialize.h81
-rw-r--r--src/rocksdb/utilities/cassandra/test_utils.cc69
-rw-r--r--src/rocksdb/utilities/cassandra/test_utils.h42
14 files changed, 2163 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc
new file mode 100644
index 000000000..4e48d63aa
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc
@@ -0,0 +1,110 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "utilities/cassandra/cassandra_compaction_filter.h"
+
+#include <string>
+
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "rocksdb/utilities/options_type.h"
+#include "utilities/cassandra/format.h"
+#include "utilities/cassandra/merge_operator.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+static std::unordered_map<std::string, OptionTypeInfo>
+ cassandra_filter_type_info = {
+#ifndef ROCKSDB_LITE
+ {"purge_ttl_on_expiration",
+ {offsetof(struct CassandraOptions, purge_ttl_on_expiration),
+ OptionType::kBoolean, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
+ {"gc_grace_period_in_seconds",
+ {offsetof(struct CassandraOptions, gc_grace_period_in_seconds),
+ OptionType::kUInt32T, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
+#endif // ROCKSDB_LITE
+};
+
+CassandraCompactionFilter::CassandraCompactionFilter(
+ bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds)
+ : options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) {
+ RegisterOptions(&options_, &cassandra_filter_type_info);
+}
+
+CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
+ int /*level*/, const Slice& /*key*/, ValueType value_type,
+ const Slice& existing_value, std::string* new_value,
+ std::string* /*skip_until*/) const {
+ bool value_changed = false;
+ RowValue row_value =
+ RowValue::Deserialize(existing_value.data(), existing_value.size());
+ RowValue compacted =
+ options_.purge_ttl_on_expiration
+ ? row_value.RemoveExpiredColumns(&value_changed)
+ : row_value.ConvertExpiredColumnsToTombstones(&value_changed);
+
+ if (value_type == ValueType::kValue) {
+ compacted = compacted.RemoveTombstones(options_.gc_grace_period_in_seconds);
+ }
+
+ if (compacted.Empty()) {
+ return Decision::kRemove;
+ }
+
+ if (value_changed) {
+ compacted.Serialize(new_value);
+ return Decision::kChangeValue;
+ }
+
+ return Decision::kKeep;
+}
+
+CassandraCompactionFilterFactory::CassandraCompactionFilterFactory(
+ bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds)
+ : options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) {
+ RegisterOptions(&options_, &cassandra_filter_type_info);
+}
+
+std::unique_ptr<CompactionFilter>
+CassandraCompactionFilterFactory::CreateCompactionFilter(
+ const CompactionFilter::Context&) {
+ std::unique_ptr<CompactionFilter> result(new CassandraCompactionFilter(
+ options_.purge_ttl_on_expiration, options_.gc_grace_period_in_seconds));
+ return result;
+}
+
+#ifndef ROCKSDB_LITE
+int RegisterCassandraObjects(ObjectLibrary& library,
+ const std::string& /*arg*/) {
+ library.AddFactory<MergeOperator>(
+ CassandraValueMergeOperator::kClassName(),
+ [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new CassandraValueMergeOperator(0));
+ return guard->get();
+ });
+ library.AddFactory<CompactionFilter>(
+ CassandraCompactionFilter::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilter>* /*guard */,
+ std::string* /* errmsg */) {
+ return new CassandraCompactionFilter(false, 0);
+ });
+ library.AddFactory<CompactionFilterFactory>(
+ CassandraCompactionFilterFactory::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilterFactory>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new CassandraCompactionFilterFactory(false, 0));
+ return guard->get();
+ });
+ size_t num_types;
+ return static_cast<int>(library.GetFactoryCount(&num_types));
+}
+#endif // ROCKSDB_LITE
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h
new file mode 100644
index 000000000..0325a4c39
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h
@@ -0,0 +1,57 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#include <string>
+
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/slice.h"
+#include "utilities/cassandra/cassandra_options.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+/**
+ * Compaction filter for removing expired Cassandra data with ttl.
+ * If option `purge_ttl_on_expiration` is set to true, expired data
+ * will be directly purged. Otherwise expired data will be converted
+ * tombstones first, then be eventally removed after gc grace period.
+ * `purge_ttl_on_expiration` should only be on in the case all the
+ * writes have same ttl setting, otherwise it could bring old data back.
+ *
+ * Compaction filter is also in charge of removing tombstone that has been
+ * promoted to kValue type after serials of merging in compaction.
+ */
+class CassandraCompactionFilter : public CompactionFilter {
+ public:
+ explicit CassandraCompactionFilter(bool purge_ttl_on_expiration,
+ int32_t gc_grace_period_in_seconds);
+ static const char* kClassName() { return "CassandraCompactionFilter"; }
+ const char* Name() const override { return kClassName(); }
+
+ virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
+ const Slice& existing_value, std::string* new_value,
+ std::string* skip_until) const override;
+
+ private:
+ CassandraOptions options_;
+};
+
+class CassandraCompactionFilterFactory : public CompactionFilterFactory {
+ public:
+ explicit CassandraCompactionFilterFactory(bool purge_ttl_on_expiration,
+ int32_t gc_grace_period_in_seconds);
+ ~CassandraCompactionFilterFactory() override {}
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& context) override;
+ static const char* kClassName() { return "CassandraCompactionFilterFactory"; }
+ const char* Name() const override { return kClassName(); }
+
+ private:
+ CassandraOptions options_;
+};
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/cassandra_format_test.cc b/src/rocksdb/utilities/cassandra/cassandra_format_test.cc
new file mode 100644
index 000000000..4f12947ad
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_format_test.cc
@@ -0,0 +1,377 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <cstring>
+#include <memory>
+
+#include "test_util/testharness.h"
+#include "utilities/cassandra/format.h"
+#include "utilities/cassandra/serialize.h"
+#include "utilities/cassandra/test_utils.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+TEST(ColumnTest, Column) {
+ char data[4] = {'d', 'a', 't', 'a'};
+ int8_t mask = 0;
+ int8_t index = 1;
+ int64_t timestamp = 1494022807044;
+ Column c = Column(mask, index, timestamp, sizeof(data), data);
+
+ EXPECT_EQ(c.Index(), index);
+ EXPECT_EQ(c.Timestamp(), timestamp);
+ EXPECT_EQ(c.Size(), 14 + sizeof(data));
+
+ // Verify the serialization.
+ std::string dest;
+ dest.reserve(c.Size() * 2);
+ c.Serialize(&dest);
+
+ EXPECT_EQ(dest.size(), c.Size());
+ std::size_t offset = 0;
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), timestamp);
+ offset += sizeof(int64_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(data));
+ offset += sizeof(int32_t);
+ EXPECT_TRUE(std::memcmp(data, dest.c_str() + offset, sizeof(data)) == 0);
+
+ // Verify the deserialization.
+ std::string saved_dest = dest;
+ std::shared_ptr<Column> c1 = Column::Deserialize(saved_dest.c_str(), 0);
+ EXPECT_EQ(c1->Index(), index);
+ EXPECT_EQ(c1->Timestamp(), timestamp);
+ EXPECT_EQ(c1->Size(), 14 + sizeof(data));
+
+ c1->Serialize(&dest);
+ EXPECT_EQ(dest.size(), 2 * c.Size());
+ EXPECT_TRUE(std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) ==
+ 0);
+
+ // Verify the ColumnBase::Deserialization.
+ saved_dest = dest;
+ std::shared_ptr<ColumnBase> c2 =
+ ColumnBase::Deserialize(saved_dest.c_str(), c.Size());
+ c2->Serialize(&dest);
+ EXPECT_EQ(dest.size(), 3 * c.Size());
+ EXPECT_TRUE(std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2,
+ c.Size()) == 0);
+}
+
+TEST(ExpiringColumnTest, ExpiringColumn) {
+ char data[4] = {'d', 'a', 't', 'a'};
+ int8_t mask = ColumnTypeMask::EXPIRATION_MASK;
+ int8_t index = 3;
+ int64_t timestamp = 1494022807044;
+ int32_t ttl = 3600;
+ ExpiringColumn c =
+ ExpiringColumn(mask, index, timestamp, sizeof(data), data, ttl);
+
+ EXPECT_EQ(c.Index(), index);
+ EXPECT_EQ(c.Timestamp(), timestamp);
+ EXPECT_EQ(c.Size(), 18 + sizeof(data));
+
+ // Verify the serialization.
+ std::string dest;
+ dest.reserve(c.Size() * 2);
+ c.Serialize(&dest);
+
+ EXPECT_EQ(dest.size(), c.Size());
+ std::size_t offset = 0;
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), timestamp);
+ offset += sizeof(int64_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(data));
+ offset += sizeof(int32_t);
+ EXPECT_TRUE(std::memcmp(data, dest.c_str() + offset, sizeof(data)) == 0);
+ offset += sizeof(data);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), ttl);
+
+ // Verify the deserialization.
+ std::string saved_dest = dest;
+ std::shared_ptr<ExpiringColumn> c1 =
+ ExpiringColumn::Deserialize(saved_dest.c_str(), 0);
+ EXPECT_EQ(c1->Index(), index);
+ EXPECT_EQ(c1->Timestamp(), timestamp);
+ EXPECT_EQ(c1->Size(), 18 + sizeof(data));
+
+ c1->Serialize(&dest);
+ EXPECT_EQ(dest.size(), 2 * c.Size());
+ EXPECT_TRUE(std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) ==
+ 0);
+
+ // Verify the ColumnBase::Deserialization.
+ saved_dest = dest;
+ std::shared_ptr<ColumnBase> c2 =
+ ColumnBase::Deserialize(saved_dest.c_str(), c.Size());
+ c2->Serialize(&dest);
+ EXPECT_EQ(dest.size(), 3 * c.Size());
+ EXPECT_TRUE(std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2,
+ c.Size()) == 0);
+}
+
+TEST(TombstoneTest, TombstoneCollectable) {
+ int32_t now = (int32_t)time(nullptr);
+ int32_t gc_grace_seconds = 16440;
+ int32_t time_delta_seconds = 10;
+ EXPECT_TRUE(
+ Tombstone(ColumnTypeMask::DELETION_MASK, 0,
+ now - gc_grace_seconds - time_delta_seconds,
+ ToMicroSeconds(now - gc_grace_seconds - time_delta_seconds))
+ .Collectable(gc_grace_seconds));
+ EXPECT_FALSE(
+ Tombstone(ColumnTypeMask::DELETION_MASK, 0,
+ now - gc_grace_seconds + time_delta_seconds,
+ ToMicroSeconds(now - gc_grace_seconds + time_delta_seconds))
+ .Collectable(gc_grace_seconds));
+}
+
+TEST(TombstoneTest, Tombstone) {
+ int8_t mask = ColumnTypeMask::DELETION_MASK;
+ int8_t index = 2;
+ int32_t local_deletion_time = 1494022807;
+ int64_t marked_for_delete_at = 1494022807044;
+ Tombstone c =
+ Tombstone(mask, index, local_deletion_time, marked_for_delete_at);
+
+ EXPECT_EQ(c.Index(), index);
+ EXPECT_EQ(c.Timestamp(), marked_for_delete_at);
+ EXPECT_EQ(c.Size(), 14);
+
+ // Verify the serialization.
+ std::string dest;
+ dest.reserve(c.Size() * 2);
+ c.Serialize(&dest);
+
+ EXPECT_EQ(dest.size(), c.Size());
+ std::size_t offset = 0;
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), local_deletion_time);
+ offset += sizeof(int32_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), marked_for_delete_at);
+
+ // Verify the deserialization.
+ std::shared_ptr<Tombstone> c1 = Tombstone::Deserialize(dest.c_str(), 0);
+ EXPECT_EQ(c1->Index(), index);
+ EXPECT_EQ(c1->Timestamp(), marked_for_delete_at);
+ EXPECT_EQ(c1->Size(), 14);
+
+ c1->Serialize(&dest);
+ EXPECT_EQ(dest.size(), 2 * c.Size());
+ EXPECT_TRUE(std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) ==
+ 0);
+
+ // Verify the ColumnBase::Deserialization.
+ std::shared_ptr<ColumnBase> c2 =
+ ColumnBase::Deserialize(dest.c_str(), c.Size());
+ c2->Serialize(&dest);
+ EXPECT_EQ(dest.size(), 3 * c.Size());
+ EXPECT_TRUE(std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2,
+ c.Size()) == 0);
+}
+
+class RowValueTest : public testing::Test {};
+
+TEST(RowValueTest, RowTombstone) {
+ int32_t local_deletion_time = 1494022807;
+ int64_t marked_for_delete_at = 1494022807044;
+ RowValue r = RowValue(local_deletion_time, marked_for_delete_at);
+
+ EXPECT_EQ(r.Size(), 12);
+ EXPECT_EQ(r.IsTombstone(), true);
+ EXPECT_EQ(r.LastModifiedTime(), marked_for_delete_at);
+
+ // Verify the serialization.
+ std::string dest;
+ dest.reserve(r.Size() * 2);
+ r.Serialize(&dest);
+
+ EXPECT_EQ(dest.size(), r.Size());
+ std::size_t offset = 0;
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), local_deletion_time);
+ offset += sizeof(int32_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), marked_for_delete_at);
+
+ // Verify the deserialization.
+ RowValue r1 = RowValue::Deserialize(dest.c_str(), r.Size());
+ EXPECT_EQ(r1.Size(), 12);
+ EXPECT_EQ(r1.IsTombstone(), true);
+ EXPECT_EQ(r1.LastModifiedTime(), marked_for_delete_at);
+
+ r1.Serialize(&dest);
+ EXPECT_EQ(dest.size(), 2 * r.Size());
+ EXPECT_TRUE(std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) ==
+ 0);
+}
+
+TEST(RowValueTest, RowWithColumns) {
+ std::vector<std::shared_ptr<ColumnBase>> columns;
+ int64_t last_modified_time = 1494022807048;
+ std::size_t columns_data_size = 0;
+
+ char e_data[5] = {'e', 'd', 'a', 't', 'a'};
+ int8_t e_index = 0;
+ int64_t e_timestamp = 1494022807044;
+ int32_t e_ttl = 3600;
+ columns.push_back(std::shared_ptr<ExpiringColumn>(
+ new ExpiringColumn(ColumnTypeMask::EXPIRATION_MASK, e_index, e_timestamp,
+ sizeof(e_data), e_data, e_ttl)));
+ columns_data_size += columns[0]->Size();
+
+ char c_data[4] = {'d', 'a', 't', 'a'};
+ int8_t c_index = 1;
+ int64_t c_timestamp = 1494022807048;
+ columns.push_back(std::shared_ptr<Column>(
+ new Column(0, c_index, c_timestamp, sizeof(c_data), c_data)));
+ columns_data_size += columns[1]->Size();
+
+ int8_t t_index = 2;
+ int32_t t_local_deletion_time = 1494022801;
+ int64_t t_marked_for_delete_at = 1494022807043;
+ columns.push_back(std::shared_ptr<Tombstone>(
+ new Tombstone(ColumnTypeMask::DELETION_MASK, t_index,
+ t_local_deletion_time, t_marked_for_delete_at)));
+ columns_data_size += columns[2]->Size();
+
+ RowValue r = RowValue(std::move(columns), last_modified_time);
+
+ EXPECT_EQ(r.Size(), columns_data_size + 12);
+ EXPECT_EQ(r.IsTombstone(), false);
+ EXPECT_EQ(r.LastModifiedTime(), last_modified_time);
+
+ // Verify the serialization.
+ std::string dest;
+ dest.reserve(r.Size() * 2);
+ r.Serialize(&dest);
+
+ EXPECT_EQ(dest.size(), r.Size());
+ std::size_t offset = 0;
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset),
+ std::numeric_limits<int32_t>::max());
+ offset += sizeof(int32_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset),
+ std::numeric_limits<int64_t>::min());
+ offset += sizeof(int64_t);
+
+ // Column0: ExpiringColumn
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset),
+ ColumnTypeMask::EXPIRATION_MASK);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), e_index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), e_timestamp);
+ offset += sizeof(int64_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(e_data));
+ offset += sizeof(int32_t);
+ EXPECT_TRUE(std::memcmp(e_data, dest.c_str() + offset, sizeof(e_data)) == 0);
+ offset += sizeof(e_data);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), e_ttl);
+ offset += sizeof(int32_t);
+
+ // Column1: Column
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), 0);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), c_index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), c_timestamp);
+ offset += sizeof(int64_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(c_data));
+ offset += sizeof(int32_t);
+ EXPECT_TRUE(std::memcmp(c_data, dest.c_str() + offset, sizeof(c_data)) == 0);
+ offset += sizeof(c_data);
+
+ // Column2: Tombstone
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset),
+ ColumnTypeMask::DELETION_MASK);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), t_index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), t_local_deletion_time);
+ offset += sizeof(int32_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), t_marked_for_delete_at);
+
+ // Verify the deserialization.
+ RowValue r1 = RowValue::Deserialize(dest.c_str(), r.Size());
+ EXPECT_EQ(r1.Size(), columns_data_size + 12);
+ EXPECT_EQ(r1.IsTombstone(), false);
+ EXPECT_EQ(r1.LastModifiedTime(), last_modified_time);
+
+ r1.Serialize(&dest);
+ EXPECT_EQ(dest.size(), 2 * r.Size());
+ EXPECT_TRUE(std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) ==
+ 0);
+}
+
+TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired) {
+ int64_t now = time(nullptr);
+
+ auto row_value = CreateTestRowValue(
+ {CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)),
+ CreateTestColumnSpec(kExpiringColumn, 1,
+ ToMicroSeconds(now - kTtl - 10)), // expired
+ CreateTestColumnSpec(kExpiringColumn, 2,
+ ToMicroSeconds(now)), // not expired
+ CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))});
+
+ bool changed = false;
+ auto purged = row_value.RemoveExpiredColumns(&changed);
+ EXPECT_TRUE(changed);
+ EXPECT_EQ(purged.get_columns().size(), 3);
+ VerifyRowValueColumns(purged.get_columns(), 0, kColumn, 0,
+ ToMicroSeconds(now));
+ VerifyRowValueColumns(purged.get_columns(), 1, kExpiringColumn, 2,
+ ToMicroSeconds(now));
+ VerifyRowValueColumns(purged.get_columns(), 2, kTombstone, 3,
+ ToMicroSeconds(now));
+
+ purged.RemoveExpiredColumns(&changed);
+ EXPECT_FALSE(changed);
+}
+
+TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) {
+ int64_t now = time(nullptr);
+
+ auto row_value = CreateTestRowValue(
+ {CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)),
+ CreateTestColumnSpec(kExpiringColumn, 1,
+ ToMicroSeconds(now - kTtl - 10)), // expired
+ CreateTestColumnSpec(kExpiringColumn, 2,
+ ToMicroSeconds(now)), // not expired
+ CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))});
+
+ bool changed = false;
+ auto compacted = row_value.ConvertExpiredColumnsToTombstones(&changed);
+ EXPECT_TRUE(changed);
+ EXPECT_EQ(compacted.get_columns().size(), 4);
+ VerifyRowValueColumns(compacted.get_columns(), 0, kColumn, 0,
+ ToMicroSeconds(now));
+ VerifyRowValueColumns(compacted.get_columns(), 1, kTombstone, 1,
+ ToMicroSeconds(now - 10));
+ VerifyRowValueColumns(compacted.get_columns(), 2, kExpiringColumn, 2,
+ ToMicroSeconds(now));
+ VerifyRowValueColumns(compacted.get_columns(), 3, kTombstone, 3,
+ ToMicroSeconds(now));
+
+ compacted.ConvertExpiredColumnsToTombstones(&changed);
+ EXPECT_FALSE(changed);
+}
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc b/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc
new file mode 100644
index 000000000..c5be836e8
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc
@@ -0,0 +1,446 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <iostream>
+
+#include "db/db_impl/db_impl.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/db.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "test_util/testharness.h"
+#include "util/cast_util.h"
+#include "util/random.h"
+#include "utilities/cassandra/cassandra_compaction_filter.h"
+#include "utilities/cassandra/merge_operator.h"
+#include "utilities/cassandra/test_utils.h"
+#include "utilities/merge_operators.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+// Path to the database on file system
+const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test");
+
+class CassandraStore {
+ public:
+ explicit CassandraStore(std::shared_ptr<DB> db)
+ : db_(db), write_option_(), get_option_() {
+ assert(db);
+ }
+
+ bool Append(const std::string& key, const RowValue& val) {
+ std::string result;
+ val.Serialize(&result);
+ Slice valSlice(result.data(), result.size());
+ auto s = db_->Merge(write_option_, key, valSlice);
+
+ if (s.ok()) {
+ return true;
+ } else {
+ std::cerr << "ERROR " << s.ToString() << std::endl;
+ return false;
+ }
+ }
+
+ bool Put(const std::string& key, const RowValue& val) {
+ std::string result;
+ val.Serialize(&result);
+ Slice valSlice(result.data(), result.size());
+ auto s = db_->Put(write_option_, key, valSlice);
+ if (s.ok()) {
+ return true;
+ } else {
+ std::cerr << "ERROR " << s.ToString() << std::endl;
+ return false;
+ }
+ }
+
+ Status Flush() {
+ Status s = dbfull()->TEST_FlushMemTable();
+ if (s.ok()) {
+ s = dbfull()->TEST_WaitForCompact();
+ }
+ return s;
+ }
+
+ Status Compact() {
+ return dbfull()->TEST_CompactRange(0, nullptr, nullptr,
+ db_->DefaultColumnFamily());
+ }
+
+ std::tuple<bool, RowValue> Get(const std::string& key) {
+ std::string result;
+ auto s = db_->Get(get_option_, key, &result);
+
+ if (s.ok()) {
+ return std::make_tuple(
+ true, RowValue::Deserialize(result.data(), result.size()));
+ }
+
+ if (!s.IsNotFound()) {
+ std::cerr << "ERROR " << s.ToString() << std::endl;
+ }
+
+ return std::make_tuple(false, RowValue(0, 0));
+ }
+
+ private:
+ std::shared_ptr<DB> db_;
+ WriteOptions write_option_;
+ ReadOptions get_option_;
+
+ DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_.get()); }
+};
+
+class TestCompactionFilterFactory : public CompactionFilterFactory {
+ public:
+ explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
+ int32_t gc_grace_period_in_seconds)
+ : purge_ttl_on_expiration_(purge_ttl_on_expiration),
+ gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) override {
+ return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
+ purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
+ }
+
+ const char* Name() const override { return "TestCompactionFilterFactory"; }
+
+ private:
+ bool purge_ttl_on_expiration_;
+ int32_t gc_grace_period_in_seconds_;
+};
+
+// The class for unit-testing
+class CassandraFunctionalTest : public testing::Test {
+ public:
+ CassandraFunctionalTest() {
+ EXPECT_OK(
+ DestroyDB(kDbName, Options())); // Start each test with a fresh DB
+ }
+
+ std::shared_ptr<DB> OpenDb() {
+ DB* db;
+ Options options;
+ options.create_if_missing = true;
+ options.merge_operator.reset(
+ new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
+ auto* cf_factory = new TestCompactionFilterFactory(
+ purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
+ options.compaction_filter_factory.reset(cf_factory);
+ EXPECT_OK(DB::Open(options, kDbName, &db));
+ return std::shared_ptr<DB>(db);
+ }
+
+ bool purge_ttl_on_expiration_ = false;
+ int32_t gc_grace_period_in_seconds_ = 100;
+};
+
+// THE TEST CASES BEGIN HERE
+
+TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
+ CassandraStore store(OpenDb());
+ int64_t now = time(nullptr);
+
+ store.Append(
+ "k1",
+ CreateTestRowValue({
+ CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
+ CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
+ CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
+ }));
+ store.Append(
+ "k1",
+ CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
+ CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
+ CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
+ CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
+ }));
+ store.Append(
+ "k1",
+ CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
+ CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
+ CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
+ CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
+ }));
+
+ auto ret = store.Get("k1");
+
+ ASSERT_TRUE(std::get<0>(ret));
+ RowValue& merged = std::get<1>(ret);
+ EXPECT_EQ(merged.get_columns().size(), 5);
+ VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 0,
+ ToMicroSeconds(now + 6));
+ VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 1,
+ ToMicroSeconds(now + 8));
+ VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 2,
+ ToMicroSeconds(now + 7));
+ VerifyRowValueColumns(merged.get_columns(), 3, kExpiringColumn, 7,
+ ToMicroSeconds(now + 17));
+ VerifyRowValueColumns(merged.get_columns(), 4, kTombstone, 11,
+ ToMicroSeconds(now + 11));
+}
+
+constexpr int64_t kTestTimeoutSecs = 600;
+
+TEST_F(CassandraFunctionalTest,
+ CompactionShouldConvertExpiredColumnsToTombstone) {
+ CassandraStore store(OpenDb());
+ int64_t now = time(nullptr);
+
+ store.Append(
+ "k1",
+ CreateTestRowValue(
+ {CreateTestColumnSpec(kExpiringColumn, 0,
+ ToMicroSeconds(now - kTtl - 20)), // expired
+ CreateTestColumnSpec(
+ kExpiringColumn, 1,
+ ToMicroSeconds(now - kTtl + kTestTimeoutSecs)), // not expired
+ CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))}));
+
+ ASSERT_OK(store.Flush());
+
+ store.Append(
+ "k1",
+ CreateTestRowValue(
+ {CreateTestColumnSpec(kExpiringColumn, 0,
+ ToMicroSeconds(now - kTtl - 10)), // expired
+ CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))}));
+
+ ASSERT_OK(store.Flush());
+ ASSERT_OK(store.Compact());
+
+ auto ret = store.Get("k1");
+ ASSERT_TRUE(std::get<0>(ret));
+ RowValue& merged = std::get<1>(ret);
+ EXPECT_EQ(merged.get_columns().size(), 4);
+ VerifyRowValueColumns(merged.get_columns(), 0, kTombstone, 0,
+ ToMicroSeconds(now - 10));
+ VerifyRowValueColumns(merged.get_columns(), 1, kExpiringColumn, 1,
+ ToMicroSeconds(now - kTtl + kTestTimeoutSecs));
+ VerifyRowValueColumns(merged.get_columns(), 2, kColumn, 2,
+ ToMicroSeconds(now));
+ VerifyRowValueColumns(merged.get_columns(), 3, kTombstone, 3,
+ ToMicroSeconds(now));
+}
+
+TEST_F(CassandraFunctionalTest,
+ CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
+ purge_ttl_on_expiration_ = true;
+ CassandraStore store(OpenDb());
+ int64_t now = time(nullptr);
+
+ store.Append(
+ "k1",
+ CreateTestRowValue(
+ {CreateTestColumnSpec(kExpiringColumn, 0,
+ ToMicroSeconds(now - kTtl - 20)), // expired
+ CreateTestColumnSpec(kExpiringColumn, 1,
+ ToMicroSeconds(now)), // not expired
+ CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))}));
+
+ ASSERT_OK(store.Flush());
+
+ store.Append(
+ "k1",
+ CreateTestRowValue(
+ {CreateTestColumnSpec(kExpiringColumn, 0,
+ ToMicroSeconds(now - kTtl - 10)), // expired
+ CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))}));
+
+ ASSERT_OK(store.Flush());
+ ASSERT_OK(store.Compact());
+
+ auto ret = store.Get("k1");
+ ASSERT_TRUE(std::get<0>(ret));
+ RowValue& merged = std::get<1>(ret);
+ EXPECT_EQ(merged.get_columns().size(), 3);
+ VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 1,
+ ToMicroSeconds(now));
+ VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 2,
+ ToMicroSeconds(now));
+ VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 3,
+ ToMicroSeconds(now));
+}
+
+TEST_F(CassandraFunctionalTest,
+ CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) {
+ purge_ttl_on_expiration_ = true;
+ CassandraStore store(OpenDb());
+ int64_t now = time(nullptr);
+
+ store.Append("k1", CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0,
+ ToMicroSeconds(now - kTtl - 20)),
+ CreateTestColumnSpec(kExpiringColumn, 1,
+ ToMicroSeconds(now - kTtl - 20)),
+ }));
+
+ ASSERT_OK(store.Flush());
+
+ store.Append("k1", CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0,
+ ToMicroSeconds(now - kTtl - 10)),
+ }));
+
+ ASSERT_OK(store.Flush());
+ ASSERT_OK(store.Compact());
+ ASSERT_FALSE(std::get<0>(store.Get("k1")));
+}
+
+TEST_F(CassandraFunctionalTest,
+ CompactionShouldRemoveTombstoneExceedingGCGracePeriod) {
+ purge_ttl_on_expiration_ = true;
+ CassandraStore store(OpenDb());
+ int64_t now = time(nullptr);
+
+ store.Append("k1",
+ CreateTestRowValue(
+ {CreateTestColumnSpec(
+ kTombstone, 0,
+ ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
+ CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))}));
+
+ store.Append("k2", CreateTestRowValue({CreateTestColumnSpec(
+ kColumn, 0, ToMicroSeconds(now))}));
+
+ ASSERT_OK(store.Flush());
+
+ store.Append("k1", CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
+ }));
+
+ ASSERT_OK(store.Flush());
+ ASSERT_OK(store.Compact());
+
+ auto ret = store.Get("k1");
+ ASSERT_TRUE(std::get<0>(ret));
+ RowValue& gced = std::get<1>(ret);
+ EXPECT_EQ(gced.get_columns().size(), 1);
+ VerifyRowValueColumns(gced.get_columns(), 0, kColumn, 1, ToMicroSeconds(now));
+}
+
+TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
+ purge_ttl_on_expiration_ = true;
+ CassandraStore store(OpenDb());
+ int64_t now = time(nullptr);
+
+ store.Put("k1",
+ CreateTestRowValue({
+ CreateTestColumnSpec(
+ kTombstone, 0,
+ ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
+ }));
+
+ ASSERT_OK(store.Flush());
+ ASSERT_OK(store.Compact());
+ ASSERT_FALSE(std::get<0>(store.Get("k1")));
+}
+
+#ifndef ROCKSDB_LITE
+TEST_F(CassandraFunctionalTest, LoadMergeOperator) {
+ ConfigOptions config_options;
+ std::shared_ptr<MergeOperator> mo;
+ config_options.ignore_unsupported_options = false;
+
+ ASSERT_NOK(MergeOperator::CreateFromString(
+ config_options, CassandraValueMergeOperator::kClassName(), &mo));
+
+ config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
+ "cassandra");
+
+ ASSERT_OK(MergeOperator::CreateFromString(
+ config_options, CassandraValueMergeOperator::kClassName(), &mo));
+ ASSERT_NE(mo, nullptr);
+ ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
+ mo.reset();
+ ASSERT_OK(MergeOperator::CreateFromString(
+ config_options,
+ std::string("operands_limit=20;gc_grace_period_in_seconds=42;id=") +
+ CassandraValueMergeOperator::kClassName(),
+ &mo));
+ ASSERT_NE(mo, nullptr);
+ ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
+ const auto* opts = mo->GetOptions<CassandraOptions>();
+ ASSERT_NE(opts, nullptr);
+ ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
+ ASSERT_EQ(opts->operands_limit, 20);
+}
+
+TEST_F(CassandraFunctionalTest, LoadCompactionFilter) {
+ ConfigOptions config_options;
+ const CompactionFilter* filter = nullptr;
+ config_options.ignore_unsupported_options = false;
+
+ ASSERT_NOK(CompactionFilter::CreateFromString(
+ config_options, CassandraCompactionFilter::kClassName(), &filter));
+ config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
+ "cassandra");
+
+ ASSERT_OK(CompactionFilter::CreateFromString(
+ config_options, CassandraCompactionFilter::kClassName(), &filter));
+ ASSERT_NE(filter, nullptr);
+ ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
+ delete filter;
+ filter = nullptr;
+ ASSERT_OK(CompactionFilter::CreateFromString(
+ config_options,
+ std::string(
+ "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
+ CassandraCompactionFilter::kClassName(),
+ &filter));
+ ASSERT_NE(filter, nullptr);
+ ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
+ const auto* opts = filter->GetOptions<CassandraOptions>();
+ ASSERT_NE(opts, nullptr);
+ ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
+ ASSERT_TRUE(opts->purge_ttl_on_expiration);
+ delete filter;
+}
+
+TEST_F(CassandraFunctionalTest, LoadCompactionFilterFactory) {
+ ConfigOptions config_options;
+ std::shared_ptr<CompactionFilterFactory> factory;
+
+ config_options.ignore_unsupported_options = false;
+ ASSERT_NOK(CompactionFilterFactory::CreateFromString(
+ config_options, CassandraCompactionFilterFactory::kClassName(),
+ &factory));
+ config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
+ "cassandra");
+
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(
+ config_options, CassandraCompactionFilterFactory::kClassName(),
+ &factory));
+ ASSERT_NE(factory, nullptr);
+ ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
+ factory.reset();
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(
+ config_options,
+ std::string(
+ "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
+ CassandraCompactionFilterFactory::kClassName(),
+ &factory));
+ ASSERT_NE(factory, nullptr);
+ ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
+ const auto* opts = factory->GetOptions<CassandraOptions>();
+ ASSERT_NE(opts, nullptr);
+ ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
+ ASSERT_TRUE(opts->purge_ttl_on_expiration);
+}
+#endif // ROCKSDB_LITE
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/utilities/cassandra/cassandra_options.h b/src/rocksdb/utilities/cassandra/cassandra_options.h
new file mode 100644
index 000000000..efa73a308
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_options.h
@@ -0,0 +1,43 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+
+#include "rocksdb/rocksdb_namespace.h"
+
+namespace ROCKSDB_NAMESPACE {
+class ObjectLibrary;
+namespace cassandra {
+struct CassandraOptions {
+ static const char* kName() { return "CassandraOptions"; }
+ CassandraOptions(int32_t _gc_grace_period_in_seconds, size_t _operands_limit,
+ bool _purge_ttl_on_expiration = false)
+ : operands_limit(_operands_limit),
+ gc_grace_period_in_seconds(_gc_grace_period_in_seconds),
+ purge_ttl_on_expiration(_purge_ttl_on_expiration) {}
+ // Limit on the number of merge operands.
+ size_t operands_limit;
+
+ // How long (in seconds) tombstoned data remains before it is purged
+ int32_t gc_grace_period_in_seconds;
+
+ // If is set to true, expired data will be directly purged.
+ // Otherwise expired data will be converted tombstones first,
+ // then be eventually removed after gc grace period. This value should
+ // only true if all writes have same ttl setting, otherwise it could bring old
+ // data back.
+ bool purge_ttl_on_expiration;
+};
+#ifndef ROCKSDB_LITE
+extern "C" {
+int RegisterCassandraObjects(ObjectLibrary& library, const std::string& arg);
+} // extern "C"
+#endif // ROCKSDB_LITE
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc b/src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc
new file mode 100644
index 000000000..0b4a89287
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc
@@ -0,0 +1,98 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <memory>
+
+#include "test_util/testharness.h"
+#include "utilities/cassandra/format.h"
+#include "utilities/cassandra/test_utils.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+class RowValueMergeTest : public testing::Test {};
+
+TEST(RowValueMergeTest, Merge) {
+ std::vector<RowValue> row_values;
+ row_values.push_back(CreateTestRowValue({
+ CreateTestColumnSpec(kTombstone, 0, 5),
+ CreateTestColumnSpec(kColumn, 1, 8),
+ CreateTestColumnSpec(kExpiringColumn, 2, 5),
+ }));
+
+ row_values.push_back(CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 0, 2),
+ CreateTestColumnSpec(kExpiringColumn, 1, 5),
+ CreateTestColumnSpec(kTombstone, 2, 7),
+ CreateTestColumnSpec(kExpiringColumn, 7, 17),
+ }));
+
+ row_values.push_back(CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0, 6),
+ CreateTestColumnSpec(kTombstone, 1, 5),
+ CreateTestColumnSpec(kColumn, 2, 4),
+ CreateTestColumnSpec(kTombstone, 11, 11),
+ }));
+
+ RowValue merged = RowValue::Merge(std::move(row_values));
+ EXPECT_FALSE(merged.IsTombstone());
+ EXPECT_EQ(merged.get_columns().size(), 5);
+ VerifyRowValueColumns(merged.get_columns(), 0, kExpiringColumn, 0, 6);
+ VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 1, 8);
+ VerifyRowValueColumns(merged.get_columns(), 2, kTombstone, 2, 7);
+ VerifyRowValueColumns(merged.get_columns(), 3, kExpiringColumn, 7, 17);
+ VerifyRowValueColumns(merged.get_columns(), 4, kTombstone, 11, 11);
+}
+
+TEST(RowValueMergeTest, MergeWithRowTombstone) {
+ std::vector<RowValue> row_values;
+
+ // A row tombstone.
+ row_values.push_back(CreateRowTombstone(11));
+
+ // This row's timestamp is smaller than tombstone.
+ row_values.push_back(CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 0, 5),
+ CreateTestColumnSpec(kColumn, 1, 6),
+ }));
+
+ // Some of the column's row is smaller, some is larger.
+ row_values.push_back(CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 2, 10),
+ CreateTestColumnSpec(kColumn, 3, 12),
+ }));
+
+ // All of the column's rows are larger than tombstone.
+ row_values.push_back(CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 4, 13),
+ CreateTestColumnSpec(kColumn, 5, 14),
+ }));
+
+ RowValue merged = RowValue::Merge(std::move(row_values));
+ EXPECT_FALSE(merged.IsTombstone());
+ EXPECT_EQ(merged.get_columns().size(), 3);
+ VerifyRowValueColumns(merged.get_columns(), 0, kColumn, 3, 12);
+ VerifyRowValueColumns(merged.get_columns(), 1, kColumn, 4, 13);
+ VerifyRowValueColumns(merged.get_columns(), 2, kColumn, 5, 14);
+
+ // If the tombstone's timestamp is the latest, then it returns a
+ // row tombstone.
+ row_values.push_back(CreateRowTombstone(15));
+
+ row_values.push_back(CreateRowTombstone(17));
+
+ merged = RowValue::Merge(std::move(row_values));
+ EXPECT_TRUE(merged.IsTombstone());
+ EXPECT_EQ(merged.LastModifiedTime(), 17);
+}
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc b/src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc
new file mode 100644
index 000000000..c14d8fd80
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc
@@ -0,0 +1,164 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "test_util/testharness.h"
+#include "utilities/cassandra/serialize.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+TEST(SerializeTest, SerializeI64) {
+ std::string dest;
+ Serialize<int64_t>(0, &dest);
+ EXPECT_EQ(std::string({'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00',
+ '\x00'}),
+ dest);
+
+ dest.clear();
+ Serialize<int64_t>(1, &dest);
+ EXPECT_EQ(std::string({'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00',
+ '\x01'}),
+ dest);
+
+ dest.clear();
+ Serialize<int64_t>(-1, &dest);
+ EXPECT_EQ(std::string({'\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff',
+ '\xff'}),
+ dest);
+
+ dest.clear();
+ Serialize<int64_t>(9223372036854775807, &dest);
+ EXPECT_EQ(std::string({'\x7f', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff',
+ '\xff'}),
+ dest);
+
+ dest.clear();
+ Serialize<int64_t>(-9223372036854775807, &dest);
+ EXPECT_EQ(std::string({'\x80', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00',
+ '\x01'}),
+ dest);
+}
+
+TEST(SerializeTest, DeserializeI64) {
+ std::string dest;
+ std::size_t offset = dest.size();
+ Serialize<int64_t>(0, &dest);
+ EXPECT_EQ(0, Deserialize<int64_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int64_t>(1, &dest);
+ EXPECT_EQ(1, Deserialize<int64_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int64_t>(-1, &dest);
+ EXPECT_EQ(-1, Deserialize<int64_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int64_t>(-9223372036854775807, &dest);
+ EXPECT_EQ(-9223372036854775807, Deserialize<int64_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int64_t>(9223372036854775807, &dest);
+ EXPECT_EQ(9223372036854775807, Deserialize<int64_t>(dest.c_str(), offset));
+}
+
+TEST(SerializeTest, SerializeI32) {
+ std::string dest;
+ Serialize<int32_t>(0, &dest);
+ EXPECT_EQ(std::string({'\x00', '\x00', '\x00', '\x00'}), dest);
+
+ dest.clear();
+ Serialize<int32_t>(1, &dest);
+ EXPECT_EQ(std::string({'\x00', '\x00', '\x00', '\x01'}), dest);
+
+ dest.clear();
+ Serialize<int32_t>(-1, &dest);
+ EXPECT_EQ(std::string({'\xff', '\xff', '\xff', '\xff'}), dest);
+
+ dest.clear();
+ Serialize<int32_t>(2147483647, &dest);
+ EXPECT_EQ(std::string({'\x7f', '\xff', '\xff', '\xff'}), dest);
+
+ dest.clear();
+ Serialize<int32_t>(-2147483648LL, &dest);
+ EXPECT_EQ(std::string({'\x80', '\x00', '\x00', '\x00'}), dest);
+}
+
+TEST(SerializeTest, DeserializeI32) {
+ std::string dest;
+ std::size_t offset = dest.size();
+ Serialize<int32_t>(0, &dest);
+ EXPECT_EQ(0, Deserialize<int32_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int32_t>(1, &dest);
+ EXPECT_EQ(1, Deserialize<int32_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int32_t>(-1, &dest);
+ EXPECT_EQ(-1, Deserialize<int32_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int32_t>(2147483647, &dest);
+ EXPECT_EQ(2147483647, Deserialize<int32_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int32_t>(-2147483648LL, &dest);
+ EXPECT_EQ(-2147483648LL, Deserialize<int32_t>(dest.c_str(), offset));
+}
+
+TEST(SerializeTest, SerializeI8) {
+ std::string dest;
+ Serialize<int8_t>(0, &dest);
+ EXPECT_EQ(std::string({'\x00'}), dest);
+
+ dest.clear();
+ Serialize<int8_t>(1, &dest);
+ EXPECT_EQ(std::string({'\x01'}), dest);
+
+ dest.clear();
+ Serialize<int8_t>(-1, &dest);
+ EXPECT_EQ(std::string({'\xff'}), dest);
+
+ dest.clear();
+ Serialize<int8_t>(127, &dest);
+ EXPECT_EQ(std::string({'\x7f'}), dest);
+
+ dest.clear();
+ Serialize<int8_t>(-128, &dest);
+ EXPECT_EQ(std::string({'\x80'}), dest);
+}
+
+TEST(SerializeTest, DeserializeI8) {
+ std::string dest;
+ std::size_t offset = dest.size();
+ Serialize<int8_t>(0, &dest);
+ EXPECT_EQ(0, Deserialize<int8_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int8_t>(1, &dest);
+ EXPECT_EQ(1, Deserialize<int8_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int8_t>(-1, &dest);
+ EXPECT_EQ(-1, Deserialize<int8_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int8_t>(127, &dest);
+ EXPECT_EQ(127, Deserialize<int8_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int8_t>(-128, &dest);
+ EXPECT_EQ(-128, Deserialize<int8_t>(dest.c_str(), offset));
+}
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/utilities/cassandra/format.cc b/src/rocksdb/utilities/cassandra/format.cc
new file mode 100644
index 000000000..cc1dd2f28
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/format.cc
@@ -0,0 +1,367 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "format.h"
+
+#include <algorithm>
+#include <map>
+#include <memory>
+
+#include "utilities/cassandra/serialize.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+namespace {
+const int32_t kDefaultLocalDeletionTime = std::numeric_limits<int32_t>::max();
+const int64_t kDefaultMarkedForDeleteAt = std::numeric_limits<int64_t>::min();
+} // namespace
+
+ColumnBase::ColumnBase(int8_t mask, int8_t index)
+ : mask_(mask), index_(index) {}
+
+std::size_t ColumnBase::Size() const { return sizeof(mask_) + sizeof(index_); }
+
+int8_t ColumnBase::Mask() const { return mask_; }
+
+int8_t ColumnBase::Index() const { return index_; }
+
+void ColumnBase::Serialize(std::string* dest) const {
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(mask_, dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(index_, dest);
+}
+
+std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
+ std::size_t offset) {
+ int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
+ return Tombstone::Deserialize(src, offset);
+ } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
+ return ExpiringColumn::Deserialize(src, offset);
+ } else {
+ return Column::Deserialize(src, offset);
+ }
+}
+
+Column::Column(int8_t mask, int8_t index, int64_t timestamp, int32_t value_size,
+ const char* value)
+ : ColumnBase(mask, index),
+ timestamp_(timestamp),
+ value_size_(value_size),
+ value_(value) {}
+
+int64_t Column::Timestamp() const { return timestamp_; }
+
+std::size_t Column::Size() const {
+ return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_) +
+ value_size_;
+}
+
+void Column::Serialize(std::string* dest) const {
+ ColumnBase::Serialize(dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(timestamp_, dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(value_size_, dest);
+ dest->append(value_, value_size_);
+}
+
+std::shared_ptr<Column> Column::Deserialize(const char* src,
+ std::size_t offset) {
+ int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(mask);
+ int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(index);
+ int64_t timestamp =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
+ offset += sizeof(timestamp);
+ int32_t value_size =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
+ offset += sizeof(value_size);
+ return std::make_shared<Column>(mask, index, timestamp, value_size,
+ src + offset);
+}
+
+ExpiringColumn::ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp,
+ int32_t value_size, const char* value,
+ int32_t ttl)
+ : Column(mask, index, timestamp, value_size, value), ttl_(ttl) {}
+
+std::size_t ExpiringColumn::Size() const {
+ return Column::Size() + sizeof(ttl_);
+}
+
+void ExpiringColumn::Serialize(std::string* dest) const {
+ Column::Serialize(dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(ttl_, dest);
+}
+
+std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint()
+ const {
+ return std::chrono::time_point<std::chrono::system_clock>(
+ std::chrono::microseconds(Timestamp()));
+}
+
+std::chrono::seconds ExpiringColumn::Ttl() const {
+ return std::chrono::seconds(ttl_);
+}
+
+bool ExpiringColumn::Expired() const {
+ return TimePoint() + Ttl() < std::chrono::system_clock::now();
+}
+
+std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const {
+ auto expired_at = (TimePoint() + Ttl()).time_since_epoch();
+ int32_t local_deletion_time = static_cast<int32_t>(
+ std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
+ int64_t marked_for_delete_at =
+ std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
+ return std::make_shared<Tombstone>(
+ static_cast<int8_t>(ColumnTypeMask::DELETION_MASK), Index(),
+ local_deletion_time, marked_for_delete_at);
+}
+
+std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
+ const char* src, std::size_t offset) {
+ int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(mask);
+ int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(index);
+ int64_t timestamp =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
+ offset += sizeof(timestamp);
+ int32_t value_size =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
+ offset += sizeof(value_size);
+ const char* value = src + offset;
+ offset += value_size;
+ int32_t ttl = ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
+ return std::make_shared<ExpiringColumn>(mask, index, timestamp, value_size,
+ value, ttl);
+}
+
+Tombstone::Tombstone(int8_t mask, int8_t index, int32_t local_deletion_time,
+ int64_t marked_for_delete_at)
+ : ColumnBase(mask, index),
+ local_deletion_time_(local_deletion_time),
+ marked_for_delete_at_(marked_for_delete_at) {}
+
+int64_t Tombstone::Timestamp() const { return marked_for_delete_at_; }
+
+std::size_t Tombstone::Size() const {
+ return ColumnBase::Size() + sizeof(local_deletion_time_) +
+ sizeof(marked_for_delete_at_);
+}
+
+void Tombstone::Serialize(std::string* dest) const {
+ ColumnBase::Serialize(dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
+}
+
+bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const {
+ auto local_deleted_at = std::chrono::time_point<std::chrono::system_clock>(
+ std::chrono::seconds(local_deletion_time_));
+ auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds);
+ return local_deleted_at + gc_grace_period < std::chrono::system_clock::now();
+}
+
+std::shared_ptr<Tombstone> Tombstone::Deserialize(const char* src,
+ std::size_t offset) {
+ int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(mask);
+ int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(index);
+ int32_t local_deletion_time =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
+ offset += sizeof(int32_t);
+ int64_t marked_for_delete_at =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
+ return std::make_shared<Tombstone>(mask, index, local_deletion_time,
+ marked_for_delete_at);
+}
+
+RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
+ : local_deletion_time_(local_deletion_time),
+ marked_for_delete_at_(marked_for_delete_at),
+ columns_(),
+ last_modified_time_(0) {}
+
+RowValue::RowValue(Columns columns, int64_t last_modified_time)
+ : local_deletion_time_(kDefaultLocalDeletionTime),
+ marked_for_delete_at_(kDefaultMarkedForDeleteAt),
+ columns_(std::move(columns)),
+ last_modified_time_(last_modified_time) {}
+
+std::size_t RowValue::Size() const {
+ std::size_t size =
+ sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_);
+ for (const auto& column : columns_) {
+ size += column->Size();
+ }
+ return size;
+}
+
+int64_t RowValue::LastModifiedTime() const {
+ if (IsTombstone()) {
+ return marked_for_delete_at_;
+ } else {
+ return last_modified_time_;
+ }
+}
+
+bool RowValue::IsTombstone() const {
+ return marked_for_delete_at_ > kDefaultMarkedForDeleteAt;
+}
+
+void RowValue::Serialize(std::string* dest) const {
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
+ for (const auto& column : columns_) {
+ column->Serialize(dest);
+ }
+}
+
+RowValue RowValue::RemoveExpiredColumns(bool* changed) const {
+ *changed = false;
+ Columns new_columns;
+ for (auto& column : columns_) {
+ if (column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
+ std::shared_ptr<ExpiringColumn> expiring_column =
+ std::static_pointer_cast<ExpiringColumn>(column);
+
+ if (expiring_column->Expired()) {
+ *changed = true;
+ continue;
+ }
+ }
+
+ new_columns.push_back(column);
+ }
+ return RowValue(std::move(new_columns), last_modified_time_);
+}
+
+RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const {
+ *changed = false;
+ Columns new_columns;
+ for (auto& column : columns_) {
+ if (column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
+ std::shared_ptr<ExpiringColumn> expiring_column =
+ std::static_pointer_cast<ExpiringColumn>(column);
+
+ if (expiring_column->Expired()) {
+ std::shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone();
+ new_columns.push_back(tombstone);
+ *changed = true;
+ continue;
+ }
+ }
+ new_columns.push_back(column);
+ }
+ return RowValue(std::move(new_columns), last_modified_time_);
+}
+
+RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const {
+ Columns new_columns;
+ for (auto& column : columns_) {
+ if (column->Mask() == ColumnTypeMask::DELETION_MASK) {
+ std::shared_ptr<Tombstone> tombstone =
+ std::static_pointer_cast<Tombstone>(column);
+
+ if (tombstone->Collectable(gc_grace_period)) {
+ continue;
+ }
+ }
+
+ new_columns.push_back(column);
+ }
+ return RowValue(std::move(new_columns), last_modified_time_);
+}
+
+bool RowValue::Empty() const { return columns_.empty(); }
+
+RowValue RowValue::Deserialize(const char* src, std::size_t size) {
+ std::size_t offset = 0;
+ assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));
+ int32_t local_deletion_time =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
+ offset += sizeof(int32_t);
+ int64_t marked_for_delete_at =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
+ offset += sizeof(int64_t);
+ if (offset == size) {
+ return RowValue(local_deletion_time, marked_for_delete_at);
+ }
+
+ assert(local_deletion_time == kDefaultLocalDeletionTime);
+ assert(marked_for_delete_at == kDefaultMarkedForDeleteAt);
+ Columns columns;
+ int64_t last_modified_time = 0;
+ while (offset < size) {
+ auto c = ColumnBase::Deserialize(src, offset);
+ offset += c->Size();
+ assert(offset <= size);
+ last_modified_time = std::max(last_modified_time, c->Timestamp());
+ columns.push_back(std::move(c));
+ }
+
+ return RowValue(std::move(columns), last_modified_time);
+}
+
+// Merge multiple row values into one.
+// For each column in rows with same index, we pick the one with latest
+// timestamp. And we also take row tombstone into consideration, by iterating
+// each row from reverse timestamp order, and stop once we hit the first
+// row tombstone.
+RowValue RowValue::Merge(std::vector<RowValue>&& values) {
+ assert(values.size() > 0);
+ if (values.size() == 1) {
+ return std::move(values[0]);
+ }
+
+ // Merge columns by their last modified time, and skip once we hit
+ // a row tombstone.
+ std::sort(values.begin(), values.end(),
+ [](const RowValue& r1, const RowValue& r2) {
+ return r1.LastModifiedTime() > r2.LastModifiedTime();
+ });
+
+ std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns;
+ int64_t tombstone_timestamp = 0;
+
+ for (auto& value : values) {
+ if (value.IsTombstone()) {
+ if (merged_columns.size() == 0) {
+ return std::move(value);
+ }
+ tombstone_timestamp = value.LastModifiedTime();
+ break;
+ }
+ for (auto& column : value.columns_) {
+ int8_t index = column->Index();
+ if (merged_columns.find(index) == merged_columns.end()) {
+ merged_columns[index] = column;
+ } else {
+ if (column->Timestamp() > merged_columns[index]->Timestamp()) {
+ merged_columns[index] = column;
+ }
+ }
+ }
+ }
+
+ int64_t last_modified_time = 0;
+ Columns columns;
+ for (auto& pair : merged_columns) {
+ // For some row, its last_modified_time > row tombstone_timestamp, but
+ // it might have rows whose timestamp is ealier than tombstone, so we
+ // ned to filter these rows.
+ if (pair.second->Timestamp() <= tombstone_timestamp) {
+ continue;
+ }
+ last_modified_time = std::max(last_modified_time, pair.second->Timestamp());
+ columns.push_back(std::move(pair.second));
+ }
+ return RowValue(std::move(columns), last_modified_time);
+}
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/format.h b/src/rocksdb/utilities/cassandra/format.h
new file mode 100644
index 000000000..1b2714735
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/format.h
@@ -0,0 +1,183 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+/**
+ * The encoding of Cassandra Row Value.
+ *
+ * A Cassandra Row Value could either be a row tombstone,
+ * or contains multiple columns, it has following fields:
+ *
+ * struct row_value {
+ * int32_t local_deletion_time; // Time in second when the row is deleted,
+ * // only used for Cassandra tombstone gc.
+ * int64_t marked_for_delete_at; // Ms that marked this row is deleted.
+ * struct column_base columns[]; // For non tombstone row, all columns
+ * // are stored here.
+ * }
+ *
+ * If the local_deletion_time and marked_for_delete_at is set, then this is
+ * a tombstone, otherwise it contains multiple columns.
+ *
+ * There are three type of Columns: Normal Column, Expiring Column and Column
+ * Tombstone, which have following fields:
+ *
+ * // Identify the type of the column.
+ * enum mask {
+ * DELETION_MASK = 0x01,
+ * EXPIRATION_MASK = 0x02,
+ * };
+ *
+ * struct column {
+ * int8_t mask = 0;
+ * int8_t index;
+ * int64_t timestamp;
+ * int32_t value_length;
+ * char value[value_length];
+ * }
+ *
+ * struct expiring_column {
+ * int8_t mask = mask.EXPIRATION_MASK;
+ * int8_t index;
+ * int64_t timestamp;
+ * int32_t value_length;
+ * char value[value_length];
+ * int32_t ttl;
+ * }
+ *
+ * struct tombstone_column {
+ * int8_t mask = mask.DELETION_MASK;
+ * int8_t index;
+ * int32_t local_deletion_time; // Similar to row_value's field.
+ * int64_t marked_for_delete_at;
+ * }
+ */
+
+#pragma once
+#include <chrono>
+#include <memory>
+#include <vector>
+
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/slice.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+// Identify the type of the column.
+enum ColumnTypeMask {
+ DELETION_MASK = 0x01,
+ EXPIRATION_MASK = 0x02,
+};
+
+class ColumnBase {
+ public:
+ ColumnBase(int8_t mask, int8_t index);
+ virtual ~ColumnBase() = default;
+
+ virtual int64_t Timestamp() const = 0;
+ virtual int8_t Mask() const;
+ virtual int8_t Index() const;
+ virtual std::size_t Size() const;
+ virtual void Serialize(std::string* dest) const;
+ static std::shared_ptr<ColumnBase> Deserialize(const char* src,
+ std::size_t offset);
+
+ private:
+ int8_t mask_;
+ int8_t index_;
+};
+
+class Column : public ColumnBase {
+ public:
+ Column(int8_t mask, int8_t index, int64_t timestamp, int32_t value_size,
+ const char* value);
+
+ virtual int64_t Timestamp() const override;
+ virtual std::size_t Size() const override;
+ virtual void Serialize(std::string* dest) const override;
+ static std::shared_ptr<Column> Deserialize(const char* src,
+ std::size_t offset);
+
+ private:
+ int64_t timestamp_;
+ int32_t value_size_;
+ const char* value_;
+};
+
+class Tombstone : public ColumnBase {
+ public:
+ Tombstone(int8_t mask, int8_t index, int32_t local_deletion_time,
+ int64_t marked_for_delete_at);
+
+ virtual int64_t Timestamp() const override;
+ virtual std::size_t Size() const override;
+ virtual void Serialize(std::string* dest) const override;
+ bool Collectable(int32_t gc_grace_period) const;
+ static std::shared_ptr<Tombstone> Deserialize(const char* src,
+ std::size_t offset);
+
+ private:
+ int32_t local_deletion_time_;
+ int64_t marked_for_delete_at_;
+};
+
+class ExpiringColumn : public Column {
+ public:
+ ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp,
+ int32_t value_size, const char* value, int32_t ttl);
+
+ virtual std::size_t Size() const override;
+ virtual void Serialize(std::string* dest) const override;
+ bool Expired() const;
+ std::shared_ptr<Tombstone> ToTombstone() const;
+
+ static std::shared_ptr<ExpiringColumn> Deserialize(const char* src,
+ std::size_t offset);
+
+ private:
+ int32_t ttl_;
+ std::chrono::time_point<std::chrono::system_clock> TimePoint() const;
+ std::chrono::seconds Ttl() const;
+};
+
+using Columns = std::vector<std::shared_ptr<ColumnBase>>;
+
+class RowValue {
+ public:
+ // Create a Row Tombstone.
+ RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at);
+ // Create a Row containing columns.
+ RowValue(Columns columns, int64_t last_modified_time);
+ RowValue(const RowValue& /*that*/) = delete;
+ RowValue(RowValue&& /*that*/) noexcept = default;
+ RowValue& operator=(const RowValue& /*that*/) = delete;
+ RowValue& operator=(RowValue&& /*that*/) = default;
+
+ std::size_t Size() const;
+ bool IsTombstone() const;
+ // For Tombstone this returns the marked_for_delete_at_,
+ // otherwise it returns the max timestamp of containing columns.
+ int64_t LastModifiedTime() const;
+ void Serialize(std::string* dest) const;
+ RowValue RemoveExpiredColumns(bool* changed) const;
+ RowValue ConvertExpiredColumnsToTombstones(bool* changed) const;
+ RowValue RemoveTombstones(int32_t gc_grace_period) const;
+ bool Empty() const;
+
+ static RowValue Deserialize(const char* src, std::size_t size);
+ // Merge multiple rows according to their timestamp.
+ static RowValue Merge(std::vector<RowValue>&& values);
+
+ const Columns& get_columns() { return columns_; }
+
+ private:
+ int32_t local_deletion_time_;
+ int64_t marked_for_delete_at_;
+ Columns columns_;
+ int64_t last_modified_time_;
+};
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/merge_operator.cc b/src/rocksdb/utilities/cassandra/merge_operator.cc
new file mode 100644
index 000000000..bde5dcbad
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/merge_operator.cc
@@ -0,0 +1,82 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "merge_operator.h"
+
+#include <assert.h>
+
+#include <memory>
+
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/options_type.h"
+#include "utilities/cassandra/format.h"
+#include "utilities/merge_operators.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+static std::unordered_map<std::string, OptionTypeInfo>
+ merge_operator_options_info = {
+#ifndef ROCKSDB_LITE
+ {"gc_grace_period_in_seconds",
+ {offsetof(struct CassandraOptions, gc_grace_period_in_seconds),
+ OptionType::kUInt32T, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
+ {"operands_limit",
+ {offsetof(struct CassandraOptions, operands_limit), OptionType::kSizeT,
+ OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
+#endif // ROCKSDB_LITE
+};
+
+CassandraValueMergeOperator::CassandraValueMergeOperator(
+ int32_t gc_grace_period_in_seconds, size_t operands_limit)
+ : options_(gc_grace_period_in_seconds, operands_limit) {
+ RegisterOptions(&options_, &merge_operator_options_info);
+}
+
+// Implementation for the merge operation (merges two Cassandra values)
+bool CassandraValueMergeOperator::FullMergeV2(
+ const MergeOperationInput& merge_in,
+ MergeOperationOutput* merge_out) const {
+ // Clear the *new_value for writing.
+ merge_out->new_value.clear();
+ std::vector<RowValue> row_values;
+ if (merge_in.existing_value) {
+ row_values.push_back(RowValue::Deserialize(
+ merge_in.existing_value->data(), merge_in.existing_value->size()));
+ }
+
+ for (auto& operand : merge_in.operand_list) {
+ row_values.push_back(RowValue::Deserialize(operand.data(), operand.size()));
+ }
+
+ RowValue merged = RowValue::Merge(std::move(row_values));
+ merged = merged.RemoveTombstones(options_.gc_grace_period_in_seconds);
+ merge_out->new_value.reserve(merged.Size());
+ merged.Serialize(&(merge_out->new_value));
+
+ return true;
+}
+
+bool CassandraValueMergeOperator::PartialMergeMulti(
+ const Slice& /*key*/, const std::deque<Slice>& operand_list,
+ std::string* new_value, Logger* /*logger*/) const {
+ // Clear the *new_value for writing.
+ assert(new_value);
+ new_value->clear();
+
+ std::vector<RowValue> row_values;
+ for (auto& operand : operand_list) {
+ row_values.push_back(RowValue::Deserialize(operand.data(), operand.size()));
+ }
+ RowValue merged = RowValue::Merge(std::move(row_values));
+ new_value->reserve(merged.Size());
+ merged.Serialize(new_value);
+ return true;
+}
+
+} // namespace cassandra
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/merge_operator.h b/src/rocksdb/utilities/cassandra/merge_operator.h
new file mode 100644
index 000000000..af8725db7
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/merge_operator.h
@@ -0,0 +1,44 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/slice.h"
+#include "utilities/cassandra/cassandra_options.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+/**
+ * A MergeOperator for rocksdb that implements Cassandra row value merge.
+ */
+class CassandraValueMergeOperator : public MergeOperator {
+ public:
+ explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds,
+ size_t operands_limit = 0);
+
+ virtual bool FullMergeV2(const MergeOperationInput& merge_in,
+ MergeOperationOutput* merge_out) const override;
+
+ virtual bool PartialMergeMulti(const Slice& key,
+ const std::deque<Slice>& operand_list,
+ std::string* new_value,
+ Logger* logger) const override;
+
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "CassandraValueMergeOperator"; }
+
+ virtual bool AllowSingleOperand() const override { return true; }
+
+ virtual bool ShouldMerge(const std::vector<Slice>& operands) const override {
+ return options_.operands_limit > 0 &&
+ operands.size() >= options_.operands_limit;
+ }
+
+ private:
+ CassandraOptions options_;
+};
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/serialize.h b/src/rocksdb/utilities/cassandra/serialize.h
new file mode 100644
index 000000000..4bd552bfc
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/serialize.h
@@ -0,0 +1,81 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+/**
+ * Helper functions which serialize and deserialize integers
+ * into bytes in big endian.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+
+#include "rocksdb/rocksdb_namespace.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+namespace {
+const int64_t kCharMask = 0xFFLL;
+const int32_t kBitsPerByte = 8;
+} // namespace
+
+template <typename T>
+void Serialize(T val, std::string* dest);
+
+template <typename T>
+T Deserialize(const char* src, std::size_t offset = 0);
+
+// Specializations
+template <>
+inline void Serialize<int8_t>(int8_t t, std::string* dest) {
+ dest->append(1, static_cast<char>(t & kCharMask));
+}
+
+template <>
+inline void Serialize<int32_t>(int32_t t, std::string* dest) {
+ for (unsigned long i = 0; i < sizeof(int32_t); i++) {
+ dest->append(
+ 1, static_cast<char>((t >> (sizeof(int32_t) - 1 - i) * kBitsPerByte) &
+ kCharMask));
+ }
+}
+
+template <>
+inline void Serialize<int64_t>(int64_t t, std::string* dest) {
+ for (unsigned long i = 0; i < sizeof(int64_t); i++) {
+ dest->append(
+ 1, static_cast<char>((t >> (sizeof(int64_t) - 1 - i) * kBitsPerByte) &
+ kCharMask));
+ }
+}
+
+template <>
+inline int8_t Deserialize<int8_t>(const char* src, std::size_t offset) {
+ return static_cast<int8_t>(src[offset]);
+}
+
+template <>
+inline int32_t Deserialize<int32_t>(const char* src, std::size_t offset) {
+ int32_t result = 0;
+ for (unsigned long i = 0; i < sizeof(int32_t); i++) {
+ result |= static_cast<int32_t>(static_cast<unsigned char>(src[offset + i]))
+ << ((sizeof(int32_t) - 1 - i) * kBitsPerByte);
+ }
+ return result;
+}
+
+template <>
+inline int64_t Deserialize<int64_t>(const char* src, std::size_t offset) {
+ int64_t result = 0;
+ for (unsigned long i = 0; i < sizeof(int64_t); i++) {
+ result |= static_cast<int64_t>(static_cast<unsigned char>(src[offset + i]))
+ << ((sizeof(int64_t) - 1 - i) * kBitsPerByte);
+ }
+ return result;
+}
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/test_utils.cc b/src/rocksdb/utilities/cassandra/test_utils.cc
new file mode 100644
index 000000000..ec6e5752d
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/test_utils.cc
@@ -0,0 +1,69 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "test_utils.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+const char kData[] = {'d', 'a', 't', 'a'};
+const char kExpiringData[] = {'e', 'd', 'a', 't', 'a'};
+const int32_t kTtl = 86400;
+const int8_t kColumn = 0;
+const int8_t kTombstone = 1;
+const int8_t kExpiringColumn = 2;
+
+std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask, int8_t index,
+ int64_t timestamp) {
+ if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
+ return std::shared_ptr<Tombstone>(
+ new Tombstone(mask, index, ToSeconds(timestamp), timestamp));
+ } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
+ return std::shared_ptr<ExpiringColumn>(new ExpiringColumn(
+ mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl));
+ } else {
+ return std::shared_ptr<Column>(
+ new Column(mask, index, timestamp, sizeof(kData), kData));
+ }
+}
+
+std::tuple<int8_t, int8_t, int64_t> CreateTestColumnSpec(int8_t mask,
+ int8_t index,
+ int64_t timestamp) {
+ return std::make_tuple(mask, index, timestamp);
+}
+
+RowValue CreateTestRowValue(
+ std::vector<std::tuple<int8_t, int8_t, int64_t>> column_specs) {
+ std::vector<std::shared_ptr<ColumnBase>> columns;
+ int64_t last_modified_time = 0;
+ for (auto spec : column_specs) {
+ auto c = CreateTestColumn(std::get<0>(spec), std::get<1>(spec),
+ std::get<2>(spec));
+ last_modified_time = std::max(last_modified_time, c->Timestamp());
+ columns.push_back(std::move(c));
+ }
+ return RowValue(std::move(columns), last_modified_time);
+}
+
+RowValue CreateRowTombstone(int64_t timestamp) {
+ return RowValue(ToSeconds(timestamp), timestamp);
+}
+
+void VerifyRowValueColumns(
+ const std::vector<std::shared_ptr<ColumnBase>> &columns,
+ std::size_t index_of_vector, int8_t expected_mask, int8_t expected_index,
+ int64_t expected_timestamp) {
+ EXPECT_EQ(expected_timestamp, columns[index_of_vector]->Timestamp());
+ EXPECT_EQ(expected_mask, columns[index_of_vector]->Mask());
+ EXPECT_EQ(expected_index, columns[index_of_vector]->Index());
+}
+
+int64_t ToMicroSeconds(int64_t seconds) { return seconds * (int64_t)1000000; }
+
+int32_t ToSeconds(int64_t microseconds) {
+ return (int32_t)(microseconds / (int64_t)1000000);
+}
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/test_utils.h b/src/rocksdb/utilities/cassandra/test_utils.h
new file mode 100644
index 000000000..be23f7076
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/test_utils.h
@@ -0,0 +1,42 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#include <memory>
+
+#include "test_util/testharness.h"
+#include "utilities/cassandra/format.h"
+#include "utilities/cassandra/serialize.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+extern const char kData[];
+extern const char kExpiringData[];
+extern const int32_t kTtl;
+extern const int8_t kColumn;
+extern const int8_t kTombstone;
+extern const int8_t kExpiringColumn;
+
+std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask, int8_t index,
+ int64_t timestamp);
+
+std::tuple<int8_t, int8_t, int64_t> CreateTestColumnSpec(int8_t mask,
+ int8_t index,
+ int64_t timestamp);
+
+RowValue CreateTestRowValue(
+ std::vector<std::tuple<int8_t, int8_t, int64_t>> column_specs);
+
+RowValue CreateRowTombstone(int64_t timestamp);
+
+void VerifyRowValueColumns(
+ const std::vector<std::shared_ptr<ColumnBase>> &columns,
+ std::size_t index_of_vector, int8_t expected_mask, int8_t expected_index,
+ int64_t expected_timestamp);
+
+int64_t ToMicroSeconds(int64_t seconds);
+int32_t ToSeconds(int64_t microseconds);
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE