diff options
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc | 288 |
1 files changed, 288 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc new file mode 100644 index 000000000..8c1222f21 --- /dev/null +++ b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -0,0 +1,288 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" + +#include "db/column_family.h" +#include "db/merge_context.h" +#include "db/merge_helper.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/utilities/write_batch_with_index.h" +#include "util/coding.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +class Env; +class Logger; +class Statistics; + +Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, + WriteType* type, Slice* Key, + Slice* value, Slice* blob, + Slice* xid) const { + if (type == nullptr || Key == nullptr || value == nullptr || + blob == nullptr || xid == nullptr) { + return Status::InvalidArgument("Output parameters cannot be null"); + } + + if (data_offset == GetDataSize()) { + // reached end of batch. + return Status::NotFound(); + } + + if (data_offset > GetDataSize()) { + return Status::InvalidArgument("data offset exceed write batch size"); + } + Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset); + char tag; + uint32_t column_family; + Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, + blob, xid); + + switch (tag) { + case kTypeColumnFamilyValue: + case kTypeValue: + *type = kPutRecord; + break; + case kTypeColumnFamilyDeletion: + case kTypeDeletion: + *type = kDeleteRecord; + break; + case kTypeColumnFamilySingleDeletion: + case kTypeSingleDeletion: + *type = kSingleDeleteRecord; + break; + case kTypeColumnFamilyRangeDeletion: + case kTypeRangeDeletion: + *type = kDeleteRangeRecord; + break; + case kTypeColumnFamilyMerge: + case kTypeMerge: + *type = kMergeRecord; + break; + case kTypeLogData: + *type = kLogDataRecord; + break; + case kTypeNoop: + case kTypeBeginPrepareXID: + case kTypeBeginPersistedPrepareXID: + case kTypeBeginUnprepareXID: + case kTypeEndPrepareXID: + case kTypeCommitXID: + case kTypeRollbackXID: + *type = kXIDRecord; + break; + default: + return Status::Corruption("unknown WriteBatch tag ", + ToString(static_cast<unsigned int>(tag))); + } + return Status::OK(); +} + +// If both of `entry1` and `entry2` point to real entry in write batch, we +// compare the entries as following: +// 1. first compare the column family, the one with larger CF will be larger; +// 2. Inside the same CF, we first decode the entry to find the key of the entry +// and the entry with larger key will be larger; +// 3. If two entries are of the same CF and offset, the one with larger offset +// will be larger. +// Some times either `entry1` or `entry2` is dummy entry, which is actually +// a search key. In this case, in step 2, we don't go ahead and decode the +// entry but use the value in WriteBatchIndexEntry::search_key. +// One special case is WriteBatchIndexEntry::key_size is kFlagMinInCf. +// This indicate that we are going to seek to the first of the column family. +// Once we see this, this entry will be smaller than all the real entries of +// the column family. +int WriteBatchEntryComparator::operator()( + const WriteBatchIndexEntry* entry1, + const WriteBatchIndexEntry* entry2) const { + if (entry1->column_family > entry2->column_family) { + return 1; + } else if (entry1->column_family < entry2->column_family) { + return -1; + } + + // Deal with special case of seeking to the beginning of a column family + if (entry1->is_min_in_cf()) { + return -1; + } else if (entry2->is_min_in_cf()) { + return 1; + } + + Slice key1, key2; + if (entry1->search_key == nullptr) { + key1 = Slice(write_batch_->Data().data() + entry1->key_offset, + entry1->key_size); + } else { + key1 = *(entry1->search_key); + } + if (entry2->search_key == nullptr) { + key2 = Slice(write_batch_->Data().data() + entry2->key_offset, + entry2->key_size); + } else { + key2 = *(entry2->search_key); + } + + int cmp = CompareKey(entry1->column_family, key1, key2); + if (cmp != 0) { + return cmp; + } else if (entry1->offset > entry2->offset) { + return 1; + } else if (entry1->offset < entry2->offset) { + return -1; + } + return 0; +} + +int WriteBatchEntryComparator::CompareKey(uint32_t column_family, + const Slice& key1, + const Slice& key2) const { + if (column_family < cf_comparators_.size() && + cf_comparators_[column_family] != nullptr) { + return cf_comparators_[column_family]->Compare(key1, key2); + } else { + return default_comparator_->Compare(key1, key2); + } +} + +WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( + const ImmutableDBOptions& immuable_db_options, WriteBatchWithIndex* batch, + ColumnFamilyHandle* column_family, const Slice& key, + MergeContext* merge_context, WriteBatchEntryComparator* cmp, + std::string* value, bool overwrite_key, Status* s) { + uint32_t cf_id = GetColumnFamilyID(column_family); + *s = Status::OK(); + WriteBatchWithIndexInternal::Result result = + WriteBatchWithIndexInternal::Result::kNotFound; + + std::unique_ptr<WBWIIterator> iter = + std::unique_ptr<WBWIIterator>(batch->NewIterator(column_family)); + + // We want to iterate in the reverse order that the writes were added to the + // batch. Since we don't have a reverse iterator, we must seek past the end. + // TODO(agiardullo): consider adding support for reverse iteration + iter->Seek(key); + while (iter->Valid()) { + const WriteEntry entry = iter->Entry(); + if (cmp->CompareKey(cf_id, entry.key, key) != 0) { + break; + } + + iter->Next(); + } + + if (!(*s).ok()) { + return WriteBatchWithIndexInternal::Result::kError; + } + + if (!iter->Valid()) { + // Read past end of results. Reposition on last result. + iter->SeekToLast(); + } else { + iter->Prev(); + } + + Slice entry_value; + while (iter->Valid()) { + const WriteEntry entry = iter->Entry(); + if (cmp->CompareKey(cf_id, entry.key, key) != 0) { + // Unexpected error or we've reached a different next key + break; + } + + switch (entry.type) { + case kPutRecord: { + result = WriteBatchWithIndexInternal::Result::kFound; + entry_value = entry.value; + break; + } + case kMergeRecord: { + result = WriteBatchWithIndexInternal::Result::kMergeInProgress; + merge_context->PushOperand(entry.value); + break; + } + case kDeleteRecord: + case kSingleDeleteRecord: { + result = WriteBatchWithIndexInternal::Result::kDeleted; + break; + } + case kLogDataRecord: + case kXIDRecord: { + // ignore + break; + } + default: { + result = WriteBatchWithIndexInternal::Result::kError; + (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:", + ToString(entry.type)); + break; + } + } + if (result == WriteBatchWithIndexInternal::Result::kFound || + result == WriteBatchWithIndexInternal::Result::kDeleted || + result == WriteBatchWithIndexInternal::Result::kError) { + // We can stop iterating once we find a PUT or DELETE + break; + } + if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && + overwrite_key == true) { + // Since we've overwritten keys, we do not know what other operations are + // in this batch for this key, so we cannot do a Merge to compute the + // result. Instead, we will simply return MergeInProgress. + break; + } + + iter->Prev(); + } + + if ((*s).ok()) { + if (result == WriteBatchWithIndexInternal::Result::kFound || + result == WriteBatchWithIndexInternal::Result::kDeleted) { + // Found a Put or Delete. Merge if necessary. + if (merge_context->GetNumOperands() > 0) { + const MergeOperator* merge_operator; + + if (column_family != nullptr) { + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + merge_operator = cfh->cfd()->ioptions()->merge_operator; + } else { + *s = Status::InvalidArgument("Must provide a column_family"); + result = WriteBatchWithIndexInternal::Result::kError; + return result; + } + Statistics* statistics = immuable_db_options.statistics.get(); + Env* env = immuable_db_options.env; + Logger* logger = immuable_db_options.info_log.get(); + + if (merge_operator) { + *s = MergeHelper::TimedFullMerge(merge_operator, key, &entry_value, + merge_context->GetOperands(), value, + logger, statistics, env); + } else { + *s = Status::InvalidArgument("Options::merge_operator must be set"); + } + if ((*s).ok()) { + result = WriteBatchWithIndexInternal::Result::kFound; + } else { + result = WriteBatchWithIndexInternal::Result::kError; + } + } else { // nothing to merge + if (result == WriteBatchWithIndexInternal::Result::kFound) { // PUT + value->assign(entry_value.data(), entry_value.size()); + } + } + } + } + + return result; +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // !ROCKSDB_LITE |