From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/rocksdb/db/arena_wrapped_db_iter.cc | 160 ++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 src/rocksdb/db/arena_wrapped_db_iter.cc (limited to 'src/rocksdb/db/arena_wrapped_db_iter.cc') diff --git a/src/rocksdb/db/arena_wrapped_db_iter.cc b/src/rocksdb/db/arena_wrapped_db_iter.cc new file mode 100644 index 000000000..607403ccc --- /dev/null +++ b/src/rocksdb/db/arena_wrapped_db_iter.cc @@ -0,0 +1,160 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/arena_wrapped_db_iter.h" + +#include "memory/arena.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "table/internal_iterator.h" +#include "table/iterator_wrapper.h" +#include "util/user_comparator_wrapper.h" + +namespace ROCKSDB_NAMESPACE { + +Status ArenaWrappedDBIter::GetProperty(std::string prop_name, + std::string* prop) { + if (prop_name == "rocksdb.iterator.super-version-number") { + // First try to pass the value returned from inner iterator. + if (!db_iter_->GetProperty(prop_name, prop).ok()) { + *prop = std::to_string(sv_number_); + } + return Status::OK(); + } + return db_iter_->GetProperty(prop_name, prop); +} + +void ArenaWrappedDBIter::Init( + Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions, + const MutableCFOptions& mutable_cf_options, const Version* version, + const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration, + uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl, + ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) { + auto mem = arena_.AllocateAligned(sizeof(DBIter)); + db_iter_ = + new (mem) DBIter(env, read_options, ioptions, mutable_cf_options, + ioptions.user_comparator, /* iter */ nullptr, version, + sequence, true, max_sequential_skip_in_iteration, + read_callback, db_impl, cfd, expose_blob_index); + sv_number_ = version_number; + read_options_ = read_options; + allow_refresh_ = allow_refresh; + memtable_range_tombstone_iter_ = nullptr; +} + +Status ArenaWrappedDBIter::Refresh() { + if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) { + return Status::NotSupported("Creating renew iterator is not allowed."); + } + assert(db_iter_ != nullptr); + // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the + // correct behavior. Will be corrected automatically when we take a snapshot + // here for the case of WritePreparedTxnDB. + uint64_t cur_sv_number = cfd_->GetSuperVersionNumber(); + TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1"); + TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2"); + auto reinit_internal_iter = [&]() { + Env* env = db_iter_->env(); + db_iter_->~DBIter(); + arena_.~Arena(); + new (&arena_) Arena(); + + SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_); + SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber(); + if (read_callback_) { + read_callback_->Refresh(latest_seq); + } + Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options, + sv->current, latest_seq, + sv->mutable_cf_options.max_sequential_skip_in_iterations, + cur_sv_number, read_callback_, db_impl_, cfd_, expose_blob_index_, + allow_refresh_); + + InternalIterator* internal_iter = db_impl_->NewInternalIterator( + read_options_, cfd_, sv, &arena_, latest_seq, + /* allow_unprepared_value */ true, /* db_iter */ this); + SetIterUnderDBIter(internal_iter); + }; + while (true) { + if (sv_number_ != cur_sv_number) { + reinit_internal_iter(); + break; + } else { + SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber(); + // Refresh range-tombstones in MemTable + if (!read_options_.ignore_range_deletions) { + SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_); + TEST_SYNC_POINT_CALLBACK("ArenaWrappedDBIter::Refresh:SV", nullptr); + auto t = sv->mem->NewRangeTombstoneIterator( + read_options_, latest_seq, false /* immutable_memtable */); + if (!t || t->empty()) { + // If memtable_range_tombstone_iter_ points to a non-empty tombstone + // iterator, then it means sv->mem is not the memtable that + // memtable_range_tombstone_iter_ points to, so SV must have changed + // after the sv_number_ != cur_sv_number check above. We will fall + // back to re-init the InternalIterator, and the tombstone iterator + // will be freed during db_iter destruction there. + if (memtable_range_tombstone_iter_) { + assert(!*memtable_range_tombstone_iter_ || + sv_number_ != cfd_->GetSuperVersionNumber()); + } + delete t; + } else { // current mutable memtable has range tombstones + if (!memtable_range_tombstone_iter_) { + delete t; + db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv); + // The memtable under DBIter did not have range tombstone before + // refresh. + reinit_internal_iter(); + break; + } else { + delete *memtable_range_tombstone_iter_; + *memtable_range_tombstone_iter_ = new TruncatedRangeDelIterator( + std::unique_ptr(t), + &cfd_->internal_comparator(), nullptr, nullptr); + } + } + db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv); + } + // Refresh latest sequence number + db_iter_->set_sequence(latest_seq); + db_iter_->set_valid(false); + // Check again if the latest super version number is changed + uint64_t latest_sv_number = cfd_->GetSuperVersionNumber(); + if (latest_sv_number != cur_sv_number) { + // If the super version number is changed after refreshing, + // fallback to Re-Init the InternalIterator + cur_sv_number = latest_sv_number; + continue; + } + break; + } + } + return Status::OK(); +} + +ArenaWrappedDBIter* NewArenaWrappedDbIterator( + Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions, + const MutableCFOptions& mutable_cf_options, const Version* version, + const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, + uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl, + ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) { + ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); + iter->Init(env, read_options, ioptions, mutable_cf_options, version, sequence, + max_sequential_skip_in_iterations, version_number, read_callback, + db_impl, cfd, expose_blob_index, allow_refresh); + if (db_impl != nullptr && cfd != nullptr && allow_refresh) { + iter->StoreRefreshInfo(db_impl, cfd, read_callback, expose_blob_index); + } + + return iter; +} + +} // namespace ROCKSDB_NAMESPACE -- cgit v1.2.3