diff options
Diffstat (limited to 'src/rocksdb/db/merge_helper.cc')
-rw-r--r-- | src/rocksdb/db/merge_helper.cc | 583 |
1 files changed, 583 insertions, 0 deletions
diff --git a/src/rocksdb/db/merge_helper.cc b/src/rocksdb/db/merge_helper.cc new file mode 100644 index 000000000..6df841012 --- /dev/null +++ b/src/rocksdb/db/merge_helper.cc @@ -0,0 +1,583 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/merge_helper.h" + +#include <string> + +#include "db/blob/blob_fetcher.h" +#include "db/blob/blob_index.h" +#include "db/blob/prefetch_buffer_collection.h" +#include "db/compaction/compaction_iteration_stats.h" +#include "db/dbformat.h" +#include "db/wide/wide_column_serialization.h" +#include "logging/logging.h" +#include "monitoring/perf_context_imp.h" +#include "monitoring/statistics.h" +#include "port/likely.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/system_clock.h" +#include "table/format.h" +#include "table/internal_iterator.h" + +namespace ROCKSDB_NAMESPACE { + +MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator, + const MergeOperator* user_merge_operator, + const CompactionFilter* compaction_filter, + Logger* logger, bool assert_valid_internal_key, + SequenceNumber latest_snapshot, + const SnapshotChecker* snapshot_checker, int level, + Statistics* stats, + const std::atomic<bool>* shutting_down) + : env_(env), + clock_(env->GetSystemClock().get()), + user_comparator_(user_comparator), + user_merge_operator_(user_merge_operator), + compaction_filter_(compaction_filter), + shutting_down_(shutting_down), + logger_(logger), + assert_valid_internal_key_(assert_valid_internal_key), + allow_single_operand_(false), + latest_snapshot_(latest_snapshot), + snapshot_checker_(snapshot_checker), + level_(level), + keys_(), + filter_timer_(clock_), + total_filter_time_(0U), + stats_(stats) { + assert(user_comparator_ != nullptr); + if (user_merge_operator_) { + allow_single_operand_ = user_merge_operator_->AllowSingleOperand(); + } +} + +Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, + const Slice& key, const Slice* value, + const std::vector<Slice>& operands, + std::string* result, Logger* logger, + Statistics* statistics, SystemClock* clock, + Slice* result_operand, + bool update_num_ops_stats) { + assert(merge_operator != nullptr); + + if (operands.empty()) { + assert(value != nullptr && result != nullptr); + result->assign(value->data(), value->size()); + return Status::OK(); + } + + if (update_num_ops_stats) { + RecordInHistogram(statistics, READ_NUM_MERGE_OPERANDS, + static_cast<uint64_t>(operands.size())); + } + + bool success = false; + Slice tmp_result_operand(nullptr, 0); + const MergeOperator::MergeOperationInput merge_in(key, value, operands, + logger); + MergeOperator::MergeOperationOutput merge_out(*result, tmp_result_operand); + { + // Setup to time the merge + StopWatchNano timer(clock, statistics != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + + // Do the merge + success = merge_operator->FullMergeV2(merge_in, &merge_out); + + if (tmp_result_operand.data()) { + // FullMergeV2 result is an existing operand + if (result_operand != nullptr) { + *result_operand = tmp_result_operand; + } else { + result->assign(tmp_result_operand.data(), tmp_result_operand.size()); + } + } else if (result_operand) { + *result_operand = Slice(nullptr, 0); + } + + RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, + statistics ? timer.ElapsedNanos() : 0); + } + + if (!success) { + RecordTick(statistics, NUMBER_MERGE_FAILURES); + return Status::Corruption("Error: Could not perform merge."); + } + + return Status::OK(); +} + +Status MergeHelper::TimedFullMergeWithEntity( + const MergeOperator* merge_operator, const Slice& key, Slice base_entity, + const std::vector<Slice>& operands, std::string* result, Logger* logger, + Statistics* statistics, SystemClock* clock, bool update_num_ops_stats) { + WideColumns base_columns; + + { + const Status s = + WideColumnSerialization::Deserialize(base_entity, base_columns); + if (!s.ok()) { + return s; + } + } + + const bool has_default_column = + !base_columns.empty() && base_columns[0].name() == kDefaultWideColumnName; + + Slice value_of_default; + if (has_default_column) { + value_of_default = base_columns[0].value(); + } + + std::string merge_result; + + { + constexpr Slice* result_operand = nullptr; + + const Status s = TimedFullMerge( + merge_operator, key, &value_of_default, operands, &merge_result, logger, + statistics, clock, result_operand, update_num_ops_stats); + if (!s.ok()) { + return s; + } + } + + if (has_default_column) { + base_columns[0].value() = merge_result; + + const Status s = WideColumnSerialization::Serialize(base_columns, *result); + if (!s.ok()) { + return s; + } + } else { + const Status s = + WideColumnSerialization::Serialize(merge_result, base_columns, *result); + if (!s.ok()) { + return s; + } + } + + return Status::OK(); +} + +// PRE: iter points to the first merge type entry +// POST: iter points to the first entry beyond the merge process (or the end) +// keys_, operands_ are updated to reflect the merge result. +// keys_ stores the list of keys encountered while merging. +// operands_ stores the list of merge operands encountered while merging. +// keys_[i] corresponds to operands_[i] for each i. +// +// TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator +// and just pass the StripeRep corresponding to the stripe being merged. +Status MergeHelper::MergeUntil(InternalIterator* iter, + CompactionRangeDelAggregator* range_del_agg, + const SequenceNumber stop_before, + const bool at_bottom, + const bool allow_data_in_errors, + const BlobFetcher* blob_fetcher, + const std::string* const full_history_ts_low, + PrefetchBufferCollection* prefetch_buffers, + CompactionIterationStats* c_iter_stats) { + // Get a copy of the internal key, before it's invalidated by iter->Next() + // Also maintain the list of merge operands seen. + assert(HasOperator()); + keys_.clear(); + merge_context_.Clear(); + has_compaction_filter_skip_until_ = false; + assert(user_merge_operator_); + assert(user_comparator_); + const size_t ts_sz = user_comparator_->timestamp_size(); + if (full_history_ts_low) { + assert(ts_sz > 0); + assert(ts_sz == full_history_ts_low->size()); + } + bool first_key = true; + + // We need to parse the internal key again as the parsed key is + // backed by the internal key! + // Assume no internal key corruption as it has been successfully parsed + // by the caller. + // original_key_is_iter variable is just caching the information: + // original_key_is_iter == (iter->key().ToString() == original_key) + bool original_key_is_iter = true; + std::string original_key = iter->key().ToString(); + // Important: + // orig_ikey is backed by original_key if keys_.empty() + // orig_ikey is backed by keys_.back() if !keys_.empty() + ParsedInternalKey orig_ikey; + + Status s = ParseInternalKey(original_key, &orig_ikey, allow_data_in_errors); + assert(s.ok()); + if (!s.ok()) return s; + + assert(kTypeMerge == orig_ikey.type); + + bool hit_the_next_user_key = false; + int cmp_with_full_history_ts_low = 0; + for (; iter->Valid(); iter->Next(), original_key_is_iter = false) { + if (IsShuttingDown()) { + s = Status::ShutdownInProgress(); + return s; + } + + ParsedInternalKey ikey; + assert(keys_.size() == merge_context_.GetNumOperands()); + + Status pik_status = + ParseInternalKey(iter->key(), &ikey, allow_data_in_errors); + Slice ts; + if (pik_status.ok()) { + ts = ExtractTimestampFromUserKey(ikey.user_key, ts_sz); + if (full_history_ts_low) { + cmp_with_full_history_ts_low = + user_comparator_->CompareTimestamp(ts, *full_history_ts_low); + } + } + if (!pik_status.ok()) { + // stop at corrupted key + if (assert_valid_internal_key_) { + return pik_status; + } + break; + } else if (first_key) { + // If user-defined timestamp is enabled, we expect both user key and + // timestamps are equal, as a sanity check. + assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)); + first_key = false; + } else if (!user_comparator_->EqualWithoutTimestamp(ikey.user_key, + orig_ikey.user_key) || + (ts_sz > 0 && + !user_comparator_->Equal(ikey.user_key, orig_ikey.user_key) && + cmp_with_full_history_ts_low >= 0)) { + // 1) hit a different user key, or + // 2) user-defined timestamp is enabled, and hit a version of user key NOT + // eligible for GC, then stop right here. + hit_the_next_user_key = true; + break; + } else if (stop_before > 0 && ikey.sequence <= stop_before && + LIKELY(snapshot_checker_ == nullptr || + snapshot_checker_->CheckInSnapshot(ikey.sequence, + stop_before) != + SnapshotCheckerResult::kNotInSnapshot)) { + // hit an entry that's possibly visible by the previous snapshot, can't + // touch that + break; + } + + // At this point we are guaranteed that we need to process this key. + + assert(IsValueType(ikey.type)); + if (ikey.type != kTypeMerge) { + // hit a put/delete/single delete + // => merge the put value or a nullptr with operands_ + // => store result in operands_.back() (and update keys_.back()) + // => change the entry type to kTypeValue for keys_.back() + // We are done! Success! + + // If there are no operands, just return the Status::OK(). That will cause + // the compaction iterator to write out the key we're currently at, which + // is the put/delete we just encountered. + if (keys_.empty()) { + return s; + } + + // TODO(noetzli) If the merge operator returns false, we are currently + // (almost) silently dropping the put/delete. That's probably not what we + // want. Also if we're in compaction and it's a put, it would be nice to + // run compaction filter on it. + std::string merge_result; + + if (range_del_agg && + range_del_agg->ShouldDelete( + ikey, RangeDelPositioningMode::kForwardTraversal)) { + s = TimedFullMerge(user_merge_operator_, ikey.user_key, nullptr, + merge_context_.GetOperands(), &merge_result, logger_, + stats_, clock_, + /* result_operand */ nullptr, + /* update_num_ops_stats */ false); + } else if (ikey.type == kTypeValue) { + const Slice val = iter->value(); + + s = TimedFullMerge(user_merge_operator_, ikey.user_key, &val, + merge_context_.GetOperands(), &merge_result, logger_, + stats_, clock_, + /* result_operand */ nullptr, + /* update_num_ops_stats */ false); + } else if (ikey.type == kTypeBlobIndex) { + BlobIndex blob_index; + + s = blob_index.DecodeFrom(iter->value()); + if (!s.ok()) { + return s; + } + + FilePrefetchBuffer* prefetch_buffer = + prefetch_buffers ? prefetch_buffers->GetOrCreatePrefetchBuffer( + blob_index.file_number()) + : nullptr; + + uint64_t bytes_read = 0; + + assert(blob_fetcher); + + PinnableSlice blob_value; + s = blob_fetcher->FetchBlob(ikey.user_key, blob_index, prefetch_buffer, + &blob_value, &bytes_read); + if (!s.ok()) { + return s; + } + + if (c_iter_stats) { + ++c_iter_stats->num_blobs_read; + c_iter_stats->total_blob_bytes_read += bytes_read; + } + + s = TimedFullMerge(user_merge_operator_, ikey.user_key, &blob_value, + merge_context_.GetOperands(), &merge_result, logger_, + stats_, clock_, + /* result_operand */ nullptr, + /* update_num_ops_stats */ false); + } else if (ikey.type == kTypeWideColumnEntity) { + s = TimedFullMergeWithEntity( + user_merge_operator_, ikey.user_key, iter->value(), + merge_context_.GetOperands(), &merge_result, logger_, stats_, + clock_, /* update_num_ops_stats */ false); + } else { + s = TimedFullMerge(user_merge_operator_, ikey.user_key, nullptr, + merge_context_.GetOperands(), &merge_result, logger_, + stats_, clock_, + /* result_operand */ nullptr, + /* update_num_ops_stats */ false); + } + + // We store the result in keys_.back() and operands_.back() + // if nothing went wrong (i.e.: no operand corruption on disk) + if (s.ok()) { + // The original key encountered + original_key = std::move(keys_.back()); + orig_ikey.type = ikey.type == kTypeWideColumnEntity + ? kTypeWideColumnEntity + : kTypeValue; + UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); + keys_.clear(); + merge_context_.Clear(); + keys_.emplace_front(std::move(original_key)); + merge_context_.PushOperand(merge_result); + } + + // move iter to the next entry + iter->Next(); + return s; + } else { + // hit a merge + // => if there is a compaction filter, apply it. + // => check for range tombstones covering the operand + // => merge the operand into the front of the operands_ list + // if not filtered + // => then continue because we haven't yet seen a Put/Delete. + // + // Keep queuing keys and operands until we either meet a put / delete + // request or later did a partial merge. + + Slice value_slice = iter->value(); + // add an operand to the list if: + // 1) it's included in one of the snapshots. in that case we *must* write + // it out, no matter what compaction filter says + // 2) it's not filtered by a compaction filter + CompactionFilter::Decision filter = + ikey.sequence <= latest_snapshot_ + ? CompactionFilter::Decision::kKeep + : FilterMerge(orig_ikey.user_key, value_slice); + if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil && + range_del_agg != nullptr && + range_del_agg->ShouldDelete( + iter->key(), RangeDelPositioningMode::kForwardTraversal)) { + filter = CompactionFilter::Decision::kRemove; + } + if (filter == CompactionFilter::Decision::kKeep || + filter == CompactionFilter::Decision::kChangeValue) { + if (original_key_is_iter) { + // this is just an optimization that saves us one memcpy + keys_.emplace_front(original_key); + } else { + keys_.emplace_front(iter->key().ToString()); + } + if (keys_.size() == 1) { + // we need to re-anchor the orig_ikey because it was anchored by + // original_key before + pik_status = + ParseInternalKey(keys_.back(), &orig_ikey, allow_data_in_errors); + pik_status.PermitUncheckedError(); + assert(pik_status.ok()); + } + if (filter == CompactionFilter::Decision::kKeep) { + merge_context_.PushOperand( + value_slice, iter->IsValuePinned() /* operand_pinned */); + } else { + assert(filter == CompactionFilter::Decision::kChangeValue); + // Compaction filter asked us to change the operand from value_slice + // to compaction_filter_value_. + merge_context_.PushOperand(compaction_filter_value_, false); + } + } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { + // Compaction filter asked us to remove this key altogether + // (not just this operand), along with some keys following it. + keys_.clear(); + merge_context_.Clear(); + has_compaction_filter_skip_until_ = true; + return s; + } + } + } + + if (cmp_with_full_history_ts_low >= 0) { + size_t num_merge_operands = merge_context_.GetNumOperands(); + if (ts_sz && num_merge_operands > 1) { + // We do not merge merge operands with different timestamps if they are + // not eligible for GC. + ROCKS_LOG_ERROR(logger_, "ts_sz=%d, %d merge oprands", + static_cast<int>(ts_sz), + static_cast<int>(num_merge_operands)); + assert(false); + } + } + + if (merge_context_.GetNumOperands() == 0) { + // we filtered out all the merge operands + return s; + } + + // We are sure we have seen this key's entire history if: + // at_bottom == true (this does not necessarily mean it is the bottommost + // layer, but rather that we are confident the key does not appear on any of + // the lower layers, at_bottom == false doesn't mean it does appear, just + // that we can't be sure, see Compaction::IsBottommostLevel for details) + // AND + // we have either encountered another key or end of key history on this + // layer. + // Note that if user-defined timestamp is enabled, we need some extra caution + // here: if full_history_ts_low is nullptr, or it's not null but the key's + // timestamp is greater than or equal to full_history_ts_low, it means this + // key cannot be dropped. We may not have seen the beginning of the key. + // + // When these conditions are true we are able to merge all the keys + // using full merge. + // + // For these cases we are not sure about, we simply miss the opportunity + // to combine the keys. Since VersionSet::SetupOtherInputs() always makes + // sure that all merge-operands on the same level get compacted together, + // this will simply lead to these merge operands moving to the next level. + bool surely_seen_the_beginning = + (hit_the_next_user_key || !iter->Valid()) && at_bottom && + (ts_sz == 0 || cmp_with_full_history_ts_low < 0); + if (surely_seen_the_beginning) { + // do a final merge with nullptr as the existing value and say + // bye to the merge type (it's now converted to a Put) + assert(kTypeMerge == orig_ikey.type); + assert(merge_context_.GetNumOperands() >= 1); + assert(merge_context_.GetNumOperands() == keys_.size()); + std::string merge_result; + s = TimedFullMerge( + user_merge_operator_, orig_ikey.user_key, nullptr, + merge_context_.GetOperands(), &merge_result, logger_, stats_, clock_, + /* result_operand */ nullptr, /* update_num_ops_stats */ false); + if (s.ok()) { + // The original key encountered + // We are certain that keys_ is not empty here (see assertions couple of + // lines before). + original_key = std::move(keys_.back()); + orig_ikey.type = kTypeValue; + UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); + keys_.clear(); + merge_context_.Clear(); + keys_.emplace_front(std::move(original_key)); + merge_context_.PushOperand(merge_result); + } + } else { + // We haven't seen the beginning of the key nor a Put/Delete. + // Attempt to use the user's associative merge function to + // merge the stacked merge operands into a single operand. + s = Status::MergeInProgress(); + if (merge_context_.GetNumOperands() >= 2 || + (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) { + bool merge_success = false; + std::string merge_result; + { + StopWatchNano timer(clock_, stats_ != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + merge_success = user_merge_operator_->PartialMergeMulti( + orig_ikey.user_key, + std::deque<Slice>(merge_context_.GetOperands().begin(), + merge_context_.GetOperands().end()), + &merge_result, logger_); + RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME, + stats_ ? timer.ElapsedNanosSafe() : 0); + } + if (merge_success) { + // Merging of operands (associative merge) was successful. + // Replace operands with the merge result + merge_context_.Clear(); + merge_context_.PushOperand(merge_result); + keys_.erase(keys_.begin(), keys_.end() - 1); + } + } + } + + return s; +} + +MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper) + : merge_helper_(merge_helper) { + it_keys_ = merge_helper_->keys().rend(); + it_values_ = merge_helper_->values().rend(); +} + +void MergeOutputIterator::SeekToFirst() { + const auto& keys = merge_helper_->keys(); + const auto& values = merge_helper_->values(); + assert(keys.size() == values.size()); + it_keys_ = keys.rbegin(); + it_values_ = values.rbegin(); +} + +void MergeOutputIterator::Next() { + ++it_keys_; + ++it_values_; +} + +CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key, + const Slice& value_slice) { + if (compaction_filter_ == nullptr) { + return CompactionFilter::Decision::kKeep; + } + if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) { + filter_timer_.Start(); + } + compaction_filter_value_.clear(); + compaction_filter_skip_until_.Clear(); + auto ret = compaction_filter_->FilterV2( + level_, user_key, CompactionFilter::ValueType::kMergeOperand, value_slice, + &compaction_filter_value_, compaction_filter_skip_until_.rep()); + if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) { + if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(), + user_key) <= 0) { + // Invalid skip_until returned from compaction filter. + // Keep the key as per FilterV2 documentation. + ret = CompactionFilter::Decision::kKeep; + } else { + compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, + kValueTypeForSeek); + } + } + if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) { + total_filter_time_ += filter_timer_.ElapsedNanosSafe(); + } + return ret; +} + +} // namespace ROCKSDB_NAMESPACE |