From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/rocksdb/db/merge_helper.h | 216 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 src/rocksdb/db/merge_helper.h (limited to 'src/rocksdb/db/merge_helper.h') diff --git a/src/rocksdb/db/merge_helper.h b/src/rocksdb/db/merge_helper.h new file mode 100644 index 000000000..790ec6239 --- /dev/null +++ b/src/rocksdb/db/merge_helper.h @@ -0,0 +1,216 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once + +#include +#include +#include + +#include "db/merge_context.h" +#include "db/range_del_aggregator.h" +#include "db/snapshot_checker.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/env.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "rocksdb/wide_columns.h" +#include "util/stop_watch.h" + +namespace ROCKSDB_NAMESPACE { + +class Comparator; +class Iterator; +class Logger; +class MergeOperator; +class Statistics; +class SystemClock; +class BlobFetcher; +class PrefetchBufferCollection; +struct CompactionIterationStats; + +class MergeHelper { + public: + MergeHelper(Env* env, const Comparator* user_comparator, + const MergeOperator* user_merge_operator, + const CompactionFilter* compaction_filter, Logger* logger, + bool assert_valid_internal_key, SequenceNumber latest_snapshot, + const SnapshotChecker* snapshot_checker = nullptr, int level = 0, + Statistics* stats = nullptr, + const std::atomic* shutting_down = nullptr); + + // Wrapper around MergeOperator::FullMergeV2() that records perf statistics. + // Result of merge will be written to result if status returned is OK. + // If operands is empty, the value will simply be copied to result. + // Set `update_num_ops_stats` to true if it is from a user read, so that + // the latency is sensitive. + // Returns one of the following statuses: + // - OK: Entries were successfully merged. + // - Corruption: Merge operator reported unsuccessful merge. + static Status TimedFullMerge(const MergeOperator* merge_operator, + const Slice& key, const Slice* value, + const std::vector& operands, + std::string* result, Logger* logger, + Statistics* statistics, SystemClock* clock, + Slice* result_operand, + bool update_num_ops_stats); + + static Status TimedFullMergeWithEntity( + const MergeOperator* merge_operator, const Slice& key, Slice base_entity, + const std::vector& operands, std::string* result, Logger* logger, + Statistics* statistics, SystemClock* clock, bool update_num_ops_stats); + + // During compaction, merge entries until we hit + // - a corrupted key + // - a Put/Delete, + // - a different user key, + // - a specific sequence number (snapshot boundary), + // - REMOVE_AND_SKIP_UNTIL returned from compaction filter, + // or - the end of iteration + // iter: (IN) points to the first merge type entry + // (OUT) points to the first entry not included in the merge process + // range_del_agg: (IN) filters merge operands covered by range tombstones. + // stop_before: (IN) a sequence number that merge should not cross. + // 0 means no restriction + // at_bottom: (IN) true if the iterator covers the bottem level, which means + // we could reach the start of the history of this user key. + // allow_data_in_errors: (IN) if true, data details will be displayed in + // error/log messages. + // blob_fetcher: (IN) blob fetcher object for the compaction's input version. + // prefetch_buffers: (IN/OUT) a collection of blob file prefetch buffers + // used for compaction readahead. + // c_iter_stats: (OUT) compaction iteration statistics. + // + // Returns one of the following statuses: + // - OK: Entries were successfully merged. + // - MergeInProgress: Put/Delete not encountered, and didn't reach the start + // of key's history. Output consists of merge operands only. + // - Corruption: Merge operator reported unsuccessful merge or a corrupted + // key has been encountered and not expected (applies only when compiling + // with asserts removed). + // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true). + // + // REQUIRED: The first key in the input is not corrupted. + Status MergeUntil(InternalIterator* iter, + CompactionRangeDelAggregator* range_del_agg, + const SequenceNumber stop_before, const bool at_bottom, + const bool allow_data_in_errors, + const BlobFetcher* blob_fetcher, + const std::string* const full_history_ts_low, + PrefetchBufferCollection* prefetch_buffers, + CompactionIterationStats* c_iter_stats); + + // Filters a merge operand using the compaction filter specified + // in the constructor. Returns the decision that the filter made. + // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the + // optional outputs of compaction filter. + // user_key includes timestamp if user-defined timestamp is enabled. + CompactionFilter::Decision FilterMerge(const Slice& user_key, + const Slice& value_slice); + + // Query the merge result + // These are valid until the next MergeUntil call + // If the merging was successful: + // - keys() contains a single element with the latest sequence number of + // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below. + // - values() contains a single element with the result of merging all the + // operands together + // + // IMPORTANT 1: the key type could change after the MergeUntil call. + // Put/Delete + Merge + ... + Merge => Put + // Merge + ... + Merge => Merge + // + // If the merge operator is not associative, and if a Put/Delete is not found + // then the merging will be unsuccessful. In this case: + // - keys() contains the list of internal keys seen in order of iteration. + // - values() contains the list of values (merges) seen in the same order. + // values() is parallel to keys() so that the first entry in + // keys() is the key associated with the first entry in values() + // and so on. These lists will be the same length. + // All of these pairs will be merges over the same user key. + // See IMPORTANT 2 note below. + // + // IMPORTANT 2: The entries were traversed in order from BACK to FRONT. + // So keys().back() was the first key seen by iterator. + // TODO: Re-style this comment to be like the first one + const std::deque& keys() const { return keys_; } + const std::vector& values() const { + return merge_context_.GetOperands(); + } + uint64_t TotalFilterTime() const { return total_filter_time_; } + bool HasOperator() const { return user_merge_operator_ != nullptr; } + + // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will + // return true and fill *until with the key to which we should skip. + // If true, keys() and values() are empty. + bool FilteredUntil(Slice* skip_until) const { + if (!has_compaction_filter_skip_until_) { + return false; + } + assert(compaction_filter_ != nullptr); + assert(skip_until != nullptr); + assert(compaction_filter_skip_until_.Valid()); + *skip_until = compaction_filter_skip_until_.Encode(); + return true; + } + + private: + Env* env_; + SystemClock* clock_; + const Comparator* user_comparator_; + const MergeOperator* user_merge_operator_; + const CompactionFilter* compaction_filter_; + const std::atomic* shutting_down_; + Logger* logger_; + bool assert_valid_internal_key_; // enforce no internal key corruption? + bool allow_single_operand_; + SequenceNumber latest_snapshot_; + const SnapshotChecker* const snapshot_checker_; + int level_; + + // the scratch area that holds the result of MergeUntil + // valid up to the next MergeUntil call + + // Keeps track of the sequence of keys seen + std::deque keys_; + // Parallel with keys_; stores the operands + mutable MergeContext merge_context_; + + StopWatchNano filter_timer_; + uint64_t total_filter_time_; + Statistics* stats_; + + bool has_compaction_filter_skip_until_ = false; + std::string compaction_filter_value_; + InternalKey compaction_filter_skip_until_; + + bool IsShuttingDown() { + // This is a best-effort facility, so memory_order_relaxed is sufficient. + return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); + } +}; + +// MergeOutputIterator can be used to iterate over the result of a merge. +class MergeOutputIterator { + public: + // The MergeOutputIterator is bound to a MergeHelper instance. + explicit MergeOutputIterator(const MergeHelper* merge_helper); + + // Seeks to the first record in the output. + void SeekToFirst(); + // Advances to the next record in the output. + void Next(); + + Slice key() { return Slice(*it_keys_); } + Slice value() { return Slice(*it_values_); } + bool Valid() { return it_keys_ != merge_helper_->keys().rend(); } + + private: + const MergeHelper* merge_helper_; + std::deque::const_reverse_iterator it_keys_; + std::vector::const_reverse_iterator it_values_; +}; + +} // namespace ROCKSDB_NAMESPACE -- cgit v1.2.3