summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/merge_helper.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/db/merge_helper.h')
-rw-r--r--src/rocksdb/db/merge_helper.h194
1 files changed, 194 insertions, 0 deletions
diff --git a/src/rocksdb/db/merge_helper.h b/src/rocksdb/db/merge_helper.h
new file mode 100644
index 00000000..670cba59
--- /dev/null
+++ b/src/rocksdb/db/merge_helper.h
@@ -0,0 +1,194 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#include <deque>
+#include <string>
+#include <vector>
+
+#include "db/dbformat.h"
+#include "db/merge_context.h"
+#include "db/range_del_aggregator.h"
+#include "db/snapshot_checker.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "util/stop_watch.h"
+
+namespace rocksdb {
+
+class Comparator;
+class Iterator;
+class Logger;
+class MergeOperator;
+class Statistics;
+
+class MergeHelper {
+ public:
+ MergeHelper(Env* env, const Comparator* user_comparator,
+ const MergeOperator* user_merge_operator,
+ const CompactionFilter* compaction_filter, Logger* logger,
+ bool assert_valid_internal_key, SequenceNumber latest_snapshot,
+ const SnapshotChecker* snapshot_checker = nullptr, int level = 0,
+ Statistics* stats = nullptr,
+ const std::atomic<bool>* shutting_down = nullptr);
+
+ // Wrapper around MergeOperator::FullMergeV2() that records perf statistics.
+ // Result of merge will be written to result if status returned is OK.
+ // If operands is empty, the value will simply be copied to result.
+ // Set `update_num_ops_stats` to true if it is from a user read, so that
+ // the latency is sensitive.
+ // Returns one of the following statuses:
+ // - OK: Entries were successfully merged.
+ // - Corruption: Merge operator reported unsuccessful merge.
+ static Status TimedFullMerge(const MergeOperator* merge_operator,
+ const Slice& key, const Slice* value,
+ const std::vector<Slice>& operands,
+ std::string* result, Logger* logger,
+ Statistics* statistics, Env* env,
+ Slice* result_operand = nullptr,
+ bool update_num_ops_stats = false);
+
+ // Merge entries until we hit
+ // - a corrupted key
+ // - a Put/Delete,
+ // - a different user key,
+ // - a specific sequence number (snapshot boundary),
+ // - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
+ // or - the end of iteration
+ // iter: (IN) points to the first merge type entry
+ // (OUT) points to the first entry not included in the merge process
+ // range_del_agg: (IN) filters merge operands covered by range tombstones.
+ // stop_before: (IN) a sequence number that merge should not cross.
+ // 0 means no restriction
+ // at_bottom: (IN) true if the iterator covers the bottem level, which means
+ // we could reach the start of the history of this user key.
+ //
+ // Returns one of the following statuses:
+ // - OK: Entries were successfully merged.
+ // - MergeInProgress: Put/Delete not encountered, and didn't reach the start
+ // of key's history. Output consists of merge operands only.
+ // - Corruption: Merge operator reported unsuccessful merge or a corrupted
+ // key has been encountered and not expected (applies only when compiling
+ // with asserts removed).
+ // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true).
+ //
+ // REQUIRED: The first key in the input is not corrupted.
+ Status MergeUntil(InternalIterator* iter,
+ CompactionRangeDelAggregator* range_del_agg = nullptr,
+ const SequenceNumber stop_before = 0,
+ const bool at_bottom = false);
+
+ // Filters a merge operand using the compaction filter specified
+ // in the constructor. Returns the decision that the filter made.
+ // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
+ // optional outputs of compaction filter.
+ CompactionFilter::Decision FilterMerge(const Slice& user_key,
+ const Slice& value_slice);
+
+ // Query the merge result
+ // These are valid until the next MergeUntil call
+ // If the merging was successful:
+ // - keys() contains a single element with the latest sequence number of
+ // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below.
+ // - values() contains a single element with the result of merging all the
+ // operands together
+ //
+ // IMPORTANT 1: the key type could change after the MergeUntil call.
+ // Put/Delete + Merge + ... + Merge => Put
+ // Merge + ... + Merge => Merge
+ //
+ // If the merge operator is not associative, and if a Put/Delete is not found
+ // then the merging will be unsuccessful. In this case:
+ // - keys() contains the list of internal keys seen in order of iteration.
+ // - values() contains the list of values (merges) seen in the same order.
+ // values() is parallel to keys() so that the first entry in
+ // keys() is the key associated with the first entry in values()
+ // and so on. These lists will be the same length.
+ // All of these pairs will be merges over the same user key.
+ // See IMPORTANT 2 note below.
+ //
+ // IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
+ // So keys().back() was the first key seen by iterator.
+ // TODO: Re-style this comment to be like the first one
+ const std::deque<std::string>& keys() const { return keys_; }
+ const std::vector<Slice>& values() const {
+ return merge_context_.GetOperands();
+ }
+ uint64_t TotalFilterTime() const { return total_filter_time_; }
+ bool HasOperator() const { return user_merge_operator_ != nullptr; }
+
+ // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
+ // return true and fill *until with the key to which we should skip.
+ // If true, keys() and values() are empty.
+ bool FilteredUntil(Slice* skip_until) const {
+ if (!has_compaction_filter_skip_until_) {
+ return false;
+ }
+ assert(compaction_filter_ != nullptr);
+ assert(skip_until != nullptr);
+ assert(compaction_filter_skip_until_.Valid());
+ *skip_until = compaction_filter_skip_until_.Encode();
+ return true;
+ }
+
+ private:
+ Env* env_;
+ const Comparator* user_comparator_;
+ const MergeOperator* user_merge_operator_;
+ const CompactionFilter* compaction_filter_;
+ const std::atomic<bool>* shutting_down_;
+ Logger* logger_;
+ bool assert_valid_internal_key_; // enforce no internal key corruption?
+ bool allow_single_operand_;
+ SequenceNumber latest_snapshot_;
+ const SnapshotChecker* const snapshot_checker_;
+ int level_;
+
+ // the scratch area that holds the result of MergeUntil
+ // valid up to the next MergeUntil call
+
+ // Keeps track of the sequence of keys seen
+ std::deque<std::string> keys_;
+ // Parallel with keys_; stores the operands
+ mutable MergeContext merge_context_;
+
+ StopWatchNano filter_timer_;
+ uint64_t total_filter_time_;
+ Statistics* stats_;
+
+ bool has_compaction_filter_skip_until_ = false;
+ std::string compaction_filter_value_;
+ InternalKey compaction_filter_skip_until_;
+
+ bool IsShuttingDown() {
+ // This is a best-effort facility, so memory_order_relaxed is sufficient.
+ return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
+ }
+};
+
+// MergeOutputIterator can be used to iterate over the result of a merge.
+class MergeOutputIterator {
+ public:
+ // The MergeOutputIterator is bound to a MergeHelper instance.
+ explicit MergeOutputIterator(const MergeHelper* merge_helper);
+
+ // Seeks to the first record in the output.
+ void SeekToFirst();
+ // Advances to the next record in the output.
+ void Next();
+
+ Slice key() { return Slice(*it_keys_); }
+ Slice value() { return Slice(*it_values_); }
+ bool Valid() { return it_keys_ != merge_helper_->keys().rend(); }
+
+ private:
+ const MergeHelper* merge_helper_;
+ std::deque<std::string>::const_reverse_iterator it_keys_;
+ std::vector<Slice>::const_reverse_iterator it_values_;
+};
+
+} // namespace rocksdb