diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/wide | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/wide')
-rw-r--r-- | src/rocksdb/db/wide/db_wide_basic_test.cc | 654 | ||||
-rw-r--r-- | src/rocksdb/db/wide/wide_column_serialization.cc | 182 | ||||
-rw-r--r-- | src/rocksdb/db/wide/wide_column_serialization.h | 77 | ||||
-rw-r--r-- | src/rocksdb/db/wide/wide_column_serialization_test.cc | 338 | ||||
-rw-r--r-- | src/rocksdb/db/wide/wide_columns.cc | 22 |
5 files changed, 1273 insertions, 0 deletions
diff --git a/src/rocksdb/db/wide/db_wide_basic_test.cc b/src/rocksdb/db/wide/db_wide_basic_test.cc new file mode 100644 index 000000000..1ffe314fe --- /dev/null +++ b/src/rocksdb/db/wide/db_wide_basic_test.cc @@ -0,0 +1,654 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <array> +#include <memory> + +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "test_util/testutil.h" +#include "utilities/merge_operators.h" + +namespace ROCKSDB_NAMESPACE { + +class DBWideBasicTest : public DBTestBase { + protected: + explicit DBWideBasicTest() + : DBTestBase("db_wide_basic_test", /* env_do_fsync */ false) {} +}; + +TEST_F(DBWideBasicTest, PutEntity) { + Options options = GetDefaultOptions(); + + // Write a couple of wide-column entities and a plain old key-value, then read + // them back. + constexpr char first_key[] = "first"; + constexpr char first_value_of_default_column[] = "hello"; + WideColumns first_columns{ + {kDefaultWideColumnName, first_value_of_default_column}, + {"attr_name1", "foo"}, + {"attr_name2", "bar"}}; + + constexpr char second_key[] = "second"; + WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; + + constexpr char third_key[] = "third"; + constexpr char third_value[] = "baz"; + + auto verify = [&]() { + const WideColumns expected_third_columns{ + {kDefaultWideColumnName, third_value}}; + + { + PinnableSlice result; + ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key, + &result)); + ASSERT_EQ(result, first_value_of_default_column); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &result)); + ASSERT_EQ(result.columns(), first_columns); + } + + { + PinnableSlice result; + ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key, + &result)); + ASSERT_TRUE(result.empty()); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &result)); + ASSERT_EQ(result.columns(), second_columns); + } + + { + PinnableSlice result; + ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), third_key, + &result)); + ASSERT_EQ(result, third_value); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + third_key, &result)); + + ASSERT_EQ(result.columns(), expected_third_columns); + } + + { + constexpr size_t num_keys = 3; + + std::array<Slice, num_keys> keys{{first_key, second_key, third_key}}; + std::array<PinnableSlice, num_keys> values; + std::array<Status, num_keys> statuses; + + db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, + &keys[0], &values[0], &statuses[0]); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(values[0], first_value_of_default_column); + + ASSERT_OK(statuses[1]); + ASSERT_TRUE(values[1].empty()); + + ASSERT_OK(statuses[2]); + ASSERT_EQ(values[2], third_value); + } + + { + std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); + + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), first_key); + ASSERT_EQ(iter->value(), first_value_of_default_column); + ASSERT_EQ(iter->columns(), first_columns); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), second_key); + ASSERT_TRUE(iter->value().empty()); + ASSERT_EQ(iter->columns(), second_columns); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), third_key); + ASSERT_EQ(iter->value(), third_value); + ASSERT_EQ(iter->columns(), expected_third_columns); + + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), third_key); + ASSERT_EQ(iter->value(), third_value); + ASSERT_EQ(iter->columns(), expected_third_columns); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), second_key); + ASSERT_TRUE(iter->value().empty()); + ASSERT_EQ(iter->columns(), second_columns); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), first_key); + ASSERT_EQ(iter->value(), first_value_of_default_column); + ASSERT_EQ(iter->columns(), first_columns); + + iter->Prev(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + }; + + // Use the DB::PutEntity API to write the first entity + ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), + first_key, first_columns)); + + // Use WriteBatch to write the second entity + WriteBatch batch; + ASSERT_OK( + batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + // Use Put to write the plain key-value + ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), third_key, + third_value)); + + // Try reading from memtable + verify(); + + // Try reading after recovery + Close(); + options.avoid_flush_during_recovery = true; + Reopen(options); + + verify(); + + // Try reading from storage + ASSERT_OK(Flush()); + + verify(); +} + +TEST_F(DBWideBasicTest, PutEntityColumnFamily) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"corinthian"}, options); + + // Use the DB::PutEntity API + constexpr char first_key[] = "first"; + WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}}; + + ASSERT_OK( + db_->PutEntity(WriteOptions(), handles_[1], first_key, first_columns)); + + // Use WriteBatch + constexpr char second_key[] = "second"; + WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; + + WriteBatch batch; + ASSERT_OK(batch.PutEntity(handles_[1], second_key, second_columns)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); +} + +TEST_F(DBWideBasicTest, MergePlainKeyValue) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + Reopen(options); + + // Put + Merge + constexpr char first_key[] = "first"; + constexpr char first_base_value[] = "hello"; + constexpr char first_merge_op[] = "world"; + + // Delete + Merge + constexpr char second_key[] = "second"; + constexpr char second_merge_op[] = "foo"; + + // Merge without any preceding KV + constexpr char third_key[] = "third"; + constexpr char third_merge_op[] = "bar"; + + auto write_base = [&]() { + // Write "base" KVs: a Put for the 1st key and a Delete for the 2nd one; + // note there is no "base" KV for the 3rd + ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), first_key, + first_base_value)); + ASSERT_OK( + db_->Delete(WriteOptions(), db_->DefaultColumnFamily(), second_key)); + }; + + auto write_merge = [&]() { + // Write Merge operands + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key, + first_merge_op)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key, + second_merge_op)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), third_key, + third_merge_op)); + }; + + const std::string expected_first_column(std::string(first_base_value) + "," + + first_merge_op); + const WideColumns expected_first_columns{ + {kDefaultWideColumnName, expected_first_column}}; + const WideColumns expected_second_columns{ + {kDefaultWideColumnName, second_merge_op}}; + const WideColumns expected_third_columns{ + {kDefaultWideColumnName, third_merge_op}}; + + auto verify = [&]() { + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &result)); + ASSERT_EQ(result.columns(), expected_first_columns); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &result)); + ASSERT_EQ(result.columns(), expected_second_columns); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + third_key, &result)); + + ASSERT_EQ(result.columns(), expected_third_columns); + } + + { + std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); + + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), first_key); + ASSERT_EQ(iter->value(), expected_first_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_first_columns); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), second_key); + ASSERT_EQ(iter->value(), expected_second_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_second_columns); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), third_key); + ASSERT_EQ(iter->value(), expected_third_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_third_columns); + + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), third_key); + ASSERT_EQ(iter->value(), expected_third_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_third_columns); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), second_key); + ASSERT_EQ(iter->value(), expected_second_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_second_columns); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), first_key); + ASSERT_EQ(iter->value(), expected_first_columns[0].value()); + ASSERT_EQ(iter->columns(), expected_first_columns); + + iter->Prev(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + }; + + { + // Base KVs (if any) and Merge operands both in memtable (note: we take a + // snapshot in between to make sure they do not get reconciled during the + // subsequent flush) + write_base(); + ManagedSnapshot snapshot(db_); + write_merge(); + verify(); + + // Base KVs (if any) and Merge operands both in storage + ASSERT_OK(Flush()); + verify(); + } + + // Base KVs (if any) in storage, Merge operands in memtable + DestroyAndReopen(options); + write_base(); + ASSERT_OK(Flush()); + write_merge(); + verify(); +} + +TEST_F(DBWideBasicTest, MergeEntity) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + + const std::string delim("|"); + options.merge_operator = MergeOperators::CreateStringAppendOperator(delim); + + Reopen(options); + + // Test Merge with two entities: one that has the default column and one that + // doesn't + constexpr char first_key[] = "first"; + WideColumns first_columns{{kDefaultWideColumnName, "a"}, + {"attr_name1", "foo"}, + {"attr_name2", "bar"}}; + constexpr char first_merge_operand[] = "bla1"; + + constexpr char second_key[] = "second"; + WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; + constexpr char second_merge_operand[] = "bla2"; + + auto write_base = [&]() { + // Use the DB::PutEntity API + ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), + first_key, first_columns)); + + // Use WriteBatch + WriteBatch batch; + ASSERT_OK(batch.PutEntity(db_->DefaultColumnFamily(), second_key, + second_columns)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + }; + + auto write_merge = [&]() { + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key, + first_merge_operand)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key, + second_merge_operand)); + }; + + const std::string first_expected_default(first_columns[0].value().ToString() + + delim + first_merge_operand); + const std::string second_expected_default(delim + second_merge_operand); + + auto verify_basic = [&]() { + WideColumns first_expected_columns{ + {kDefaultWideColumnName, first_expected_default}, + first_columns[1], + first_columns[2]}; + + WideColumns second_expected_columns{ + {kDefaultWideColumnName, second_expected_default}, + second_columns[0], + second_columns[1]}; + + { + PinnableSlice result; + ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key, + &result)); + ASSERT_EQ(result, first_expected_default); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &result)); + ASSERT_EQ(result.columns(), first_expected_columns); + } + + { + PinnableSlice result; + ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key, + &result)); + ASSERT_EQ(result, second_expected_default); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &result)); + ASSERT_EQ(result.columns(), second_expected_columns); + } + + { + constexpr size_t num_keys = 2; + + std::array<Slice, num_keys> keys{{first_key, second_key}}; + std::array<PinnableSlice, num_keys> values; + std::array<Status, num_keys> statuses; + + db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, + &keys[0], &values[0], &statuses[0]); + + ASSERT_EQ(values[0], first_expected_default); + ASSERT_OK(statuses[0]); + + ASSERT_EQ(values[1], second_expected_default); + ASSERT_OK(statuses[1]); + } + + { + std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); + + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), first_key); + ASSERT_EQ(iter->value(), first_expected_default); + ASSERT_EQ(iter->columns(), first_expected_columns); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), second_key); + ASSERT_EQ(iter->value(), second_expected_default); + ASSERT_EQ(iter->columns(), second_expected_columns); + + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), second_key); + ASSERT_EQ(iter->value(), second_expected_default); + ASSERT_EQ(iter->columns(), second_expected_columns); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), first_key); + ASSERT_EQ(iter->value(), first_expected_default); + ASSERT_EQ(iter->columns(), first_expected_columns); + + iter->Prev(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + }; + + auto verify_merge_ops_pre_compaction = [&]() { + constexpr size_t num_merge_operands = 2; + + GetMergeOperandsOptions get_merge_opts; + get_merge_opts.expected_max_number_of_operands = num_merge_operands; + + { + std::array<PinnableSlice, num_merge_operands> merge_operands; + int number_of_operands = 0; + + ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &merge_operands[0], + &get_merge_opts, &number_of_operands)); + + ASSERT_EQ(number_of_operands, num_merge_operands); + ASSERT_EQ(merge_operands[0], first_columns[0].value()); + ASSERT_EQ(merge_operands[1], first_merge_operand); + } + + { + std::array<PinnableSlice, num_merge_operands> merge_operands; + int number_of_operands = 0; + + ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &merge_operands[0], + &get_merge_opts, &number_of_operands)); + + ASSERT_EQ(number_of_operands, num_merge_operands); + ASSERT_TRUE(merge_operands[0].empty()); + ASSERT_EQ(merge_operands[1], second_merge_operand); + } + }; + + auto verify_merge_ops_post_compaction = [&]() { + constexpr size_t num_merge_operands = 1; + + GetMergeOperandsOptions get_merge_opts; + get_merge_opts.expected_max_number_of_operands = num_merge_operands; + + { + std::array<PinnableSlice, num_merge_operands> merge_operands; + int number_of_operands = 0; + + ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &merge_operands[0], + &get_merge_opts, &number_of_operands)); + + ASSERT_EQ(number_of_operands, num_merge_operands); + ASSERT_EQ(merge_operands[0], first_expected_default); + } + + { + std::array<PinnableSlice, num_merge_operands> merge_operands; + int number_of_operands = 0; + + ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &merge_operands[0], + &get_merge_opts, &number_of_operands)); + + ASSERT_EQ(number_of_operands, num_merge_operands); + ASSERT_EQ(merge_operands[0], second_expected_default); + } + }; + + { + // Base KVs and Merge operands both in memtable (note: we take a snapshot in + // between to make sure they do not get reconciled during the subsequent + // flush) + write_base(); + ManagedSnapshot snapshot(db_); + write_merge(); + verify_basic(); + verify_merge_ops_pre_compaction(); + + // Base KVs and Merge operands both in storage + ASSERT_OK(Flush()); + verify_basic(); + verify_merge_ops_pre_compaction(); + } + + // Base KVs in storage, Merge operands in memtable + DestroyAndReopen(options); + write_base(); + ASSERT_OK(Flush()); + write_merge(); + verify_basic(); + verify_merge_ops_pre_compaction(); + + // Flush and compact + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr, + /* end */ nullptr)); + verify_basic(); + verify_merge_ops_post_compaction(); +} + +TEST_F(DBWideBasicTest, PutEntityTimestampError) { + // Note: timestamps are currently not supported + + Options options = GetDefaultOptions(); + options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + + ColumnFamilyHandle* handle = nullptr; + ASSERT_OK(db_->CreateColumnFamily(options, "corinthian", &handle)); + std::unique_ptr<ColumnFamilyHandle> handle_guard(handle); + + // Use the DB::PutEntity API + constexpr char first_key[] = "first"; + WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}}; + + ASSERT_TRUE(db_->PutEntity(WriteOptions(), handle, first_key, first_columns) + .IsInvalidArgument()); + + // Use WriteBatch + constexpr char second_key[] = "second"; + WideColumns second_columns{{"doric", "column"}, {"ionic", "column"}}; + + WriteBatch batch; + ASSERT_TRUE( + batch.PutEntity(handle, second_key, second_columns).IsInvalidArgument()); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); +} + +TEST_F(DBWideBasicTest, PutEntitySerializationError) { + // Make sure duplicate columns are caught + + Options options = GetDefaultOptions(); + + // Use the DB::PutEntity API + constexpr char first_key[] = "first"; + WideColumns first_columns{{"foo", "bar"}, {"foo", "baz"}}; + + ASSERT_TRUE(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), + first_key, first_columns) + .IsCorruption()); + + // Use WriteBatch + constexpr char second_key[] = "second"; + WideColumns second_columns{{"column", "doric"}, {"column", "ionic"}}; + + WriteBatch batch; + ASSERT_TRUE( + batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns) + .IsCorruption()); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + RegisterCustomObjects(argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/db/wide/wide_column_serialization.cc b/src/rocksdb/db/wide/wide_column_serialization.cc new file mode 100644 index 000000000..f62143c40 --- /dev/null +++ b/src/rocksdb/db/wide/wide_column_serialization.cc @@ -0,0 +1,182 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/wide/wide_column_serialization.h" + +#include <algorithm> +#include <cassert> +#include <limits> + +#include "rocksdb/slice.h" +#include "util/autovector.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { + +Status WideColumnSerialization::SerializeImpl(const Slice* value_of_default, + const WideColumns& columns, + std::string& output) { + const size_t num_columns = + value_of_default ? columns.size() + 1 : columns.size(); + + if (num_columns > static_cast<size_t>(std::numeric_limits<uint32_t>::max())) { + return Status::InvalidArgument("Too many wide columns"); + } + + PutVarint32(&output, kCurrentVersion); + + PutVarint32(&output, static_cast<uint32_t>(num_columns)); + + const Slice* prev_name = nullptr; + if (value_of_default) { + if (value_of_default->size() > + static_cast<size_t>(std::numeric_limits<uint32_t>::max())) { + return Status::InvalidArgument("Wide column value too long"); + } + + PutLengthPrefixedSlice(&output, kDefaultWideColumnName); + PutVarint32(&output, static_cast<uint32_t>(value_of_default->size())); + + prev_name = &kDefaultWideColumnName; + } + + for (size_t i = 0; i < columns.size(); ++i) { + const WideColumn& column = columns[i]; + + const Slice& name = column.name(); + if (name.size() > + static_cast<size_t>(std::numeric_limits<uint32_t>::max())) { + return Status::InvalidArgument("Wide column name too long"); + } + + if (prev_name && prev_name->compare(name) >= 0) { + return Status::Corruption("Wide columns out of order"); + } + + const Slice& value = column.value(); + if (value.size() > + static_cast<size_t>(std::numeric_limits<uint32_t>::max())) { + return Status::InvalidArgument("Wide column value too long"); + } + + PutLengthPrefixedSlice(&output, name); + PutVarint32(&output, static_cast<uint32_t>(value.size())); + + prev_name = &name; + } + + if (value_of_default) { + output.append(value_of_default->data(), value_of_default->size()); + } + + for (const auto& column : columns) { + const Slice& value = column.value(); + + output.append(value.data(), value.size()); + } + + return Status::OK(); +} + +Status WideColumnSerialization::Deserialize(Slice& input, + WideColumns& columns) { + assert(columns.empty()); + + uint32_t version = 0; + if (!GetVarint32(&input, &version)) { + return Status::Corruption("Error decoding wide column version"); + } + + if (version > kCurrentVersion) { + return Status::NotSupported("Unsupported wide column version"); + } + + uint32_t num_columns = 0; + if (!GetVarint32(&input, &num_columns)) { + return Status::Corruption("Error decoding number of wide columns"); + } + + if (!num_columns) { + return Status::OK(); + } + + columns.reserve(num_columns); + + autovector<uint32_t, 16> column_value_sizes; + column_value_sizes.reserve(num_columns); + + for (uint32_t i = 0; i < num_columns; ++i) { + Slice name; + if (!GetLengthPrefixedSlice(&input, &name)) { + return Status::Corruption("Error decoding wide column name"); + } + + if (!columns.empty() && columns.back().name().compare(name) >= 0) { + return Status::Corruption("Wide columns out of order"); + } + + columns.emplace_back(name, Slice()); + + uint32_t value_size = 0; + if (!GetVarint32(&input, &value_size)) { + return Status::Corruption("Error decoding wide column value size"); + } + + column_value_sizes.emplace_back(value_size); + } + + const Slice data(input); + size_t pos = 0; + + for (uint32_t i = 0; i < num_columns; ++i) { + const uint32_t value_size = column_value_sizes[i]; + + if (pos + value_size > data.size()) { + return Status::Corruption("Error decoding wide column value payload"); + } + + columns[i].value() = Slice(data.data() + pos, value_size); + + pos += value_size; + } + + return Status::OK(); +} + +WideColumns::const_iterator WideColumnSerialization::Find( + const WideColumns& columns, const Slice& column_name) { + const auto it = + std::lower_bound(columns.cbegin(), columns.cend(), column_name, + [](const WideColumn& lhs, const Slice& rhs) { + return lhs.name().compare(rhs) < 0; + }); + + if (it == columns.cend() || it->name() != column_name) { + return columns.cend(); + } + + return it; +} + +Status WideColumnSerialization::GetValueOfDefaultColumn(Slice& input, + Slice& value) { + WideColumns columns; + + const Status s = Deserialize(input, columns); + if (!s.ok()) { + return s; + } + + if (columns.empty() || columns[0].name() != kDefaultWideColumnName) { + value.clear(); + return Status::OK(); + } + + value = columns[0].value(); + + return Status::OK(); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/db/wide/wide_column_serialization.h b/src/rocksdb/db/wide/wide_column_serialization.h new file mode 100644 index 000000000..f0ffbd392 --- /dev/null +++ b/src/rocksdb/db/wide/wide_column_serialization.h @@ -0,0 +1,77 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <cstdint> +#include <string> + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/status.h" +#include "rocksdb/wide_columns.h" + +namespace ROCKSDB_NAMESPACE { + +class Slice; + +// Wide-column serialization/deserialization primitives. +// +// The two main parts of the layout are 1) a sorted index containing the column +// names and column value sizes and 2) the column values themselves. Keeping the +// index and the values separate will enable selectively reading column values +// down the line. Note that currently the index has to be fully parsed in order +// to find out the offset of each column value. +// +// Legend: cn = column name, cv = column value, cns = column name size, cvs = +// column value size. +// +// +----------+--------------+----------+-------+----------+---... +// | version | # of columns | cns 1 | cn 1 | cvs 1 | +// +----------+--------------+------------------+--------- +---... +// | varint32 | varint32 | varint32 | bytes | varint32 | +// +----------+--------------+----------+-------+----------+---... +// +// ... continued ... +// +// ...---+----------+-------+----------+-------+---...---+-------+ +// | cns N | cn N | cvs N | cv 1 | | cv N | +// ...---+----------+-------+----------+-------+---...---+-------+ +// | varint32 | bytes | varint32 | bytes | | bytes | +// ...---+----------+-------+----------+-------+---...---+-------+ + +class WideColumnSerialization { + public: + static Status Serialize(const WideColumns& columns, std::string& output); + static Status Serialize(const Slice& value_of_default, + const WideColumns& other_columns, + std::string& output); + + static Status Deserialize(Slice& input, WideColumns& columns); + + static WideColumns::const_iterator Find(const WideColumns& columns, + const Slice& column_name); + static Status GetValueOfDefaultColumn(Slice& input, Slice& value); + + static constexpr uint32_t kCurrentVersion = 1; + + private: + static Status SerializeImpl(const Slice* value_of_default, + const WideColumns& columns, std::string& output); +}; + +inline Status WideColumnSerialization::Serialize(const WideColumns& columns, + std::string& output) { + constexpr Slice* value_of_default = nullptr; + + return SerializeImpl(value_of_default, columns, output); +} + +inline Status WideColumnSerialization::Serialize( + const Slice& value_of_default, const WideColumns& other_columns, + std::string& output) { + return SerializeImpl(&value_of_default, other_columns, output); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/db/wide/wide_column_serialization_test.cc b/src/rocksdb/db/wide/wide_column_serialization_test.cc new file mode 100644 index 000000000..8060d2f24 --- /dev/null +++ b/src/rocksdb/db/wide/wide_column_serialization_test.cc @@ -0,0 +1,338 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/wide/wide_column_serialization.h" + +#include "test_util/testharness.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { + +TEST(WideColumnSerializationTest, Construct) { + constexpr char foo[] = "foo"; + constexpr char bar[] = "bar"; + + const std::string foo_str(foo); + const std::string bar_str(bar); + + const Slice foo_slice(foo_str); + const Slice bar_slice(bar_str); + + { + WideColumn column(foo, bar); + ASSERT_EQ(column.name(), foo); + ASSERT_EQ(column.value(), bar); + } + + { + WideColumn column(foo_str, bar); + ASSERT_EQ(column.name(), foo_str); + ASSERT_EQ(column.value(), bar); + } + + { + WideColumn column(foo_slice, bar); + ASSERT_EQ(column.name(), foo_slice); + ASSERT_EQ(column.value(), bar); + } + + { + WideColumn column(foo, bar_str); + ASSERT_EQ(column.name(), foo); + ASSERT_EQ(column.value(), bar_str); + } + + { + WideColumn column(foo_str, bar_str); + ASSERT_EQ(column.name(), foo_str); + ASSERT_EQ(column.value(), bar_str); + } + + { + WideColumn column(foo_slice, bar_str); + ASSERT_EQ(column.name(), foo_slice); + ASSERT_EQ(column.value(), bar_str); + } + + { + WideColumn column(foo, bar_slice); + ASSERT_EQ(column.name(), foo); + ASSERT_EQ(column.value(), bar_slice); + } + + { + WideColumn column(foo_str, bar_slice); + ASSERT_EQ(column.name(), foo_str); + ASSERT_EQ(column.value(), bar_slice); + } + + { + WideColumn column(foo_slice, bar_slice); + ASSERT_EQ(column.name(), foo_slice); + ASSERT_EQ(column.value(), bar_slice); + } + + { + constexpr char foo_name[] = "foo_name"; + constexpr char bar_value[] = "bar_value"; + + WideColumn column(std::piecewise_construct, + std::forward_as_tuple(foo_name, sizeof(foo) - 1), + std::forward_as_tuple(bar_value, sizeof(bar) - 1)); + ASSERT_EQ(column.name(), foo); + ASSERT_EQ(column.value(), bar); + } +} + +TEST(WideColumnSerializationTest, SerializeDeserialize) { + WideColumns columns{{"foo", "bar"}, {"hello", "world"}}; + std::string output; + + ASSERT_OK(WideColumnSerialization::Serialize(columns, output)); + + Slice input(output); + WideColumns deserialized_columns; + + ASSERT_OK(WideColumnSerialization::Deserialize(input, deserialized_columns)); + ASSERT_EQ(columns, deserialized_columns); + + { + const auto it = WideColumnSerialization::Find(deserialized_columns, "foo"); + ASSERT_NE(it, deserialized_columns.cend()); + ASSERT_EQ(*it, deserialized_columns.front()); + } + + { + const auto it = + WideColumnSerialization::Find(deserialized_columns, "hello"); + ASSERT_NE(it, deserialized_columns.cend()); + ASSERT_EQ(*it, deserialized_columns.back()); + } + + { + const auto it = + WideColumnSerialization::Find(deserialized_columns, "fubar"); + ASSERT_EQ(it, deserialized_columns.cend()); + } + + { + const auto it = + WideColumnSerialization::Find(deserialized_columns, "snafu"); + ASSERT_EQ(it, deserialized_columns.cend()); + } +} + +TEST(WideColumnSerializationTest, SerializeWithPrepend) { + Slice value_of_default("baz"); + WideColumns other_columns{{"foo", "bar"}, {"hello", "world"}}; + + std::string output; + ASSERT_OK(WideColumnSerialization::Serialize(value_of_default, other_columns, + output)); + + Slice input(output); + + WideColumns deserialized_columns; + ASSERT_OK(WideColumnSerialization::Deserialize(input, deserialized_columns)); + + WideColumns expected_columns{{kDefaultWideColumnName, value_of_default}, + other_columns[0], + other_columns[1]}; + ASSERT_EQ(deserialized_columns, expected_columns); +} + +TEST(WideColumnSerializationTest, SerializeDuplicateError) { + WideColumns columns{{"foo", "bar"}, {"foo", "baz"}}; + std::string output; + + ASSERT_TRUE( + WideColumnSerialization::Serialize(columns, output).IsCorruption()); +} + +TEST(WideColumnSerializationTest, SerializeWithPrependDuplicateError) { + Slice value_of_default("baz"); + WideColumns other_columns{{kDefaultWideColumnName, "dup"}, {"foo", "bar"}}; + + std::string output; + ASSERT_TRUE(WideColumnSerialization::Serialize(value_of_default, + other_columns, output) + .IsCorruption()); +} + +TEST(WideColumnSerializationTest, SerializeOutOfOrderError) { + WideColumns columns{{"hello", "world"}, {"foo", "bar"}}; + std::string output; + + ASSERT_TRUE( + WideColumnSerialization::Serialize(columns, output).IsCorruption()); +} + +TEST(WideColumnSerializationTest, DeserializeVersionError) { + // Can't decode version + + std::string buf; + + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "version")); +} + +TEST(WideColumnSerializationTest, DeserializeUnsupportedVersion) { + // Unsupported version + constexpr uint32_t future_version = 1000; + + std::string buf; + PutVarint32(&buf, future_version); + + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsNotSupported()); + ASSERT_TRUE(std::strstr(s.getState(), "version")); +} + +TEST(WideColumnSerializationTest, DeserializeNumberOfColumnsError) { + // Can't decode number of columns + + std::string buf; + PutVarint32(&buf, WideColumnSerialization::kCurrentVersion); + + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "number")); +} + +TEST(WideColumnSerializationTest, DeserializeColumnsError) { + std::string buf; + + PutVarint32(&buf, WideColumnSerialization::kCurrentVersion); + + constexpr uint32_t num_columns = 2; + PutVarint32(&buf, num_columns); + + // Can't decode the first column name + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "name")); + } + + constexpr char first_column_name[] = "foo"; + PutLengthPrefixedSlice(&buf, first_column_name); + + // Can't decode the size of the first column value + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "value size")); + } + + constexpr uint32_t first_value_size = 16; + PutVarint32(&buf, first_value_size); + + // Can't decode the second column name + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "name")); + } + + constexpr char second_column_name[] = "hello"; + PutLengthPrefixedSlice(&buf, second_column_name); + + // Can't decode the size of the second column value + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "value size")); + } + + constexpr uint32_t second_value_size = 64; + PutVarint32(&buf, second_value_size); + + // Can't decode the payload of the first column + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "payload")); + } + + buf.append(first_value_size, '0'); + + // Can't decode the payload of the second column + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "payload")); + } + + buf.append(second_value_size, 'x'); + + // Success + { + Slice input(buf); + WideColumns columns; + + ASSERT_OK(WideColumnSerialization::Deserialize(input, columns)); + } +} + +TEST(WideColumnSerializationTest, DeserializeColumnsOutOfOrder) { + std::string buf; + + PutVarint32(&buf, WideColumnSerialization::kCurrentVersion); + + constexpr uint32_t num_columns = 2; + PutVarint32(&buf, num_columns); + + constexpr char first_column_name[] = "b"; + PutLengthPrefixedSlice(&buf, first_column_name); + + constexpr uint32_t first_value_size = 16; + PutVarint32(&buf, first_value_size); + + constexpr char second_column_name[] = "a"; + PutLengthPrefixedSlice(&buf, second_column_name); + + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "order")); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/db/wide/wide_columns.cc b/src/rocksdb/db/wide/wide_columns.cc new file mode 100644 index 000000000..186be7f85 --- /dev/null +++ b/src/rocksdb/db/wide/wide_columns.cc @@ -0,0 +1,22 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "rocksdb/wide_columns.h" + +#include "db/wide/wide_column_serialization.h" + +namespace ROCKSDB_NAMESPACE { + +const Slice kDefaultWideColumnName; + +const WideColumns kNoWideColumns; + +Status PinnableWideColumns::CreateIndexForWideColumns() { + Slice value_copy = value_; + + return WideColumnSerialization::Deserialize(value_copy, columns_); +} + +} // namespace ROCKSDB_NAMESPACE |