summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/wide
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/wide
parentInitial commit. (diff)
downloadceph-upstream/18.2.2.tar.xz
ceph-upstream/18.2.2.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/wide')
-rw-r--r--src/rocksdb/db/wide/db_wide_basic_test.cc654
-rw-r--r--src/rocksdb/db/wide/wide_column_serialization.cc182
-rw-r--r--src/rocksdb/db/wide/wide_column_serialization.h77
-rw-r--r--src/rocksdb/db/wide/wide_column_serialization_test.cc338
-rw-r--r--src/rocksdb/db/wide/wide_columns.cc22
5 files changed, 1273 insertions, 0 deletions
diff --git a/src/rocksdb/db/wide/db_wide_basic_test.cc b/src/rocksdb/db/wide/db_wide_basic_test.cc
new file mode 100644
index 000000000..1ffe314fe
--- /dev/null
+++ b/src/rocksdb/db/wide/db_wide_basic_test.cc
@@ -0,0 +1,654 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <array>
+#include <memory>
+
+#include "db/db_test_util.h"
+#include "port/stack_trace.h"
+#include "test_util/testutil.h"
+#include "utilities/merge_operators.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class DBWideBasicTest : public DBTestBase {  // Fixture shared by the wide-column DB tests in this file
+ protected:
+ explicit DBWideBasicTest()
+ : DBTestBase("db_wide_basic_test", /* env_do_fsync */ false) {}  // passes the test name to DBTestBase with env_do_fsync disabled
+};
+
+TEST_F(DBWideBasicTest, PutEntity) {  // Round-trips entities and a plain KV through Get, GetEntity, MultiGet and iterators
+ Options options = GetDefaultOptions();
+
+ // Write a couple of wide-column entities and a plain old key-value, then read
+ // them back.
+ constexpr char first_key[] = "first";
+ constexpr char first_value_of_default_column[] = "hello";
+ WideColumns first_columns{
+ {kDefaultWideColumnName, first_value_of_default_column},
+ {"attr_name1", "foo"},
+ {"attr_name2", "bar"}};
+
+ constexpr char second_key[] = "second";
+ WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};  // note: no kDefaultWideColumnName entry
+
+ constexpr char third_key[] = "third";
+ constexpr char third_value[] = "baz";
+
+ auto verify = [&]() {  // re-run after each storage state change below
+ const WideColumns expected_third_columns{
+ {kDefaultWideColumnName, third_value}};  // the plain value is expected to show up as the default column
+
+ {
+ PinnableSlice result;
+ ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key,
+ &result));
+ ASSERT_EQ(result, first_value_of_default_column);  // Get on an entity yields its default column's value
+ }
+
+ {
+ PinnableWideColumns result;
+ ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
+ first_key, &result));
+ ASSERT_EQ(result.columns(), first_columns);
+ }
+
+ {
+ PinnableSlice result;
+ ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key,
+ &result));
+ ASSERT_TRUE(result.empty());  // second entity was written without a default column
+ }
+
+ {
+ PinnableWideColumns result;
+ ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
+ second_key, &result));
+ ASSERT_EQ(result.columns(), second_columns);
+ }
+
+ {
+ PinnableSlice result;
+ ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), third_key,
+ &result));
+ ASSERT_EQ(result, third_value);
+ }
+
+ {
+ PinnableWideColumns result;
+ ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
+ third_key, &result));
+
+ ASSERT_EQ(result.columns(), expected_third_columns);
+ }
+
+ {
+ constexpr size_t num_keys = 3;
+
+ std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
+ std::array<PinnableSlice, num_keys> values;
+ std::array<Status, num_keys> statuses;
+
+ db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
+ &keys[0], &values[0], &statuses[0]);  // batched lookup; same expectations as the single Gets above
+
+ ASSERT_OK(statuses[0]);
+ ASSERT_EQ(values[0], first_value_of_default_column);
+
+ ASSERT_OK(statuses[1]);
+ ASSERT_TRUE(values[1].empty());
+
+ ASSERT_OK(statuses[2]);
+ ASSERT_EQ(values[2], third_value);
+ }
+
+ {
+ std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
+
+ iter->SeekToFirst();  // forward pass: first, second, third, then end
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), first_key);
+ ASSERT_EQ(iter->value(), first_value_of_default_column);
+ ASSERT_EQ(iter->columns(), first_columns);
+
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), second_key);
+ ASSERT_TRUE(iter->value().empty());
+ ASSERT_EQ(iter->columns(), second_columns);
+
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), third_key);
+ ASSERT_EQ(iter->value(), third_value);
+ ASSERT_EQ(iter->columns(), expected_third_columns);
+
+ iter->Next();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+
+ iter->SeekToLast();  // backward pass: third, second, first, then end
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), third_key);
+ ASSERT_EQ(iter->value(), third_value);
+ ASSERT_EQ(iter->columns(), expected_third_columns);
+
+ iter->Prev();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), second_key);
+ ASSERT_TRUE(iter->value().empty());
+ ASSERT_EQ(iter->columns(), second_columns);
+
+ iter->Prev();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), first_key);
+ ASSERT_EQ(iter->value(), first_value_of_default_column);
+ ASSERT_EQ(iter->columns(), first_columns);
+
+ iter->Prev();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+ }
+ };
+
+ // Use the DB::PutEntity API to write the first entity
+ ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
+ first_key, first_columns));
+
+ // Use WriteBatch to write the second entity
+ WriteBatch batch;
+ ASSERT_OK(
+ batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns));
+ ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+ // Use Put to write the plain key-value
+ ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), third_key,
+ third_value));
+
+ // Try reading from memtable
+ verify();
+
+ // Try reading after recovery
+ Close();
+ options.avoid_flush_during_recovery = true;  // presumably keeps recovered data unflushed until the explicit Flush() below
+ Reopen(options);
+
+ verify();
+
+ // Try reading from storage
+ ASSERT_OK(Flush());
+
+ verify();
+}
+
+TEST_F(DBWideBasicTest, PutEntityColumnFamily) {  // PutEntity against a non-default column family; only write success is checked
+ Options options = GetDefaultOptions();
+ CreateAndReopenWithCF({"corinthian"}, options);
+
+ // Use the DB::PutEntity API
+ constexpr char first_key[] = "first";
+ WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
+
+ ASSERT_OK(
+ db_->PutEntity(WriteOptions(), handles_[1], first_key, first_columns));  // handles_[1]: presumably the "corinthian" CF created above
+
+ // Use WriteBatch
+ constexpr char second_key[] = "second";
+ WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
+
+ WriteBatch batch;
+ ASSERT_OK(batch.PutEntity(handles_[1], second_key, second_columns));
+ ASSERT_OK(db_->Write(WriteOptions(), &batch));
+}
+
+TEST_F(DBWideBasicTest, MergePlainKeyValue) {  // Merge results on plain KVs as seen through GetEntity and iterator columns()
+ Options options = GetDefaultOptions();
+ options.create_if_missing = true;
+ options.merge_operator = MergeOperators::CreateStringAppendOperator();
+ Reopen(options);
+
+ // Put + Merge
+ constexpr char first_key[] = "first";
+ constexpr char first_base_value[] = "hello";
+ constexpr char first_merge_op[] = "world";
+
+ // Delete + Merge
+ constexpr char second_key[] = "second";
+ constexpr char second_merge_op[] = "foo";
+
+ // Merge without any preceding KV
+ constexpr char third_key[] = "third";
+ constexpr char third_merge_op[] = "bar";
+
+ auto write_base = [&]() {
+ // Write "base" KVs: a Put for the 1st key and a Delete for the 2nd one;
+ // note there is no "base" KV for the 3rd
+ ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), first_key,
+ first_base_value));
+ ASSERT_OK(
+ db_->Delete(WriteOptions(), db_->DefaultColumnFamily(), second_key));
+ };
+
+ auto write_merge = [&]() {
+ // Write Merge operands
+ ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key,
+ first_merge_op));
+ ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key,
+ second_merge_op));
+ ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), third_key,
+ third_merge_op));
+ };
+
+ const std::string expected_first_column(std::string(first_base_value) + "," +
+ first_merge_op);  // base value and operand joined with ","
+ const WideColumns expected_first_columns{
+ {kDefaultWideColumnName, expected_first_column}};
+ const WideColumns expected_second_columns{
+ {kDefaultWideColumnName, second_merge_op}};  // Delete + Merge: only the operand is expected
+ const WideColumns expected_third_columns{
+ {kDefaultWideColumnName, third_merge_op}};  // Merge with no base: only the operand is expected
+
+ auto verify = [&]() {  // every key is expected to read back as a single default column
+ {
+ PinnableWideColumns result;
+ ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
+ first_key, &result));
+ ASSERT_EQ(result.columns(), expected_first_columns);
+ }
+
+ {
+ PinnableWideColumns result;
+ ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
+ second_key, &result));
+ ASSERT_EQ(result.columns(), expected_second_columns);
+ }
+
+ {
+ PinnableWideColumns result;
+ ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
+ third_key, &result));
+
+ ASSERT_EQ(result.columns(), expected_third_columns);
+ }
+
+ {
+ std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
+
+ iter->SeekToFirst();  // forward pass over all three keys
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), first_key);
+ ASSERT_EQ(iter->value(), expected_first_columns[0].value());
+ ASSERT_EQ(iter->columns(), expected_first_columns);
+
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), second_key);
+ ASSERT_EQ(iter->value(), expected_second_columns[0].value());
+ ASSERT_EQ(iter->columns(), expected_second_columns);
+
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), third_key);
+ ASSERT_EQ(iter->value(), expected_third_columns[0].value());
+ ASSERT_EQ(iter->columns(), expected_third_columns);
+
+ iter->Next();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+
+ iter->SeekToLast();  // backward pass over all three keys
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), third_key);
+ ASSERT_EQ(iter->value(), expected_third_columns[0].value());
+ ASSERT_EQ(iter->columns(), expected_third_columns);
+
+ iter->Prev();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), second_key);
+ ASSERT_EQ(iter->value(), expected_second_columns[0].value());
+ ASSERT_EQ(iter->columns(), expected_second_columns);
+
+ iter->Prev();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), first_key);
+ ASSERT_EQ(iter->value(), expected_first_columns[0].value());
+ ASSERT_EQ(iter->columns(), expected_first_columns);
+
+ iter->Prev();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+ }
+ };
+
+ {
+ // Base KVs (if any) and Merge operands both in memtable (note: we take a
+ // snapshot in between to make sure they do not get reconciled during the
+ // subsequent flush)
+ write_base();
+ ManagedSnapshot snapshot(db_);  // scoped to this block, so it is gone before DestroyAndReopen below
+ write_merge();
+ verify();
+
+ // Base KVs (if any) and Merge operands both in storage
+ ASSERT_OK(Flush());
+ verify();
+ }
+
+ // Base KVs (if any) in storage, Merge operands in memtable
+ DestroyAndReopen(options);
+ write_base();
+ ASSERT_OK(Flush());
+ write_merge();
+ verify();
+}
+
+// Verifies that Merge operands are applied to the default column of
+// wide-column entities, and that the other columns are carried over
+// unchanged. The checks run with the base entities and the operands in the
+// memtable, in storage, split between the two, and finally after compaction.
+TEST_F(DBWideBasicTest, MergeEntity) {
+ Options options = GetDefaultOptions();
+ options.create_if_missing = true;
+
+ const std::string delim("|");
+ options.merge_operator = MergeOperators::CreateStringAppendOperator(delim);
+
+ Reopen(options);
+
+ // Test Merge with two entities: one that has the default column and one that
+ // doesn't
+ constexpr char first_key[] = "first";
+ WideColumns first_columns{{kDefaultWideColumnName, "a"},
+ {"attr_name1", "foo"},
+ {"attr_name2", "bar"}};
+ constexpr char first_merge_operand[] = "bla1";
+
+ constexpr char second_key[] = "second";
+ WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
+ constexpr char second_merge_operand[] = "bla2";
+
+ auto write_base = [&]() {
+ // Use the DB::PutEntity API
+ ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
+ first_key, first_columns));
+
+ // Use WriteBatch
+ WriteBatch batch;
+ ASSERT_OK(batch.PutEntity(db_->DefaultColumnFamily(), second_key,
+ second_columns));
+ ASSERT_OK(db_->Write(WriteOptions(), &batch));
+ };
+
+ auto write_merge = [&]() {
+ ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key,
+ first_merge_operand));
+ ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key,
+ second_merge_operand));
+ };
+
+ // Expected default column values after the merge: the second entity has no
+ // default column, so its operand is appended to an empty base value.
+ const std::string first_expected_default(first_columns[0].value().ToString() +
+ delim + first_merge_operand);
+ const std::string second_expected_default(delim + second_merge_operand);
+
+ auto verify_basic = [&]() {
+ WideColumns first_expected_columns{
+ {kDefaultWideColumnName, first_expected_default},
+ first_columns[1],
+ first_columns[2]};
+
+ WideColumns second_expected_columns{
+ {kDefaultWideColumnName, second_expected_default},
+ second_columns[0],
+ second_columns[1]};
+
+ {
+ PinnableSlice result;
+ ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key,
+ &result));
+ ASSERT_EQ(result, first_expected_default);
+ }
+
+ {
+ PinnableWideColumns result;
+ ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
+ first_key, &result));
+ ASSERT_EQ(result.columns(), first_expected_columns);
+ }
+
+ {
+ PinnableSlice result;
+ ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key,
+ &result));
+ ASSERT_EQ(result, second_expected_default);
+ }
+
+ {
+ PinnableWideColumns result;
+ ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
+ second_key, &result));
+ ASSERT_EQ(result.columns(), second_expected_columns);
+ }
+
+ {
+ constexpr size_t num_keys = 2;
+
+ std::array<Slice, num_keys> keys{{first_key, second_key}};
+ std::array<PinnableSlice, num_keys> values;
+ std::array<Status, num_keys> statuses;
+
+ db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
+ &keys[0], &values[0], &statuses[0]);
+
+ // Check each status before its value so that a failed lookup is reported
+ // as the underlying Status rather than as a value mismatch.
+ ASSERT_OK(statuses[0]);
+ ASSERT_EQ(values[0], first_expected_default);
+
+ ASSERT_OK(statuses[1]);
+ ASSERT_EQ(values[1], second_expected_default);
+ }
+
+ {
+ std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
+
+ iter->SeekToFirst();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), first_key);
+ ASSERT_EQ(iter->value(), first_expected_default);
+ ASSERT_EQ(iter->columns(), first_expected_columns);
+
+ iter->Next();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), second_key);
+ ASSERT_EQ(iter->value(), second_expected_default);
+ ASSERT_EQ(iter->columns(), second_expected_columns);
+
+ iter->Next();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+
+ iter->SeekToLast();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), second_key);
+ ASSERT_EQ(iter->value(), second_expected_default);
+ ASSERT_EQ(iter->columns(), second_expected_columns);
+
+ iter->Prev();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+ ASSERT_EQ(iter->key(), first_key);
+ ASSERT_EQ(iter->value(), first_expected_default);
+ ASSERT_EQ(iter->columns(), first_expected_columns);
+
+ iter->Prev();
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+ }
+ };
+
+ // Before compaction, GetMergeOperands is expected to return two operands per
+ // key: the base default column value (empty for the second entity) followed
+ // by the Merge operand.
+ auto verify_merge_ops_pre_compaction = [&]() {
+ constexpr size_t num_merge_operands = 2;
+
+ GetMergeOperandsOptions get_merge_opts;
+ get_merge_opts.expected_max_number_of_operands = num_merge_operands;
+
+ {
+ std::array<PinnableSlice, num_merge_operands> merge_operands;
+ int number_of_operands = 0;
+
+ ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
+ first_key, &merge_operands[0],
+ &get_merge_opts, &number_of_operands));
+
+ ASSERT_EQ(number_of_operands, num_merge_operands);
+ ASSERT_EQ(merge_operands[0], first_columns[0].value());
+ ASSERT_EQ(merge_operands[1], first_merge_operand);
+ }
+
+ {
+ std::array<PinnableSlice, num_merge_operands> merge_operands;
+ int number_of_operands = 0;
+
+ ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
+ second_key, &merge_operands[0],
+ &get_merge_opts, &number_of_operands));
+
+ ASSERT_EQ(number_of_operands, num_merge_operands);
+ ASSERT_TRUE(merge_operands[0].empty());
+ ASSERT_EQ(merge_operands[1], second_merge_operand);
+ }
+ };
+
+ // After compaction, a single operand holding the fully merged default column
+ // value is expected for each key.
+ auto verify_merge_ops_post_compaction = [&]() {
+ constexpr size_t num_merge_operands = 1;
+
+ GetMergeOperandsOptions get_merge_opts;
+ get_merge_opts.expected_max_number_of_operands = num_merge_operands;
+
+ {
+ std::array<PinnableSlice, num_merge_operands> merge_operands;
+ int number_of_operands = 0;
+
+ ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
+ first_key, &merge_operands[0],
+ &get_merge_opts, &number_of_operands));
+
+ ASSERT_EQ(number_of_operands, num_merge_operands);
+ ASSERT_EQ(merge_operands[0], first_expected_default);
+ }
+
+ {
+ std::array<PinnableSlice, num_merge_operands> merge_operands;
+ int number_of_operands = 0;
+
+ ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
+ second_key, &merge_operands[0],
+ &get_merge_opts, &number_of_operands));
+
+ ASSERT_EQ(number_of_operands, num_merge_operands);
+ ASSERT_EQ(merge_operands[0], second_expected_default);
+ }
+ };
+
+ {
+ // Base KVs and Merge operands both in memtable (note: we take a snapshot in
+ // between to make sure they do not get reconciled during the subsequent
+ // flush)
+ write_base();
+ ManagedSnapshot snapshot(db_);
+ write_merge();
+ verify_basic();
+ verify_merge_ops_pre_compaction();
+
+ // Base KVs and Merge operands both in storage
+ ASSERT_OK(Flush());
+ verify_basic();
+ verify_merge_ops_pre_compaction();
+ }
+
+ // Base KVs in storage, Merge operands in memtable
+ DestroyAndReopen(options);
+ write_base();
+ ASSERT_OK(Flush());
+ write_merge();
+ verify_basic();
+ verify_merge_ops_pre_compaction();
+
+ // Flush and compact
+ ASSERT_OK(Flush());
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr,
+ /* end */ nullptr));
+ verify_basic();
+ verify_merge_ops_post_compaction();
+}
+
+TEST_F(DBWideBasicTest, PutEntityTimestampError) {  // PutEntity must be rejected on a timestamp-enabled column family
+ // Note: timestamps are currently not supported
+
+ Options options = GetDefaultOptions();
+ options.comparator = test::BytewiseComparatorWithU64TsWrapper();  // timestamp-aware comparator, going by its name
+
+ ColumnFamilyHandle* handle = nullptr;
+ ASSERT_OK(db_->CreateColumnFamily(options, "corinthian", &handle));
+ std::unique_ptr<ColumnFamilyHandle> handle_guard(handle);  // deletes the handle when the test body exits
+
+ // Use the DB::PutEntity API
+ constexpr char first_key[] = "first";
+ WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
+
+ ASSERT_TRUE(db_->PutEntity(WriteOptions(), handle, first_key, first_columns)
+ .IsInvalidArgument());
+
+ // Use WriteBatch
+ constexpr char second_key[] = "second";
+ WideColumns second_columns{{"doric", "column"}, {"ionic", "column"}};
+
+ WriteBatch batch;
+ ASSERT_TRUE(
+ batch.PutEntity(handle, second_key, second_columns).IsInvalidArgument());
+ ASSERT_OK(db_->Write(WriteOptions(), &batch));  // the rejected PutEntity must not make the batch itself unwritable
+}
+
+// Verifies that entities with duplicate column names are rejected with a
+// Corruption status, both through DB::PutEntity and through
+// WriteBatch::PutEntity, and that the batch can still be written afterwards.
+TEST_F(DBWideBasicTest, PutEntitySerializationError) {
+ // Make sure duplicate columns are caught
+
+ // Use the DB::PutEntity API
+ constexpr char first_key[] = "first";
+ WideColumns first_columns{{"foo", "bar"}, {"foo", "baz"}};
+
+ ASSERT_TRUE(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
+ first_key, first_columns)
+ .IsCorruption());
+
+ // Use WriteBatch
+ constexpr char second_key[] = "second";
+ WideColumns second_columns{{"column", "doric"}, {"column", "ionic"}};
+
+ WriteBatch batch;
+ ASSERT_TRUE(
+ batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns)
+ .IsCorruption());
+ ASSERT_OK(db_->Write(WriteOptions(), &batch));
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {  // test binary entry point
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ RegisterCustomObjects(argc, argv);  // called after InitGoogleTest has processed argc/argv
+ return RUN_ALL_TESTS();  // the gtest result becomes the process exit code
+}
diff --git a/src/rocksdb/db/wide/wide_column_serialization.cc b/src/rocksdb/db/wide/wide_column_serialization.cc
new file mode 100644
index 000000000..f62143c40
--- /dev/null
+++ b/src/rocksdb/db/wide/wide_column_serialization.cc
@@ -0,0 +1,182 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/wide/wide_column_serialization.h"
+
+#include <algorithm>
+#include <cassert>
+#include <limits>
+
+#include "rocksdb/slice.h"
+#include "util/autovector.h"
+#include "util/coding.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Appends the serialized form of a wide-column entity to `output`: the
+// version, the number of columns, an index of (name, value size) pairs, and
+// finally the column values in the same order as the index.
+//
+// If `value_of_default` is non-null, a default column with that value is
+// emitted ahead of `columns`. Column names must be strictly increasing
+// (including relative to the default column when one is supplied); otherwise
+// Corruption is returned. InvalidArgument is returned when the column count
+// or any name/value size does not fit into 32 bits. Note that `output` may
+// already contain partially written data when an error is returned.
+Status WideColumnSerialization::SerializeImpl(const Slice* value_of_default,
+                                              const WideColumns& columns,
+                                              std::string& output) {
+  constexpr size_t kMaxEncodable =
+      static_cast<size_t>(std::numeric_limits<uint32_t>::max());
+
+  const size_t total_columns = columns.size() + (value_of_default ? 1 : 0);
+  if (total_columns > kMaxEncodable) {
+    return Status::InvalidArgument("Too many wide columns");
+  }
+
+  // Header: format version followed by the column count.
+  PutVarint32(&output, kCurrentVersion);
+  PutVarint32(&output, static_cast<uint32_t>(total_columns));
+
+  // Index, part 1: the separately supplied default column (if any).
+  const Slice* last_name = nullptr;
+
+  if (value_of_default != nullptr) {
+    const size_t default_size = value_of_default->size();
+    if (default_size > kMaxEncodable) {
+      return Status::InvalidArgument("Wide column value too long");
+    }
+
+    PutLengthPrefixedSlice(&output, kDefaultWideColumnName);
+    PutVarint32(&output, static_cast<uint32_t>(default_size));
+
+    last_name = &kDefaultWideColumnName;
+  }
+
+  // Index, part 2: the remaining columns, validated as we go.
+  for (const WideColumn& column : columns) {
+    const Slice& name = column.name();
+    if (name.size() > kMaxEncodable) {
+      return Status::InvalidArgument("Wide column name too long");
+    }
+
+    // Each name has to sort strictly after the one emitted before it.
+    if (last_name != nullptr && last_name->compare(name) >= 0) {
+      return Status::Corruption("Wide columns out of order");
+    }
+
+    const Slice& value = column.value();
+    if (value.size() > kMaxEncodable) {
+      return Status::InvalidArgument("Wide column value too long");
+    }
+
+    PutLengthPrefixedSlice(&output, name);
+    PutVarint32(&output, static_cast<uint32_t>(value.size()));
+
+    last_name = &name;
+  }
+
+  // Payload: the values, in index order.
+  if (value_of_default != nullptr) {
+    output.append(value_of_default->data(), value_of_default->size());
+  }
+
+  for (const WideColumn& column : columns) {
+    const Slice& payload = column.value();
+    output.append(payload.data(), payload.size());
+  }
+
+  return Status::OK();
+}
+
+// Parses a serialized wide-column entity from `input` into `columns`, which
+// must be empty on entry. `input` is consumed up to the end of the index; the
+// resulting column values are Slices that point into the value payload that
+// follows it, so they reference the caller's buffer rather than owning data.
+//
+// Returns Corruption if the input is truncated or malformed or if the column
+// names are not strictly increasing, and NotSupported if the encoded version
+// is newer than kCurrentVersion. `columns` may hold partially decoded entries
+// when an error is returned.
+Status WideColumnSerialization::Deserialize(Slice& input,
+                                            WideColumns& columns) {
+  assert(columns.empty());
+
+  uint32_t version = 0;
+  if (!GetVarint32(&input, &version)) {
+    return Status::Corruption("Error decoding wide column version");
+  }
+
+  if (version > kCurrentVersion) {
+    return Status::NotSupported("Unsupported wide column version");
+  }
+
+  uint32_t num_columns = 0;
+  if (!GetVarint32(&input, &num_columns)) {
+    return Status::Corruption("Error decoding number of wide columns");
+  }
+
+  if (!num_columns) {
+    return Status::OK();
+  }
+
+  // num_columns comes straight from the (possibly corrupt) input, so do not
+  // trust it for preallocation. Every index entry takes at least two bytes
+  // (one for the name length varint and one for the value size varint), which
+  // bounds the number of entries the remaining input can actually hold. If
+  // the count is bogus, the loop below fails with the usual Corruption error.
+  const size_t max_possible_columns = input.size() / 2;
+  const size_t reserve_count =
+      std::min<size_t>(num_columns, max_possible_columns);
+
+  columns.reserve(reserve_count);
+
+  autovector<uint32_t, 16> column_value_sizes;
+  column_value_sizes.reserve(reserve_count);
+
+  // First pass: decode the index of (name, value size) pairs.
+  for (uint32_t i = 0; i < num_columns; ++i) {
+    Slice name;
+    if (!GetLengthPrefixedSlice(&input, &name)) {
+      return Status::Corruption("Error decoding wide column name");
+    }
+
+    if (!columns.empty() && columns.back().name().compare(name) >= 0) {
+      return Status::Corruption("Wide columns out of order");
+    }
+
+    // The value is filled in during the second pass.
+    columns.emplace_back(name, Slice());
+
+    uint32_t value_size = 0;
+    if (!GetVarint32(&input, &value_size)) {
+      return Status::Corruption("Error decoding wide column value size");
+    }
+
+    column_value_sizes.emplace_back(value_size);
+  }
+
+  // Second pass: whatever is left of the input is the value payload; slice it
+  // up according to the sizes recorded above.
+  const Slice data(input);
+  size_t pos = 0;
+
+  for (uint32_t i = 0; i < num_columns; ++i) {
+    const uint32_t value_size = column_value_sizes[i];
+
+    // pos never exceeds data.size(), so the subtraction cannot underflow;
+    // unlike `pos + value_size`, this form also cannot wrap around when
+    // size_t is only 32 bits wide.
+    if (value_size > data.size() - pos) {
+      return Status::Corruption("Error decoding wide column value payload");
+    }
+
+    columns[i].value() = Slice(data.data() + pos, value_size);
+
+    pos += value_size;
+  }
+
+  return Status::OK();
+}
+
+// Looks up `column_name` in `columns` using binary search, so `columns` has
+// to be ordered by name. Returns an iterator to the matching column, or
+// columns.cend() if there is no column with exactly that name.
+WideColumns::const_iterator WideColumnSerialization::Find(
+    const WideColumns& columns, const Slice& column_name) {
+  const auto end = columns.cend();
+
+  const auto name_precedes = [](const WideColumn& column,
+                                const Slice& target) {
+    return column.name().compare(target) < 0;
+  };
+
+  // First column whose name is not less than the requested one.
+  const auto candidate =
+      std::lower_bound(columns.cbegin(), end, column_name, name_precedes);
+
+  const bool found = candidate != end && candidate->name() == column_name;
+
+  return found ? candidate : end;
+}
+
+// Deserializes the entity in `input` and stores the value of its default
+// column in `value`. If the entity has no columns, or its first column is not
+// the default column, `value` is cleared and OK is still returned. Errors
+// from Deserialize are passed through unchanged.
+Status WideColumnSerialization::GetValueOfDefaultColumn(Slice& input,
+                                                        Slice& value) {
+  WideColumns parsed;
+
+  const Status status = Deserialize(input, parsed);
+  if (!status.ok()) {
+    return status;
+  }
+
+  // Only the first entry is inspected when looking for the default column.
+  const bool has_default =
+      !parsed.empty() && parsed.front().name() == kDefaultWideColumnName;
+
+  if (has_default) {
+    value = parsed.front().value();
+  } else {
+    value.clear();
+  }
+
+  return Status::OK();
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/db/wide/wide_column_serialization.h b/src/rocksdb/db/wide/wide_column_serialization.h
new file mode 100644
index 000000000..f0ffbd392
--- /dev/null
+++ b/src/rocksdb/db/wide/wide_column_serialization.h
@@ -0,0 +1,77 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/status.h"
+#include "rocksdb/wide_columns.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class Slice;
+
+// Wide-column serialization/deserialization primitives.
+//
+// The two main parts of the layout are 1) a sorted index containing the column
+// names and column value sizes and 2) the column values themselves. Keeping the
+// index and the values separate will enable selectively reading column values
+// down the line. Note that currently the index has to be fully parsed in order
+// to find out the offset of each column value.
+//
+// Legend: cn = column name, cv = column value, cns = column name size, cvs =
+// column value size.
+//
+// +----------+--------------+----------+-------+----------+---...
+// | version | # of columns | cns 1 | cn 1 | cvs 1 |
+// +----------+--------------+----------+-------+----------+---...
+// | varint32 | varint32 | varint32 | bytes | varint32 |
+// +----------+--------------+----------+-------+----------+---...
+//
+// ... continued ...
+//
+// ...---+----------+-------+----------+-------+---...---+-------+
+// | cns N | cn N | cvs N | cv 1 | | cv N |
+// ...---+----------+-------+----------+-------+---...---+-------+
+// | varint32 | bytes | varint32 | bytes | | bytes |
+// ...---+----------+-------+----------+-------+---...---+-------+
+
+class WideColumnSerialization {  // static-only helpers; no instance state is declared
+ public:
+ static Status Serialize(const WideColumns& columns, std::string& output);  // serializes `columns` into `output`
+ static Status Serialize(const Slice& value_of_default,
+ const WideColumns& other_columns,
+ std::string& output);  // same, with the default column's value passed separately
+
+ static Status Deserialize(Slice& input, WideColumns& columns);  // parses `input` into `columns`
+
+ static WideColumns::const_iterator Find(const WideColumns& columns,
+ const Slice& column_name);  // returns columns.cend() when no column has that name
+ static Status GetValueOfDefaultColumn(Slice& input, Slice& value);  // `value` is cleared when there is no default column
+
+ static constexpr uint32_t kCurrentVersion = 1;  // version number written at the start of the serialized form
+
+ private:
+ static Status SerializeImpl(const Slice* value_of_default,
+ const WideColumns& columns, std::string& output);  // shared implementation; value_of_default may be nullptr
+};
+
+// Serializes `columns` as-is into `output`; no separate default column value
+// is supplied, so SerializeImpl receives a null pointer for it.
+inline Status WideColumnSerialization::Serialize(const WideColumns& columns,
+                                                 std::string& output) {
+  return SerializeImpl(/* value_of_default */ nullptr, columns, output);
+}
+
+// Serializes an entity whose default column value is passed separately from
+// the remaining columns; forwards to SerializeImpl with a pointer to
+// `value_of_default`.
+inline Status WideColumnSerialization::Serialize(
+    const Slice& value_of_default, const WideColumns& other_columns,
+    std::string& output) {
+  const Slice* const default_value = &value_of_default;
+
+  return SerializeImpl(default_value, other_columns, output);
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/db/wide/wide_column_serialization_test.cc b/src/rocksdb/db/wide/wide_column_serialization_test.cc
new file mode 100644
index 000000000..8060d2f24
--- /dev/null
+++ b/src/rocksdb/db/wide/wide_column_serialization_test.cc
@@ -0,0 +1,338 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/wide/wide_column_serialization.h"
+
+#include "test_util/testharness.h"
+#include "util/coding.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+TEST(WideColumnSerializationTest, Construct) {  // WideColumn ctor argument mixes
+ constexpr char foo[] = "foo";
+ constexpr char bar[] = "bar";
+ // std::string copies of the two literals.
+ const std::string foo_str(foo);
+ const std::string bar_str(bar);
+ // Slices constructed from the std::string copies.
+ const Slice foo_slice(foo_str);
+ const Slice bar_slice(bar_str);
+ // name: char array, value: char array
+ {
+ WideColumn column(foo, bar);
+ ASSERT_EQ(column.name(), foo);
+ ASSERT_EQ(column.value(), bar);
+ }
+ // name: std::string, value: char array
+ {
+ WideColumn column(foo_str, bar);
+ ASSERT_EQ(column.name(), foo_str);
+ ASSERT_EQ(column.value(), bar);
+ }
+ // name: Slice, value: char array
+ {
+ WideColumn column(foo_slice, bar);
+ ASSERT_EQ(column.name(), foo_slice);
+ ASSERT_EQ(column.value(), bar);
+ }
+ // name: char array, value: std::string
+ {
+ WideColumn column(foo, bar_str);
+ ASSERT_EQ(column.name(), foo);
+ ASSERT_EQ(column.value(), bar_str);
+ }
+ // name: std::string, value: std::string
+ {
+ WideColumn column(foo_str, bar_str);
+ ASSERT_EQ(column.name(), foo_str);
+ ASSERT_EQ(column.value(), bar_str);
+ }
+ // name: Slice, value: std::string
+ {
+ WideColumn column(foo_slice, bar_str);
+ ASSERT_EQ(column.name(), foo_slice);
+ ASSERT_EQ(column.value(), bar_str);
+ }
+ // name: char array, value: Slice
+ {
+ WideColumn column(foo, bar_slice);
+ ASSERT_EQ(column.name(), foo);
+ ASSERT_EQ(column.value(), bar_slice);
+ }
+ // name: std::string, value: Slice
+ {
+ WideColumn column(foo_str, bar_slice);
+ ASSERT_EQ(column.name(), foo_str);
+ ASSERT_EQ(column.value(), bar_slice);
+ }
+ // name: Slice, value: Slice
+ {
+ WideColumn column(foo_slice, bar_slice);
+ ASSERT_EQ(column.name(), foo_slice);
+ ASSERT_EQ(column.value(), bar_slice);
+ }
+ // Piecewise construction from (array, length) argument tuples.
+ {
+ constexpr char foo_name[] = "foo_name";
+ constexpr char bar_value[] = "bar_value";
+ // Both lengths are 3 (sizeof("foo") - 1), so "foo" and "bar" are expected.
+ WideColumn column(std::piecewise_construct,
+ std::forward_as_tuple(foo_name, sizeof(foo) - 1),
+ std::forward_as_tuple(bar_value, sizeof(bar) - 1));
+ ASSERT_EQ(column.name(), foo);
+ ASSERT_EQ(column.value(), bar);
+ }
+}
+
+TEST(WideColumnSerializationTest, SerializeDeserialize) {
+  // Round trip: serialize two columns, decode the result, and check that the
+  // decoded columns equal the ones that went in.
+  WideColumns original{{"foo", "bar"}, {"hello", "world"}};
+  std::string encoded;
+  ASSERT_OK(WideColumnSerialization::Serialize(original, encoded));
+
+  // Decode through a Slice constructed from the encoded string.
+  Slice encoded_slice(encoded);
+  WideColumns decoded;
+  ASSERT_OK(WideColumnSerialization::Deserialize(encoded_slice, decoded));
+  ASSERT_EQ(original, decoded);
+
+  // Names that were written are found at the positions they were written to.
+  {
+    const auto pos = WideColumnSerialization::Find(decoded, "foo");
+    ASSERT_NE(pos, decoded.cend());
+    ASSERT_EQ(*pos, decoded.front());
+  }
+
+  {
+    const auto pos = WideColumnSerialization::Find(decoded, "hello");
+    ASSERT_NE(pos, decoded.cend());
+    ASSERT_EQ(*pos, decoded.back());
+  }
+
+  // Names that were never written yield the end iterator.
+  {
+    const auto pos = WideColumnSerialization::Find(decoded, "fubar");
+    ASSERT_EQ(pos, decoded.cend());
+  }
+
+  {
+    const auto pos = WideColumnSerialization::Find(decoded, "snafu");
+    ASSERT_EQ(pos, decoded.cend());
+  }
+}
+
+TEST(WideColumnSerializationTest, SerializeWithPrepend) {
+  // The default column's value is supplied separately from the other columns.
+  Slice default_value("baz");
+  WideColumns rest{{"foo", "bar"}, {"hello", "world"}};
+
+  std::string encoded;
+  ASSERT_OK(WideColumnSerialization::Serialize(default_value, rest, encoded));
+
+  Slice encoded_slice(encoded);
+  WideColumns decoded;
+  ASSERT_OK(WideColumnSerialization::Deserialize(encoded_slice, decoded));
+
+  // Expect the default column first, then the other columns in their order.
+  WideColumns expected{{kDefaultWideColumnName, default_value},
+                       rest[0],
+                       rest[1]};
+  ASSERT_EQ(decoded, expected);
+}
+
+TEST(WideColumnSerializationTest, SerializeDuplicateError) {
+  // "foo" appears twice, which Serialize is expected to reject as corruption.
+  WideColumns duplicated{{"foo", "bar"}, {"foo", "baz"}};
+  std::string encoded;
+  const Status s = WideColumnSerialization::Serialize(duplicated, encoded);
+  ASSERT_TRUE(s.IsCorruption());
+}
+
+TEST(WideColumnSerializationTest, SerializeWithPrependDuplicateError) {
+  // The default column is given both separately and inside the column list.
+  Slice default_value("baz");
+  WideColumns rest{{kDefaultWideColumnName, "dup"}, {"foo", "bar"}};
+  std::string encoded;
+  const Status s =
+      WideColumnSerialization::Serialize(default_value, rest, encoded);
+  ASSERT_TRUE(s.IsCorruption());
+}
+
+TEST(WideColumnSerializationTest, SerializeOutOfOrderError) {
+  // "hello" is listed before "foo", so the column names are not in order.
+  WideColumns unsorted{{"hello", "world"}, {"foo", "bar"}};
+  std::string encoded;
+  const Status s = WideColumnSerialization::Serialize(unsorted, encoded);
+  ASSERT_TRUE(s.IsCorruption());
+}
+
+TEST(WideColumnSerializationTest, DeserializeVersionError) {
+  // An empty buffer does not even hold the version field, so decoding has to
+  // fail with an error message that mentions the version.
+  std::string empty;
+  Slice input(empty);
+  WideColumns decoded;
+  const Status status = WideColumnSerialization::Deserialize(input, decoded);
+
+  ASSERT_TRUE(status.IsCorruption());
+  const char* const message = status.getState();
+  ASSERT_TRUE(std::strstr(message, "version"));
+}
+
+TEST(WideColumnSerializationTest, DeserializeUnsupportedVersion) {
+  // The buffer holds nothing but a version number other than kCurrentVersion.
+  // That is expected to be reported as "not supported".
+  constexpr uint32_t kFutureVersion = 1000;
+  std::string encoded;
+  PutVarint32(&encoded, kFutureVersion);
+
+  Slice input(encoded);
+  WideColumns decoded;
+  const Status status = WideColumnSerialization::Deserialize(input, decoded);
+
+  ASSERT_TRUE(status.IsNotSupported());
+  ASSERT_TRUE(std::strstr(status.getState(), "version"));
+}
+
+TEST(WideColumnSerializationTest, DeserializeNumberOfColumnsError) {
+  // The buffer ends right after a valid version, so the column count that
+  // should follow cannot be decoded.
+  std::string encoded;
+  PutVarint32(&encoded, WideColumnSerialization::kCurrentVersion);
+
+  Slice input(encoded);
+  WideColumns decoded;
+  const Status status = WideColumnSerialization::Deserialize(input, decoded);
+
+  ASSERT_TRUE(status.IsCorruption());
+  ASSERT_TRUE(std::strstr(status.getState(), "number"));
+}
+
+TEST(WideColumnSerializationTest, DeserializeColumnsError) {
+ std::string buf;
+ // buf grows one field at a time; Deserialize is retried after each step.
+ PutVarint32(&buf, WideColumnSerialization::kCurrentVersion);
+
+ constexpr uint32_t num_columns = 2;
+ PutVarint32(&buf, num_columns);
+
+ // Only version and column count so far: can't decode the first column name
+ {
+ Slice input(buf);
+ WideColumns columns;
+
+ const Status s = WideColumnSerialization::Deserialize(input, columns);
+ ASSERT_TRUE(s.IsCorruption());
+ ASSERT_TRUE(std::strstr(s.getState(), "name"));
+ }
+
+ constexpr char first_column_name[] = "foo";
+ PutLengthPrefixedSlice(&buf, first_column_name);
+
+ // First name present: can't decode the size of the first column value
+ {
+ Slice input(buf);
+ WideColumns columns;
+
+ const Status s = WideColumnSerialization::Deserialize(input, columns);
+ ASSERT_TRUE(s.IsCorruption());
+ ASSERT_TRUE(std::strstr(s.getState(), "value size"));
+ }
+
+ constexpr uint32_t first_value_size = 16;
+ PutVarint32(&buf, first_value_size);
+
+ // First index entry complete: can't decode the second column name
+ {
+ Slice input(buf);
+ WideColumns columns;
+
+ const Status s = WideColumnSerialization::Deserialize(input, columns);
+ ASSERT_TRUE(s.IsCorruption());
+ ASSERT_TRUE(std::strstr(s.getState(), "name"));
+ }
+
+ constexpr char second_column_name[] = "hello";
+ PutLengthPrefixedSlice(&buf, second_column_name);
+
+ // Second name present: can't decode the size of the second column value
+ {
+ Slice input(buf);
+ WideColumns columns;
+
+ const Status s = WideColumnSerialization::Deserialize(input, columns);
+ ASSERT_TRUE(s.IsCorruption());
+ ASSERT_TRUE(std::strstr(s.getState(), "value size"));
+ }
+
+ constexpr uint32_t second_value_size = 64;
+ PutVarint32(&buf, second_value_size);
+
+ // Both index entries present, no values yet: can't decode the first payload
+ {
+ Slice input(buf);
+ WideColumns columns;
+
+ const Status s = WideColumnSerialization::Deserialize(input, columns);
+ ASSERT_TRUE(s.IsCorruption());
+ ASSERT_TRUE(std::strstr(s.getState(), "payload"));
+ }
+
+ buf.append(first_value_size, '0');
+
+ // First value (16 bytes) appended: can't decode the payload of the second
+ {
+ Slice input(buf);
+ WideColumns columns;
+
+ const Status s = WideColumnSerialization::Deserialize(input, columns);
+ ASSERT_TRUE(s.IsCorruption());
+ ASSERT_TRUE(std::strstr(s.getState(), "payload"));
+ }
+
+ buf.append(second_value_size, 'x');
+
+ // Success: both payloads have been appended
+ {
+ Slice input(buf);
+ WideColumns columns;
+
+ ASSERT_OK(WideColumnSerialization::Deserialize(input, columns));
+ }
+}
+
+TEST(WideColumnSerializationTest, DeserializeColumnsOutOfOrder) {
+  // Hand-build an index whose second column name ("a") comes after a first
+  // name of "b"; Deserialize is expected to flag this as corruption.
+  constexpr uint32_t kColumnCount = 2;
+  constexpr char kFirstName[] = "b";
+  constexpr uint32_t kFirstValueSize = 16;
+  constexpr char kSecondName[] = "a";
+
+  std::string encoded;
+  PutVarint32(&encoded, WideColumnSerialization::kCurrentVersion);
+  PutVarint32(&encoded, kColumnCount);
+  PutLengthPrefixedSlice(&encoded, kFirstName);
+  PutVarint32(&encoded, kFirstValueSize);
+  PutLengthPrefixedSlice(&encoded, kSecondName);
+
+  // Decode what has been assembled so far.
+  Slice input(encoded);
+  WideColumns decoded;
+  const Status status = WideColumnSerialization::Deserialize(input, decoded);
+
+  // The buffer stops after the second name; the message must mention "order".
+  ASSERT_TRUE(status.IsCorruption());
+  ASSERT_TRUE(std::strstr(status.getState(), "order"));
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {  // entry point of the test binary
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);  // gtest parses its own flags here
+ return RUN_ALL_TESTS();  // the result becomes the process exit code
+}
diff --git a/src/rocksdb/db/wide/wide_columns.cc b/src/rocksdb/db/wide/wide_columns.cc
new file mode 100644
index 000000000..186be7f85
--- /dev/null
+++ b/src/rocksdb/db/wide/wide_columns.cc
@@ -0,0 +1,22 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/wide_columns.h"
+
+#include "db/wide/wide_column_serialization.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+const Slice kDefaultWideColumnName;  // default-constructed Slice
+// kNoWideColumns is default-constructed too; presumably an empty column list.
+const WideColumns kNoWideColumns;
+
+Status PinnableWideColumns::CreateIndexForWideColumns() {
+  // Decode from a local Slice copy: Deserialize() takes a non-const Slice&.
+  Slice serialized = value_;
+  return WideColumnSerialization::Deserialize(serialized, columns_);
+}
+
+} // namespace ROCKSDB_NAMESPACE