diff options
Diffstat (limited to '')
13 files changed, 1377 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/merge_operators.h b/src/rocksdb/utilities/merge_operators.h new file mode 100644 index 000000000..018d097b1 --- /dev/null +++ b/src/rocksdb/utilities/merge_operators.h @@ -0,0 +1,55 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once +#include "rocksdb/merge_operator.h" + +#include <stdio.h> + +#include <memory> +#include <string> + +namespace ROCKSDB_NAMESPACE { + +class MergeOperators { + public: + static std::shared_ptr<MergeOperator> CreatePutOperator(); + static std::shared_ptr<MergeOperator> CreateDeprecatedPutOperator(); + static std::shared_ptr<MergeOperator> CreateUInt64AddOperator(); + static std::shared_ptr<MergeOperator> CreateStringAppendOperator(); + static std::shared_ptr<MergeOperator> CreateStringAppendOperator(char delim_char); + static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator(); + static std::shared_ptr<MergeOperator> CreateMaxOperator(); + static std::shared_ptr<MergeOperator> CreateBytesXOROperator(); + static std::shared_ptr<MergeOperator> CreateSortOperator(); + + // Will return a different merge operator depending on the string. + // TODO: Hook the "name" up to the actual Name() of the MergeOperators? + static std::shared_ptr<MergeOperator> CreateFromStringId( + const std::string& name) { + if (name == "put") { + return CreatePutOperator(); + } else if (name == "put_v1") { + return CreateDeprecatedPutOperator(); + } else if ( name == "uint64add") { + return CreateUInt64AddOperator(); + } else if (name == "stringappend") { + return CreateStringAppendOperator(); + } else if (name == "stringappendtest") { + return CreateStringAppendTESTOperator(); + } else if (name == "max") { + return CreateMaxOperator(); + } else if (name == "bytesxor") { + return CreateBytesXOROperator(); + } else if (name == "sortlist") { + return CreateSortOperator(); + } else { + // Empty or unknown, just return nullptr + return nullptr; + } + } +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/merge_operators/bytesxor.cc b/src/rocksdb/utilities/merge_operators/bytesxor.cc new file mode 100644 index 000000000..859affb5e --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/bytesxor.cc @@ -0,0 +1,59 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <algorithm> +#include <string> + +#include "utilities/merge_operators/bytesxor.h" + +namespace ROCKSDB_NAMESPACE { + +std::shared_ptr<MergeOperator> MergeOperators::CreateBytesXOROperator() { + return std::make_shared<BytesXOROperator>(); +} + +bool BytesXOROperator::Merge(const Slice& /*key*/, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* /*logger*/) const { + XOR(existing_value, value, new_value); + return true; +} + +void BytesXOROperator::XOR(const Slice* existing_value, + const Slice& value, std::string* new_value) const { + if (!existing_value) { + new_value->clear(); + new_value->assign(value.data(), value.size()); + return; + } + + size_t min_size = std::min(existing_value->size(), value.size()); + size_t max_size = std::max(existing_value->size(), value.size()); + + new_value->clear(); + new_value->reserve(max_size); + + const char* existing_value_data = existing_value->data(); + const char* value_data = value.data(); + + for (size_t i = 0; i < min_size; i++) { + new_value->push_back(existing_value_data[i] ^ value_data[i]); + } + + if (existing_value->size() == max_size) { + for (size_t i = min_size; i < max_size; i++) { + new_value->push_back(existing_value_data[i]); + } + } else { + assert(value.size() == max_size); + for (size_t i = min_size; i < max_size; i++) { + new_value->push_back(value_data[i]); + } + } +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/merge_operators/bytesxor.h b/src/rocksdb/utilities/merge_operators/bytesxor.h new file mode 100644 index 000000000..ab0c5aecc --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/bytesxor.h @@ -0,0 +1,39 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <algorithm> +#include <memory> +#include <string> +#include "rocksdb/env.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "util/coding.h" +#include "utilities/merge_operators.h" + +namespace ROCKSDB_NAMESPACE { + +// A 'model' merge operator that XORs two (same sized) array of bytes. +// Implemented as an AssociativeMergeOperator for simplicity and example. +class BytesXOROperator : public AssociativeMergeOperator { + public: + // XORs the two array of bytes one byte at a time and stores the result + // in new_value. len is the number of xored bytes, and the length of new_value + virtual bool Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const override; + + virtual const char* Name() const override { + return "BytesXOR"; + } + + void XOR(const Slice* existing_value, const Slice& value, + std::string* new_value) const; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/merge_operators/max.cc b/src/rocksdb/utilities/merge_operators/max.cc new file mode 100644 index 000000000..2270c1f03 --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/max.cc @@ -0,0 +1,77 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <memory> + +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "utilities/merge_operators.h" + +using ROCKSDB_NAMESPACE::Logger; +using ROCKSDB_NAMESPACE::MergeOperator; +using ROCKSDB_NAMESPACE::Slice; + +namespace { // anonymous namespace + +// Merge operator that picks the maximum operand, Comparison is based on +// Slice::compare +class MaxOperator : public MergeOperator { + public: + bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + Slice& max = merge_out->existing_operand; + if (merge_in.existing_value) { + max = Slice(merge_in.existing_value->data(), + merge_in.existing_value->size()); + } else if (max.data() == nullptr) { + max = Slice(); + } + + for (const auto& op : merge_in.operand_list) { + if (max.compare(op) < 0) { + max = op; + } + } + + return true; + } + + bool PartialMerge(const Slice& /*key*/, const Slice& left_operand, + const Slice& right_operand, std::string* new_value, + Logger* /*logger*/) const override { + if (left_operand.compare(right_operand) >= 0) { + new_value->assign(left_operand.data(), left_operand.size()); + } else { + new_value->assign(right_operand.data(), right_operand.size()); + } + return true; + } + + bool PartialMergeMulti(const Slice& /*key*/, + const std::deque<Slice>& operand_list, + std::string* new_value, + Logger* /*logger*/) const override { + Slice max; + for (const auto& operand : operand_list) { + if (max.compare(operand) < 0) { + max = operand; + } + } + + new_value->assign(max.data(), max.size()); + return true; + } + + const char* Name() const override { return "MaxOperator"; } +}; + +} // end of anonymous namespace + +namespace ROCKSDB_NAMESPACE { + +std::shared_ptr<MergeOperator> MergeOperators::CreateMaxOperator() { + return std::make_shared<MaxOperator>(); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/merge_operators/put.cc b/src/rocksdb/utilities/merge_operators/put.cc new file mode 100644 index 000000000..901d69e94 --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/put.cc @@ -0,0 +1,83 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <memory> +#include "rocksdb/slice.h" +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators.h" + +using namespace ROCKSDB_NAMESPACE; + +namespace { // anonymous namespace + +// A merge operator that mimics Put semantics +// Since this merge-operator will not be used in production, +// it is implemented as a non-associative merge operator to illustrate the +// new interface and for testing purposes. (That is, we inherit from +// the MergeOperator class rather than the AssociativeMergeOperator +// which would be simpler in this case). +// +// From the client-perspective, semantics are the same. +class PutOperator : public MergeOperator { + public: + bool FullMerge(const Slice& /*key*/, const Slice* /*existing_value*/, + const std::deque<std::string>& operand_sequence, + std::string* new_value, Logger* /*logger*/) const override { + // Put basically only looks at the current/latest value + assert(!operand_sequence.empty()); + assert(new_value != nullptr); + new_value->assign(operand_sequence.back()); + return true; + } + + bool PartialMerge(const Slice& /*key*/, const Slice& /*left_operand*/, + const Slice& right_operand, std::string* new_value, + Logger* /*logger*/) const override { + new_value->assign(right_operand.data(), right_operand.size()); + return true; + } + + using MergeOperator::PartialMergeMulti; + bool PartialMergeMulti(const Slice& /*key*/, + const std::deque<Slice>& operand_list, + std::string* new_value, + Logger* /*logger*/) const override { + new_value->assign(operand_list.back().data(), operand_list.back().size()); + return true; + } + + const char* Name() const override { return "PutOperator"; } +}; + +class PutOperatorV2 : public PutOperator { + bool FullMerge(const Slice& /*key*/, const Slice* /*existing_value*/, + const std::deque<std::string>& /*operand_sequence*/, + std::string* /*new_value*/, + Logger* /*logger*/) const override { + assert(false); + return false; + } + + bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + // Put basically only looks at the current/latest value + assert(!merge_in.operand_list.empty()); + merge_out->existing_operand = merge_in.operand_list.back(); + return true; + } +}; + +} // end of anonymous namespace + +namespace ROCKSDB_NAMESPACE { + +std::shared_ptr<MergeOperator> MergeOperators::CreateDeprecatedPutOperator() { + return std::make_shared<PutOperator>(); +} + +std::shared_ptr<MergeOperator> MergeOperators::CreatePutOperator() { + return std::make_shared<PutOperatorV2>(); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/merge_operators/sortlist.cc b/src/rocksdb/utilities/merge_operators/sortlist.cc new file mode 100644 index 000000000..b6bd65b36 --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/sortlist.cc @@ -0,0 +1,100 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#include "utilities/merge_operators/sortlist.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "utilities/merge_operators.h" + +using ROCKSDB_NAMESPACE::Logger; +using ROCKSDB_NAMESPACE::MergeOperator; +using ROCKSDB_NAMESPACE::Slice; + +namespace ROCKSDB_NAMESPACE { + +bool SortList::FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { + std::vector<int> left; + for (Slice slice : merge_in.operand_list) { + std::vector<int> right; + MakeVector(right, slice); + left = Merge(left, right); + } + for (int i = 0; i < static_cast<int>(left.size()) - 1; i++) { + merge_out->new_value.append(std::to_string(left[i])).append(","); + } + merge_out->new_value.append(std::to_string(left.back())); + return true; +} + +bool SortList::PartialMerge(const Slice& /*key*/, const Slice& left_operand, + const Slice& right_operand, std::string* new_value, + Logger* /*logger*/) const { + std::vector<int> left; + std::vector<int> right; + MakeVector(left, left_operand); + MakeVector(right, right_operand); + left = Merge(left, right); + for (int i = 0; i < static_cast<int>(left.size()) - 1; i++) { + new_value->append(std::to_string(left[i])).append(","); + } + new_value->append(std::to_string(left.back())); + return true; +} + +bool SortList::PartialMergeMulti(const Slice& /*key*/, + const std::deque<Slice>& operand_list, + std::string* new_value, + Logger* /*logger*/) const { + (void)operand_list; + (void)new_value; + return true; +} + +const char* SortList::Name() const { return "MergeSortOperator"; } + +void SortList::MakeVector(std::vector<int>& operand, Slice slice) const { + do { + const char* begin = slice.data_; + while (*slice.data_ != ',' && *slice.data_) slice.data_++; + operand.push_back(std::stoi(std::string(begin, slice.data_))); + } while (0 != *slice.data_++); +} + +std::vector<int> SortList::Merge(std::vector<int>& left, + std::vector<int>& right) const { + // Fill the resultant vector with sorted results from both vectors + std::vector<int> result; + unsigned left_it = 0, right_it = 0; + + while (left_it < left.size() && right_it < right.size()) { + // If the left value is smaller than the right it goes next + // into the resultant vector + if (left[left_it] < right[right_it]) { + result.push_back(left[left_it]); + left_it++; + } else { + result.push_back(right[right_it]); + right_it++; + } + } + + // Push the remaining data from both vectors onto the resultant + while (left_it < left.size()) { + result.push_back(left[left_it]); + left_it++; + } + + while (right_it < right.size()) { + result.push_back(right[right_it]); + right_it++; + } + + return result; +} + +std::shared_ptr<MergeOperator> MergeOperators::CreateSortOperator() { + return std::make_shared<SortList>(); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/merge_operators/sortlist.h b/src/rocksdb/utilities/merge_operators/sortlist.h new file mode 100644 index 000000000..5e08bd583 --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/sortlist.h @@ -0,0 +1,38 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +// A MergeOperator for RocksDB that implements Merge Sort. +// It is built using the MergeOperator interface. The operator works by taking +// an input which contains one or more merge operands where each operand is a +// list of sorted ints and merges them to form a large sorted list. +#pragma once + +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" + +namespace ROCKSDB_NAMESPACE { + +class SortList : public MergeOperator { + public: + bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; + + bool PartialMerge(const Slice& /*key*/, const Slice& left_operand, + const Slice& right_operand, std::string* new_value, + Logger* /*logger*/) const override; + + bool PartialMergeMulti(const Slice& key, + const std::deque<Slice>& operand_list, + std::string* new_value, Logger* logger) const override; + + const char* Name() const override; + + void MakeVector(std::vector<int>& operand, Slice slice) const; + + private: + std::vector<int> Merge(std::vector<int>& left, std::vector<int>& right) const; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/merge_operators/string_append/stringappend.cc b/src/rocksdb/utilities/merge_operators/string_append/stringappend.cc new file mode 100644 index 000000000..534f7a566 --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/string_append/stringappend.cc @@ -0,0 +1,59 @@ +/** + * A MergeOperator for rocksdb that implements string append. + * @author Deon Nicholas (dnicholas@fb.com) + * Copyright 2013 Facebook + */ + +#include "stringappend.h" + +#include <memory> +#include <assert.h> + +#include "rocksdb/slice.h" +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators.h" + +namespace ROCKSDB_NAMESPACE { + +// Constructor: also specify the delimiter character. +StringAppendOperator::StringAppendOperator(char delim_char) + : delim_(delim_char) { +} + +// Implementation for the merge operation (concatenates two strings) +bool StringAppendOperator::Merge(const Slice& /*key*/, + const Slice* existing_value, + const Slice& value, std::string* new_value, + Logger* /*logger*/) const { + // Clear the *new_value for writing. + assert(new_value); + new_value->clear(); + + if (!existing_value) { + // No existing_value. Set *new_value = value + new_value->assign(value.data(),value.size()); + } else { + // Generic append (existing_value != null). + // Reserve *new_value to correct size, and apply concatenation. + new_value->reserve(existing_value->size() + 1 + value.size()); + new_value->assign(existing_value->data(),existing_value->size()); + new_value->append(1,delim_); + new_value->append(value.data(), value.size()); + } + + return true; +} + +const char* StringAppendOperator::Name() const { + return "StringAppendOperator"; +} + +std::shared_ptr<MergeOperator> MergeOperators::CreateStringAppendOperator() { + return std::make_shared<StringAppendOperator>(','); +} + +std::shared_ptr<MergeOperator> MergeOperators::CreateStringAppendOperator(char delim_char) { + return std::make_shared<StringAppendOperator>(delim_char); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/merge_operators/string_append/stringappend.h b/src/rocksdb/utilities/merge_operators/string_append/stringappend.h new file mode 100644 index 000000000..388612f1e --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/string_append/stringappend.h @@ -0,0 +1,31 @@ +/** + * A MergeOperator for rocksdb that implements string append. + * @author Deon Nicholas (dnicholas@fb.com) + * Copyright 2013 Facebook + */ + +#pragma once +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" + +namespace ROCKSDB_NAMESPACE { + +class StringAppendOperator : public AssociativeMergeOperator { + public: + // Constructor: specify delimiter + explicit StringAppendOperator(char delim_char); + + virtual bool Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const override; + + virtual const char* Name() const override; + + private: + char delim_; // The delimiter is inserted between elements + +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/merge_operators/string_append/stringappend2.cc b/src/rocksdb/utilities/merge_operators/string_append/stringappend2.cc new file mode 100644 index 000000000..b8c676ee5 --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/string_append/stringappend2.cc @@ -0,0 +1,117 @@ +/** + * @author Deon Nicholas (dnicholas@fb.com) + * Copyright 2013 Facebook + */ + +#include "stringappend2.h" + +#include <memory> +#include <string> +#include <assert.h> + +#include "rocksdb/slice.h" +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators.h" + +namespace ROCKSDB_NAMESPACE { + +// Constructor: also specify the delimiter character. +StringAppendTESTOperator::StringAppendTESTOperator(char delim_char) + : delim_(delim_char) { +} + +// Implementation for the merge operation (concatenates two strings) +bool StringAppendTESTOperator::FullMergeV2( + const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { + // Clear the *new_value for writing. + merge_out->new_value.clear(); + + if (merge_in.existing_value == nullptr && merge_in.operand_list.size() == 1) { + // Only one operand + merge_out->existing_operand = merge_in.operand_list.back(); + return true; + } + + // Compute the space needed for the final result. + size_t numBytes = 0; + for (auto it = merge_in.operand_list.begin(); + it != merge_in.operand_list.end(); ++it) { + numBytes += it->size() + 1; // Plus 1 for the delimiter + } + + // Only print the delimiter after the first entry has been printed + bool printDelim = false; + + // Prepend the *existing_value if one exists. + if (merge_in.existing_value) { + merge_out->new_value.reserve(numBytes + merge_in.existing_value->size()); + merge_out->new_value.append(merge_in.existing_value->data(), + merge_in.existing_value->size()); + printDelim = true; + } else if (numBytes) { + merge_out->new_value.reserve( + numBytes - 1); // Minus 1 since we have one less delimiter + } + + // Concatenate the sequence of strings (and add a delimiter between each) + for (auto it = merge_in.operand_list.begin(); + it != merge_in.operand_list.end(); ++it) { + if (printDelim) { + merge_out->new_value.append(1, delim_); + } + merge_out->new_value.append(it->data(), it->size()); + printDelim = true; + } + + return true; +} + +bool StringAppendTESTOperator::PartialMergeMulti( + const Slice& /*key*/, const std::deque<Slice>& /*operand_list*/, + std::string* /*new_value*/, Logger* /*logger*/) const { + return false; +} + +// A version of PartialMerge that actually performs "partial merging". +// Use this to simulate the exact behaviour of the StringAppendOperator. +bool StringAppendTESTOperator::_AssocPartialMergeMulti( + const Slice& /*key*/, const std::deque<Slice>& operand_list, + std::string* new_value, Logger* /*logger*/) const { + // Clear the *new_value for writing + assert(new_value); + new_value->clear(); + assert(operand_list.size() >= 2); + + // Generic append + // Determine and reserve correct size for *new_value. + size_t size = 0; + for (const auto& operand : operand_list) { + size += operand.size(); + } + size += operand_list.size() - 1; // Delimiters + new_value->reserve(size); + + // Apply concatenation + new_value->assign(operand_list.front().data(), operand_list.front().size()); + + for (std::deque<Slice>::const_iterator it = operand_list.begin() + 1; + it != operand_list.end(); ++it) { + new_value->append(1, delim_); + new_value->append(it->data(), it->size()); + } + + return true; +} + +const char* StringAppendTESTOperator::Name() const { + return "StringAppendTESTOperator"; +} + + +std::shared_ptr<MergeOperator> +MergeOperators::CreateStringAppendTESTOperator() { + return std::make_shared<StringAppendTESTOperator>(','); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/merge_operators/string_append/stringappend2.h b/src/rocksdb/utilities/merge_operators/string_append/stringappend2.h new file mode 100644 index 000000000..452164d8e --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/string_append/stringappend2.h @@ -0,0 +1,49 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +/** + * A TEST MergeOperator for rocksdb that implements string append. + * It is built using the MergeOperator interface rather than the simpler + * AssociativeMergeOperator interface. This is useful for testing/benchmarking. + * While the two operators are semantically the same, all production code + * should use the StringAppendOperator defined in stringappend.{h,cc}. The + * operator defined in the present file is primarily for testing. + * + * @author Deon Nicholas (dnicholas@fb.com) + * Copyright 2013 Facebook + */ + +#pragma once +#include <deque> +#include <string> + +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" + +namespace ROCKSDB_NAMESPACE { + +class StringAppendTESTOperator : public MergeOperator { + public: + // Constructor with delimiter + explicit StringAppendTESTOperator(char delim_char); + + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; + + virtual bool PartialMergeMulti(const Slice& key, + const std::deque<Slice>& operand_list, + std::string* new_value, Logger* logger) const + override; + + virtual const char* Name() const override; + + private: + // A version of PartialMerge that actually performs "partial merging". + // Use this to simulate the exact behaviour of the StringAppendOperator. + bool _AssocPartialMergeMulti(const Slice& key, + const std::deque<Slice>& operand_list, + std::string* new_value, Logger* logger) const; + + char delim_; // The delimiter is inserted between elements + +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/merge_operators/string_append/stringappend_test.cc b/src/rocksdb/utilities/merge_operators/string_append/stringappend_test.cc new file mode 100644 index 000000000..c5f5e3e7c --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/string_append/stringappend_test.cc @@ -0,0 +1,601 @@ +/** + * An persistent map : key -> (list of strings), using rocksdb merge. + * This file is a test-harness / use-case for the StringAppendOperator. + * + * @author Deon Nicholas (dnicholas@fb.com) + * Copyright 2013 Facebook, Inc. +*/ + +#include <iostream> +#include <map> + +#include "rocksdb/db.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/utilities/db_ttl.h" +#include "test_util/testharness.h" +#include "util/random.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/string_append/stringappend.h" +#include "utilities/merge_operators/string_append/stringappend2.h" + +using namespace ROCKSDB_NAMESPACE; + +namespace ROCKSDB_NAMESPACE { + +// Path to the database on file system +const std::string kDbName = test::PerThreadDBPath("stringappend_test"); + +namespace { +// OpenDb opens a (possibly new) rocksdb database with a StringAppendOperator +std::shared_ptr<DB> OpenNormalDb(char delim_char) { + DB* db; + Options options; + options.create_if_missing = true; + options.merge_operator.reset(new StringAppendOperator(delim_char)); + EXPECT_OK(DB::Open(options, kDbName, &db)); + return std::shared_ptr<DB>(db); +} + +#ifndef ROCKSDB_LITE // TtlDb is not supported in Lite +// Open a TtlDB with a non-associative StringAppendTESTOperator +std::shared_ptr<DB> OpenTtlDb(char delim_char) { + DBWithTTL* db; + Options options; + options.create_if_missing = true; + options.merge_operator.reset(new StringAppendTESTOperator(delim_char)); + EXPECT_OK(DBWithTTL::Open(options, kDbName, &db, 123456)); + return std::shared_ptr<DB>(db); +} +#endif // !ROCKSDB_LITE +} // namespace + +/// StringLists represents a set of string-lists, each with a key-index. +/// Supports Append(list, string) and Get(list) +class StringLists { + public: + + //Constructor: specifies the rocksdb db + /* implicit */ + StringLists(std::shared_ptr<DB> db) + : db_(db), + merge_option_(), + get_option_() { + assert(db); + } + + // Append string val onto the list defined by key; return true on success + bool Append(const std::string& key, const std::string& val){ + Slice valSlice(val.data(), val.size()); + auto s = db_->Merge(merge_option_, key, valSlice); + + if (s.ok()) { + return true; + } else { + std::cerr << "ERROR " << s.ToString() << std::endl; + return false; + } + } + + // Returns the list of strings associated with key (or "" if does not exist) + bool Get(const std::string& key, std::string* const result){ + assert(result != nullptr); // we should have a place to store the result + auto s = db_->Get(get_option_, key, result); + + if (s.ok()) { + return true; + } + + // Either key does not exist, or there is some error. + *result = ""; // Always return empty string (just for convention) + + //NotFound is okay; just return empty (similar to std::map) + //But network or db errors, etc, should fail the test (or at least yell) + if (!s.IsNotFound()) { + std::cerr << "ERROR " << s.ToString() << std::endl; + } + + // Always return false if s.ok() was not true + return false; + } + + + private: + std::shared_ptr<DB> db_; + WriteOptions merge_option_; + ReadOptions get_option_; + +}; + + +// The class for unit-testing +class StringAppendOperatorTest : public testing::Test { + public: + StringAppendOperatorTest() { + DestroyDB(kDbName, Options()); // Start each test with a fresh DB + } + + typedef std::shared_ptr<DB> (* OpenFuncPtr)(char); + + // Allows user to open databases with different configurations. + // e.g.: Can open a DB or a TtlDB, etc. + static void SetOpenDbFunction(OpenFuncPtr func) { + OpenDb = func; + } + + protected: + static OpenFuncPtr OpenDb; +}; +StringAppendOperatorTest::OpenFuncPtr StringAppendOperatorTest::OpenDb = nullptr; + +// THE TEST CASES BEGIN HERE + +TEST_F(StringAppendOperatorTest, IteratorTest) { + auto db_ = OpenDb(','); + StringLists slists(db_); + + slists.Append("k1", "v1"); + slists.Append("k1", "v2"); + slists.Append("k1", "v3"); + + slists.Append("k2", "a1"); + slists.Append("k2", "a2"); + slists.Append("k2", "a3"); + + std::string res; + std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it( + db_->NewIterator(ReadOptions())); + std::string k1("k1"); + std::string k2("k2"); + bool first = true; + for (it->Seek(k1); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + ASSERT_EQ(res, "v1,v2,v3"); + first = false; + } else { + ASSERT_EQ(res, "a1,a2,a3"); + } + } + slists.Append("k2", "a4"); + slists.Append("k1", "v4"); + + // Snapshot should still be the same. Should ignore a4 and v4. + first = true; + for (it->Seek(k1); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + ASSERT_EQ(res, "v1,v2,v3"); + first = false; + } else { + ASSERT_EQ(res, "a1,a2,a3"); + } + } + + + // Should release the snapshot and be aware of the new stuff now + it.reset(db_->NewIterator(ReadOptions())); + first = true; + for (it->Seek(k1); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + ASSERT_EQ(res, "v1,v2,v3,v4"); + first = false; + } else { + ASSERT_EQ(res, "a1,a2,a3,a4"); + } + } + + // start from k2 this time. + for (it->Seek(k2); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + ASSERT_EQ(res, "v1,v2,v3,v4"); + first = false; + } else { + ASSERT_EQ(res, "a1,a2,a3,a4"); + } + } + + slists.Append("k3", "g1"); + + it.reset(db_->NewIterator(ReadOptions())); + first = true; + std::string k3("k3"); + for(it->Seek(k2); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + ASSERT_EQ(res, "a1,a2,a3,a4"); + first = false; + } else { + ASSERT_EQ(res, "g1"); + } + } + for(it->Seek(k3); it->Valid(); it->Next()) { + res = it->value().ToString(); + if (first) { + // should not be hit + ASSERT_EQ(res, "a1,a2,a3,a4"); + first = false; + } else { + ASSERT_EQ(res, "g1"); + } + } + +} + +TEST_F(StringAppendOperatorTest, SimpleTest) { + auto db = OpenDb(','); + StringLists slists(db); + + slists.Append("k1", "v1"); + slists.Append("k1", "v2"); + slists.Append("k1", "v3"); + + std::string res; + bool status = slists.Get("k1", &res); + + ASSERT_TRUE(status); + ASSERT_EQ(res, "v1,v2,v3"); +} + +TEST_F(StringAppendOperatorTest, SimpleDelimiterTest) { + auto db = OpenDb('|'); + StringLists slists(db); + + slists.Append("k1", "v1"); + slists.Append("k1", "v2"); + slists.Append("k1", "v3"); + + std::string res; + slists.Get("k1", &res); + ASSERT_EQ(res, "v1|v2|v3"); +} + +TEST_F(StringAppendOperatorTest, OneValueNoDelimiterTest) { + auto db = OpenDb('!'); + StringLists slists(db); + + slists.Append("random_key", "single_val"); + + std::string res; + slists.Get("random_key", &res); + ASSERT_EQ(res, "single_val"); +} + +TEST_F(StringAppendOperatorTest, VariousKeys) { + auto db = OpenDb('\n'); + StringLists slists(db); + + slists.Append("c", "asdasd"); + slists.Append("a", "x"); + slists.Append("b", "y"); + slists.Append("a", "t"); + slists.Append("a", "r"); + slists.Append("b", "2"); + slists.Append("c", "asdasd"); + + std::string a, b, c; + bool sa, sb, sc; + sa = slists.Get("a", &a); + sb = slists.Get("b", &b); + sc = slists.Get("c", &c); + + ASSERT_TRUE(sa && sb && sc); // All three keys should have been found + + ASSERT_EQ(a, "x\nt\nr"); + ASSERT_EQ(b, "y\n2"); + ASSERT_EQ(c, "asdasd\nasdasd"); +} + +// Generate semi random keys/words from a small distribution. +TEST_F(StringAppendOperatorTest, RandomMixGetAppend) { + auto db = OpenDb(' '); + StringLists slists(db); + + // Generate a list of random keys and values + const int kWordCount = 15; + std::string words[] = {"sdasd", "triejf", "fnjsdfn", "dfjisdfsf", "342839", + "dsuha", "mabuais", "sadajsid", "jf9834hf", "2d9j89", + "dj9823jd", "a", "dk02ed2dh", "$(jd4h984$(*", "mabz"}; + const int kKeyCount = 6; + std::string keys[] = {"dhaiusdhu", "denidw", "daisda", "keykey", "muki", + "shzassdianmd"}; + + // Will store a local copy of all data in order to verify correctness + std::map<std::string, std::string> parallel_copy; + + // Generate a bunch of random queries (Append and Get)! + enum query_t { APPEND_OP, GET_OP, NUM_OPS }; + Random randomGen(1337); //deterministic seed; always get same results! + + const int kNumQueries = 30; + for (int q=0; q<kNumQueries; ++q) { + // Generate a random query (Append or Get) and random parameters + query_t query = (query_t)randomGen.Uniform((int)NUM_OPS); + std::string key = keys[randomGen.Uniform((int)kKeyCount)]; + std::string word = words[randomGen.Uniform((int)kWordCount)]; + + // Apply the query and any checks. + if (query == APPEND_OP) { + + // Apply the rocksdb test-harness Append defined above + slists.Append(key, word); //apply the rocksdb append + + // Apply the similar "Append" to the parallel copy + if (parallel_copy[key].size() > 0) { + parallel_copy[key] += " " + word; + } else { + parallel_copy[key] = word; + } + + } else if (query == GET_OP) { + // Assumes that a non-existent key just returns <empty> + std::string res; + slists.Get(key, &res); + ASSERT_EQ(res, parallel_copy[key]); + } + + } + +} + +TEST_F(StringAppendOperatorTest, BIGRandomMixGetAppend) { + auto db = OpenDb(' '); + StringLists slists(db); + + // Generate a list of random keys and values + const int kWordCount = 15; + std::string words[] = {"sdasd", "triejf", "fnjsdfn", "dfjisdfsf", "342839", + "dsuha", "mabuais", "sadajsid", "jf9834hf", "2d9j89", + "dj9823jd", "a", "dk02ed2dh", "$(jd4h984$(*", "mabz"}; + const int kKeyCount = 6; + std::string keys[] = {"dhaiusdhu", "denidw", "daisda", "keykey", "muki", + "shzassdianmd"}; + + // Will store a local copy of all data in order to verify correctness + std::map<std::string, std::string> parallel_copy; + + // Generate a bunch of random queries (Append and Get)! + enum query_t { APPEND_OP, GET_OP, NUM_OPS }; + Random randomGen(9138204); // deterministic seed + + const int kNumQueries = 1000; + for (int q=0; q<kNumQueries; ++q) { + // Generate a random query (Append or Get) and random parameters + query_t query = (query_t)randomGen.Uniform((int)NUM_OPS); + std::string key = keys[randomGen.Uniform((int)kKeyCount)]; + std::string word = words[randomGen.Uniform((int)kWordCount)]; + + //Apply the query and any checks. + if (query == APPEND_OP) { + + // Apply the rocksdb test-harness Append defined above + slists.Append(key, word); //apply the rocksdb append + + // Apply the similar "Append" to the parallel copy + if (parallel_copy[key].size() > 0) { + parallel_copy[key] += " " + word; + } else { + parallel_copy[key] = word; + } + + } else if (query == GET_OP) { + // Assumes that a non-existent key just returns <empty> + std::string res; + slists.Get(key, &res); + ASSERT_EQ(res, parallel_copy[key]); + } + + } + +} + +TEST_F(StringAppendOperatorTest, PersistentVariousKeys) { + // Perform the following operations in limited scope + { + auto db = OpenDb('\n'); + StringLists slists(db); + + slists.Append("c", "asdasd"); + slists.Append("a", "x"); + slists.Append("b", "y"); + slists.Append("a", "t"); + slists.Append("a", "r"); + slists.Append("b", "2"); + slists.Append("c", "asdasd"); + + std::string a, b, c; + slists.Get("a", &a); + slists.Get("b", &b); + slists.Get("c", &c); + + ASSERT_EQ(a, "x\nt\nr"); + ASSERT_EQ(b, "y\n2"); + ASSERT_EQ(c, "asdasd\nasdasd"); + } + + // Reopen the database (the previous changes should persist / be remembered) + { + auto db = OpenDb('\n'); + StringLists slists(db); + + slists.Append("c", "bbnagnagsx"); + slists.Append("a", "sa"); + slists.Append("b", "df"); + slists.Append("a", "gh"); + slists.Append("a", "jk"); + slists.Append("b", "l;"); + slists.Append("c", "rogosh"); + + // The previous changes should be on disk (L0) + // The most recent changes should be in memory (MemTable) + // Hence, this will test both Get() paths. + std::string a, b, c; + slists.Get("a", &a); + slists.Get("b", &b); + slists.Get("c", &c); + + ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk"); + ASSERT_EQ(b, "y\n2\ndf\nl;"); + ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh"); + } + + // Reopen the database (the previous changes should persist / be remembered) + { + auto db = OpenDb('\n'); + StringLists slists(db); + + // All changes should be on disk. This will test VersionSet Get() + std::string a, b, c; + slists.Get("a", &a); + slists.Get("b", &b); + slists.Get("c", &c); + + ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk"); + ASSERT_EQ(b, "y\n2\ndf\nl;"); + ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh"); + } +} + +TEST_F(StringAppendOperatorTest, PersistentFlushAndCompaction) { + // Perform the following operations in limited scope + { + auto db = OpenDb('\n'); + StringLists slists(db); + std::string a, b, c; + bool success; + + // Append, Flush, Get + slists.Append("c", "asdasd"); + db->Flush(ROCKSDB_NAMESPACE::FlushOptions()); + success = slists.Get("c", &c); + ASSERT_TRUE(success); + ASSERT_EQ(c, "asdasd"); + + // Append, Flush, Append, Get + slists.Append("a", "x"); + slists.Append("b", "y"); + db->Flush(ROCKSDB_NAMESPACE::FlushOptions()); + slists.Append("a", "t"); + slists.Append("a", "r"); + slists.Append("b", "2"); + + success = slists.Get("a", &a); + assert(success == true); + ASSERT_EQ(a, "x\nt\nr"); + + success = slists.Get("b", &b); + assert(success == true); + ASSERT_EQ(b, "y\n2"); + + // Append, Get + success = slists.Append("c", "asdasd"); + assert(success); + success = slists.Append("b", "monkey"); + assert(success); + + // I omit the "assert(success)" checks here. + slists.Get("a", &a); + slists.Get("b", &b); + slists.Get("c", &c); + + ASSERT_EQ(a, "x\nt\nr"); + ASSERT_EQ(b, "y\n2\nmonkey"); + ASSERT_EQ(c, "asdasd\nasdasd"); + } + + // Reopen the database (the previous changes should persist / be remembered) + { + auto db = OpenDb('\n'); + StringLists slists(db); + std::string a, b, c; + + // Get (Quick check for persistence of previous database) + slists.Get("a", &a); + ASSERT_EQ(a, "x\nt\nr"); + + //Append, Compact, Get + slists.Append("c", "bbnagnagsx"); + slists.Append("a", "sa"); + slists.Append("b", "df"); + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); + slists.Get("a", &a); + slists.Get("b", &b); + slists.Get("c", &c); + ASSERT_EQ(a, "x\nt\nr\nsa"); + ASSERT_EQ(b, "y\n2\nmonkey\ndf"); + ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx"); + + // Append, Get + slists.Append("a", "gh"); + slists.Append("a", "jk"); + slists.Append("b", "l;"); + slists.Append("c", "rogosh"); + slists.Get("a", &a); + slists.Get("b", &b); + slists.Get("c", &c); + ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk"); + ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;"); + ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh"); + + // Compact, Get + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); + ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk"); + ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;"); + ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh"); + + // Append, Flush, Compact, Get + slists.Append("b", "afcg"); + db->Flush(ROCKSDB_NAMESPACE::FlushOptions()); + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); + slists.Get("b", &b); + ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;\nafcg"); + } +} + +TEST_F(StringAppendOperatorTest, SimpleTestNullDelimiter) { + auto db = OpenDb('\0'); + StringLists slists(db); + + slists.Append("k1", "v1"); + slists.Append("k1", "v2"); + slists.Append("k1", "v3"); + + std::string res; + bool status = slists.Get("k1", &res); + ASSERT_TRUE(status); + + // Construct the desired string. Default constructor doesn't like '\0' chars. + std::string checker("v1,v2,v3"); // Verify that the string is right size. + checker[2] = '\0'; // Use null delimiter instead of comma. + checker[5] = '\0'; + assert(checker.size() == 8); // Verify it is still the correct size + + // Check that the rocksdb result string matches the desired string + assert(res.size() == checker.size()); + ASSERT_EQ(res, checker); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + // Run with regular database + int result; + { + fprintf(stderr, "Running tests with regular db and operator.\n"); + StringAppendOperatorTest::SetOpenDbFunction(&OpenNormalDb); + result = RUN_ALL_TESTS(); + } + +#ifndef ROCKSDB_LITE // TtlDb is not supported in Lite + // Run with TTL + { + fprintf(stderr, "Running tests with ttl db and generic operator.\n"); + StringAppendOperatorTest::SetOpenDbFunction(&OpenTtlDb); + result |= RUN_ALL_TESTS(); + } +#endif // !ROCKSDB_LITE + + return result; +} diff --git a/src/rocksdb/utilities/merge_operators/uint64add.cc b/src/rocksdb/utilities/merge_operators/uint64add.cc new file mode 100644 index 000000000..3ef240928 --- /dev/null +++ b/src/rocksdb/utilities/merge_operators/uint64add.cc @@ -0,0 +1,69 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <memory> + +#include "logging/logging.h" +#include "rocksdb/env.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "util/coding.h" +#include "utilities/merge_operators.h" + +using namespace ROCKSDB_NAMESPACE; + +namespace { // anonymous namespace + +// A 'model' merge operator with uint64 addition semantics +// Implemented as an AssociativeMergeOperator for simplicity and example. +class UInt64AddOperator : public AssociativeMergeOperator { + public: + bool Merge(const Slice& /*key*/, const Slice* existing_value, + const Slice& value, std::string* new_value, + Logger* logger) const override { + uint64_t orig_value = 0; + if (existing_value){ + orig_value = DecodeInteger(*existing_value, logger); + } + uint64_t operand = DecodeInteger(value, logger); + + assert(new_value); + new_value->clear(); + PutFixed64(new_value, orig_value + operand); + + return true; // Return true always since corruption will be treated as 0 + } + + const char* Name() const override { return "UInt64AddOperator"; } + + private: + // Takes the string and decodes it into a uint64_t + // On error, prints a message and returns 0 + uint64_t DecodeInteger(const Slice& value, Logger* logger) const { + uint64_t result = 0; + + if (value.size() == sizeof(uint64_t)) { + result = DecodeFixed64(value.data()); + } else if (logger != nullptr) { + // If value is corrupted, treat it as 0 + ROCKS_LOG_ERROR(logger, "uint64 value corruption, size: %" ROCKSDB_PRIszt + " > %" ROCKSDB_PRIszt, + value.size(), sizeof(uint64_t)); + } + + return result; + } + +}; + +} + +namespace ROCKSDB_NAMESPACE { + +std::shared_ptr<MergeOperator> MergeOperators::CreateUInt64AddOperator() { + return std::make_shared<UInt64AddOperator>(); +} + +} // namespace ROCKSDB_NAMESPACE |