Diffstat
-rw-r--r--  src/rocksdb/db/forward_iterator.cc  1062
1 file changed, 1062 insertions(+), 0 deletions(-)
diff --git a/src/rocksdb/db/forward_iterator.cc b/src/rocksdb/db/forward_iterator.cc
new file mode 100644
index 000000000..3fbc2cf47
--- /dev/null
+++ b/src/rocksdb/db/forward_iterator.cc
@@ -0,0 +1,1062 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+#include "db/forward_iterator.h"
+
+#include <limits>
+#include <string>
+#include <utility>
+
+#include "db/column_family.h"
+#include "db/db_impl/db_impl.h"
+#include "db/db_iter.h"
+#include "db/dbformat.h"
+#include "db/job_context.h"
+#include "db/range_del_aggregator.h"
+#include "db/range_tombstone_fragmenter.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "table/merging_iterator.h"
+#include "test_util/sync_point.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Usage:
+// ForwardLevelIterator iter;
+// iter.SetFileIndex(file_index);
+// iter.Seek(target); // or iter.SeekToFirst();
+// iter.Next();
+class ForwardLevelIterator : public InternalIterator {
+ public:
+ ForwardLevelIterator(
+ const ColumnFamilyData* const cfd, const ReadOptions& read_options,
+ const std::vector<FileMetaData*>& files,
+ const std::shared_ptr<const SliceTransform>& prefix_extractor,
+ bool allow_unprepared_value)
+ : cfd_(cfd),
+ read_options_(read_options),
+ files_(files),
+ valid_(false),
+ file_index_(std::numeric_limits<uint32_t>::max()),
+ file_iter_(nullptr),
+ pinned_iters_mgr_(nullptr),
+ prefix_extractor_(prefix_extractor),
+ allow_unprepared_value_(allow_unprepared_value) {
+ status_.PermitUncheckedError(); // Allow uninitialized status through
+ }
+
+ ~ForwardLevelIterator() override {
+ // Release the current file iterator
+ if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
+ pinned_iters_mgr_->PinIterator(file_iter_);
+ } else {
+ delete file_iter_;
+ }
+ }
+
+ void SetFileIndex(uint32_t file_index) {
+ assert(file_index < files_.size());
+ status_ = Status::OK();
+ if (file_index != file_index_) {
+ file_index_ = file_index;
+ Reset();
+ }
+ }
+ void Reset() {
+ assert(file_index_ < files_.size());
+
+ // Release the previous file iterator before opening the next one
+ if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
+ pinned_iters_mgr_->PinIterator(file_iter_);
+ } else {
+ delete file_iter_;
+ }
+
+ ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
+ kMaxSequenceNumber /* upper_bound */);
+ file_iter_ = cfd_->table_cache()->NewIterator(
+ read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
+ *files_[file_index_],
+ read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
+ prefix_extractor_, /*table_reader_ptr=*/nullptr,
+ /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
+ /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
+ /*max_file_size_for_l0_meta_pin=*/0,
+ /*smallest_compaction_key=*/nullptr,
+ /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
+ file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
+ valid_ = false;
+ if (!range_del_agg.IsEmpty()) {
+ status_ = Status::NotSupported(
+ "Range tombstones unsupported with ForwardIterator");
+ }
+ }
+ void SeekToLast() override {
+ status_ = Status::NotSupported("ForwardLevelIterator::SeekToLast()");
+ valid_ = false;
+ }
+ void Prev() override {
+ status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
+ valid_ = false;
+ }
+ bool Valid() const override { return valid_; }
+ void SeekToFirst() override {
+ assert(file_iter_ != nullptr);
+ if (!status_.ok()) {
+ assert(!valid_);
+ return;
+ }
+ file_iter_->SeekToFirst();
+ valid_ = file_iter_->Valid();
+ }
+ void Seek(const Slice& internal_key) override {
+ assert(file_iter_ != nullptr);
+
+ // This deviates from the usual convention for InternalIterator::Seek() in
+ // that it doesn't discard pre-existing error status. That's because this
+ // Seek() is only supposed to be called immediately after SetFileIndex()
+ // (which discards pre-existing error status), and SetFileIndex() may set
+ // an error status, which we shouldn't discard.
+ if (!status_.ok()) {
+ assert(!valid_);
+ return;
+ }
+
+ file_iter_->Seek(internal_key);
+ valid_ = file_iter_->Valid();
+ }
+ void SeekForPrev(const Slice& /*internal_key*/) override {
+ status_ = Status::NotSupported("ForwardLevelIterator::SeekForPrev()");
+ valid_ = false;
+ }
+ void Next() override {
+ assert(valid_);
+ file_iter_->Next();
+ for (;;) {
+ valid_ = file_iter_->Valid();
+ if (!file_iter_->status().ok()) {
+ assert(!valid_);
+ return;
+ }
+ if (valid_) {
+ return;
+ }
+ if (file_index_ + 1 >= files_.size()) {
+ valid_ = false;
+ return;
+ }
+ SetFileIndex(file_index_ + 1);
+ if (!status_.ok()) {
+ assert(!valid_);
+ return;
+ }
+ file_iter_->SeekToFirst();
+ }
+ }
+ Slice key() const override {
+ assert(valid_);
+ return file_iter_->key();
+ }
+ Slice value() const override {
+ assert(valid_);
+ return file_iter_->value();
+ }
+ Status status() const override {
+ if (!status_.ok()) {
+ return status_;
+ } else if (file_iter_) {
+ return file_iter_->status();
+ }
+ return Status::OK();
+ }
+ bool PrepareValue() override {
+ assert(valid_);
+ if (file_iter_->PrepareValue()) {
+ return true;
+ }
+
+ assert(!file_iter_->Valid());
+ valid_ = false;
+ return false;
+ }
+ bool IsKeyPinned() const override {
+ return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
+ file_iter_->IsKeyPinned();
+ }
+ bool IsValuePinned() const override {
+ return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
+ file_iter_->IsValuePinned();
+ }
+ void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
+ pinned_iters_mgr_ = pinned_iters_mgr;
+ if (file_iter_) {
+ file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
+ }
+ }
+
+ private:
+ const ColumnFamilyData* const cfd_;
+ const ReadOptions& read_options_;
+ const std::vector<FileMetaData*>& files_;
+
+ bool valid_;
+ uint32_t file_index_;
+ Status status_;
+ InternalIterator* file_iter_;
+ PinnedIteratorsManager* pinned_iters_mgr_;
+ // Kept alive by ForwardIterator::sv_->mutable_cf_options
+ const std::shared_ptr<const SliceTransform>& prefix_extractor_;
+ const bool allow_unprepared_value_;
+};
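+
+// For illustration, a minimal sketch of how Next() rolls over between the
+// files of a level (assuming files 0..n-1, sorted by key range):
+//
+// iter.SetFileIndex(0);
+// iter.SeekToFirst();
+// while (iter.Valid()) iter.Next();
+//
+// On exhausting file i, Next() internally calls SetFileIndex(i + 1) followed
+// by SeekToFirst(), so callers see one continuous sorted stream per level.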
+
+ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
+ ColumnFamilyData* cfd,
+ SuperVersion* current_sv,
+ bool allow_unprepared_value)
+ : db_(db),
+ read_options_(read_options),
+ cfd_(cfd),
+ prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()),
+ user_comparator_(cfd->user_comparator()),
+ allow_unprepared_value_(allow_unprepared_value),
+ immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
+ sv_(current_sv),
+ mutable_iter_(nullptr),
+ current_(nullptr),
+ valid_(false),
+ status_(Status::OK()),
+ immutable_status_(Status::OK()),
+ has_iter_trimmed_for_upper_bound_(false),
+ current_over_upper_bound_(false),
+ is_prev_set_(false),
+ is_prev_inclusive_(false),
+ pinned_iters_mgr_(nullptr) {
+ if (sv_) {
+ RebuildIterators(false);
+ }
+
+ // immutable_status_ is a local aggregation of the
+ // status of the immutable Iterators.
+ // We have to PermitUncheckedError in case it is never
+ // used, otherwise it will fail ASSERT_STATUS_CHECKED.
+ immutable_status_.PermitUncheckedError();
+}
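+
+// For illustration, a minimal usage sketch: a ForwardIterator is normally
+// obtained indirectly, by opening a tailing iterator (assuming a DB* `db`
+// opened elsewhere):
+//
+// ReadOptions ro;
+// ro.tailing = true;
+// std::unique_ptr<Iterator> it(db->NewIterator(ro));
+// for (it->SeekToFirst(); it->Valid(); it->Next()) {
+// // consume it->key() / it->value()
+// }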
+
+ForwardIterator::~ForwardIterator() { Cleanup(true); }
+
+void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
+ bool background_purge_on_iterator_cleanup) {
+ if (sv->Unref()) {
+ // Job id == 0 means that this is not our background process, but rather
+ // a user thread.
+ JobContext job_context(0);
+ db->mutex_.Lock();
+ sv->Cleanup();
+ db->FindObsoleteFiles(&job_context, false, true);
+ if (background_purge_on_iterator_cleanup) {
+ db->ScheduleBgLogWriterClose(&job_context);
+ db->AddSuperVersionsToFreeQueue(sv);
+ db->SchedulePurge();
+ }
+ db->mutex_.Unlock();
+ if (!background_purge_on_iterator_cleanup) {
+ delete sv;
+ }
+ if (job_context.HaveSomethingToDelete()) {
+ db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
+ }
+ job_context.Clean();
+ }
+}
+
+namespace {
+struct SVCleanupParams {
+ DBImpl* db;
+ SuperVersion* sv;
+ bool background_purge_on_iterator_cleanup;
+};
+} // anonymous namespace
+
+// Used in PinnedIteratorsManager to release pinned SuperVersion
+void ForwardIterator::DeferredSVCleanup(void* arg) {
+ auto d = reinterpret_cast<SVCleanupParams*>(arg);
+ ForwardIterator::SVCleanup(d->db, d->sv,
+ d->background_purge_on_iterator_cleanup);
+ delete d;
+}
+
+void ForwardIterator::SVCleanup() {
+ if (sv_ == nullptr) {
+ return;
+ }
+ bool background_purge =
+ read_options_.background_purge_on_iterator_cleanup ||
+ db_->immutable_db_options().avoid_unnecessary_blocking_io;
+ if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
+ // pinned_iters_mgr_ tells us to make sure that all visited key-value slices
+ // are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
+ // The slices may point into some memtables owned by sv_, so we need to keep
+ // sv_ referenced until pinned_iters_mgr_ unpins everything.
+ auto p = new SVCleanupParams{db_, sv_, background_purge};
+ pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
+ } else {
+ SVCleanup(db_, sv_, background_purge);
+ }
+}
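+
+// To summarize the two paths above: with pinning enabled the SuperVersion
+// must outlive this iterator, so its release is deferred until
+// pinned_iters_mgr_->ReleasePinnedData() invokes DeferredSVCleanup();
+// without pinning, SVCleanup(db_, sv_, background_purge) runs immediately.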
+
+void ForwardIterator::Cleanup(bool release_sv) {
+ if (mutable_iter_ != nullptr) {
+ DeleteIterator(mutable_iter_, true /* is_arena */);
+ }
+
+ for (auto* m : imm_iters_) {
+ DeleteIterator(m, true /* is_arena */);
+ }
+ imm_iters_.clear();
+
+ for (auto* f : l0_iters_) {
+ DeleteIterator(f);
+ }
+ l0_iters_.clear();
+
+ for (auto* l : level_iters_) {
+ DeleteIterator(l);
+ }
+ level_iters_.clear();
+
+ if (release_sv) {
+ SVCleanup();
+ }
+}
+
+bool ForwardIterator::Valid() const {
+ // See UpdateCurrent().
+ return valid_ ? !current_over_upper_bound_ : false;
+}
+
+void ForwardIterator::SeekToFirst() {
+ if (sv_ == nullptr) {
+ RebuildIterators(true);
+ } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
+ RenewIterators();
+ } else if (immutable_status_.IsIncomplete()) {
+ ResetIncompleteIterators();
+ }
+ SeekInternal(Slice(), true, false);
+}
+
+bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
+ return !(read_options_.iterate_upper_bound == nullptr ||
+ cfd_->internal_comparator().user_comparator()->Compare(
+ ExtractUserKey(internal_key),
+ *read_options_.iterate_upper_bound) < 0);
+}
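+
+// Example: with iterate_upper_bound = "k2", user key "k1" is within bounds
+// while "k2" itself is over the bound; iterate_upper_bound is exclusive,
+// hence the strict `< 0` comparison above.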
+
+void ForwardIterator::Seek(const Slice& internal_key) {
+ if (sv_ == nullptr) {
+ RebuildIterators(true);
+ } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
+ RenewIterators();
+ } else if (immutable_status_.IsIncomplete()) {
+ ResetIncompleteIterators();
+ }
+
+ SeekInternal(internal_key, false, false);
+ if (read_options_.async_io) {
+ SeekInternal(internal_key, false, true);
+ }
+}
+
+// With async_io, SeekInternal() is called twice: the second call, with
+// seek_after_async_io enabled, performs only the seeking needed to retrieve
+// the blocks requested asynchronously by the first call.
+void ForwardIterator::SeekInternal(const Slice& internal_key,
+ bool seek_to_first,
+ bool seek_after_async_io) {
+ assert(mutable_iter_);
+ // mutable
+ if (!seek_after_async_io) {
+ seek_to_first ? mutable_iter_->SeekToFirst()
+ : mutable_iter_->Seek(internal_key);
+ }
+
+ // immutable
+ // TODO(ljin): NeedToSeekImmutable has a negative impact on performance
+ // if we end up needing to seek the immutable iterators often. We probably
+ // want an option to turn it off.
+ if (seek_to_first || seek_after_async_io ||
+ NeedToSeekImmutable(internal_key)) {
+ if (!seek_after_async_io) {
+ immutable_status_ = Status::OK();
+ if (has_iter_trimmed_for_upper_bound_ &&
+ (
+ // prev_ is not set yet
+ is_prev_set_ == false ||
+ // We are doing SeekToFirst() and internal_key.size() = 0
+ seek_to_first ||
+ // prev_key_ > internal_key
+ cfd_->internal_comparator().InternalKeyComparator::Compare(
+ prev_key_.GetInternalKey(), internal_key) > 0)) {
+ // Some iterators are trimmed. Need to rebuild.
+ RebuildIterators(true);
+ // Already seeked mutable iter, so seek again
+ seek_to_first ? mutable_iter_->SeekToFirst()
+ : mutable_iter_->Seek(internal_key);
+ }
+ {
+ auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
+ immutable_min_heap_.swap(tmp);
+ }
+ for (size_t i = 0; i < imm_iters_.size(); i++) {
+ auto* m = imm_iters_[i];
+ seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
+ if (!m->status().ok()) {
+ immutable_status_ = m->status();
+ } else if (m->Valid()) {
+ immutable_min_heap_.push(m);
+ }
+ }
+ }
+
+ Slice target_user_key;
+ if (!seek_to_first) {
+ target_user_key = ExtractUserKey(internal_key);
+ }
+ const VersionStorageInfo* vstorage = sv_->current->storage_info();
+ const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
+ for (size_t i = 0; i < l0.size(); ++i) {
+ if (!l0_iters_[i]) {
+ continue;
+ }
+ if (seek_after_async_io) {
+ if (!l0_iters_[i]->status().IsTryAgain()) {
+ continue;
+ }
+ }
+
+ if (seek_to_first) {
+ l0_iters_[i]->SeekToFirst();
+ } else {
+ // If the target key is past this file's largest key, we are sure
+ // Next() will never fall into this file.
+ if (seek_after_async_io == false &&
+ user_comparator_->Compare(target_user_key,
+ l0[i]->largest.user_key()) > 0) {
+ if (read_options_.iterate_upper_bound != nullptr) {
+ has_iter_trimmed_for_upper_bound_ = true;
+ DeleteIterator(l0_iters_[i]);
+ l0_iters_[i] = nullptr;
+ }
+ continue;
+ }
+ l0_iters_[i]->Seek(internal_key);
+ }
+
+ if (l0_iters_[i]->status().IsTryAgain()) {
+ assert(!seek_after_async_io);
+ continue;
+ } else if (!l0_iters_[i]->status().ok()) {
+ immutable_status_ = l0_iters_[i]->status();
+ } else if (l0_iters_[i]->Valid() &&
+ !IsOverUpperBound(l0_iters_[i]->key())) {
+ immutable_min_heap_.push(l0_iters_[i]);
+ } else {
+ has_iter_trimmed_for_upper_bound_ = true;
+ DeleteIterator(l0_iters_[i]);
+ l0_iters_[i] = nullptr;
+ }
+ }
+
+ for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
+ const std::vector<FileMetaData*>& level_files =
+ vstorage->LevelFiles(level);
+ if (level_files.empty()) {
+ continue;
+ }
+ if (level_iters_[level - 1] == nullptr) {
+ continue;
+ }
+
+ if (seek_after_async_io) {
+ if (!level_iters_[level - 1]->status().IsTryAgain()) {
+ continue;
+ }
+ }
+ uint32_t f_idx = 0;
+ if (!seek_to_first && !seek_after_async_io) {
+ f_idx = FindFileInRange(level_files, internal_key, 0,
+ static_cast<uint32_t>(level_files.size()));
+ }
+
+ // Seek
+ if (seek_after_async_io || f_idx < level_files.size()) {
+ if (!seek_after_async_io) {
+ level_iters_[level - 1]->SetFileIndex(f_idx);
+ }
+ seek_to_first ? level_iters_[level - 1]->SeekToFirst()
+ : level_iters_[level - 1]->Seek(internal_key);
+
+ if (level_iters_[level - 1]->status().IsTryAgain()) {
+ assert(!seek_after_async_io);
+ continue;
+ } else if (!level_iters_[level - 1]->status().ok()) {
+ immutable_status_ = level_iters_[level - 1]->status();
+ } else if (level_iters_[level - 1]->Valid() &&
+ !IsOverUpperBound(level_iters_[level - 1]->key())) {
+ immutable_min_heap_.push(level_iters_[level - 1]);
+ } else {
+ // Nothing in this level is interesting. Remove.
+ has_iter_trimmed_for_upper_bound_ = true;
+ DeleteIterator(level_iters_[level - 1]);
+ level_iters_[level - 1] = nullptr;
+ }
+ }
+ }
+
+ if (seek_to_first) {
+ is_prev_set_ = false;
+ } else {
+ prev_key_.SetInternalKey(internal_key);
+ is_prev_set_ = true;
+ is_prev_inclusive_ = true;
+ }
+
+ TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Immutable", this);
+ } else if (current_ && current_ != mutable_iter_) {
+ // current_ is one of immutable iterators, push it back to the heap
+ immutable_min_heap_.push(current_);
+ }
+
+ // With async_io, current_ should be updated only when seek_after_async_io
+ // is true (i.e. in the second call).
+ if (seek_to_first || !read_options_.async_io || seek_after_async_io) {
+ UpdateCurrent();
+ }
+ TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
+}
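+
+// To recap the async flow: the first SeekInternal() pass issues every seek,
+// and child iterators whose reads went asynchronous surface
+// Status::TryAgain(). The second pass (seek_after_async_io == true) re-seeks
+// only those TryAgain iterators once their blocks are available, after which
+// UpdateCurrent() picks the child with the smallest key.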
+
+void ForwardIterator::Next() {
+ assert(valid_);
+ bool update_prev_key = false;
+
+ if (sv_ == nullptr || sv_->version_number != cfd_->GetSuperVersionNumber()) {
+ std::string current_key = key().ToString();
+ Slice old_key(current_key.data(), current_key.size());
+
+ if (sv_ == nullptr) {
+ RebuildIterators(true);
+ } else {
+ RenewIterators();
+ }
+
+ SeekInternal(old_key, false, false);
+ if (read_options_.async_io) {
+ SeekInternal(old_key, false, true);
+ }
+
+ if (!valid_ || key().compare(old_key) != 0) {
+ return;
+ }
+ } else if (current_ != mutable_iter_) {
+ // We are about to advance an immutable iterator
+
+ if (is_prev_set_ && prefix_extractor_) {
+ // advance prev_key_ to current_ only if they share the same prefix
+ update_prev_key =
+ prefix_extractor_->Transform(prev_key_.GetUserKey())
+ .compare(prefix_extractor_->Transform(current_->key())) == 0;
+ } else {
+ update_prev_key = true;
+ }
+
+ if (update_prev_key) {
+ prev_key_.SetInternalKey(current_->key());
+ is_prev_set_ = true;
+ is_prev_inclusive_ = false;
+ }
+ }
+
+ current_->Next();
+ if (current_ != mutable_iter_) {
+ if (!current_->status().ok()) {
+ immutable_status_ = current_->status();
+ } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
+ immutable_min_heap_.push(current_);
+ } else {
+ if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
+ // remove the current iterator
+ DeleteCurrentIter();
+ current_ = nullptr;
+ }
+ if (update_prev_key) {
+ mutable_iter_->Seek(prev_key_.GetInternalKey());
+ }
+ }
+ }
+ UpdateCurrent();
+ TEST_SYNC_POINT_CALLBACK("ForwardIterator::Next:Return", this);
+}
+
+Slice ForwardIterator::key() const {
+ assert(valid_);
+ return current_->key();
+}
+
+Slice ForwardIterator::value() const {
+ assert(valid_);
+ return current_->value();
+}
+
+Status ForwardIterator::status() const {
+ if (!status_.ok()) {
+ return status_;
+ } else if (!mutable_iter_->status().ok()) {
+ return mutable_iter_->status();
+ }
+
+ return immutable_status_;
+}
+
+bool ForwardIterator::PrepareValue() {
+ assert(valid_);
+ if (current_->PrepareValue()) {
+ return true;
+ }
+
+ assert(!current_->Valid());
+ assert(!current_->status().ok());
+ assert(current_ != mutable_iter_); // memtable iterator can't fail
+ assert(immutable_status_.ok());
+
+ valid_ = false;
+ immutable_status_ = current_->status();
+ return false;
+}
+
+Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
+ assert(prop != nullptr);
+ if (prop_name == "rocksdb.iterator.super-version-number") {
+ *prop = std::to_string(sv_->version_number);
+ return Status::OK();
+ }
+ return Status::InvalidArgument();
+}
+
+void ForwardIterator::SetPinnedItersMgr(
+ PinnedIteratorsManager* pinned_iters_mgr) {
+ pinned_iters_mgr_ = pinned_iters_mgr;
+ UpdateChildrenPinnedItersMgr();
+}
+
+void ForwardIterator::UpdateChildrenPinnedItersMgr() {
+ // Set PinnedIteratorsManager for mutable memtable iterator.
+ if (mutable_iter_) {
+ mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
+ }
+
+ // Set PinnedIteratorsManager for immutable memtable iterators.
+ for (InternalIterator* child_iter : imm_iters_) {
+ if (child_iter) {
+ child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
+ }
+ }
+
+ // Set PinnedIteratorsManager for L0 files iterators.
+ for (InternalIterator* child_iter : l0_iters_) {
+ if (child_iter) {
+ child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
+ }
+ }
+
+ // Set PinnedIteratorsManager for L1+ levels iterators.
+ for (ForwardLevelIterator* child_iter : level_iters_) {
+ if (child_iter) {
+ child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
+ }
+ }
+}
+
+bool ForwardIterator::IsKeyPinned() const {
+ return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
+ current_->IsKeyPinned();
+}
+
+bool ForwardIterator::IsValuePinned() const {
+ return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
+ current_->IsValuePinned();
+}
+
+void ForwardIterator::RebuildIterators(bool refresh_sv) {
+ // Clean up
+ Cleanup(refresh_sv);
+ if (refresh_sv) {
+ // Acquire a fresh referenced SuperVersion
+ sv_ = cfd_->GetReferencedSuperVersion(db_);
+ }
+ ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
+ kMaxSequenceNumber /* upper_bound */);
+ mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
+ sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
+ if (!read_options_.ignore_range_deletions) {
+ std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
+ sv_->mem->NewRangeTombstoneIterator(
+ read_options_, sv_->current->version_set()->LastSequence(),
+ false /* immutable_memtable */));
+ range_del_agg.AddTombstones(std::move(range_del_iter));
+ // AddRangeTombstoneIterators() always returns Status::OK().
+ Status temp_s = sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
+ &range_del_agg);
+ assert(temp_s.ok());
+ }
+ has_iter_trimmed_for_upper_bound_ = false;
+
+ const auto* vstorage = sv_->current->storage_info();
+ const auto& l0_files = vstorage->LevelFiles(0);
+ l0_iters_.reserve(l0_files.size());
+ for (const auto* l0 : l0_files) {
+ if ((read_options_.iterate_upper_bound != nullptr) &&
+ cfd_->internal_comparator().user_comparator()->Compare(
+ l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
+ // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
+ // will never be interested in files with smallest key above
+ // iterate_upper_bound, since iterate_upper_bound can't be changed.
+ l0_iters_.push_back(nullptr);
+ continue;
+ }
+ l0_iters_.push_back(cfd_->table_cache()->NewIterator(
+ read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
+ read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
+ sv_->mutable_cf_options.prefix_extractor,
+ /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
+ TableReaderCaller::kUserIterator, /*arena=*/nullptr,
+ /*skip_filters=*/false, /*level=*/-1,
+ MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
+ /*smallest_compaction_key=*/nullptr,
+ /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
+ }
+ BuildLevelIterators(vstorage, sv_);
+ current_ = nullptr;
+ is_prev_set_ = false;
+
+ UpdateChildrenPinnedItersMgr();
+ if (!range_del_agg.IsEmpty()) {
+ status_ = Status::NotSupported(
+ "Range tombstones unsupported with ForwardIterator");
+ valid_ = false;
+ }
+}
+
+void ForwardIterator::RenewIterators() {
+ SuperVersion* svnew;
+ assert(sv_);
+ svnew = cfd_->GetReferencedSuperVersion(db_);
+
+ if (mutable_iter_ != nullptr) {
+ DeleteIterator(mutable_iter_, true /* is_arena */);
+ }
+ for (auto* m : imm_iters_) {
+ DeleteIterator(m, true /* is_arena */);
+ }
+ imm_iters_.clear();
+
+ mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
+ svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
+ ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
+ kMaxSequenceNumber /* upper_bound */);
+ if (!read_options_.ignore_range_deletions) {
+ std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
+ svnew->mem->NewRangeTombstoneIterator(
+ read_options_, sv_->current->version_set()->LastSequence(),
+ false /* immutable_memtable */));
+ range_del_agg.AddTombstones(std::move(range_del_iter));
+ // AddRangeTombstoneIterators() always returns Status::OK().
+ Status temp_s = svnew->imm->AddRangeTombstoneIterators(
+ read_options_, &arena_, &range_del_agg);
+ assert(temp_s.ok());
+ }
+
+ const auto* vstorage = sv_->current->storage_info();
+ const auto& l0_files = vstorage->LevelFiles(0);
+ const auto* vstorage_new = svnew->current->storage_info();
+ const auto& l0_files_new = vstorage_new->LevelFiles(0);
+ size_t iold, inew;
+ bool found;
+ std::vector<InternalIterator*> l0_iters_new;
+ l0_iters_new.reserve(l0_files_new.size());
+
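+ // Reuse the old iterator for any L0 file that also appears in the new
+ // version; open a new table iterator only for files added in between.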
+ for (inew = 0; inew < l0_files_new.size(); inew++) {
+ found = false;
+ for (iold = 0; iold < l0_files.size(); iold++) {
+ if (l0_files[iold] == l0_files_new[inew]) {
+ found = true;
+ break;
+ }
+ }
+ if (found) {
+ if (l0_iters_[iold] == nullptr) {
+ l0_iters_new.push_back(nullptr);
+ TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
+ } else {
+ l0_iters_new.push_back(l0_iters_[iold]);
+ l0_iters_[iold] = nullptr;
+ TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
+ }
+ continue;
+ }
+ l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
+ read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
+ *l0_files_new[inew],
+ read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
+ svnew->mutable_cf_options.prefix_extractor,
+ /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
+ TableReaderCaller::kUserIterator, /*arena=*/nullptr,
+ /*skip_filters=*/false, /*level=*/-1,
+ MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
+ /*smallest_compaction_key=*/nullptr,
+ /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
+ }
+
+ for (auto* f : l0_iters_) {
+ DeleteIterator(f);
+ }
+ l0_iters_.clear();
+ l0_iters_ = l0_iters_new;
+
+ for (auto* l : level_iters_) {
+ DeleteIterator(l);
+ }
+ level_iters_.clear();
+ BuildLevelIterators(vstorage_new, svnew);
+ current_ = nullptr;
+ is_prev_set_ = false;
+ SVCleanup();
+ sv_ = svnew;
+
+ UpdateChildrenPinnedItersMgr();
+ if (!range_del_agg.IsEmpty()) {
+ status_ = Status::NotSupported(
+ "Range tombstones unsupported with ForwardIterator");
+ valid_ = false;
+ }
+}
+
+void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage,
+ SuperVersion* sv) {
+ level_iters_.reserve(vstorage->num_levels() - 1);
+ for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
+ const auto& level_files = vstorage->LevelFiles(level);
+ if ((level_files.empty()) ||
+ ((read_options_.iterate_upper_bound != nullptr) &&
+ (user_comparator_->Compare(*read_options_.iterate_upper_bound,
+ level_files[0]->smallest.user_key()) <
+ 0))) {
+ level_iters_.push_back(nullptr);
+ if (!level_files.empty()) {
+ has_iter_trimmed_for_upper_bound_ = true;
+ }
+ } else {
+ level_iters_.push_back(new ForwardLevelIterator(
+ cfd_, read_options_, level_files,
+ sv->mutable_cf_options.prefix_extractor, allow_unprepared_value_));
+ }
+ }
+}
+
+void ForwardIterator::ResetIncompleteIterators() {
+ const auto& l0_files = sv_->current->storage_info()->LevelFiles(0);
+ for (size_t i = 0; i < l0_iters_.size(); ++i) {
+ assert(i < l0_files.size());
+ if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) {
+ continue;
+ }
+ DeleteIterator(l0_iters_[i]);
+ l0_iters_[i] = cfd_->table_cache()->NewIterator(
+ read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
+ *l0_files[i], /*range_del_agg=*/nullptr,
+ sv_->mutable_cf_options.prefix_extractor,
+ /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
+ TableReaderCaller::kUserIterator, /*arena=*/nullptr,
+ /*skip_filters=*/false, /*level=*/-1,
+ MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
+ /*smallest_compaction_key=*/nullptr,
+ /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
+ l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
+ }
+
+ for (auto* level_iter : level_iters_) {
+ if (level_iter && level_iter->status().IsIncomplete()) {
+ level_iter->Reset();
+ }
+ }
+
+ current_ = nullptr;
+ is_prev_set_ = false;
+}
+
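+// Sets current_ to the child iterator positioned at the smallest key: either
+// the mutable memtable iterator or the top of immutable_min_heap_. The heap
+// entry is popped only when it wins; a losing top stays queued for later.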
+void ForwardIterator::UpdateCurrent() {
+ if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
+ current_ = nullptr;
+ } else if (immutable_min_heap_.empty()) {
+ current_ = mutable_iter_;
+ } else if (!mutable_iter_->Valid()) {
+ current_ = immutable_min_heap_.top();
+ immutable_min_heap_.pop();
+ } else {
+ current_ = immutable_min_heap_.top();
+ assert(current_ != nullptr);
+ assert(current_->Valid());
+ int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
+ mutable_iter_->key(), current_->key());
+ assert(cmp != 0);
+ if (cmp > 0) {
+ immutable_min_heap_.pop();
+ } else {
+ current_ = mutable_iter_;
+ }
+ }
+ valid_ = current_ != nullptr && immutable_status_.ok();
+ if (!status_.ok()) {
+ status_ = Status::OK();
+ }
+
+ // Upper bound doesn't apply to the memtable iterator. We want Valid() to
+ // return false when all iterators are over iterate_upper_bound, but can't
+ // just set valid_ to false, as that would effectively disable the tailing
+ // optimization (Seek() would be called on all immutable iterators regardless
+ // of whether the target key is greater than prev_key_).
+ current_over_upper_bound_ = valid_ && IsOverUpperBound(current_->key());
+}
+
+bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
+ // We maintain the interval (prev_key_, immutable_min_heap_.top()->key())
+ // such that there are no records with keys within that range in
+ // immutable_min_heap_. Since immutable structures (SST files and immutable
+ // memtables) can't change in this version, we don't need to do a seek if
+ // 'target' belongs to that interval (immutable_min_heap_.top() is already
+ // at the correct position).
+
+ if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
+ return true;
+ }
+ Slice prev_key = prev_key_.GetInternalKey();
+ if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
+ prefix_extractor_->Transform(prev_key)) != 0) {
+ return true;
+ }
+ if (cfd_->internal_comparator().InternalKeyComparator::Compare(
+ prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
+ return true;
+ }
+
+ if (immutable_min_heap_.empty() && current_ == mutable_iter_) {
+ // Nothing to seek on.
+ return false;
+ }
+ if (cfd_->internal_comparator().InternalKeyComparator::Compare(
+ target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
+ : current_->key()) > 0) {
+ return true;
+ }
+ return false;
+}
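+
+// Worked example (user keys only, for illustration): suppose prev_key_ is
+// "b" (exclusive) and the heap top is at "f". Seek("d") needs no immutable
+// seek, since the invariant guarantees no immutable entry exists in
+// ("b", "f") and the heap top is already positioned correctly. Seek("a")
+// (at or before prev_key_) or Seek("g") (past the heap top) must re-seek.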
+
+void ForwardIterator::DeleteCurrentIter() {
+ const VersionStorageInfo* vstorage = sv_->current->storage_info();
+ const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
+ for (size_t i = 0; i < l0.size(); ++i) {
+ if (!l0_iters_[i]) {
+ continue;
+ }
+ if (l0_iters_[i] == current_) {
+ has_iter_trimmed_for_upper_bound_ = true;
+ DeleteIterator(l0_iters_[i]);
+ l0_iters_[i] = nullptr;
+ return;
+ }
+ }
+
+ for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
+ if (level_iters_[level - 1] == nullptr) {
+ continue;
+ }
+ if (level_iters_[level - 1] == current_) {
+ has_iter_trimmed_for_upper_bound_ = true;
+ DeleteIterator(level_iters_[level - 1]);
+ level_iters_[level - 1] = nullptr;
+ }
+ }
+}
+
+bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters,
+ int* pnum_iters) {
+ bool retval = false;
+ int deleted_iters = 0;
+ int num_iters = 0;
+
+ const VersionStorageInfo* vstorage = sv_->current->storage_info();
+ const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
+ for (size_t i = 0; i < l0.size(); ++i) {
+ if (!l0_iters_[i]) {
+ retval = true;
+ deleted_iters++;
+ } else {
+ num_iters++;
+ }
+ }
+
+ for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
+ if ((level_iters_[level - 1] == nullptr) &&
+ (!vstorage->LevelFiles(level).empty())) {
+ retval = true;
+ deleted_iters++;
+ } else if (!vstorage->LevelFiles(level).empty()) {
+ num_iters++;
+ }
+ }
+ if ((!retval) && num_iters <= 1) {
+ retval = true;
+ }
+ if (pdeleted_iters) {
+ *pdeleted_iters = deleted_iters;
+ }
+ if (pnum_iters) {
+ *pnum_iters = num_iters;
+ }
+ return retval;
+}
+
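+// Returns the index of the first file in [left, right) whose largest
+// internal key is >= internal_key, or `right` if there is no such file
+// (std::lower_bound semantics).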
+uint32_t ForwardIterator::FindFileInRange(
+ const std::vector<FileMetaData*>& files, const Slice& internal_key,
+ uint32_t left, uint32_t right) {
+ auto cmp = [&](const FileMetaData* f, const Slice& k) -> bool {
+ return cfd_->internal_comparator().InternalKeyComparator::Compare(
+ f->largest.Encode(), k) < 0;
+ };
+ const auto& b = files.begin();
+ return static_cast<uint32_t>(
+ std::lower_bound(b + left, b + right, internal_key, cmp) - b);
+}
+
+void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
+ if (iter == nullptr) {
+ return;
+ }
+
+ if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
+ pinned_iters_mgr_->PinIterator(iter, is_arena);
+ } else {
+ if (is_arena) {
+ iter->~InternalIterator();
+ } else {
+ delete iter;
+ }
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE