summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/compaction/compaction_iterator.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/db/compaction/compaction_iterator.cc1338
1 files changed, 1338 insertions, 0 deletions
diff --git a/src/rocksdb/db/compaction/compaction_iterator.cc b/src/rocksdb/db/compaction/compaction_iterator.cc
new file mode 100644
index 000000000..9f54f7813
--- /dev/null
+++ b/src/rocksdb/db/compaction/compaction_iterator.cc
@@ -0,0 +1,1338 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/compaction/compaction_iterator.h"
+
+#include <iterator>
+#include <limits>
+
+#include "db/blob/blob_fetcher.h"
+#include "db/blob/blob_file_builder.h"
+#include "db/blob/blob_index.h"
+#include "db/blob/prefetch_buffer_collection.h"
+#include "db/snapshot_checker.h"
+#include "logging/logging.h"
+#include "port/likely.h"
+#include "rocksdb/listener.h"
+#include "table/internal_iterator.h"
+#include "test_util/sync_point.h"
+
+namespace ROCKSDB_NAMESPACE {
+CompactionIterator::CompactionIterator(
+ InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
+ SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
+ SequenceNumber earliest_write_conflict_snapshot,
+ SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
+ Env* env, bool report_detailed_time, bool expect_valid_internal_key,
+ CompactionRangeDelAggregator* range_del_agg,
+ BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
+ bool enforce_single_del_contracts,
+ const std::atomic<bool>& manual_compaction_canceled,
+ const Compaction* compaction, const CompactionFilter* compaction_filter,
+ const std::atomic<bool>* shutting_down,
+ const std::shared_ptr<Logger> info_log,
+ const std::string* full_history_ts_low,
+ const SequenceNumber preserve_time_min_seqno,
+ const SequenceNumber preclude_last_level_min_seqno)
+ : CompactionIterator(
+ input, cmp, merge_helper, last_sequence, snapshots,
+ earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
+ report_detailed_time, expect_valid_internal_key, range_del_agg,
+ blob_file_builder, allow_data_in_errors, enforce_single_del_contracts,
+ manual_compaction_canceled,
+ std::unique_ptr<CompactionProxy>(
+ compaction ? new RealCompaction(compaction) : nullptr),
+ compaction_filter, shutting_down, info_log, full_history_ts_low,
+ preserve_time_min_seqno, preclude_last_level_min_seqno) {}
+
+CompactionIterator::CompactionIterator(
+ InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
+ SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
+ SequenceNumber earliest_write_conflict_snapshot,
+ SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
+ Env* env, bool report_detailed_time, bool expect_valid_internal_key,
+ CompactionRangeDelAggregator* range_del_agg,
+ BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
+ bool enforce_single_del_contracts,
+ const std::atomic<bool>& manual_compaction_canceled,
+ std::unique_ptr<CompactionProxy> compaction,
+ const CompactionFilter* compaction_filter,
+ const std::atomic<bool>* shutting_down,
+ const std::shared_ptr<Logger> info_log,
+ const std::string* full_history_ts_low,
+ const SequenceNumber preserve_time_min_seqno,
+ const SequenceNumber preclude_last_level_min_seqno)
+ : input_(input, cmp,
+ !compaction || compaction->DoesInputReferenceBlobFiles()),
+ cmp_(cmp),
+ merge_helper_(merge_helper),
+ snapshots_(snapshots),
+ earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
+ job_snapshot_(job_snapshot),
+ snapshot_checker_(snapshot_checker),
+ env_(env),
+ clock_(env_->GetSystemClock().get()),
+ report_detailed_time_(report_detailed_time),
+ expect_valid_internal_key_(expect_valid_internal_key),
+ range_del_agg_(range_del_agg),
+ blob_file_builder_(blob_file_builder),
+ compaction_(std::move(compaction)),
+ compaction_filter_(compaction_filter),
+ shutting_down_(shutting_down),
+ manual_compaction_canceled_(manual_compaction_canceled),
+ bottommost_level_(!compaction_ ? false
+ : compaction_->bottommost_level() &&
+ !compaction_->allow_ingest_behind()),
+ // snapshots_ cannot be nullptr, but we will assert later in the body of
+ // the constructor.
+ visible_at_tip_(snapshots_ ? snapshots_->empty() : false),
+ earliest_snapshot_(!snapshots_ || snapshots_->empty()
+ ? kMaxSequenceNumber
+ : snapshots_->at(0)),
+ info_log_(info_log),
+ allow_data_in_errors_(allow_data_in_errors),
+ enforce_single_del_contracts_(enforce_single_del_contracts),
+ timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0),
+ full_history_ts_low_(full_history_ts_low),
+ current_user_key_sequence_(0),
+ current_user_key_snapshot_(0),
+ merge_out_iter_(merge_helper_),
+ blob_garbage_collection_cutoff_file_number_(
+ ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())),
+ blob_fetcher_(CreateBlobFetcherIfNeeded(compaction_.get())),
+ prefetch_buffers_(
+ CreatePrefetchBufferCollectionIfNeeded(compaction_.get())),
+ current_key_committed_(false),
+ cmp_with_history_ts_low_(0),
+ level_(compaction_ == nullptr ? 0 : compaction_->level()),
+ preserve_time_min_seqno_(preserve_time_min_seqno),
+ preclude_last_level_min_seqno_(preclude_last_level_min_seqno) {
+ assert(snapshots_ != nullptr);
+ assert(preserve_time_min_seqno_ <= preclude_last_level_min_seqno_);
+
+ if (compaction_ != nullptr) {
+ level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
+ }
+#ifndef NDEBUG
+ // findEarliestVisibleSnapshot assumes this ordering.
+ for (size_t i = 1; i < snapshots_->size(); ++i) {
+ assert(snapshots_->at(i - 1) < snapshots_->at(i));
+ }
+ assert(timestamp_size_ == 0 || !full_history_ts_low_ ||
+ timestamp_size_ == full_history_ts_low_->size());
+#endif
+ input_.SetPinnedItersMgr(&pinned_iters_mgr_);
+ TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
+}
+
+CompactionIterator::~CompactionIterator() {
+ // input_ Iterator lifetime is longer than pinned_iters_mgr_ lifetime
+ input_.SetPinnedItersMgr(nullptr);
+}
+
+void CompactionIterator::ResetRecordCounts() {
+ iter_stats_.num_record_drop_user = 0;
+ iter_stats_.num_record_drop_hidden = 0;
+ iter_stats_.num_record_drop_obsolete = 0;
+ iter_stats_.num_record_drop_range_del = 0;
+ iter_stats_.num_range_del_drop_obsolete = 0;
+ iter_stats_.num_optimized_del_drop_obsolete = 0;
+}
+
+void CompactionIterator::SeekToFirst() {
+ NextFromInput();
+ PrepareOutput();
+}
+
+void CompactionIterator::Next() {
+ // If there is a merge output, return it before continuing to process the
+ // input.
+ if (merge_out_iter_.Valid()) {
+ merge_out_iter_.Next();
+
+ // Check if we returned all records of the merge output.
+ if (merge_out_iter_.Valid()) {
+ key_ = merge_out_iter_.key();
+ value_ = merge_out_iter_.value();
+ Status s = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
+ // MergeUntil stops when it encounters a corrupt key and does not
+ // include them in the result, so we expect the keys here to be valid.
+ if (!s.ok()) {
+ ROCKS_LOG_FATAL(
+ info_log_, "Invalid ikey %s in compaction. %s",
+ allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden",
+ s.getState());
+ assert(false);
+ }
+
+ // Keep current_key_ in sync.
+ if (0 == timestamp_size_) {
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+ } else {
+ Slice ts = ikey_.GetTimestamp(timestamp_size_);
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type, &ts);
+ }
+ key_ = current_key_.GetInternalKey();
+ ikey_.user_key = current_key_.GetUserKey();
+ validity_info_.SetValid(ValidContext::kMerge1);
+ } else {
+ // We consumed all pinned merge operands, release pinned iterators
+ pinned_iters_mgr_.ReleasePinnedData();
+ // MergeHelper moves the iterator to the first record after the merged
+ // records, so even though we reached the end of the merge output, we do
+ // not want to advance the iterator.
+ NextFromInput();
+ }
+ } else {
+ // Only advance the input iterator if there is no merge output and the
+ // iterator is not already at the next record.
+ if (!at_next_) {
+ AdvanceInputIter();
+ }
+ NextFromInput();
+ }
+
+ if (Valid()) {
+ // Record that we've outputted a record for the current key.
+ has_outputted_key_ = true;
+ }
+
+ PrepareOutput();
+}
+
+bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
+ Slice* skip_until) {
+ // TODO: support compaction filter for wide-column entities
+ if (!compaction_filter_ ||
+ (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex)) {
+ return true;
+ }
+ bool error = false;
+ // If the user has specified a compaction filter and the sequence
+ // number is greater than any external snapshot, then invoke the
+ // filter. If the return value of the compaction filter is true,
+ // replace the entry with a deletion marker.
+ CompactionFilter::Decision filter = CompactionFilter::Decision::kUndetermined;
+ compaction_filter_value_.clear();
+ compaction_filter_skip_until_.Clear();
+ CompactionFilter::ValueType value_type =
+ ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
+ : CompactionFilter::ValueType::kBlobIndex;
+ // Hack: pass internal key to BlobIndexCompactionFilter since it needs
+ // to get sequence number.
+ assert(compaction_filter_);
+ Slice& filter_key =
+ (ikey_.type == kTypeValue ||
+ !compaction_filter_->IsStackedBlobDbInternalCompactionFilter())
+ ? ikey_.user_key
+ : key_;
+ {
+ StopWatchNano timer(clock_, report_detailed_time_);
+ if (kTypeBlobIndex == ikey_.type) {
+ filter = compaction_filter_->FilterBlobByKey(
+ level_, filter_key, &compaction_filter_value_,
+ compaction_filter_skip_until_.rep());
+ if (CompactionFilter::Decision::kUndetermined == filter &&
+ !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
+ if (compaction_ == nullptr) {
+ status_ =
+ Status::Corruption("Unexpected blob index outside of compaction");
+ validity_info_.Invalidate();
+ return false;
+ }
+
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
+ &value_);
+
+ // For integrated BlobDB impl, CompactionIterator reads blob value.
+ // For Stacked BlobDB impl, the corresponding CompactionFilter's
+ // FilterV2 method should read the blob value.
+ BlobIndex blob_index;
+ Status s = blob_index.DecodeFrom(value_);
+ if (!s.ok()) {
+ status_ = s;
+ validity_info_.Invalidate();
+ return false;
+ }
+
+ FilePrefetchBuffer* prefetch_buffer =
+ prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
+ blob_index.file_number())
+ : nullptr;
+
+ uint64_t bytes_read = 0;
+
+ assert(blob_fetcher_);
+
+ s = blob_fetcher_->FetchBlob(ikey_.user_key, blob_index,
+ prefetch_buffer, &blob_value_,
+ &bytes_read);
+ if (!s.ok()) {
+ status_ = s;
+ validity_info_.Invalidate();
+ return false;
+ }
+
+ ++iter_stats_.num_blobs_read;
+ iter_stats_.total_blob_bytes_read += bytes_read;
+
+ value_type = CompactionFilter::ValueType::kValue;
+ }
+ }
+ if (CompactionFilter::Decision::kUndetermined == filter) {
+ filter = compaction_filter_->FilterV2(
+ level_, filter_key, value_type,
+ blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_,
+ compaction_filter_skip_until_.rep());
+ }
+ iter_stats_.total_filter_time +=
+ env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
+ }
+
+ if (CompactionFilter::Decision::kUndetermined == filter) {
+ // Should not reach here, since FilterV2 should never return kUndetermined.
+ status_ =
+ Status::NotSupported("FilterV2() should never return kUndetermined");
+ validity_info_.Invalidate();
+ return false;
+ }
+
+ if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
+ cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
+ 0) {
+ // Can't skip to a key smaller than the current one.
+ // Keep the key as per FilterV2 documentation.
+ filter = CompactionFilter::Decision::kKeep;
+ }
+
+ if (filter == CompactionFilter::Decision::kRemove) {
+ // convert the current key to a delete; key_ is pointing into
+ // current_key_ at this point, so updating current_key_ updates key()
+ ikey_.type = kTypeDeletion;
+ current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
+ // no value associated with delete
+ value_.clear();
+ iter_stats_.num_record_drop_user++;
+ } else if (filter == CompactionFilter::Decision::kPurge) {
+ // convert the current key to a single delete; key_ is pointing into
+ // current_key_ at this point, so updating current_key_ updates key()
+ ikey_.type = kTypeSingleDeletion;
+ current_key_.UpdateInternalKey(ikey_.sequence, kTypeSingleDeletion);
+ // no value associated with single delete
+ value_.clear();
+ iter_stats_.num_record_drop_user++;
+ } else if (filter == CompactionFilter::Decision::kChangeValue) {
+ if (ikey_.type == kTypeBlobIndex) {
+ // value transfer from blob file to inlined data
+ ikey_.type = kTypeValue;
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+ }
+ value_ = compaction_filter_value_;
+ } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
+ *need_skip = true;
+ compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
+ kValueTypeForSeek);
+ *skip_until = compaction_filter_skip_until_.Encode();
+ } else if (filter == CompactionFilter::Decision::kChangeBlobIndex) {
+ // Only the StackableDB-based BlobDB impl's compaction filter should return
+ // kChangeBlobIndex. Decision about rewriting blob and changing blob index
+ // in the integrated BlobDB impl is made in subsequent call to
+ // PrepareOutput() and its callees.
+ if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
+ status_ = Status::NotSupported(
+ "Only stacked BlobDB's internal compaction filter can return "
+ "kChangeBlobIndex.");
+ validity_info_.Invalidate();
+ return false;
+ }
+ if (ikey_.type == kTypeValue) {
+ // value transfer from inlined data to blob file
+ ikey_.type = kTypeBlobIndex;
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+ }
+ value_ = compaction_filter_value_;
+ } else if (filter == CompactionFilter::Decision::kIOError) {
+ if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
+ status_ = Status::NotSupported(
+ "CompactionFilter for integrated BlobDB should not return kIOError");
+ validity_info_.Invalidate();
+ return false;
+ }
+ status_ = Status::IOError("Failed to access blob during compaction filter");
+ error = true;
+ }
+ return !error;
+}
+
+void CompactionIterator::NextFromInput() {
+ at_next_ = false;
+ validity_info_.Invalidate();
+
+ while (!Valid() && input_.Valid() && !IsPausingManualCompaction() &&
+ !IsShuttingDown()) {
+ key_ = input_.key();
+ value_ = input_.value();
+ blob_value_.Reset();
+ iter_stats_.num_input_records++;
+
+ Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
+ if (!pik_status.ok()) {
+ iter_stats_.num_input_corrupt_records++;
+
+ // If `expect_valid_internal_key_` is false, return the corrupted key
+ // and let the caller decide what to do with it.
+ if (expect_valid_internal_key_) {
+ status_ = pik_status;
+ return;
+ }
+ key_ = current_key_.SetInternalKey(key_);
+ has_current_user_key_ = false;
+ current_user_key_sequence_ = kMaxSequenceNumber;
+ current_user_key_snapshot_ = 0;
+ validity_info_.SetValid(ValidContext::kParseKeyError);
+ break;
+ }
+ TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
+
+ // Update input statistics
+ if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
+ ikey_.type == kTypeDeletionWithTimestamp) {
+ iter_stats_.num_input_deletion_records++;
+ }
+ iter_stats_.total_input_raw_key_bytes += key_.size();
+ iter_stats_.total_input_raw_value_bytes += value_.size();
+
+ // If need_skip is true, we should seek the input iterator
+ // to internal key skip_until and continue from there.
+ bool need_skip = false;
+ // Points either into compaction_filter_skip_until_ or into
+ // merge_helper_->compaction_filter_skip_until_.
+ Slice skip_until;
+
+ bool user_key_equal_without_ts = false;
+ int cmp_ts = 0;
+ if (has_current_user_key_) {
+ user_key_equal_without_ts =
+ cmp_->EqualWithoutTimestamp(ikey_.user_key, current_user_key_);
+ // if timestamp_size_ > 0, then curr_ts_ has been initialized by a
+ // previous key.
+ cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp(
+ ExtractTimestampFromUserKey(
+ ikey_.user_key, timestamp_size_),
+ curr_ts_)
+ : 0;
+ }
+
+ // Check whether the user key changed. After this if statement current_key_
+ // is a copy of the current input key (maybe converted to a delete by the
+ // compaction filter). ikey_.user_key is pointing to the copy.
+ if (!has_current_user_key_ || !user_key_equal_without_ts || cmp_ts != 0) {
+ // First occurrence of this user key
+ // Copy key for output
+ key_ = current_key_.SetInternalKey(key_, &ikey_);
+
+ int prev_cmp_with_ts_low =
+ !full_history_ts_low_ ? 0
+ : curr_ts_.empty()
+ ? 0
+ : cmp_->CompareTimestamp(curr_ts_, *full_history_ts_low_);
+
+ // If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for the use
+ // in next iteration to compare with the timestamp of next key.
+ UpdateTimestampAndCompareWithFullHistoryLow();
+
+ // If
+ // (1) !has_current_user_key_, OR
+ // (2) timestamp is disabled, OR
+ // (3) all history will be preserved, OR
+ // (4) user key (excluding timestamp) is different from previous key, OR
+ // (5) timestamp is NO older than *full_history_ts_low_, OR
+ // (6) timestamp is the largest one older than full_history_ts_low_,
+ // then current_user_key_ must be treated as a different user key.
+ // This means, if a user key (excluding ts) is the same as the previous
+ // user key, and its ts is older than *full_history_ts_low_, then we
+ // consider this key for GC, e.g. it may be dropped if certain conditions
+ // match.
+ if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
+ !user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0 ||
+ prev_cmp_with_ts_low >= 0) {
+ // Initialize for future comparison for rule (A) and etc.
+ current_user_key_sequence_ = kMaxSequenceNumber;
+ current_user_key_snapshot_ = 0;
+ has_current_user_key_ = true;
+ }
+ current_user_key_ = ikey_.user_key;
+
+ has_outputted_key_ = false;
+
+ last_key_seq_zeroed_ = false;
+
+ current_key_committed_ = KeyCommitted(ikey_.sequence);
+
+ // Apply the compaction filter to the first committed version of the user
+ // key.
+ if (current_key_committed_ &&
+ !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
+ break;
+ }
+ } else {
+ // Update the current key to reflect the new sequence number/type without
+ // copying the user key.
+ // TODO(rven): Compaction filter does not process keys in this path
+ // Need to have the compaction filter process multiple versions
+ // if we have versions on both sides of a snapshot
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+ key_ = current_key_.GetInternalKey();
+ ikey_.user_key = current_key_.GetUserKey();
+
+ // Note that newer version of a key is ordered before older versions. If a
+ // newer version of a key is committed, so as the older version. No need
+ // to query snapshot_checker_ in that case.
+ if (UNLIKELY(!current_key_committed_)) {
+ assert(snapshot_checker_ != nullptr);
+ current_key_committed_ = KeyCommitted(ikey_.sequence);
+ // Apply the compaction filter to the first committed version of the
+ // user key.
+ if (current_key_committed_ &&
+ !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
+ break;
+ }
+ }
+ }
+
+ if (UNLIKELY(!current_key_committed_)) {
+ assert(snapshot_checker_ != nullptr);
+ validity_info_.SetValid(ValidContext::kCurrentKeyUncommitted);
+ break;
+ }
+
+ // If there are no snapshots, then this kv affect visibility at tip.
+ // Otherwise, search though all existing snapshots to find the earliest
+ // snapshot that is affected by this kv.
+ SequenceNumber last_sequence = current_user_key_sequence_;
+ current_user_key_sequence_ = ikey_.sequence;
+ SequenceNumber last_snapshot = current_user_key_snapshot_;
+ SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
+ current_user_key_snapshot_ =
+ visible_at_tip_
+ ? earliest_snapshot_
+ : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);
+
+ if (need_skip) {
+ // This case is handled below.
+ } else if (clear_and_output_next_key_) {
+ // In the previous iteration we encountered a single delete that we could
+ // not compact out. We will keep this Put, but can drop it's data.
+ // (See Optimization 3, below.)
+ if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
+ ikey_.type != kTypeWideColumnEntity) {
+ ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output",
+ ikey_.DebugString(allow_data_in_errors_, true).c_str());
+ assert(false);
+ }
+ if (current_user_key_snapshot_ < last_snapshot) {
+ ROCKS_LOG_FATAL(info_log_,
+ "key %s, current_user_key_snapshot_ (%" PRIu64
+ ") < last_snapshot (%" PRIu64 ")",
+ ikey_.DebugString(allow_data_in_errors_, true).c_str(),
+ current_user_key_snapshot_, last_snapshot);
+ assert(false);
+ }
+
+ if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity) {
+ ikey_.type = kTypeValue;
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+ }
+
+ value_.clear();
+ validity_info_.SetValid(ValidContext::kKeepSDAndClearPut);
+ clear_and_output_next_key_ = false;
+ } else if (ikey_.type == kTypeSingleDeletion) {
+ // We can compact out a SingleDelete if:
+ // 1) We encounter the corresponding PUT -OR- we know that this key
+ // doesn't appear past this output level
+ // =AND=
+ // 2) We've already returned a record in this snapshot -OR-
+ // there are no earlier earliest_write_conflict_snapshot.
+ //
+ // A note about 2) above:
+ // we try to determine whether there is any earlier write conflict
+ // checking snapshot by calling DefinitelyInSnapshot() with seq and
+ // earliest_write_conflict_snapshot as arguments. For write-prepared
+ // and write-unprepared transactions, if earliest_write_conflict_snapshot
+ // is evicted from WritePreparedTxnDB::commit_cache, then
+ // DefinitelyInSnapshot(seq, earliest_write_conflict_snapshot) returns
+ // false, even if the seq is actually visible within
+ // earliest_write_conflict_snapshot. Consequently, CompactionIterator
+ // may try to zero out its sequence number, thus hitting assertion error
+ // in debug mode or cause incorrect DBIter return result.
+ // We observe that earliest_write_conflict_snapshot >= earliest_snapshot,
+ // and the seq zeroing logic depends on
+ // DefinitelyInSnapshot(seq, earliest_snapshot). Therefore, if we cannot
+ // determine whether seq is **definitely** in
+ // earliest_write_conflict_snapshot, then we can additionally check if
+ // seq is definitely in earliest_snapshot. If the latter holds, then the
+ // former holds too.
+ //
+ // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
+ // allow Transactions to do write-conflict checking (if we compacted away
+ // all keys, then we wouldn't know that a write happened in this
+ // snapshot). If there is no earlier snapshot, then we know that there
+ // are no active transactions that need to know about any writes.
+ //
+ // Optimization 3:
+ // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
+ // true, then we must output a SingleDelete. In this case, we will decide
+ // to also output the PUT. While we are compacting less by outputting the
+ // PUT now, hopefully this will lead to better compaction in the future
+ // when Rule 2 is later true (Ie, We are hoping we can later compact out
+ // both the SingleDelete and the Put, while we couldn't if we only
+ // outputted the SingleDelete now).
+ // In this case, we can save space by removing the PUT's value as it will
+ // never be read.
+ //
+ // Deletes and Merges are not supported on the same key that has a
+ // SingleDelete as it is not possible to correctly do any partial
+ // compaction of such a combination of operations. The result of mixing
+ // those operations for a given key is documented as being undefined. So
+ // we can choose how to handle such a combinations of operations. We will
+ // try to compact out as much as we can in these cases.
+ // We will report counts on these anomalous cases.
+ //
+ // Note: If timestamp is enabled, then record will be eligible for
+ // deletion, only if, along with above conditions (Rule 1 and Rule 2)
+ // full_history_ts_low_ is specified and timestamp for that key is less
+ // than *full_history_ts_low_. If it's not eligible for deletion, then we
+ // will output the SingleDelete. For Optimization 3 also, if
+ // full_history_ts_low_ is specified and timestamp for the key is less
+ // than *full_history_ts_low_ then only optimization will be applied.
+
+ // The easiest way to process a SingleDelete during iteration is to peek
+ // ahead at the next key.
+ const bool is_timestamp_eligible_for_gc =
+ (timestamp_size_ == 0 ||
+ (full_history_ts_low_ && cmp_with_history_ts_low_ < 0));
+
+ ParsedInternalKey next_ikey;
+ AdvanceInputIter();
+
+ // Check whether the next key exists, is not corrupt, and is the same key
+ // as the single delete.
+ if (input_.Valid() &&
+ ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
+ .ok() &&
+ cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
+#ifndef NDEBUG
+ const Compaction* c =
+ compaction_ ? compaction_->real_compaction() : nullptr;
+#endif
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionIterator::NextFromInput:SingleDelete:1",
+ const_cast<Compaction*>(c));
+ if (last_key_seq_zeroed_) {
+ ++iter_stats_.num_record_drop_hidden;
+ ++iter_stats_.num_record_drop_obsolete;
+ assert(bottommost_level_);
+ AdvanceInputIter();
+ } else if (prev_snapshot == 0 ||
+ DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
+ // Check whether the next key belongs to the same snapshot as the
+ // SingleDelete.
+
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
+ if (next_ikey.type == kTypeSingleDeletion) {
+ // We encountered two SingleDeletes for same key in a row. This
+ // could be due to unexpected user input. If write-(un)prepared
+ // transaction is used, this could also be due to releasing an old
+ // snapshot between a Put and its matching SingleDelete.
+ // Skip the first SingleDelete and let the next iteration decide
+ // how to handle the second SingleDelete.
+
+ // First SingleDelete has been skipped since we already called
+ // input_.Next().
+ ++iter_stats_.num_record_drop_obsolete;
+ ++iter_stats_.num_single_del_mismatch;
+ } else if (next_ikey.type == kTypeDeletion) {
+ std::ostringstream oss;
+ oss << "Found SD and type: " << static_cast<int>(next_ikey.type)
+ << " on the same key, violating the contract "
+ "of SingleDelete. Check your application to make sure the "
+ "application does not mix SingleDelete and Delete for "
+ "the same key. If you are using "
+ "write-prepared/write-unprepared transactions, and use "
+ "SingleDelete to delete certain keys, then make sure "
+ "TransactionDBOptions::rollback_deletion_type_callback is "
+ "configured properly. Mixing SD and DEL can lead to "
+ "undefined behaviors";
+ ++iter_stats_.num_record_drop_obsolete;
+ ++iter_stats_.num_single_del_mismatch;
+ if (enforce_single_del_contracts_) {
+ ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str());
+ validity_info_.Invalidate();
+ status_ = Status::Corruption(oss.str());
+ return;
+ }
+ ROCKS_LOG_WARN(info_log_, "%s", oss.str().c_str());
+ } else if (!is_timestamp_eligible_for_gc) {
+ // We cannot drop the SingleDelete as timestamp is enabled, and
+ // timestamp of this key is greater than or equal to
+ // *full_history_ts_low_. We will output the SingleDelete.
+ validity_info_.SetValid(ValidContext::kKeepTsHistory);
+ } else if (has_outputted_key_ ||
+ DefinitelyInSnapshot(ikey_.sequence,
+ earliest_write_conflict_snapshot_) ||
+ (earliest_snapshot_ < earliest_write_conflict_snapshot_ &&
+ DefinitelyInSnapshot(ikey_.sequence,
+ earliest_snapshot_))) {
+ // Found a matching value, we can drop the single delete and the
+ // value. It is safe to drop both records since we've already
+ // outputted a key in this snapshot, or there is no earlier
+ // snapshot (Rule 2 above).
+
+ // Note: it doesn't matter whether the second key is a Put or if it
+ // is an unexpected Merge or Delete. We will compact it out
+ // either way. We will maintain counts of how many mismatches
+ // happened
+ if (next_ikey.type != kTypeValue &&
+ next_ikey.type != kTypeBlobIndex &&
+ next_ikey.type != kTypeWideColumnEntity) {
+ ++iter_stats_.num_single_del_mismatch;
+ }
+
+ ++iter_stats_.num_record_drop_hidden;
+ ++iter_stats_.num_record_drop_obsolete;
+ // Already called input_.Next() once. Call it a second time to
+ // skip past the second key.
+ AdvanceInputIter();
+ } else {
+ // Found a matching value, but we cannot drop both keys since
+ // there is an earlier snapshot and we need to leave behind a record
+ // to know that a write happened in this snapshot (Rule 2 above).
+ // Clear the value and output the SingleDelete. (The value will be
+ // outputted on the next iteration.)
+
+ // Setting valid_ to true will output the current SingleDelete
+ validity_info_.SetValid(ValidContext::kKeepSDForConflictCheck);
+
+ // Set up the Put to be outputted in the next iteration.
+ // (Optimization 3).
+ clear_and_output_next_key_ = true;
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionIterator::NextFromInput:KeepSDForWW",
+ /*arg=*/nullptr);
+ }
+ } else {
+ // We hit the next snapshot without hitting a put, so the iterator
+ // returns the single delete.
+ validity_info_.SetValid(ValidContext::kKeepSDForSnapshot);
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionIterator::NextFromInput:SingleDelete:3",
+ const_cast<Compaction*>(c));
+ }
+ } else {
+ // We are at the end of the input, could not parse the next key, or hit
+ // a different key. The iterator returns the single delete if the key
+ // possibly exists beyond the current output level. We set
+ // has_current_user_key to false so that if the iterator is at the next
+ // key, we do not compare it again against the previous key at the next
+ // iteration. If the next key is corrupt, we return before the
+ // comparison, so the value of has_current_user_key does not matter.
+ has_current_user_key_ = false;
+ if (compaction_ != nullptr &&
+ DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
+ compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
+ &level_ptrs_) &&
+ is_timestamp_eligible_for_gc) {
+ // Key doesn't exist outside of this range.
+ // Can compact out this SingleDelete.
+ ++iter_stats_.num_record_drop_obsolete;
+ ++iter_stats_.num_single_del_fallthru;
+ if (!bottommost_level_) {
+ ++iter_stats_.num_optimized_del_drop_obsolete;
+ }
+ } else if (last_key_seq_zeroed_) {
+ // Skip.
+ ++iter_stats_.num_record_drop_hidden;
+ ++iter_stats_.num_record_drop_obsolete;
+ assert(bottommost_level_);
+ } else {
+ // Output SingleDelete
+ validity_info_.SetValid(ValidContext::kKeepSD);
+ }
+ }
+
+ if (Valid()) {
+ at_next_ = true;
+ }
+ } else if (last_snapshot == current_user_key_snapshot_ ||
+ (last_snapshot > 0 &&
+ last_snapshot < current_user_key_snapshot_)) {
+ // If the earliest snapshot is which this key is visible in
+ // is the same as the visibility of a previous instance of the
+ // same key, then this kv is not visible in any snapshot.
+ // Hidden by an newer entry for same user key
+ //
+ // Note: Dropping this key will not affect TransactionDB write-conflict
+ // checking since there has already been a record returned for this key
+ // in this snapshot.
+ if (last_sequence < current_user_key_sequence_) {
+ ROCKS_LOG_FATAL(info_log_,
+ "key %s, last_sequence (%" PRIu64
+ ") < current_user_key_sequence_ (%" PRIu64 ")",
+ ikey_.DebugString(allow_data_in_errors_, true).c_str(),
+ last_sequence, current_user_key_sequence_);
+ assert(false);
+ }
+
+ ++iter_stats_.num_record_drop_hidden; // rule (A)
+ AdvanceInputIter();
+ } else if (compaction_ != nullptr &&
+ (ikey_.type == kTypeDeletion ||
+ (ikey_.type == kTypeDeletionWithTimestamp &&
+ cmp_with_history_ts_low_ < 0)) &&
+ DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
+ compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
+ &level_ptrs_)) {
+ // TODO(noetzli): This is the only place where we use compaction_
+ // (besides the constructor). We should probably get rid of this
+ // dependency and find a way to do similar filtering during flushes.
+ //
+ // For this user key:
+ // (1) there is no data in higher levels
+ // (2) data in lower levels will have larger sequence numbers
+ // (3) data in layers that are being compacted here and have
+ // smaller sequence numbers will be dropped in the next
+ // few iterations of this loop (by rule (A) above).
+ // Therefore this deletion marker is obsolete and can be dropped.
+ //
+ // Note: Dropping this Delete will not affect TransactionDB
+ // write-conflict checking since it is earlier than any snapshot.
+ //
+ // It seems that we can also drop deletion later than earliest snapshot
+ // given that:
+ // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
+ // (2) No value exist earlier than the deletion.
+ //
+ // Note also that a deletion marker of type kTypeDeletionWithTimestamp
+ // will be treated as a different user key unless the timestamp is older
+ // than *full_history_ts_low_.
+ ++iter_stats_.num_record_drop_obsolete;
+ if (!bottommost_level_) {
+ ++iter_stats_.num_optimized_del_drop_obsolete;
+ }
+ AdvanceInputIter();
+ } else if ((ikey_.type == kTypeDeletion ||
+ (ikey_.type == kTypeDeletionWithTimestamp &&
+ cmp_with_history_ts_low_ < 0)) &&
+ bottommost_level_) {
+ // Handle the case where we have a delete key at the bottom most level
+ // We can skip outputting the key iff there are no subsequent puts for
+ // this key
+ assert(!compaction_ || compaction_->KeyNotExistsBeyondOutputLevel(
+ ikey_.user_key, &level_ptrs_));
+ ParsedInternalKey next_ikey;
+ AdvanceInputIter();
+#ifndef NDEBUG
+ const Compaction* c =
+ compaction_ ? compaction_->real_compaction() : nullptr;
+#endif
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionIterator::NextFromInput:BottommostDelete:1",
+ const_cast<Compaction*>(c));
+ // Skip over all versions of this key that happen to occur in the same
+ // snapshot range as the delete.
+ //
+ // Note that a deletion marker of type kTypeDeletionWithTimestamp will be
+ // considered to have a different user key unless the timestamp is older
+ // than *full_history_ts_low_.
+ while (!IsPausingManualCompaction() && !IsShuttingDown() &&
+ input_.Valid() &&
+ (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
+ .ok()) &&
+ cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
+ (prev_snapshot == 0 ||
+ DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) {
+ AdvanceInputIter();
+ }
+ // If you find you still need to output a row with this key, we need to
+ // output the delete too
+ if (input_.Valid() &&
+ (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
+ .ok()) &&
+ cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
+ validity_info_.SetValid(ValidContext::kKeepDel);
+ at_next_ = true;
+ }
+ } else if (ikey_.type == kTypeMerge) {
+ if (!merge_helper_->HasOperator()) {
+ status_ = Status::InvalidArgument(
+ "merge_operator is not properly initialized.");
+ return;
+ }
+
+ pinned_iters_mgr_.StartPinning();
+
+ // We know the merge type entry is not hidden, otherwise we would
+ // have hit (A)
+ // We encapsulate the merge related state machine in a different
+ // object to minimize change to the existing flow.
+ Status s = merge_helper_->MergeUntil(
+ &input_, range_del_agg_, prev_snapshot, bottommost_level_,
+ allow_data_in_errors_, blob_fetcher_.get(), full_history_ts_low_,
+ prefetch_buffers_.get(), &iter_stats_);
+ merge_out_iter_.SeekToFirst();
+
+ if (!s.ok() && !s.IsMergeInProgress()) {
+ status_ = s;
+ return;
+ } else if (merge_out_iter_.Valid()) {
+ // NOTE: key, value, and ikey_ refer to old entries.
+ // These will be correctly set below.
+ key_ = merge_out_iter_.key();
+ value_ = merge_out_iter_.value();
+ pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
+ // MergeUntil stops when it encounters a corrupt key and does not
+ // include them in the result, so we expect the keys here to valid.
+ if (!pik_status.ok()) {
+ ROCKS_LOG_FATAL(
+ info_log_, "Invalid key %s in compaction. %s",
+ allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden",
+ pik_status.getState());
+ assert(false);
+ }
+ // Keep current_key_ in sync.
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+ key_ = current_key_.GetInternalKey();
+ ikey_.user_key = current_key_.GetUserKey();
+ validity_info_.SetValid(ValidContext::kMerge2);
+ } else {
+ // all merge operands were filtered out. reset the user key, since the
+ // batch consumed by the merge operator should not shadow any keys
+ // coming after the merges
+ has_current_user_key_ = false;
+ pinned_iters_mgr_.ReleasePinnedData();
+
+ if (merge_helper_->FilteredUntil(&skip_until)) {
+ need_skip = true;
+ }
+ }
+ } else {
+ // 1. new user key -OR-
+ // 2. different snapshot stripe
+ // If user-defined timestamp is enabled, we consider keys for GC if they
+ // are below history_ts_low_. CompactionRangeDelAggregator::ShouldDelete()
+ // only considers range deletions that are at or below history_ts_low_ and
+ // trim_ts_. We drop keys here that are below history_ts_low_ and are
+ // covered by a range tombstone that is at or below history_ts_low_ and
+ // trim_ts.
+ bool should_delete = false;
+ if (!timestamp_size_ || cmp_with_history_ts_low_ < 0) {
+ should_delete = range_del_agg_->ShouldDelete(
+ key_, RangeDelPositioningMode::kForwardTraversal);
+ }
+ if (should_delete) {
+ ++iter_stats_.num_record_drop_hidden;
+ ++iter_stats_.num_record_drop_range_del;
+ AdvanceInputIter();
+ } else {
+ validity_info_.SetValid(ValidContext::kNewUserKey);
+ }
+ }
+
+ if (need_skip) {
+ SkipUntil(skip_until);
+ }
+ }
+
+ if (!Valid() && IsShuttingDown()) {
+ status_ = Status::ShutdownInProgress();
+ }
+
+ if (IsPausingManualCompaction()) {
+ status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
+ }
+
+ // Propagate corruption status from memtable itereator
+ if (!input_.Valid() && input_.status().IsCorruption()) {
+ status_ = input_.status();
+ }
+}
+
+bool CompactionIterator::ExtractLargeValueIfNeededImpl() {
+ if (!blob_file_builder_) {
+ return false;
+ }
+
+ blob_index_.clear();
+ const Status s = blob_file_builder_->Add(user_key(), value_, &blob_index_);
+
+ if (!s.ok()) {
+ status_ = s;
+ validity_info_.Invalidate();
+
+ return false;
+ }
+
+ if (blob_index_.empty()) {
+ return false;
+ }
+
+ value_ = blob_index_;
+
+ return true;
+}
+
+void CompactionIterator::ExtractLargeValueIfNeeded() {
+ assert(ikey_.type == kTypeValue);
+
+ if (!ExtractLargeValueIfNeededImpl()) {
+ return;
+ }
+
+ ikey_.type = kTypeBlobIndex;
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+}
+
+void CompactionIterator::GarbageCollectBlobIfNeeded() {
+ assert(ikey_.type == kTypeBlobIndex);
+
+ if (!compaction_) {
+ return;
+ }
+
+ // GC for integrated BlobDB
+ if (compaction_->enable_blob_garbage_collection()) {
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionIterator::GarbageCollectBlobIfNeeded::TamperWithBlobIndex",
+ &value_);
+
+ BlobIndex blob_index;
+
+ {
+ const Status s = blob_index.DecodeFrom(value_);
+
+ if (!s.ok()) {
+ status_ = s;
+ validity_info_.Invalidate();
+
+ return;
+ }
+ }
+
+ if (blob_index.file_number() >=
+ blob_garbage_collection_cutoff_file_number_) {
+ return;
+ }
+
+ FilePrefetchBuffer* prefetch_buffer =
+ prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
+ blob_index.file_number())
+ : nullptr;
+
+ uint64_t bytes_read = 0;
+
+ {
+ assert(blob_fetcher_);
+
+ const Status s = blob_fetcher_->FetchBlob(
+ user_key(), blob_index, prefetch_buffer, &blob_value_, &bytes_read);
+
+ if (!s.ok()) {
+ status_ = s;
+ validity_info_.Invalidate();
+
+ return;
+ }
+ }
+
+ ++iter_stats_.num_blobs_read;
+ iter_stats_.total_blob_bytes_read += bytes_read;
+
+ ++iter_stats_.num_blobs_relocated;
+ iter_stats_.total_blob_bytes_relocated += blob_index.size();
+
+ value_ = blob_value_;
+
+ if (ExtractLargeValueIfNeededImpl()) {
+ return;
+ }
+
+ ikey_.type = kTypeValue;
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+
+ return;
+ }
+
+ // GC for stacked BlobDB
+ if (compaction_filter_ &&
+ compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
+ const auto blob_decision = compaction_filter_->PrepareBlobOutput(
+ user_key(), value_, &compaction_filter_value_);
+
+ if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
+ status_ =
+ Status::Corruption("Corrupted blob reference encountered during GC");
+ validity_info_.Invalidate();
+
+ return;
+ }
+
+ if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
+ status_ = Status::IOError("Could not relocate blob during GC");
+ validity_info_.Invalidate();
+
+ return;
+ }
+
+ if (blob_decision == CompactionFilter::BlobDecision::kChangeValue) {
+ value_ = compaction_filter_value_;
+
+ return;
+ }
+ }
+}
+
+void CompactionIterator::DecideOutputLevel() {
+ assert(compaction_->SupportsPerKeyPlacement());
+#ifndef NDEBUG
+ // Could be overridden by unittest
+ PerKeyPlacementContext context(level_, ikey_.user_key, value_,
+ ikey_.sequence);
+ TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput.context",
+ &context);
+ output_to_penultimate_level_ = context.output_to_penultimate_level;
+#else
+ output_to_penultimate_level_ = false;
+#endif // NDEBUG
+
+ // if the key is newer than the cutoff sequence or within the earliest
+ // snapshot, it should output to the penultimate level.
+ if (ikey_.sequence > preclude_last_level_min_seqno_ ||
+ ikey_.sequence > earliest_snapshot_) {
+ output_to_penultimate_level_ = true;
+ }
+
+ if (output_to_penultimate_level_) {
+ // If it's decided to output to the penultimate level, but unsafe to do so,
+ // still output to the last level. For example, moving the data from a lower
+ // level to a higher level outside of the higher-level input key range is
+ // considered unsafe, because the key may conflict with higher-level SSTs
+ // not from this compaction.
+ // TODO: add statistic for declined output_to_penultimate_level
+ bool safe_to_penultimate_level =
+ compaction_->WithinPenultimateLevelOutputRange(ikey_.user_key);
+ if (!safe_to_penultimate_level) {
+ output_to_penultimate_level_ = false;
+ // It could happen when disable/enable `last_level_temperature` while
+ // holding a snapshot. When `last_level_temperature` is not set
+ // (==kUnknown), the data newer than any snapshot is pushed to the last
+ // level, but when the per_key_placement feature is enabled on the fly,
+ // the data later than the snapshot has to be moved to the penultimate
+ // level, which may or may not be safe. So the user needs to make sure all
+ // snapshot is released before enabling `last_level_temperature` feature
+ // We will migrate the feature to `last_level_temperature` and maybe make
+ // it not dynamically changeable.
+ if (ikey_.sequence > earliest_snapshot_) {
+ status_ = Status::Corruption(
+ "Unsafe to store Seq later than snapshot in the last level if "
+ "per_key_placement is enabled");
+ }
+ }
+ }
+}
+
+void CompactionIterator::PrepareOutput() {
+ if (Valid()) {
+ if (ikey_.type == kTypeValue) {
+ ExtractLargeValueIfNeeded();
+ } else if (ikey_.type == kTypeBlobIndex) {
+ GarbageCollectBlobIfNeeded();
+ }
+
+ if (compaction_ != nullptr && compaction_->SupportsPerKeyPlacement()) {
+ DecideOutputLevel();
+ }
+
+ // Zeroing out the sequence number leads to better compression.
+ // If this is the bottommost level (no files in lower levels)
+ // and the earliest snapshot is larger than this seqno
+ // and the userkey differs from the last userkey in compaction
+ // then we can squash the seqno to zero.
+ //
+ // This is safe for TransactionDB write-conflict checking since transactions
+ // only care about sequence number larger than any active snapshots.
+ //
+ // Can we do the same for levels above bottom level as long as
+ // KeyNotExistsBeyondOutputLevel() return true?
+ if (Valid() && compaction_ != nullptr &&
+ !compaction_->allow_ingest_behind() && bottommost_level_ &&
+ DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
+ ikey_.type != kTypeMerge && current_key_committed_ &&
+ !output_to_penultimate_level_ &&
+ ikey_.sequence < preserve_time_min_seqno_) {
+ if (ikey_.type == kTypeDeletion ||
+ (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
+ ROCKS_LOG_FATAL(
+ info_log_,
+ "Unexpected key %s for seq-zero optimization. "
+ "earliest_snapshot %" PRIu64
+ ", earliest_write_conflict_snapshot %" PRIu64
+ " job_snapshot %" PRIu64
+ ". timestamp_size: %d full_history_ts_low_ %s. validity %x",
+ ikey_.DebugString(allow_data_in_errors_, true).c_str(),
+ earliest_snapshot_, earliest_write_conflict_snapshot_,
+ job_snapshot_, static_cast<int>(timestamp_size_),
+ full_history_ts_low_ != nullptr
+ ? Slice(*full_history_ts_low_).ToString(true).c_str()
+ : "null",
+ validity_info_.rep);
+ assert(false);
+ }
+ ikey_.sequence = 0;
+ last_key_seq_zeroed_ = true;
+ TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput:ZeroingSeq",
+ &ikey_);
+ if (!timestamp_size_) {
+ current_key_.UpdateInternalKey(0, ikey_.type);
+ } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) {
+ // We can also zero out timestamp for better compression.
+ // For the same user key (excluding timestamp), the timestamp-based
+ // history can be collapsed to save some space if the timestamp is
+ // older than *full_history_ts_low_.
+ const std::string kTsMin(timestamp_size_, static_cast<char>(0));
+ const Slice ts_slice = kTsMin;
+ ikey_.SetTimestamp(ts_slice);
+ current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice);
+ }
+ }
+ }
+}
+
+inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
+ SequenceNumber in, SequenceNumber* prev_snapshot) {
+ assert(snapshots_->size());
+ if (snapshots_->size() == 0) {
+ ROCKS_LOG_FATAL(info_log_,
+ "No snapshot left in findEarliestVisibleSnapshot");
+ }
+ auto snapshots_iter =
+ std::lower_bound(snapshots_->begin(), snapshots_->end(), in);
+ assert(prev_snapshot != nullptr);
+ if (snapshots_iter == snapshots_->begin()) {
+ *prev_snapshot = 0;
+ } else {
+ *prev_snapshot = *std::prev(snapshots_iter);
+ if (*prev_snapshot >= in) {
+ ROCKS_LOG_FATAL(info_log_,
+ "*prev_snapshot (%" PRIu64 ") >= in (%" PRIu64
+ ") in findEarliestVisibleSnapshot",
+ *prev_snapshot, in);
+ assert(false);
+ }
+ }
+ if (snapshot_checker_ == nullptr) {
+ return snapshots_iter != snapshots_->end() ? *snapshots_iter
+ : kMaxSequenceNumber;
+ }
+ bool has_released_snapshot = !released_snapshots_.empty();
+ for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
+ auto cur = *snapshots_iter;
+ if (in > cur) {
+ ROCKS_LOG_FATAL(info_log_,
+ "in (%" PRIu64 ") > cur (%" PRIu64
+ ") in findEarliestVisibleSnapshot",
+ in, cur);
+ assert(false);
+ }
+ // Skip if cur is in released_snapshots.
+ if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
+ continue;
+ }
+ auto res = snapshot_checker_->CheckInSnapshot(in, cur);
+ if (res == SnapshotCheckerResult::kInSnapshot) {
+ return cur;
+ } else if (res == SnapshotCheckerResult::kSnapshotReleased) {
+ released_snapshots_.insert(cur);
+ }
+ *prev_snapshot = cur;
+ }
+ return kMaxSequenceNumber;
+}
+
+uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber(
+ const CompactionProxy* compaction) {
+ if (!compaction) {
+ return 0;
+ }
+
+ if (!compaction->enable_blob_garbage_collection()) {
+ return 0;
+ }
+
+ const Version* const version = compaction->input_version();
+ assert(version);
+
+ const VersionStorageInfo* const storage_info = version->storage_info();
+ assert(storage_info);
+
+ const auto& blob_files = storage_info->GetBlobFiles();
+
+ const size_t cutoff_index = static_cast<size_t>(
+ compaction->blob_garbage_collection_age_cutoff() * blob_files.size());
+
+ if (cutoff_index >= blob_files.size()) {
+ return std::numeric_limits<uint64_t>::max();
+ }
+
+ const auto& meta = blob_files[cutoff_index];
+ assert(meta);
+
+ return meta->GetBlobFileNumber();
+}
+
+std::unique_ptr<BlobFetcher> CompactionIterator::CreateBlobFetcherIfNeeded(
+ const CompactionProxy* compaction) {
+ if (!compaction) {
+ return nullptr;
+ }
+
+ const Version* const version = compaction->input_version();
+ if (!version) {
+ return nullptr;
+ }
+
+ ReadOptions read_options;
+ read_options.fill_cache = false;
+
+ return std::unique_ptr<BlobFetcher>(new BlobFetcher(version, read_options));
+}
+
+std::unique_ptr<PrefetchBufferCollection>
+CompactionIterator::CreatePrefetchBufferCollectionIfNeeded(
+ const CompactionProxy* compaction) {
+ if (!compaction) {
+ return nullptr;
+ }
+
+ if (!compaction->input_version()) {
+ return nullptr;
+ }
+
+ if (compaction->allow_mmap_reads()) {
+ return nullptr;
+ }
+
+ const uint64_t readahead_size = compaction->blob_compaction_readahead_size();
+ if (!readahead_size) {
+ return nullptr;
+ }
+
+ return std::unique_ptr<PrefetchBufferCollection>(
+ new PrefetchBufferCollection(readahead_size));
+}
+
+} // namespace ROCKSDB_NAMESPACE