summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/write_batch_with_index
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/utilities/write_batch_with_index
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/write_batch_with_index')
-rw-r--r--src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc695
-rw-r--r--src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc735
-rw-r--r--src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h344
-rw-r--r--src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_test.cc2419
4 files changed, 4193 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc
new file mode 100644
index 000000000..408243b3f
--- /dev/null
+++ b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -0,0 +1,695 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "rocksdb/utilities/write_batch_with_index.h"
+
+#include <memory>
+
+#include "db/column_family.h"
+#include "db/db_impl/db_impl.h"
+#include "db/merge_context.h"
+#include "db/merge_helper.h"
+#include "memory/arena.h"
+#include "memtable/skiplist.h"
+#include "options/db_options.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/iterator.h"
+#include "util/cast_util.h"
+#include "util/string_util.h"
+#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
+
+namespace ROCKSDB_NAMESPACE {
+struct WriteBatchWithIndex::Rep {
+ explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
+ size_t max_bytes = 0, bool _overwrite_key = false,
+ size_t protection_bytes_per_key = 0)
+ : write_batch(reserved_bytes, max_bytes, protection_bytes_per_key,
+ index_comparator ? index_comparator->timestamp_size() : 0),
+ comparator(index_comparator, &write_batch),
+ skip_list(comparator, &arena),
+ overwrite_key(_overwrite_key),
+ last_entry_offset(0),
+ last_sub_batch_offset(0),
+ sub_batch_cnt(1) {}
+ ReadableWriteBatch write_batch;
+ WriteBatchEntryComparator comparator;
+ Arena arena;
+ WriteBatchEntrySkipList skip_list;
+ bool overwrite_key;
+ size_t last_entry_offset;
+ // The starting offset of the last sub-batch. A sub-batch starts right before
+ // inserting a key that is a duplicate of a key in the last sub-batch. Zero,
+ // the default, means that no duplicate key is detected so far.
+ size_t last_sub_batch_offset;
+ // Total number of sub-batches in the write batch. Default is 1.
+ size_t sub_batch_cnt;
+
+ // Remember current offset of internal write batch, which is used as
+ // the starting offset of the next record.
+ void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }
+
+ // In overwrite mode, find the existing entry for the same key and update it
+ // to point to the current entry.
+ // Return true if the key is found and updated.
+ bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key,
+ WriteType type);
+ bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key,
+ WriteType type);
+
+ // Add the recent entry to the update.
+ // In overwrite mode, if key already exists in the index, update it.
+ void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key,
+ WriteType type);
+ void AddOrUpdateIndex(const Slice& key, WriteType type);
+
+ // Allocate an index entry pointing to the last entry in the write batch and
+ // put it to skip list.
+ void AddNewEntry(uint32_t column_family_id);
+
+ // Clear all updates buffered in this batch.
+ void Clear();
+ void ClearIndex();
+
+ // Rebuild index by reading all records from the batch.
+ // Returns non-ok status on corruption.
+ Status ReBuildIndex();
+};
+
+bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
+ ColumnFamilyHandle* column_family, const Slice& key, WriteType type) {
+ uint32_t cf_id = GetColumnFamilyID(column_family);
+ return UpdateExistingEntryWithCfId(cf_id, key, type);
+}
+
+bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
+ uint32_t column_family_id, const Slice& key, WriteType type) {
+ if (!overwrite_key) {
+ return false;
+ }
+
+ WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch,
+ &comparator);
+ iter.Seek(key);
+ if (!iter.Valid()) {
+ return false;
+ } else if (!iter.MatchesKey(column_family_id, key)) {
+ return false;
+ } else {
+ // Move to the end of this key (NextKey-Prev)
+ iter.NextKey(); // Move to the next key
+ if (iter.Valid()) {
+ iter.Prev(); // Move back one entry
+ } else {
+ iter.SeekToLast();
+ }
+ }
+ WriteBatchIndexEntry* non_const_entry =
+ const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
+ if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
+ last_sub_batch_offset = last_entry_offset;
+ sub_batch_cnt++;
+ }
+ if (type == kMergeRecord) {
+ return false;
+ } else {
+ non_const_entry->offset = last_entry_offset;
+ return true;
+ }
+}
+
+void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
+ ColumnFamilyHandle* column_family, const Slice& key, WriteType type) {
+ if (!UpdateExistingEntry(column_family, key, type)) {
+ uint32_t cf_id = GetColumnFamilyID(column_family);
+ const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
+ if (cf_cmp != nullptr) {
+ comparator.SetComparatorForCF(cf_id, cf_cmp);
+ }
+ AddNewEntry(cf_id);
+ }
+}
+
+void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key,
+ WriteType type) {
+ if (!UpdateExistingEntryWithCfId(0, key, type)) {
+ AddNewEntry(0);
+ }
+}
+
+void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
+ const std::string& wb_data = write_batch.Data();
+ Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
+ wb_data.size() - last_entry_offset);
+ // Extract key
+ Slice key;
+ bool success =
+ ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
+#ifdef NDEBUG
+ (void)success;
+#endif
+ assert(success);
+
+ const Comparator* const ucmp = comparator.GetComparator(column_family_id);
+ size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
+
+ if (ts_sz > 0) {
+ key.remove_suffix(ts_sz);
+ }
+
+ auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
+ auto* index_entry =
+ new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
+ key.data() - wb_data.data(), key.size());
+ skip_list.Insert(index_entry);
+}
+
+void WriteBatchWithIndex::Rep::Clear() {
+ write_batch.Clear();
+ ClearIndex();
+}
+
+void WriteBatchWithIndex::Rep::ClearIndex() {
+ skip_list.~WriteBatchEntrySkipList();
+ arena.~Arena();
+ new (&arena) Arena();
+ new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
+ last_entry_offset = 0;
+ last_sub_batch_offset = 0;
+ sub_batch_cnt = 1;
+}
+
+Status WriteBatchWithIndex::Rep::ReBuildIndex() {
+ Status s;
+
+ ClearIndex();
+
+ if (write_batch.Count() == 0) {
+ // Nothing to re-index
+ return s;
+ }
+
+ size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
+
+ Slice input(write_batch.Data());
+ input.remove_prefix(offset);
+
+ // Loop through all entries in Rep and add each one to the index
+ uint32_t found = 0;
+ while (s.ok() && !input.empty()) {
+ Slice key, value, blob, xid;
+ uint32_t column_family_id = 0; // default
+ char tag = 0;
+
+ // set offset of current entry for call to AddNewEntry()
+ last_entry_offset = input.data() - write_batch.Data().data();
+
+ s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key, &value,
+ &blob, &xid);
+ if (!s.ok()) {
+ break;
+ }
+
+ switch (tag) {
+ case kTypeColumnFamilyValue:
+ case kTypeValue:
+ found++;
+ if (!UpdateExistingEntryWithCfId(column_family_id, key, kPutRecord)) {
+ AddNewEntry(column_family_id);
+ }
+ break;
+ case kTypeColumnFamilyDeletion:
+ case kTypeDeletion:
+ found++;
+ if (!UpdateExistingEntryWithCfId(column_family_id, key,
+ kDeleteRecord)) {
+ AddNewEntry(column_family_id);
+ }
+ break;
+ case kTypeColumnFamilySingleDeletion:
+ case kTypeSingleDeletion:
+ found++;
+ if (!UpdateExistingEntryWithCfId(column_family_id, key,
+ kSingleDeleteRecord)) {
+ AddNewEntry(column_family_id);
+ }
+ break;
+ case kTypeColumnFamilyMerge:
+ case kTypeMerge:
+ found++;
+ if (!UpdateExistingEntryWithCfId(column_family_id, key, kMergeRecord)) {
+ AddNewEntry(column_family_id);
+ }
+ break;
+ case kTypeLogData:
+ case kTypeBeginPrepareXID:
+ case kTypeBeginPersistedPrepareXID:
+ case kTypeBeginUnprepareXID:
+ case kTypeEndPrepareXID:
+ case kTypeCommitXID:
+ case kTypeCommitXIDAndTimestamp:
+ case kTypeRollbackXID:
+ case kTypeNoop:
+ break;
+ default:
+ return Status::Corruption(
+ "unknown WriteBatch tag in ReBuildIndex",
+ std::to_string(static_cast<unsigned int>(tag)));
+ }
+ }
+
+ if (s.ok() && found != write_batch.Count()) {
+ s = Status::Corruption("WriteBatch has wrong count");
+ }
+
+ return s;
+}
+
+WriteBatchWithIndex::WriteBatchWithIndex(
+ const Comparator* default_index_comparator, size_t reserved_bytes,
+ bool overwrite_key, size_t max_bytes, size_t protection_bytes_per_key)
+ : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
+ overwrite_key, protection_bytes_per_key)) {}
+
+WriteBatchWithIndex::~WriteBatchWithIndex() {}
+
+WriteBatchWithIndex::WriteBatchWithIndex(WriteBatchWithIndex&&) = default;
+
+WriteBatchWithIndex& WriteBatchWithIndex::operator=(WriteBatchWithIndex&&) =
+ default;
+
+WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
+
+size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
+
+WBWIIterator* WriteBatchWithIndex::NewIterator() {
+ return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch,
+ &(rep->comparator));
+}
+
+WBWIIterator* WriteBatchWithIndex::NewIterator(
+ ColumnFamilyHandle* column_family) {
+ return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
+ &(rep->skip_list), &rep->write_batch,
+ &(rep->comparator));
+}
+
+Iterator* WriteBatchWithIndex::NewIteratorWithBase(
+ ColumnFamilyHandle* column_family, Iterator* base_iterator,
+ const ReadOptions* read_options) {
+ auto wbwiii =
+ new WBWIIteratorImpl(GetColumnFamilyID(column_family), &(rep->skip_list),
+ &rep->write_batch, &rep->comparator);
+ return new BaseDeltaIterator(column_family, base_iterator, wbwiii,
+ GetColumnFamilyUserComparator(column_family),
+ read_options);
+}
+
+Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
+ // default column family's comparator
+ auto wbwiii = new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch,
+ &rep->comparator);
+ return new BaseDeltaIterator(nullptr, base_iterator, wbwiii,
+ rep->comparator.default_comparator());
+}
+
+Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
+ const Slice& key, const Slice& value) {
+ rep->SetLastEntryOffset();
+ auto s = rep->write_batch.Put(column_family, key, value);
+ if (s.ok()) {
+ rep->AddOrUpdateIndex(column_family, key, kPutRecord);
+ }
+ return s;
+}
+
+Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
+ rep->SetLastEntryOffset();
+ auto s = rep->write_batch.Put(key, value);
+ if (s.ok()) {
+ rep->AddOrUpdateIndex(key, kPutRecord);
+ }
+ return s;
+}
+
+Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
+ const Slice& /*key*/, const Slice& /*ts*/,
+ const Slice& /*value*/) {
+ if (!column_family) {
+ return Status::InvalidArgument("column family handle cannot be nullptr");
+ }
+ // TODO: support WBWI::Put() with timestamp.
+ return Status::NotSupported();
+}
+
+Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
+ const Slice& key) {
+ rep->SetLastEntryOffset();
+ auto s = rep->write_batch.Delete(column_family, key);
+ if (s.ok()) {
+ rep->AddOrUpdateIndex(column_family, key, kDeleteRecord);
+ }
+ return s;
+}
+
+Status WriteBatchWithIndex::Delete(const Slice& key) {
+ rep->SetLastEntryOffset();
+ auto s = rep->write_batch.Delete(key);
+ if (s.ok()) {
+ rep->AddOrUpdateIndex(key, kDeleteRecord);
+ }
+ return s;
+}
+
+Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
+ const Slice& /*key*/, const Slice& /*ts*/) {
+ if (!column_family) {
+ return Status::InvalidArgument("column family handle cannot be nullptr");
+ }
+ // TODO: support WBWI::Delete() with timestamp.
+ return Status::NotSupported();
+}
+
+Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
+ const Slice& key) {
+ rep->SetLastEntryOffset();
+ auto s = rep->write_batch.SingleDelete(column_family, key);
+ if (s.ok()) {
+ rep->AddOrUpdateIndex(column_family, key, kSingleDeleteRecord);
+ }
+ return s;
+}
+
+Status WriteBatchWithIndex::SingleDelete(const Slice& key) {
+ rep->SetLastEntryOffset();
+ auto s = rep->write_batch.SingleDelete(key);
+ if (s.ok()) {
+ rep->AddOrUpdateIndex(key, kSingleDeleteRecord);
+ }
+ return s;
+}
+
+Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
+ const Slice& /*key*/,
+ const Slice& /*ts*/) {
+ if (!column_family) {
+ return Status::InvalidArgument("column family handle cannot be nullptr");
+ }
+ // TODO: support WBWI::SingleDelete() with timestamp.
+ return Status::NotSupported();
+}
+
+Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
+ const Slice& key, const Slice& value) {
+ rep->SetLastEntryOffset();
+ auto s = rep->write_batch.Merge(column_family, key, value);
+ if (s.ok()) {
+ rep->AddOrUpdateIndex(column_family, key, kMergeRecord);
+ }
+ return s;
+}
+
+Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
+ rep->SetLastEntryOffset();
+ auto s = rep->write_batch.Merge(key, value);
+ if (s.ok()) {
+ rep->AddOrUpdateIndex(key, kMergeRecord);
+ }
+ return s;
+}
+
+Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
+ return rep->write_batch.PutLogData(blob);
+}
+
+void WriteBatchWithIndex::Clear() { rep->Clear(); }
+
+Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
+ const DBOptions& options,
+ const Slice& key, std::string* value) {
+ Status s;
+ WriteBatchWithIndexInternal wbwii(&options, column_family);
+ auto result = wbwii.GetFromBatch(this, key, value, &s);
+
+ switch (result) {
+ case WBWIIteratorImpl::kFound:
+ case WBWIIteratorImpl::kError:
+ // use returned status
+ break;
+ case WBWIIteratorImpl::kDeleted:
+ case WBWIIteratorImpl::kNotFound:
+ s = Status::NotFound();
+ break;
+ case WBWIIteratorImpl::kMergeInProgress:
+ s = Status::MergeInProgress();
+ break;
+ default:
+ assert(false);
+ }
+
+ return s;
+}
+
+Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
+ const ReadOptions& read_options,
+ const Slice& key,
+ std::string* value) {
+ assert(value != nullptr);
+ PinnableSlice pinnable_val(value);
+ assert(!pinnable_val.IsPinned());
+ auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
+ &pinnable_val);
+ if (s.ok() && pinnable_val.IsPinned()) {
+ value->assign(pinnable_val.data(), pinnable_val.size());
+ } // else value is already assigned
+ return s;
+}
+
+Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
+ const ReadOptions& read_options,
+ const Slice& key,
+ PinnableSlice* pinnable_val) {
+ return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key,
+ pinnable_val);
+}
+
+Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
+ const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family,
+ const Slice& key,
+ std::string* value) {
+ assert(value != nullptr);
+ PinnableSlice pinnable_val(value);
+ assert(!pinnable_val.IsPinned());
+ auto s =
+ GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val);
+ if (s.ok() && pinnable_val.IsPinned()) {
+ value->assign(pinnable_val.data(), pinnable_val.size());
+ } // else value is already assigned
+ return s;
+}
+
+Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
+ const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family,
+ const Slice& key,
+ PinnableSlice* pinnable_val) {
+ return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val,
+ nullptr);
+}
+
+Status WriteBatchWithIndex::GetFromBatchAndDB(
+ DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
+ const Comparator* const ucmp = rep->comparator.GetComparator(column_family);
+ size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
+ if (ts_sz > 0 && !read_options.timestamp) {
+ return Status::InvalidArgument("Must specify timestamp");
+ }
+
+ Status s;
+ WriteBatchWithIndexInternal wbwii(db, column_family);
+
+ // Since the lifetime of the WriteBatch is the same as that of the transaction
+ // we cannot pin it as otherwise the returned value will not be available
+ // after the transaction finishes.
+ std::string& batch_value = *pinnable_val->GetSelf();
+ auto result = wbwii.GetFromBatch(this, key, &batch_value, &s);
+
+ if (result == WBWIIteratorImpl::kFound) {
+ pinnable_val->PinSelf();
+ return s;
+ } else if (!s.ok() || result == WBWIIteratorImpl::kError) {
+ return s;
+ } else if (result == WBWIIteratorImpl::kDeleted) {
+ return Status::NotFound();
+ }
+ assert(result == WBWIIteratorImpl::kMergeInProgress ||
+ result == WBWIIteratorImpl::kNotFound);
+
+ // Did not find key in batch OR could not resolve Merges. Try DB.
+ if (!callback) {
+ s = db->Get(read_options, column_family, key, pinnable_val);
+ } else {
+ DBImpl::GetImplOptions get_impl_options;
+ get_impl_options.column_family = column_family;
+ get_impl_options.value = pinnable_val;
+ get_impl_options.callback = callback;
+ s = static_cast_with_check<DBImpl>(db->GetRootDB())
+ ->GetImpl(read_options, key, get_impl_options);
+ }
+
+ if (s.ok() || s.IsNotFound()) { // DB Get Succeeded
+ if (result == WBWIIteratorImpl::kMergeInProgress) {
+ // Merge result from DB with merges in Batch
+ std::string merge_result;
+ if (s.ok()) {
+ s = wbwii.MergeKey(key, pinnable_val, &merge_result);
+ } else { // Key not present in db (s.IsNotFound())
+ s = wbwii.MergeKey(key, nullptr, &merge_result);
+ }
+ if (s.ok()) {
+ pinnable_val->Reset();
+ *pinnable_val->GetSelf() = std::move(merge_result);
+ pinnable_val->PinSelf();
+ }
+ }
+ }
+
+ return s;
+}
+
+void WriteBatchWithIndex::MultiGetFromBatchAndDB(
+ DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+ const size_t num_keys, const Slice* keys, PinnableSlice* values,
+ Status* statuses, bool sorted_input) {
+ MultiGetFromBatchAndDB(db, read_options, column_family, num_keys, keys,
+ values, statuses, sorted_input, nullptr);
+}
+
+void WriteBatchWithIndex::MultiGetFromBatchAndDB(
+ DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+ const size_t num_keys, const Slice* keys, PinnableSlice* values,
+ Status* statuses, bool sorted_input, ReadCallback* callback) {
+ const Comparator* const ucmp = rep->comparator.GetComparator(column_family);
+ size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
+ if (ts_sz > 0 && !read_options.timestamp) {
+ for (size_t i = 0; i < num_keys; ++i) {
+ statuses[i] = Status::InvalidArgument("Must specify timestamp");
+ }
+ return;
+ }
+
+ WriteBatchWithIndexInternal wbwii(db, column_family);
+
+ autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
+ autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
+ // To hold merges from the write batch
+ autovector<std::pair<WBWIIteratorImpl::Result, MergeContext>,
+ MultiGetContext::MAX_BATCH_SIZE>
+ merges;
+ // Since the lifetime of the WriteBatch is the same as that of the transaction
+ // we cannot pin it as otherwise the returned value will not be available
+ // after the transaction finishes.
+ for (size_t i = 0; i < num_keys; ++i) {
+ MergeContext merge_context;
+ std::string batch_value;
+ Status* s = &statuses[i];
+ PinnableSlice* pinnable_val = &values[i];
+ pinnable_val->Reset();
+ auto result =
+ wbwii.GetFromBatch(this, keys[i], &merge_context, &batch_value, s);
+
+ if (result == WBWIIteratorImpl::kFound) {
+ *pinnable_val->GetSelf() = std::move(batch_value);
+ pinnable_val->PinSelf();
+ continue;
+ }
+ if (result == WBWIIteratorImpl::kDeleted) {
+ *s = Status::NotFound();
+ continue;
+ }
+ if (result == WBWIIteratorImpl::kError) {
+ continue;
+ }
+ assert(result == WBWIIteratorImpl::kMergeInProgress ||
+ result == WBWIIteratorImpl::kNotFound);
+ key_context.emplace_back(column_family, keys[i], &values[i],
+ /*timestamp*/ nullptr, &statuses[i]);
+ merges.emplace_back(result, std::move(merge_context));
+ }
+
+ for (KeyContext& key : key_context) {
+ sorted_keys.emplace_back(&key);
+ }
+
+ // Did not find key in batch OR could not resolve Merges. Try DB.
+ static_cast_with_check<DBImpl>(db->GetRootDB())
+ ->PrepareMultiGetKeys(key_context.size(), sorted_input, &sorted_keys);
+ static_cast_with_check<DBImpl>(db->GetRootDB())
+ ->MultiGetWithCallback(read_options, column_family, callback,
+ &sorted_keys);
+
+ for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) {
+ KeyContext& key = *iter;
+ if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded
+ size_t index = iter - key_context.begin();
+ std::pair<WBWIIteratorImpl::Result, MergeContext>& merge_result =
+ merges[index];
+ if (merge_result.first == WBWIIteratorImpl::kMergeInProgress) {
+ std::string merged_value;
+ // Merge result from DB with merges in Batch
+ if (key.s->ok()) {
+ *key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second,
+ &merged_value);
+ } else { // Key not present in db (s.IsNotFound())
+ *key.s = wbwii.MergeKey(*key.key, nullptr, merge_result.second,
+ &merged_value);
+ }
+ if (key.s->ok()) {
+ key.value->Reset();
+ *key.value->GetSelf() = std::move(merged_value);
+ key.value->PinSelf();
+ }
+ }
+ }
+ }
+}
+
+void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }
+
+Status WriteBatchWithIndex::RollbackToSavePoint() {
+ Status s = rep->write_batch.RollbackToSavePoint();
+
+ if (s.ok()) {
+ rep->sub_batch_cnt = 1;
+ rep->last_sub_batch_offset = 0;
+ s = rep->ReBuildIndex();
+ }
+
+ return s;
+}
+
+Status WriteBatchWithIndex::PopSavePoint() {
+ return rep->write_batch.PopSavePoint();
+}
+
+void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
+ rep->write_batch.SetMaxBytes(max_bytes);
+}
+
+size_t WriteBatchWithIndex::GetDataSize() const {
+ return rep->write_batch.GetDataSize();
+}
+
+const Comparator* WriteBatchWithIndexInternal::GetUserComparator(
+ const WriteBatchWithIndex& wbwi, uint32_t cf_id) {
+ const WriteBatchEntryComparator& ucmps = wbwi.rep->comparator;
+ return ucmps.GetComparator(cf_id);
+}
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc
new file mode 100644
index 000000000..3c9205bf7
--- /dev/null
+++ b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc
@@ -0,0 +1,735 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
+
+#include "db/column_family.h"
+#include "db/db_impl/db_impl.h"
+#include "db/merge_context.h"
+#include "db/merge_helper.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/db.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
+#include "util/cast_util.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family,
+ Iterator* base_iterator,
+ WBWIIteratorImpl* delta_iterator,
+ const Comparator* comparator,
+ const ReadOptions* read_options)
+ : forward_(true),
+ current_at_base_(true),
+ equal_keys_(false),
+ status_(Status::OK()),
+ base_iterator_(base_iterator),
+ delta_iterator_(delta_iterator),
+ comparator_(comparator),
+ iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
+ : nullptr) {
+ assert(comparator_);
+ wbwii_.reset(new WriteBatchWithIndexInternal(column_family));
+}
+
+bool BaseDeltaIterator::Valid() const {
+ return status_.ok() ? (current_at_base_ ? BaseValid() : DeltaValid()) : false;
+}
+
+void BaseDeltaIterator::SeekToFirst() {
+ forward_ = true;
+ base_iterator_->SeekToFirst();
+ delta_iterator_->SeekToFirst();
+ UpdateCurrent();
+}
+
+void BaseDeltaIterator::SeekToLast() {
+ forward_ = false;
+ base_iterator_->SeekToLast();
+ delta_iterator_->SeekToLast();
+ UpdateCurrent();
+}
+
+void BaseDeltaIterator::Seek(const Slice& k) {
+ forward_ = true;
+ base_iterator_->Seek(k);
+ delta_iterator_->Seek(k);
+ UpdateCurrent();
+}
+
+void BaseDeltaIterator::SeekForPrev(const Slice& k) {
+ forward_ = false;
+ base_iterator_->SeekForPrev(k);
+ delta_iterator_->SeekForPrev(k);
+ UpdateCurrent();
+}
+
+void BaseDeltaIterator::Next() {
+ if (!Valid()) {
+ status_ = Status::NotSupported("Next() on invalid iterator");
+ return;
+ }
+
+ if (!forward_) {
+ // Need to change direction
+ // if our direction was backward and we're not equal, we have two states:
+ // * both iterators are valid: we're already in a good state (current
+ // shows to smaller)
+ // * only one iterator is valid: we need to advance that iterator
+ forward_ = true;
+ equal_keys_ = false;
+ if (!BaseValid()) {
+ assert(DeltaValid());
+ base_iterator_->SeekToFirst();
+ } else if (!DeltaValid()) {
+ delta_iterator_->SeekToFirst();
+ } else if (current_at_base_) {
+ // Change delta from larger than base to smaller
+ AdvanceDelta();
+ } else {
+ // Change base from larger than delta to smaller
+ AdvanceBase();
+ }
+ if (DeltaValid() && BaseValid()) {
+ if (0 == comparator_->CompareWithoutTimestamp(
+ delta_iterator_->Entry().key, /*a_has_ts=*/false,
+ base_iterator_->key(), /*b_has_ts=*/false)) {
+ equal_keys_ = true;
+ }
+ }
+ }
+ Advance();
+}
+
+void BaseDeltaIterator::Prev() {
+ if (!Valid()) {
+ status_ = Status::NotSupported("Prev() on invalid iterator");
+ return;
+ }
+
+ if (forward_) {
+ // Need to change direction
+ // if our direction was backward and we're not equal, we have two states:
+ // * both iterators are valid: we're already in a good state (current
+ // shows to smaller)
+ // * only one iterator is valid: we need to advance that iterator
+ forward_ = false;
+ equal_keys_ = false;
+ if (!BaseValid()) {
+ assert(DeltaValid());
+ base_iterator_->SeekToLast();
+ } else if (!DeltaValid()) {
+ delta_iterator_->SeekToLast();
+ } else if (current_at_base_) {
+ // Change delta from less advanced than base to more advanced
+ AdvanceDelta();
+ } else {
+ // Change base from less advanced than delta to more advanced
+ AdvanceBase();
+ }
+ if (DeltaValid() && BaseValid()) {
+ if (0 == comparator_->CompareWithoutTimestamp(
+ delta_iterator_->Entry().key, /*a_has_ts=*/false,
+ base_iterator_->key(), /*b_has_ts=*/false)) {
+ equal_keys_ = true;
+ }
+ }
+ }
+
+ Advance();
+}
+
+Slice BaseDeltaIterator::key() const {
+ return current_at_base_ ? base_iterator_->key()
+ : delta_iterator_->Entry().key;
+}
+
+Slice BaseDeltaIterator::value() const {
+ if (current_at_base_) {
+ return base_iterator_->value();
+ } else {
+ WriteEntry delta_entry = delta_iterator_->Entry();
+ if (wbwii_->GetNumOperands() == 0) {
+ return delta_entry.value;
+ } else if (delta_entry.type == kDeleteRecord ||
+ delta_entry.type == kSingleDeleteRecord) {
+ status_ =
+ wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf());
+ } else if (delta_entry.type == kPutRecord) {
+ status_ = wbwii_->MergeKey(delta_entry.key, &delta_entry.value,
+ merge_result_.GetSelf());
+ } else if (delta_entry.type == kMergeRecord) {
+ if (equal_keys_) {
+ Slice base_value = base_iterator_->value();
+ status_ = wbwii_->MergeKey(delta_entry.key, &base_value,
+ merge_result_.GetSelf());
+ } else {
+ status_ =
+ wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf());
+ }
+ }
+ merge_result_.PinSelf();
+ return merge_result_;
+ }
+}
+
+Status BaseDeltaIterator::status() const {
+ if (!status_.ok()) {
+ return status_;
+ }
+ if (!base_iterator_->status().ok()) {
+ return base_iterator_->status();
+ }
+ return delta_iterator_->status();
+}
+
+void BaseDeltaIterator::Invalidate(Status s) { status_ = s; }
+
+void BaseDeltaIterator::AssertInvariants() {
+#ifndef NDEBUG
+ bool not_ok = false;
+ if (!base_iterator_->status().ok()) {
+ assert(!base_iterator_->Valid());
+ not_ok = true;
+ }
+ if (!delta_iterator_->status().ok()) {
+ assert(!delta_iterator_->Valid());
+ not_ok = true;
+ }
+ if (not_ok) {
+ assert(!Valid());
+ assert(!status().ok());
+ return;
+ }
+
+ if (!Valid()) {
+ return;
+ }
+ if (!BaseValid()) {
+ assert(!current_at_base_ && delta_iterator_->Valid());
+ return;
+ }
+ if (!DeltaValid()) {
+ assert(current_at_base_ && base_iterator_->Valid());
+ return;
+ }
+ // we don't support those yet
+ assert(delta_iterator_->Entry().type != kMergeRecord &&
+ delta_iterator_->Entry().type != kLogDataRecord);
+ int compare = comparator_->CompareWithoutTimestamp(
+ delta_iterator_->Entry().key, /*a_has_ts=*/false, base_iterator_->key(),
+ /*b_has_ts=*/false);
+ if (forward_) {
+ // current_at_base -> compare < 0
+ assert(!current_at_base_ || compare < 0);
+ // !current_at_base -> compare <= 0
+ assert(current_at_base_ && compare >= 0);
+ } else {
+ // current_at_base -> compare > 0
+ assert(!current_at_base_ || compare > 0);
+ // !current_at_base -> compare <= 0
+ assert(current_at_base_ && compare <= 0);
+ }
+ // equal_keys_ <=> compare == 0
+ assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
+#endif
+}
+
+void BaseDeltaIterator::Advance() {
+ if (equal_keys_) {
+ assert(BaseValid() && DeltaValid());
+ AdvanceBase();
+ AdvanceDelta();
+ } else {
+ if (current_at_base_) {
+ assert(BaseValid());
+ AdvanceBase();
+ } else {
+ assert(DeltaValid());
+ AdvanceDelta();
+ }
+ }
+ UpdateCurrent();
+}
+
+void BaseDeltaIterator::AdvanceDelta() {
+ if (forward_) {
+ delta_iterator_->NextKey();
+ } else {
+ delta_iterator_->PrevKey();
+ }
+}
+void BaseDeltaIterator::AdvanceBase() {
+ if (forward_) {
+ base_iterator_->Next();
+ } else {
+ base_iterator_->Prev();
+ }
+}
+
+bool BaseDeltaIterator::BaseValid() const { return base_iterator_->Valid(); }
+bool BaseDeltaIterator::DeltaValid() const { return delta_iterator_->Valid(); }
+void BaseDeltaIterator::UpdateCurrent() {
+// Suppress false positive clang analyzer warnings.
+#ifndef __clang_analyzer__
+ status_ = Status::OK();
+ while (true) {
+ auto delta_result = WBWIIteratorImpl::kNotFound;
+ WriteEntry delta_entry;
+ if (DeltaValid()) {
+ assert(delta_iterator_->status().ok());
+ delta_result =
+ delta_iterator_->FindLatestUpdate(wbwii_->GetMergeContext());
+ delta_entry = delta_iterator_->Entry();
+ } else if (!delta_iterator_->status().ok()) {
+ // Expose the error status and stop.
+ current_at_base_ = false;
+ return;
+ }
+ equal_keys_ = false;
+ if (!BaseValid()) {
+ if (!base_iterator_->status().ok()) {
+ // Expose the error status and stop.
+ current_at_base_ = true;
+ return;
+ }
+
+ // Base has finished.
+ if (!DeltaValid()) {
+ // Finished
+ return;
+ }
+ if (iterate_upper_bound_) {
+ if (comparator_->CompareWithoutTimestamp(
+ delta_entry.key, /*a_has_ts=*/false, *iterate_upper_bound_,
+ /*b_has_ts=*/false) >= 0) {
+ // out of upper bound -> finished.
+ return;
+ }
+ }
+ if (delta_result == WBWIIteratorImpl::kDeleted &&
+ wbwii_->GetNumOperands() == 0) {
+ AdvanceDelta();
+ } else {
+ current_at_base_ = false;
+ return;
+ }
+ } else if (!DeltaValid()) {
+ // Delta has finished.
+ current_at_base_ = true;
+ return;
+ } else {
+ int compare =
+ (forward_ ? 1 : -1) * comparator_->CompareWithoutTimestamp(
+ delta_entry.key, /*a_has_ts=*/false,
+ base_iterator_->key(), /*b_has_ts=*/false);
+ if (compare <= 0) { // delta bigger or equal
+ if (compare == 0) {
+ equal_keys_ = true;
+ }
+ if (delta_result != WBWIIteratorImpl::kDeleted ||
+ wbwii_->GetNumOperands() > 0) {
+ current_at_base_ = false;
+ return;
+ }
+ // Delta is less advanced and is delete.
+ AdvanceDelta();
+ if (equal_keys_) {
+ AdvanceBase();
+ }
+ } else {
+ current_at_base_ = true;
+ return;
+ }
+ }
+ }
+
+ AssertInvariants();
+#endif // __clang_analyzer__
+}
+
+void WBWIIteratorImpl::AdvanceKey(bool forward) {
+ if (Valid()) {
+ Slice key = Entry().key;
+ do {
+ if (forward) {
+ Next();
+ } else {
+ Prev();
+ }
+ } while (MatchesKey(column_family_id_, key));
+ }
+}
+
+void WBWIIteratorImpl::NextKey() { AdvanceKey(true); }
+
+void WBWIIteratorImpl::PrevKey() {
+ AdvanceKey(false); // Move to the tail of the previous key
+ if (Valid()) {
+ AdvanceKey(false); // Move back another key. Now we are at the start of
+ // the previous key
+ if (Valid()) { // Still a valid
+ Next(); // Move forward one onto this key
+ } else {
+ SeekToFirst(); // Not valid, move to the start
+ }
+ }
+}
+
+WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate(
+ MergeContext* merge_context) {
+ if (Valid()) {
+ Slice key = Entry().key;
+ return FindLatestUpdate(key, merge_context);
+ } else {
+ merge_context->Clear(); // Clear any entries in the MergeContext
+ return WBWIIteratorImpl::kNotFound;
+ }
+}
+
+WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate(
+ const Slice& key, MergeContext* merge_context) {
+ Result result = WBWIIteratorImpl::kNotFound;
+ merge_context->Clear(); // Clear any entries in the MergeContext
+ // TODO(agiardullo): consider adding support for reverse iteration
+ if (!Valid()) {
+ return result;
+ } else if (comparator_->CompareKey(column_family_id_, Entry().key, key) !=
+ 0) {
+ return result;
+ } else {
+ // We want to iterate in the reverse order that the writes were added to the
+ // batch. Since we don't have a reverse iterator, we must seek past the
+ // end. We do this by seeking to the next key, and then back one step
+ NextKey();
+ if (Valid()) {
+ Prev();
+ } else {
+ SeekToLast();
+ }
+
+ // We are at the end of the iterator for this key. Search backwards for the
+ // last Put or Delete, accumulating merges along the way.
+ while (Valid()) {
+ const WriteEntry entry = Entry();
+ if (comparator_->CompareKey(column_family_id_, entry.key, key) != 0) {
+ break; // Unexpected error or we've reached a different next key
+ }
+
+ switch (entry.type) {
+ case kPutRecord:
+ return WBWIIteratorImpl::kFound;
+ case kDeleteRecord:
+ return WBWIIteratorImpl::kDeleted;
+ case kSingleDeleteRecord:
+ return WBWIIteratorImpl::kDeleted;
+ case kMergeRecord:
+ result = WBWIIteratorImpl::kMergeInProgress;
+ merge_context->PushOperand(entry.value);
+ break;
+ case kLogDataRecord:
+ break; // ignore
+ case kXIDRecord:
+ break; // ignore
+ default:
+ return WBWIIteratorImpl::kError;
+ } // end switch statement
+ Prev();
+ } // End while Valid()
+ // At this point, we have been through the whole list and found no Puts or
+ // Deletes. The iterator points to the previous key. Move the iterator back
+ // onto this one.
+ if (Valid()) {
+ Next();
+ } else {
+ SeekToFirst();
+ }
+ }
+ return result;
+}
+
+Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
+ WriteType* type, Slice* Key,
+ Slice* value, Slice* blob,
+ Slice* xid) const {
+ if (type == nullptr || Key == nullptr || value == nullptr ||
+ blob == nullptr || xid == nullptr) {
+ return Status::InvalidArgument("Output parameters cannot be null");
+ }
+
+ if (data_offset == GetDataSize()) {
+ // reached end of batch.
+ return Status::NotFound();
+ }
+
+ if (data_offset > GetDataSize()) {
+ return Status::InvalidArgument("data offset exceed write batch size");
+ }
+ Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
+ char tag;
+ uint32_t column_family;
+ Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value,
+ blob, xid);
+ if (!s.ok()) {
+ return s;
+ }
+
+ switch (tag) {
+ case kTypeColumnFamilyValue:
+ case kTypeValue:
+ *type = kPutRecord;
+ break;
+ case kTypeColumnFamilyDeletion:
+ case kTypeDeletion:
+ *type = kDeleteRecord;
+ break;
+ case kTypeColumnFamilySingleDeletion:
+ case kTypeSingleDeletion:
+ *type = kSingleDeleteRecord;
+ break;
+ case kTypeColumnFamilyRangeDeletion:
+ case kTypeRangeDeletion:
+ *type = kDeleteRangeRecord;
+ break;
+ case kTypeColumnFamilyMerge:
+ case kTypeMerge:
+ *type = kMergeRecord;
+ break;
+ case kTypeLogData:
+ *type = kLogDataRecord;
+ break;
+ case kTypeNoop:
+ case kTypeBeginPrepareXID:
+ case kTypeBeginPersistedPrepareXID:
+ case kTypeBeginUnprepareXID:
+ case kTypeEndPrepareXID:
+ case kTypeCommitXID:
+ case kTypeRollbackXID:
+ *type = kXIDRecord;
+ break;
+ default:
+ return Status::Corruption("unknown WriteBatch tag ",
+ std::to_string(static_cast<unsigned int>(tag)));
+ }
+ return Status::OK();
+}
+
+// If both of `entry1` and `entry2` point to real entry in write batch, we
+// compare the entries as following:
+// 1. first compare the column family, the one with larger CF will be larger;
+// 2. Inside the same CF, we first decode the entry to find the key of the entry
+// and the entry with larger key will be larger;
+// 3. If two entries are of the same CF and key, the one with larger offset
+// will be larger.
+// Some times either `entry1` or `entry2` is dummy entry, which is actually
+// a search key. In this case, in step 2, we don't go ahead and decode the
+// entry but use the value in WriteBatchIndexEntry::search_key.
+// One special case is WriteBatchIndexEntry::key_size is kFlagMinInCf.
+// This indicate that we are going to seek to the first of the column family.
+// Once we see this, this entry will be smaller than all the real entries of
+// the column family.
+int WriteBatchEntryComparator::operator()(
+ const WriteBatchIndexEntry* entry1,
+ const WriteBatchIndexEntry* entry2) const {
+ if (entry1->column_family > entry2->column_family) {
+ return 1;
+ } else if (entry1->column_family < entry2->column_family) {
+ return -1;
+ }
+
+ // Deal with special case of seeking to the beginning of a column family
+ if (entry1->is_min_in_cf()) {
+ return -1;
+ } else if (entry2->is_min_in_cf()) {
+ return 1;
+ }
+
+ Slice key1, key2;
+ if (entry1->search_key == nullptr) {
+ key1 = Slice(write_batch_->Data().data() + entry1->key_offset,
+ entry1->key_size);
+ } else {
+ key1 = *(entry1->search_key);
+ }
+ if (entry2->search_key == nullptr) {
+ key2 = Slice(write_batch_->Data().data() + entry2->key_offset,
+ entry2->key_size);
+ } else {
+ key2 = *(entry2->search_key);
+ }
+
+ int cmp = CompareKey(entry1->column_family, key1, key2);
+ if (cmp != 0) {
+ return cmp;
+ } else if (entry1->offset > entry2->offset) {
+ return 1;
+ } else if (entry1->offset < entry2->offset) {
+ return -1;
+ }
+ return 0;
+}
+
+int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
+ const Slice& key1,
+ const Slice& key2) const {
+ if (column_family < cf_comparators_.size() &&
+ cf_comparators_[column_family] != nullptr) {
+ return cf_comparators_[column_family]->CompareWithoutTimestamp(
+ key1, /*a_has_ts=*/false, key2, /*b_has_ts=*/false);
+ } else {
+ return default_comparator_->CompareWithoutTimestamp(
+ key1, /*a_has_ts=*/false, key2, /*b_has_ts=*/false);
+ }
+}
+
+const Comparator* WriteBatchEntryComparator::GetComparator(
+ const ColumnFamilyHandle* column_family) const {
+ return column_family ? column_family->GetComparator() : default_comparator_;
+}
+
+const Comparator* WriteBatchEntryComparator::GetComparator(
+ uint32_t column_family) const {
+ if (column_family < cf_comparators_.size() &&
+ cf_comparators_[column_family]) {
+ return cf_comparators_[column_family];
+ }
+ return default_comparator_;
+}
+
+WriteEntry WBWIIteratorImpl::Entry() const {
+ WriteEntry ret;
+ Slice blob, xid;
+ const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
+ // this is guaranteed with Valid()
+ assert(iter_entry != nullptr &&
+ iter_entry->column_family == column_family_id_);
+ auto s = write_batch_->GetEntryFromDataOffset(
+ iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
+ assert(s.ok());
+ assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
+ ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
+ ret.type == kMergeRecord);
+ // Make sure entry.key does not include user-defined timestamp.
+ const Comparator* const ucmp = comparator_->GetComparator(column_family_id_);
+ size_t ts_sz = ucmp->timestamp_size();
+ if (ts_sz > 0) {
+ ret.key = StripTimestampFromUserKey(ret.key, ts_sz);
+ }
+ return ret;
+}
+
+bool WBWIIteratorImpl::MatchesKey(uint32_t cf_id, const Slice& key) {
+ if (Valid()) {
+ return comparator_->CompareKey(cf_id, key, Entry().key) == 0;
+ } else {
+ return false;
+ }
+}
+
+WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
+ ColumnFamilyHandle* column_family)
+ : db_(nullptr), db_options_(nullptr), column_family_(column_family) {}
+
+WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
+ DB* db, ColumnFamilyHandle* column_family)
+ : db_(db), db_options_(nullptr), column_family_(column_family) {
+ if (db_ != nullptr && column_family_ == nullptr) {
+ column_family_ = db_->DefaultColumnFamily();
+ }
+}
+
+WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
+ const DBOptions* db_options, ColumnFamilyHandle* column_family)
+ : db_(nullptr), db_options_(db_options), column_family_(column_family) {}
+
+Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
+ const Slice* value,
+ const MergeContext& context,
+ std::string* result) const {
+ if (column_family_ != nullptr) {
+ auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family_);
+ const auto merge_operator = cfh->cfd()->ioptions()->merge_operator.get();
+ if (merge_operator == nullptr) {
+ return Status::InvalidArgument(
+ "Merge_operator must be set for column_family");
+ } else if (db_ != nullptr) {
+ const ImmutableDBOptions& immutable_db_options =
+ static_cast_with_check<DBImpl>(db_->GetRootDB())
+ ->immutable_db_options();
+ Statistics* statistics = immutable_db_options.statistics.get();
+ Logger* logger = immutable_db_options.info_log.get();
+ SystemClock* clock = immutable_db_options.clock;
+ return MergeHelper::TimedFullMerge(
+ merge_operator, key, value, context.GetOperands(), result, logger,
+ statistics, clock, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ false);
+ } else if (db_options_ != nullptr) {
+ Statistics* statistics = db_options_->statistics.get();
+ Env* env = db_options_->env;
+ Logger* logger = db_options_->info_log.get();
+ SystemClock* clock = env->GetSystemClock().get();
+ return MergeHelper::TimedFullMerge(
+ merge_operator, key, value, context.GetOperands(), result, logger,
+ statistics, clock, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ false);
+ } else {
+ const auto cf_opts = cfh->cfd()->ioptions();
+ return MergeHelper::TimedFullMerge(
+ merge_operator, key, value, context.GetOperands(), result,
+ cf_opts->logger, cf_opts->stats, cf_opts->clock,
+ /* result_operand */ nullptr, /* update_num_ops_stats */ false);
+ }
+ } else {
+ return Status::InvalidArgument("Must provide a column_family");
+ }
+}
+
+WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatch(
+ WriteBatchWithIndex* batch, const Slice& key, MergeContext* context,
+ std::string* value, Status* s) {
+ *s = Status::OK();
+
+ std::unique_ptr<WBWIIteratorImpl> iter(
+ static_cast_with_check<WBWIIteratorImpl>(
+ batch->NewIterator(column_family_)));
+
+ // Search the iterator for this key, and updates/merges to it.
+ iter->Seek(key);
+ auto result = iter->FindLatestUpdate(key, context);
+ if (result == WBWIIteratorImpl::kError) {
+ (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
+ std::to_string(iter->Entry().type));
+ return result;
+ } else if (result == WBWIIteratorImpl::kNotFound) {
+ return result;
+ } else if (result == WBWIIteratorImpl::Result::kFound) { // PUT
+ Slice entry_value = iter->Entry().value;
+ if (context->GetNumOperands() > 0) {
+ *s = MergeKey(key, &entry_value, *context, value);
+ if (!s->ok()) {
+ result = WBWIIteratorImpl::Result::kError;
+ }
+ } else {
+ value->assign(entry_value.data(), entry_value.size());
+ }
+ } else if (result == WBWIIteratorImpl::kDeleted) {
+ if (context->GetNumOperands() > 0) {
+ *s = MergeKey(key, nullptr, *context, value);
+ if (s->ok()) {
+ result = WBWIIteratorImpl::Result::kFound;
+ } else {
+ result = WBWIIteratorImpl::Result::kError;
+ }
+ }
+ }
+ return result;
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h
new file mode 100644
index 000000000..edabc95bc
--- /dev/null
+++ b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h
@@ -0,0 +1,344 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "db/merge_context.h"
+#include "memtable/skiplist.h"
+#include "options/db_options.h"
+#include "port/port.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class MergeContext;
+class WBWIIteratorImpl;
+class WriteBatchWithIndexInternal;
+struct Options;
+
+// when direction == forward
+// * current_at_base_ <=> base_iterator > delta_iterator
+// when direction == backwards
+// * current_at_base_ <=> base_iterator < delta_iterator
+// always:
+// * equal_keys_ <=> base_iterator == delta_iterator
+class BaseDeltaIterator : public Iterator {
+ public:
+ BaseDeltaIterator(ColumnFamilyHandle* column_family, Iterator* base_iterator,
+ WBWIIteratorImpl* delta_iterator,
+ const Comparator* comparator,
+ const ReadOptions* read_options = nullptr);
+
+ ~BaseDeltaIterator() override {}
+
+ bool Valid() const override;
+ void SeekToFirst() override;
+ void SeekToLast() override;
+ void Seek(const Slice& k) override;
+ void SeekForPrev(const Slice& k) override;
+ void Next() override;
+ void Prev() override;
+ Slice key() const override;
+ Slice value() const override;
+ Status status() const override;
+ void Invalidate(Status s);
+
+ private:
+ void AssertInvariants();
+ void Advance();
+ void AdvanceDelta();
+ void AdvanceBase();
+ bool BaseValid() const;
+ bool DeltaValid() const;
+ void UpdateCurrent();
+
+ std::unique_ptr<WriteBatchWithIndexInternal> wbwii_;
+ bool forward_;
+ bool current_at_base_;
+ bool equal_keys_;
+ mutable Status status_;
+ std::unique_ptr<Iterator> base_iterator_;
+ std::unique_ptr<WBWIIteratorImpl> delta_iterator_;
+ const Comparator* comparator_; // not owned
+ const Slice* iterate_upper_bound_;
+ mutable PinnableSlice merge_result_;
+};
+
+// Key used by skip list, as the binary searchable index of WriteBatchWithIndex.
+struct WriteBatchIndexEntry {
+ WriteBatchIndexEntry(size_t o, uint32_t c, size_t ko, size_t ksz)
+ : offset(o),
+ column_family(c),
+ key_offset(ko),
+ key_size(ksz),
+ search_key(nullptr) {}
+ // Create a dummy entry as the search key. This index entry won't be backed
+ // by an entry from the write batch, but a pointer to the search key. Or a
+ // special flag of offset can indicate we are seek to first.
+ // @_search_key: the search key
+ // @_column_family: column family
+ // @is_forward_direction: true for Seek(). False for SeekForPrev()
+ // @is_seek_to_first: true if we seek to the beginning of the column family
+ // _search_key should be null in this case.
+ WriteBatchIndexEntry(const Slice* _search_key, uint32_t _column_family,
+ bool is_forward_direction, bool is_seek_to_first)
+ // For SeekForPrev(), we need to make the dummy entry larger than any
+ // entry who has the same search key. Otherwise, we'll miss those entries.
+ : offset(is_forward_direction ? 0 : std::numeric_limits<size_t>::max()),
+ column_family(_column_family),
+ key_offset(0),
+ key_size(is_seek_to_first ? kFlagMinInCf : 0),
+ search_key(_search_key) {
+ assert(_search_key != nullptr || is_seek_to_first);
+ }
+
+ // If this flag appears in the key_size, it indicates a
+ // key that is smaller than any other entry for the same column family.
+ static const size_t kFlagMinInCf = std::numeric_limits<size_t>::max();
+
+ bool is_min_in_cf() const {
+ assert(key_size != kFlagMinInCf ||
+ (key_offset == 0 && search_key == nullptr));
+ return key_size == kFlagMinInCf;
+ }
+
+ // offset of an entry in write batch's string buffer. If this is a dummy
+ // lookup key, in which case search_key != nullptr, offset is set to either
+ // 0 or max, only for comparison purpose. Because when entries have the same
+ // key, the entry with larger offset is larger, offset = 0 will make a seek
+ // key small or equal than all the entries with the seek key, so that Seek()
+ // will find all the entries of the same key. Similarly, offset = MAX will
+ // make the entry just larger than all entries with the search key so
+ // SeekForPrev() will see all the keys with the same key.
+ size_t offset;
+ uint32_t column_family; // column family of the entry.
+ size_t key_offset; // offset of the key in write batch's string buffer.
+ size_t key_size; // size of the key. kFlagMinInCf indicates
+ // that this is a dummy look up entry for
+ // SeekToFirst() to the beginning of the column
+ // family. We use the flag here to save a boolean
+ // in the struct.
+
+ const Slice* search_key; // if not null, instead of reading keys from
+ // write batch, use it to compare. This is used
+ // for lookup key.
+};
+
+class ReadableWriteBatch : public WriteBatch {
+ public:
+ explicit ReadableWriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0,
+ size_t protection_bytes_per_key = 0,
+ size_t default_cf_ts_sz = 0)
+ : WriteBatch(reserved_bytes, max_bytes, protection_bytes_per_key,
+ default_cf_ts_sz) {}
+ // Retrieve some information from a write entry in the write batch, given
+ // the start offset of the write entry.
+ Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
+ Slice* value, Slice* blob, Slice* xid) const;
+};
+
+class WriteBatchEntryComparator {
+ public:
+ WriteBatchEntryComparator(const Comparator* _default_comparator,
+ const ReadableWriteBatch* write_batch)
+ : default_comparator_(_default_comparator), write_batch_(write_batch) {}
+ // Compare a and b. Return a negative value if a is less than b, 0 if they
+ // are equal, and a positive value if a is greater than b
+ int operator()(const WriteBatchIndexEntry* entry1,
+ const WriteBatchIndexEntry* entry2) const;
+
+ int CompareKey(uint32_t column_family, const Slice& key1,
+ const Slice& key2) const;
+
+ void SetComparatorForCF(uint32_t column_family_id,
+ const Comparator* comparator) {
+ if (column_family_id >= cf_comparators_.size()) {
+ cf_comparators_.resize(column_family_id + 1, nullptr);
+ }
+ cf_comparators_[column_family_id] = comparator;
+ }
+
+ const Comparator* default_comparator() { return default_comparator_; }
+
+ const Comparator* GetComparator(
+ const ColumnFamilyHandle* column_family) const;
+
+ const Comparator* GetComparator(uint32_t column_family) const;
+
+ private:
+ const Comparator* const default_comparator_;
+ std::vector<const Comparator*> cf_comparators_;
+ const ReadableWriteBatch* const write_batch_;
+};
+
+using WriteBatchEntrySkipList =
+ SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>;
+
+class WBWIIteratorImpl : public WBWIIterator {
+ public:
+ enum Result : uint8_t {
+ kFound,
+ kDeleted,
+ kNotFound,
+ kMergeInProgress,
+ kError
+ };
+ WBWIIteratorImpl(uint32_t column_family_id,
+ WriteBatchEntrySkipList* skip_list,
+ const ReadableWriteBatch* write_batch,
+ WriteBatchEntryComparator* comparator)
+ : column_family_id_(column_family_id),
+ skip_list_iter_(skip_list),
+ write_batch_(write_batch),
+ comparator_(comparator) {}
+
+ ~WBWIIteratorImpl() override {}
+
+ bool Valid() const override {
+ if (!skip_list_iter_.Valid()) {
+ return false;
+ }
+ const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
+ return (iter_entry != nullptr &&
+ iter_entry->column_family == column_family_id_);
+ }
+
+ void SeekToFirst() override {
+ WriteBatchIndexEntry search_entry(
+ nullptr /* search_key */, column_family_id_,
+ true /* is_forward_direction */, true /* is_seek_to_first */);
+ skip_list_iter_.Seek(&search_entry);
+ }
+
+ void SeekToLast() override {
+ WriteBatchIndexEntry search_entry(
+ nullptr /* search_key */, column_family_id_ + 1,
+ true /* is_forward_direction */, true /* is_seek_to_first */);
+ skip_list_iter_.Seek(&search_entry);
+ if (!skip_list_iter_.Valid()) {
+ skip_list_iter_.SeekToLast();
+ } else {
+ skip_list_iter_.Prev();
+ }
+ }
+
+ void Seek(const Slice& key) override {
+ WriteBatchIndexEntry search_entry(&key, column_family_id_,
+ true /* is_forward_direction */,
+ false /* is_seek_to_first */);
+ skip_list_iter_.Seek(&search_entry);
+ }
+
+ void SeekForPrev(const Slice& key) override {
+ WriteBatchIndexEntry search_entry(&key, column_family_id_,
+ false /* is_forward_direction */,
+ false /* is_seek_to_first */);
+ skip_list_iter_.SeekForPrev(&search_entry);
+ }
+
+ void Next() override { skip_list_iter_.Next(); }
+
+ void Prev() override { skip_list_iter_.Prev(); }
+
+ WriteEntry Entry() const override;
+
+ Status status() const override {
+ // this is in-memory data structure, so the only way status can be non-ok is
+ // through memory corruption
+ return Status::OK();
+ }
+
+ const WriteBatchIndexEntry* GetRawEntry() const {
+ return skip_list_iter_.key();
+ }
+
+ bool MatchesKey(uint32_t cf_id, const Slice& key);
+
+ // Moves the iterator to first entry of the previous key.
+ void PrevKey();
+ // Moves the iterator to first entry of the next key.
+ void NextKey();
+
+ // Moves the iterator to the Update (Put or Delete) for the current key
+ // If there are no Put/Delete, the Iterator will point to the first entry for
+ // this key
+ // @return kFound if a Put was found for the key
+ // @return kDeleted if a delete was found for the key
+ // @return kMergeInProgress if only merges were fouund for the key
+ // @return kError if an unsupported operation was found for the key
+ // @return kNotFound if no operations were found for this key
+ //
+ Result FindLatestUpdate(const Slice& key, MergeContext* merge_context);
+ Result FindLatestUpdate(MergeContext* merge_context);
+
+ protected:
+ void AdvanceKey(bool forward);
+
+ private:
+ uint32_t column_family_id_;
+ WriteBatchEntrySkipList::Iterator skip_list_iter_;
+ const ReadableWriteBatch* write_batch_;
+ WriteBatchEntryComparator* comparator_;
+};
+
+class WriteBatchWithIndexInternal {
+ public:
+ static const Comparator* GetUserComparator(const WriteBatchWithIndex& wbwi,
+ uint32_t cf_id);
+
+ // For GetFromBatchAndDB or similar
+ explicit WriteBatchWithIndexInternal(DB* db,
+ ColumnFamilyHandle* column_family);
+ // For GetFromBatchAndDB or similar
+ explicit WriteBatchWithIndexInternal(ColumnFamilyHandle* column_family);
+ // For GetFromBatch or similar
+ explicit WriteBatchWithIndexInternal(const DBOptions* db_options,
+ ColumnFamilyHandle* column_family);
+
+ // If batch contains a value for key, store it in *value and return kFound.
+ // If batch contains a deletion for key, return Deleted.
+ // If batch contains Merge operations as the most recent entry for a key,
+ // and the merge process does not stop (not reaching a value or delete),
+ // prepend the current merge operands to *operands,
+ // and return kMergeInProgress
+ // If batch does not contain this key, return kNotFound
+ // Else, return kError on error with error Status stored in *s.
+ WBWIIteratorImpl::Result GetFromBatch(WriteBatchWithIndex* batch,
+ const Slice& key, std::string* value,
+ Status* s) {
+ return GetFromBatch(batch, key, &merge_context_, value, s);
+ }
+ WBWIIteratorImpl::Result GetFromBatch(WriteBatchWithIndex* batch,
+ const Slice& key,
+ MergeContext* merge_context,
+ std::string* value, Status* s);
+ Status MergeKey(const Slice& key, const Slice* value,
+ std::string* result) const {
+ return MergeKey(key, value, merge_context_, result);
+ }
+ Status MergeKey(const Slice& key, const Slice* value,
+ const MergeContext& context, std::string* result) const;
+ size_t GetNumOperands() const { return merge_context_.GetNumOperands(); }
+ MergeContext* GetMergeContext() { return &merge_context_; }
+ Slice GetOperand(int index) const { return merge_context_.GetOperand(index); }
+
+ private:
+ DB* db_;
+ const DBOptions* db_options_;
+ ColumnFamilyHandle* column_family_;
+ MergeContext merge_context_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_test.cc b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_test.cc
new file mode 100644
index 000000000..350dcc881
--- /dev/null
+++ b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_test.cc
@@ -0,0 +1,2419 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef ROCKSDB_LITE
+
+#include "rocksdb/utilities/write_batch_with_index.h"
+
+#include <map>
+#include <memory>
+
+#include "db/column_family.h"
+#include "port/stack_trace.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "utilities/merge_operators.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
+#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
+ public:
+ explicit ColumnFamilyHandleImplDummy(int id, const Comparator* comparator)
+ : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr),
+ id_(id),
+ comparator_(comparator) {}
+ uint32_t GetID() const override { return id_; }
+ const Comparator* GetComparator() const override { return comparator_; }
+
+ private:
+ uint32_t id_;
+ const Comparator* comparator_;
+};
+
+struct Entry {
+ std::string key;
+ std::string value;
+ WriteType type;
+};
+
+struct TestHandler : public WriteBatch::Handler {
+ std::map<uint32_t, std::vector<Entry>> seen;
+ Status PutCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) override {
+ Entry e;
+ e.key = key.ToString();
+ e.value = value.ToString();
+ e.type = kPutRecord;
+ seen[column_family_id].push_back(e);
+ return Status::OK();
+ }
+ Status MergeCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) override {
+ Entry e;
+ e.key = key.ToString();
+ e.value = value.ToString();
+ e.type = kMergeRecord;
+ seen[column_family_id].push_back(e);
+ return Status::OK();
+ }
+ void LogData(const Slice& /*blob*/) override {}
+ Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
+ Entry e;
+ e.key = key.ToString();
+ e.value = "";
+ e.type = kDeleteRecord;
+ seen[column_family_id].push_back(e);
+ return Status::OK();
+ }
+};
+
+using KVMap = std::map<std::string, std::string>;
+
+class KVIter : public Iterator {
+ public:
+ explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {}
+ bool Valid() const override { return iter_ != map_->end(); }
+ void SeekToFirst() override { iter_ = map_->begin(); }
+ void SeekToLast() override {
+ if (map_->empty()) {
+ iter_ = map_->end();
+ } else {
+ iter_ = map_->find(map_->rbegin()->first);
+ }
+ }
+ void Seek(const Slice& k) override {
+ iter_ = map_->lower_bound(k.ToString());
+ }
+ void SeekForPrev(const Slice& k) override {
+ iter_ = map_->upper_bound(k.ToString());
+ Prev();
+ }
+ void Next() override { ++iter_; }
+ void Prev() override {
+ if (iter_ == map_->begin()) {
+ iter_ = map_->end();
+ return;
+ }
+ --iter_;
+ }
+ Slice key() const override { return iter_->first; }
+ Slice value() const override { return iter_->second; }
+ Status status() const override { return Status::OK(); }
+
+ private:
+ const KVMap* const map_;
+ KVMap::const_iterator iter_;
+};
+
+static std::string PrintContents(WriteBatchWithIndex* batch,
+ ColumnFamilyHandle* column_family,
+ bool hex = false) {
+ std::string result;
+
+ WBWIIterator* iter;
+ if (column_family == nullptr) {
+ iter = batch->NewIterator();
+ } else {
+ iter = batch->NewIterator(column_family);
+ }
+
+ iter->SeekToFirst();
+ while (iter->Valid()) {
+ WriteEntry e = iter->Entry();
+
+ if (e.type == kPutRecord) {
+ result.append("PUT(");
+ result.append(e.key.ToString(hex));
+ result.append("):");
+ result.append(e.value.ToString(hex));
+ } else if (e.type == kMergeRecord) {
+ result.append("MERGE(");
+ result.append(e.key.ToString(hex));
+ result.append("):");
+ result.append(e.value.ToString(hex));
+ } else if (e.type == kSingleDeleteRecord) {
+ result.append("SINGLE-DEL(");
+ result.append(e.key.ToString(hex));
+ result.append(")");
+ } else {
+ assert(e.type == kDeleteRecord);
+ result.append("DEL(");
+ result.append(e.key.ToString(hex));
+ result.append(")");
+ }
+
+ result.append(",");
+ iter->Next();
+ }
+
+ delete iter;
+ return result;
+}
+
+static std::string PrintContents(WriteBatchWithIndex* batch, KVMap* base_map,
+ ColumnFamilyHandle* column_family) {
+ std::string result;
+
+ Iterator* iter;
+ if (column_family == nullptr) {
+ iter = batch->NewIteratorWithBase(new KVIter(base_map));
+ } else {
+ iter = batch->NewIteratorWithBase(column_family, new KVIter(base_map));
+ }
+
+ iter->SeekToFirst();
+ while (iter->Valid()) {
+ assert(iter->status().ok());
+
+ Slice key = iter->key();
+ Slice value = iter->value();
+
+ result.append(key.ToString());
+ result.append(":");
+ result.append(value.ToString());
+ result.append(",");
+
+ iter->Next();
+ }
+
+ delete iter;
+ return result;
+}
+
+void AssertIter(Iterator* iter, const std::string& key,
+ const std::string& value) {
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(key, iter->key().ToString());
+ ASSERT_EQ(value, iter->value().ToString());
+}
+
+void AssertItersMatch(Iterator* iter1, Iterator* iter2) {
+ ASSERT_EQ(iter1->Valid(), iter2->Valid());
+ if (iter1->Valid()) {
+ ASSERT_EQ(iter1->key().ToString(), iter2->key().ToString());
+ ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString());
+ }
+}
+
+void AssertItersEqual(Iterator* iter1, Iterator* iter2) {
+ iter1->SeekToFirst();
+ iter2->SeekToFirst();
+ while (iter1->Valid()) {
+ ASSERT_EQ(iter1->Valid(), iter2->Valid());
+ ASSERT_EQ(iter1->key().ToString(), iter2->key().ToString());
+ ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString());
+ iter1->Next();
+ iter2->Next();
+ }
+ ASSERT_EQ(iter1->Valid(), iter2->Valid());
+}
+
+void AssertIterEqual(WBWIIteratorImpl* wbwii,
+ const std::vector<std::string>& keys) {
+ wbwii->SeekToFirst();
+ for (auto k : keys) {
+ ASSERT_TRUE(wbwii->Valid());
+ ASSERT_EQ(wbwii->Entry().key, k);
+ wbwii->NextKey();
+ }
+ ASSERT_FALSE(wbwii->Valid());
+ wbwii->SeekToLast();
+ for (auto kit = keys.rbegin(); kit != keys.rend(); ++kit) {
+ ASSERT_TRUE(wbwii->Valid());
+ ASSERT_EQ(wbwii->Entry().key, *kit);
+ wbwii->PrevKey();
+ }
+ ASSERT_FALSE(wbwii->Valid());
+}
+} // namespace
+
+class WBWIBaseTest : public testing::Test {
+ public:
+ explicit WBWIBaseTest(bool overwrite) : db_(nullptr) {
+ options_.merge_operator =
+ MergeOperators::CreateFromStringId("stringappend");
+ options_.create_if_missing = true;
+ dbname_ = test::PerThreadDBPath("write_batch_with_index_test");
+ EXPECT_OK(DestroyDB(dbname_, options_));
+ batch_.reset(new WriteBatchWithIndex(BytewiseComparator(), 20, overwrite));
+ }
+
+ virtual ~WBWIBaseTest() {
+ if (db_ != nullptr) {
+ ReleaseSnapshot();
+ delete db_;
+ EXPECT_OK(DestroyDB(dbname_, options_));
+ }
+ }
+
+ std::string AddToBatch(ColumnFamilyHandle* cf, const std::string& key) {
+ std::string result;
+ for (size_t i = 0; i < key.size(); i++) {
+ if (key[i] == 'd') {
+ batch_->Delete(cf, key);
+ result = "";
+ } else if (key[i] == 'p') {
+ result = key + std::to_string(i);
+ batch_->Put(cf, key, result);
+ } else if (key[i] == 'm') {
+ std::string value = key + std::to_string(i);
+ batch_->Merge(cf, key, value);
+ if (result.empty()) {
+ result = value;
+ } else {
+ result = result + "," + value;
+ }
+ }
+ }
+ return result;
+ }
+
+ virtual Status OpenDB() { return DB::Open(options_, dbname_, &db_); }
+
+ void ReleaseSnapshot() {
+ if (read_opts_.snapshot != nullptr) {
+ EXPECT_NE(db_, nullptr);
+ db_->ReleaseSnapshot(read_opts_.snapshot);
+ read_opts_.snapshot = nullptr;
+ }
+ }
+
+ public:
+ DB* db_;
+ std::string dbname_;
+ Options options_;
+ WriteOptions write_opts_;
+ ReadOptions read_opts_;
+ std::unique_ptr<WriteBatchWithIndex> batch_;
+};
+
+class WBWIKeepTest : public WBWIBaseTest {
+ public:
+ WBWIKeepTest() : WBWIBaseTest(false) {}
+};
+
+class WBWIOverwriteTest : public WBWIBaseTest {
+ public:
+ WBWIOverwriteTest() : WBWIBaseTest(true) {}
+};
+class WriteBatchWithIndexTest : public WBWIBaseTest,
+ public testing::WithParamInterface<bool> {
+ public:
+ WriteBatchWithIndexTest() : WBWIBaseTest(GetParam()) {}
+};
+
+void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
+ WriteBatchWithIndex* batch) {
+ // In this test, we insert <key, value> to column family `data`, and
+ // <value, key> to column family `index`. Then iterator them in order
+ // and seek them by key.
+
+ // Sort entries by key
+ std::map<std::string, std::vector<Entry*>> data_map;
+ // Sort entries by value
+ std::map<std::string, std::vector<Entry*>> index_map;
+ for (auto& e : entries) {
+ data_map[e.key].push_back(&e);
+ index_map[e.value].push_back(&e);
+ }
+
+ ColumnFamilyHandleImplDummy data(6, BytewiseComparator());
+ ColumnFamilyHandleImplDummy index(8, BytewiseComparator());
+ for (auto& e : entries) {
+ if (e.type == kPutRecord) {
+ ASSERT_OK(batch->Put(&data, e.key, e.value));
+ ASSERT_OK(batch->Put(&index, e.value, e.key));
+ } else if (e.type == kMergeRecord) {
+ ASSERT_OK(batch->Merge(&data, e.key, e.value));
+ ASSERT_OK(batch->Put(&index, e.value, e.key));
+ } else {
+ assert(e.type == kDeleteRecord);
+ std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data));
+ iter->Seek(e.key);
+ ASSERT_OK(iter->status());
+ auto write_entry = iter->Entry();
+ ASSERT_EQ(e.key, write_entry.key.ToString());
+ ASSERT_EQ(e.value, write_entry.value.ToString());
+ ASSERT_OK(batch->Delete(&data, e.key));
+ ASSERT_OK(batch->Put(&index, e.value, ""));
+ }
+ }
+
+ // Iterator all keys
+ {
+ std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data));
+ for (int seek_to_first : {0, 1}) {
+ if (seek_to_first) {
+ iter->SeekToFirst();
+ } else {
+ iter->Seek("");
+ }
+ for (auto pair : data_map) {
+ for (auto v : pair.second) {
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ auto write_entry = iter->Entry();
+ ASSERT_EQ(pair.first, write_entry.key.ToString());
+ ASSERT_EQ(v->type, write_entry.type);
+ if (write_entry.type != kDeleteRecord) {
+ ASSERT_EQ(v->value, write_entry.value.ToString());
+ }
+ iter->Next();
+ }
+ }
+ ASSERT_TRUE(!iter->Valid());
+ }
+ iter->SeekToLast();
+ for (auto pair = data_map.rbegin(); pair != data_map.rend(); ++pair) {
+ for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) {
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ auto write_entry = iter->Entry();
+ ASSERT_EQ(pair->first, write_entry.key.ToString());
+ ASSERT_EQ((*v)->type, write_entry.type);
+ if (write_entry.type != kDeleteRecord) {
+ ASSERT_EQ((*v)->value, write_entry.value.ToString());
+ }
+ iter->Prev();
+ }
+ }
+ ASSERT_TRUE(!iter->Valid());
+ }
+
+ // Iterator all indexes
+ {
+ std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&index));
+ for (int seek_to_first : {0, 1}) {
+ if (seek_to_first) {
+ iter->SeekToFirst();
+ } else {
+ iter->Seek("");
+ }
+ for (auto pair : index_map) {
+ for (auto v : pair.second) {
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ auto write_entry = iter->Entry();
+ ASSERT_EQ(pair.first, write_entry.key.ToString());
+ if (v->type != kDeleteRecord) {
+ ASSERT_EQ(v->key, write_entry.value.ToString());
+ ASSERT_EQ(v->value, write_entry.key.ToString());
+ }
+ iter->Next();
+ }
+ }
+ ASSERT_TRUE(!iter->Valid());
+ }
+
+ iter->SeekToLast();
+ for (auto pair = index_map.rbegin(); pair != index_map.rend(); ++pair) {
+ for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) {
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ auto write_entry = iter->Entry();
+ ASSERT_EQ(pair->first, write_entry.key.ToString());
+ if ((*v)->type != kDeleteRecord) {
+ ASSERT_EQ((*v)->key, write_entry.value.ToString());
+ ASSERT_EQ((*v)->value, write_entry.key.ToString());
+ }
+ iter->Prev();
+ }
+ }
+ ASSERT_TRUE(!iter->Valid());
+ }
+
+ // Seek to every key
+ {
+ std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data));
+
+ // Seek the keys one by one in reverse order
+ for (auto pair = data_map.rbegin(); pair != data_map.rend(); ++pair) {
+ iter->Seek(pair->first);
+ ASSERT_OK(iter->status());
+ for (auto v : pair->second) {
+ ASSERT_TRUE(iter->Valid());
+ auto write_entry = iter->Entry();
+ ASSERT_EQ(pair->first, write_entry.key.ToString());
+ ASSERT_EQ(v->type, write_entry.type);
+ if (write_entry.type != kDeleteRecord) {
+ ASSERT_EQ(v->value, write_entry.value.ToString());
+ }
+ iter->Next();
+ ASSERT_OK(iter->status());
+ }
+ }
+ }
+
+ // Seek to every index
+ {
+ std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&index));
+
+ // Seek the keys one by one in reverse order
+ for (auto pair = index_map.rbegin(); pair != index_map.rend(); ++pair) {
+ iter->Seek(pair->first);
+ ASSERT_OK(iter->status());
+ for (auto v : pair->second) {
+ ASSERT_TRUE(iter->Valid());
+ auto write_entry = iter->Entry();
+ ASSERT_EQ(pair->first, write_entry.key.ToString());
+ ASSERT_EQ(v->value, write_entry.key.ToString());
+ if (v->type != kDeleteRecord) {
+ ASSERT_EQ(v->key, write_entry.value.ToString());
+ }
+ iter->Next();
+ ASSERT_OK(iter->status());
+ }
+ }
+ }
+
+ // Verify WriteBatch can be iterated
+ TestHandler handler;
+ ASSERT_OK(batch->GetWriteBatch()->Iterate(&handler));
+
+ // Verify data column family
+ {
+ ASSERT_EQ(entries.size(), handler.seen[data.GetID()].size());
+ size_t i = 0;
+ for (auto e : handler.seen[data.GetID()]) {
+ auto write_entry = entries[i++];
+ ASSERT_EQ(e.type, write_entry.type);
+ ASSERT_EQ(e.key, write_entry.key);
+ if (e.type != kDeleteRecord) {
+ ASSERT_EQ(e.value, write_entry.value);
+ }
+ }
+ }
+
+ // Verify index column family
+ {
+ ASSERT_EQ(entries.size(), handler.seen[index.GetID()].size());
+ size_t i = 0;
+ for (auto e : handler.seen[index.GetID()]) {
+ auto write_entry = entries[i++];
+ ASSERT_EQ(e.key, write_entry.value);
+ if (write_entry.type != kDeleteRecord) {
+ ASSERT_EQ(e.value, write_entry.key);
+ }
+ }
+ }
+}
+
+TEST_F(WBWIKeepTest, TestValueAsSecondaryIndex) {
+ Entry entries[] = {
+ {"aaa", "0005", kPutRecord}, {"b", "0002", kPutRecord},
+ {"cdd", "0002", kMergeRecord}, {"aab", "00001", kPutRecord},
+ {"cc", "00005", kPutRecord}, {"cdd", "0002", kPutRecord},
+ {"aab", "0003", kPutRecord}, {"cc", "00005", kDeleteRecord},
+ };
+ std::vector<Entry> entries_list(entries, entries + 8);
+
+ batch_.reset(new WriteBatchWithIndex(nullptr, 20, false));
+
+ TestValueAsSecondaryIndexHelper(entries_list, batch_.get());
+
+ // Clear batch and re-run test with new values
+ batch_->Clear();
+
+ Entry new_entries[] = {
+ {"aaa", "0005", kPutRecord}, {"e", "0002", kPutRecord},
+ {"add", "0002", kMergeRecord}, {"aab", "00001", kPutRecord},
+ {"zz", "00005", kPutRecord}, {"add", "0002", kPutRecord},
+ {"aab", "0003", kPutRecord}, {"zz", "00005", kDeleteRecord},
+ };
+
+ entries_list = std::vector<Entry>(new_entries, new_entries + 8);
+
+ TestValueAsSecondaryIndexHelper(entries_list, batch_.get());
+}
+
+TEST_P(WriteBatchWithIndexTest, TestComparatorForCF) {
+ ColumnFamilyHandleImplDummy cf1(6, nullptr);
+ ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator());
+ ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator());
+
+ ASSERT_OK(batch_->Put(&cf1, "ddd", ""));
+ ASSERT_OK(batch_->Put(&cf2, "aaa", ""));
+ ASSERT_OK(batch_->Put(&cf2, "eee", ""));
+ ASSERT_OK(batch_->Put(&cf1, "ccc", ""));
+ ASSERT_OK(batch_->Put(&reverse_cf, "a11", ""));
+ ASSERT_OK(batch_->Put(&cf1, "bbb", ""));
+
+ Slice key_slices[] = {"a", "3", "3"};
+ Slice value_slice = "";
+ ASSERT_OK(batch_->Put(&reverse_cf, SliceParts(key_slices, 3),
+ SliceParts(&value_slice, 1)));
+ ASSERT_OK(batch_->Put(&reverse_cf, "a22", ""));
+
+ {
+ std::unique_ptr<WBWIIterator> iter(batch_->NewIterator(&cf1));
+ iter->Seek("");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("bbb", iter->Entry().key.ToString());
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("ccc", iter->Entry().key.ToString());
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("ddd", iter->Entry().key.ToString());
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+ }
+
+ {
+ std::unique_ptr<WBWIIterator> iter(batch_->NewIterator(&cf2));
+ iter->Seek("");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("aaa", iter->Entry().key.ToString());
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("eee", iter->Entry().key.ToString());
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+ }
+
+ {
+ std::unique_ptr<WBWIIterator> iter(batch_->NewIterator(&reverse_cf));
+ iter->Seek("");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->Seek("z");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a33", iter->Entry().key.ToString());
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a22", iter->Entry().key.ToString());
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a11", iter->Entry().key.ToString());
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->Seek("a22");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a22", iter->Entry().key.ToString());
+
+ iter->Seek("a13");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a11", iter->Entry().key.ToString());
+ }
+}
+
+TEST_F(WBWIOverwriteTest, TestOverwriteKey) {
+ ColumnFamilyHandleImplDummy cf1(6, nullptr);
+ ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator());
+ ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator());
+
+ ASSERT_OK(batch_->Merge(&cf1, "ddd", ""));
+ ASSERT_OK(batch_->Put(&cf1, "ddd", ""));
+ ASSERT_OK(batch_->Delete(&cf1, "ddd"));
+ ASSERT_OK(batch_->Put(&cf2, "aaa", ""));
+ ASSERT_OK(batch_->Delete(&cf2, "aaa"));
+ ASSERT_OK(batch_->Put(&cf2, "aaa", "aaa"));
+ ASSERT_OK(batch_->Put(&cf2, "eee", "eee"));
+ ASSERT_OK(batch_->Put(&cf1, "ccc", ""));
+ ASSERT_OK(batch_->Put(&reverse_cf, "a11", ""));
+ ASSERT_OK(batch_->Delete(&cf1, "ccc"));
+ ASSERT_OK(batch_->Put(&reverse_cf, "a33", "a33"));
+ ASSERT_OK(batch_->Put(&reverse_cf, "a11", "a11"));
+ Slice slices[] = {"a", "3", "3"};
+ ASSERT_OK(batch_->Delete(&reverse_cf, SliceParts(slices, 3)));
+
+ {
+ std::unique_ptr<WBWIIterator> iter(batch_->NewIterator(&cf1));
+ iter->Seek("");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("ccc", iter->Entry().key.ToString());
+ ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord);
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("ddd", iter->Entry().key.ToString());
+ ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord);
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+ }
+
+ {
+ std::unique_ptr<WBWIIterator> iter(batch_->NewIterator(&cf2));
+ iter->SeekToLast();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("eee", iter->Entry().key.ToString());
+ ASSERT_EQ("eee", iter->Entry().value.ToString());
+ iter->Prev();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("aaa", iter->Entry().key.ToString());
+ ASSERT_EQ("aaa", iter->Entry().value.ToString());
+ iter->Prev();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->SeekToFirst();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("aaa", iter->Entry().key.ToString());
+ ASSERT_EQ("aaa", iter->Entry().value.ToString());
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("eee", iter->Entry().key.ToString());
+ ASSERT_EQ("eee", iter->Entry().value.ToString());
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+ }
+
+ {
+ std::unique_ptr<WBWIIterator> iter(batch_->NewIterator(&reverse_cf));
+ iter->Seek("");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->Seek("z");
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a33", iter->Entry().key.ToString());
+ ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord);
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a11", iter->Entry().key.ToString());
+ ASSERT_EQ("a11", iter->Entry().value.ToString());
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->SeekToLast();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a11", iter->Entry().key.ToString());
+ ASSERT_EQ("a11", iter->Entry().value.ToString());
+ iter->Prev();
+
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ("a33", iter->Entry().key.ToString());
+ ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord);
+ iter->Prev();
+ ASSERT_TRUE(!iter->Valid());
+ }
+}
+
+TEST_P(WriteBatchWithIndexTest, TestWBWIIterator) {
+ ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator());
+ ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator());
+ ASSERT_OK(batch_->Put(&cf1, "a", "a1"));
+ ASSERT_OK(batch_->Put(&cf1, "c", "c1"));
+ ASSERT_OK(batch_->Put(&cf1, "c", "c2"));
+ ASSERT_OK(batch_->Put(&cf1, "e", "e1"));
+ ASSERT_OK(batch_->Put(&cf1, "e", "e2"));
+ ASSERT_OK(batch_->Put(&cf1, "e", "e3"));
+ std::unique_ptr<WBWIIteratorImpl> iter1(
+ static_cast<WBWIIteratorImpl*>(batch_->NewIterator(&cf1)));
+ std::unique_ptr<WBWIIteratorImpl> iter2(
+ static_cast<WBWIIteratorImpl*>(batch_->NewIterator(&cf2)));
+ AssertIterEqual(iter1.get(), {"a", "c", "e"});
+ AssertIterEqual(iter2.get(), {});
+ ASSERT_OK(batch_->Put(&cf2, "a", "a2"));
+ ASSERT_OK(batch_->Merge(&cf2, "b", "b1"));
+ ASSERT_OK(batch_->Merge(&cf2, "b", "b2"));
+ ASSERT_OK(batch_->Delete(&cf2, "d"));
+ ASSERT_OK(batch_->Merge(&cf2, "d", "d2"));
+ ASSERT_OK(batch_->Merge(&cf2, "d", "d3"));
+ ASSERT_OK(batch_->Delete(&cf2, "f"));
+ AssertIterEqual(iter1.get(), {"a", "c", "e"});
+ AssertIterEqual(iter2.get(), {"a", "b", "d", "f"});
+}
+
+TEST_P(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
+ std::vector<std::string> source_strings = {"a", "b", "c", "d", "e",
+ "f", "g", "h", "i", "j"};
+ for (int rand_seed = 301; rand_seed < 366; rand_seed++) {
+ Random rnd(rand_seed);
+
+ ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator());
+ ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator());
+ ColumnFamilyHandleImplDummy cf3(8, BytewiseComparator());
+ batch_->Clear();
+
+ if (rand_seed % 2 == 0) {
+ ASSERT_OK(batch_->Put(&cf2, "zoo", "bar"));
+ }
+ if (rand_seed % 4 == 1) {
+ ASSERT_OK(batch_->Put(&cf3, "zoo", "bar"));
+ }
+
+ KVMap map;
+ KVMap merged_map;
+ for (auto key : source_strings) {
+ std::string value = key + key;
+ int type = rnd.Uniform(6);
+ switch (type) {
+ case 0:
+ // only base has it
+ map[key] = value;
+ merged_map[key] = value;
+ break;
+ case 1:
+ // only delta has it
+ ASSERT_OK(batch_->Put(&cf1, key, value));
+ map[key] = value;
+ merged_map[key] = value;
+ break;
+ case 2:
+ // both has it. Delta should win
+ ASSERT_OK(batch_->Put(&cf1, key, value));
+ map[key] = "wrong_value";
+ merged_map[key] = value;
+ break;
+ case 3:
+ // both has it. Delta is delete
+ ASSERT_OK(batch_->Delete(&cf1, key));
+ map[key] = "wrong_value";
+ break;
+ case 4:
+ // only delta has it. Delta is delete
+ ASSERT_OK(batch_->Delete(&cf1, key));
+ map[key] = "wrong_value";
+ break;
+ default:
+ // Neither iterator has it.
+ break;
+ }
+ }
+
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(&cf1, new KVIter(&map)));
+ std::unique_ptr<Iterator> result_iter(new KVIter(&merged_map));
+
+ bool is_valid = false;
+ for (int i = 0; i < 128; i++) {
+ // Random walk and make sure iter and result_iter returns the
+ // same key and value
+ int type = rnd.Uniform(6);
+ ASSERT_OK(iter->status());
+ switch (type) {
+ case 0:
+ // Seek to First
+ iter->SeekToFirst();
+ result_iter->SeekToFirst();
+ break;
+ case 1:
+ // Seek to last
+ iter->SeekToLast();
+ result_iter->SeekToLast();
+ break;
+ case 2: {
+ // Seek to random key
+ auto key_idx = rnd.Uniform(static_cast<int>(source_strings.size()));
+ auto key = source_strings[key_idx];
+ iter->Seek(key);
+ result_iter->Seek(key);
+ break;
+ }
+ case 3: {
+ // SeekForPrev to random key
+ auto key_idx = rnd.Uniform(static_cast<int>(source_strings.size()));
+ auto key = source_strings[key_idx];
+ iter->SeekForPrev(key);
+ result_iter->SeekForPrev(key);
+ break;
+ }
+ case 4:
+ // Next
+ if (is_valid) {
+ iter->Next();
+ result_iter->Next();
+ } else {
+ continue;
+ }
+ break;
+ default:
+ assert(type == 5);
+ // Prev
+ if (is_valid) {
+ iter->Prev();
+ result_iter->Prev();
+ } else {
+ continue;
+ }
+ break;
+ }
+ AssertItersMatch(iter.get(), result_iter.get());
+ is_valid = iter->Valid();
+ }
+
+ ASSERT_OK(iter->status());
+ }
+}
+
+TEST_P(WriteBatchWithIndexTest, TestIteraratorWithBase) {
+ ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator());
+ ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator());
+ {
+ KVMap map;
+ map["a"] = "aa";
+ map["c"] = "cc";
+ map["e"] = "ee";
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(&cf1, new KVIter(&map)));
+
+ iter->SeekToFirst();
+ AssertIter(iter.get(), "a", "aa");
+ iter->Next();
+ AssertIter(iter.get(), "c", "cc");
+ iter->Next();
+ AssertIter(iter.get(), "e", "ee");
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->SeekToLast();
+ AssertIter(iter.get(), "e", "ee");
+ iter->Prev();
+ AssertIter(iter.get(), "c", "cc");
+ iter->Prev();
+ AssertIter(iter.get(), "a", "aa");
+ iter->Prev();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->Seek("b");
+ AssertIter(iter.get(), "c", "cc");
+
+ iter->Prev();
+ AssertIter(iter.get(), "a", "aa");
+
+ iter->Seek("a");
+ AssertIter(iter.get(), "a", "aa");
+ }
+
+ // Test the case that there is one element in the write batch
+ ASSERT_OK(batch_->Put(&cf2, "zoo", "bar"));
+ ASSERT_OK(batch_->Put(&cf1, "a", "aa"));
+ {
+ KVMap empty_map;
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map)));
+
+ iter->SeekToFirst();
+ AssertIter(iter.get(), "a", "aa");
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+ }
+
+ ASSERT_OK(batch_->Delete(&cf1, "b"));
+ ASSERT_OK(batch_->Put(&cf1, "c", "cc"));
+ ASSERT_OK(batch_->Put(&cf1, "d", "dd"));
+ ASSERT_OK(batch_->Delete(&cf1, "e"));
+
+ {
+ KVMap map;
+ map["b"] = "";
+ map["cc"] = "cccc";
+ map["f"] = "ff";
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(&cf1, new KVIter(&map)));
+
+ iter->SeekToFirst();
+ AssertIter(iter.get(), "a", "aa");
+ iter->Next();
+ AssertIter(iter.get(), "c", "cc");
+ iter->Next();
+ AssertIter(iter.get(), "cc", "cccc");
+ iter->Next();
+ AssertIter(iter.get(), "d", "dd");
+ iter->Next();
+ AssertIter(iter.get(), "f", "ff");
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->SeekToLast();
+ AssertIter(iter.get(), "f", "ff");
+ iter->Prev();
+ AssertIter(iter.get(), "d", "dd");
+ iter->Prev();
+ AssertIter(iter.get(), "cc", "cccc");
+ iter->Prev();
+ AssertIter(iter.get(), "c", "cc");
+ iter->Next();
+ AssertIter(iter.get(), "cc", "cccc");
+ iter->Prev();
+ AssertIter(iter.get(), "c", "cc");
+ iter->Prev();
+ AssertIter(iter.get(), "a", "aa");
+ iter->Prev();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->Seek("c");
+ AssertIter(iter.get(), "c", "cc");
+
+ iter->Seek("cb");
+ AssertIter(iter.get(), "cc", "cccc");
+
+ iter->Seek("cc");
+ AssertIter(iter.get(), "cc", "cccc");
+ iter->Next();
+ AssertIter(iter.get(), "d", "dd");
+
+ iter->Seek("e");
+ AssertIter(iter.get(), "f", "ff");
+
+ iter->Prev();
+ AssertIter(iter.get(), "d", "dd");
+
+ iter->Next();
+ AssertIter(iter.get(), "f", "ff");
+ }
+
+ {
+ KVMap empty_map;
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map)));
+
+ iter->SeekToFirst();
+ AssertIter(iter.get(), "a", "aa");
+ iter->Next();
+ AssertIter(iter.get(), "c", "cc");
+ iter->Next();
+ AssertIter(iter.get(), "d", "dd");
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->SeekToLast();
+ AssertIter(iter.get(), "d", "dd");
+ iter->Prev();
+ AssertIter(iter.get(), "c", "cc");
+ iter->Prev();
+ AssertIter(iter.get(), "a", "aa");
+
+ iter->Prev();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->Seek("aa");
+ AssertIter(iter.get(), "c", "cc");
+ iter->Next();
+ AssertIter(iter.get(), "d", "dd");
+
+ iter->Seek("ca");
+ AssertIter(iter.get(), "d", "dd");
+
+ iter->Prev();
+ AssertIter(iter.get(), "c", "cc");
+ }
+}
+
+TEST_P(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) {
+ ColumnFamilyHandleImplDummy cf1(6, ReverseBytewiseComparator());
+ ColumnFamilyHandleImplDummy cf2(2, ReverseBytewiseComparator());
+
+ // Test the case that there is one element in the write batch
+ ASSERT_OK(batch_->Put(&cf2, "zoo", "bar"));
+ ASSERT_OK(batch_->Put(&cf1, "a", "aa"));
+ {
+ KVMap empty_map;
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map)));
+
+ iter->SeekToFirst();
+ AssertIter(iter.get(), "a", "aa");
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+ }
+
+ ASSERT_OK(batch_->Put(&cf1, "c", "cc"));
+ {
+ KVMap map;
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(&cf1, new KVIter(&map)));
+
+ iter->SeekToFirst();
+ AssertIter(iter.get(), "c", "cc");
+ iter->Next();
+ AssertIter(iter.get(), "a", "aa");
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->SeekToLast();
+ AssertIter(iter.get(), "a", "aa");
+ iter->Prev();
+ AssertIter(iter.get(), "c", "cc");
+ iter->Prev();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->Seek("b");
+ AssertIter(iter.get(), "a", "aa");
+
+ iter->Prev();
+ AssertIter(iter.get(), "c", "cc");
+
+ iter->Seek("a");
+ AssertIter(iter.get(), "a", "aa");
+ }
+
+ // default column family
+ ASSERT_OK(batch_->Put("a", "b"));
+ {
+ KVMap map;
+ map["b"] = "";
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(new KVIter(&map)));
+
+ iter->SeekToFirst();
+ AssertIter(iter.get(), "a", "b");
+ iter->Next();
+ AssertIter(iter.get(), "b", "");
+ iter->Next();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->SeekToLast();
+ AssertIter(iter.get(), "b", "");
+ iter->Prev();
+ AssertIter(iter.get(), "a", "b");
+ iter->Prev();
+ ASSERT_OK(iter->status());
+ ASSERT_TRUE(!iter->Valid());
+
+ iter->Seek("b");
+ AssertIter(iter.get(), "b", "");
+
+ iter->Prev();
+ AssertIter(iter.get(), "a", "b");
+
+ iter->Seek("0");
+ AssertIter(iter.get(), "a", "b");
+ }
+}
+
+TEST_P(WriteBatchWithIndexTest, TestGetFromBatch) {
+ Options options;
+ Status s;
+ std::string value;
+
+ s = batch_->GetFromBatch(options_, "b", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ ASSERT_OK(batch_->Put("a", "a"));
+ ASSERT_OK(batch_->Put("b", "b"));
+ ASSERT_OK(batch_->Put("c", "c"));
+ ASSERT_OK(batch_->Put("a", "z"));
+ ASSERT_OK(batch_->Delete("c"));
+ ASSERT_OK(batch_->Delete("d"));
+ ASSERT_OK(batch_->Delete("e"));
+ ASSERT_OK(batch_->Put("e", "e"));
+
+ s = batch_->GetFromBatch(options_, "b", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("b", value);
+
+ s = batch_->GetFromBatch(options_, "a", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("z", value);
+
+ s = batch_->GetFromBatch(options_, "c", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = batch_->GetFromBatch(options_, "d", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = batch_->GetFromBatch(options_, "x", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = batch_->GetFromBatch(options_, "e", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("e", value);
+
+ ASSERT_OK(batch_->Merge("z", "z"));
+
+ s = batch_->GetFromBatch(options_, "z", &value);
+ ASSERT_NOK(s); // No merge operator specified.
+
+ s = batch_->GetFromBatch(options_, "b", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("b", value);
+}
+
+TEST_P(WriteBatchWithIndexTest, TestGetFromBatchMerge) {
+ Status s = OpenDB();
+ ASSERT_OK(s);
+
+ ColumnFamilyHandle* column_family = db_->DefaultColumnFamily();
+ std::string value;
+
+ s = batch_->GetFromBatch(options_, "x", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ ASSERT_OK(batch_->Put("x", "X"));
+ std::string expected = "X";
+
+ for (int i = 0; i < 5; i++) {
+ ASSERT_OK(batch_->Merge("x", std::to_string(i)));
+ expected = expected + "," + std::to_string(i);
+
+ if (i % 2 == 0) {
+ ASSERT_OK(batch_->Put("y", std::to_string(i / 2)));
+ }
+
+ ASSERT_OK(batch_->Merge("z", "z"));
+
+ s = batch_->GetFromBatch(column_family, options_, "x", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(expected, value);
+
+ s = batch_->GetFromBatch(column_family, options_, "y", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ(std::to_string(i / 2), value);
+
+ s = batch_->GetFromBatch(column_family, options_, "z", &value);
+ ASSERT_TRUE(s.IsMergeInProgress());
+ }
+}
+
+TEST_F(WBWIOverwriteTest, TestGetFromBatchMerge2) {
+ Status s = OpenDB();
+ ASSERT_OK(s);
+
+ ColumnFamilyHandle* column_family = db_->DefaultColumnFamily();
+ std::string value;
+
+ s = batch_->GetFromBatch(column_family, options_, "X", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ ASSERT_OK(batch_->Put(column_family, "X", "x"));
+ ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value));
+ ASSERT_EQ("x", value);
+
+ ASSERT_OK(batch_->Put(column_family, "X", "x2"));
+ ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value));
+ ASSERT_EQ("x2", value);
+
+ ASSERT_OK(batch_->Merge(column_family, "X", "aaa"));
+ ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value));
+ ASSERT_EQ("x2,aaa", value);
+
+ ASSERT_OK(batch_->Merge(column_family, "X", "bbb"));
+ ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value));
+ ASSERT_EQ("x2,aaa,bbb", value);
+
+ ASSERT_OK(batch_->Put(column_family, "X", "x3"));
+ ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value));
+ ASSERT_EQ("x3", value);
+
+ ASSERT_OK(batch_->Merge(column_family, "X", "ccc"));
+ ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value));
+ ASSERT_EQ("x3,ccc", value);
+
+ ASSERT_OK(batch_->Delete(column_family, "X"));
+ s = batch_->GetFromBatch(column_family, options_, "X", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ batch_->Merge(column_family, "X", "ddd");
+ ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value));
+ ASSERT_EQ("ddd", value);
+}
+
+TEST_P(WriteBatchWithIndexTest, TestGetFromBatchAndDB) {
+ ASSERT_OK(OpenDB());
+
+ std::string value;
+
+ ASSERT_OK(db_->Put(write_opts_, "a", "a"));
+ ASSERT_OK(db_->Put(write_opts_, "b", "b"));
+ ASSERT_OK(db_->Put(write_opts_, "c", "c"));
+
+ ASSERT_OK(batch_->Put("a", "batch_->a"));
+ ASSERT_OK(batch_->Delete("b"));
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value));
+ ASSERT_EQ("batch_->a", value);
+
+ Status s = batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value));
+ ASSERT_EQ("c", value);
+
+ s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ ASSERT_OK(db_->Delete(write_opts_, "x"));
+
+ s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value);
+ ASSERT_TRUE(s.IsNotFound());
+}
+
+TEST_P(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) {
+ Status s = OpenDB();
+ ASSERT_OK(s);
+
+ std::string value;
+
+ ASSERT_OK(db_->Put(write_opts_, "a", "a0"));
+ ASSERT_OK(db_->Put(write_opts_, "b", "b0"));
+ ASSERT_OK(db_->Merge(write_opts_, "b", "b1"));
+ ASSERT_OK(db_->Merge(write_opts_, "c", "c0"));
+ ASSERT_OK(db_->Merge(write_opts_, "d", "d0"));
+
+ ASSERT_OK(batch_->Merge("a", "a1"));
+ ASSERT_OK(batch_->Merge("a", "a2"));
+ ASSERT_OK(batch_->Merge("b", "b2"));
+ ASSERT_OK(batch_->Merge("d", "d1"));
+ ASSERT_OK(batch_->Merge("e", "e0"));
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value));
+ ASSERT_EQ("a0,a1,a2", value);
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value));
+ ASSERT_EQ("b0,b1,b2", value);
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value));
+ ASSERT_EQ("c0", value);
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "d", &value));
+ ASSERT_EQ("d0,d1", value);
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value));
+ ASSERT_EQ("e0", value);
+
+ ASSERT_OK(db_->Delete(write_opts_, "x"));
+
+ s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ const Snapshot* snapshot = db_->GetSnapshot();
+ ReadOptions snapshot_read_options;
+ snapshot_read_options.snapshot = snapshot;
+
+ ASSERT_OK(db_->Delete(write_opts_, "a"));
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value));
+ ASSERT_EQ("a1,a2", value);
+
+ ASSERT_OK(
+ s = batch_->GetFromBatchAndDB(db_, snapshot_read_options, "a", &value));
+ ASSERT_EQ("a0,a1,a2", value);
+
+ ASSERT_OK(batch_->Delete("a"));
+
+ s = batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ s = batch_->GetFromBatchAndDB(db_, snapshot_read_options, "a", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ ASSERT_OK(s = db_->Merge(write_opts_, "c", "c1"));
+
+ ASSERT_OK(s = batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value));
+ ASSERT_EQ("c0,c1", value);
+
+ ASSERT_OK(
+ s = batch_->GetFromBatchAndDB(db_, snapshot_read_options, "c", &value));
+ ASSERT_EQ("c0", value);
+
+ ASSERT_OK(db_->Put(write_opts_, "e", "e1"));
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value));
+ ASSERT_EQ("e1,e0", value);
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, snapshot_read_options, "e", &value));
+ ASSERT_EQ("e0", value);
+
+ ASSERT_OK(s = db_->Delete(write_opts_, "e"));
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value));
+ ASSERT_EQ("e0", value);
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, snapshot_read_options, "e", &value));
+ ASSERT_EQ("e0", value);
+
+ db_->ReleaseSnapshot(snapshot);
+}
+
+TEST_F(WBWIOverwriteTest, TestGetFromBatchAndDBMerge2) {
+ Status s = OpenDB();
+ ASSERT_OK(s);
+
+ std::string value;
+
+ s = batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ ASSERT_OK(batch_->Merge("A", "xxx"));
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value));
+ ASSERT_EQ(value, "xxx");
+
+ ASSERT_OK(batch_->Merge("A", "yyy"));
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value));
+ ASSERT_EQ(value, "xxx,yyy");
+
+ ASSERT_OK(db_->Put(write_opts_, "A", "a0"));
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value));
+ ASSERT_EQ(value, "a0,xxx,yyy");
+
+ ASSERT_OK(batch_->Delete("A"));
+
+ s = batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+}
+
+TEST_P(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge3) {
+ Status s = OpenDB();
+ ASSERT_OK(s);
+
+ FlushOptions flush_options;
+ std::string value;
+
+ ASSERT_OK(db_->Put(write_opts_, "A", "1"));
+ ASSERT_OK(db_->Flush(flush_options, db_->DefaultColumnFamily()));
+ ASSERT_OK(batch_->Merge("A", "2"));
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value));
+ ASSERT_EQ(value, "1,2");
+}
+
+TEST_P(WriteBatchWithIndexTest, TestPinnedGetFromBatchAndDB) {
+ Status s = OpenDB();
+ ASSERT_OK(s);
+
+ PinnableSlice value;
+
+ ASSERT_OK(db_->Put(write_opts_, "a", "a0"));
+ ASSERT_OK(db_->Put(write_opts_, "b", "b0"));
+ ASSERT_OK(db_->Merge(write_opts_, "b", "b1"));
+ ASSERT_OK(db_->Merge(write_opts_, "c", "c0"));
+ ASSERT_OK(db_->Merge(write_opts_, "d", "d0"));
+ ASSERT_OK(batch_->Merge("a", "a1"));
+ ASSERT_OK(batch_->Merge("a", "a2"));
+ ASSERT_OK(batch_->Merge("b", "b2"));
+ ASSERT_OK(batch_->Merge("d", "d1"));
+ ASSERT_OK(batch_->Merge("e", "e0"));
+
+ for (int i = 0; i < 2; i++) {
+ if (i == 1) {
+ // Do it again with a flushed DB...
+ ASSERT_OK(db_->Flush(FlushOptions(), db_->DefaultColumnFamily()));
+ }
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value));
+ ASSERT_EQ("a0,a1,a2", value.ToString());
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value));
+ ASSERT_EQ("b0,b1,b2", value.ToString());
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value));
+ ASSERT_EQ("c0", value.ToString());
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "d", &value));
+ ASSERT_EQ("d0,d1", value.ToString());
+
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value));
+ ASSERT_EQ("e0", value.ToString());
+ ASSERT_OK(db_->Delete(write_opts_, "x"));
+
+ s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ }
+}
+
+void AssertKey(std::string key, WBWIIterator* iter) {
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(key, iter->Entry().key.ToString());
+}
+
+void AssertValue(std::string value, WBWIIterator* iter) {
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(value, iter->Entry().value.ToString());
+}
+
+// Tests that we can write to the WBWI while we iterate (from a single thread).
+// iteration should see the newest writes
+TEST_F(WBWIOverwriteTest, MutateWhileIteratingCorrectnessTest) {
+ for (char c = 'a'; c <= 'z'; ++c) {
+ ASSERT_OK(batch_->Put(std::string(1, c), std::string(1, c)));
+ }
+
+ std::unique_ptr<WBWIIterator> iter(batch_->NewIterator());
+ iter->Seek("k");
+ AssertKey("k", iter.get());
+ iter->Next();
+ AssertKey("l", iter.get());
+ ASSERT_OK(batch_->Put("ab", "cc"));
+ iter->Next();
+ AssertKey("m", iter.get());
+ ASSERT_OK(batch_->Put("mm", "kk"));
+ iter->Next();
+ AssertKey("mm", iter.get());
+ AssertValue("kk", iter.get());
+ ASSERT_OK(batch_->Delete("mm"));
+
+ iter->Next();
+ AssertKey("n", iter.get());
+ iter->Prev();
+ AssertKey("mm", iter.get());
+ ASSERT_EQ(kDeleteRecord, iter->Entry().type);
+
+ iter->Seek("ab");
+ AssertKey("ab", iter.get());
+ ASSERT_OK(batch_->Delete("x"));
+ iter->Seek("x");
+ AssertKey("x", iter.get());
+ ASSERT_EQ(kDeleteRecord, iter->Entry().type);
+ iter->Prev();
+ AssertKey("w", iter.get());
+}
+
+void AssertIterKey(std::string key, Iterator* iter) {
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(key, iter->key().ToString());
+}
+
+void AssertIterValue(std::string value, Iterator* iter) {
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(value, iter->value().ToString());
+}
+
+// same thing as above, but testing IteratorWithBase
+TEST_F(WBWIOverwriteTest, MutateWhileIteratingBaseCorrectnessTest) {
+ WriteBatchWithIndex batch(BytewiseComparator(), 0, true);
+ for (char c = 'a'; c <= 'z'; ++c) {
+ ASSERT_OK(batch_->Put(std::string(1, c), std::string(1, c)));
+ }
+
+ KVMap map;
+ map["aa"] = "aa";
+ map["cc"] = "cc";
+ map["ee"] = "ee";
+ map["em"] = "me";
+
+ std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(new KVIter(&map)));
+ iter->Seek("k");
+ AssertIterKey("k", iter.get());
+ iter->Next();
+ AssertIterKey("l", iter.get());
+ ASSERT_OK(batch_->Put("ab", "cc"));
+ iter->Next();
+ AssertIterKey("m", iter.get());
+ ASSERT_OK(batch_->Put("mm", "kk"));
+ iter->Next();
+ AssertIterKey("mm", iter.get());
+ AssertIterValue("kk", iter.get());
+ ASSERT_OK(batch_->Delete("mm"));
+ iter->Next();
+ AssertIterKey("n", iter.get());
+ iter->Prev();
+ // "mm" is deleted, so we're back at "m"
+ AssertIterKey("m", iter.get());
+
+ iter->Seek("ab");
+ AssertIterKey("ab", iter.get());
+ iter->Prev();
+ AssertIterKey("aa", iter.get());
+ iter->Prev();
+ AssertIterKey("a", iter.get());
+ ASSERT_OK(batch_->Delete("aa"));
+ iter->Next();
+ AssertIterKey("ab", iter.get());
+ iter->Prev();
+ AssertIterKey("a", iter.get());
+
+ ASSERT_OK(batch_->Delete("x"));
+ iter->Seek("x");
+ AssertIterKey("y", iter.get());
+ iter->Next();
+ AssertIterKey("z", iter.get());
+ iter->Prev();
+ iter->Prev();
+ AssertIterKey("w", iter.get());
+
+ ASSERT_OK(batch_->Delete("e"));
+ iter->Seek("e");
+ AssertIterKey("ee", iter.get());
+ AssertIterValue("ee", iter.get());
+ ASSERT_OK(batch_->Put("ee", "xx"));
+ // still the same value
+ AssertIterValue("ee", iter.get());
+ iter->Next();
+ AssertIterKey("em", iter.get());
+ iter->Prev();
+ // new value
+ AssertIterValue("xx", iter.get());
+
+ ASSERT_OK(iter->status());
+}
+
+// stress testing mutations with IteratorWithBase
+TEST_F(WBWIOverwriteTest, MutateWhileIteratingBaseStressTest) {
+ for (char c = 'a'; c <= 'z'; ++c) {
+ ASSERT_OK(batch_->Put(std::string(1, c), std::string(1, c)));
+ }
+
+ KVMap map;
+ for (char c = 'a'; c <= 'z'; ++c) {
+ map[std::string(2, c)] = std::string(2, c);
+ }
+
+ std::unique_ptr<Iterator> iter(batch_->NewIteratorWithBase(new KVIter(&map)));
+
+ Random rnd(301);
+ for (int i = 0; i < 1000000; ++i) {
+ int random = rnd.Uniform(8);
+ char c = static_cast<char>(rnd.Uniform(26) + 'a');
+ switch (random) {
+ case 0:
+ ASSERT_OK(batch_->Put(std::string(1, c), "xxx"));
+ break;
+ case 1:
+ ASSERT_OK(batch_->Put(std::string(2, c), "xxx"));
+ break;
+ case 2:
+ ASSERT_OK(batch_->Delete(std::string(1, c)));
+ break;
+ case 3:
+ ASSERT_OK(batch_->Delete(std::string(2, c)));
+ break;
+ case 4:
+ iter->Seek(std::string(1, c));
+ break;
+ case 5:
+ iter->Seek(std::string(2, c));
+ break;
+ case 6:
+ if (iter->Valid()) {
+ iter->Next();
+ }
+ break;
+ case 7:
+ if (iter->Valid()) {
+ iter->Prev();
+ }
+ break;
+ default:
+ assert(false);
+ }
+ }
+ ASSERT_OK(iter->status());
+}
+
+TEST_P(WriteBatchWithIndexTest, TestNewIteratorWithBaseFromWbwi) {
+ ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator());
+ KVMap map;
+ map["a"] = "aa";
+ map["c"] = "cc";
+ map["e"] = "ee";
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(&cf1, new KVIter(&map)));
+ ASSERT_NE(nullptr, iter);
+ iter->SeekToFirst();
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_OK(iter->status());
+}
+
+TEST_P(WriteBatchWithIndexTest, SavePointTest) {
+ ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator());
+ KVMap empty_map;
+ std::unique_ptr<Iterator> cf0_iter(
+ batch_->NewIteratorWithBase(new KVIter(&empty_map)));
+ std::unique_ptr<Iterator> cf1_iter(
+ batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map)));
+ Status s;
+ KVMap kvm_cf0_0 = {{"A", "aa"}, {"B", "b"}};
+ KVMap kvm_cf1_0 = {{"A", "a1"}, {"C", "c1"}, {"E", "e1"}};
+ KVIter kvi_cf0_0(&kvm_cf0_0);
+ KVIter kvi_cf1_0(&kvm_cf1_0);
+
+ ASSERT_OK(batch_->Put("A", "a"));
+ ASSERT_OK(batch_->Put("B", "b"));
+ ASSERT_OK(batch_->Put("A", "aa"));
+ ASSERT_OK(batch_->Put(&cf1, "A", "a1"));
+ ASSERT_OK(batch_->Delete(&cf1, "B"));
+ ASSERT_OK(batch_->Put(&cf1, "C", "c1"));
+ ASSERT_OK(batch_->Put(&cf1, "E", "e1"));
+
+ AssertItersEqual(cf0_iter.get(), &kvi_cf0_0);
+ AssertItersEqual(cf1_iter.get(), &kvi_cf1_0);
+ batch_->SetSavePoint(); // 1
+
+ KVMap kvm_cf0_1 = {{"B", "bb"}, {"C", "cc"}};
+ KVMap kvm_cf1_1 = {{"B", "b1"}, {"C", "c1"}};
+ KVIter kvi_cf0_1(&kvm_cf0_1);
+ KVIter kvi_cf1_1(&kvm_cf1_1);
+
+ ASSERT_OK(batch_->Put("C", "cc"));
+ ASSERT_OK(batch_->Put("B", "bb"));
+ ASSERT_OK(batch_->Delete("A"));
+ ASSERT_OK(batch_->Put(&cf1, "B", "b1"));
+ ASSERT_OK(batch_->Delete(&cf1, "A"));
+ ASSERT_OK(batch_->SingleDelete(&cf1, "E"));
+ batch_->SetSavePoint(); // 2
+ AssertItersEqual(cf0_iter.get(), &kvi_cf0_1);
+ AssertItersEqual(cf1_iter.get(), &kvi_cf1_1);
+
+ KVMap kvm_cf0_2 = {{"A", "xxx"}, {"C", "cc"}};
+ KVMap kvm_cf1_2 = {{"B", "b2"}};
+ KVIter kvi_cf0_2(&kvm_cf0_2);
+ KVIter kvi_cf1_2(&kvm_cf1_2);
+
+ ASSERT_OK(batch_->Put("A", "aaa"));
+ ASSERT_OK(batch_->Put("A", "xxx"));
+ ASSERT_OK(batch_->Delete("B"));
+ ASSERT_OK(batch_->Put(&cf1, "B", "b2"));
+ ASSERT_OK(batch_->Delete(&cf1, "C"));
+ batch_->SetSavePoint(); // 3
+ batch_->SetSavePoint(); // 4
+ AssertItersEqual(cf0_iter.get(), &kvi_cf0_2);
+ AssertItersEqual(cf1_iter.get(), &kvi_cf1_2);
+
+ KVMap kvm_cf0_4 = {{"A", "xxx"}, {"C", "cc"}};
+ KVMap kvm_cf1_4 = {{"B", "b2"}};
+ KVIter kvi_cf0_4(&kvm_cf0_4);
+ KVIter kvi_cf1_4(&kvm_cf1_4);
+ ASSERT_OK(batch_->SingleDelete("D"));
+ ASSERT_OK(batch_->Delete(&cf1, "D"));
+ ASSERT_OK(batch_->Delete(&cf1, "E"));
+ AssertItersEqual(cf0_iter.get(), &kvi_cf0_4);
+ AssertItersEqual(cf1_iter.get(), &kvi_cf1_4);
+
+ ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 4
+ AssertItersEqual(cf0_iter.get(), &kvi_cf0_2);
+ AssertItersEqual(cf1_iter.get(), &kvi_cf1_2);
+
+ ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 3
+ AssertItersEqual(cf0_iter.get(), &kvi_cf0_2);
+ AssertItersEqual(cf1_iter.get(), &kvi_cf1_2);
+
+ ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 2
+ AssertItersEqual(cf0_iter.get(), &kvi_cf0_1);
+ AssertItersEqual(cf1_iter.get(), &kvi_cf1_1);
+
+ batch_->SetSavePoint(); // 5
+ ASSERT_OK(batch_->Put("X", "x"));
+
+ KVMap kvm_cf0_5 = {{"B", "bb"}, {"C", "cc"}, {"X", "x"}};
+ KVIter kvi_cf0_5(&kvm_cf0_5);
+ KVIter kvi_cf1_5(&kvm_cf1_1);
+ AssertItersEqual(cf0_iter.get(), &kvi_cf0_5);
+ AssertItersEqual(cf1_iter.get(), &kvi_cf1_5);
+
+ ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 5
+ AssertItersEqual(cf0_iter.get(), &kvi_cf0_1);
+ AssertItersEqual(cf1_iter.get(), &kvi_cf1_1);
+
+ ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 1
+ AssertItersEqual(cf0_iter.get(), &kvi_cf0_0);
+ AssertItersEqual(cf1_iter.get(), &kvi_cf1_0);
+
+ s = batch_->RollbackToSavePoint(); // no savepoint found
+ ASSERT_TRUE(s.IsNotFound());
+ AssertItersEqual(cf0_iter.get(), &kvi_cf0_0);
+ AssertItersEqual(cf1_iter.get(), &kvi_cf1_0);
+
+ batch_->SetSavePoint(); // 6
+
+ batch_->Clear();
+ ASSERT_EQ("", PrintContents(batch_.get(), nullptr));
+ ASSERT_EQ("", PrintContents(batch_.get(), &cf1));
+
+ s = batch_->RollbackToSavePoint(); // rollback to 6
+ ASSERT_TRUE(s.IsNotFound());
+}
+
+TEST_P(WriteBatchWithIndexTest, SingleDeleteTest) {
+ Status s;
+ std::string value;
+
+ ASSERT_OK(batch_->SingleDelete("A"));
+
+ s = batch_->GetFromBatch(options_, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ s = batch_->GetFromBatch(options_, "B", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ batch_->Clear();
+ ASSERT_OK(batch_->Put("A", "a"));
+ ASSERT_OK(batch_->Put("A", "a2"));
+ ASSERT_OK(batch_->Put("B", "b"));
+ ASSERT_OK(batch_->SingleDelete("A"));
+
+ s = batch_->GetFromBatch(options_, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ s = batch_->GetFromBatch(options_, "B", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("b", value);
+
+ ASSERT_OK(batch_->Put("C", "c"));
+ ASSERT_OK(batch_->Put("A", "a3"));
+ ASSERT_OK(batch_->Delete("B"));
+ ASSERT_OK(batch_->SingleDelete("B"));
+ ASSERT_OK(batch_->SingleDelete("C"));
+
+ s = batch_->GetFromBatch(options_, "A", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a3", value);
+ s = batch_->GetFromBatch(options_, "B", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ s = batch_->GetFromBatch(options_, "C", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ s = batch_->GetFromBatch(options_, "D", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ ASSERT_OK(batch_->Put("B", "b4"));
+ ASSERT_OK(batch_->Put("C", "c4"));
+ ASSERT_OK(batch_->Put("D", "d4"));
+ ASSERT_OK(batch_->SingleDelete("D"));
+ ASSERT_OK(batch_->SingleDelete("D"));
+ ASSERT_OK(batch_->Delete("A"));
+
+ s = batch_->GetFromBatch(options_, "A", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ s = batch_->GetFromBatch(options_, "B", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("b4", value);
+ s = batch_->GetFromBatch(options_, "C", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("c4", value);
+ s = batch_->GetFromBatch(options_, "D", &value);
+ ASSERT_TRUE(s.IsNotFound());
+}
+
+TEST_P(WriteBatchWithIndexTest, SingleDeleteDeltaIterTest) {
+ std::string value;
+ ASSERT_OK(batch_->Put("A", "a"));
+ ASSERT_OK(batch_->Put("A", "a2"));
+ ASSERT_OK(batch_->Put("B", "b"));
+ ASSERT_OK(batch_->SingleDelete("A"));
+ ASSERT_OK(batch_->Delete("B"));
+
+ KVMap map;
+ value = PrintContents(batch_.get(), &map, nullptr);
+ ASSERT_EQ("", value);
+
+ map["A"] = "aa";
+ map["C"] = "cc";
+ map["D"] = "dd";
+
+ ASSERT_OK(batch_->SingleDelete("B"));
+ ASSERT_OK(batch_->SingleDelete("C"));
+ ASSERT_OK(batch_->SingleDelete("Z"));
+
+ value = PrintContents(batch_.get(), &map, nullptr);
+ ASSERT_EQ("D:dd,", value);
+
+ ASSERT_OK(batch_->Put("A", "a3"));
+ ASSERT_OK(batch_->Put("B", "b3"));
+ ASSERT_OK(batch_->SingleDelete("A"));
+ ASSERT_OK(batch_->SingleDelete("A"));
+ ASSERT_OK(batch_->SingleDelete("D"));
+ ASSERT_OK(batch_->SingleDelete("D"));
+ ASSERT_OK(batch_->Delete("D"));
+
+ map["E"] = "ee";
+
+ value = PrintContents(batch_.get(), &map, nullptr);
+ ASSERT_EQ("B:b3,E:ee,", value);
+}
+
+TEST_P(WriteBatchWithIndexTest, MultiGetTest) {
+ // MultiGet a lot of keys in order to force std::vector reallocations
+ std::vector<std::string> keys;
+ for (int i = 0; i < 100; ++i) {
+ keys.emplace_back(std::to_string(i));
+ }
+
+ ASSERT_OK(OpenDB());
+ ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily();
+
+ // Write some data to the db for the even numbered keys
+ {
+ WriteBatch wb;
+ for (size_t i = 0; i < keys.size(); i += 2) {
+ std::string val = "val" + std::to_string(i);
+ ASSERT_OK(wb.Put(cf0, keys[i], val));
+ }
+ ASSERT_OK(db_->Write(write_opts_, &wb));
+ for (size_t i = 0; i < keys.size(); i += 2) {
+ std::string value;
+ ASSERT_OK(db_->Get(read_opts_, cf0, keys[i], &value));
+ }
+ }
+
+ // Write some data to the batch
+ for (size_t i = 0; i < keys.size(); ++i) {
+ if ((i % 5) == 0) {
+ ASSERT_OK(batch_->Delete(cf0, keys[i]));
+ } else if ((i % 7) == 0) {
+ std::string val = "new" + std::to_string(i);
+ ASSERT_OK(batch_->Put(cf0, keys[i], val));
+ }
+ if (i > 0 && (i % 3) == 0) {
+ ASSERT_OK(batch_->Merge(cf0, keys[i], "merge"));
+ }
+ }
+
+ std::vector<Slice> key_slices;
+ for (size_t i = 0; i < keys.size(); ++i) {
+ key_slices.emplace_back(keys[i]);
+ }
+ std::vector<PinnableSlice> values(keys.size());
+ std::vector<Status> statuses(keys.size());
+
+ batch_->MultiGetFromBatchAndDB(db_, read_opts_, cf0, key_slices.size(),
+ key_slices.data(), values.data(),
+ statuses.data(), false);
+ for (size_t i = 0; i < keys.size(); ++i) {
+ if (i == 0) {
+ ASSERT_TRUE(statuses[i].IsNotFound());
+ } else if ((i % 3) == 0) {
+ ASSERT_OK(statuses[i]);
+ if ((i % 5) == 0) { // Merge after Delete
+ ASSERT_EQ(values[i], "merge");
+ } else if ((i % 7) == 0) { // Merge after Put
+ std::string val = "new" + std::to_string(i);
+ ASSERT_EQ(values[i], val + ",merge");
+ } else if ((i % 2) == 0) {
+ std::string val = "val" + std::to_string(i);
+ ASSERT_EQ(values[i], val + ",merge");
+ } else {
+ ASSERT_EQ(values[i], "merge");
+ }
+ } else if ((i % 5) == 0) {
+ ASSERT_TRUE(statuses[i].IsNotFound());
+ } else if ((i % 7) == 0) {
+ ASSERT_OK(statuses[i]);
+ ASSERT_EQ(values[i], "new" + std::to_string(i));
+ } else if ((i % 2) == 0) {
+ ASSERT_OK(statuses[i]);
+ ASSERT_EQ(values[i], "val" + std::to_string(i));
+ } else {
+ ASSERT_TRUE(statuses[i].IsNotFound());
+ }
+ }
+}
+TEST_P(WriteBatchWithIndexTest, MultiGetTest2) {
+ // MultiGet a lot of keys in order to force std::vector reallocations
+ const int num_keys = 700;
+ const int keys_per_pass = 100;
+ std::vector<std::string> keys;
+ for (size_t i = 0; i < num_keys; ++i) {
+ keys.emplace_back(std::to_string(i));
+ }
+ ASSERT_OK(OpenDB());
+ ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily();
+
+ // Keys 0- 99 have a PUT in the batch but not DB
+ // Keys 100-199 have a PUT in the DB
+ // Keys 200-299 Have a PUT/DELETE
+ // Keys 300-399 Have a PUT/DELETE/MERGE
+ // Keys 400-499 have a PUT/MERGE
+ // Keys 500-599 have a MERGE only
+ // Keys 600-699 were never written
+ {
+ WriteBatch wb;
+ for (size_t i = 100; i < 500; i++) {
+ std::string val = std::to_string(i);
+ ASSERT_OK(wb.Put(cf0, keys[i], val));
+ }
+ ASSERT_OK(db_->Write(write_opts_, &wb));
+ }
+ ASSERT_OK(db_->Flush(FlushOptions(), cf0));
+ for (size_t i = 0; i < 100; i++) {
+ ASSERT_OK(batch_->Put(cf0, keys[i], keys[i]));
+ }
+ for (size_t i = 200; i < 400; i++) {
+ ASSERT_OK(batch_->Delete(cf0, keys[i]));
+ }
+ for (size_t i = 300; i < 600; i++) {
+ std::string val = std::to_string(i) + "m";
+ ASSERT_OK(batch_->Merge(cf0, keys[i], val));
+ }
+
+ Random rnd(301);
+ std::vector<PinnableSlice> values(keys_per_pass);
+ std::vector<Status> statuses(keys_per_pass);
+ for (int pass = 0; pass < 40; pass++) {
+ std::vector<Slice> key_slices;
+ for (size_t i = 0; i < keys_per_pass; i++) {
+ int random = rnd.Uniform(num_keys);
+ key_slices.emplace_back(keys[random]);
+ }
+ batch_->MultiGetFromBatchAndDB(db_, read_opts_, cf0, keys_per_pass,
+ key_slices.data(), values.data(),
+ statuses.data(), false);
+ for (size_t i = 0; i < keys_per_pass; i++) {
+ int key = ParseInt(key_slices[i].ToString());
+ switch (key / 100) {
+ case 0: // 0-99 PUT only
+ ASSERT_OK(statuses[i]);
+ ASSERT_EQ(values[i], key_slices[i].ToString());
+ break;
+ case 1: // 100-199 PUT only
+ ASSERT_OK(statuses[i]);
+ ASSERT_EQ(values[i], key_slices[i].ToString());
+ break;
+ case 2: // 200-299 Deleted
+ ASSERT_TRUE(statuses[i].IsNotFound());
+ break;
+ case 3: // 300-399 Delete+Merge
+ ASSERT_OK(statuses[i]);
+ ASSERT_EQ(values[i], key_slices[i].ToString() + "m");
+ break;
+ case 4: // 400-400 Put+ Merge
+ ASSERT_OK(statuses[i]);
+ ASSERT_EQ(values[i], key_slices[i].ToString() + "," +
+ key_slices[i].ToString() + "m");
+ break;
+ case 5: // Merge only
+ ASSERT_OK(statuses[i]);
+ ASSERT_EQ(values[i], key_slices[i].ToString() + "m");
+ break;
+ case 6: // Never written
+ ASSERT_TRUE(statuses[i].IsNotFound());
+ break;
+ default:
+ assert(false);
+ } // end switch
+ } // End for each key
+ } // end for passes
+}
+
+// This test has merges, but the merge does not play into the final result
+TEST_P(WriteBatchWithIndexTest, FakeMergeWithIteratorTest) {
+ ASSERT_OK(OpenDB());
+ ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily();
+
+ // The map we are starting with
+ KVMap input = {
+ {"odm", "odm0"},
+ {"omd", "omd0"},
+ {"omp", "omp0"},
+ };
+ KVMap result = {
+ {"odm", "odm2"}, // Orig, Delete, Merge
+ {"mp", "mp1"}, // Merge, Put
+ {"omp", "omp2"}, // Origi, Merge, Put
+ {"mmp", "mmp2"} // Merge, Merge, Put
+ };
+
+ for (auto& iter : result) {
+ EXPECT_EQ(AddToBatch(cf0, iter.first), iter.second);
+ }
+ AddToBatch(cf0, "md"); // Merge, Delete
+ AddToBatch(cf0, "mmd"); // Merge, Merge, Delete
+ AddToBatch(cf0, "omd"); // Orig, Merge, Delete
+
+ KVIter kvi(&result);
+ // First try just the batch
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(cf0, new KVIter(&input)));
+ AssertItersEqual(iter.get(), &kvi);
+}
+
+TEST_P(WriteBatchWithIndexTest, IteratorMergeTest) {
+ ASSERT_OK(OpenDB());
+ ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily();
+
+ KVMap result = {
+ {"m", "m0"}, // Merge
+ {"mm", "mm0,mm1"}, // Merge, Merge
+ {"dm", "dm1"}, // Delete, Merge
+ {"dmm", "dmm1,dmm2"}, // Delete, Merge, Merge
+ {"mdm", "mdm2"}, // Merge, Delete, Merge
+ {"mpm", "mpm1,mpm2"}, // Merge, Put, Merge
+ {"pm", "pm0,pm1"}, // Put, Merge
+ {"pmm", "pmm0,pmm1,pmm2"}, // Put, Merge, Merge
+ };
+
+ for (auto& iter : result) {
+ EXPECT_EQ(AddToBatch(cf0, iter.first), iter.second);
+ }
+
+ KVIter kvi(&result);
+ // First try just the batch
+ KVMap empty_map;
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(cf0, new KVIter(&empty_map)));
+ AssertItersEqual(iter.get(), &kvi);
+}
+
+TEST_P(WriteBatchWithIndexTest, IteratorMergeTestWithOrig) {
+ ASSERT_OK(OpenDB());
+ ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily();
+ KVMap original;
+ KVMap results = {
+ {"m", "om,m0"}, // Merge
+ {"mm", "omm,mm0,mm1"}, // Merge, Merge
+ {"dm", "dm1"}, // Delete, Merge
+ {"dmm", "dmm1,dmm2"}, // Delete, Merge, Merge
+ {"mdm", "mdm2"}, // Merge, Delete, Merge
+ {"mpm", "mpm1,mpm2"}, // Merge, Put, Merge
+ {"pm", "pm0,pm1"}, // Put, Merge
+ {"pmm", "pmm0,pmm1,pmm2"}, // Put, Merge, Merge
+ };
+
+ for (auto& iter : results) {
+ AddToBatch(cf0, iter.first);
+ original[iter.first] = "o" + iter.first;
+ }
+
+ KVIter kvi(&results);
+ // First try just the batch
+ std::unique_ptr<Iterator> iter(
+ batch_->NewIteratorWithBase(cf0, new KVIter(&original)));
+ AssertItersEqual(iter.get(), &kvi);
+}
+
+TEST_P(WriteBatchWithIndexTest, GetFromBatchAfterMerge) {
+ std::string value;
+ Status s;
+
+ ASSERT_OK(OpenDB());
+ ASSERT_OK(db_->Put(write_opts_, "o", "aa"));
+ batch_->Merge("o", "bb"); // Merging bb under key "o"
+ batch_->Merge("m", "cc"); // Merging bc under key "m"
+ s = batch_->GetFromBatch(options_, "m", &value);
+ ASSERT_EQ(s.code(), Status::Code::kMergeInProgress);
+ s = batch_->GetFromBatch(options_, "o", &value);
+ ASSERT_EQ(s.code(), Status::Code::kMergeInProgress);
+
+ ASSERT_OK(db_->Write(write_opts_, batch_->GetWriteBatch()));
+ ASSERT_OK(db_->Get(read_opts_, "o", &value));
+ ASSERT_EQ(value, "aa,bb");
+ ASSERT_OK(db_->Get(read_opts_, "m", &value));
+ ASSERT_EQ(value, "cc");
+}
+
+TEST_P(WriteBatchWithIndexTest, GetFromBatchAndDBAfterMerge) {
+ std::string value;
+
+ ASSERT_OK(OpenDB());
+ ASSERT_OK(db_->Put(write_opts_, "o", "aa"));
+ ASSERT_OK(batch_->Merge("o", "bb")); // Merging bb under key "o"
+ ASSERT_OK(batch_->Merge("m", "cc")); // Merging bc under key "m"
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "o", &value));
+ ASSERT_EQ(value, "aa,bb");
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "m", &value));
+ ASSERT_EQ(value, "cc");
+}
+
+TEST_F(WBWIKeepTest, GetAfterPut) {
+ std::string value;
+ ASSERT_OK(OpenDB());
+ ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily();
+
+ ASSERT_OK(db_->Put(write_opts_, "key", "orig"));
+
+ ASSERT_OK(batch_->Put("key", "aa")); // Writing aa under key
+ ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value));
+ ASSERT_EQ(value, "aa");
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value));
+ ASSERT_EQ(value, "aa");
+
+ ASSERT_OK(batch_->Merge("key", "bb")); // Merging bb under key
+ ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value));
+ ASSERT_EQ(value, "aa,bb");
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value));
+ ASSERT_EQ(value, "aa,bb");
+
+ ASSERT_OK(batch_->Merge("key", "cc")); // Merging cc under key
+ ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value));
+ ASSERT_EQ(value, "aa,bb,cc");
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value));
+ ASSERT_EQ(value, "aa,bb,cc");
+}
+
+TEST_P(WriteBatchWithIndexTest, GetAfterMergePut) {
+ std::string value;
+ ASSERT_OK(OpenDB());
+ ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily();
+ ASSERT_OK(db_->Put(write_opts_, "key", "orig"));
+
+ ASSERT_OK(batch_->Merge("key", "aa")); // Merging aa under key
+ Status s = batch_->GetFromBatch(cf0, options_, "key", &value);
+ ASSERT_EQ(s.code(), Status::Code::kMergeInProgress);
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value));
+ ASSERT_EQ(value, "orig,aa");
+
+ ASSERT_OK(batch_->Merge("key", "bb")); // Merging bb under key
+ s = batch_->GetFromBatch(cf0, options_, "key", &value);
+ ASSERT_EQ(s.code(), Status::Code::kMergeInProgress);
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value));
+ ASSERT_EQ(value, "orig,aa,bb");
+
+ ASSERT_OK(batch_->Put("key", "cc")); // Writing cc under key
+ ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value));
+ ASSERT_EQ(value, "cc");
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value));
+ ASSERT_EQ(value, "cc");
+
+ ASSERT_OK(batch_->Merge("key", "dd")); // Merging dd under key
+ ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value));
+ ASSERT_EQ(value, "cc,dd");
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value));
+ ASSERT_EQ(value, "cc,dd");
+}
+
+TEST_P(WriteBatchWithIndexTest, GetAfterMergeDelete) {
+ std::string value;
+ ASSERT_OK(OpenDB());
+ ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily();
+
+ ASSERT_OK(batch_->Merge("key", "aa")); // Merging aa under key
+ Status s = batch_->GetFromBatch(cf0, options_, "key", &value);
+ ASSERT_EQ(s.code(), Status::Code::kMergeInProgress);
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value));
+ ASSERT_EQ(value, "aa");
+
+ ASSERT_OK(batch_->Merge("key", "bb")); // Merging bb under key
+ s = batch_->GetFromBatch(cf0, options_, "key", &value);
+ ASSERT_EQ(s.code(), Status::Code::kMergeInProgress);
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value));
+ ASSERT_EQ(value, "aa,bb");
+
+ ASSERT_OK(batch_->Delete("key")); // Delete key from batch
+ s = batch_->GetFromBatch(cf0, options_, "key", &value);
+ ASSERT_TRUE(s.IsNotFound());
+ s = batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value);
+ ASSERT_TRUE(s.IsNotFound());
+
+ ASSERT_OK(batch_->Merge("key", "cc")); // Merging cc under key
+ ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value));
+ ASSERT_EQ(value, "cc");
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value));
+ ASSERT_EQ(value, "cc");
+ ASSERT_OK(batch_->Merge("key", "dd")); // Merging dd under key
+ ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value));
+ ASSERT_EQ(value, "cc,dd");
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value));
+ ASSERT_EQ(value, "cc,dd");
+}
+
+TEST_F(WBWIOverwriteTest, TestBadMergeOperator) {
+ class FailingMergeOperator : public MergeOperator {
+ public:
+ FailingMergeOperator() {}
+
+ bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
+ MergeOperationOutput* /*merge_out*/) const override {
+ return false;
+ }
+
+ const char* Name() const override { return "Failing"; }
+ };
+ options_.merge_operator.reset(new FailingMergeOperator());
+ ASSERT_OK(OpenDB());
+
+ ColumnFamilyHandle* column_family = db_->DefaultColumnFamily();
+ std::string value;
+
+ ASSERT_OK(db_->Put(write_opts_, "a", "a0"));
+ ASSERT_OK(batch_->Put("b", "b0"));
+
+ ASSERT_OK(batch_->Merge("a", "a1"));
+ ASSERT_NOK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value));
+ ASSERT_NOK(batch_->GetFromBatch(column_family, options_, "a", &value));
+ ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value));
+ ASSERT_OK(batch_->GetFromBatch(column_family, options_, "b", &value));
+}
+
+TEST_P(WriteBatchWithIndexTest, ColumnFamilyWithTimestamp) {
+ ColumnFamilyHandleImplDummy cf2(2,
+ test::BytewiseComparatorWithU64TsWrapper());
+
+ // Sanity checks
+ ASSERT_TRUE(batch_->Put(&cf2, "key", "ts", "value").IsNotSupported());
+ ASSERT_TRUE(batch_->Put(/*column_family=*/nullptr, "key", "ts", "value")
+ .IsInvalidArgument());
+ ASSERT_TRUE(batch_->Delete(&cf2, "key", "ts").IsNotSupported());
+ ASSERT_TRUE(batch_->Delete(/*column_family=*/nullptr, "key", "ts")
+ .IsInvalidArgument());
+ ASSERT_TRUE(batch_->SingleDelete(&cf2, "key", "ts").IsNotSupported());
+ ASSERT_TRUE(batch_->SingleDelete(/*column_family=*/nullptr, "key", "ts")
+ .IsInvalidArgument());
+ {
+ std::string value;
+ ASSERT_TRUE(batch_
+ ->GetFromBatchAndDB(
+ /*db=*/nullptr, ReadOptions(), &cf2, "key", &value)
+ .IsInvalidArgument());
+ }
+ {
+ constexpr size_t num_keys = 2;
+ std::array<Slice, num_keys> keys{{Slice(), Slice()}};
+ std::array<PinnableSlice, num_keys> pinnable_vals{
+ {PinnableSlice(), PinnableSlice()}};
+ std::array<Status, num_keys> statuses{{Status(), Status()}};
+ constexpr bool sorted_input = false;
+ batch_->MultiGetFromBatchAndDB(/*db=*/nullptr, ReadOptions(), &cf2,
+ num_keys, keys.data(), pinnable_vals.data(),
+ statuses.data(), sorted_input);
+ for (const auto& s : statuses) {
+ ASSERT_TRUE(s.IsInvalidArgument());
+ }
+ }
+
+ constexpr uint32_t kMaxKey = 10;
+
+ const auto ts_sz_lookup = [&cf2](uint32_t id) {
+ if (cf2.GetID() == id) {
+ return sizeof(uint64_t);
+ } else {
+ return std::numeric_limits<size_t>::max();
+ }
+ };
+
+ // Put keys
+ for (uint32_t i = 0; i < kMaxKey; ++i) {
+ std::string key;
+ PutFixed32(&key, i);
+ Status s = batch_->Put(&cf2, key, "value" + std::to_string(i));
+ ASSERT_OK(s);
+ }
+
+ WriteBatch* wb = batch_->GetWriteBatch();
+ assert(wb);
+ ASSERT_OK(
+ wb->UpdateTimestamps(std::string(sizeof(uint64_t), '\0'), ts_sz_lookup));
+
+ // Point lookup
+ for (uint32_t i = 0; i < kMaxKey; ++i) {
+ std::string value;
+ std::string key;
+ PutFixed32(&key, i);
+ Status s = batch_->GetFromBatch(&cf2, Options(), key, &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("value" + std::to_string(i), value);
+ }
+
+ // Iterator
+ {
+ std::unique_ptr<WBWIIterator> it(batch_->NewIterator(&cf2));
+ uint32_t start = 0;
+ for (it->SeekToFirst(); it->Valid(); it->Next(), ++start) {
+ std::string key;
+ PutFixed32(&key, start);
+ ASSERT_OK(it->status());
+ ASSERT_EQ(key, it->Entry().key);
+ ASSERT_EQ("value" + std::to_string(start), it->Entry().value);
+ ASSERT_EQ(WriteType::kPutRecord, it->Entry().type);
+ }
+ ASSERT_EQ(kMaxKey, start);
+ }
+
+ // Delete the keys with Delete() or SingleDelete()
+ for (uint32_t i = 0; i < kMaxKey; ++i) {
+ std::string key;
+ PutFixed32(&key, i);
+ Status s;
+ if (0 == (i % 2)) {
+ s = batch_->Delete(&cf2, key);
+ } else {
+ s = batch_->SingleDelete(&cf2, key);
+ }
+ ASSERT_OK(s);
+ }
+
+ ASSERT_OK(wb->UpdateTimestamps(std::string(sizeof(uint64_t), '\xfe'),
+ ts_sz_lookup));
+
+ for (uint32_t i = 0; i < kMaxKey; ++i) {
+ std::string value;
+ std::string key;
+ PutFixed32(&key, i);
+ Status s = batch_->GetFromBatch(&cf2, Options(), key, &value);
+ ASSERT_TRUE(s.IsNotFound());
+ }
+
+ // Iterator
+ {
+ const bool overwrite = GetParam();
+ std::unique_ptr<WBWIIterator> it(batch_->NewIterator(&cf2));
+ uint32_t start = 0;
+ for (it->SeekToFirst(); it->Valid(); it->Next(), ++start) {
+ std::string key;
+ PutFixed32(&key, start);
+ ASSERT_EQ(key, it->Entry().key);
+ if (!overwrite) {
+ ASSERT_EQ(WriteType::kPutRecord, it->Entry().type);
+ it->Next();
+ ASSERT_TRUE(it->Valid());
+ }
+ if (0 == (start % 2)) {
+ ASSERT_EQ(WriteType::kDeleteRecord, it->Entry().type);
+ } else {
+ ASSERT_EQ(WriteType::kSingleDeleteRecord, it->Entry().type);
+ }
+ }
+ }
+}
+
+TEST_P(WriteBatchWithIndexTest, IndexNoTs) {
+ const Comparator* const ucmp = test::BytewiseComparatorWithU64TsWrapper();
+ ColumnFamilyHandleImplDummy cf(1, ucmp);
+ WriteBatchWithIndex wbwi;
+ ASSERT_OK(wbwi.Put(&cf, "a", "a0"));
+ ASSERT_OK(wbwi.Put(&cf, "a", "a1"));
+ {
+ std::string ts;
+ PutFixed64(&ts, 10000);
+ ASSERT_OK(wbwi.GetWriteBatch()->UpdateTimestamps(
+ ts, [](uint32_t cf_id) { return cf_id == 1 ? 8 : 0; }));
+ }
+ {
+ std::string value;
+ Status s = wbwi.GetFromBatch(&cf, options_, "a", &value);
+ ASSERT_OK(s);
+ ASSERT_EQ("a1", value);
+ }
+}
+
+INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool());
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main() {
+ fprintf(stderr, "SKIPPED\n");
+ return 0;
+}
+
+#endif // !ROCKSDB_LITE