diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rocksdb/table/merging_iterator.cc | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/table/merging_iterator.cc')
-rw-r--r-- | src/rocksdb/table/merging_iterator.cc | 468 |
1 files changed, 468 insertions, 0 deletions
diff --git a/src/rocksdb/table/merging_iterator.cc b/src/rocksdb/table/merging_iterator.cc new file mode 100644 index 000000000..47fa048f3 --- /dev/null +++ b/src/rocksdb/table/merging_iterator.cc @@ -0,0 +1,468 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "table/merging_iterator.h" +#include <string> +#include <vector> +#include "db/dbformat.h" +#include "db/pinned_iterators_manager.h" +#include "memory/arena.h" +#include "monitoring/perf_context_imp.h" +#include "rocksdb/comparator.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "table/internal_iterator.h" +#include "table/iter_heap.h" +#include "table/iterator_wrapper.h" +#include "test_util/sync_point.h" +#include "util/autovector.h" +#include "util/heap.h" +#include "util/stop_watch.h" + +namespace ROCKSDB_NAMESPACE { +// Without anonymous namespace here, we fail the warning -Wmissing-prototypes +namespace { +typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap; +typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap; +} // namespace + +const size_t kNumIterReserve = 4; + +class MergingIterator : public InternalIterator { + public: + MergingIterator(const InternalKeyComparator* comparator, + InternalIterator** children, int n, bool is_arena_mode, + bool prefix_seek_mode) + : is_arena_mode_(is_arena_mode), + comparator_(comparator), + current_(nullptr), + direction_(kForward), + minHeap_(comparator_), + prefix_seek_mode_(prefix_seek_mode), + pinned_iters_mgr_(nullptr) { + children_.resize(n); + for (int i = 0; i < n; i++) { + children_[i].Set(children[i]); + } + for (auto& child : children_) { + AddToMinHeapOrCheckStatus(&child); + } + current_ = CurrentForward(); + } + + void considerStatus(Status s) { + if (!s.ok() && status_.ok()) { + status_ = s; + } + } + + virtual void AddIterator(InternalIterator* iter) { + assert(direction_ == kForward); + children_.emplace_back(iter); + if (pinned_iters_mgr_) { + iter->SetPinnedItersMgr(pinned_iters_mgr_); + } + auto new_wrapper = children_.back(); + AddToMinHeapOrCheckStatus(&new_wrapper); + if (new_wrapper.Valid()) { + current_ = CurrentForward(); + } + } + + ~MergingIterator() override { + for (auto& child : children_) { + child.DeleteIter(is_arena_mode_); + } + } + + bool Valid() const override { return current_ != nullptr && status_.ok(); } + + Status status() const override { return status_; } + + void SeekToFirst() override { + ClearHeaps(); + status_ = Status::OK(); + for (auto& child : children_) { + child.SeekToFirst(); + AddToMinHeapOrCheckStatus(&child); + } + direction_ = kForward; + current_ = CurrentForward(); + } + + void SeekToLast() override { + ClearHeaps(); + InitMaxHeap(); + status_ = Status::OK(); + for (auto& child : children_) { + child.SeekToLast(); + AddToMaxHeapOrCheckStatus(&child); + } + direction_ = kReverse; + current_ = CurrentReverse(); + } + + void Seek(const Slice& target) override { + ClearHeaps(); + status_ = Status::OK(); + for (auto& child : children_) { + { + PERF_TIMER_GUARD(seek_child_seek_time); + child.Seek(target); + } + + PERF_COUNTER_ADD(seek_child_seek_count, 1); + { + // Strictly, we timed slightly more than min heap operation, + // but these operations are very cheap. + PERF_TIMER_GUARD(seek_min_heap_time); + AddToMinHeapOrCheckStatus(&child); + } + } + direction_ = kForward; + { + PERF_TIMER_GUARD(seek_min_heap_time); + current_ = CurrentForward(); + } + } + + void SeekForPrev(const Slice& target) override { + ClearHeaps(); + InitMaxHeap(); + status_ = Status::OK(); + + for (auto& child : children_) { + { + PERF_TIMER_GUARD(seek_child_seek_time); + child.SeekForPrev(target); + } + PERF_COUNTER_ADD(seek_child_seek_count, 1); + + { + PERF_TIMER_GUARD(seek_max_heap_time); + AddToMaxHeapOrCheckStatus(&child); + } + } + direction_ = kReverse; + { + PERF_TIMER_GUARD(seek_max_heap_time); + current_ = CurrentReverse(); + } + } + + void Next() override { + assert(Valid()); + + // Ensure that all children are positioned after key(). + // If we are moving in the forward direction, it is already + // true for all of the non-current children since current_ is + // the smallest child and key() == current_->key(). + if (direction_ != kForward) { + SwitchToForward(); + // The loop advanced all non-current children to be > key() so current_ + // should still be strictly the smallest key. + } + + // For the heap modifications below to be correct, current_ must be the + // current top of the heap. + assert(current_ == CurrentForward()); + + // as the current points to the current record. move the iterator forward. + current_->Next(); + if (current_->Valid()) { + // current is still valid after the Next() call above. Call + // replace_top() to restore the heap property. When the same child + // iterator yields a sequence of keys, this is cheap. + assert(current_->status().ok()); + minHeap_.replace_top(current_); + } else { + // current stopped being valid, remove it from the heap. + considerStatus(current_->status()); + minHeap_.pop(); + } + current_ = CurrentForward(); + } + + bool NextAndGetResult(IterateResult* result) override { + Next(); + bool is_valid = Valid(); + if (is_valid) { + result->key = key(); + result->may_be_out_of_upper_bound = MayBeOutOfUpperBound(); + } + return is_valid; + } + + void Prev() override { + assert(Valid()); + // Ensure that all children are positioned before key(). + // If we are moving in the reverse direction, it is already + // true for all of the non-current children since current_ is + // the largest child and key() == current_->key(). + if (direction_ != kReverse) { + // Otherwise, retreat the non-current children. We retreat current_ + // just after the if-block. + SwitchToBackward(); + } + + // For the heap modifications below to be correct, current_ must be the + // current top of the heap. + assert(current_ == CurrentReverse()); + + current_->Prev(); + if (current_->Valid()) { + // current is still valid after the Prev() call above. Call + // replace_top() to restore the heap property. When the same child + // iterator yields a sequence of keys, this is cheap. + assert(current_->status().ok()); + maxHeap_->replace_top(current_); + } else { + // current stopped being valid, remove it from the heap. + considerStatus(current_->status()); + maxHeap_->pop(); + } + current_ = CurrentReverse(); + } + + Slice key() const override { + assert(Valid()); + return current_->key(); + } + + Slice value() const override { + assert(Valid()); + return current_->value(); + } + + // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result + // from current child iterator. Potentially as long as one of child iterator + // report out of bound is not possible, we know current key is within bound. + + bool MayBeOutOfLowerBound() override { + assert(Valid()); + return current_->MayBeOutOfLowerBound(); + } + + bool MayBeOutOfUpperBound() override { + assert(Valid()); + return current_->MayBeOutOfUpperBound(); + } + + void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { + pinned_iters_mgr_ = pinned_iters_mgr; + for (auto& child : children_) { + child.SetPinnedItersMgr(pinned_iters_mgr); + } + } + + bool IsKeyPinned() const override { + assert(Valid()); + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + current_->IsKeyPinned(); + } + + bool IsValuePinned() const override { + assert(Valid()); + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + current_->IsValuePinned(); + } + + private: + // Clears heaps for both directions, used when changing direction or seeking + void ClearHeaps(); + // Ensures that maxHeap_ is initialized when starting to go in the reverse + // direction + void InitMaxHeap(); + + bool is_arena_mode_; + const InternalKeyComparator* comparator_; + autovector<IteratorWrapper, kNumIterReserve> children_; + + // Cached pointer to child iterator with the current key, or nullptr if no + // child iterators are valid. This is the top of minHeap_ or maxHeap_ + // depending on the direction. + IteratorWrapper* current_; + // If any of the children have non-ok status, this is one of them. + Status status_; + // Which direction is the iterator moving? + enum Direction { + kForward, + kReverse + }; + Direction direction_; + MergerMinIterHeap minHeap_; + bool prefix_seek_mode_; + + // Max heap is used for reverse iteration, which is way less common than + // forward. Lazily initialize it to save memory. + std::unique_ptr<MergerMaxIterHeap> maxHeap_; + PinnedIteratorsManager* pinned_iters_mgr_; + + // In forward direction, process a child that is not in the min heap. + // If valid, add to the min heap. Otherwise, check status. + void AddToMinHeapOrCheckStatus(IteratorWrapper*); + + // In backward direction, process a child that is not in the max heap. + // If valid, add to the min heap. Otherwise, check status. + void AddToMaxHeapOrCheckStatus(IteratorWrapper*); + + void SwitchToForward(); + + // Switch the direction from forward to backward without changing the + // position. Iterator should still be valid. + void SwitchToBackward(); + + IteratorWrapper* CurrentForward() const { + assert(direction_ == kForward); + return !minHeap_.empty() ? minHeap_.top() : nullptr; + } + + IteratorWrapper* CurrentReverse() const { + assert(direction_ == kReverse); + assert(maxHeap_); + return !maxHeap_->empty() ? maxHeap_->top() : nullptr; + } +}; + +void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) { + if (child->Valid()) { + assert(child->status().ok()); + minHeap_.push(child); + } else { + considerStatus(child->status()); + } +} + +void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) { + if (child->Valid()) { + assert(child->status().ok()); + maxHeap_->push(child); + } else { + considerStatus(child->status()); + } +} + +void MergingIterator::SwitchToForward() { + // Otherwise, advance the non-current children. We advance current_ + // just after the if-block. + ClearHeaps(); + Slice target = key(); + for (auto& child : children_) { + if (&child != current_) { + child.Seek(target); + if (child.Valid() && comparator_->Equal(target, child.key())) { + assert(child.status().ok()); + child.Next(); + } + } + AddToMinHeapOrCheckStatus(&child); + } + direction_ = kForward; +} + +void MergingIterator::SwitchToBackward() { + ClearHeaps(); + InitMaxHeap(); + Slice target = key(); + for (auto& child : children_) { + if (&child != current_) { + child.SeekForPrev(target); + TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); + if (child.Valid() && comparator_->Equal(target, child.key())) { + assert(child.status().ok()); + child.Prev(); + } + } + AddToMaxHeapOrCheckStatus(&child); + } + direction_ = kReverse; + if (!prefix_seek_mode_) { + // Note that we don't do assert(current_ == CurrentReverse()) here + // because it is possible to have some keys larger than the seek-key + // inserted between Seek() and SeekToLast(), which makes current_ not + // equal to CurrentReverse(). + current_ = CurrentReverse(); + } + assert(current_ == CurrentReverse()); +} + +void MergingIterator::ClearHeaps() { + minHeap_.clear(); + if (maxHeap_) { + maxHeap_->clear(); + } +} + +void MergingIterator::InitMaxHeap() { + if (!maxHeap_) { + maxHeap_.reset(new MergerMaxIterHeap(comparator_)); + } +} + +InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp, + InternalIterator** list, int n, + Arena* arena, bool prefix_seek_mode) { + assert(n >= 0); + if (n == 0) { + return NewEmptyInternalIterator<Slice>(arena); + } else if (n == 1) { + return list[0]; + } else { + if (arena == nullptr) { + return new MergingIterator(cmp, list, n, false, prefix_seek_mode); + } else { + auto mem = arena->AllocateAligned(sizeof(MergingIterator)); + return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode); + } + } +} + +MergeIteratorBuilder::MergeIteratorBuilder( + const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode) + : first_iter(nullptr), use_merging_iter(false), arena(a) { + auto mem = arena->AllocateAligned(sizeof(MergingIterator)); + merge_iter = + new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode); +} + +MergeIteratorBuilder::~MergeIteratorBuilder() { + if (first_iter != nullptr) { + first_iter->~InternalIterator(); + } + if (merge_iter != nullptr) { + merge_iter->~MergingIterator(); + } +} + +void MergeIteratorBuilder::AddIterator(InternalIterator* iter) { + if (!use_merging_iter && first_iter != nullptr) { + merge_iter->AddIterator(first_iter); + use_merging_iter = true; + first_iter = nullptr; + } + if (use_merging_iter) { + merge_iter->AddIterator(iter); + } else { + first_iter = iter; + } +} + +InternalIterator* MergeIteratorBuilder::Finish() { + InternalIterator* ret = nullptr; + if (!use_merging_iter) { + ret = first_iter; + first_iter = nullptr; + } else { + ret = merge_iter; + merge_iter = nullptr; + } + return ret; +} + +} // namespace ROCKSDB_NAMESPACE |