summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc')
-rw-r--r--src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc288
1 files changed, 288 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc
new file mode 100644
index 000000000..8c1222f21
--- /dev/null
+++ b/src/rocksdb/utilities/write_batch_with_index/write_batch_with_index_internal.cc
@@ -0,0 +1,288 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
+
+#include "db/column_family.h"
+#include "db/merge_context.h"
+#include "db/merge_helper.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/db.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class Env;
+class Logger;
+class Statistics;
+
+// Decodes the write batch record that begins at byte `data_offset` of the
+// batch representation and reports its kind and payload slices.
+//
+// Parameters:
+//   data_offset - byte offset into the batch data where the record starts.
+//   type        - receives the WriteType derived from the record tag.
+//   Key, value, blob, xid - receive whatever ReadRecordFromWriteBatch decodes
+//                 for this record.
+//
+// Returns:
+//   InvalidArgument - an output pointer is null, or `data_offset` lies beyond
+//                     the end of the batch.
+//   NotFound        - `data_offset` equals the batch size (end of batch).
+//   decoder status  - ReadRecordFromWriteBatch reported a failure.
+//   Corruption      - the record tag is not one of the tags handled below.
+//   OK              - the record was decoded and `*type` was set.
+Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
+                                                  WriteType* type, Slice* Key,
+                                                  Slice* value, Slice* blob,
+                                                  Slice* xid) const {
+  if (type == nullptr || Key == nullptr || value == nullptr ||
+      blob == nullptr || xid == nullptr) {
+    return Status::InvalidArgument("Output parameters cannot be null");
+  }
+
+  if (data_offset == GetDataSize()) {
+    // reached end of batch.
+    return Status::NotFound();
+  }
+
+  if (data_offset > GetDataSize()) {
+    return Status::InvalidArgument("data offset exceed write batch size");
+  }
+  Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
+  char tag = 0;
+  uint32_t column_family = 0;
+  Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value,
+                                      blob, xid);
+  if (!s.ok()) {
+    // Do not interpret the tag or hand back slices from a record that failed
+    // to decode; surface the decoder's error to the caller instead.
+    return s;
+  }
+
+  switch (tag) {
+    case kTypeColumnFamilyValue:
+    case kTypeValue:
+      *type = kPutRecord;
+      break;
+    case kTypeColumnFamilyDeletion:
+    case kTypeDeletion:
+      *type = kDeleteRecord;
+      break;
+    case kTypeColumnFamilySingleDeletion:
+    case kTypeSingleDeletion:
+      *type = kSingleDeleteRecord;
+      break;
+    case kTypeColumnFamilyRangeDeletion:
+    case kTypeRangeDeletion:
+      *type = kDeleteRangeRecord;
+      break;
+    case kTypeColumnFamilyMerge:
+    case kTypeMerge:
+      *type = kMergeRecord;
+      break;
+    case kTypeLogData:
+      *type = kLogDataRecord;
+      break;
+    case kTypeNoop:
+    case kTypeBeginPrepareXID:
+    case kTypeBeginPersistedPrepareXID:
+    case kTypeBeginUnprepareXID:
+    case kTypeEndPrepareXID:
+    case kTypeCommitXID:
+    case kTypeRollbackXID:
+      *type = kXIDRecord;
+      break;
+    default:
+      return Status::Corruption("unknown WriteBatch tag ",
+                                ToString(static_cast<unsigned int>(tag)));
+  }
+  return Status::OK();
+}
+
+// Three-way ordering of two index entries. Returns a negative value, zero or
+// a positive value when `entry1` sorts before, equal to or after `entry2`.
+//
+// When both entries refer to real records in the write batch, the order is:
+// 1. by column family: the entry with the larger CF is larger;
+// 2. within one CF, by key: the key is read from the batch data at
+//    key_offset/key_size and compared with CompareKey();
+// 3. for entries with the same CF and key, by offset: the entry with the
+//    larger offset is larger.
+// Either entry may instead be a dummy entry that only carries a search key.
+// In that case step 2 uses WriteBatchIndexEntry::search_key rather than
+// reading the key out of the batch.
+// One special case is an entry for which is_min_in_cf() is true. It stands
+// for "seek to the first entry of the column family" and is ordered before
+// every other entry of that column family.
+int WriteBatchEntryComparator::operator()(
+    const WriteBatchIndexEntry* entry1,
+    const WriteBatchIndexEntry* entry2) const {
+  // Step 1: different column families decide the order immediately.
+  if (entry1->column_family != entry2->column_family) {
+    return entry1->column_family > entry2->column_family ? 1 : -1;
+  }
+
+  // Special case: a "minimum in CF" marker; entry1 is checked first.
+  if (entry1->is_min_in_cf()) {
+    return -1;
+  }
+  if (entry2->is_min_in_cf()) {
+    return 1;
+  }
+
+  // Step 2: take each key from the search key when present, otherwise from
+  // the batch data.
+  const Slice lhs_key =
+      (entry1->search_key != nullptr)
+          ? *(entry1->search_key)
+          : Slice(write_batch_->Data().data() + entry1->key_offset,
+                  entry1->key_size);
+  const Slice rhs_key =
+      (entry2->search_key != nullptr)
+          ? *(entry2->search_key)
+          : Slice(write_batch_->Data().data() + entry2->key_offset,
+                  entry2->key_size);
+
+  const int key_order = CompareKey(entry1->column_family, lhs_key, rhs_key);
+  if (key_order != 0) {
+    return key_order;
+  }
+
+  // Step 3: identical CF and key, so the offset breaks the tie.
+  if (entry1->offset == entry2->offset) {
+    return 0;
+  }
+  return entry1->offset > entry2->offset ? 1 : -1;
+}
+
+// Compares `key1` with `key2` using the comparator registered for
+// `column_family`. Falls back to the default comparator when the column
+// family id is outside the registered range or has a null comparator.
+// Returns whatever the selected comparator's Compare() returns.
+int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
+                                          const Slice& key1,
+                                          const Slice& key2) const {
+  const bool has_cf_comparator = column_family < cf_comparators_.size() &&
+                                 cf_comparators_[column_family] != nullptr;
+  if (!has_cf_comparator) {
+    return default_comparator_->Compare(key1, key2);
+  }
+  return cf_comparators_[column_family]->Compare(key1, key2);
+}
+
+// Looks up `key` in `batch` for the given column family, walking the batch's
+// entries for that key from the most recently added to the oldest.
+//
+// Parameters:
+//   immuable_db_options - source of the statistics, env and info log passed
+//                         to the full merge.
+//   batch          - indexed write batch to search.
+//   column_family  - column family to search; must be non-null if a full
+//                    merge has to be performed.
+//   key            - user key to look up.
+//   merge_context  - collects merge operands seen for `key`.
+//   cmp            - comparator used to test whether an entry's key equals
+//                    `key`.
+//   value          - receives the resulting value when kFound is returned.
+//   overwrite_key  - when true, stop at the first merge operand and report
+//                    kMergeInProgress.
+//   s              - receives OK or the error behind a kError result.
+//
+// Returns:
+//   kFound           - a Put was found (merged with any newer operands).
+//   kDeleted         - a Delete/SingleDelete was found and there were no
+//                      newer merge operands.
+//   kMergeInProgress - only merge operands were found for the key.
+//   kNotFound        - the batch holds no relevant entry for the key.
+//   kError           - an unexpected entry type, a missing column family or
+//                      merge operator, or a failed merge; see `*s`.
+WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
+    const ImmutableDBOptions& immuable_db_options, WriteBatchWithIndex* batch,
+    ColumnFamilyHandle* column_family, const Slice& key,
+    MergeContext* merge_context, WriteBatchEntryComparator* cmp,
+    std::string* value, bool overwrite_key, Status* s) {
+  uint32_t cf_id = GetColumnFamilyID(column_family);
+  *s = Status::OK();
+  WriteBatchWithIndexInternal::Result result =
+      WriteBatchWithIndexInternal::Result::kNotFound;
+
+  std::unique_ptr<WBWIIterator> iter =
+      std::unique_ptr<WBWIIterator>(batch->NewIterator(column_family));
+
+  // We want to iterate in the reverse order that the writes were added to the
+  // batch.  Since we don't have a reverse iterator, we must seek past the end.
+  // TODO(agiardullo): consider adding support for reverse iteration
+  iter->Seek(key);
+  while (iter->Valid()) {
+    const WriteEntry entry = iter->Entry();
+    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
+      break;
+    }
+
+    iter->Next();
+  }
+
+  if (!iter->Valid()) {
+    // Read past end of results.  Reposition on last result.
+    iter->SeekToLast();
+  } else {
+    iter->Prev();
+  }
+
+  Slice entry_value;
+  while (iter->Valid()) {
+    const WriteEntry entry = iter->Entry();
+    if (cmp->CompareKey(cf_id, entry.key, key) != 0) {
+      // Unexpected error or we've reached a different next key
+      break;
+    }
+
+    switch (entry.type) {
+      case kPutRecord: {
+        result = WriteBatchWithIndexInternal::Result::kFound;
+        entry_value = entry.value;
+        break;
+      }
+      case kMergeRecord: {
+        result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
+        merge_context->PushOperand(entry.value);
+        break;
+      }
+      case kDeleteRecord:
+      case kSingleDeleteRecord: {
+        result = WriteBatchWithIndexInternal::Result::kDeleted;
+        break;
+      }
+      case kLogDataRecord:
+      case kXIDRecord: {
+        // ignore
+        break;
+      }
+      default: {
+        result = WriteBatchWithIndexInternal::Result::kError;
+        (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
+                                  ToString(entry.type));
+        break;
+      }
+    }
+    if (result == WriteBatchWithIndexInternal::Result::kFound ||
+        result == WriteBatchWithIndexInternal::Result::kDeleted ||
+        result == WriteBatchWithIndexInternal::Result::kError) {
+      // We can stop iterating once we find a PUT or DELETE
+      break;
+    }
+    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
+        overwrite_key) {
+      // Since we've overwritten keys, we do not know what other operations are
+      // in this batch for this key, so we cannot do a Merge to compute the
+      // result.  Instead, we will simply return MergeInProgress.
+      break;
+    }
+
+    iter->Prev();
+  }
+
+  if ((*s).ok()) {
+    if (result == WriteBatchWithIndexInternal::Result::kFound ||
+        result == WriteBatchWithIndexInternal::Result::kDeleted) {
+      // Found a Put or Delete.  Merge if necessary.
+      if (merge_context->GetNumOperands() > 0) {
+        const MergeOperator* merge_operator;
+
+        if (column_family != nullptr) {
+          auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+          merge_operator = cfh->cfd()->ioptions()->merge_operator;
+        } else {
+          *s = Status::InvalidArgument("Must provide a column_family");
+          result = WriteBatchWithIndexInternal::Result::kError;
+          return result;
+        }
+        Statistics* statistics = immuable_db_options.statistics.get();
+        Env* env = immuable_db_options.env;
+        Logger* logger = immuable_db_options.info_log.get();
+
+        // A Delete leaves no base value, so the merge must be told there is
+        // none (nullptr) rather than be handed an empty existing value.
+        const Slice* existing_value =
+            (result == WriteBatchWithIndexInternal::Result::kFound)
+                ? &entry_value
+                : nullptr;
+
+        if (merge_operator) {
+          *s = MergeHelper::TimedFullMerge(merge_operator, key, existing_value,
+                                           merge_context->GetOperands(), value,
+                                           logger, statistics, env);
+        } else {
+          *s = Status::InvalidArgument("Options::merge_operator must be set");
+        }
+        if ((*s).ok()) {
+          result = WriteBatchWithIndexInternal::Result::kFound;
+        } else {
+          result = WriteBatchWithIndexInternal::Result::kError;
+        }
+      } else {  // nothing to merge
+        if (result == WriteBatchWithIndexInternal::Result::kFound) {  // PUT
+          value->assign(entry_value.data(), entry_value.size());
+        }
+      }
+    }
+  }
+
+  return result;
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // !ROCKSDB_LITE