summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/cassandra
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc47
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h42
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_format_test.cc367
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_functional_test.cc311
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc112
-rw-r--r--src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc188
-rw-r--r--src/rocksdb/utilities/cassandra/format.cc390
-rw-r--r--src/rocksdb/utilities/cassandra/format.h197
-rw-r--r--src/rocksdb/utilities/cassandra/merge_operator.cc67
-rw-r--r--src/rocksdb/utilities/cassandra/merge_operator.h44
-rw-r--r--src/rocksdb/utilities/cassandra/serialize.h75
-rw-r--r--src/rocksdb/utilities/cassandra/test_utils.cc75
-rw-r--r--src/rocksdb/utilities/cassandra/test_utils.h46
13 files changed, 1961 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc
new file mode 100644
index 000000000..f0a00e4d1
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.cc
@@ -0,0 +1,47 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "utilities/cassandra/cassandra_compaction_filter.h"
+#include <string>
+#include "rocksdb/slice.h"
+#include "utilities/cassandra/format.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+const char* CassandraCompactionFilter::Name() const {  // Returns the fixed name of this filter.
+ return "CassandraCompactionFilter";
+}
+
+CompactionFilter::Decision CassandraCompactionFilter::FilterV2(  // Decides keep / change / remove for one stored row value.
+ int /*level*/, const Slice& /*key*/, ValueType value_type,
+ const Slice& existing_value, std::string* new_value,
+ std::string* /*skip_until*/) const {
+ bool value_changed = false;  // set by the TTL pass below when it alters the row
+ RowValue row_value = RowValue::Deserialize(
+ existing_value.data(), existing_value.size());
+ RowValue compacted =  // TTL pass: purge expired columns, or turn them into tombstones
+ purge_ttl_on_expiration_
+ ? row_value.RemoveExpiredColumns(&value_changed)
+ : row_value.ConvertExpiredColumnsToTombstones(&value_changed);
+
+ if (value_type == ValueType::kValue) {  // tombstone GC is applied to kValue entries only
+ compacted = compacted.RemoveTombstones(gc_grace_period_in_seconds_);
+ }  // NOTE(review): this step does not touch value_changed; confirm kKeep is intended when only tombstones were dropped
+
+ if(compacted.Empty()) {  // nothing left in the row: drop the whole entry
+ return Decision::kRemove;
+ }
+
+ if (value_changed) {  // hand the rewritten row back through new_value
+ compacted.Serialize(new_value);
+ return Decision::kChangeValue;
+ }
+
+ return Decision::kKeep;  // stored value is left as is
+}
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h
new file mode 100644
index 000000000..ac2588106
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_compaction_filter.h
@@ -0,0 +1,42 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#include <string>
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/slice.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+/**
+ * Compaction filter for removing expired Cassandra data with ttl.
+ * If option `purge_ttl_on_expiration` is set to true, expired data
+ * will be directly purged. Otherwise expired data will be converted to
+ * tombstones first, then be eventually removed after gc grace period.
+ * `purge_ttl_on_expiration` should only be on in the case all the
+ * writes have same ttl setting, otherwise it could bring old data back.
+ *
+ * Compaction filter is also in charge of removing tombstones that have been
+ * promoted to kValue type after a series of merges in compaction.
+ */
+class CassandraCompactionFilter : public CompactionFilter {
+public:
+ explicit CassandraCompactionFilter(bool purge_ttl_on_expiration,  // true: purge expired columns; false: convert them to tombstones
+ int32_t gc_grace_period_in_seconds)  // passed to RowValue::RemoveTombstones by FilterV2
+ : purge_ttl_on_expiration_(purge_ttl_on_expiration),
+ gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
+
+ const char* Name() const override;  // fixed filter name
+ virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,  // level and key are unused by the implementation
+ const Slice& existing_value, std::string* new_value,
+ std::string* skip_until) const override;
+
+private:
+ bool purge_ttl_on_expiration_;  // selects the TTL handling mode in FilterV2
+ int32_t gc_grace_period_in_seconds_;  // grace period applied when collecting tombstones
+};
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/cassandra_format_test.cc b/src/rocksdb/utilities/cassandra/cassandra_format_test.cc
new file mode 100644
index 000000000..a8e6ad3f1
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_format_test.cc
@@ -0,0 +1,367 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <cstring>
+#include <memory>
+#include "test_util/testharness.h"
+#include "utilities/cassandra/format.h"
+#include "utilities/cassandra/serialize.h"
+#include "utilities/cassandra/test_utils.h"
+
+using namespace ROCKSDB_NAMESPACE::cassandra;
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+TEST(ColumnTest, Column) {  // Checks accessors, serialized layout and round-tripping of a plain Column.
+ char data[4] = {'d', 'a', 't', 'a'};
+ int8_t mask = 0;
+ int8_t index = 1;
+ int64_t timestamp = 1494022807044;
+ Column c = Column(mask, index, timestamp, sizeof(data), data);
+
+ EXPECT_EQ(c.Index(), index);
+ EXPECT_EQ(c.Timestamp(), timestamp);
+ EXPECT_EQ(c.Size(), 14 + sizeof(data));  // 14 = mask(1) + index(1) + timestamp(8) + value size(4), as read back below
+
+ // Verify the serialization.
+ std::string dest;
+ dest.reserve(c.Size() * 2);
+ c.Serialize(&dest);
+
+ EXPECT_EQ(dest.size(), c.Size());
+ std::size_t offset = 0;  // walks the serialized fields in order
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), timestamp);
+ offset += sizeof(int64_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(data));
+ offset += sizeof(int32_t);
+ EXPECT_TRUE(std::memcmp(data, dest.c_str() + offset, sizeof(data)) == 0);
+
+ // Verify the deserialization.
+ std::string saved_dest = dest;  // parse from a copy, since dest is appended to below
+ std::shared_ptr<Column> c1 = Column::Deserialize(saved_dest.c_str(), 0);
+ EXPECT_EQ(c1->Index(), index);
+ EXPECT_EQ(c1->Timestamp(), timestamp);
+ EXPECT_EQ(c1->Size(), 14 + sizeof(data));
+
+ c1->Serialize(&dest);  // appends a second copy right after the first
+ EXPECT_EQ(dest.size(), 2 * c.Size());
+ EXPECT_TRUE(
+ std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0);
+
+ // Verify the ColumnBase::Deserialization.
+ saved_dest = dest;
+ std::shared_ptr<ColumnBase> c2 =
+ ColumnBase::Deserialize(saved_dest.c_str(), c.Size());  // start at the second copy
+ c2->Serialize(&dest);
+ EXPECT_EQ(dest.size(), 3 * c.Size());
+ EXPECT_TRUE(
+ std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size())
+ == 0);
+}
+
+TEST(ExpiringColumnTest, ExpiringColumn) {  // Checks accessors, serialized layout and round-tripping of an ExpiringColumn.
+ char data[4] = {'d', 'a', 't', 'a'};
+ int8_t mask = ColumnTypeMask::EXPIRATION_MASK;
+ int8_t index = 3;
+ int64_t timestamp = 1494022807044;
+ int32_t ttl = 3600;
+ ExpiringColumn c = ExpiringColumn(mask, index, timestamp,
+ sizeof(data), data, ttl);
+
+ EXPECT_EQ(c.Index(), index);
+ EXPECT_EQ(c.Timestamp(), timestamp);
+ EXPECT_EQ(c.Size(), 18 + sizeof(data));  // 18 = mask(1) + index(1) + timestamp(8) + value size(4) + ttl(4), as read back below
+
+ // Verify the serialization.
+ std::string dest;
+ dest.reserve(c.Size() * 2);
+ c.Serialize(&dest);
+
+ EXPECT_EQ(dest.size(), c.Size());
+ std::size_t offset = 0;  // walks the serialized fields in order
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), timestamp);
+ offset += sizeof(int64_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(data));
+ offset += sizeof(int32_t);
+ EXPECT_TRUE(std::memcmp(data, dest.c_str() + offset, sizeof(data)) == 0);
+ offset += sizeof(data);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), ttl);  // ttl trails the value bytes
+
+ // Verify the deserialization.
+ std::string saved_dest = dest;  // parse from a copy, since dest is appended to below
+ std::shared_ptr<ExpiringColumn> c1 =
+ ExpiringColumn::Deserialize(saved_dest.c_str(), 0);
+ EXPECT_EQ(c1->Index(), index);
+ EXPECT_EQ(c1->Timestamp(), timestamp);
+ EXPECT_EQ(c1->Size(), 18 + sizeof(data));
+
+ c1->Serialize(&dest);  // appends a second copy right after the first
+ EXPECT_EQ(dest.size(), 2 * c.Size());
+ EXPECT_TRUE(
+ std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0);
+
+ // Verify the ColumnBase::Deserialization.
+ saved_dest = dest;
+ std::shared_ptr<ColumnBase> c2 =
+ ColumnBase::Deserialize(saved_dest.c_str(), c.Size());  // start at the second copy
+ c2->Serialize(&dest);
+ EXPECT_EQ(dest.size(), 3 * c.Size());
+ EXPECT_TRUE(
+ std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size())
+ == 0);
+}
+
+TEST(TombstoneTest, TombstoneCollectable) {  // Collectable() flips around the gc grace boundary.
+ int32_t now = static_cast<int32_t>(time(nullptr));  // wall-clock seconds, narrowed to the 32-bit deletion-time field
+ int32_t gc_grace_seconds = 16440;
+ int32_t time_delta_seconds = 10;  // margin on either side of the boundary
+ EXPECT_TRUE(Tombstone(ColumnTypeMask::DELETION_MASK, 0,  // deleted 10s before the grace window started: collectable
+ now - gc_grace_seconds - time_delta_seconds,
+ ToMicroSeconds(now - gc_grace_seconds - time_delta_seconds))
+ .Collectable(gc_grace_seconds));
+ EXPECT_FALSE(Tombstone(ColumnTypeMask::DELETION_MASK, 0,  // deleted 10s inside the grace window: must be kept
+ now - gc_grace_seconds + time_delta_seconds,
+ ToMicroSeconds(now - gc_grace_seconds + time_delta_seconds))
+ .Collectable(gc_grace_seconds));
+}
+
+TEST(TombstoneTest, Tombstone) {  // Checks accessors, serialized layout and round-tripping of a column Tombstone.
+ int8_t mask = ColumnTypeMask::DELETION_MASK;
+ int8_t index = 2;
+ int32_t local_deletion_time = 1494022807;
+ int64_t marked_for_delete_at = 1494022807044;
+ Tombstone c = Tombstone(mask, index, local_deletion_time,
+ marked_for_delete_at);
+
+ EXPECT_EQ(c.Index(), index);
+ EXPECT_EQ(c.Timestamp(), marked_for_delete_at);  // Timestamp() reports marked_for_delete_at
+ EXPECT_EQ(c.Size(), 14);  // 14 = mask(1) + index(1) + local_deletion_time(4) + marked_for_delete_at(8)
+
+ // Verify the serialization.
+ std::string dest;
+ dest.reserve(c.Size() * 2);
+ c.Serialize(&dest);
+
+ EXPECT_EQ(dest.size(), c.Size());
+ std::size_t offset = 0;  // walks the serialized fields in order
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), mask);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), local_deletion_time);
+ offset += sizeof(int32_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), marked_for_delete_at);
+
+ // Verify the deserialization.
+ std::shared_ptr<Tombstone> c1 = Tombstone::Deserialize(dest.c_str(), 0);
+ EXPECT_EQ(c1->Index(), index);
+ EXPECT_EQ(c1->Timestamp(), marked_for_delete_at);
+ EXPECT_EQ(c1->Size(), 14);
+
+ c1->Serialize(&dest);  // appends a second copy right after the first
+ EXPECT_EQ(dest.size(), 2 * c.Size());
+ EXPECT_TRUE(
+ std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0);
+
+ // Verify the ColumnBase::Deserialization.
+ std::shared_ptr<ColumnBase> c2 =
+ ColumnBase::Deserialize(dest.c_str(), c.Size());  // start at the second copy
+ c2->Serialize(&dest);
+ EXPECT_EQ(dest.size(), 3 * c.Size());
+ EXPECT_TRUE(
+ std::memcmp(dest.c_str() + c.Size(), dest.c_str() + c.Size() * 2, c.Size())
+ == 0);
+}
+
+TEST(RowValueTest, RowTombstone) {  // Checks a row-level tombstone: accessors, serialized layout and round-tripping.
+ int32_t local_deletion_time = 1494022807;
+ int64_t marked_for_delete_at = 1494022807044;
+ RowValue r = RowValue(local_deletion_time, marked_for_delete_at);
+
+ EXPECT_EQ(r.Size(), 12);  // 12 = local_deletion_time(4) + marked_for_delete_at(8)
+ EXPECT_EQ(r.IsTombstone(), true);
+ EXPECT_EQ(r.LastModifiedTime(), marked_for_delete_at);
+
+ // Verify the serialization.
+ std::string dest;
+ dest.reserve(r.Size() * 2);
+ r.Serialize(&dest);
+
+ EXPECT_EQ(dest.size(), r.Size());
+ std::size_t offset = 0;  // walks the serialized fields in order
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), local_deletion_time);
+ offset += sizeof(int32_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), marked_for_delete_at);
+
+ // Verify the deserialization.
+ RowValue r1 = RowValue::Deserialize(dest.c_str(), r.Size());
+ EXPECT_EQ(r1.Size(), 12);
+ EXPECT_EQ(r1.IsTombstone(), true);
+ EXPECT_EQ(r1.LastModifiedTime(), marked_for_delete_at);
+
+ r1.Serialize(&dest);  // appends a second copy right after the first
+ EXPECT_EQ(dest.size(), 2 * r.Size());
+ EXPECT_TRUE(
+ std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0);
+}
+
+TEST(RowValueTest, RowWithColumns) {  // Checks a row holding one column of each kind: size, serialized layout and round-tripping.
+ std::vector<std::shared_ptr<ColumnBase>> columns;
+ int64_t last_modified_time = 1494022807048;
+ std::size_t columns_data_size = 0;  // running sum of the three columns' Size()
+
+ char e_data[5] = {'e', 'd', 'a', 't', 'a'};
+ int8_t e_index = 0;
+ int64_t e_timestamp = 1494022807044;
+ int32_t e_ttl = 3600;
+ columns.push_back(std::shared_ptr<ExpiringColumn>(
+ new ExpiringColumn(ColumnTypeMask::EXPIRATION_MASK, e_index,
+ e_timestamp, sizeof(e_data), e_data, e_ttl)));
+ columns_data_size += columns[0]->Size();
+
+ char c_data[4] = {'d', 'a', 't', 'a'};
+ int8_t c_index = 1;
+ int64_t c_timestamp = 1494022807048;
+ columns.push_back(std::shared_ptr<Column>(
+ new Column(0, c_index, c_timestamp, sizeof(c_data), c_data)));
+ columns_data_size += columns[1]->Size();
+
+ int8_t t_index = 2;
+ int32_t t_local_deletion_time = 1494022801;
+ int64_t t_marked_for_delete_at = 1494022807043;
+ columns.push_back(std::shared_ptr<Tombstone>(
+ new Tombstone(ColumnTypeMask::DELETION_MASK,
+ t_index, t_local_deletion_time, t_marked_for_delete_at)));
+ columns_data_size += columns[2]->Size();
+
+ RowValue r = RowValue(std::move(columns), last_modified_time);
+
+ EXPECT_EQ(r.Size(), columns_data_size + 12);  // 12 = row header: int32 + int64, read back below
+ EXPECT_EQ(r.IsTombstone(), false);
+ EXPECT_EQ(r.LastModifiedTime(), last_modified_time);
+
+ // Verify the serialization.
+ std::string dest;
+ dest.reserve(r.Size() * 2);
+ r.Serialize(&dest);
+
+ EXPECT_EQ(dest.size(), r.Size());
+ std::size_t offset = 0;
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset),  // header of a non-tombstone row holds int32 max ...
+ std::numeric_limits<int32_t>::max());
+ offset += sizeof(int32_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset),  // ... followed by int64 min
+ std::numeric_limits<int64_t>::min());
+ offset += sizeof(int64_t);
+
+ // Column0: ExpiringColumn
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset),
+ ColumnTypeMask::EXPIRATION_MASK);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), e_index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), e_timestamp);
+ offset += sizeof(int64_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(e_data));
+ offset += sizeof(int32_t);
+ EXPECT_TRUE(std::memcmp(e_data, dest.c_str() + offset, sizeof(e_data)) == 0);
+ offset += sizeof(e_data);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), e_ttl);
+ offset += sizeof(int32_t);
+
+ // Column1: Column
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), 0);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), c_index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), c_timestamp);
+ offset += sizeof(int64_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), sizeof(c_data));
+ offset += sizeof(int32_t);
+ EXPECT_TRUE(std::memcmp(c_data, dest.c_str() + offset, sizeof(c_data)) == 0);
+ offset += sizeof(c_data);
+
+ // Column2: Tombstone
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset),
+ ColumnTypeMask::DELETION_MASK);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int8_t>(dest.c_str(), offset), t_index);
+ offset += sizeof(int8_t);
+ EXPECT_EQ(Deserialize<int32_t>(dest.c_str(), offset), t_local_deletion_time);
+ offset += sizeof(int32_t);
+ EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), t_marked_for_delete_at);
+
+ // Verify the deserialization.
+ RowValue r1 = RowValue::Deserialize(dest.c_str(), r.Size());
+ EXPECT_EQ(r1.Size(), columns_data_size + 12);
+ EXPECT_EQ(r1.IsTombstone(), false);
+ EXPECT_EQ(r1.LastModifiedTime(), last_modified_time);
+
+ r1.Serialize(&dest);  // appends a second copy right after the first
+ EXPECT_EQ(dest.size(), 2 * r.Size());
+ EXPECT_TRUE(
+ std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0);
+}
+
+TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired) {  // RemoveExpiredColumns drops only the expired TTL column.
+ int64_t now = time(nullptr);
+
+ auto row_value = CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)),
+ CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), // expired
+ CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired
+ CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
+ });
+
+ bool changed = false;
+ auto purged = row_value.RemoveExpiredColumns(&changed);
+ EXPECT_TRUE(changed);
+ EXPECT_EQ(purged.columns_.size(), 3);  // index 1 is gone; the other three keep their order
+ VerifyRowValueColumns(purged.columns_, 0, kColumn, 0, ToMicroSeconds(now));
+ VerifyRowValueColumns(purged.columns_, 1, kExpiringColumn, 2, ToMicroSeconds(now));
+ VerifyRowValueColumns(purged.columns_, 2, kTombstone, 3, ToMicroSeconds(now));
+
+ purged.RemoveExpiredColumns(&changed);  // second pass has nothing left to purge
+ EXPECT_FALSE(changed);
+}
+
+TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) {  // The expired TTL column is replaced in place by a tombstone.
+ int64_t now = time(nullptr);
+
+ auto row_value = CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now)),
+ CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), // expired
+ CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired
+ CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
+ });
+
+ bool changed = false;
+ auto compacted = row_value.ConvertExpiredColumnsToTombstones(&changed);
+ EXPECT_TRUE(changed);
+ EXPECT_EQ(compacted.columns_.size(), 4);  // column count is unchanged; only the kind of index 1 differs
+ VerifyRowValueColumns(compacted.columns_, 0, kColumn, 0, ToMicroSeconds(now));
+ VerifyRowValueColumns(compacted.columns_, 1, kTombstone, 1, ToMicroSeconds(now - 10));  // (now - kTtl - 10) + kTtl, i.e. the expiry instant
+ VerifyRowValueColumns(compacted.columns_, 2, kExpiringColumn, 2, ToMicroSeconds(now));
+ VerifyRowValueColumns(compacted.columns_, 3, kTombstone, 3, ToMicroSeconds(now));
+
+ compacted.ConvertExpiredColumnsToTombstones(&changed);  // second pass has nothing left to convert
+ EXPECT_FALSE(changed);
+}
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {  // Test binary entry point: initializes gtest and runs all registered tests.
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc b/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc
new file mode 100644
index 000000000..501988423
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_functional_test.cc
@@ -0,0 +1,311 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <iostream>
+#include "db/db_impl/db_impl.h"
+#include "rocksdb/db.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "test_util/testharness.h"
+#include "util/random.h"
+#include "utilities/cassandra/cassandra_compaction_filter.h"
+#include "utilities/cassandra/merge_operator.h"
+#include "utilities/cassandra/test_utils.h"
+#include "utilities/merge_operators.h"
+
+using namespace ROCKSDB_NAMESPACE;
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+// Path to the database on the file system, used by every test in this file.
+const std::string kDbName = test::PerThreadDBPath("cassandra_functional_test");
+
+class CassandraStore {  // Test helper that reads and writes serialized RowValue entries through a DB.
+ public:
+ explicit CassandraStore(std::shared_ptr<DB> db)
+ : db_(db), write_option_(), get_option_() {
+ assert(db);
+ }
+
+ bool Append(const std::string& key, const RowValue& val){  // Serializes val and issues a Merge; returns false on error.
+ std::string result;
+ val.Serialize(&result);
+ Slice valSlice(result.data(), result.size());
+ auto s = db_->Merge(write_option_, key, valSlice);
+
+ if (s.ok()) {
+ return true;
+ } else {
+ std::cerr << "ERROR " << s.ToString() << std::endl;
+ return false;
+ }
+ }
+
+ bool Put(const std::string& key, const RowValue& val) {  // Serializes val and issues a Put; returns false on error.
+ std::string result;
+ val.Serialize(&result);
+ Slice valSlice(result.data(), result.size());
+ auto s = db_->Put(write_option_, key, valSlice);
+ if (s.ok()) {
+ return true;
+ } else {
+ std::cerr << "ERROR " << s.ToString() << std::endl;
+ return false;
+ }
+ }
+
+ void Flush() {  // Flushes the memtable, then waits for compaction via the DBImpl test hooks.
+ dbfull()->TEST_FlushMemTable();
+ dbfull()->TEST_WaitForCompact();
+ }
+
+ void Compact() {  // Runs a compaction over the whole key range (null begin/end) of the default column family.
+ dbfull()->TEST_CompactRange(
+ 0, nullptr, nullptr, db_->DefaultColumnFamily());
+ }
+
+ std::tuple<bool, RowValue> Get(const std::string& key){  // Returns (found, row); row is a placeholder when found is false.
+ std::string result;
+ auto s = db_->Get(get_option_, key, &result);
+
+ if (s.ok()) {
+ return std::make_tuple(true,
+ RowValue::Deserialize(result.data(),
+ result.size()));
+ }
+
+ if (!s.IsNotFound()) {  // a missing key is an expected outcome; only other failures are logged
+ std::cerr << "ERROR " << s.ToString() << std::endl;
+ }
+
+ return std::make_tuple(false, RowValue(0, 0));
+ }
+
+ private:
+ std::shared_ptr<DB> db_;
+ WriteOptions write_option_;
+ ReadOptions get_option_;
+
+ DBImpl* dbfull() { return static_cast<DBImpl*>(db_.get()); }  // hierarchy downcast, needed for the TEST_* hooks above
+};
+
+class TestCompactionFilterFactory : public CompactionFilterFactory {  // Factory that builds a CassandraCompactionFilter with fixed settings.
+public:
+ explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration,
+ int32_t gc_grace_period_in_seconds)
+ : purge_ttl_on_expiration_(purge_ttl_on_expiration),
+ gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(  // the context argument is ignored
+ const CompactionFilter::Context& /*context*/) override {
+ return std::unique_ptr<CompactionFilter>(new CassandraCompactionFilter(
+ purge_ttl_on_expiration_, gc_grace_period_in_seconds_));
+ }
+
+ const char* Name() const override { return "TestCompactionFilterFactory"; }
+
+private:
+ bool purge_ttl_on_expiration_;  // forwarded to each filter created
+ int32_t gc_grace_period_in_seconds_;  // forwarded to each filter created
+};
+
+
+// Fixture for the functional tests: wipes the DB, then opens it with the Cassandra merge operator and compaction filter.
+class CassandraFunctionalTest : public testing::Test {
+public:
+ CassandraFunctionalTest() {
+ EXPECT_OK(DestroyDB(kDbName, Options())); // Start each test with a fresh DB
+ }
+
+ std::shared_ptr<DB> OpenDb() {  // Reads the two members below, so tests must set them before calling.
+ DB* db = nullptr;  // stays null if DB::Open fails, instead of being returned uninitialized
+ Options options;
+ options.create_if_missing = true;
+ options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
+ auto* cf_factory = new TestCompactionFilterFactory(
+ purge_ttl_on_expiration_, gc_grace_period_in_seconds_);
+ options.compaction_filter_factory.reset(cf_factory);
+ EXPECT_OK(DB::Open(options, kDbName, &db));
+ return std::shared_ptr<DB>(db);
+ }
+
+ bool purge_ttl_on_expiration_ = false;  // tests flip this before OpenDb() to select purge mode
+ int32_t gc_grace_period_in_seconds_ = 100;
+};
+
+// THE TEST CASES BEGIN HERE
+
+TEST_F(CassandraFunctionalTest, SimpleMergeTest) {  // Three merges to one key; Get must return the merged row.
+ CassandraStore store(OpenDb());
+ int64_t now = time(nullptr);
+
+ store.Append("k1", CreateTestRowValue({
+ CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now + 5)),
+ CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now + 8)),
+ CreateTestColumnSpec(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
+ }));
+ store.Append("k1",CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now + 2)),
+ CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
+ CreateTestColumnSpec(kTombstone, 2, ToMicroSeconds(now + 7)),
+ CreateTestColumnSpec(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
+ }));
+ store.Append("k1", CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
+ CreateTestColumnSpec(kTombstone, 1, ToMicroSeconds(now + 5)),
+ CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now + 4)),
+ CreateTestColumnSpec(kTombstone, 11, ToMicroSeconds(now + 11)),
+ }));
+
+ auto ret = store.Get("k1");
+
+ ASSERT_TRUE(std::get<0>(ret));
+ RowValue& merged = std::get<1>(ret);
+ EXPECT_EQ(merged.columns_.size(), 5);  // one entry per distinct column index: 0, 1, 2, 7, 11
+ VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, ToMicroSeconds(now + 6));  // for each index the newest timestamp wins
+ VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, ToMicroSeconds(now + 8));
+ VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, ToMicroSeconds(now + 7));
+ VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, ToMicroSeconds(now + 17));
+ VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, ToMicroSeconds(now + 11));
+}
+
+TEST_F(CassandraFunctionalTest,  // Default mode (purge off): compaction turns expired TTL columns into tombstones.
+ CompactionShouldConvertExpiredColumnsToTombstone) {
+ CassandraStore store(OpenDb());
+ int64_t now= time(nullptr);
+
+ store.Append("k1", CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), // expired
+ CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired
+ CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
+ }));
+
+ store.Flush();
+
+ store.Append("k1",CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), // expired
+ CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
+ }));
+
+ store.Flush();
+ store.Compact();
+
+ auto ret = store.Get("k1");
+ ASSERT_TRUE(std::get<0>(ret));
+ RowValue& merged = std::get<1>(ret);
+ EXPECT_EQ(merged.columns_.size(), 4);
+ VerifyRowValueColumns(merged.columns_, 0, kTombstone, 0, ToMicroSeconds(now - 10));  // newer write of index 0 (now - kTtl - 10), shifted by kTtl
+ VerifyRowValueColumns(merged.columns_, 1, kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10));
+ VerifyRowValueColumns(merged.columns_, 2, kColumn, 2, ToMicroSeconds(now));
+ VerifyRowValueColumns(merged.columns_, 3, kTombstone, 3, ToMicroSeconds(now));
+}
+
+
+TEST_F(CassandraFunctionalTest,  // Purge mode: compaction drops expired TTL columns outright.
+ CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
+ purge_ttl_on_expiration_ = true;  // must be set before OpenDb() builds the filter factory
+ CassandraStore store(OpenDb());
+ int64_t now = time(nullptr);
+
+ store.Append("k1", CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), // expired
+ CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired
+ CreateTestColumnSpec(kTombstone, 3, ToMicroSeconds(now))
+ }));
+
+ store.Flush();
+
+ store.Append("k1",CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), // expired
+ CreateTestColumnSpec(kColumn, 2, ToMicroSeconds(now))
+ }));
+
+ store.Flush();
+ store.Compact();
+
+ auto ret = store.Get("k1");
+ ASSERT_TRUE(std::get<0>(ret));
+ RowValue& merged = std::get<1>(ret);
+ EXPECT_EQ(merged.columns_.size(), 3);  // index 0 is gone; indexes 1, 2 and 3 remain
+ VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 1, ToMicroSeconds(now));
+ VerifyRowValueColumns(merged.columns_, 1, kColumn, 2, ToMicroSeconds(now));
+ VerifyRowValueColumns(merged.columns_, 2, kTombstone, 3, ToMicroSeconds(now));
+}
+
+TEST_F(CassandraFunctionalTest,  // Purge mode: a row whose columns have all expired disappears after compaction.
+ CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) {
+ purge_ttl_on_expiration_ = true;  // must be set before OpenDb() builds the filter factory
+ CassandraStore store(OpenDb());
+ int64_t now = time(nullptr);
+
+ store.Append("k1", CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)),  // expired
+ CreateTestColumnSpec(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)),  // expired
+ }));
+
+ store.Flush();
+
+ store.Append("k1",CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)),  // expired
+ }));
+
+ store.Flush();
+ store.Compact();
+ ASSERT_FALSE(std::get<0>(store.Get("k1")));  // key no longer found
+}
+
+TEST_F(CassandraFunctionalTest,  // A tombstone older than the gc grace period is dropped during compaction.
+ CompactionShouldRemoveTombstoneExceedingGCGracePeriod) {
+ purge_ttl_on_expiration_ = true;
+ CassandraStore store(OpenDb());
+ int64_t now = time(nullptr);
+
+ store.Append("k1", CreateTestRowValue({
+ CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),  // one second past the grace period
+ CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now))
+ }));
+
+ store.Append("k2", CreateTestRowValue({  // k2 is written but never read back in this test
+ CreateTestColumnSpec(kColumn, 0, ToMicroSeconds(now))
+ }));
+
+ store.Flush();
+
+ store.Append("k1",CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 1, ToMicroSeconds(now)),
+ }));
+
+ store.Flush();
+ store.Compact();
+
+ auto ret = store.Get("k1");
+ ASSERT_TRUE(std::get<0>(ret));
+ RowValue& gced = std::get<1>(ret);
+ EXPECT_EQ(gced.columns_.size(), 1);  // only the live column at index 1 survives
+ VerifyRowValueColumns(gced.columns_, 0, kColumn, 1, ToMicroSeconds(now));
+}
+
+TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {  // Same GC check, but the row is written with Put instead of Merge.
+ purge_ttl_on_expiration_ = true;
+ CassandraStore store(OpenDb());
+ int64_t now = time(nullptr);
+
+ store.Put("k1", CreateTestRowValue({
+ CreateTestColumnSpec(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),  // one second past the grace period
+ }));
+
+ store.Flush();
+ store.Compact();
+ ASSERT_FALSE(std::get<0>(store.Get("k1")));  // the row held only that tombstone, so the key is gone
+}
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {  // Test binary entry point: initializes gtest and runs all registered tests.
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc b/src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc
new file mode 100644
index 000000000..9e9ff1494
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_row_merge_test.cc
@@ -0,0 +1,112 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <memory>
+#include "test_util/testharness.h"
+#include "utilities/cassandra/format.h"
+#include "utilities/cassandra/test_utils.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+TEST(RowValueMergeTest, Merge) {  // Merges three rows with overlapping column indexes.
+ std::vector<RowValue> row_values;
+ row_values.push_back(
+ CreateTestRowValue({
+ CreateTestColumnSpec(kTombstone, 0, 5),
+ CreateTestColumnSpec(kColumn, 1, 8),
+ CreateTestColumnSpec(kExpiringColumn, 2, 5),
+ })
+ );
+
+ row_values.push_back(
+ CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 0, 2),
+ CreateTestColumnSpec(kExpiringColumn, 1, 5),
+ CreateTestColumnSpec(kTombstone, 2, 7),
+ CreateTestColumnSpec(kExpiringColumn, 7, 17),
+ })
+ );
+
+ row_values.push_back(
+ CreateTestRowValue({
+ CreateTestColumnSpec(kExpiringColumn, 0, 6),
+ CreateTestColumnSpec(kTombstone, 1, 5),
+ CreateTestColumnSpec(kColumn, 2, 4),
+ CreateTestColumnSpec(kTombstone, 11, 11),
+ })
+ );
+
+ RowValue merged = RowValue::Merge(std::move(row_values));
+ EXPECT_FALSE(merged.IsTombstone());
+ EXPECT_EQ(merged.columns_.size(), 5);  // one entry per distinct column index: 0, 1, 2, 7, 11
+ VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6);  // for each index the largest timestamp wins
+ VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8);
+ VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7);
+ VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17);
+ VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11);
+}
+
+TEST(RowValueMergeTest, MergeWithRowTombstone) {  // Merging a row tombstone with column rows on either side of its timestamp.
+ std::vector<RowValue> row_values;
+
+ // A row tombstone.
+ row_values.push_back(
+ CreateRowTombstone(11)
+ );
+
+ // All of this row's column timestamps are smaller than the tombstone's.
+ row_values.push_back(
+ CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 0, 5),
+ CreateTestColumnSpec(kColumn, 1, 6),
+ })
+ );
+
+ // Some of this row's column timestamps are smaller than the tombstone's, some are larger.
+ row_values.push_back(
+ CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 2, 10),
+ CreateTestColumnSpec(kColumn, 3, 12),
+ })
+ );
+
+ // All of this row's column timestamps are larger than the tombstone's.
+ row_values.push_back(
+ CreateTestRowValue({
+ CreateTestColumnSpec(kColumn, 4, 13),
+ CreateTestColumnSpec(kColumn, 5, 14),
+ })
+ );
+
+ RowValue merged = RowValue::Merge(std::move(row_values));
+ EXPECT_FALSE(merged.IsTombstone());
+ EXPECT_EQ(merged.columns_.size(), 3);  // only columns newer than the tombstone (timestamp 11) survive
+ VerifyRowValueColumns(merged.columns_, 0, kColumn, 3, 12);
+ VerifyRowValueColumns(merged.columns_, 1, kColumn, 4, 13);
+ VerifyRowValueColumns(merged.columns_, 2, kColumn, 5, 14);
+
+ // If a row tombstone carries the latest timestamp, Merge returns a row tombstone.
+ row_values.clear();  // row_values was moved from above; reset it to a known-empty vector before reuse
+ row_values.push_back(
+ CreateRowTombstone(15)
+ );
+
+ row_values.push_back(
+ CreateRowTombstone(17)
+ );
+
+ merged = RowValue::Merge(std::move(row_values));
+ EXPECT_TRUE(merged.IsTombstone());
+ EXPECT_EQ(merged.LastModifiedTime(), 17);  // the newer of the two tombstones
+}
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {  // Test binary entry point: initializes gtest and runs all registered tests.
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc b/src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc
new file mode 100644
index 000000000..491540bfe
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/cassandra_serialize_test.cc
@@ -0,0 +1,188 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "test_util/testharness.h"
+#include "utilities/cassandra/serialize.h"
+
+using namespace ROCKSDB_NAMESPACE::cassandra;
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+TEST(SerializeTest, SerializeI64) {
+ std::string dest;
+ Serialize<int64_t>(0, &dest);
+ EXPECT_EQ(
+ std::string(
+ {'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00'}),
+ dest);
+
+ dest.clear();
+ Serialize<int64_t>(1, &dest);
+ EXPECT_EQ(
+ std::string(
+ {'\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x01'}),
+ dest);
+
+
+ dest.clear();
+ Serialize<int64_t>(-1, &dest);
+ EXPECT_EQ(
+ std::string(
+ {'\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff'}),
+ dest);
+
+ dest.clear();
+ Serialize<int64_t>(9223372036854775807, &dest);
+ EXPECT_EQ(
+ std::string(
+ {'\x7f', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff', '\xff'}),
+ dest);
+
+ dest.clear();
+ Serialize<int64_t>(-9223372036854775807, &dest);
+ EXPECT_EQ(
+ std::string(
+ {'\x80', '\x00', '\x00', '\x00', '\x00', '\x00', '\x00', '\x01'}),
+ dest);
+}
+
+TEST(SerializeTest, DeserializeI64) {
+ std::string dest;
+ std::size_t offset = dest.size();
+ Serialize<int64_t>(0, &dest);
+ EXPECT_EQ(0, Deserialize<int64_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int64_t>(1, &dest);
+ EXPECT_EQ(1, Deserialize<int64_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int64_t>(-1, &dest);
+ EXPECT_EQ(-1, Deserialize<int64_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int64_t>(-9223372036854775807, &dest);
+ EXPECT_EQ(-9223372036854775807, Deserialize<int64_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int64_t>(9223372036854775807, &dest);
+ EXPECT_EQ(9223372036854775807, Deserialize<int64_t>(dest.c_str(), offset));
+}
+
+TEST(SerializeTest, SerializeI32) {
+ std::string dest;
+ Serialize<int32_t>(0, &dest);
+ EXPECT_EQ(
+ std::string(
+ {'\x00', '\x00', '\x00', '\x00'}),
+ dest);
+
+ dest.clear();
+ Serialize<int32_t>(1, &dest);
+ EXPECT_EQ(
+ std::string(
+ {'\x00', '\x00', '\x00', '\x01'}),
+ dest);
+
+
+ dest.clear();
+ Serialize<int32_t>(-1, &dest);
+ EXPECT_EQ(
+ std::string(
+ {'\xff', '\xff', '\xff', '\xff'}),
+ dest);
+
+ dest.clear();
+ Serialize<int32_t>(2147483647, &dest);
+ EXPECT_EQ(
+ std::string(
+ {'\x7f', '\xff', '\xff', '\xff'}),
+ dest);
+
+ dest.clear();
+ Serialize<int32_t>(-2147483648LL, &dest);
+ EXPECT_EQ(
+ std::string(
+ {'\x80', '\x00', '\x00', '\x00'}),
+ dest);
+}
+
+TEST(SerializeTest, DeserializeI32) {
+ std::string dest;
+ std::size_t offset = dest.size();
+ Serialize<int32_t>(0, &dest);
+ EXPECT_EQ(0, Deserialize<int32_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int32_t>(1, &dest);
+ EXPECT_EQ(1, Deserialize<int32_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int32_t>(-1, &dest);
+ EXPECT_EQ(-1, Deserialize<int32_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int32_t>(2147483647, &dest);
+ EXPECT_EQ(2147483647, Deserialize<int32_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int32_t>(-2147483648LL, &dest);
+ EXPECT_EQ(-2147483648LL, Deserialize<int32_t>(dest.c_str(), offset));
+}
+
+TEST(SerializeTest, SerializeI8) {
+ std::string dest;
+ Serialize<int8_t>(0, &dest);
+ EXPECT_EQ(std::string({'\x00'}), dest);
+
+ dest.clear();
+ Serialize<int8_t>(1, &dest);
+ EXPECT_EQ(std::string({'\x01'}), dest);
+
+
+ dest.clear();
+ Serialize<int8_t>(-1, &dest);
+ EXPECT_EQ(std::string({'\xff'}), dest);
+
+ dest.clear();
+ Serialize<int8_t>(127, &dest);
+ EXPECT_EQ(std::string({'\x7f'}), dest);
+
+ dest.clear();
+ Serialize<int8_t>(-128, &dest);
+ EXPECT_EQ(std::string({'\x80'}), dest);
+}
+
+TEST(SerializeTest, DeserializeI8) {
+ std::string dest;
+ std::size_t offset = dest.size();
+ Serialize<int8_t>(0, &dest);
+ EXPECT_EQ(0, Deserialize<int8_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int8_t>(1, &dest);
+ EXPECT_EQ(1, Deserialize<int8_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int8_t>(-1, &dest);
+ EXPECT_EQ(-1, Deserialize<int8_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int8_t>(127, &dest);
+ EXPECT_EQ(127, Deserialize<int8_t>(dest.c_str(), offset));
+
+ offset = dest.size();
+ Serialize<int8_t>(-128, &dest);
+ EXPECT_EQ(-128, Deserialize<int8_t>(dest.c_str(), offset));
+}
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/utilities/cassandra/format.cc b/src/rocksdb/utilities/cassandra/format.cc
new file mode 100644
index 000000000..a767f41e7
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/format.cc
@@ -0,0 +1,390 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "format.h"
+
+#include <algorithm>
+#include <map>
+#include <memory>
+
+#include "utilities/cassandra/serialize.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+namespace {
+const int32_t kDefaultLocalDeletionTime =
+ std::numeric_limits<int32_t>::max();
+const int64_t kDefaultMarkedForDeleteAt =
+ std::numeric_limits<int64_t>::min();
+}
+
+ColumnBase::ColumnBase(int8_t mask, int8_t index)
+ : mask_(mask), index_(index) {}
+
+std::size_t ColumnBase::Size() const {
+ return sizeof(mask_) + sizeof(index_);
+}
+
+int8_t ColumnBase::Mask() const {
+ return mask_;
+}
+
+int8_t ColumnBase::Index() const {
+ return index_;
+}
+
+void ColumnBase::Serialize(std::string* dest) const {
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(mask_, dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(index_, dest);
+}
+
+std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
+ std::size_t offset) {
+ int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
+ return Tombstone::Deserialize(src, offset);
+ } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
+ return ExpiringColumn::Deserialize(src, offset);
+ } else {
+ return Column::Deserialize(src, offset);
+ }
+}
+
+Column::Column(
+ int8_t mask,
+ int8_t index,
+ int64_t timestamp,
+ int32_t value_size,
+ const char* value
+) : ColumnBase(mask, index), timestamp_(timestamp),
+ value_size_(value_size), value_(value) {}
+
+int64_t Column::Timestamp() const {
+ return timestamp_;
+}
+
+std::size_t Column::Size() const {
+ return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_)
+ + value_size_;
+}
+
+void Column::Serialize(std::string* dest) const {
+ ColumnBase::Serialize(dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(timestamp_, dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(value_size_, dest);
+ dest->append(value_, value_size_);
+}
+
+std::shared_ptr<Column> Column::Deserialize(const char *src,
+ std::size_t offset) {
+ int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(mask);
+ int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(index);
+ int64_t timestamp =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
+ offset += sizeof(timestamp);
+ int32_t value_size =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
+ offset += sizeof(value_size);
+ return std::make_shared<Column>(
+ mask, index, timestamp, value_size, src + offset);
+}
+
+ExpiringColumn::ExpiringColumn(
+ int8_t mask,
+ int8_t index,
+ int64_t timestamp,
+ int32_t value_size,
+ const char* value,
+ int32_t ttl
+) : Column(mask, index, timestamp, value_size, value),
+ ttl_(ttl) {}
+
+std::size_t ExpiringColumn::Size() const {
+ return Column::Size() + sizeof(ttl_);
+}
+
+void ExpiringColumn::Serialize(std::string* dest) const {
+ Column::Serialize(dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(ttl_, dest);
+}
+
+std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint() const {
+ return std::chrono::time_point<std::chrono::system_clock>(std::chrono::microseconds(Timestamp()));
+}
+
+std::chrono::seconds ExpiringColumn::Ttl() const {
+ return std::chrono::seconds(ttl_);
+}
+
+bool ExpiringColumn::Expired() const {
+ return TimePoint() + Ttl() < std::chrono::system_clock::now();
+}
+
+std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const {
+ auto expired_at = (TimePoint() + Ttl()).time_since_epoch();
+ int32_t local_deletion_time = static_cast<int32_t>(
+ std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
+ int64_t marked_for_delete_at =
+ std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
+ return std::make_shared<Tombstone>(
+ static_cast<int8_t>(ColumnTypeMask::DELETION_MASK),
+ Index(),
+ local_deletion_time,
+ marked_for_delete_at);
+}
+
+std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
+ const char *src,
+ std::size_t offset) {
+ int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(mask);
+ int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(index);
+ int64_t timestamp =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
+ offset += sizeof(timestamp);
+ int32_t value_size =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
+ offset += sizeof(value_size);
+ const char* value = src + offset;
+ offset += value_size;
+ int32_t ttl = ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
+ return std::make_shared<ExpiringColumn>(
+ mask, index, timestamp, value_size, value, ttl);
+}
+
+Tombstone::Tombstone(
+ int8_t mask,
+ int8_t index,
+ int32_t local_deletion_time,
+ int64_t marked_for_delete_at
+) : ColumnBase(mask, index), local_deletion_time_(local_deletion_time),
+ marked_for_delete_at_(marked_for_delete_at) {}
+
+int64_t Tombstone::Timestamp() const {
+ return marked_for_delete_at_;
+}
+
+std::size_t Tombstone::Size() const {
+ return ColumnBase::Size() + sizeof(local_deletion_time_)
+ + sizeof(marked_for_delete_at_);
+}
+
+void Tombstone::Serialize(std::string* dest) const {
+ ColumnBase::Serialize(dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
+}
+
+bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const {
+ auto local_deleted_at = std::chrono::time_point<std::chrono::system_clock>(
+ std::chrono::seconds(local_deletion_time_));
+ auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds);
+ return local_deleted_at + gc_grace_period < std::chrono::system_clock::now();
+}
+
+std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src,
+ std::size_t offset) {
+ int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(mask);
+ int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
+ offset += sizeof(index);
+ int32_t local_deletion_time =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
+ offset += sizeof(int32_t);
+ int64_t marked_for_delete_at =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
+ return std::make_shared<Tombstone>(
+ mask, index, local_deletion_time, marked_for_delete_at);
+}
+
+RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
+ : local_deletion_time_(local_deletion_time),
+ marked_for_delete_at_(marked_for_delete_at), columns_(),
+ last_modified_time_(0) {}
+
+RowValue::RowValue(Columns columns,
+ int64_t last_modified_time)
+ : local_deletion_time_(kDefaultLocalDeletionTime),
+ marked_for_delete_at_(kDefaultMarkedForDeleteAt),
+ columns_(std::move(columns)), last_modified_time_(last_modified_time) {}
+
+std::size_t RowValue::Size() const {
+ std::size_t size = sizeof(local_deletion_time_)
+ + sizeof(marked_for_delete_at_);
+ for (const auto& column : columns_) {
+ size += column -> Size();
+ }
+ return size;
+}
+
+int64_t RowValue::LastModifiedTime() const {
+ if (IsTombstone()) {
+ return marked_for_delete_at_;
+ } else {
+ return last_modified_time_;
+ }
+}
+
+bool RowValue::IsTombstone() const {
+ return marked_for_delete_at_ > kDefaultMarkedForDeleteAt;
+}
+
+void RowValue::Serialize(std::string* dest) const {
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
+ ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
+ for (const auto& column : columns_) {
+ column -> Serialize(dest);
+ }
+}
+
+RowValue RowValue::RemoveExpiredColumns(bool* changed) const {
+ *changed = false;
+ Columns new_columns;
+ for (auto& column : columns_) {
+ if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
+ std::shared_ptr<ExpiringColumn> expiring_column =
+ std::static_pointer_cast<ExpiringColumn>(column);
+
+ if(expiring_column->Expired()){
+ *changed = true;
+ continue;
+ }
+ }
+
+ new_columns.push_back(column);
+ }
+ return RowValue(std::move(new_columns), last_modified_time_);
+}
+
+RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const {
+ *changed = false;
+ Columns new_columns;
+ for (auto& column : columns_) {
+ if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
+ std::shared_ptr<ExpiringColumn> expiring_column =
+ std::static_pointer_cast<ExpiringColumn>(column);
+
+ if(expiring_column->Expired()) {
+ std::shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone();
+ new_columns.push_back(tombstone);
+ *changed = true;
+ continue;
+ }
+ }
+ new_columns.push_back(column);
+ }
+ return RowValue(std::move(new_columns), last_modified_time_);
+}
+
+RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const {
+ Columns new_columns;
+ for (auto& column : columns_) {
+ if (column->Mask() == ColumnTypeMask::DELETION_MASK) {
+ std::shared_ptr<Tombstone> tombstone =
+ std::static_pointer_cast<Tombstone>(column);
+
+ if (tombstone->Collectable(gc_grace_period)) {
+ continue;
+ }
+ }
+
+ new_columns.push_back(column);
+ }
+ return RowValue(std::move(new_columns), last_modified_time_);
+}
+
+bool RowValue::Empty() const {
+ return columns_.empty();
+}
+
+RowValue RowValue::Deserialize(const char *src, std::size_t size) {
+ std::size_t offset = 0;
+ assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));
+ int32_t local_deletion_time =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
+ offset += sizeof(int32_t);
+ int64_t marked_for_delete_at =
+ ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
+ offset += sizeof(int64_t);
+ if (offset == size) {
+ return RowValue(local_deletion_time, marked_for_delete_at);
+ }
+
+ assert(local_deletion_time == kDefaultLocalDeletionTime);
+ assert(marked_for_delete_at == kDefaultMarkedForDeleteAt);
+ Columns columns;
+ int64_t last_modified_time = 0;
+ while (offset < size) {
+ auto c = ColumnBase::Deserialize(src, offset);
+ offset += c -> Size();
+ assert(offset <= size);
+ last_modified_time = std::max(last_modified_time, c -> Timestamp());
+ columns.push_back(std::move(c));
+ }
+
+ return RowValue(std::move(columns), last_modified_time);
+}
+
+// Merges several row values for the same key into a single row value.
+// The inputs are sorted by LastModifiedTime(), newest first, and visited in
+// that order; for each column index the column with the largest timestamp is
+// kept. The walk stops at the first row tombstone, and kept columns that are
+// not newer than that tombstone are discarded. `values` must be non-empty.
+RowValue RowValue::Merge(std::vector<RowValue>&& values) {
+  assert(!values.empty());
+  if (values.size() == 1) {
+    return std::move(values[0]);
+  }
+
+  // Newest first, so the first row tombstone we meet shadows every row
+  // that sorts after it.
+  std::sort(values.begin(), values.end(),
+            [](const RowValue& r1, const RowValue& r2) {
+              return r1.LastModifiedTime() > r2.LastModifiedTime();
+            });
+
+  std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns;
+  int64_t tombstone_timestamp = 0;
+
+  for (auto& value : values) {
+    if (value.IsTombstone()) {
+      if (merged_columns.empty()) {
+        return std::move(value);
+      }
+      tombstone_timestamp = value.LastModifiedTime();
+      break;
+    }
+    for (auto& column : value.columns_) {
+      // One lookup per column; the larger timestamp wins (ties keep the first).
+      const int8_t index = column->Index();
+      auto found = merged_columns.find(index);
+      if (found == merged_columns.end()) {
+        merged_columns.emplace(index, column);
+      } else if (column->Timestamp() > found->second->Timestamp()) {
+        found->second = column;
+      }
+    }
+  }
+
+  int64_t last_modified_time = 0;
+  Columns columns;
+  for (auto& kv : merged_columns) {
+    // A row newer than the row tombstone may still hold columns that are
+    // older than it, so those are filtered out here. NOTE(review): without a
+    // tombstone the bound stays 0 and columns with timestamp <= 0 are dropped.
+    if (kv.second->Timestamp() <= tombstone_timestamp) {
+      continue;
+    }
+    last_modified_time = std::max(last_modified_time, kv.second->Timestamp());
+    columns.push_back(std::move(kv.second));
+  }
+  return RowValue(std::move(columns), last_modified_time);
+}
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/format.h b/src/rocksdb/utilities/cassandra/format.h
new file mode 100644
index 000000000..3f9b433c7
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/format.h
@@ -0,0 +1,197 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+/**
+ * The encoding of Cassandra Row Value.
+ *
+ * A Cassandra Row Value could either be a row tombstone,
+ * or contains multiple columns, it has following fields:
+ *
+ * struct row_value {
+ * int32_t local_deletion_time; // Time in seconds when the row was deleted,
+ * // only used for Cassandra tombstone gc.
+ * int64_t marked_for_delete_at; // Time (microseconds) the row was marked deleted.
+ * struct column_base columns[]; // For non tombstone row, all columns
+ * // are stored here.
+ * }
+ *
+ * If the local_deletion_time and marked_for_delete_at is set, then this is
+ * a tombstone, otherwise it contains multiple columns.
+ *
+ * There are three type of Columns: Normal Column, Expiring Column and Column
+ * Tombstone, which have following fields:
+ *
+ * // Identify the type of the column.
+ * enum mask {
+ * DELETION_MASK = 0x01,
+ * EXPIRATION_MASK = 0x02,
+ * };
+ *
+ * struct column {
+ * int8_t mask = 0;
+ * int8_t index;
+ * int64_t timestamp;
+ * int32_t value_length;
+ * char value[value_length];
+ * }
+ *
+ * struct expiring_column {
+ * int8_t mask = mask.EXPIRATION_MASK;
+ * int8_t index;
+ * int64_t timestamp;
+ * int32_t value_length;
+ * char value[value_length];
+ * int32_t ttl;
+ * }
+ *
+ * struct tombstone_column {
+ * int8_t mask = mask.DELETION_MASK;
+ * int8_t index;
+ * int32_t local_deletion_time; // Similar to row_value's field.
+ * int64_t marked_for_delete_at;
+ * }
+ */
+
+#pragma once
+#include <chrono>
+#include <memory>
+#include <vector>
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/slice.h"
+#include "test_util/testharness.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+// Identify the type of the column.
+enum ColumnTypeMask {
+ DELETION_MASK = 0x01,
+ EXPIRATION_MASK = 0x02,
+};
+
+
+class ColumnBase {
+public:
+ ColumnBase(int8_t mask, int8_t index);
+ virtual ~ColumnBase() = default;
+
+ virtual int64_t Timestamp() const = 0;
+ virtual int8_t Mask() const;
+ virtual int8_t Index() const;
+ virtual std::size_t Size() const;
+ virtual void Serialize(std::string* dest) const;
+ static std::shared_ptr<ColumnBase> Deserialize(const char* src,
+ std::size_t offset);
+
+private:
+ int8_t mask_;
+ int8_t index_;
+};
+
+class Column : public ColumnBase {
+public:
+ Column(int8_t mask, int8_t index, int64_t timestamp,
+ int32_t value_size, const char* value);
+
+ virtual int64_t Timestamp() const override;
+ virtual std::size_t Size() const override;
+ virtual void Serialize(std::string* dest) const override;
+ static std::shared_ptr<Column> Deserialize(const char* src,
+ std::size_t offset);
+
+private:
+ int64_t timestamp_;
+ int32_t value_size_;
+ const char* value_;
+};
+
+class Tombstone : public ColumnBase {
+public:
+ Tombstone(int8_t mask, int8_t index,
+ int32_t local_deletion_time, int64_t marked_for_delete_at);
+
+ virtual int64_t Timestamp() const override;
+ virtual std::size_t Size() const override;
+ virtual void Serialize(std::string* dest) const override;
+ bool Collectable(int32_t gc_grace_period) const;
+ static std::shared_ptr<Tombstone> Deserialize(const char* src,
+ std::size_t offset);
+
+private:
+ int32_t local_deletion_time_;
+ int64_t marked_for_delete_at_;
+};
+
+class ExpiringColumn : public Column {
+public:
+ ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp,
+ int32_t value_size, const char* value, int32_t ttl);
+
+ virtual std::size_t Size() const override;
+ virtual void Serialize(std::string* dest) const override;
+ bool Expired() const;
+ std::shared_ptr<Tombstone> ToTombstone() const;
+
+ static std::shared_ptr<ExpiringColumn> Deserialize(const char* src,
+ std::size_t offset);
+
+private:
+ int32_t ttl_;
+ std::chrono::time_point<std::chrono::system_clock> TimePoint() const;
+ std::chrono::seconds Ttl() const;
+};
+
+typedef std::vector<std::shared_ptr<ColumnBase>> Columns;
+
+class RowValue {
+public:
+  // Create a Row Tombstone.
+  RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at);
+  // Create a Row containing columns.
+  RowValue(Columns columns,
+           int64_t last_modified_time);
+  RowValue(const RowValue& /*that*/) = delete;
+  RowValue(RowValue&& /*that*/) noexcept = default;
+  RowValue& operator=(const RowValue& /*that*/) = delete;
+  RowValue& operator=(RowValue&& /*that*/) = default;
+
+  std::size_t Size() const;  // Number of bytes Serialize() appends.
+  bool IsTombstone() const;  // True when marked_for_delete_at_ has been set.
+  // For a row tombstone this returns marked_for_delete_at_,
+  // otherwise it returns the max timestamp of the contained columns.
+  int64_t LastModifiedTime() const;
+  void Serialize(std::string* dest) const;  // Appends the encoded row to *dest.
+  RowValue RemoveExpiredColumns(bool* changed) const;
+  RowValue ConvertExpiredColumnsToTombstones(bool* changed) const;
+  RowValue RemoveTombstones(int32_t gc_grace_period) const;
+  bool Empty() const;  // True when the row holds no columns.
+
+  static RowValue Deserialize(const char* src, std::size_t size);
+  // Merge multiple rows according to their timestamp.
+  static RowValue Merge(std::vector<RowValue>&& values);
+
+private:
+  int32_t local_deletion_time_;
+  int64_t marked_for_delete_at_;
+  Columns columns_;
+  int64_t last_modified_time_;
+
+  FRIEND_TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired);
+  FRIEND_TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones);
+  FRIEND_TEST(RowValueMergeTest, Merge);
+  FRIEND_TEST(RowValueMergeTest, MergeWithRowTombstone);
+  FRIEND_TEST(CassandraFunctionalTest, SimpleMergeTest);
+  FRIEND_TEST(
+    CassandraFunctionalTest, CompactionShouldConvertExpiredColumnsToTombstone);
+  FRIEND_TEST(
+    CassandraFunctionalTest, CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn);
+  FRIEND_TEST(
+    CassandraFunctionalTest, CompactionShouldRemoveRowWhenAllColumnExpiredIfPurgeTtlIsOn);
+  FRIEND_TEST(CassandraFunctionalTest,
+              CompactionShouldRemoveTombstoneExceedingGCGracePeriod);
+};
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/merge_operator.cc b/src/rocksdb/utilities/cassandra/merge_operator.cc
new file mode 100644
index 000000000..82fe5d661
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/merge_operator.cc
@@ -0,0 +1,67 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "merge_operator.h"
+
+#include <memory>
+#include <assert.h>
+
+#include "rocksdb/slice.h"
+#include "rocksdb/merge_operator.h"
+#include "utilities/merge_operators.h"
+#include "utilities/cassandra/format.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+// Implementation for the merge operation (merges two Cassandra values)
+bool CassandraValueMergeOperator::FullMergeV2(
+ const MergeOperationInput& merge_in,
+ MergeOperationOutput* merge_out) const {
+ // Clear the *new_value for writing.
+ merge_out->new_value.clear();
+ std::vector<RowValue> row_values;
+ if (merge_in.existing_value) {
+ row_values.push_back(
+ RowValue::Deserialize(merge_in.existing_value->data(),
+ merge_in.existing_value->size()));
+ }
+
+ for (auto& operand : merge_in.operand_list) {
+ row_values.push_back(RowValue::Deserialize(operand.data(), operand.size()));
+ }
+
+ RowValue merged = RowValue::Merge(std::move(row_values));
+ merged = merged.RemoveTombstones(gc_grace_period_in_seconds_);
+ merge_out->new_value.reserve(merged.Size());
+ merged.Serialize(&(merge_out->new_value));
+
+ return true;
+}
+
+bool CassandraValueMergeOperator::PartialMergeMulti(
+ const Slice& /*key*/, const std::deque<Slice>& operand_list,
+ std::string* new_value, Logger* /*logger*/) const {
+ // Clear the *new_value for writing.
+ assert(new_value);
+ new_value->clear();
+
+ std::vector<RowValue> row_values;
+ for (auto& operand : operand_list) {
+ row_values.push_back(RowValue::Deserialize(operand.data(), operand.size()));
+ }
+ RowValue merged = RowValue::Merge(std::move(row_values));
+ new_value->reserve(merged.Size());
+ merged.Serialize(new_value);
+ return true;
+}
+
+const char* CassandraValueMergeOperator::Name() const {
+ return "CassandraValueMergeOperator";
+}
+
+} // namespace cassandra
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/merge_operator.h b/src/rocksdb/utilities/cassandra/merge_operator.h
new file mode 100644
index 000000000..b5bf7c520
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/merge_operator.h
@@ -0,0 +1,44 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/slice.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+
+/**
+ * A MergeOperator for rocksdb that implements Cassandra row value merge.
+ */
+class CassandraValueMergeOperator : public MergeOperator {
+public:
+ explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds,
+ size_t operands_limit = 0)
+ : gc_grace_period_in_seconds_(gc_grace_period_in_seconds),
+ operands_limit_(operands_limit) {}
+
+ virtual bool FullMergeV2(const MergeOperationInput& merge_in,
+ MergeOperationOutput* merge_out) const override;
+
+ virtual bool PartialMergeMulti(const Slice& key,
+ const std::deque<Slice>& operand_list,
+ std::string* new_value,
+ Logger* logger) const override;
+
+ virtual const char* Name() const override;
+
+ virtual bool AllowSingleOperand() const override { return true; }
+
+ virtual bool ShouldMerge(const std::vector<Slice>& operands) const override {
+ return operands_limit_ > 0 && operands.size() >= operands_limit_;
+ }
+
+private:
+ int32_t gc_grace_period_in_seconds_;
+ size_t operands_limit_;
+};
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/serialize.h b/src/rocksdb/utilities/cassandra/serialize.h
new file mode 100644
index 000000000..cd980ade0
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/serialize.h
@@ -0,0 +1,75 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+/**
+ * Helper functions which serialize and deserialize integers
+ * into bytes in big endian.
+ */
+
+#pragma once
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+namespace {
+const int64_t kCharMask = 0xFFLL;
+const int32_t kBitsPerByte = 8;
+}
+
+template<typename T>
+void Serialize(T val, std::string* dest);
+
+template<typename T>
+T Deserialize(const char* src, std::size_t offset=0);
+
+// Specializations
+template<>
+inline void Serialize<int8_t>(int8_t t, std::string* dest) {
+ dest->append(1, static_cast<char>(t & kCharMask));
+}
+
+template<>
+inline void Serialize<int32_t>(int32_t t, std::string* dest) {
+ for (unsigned long i = 0; i < sizeof(int32_t); i++) {
+ dest->append(1, static_cast<char>(
+ (t >> (sizeof(int32_t) - 1 - i) * kBitsPerByte) & kCharMask));
+ }
+}
+
+template<>
+inline void Serialize<int64_t>(int64_t t, std::string* dest) {
+ for (unsigned long i = 0; i < sizeof(int64_t); i++) {
+ dest->append(
+ 1, static_cast<char>(
+ (t >> (sizeof(int64_t) - 1 - i) * kBitsPerByte) & kCharMask));
+ }
+}
+
+template<>
+inline int8_t Deserialize<int8_t>(const char* src, std::size_t offset) {
+ return static_cast<int8_t>(src[offset]);
+}
+
+template<>  // Reads a big-endian int32_t from src[offset .. offset + 3].
+inline int32_t Deserialize<int32_t>(const char* src, std::size_t offset) {
+  uint32_t bits = 0;  // Unsigned, so shifting a byte into bit 31 is defined.
+  for (std::size_t i = 0; i < sizeof(int32_t); i++) {
+    bits |= static_cast<uint32_t>(static_cast<unsigned char>(src[offset + i]))
+            << ((sizeof(int32_t) - 1 - i) * kBitsPerByte);
+  }
+  return static_cast<int32_t>(bits);
+}
+
+template<>  // Reads a big-endian int64_t from src[offset .. offset + 7].
+inline int64_t Deserialize<int64_t>(const char* src, std::size_t offset) {
+  uint64_t bits = 0;  // Unsigned, so shifting a byte into bit 63 is defined.
+  for (std::size_t i = 0; i < sizeof(int64_t); i++) {
+    bits |= static_cast<uint64_t>(static_cast<unsigned char>(src[offset + i]))
+            << ((sizeof(int64_t) - 1 - i) * kBitsPerByte);
+  }
+  return static_cast<int64_t>(bits);
+}
+
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/test_utils.cc b/src/rocksdb/utilities/cassandra/test_utils.cc
new file mode 100644
index 000000000..47919bf62
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/test_utils.cc
@@ -0,0 +1,75 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "test_utils.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+const char kData[] = {'d', 'a', 't', 'a'};
+const char kExpiringData[] = {'e', 'd', 'a', 't', 'a'};
+const int32_t kTtl = 86400;
+const int8_t kColumn = 0;
+const int8_t kTombstone = 1;
+const int8_t kExpiringColumn = 2;
+
+std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask,
+ int8_t index,
+ int64_t timestamp) {
+ if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
+ return std::shared_ptr<Tombstone>(
+ new Tombstone(mask, index, ToSeconds(timestamp), timestamp));
+ } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
+ return std::shared_ptr<ExpiringColumn>(new ExpiringColumn(
+ mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl));
+ } else {
+ return std::shared_ptr<Column>(
+ new Column(mask, index, timestamp, sizeof(kData), kData));
+ }
+}
+
+std::tuple<int8_t, int8_t, int64_t> CreateTestColumnSpec(int8_t mask,
+ int8_t index,
+ int64_t timestamp) {
+ return std::make_tuple(mask, index, timestamp);
+}
+
+RowValue CreateTestRowValue(  // Builds a row from (mask, index, timestamp).
+    std::vector<std::tuple<int8_t, int8_t, int64_t>> column_specs) {
+  std::vector<std::shared_ptr<ColumnBase>> columns;
+  int64_t last_modified_time = 0;
+  for (const auto& spec : column_specs) {  // No per-iteration tuple copy.
+    auto c = CreateTestColumn(std::get<0>(spec), std::get<1>(spec),
+                              std::get<2>(spec));
+    last_modified_time = std::max(last_modified_time, c->Timestamp());
+    columns.push_back(std::move(c));
+  }
+  return RowValue(std::move(columns), last_modified_time);
+}
+
+RowValue CreateRowTombstone(int64_t timestamp) {
+ return RowValue(ToSeconds(timestamp), timestamp);
+}
+
+void VerifyRowValueColumns(
+ std::vector<std::shared_ptr<ColumnBase>> &columns,
+ std::size_t index_of_vector,
+ int8_t expected_mask,
+ int8_t expected_index,
+ int64_t expected_timestamp
+) {
+ EXPECT_EQ(expected_timestamp, columns[index_of_vector]->Timestamp());
+ EXPECT_EQ(expected_mask, columns[index_of_vector]->Mask());
+ EXPECT_EQ(expected_index, columns[index_of_vector]->Index());
+}
+
+int64_t ToMicroSeconds(int64_t seconds) {  // Seconds -> microseconds.
+  return seconds * int64_t{1000000};
+}
+
+int32_t ToSeconds(int64_t microseconds) {  // Whole seconds, narrowed to 32 bit.
+  return static_cast<int32_t>(microseconds / int64_t{1000000});
+}
+}
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/cassandra/test_utils.h b/src/rocksdb/utilities/cassandra/test_utils.h
new file mode 100644
index 000000000..235b35a02
--- /dev/null
+++ b/src/rocksdb/utilities/cassandra/test_utils.h
@@ -0,0 +1,46 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#include <memory>
+#include "test_util/testharness.h"
+#include "utilities/cassandra/format.h"
+#include "utilities/cassandra/serialize.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace cassandra {
+extern const char kData[];
+extern const char kExpiringData[];
+extern const int32_t kTtl;
+extern const int8_t kColumn;
+extern const int8_t kTombstone;
+extern const int8_t kExpiringColumn;
+
+
+std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask,
+ int8_t index,
+ int64_t timestamp);
+
+std::tuple<int8_t, int8_t, int64_t> CreateTestColumnSpec(int8_t mask,
+ int8_t index,
+ int64_t timestamp);
+
+RowValue CreateTestRowValue(
+ std::vector<std::tuple<int8_t, int8_t, int64_t>> column_specs);
+
+RowValue CreateRowTombstone(int64_t timestamp);
+
+void VerifyRowValueColumns(
+ std::vector<std::shared_ptr<ColumnBase>> &columns,
+ std::size_t index_of_vector,
+ int8_t expected_mask,
+ int8_t expected_index,
+ int64_t expected_timestamp
+);
+
+int64_t ToMicroSeconds(int64_t seconds);
+int32_t ToSeconds(int64_t microseconds);
+}
+} // namespace ROCKSDB_NAMESPACE