From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- .../write_batch_with_index_internal.cc | 735 +++++++++++++++++++++ 1 file changed, 735 insertions(+) create mode 100644 src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc (limited to 'src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc') diff --git a/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc new file mode 100644 index 000000000..3c9205bf7 --- /dev/null +++ b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -0,0 +1,735 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" + +#include "db/column_family.h" +#include "db/db_impl/db_impl.h" +#include "db/merge_context.h" +#include "db/merge_helper.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/utilities/write_batch_with_index.h" +#include "util/cast_util.h" +#include "util/coding.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { +BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family, + Iterator* base_iterator, + WBWIIteratorImpl* delta_iterator, + const Comparator* comparator, + const ReadOptions* read_options) + : forward_(true), + current_at_base_(true), + equal_keys_(false), + status_(Status::OK()), + base_iterator_(base_iterator), + delta_iterator_(delta_iterator), + comparator_(comparator), + iterate_upper_bound_(read_options ? read_options->iterate_upper_bound + : nullptr) { + assert(comparator_); + wbwii_.reset(new WriteBatchWithIndexInternal(column_family)); +} + +bool BaseDeltaIterator::Valid() const { + return status_.ok() ? (current_at_base_ ? BaseValid() : DeltaValid()) : false; +} + +void BaseDeltaIterator::SeekToFirst() { + forward_ = true; + base_iterator_->SeekToFirst(); + delta_iterator_->SeekToFirst(); + UpdateCurrent(); +} + +void BaseDeltaIterator::SeekToLast() { + forward_ = false; + base_iterator_->SeekToLast(); + delta_iterator_->SeekToLast(); + UpdateCurrent(); +} + +void BaseDeltaIterator::Seek(const Slice& k) { + forward_ = true; + base_iterator_->Seek(k); + delta_iterator_->Seek(k); + UpdateCurrent(); +} + +void BaseDeltaIterator::SeekForPrev(const Slice& k) { + forward_ = false; + base_iterator_->SeekForPrev(k); + delta_iterator_->SeekForPrev(k); + UpdateCurrent(); +} + +void BaseDeltaIterator::Next() { + if (!Valid()) { + status_ = Status::NotSupported("Next() on invalid iterator"); + return; + } + + if (!forward_) { + // Need to change direction + // if our direction was backward and we're not equal, we have two states: + // * both iterators are valid: we're already in a good state (current + // shows to smaller) + // * only one iterator is valid: we need to advance that iterator + forward_ = true; + equal_keys_ = false; + if (!BaseValid()) { + assert(DeltaValid()); + base_iterator_->SeekToFirst(); + } else if (!DeltaValid()) { + delta_iterator_->SeekToFirst(); + } else if (current_at_base_) { + // Change delta from larger than base to smaller + AdvanceDelta(); + } else { + // Change base from larger than delta to smaller + AdvanceBase(); + } + if (DeltaValid() && BaseValid()) { + if (0 == comparator_->CompareWithoutTimestamp( + delta_iterator_->Entry().key, /*a_has_ts=*/false, + base_iterator_->key(), /*b_has_ts=*/false)) { + equal_keys_ = true; + } + } + } + Advance(); +} + +void BaseDeltaIterator::Prev() { + if (!Valid()) { + status_ = Status::NotSupported("Prev() on invalid iterator"); + return; + } + + if (forward_) { + // Need to change direction + // if our direction was backward and we're not equal, we have two states: + // * both iterators are valid: we're already in a good state (current + // shows to smaller) + // * only one iterator is valid: we need to advance that iterator + forward_ = false; + equal_keys_ = false; + if (!BaseValid()) { + assert(DeltaValid()); + base_iterator_->SeekToLast(); + } else if (!DeltaValid()) { + delta_iterator_->SeekToLast(); + } else if (current_at_base_) { + // Change delta from less advanced than base to more advanced + AdvanceDelta(); + } else { + // Change base from less advanced than delta to more advanced + AdvanceBase(); + } + if (DeltaValid() && BaseValid()) { + if (0 == comparator_->CompareWithoutTimestamp( + delta_iterator_->Entry().key, /*a_has_ts=*/false, + base_iterator_->key(), /*b_has_ts=*/false)) { + equal_keys_ = true; + } + } + } + + Advance(); +} + +Slice BaseDeltaIterator::key() const { + return current_at_base_ ? base_iterator_->key() + : delta_iterator_->Entry().key; +} + +Slice BaseDeltaIterator::value() const { + if (current_at_base_) { + return base_iterator_->value(); + } else { + WriteEntry delta_entry = delta_iterator_->Entry(); + if (wbwii_->GetNumOperands() == 0) { + return delta_entry.value; + } else if (delta_entry.type == kDeleteRecord || + delta_entry.type == kSingleDeleteRecord) { + status_ = + wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf()); + } else if (delta_entry.type == kPutRecord) { + status_ = wbwii_->MergeKey(delta_entry.key, &delta_entry.value, + merge_result_.GetSelf()); + } else if (delta_entry.type == kMergeRecord) { + if (equal_keys_) { + Slice base_value = base_iterator_->value(); + status_ = wbwii_->MergeKey(delta_entry.key, &base_value, + merge_result_.GetSelf()); + } else { + status_ = + wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf()); + } + } + merge_result_.PinSelf(); + return merge_result_; + } +} + +Status BaseDeltaIterator::status() const { + if (!status_.ok()) { + return status_; + } + if (!base_iterator_->status().ok()) { + return base_iterator_->status(); + } + return delta_iterator_->status(); +} + +void BaseDeltaIterator::Invalidate(Status s) { status_ = s; } + +void BaseDeltaIterator::AssertInvariants() { +#ifndef NDEBUG + bool not_ok = false; + if (!base_iterator_->status().ok()) { + assert(!base_iterator_->Valid()); + not_ok = true; + } + if (!delta_iterator_->status().ok()) { + assert(!delta_iterator_->Valid()); + not_ok = true; + } + if (not_ok) { + assert(!Valid()); + assert(!status().ok()); + return; + } + + if (!Valid()) { + return; + } + if (!BaseValid()) { + assert(!current_at_base_ && delta_iterator_->Valid()); + return; + } + if (!DeltaValid()) { + assert(current_at_base_ && base_iterator_->Valid()); + return; + } + // we don't support those yet + assert(delta_iterator_->Entry().type != kMergeRecord && + delta_iterator_->Entry().type != kLogDataRecord); + int compare = comparator_->CompareWithoutTimestamp( + delta_iterator_->Entry().key, /*a_has_ts=*/false, base_iterator_->key(), + /*b_has_ts=*/false); + if (forward_) { + // current_at_base -> compare < 0 + assert(!current_at_base_ || compare < 0); + // !current_at_base -> compare <= 0 + assert(current_at_base_ && compare >= 0); + } else { + // current_at_base -> compare > 0 + assert(!current_at_base_ || compare > 0); + // !current_at_base -> compare <= 0 + assert(current_at_base_ && compare <= 0); + } + // equal_keys_ <=> compare == 0 + assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0)); +#endif +} + +void BaseDeltaIterator::Advance() { + if (equal_keys_) { + assert(BaseValid() && DeltaValid()); + AdvanceBase(); + AdvanceDelta(); + } else { + if (current_at_base_) { + assert(BaseValid()); + AdvanceBase(); + } else { + assert(DeltaValid()); + AdvanceDelta(); + } + } + UpdateCurrent(); +} + +void BaseDeltaIterator::AdvanceDelta() { + if (forward_) { + delta_iterator_->NextKey(); + } else { + delta_iterator_->PrevKey(); + } +} +void BaseDeltaIterator::AdvanceBase() { + if (forward_) { + base_iterator_->Next(); + } else { + base_iterator_->Prev(); + } +} + +bool BaseDeltaIterator::BaseValid() const { return base_iterator_->Valid(); } +bool BaseDeltaIterator::DeltaValid() const { return delta_iterator_->Valid(); } +void BaseDeltaIterator::UpdateCurrent() { +// Suppress false positive clang analyzer warnings. +#ifndef __clang_analyzer__ + status_ = Status::OK(); + while (true) { + auto delta_result = WBWIIteratorImpl::kNotFound; + WriteEntry delta_entry; + if (DeltaValid()) { + assert(delta_iterator_->status().ok()); + delta_result = + delta_iterator_->FindLatestUpdate(wbwii_->GetMergeContext()); + delta_entry = delta_iterator_->Entry(); + } else if (!delta_iterator_->status().ok()) { + // Expose the error status and stop. + current_at_base_ = false; + return; + } + equal_keys_ = false; + if (!BaseValid()) { + if (!base_iterator_->status().ok()) { + // Expose the error status and stop. + current_at_base_ = true; + return; + } + + // Base has finished. + if (!DeltaValid()) { + // Finished + return; + } + if (iterate_upper_bound_) { + if (comparator_->CompareWithoutTimestamp( + delta_entry.key, /*a_has_ts=*/false, *iterate_upper_bound_, + /*b_has_ts=*/false) >= 0) { + // out of upper bound -> finished. + return; + } + } + if (delta_result == WBWIIteratorImpl::kDeleted && + wbwii_->GetNumOperands() == 0) { + AdvanceDelta(); + } else { + current_at_base_ = false; + return; + } + } else if (!DeltaValid()) { + // Delta has finished. + current_at_base_ = true; + return; + } else { + int compare = + (forward_ ? 1 : -1) * comparator_->CompareWithoutTimestamp( + delta_entry.key, /*a_has_ts=*/false, + base_iterator_->key(), /*b_has_ts=*/false); + if (compare <= 0) { // delta bigger or equal + if (compare == 0) { + equal_keys_ = true; + } + if (delta_result != WBWIIteratorImpl::kDeleted || + wbwii_->GetNumOperands() > 0) { + current_at_base_ = false; + return; + } + // Delta is less advanced and is delete. + AdvanceDelta(); + if (equal_keys_) { + AdvanceBase(); + } + } else { + current_at_base_ = true; + return; + } + } + } + + AssertInvariants(); +#endif // __clang_analyzer__ +} + +void WBWIIteratorImpl::AdvanceKey(bool forward) { + if (Valid()) { + Slice key = Entry().key; + do { + if (forward) { + Next(); + } else { + Prev(); + } + } while (MatchesKey(column_family_id_, key)); + } +} + +void WBWIIteratorImpl::NextKey() { AdvanceKey(true); } + +void WBWIIteratorImpl::PrevKey() { + AdvanceKey(false); // Move to the tail of the previous key + if (Valid()) { + AdvanceKey(false); // Move back another key. Now we are at the start of + // the previous key + if (Valid()) { // Still a valid + Next(); // Move forward one onto this key + } else { + SeekToFirst(); // Not valid, move to the start + } + } +} + +WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate( + MergeContext* merge_context) { + if (Valid()) { + Slice key = Entry().key; + return FindLatestUpdate(key, merge_context); + } else { + merge_context->Clear(); // Clear any entries in the MergeContext + return WBWIIteratorImpl::kNotFound; + } +} + +WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate( + const Slice& key, MergeContext* merge_context) { + Result result = WBWIIteratorImpl::kNotFound; + merge_context->Clear(); // Clear any entries in the MergeContext + // TODO(agiardullo): consider adding support for reverse iteration + if (!Valid()) { + return result; + } else if (comparator_->CompareKey(column_family_id_, Entry().key, key) != + 0) { + return result; + } else { + // We want to iterate in the reverse order that the writes were added to the + // batch. Since we don't have a reverse iterator, we must seek past the + // end. We do this by seeking to the next key, and then back one step + NextKey(); + if (Valid()) { + Prev(); + } else { + SeekToLast(); + } + + // We are at the end of the iterator for this key. Search backwards for the + // last Put or Delete, accumulating merges along the way. + while (Valid()) { + const WriteEntry entry = Entry(); + if (comparator_->CompareKey(column_family_id_, entry.key, key) != 0) { + break; // Unexpected error or we've reached a different next key + } + + switch (entry.type) { + case kPutRecord: + return WBWIIteratorImpl::kFound; + case kDeleteRecord: + return WBWIIteratorImpl::kDeleted; + case kSingleDeleteRecord: + return WBWIIteratorImpl::kDeleted; + case kMergeRecord: + result = WBWIIteratorImpl::kMergeInProgress; + merge_context->PushOperand(entry.value); + break; + case kLogDataRecord: + break; // ignore + case kXIDRecord: + break; // ignore + default: + return WBWIIteratorImpl::kError; + } // end switch statement + Prev(); + } // End while Valid() + // At this point, we have been through the whole list and found no Puts or + // Deletes. The iterator points to the previous key. Move the iterator back + // onto this one. + if (Valid()) { + Next(); + } else { + SeekToFirst(); + } + } + return result; +} + +Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, + WriteType* type, Slice* Key, + Slice* value, Slice* blob, + Slice* xid) const { + if (type == nullptr || Key == nullptr || value == nullptr || + blob == nullptr || xid == nullptr) { + return Status::InvalidArgument("Output parameters cannot be null"); + } + + if (data_offset == GetDataSize()) { + // reached end of batch. + return Status::NotFound(); + } + + if (data_offset > GetDataSize()) { + return Status::InvalidArgument("data offset exceed write batch size"); + } + Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset); + char tag; + uint32_t column_family; + Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, + blob, xid); + if (!s.ok()) { + return s; + } + + switch (tag) { + case kTypeColumnFamilyValue: + case kTypeValue: + *type = kPutRecord; + break; + case kTypeColumnFamilyDeletion: + case kTypeDeletion: + *type = kDeleteRecord; + break; + case kTypeColumnFamilySingleDeletion: + case kTypeSingleDeletion: + *type = kSingleDeleteRecord; + break; + case kTypeColumnFamilyRangeDeletion: + case kTypeRangeDeletion: + *type = kDeleteRangeRecord; + break; + case kTypeColumnFamilyMerge: + case kTypeMerge: + *type = kMergeRecord; + break; + case kTypeLogData: + *type = kLogDataRecord; + break; + case kTypeNoop: + case kTypeBeginPrepareXID: + case kTypeBeginPersistedPrepareXID: + case kTypeBeginUnprepareXID: + case kTypeEndPrepareXID: + case kTypeCommitXID: + case kTypeRollbackXID: + *type = kXIDRecord; + break; + default: + return Status::Corruption("unknown WriteBatch tag ", + std::to_string(static_cast(tag))); + } + return Status::OK(); +} + +// If both of `entry1` and `entry2` point to real entry in write batch, we +// compare the entries as following: +// 1. first compare the column family, the one with larger CF will be larger; +// 2. Inside the same CF, we first decode the entry to find the key of the entry +// and the entry with larger key will be larger; +// 3. If two entries are of the same CF and key, the one with larger offset +// will be larger. +// Some times either `entry1` or `entry2` is dummy entry, which is actually +// a search key. In this case, in step 2, we don't go ahead and decode the +// entry but use the value in WriteBatchIndexEntry::search_key. +// One special case is WriteBatchIndexEntry::key_size is kFlagMinInCf. +// This indicate that we are going to seek to the first of the column family. +// Once we see this, this entry will be smaller than all the real entries of +// the column family. +int WriteBatchEntryComparator::operator()( + const WriteBatchIndexEntry* entry1, + const WriteBatchIndexEntry* entry2) const { + if (entry1->column_family > entry2->column_family) { + return 1; + } else if (entry1->column_family < entry2->column_family) { + return -1; + } + + // Deal with special case of seeking to the beginning of a column family + if (entry1->is_min_in_cf()) { + return -1; + } else if (entry2->is_min_in_cf()) { + return 1; + } + + Slice key1, key2; + if (entry1->search_key == nullptr) { + key1 = Slice(write_batch_->Data().data() + entry1->key_offset, + entry1->key_size); + } else { + key1 = *(entry1->search_key); + } + if (entry2->search_key == nullptr) { + key2 = Slice(write_batch_->Data().data() + entry2->key_offset, + entry2->key_size); + } else { + key2 = *(entry2->search_key); + } + + int cmp = CompareKey(entry1->column_family, key1, key2); + if (cmp != 0) { + return cmp; + } else if (entry1->offset > entry2->offset) { + return 1; + } else if (entry1->offset < entry2->offset) { + return -1; + } + return 0; +} + +int WriteBatchEntryComparator::CompareKey(uint32_t column_family, + const Slice& key1, + const Slice& key2) const { + if (column_family < cf_comparators_.size() && + cf_comparators_[column_family] != nullptr) { + return cf_comparators_[column_family]->CompareWithoutTimestamp( + key1, /*a_has_ts=*/false, key2, /*b_has_ts=*/false); + } else { + return default_comparator_->CompareWithoutTimestamp( + key1, /*a_has_ts=*/false, key2, /*b_has_ts=*/false); + } +} + +const Comparator* WriteBatchEntryComparator::GetComparator( + const ColumnFamilyHandle* column_family) const { + return column_family ? column_family->GetComparator() : default_comparator_; +} + +const Comparator* WriteBatchEntryComparator::GetComparator( + uint32_t column_family) const { + if (column_family < cf_comparators_.size() && + cf_comparators_[column_family]) { + return cf_comparators_[column_family]; + } + return default_comparator_; +} + +WriteEntry WBWIIteratorImpl::Entry() const { + WriteEntry ret; + Slice blob, xid; + const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); + // this is guaranteed with Valid() + assert(iter_entry != nullptr && + iter_entry->column_family == column_family_id_); + auto s = write_batch_->GetEntryFromDataOffset( + iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid); + assert(s.ok()); + assert(ret.type == kPutRecord || ret.type == kDeleteRecord || + ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord || + ret.type == kMergeRecord); + // Make sure entry.key does not include user-defined timestamp. + const Comparator* const ucmp = comparator_->GetComparator(column_family_id_); + size_t ts_sz = ucmp->timestamp_size(); + if (ts_sz > 0) { + ret.key = StripTimestampFromUserKey(ret.key, ts_sz); + } + return ret; +} + +bool WBWIIteratorImpl::MatchesKey(uint32_t cf_id, const Slice& key) { + if (Valid()) { + return comparator_->CompareKey(cf_id, key, Entry().key) == 0; + } else { + return false; + } +} + +WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( + ColumnFamilyHandle* column_family) + : db_(nullptr), db_options_(nullptr), column_family_(column_family) {} + +WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( + DB* db, ColumnFamilyHandle* column_family) + : db_(db), db_options_(nullptr), column_family_(column_family) { + if (db_ != nullptr && column_family_ == nullptr) { + column_family_ = db_->DefaultColumnFamily(); + } +} + +WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( + const DBOptions* db_options, ColumnFamilyHandle* column_family) + : db_(nullptr), db_options_(db_options), column_family_(column_family) {} + +Status WriteBatchWithIndexInternal::MergeKey(const Slice& key, + const Slice* value, + const MergeContext& context, + std::string* result) const { + if (column_family_ != nullptr) { + auto cfh = static_cast_with_check(column_family_); + const auto merge_operator = cfh->cfd()->ioptions()->merge_operator.get(); + if (merge_operator == nullptr) { + return Status::InvalidArgument( + "Merge_operator must be set for column_family"); + } else if (db_ != nullptr) { + const ImmutableDBOptions& immutable_db_options = + static_cast_with_check(db_->GetRootDB()) + ->immutable_db_options(); + Statistics* statistics = immutable_db_options.statistics.get(); + Logger* logger = immutable_db_options.info_log.get(); + SystemClock* clock = immutable_db_options.clock; + return MergeHelper::TimedFullMerge( + merge_operator, key, value, context.GetOperands(), result, logger, + statistics, clock, /* result_operand */ nullptr, + /* update_num_ops_stats */ false); + } else if (db_options_ != nullptr) { + Statistics* statistics = db_options_->statistics.get(); + Env* env = db_options_->env; + Logger* logger = db_options_->info_log.get(); + SystemClock* clock = env->GetSystemClock().get(); + return MergeHelper::TimedFullMerge( + merge_operator, key, value, context.GetOperands(), result, logger, + statistics, clock, /* result_operand */ nullptr, + /* update_num_ops_stats */ false); + } else { + const auto cf_opts = cfh->cfd()->ioptions(); + return MergeHelper::TimedFullMerge( + merge_operator, key, value, context.GetOperands(), result, + cf_opts->logger, cf_opts->stats, cf_opts->clock, + /* result_operand */ nullptr, /* update_num_ops_stats */ false); + } + } else { + return Status::InvalidArgument("Must provide a column_family"); + } +} + +WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatch( + WriteBatchWithIndex* batch, const Slice& key, MergeContext* context, + std::string* value, Status* s) { + *s = Status::OK(); + + std::unique_ptr iter( + static_cast_with_check( + batch->NewIterator(column_family_))); + + // Search the iterator for this key, and updates/merges to it. + iter->Seek(key); + auto result = iter->FindLatestUpdate(key, context); + if (result == WBWIIteratorImpl::kError) { + (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:", + std::to_string(iter->Entry().type)); + return result; + } else if (result == WBWIIteratorImpl::kNotFound) { + return result; + } else if (result == WBWIIteratorImpl::Result::kFound) { // PUT + Slice entry_value = iter->Entry().value; + if (context->GetNumOperands() > 0) { + *s = MergeKey(key, &entry_value, *context, value); + if (!s->ok()) { + result = WBWIIteratorImpl::Result::kError; + } + } else { + value->assign(entry_value.data(), entry_value.size()); + } + } else if (result == WBWIIteratorImpl::kDeleted) { + if (context->GetNumOperands() > 0) { + *s = MergeKey(key, nullptr, *context, value); + if (s->ok()) { + result = WBWIIteratorImpl::Result::kFound; + } else { + result = WBWIIteratorImpl::Result::kError; + } + } + } + return result; +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // !ROCKSDB_LITE -- cgit v1.2.3