diff options
Diffstat (limited to 'src/rocksdb/utilities/write_batch_with_index')
4 files changed, 3200 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc new file mode 100644 index 00000000..c620ebd4 --- /dev/null +++ b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index.cc @@ -0,0 +1,952 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "rocksdb/utilities/write_batch_with_index.h" + +#include <memory> + +#include "db/column_family.h" +#include "db/db_impl.h" +#include "db/merge_context.h" +#include "db/merge_helper.h" +#include "memtable/skiplist.h" +#include "options/db_options.h" +#include "rocksdb/comparator.h" +#include "rocksdb/iterator.h" +#include "util/arena.h" +#include "util/cast_util.h" +#include "util/string_util.h" +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" + +namespace rocksdb { + +// when direction == forward +// * current_at_base_ <=> base_iterator > delta_iterator +// when direction == backwards +// * current_at_base_ <=> base_iterator < delta_iterator +// always: +// * equal_keys_ <=> base_iterator == delta_iterator +class BaseDeltaIterator : public Iterator { + public: + BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator, + const Comparator* comparator) + : forward_(true), + current_at_base_(true), + equal_keys_(false), + status_(Status::OK()), + base_iterator_(base_iterator), + delta_iterator_(delta_iterator), + comparator_(comparator) {} + + ~BaseDeltaIterator() override {} + + bool Valid() const override { + return current_at_base_ ? BaseValid() : DeltaValid(); + } + + void SeekToFirst() override { + forward_ = true; + base_iterator_->SeekToFirst(); + delta_iterator_->SeekToFirst(); + UpdateCurrent(); + } + + void SeekToLast() override { + forward_ = false; + base_iterator_->SeekToLast(); + delta_iterator_->SeekToLast(); + UpdateCurrent(); + } + + void Seek(const Slice& k) override { + forward_ = true; + base_iterator_->Seek(k); + delta_iterator_->Seek(k); + UpdateCurrent(); + } + + void SeekForPrev(const Slice& k) override { + forward_ = false; + base_iterator_->SeekForPrev(k); + delta_iterator_->SeekForPrev(k); + UpdateCurrent(); + } + + void Next() override { + if (!Valid()) { + status_ = Status::NotSupported("Next() on invalid iterator"); + return; + } + + if (!forward_) { + // Need to change direction + // if our direction was backward and we're not equal, we have two states: + // * both iterators are valid: we're already in a good state (current + // shows to smaller) + // * only one iterator is valid: we need to advance that iterator + forward_ = true; + equal_keys_ = false; + if (!BaseValid()) { + assert(DeltaValid()); + base_iterator_->SeekToFirst(); + } else if (!DeltaValid()) { + delta_iterator_->SeekToFirst(); + } else if (current_at_base_) { + // Change delta from larger than base to smaller + AdvanceDelta(); + } else { + // Change base from larger than delta to smaller + AdvanceBase(); + } + if (DeltaValid() && BaseValid()) { + if (comparator_->Equal(delta_iterator_->Entry().key, + base_iterator_->key())) { + equal_keys_ = true; + } + } + } + Advance(); + } + + void Prev() override { + if (!Valid()) { + status_ = Status::NotSupported("Prev() on invalid iterator"); + return; + } + + if (forward_) { + // Need to change direction + // if our direction was backward and we're not equal, we have two states: + // * both iterators are valid: we're already in a good state (current + // shows to smaller) + // * only one iterator is valid: we need to advance that iterator + forward_ = false; + equal_keys_ = false; + if (!BaseValid()) { + assert(DeltaValid()); + base_iterator_->SeekToLast(); + } else if (!DeltaValid()) { + delta_iterator_->SeekToLast(); + } else if (current_at_base_) { + // Change delta from less advanced than base to more advanced + AdvanceDelta(); + } else { + // Change base from less advanced than delta to more advanced + AdvanceBase(); + } + if (DeltaValid() && BaseValid()) { + if (comparator_->Equal(delta_iterator_->Entry().key, + base_iterator_->key())) { + equal_keys_ = true; + } + } + } + + Advance(); + } + + Slice key() const override { + return current_at_base_ ? base_iterator_->key() + : delta_iterator_->Entry().key; + } + + Slice value() const override { + return current_at_base_ ? base_iterator_->value() + : delta_iterator_->Entry().value; + } + + Status status() const override { + if (!status_.ok()) { + return status_; + } + if (!base_iterator_->status().ok()) { + return base_iterator_->status(); + } + return delta_iterator_->status(); + } + + private: + void AssertInvariants() { +#ifndef NDEBUG + bool not_ok = false; + if (!base_iterator_->status().ok()) { + assert(!base_iterator_->Valid()); + not_ok = true; + } + if (!delta_iterator_->status().ok()) { + assert(!delta_iterator_->Valid()); + not_ok = true; + } + if (not_ok) { + assert(!Valid()); + assert(!status().ok()); + return; + } + + if (!Valid()) { + return; + } + if (!BaseValid()) { + assert(!current_at_base_ && delta_iterator_->Valid()); + return; + } + if (!DeltaValid()) { + assert(current_at_base_ && base_iterator_->Valid()); + return; + } + // we don't support those yet + assert(delta_iterator_->Entry().type != kMergeRecord && + delta_iterator_->Entry().type != kLogDataRecord); + int compare = comparator_->Compare(delta_iterator_->Entry().key, + base_iterator_->key()); + if (forward_) { + // current_at_base -> compare < 0 + assert(!current_at_base_ || compare < 0); + // !current_at_base -> compare <= 0 + assert(current_at_base_ && compare >= 0); + } else { + // current_at_base -> compare > 0 + assert(!current_at_base_ || compare > 0); + // !current_at_base -> compare <= 0 + assert(current_at_base_ && compare <= 0); + } + // equal_keys_ <=> compare == 0 + assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0)); +#endif + } + + void Advance() { + if (equal_keys_) { + assert(BaseValid() && DeltaValid()); + AdvanceBase(); + AdvanceDelta(); + } else { + if (current_at_base_) { + assert(BaseValid()); + AdvanceBase(); + } else { + assert(DeltaValid()); + AdvanceDelta(); + } + } + UpdateCurrent(); + } + + void AdvanceDelta() { + if (forward_) { + delta_iterator_->Next(); + } else { + delta_iterator_->Prev(); + } + } + void AdvanceBase() { + if (forward_) { + base_iterator_->Next(); + } else { + base_iterator_->Prev(); + } + } + bool BaseValid() const { return base_iterator_->Valid(); } + bool DeltaValid() const { return delta_iterator_->Valid(); } + void UpdateCurrent() { +// Suppress false positive clang analyzer warnings. +#ifndef __clang_analyzer__ + status_ = Status::OK(); + while (true) { + WriteEntry delta_entry; + if (DeltaValid()) { + assert(delta_iterator_->status().ok()); + delta_entry = delta_iterator_->Entry(); + } else if (!delta_iterator_->status().ok()) { + // Expose the error status and stop. + current_at_base_ = false; + return; + } + equal_keys_ = false; + if (!BaseValid()) { + if (!base_iterator_->status().ok()) { + // Expose the error status and stop. + current_at_base_ = true; + return; + } + + // Base has finished. + if (!DeltaValid()) { + // Finished + return; + } + if (delta_entry.type == kDeleteRecord || + delta_entry.type == kSingleDeleteRecord) { + AdvanceDelta(); + } else { + current_at_base_ = false; + return; + } + } else if (!DeltaValid()) { + // Delta has finished. + current_at_base_ = true; + return; + } else { + int compare = + (forward_ ? 1 : -1) * + comparator_->Compare(delta_entry.key, base_iterator_->key()); + if (compare <= 0) { // delta bigger or equal + if (compare == 0) { + equal_keys_ = true; + } + if (delta_entry.type != kDeleteRecord && + delta_entry.type != kSingleDeleteRecord) { + current_at_base_ = false; + return; + } + // Delta is less advanced and is delete. + AdvanceDelta(); + if (equal_keys_) { + AdvanceBase(); + } + } else { + current_at_base_ = true; + return; + } + } + } + + AssertInvariants(); +#endif // __clang_analyzer__ + } + + bool forward_; + bool current_at_base_; + bool equal_keys_; + Status status_; + std::unique_ptr<Iterator> base_iterator_; + std::unique_ptr<WBWIIterator> delta_iterator_; + const Comparator* comparator_; // not owned +}; + +typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&> + WriteBatchEntrySkipList; + +class WBWIIteratorImpl : public WBWIIterator { + public: + WBWIIteratorImpl(uint32_t column_family_id, + WriteBatchEntrySkipList* skip_list, + const ReadableWriteBatch* write_batch) + : column_family_id_(column_family_id), + skip_list_iter_(skip_list), + write_batch_(write_batch) {} + + ~WBWIIteratorImpl() override {} + + bool Valid() const override { + if (!skip_list_iter_.Valid()) { + return false; + } + const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); + return (iter_entry != nullptr && + iter_entry->column_family == column_family_id_); + } + + void SeekToFirst() override { + WriteBatchIndexEntry search_entry( + nullptr /* search_key */, column_family_id_, + true /* is_forward_direction */, true /* is_seek_to_first */); + skip_list_iter_.Seek(&search_entry); + } + + void SeekToLast() override { + WriteBatchIndexEntry search_entry( + nullptr /* search_key */, column_family_id_ + 1, + true /* is_forward_direction */, true /* is_seek_to_first */); + skip_list_iter_.Seek(&search_entry); + if (!skip_list_iter_.Valid()) { + skip_list_iter_.SeekToLast(); + } else { + skip_list_iter_.Prev(); + } + } + + void Seek(const Slice& key) override { + WriteBatchIndexEntry search_entry(&key, column_family_id_, + true /* is_forward_direction */, + false /* is_seek_to_first */); + skip_list_iter_.Seek(&search_entry); + } + + void SeekForPrev(const Slice& key) override { + WriteBatchIndexEntry search_entry(&key, column_family_id_, + false /* is_forward_direction */, + false /* is_seek_to_first */); + skip_list_iter_.SeekForPrev(&search_entry); + } + + void Next() override { skip_list_iter_.Next(); } + + void Prev() override { skip_list_iter_.Prev(); } + + WriteEntry Entry() const override { + WriteEntry ret; + Slice blob, xid; + const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); + // this is guaranteed with Valid() + assert(iter_entry != nullptr && + iter_entry->column_family == column_family_id_); + auto s = write_batch_->GetEntryFromDataOffset( + iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid); + assert(s.ok()); + assert(ret.type == kPutRecord || ret.type == kDeleteRecord || + ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord || + ret.type == kMergeRecord); + return ret; + } + + Status status() const override { + // this is in-memory data structure, so the only way status can be non-ok is + // through memory corruption + return Status::OK(); + } + + const WriteBatchIndexEntry* GetRawEntry() const { + return skip_list_iter_.key(); + } + + private: + uint32_t column_family_id_; + WriteBatchEntrySkipList::Iterator skip_list_iter_; + const ReadableWriteBatch* write_batch_; +}; + +struct WriteBatchWithIndex::Rep { + explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, + size_t max_bytes = 0, bool _overwrite_key = false) + : write_batch(reserved_bytes, max_bytes), + comparator(index_comparator, &write_batch), + skip_list(comparator, &arena), + overwrite_key(_overwrite_key), + last_entry_offset(0), + last_sub_batch_offset(0), + sub_batch_cnt(1) {} + ReadableWriteBatch write_batch; + WriteBatchEntryComparator comparator; + Arena arena; + WriteBatchEntrySkipList skip_list; + bool overwrite_key; + size_t last_entry_offset; + // The starting offset of the last sub-batch. A sub-batch starts right before + // inserting a key that is a duplicate of a key in the last sub-batch. Zero, + // the default, means that no duplicate key is detected so far. + size_t last_sub_batch_offset; + // Total number of sub-batches in the write batch. Default is 1. + size_t sub_batch_cnt; + + // Remember current offset of internal write batch, which is used as + // the starting offset of the next record. + void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); } + + // In overwrite mode, find the existing entry for the same key and update it + // to point to the current entry. + // Return true if the key is found and updated. + bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key); + bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key); + + // Add the recent entry to the update. + // In overwrite mode, if key already exists in the index, update it. + void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key); + void AddOrUpdateIndex(const Slice& key); + + // Allocate an index entry pointing to the last entry in the write batch and + // put it to skip list. + void AddNewEntry(uint32_t column_family_id); + + // Clear all updates buffered in this batch. + void Clear(); + void ClearIndex(); + + // Rebuild index by reading all records from the batch. + // Returns non-ok status on corruption. + Status ReBuildIndex(); +}; + +bool WriteBatchWithIndex::Rep::UpdateExistingEntry( + ColumnFamilyHandle* column_family, const Slice& key) { + uint32_t cf_id = GetColumnFamilyID(column_family); + return UpdateExistingEntryWithCfId(cf_id, key); +} + +bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( + uint32_t column_family_id, const Slice& key) { + if (!overwrite_key) { + return false; + } + + WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch); + iter.Seek(key); + if (!iter.Valid()) { + return false; + } + if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) { + return false; + } + WriteBatchIndexEntry* non_const_entry = + const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry()); + if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) { + last_sub_batch_offset = last_entry_offset; + sub_batch_cnt++; + } + non_const_entry->offset = last_entry_offset; + return true; +} + +void WriteBatchWithIndex::Rep::AddOrUpdateIndex( + ColumnFamilyHandle* column_family, const Slice& key) { + if (!UpdateExistingEntry(column_family, key)) { + uint32_t cf_id = GetColumnFamilyID(column_family); + const auto* cf_cmp = GetColumnFamilyUserComparator(column_family); + if (cf_cmp != nullptr) { + comparator.SetComparatorForCF(cf_id, cf_cmp); + } + AddNewEntry(cf_id); + } +} + +void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) { + if (!UpdateExistingEntryWithCfId(0, key)) { + AddNewEntry(0); + } +} + +void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { + const std::string& wb_data = write_batch.Data(); + Slice entry_ptr = Slice(wb_data.data() + last_entry_offset, + wb_data.size() - last_entry_offset); + // Extract key + Slice key; + bool success __attribute__((__unused__)); + success = + ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0); + assert(success); + + auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry)); + auto* index_entry = + new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id, + key.data() - wb_data.data(), key.size()); + skip_list.Insert(index_entry); +} + +void WriteBatchWithIndex::Rep::Clear() { + write_batch.Clear(); + ClearIndex(); +} + +void WriteBatchWithIndex::Rep::ClearIndex() { + skip_list.~WriteBatchEntrySkipList(); + arena.~Arena(); + new (&arena) Arena(); + new (&skip_list) WriteBatchEntrySkipList(comparator, &arena); + last_entry_offset = 0; + last_sub_batch_offset = 0; + sub_batch_cnt = 1; +} + +Status WriteBatchWithIndex::Rep::ReBuildIndex() { + Status s; + + ClearIndex(); + + if (write_batch.Count() == 0) { + // Nothing to re-index + return s; + } + + size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch); + + Slice input(write_batch.Data()); + input.remove_prefix(offset); + + // Loop through all entries in Rep and add each one to the index + int found = 0; + while (s.ok() && !input.empty()) { + Slice key, value, blob, xid; + uint32_t column_family_id = 0; // default + char tag = 0; + + // set offset of current entry for call to AddNewEntry() + last_entry_offset = input.data() - write_batch.Data().data(); + + s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key, + &value, &blob, &xid); + if (!s.ok()) { + break; + } + + switch (tag) { + case kTypeColumnFamilyValue: + case kTypeValue: + case kTypeColumnFamilyDeletion: + case kTypeDeletion: + case kTypeColumnFamilySingleDeletion: + case kTypeSingleDeletion: + case kTypeColumnFamilyMerge: + case kTypeMerge: + found++; + if (!UpdateExistingEntryWithCfId(column_family_id, key)) { + AddNewEntry(column_family_id); + } + break; + case kTypeLogData: + case kTypeBeginPrepareXID: + case kTypeBeginPersistedPrepareXID: + case kTypeBeginUnprepareXID: + case kTypeEndPrepareXID: + case kTypeCommitXID: + case kTypeRollbackXID: + case kTypeNoop: + break; + default: + return Status::Corruption("unknown WriteBatch tag in ReBuildIndex", + ToString(static_cast<unsigned int>(tag))); + } + } + + if (s.ok() && found != write_batch.Count()) { + s = Status::Corruption("WriteBatch has wrong count"); + } + + return s; +} + +WriteBatchWithIndex::WriteBatchWithIndex( + const Comparator* default_index_comparator, size_t reserved_bytes, + bool overwrite_key, size_t max_bytes) + : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes, + overwrite_key)) {} + +WriteBatchWithIndex::~WriteBatchWithIndex() {} + +WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; } + +size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; } + +WBWIIterator* WriteBatchWithIndex::NewIterator() { + return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch); +} + +WBWIIterator* WriteBatchWithIndex::NewIterator( + ColumnFamilyHandle* column_family) { + return new WBWIIteratorImpl(GetColumnFamilyID(column_family), + &(rep->skip_list), &rep->write_batch); +} + +Iterator* WriteBatchWithIndex::NewIteratorWithBase( + ColumnFamilyHandle* column_family, Iterator* base_iterator) { + if (rep->overwrite_key == false) { + assert(false); + return nullptr; + } + return new BaseDeltaIterator(base_iterator, NewIterator(column_family), + GetColumnFamilyUserComparator(column_family)); +} + +Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) { + if (rep->overwrite_key == false) { + assert(false); + return nullptr; + } + // default column family's comparator + return new BaseDeltaIterator(base_iterator, NewIterator(), + rep->comparator.default_comparator()); +} + +Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + rep->SetLastEntryOffset(); + auto s = rep->write_batch.Put(column_family, key, value); + if (s.ok()) { + rep->AddOrUpdateIndex(column_family, key); + } + return s; +} + +Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { + rep->SetLastEntryOffset(); + auto s = rep->write_batch.Put(key, value); + if (s.ok()) { + rep->AddOrUpdateIndex(key); + } + return s; +} + +Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, + const Slice& key) { + rep->SetLastEntryOffset(); + auto s = rep->write_batch.Delete(column_family, key); + if (s.ok()) { + rep->AddOrUpdateIndex(column_family, key); + } + return s; +} + +Status WriteBatchWithIndex::Delete(const Slice& key) { + rep->SetLastEntryOffset(); + auto s = rep->write_batch.Delete(key); + if (s.ok()) { + rep->AddOrUpdateIndex(key); + } + return s; +} + +Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) { + rep->SetLastEntryOffset(); + auto s = rep->write_batch.SingleDelete(column_family, key); + if (s.ok()) { + rep->AddOrUpdateIndex(column_family, key); + } + return s; +} + +Status WriteBatchWithIndex::SingleDelete(const Slice& key) { + rep->SetLastEntryOffset(); + auto s = rep->write_batch.SingleDelete(key); + if (s.ok()) { + rep->AddOrUpdateIndex(key); + } + return s; +} + +Status WriteBatchWithIndex::DeleteRange(ColumnFamilyHandle* column_family, + const Slice& begin_key, + const Slice& end_key) { + rep->SetLastEntryOffset(); + auto s = rep->write_batch.DeleteRange(column_family, begin_key, end_key); + if (s.ok()) { + rep->AddOrUpdateIndex(column_family, begin_key); + } + return s; +} + +Status WriteBatchWithIndex::DeleteRange(const Slice& begin_key, + const Slice& end_key) { + rep->SetLastEntryOffset(); + auto s = rep->write_batch.DeleteRange(begin_key, end_key); + if (s.ok()) { + rep->AddOrUpdateIndex(begin_key); + } + return s; +} + +Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + rep->SetLastEntryOffset(); + auto s = rep->write_batch.Merge(column_family, key, value); + if (s.ok()) { + rep->AddOrUpdateIndex(column_family, key); + } + return s; +} + +Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) { + rep->SetLastEntryOffset(); + auto s = rep->write_batch.Merge(key, value); + if (s.ok()) { + rep->AddOrUpdateIndex(key); + } + return s; +} + +Status WriteBatchWithIndex::PutLogData(const Slice& blob) { + return rep->write_batch.PutLogData(blob); +} + +void WriteBatchWithIndex::Clear() { rep->Clear(); } + +Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, + const DBOptions& options, + const Slice& key, std::string* value) { + Status s; + MergeContext merge_context; + const ImmutableDBOptions immuable_db_options(options); + + WriteBatchWithIndexInternal::Result result = + WriteBatchWithIndexInternal::GetFromBatch( + immuable_db_options, this, column_family, key, &merge_context, + &rep->comparator, value, rep->overwrite_key, &s); + + switch (result) { + case WriteBatchWithIndexInternal::Result::kFound: + case WriteBatchWithIndexInternal::Result::kError: + // use returned status + break; + case WriteBatchWithIndexInternal::Result::kDeleted: + case WriteBatchWithIndexInternal::Result::kNotFound: + s = Status::NotFound(); + break; + case WriteBatchWithIndexInternal::Result::kMergeInProgress: + s = Status::MergeInProgress(); + break; + default: + assert(false); + } + + return s; +} + +Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, + const ReadOptions& read_options, + const Slice& key, + std::string* value) { + assert(value != nullptr); + PinnableSlice pinnable_val(value); + assert(!pinnable_val.IsPinned()); + auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key, + &pinnable_val); + if (s.ok() && pinnable_val.IsPinned()) { + value->assign(pinnable_val.data(), pinnable_val.size()); + } // else value is already assigned + return s; +} + +Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, + const ReadOptions& read_options, + const Slice& key, + PinnableSlice* pinnable_val) { + return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key, + pinnable_val); +} + +Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, + const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, + std::string* value) { + assert(value != nullptr); + PinnableSlice pinnable_val(value); + assert(!pinnable_val.IsPinned()); + auto s = + GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val); + if (s.ok() && pinnable_val.IsPinned()) { + value->assign(pinnable_val.data(), pinnable_val.size()); + } // else value is already assigned + return s; +} + +Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, + const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, + PinnableSlice* pinnable_val) { + return GetFromBatchAndDB(db, read_options, column_family, key, pinnable_val, + nullptr); +} + +Status WriteBatchWithIndex::GetFromBatchAndDB( + DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) { + Status s; + MergeContext merge_context; + const ImmutableDBOptions& immuable_db_options = + static_cast_with_check<DBImpl, DB>(db->GetRootDB()) + ->immutable_db_options(); + + // Since the lifetime of the WriteBatch is the same as that of the transaction + // we cannot pin it as otherwise the returned value will not be available + // after the transaction finishes. + std::string& batch_value = *pinnable_val->GetSelf(); + WriteBatchWithIndexInternal::Result result = + WriteBatchWithIndexInternal::GetFromBatch( + immuable_db_options, this, column_family, key, &merge_context, + &rep->comparator, &batch_value, rep->overwrite_key, &s); + + if (result == WriteBatchWithIndexInternal::Result::kFound) { + pinnable_val->PinSelf(); + return s; + } + if (result == WriteBatchWithIndexInternal::Result::kDeleted) { + return Status::NotFound(); + } + if (result == WriteBatchWithIndexInternal::Result::kError) { + return s; + } + if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && + rep->overwrite_key == true) { + // Since we've overwritten keys, we do not know what other operations are + // in this batch for this key, so we cannot do a Merge to compute the + // result. Instead, we will simply return MergeInProgress. + return Status::MergeInProgress(); + } + + assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || + result == WriteBatchWithIndexInternal::Result::kNotFound); + + // Did not find key in batch OR could not resolve Merges. Try DB. + if (!callback) { + s = db->Get(read_options, column_family, key, pinnable_val); + } else { + s = static_cast_with_check<DBImpl, DB>(db->GetRootDB()) + ->GetImpl(read_options, column_family, key, pinnable_val, nullptr, + callback); + } + + if (s.ok() || s.IsNotFound()) { // DB Get Succeeded + if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) { + // Merge result from DB with merges in Batch + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + const MergeOperator* merge_operator = + cfh->cfd()->ioptions()->merge_operator; + Statistics* statistics = immuable_db_options.statistics.get(); + Env* env = immuable_db_options.env; + Logger* logger = immuable_db_options.info_log.get(); + + Slice* merge_data; + if (s.ok()) { + merge_data = pinnable_val; + } else { // Key not present in db (s.IsNotFound()) + merge_data = nullptr; + } + + if (merge_operator) { + s = MergeHelper::TimedFullMerge( + merge_operator, key, merge_data, merge_context.GetOperands(), + pinnable_val->GetSelf(), logger, statistics, env); + pinnable_val->PinSelf(); + } else { + s = Status::InvalidArgument("Options::merge_operator must be set"); + } + } + } + + return s; +} + +void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); } + +Status WriteBatchWithIndex::RollbackToSavePoint() { + Status s = rep->write_batch.RollbackToSavePoint(); + + if (s.ok()) { + rep->sub_batch_cnt = 1; + rep->last_sub_batch_offset = 0; + s = rep->ReBuildIndex(); + } + + return s; +} + +Status WriteBatchWithIndex::PopSavePoint() { + return rep->write_batch.PopSavePoint(); +} + +void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) { + rep->write_batch.SetMaxBytes(max_bytes); +} + +size_t WriteBatchWithIndex::GetDataSize() const { + return rep->write_batch.GetDataSize(); +} + +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc new file mode 100644 index 00000000..243672ce --- /dev/null +++ b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -0,0 +1,288 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" + +#include "db/column_family.h" +#include "db/merge_context.h" +#include "db/merge_helper.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/utilities/write_batch_with_index.h" +#include "util/coding.h" +#include "util/string_util.h" + +namespace rocksdb { + +class Env; +class Logger; +class Statistics; + +Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, + WriteType* type, Slice* Key, + Slice* value, Slice* blob, + Slice* xid) const { + if (type == nullptr || Key == nullptr || value == nullptr || + blob == nullptr || xid == nullptr) { + return Status::InvalidArgument("Output parameters cannot be null"); + } + + if (data_offset == GetDataSize()) { + // reached end of batch. + return Status::NotFound(); + } + + if (data_offset > GetDataSize()) { + return Status::InvalidArgument("data offset exceed write batch size"); + } + Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset); + char tag; + uint32_t column_family; + Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, + blob, xid); + + switch (tag) { + case kTypeColumnFamilyValue: + case kTypeValue: + *type = kPutRecord; + break; + case kTypeColumnFamilyDeletion: + case kTypeDeletion: + *type = kDeleteRecord; + break; + case kTypeColumnFamilySingleDeletion: + case kTypeSingleDeletion: + *type = kSingleDeleteRecord; + break; + case kTypeColumnFamilyRangeDeletion: + case kTypeRangeDeletion: + *type = kDeleteRangeRecord; + break; + case kTypeColumnFamilyMerge: + case kTypeMerge: + *type = kMergeRecord; + break; + case kTypeLogData: + *type = kLogDataRecord; + break; + case kTypeNoop: + case kTypeBeginPrepareXID: + case kTypeBeginPersistedPrepareXID: + case kTypeBeginUnprepareXID: + case kTypeEndPrepareXID: + case kTypeCommitXID: + case kTypeRollbackXID: + *type = kXIDRecord; + break; + default: + return Status::Corruption("unknown WriteBatch tag ", + ToString(static_cast<unsigned int>(tag))); + } + return Status::OK(); +} + +// If both of `entry1` and `entry2` point to real entry in write batch, we +// compare the entries as following: +// 1. first compare the column family, the one with larger CF will be larger; +// 2. Inside the same CF, we first decode the entry to find the key of the entry +// and the entry with larger key will be larger; +// 3. If two entries are of the same CF and offset, the one with larger offset +// will be larger. +// Some times either `entry1` or `entry2` is dummy entry, which is actually +// a search key. In this case, in step 2, we don't go ahead and decode the +// entry but use the value in WriteBatchIndexEntry::search_key. +// One special case is WriteBatchIndexEntry::key_size is kFlagMinInCf. +// This indicate that we are going to seek to the first of the column family. +// Once we see this, this entry will be smaller than all the real entries of +// the column family. +int WriteBatchEntryComparator::operator()( + const WriteBatchIndexEntry* entry1, + const WriteBatchIndexEntry* entry2) const { + if (entry1->column_family > entry2->column_family) { + return 1; + } else if (entry1->column_family < entry2->column_family) { + return -1; + } + + // Deal with special case of seeking to the beginning of a column family + if (entry1->is_min_in_cf()) { + return -1; + } else if (entry2->is_min_in_cf()) { + return 1; + } + + Slice key1, key2; + if (entry1->search_key == nullptr) { + key1 = Slice(write_batch_->Data().data() + entry1->key_offset, + entry1->key_size); + } else { + key1 = *(entry1->search_key); + } + if (entry2->search_key == nullptr) { + key2 = Slice(write_batch_->Data().data() + entry2->key_offset, + entry2->key_size); + } else { + key2 = *(entry2->search_key); + } + + int cmp = CompareKey(entry1->column_family, key1, key2); + if (cmp != 0) { + return cmp; + } else if (entry1->offset > entry2->offset) { + return 1; + } else if (entry1->offset < entry2->offset) { + return -1; + } + return 0; +} + +int WriteBatchEntryComparator::CompareKey(uint32_t column_family, + const Slice& key1, + const Slice& key2) const { + if (column_family < cf_comparators_.size() && + cf_comparators_[column_family] != nullptr) { + return cf_comparators_[column_family]->Compare(key1, key2); + } else { + return default_comparator_->Compare(key1, key2); + } +} + +WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( + const ImmutableDBOptions& immuable_db_options, WriteBatchWithIndex* batch, + ColumnFamilyHandle* column_family, const Slice& key, + MergeContext* merge_context, WriteBatchEntryComparator* cmp, + std::string* value, bool overwrite_key, Status* s) { + uint32_t cf_id = GetColumnFamilyID(column_family); + *s = Status::OK(); + WriteBatchWithIndexInternal::Result result = + WriteBatchWithIndexInternal::Result::kNotFound; + + std::unique_ptr<WBWIIterator> iter = + std::unique_ptr<WBWIIterator>(batch->NewIterator(column_family)); + + // We want to iterate in the reverse order that the writes were added to the + // batch. Since we don't have a reverse iterator, we must seek past the end. + // TODO(agiardullo): consider adding support for reverse iteration + iter->Seek(key); + while (iter->Valid()) { + const WriteEntry entry = iter->Entry(); + if (cmp->CompareKey(cf_id, entry.key, key) != 0) { + break; + } + + iter->Next(); + } + + if (!(*s).ok()) { + return WriteBatchWithIndexInternal::Result::kError; + } + + if (!iter->Valid()) { + // Read past end of results. Reposition on last result. + iter->SeekToLast(); + } else { + iter->Prev(); + } + + Slice entry_value; + while (iter->Valid()) { + const WriteEntry entry = iter->Entry(); + if (cmp->CompareKey(cf_id, entry.key, key) != 0) { + // Unexpected error or we've reached a different next key + break; + } + + switch (entry.type) { + case kPutRecord: { + result = WriteBatchWithIndexInternal::Result::kFound; + entry_value = entry.value; + break; + } + case kMergeRecord: { + result = WriteBatchWithIndexInternal::Result::kMergeInProgress; + merge_context->PushOperand(entry.value); + break; + } + case kDeleteRecord: + case kSingleDeleteRecord: { + result = WriteBatchWithIndexInternal::Result::kDeleted; + break; + } + case kLogDataRecord: + case kXIDRecord: { + // ignore + break; + } + default: { + result = WriteBatchWithIndexInternal::Result::kError; + (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:", + ToString(entry.type)); + break; + } + } + if (result == WriteBatchWithIndexInternal::Result::kFound || + result == WriteBatchWithIndexInternal::Result::kDeleted || + result == WriteBatchWithIndexInternal::Result::kError) { + // We can stop iterating once we find a PUT or DELETE + break; + } + if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && + overwrite_key == true) { + // Since we've overwritten keys, we do not know what other operations are + // in this batch for this key, so we cannot do a Merge to compute the + // result. Instead, we will simply return MergeInProgress. + break; + } + + iter->Prev(); + } + + if ((*s).ok()) { + if (result == WriteBatchWithIndexInternal::Result::kFound || + result == WriteBatchWithIndexInternal::Result::kDeleted) { + // Found a Put or Delete. Merge if necessary. + if (merge_context->GetNumOperands() > 0) { + const MergeOperator* merge_operator; + + if (column_family != nullptr) { + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + merge_operator = cfh->cfd()->ioptions()->merge_operator; + } else { + *s = Status::InvalidArgument("Must provide a column_family"); + result = WriteBatchWithIndexInternal::Result::kError; + return result; + } + Statistics* statistics = immuable_db_options.statistics.get(); + Env* env = immuable_db_options.env; + Logger* logger = immuable_db_options.info_log.get(); + + if (merge_operator) { + *s = MergeHelper::TimedFullMerge(merge_operator, key, &entry_value, + merge_context->GetOperands(), value, + logger, statistics, env); + } else { + *s = Status::InvalidArgument("Options::merge_operator must be set"); + } + if ((*s).ok()) { + result = WriteBatchWithIndexInternal::Result::kFound; + } else { + result = WriteBatchWithIndexInternal::Result::kError; + } + } else { // nothing to merge + if (result == WriteBatchWithIndexInternal::Result::kFound) { // PUT + value->assign(entry_value.data(), entry_value.size()); + } + } + } + } + + return result; +} + +} // namespace rocksdb + +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h new file mode 100644 index 00000000..3eed7c72 --- /dev/null +++ b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -0,0 +1,145 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#ifndef ROCKSDB_LITE + +#include <limits> +#include <string> +#include <vector> + +#include "options/db_options.h" +#include "port/port.h" +#include "rocksdb/comparator.h" +#include "rocksdb/iterator.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "rocksdb/utilities/write_batch_with_index.h" + +namespace rocksdb { + +class MergeContext; +struct Options; + +// Key used by skip list, as the binary searchable index of WriteBatchWithIndex. +struct WriteBatchIndexEntry { + WriteBatchIndexEntry(size_t o, uint32_t c, size_t ko, size_t ksz) + : offset(o), + column_family(c), + key_offset(ko), + key_size(ksz), + search_key(nullptr) {} + // Create a dummy entry as the search key. This index entry won't be backed + // by an entry from the write batch, but a pointer to the search key. Or a + // special flag of offset can indicate we are seek to first. + // @_search_key: the search key + // @_column_family: column family + // @is_forward_direction: true for Seek(). False for SeekForPrev() + // @is_seek_to_first: true if we seek to the beginning of the column family + // _search_key should be null in this case. + WriteBatchIndexEntry(const Slice* _search_key, uint32_t _column_family, + bool is_forward_direction, bool is_seek_to_first) + // For SeekForPrev(), we need to make the dummy entry larger than any + // entry who has the same search key. Otherwise, we'll miss those entries. + : offset(is_forward_direction ? 0 : port::kMaxSizet), + column_family(_column_family), + key_offset(0), + key_size(is_seek_to_first ? kFlagMinInCf : 0), + search_key(_search_key) { + assert(_search_key != nullptr || is_seek_to_first); + } + + // If this flag appears in the key_size, it indicates a + // key that is smaller than any other entry for the same column family. + static const size_t kFlagMinInCf = port::kMaxSizet; + + bool is_min_in_cf() const { + assert(key_size != kFlagMinInCf || + (key_offset == 0 && search_key == nullptr)); + return key_size == kFlagMinInCf; + } + + // offset of an entry in write batch's string buffer. If this is a dummy + // lookup key, in which case search_key != nullptr, offset is set to either + // 0 or max, only for comparison purpose. Because when entries have the same + // key, the entry with larger offset is larger, offset = 0 will make a seek + // key small or equal than all the entries with the seek key, so that Seek() + // will find all the entries of the same key. Similarly, offset = MAX will + // make the entry just larger than all entries with the search key so + // SeekForPrev() will see all the keys with the same key. + size_t offset; + uint32_t column_family; // c1olumn family of the entry. + size_t key_offset; // offset of the key in write batch's string buffer. + size_t key_size; // size of the key. kFlagMinInCf indicates + // that this is a dummy look up entry for + // SeekToFirst() to the beginning of the column + // family. We use the flag here to save a boolean + // in the struct. + + const Slice* search_key; // if not null, instead of reading keys from + // write batch, use it to compare. This is used + // for lookup key. +}; + +class ReadableWriteBatch : public WriteBatch { + public: + explicit ReadableWriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0) + : WriteBatch(reserved_bytes, max_bytes) {} + // Retrieve some information from a write entry in the write batch, given + // the start offset of the write entry. + Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, + Slice* value, Slice* blob, Slice* xid) const; +}; + +class WriteBatchEntryComparator { + public: + WriteBatchEntryComparator(const Comparator* _default_comparator, + const ReadableWriteBatch* write_batch) + : default_comparator_(_default_comparator), write_batch_(write_batch) {} + // Compare a and b. Return a negative value if a is less than b, 0 if they + // are equal, and a positive value if a is greater than b + int operator()(const WriteBatchIndexEntry* entry1, + const WriteBatchIndexEntry* entry2) const; + + int CompareKey(uint32_t column_family, const Slice& key1, + const Slice& key2) const; + + void SetComparatorForCF(uint32_t column_family_id, + const Comparator* comparator) { + if (column_family_id >= cf_comparators_.size()) { + cf_comparators_.resize(column_family_id + 1, nullptr); + } + cf_comparators_[column_family_id] = comparator; + } + + const Comparator* default_comparator() { return default_comparator_; } + + private: + const Comparator* default_comparator_; + std::vector<const Comparator*> cf_comparators_; + const ReadableWriteBatch* write_batch_; +}; + +class WriteBatchWithIndexInternal { + public: + enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError }; + + // If batch contains a value for key, store it in *value and return kFound. + // If batch contains a deletion for key, return Deleted. + // If batch contains Merge operations as the most recent entry for a key, + // and the merge process does not stop (not reaching a value or delete), + // prepend the current merge operands to *operands, + // and return kMergeInProgress + // If batch does not contain this key, return kNotFound + // Else, return kError on error with error Status stored in *s. + static WriteBatchWithIndexInternal::Result GetFromBatch( + const ImmutableDBOptions& ioptions, WriteBatchWithIndex* batch, + ColumnFamilyHandle* column_family, const Slice& key, + MergeContext* merge_context, WriteBatchEntryComparator* cmp, + std::string* value, bool overwrite_key, Status* s); +}; + +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_test.cc b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_test.cc new file mode 100644 index 00000000..be715fe3 --- /dev/null +++ b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -0,0 +1,1815 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_LITE + +#include <memory> +#include <map> +#include "db/column_family.h" +#include "port/stack_trace.h" +#include "rocksdb/utilities/write_batch_with_index.h" +#include "util/random.h" +#include "util/string_util.h" +#include "util/testharness.h" +#include "utilities/merge_operators.h" +#include "utilities/merge_operators/string_append/stringappend.h" + +namespace rocksdb { + +namespace { +class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl { + public: + explicit ColumnFamilyHandleImplDummy(int id, const Comparator* comparator) + : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), + id_(id), + comparator_(comparator) {} + uint32_t GetID() const override { return id_; } + const Comparator* GetComparator() const override { return comparator_; } + + private: + uint32_t id_; + const Comparator* comparator_; +}; + +struct Entry { + std::string key; + std::string value; + WriteType type; +}; + +struct TestHandler : public WriteBatch::Handler { + std::map<uint32_t, std::vector<Entry>> seen; + Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + Entry e; + e.key = key.ToString(); + e.value = value.ToString(); + e.type = kPutRecord; + seen[column_family_id].push_back(e); + return Status::OK(); + } + Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + Entry e; + e.key = key.ToString(); + e.value = value.ToString(); + e.type = kMergeRecord; + seen[column_family_id].push_back(e); + return Status::OK(); + } + void LogData(const Slice& /*blob*/) override {} + Status DeleteCF(uint32_t column_family_id, const Slice& key) override { + Entry e; + e.key = key.ToString(); + e.value = ""; + e.type = kDeleteRecord; + seen[column_family_id].push_back(e); + return Status::OK(); + } +}; +} // namespace anonymous + +class WriteBatchWithIndexTest : public testing::Test {}; + +void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries, + WriteBatchWithIndex* batch) { + // In this test, we insert <key, value> to column family `data`, and + // <value, key> to column family `index`. Then iterator them in order + // and seek them by key. + + // Sort entries by key + std::map<std::string, std::vector<Entry*>> data_map; + // Sort entries by value + std::map<std::string, std::vector<Entry*>> index_map; + for (auto& e : entries) { + data_map[e.key].push_back(&e); + index_map[e.value].push_back(&e); + } + + ColumnFamilyHandleImplDummy data(6, BytewiseComparator()); + ColumnFamilyHandleImplDummy index(8, BytewiseComparator()); + for (auto& e : entries) { + if (e.type == kPutRecord) { + batch->Put(&data, e.key, e.value); + batch->Put(&index, e.value, e.key); + } else if (e.type == kMergeRecord) { + batch->Merge(&data, e.key, e.value); + batch->Put(&index, e.value, e.key); + } else { + assert(e.type == kDeleteRecord); + std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data)); + iter->Seek(e.key); + ASSERT_OK(iter->status()); + auto write_entry = iter->Entry(); + ASSERT_EQ(e.key, write_entry.key.ToString()); + ASSERT_EQ(e.value, write_entry.value.ToString()); + batch->Delete(&data, e.key); + batch->Put(&index, e.value, ""); + } + } + + // Iterator all keys + { + std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data)); + for (int seek_to_first : {0, 1}) { + if (seek_to_first) { + iter->SeekToFirst(); + } else { + iter->Seek(""); + } + for (auto pair : data_map) { + for (auto v : pair.second) { + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + auto write_entry = iter->Entry(); + ASSERT_EQ(pair.first, write_entry.key.ToString()); + ASSERT_EQ(v->type, write_entry.type); + if (write_entry.type != kDeleteRecord) { + ASSERT_EQ(v->value, write_entry.value.ToString()); + } + iter->Next(); + } + } + ASSERT_TRUE(!iter->Valid()); + } + iter->SeekToLast(); + for (auto pair = data_map.rbegin(); pair != data_map.rend(); ++pair) { + for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) { + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + auto write_entry = iter->Entry(); + ASSERT_EQ(pair->first, write_entry.key.ToString()); + ASSERT_EQ((*v)->type, write_entry.type); + if (write_entry.type != kDeleteRecord) { + ASSERT_EQ((*v)->value, write_entry.value.ToString()); + } + iter->Prev(); + } + } + ASSERT_TRUE(!iter->Valid()); + } + + // Iterator all indexes + { + std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&index)); + for (int seek_to_first : {0, 1}) { + if (seek_to_first) { + iter->SeekToFirst(); + } else { + iter->Seek(""); + } + for (auto pair : index_map) { + for (auto v : pair.second) { + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + auto write_entry = iter->Entry(); + ASSERT_EQ(pair.first, write_entry.key.ToString()); + if (v->type != kDeleteRecord) { + ASSERT_EQ(v->key, write_entry.value.ToString()); + ASSERT_EQ(v->value, write_entry.key.ToString()); + } + iter->Next(); + } + } + ASSERT_TRUE(!iter->Valid()); + } + + iter->SeekToLast(); + for (auto pair = index_map.rbegin(); pair != index_map.rend(); ++pair) { + for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) { + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + auto write_entry = iter->Entry(); + ASSERT_EQ(pair->first, write_entry.key.ToString()); + if ((*v)->type != kDeleteRecord) { + ASSERT_EQ((*v)->key, write_entry.value.ToString()); + ASSERT_EQ((*v)->value, write_entry.key.ToString()); + } + iter->Prev(); + } + } + ASSERT_TRUE(!iter->Valid()); + } + + // Seek to every key + { + std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data)); + + // Seek the keys one by one in reverse order + for (auto pair = data_map.rbegin(); pair != data_map.rend(); ++pair) { + iter->Seek(pair->first); + ASSERT_OK(iter->status()); + for (auto v : pair->second) { + ASSERT_TRUE(iter->Valid()); + auto write_entry = iter->Entry(); + ASSERT_EQ(pair->first, write_entry.key.ToString()); + ASSERT_EQ(v->type, write_entry.type); + if (write_entry.type != kDeleteRecord) { + ASSERT_EQ(v->value, write_entry.value.ToString()); + } + iter->Next(); + ASSERT_OK(iter->status()); + } + } + } + + // Seek to every index + { + std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&index)); + + // Seek the keys one by one in reverse order + for (auto pair = index_map.rbegin(); pair != index_map.rend(); ++pair) { + iter->Seek(pair->first); + ASSERT_OK(iter->status()); + for (auto v : pair->second) { + ASSERT_TRUE(iter->Valid()); + auto write_entry = iter->Entry(); + ASSERT_EQ(pair->first, write_entry.key.ToString()); + ASSERT_EQ(v->value, write_entry.key.ToString()); + if (v->type != kDeleteRecord) { + ASSERT_EQ(v->key, write_entry.value.ToString()); + } + iter->Next(); + ASSERT_OK(iter->status()); + } + } + } + + // Verify WriteBatch can be iterated + TestHandler handler; + batch->GetWriteBatch()->Iterate(&handler); + + // Verify data column family + { + ASSERT_EQ(entries.size(), handler.seen[data.GetID()].size()); + size_t i = 0; + for (auto e : handler.seen[data.GetID()]) { + auto write_entry = entries[i++]; + ASSERT_EQ(e.type, write_entry.type); + ASSERT_EQ(e.key, write_entry.key); + if (e.type != kDeleteRecord) { + ASSERT_EQ(e.value, write_entry.value); + } + } + } + + // Verify index column family + { + ASSERT_EQ(entries.size(), handler.seen[index.GetID()].size()); + size_t i = 0; + for (auto e : handler.seen[index.GetID()]) { + auto write_entry = entries[i++]; + ASSERT_EQ(e.key, write_entry.value); + if (write_entry.type != kDeleteRecord) { + ASSERT_EQ(e.value, write_entry.key); + } + } + } +} + +TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { + Entry entries[] = { + {"aaa", "0005", kPutRecord}, + {"b", "0002", kPutRecord}, + {"cdd", "0002", kMergeRecord}, + {"aab", "00001", kPutRecord}, + {"cc", "00005", kPutRecord}, + {"cdd", "0002", kPutRecord}, + {"aab", "0003", kPutRecord}, + {"cc", "00005", kDeleteRecord}, + }; + std::vector<Entry> entries_list(entries, entries + 8); + + WriteBatchWithIndex batch(nullptr, 20); + + TestValueAsSecondaryIndexHelper(entries_list, &batch); + + // Clear batch and re-run test with new values + batch.Clear(); + + Entry new_entries[] = { + {"aaa", "0005", kPutRecord}, + {"e", "0002", kPutRecord}, + {"add", "0002", kMergeRecord}, + {"aab", "00001", kPutRecord}, + {"zz", "00005", kPutRecord}, + {"add", "0002", kPutRecord}, + {"aab", "0003", kPutRecord}, + {"zz", "00005", kDeleteRecord}, + }; + + entries_list = std::vector<Entry>(new_entries, new_entries + 8); + + TestValueAsSecondaryIndexHelper(entries_list, &batch); +} + +TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) { + ColumnFamilyHandleImplDummy cf1(6, nullptr); + ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator()); + ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator()); + WriteBatchWithIndex batch(BytewiseComparator(), 20); + + batch.Put(&cf1, "ddd", ""); + batch.Put(&cf2, "aaa", ""); + batch.Put(&cf2, "eee", ""); + batch.Put(&cf1, "ccc", ""); + batch.Put(&reverse_cf, "a11", ""); + batch.Put(&cf1, "bbb", ""); + + Slice key_slices[] = {"a", "3", "3"}; + Slice value_slice = ""; + batch.Put(&reverse_cf, SliceParts(key_slices, 3), + SliceParts(&value_slice, 1)); + batch.Put(&reverse_cf, "a22", ""); + + { + std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf1)); + iter->Seek(""); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bbb", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("ccc", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("ddd", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + + { + std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf2)); + iter->Seek(""); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("aaa", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("eee", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + + { + std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&reverse_cf)); + iter->Seek(""); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("z"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a33", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a22", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a11", iter->Entry().key.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("a22"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a22", iter->Entry().key.ToString()); + + iter->Seek("a13"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a11", iter->Entry().key.ToString()); + } +} + +TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) { + ColumnFamilyHandleImplDummy cf1(6, nullptr); + ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator()); + ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator()); + WriteBatchWithIndex batch(BytewiseComparator(), 20, true); + + batch.Put(&cf1, "ddd", ""); + batch.Merge(&cf1, "ddd", ""); + batch.Delete(&cf1, "ddd"); + batch.Put(&cf2, "aaa", ""); + batch.Delete(&cf2, "aaa"); + batch.Put(&cf2, "aaa", "aaa"); + batch.Put(&cf2, "eee", "eee"); + batch.Put(&cf1, "ccc", ""); + batch.Put(&reverse_cf, "a11", ""); + batch.Delete(&cf1, "ccc"); + batch.Put(&reverse_cf, "a33", "a33"); + batch.Put(&reverse_cf, "a11", "a11"); + Slice slices[] = {"a", "3", "3"}; + batch.Delete(&reverse_cf, SliceParts(slices, 3)); + + { + std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf1)); + iter->Seek(""); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("ccc", iter->Entry().key.ToString()); + ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("ddd", iter->Entry().key.ToString()); + ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + + { + std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf2)); + iter->SeekToLast(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("eee", iter->Entry().key.ToString()); + ASSERT_EQ("eee", iter->Entry().value.ToString()); + iter->Prev(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("aaa", iter->Entry().key.ToString()); + ASSERT_EQ("aaa", iter->Entry().value.ToString()); + iter->Prev(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("aaa", iter->Entry().key.ToString()); + ASSERT_EQ("aaa", iter->Entry().value.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("eee", iter->Entry().key.ToString()); + ASSERT_EQ("eee", iter->Entry().value.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + + { + std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&reverse_cf)); + iter->Seek(""); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("z"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a33", iter->Entry().key.ToString()); + ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a11", iter->Entry().key.ToString()); + ASSERT_EQ("a11", iter->Entry().value.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a11", iter->Entry().key.ToString()); + ASSERT_EQ("a11", iter->Entry().value.ToString()); + iter->Prev(); + + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a33", iter->Entry().key.ToString()); + ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord); + iter->Prev(); + ASSERT_TRUE(!iter->Valid()); + } +} + +namespace { +typedef std::map<std::string, std::string> KVMap; + +class KVIter : public Iterator { + public: + explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {} + bool Valid() const override { return iter_ != map_->end(); } + void SeekToFirst() override { iter_ = map_->begin(); } + void SeekToLast() override { + if (map_->empty()) { + iter_ = map_->end(); + } else { + iter_ = map_->find(map_->rbegin()->first); + } + } + void Seek(const Slice& k) override { + iter_ = map_->lower_bound(k.ToString()); + } + void SeekForPrev(const Slice& k) override { + iter_ = map_->upper_bound(k.ToString()); + Prev(); + } + void Next() override { ++iter_; } + void Prev() override { + if (iter_ == map_->begin()) { + iter_ = map_->end(); + return; + } + --iter_; + } + + Slice key() const override { return iter_->first; } + Slice value() const override { return iter_->second; } + Status status() const override { return Status::OK(); } + + private: + const KVMap* const map_; + KVMap::const_iterator iter_; +}; + +void AssertIter(Iterator* iter, const std::string& key, + const std::string& value) { + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(key, iter->key().ToString()); + ASSERT_EQ(value, iter->value().ToString()); +} + +void AssertItersEqual(Iterator* iter1, Iterator* iter2) { + ASSERT_EQ(iter1->Valid(), iter2->Valid()); + if (iter1->Valid()) { + ASSERT_EQ(iter1->key().ToString(), iter2->key().ToString()); + ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString()); + } +} +} // namespace + +TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { + std::vector<std::string> source_strings = {"a", "b", "c", "d", "e", + "f", "g", "h", "i", "j"}; + for (int rand_seed = 301; rand_seed < 366; rand_seed++) { + Random rnd(rand_seed); + + ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator()); + ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator()); + ColumnFamilyHandleImplDummy cf3(8, BytewiseComparator()); + + WriteBatchWithIndex batch(BytewiseComparator(), 20, true); + + if (rand_seed % 2 == 0) { + batch.Put(&cf2, "zoo", "bar"); + } + if (rand_seed % 4 == 1) { + batch.Put(&cf3, "zoo", "bar"); + } + + KVMap map; + KVMap merged_map; + for (auto key : source_strings) { + std::string value = key + key; + int type = rnd.Uniform(6); + switch (type) { + case 0: + // only base has it + map[key] = value; + merged_map[key] = value; + break; + case 1: + // only delta has it + batch.Put(&cf1, key, value); + map[key] = value; + merged_map[key] = value; + break; + case 2: + // both has it. Delta should win + batch.Put(&cf1, key, value); + map[key] = "wrong_value"; + merged_map[key] = value; + break; + case 3: + // both has it. Delta is delete + batch.Delete(&cf1, key); + map[key] = "wrong_value"; + break; + case 4: + // only delta has it. Delta is delete + batch.Delete(&cf1, key); + map[key] = "wrong_value"; + break; + default: + // Neither iterator has it. + break; + } + } + + std::unique_ptr<Iterator> iter( + batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + std::unique_ptr<Iterator> result_iter(new KVIter(&merged_map)); + + bool is_valid = false; + for (int i = 0; i < 128; i++) { + // Random walk and make sure iter and result_iter returns the + // same key and value + int type = rnd.Uniform(6); + ASSERT_OK(iter->status()); + switch (type) { + case 0: + // Seek to First + iter->SeekToFirst(); + result_iter->SeekToFirst(); + break; + case 1: + // Seek to last + iter->SeekToLast(); + result_iter->SeekToLast(); + break; + case 2: { + // Seek to random key + auto key_idx = rnd.Uniform(static_cast<int>(source_strings.size())); + auto key = source_strings[key_idx]; + iter->Seek(key); + result_iter->Seek(key); + break; + } + case 3: { + // SeekForPrev to random key + auto key_idx = rnd.Uniform(static_cast<int>(source_strings.size())); + auto key = source_strings[key_idx]; + iter->SeekForPrev(key); + result_iter->SeekForPrev(key); + break; + } + case 4: + // Next + if (is_valid) { + iter->Next(); + result_iter->Next(); + } else { + continue; + } + break; + default: + assert(type == 5); + // Prev + if (is_valid) { + iter->Prev(); + result_iter->Prev(); + } else { + continue; + } + break; + } + AssertItersEqual(iter.get(), result_iter.get()); + is_valid = iter->Valid(); + } + } +} + +TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { + ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator()); + ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator()); + WriteBatchWithIndex batch(BytewiseComparator(), 20, true); + + { + KVMap map; + map["a"] = "aa"; + map["c"] = "cc"; + map["e"] = "ee"; + std::unique_ptr<Iterator> iter( + batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + + iter->SeekToFirst(); + AssertIter(iter.get(), "a", "aa"); + iter->Next(); + AssertIter(iter.get(), "c", "cc"); + iter->Next(); + AssertIter(iter.get(), "e", "ee"); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->SeekToLast(); + AssertIter(iter.get(), "e", "ee"); + iter->Prev(); + AssertIter(iter.get(), "c", "cc"); + iter->Prev(); + AssertIter(iter.get(), "a", "aa"); + iter->Prev(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("b"); + AssertIter(iter.get(), "c", "cc"); + + iter->Prev(); + AssertIter(iter.get(), "a", "aa"); + + iter->Seek("a"); + AssertIter(iter.get(), "a", "aa"); + } + + // Test the case that there is one element in the write batch + batch.Put(&cf2, "zoo", "bar"); + batch.Put(&cf1, "a", "aa"); + { + KVMap empty_map; + std::unique_ptr<Iterator> iter( + batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map))); + + iter->SeekToFirst(); + AssertIter(iter.get(), "a", "aa"); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + + batch.Delete(&cf1, "b"); + batch.Put(&cf1, "c", "cc"); + batch.Put(&cf1, "d", "dd"); + batch.Delete(&cf1, "e"); + + { + KVMap map; + map["b"] = ""; + map["cc"] = "cccc"; + map["f"] = "ff"; + std::unique_ptr<Iterator> iter( + batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + + iter->SeekToFirst(); + AssertIter(iter.get(), "a", "aa"); + iter->Next(); + AssertIter(iter.get(), "c", "cc"); + iter->Next(); + AssertIter(iter.get(), "cc", "cccc"); + iter->Next(); + AssertIter(iter.get(), "d", "dd"); + iter->Next(); + AssertIter(iter.get(), "f", "ff"); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->SeekToLast(); + AssertIter(iter.get(), "f", "ff"); + iter->Prev(); + AssertIter(iter.get(), "d", "dd"); + iter->Prev(); + AssertIter(iter.get(), "cc", "cccc"); + iter->Prev(); + AssertIter(iter.get(), "c", "cc"); + iter->Next(); + AssertIter(iter.get(), "cc", "cccc"); + iter->Prev(); + AssertIter(iter.get(), "c", "cc"); + iter->Prev(); + AssertIter(iter.get(), "a", "aa"); + iter->Prev(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("c"); + AssertIter(iter.get(), "c", "cc"); + + iter->Seek("cb"); + AssertIter(iter.get(), "cc", "cccc"); + + iter->Seek("cc"); + AssertIter(iter.get(), "cc", "cccc"); + iter->Next(); + AssertIter(iter.get(), "d", "dd"); + + iter->Seek("e"); + AssertIter(iter.get(), "f", "ff"); + + iter->Prev(); + AssertIter(iter.get(), "d", "dd"); + + iter->Next(); + AssertIter(iter.get(), "f", "ff"); + } + + { + KVMap empty_map; + std::unique_ptr<Iterator> iter( + batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map))); + + iter->SeekToFirst(); + AssertIter(iter.get(), "a", "aa"); + iter->Next(); + AssertIter(iter.get(), "c", "cc"); + iter->Next(); + AssertIter(iter.get(), "d", "dd"); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->SeekToLast(); + AssertIter(iter.get(), "d", "dd"); + iter->Prev(); + AssertIter(iter.get(), "c", "cc"); + iter->Prev(); + AssertIter(iter.get(), "a", "aa"); + + iter->Prev(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("aa"); + AssertIter(iter.get(), "c", "cc"); + iter->Next(); + AssertIter(iter.get(), "d", "dd"); + + iter->Seek("ca"); + AssertIter(iter.get(), "d", "dd"); + + iter->Prev(); + AssertIter(iter.get(), "c", "cc"); + } +} + +TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { + ColumnFamilyHandleImplDummy cf1(6, ReverseBytewiseComparator()); + ColumnFamilyHandleImplDummy cf2(2, ReverseBytewiseComparator()); + WriteBatchWithIndex batch(BytewiseComparator(), 20, true); + + // Test the case that there is one element in the write batch + batch.Put(&cf2, "zoo", "bar"); + batch.Put(&cf1, "a", "aa"); + { + KVMap empty_map; + std::unique_ptr<Iterator> iter( + batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map))); + + iter->SeekToFirst(); + AssertIter(iter.get(), "a", "aa"); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + + batch.Put(&cf1, "c", "cc"); + { + KVMap map; + std::unique_ptr<Iterator> iter( + batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + + iter->SeekToFirst(); + AssertIter(iter.get(), "c", "cc"); + iter->Next(); + AssertIter(iter.get(), "a", "aa"); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->SeekToLast(); + AssertIter(iter.get(), "a", "aa"); + iter->Prev(); + AssertIter(iter.get(), "c", "cc"); + iter->Prev(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("b"); + AssertIter(iter.get(), "a", "aa"); + + iter->Prev(); + AssertIter(iter.get(), "c", "cc"); + + iter->Seek("a"); + AssertIter(iter.get(), "a", "aa"); + } + + // default column family + batch.Put("a", "b"); + { + KVMap map; + map["b"] = ""; + std::unique_ptr<Iterator> iter(batch.NewIteratorWithBase(new KVIter(&map))); + + iter->SeekToFirst(); + AssertIter(iter.get(), "a", "b"); + iter->Next(); + AssertIter(iter.get(), "b", ""); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->SeekToLast(); + AssertIter(iter.get(), "b", ""); + iter->Prev(); + AssertIter(iter.get(), "a", "b"); + iter->Prev(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("b"); + AssertIter(iter.get(), "b", ""); + + iter->Prev(); + AssertIter(iter.get(), "a", "b"); + + iter->Seek("0"); + AssertIter(iter.get(), "a", "b"); + } +} + +TEST_F(WriteBatchWithIndexTest, TestGetFromBatch) { + Options options; + WriteBatchWithIndex batch; + Status s; + std::string value; + + s = batch.GetFromBatch(options, "b", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Put("a", "a"); + batch.Put("b", "b"); + batch.Put("c", "c"); + batch.Put("a", "z"); + batch.Delete("c"); + batch.Delete("d"); + batch.Delete("e"); + batch.Put("e", "e"); + + s = batch.GetFromBatch(options, "b", &value); + ASSERT_OK(s); + ASSERT_EQ("b", value); + + s = batch.GetFromBatch(options, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("z", value); + + s = batch.GetFromBatch(options, "c", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = batch.GetFromBatch(options, "d", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = batch.GetFromBatch(options, "x", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = batch.GetFromBatch(options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e", value); + + batch.Merge("z", "z"); + + s = batch.GetFromBatch(options, "z", &value); + ASSERT_NOK(s); // No merge operator specified. + + s = batch.GetFromBatch(options, "b", &value); + ASSERT_OK(s); + ASSERT_EQ("b", value); +} + +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) { + DB* db; + Options options; + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + options.create_if_missing = true; + + std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); + + DestroyDB(dbname, options); + Status s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + + ColumnFamilyHandle* column_family = db->DefaultColumnFamily(); + WriteBatchWithIndex batch; + std::string value; + + s = batch.GetFromBatch(options, "x", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Put("x", "X"); + std::string expected = "X"; + + for (int i = 0; i < 5; i++) { + batch.Merge("x", ToString(i)); + expected = expected + "," + ToString(i); + + if (i % 2 == 0) { + batch.Put("y", ToString(i / 2)); + } + + batch.Merge("z", "z"); + + s = batch.GetFromBatch(column_family, options, "x", &value); + ASSERT_OK(s); + ASSERT_EQ(expected, value); + + s = batch.GetFromBatch(column_family, options, "y", &value); + ASSERT_OK(s); + ASSERT_EQ(ToString(i / 2), value); + + s = batch.GetFromBatch(column_family, options, "z", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + } + + delete db; + DestroyDB(dbname, options); +} + +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge2) { + DB* db; + Options options; + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + options.create_if_missing = true; + + std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); + + DestroyDB(dbname, options); + Status s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + + ColumnFamilyHandle* column_family = db->DefaultColumnFamily(); + + // Test batch with overwrite_key=true + WriteBatchWithIndex batch(BytewiseComparator(), 0, true); + std::string value; + + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Put(column_family, "X", "x"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("x", value); + + batch.Put(column_family, "X", "x2"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("x2", value); + + batch.Merge(column_family, "X", "aaa"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Merge(column_family, "X", "bbb"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Put(column_family, "X", "x3"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("x3", value); + + batch.Merge(column_family, "X", "ccc"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Delete(column_family, "X"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Merge(column_family, "X", "ddd"); + s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + delete db; + DestroyDB(dbname, options); +} + +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) { + DB* db; + Options options; + options.create_if_missing = true; + std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); + + DestroyDB(dbname, options); + Status s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + + WriteBatchWithIndex batch; + ReadOptions read_options; + WriteOptions write_options; + std::string value; + + s = db->Put(write_options, "a", "a"); + ASSERT_OK(s); + + s = db->Put(write_options, "b", "b"); + ASSERT_OK(s); + + s = db->Put(write_options, "c", "c"); + ASSERT_OK(s); + + batch.Put("a", "batch.a"); + batch.Delete("b"); + + s = batch.GetFromBatchAndDB(db, read_options, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("batch.a", value); + + s = batch.GetFromBatchAndDB(db, read_options, "b", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = batch.GetFromBatchAndDB(db, read_options, "c", &value); + ASSERT_OK(s); + ASSERT_EQ("c", value); + + s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + ASSERT_TRUE(s.IsNotFound()); + + db->Delete(write_options, "x"); + + s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete db; + DestroyDB(dbname, options); +} + +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) { + DB* db; + Options options; + + options.create_if_missing = true; + std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); + + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + + DestroyDB(dbname, options); + Status s = DB::Open(options, dbname, &db); + assert(s.ok()); + + WriteBatchWithIndex batch; + ReadOptions read_options; + WriteOptions write_options; + std::string value; + + s = db->Put(write_options, "a", "a0"); + ASSERT_OK(s); + + s = db->Put(write_options, "b", "b0"); + ASSERT_OK(s); + + s = db->Merge(write_options, "b", "b1"); + ASSERT_OK(s); + + s = db->Merge(write_options, "c", "c0"); + ASSERT_OK(s); + + s = db->Merge(write_options, "d", "d0"); + ASSERT_OK(s); + + batch.Merge("a", "a1"); + batch.Merge("a", "a2"); + batch.Merge("b", "b2"); + batch.Merge("d", "d1"); + batch.Merge("e", "e0"); + + s = batch.GetFromBatchAndDB(db, read_options, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("a0,a1,a2", value); + + s = batch.GetFromBatchAndDB(db, read_options, "b", &value); + ASSERT_OK(s); + ASSERT_EQ("b0,b1,b2", value); + + s = batch.GetFromBatchAndDB(db, read_options, "c", &value); + ASSERT_OK(s); + ASSERT_EQ("c0", value); + + s = batch.GetFromBatchAndDB(db, read_options, "d", &value); + ASSERT_OK(s); + ASSERT_EQ("d0,d1", value); + + s = batch.GetFromBatchAndDB(db, read_options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e0", value); + + s = db->Delete(write_options, "x"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + ASSERT_TRUE(s.IsNotFound()); + + const Snapshot* snapshot = db->GetSnapshot(); + ReadOptions snapshot_read_options; + snapshot_read_options.snapshot = snapshot; + + s = db->Delete(write_options, "a"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("a1,a2", value); + + s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("a0,a1,a2", value); + + batch.Delete("a"); + + s = batch.GetFromBatchAndDB(db, read_options, "a", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db->Merge(write_options, "c", "c1"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "c", &value); + ASSERT_OK(s); + ASSERT_EQ("c0,c1", value); + + s = batch.GetFromBatchAndDB(db, snapshot_read_options, "c", &value); + ASSERT_OK(s); + ASSERT_EQ("c0", value); + + s = db->Put(write_options, "e", "e1"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e1,e0", value); + + s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e0", value); + + s = db->Delete(write_options, "e"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e0", value); + + s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value); + ASSERT_OK(s); + ASSERT_EQ("e0", value); + + db->ReleaseSnapshot(snapshot); + delete db; + DestroyDB(dbname, options); +} + +TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge2) { + DB* db; + Options options; + + options.create_if_missing = true; + std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); + + options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + + DestroyDB(dbname, options); + Status s = DB::Open(options, dbname, &db); + assert(s.ok()); + + // Test batch with overwrite_key=true + WriteBatchWithIndex batch(BytewiseComparator(), 0, true); + + ReadOptions read_options; + WriteOptions write_options; + std::string value; + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + batch.Merge("A", "xxx"); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Merge("A", "yyy"); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + s = db->Put(write_options, "A", "a0"); + ASSERT_OK(s); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsMergeInProgress()); + + batch.Delete("A"); + + s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete db; + DestroyDB(dbname, options); +} + +void AssertKey(std::string key, WBWIIterator* iter) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(key, iter->Entry().key.ToString()); +} + +void AssertValue(std::string value, WBWIIterator* iter) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(value, iter->Entry().value.ToString()); +} + +// Tests that we can write to the WBWI while we iterate (from a single thread). +// iteration should see the newest writes +TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingCorrectnessTest) { + WriteBatchWithIndex batch(BytewiseComparator(), 0, true); + for (char c = 'a'; c <= 'z'; ++c) { + batch.Put(std::string(1, c), std::string(1, c)); + } + + std::unique_ptr<WBWIIterator> iter(batch.NewIterator()); + iter->Seek("k"); + AssertKey("k", iter.get()); + iter->Next(); + AssertKey("l", iter.get()); + batch.Put("ab", "cc"); + iter->Next(); + AssertKey("m", iter.get()); + batch.Put("mm", "kk"); + iter->Next(); + AssertKey("mm", iter.get()); + AssertValue("kk", iter.get()); + batch.Delete("mm"); + + iter->Next(); + AssertKey("n", iter.get()); + iter->Prev(); + AssertKey("mm", iter.get()); + ASSERT_EQ(kDeleteRecord, iter->Entry().type); + + iter->Seek("ab"); + AssertKey("ab", iter.get()); + batch.Delete("x"); + iter->Seek("x"); + AssertKey("x", iter.get()); + ASSERT_EQ(kDeleteRecord, iter->Entry().type); + iter->Prev(); + AssertKey("w", iter.get()); +} + +void AssertIterKey(std::string key, Iterator* iter) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(key, iter->key().ToString()); +} + +void AssertIterValue(std::string value, Iterator* iter) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(value, iter->value().ToString()); +} + +// same thing as above, but testing IteratorWithBase +TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { + WriteBatchWithIndex batch(BytewiseComparator(), 0, true); + for (char c = 'a'; c <= 'z'; ++c) { + batch.Put(std::string(1, c), std::string(1, c)); + } + + KVMap map; + map["aa"] = "aa"; + map["cc"] = "cc"; + map["ee"] = "ee"; + map["em"] = "me"; + + std::unique_ptr<Iterator> iter( + batch.NewIteratorWithBase(new KVIter(&map))); + iter->Seek("k"); + AssertIterKey("k", iter.get()); + iter->Next(); + AssertIterKey("l", iter.get()); + batch.Put("ab", "cc"); + iter->Next(); + AssertIterKey("m", iter.get()); + batch.Put("mm", "kk"); + iter->Next(); + AssertIterKey("mm", iter.get()); + AssertIterValue("kk", iter.get()); + batch.Delete("mm"); + iter->Next(); + AssertIterKey("n", iter.get()); + iter->Prev(); + // "mm" is deleted, so we're back at "m" + AssertIterKey("m", iter.get()); + + iter->Seek("ab"); + AssertIterKey("ab", iter.get()); + iter->Prev(); + AssertIterKey("aa", iter.get()); + iter->Prev(); + AssertIterKey("a", iter.get()); + batch.Delete("aa"); + iter->Next(); + AssertIterKey("ab", iter.get()); + iter->Prev(); + AssertIterKey("a", iter.get()); + + batch.Delete("x"); + iter->Seek("x"); + AssertIterKey("y", iter.get()); + iter->Next(); + AssertIterKey("z", iter.get()); + iter->Prev(); + iter->Prev(); + AssertIterKey("w", iter.get()); + + batch.Delete("e"); + iter->Seek("e"); + AssertIterKey("ee", iter.get()); + AssertIterValue("ee", iter.get()); + batch.Put("ee", "xx"); + // still the same value + AssertIterValue("ee", iter.get()); + iter->Next(); + AssertIterKey("em", iter.get()); + iter->Prev(); + // new value + AssertIterValue("xx", iter.get()); +} + +// stress testing mutations with IteratorWithBase +TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { + WriteBatchWithIndex batch(BytewiseComparator(), 0, true); + for (char c = 'a'; c <= 'z'; ++c) { + batch.Put(std::string(1, c), std::string(1, c)); + } + + KVMap map; + for (char c = 'a'; c <= 'z'; ++c) { + map[std::string(2, c)] = std::string(2, c); + } + + std::unique_ptr<Iterator> iter( + batch.NewIteratorWithBase(new KVIter(&map))); + + Random rnd(301); + for (int i = 0; i < 1000000; ++i) { + int random = rnd.Uniform(8); + char c = static_cast<char>(rnd.Uniform(26) + 'a'); + switch (random) { + case 0: + batch.Put(std::string(1, c), "xxx"); + break; + case 1: + batch.Put(std::string(2, c), "xxx"); + break; + case 2: + batch.Delete(std::string(1, c)); + break; + case 3: + batch.Delete(std::string(2, c)); + break; + case 4: + iter->Seek(std::string(1, c)); + break; + case 5: + iter->Seek(std::string(2, c)); + break; + case 6: + if (iter->Valid()) { + iter->Next(); + } + break; + case 7: + if (iter->Valid()) { + iter->Prev(); + } + break; + default: + assert(false); + } + } +} + +static std::string PrintContents(WriteBatchWithIndex* batch, + ColumnFamilyHandle* column_family) { + std::string result; + + WBWIIterator* iter; + if (column_family == nullptr) { + iter = batch->NewIterator(); + } else { + iter = batch->NewIterator(column_family); + } + + iter->SeekToFirst(); + while (iter->Valid()) { + WriteEntry e = iter->Entry(); + + if (e.type == kPutRecord) { + result.append("PUT("); + result.append(e.key.ToString()); + result.append("):"); + result.append(e.value.ToString()); + } else if (e.type == kMergeRecord) { + result.append("MERGE("); + result.append(e.key.ToString()); + result.append("):"); + result.append(e.value.ToString()); + } else if (e.type == kSingleDeleteRecord) { + result.append("SINGLE-DEL("); + result.append(e.key.ToString()); + result.append(")"); + } else { + assert(e.type == kDeleteRecord); + result.append("DEL("); + result.append(e.key.ToString()); + result.append(")"); + } + + result.append(","); + iter->Next(); + } + + delete iter; + return result; +} + +static std::string PrintContents(WriteBatchWithIndex* batch, KVMap* base_map, + ColumnFamilyHandle* column_family) { + std::string result; + + Iterator* iter; + if (column_family == nullptr) { + iter = batch->NewIteratorWithBase(new KVIter(base_map)); + } else { + iter = batch->NewIteratorWithBase(column_family, new KVIter(base_map)); + } + + iter->SeekToFirst(); + while (iter->Valid()) { + assert(iter->status().ok()); + + Slice key = iter->key(); + Slice value = iter->value(); + + result.append(key.ToString()); + result.append(":"); + result.append(value.ToString()); + result.append(","); + + iter->Next(); + } + + delete iter; + return result; +} + +TEST_F(WriteBatchWithIndexTest, SavePointTest) { + WriteBatchWithIndex batch; + ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator()); + Status s; + + batch.Put("A", "a"); + batch.Put("B", "b"); + batch.Put("A", "aa"); + batch.Put(&cf1, "A", "a1"); + batch.Delete(&cf1, "B"); + batch.Put(&cf1, "C", "c1"); + batch.Put(&cf1, "E", "e1"); + + batch.SetSavePoint(); // 1 + + batch.Put("C", "cc"); + batch.Put("B", "bb"); + batch.Delete("A"); + batch.Put(&cf1, "B", "b1"); + batch.Delete(&cf1, "A"); + batch.SingleDelete(&cf1, "E"); + batch.SetSavePoint(); // 2 + + batch.Put("A", "aaa"); + batch.Put("A", "xxx"); + batch.Delete("B"); + batch.Put(&cf1, "B", "b2"); + batch.Delete(&cf1, "C"); + batch.SetSavePoint(); // 3 + batch.SetSavePoint(); // 4 + batch.SingleDelete("D"); + batch.Delete(&cf1, "D"); + batch.Delete(&cf1, "E"); + + ASSERT_EQ( + "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" + "B)" + ",PUT(C):cc,SINGLE-DEL(D),", + PrintContents(&batch, nullptr)); + + ASSERT_EQ( + "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C)," + "DEL(D),PUT(E):e1,SINGLE-DEL(E),DEL(E),", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 4 + ASSERT_EQ( + "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" + "B)" + ",PUT(C):cc,", + PrintContents(&batch, nullptr)); + + ASSERT_EQ( + "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C)," + "PUT(E):e1,SINGLE-DEL(E),", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 3 + ASSERT_EQ( + "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" + "B)" + ",PUT(C):cc,", + PrintContents(&batch, nullptr)); + + ASSERT_EQ( + "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C)," + "PUT(E):e1,SINGLE-DEL(E),", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 2 + ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,", + PrintContents(&batch, nullptr)); + + ASSERT_EQ( + "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1," + "PUT(E):e1,SINGLE-DEL(E),", + PrintContents(&batch, &cf1)); + + batch.SetSavePoint(); // 5 + batch.Put("X", "x"); + + ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,PUT(X):x,", + PrintContents(&batch, nullptr)); + + ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 5 + ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,", + PrintContents(&batch, nullptr)); + + ASSERT_EQ( + "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1," + "PUT(E):e1,SINGLE-DEL(E),", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 1 + ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,PUT(E):e1,", + PrintContents(&batch, &cf1)); + + s = batch.RollbackToSavePoint(); // no savepoint found + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,PUT(E):e1,", + PrintContents(&batch, &cf1)); + + batch.SetSavePoint(); // 6 + + batch.Clear(); + ASSERT_EQ("", PrintContents(&batch, nullptr)); + ASSERT_EQ("", PrintContents(&batch, &cf1)); + + s = batch.RollbackToSavePoint(); // rollback to 6 + ASSERT_TRUE(s.IsNotFound()); +} + +TEST_F(WriteBatchWithIndexTest, SingleDeleteTest) { + WriteBatchWithIndex batch; + Status s; + std::string value; + DBOptions db_options; + + batch.SingleDelete("A"); + + s = batch.GetFromBatch(db_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + s = batch.GetFromBatch(db_options, "B", &value); + ASSERT_TRUE(s.IsNotFound()); + value = PrintContents(&batch, nullptr); + ASSERT_EQ("SINGLE-DEL(A),", value); + + batch.Clear(); + batch.Put("A", "a"); + batch.Put("A", "a2"); + batch.Put("B", "b"); + batch.SingleDelete("A"); + + s = batch.GetFromBatch(db_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + s = batch.GetFromBatch(db_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("b", value); + + value = PrintContents(&batch, nullptr); + ASSERT_EQ("PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(B):b,", value); + + batch.Put("C", "c"); + batch.Put("A", "a3"); + batch.Delete("B"); + batch.SingleDelete("B"); + batch.SingleDelete("C"); + + s = batch.GetFromBatch(db_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a3", value); + s = batch.GetFromBatch(db_options, "B", &value); + ASSERT_TRUE(s.IsNotFound()); + s = batch.GetFromBatch(db_options, "C", &value); + ASSERT_TRUE(s.IsNotFound()); + s = batch.GetFromBatch(db_options, "D", &value); + ASSERT_TRUE(s.IsNotFound()); + + value = PrintContents(&batch, nullptr); + ASSERT_EQ( + "PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(A):a3,PUT(B):b,DEL(B),SINGLE-DEL(B)" + ",PUT(C):c,SINGLE-DEL(C),", + value); + + batch.Put("B", "b4"); + batch.Put("C", "c4"); + batch.Put("D", "d4"); + batch.SingleDelete("D"); + batch.SingleDelete("D"); + batch.Delete("A"); + + s = batch.GetFromBatch(db_options, "A", &value); + ASSERT_TRUE(s.IsNotFound()); + s = batch.GetFromBatch(db_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("b4", value); + s = batch.GetFromBatch(db_options, "C", &value); + ASSERT_OK(s); + ASSERT_EQ("c4", value); + s = batch.GetFromBatch(db_options, "D", &value); + ASSERT_TRUE(s.IsNotFound()); + + value = PrintContents(&batch, nullptr); + ASSERT_EQ( + "PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(A):a3,DEL(A),PUT(B):b,DEL(B)," + "SINGLE-DEL(B),PUT(B):b4,PUT(C):c,SINGLE-DEL(C),PUT(C):c4,PUT(D):d4," + "SINGLE-DEL(D),SINGLE-DEL(D),", + value); +} + +TEST_F(WriteBatchWithIndexTest, SingleDeleteDeltaIterTest) { + Status s; + std::string value; + DBOptions db_options; + WriteBatchWithIndex batch(BytewiseComparator(), 20, true /* overwrite_key */); + batch.Put("A", "a"); + batch.Put("A", "a2"); + batch.Put("B", "b"); + batch.SingleDelete("A"); + batch.Delete("B"); + + KVMap map; + value = PrintContents(&batch, &map, nullptr); + ASSERT_EQ("", value); + + map["A"] = "aa"; + map["C"] = "cc"; + map["D"] = "dd"; + + batch.SingleDelete("B"); + batch.SingleDelete("C"); + batch.SingleDelete("Z"); + + value = PrintContents(&batch, &map, nullptr); + ASSERT_EQ("D:dd,", value); + + batch.Put("A", "a3"); + batch.Put("B", "b3"); + batch.SingleDelete("A"); + batch.SingleDelete("A"); + batch.SingleDelete("D"); + batch.SingleDelete("D"); + batch.Delete("D"); + + map["E"] = "ee"; + + value = PrintContents(&batch, &map, nullptr); + ASSERT_EQ("B:b3,E:ee,", value); +} + +} // namespace + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include <stdio.h> + +int main() { + fprintf(stderr, "SKIPPED\n"); + return 0; +} + +#endif // !ROCKSDB_LITE |