commit e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree   64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/write_batch.cc
parent Initial commit. (diff)
author    Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-21 11:54:28 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-21 11:54:28 +0000

    Adding upstream version 18.2.2. (tag: upstream/18.2.2)

    Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>

Diffstat (limited to 'src/rocksdb/db/write_batch.cc'):

 src/rocksdb/db/write_batch.cc | 3137 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 3137 insertions(+), 0 deletions(-)
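For orientation before the diff itself: the file added below implements RocksDB's `WriteBatch`. The following is a minimal, hypothetical usage sketch — not part of this commit — assuming only the public `rocksdb/write_batch.h` API that the file backs. `Put`/`Delete`/`Merge` append the tagged records documented in the file's opening comment, and `Iterate()` replays them through a `WriteBatch::Handler`, exactly as the `BatchContentClassifier` defined inside the file does.

```cpp
// Hedged sketch, not part of this commit: exercises the public WriteBatch
// API implemented by src/rocksdb/db/write_batch.cc.
#include <cstdint>
#include <iostream>

#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/write_batch.h"

using ROCKSDB_NAMESPACE::Slice;
using ROCKSDB_NAMESPACE::Status;
using ROCKSDB_NAMESPACE::WriteBatch;

// Counts operations by type, mirroring BatchContentClassifier in the file.
class CountingHandler : public WriteBatch::Handler {
 public:
  int puts = 0, deletes = 0, merges = 0;

  Status PutCF(uint32_t /*cf_id*/, const Slice&, const Slice&) override {
    ++puts;
    return Status::OK();
  }
  Status DeleteCF(uint32_t /*cf_id*/, const Slice&) override {
    ++deletes;
    return Status::OK();
  }
  Status MergeCF(uint32_t /*cf_id*/, const Slice&, const Slice&) override {
    ++merges;
    return Status::OK();
  }
};

int main() {
  WriteBatch batch;
  batch.Put("key1", "value1").PermitUncheckedError();     // kTypeValue record
  batch.Delete("key2").PermitUncheckedError();            // kTypeDeletion record
  batch.Merge("key3", "operand").PermitUncheckedError();  // kTypeMerge record

  CountingHandler handler;
  Status s = batch.Iterate(&handler);
  std::cout << "ok=" << s.ok() << " puts=" << handler.puts
            << " deletes=" << handler.deletes << " merges=" << handler.merges
            << " count=" << batch.Count() << std::endl;  // count=3
  return 0;
}
```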
diff --git a/src/rocksdb/db/write_batch.cc b/src/rocksdb/db/write_batch.cc new file mode 100644 index 000000000..796697cfc --- /dev/null +++ b/src/rocksdb/db/write_batch.cc @@ -0,0 +1,3137 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// WriteBatch::rep_ := +// sequence: fixed64 +// count: fixed32 +// data: record[count] +// record := +// kTypeValue varstring varstring +// kTypeDeletion varstring +// kTypeSingleDeletion varstring +// kTypeRangeDeletion varstring varstring +// kTypeMerge varstring varstring +// kTypeColumnFamilyValue varint32 varstring varstring +// kTypeColumnFamilyDeletion varint32 varstring +// kTypeColumnFamilySingleDeletion varint32 varstring +// kTypeColumnFamilyRangeDeletion varint32 varstring varstring +// kTypeColumnFamilyMerge varint32 varstring varstring +// kTypeBeginPrepareXID +// kTypeEndPrepareXID varstring +// kTypeCommitXID varstring +// kTypeCommitXIDAndTimestamp varstring varstring +// kTypeRollbackXID varstring +// kTypeBeginPersistedPrepareXID +// kTypeBeginUnprepareXID +// kTypeWideColumnEntity varstring varstring +// kTypeColumnFamilyWideColumnEntity varint32 varstring varstring +// kTypeNoop +// varstring := +// len: varint32 +// data: uint8[len] + +#include "rocksdb/write_batch.h" + +#include <algorithm> +#include <limits> +#include <map> +#include <stack> +#include <stdexcept> +#include <type_traits> +#include <unordered_map> +#include <vector> + +#include "db/column_family.h" +#include "db/db_impl/db_impl.h" +#include "db/dbformat.h" +#include "db/flush_scheduler.h" +#include "db/kv_checksum.h" +#include "db/memtable.h" +#include "db/merge_context.h" +#include "db/snapshot_impl.h" +#include "db/trim_history_scheduler.h" +#include "db/wide/wide_column_serialization.h" +#include "db/write_batch_internal.h" +#include "monitoring/perf_context_imp.h" +#include "monitoring/statistics.h" +#include "port/lang.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/system_clock.h" +#include "util/autovector.h" +#include "util/cast_util.h" +#include "util/coding.h" +#include "util/duplicate_detector.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +// anon namespace for file-local types +namespace { + +enum ContentFlags : uint32_t { + DEFERRED = 1 << 0, + HAS_PUT = 1 << 1, + HAS_DELETE = 1 << 2, + HAS_SINGLE_DELETE = 1 << 3, + HAS_MERGE = 1 << 4, + HAS_BEGIN_PREPARE = 1 << 5, + HAS_END_PREPARE = 1 << 6, + HAS_COMMIT = 1 << 7, + HAS_ROLLBACK = 1 << 8, + HAS_DELETE_RANGE = 1 << 9, + HAS_BLOB_INDEX = 1 << 10, + HAS_BEGIN_UNPREPARE = 1 << 11, + HAS_PUT_ENTITY = 1 << 12, +}; + +struct BatchContentClassifier : public WriteBatch::Handler { + uint32_t content_flags = 0; + + Status PutCF(uint32_t, const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_PUT; + return Status::OK(); + } + + Status PutEntityCF(uint32_t /* column_family_id */, const Slice& /* key */, + const Slice& /* entity */) override { + content_flags |= ContentFlags::HAS_PUT_ENTITY; + return Status::OK(); + } + + Status DeleteCF(uint32_t, const Slice&) override { + content_flags |= ContentFlags::HAS_DELETE; + return 
Status::OK(); + } + + Status SingleDeleteCF(uint32_t, const Slice&) override { + content_flags |= ContentFlags::HAS_SINGLE_DELETE; + return Status::OK(); + } + + Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_DELETE_RANGE; + return Status::OK(); + } + + Status MergeCF(uint32_t, const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_MERGE; + return Status::OK(); + } + + Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_BLOB_INDEX; + return Status::OK(); + } + + Status MarkBeginPrepare(bool unprepare) override { + content_flags |= ContentFlags::HAS_BEGIN_PREPARE; + if (unprepare) { + content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE; + } + return Status::OK(); + } + + Status MarkEndPrepare(const Slice&) override { + content_flags |= ContentFlags::HAS_END_PREPARE; + return Status::OK(); + } + + Status MarkCommit(const Slice&) override { + content_flags |= ContentFlags::HAS_COMMIT; + return Status::OK(); + } + + Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_COMMIT; + return Status::OK(); + } + + Status MarkRollback(const Slice&) override { + content_flags |= ContentFlags::HAS_ROLLBACK; + return Status::OK(); + } +}; + +} // anonymous namespace + +struct SavePoints { + std::stack<SavePoint, autovector<SavePoint>> stack; +}; + +WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, + size_t protection_bytes_per_key, size_t default_cf_ts_sz) + : content_flags_(0), + max_bytes_(max_bytes), + default_cf_ts_sz_(default_cf_ts_sz), + rep_() { + // Currently `protection_bytes_per_key` can only be enabled at 8 bytes per + // entry. + assert(protection_bytes_per_key == 0 || protection_bytes_per_key == 8); + if (protection_bytes_per_key != 0) { + prot_info_.reset(new WriteBatch::ProtectionInfo()); + } + rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) + ? 
reserved_bytes + : WriteBatchInternal::kHeader); + rep_.resize(WriteBatchInternal::kHeader); +} + +WriteBatch::WriteBatch(const std::string& rep) + : content_flags_(ContentFlags::DEFERRED), max_bytes_(0), rep_(rep) {} + +WriteBatch::WriteBatch(std::string&& rep) + : content_flags_(ContentFlags::DEFERRED), + max_bytes_(0), + rep_(std::move(rep)) {} + +WriteBatch::WriteBatch(const WriteBatch& src) + : wal_term_point_(src.wal_term_point_), + content_flags_(src.content_flags_.load(std::memory_order_relaxed)), + max_bytes_(src.max_bytes_), + default_cf_ts_sz_(src.default_cf_ts_sz_), + rep_(src.rep_) { + if (src.save_points_ != nullptr) { + save_points_.reset(new SavePoints()); + save_points_->stack = src.save_points_->stack; + } + if (src.prot_info_ != nullptr) { + prot_info_.reset(new WriteBatch::ProtectionInfo()); + prot_info_->entries_ = src.prot_info_->entries_; + } +} + +WriteBatch::WriteBatch(WriteBatch&& src) noexcept + : save_points_(std::move(src.save_points_)), + wal_term_point_(std::move(src.wal_term_point_)), + content_flags_(src.content_flags_.load(std::memory_order_relaxed)), + max_bytes_(src.max_bytes_), + prot_info_(std::move(src.prot_info_)), + default_cf_ts_sz_(src.default_cf_ts_sz_), + rep_(std::move(src.rep_)) {} + +WriteBatch& WriteBatch::operator=(const WriteBatch& src) { + if (&src != this) { + this->~WriteBatch(); + new (this) WriteBatch(src); + } + return *this; +} + +WriteBatch& WriteBatch::operator=(WriteBatch&& src) { + if (&src != this) { + this->~WriteBatch(); + new (this) WriteBatch(std::move(src)); + } + return *this; +} + +WriteBatch::~WriteBatch() {} + +WriteBatch::Handler::~Handler() {} + +void WriteBatch::Handler::LogData(const Slice& /*blob*/) { + // If the user has not specified something to do with blobs, then we ignore + // them. +} + +bool WriteBatch::Handler::Continue() { return true; } + +void WriteBatch::Clear() { + rep_.clear(); + rep_.resize(WriteBatchInternal::kHeader); + + content_flags_.store(0, std::memory_order_relaxed); + + if (save_points_ != nullptr) { + while (!save_points_->stack.empty()) { + save_points_->stack.pop(); + } + } + + if (prot_info_ != nullptr) { + prot_info_->entries_.clear(); + } + wal_term_point_.clear(); + default_cf_ts_sz_ = 0; +} + +uint32_t WriteBatch::Count() const { return WriteBatchInternal::Count(this); } + +uint32_t WriteBatch::ComputeContentFlags() const { + auto rv = content_flags_.load(std::memory_order_relaxed); + if ((rv & ContentFlags::DEFERRED) != 0) { + BatchContentClassifier classifier; + // Should we handle status here? + Iterate(&classifier).PermitUncheckedError(); + rv = classifier.content_flags; + + // this method is conceptually const, because it is performing a lazy + // computation that doesn't affect the abstract state of the batch. 
+ // content_flags_ is marked mutable so that we can perform the + // following assignment + content_flags_.store(rv, std::memory_order_relaxed); + } + return rv; +} + +void WriteBatch::MarkWalTerminationPoint() { + wal_term_point_.size = GetDataSize(); + wal_term_point_.count = Count(); + wal_term_point_.content_flags = content_flags_; +} + +size_t WriteBatch::GetProtectionBytesPerKey() const { + if (prot_info_ != nullptr) { + return prot_info_->GetBytesPerKey(); + } + return 0; +} + +bool WriteBatch::HasPut() const { + return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0; +} + +bool WriteBatch::HasPutEntity() const { + return (ComputeContentFlags() & ContentFlags::HAS_PUT_ENTITY) != 0; +} + +bool WriteBatch::HasDelete() const { + return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0; +} + +bool WriteBatch::HasSingleDelete() const { + return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0; +} + +bool WriteBatch::HasDeleteRange() const { + return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0; +} + +bool WriteBatch::HasMerge() const { + return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0; +} + +bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) { + assert(input != nullptr && key != nullptr); + // Skip tag byte + input->remove_prefix(1); + + if (cf_record) { + // Skip column_family bytes + uint32_t cf; + if (!GetVarint32(input, &cf)) { + return false; + } + } + + // Extract key + return GetLengthPrefixedSlice(input, key); +} + +bool WriteBatch::HasBeginPrepare() const { + return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0; +} + +bool WriteBatch::HasEndPrepare() const { + return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0; +} + +bool WriteBatch::HasCommit() const { + return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0; +} + +bool WriteBatch::HasRollback() const { + return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0; +} + +Status ReadRecordFromWriteBatch(Slice* input, char* tag, + uint32_t* column_family, Slice* key, + Slice* value, Slice* blob, Slice* xid) { + assert(key != nullptr && value != nullptr); + *tag = (*input)[0]; + input->remove_prefix(1); + *column_family = 0; // default + switch (*tag) { + case kTypeColumnFamilyValue: + if (!GetVarint32(input, column_family)) { + return Status::Corruption("bad WriteBatch Put"); + } + FALLTHROUGH_INTENDED; + case kTypeValue: + if (!GetLengthPrefixedSlice(input, key) || + !GetLengthPrefixedSlice(input, value)) { + return Status::Corruption("bad WriteBatch Put"); + } + break; + case kTypeColumnFamilyDeletion: + case kTypeColumnFamilySingleDeletion: + if (!GetVarint32(input, column_family)) { + return Status::Corruption("bad WriteBatch Delete"); + } + FALLTHROUGH_INTENDED; + case kTypeDeletion: + case kTypeSingleDeletion: + if (!GetLengthPrefixedSlice(input, key)) { + return Status::Corruption("bad WriteBatch Delete"); + } + break; + case kTypeColumnFamilyRangeDeletion: + if (!GetVarint32(input, column_family)) { + return Status::Corruption("bad WriteBatch DeleteRange"); + } + FALLTHROUGH_INTENDED; + case kTypeRangeDeletion: + // for range delete, "key" is begin_key, "value" is end_key + if (!GetLengthPrefixedSlice(input, key) || + !GetLengthPrefixedSlice(input, value)) { + return Status::Corruption("bad WriteBatch DeleteRange"); + } + break; + case kTypeColumnFamilyMerge: + if (!GetVarint32(input, column_family)) { + return Status::Corruption("bad WriteBatch Merge"); + } + FALLTHROUGH_INTENDED; + case 
kTypeMerge: + if (!GetLengthPrefixedSlice(input, key) || + !GetLengthPrefixedSlice(input, value)) { + return Status::Corruption("bad WriteBatch Merge"); + } + break; + case kTypeColumnFamilyBlobIndex: + if (!GetVarint32(input, column_family)) { + return Status::Corruption("bad WriteBatch BlobIndex"); + } + FALLTHROUGH_INTENDED; + case kTypeBlobIndex: + if (!GetLengthPrefixedSlice(input, key) || + !GetLengthPrefixedSlice(input, value)) { + return Status::Corruption("bad WriteBatch BlobIndex"); + } + break; + case kTypeLogData: + assert(blob != nullptr); + if (!GetLengthPrefixedSlice(input, blob)) { + return Status::Corruption("bad WriteBatch Blob"); + } + break; + case kTypeNoop: + case kTypeBeginPrepareXID: + // This indicates that the prepared batch is also persisted in the db. + // This is used in WritePreparedTxn + case kTypeBeginPersistedPrepareXID: + // This is used in WriteUnpreparedTxn + case kTypeBeginUnprepareXID: + break; + case kTypeEndPrepareXID: + if (!GetLengthPrefixedSlice(input, xid)) { + return Status::Corruption("bad EndPrepare XID"); + } + break; + case kTypeCommitXIDAndTimestamp: + if (!GetLengthPrefixedSlice(input, key)) { + return Status::Corruption("bad commit timestamp"); + } + FALLTHROUGH_INTENDED; + case kTypeCommitXID: + if (!GetLengthPrefixedSlice(input, xid)) { + return Status::Corruption("bad Commit XID"); + } + break; + case kTypeRollbackXID: + if (!GetLengthPrefixedSlice(input, xid)) { + return Status::Corruption("bad Rollback XID"); + } + break; + case kTypeColumnFamilyWideColumnEntity: + if (!GetVarint32(input, column_family)) { + return Status::Corruption("bad WriteBatch PutEntity"); + } + FALLTHROUGH_INTENDED; + case kTypeWideColumnEntity: + if (!GetLengthPrefixedSlice(input, key) || + !GetLengthPrefixedSlice(input, value)) { + return Status::Corruption("bad WriteBatch PutEntity"); + } + break; + default: + return Status::Corruption("unknown WriteBatch tag"); + } + return Status::OK(); +} + +Status WriteBatch::Iterate(Handler* handler) const { + if (rep_.size() < WriteBatchInternal::kHeader) { + return Status::Corruption("malformed WriteBatch (too small)"); + } + + return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader, + rep_.size()); +} + +Status WriteBatchInternal::Iterate(const WriteBatch* wb, + WriteBatch::Handler* handler, size_t begin, + size_t end) { + if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) { + return Status::Corruption("Invalid start/end bounds for Iterate"); + } + assert(begin <= end); + Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin)); + bool whole_batch = + (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size()); + + Slice key, value, blob, xid; + + // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as + // the batch boundary symbols otherwise we would mis-count the number of + // batches. We do that by checking whether the accumulated batch is empty + // before seeing the next Noop. 
+ bool empty_batch = true; + uint32_t found = 0; + Status s; + char tag = 0; + uint32_t column_family = 0; // default + bool last_was_try_again = false; + bool handler_continue = true; + while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) { + handler_continue = handler->Continue(); + if (!handler_continue) { + break; + } + + if (LIKELY(!s.IsTryAgain())) { + last_was_try_again = false; + tag = 0; + column_family = 0; // default + + s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value, + &blob, &xid); + if (!s.ok()) { + return s; + } + } else { + assert(s.IsTryAgain()); + assert(!last_was_try_again); // to detect infinite loop bugs + if (UNLIKELY(last_was_try_again)) { + return Status::Corruption( + "two consecutive TryAgain in WriteBatch handler; this is either a " + "software bug or data corruption."); + } + last_was_try_again = true; + s = Status::OK(); + } + + switch (tag) { + case kTypeColumnFamilyValue: + case kTypeValue: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_PUT)); + s = handler->PutCF(column_family, key, value); + if (LIKELY(s.ok())) { + empty_batch = false; + found++; + } + break; + case kTypeColumnFamilyDeletion: + case kTypeDeletion: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE)); + s = handler->DeleteCF(column_family, key); + if (LIKELY(s.ok())) { + empty_batch = false; + found++; + } + break; + case kTypeColumnFamilySingleDeletion: + case kTypeSingleDeletion: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE)); + s = handler->SingleDeleteCF(column_family, key); + if (LIKELY(s.ok())) { + empty_batch = false; + found++; + } + break; + case kTypeColumnFamilyRangeDeletion: + case kTypeRangeDeletion: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE)); + s = handler->DeleteRangeCF(column_family, key, value); + if (LIKELY(s.ok())) { + empty_batch = false; + found++; + } + break; + case kTypeColumnFamilyMerge: + case kTypeMerge: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE)); + s = handler->MergeCF(column_family, key, value); + if (LIKELY(s.ok())) { + empty_batch = false; + found++; + } + break; + case kTypeColumnFamilyBlobIndex: + case kTypeBlobIndex: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX)); + s = handler->PutBlobIndexCF(column_family, key, value); + if (LIKELY(s.ok())) { + found++; + } + break; + case kTypeLogData: + handler->LogData(blob); + // A batch might have nothing but LogData. It is still a batch. + empty_batch = false; + break; + case kTypeBeginPrepareXID: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE)); + s = handler->MarkBeginPrepare(); + assert(s.ok()); + empty_batch = false; + if (handler->WriteAfterCommit() == + WriteBatch::Handler::OptionState::kDisabled) { + s = Status::NotSupported( + "WriteCommitted txn tag when write_after_commit_ is disabled (in " + "WritePrepared/WriteUnprepared mode). 
If it is not due to " + "corruption, the WAL must be emptied before changing the " + "WritePolicy."); + } + if (handler->WriteBeforePrepare() == + WriteBatch::Handler::OptionState::kEnabled) { + s = Status::NotSupported( + "WriteCommitted txn tag when write_before_prepare_ is enabled " + "(in WriteUnprepared mode). If it is not due to corruption, the " + "WAL must be emptied before changing the WritePolicy."); + } + break; + case kTypeBeginPersistedPrepareXID: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE)); + s = handler->MarkBeginPrepare(); + assert(s.ok()); + empty_batch = false; + if (handler->WriteAfterCommit() == + WriteBatch::Handler::OptionState::kEnabled) { + s = Status::NotSupported( + "WritePrepared/WriteUnprepared txn tag when write_after_commit_ " + "is enabled (in default WriteCommitted mode). If it is not due " + "to corruption, the WAL must be emptied before changing the " + "WritePolicy."); + } + break; + case kTypeBeginUnprepareXID: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE)); + s = handler->MarkBeginPrepare(true /* unprepared */); + assert(s.ok()); + empty_batch = false; + if (handler->WriteAfterCommit() == + WriteBatch::Handler::OptionState::kEnabled) { + s = Status::NotSupported( + "WriteUnprepared txn tag when write_after_commit_ is enabled (in " + "default WriteCommitted mode). If it is not due to corruption, " + "the WAL must be emptied before changing the WritePolicy."); + } + if (handler->WriteBeforePrepare() == + WriteBatch::Handler::OptionState::kDisabled) { + s = Status::NotSupported( + "WriteUnprepared txn tag when write_before_prepare_ is disabled " + "(in WriteCommitted/WritePrepared mode). If it is not due to " + "corruption, the WAL must be emptied before changing the " + "WritePolicy."); + } + break; + case kTypeEndPrepareXID: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE)); + s = handler->MarkEndPrepare(xid); + assert(s.ok()); + empty_batch = true; + break; + case kTypeCommitXID: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT)); + s = handler->MarkCommit(xid); + assert(s.ok()); + empty_batch = true; + break; + case kTypeCommitXIDAndTimestamp: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT)); + // key stores the commit timestamp. 
+ assert(!key.empty()); + s = handler->MarkCommitWithTimestamp(xid, key); + if (LIKELY(s.ok())) { + empty_batch = true; + } + break; + case kTypeRollbackXID: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK)); + s = handler->MarkRollback(xid); + assert(s.ok()); + empty_batch = true; + break; + case kTypeNoop: + s = handler->MarkNoop(empty_batch); + assert(s.ok()); + empty_batch = true; + break; + case kTypeWideColumnEntity: + case kTypeColumnFamilyWideColumnEntity: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_PUT_ENTITY)); + s = handler->PutEntityCF(column_family, key, value); + if (LIKELY(s.ok())) { + empty_batch = false; + ++found; + } + break; + default: + return Status::Corruption("unknown WriteBatch tag"); + } + } + if (!s.ok()) { + return s; + } + if (handler_continue && whole_batch && + found != WriteBatchInternal::Count(wb)) { + return Status::Corruption("WriteBatch has wrong count"); + } else { + return Status::OK(); + } +} + +bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) { + return b->is_latest_persistent_state_; +} + +void WriteBatchInternal::SetAsLatestPersistentState(WriteBatch* b) { + b->is_latest_persistent_state_ = true; +} + +uint32_t WriteBatchInternal::Count(const WriteBatch* b) { + return DecodeFixed32(b->rep_.data() + 8); +} + +void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) { + EncodeFixed32(&b->rep_[8], n); +} + +SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) { + return SequenceNumber(DecodeFixed64(b->rep_.data())); +} + +void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { + EncodeFixed64(&b->rep_[0], seq); +} + +size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) { + return WriteBatchInternal::kHeader; +} + +std::tuple<Status, uint32_t, size_t> +WriteBatchInternal::GetColumnFamilyIdAndTimestampSize( + WriteBatch* b, ColumnFamilyHandle* column_family) { + uint32_t cf_id = GetColumnFamilyID(column_family); + size_t ts_sz = 0; + Status s; + if (column_family) { + const Comparator* const ucmp = column_family->GetComparator(); + if (ucmp) { + ts_sz = ucmp->timestamp_size(); + if (0 == cf_id && b->default_cf_ts_sz_ != ts_sz) { + s = Status::InvalidArgument("Default cf timestamp size mismatch"); + } + } + } else if (b->default_cf_ts_sz_ > 0) { + ts_sz = b->default_cf_ts_sz_; + } + return std::make_tuple(s, cf_id, ts_sz); +} + +namespace { +Status CheckColumnFamilyTimestampSize(ColumnFamilyHandle* column_family, + const Slice& ts) { + if (!column_family) { + return Status::InvalidArgument("column family handle cannot be null"); + } + const Comparator* const ucmp = column_family->GetComparator(); + assert(ucmp); + size_t cf_ts_sz = ucmp->timestamp_size(); + if (0 == cf_ts_sz) { + return Status::InvalidArgument("timestamp disabled"); + } + if (cf_ts_sz != ts.size()) { + return Status::InvalidArgument("timestamp size mismatch"); + } + return Status::OK(); +} +} // anonymous namespace + +Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, + const Slice& key, const Slice& value) { + if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) { + return Status::InvalidArgument("key is too large"); + } + if (value.size() > size_t{std::numeric_limits<uint32_t>::max()}) { + return Status::InvalidArgument("value is too large"); + } + + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if 
(column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeValue)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, value); + b->content_flags_.store( + b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, + std::memory_order_relaxed); + if (b->prot_info_ != nullptr) { + // Technically the optype could've been `kTypeColumnFamilyValue` with the + // CF ID encoded in the `WriteBatch`. That distinction is unimportant + // however since we verify CF ID is correct, as well as all other fields + // (a missing/extra encoded CF ID would corrupt another field). It is + // convenient to consolidate on `kTypeValue` here as that is what will be + // inserted into memtable. + b->prot_info_->entries_.emplace_back(ProtectionInfo64() + .ProtectKVO(key, value, kTypeValue) + .ProtectC(column_family_id)); + } + return save.commit(); +} + +Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) { + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::Put(this, cf_id, key, value); + } + + needs_in_place_update_ts_ = true; + has_key_with_ts_ = true; + std::string dummy_ts(ts_sz, '\0'); + std::array<Slice, 2> key_with_ts{{key, dummy_ts}}; + return WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2), + SliceParts(&value, 1)); +} + +Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts, const Slice& value) { + const Status s = CheckColumnFamilyTimestampSize(column_family, ts); + if (!s.ok()) { + return s; + } + has_key_with_ts_ = true; + assert(column_family); + uint32_t cf_id = column_family->GetID(); + std::array<Slice, 2> key_with_ts{{key, ts}}; + return WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2), + SliceParts(&value, 1)); +} + +Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key, + const SliceParts& value) { + size_t total_key_bytes = 0; + for (int i = 0; i < key.num_parts; ++i) { + total_key_bytes += key.parts[i].size(); + } + if (total_key_bytes >= size_t{std::numeric_limits<uint32_t>::max()}) { + return Status::InvalidArgument("key is too large"); + } + + size_t total_value_bytes = 0; + for (int i = 0; i < value.num_parts; ++i) { + total_value_bytes += value.parts[i].size(); + } + if (total_value_bytes >= size_t{std::numeric_limits<uint32_t>::max()}) { + return Status::InvalidArgument("value is too large"); + } + return Status::OK(); +} + +Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, + const SliceParts& key, const SliceParts& value) { + Status s = CheckSlicePartsLength(key, value); + if (!s.ok()) { + return s; + } + + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeValue)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSliceParts(&b->rep_, key); + PutLengthPrefixedSliceParts(&b->rep_, value); + b->content_flags_.store( + b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, + std::memory_order_relaxed); + if (b->prot_info_ != 
nullptr) { + // See comment in first `WriteBatchInternal::Put()` overload concerning the + // `ValueType` argument passed to `ProtectKVO()`. + b->prot_info_->entries_.emplace_back(ProtectionInfo64() + .ProtectKVO(key, value, kTypeValue) + .ProtectC(column_family_id)); + } + return save.commit(); +} + +Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) { + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (ts_sz == 0) { + return WriteBatchInternal::Put(this, cf_id, key, value); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); +} + +Status WriteBatchInternal::PutEntity(WriteBatch* b, uint32_t column_family_id, + const Slice& key, + const WideColumns& columns) { + assert(b); + + if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) { + return Status::InvalidArgument("key is too large"); + } + + WideColumns sorted_columns(columns); + std::sort(sorted_columns.begin(), sorted_columns.end(), + [](const WideColumn& lhs, const WideColumn& rhs) { + return lhs.name().compare(rhs.name()) < 0; + }); + + std::string entity; + const Status s = WideColumnSerialization::Serialize(sorted_columns, entity); + if (!s.ok()) { + return s; + } + + if (entity.size() > size_t{std::numeric_limits<uint32_t>::max()}) { + return Status::InvalidArgument("wide column entity is too large"); + } + + LocalSavePoint save(b); + + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeWideColumnEntity)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyWideColumnEntity)); + PutVarint32(&b->rep_, column_family_id); + } + + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, entity); + + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_PUT_ENTITY, + std::memory_order_relaxed); + + if (b->prot_info_ != nullptr) { + b->prot_info_->entries_.emplace_back( + ProtectionInfo64() + .ProtectKVO(key, entity, kTypeWideColumnEntity) + .ProtectC(column_family_id)); + } + + return save.commit(); +} + +Status WriteBatch::PutEntity(ColumnFamilyHandle* column_family, + const Slice& key, const WideColumns& columns) { + if (!column_family) { + return Status::InvalidArgument( + "Cannot call this method without a column family handle"); + } + + Status s; + uint32_t cf_id = 0; + size_t ts_sz = 0; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (ts_sz) { + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); + } + + return WriteBatchInternal::PutEntity(this, cf_id, key, columns); +} + +Status WriteBatchInternal::InsertNoop(WriteBatch* b) { + b->rep_.push_back(static_cast<char>(kTypeNoop)); + return Status::OK(); +} + +Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid, + bool write_after_commit, + bool unprepared_batch) { + // a manually constructed batch can only contain one prepare section + assert(b->rep_[12] == static_cast<char>(kTypeNoop)); + + // all savepoints up to this point are cleared + if (b->save_points_ != nullptr) { + while (!b->save_points_->stack.empty()) { + b->save_points_->stack.pop(); + } + } + + // rewrite noop as 
begin marker + b->rep_[12] = static_cast<char>( + write_after_commit ? kTypeBeginPrepareXID + : (unprepared_batch ? kTypeBeginUnprepareXID + : kTypeBeginPersistedPrepareXID)); + b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID)); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_END_PREPARE | + ContentFlags::HAS_BEGIN_PREPARE, + std::memory_order_relaxed); + if (unprepared_batch) { + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_BEGIN_UNPREPARE, + std::memory_order_relaxed); + } + return Status::OK(); +} + +Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) { + b->rep_.push_back(static_cast<char>(kTypeCommitXID)); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_COMMIT, + std::memory_order_relaxed); + return Status::OK(); +} + +Status WriteBatchInternal::MarkCommitWithTimestamp(WriteBatch* b, + const Slice& xid, + const Slice& commit_ts) { + assert(!commit_ts.empty()); + b->rep_.push_back(static_cast<char>(kTypeCommitXIDAndTimestamp)); + PutLengthPrefixedSlice(&b->rep_, commit_ts); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_COMMIT, + std::memory_order_relaxed); + return Status::OK(); +} + +Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) { + b->rep_.push_back(static_cast<char>(kTypeRollbackXID)); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_ROLLBACK, + std::memory_order_relaxed); + return Status::OK(); +} + +Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, + const Slice& key) { + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeDeletion)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_DELETE, + std::memory_order_relaxed); + if (b->prot_info_ != nullptr) { + // See comment in first `WriteBatchInternal::Put()` overload concerning the + // `ValueType` argument passed to `ProtectKVO()`. 
+ b->prot_info_->entries_.emplace_back( + ProtectionInfo64() + .ProtectKVO(key, "" /* value */, kTypeDeletion) + .ProtectC(column_family_id)); + } + return save.commit(); +} + +Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::Delete(this, cf_id, key); + } + + needs_in_place_update_ts_ = true; + has_key_with_ts_ = true; + std::string dummy_ts(ts_sz, '\0'); + std::array<Slice, 2> key_with_ts{{key, dummy_ts}}; + return WriteBatchInternal::Delete(this, cf_id, + SliceParts(key_with_ts.data(), 2)); +} + +Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) { + const Status s = CheckColumnFamilyTimestampSize(column_family, ts); + if (!s.ok()) { + return s; + } + assert(column_family); + has_key_with_ts_ = true; + uint32_t cf_id = column_family->GetID(); + std::array<Slice, 2> key_with_ts{{key, ts}}; + return WriteBatchInternal::Delete(this, cf_id, + SliceParts(key_with_ts.data(), 2)); +} + +Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, + const SliceParts& key) { + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeDeletion)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSliceParts(&b->rep_, key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_DELETE, + std::memory_order_relaxed); + if (b->prot_info_ != nullptr) { + // See comment in first `WriteBatchInternal::Put()` overload concerning the + // `ValueType` argument passed to `ProtectKVO()`. + b->prot_info_->entries_.emplace_back( + ProtectionInfo64() + .ProtectKVO(key, + SliceParts(nullptr /* _parts */, 0 /* _num_parts */), + kTypeDeletion) + .ProtectC(column_family_id)); + } + return save.commit(); +} + +Status WriteBatch::Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::Delete(this, cf_id, key); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); +} + +Status WriteBatchInternal::SingleDelete(WriteBatch* b, + uint32_t column_family_id, + const Slice& key) { + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeSingleDeletion)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_SINGLE_DELETE, + std::memory_order_relaxed); + if (b->prot_info_ != nullptr) { + // See comment in first `WriteBatchInternal::Put()` overload concerning the + // `ValueType` argument passed to `ProtectKVO()`. 
+ b->prot_info_->entries_.emplace_back( + ProtectionInfo64() + .ProtectKVO(key, "" /* value */, kTypeSingleDeletion) + .ProtectC(column_family_id)); + } + return save.commit(); +} + +Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) { + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::SingleDelete(this, cf_id, key); + } + + needs_in_place_update_ts_ = true; + has_key_with_ts_ = true; + std::string dummy_ts(ts_sz, '\0'); + std::array<Slice, 2> key_with_ts{{key, dummy_ts}}; + return WriteBatchInternal::SingleDelete(this, cf_id, + SliceParts(key_with_ts.data(), 2)); +} + +Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts) { + const Status s = CheckColumnFamilyTimestampSize(column_family, ts); + if (!s.ok()) { + return s; + } + has_key_with_ts_ = true; + assert(column_family); + uint32_t cf_id = column_family->GetID(); + std::array<Slice, 2> key_with_ts{{key, ts}}; + return WriteBatchInternal::SingleDelete(this, cf_id, + SliceParts(key_with_ts.data(), 2)); +} + +Status WriteBatchInternal::SingleDelete(WriteBatch* b, + uint32_t column_family_id, + const SliceParts& key) { + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeSingleDeletion)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSliceParts(&b->rep_, key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_SINGLE_DELETE, + std::memory_order_relaxed); + if (b->prot_info_ != nullptr) { + // See comment in first `WriteBatchInternal::Put()` overload concerning the + // `ValueType` argument passed to `ProtectKVO()`. 
+ b->prot_info_->entries_.emplace_back( + ProtectionInfo64() + .ProtectKVO(key, + SliceParts(nullptr /* _parts */, + 0 /* _num_parts */) /* value */, + kTypeSingleDeletion) + .ProtectC(column_family_id)); + } + return save.commit(); +} + +Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::SingleDelete(this, cf_id, key); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); +} + +Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, + const Slice& begin_key, + const Slice& end_key) { + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeRangeDeletion)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, begin_key); + PutLengthPrefixedSlice(&b->rep_, end_key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_DELETE_RANGE, + std::memory_order_relaxed); + if (b->prot_info_ != nullptr) { + // See comment in first `WriteBatchInternal::Put()` overload concerning the + // `ValueType` argument passed to `ProtectKVO()`. + // In `DeleteRange()`, the end key is treated as the value. + b->prot_info_->entries_.emplace_back( + ProtectionInfo64() + .ProtectKVO(begin_key, end_key, kTypeRangeDeletion) + .ProtectC(column_family_id)); + } + return save.commit(); +} + +Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key) { + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key); + } + + needs_in_place_update_ts_ = true; + has_key_with_ts_ = true; + std::string dummy_ts(ts_sz, '\0'); + std::array<Slice, 2> begin_key_with_ts{{begin_key, dummy_ts}}; + std::array<Slice, 2> end_key_with_ts{{end_key, dummy_ts}}; + return WriteBatchInternal::DeleteRange( + this, cf_id, SliceParts(begin_key_with_ts.data(), 2), + SliceParts(end_key_with_ts.data(), 2)); +} + +Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key, + const Slice& ts) { + const Status s = CheckColumnFamilyTimestampSize(column_family, ts); + if (!s.ok()) { + return s; + } + assert(column_family); + has_key_with_ts_ = true; + uint32_t cf_id = column_family->GetID(); + std::array<Slice, 2> key_with_ts{{begin_key, ts}}; + std::array<Slice, 2> end_key_with_ts{{end_key, ts}}; + return WriteBatchInternal::DeleteRange(this, cf_id, + SliceParts(key_with_ts.data(), 2), + SliceParts(end_key_with_ts.data(), 2)); +} + +Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, + const SliceParts& begin_key, + const SliceParts& end_key) { + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeRangeDeletion)); + } else { + 
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSliceParts(&b->rep_, begin_key); + PutLengthPrefixedSliceParts(&b->rep_, end_key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_DELETE_RANGE, + std::memory_order_relaxed); + if (b->prot_info_ != nullptr) { + // See comment in first `WriteBatchInternal::Put()` overload concerning the + // `ValueType` argument passed to `ProtectKVO()`. + // In `DeleteRange()`, the end key is treated as the value. + b->prot_info_->entries_.emplace_back( + ProtectionInfo64() + .ProtectKVO(begin_key, end_key, kTypeRangeDeletion) + .ProtectC(column_family_id)); + } + return save.commit(); +} + +Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, + const SliceParts& begin_key, + const SliceParts& end_key) { + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); +} + +Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, + const Slice& key, const Slice& value) { + if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) { + return Status::InvalidArgument("key is too large"); + } + if (value.size() > size_t{std::numeric_limits<uint32_t>::max()}) { + return Status::InvalidArgument("value is too large"); + } + + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeMerge)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, value); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_MERGE, + std::memory_order_relaxed); + if (b->prot_info_ != nullptr) { + // See comment in first `WriteBatchInternal::Put()` overload concerning the + // `ValueType` argument passed to `ProtectKVO()`. 
+ b->prot_info_->entries_.emplace_back(ProtectionInfo64() + .ProtectKVO(key, value, kTypeMerge) + .ProtectC(column_family_id)); + } + return save.commit(); +} + +Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) { + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::Merge(this, cf_id, key, value); + } + + needs_in_place_update_ts_ = true; + has_key_with_ts_ = true; + std::string dummy_ts(ts_sz, '\0'); + std::array<Slice, 2> key_with_ts{{key, dummy_ts}}; + + return WriteBatchInternal::Merge( + this, cf_id, SliceParts(key_with_ts.data(), 2), SliceParts(&value, 1)); +} + +Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts, const Slice& value) { + const Status s = CheckColumnFamilyTimestampSize(column_family, ts); + if (!s.ok()) { + return s; + } + has_key_with_ts_ = true; + assert(column_family); + uint32_t cf_id = column_family->GetID(); + std::array<Slice, 2> key_with_ts{{key, ts}}; + return WriteBatchInternal::Merge( + this, cf_id, SliceParts(key_with_ts.data(), 2), SliceParts(&value, 1)); +} + +Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, + const SliceParts& key, + const SliceParts& value) { + Status s = CheckSlicePartsLength(key, value); + if (!s.ok()) { + return s; + } + + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeMerge)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSliceParts(&b->rep_, key); + PutLengthPrefixedSliceParts(&b->rep_, value); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_MERGE, + std::memory_order_relaxed); + if (b->prot_info_ != nullptr) { + // See comment in first `WriteBatchInternal::Put()` overload concerning the + // `ValueType` argument passed to `ProtectKVO()`. 
+ b->prot_info_->entries_.emplace_back(ProtectionInfo64() + .ProtectKVO(key, value, kTypeMerge) + .ProtectC(column_family_id)); + } + return save.commit(); +} + +Status WriteBatch::Merge(ColumnFamilyHandle* column_family, + const SliceParts& key, const SliceParts& value) { + size_t ts_sz = 0; + uint32_t cf_id = 0; + Status s; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (0 == ts_sz) { + return WriteBatchInternal::Merge(this, cf_id, key, value); + } + + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); +} + +Status WriteBatchInternal::PutBlobIndex(WriteBatch* b, + uint32_t column_family_id, + const Slice& key, const Slice& value) { + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeBlobIndex)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, value); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_BLOB_INDEX, + std::memory_order_relaxed); + if (b->prot_info_ != nullptr) { + // See comment in first `WriteBatchInternal::Put()` overload concerning the + // `ValueType` argument passed to `ProtectKVO()`. + b->prot_info_->entries_.emplace_back( + ProtectionInfo64() + .ProtectKVO(key, value, kTypeBlobIndex) + .ProtectC(column_family_id)); + } + return save.commit(); +} + +Status WriteBatch::PutLogData(const Slice& blob) { + LocalSavePoint save(this); + rep_.push_back(static_cast<char>(kTypeLogData)); + PutLengthPrefixedSlice(&rep_, blob); + return save.commit(); +} + +void WriteBatch::SetSavePoint() { + if (save_points_ == nullptr) { + save_points_.reset(new SavePoints()); + } + // Record length and count of current batch of writes. 
+ save_points_->stack.push(SavePoint( + GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed))); +} + +Status WriteBatch::RollbackToSavePoint() { + if (save_points_ == nullptr || save_points_->stack.size() == 0) { + return Status::NotFound(); + } + + // Pop the most recent savepoint off the stack + SavePoint savepoint = save_points_->stack.top(); + save_points_->stack.pop(); + + assert(savepoint.size <= rep_.size()); + assert(static_cast<uint32_t>(savepoint.count) <= Count()); + + if (savepoint.size == rep_.size()) { + // No changes to rollback + } else if (savepoint.size == 0) { + // Rollback everything + Clear(); + } else { + rep_.resize(savepoint.size); + if (prot_info_ != nullptr) { + prot_info_->entries_.resize(savepoint.count); + } + WriteBatchInternal::SetCount(this, savepoint.count); + content_flags_.store(savepoint.content_flags, std::memory_order_relaxed); + } + + return Status::OK(); +} + +Status WriteBatch::PopSavePoint() { + if (save_points_ == nullptr || save_points_->stack.size() == 0) { + return Status::NotFound(); + } + + // Pop the most recent savepoint off the stack + save_points_->stack.pop(); + + return Status::OK(); +} + +Status WriteBatch::UpdateTimestamps( + const Slice& ts, std::function<size_t(uint32_t)> ts_sz_func) { + TimestampUpdater<decltype(ts_sz_func)> ts_updater(prot_info_.get(), + std::move(ts_sz_func), ts); + const Status s = Iterate(&ts_updater); + if (s.ok()) { + needs_in_place_update_ts_ = false; + } + return s; +} + +Status WriteBatch::VerifyChecksum() const { + if (prot_info_ == nullptr) { + return Status::OK(); + } + Slice input(rep_.data() + WriteBatchInternal::kHeader, + rep_.size() - WriteBatchInternal::kHeader); + Slice key, value, blob, xid; + char tag = 0; + uint32_t column_family = 0; // default + Status s; + size_t prot_info_idx = 0; + bool checksum_protected = true; + while (!input.empty() && prot_info_idx < prot_info_->entries_.size()) { + // In case key/value/column_family are not updated by + // ReadRecordFromWriteBatch + key.clear(); + value.clear(); + column_family = 0; + s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value, + &blob, &xid); + if (!s.ok()) { + return s; + } + checksum_protected = true; + // Write batch checksum uses op_type without ColumnFamily (e.g., if op_type + // in the write batch is kTypeColumnFamilyValue, kTypeValue is used to + // compute the checksum), and encodes column family id separately. See + // comment in first `WriteBatchInternal::Put()` for more detail. 
+ switch (tag) { + case kTypeColumnFamilyValue: + case kTypeValue: + tag = kTypeValue; + break; + case kTypeColumnFamilyDeletion: + case kTypeDeletion: + tag = kTypeDeletion; + break; + case kTypeColumnFamilySingleDeletion: + case kTypeSingleDeletion: + tag = kTypeSingleDeletion; + break; + case kTypeColumnFamilyRangeDeletion: + case kTypeRangeDeletion: + tag = kTypeRangeDeletion; + break; + case kTypeColumnFamilyMerge: + case kTypeMerge: + tag = kTypeMerge; + break; + case kTypeColumnFamilyBlobIndex: + case kTypeBlobIndex: + tag = kTypeBlobIndex; + break; + case kTypeLogData: + case kTypeBeginPrepareXID: + case kTypeEndPrepareXID: + case kTypeCommitXID: + case kTypeRollbackXID: + case kTypeNoop: + case kTypeBeginPersistedPrepareXID: + case kTypeBeginUnprepareXID: + case kTypeDeletionWithTimestamp: + case kTypeCommitXIDAndTimestamp: + checksum_protected = false; + break; + case kTypeColumnFamilyWideColumnEntity: + case kTypeWideColumnEntity: + tag = kTypeWideColumnEntity; + break; + default: + return Status::Corruption( + "unknown WriteBatch tag", + std::to_string(static_cast<unsigned int>(tag))); + } + if (checksum_protected) { + s = prot_info_->entries_[prot_info_idx++] + .StripC(column_family) + .StripKVO(key, value, static_cast<ValueType>(tag)) + .GetStatus(); + if (!s.ok()) { + return s; + } + } + } + + if (prot_info_idx != WriteBatchInternal::Count(this)) { + return Status::Corruption("WriteBatch has wrong count"); + } + assert(WriteBatchInternal::Count(this) == prot_info_->entries_.size()); + return Status::OK(); +} + +namespace { + +class MemTableInserter : public WriteBatch::Handler { + SequenceNumber sequence_; + ColumnFamilyMemTables* const cf_mems_; + FlushScheduler* const flush_scheduler_; + TrimHistoryScheduler* const trim_history_scheduler_; + const bool ignore_missing_column_families_; + const uint64_t recovering_log_number_; + // log number that all Memtables inserted into should reference + uint64_t log_number_ref_; + DBImpl* db_; + const bool concurrent_memtable_writes_; + bool post_info_created_; + const WriteBatch::ProtectionInfo* prot_info_; + size_t prot_info_idx_; + + bool* has_valid_writes_; + // On some (!) platforms just default creating + // a map is too expensive in the Write() path as they + // cause memory allocations though unused. + // Make creation optional but do not incur + // std::unique_ptr additional allocation + using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>; + using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type; + PostMapType mem_post_info_map_; + // current recovered transaction we are rebuilding (recovery) + WriteBatch* rebuilding_trx_; + SequenceNumber rebuilding_trx_seq_; + // Increase seq number once per each write batch. Otherwise increase it once + // per key. 
+ bool seq_per_batch_; + // Whether the memtable write will be done only after the commit + bool write_after_commit_; + // Whether memtable write can be done before prepare + bool write_before_prepare_; + // Whether this batch was unprepared or not + bool unprepared_batch_; + using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type; + DupDetector duplicate_detector_; + bool dup_dectector_on_; + + bool hint_per_batch_; + bool hint_created_; + // Hints for this batch + using HintMap = std::unordered_map<MemTable*, void*>; + using HintMapType = std::aligned_storage<sizeof(HintMap)>::type; + HintMapType hint_; + + HintMap& GetHintMap() { + assert(hint_per_batch_); + if (!hint_created_) { + new (&hint_) HintMap(); + hint_created_ = true; + } + return *reinterpret_cast<HintMap*>(&hint_); + } + + MemPostInfoMap& GetPostMap() { + assert(concurrent_memtable_writes_); + if (!post_info_created_) { + new (&mem_post_info_map_) MemPostInfoMap(); + post_info_created_ = true; + } + return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_); + } + + bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) { + assert(!write_after_commit_); + assert(rebuilding_trx_ != nullptr); + if (!dup_dectector_on_) { + new (&duplicate_detector_) DuplicateDetector(db_); + dup_dectector_on_ = true; + } + return reinterpret_cast<DuplicateDetector*>(&duplicate_detector_) + ->IsDuplicateKeySeq(column_family_id, key, sequence_); + } + + const ProtectionInfoKVOC64* NextProtectionInfo() { + const ProtectionInfoKVOC64* res = nullptr; + if (prot_info_ != nullptr) { + assert(prot_info_idx_ < prot_info_->entries_.size()); + res = &prot_info_->entries_[prot_info_idx_]; + ++prot_info_idx_; + } + return res; + } + + void DecrementProtectionInfoIdxForTryAgain() { + if (prot_info_ != nullptr) --prot_info_idx_; + } + + void ResetProtectionInfo() { + prot_info_idx_ = 0; + prot_info_ = nullptr; + } + + protected: + Handler::OptionState WriteBeforePrepare() const override { + return write_before_prepare_ ? Handler::OptionState::kEnabled + : Handler::OptionState::kDisabled; + } + Handler::OptionState WriteAfterCommit() const override { + return write_after_commit_ ? Handler::OptionState::kEnabled + : Handler::OptionState::kDisabled; + } + + public: + // cf_mems should not be shared with concurrent inserters + MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems, + FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, + bool ignore_missing_column_families, + uint64_t recovering_log_number, DB* db, + bool concurrent_memtable_writes, + const WriteBatch::ProtectionInfo* prot_info, + bool* has_valid_writes = nullptr, bool seq_per_batch = false, + bool batch_per_txn = true, bool hint_per_batch = false) + : sequence_(_sequence), + cf_mems_(cf_mems), + flush_scheduler_(flush_scheduler), + trim_history_scheduler_(trim_history_scheduler), + ignore_missing_column_families_(ignore_missing_column_families), + recovering_log_number_(recovering_log_number), + log_number_ref_(0), + db_(static_cast_with_check<DBImpl>(db)), + concurrent_memtable_writes_(concurrent_memtable_writes), + post_info_created_(false), + prot_info_(prot_info), + prot_info_idx_(0), + has_valid_writes_(has_valid_writes), + rebuilding_trx_(nullptr), + rebuilding_trx_seq_(0), + seq_per_batch_(seq_per_batch), + // Write after commit currently uses one seq per key (instead of per + // batch). So seq_per_batch being false indicates write_after_commit + // approach. 
+        write_after_commit_(!seq_per_batch),
+        // WriteUnprepared can write WriteBatches per transaction, so
+        // batch_per_txn being false indicates write_before_prepare.
+        write_before_prepare_(!batch_per_txn),
+        unprepared_batch_(false),
+        duplicate_detector_(),
+        dup_detector_on_(false),
+        hint_per_batch_(hint_per_batch),
+        hint_created_(false) {
+    assert(cf_mems_);
+  }
+
+  ~MemTableInserter() override {
+    if (dup_detector_on_) {
+      reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
+          ->~DuplicateDetector();
+    }
+    if (post_info_created_) {
+      reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_)->~MemPostInfoMap();
+    }
+    if (hint_created_) {
+      for (auto iter : GetHintMap()) {
+        delete[] reinterpret_cast<char*>(iter.second);
+      }
+      reinterpret_cast<HintMap*>(&hint_)->~HintMap();
+    }
+    delete rebuilding_trx_;
+  }
+
+  MemTableInserter(const MemTableInserter&) = delete;
+  MemTableInserter& operator=(const MemTableInserter&) = delete;
+
+  // The batch seq is regularly restarted; in normal mode it is set when
+  // MemTableInserter is constructed in the write thread, and in recovery mode
+  // it is set when a batch, which is tagged with its seq, is read from the
+  // WAL. Within a sequenced batch, which could be a merge of multiple batches,
+  // we have two policies for advancing the seq: i) seq_per_key (default) and
+  // ii) seq_per_batch. To implement the latter we need to mark the boundary
+  // between the individual batches. The approach is this: 1) Use the
+  // terminating markers to indicate the boundary (kTypeEndPrepareXID,
+  // kTypeCommitXID, kTypeRollbackXID) 2) Terminate a batch with kTypeNoop in
+  // the absence of a natural boundary marker.
+  void MaybeAdvanceSeq(bool batch_boundary = false) {
+    if (batch_boundary == seq_per_batch_) {
+      sequence_++;
+    }
+  }
+
+  void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
+  void set_prot_info(const WriteBatch::ProtectionInfo* prot_info) {
+    prot_info_ = prot_info;
+    prot_info_idx_ = 0;
+  }
+
+  SequenceNumber sequence() const { return sequence_; }
+
+  void PostProcess() {
+    assert(concurrent_memtable_writes_);
+    // If post info was not created, there is nothing
+    // to process, and no need to create it on demand.
+    if (post_info_created_) {
+      for (auto& pair : GetPostMap()) {
+        pair.first->BatchPostProcess(pair.second);
+      }
+    }
+  }
+
+  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
+    // If we are in a concurrent mode, it is the caller's responsibility
+    // to clone the original ColumnFamilyMemTables so that each thread
+    // has its own instance. Otherwise, it must be guaranteed that there
+    // is no concurrent access.
+    bool found = cf_mems_->Seek(column_family_id);
+    if (!found) {
+      if (ignore_missing_column_families_) {
+        *s = Status::OK();
+      } else {
+        *s = Status::InvalidArgument(
+            "Invalid column family specified in write batch");
+      }
+      return false;
+    }
+    if (recovering_log_number_ != 0 &&
+        recovering_log_number_ < cf_mems_->GetLogNumber()) {
+      // This is true only in the recovery environment (recovering_log_number_
+      // is always 0 in the non-recovery, regular write code-path).
+      // * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means
+      // that the column family already contains updates from this log.
We can't apply + // updates twice because of update-in-place or merge workloads -- ignore + // the update + *s = Status::OK(); + return false; + } + + if (has_valid_writes_ != nullptr) { + *has_valid_writes_ = true; + } + + if (log_number_ref_ > 0) { + cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_); + } + + return true; + } + + Status PutCFImpl(uint32_t column_family_id, const Slice& key, + const Slice& value, ValueType value_type, + const ProtectionInfoKVOS64* kv_prot_info) { + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + return WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, + value); + // else insert the values to the memtable right away + } + + Status ret_status; + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) { + if (ret_status.ok() && rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probably flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id, + key, value); + if (ret_status.ok()) { + MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key)); + } + } else if (ret_status.ok()) { + MaybeAdvanceSeq(false /* batch_boundary */); + } + return ret_status; + } + assert(ret_status.ok()); + + MemTable* mem = cf_mems_->GetMemTable(); + auto* moptions = mem->GetImmutableMemTableOptions(); + // inplace_update_support is inconsistent with snapshots, and therefore with + // any kind of transactions including the ones that use seq_per_batch + assert(!seq_per_batch_ || !moptions->inplace_update_support); + if (!moptions->inplace_update_support) { + ret_status = + mem->Add(sequence_, value_type, key, value, kv_prot_info, + concurrent_memtable_writes_, get_post_process_info(mem), + hint_per_batch_ ? &GetHintMap()[mem] : nullptr); + } else if (moptions->inplace_callback == nullptr || + value_type != kTypeValue) { + assert(!concurrent_memtable_writes_); + ret_status = mem->Update(sequence_, value_type, key, value, kv_prot_info); + } else { + assert(!concurrent_memtable_writes_); + assert(value_type == kTypeValue); + ret_status = mem->UpdateCallback(sequence_, key, value, kv_prot_info); + if (ret_status.IsNotFound()) { + // key not found in memtable. Do sst get, update, add + SnapshotImpl read_from_snapshot; + read_from_snapshot.number_ = sequence_; + ReadOptions ropts; + // it's going to be overwritten for sure, so no point caching data block + // containing the old version + ropts.fill_cache = false; + ropts.snapshot = &read_from_snapshot; + + std::string prev_value; + std::string merged_value; + + auto cf_handle = cf_mems_->GetColumnFamilyHandle(); + Status get_status = Status::NotSupported(); + if (db_ != nullptr && recovering_log_number_ == 0) { + if (cf_handle == nullptr) { + cf_handle = db_->DefaultColumnFamily(); + } + // TODO (yanqin): fix when user-defined timestamp is enabled. + get_status = db_->Get(ropts, cf_handle, key, &prev_value); + } + // Intentionally overwrites the `NotFound` in `ret_status`. 
+ if (!get_status.ok() && !get_status.IsNotFound()) { + ret_status = get_status; + } else { + ret_status = Status::OK(); + } + if (ret_status.ok()) { + UpdateStatus update_status; + char* prev_buffer = const_cast<char*>(prev_value.c_str()); + uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); + if (get_status.ok()) { + update_status = moptions->inplace_callback(prev_buffer, &prev_size, + value, &merged_value); + } else { + update_status = moptions->inplace_callback( + nullptr /* existing_value */, nullptr /* existing_value_size */, + value, &merged_value); + } + if (update_status == UpdateStatus::UPDATED_INPLACE) { + assert(get_status.ok()); + if (kv_prot_info != nullptr) { + ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); + updated_kv_prot_info.UpdateV(value, + Slice(prev_buffer, prev_size)); + // prev_value is updated in-place with final value. + ret_status = mem->Add(sequence_, value_type, key, + Slice(prev_buffer, prev_size), + &updated_kv_prot_info); + } else { + ret_status = mem->Add(sequence_, value_type, key, + Slice(prev_buffer, prev_size), + nullptr /* kv_prot_info */); + } + if (ret_status.ok()) { + RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); + } + } else if (update_status == UpdateStatus::UPDATED) { + if (kv_prot_info != nullptr) { + ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); + updated_kv_prot_info.UpdateV(value, merged_value); + // merged_value contains the final value. + ret_status = mem->Add(sequence_, value_type, key, + Slice(merged_value), &updated_kv_prot_info); + } else { + // merged_value contains the final value. + ret_status = + mem->Add(sequence_, value_type, key, Slice(merged_value), + nullptr /* kv_prot_info */); + } + if (ret_status.ok()) { + RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); + } + } + } + } + } + if (UNLIKELY(ret_status.IsTryAgain())) { + assert(seq_per_batch_); + const bool kBatchBoundary = true; + MaybeAdvanceSeq(kBatchBoundary); + } else if (ret_status.ok()) { + MaybeAdvanceSeq(); + CheckMemtableFull(); + } + // optimize for non-recovery mode + // If `ret_status` is `TryAgain` then the next (successful) try will add + // the key to the rebuilding transaction object. If `ret_status` is + // another non-OK `Status`, then the `rebuilding_trx_` will be thrown + // away. So we only need to add to it when `ret_status.ok()`. + if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id, + key, value); + } + return ret_status; + } + + Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + const auto* kv_prot_info = NextProtectionInfo(); + Status ret_status; + if (kv_prot_info != nullptr) { + // Memtable needs seqno, doesn't need CF ID + auto mem_kv_prot_info = + kv_prot_info->StripC(column_family_id).ProtectS(sequence_); + ret_status = PutCFImpl(column_family_id, key, value, kTypeValue, + &mem_kv_prot_info); + } else { + ret_status = PutCFImpl(column_family_id, key, value, kTypeValue, + nullptr /* kv_prot_info */); + } + // TODO: this assumes that if TryAgain status is returned to the caller, + // the operation is actually tried again. 
The proper way to do this is to + // pass a `try_again` parameter to the operation itself and decrement + // prot_info_idx_ based on that + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; + } + + Status PutEntityCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + const auto* kv_prot_info = NextProtectionInfo(); + + Status s; + if (kv_prot_info) { + // Memtable needs seqno, doesn't need CF ID + auto mem_kv_prot_info = + kv_prot_info->StripC(column_family_id).ProtectS(sequence_); + s = PutCFImpl(column_family_id, key, value, kTypeWideColumnEntity, + &mem_kv_prot_info); + } else { + s = PutCFImpl(column_family_id, key, value, kTypeWideColumnEntity, + /* kv_prot_info */ nullptr); + } + + if (UNLIKELY(s.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + + return s; + } + + Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key, + const Slice& value, ValueType delete_type, + const ProtectionInfoKVOS64* kv_prot_info) { + Status ret_status; + MemTable* mem = cf_mems_->GetMemTable(); + ret_status = + mem->Add(sequence_, delete_type, key, value, kv_prot_info, + concurrent_memtable_writes_, get_post_process_info(mem), + hint_per_batch_ ? &GetHintMap()[mem] : nullptr); + if (UNLIKELY(ret_status.IsTryAgain())) { + assert(seq_per_batch_); + const bool kBatchBoundary = true; + MaybeAdvanceSeq(kBatchBoundary); + } else if (ret_status.ok()) { + MaybeAdvanceSeq(); + CheckMemtableFull(); + } + return ret_status; + } + + Status DeleteCF(uint32_t column_family_id, const Slice& key) override { + const auto* kv_prot_info = NextProtectionInfo(); + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + return WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); + // else insert the values to the memtable right away + } + + Status ret_status; + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) { + if (ret_status.ok() && rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probably flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + ret_status = + WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); + if (ret_status.ok()) { + MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key)); + } + } else if (ret_status.ok()) { + MaybeAdvanceSeq(false /* batch_boundary */); + } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; + } + + ColumnFamilyData* cfd = cf_mems_->current(); + assert(!cfd || cfd->user_comparator()); + const size_t ts_sz = (cfd && cfd->user_comparator()) + ? cfd->user_comparator()->timestamp_size() + : 0; + const ValueType delete_type = + (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp; + if (kv_prot_info != nullptr) { + auto mem_kv_prot_info = + kv_prot_info->StripC(column_family_id).ProtectS(sequence_); + mem_kv_prot_info.UpdateO(kTypeDeletion, delete_type); + ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type, + &mem_kv_prot_info); + } else { + ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type, + nullptr /* kv_prot_info */); + } + // optimize for non-recovery mode + // If `ret_status` is `TryAgain` then the next (successful) try will add + // the key to the rebuilding transaction object. 
If `ret_status` is + // another non-OK `Status`, then the `rebuilding_trx_` will be thrown + // away. So we only need to add to it when `ret_status.ok()`. + if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + ret_status = + WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); + } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; + } + + Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override { + const auto* kv_prot_info = NextProtectionInfo(); + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + return WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, + key); + // else insert the values to the memtable right away + } + + Status ret_status; + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) { + if (ret_status.ok() && rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probably flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_, + column_family_id, key); + if (ret_status.ok()) { + MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key)); + } + } else if (ret_status.ok()) { + MaybeAdvanceSeq(false /* batch_boundary */); + } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; + } + assert(ret_status.ok()); + + if (kv_prot_info != nullptr) { + auto mem_kv_prot_info = + kv_prot_info->StripC(column_family_id).ProtectS(sequence_); + ret_status = DeleteImpl(column_family_id, key, Slice(), + kTypeSingleDeletion, &mem_kv_prot_info); + } else { + ret_status = DeleteImpl(column_family_id, key, Slice(), + kTypeSingleDeletion, nullptr /* kv_prot_info */); + } + // optimize for non-recovery mode + // If `ret_status` is `TryAgain` then the next (successful) try will add + // the key to the rebuilding transaction object. If `ret_status` is + // another non-OK `Status`, then the `rebuilding_trx_` will be thrown + // away. So we only need to add to it when `ret_status.ok()`. + if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_, + column_family_id, key); + } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; + } + + Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key, + const Slice& end_key) override { + const auto* kv_prot_info = NextProtectionInfo(); + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + return WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, + begin_key, end_key); + // else insert the values to the memtable right away + } + + Status ret_status; + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) { + if (ret_status.ok() && rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probably flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. 
+ // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + ret_status = WriteBatchInternal::DeleteRange( + rebuilding_trx_, column_family_id, begin_key, end_key); + if (ret_status.ok()) { + MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, begin_key)); + } + } else if (ret_status.ok()) { + MaybeAdvanceSeq(false /* batch_boundary */); + } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; + } + assert(ret_status.ok()); + + if (db_ != nullptr) { + auto cf_handle = cf_mems_->GetColumnFamilyHandle(); + if (cf_handle == nullptr) { + cf_handle = db_->DefaultColumnFamily(); + } + auto* cfd = + static_cast_with_check<ColumnFamilyHandleImpl>(cf_handle)->cfd(); + if (!cfd->is_delete_range_supported()) { + // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`. + ret_status.PermitUncheckedError(); + return Status::NotSupported( + std::string("DeleteRange not supported for table type ") + + cfd->ioptions()->table_factory->Name() + " in CF " + + cfd->GetName()); + } + int cmp = + cfd->user_comparator()->CompareWithoutTimestamp(begin_key, end_key); + if (cmp > 0) { + // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`. + ret_status.PermitUncheckedError(); + // It's an empty range where endpoints appear mistaken. Don't bother + // applying it to the DB, and return an error to the user. + return Status::InvalidArgument("end key comes before start key"); + } else if (cmp == 0) { + // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`. + ret_status.PermitUncheckedError(); + // It's an empty range. Don't bother applying it to the DB. + return Status::OK(); + } + } + + if (kv_prot_info != nullptr) { + auto mem_kv_prot_info = + kv_prot_info->StripC(column_family_id).ProtectS(sequence_); + ret_status = DeleteImpl(column_family_id, begin_key, end_key, + kTypeRangeDeletion, &mem_kv_prot_info); + } else { + ret_status = DeleteImpl(column_family_id, begin_key, end_key, + kTypeRangeDeletion, nullptr /* kv_prot_info */); + } + // optimize for non-recovery mode + // If `ret_status` is `TryAgain` then the next (successful) try will add + // the key to the rebuilding transaction object. If `ret_status` is + // another non-OK `Status`, then the `rebuilding_trx_` will be thrown + // away. So we only need to add to it when `ret_status.ok()`. + if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + ret_status = WriteBatchInternal::DeleteRange( + rebuilding_trx_, column_family_id, begin_key, end_key); + } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; + } + + Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + const auto* kv_prot_info = NextProtectionInfo(); + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + return WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, + value); + // else insert the values to the memtable right away + } + + Status ret_status; + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) { + if (ret_status.ok() && rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probably flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. 
+ // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + ret_status = WriteBatchInternal::Merge(rebuilding_trx_, + column_family_id, key, value); + if (ret_status.ok()) { + MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key)); + } + } else if (ret_status.ok()) { + MaybeAdvanceSeq(false /* batch_boundary */); + } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; + } + assert(ret_status.ok()); + + MemTable* mem = cf_mems_->GetMemTable(); + auto* moptions = mem->GetImmutableMemTableOptions(); + if (moptions->merge_operator == nullptr) { + return Status::InvalidArgument( + "Merge requires `ColumnFamilyOptions::merge_operator != nullptr`"); + } + bool perform_merge = false; + assert(!concurrent_memtable_writes_ || + moptions->max_successive_merges == 0); + + // If we pass DB through and options.max_successive_merges is hit + // during recovery, Get() will be issued which will try to acquire + // DB mutex and cause deadlock, as DB mutex is already held. + // So we disable merge in recovery + if (moptions->max_successive_merges > 0 && db_ != nullptr && + recovering_log_number_ == 0) { + assert(!concurrent_memtable_writes_); + LookupKey lkey(key, sequence_); + + // Count the number of successive merges at the head + // of the key in the memtable + size_t num_merges = mem->CountSuccessiveMergeEntries(lkey); + + if (num_merges >= moptions->max_successive_merges) { + perform_merge = true; + } + } + + if (perform_merge) { + // 1) Get the existing value + std::string get_value; + + // Pass in the sequence number so that we also include previous merge + // operations in the same batch. + SnapshotImpl read_from_snapshot; + read_from_snapshot.number_ = sequence_; + ReadOptions read_options; + read_options.snapshot = &read_from_snapshot; + + auto cf_handle = cf_mems_->GetColumnFamilyHandle(); + if (cf_handle == nullptr) { + cf_handle = db_->DefaultColumnFamily(); + } + Status get_status = db_->Get(read_options, cf_handle, key, &get_value); + if (!get_status.ok()) { + // Failed to read a key we know exists. Store the delta in memtable. + perform_merge = false; + } else { + Slice get_value_slice = Slice(get_value); + + // 2) Apply this merge + auto merge_operator = moptions->merge_operator; + assert(merge_operator); + + std::string new_value; + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator, key, &get_value_slice, {value}, &new_value, + moptions->info_log, moptions->statistics, + SystemClock::Default().get(), /* result_operand */ nullptr, + /* update_num_ops_stats */ false); + + if (!merge_status.ok()) { + // Failed to merge! 
+ // Store the delta in memtable + perform_merge = false; + } else { + // 3) Add value to memtable + assert(!concurrent_memtable_writes_); + if (kv_prot_info != nullptr) { + auto merged_kv_prot_info = + kv_prot_info->StripC(column_family_id).ProtectS(sequence_); + merged_kv_prot_info.UpdateV(value, new_value); + merged_kv_prot_info.UpdateO(kTypeMerge, kTypeValue); + ret_status = mem->Add(sequence_, kTypeValue, key, new_value, + &merged_kv_prot_info); + } else { + ret_status = mem->Add(sequence_, kTypeValue, key, new_value, + nullptr /* kv_prot_info */); + } + } + } + } + + if (!perform_merge) { + assert(ret_status.ok()); + // Add merge operand to memtable + if (kv_prot_info != nullptr) { + auto mem_kv_prot_info = + kv_prot_info->StripC(column_family_id).ProtectS(sequence_); + ret_status = + mem->Add(sequence_, kTypeMerge, key, value, &mem_kv_prot_info, + concurrent_memtable_writes_, get_post_process_info(mem)); + } else { + ret_status = mem->Add( + sequence_, kTypeMerge, key, value, nullptr /* kv_prot_info */, + concurrent_memtable_writes_, get_post_process_info(mem)); + } + } + + if (UNLIKELY(ret_status.IsTryAgain())) { + assert(seq_per_batch_); + const bool kBatchBoundary = true; + MaybeAdvanceSeq(kBatchBoundary); + } else if (ret_status.ok()) { + MaybeAdvanceSeq(); + CheckMemtableFull(); + } + // optimize for non-recovery mode + // If `ret_status` is `TryAgain` then the next (successful) try will add + // the key to the rebuilding transaction object. If `ret_status` is + // another non-OK `Status`, then the `rebuilding_trx_` will be thrown + // away. So we only need to add to it when `ret_status.ok()`. + if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // TODO(ajkr): propagate `ProtectionInfoKVOS64`. + ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, + key, value); + } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; + } + + Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + const auto* kv_prot_info = NextProtectionInfo(); + Status ret_status; + if (kv_prot_info != nullptr) { + // Memtable needs seqno, doesn't need CF ID + auto mem_kv_prot_info = + kv_prot_info->StripC(column_family_id).ProtectS(sequence_); + // Same as PutCF except for value type. 
+      ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
+                             &mem_kv_prot_info);
+    } else {
+      ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
+                             nullptr /* kv_prot_info */);
+    }
+    if (UNLIKELY(ret_status.IsTryAgain())) {
+      DecrementProtectionInfoIdxForTryAgain();
+    }
+    return ret_status;
+  }
+
+  void CheckMemtableFull() {
+    if (flush_scheduler_ != nullptr) {
+      auto* cfd = cf_mems_->current();
+      assert(cfd != nullptr);
+      if (cfd->mem()->ShouldScheduleFlush() &&
+          cfd->mem()->MarkFlushScheduled()) {
+        // MarkFlushScheduled only returns true if we are the one that
+        // should take action, so no need to dedup further
+        flush_scheduler_->ScheduleWork(cfd);
+      }
+    }
+    // check if memtable_list size exceeds max_write_buffer_size_to_maintain
+    if (trim_history_scheduler_ != nullptr) {
+      auto* cfd = cf_mems_->current();
+
+      assert(cfd);
+      assert(cfd->ioptions());
+
+      const size_t size_to_maintain = static_cast<size_t>(
+          cfd->ioptions()->max_write_buffer_size_to_maintain);
+
+      if (size_to_maintain > 0) {
+        MemTableList* const imm = cfd->imm();
+        assert(imm);
+
+        if (imm->HasHistory()) {
+          const MemTable* const mem = cfd->mem();
+          assert(mem);
+
+          if (mem->MemoryAllocatedBytes() +
+                      imm->MemoryAllocatedBytesExcludingLast() >=
+                  size_to_maintain &&
+              imm->MarkTrimHistoryNeeded()) {
+            trim_history_scheduler_->ScheduleWork(cfd);
+          }
+        }
+      }
+    }
+  }
+
+  // The write batch handler calls MarkBeginPrepare with unprepare set to true
+  // if it encounters the kTypeBeginUnprepareXID marker.
+  Status MarkBeginPrepare(bool unprepare) override {
+    assert(rebuilding_trx_ == nullptr);
+    assert(db_);
+
+    if (recovering_log_number_ != 0) {
+      db_->mutex()->AssertHeld();
+      // During recovery we rebuild a hollow transaction
+      // from all encountered prepare sections of the WAL.
+      if (db_->allow_2pc() == false) {
+        return Status::NotSupported(
+            "WAL contains prepared transactions. Open with "
+            "TransactionDB::Open().");
+      }
+
+      // we are now iterating through a prepared section
+      rebuilding_trx_ = new WriteBatch();
+      rebuilding_trx_seq_ = sequence_;
+      // Verify that we have matching MarkBeginPrepare/MarkEndPrepare markers.
+      // unprepared_batch_ should be false because it is false by default, and
+      // gets reset to false in MarkEndPrepare.
+      assert(!unprepared_batch_);
+      unprepared_batch_ = unprepare;
+
+      if (has_valid_writes_ != nullptr) {
+        *has_valid_writes_ = true;
+      }
+    }
+
+    return Status::OK();
+  }
+
+  Status MarkEndPrepare(const Slice& name) override {
+    assert(db_);
+    assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));
+
+    if (recovering_log_number_ != 0) {
+      db_->mutex()->AssertHeld();
+      assert(db_->allow_2pc());
+      size_t batch_cnt =
+          write_after_commit_
+              ? 0  // 0 will disable further checks
+              : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
+      db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
+                                      rebuilding_trx_, rebuilding_trx_seq_,
+                                      batch_cnt, unprepared_batch_);
+      unprepared_batch_ = false;
+      rebuilding_trx_ = nullptr;
+    } else {
+      assert(rebuilding_trx_ == nullptr);
+    }
+    const bool batch_boundary = true;
+    MaybeAdvanceSeq(batch_boundary);
+
+    return Status::OK();
+  }
+
+  Status MarkNoop(bool empty_batch) override {
+    if (recovering_log_number_ != 0) {
+      db_->mutex()->AssertHeld();
+    }
+    // A hack in pessimistic transactions could result in a noop at the start
+    // of the write batch; it should be ignored.
+    if (!empty_batch) {
+      // In the absence of Prepare markers, a kTypeNoop tag indicates the end
+      // of a batch. This happens when a write batch commits, skipping the
+      // prepare phase.
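+      // Schematically (an illustration, not an actual WAL layout), a
+      // seq_per_batch write group may contain sub-batches delimited as
+      //   [Put Put ... EndPrepareXID][Put ... CommitXID][Put ... Noop]
+      // and the sequence number advances once at each such delimiter.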
+      const bool batch_boundary = true;
+      MaybeAdvanceSeq(batch_boundary);
+    }
+    return Status::OK();
+  }
+
+  Status MarkCommit(const Slice& name) override {
+    assert(db_);
+
+    Status s;
+
+    if (recovering_log_number_ != 0) {
+      // We must hold the db mutex in recovery.
+      db_->mutex()->AssertHeld();
+      // In recovery, when we encounter a commit marker
+      // we look up this transaction in our set of rebuilt transactions
+      // and commit it.
+      auto trx = db_->GetRecoveredTransaction(name.ToString());
+
+      // the log containing the prepared section may have
+      // been released in the last incarnation because the
+      // data was flushed to L0
+      if (trx != nullptr) {
+        // at this point individual CF lognumbers will prevent
+        // duplicate re-insertion of values.
+        assert(log_number_ref_ == 0);
+        if (write_after_commit_) {
+          // write_after_commit_ can only have one batch in trx.
+          assert(trx->batches_.size() == 1);
+          const auto& batch_info = trx->batches_.begin()->second;
+          // all inserts must reference this trx log number
+          log_number_ref_ = batch_info.log_number_;
+          ResetProtectionInfo();
+          s = batch_info.batch_->Iterate(this);
+          log_number_ref_ = 0;
+        }
+        // else the values are already inserted before the commit
+
+        if (s.ok()) {
+          db_->DeleteRecoveredTransaction(name.ToString());
+        }
+        if (has_valid_writes_ != nullptr) {
+          *has_valid_writes_ = true;
+        }
+      }
+    } else {
+      // When writes are not delayed until commit, there is no disconnect
+      // between a memtable write and the WAL that supports it. So the commit
+      // need not reference any log as the only log it depends on.
+      assert(!write_after_commit_ || log_number_ref_ > 0);
+    }
+    const bool batch_boundary = true;
+    MaybeAdvanceSeq(batch_boundary);
+
+    if (UNLIKELY(s.IsTryAgain())) {
+      DecrementProtectionInfoIdxForTryAgain();
+    }
+
+    return s;
+  }
+
+  Status MarkCommitWithTimestamp(const Slice& name,
+                                 const Slice& commit_ts) override {
+    assert(db_);
+
+    Status s;
+
+    if (recovering_log_number_ != 0) {
+      // In recovery, the db mutex must be held.
+      db_->mutex()->AssertHeld();
+      // In recovery, when we encounter a commit marker
+      // we look up this transaction in our set of rebuilt transactions
+      // and commit it.
+      auto trx = db_->GetRecoveredTransaction(name.ToString());
+      // the log containing the prepared section may have
+      // been released in the last incarnation because the
+      // data was flushed to L0
+      if (trx) {
+        // at this point individual CF lognumbers will prevent
+        // duplicate re-insertion of values.
+        assert(0 == log_number_ref_);
+        if (write_after_commit_) {
+          // write_after_commit_ can only have one batch in trx.
+          assert(trx->batches_.size() == 1);
+          const auto& batch_info = trx->batches_.begin()->second;
+          // all inserts must reference this trx log number
+          log_number_ref_ = batch_info.log_number_;
+
+          s = batch_info.batch_->UpdateTimestamps(
+              commit_ts, [this](uint32_t cf) {
+                assert(db_);
+                VersionSet* const vset = db_->GetVersionSet();
+                assert(vset);
+                ColumnFamilySet* const cf_set = vset->GetColumnFamilySet();
+                assert(cf_set);
+                ColumnFamilyData* cfd = cf_set->GetColumnFamily(cf);
+                assert(cfd);
+                const auto* const ucmp = cfd->user_comparator();
+                assert(ucmp);
+                return ucmp->timestamp_size();
+              });
+          if (s.ok()) {
+            ResetProtectionInfo();
+            s = batch_info.batch_->Iterate(this);
+            log_number_ref_ = 0;
+          }
+        }
+        // else the values are already inserted before the commit
+
+        if (s.ok()) {
+          db_->DeleteRecoveredTransaction(name.ToString());
+        }
+        if (has_valid_writes_) {
+          *has_valid_writes_ = true;
+        }
+      }
+    } else {
+      // When writes are not delayed until commit, there is no disconnect
+      // between a memtable write and the WAL that supports it. So the commit
+      // need not reference any log as the only log it depends on.
+      assert(!write_after_commit_ || log_number_ref_ > 0);
+    }
+    constexpr bool batch_boundary = true;
+    MaybeAdvanceSeq(batch_boundary);
+
+    if (UNLIKELY(s.IsTryAgain())) {
+      DecrementProtectionInfoIdxForTryAgain();
+    }
+
+    return s;
+  }
+
+  Status MarkRollback(const Slice& name) override {
+    assert(db_);
+
+    if (recovering_log_number_ != 0) {
+      auto trx = db_->GetRecoveredTransaction(name.ToString());
+
+      // the log containing the transaction's prep section
+      // may have been released in the previous incarnation
+      // because we knew it had been rolled back
+      if (trx != nullptr) {
+        db_->DeleteRecoveredTransaction(name.ToString());
+      }
+    } else {
+      // in non-recovery mode we simply ignore this tag
+    }
+
+    const bool batch_boundary = true;
+    MaybeAdvanceSeq(batch_boundary);
+
+    return Status::OK();
+  }
+
+ private:
+  MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
+    if (!concurrent_memtable_writes_) {
+      // No need to batch counters locally if we don't use concurrent mode.
+      return nullptr;
+    }
+    return &GetPostMap()[mem];
+  }
+};
+
+}  // anonymous namespace
+
+// This function can only be called in these conditions:
+// 1) During Recovery()
+// 2) During Write(), in a single-threaded write thread
+// 3) During Write(), in a concurrent context where memtables have been cloned
+// The reason is that it calls memtables->Seek(), which has a stateful cache.
+Status WriteBatchInternal::InsertInto(
+    WriteThread::WriteGroup& write_group, SequenceNumber sequence,
+    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
+    TrimHistoryScheduler* trim_history_scheduler,
+    bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
+    bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
+  MemTableInserter inserter(
+      sequence, memtables, flush_scheduler, trim_history_scheduler,
+      ignore_missing_column_families, recovery_log_number, db,
+      concurrent_memtable_writes, nullptr /* prot_info */,
+      nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn);
+  for (auto w : write_group) {
+    if (w->CallbackFailed()) {
+      continue;
+    }
+    w->sequence = inserter.sequence();
+    if (!w->ShouldWriteToMemtable()) {
+      // In seq_per_batch_ mode this advances the seq by one.
+ inserter.MaybeAdvanceSeq(true); + continue; + } + SetSequence(w->batch, inserter.sequence()); + inserter.set_log_number_ref(w->log_ref); + inserter.set_prot_info(w->batch->prot_info_.get()); + w->status = w->batch->Iterate(&inserter); + if (!w->status.ok()) { + return w->status; + } + assert(!seq_per_batch || w->batch_cnt != 0); + assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt); + } + return Status::OK(); +} + +Status WriteBatchInternal::InsertInto( + WriteThread::Writer* writer, SequenceNumber sequence, + ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, + bool ignore_missing_column_families, uint64_t log_number, DB* db, + bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt, + bool batch_per_txn, bool hint_per_batch) { +#ifdef NDEBUG + (void)batch_cnt; +#endif + assert(writer->ShouldWriteToMemtable()); + MemTableInserter inserter(sequence, memtables, flush_scheduler, + trim_history_scheduler, + ignore_missing_column_families, log_number, db, + concurrent_memtable_writes, nullptr /* prot_info */, + nullptr /*has_valid_writes*/, seq_per_batch, + batch_per_txn, hint_per_batch); + SetSequence(writer->batch, sequence); + inserter.set_log_number_ref(writer->log_ref); + inserter.set_prot_info(writer->batch->prot_info_.get()); + Status s = writer->batch->Iterate(&inserter); + assert(!seq_per_batch || batch_cnt != 0); + assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt); + if (concurrent_memtable_writes) { + inserter.PostProcess(); + } + return s; +} + +Status WriteBatchInternal::InsertInto( + const WriteBatch* batch, ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, + bool ignore_missing_column_families, uint64_t log_number, DB* db, + bool concurrent_memtable_writes, SequenceNumber* next_seq, + bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) { + MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler, + trim_history_scheduler, + ignore_missing_column_families, log_number, db, + concurrent_memtable_writes, batch->prot_info_.get(), + has_valid_writes, seq_per_batch, batch_per_txn); + Status s = batch->Iterate(&inserter); + if (next_seq != nullptr) { + *next_seq = inserter.sequence(); + } + if (concurrent_memtable_writes) { + inserter.PostProcess(); + } + return s; +} + +namespace { + +// This class updates protection info for a WriteBatch. 
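+// It is driven through the regular handler mechanism, e.g. (a sketch; the
+// real caller is UpdateProtectionInfo() below):
+//   WriteBatch::ProtectionInfo info;
+//   ProtectionInfoUpdater updater(&info);
+//   Status s = wb->Iterate(&updater);  // appends one entry per operation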
+class ProtectionInfoUpdater : public WriteBatch::Handler { + public: + explicit ProtectionInfoUpdater(WriteBatch::ProtectionInfo* prot_info) + : prot_info_(prot_info) {} + + ~ProtectionInfoUpdater() override {} + + Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override { + return UpdateProtInfo(cf, key, val, kTypeValue); + } + + Status PutEntityCF(uint32_t cf, const Slice& key, + const Slice& entity) override { + return UpdateProtInfo(cf, key, entity, kTypeWideColumnEntity); + } + + Status DeleteCF(uint32_t cf, const Slice& key) override { + return UpdateProtInfo(cf, key, "", kTypeDeletion); + } + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + return UpdateProtInfo(cf, key, "", kTypeSingleDeletion); + } + + Status DeleteRangeCF(uint32_t cf, const Slice& begin_key, + const Slice& end_key) override { + return UpdateProtInfo(cf, begin_key, end_key, kTypeRangeDeletion); + } + + Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override { + return UpdateProtInfo(cf, key, val, kTypeMerge); + } + + Status PutBlobIndexCF(uint32_t cf, const Slice& key, + const Slice& val) override { + return UpdateProtInfo(cf, key, val, kTypeBlobIndex); + } + + Status MarkBeginPrepare(bool /* unprepare */) override { + return Status::OK(); + } + + Status MarkEndPrepare(const Slice& /* xid */) override { + return Status::OK(); + } + + Status MarkCommit(const Slice& /* xid */) override { return Status::OK(); } + + Status MarkCommitWithTimestamp(const Slice& /* xid */, + const Slice& /* ts */) override { + return Status::OK(); + } + + Status MarkRollback(const Slice& /* xid */) override { return Status::OK(); } + + Status MarkNoop(bool /* empty_batch */) override { return Status::OK(); } + + private: + Status UpdateProtInfo(uint32_t cf, const Slice& key, const Slice& val, + const ValueType op_type) { + if (prot_info_) { + prot_info_->entries_.emplace_back( + ProtectionInfo64().ProtectKVO(key, val, op_type).ProtectC(cf)); + } + return Status::OK(); + } + + // No copy or move. 
+  ProtectionInfoUpdater(const ProtectionInfoUpdater&) = delete;
+  ProtectionInfoUpdater(ProtectionInfoUpdater&&) = delete;
+  ProtectionInfoUpdater& operator=(const ProtectionInfoUpdater&) = delete;
+  ProtectionInfoUpdater& operator=(ProtectionInfoUpdater&&) = delete;
+
+  WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
+};
+
+}  // anonymous namespace
+
+Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
+  assert(contents.size() >= WriteBatchInternal::kHeader);
+  assert(b->prot_info_ == nullptr);
+
+  b->rep_.assign(contents.data(), contents.size());
+  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
+  return Status::OK();
+}
+
+Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
+                                  const bool wal_only) {
+  assert(dst->Count() == 0 ||
+         (dst->prot_info_ == nullptr) == (src->prot_info_ == nullptr));
+  if ((src->prot_info_ != nullptr &&
+       src->prot_info_->entries_.size() != src->Count()) ||
+      (dst->prot_info_ != nullptr &&
+       dst->prot_info_->entries_.size() != dst->Count())) {
+    return Status::Corruption(
+        "Write batch has inconsistent count and number of checksums");
+  }
+
+  size_t src_len;
+  int src_count;
+  uint32_t src_flags;
+
+  const SavePoint& batch_end = src->GetWalTerminationPoint();
+
+  if (wal_only && !batch_end.is_cleared()) {
+    src_len = batch_end.size - WriteBatchInternal::kHeader;
+    src_count = batch_end.count;
+    src_flags = batch_end.content_flags;
+  } else {
+    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
+    src_count = Count(src);
+    src_flags = src->content_flags_.load(std::memory_order_relaxed);
+  }
+
+  if (src->prot_info_ != nullptr) {
+    if (dst->prot_info_ == nullptr) {
+      dst->prot_info_.reset(new WriteBatch::ProtectionInfo());
+    }
+    std::copy(src->prot_info_->entries_.begin(),
+              src->prot_info_->entries_.begin() + src_count,
+              std::back_inserter(dst->prot_info_->entries_));
+  } else if (dst->prot_info_ != nullptr) {
+    // dst has an empty prot_info_->entries_.
+    // In this special case, we allow a write batch without prot_info to
+    // be appended to a write batch with an empty prot_info.
+    dst->prot_info_ = nullptr;
+  }
+  SetCount(dst, Count(dst) + src_count);
+  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
+  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
+  dst->content_flags_.store(
+      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
+      std::memory_order_relaxed);
+  return Status::OK();
+}
+
+size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
+                                            size_t rightByteSize) {
+  if (leftByteSize == 0 || rightByteSize == 0) {
+    return leftByteSize + rightByteSize;
+  } else {
+    return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
+  }
+}
+
+Status WriteBatchInternal::UpdateProtectionInfo(WriteBatch* wb,
+                                                size_t bytes_per_key,
+                                                uint64_t* checksum) {
+  if (bytes_per_key == 0) {
+    if (wb->prot_info_ != nullptr) {
+      wb->prot_info_.reset();
+      return Status::OK();
+    } else {
+      // Already not protected.
+      return Status::OK();
+    }
+  } else if (bytes_per_key == 8) {
+    if (wb->prot_info_ == nullptr) {
+      wb->prot_info_.reset(new WriteBatch::ProtectionInfo());
+      ProtectionInfoUpdater prot_info_updater(wb->prot_info_.get());
+      Status s = wb->Iterate(&prot_info_updater);
+      if (s.ok() && checksum != nullptr) {
+        uint64_t expected_hash = XXH3_64bits(wb->rep_.data(), wb->rep_.size());
+        if (expected_hash != *checksum) {
+          return Status::Corruption("Write batch content corrupted.");
+        }
+      }
+      return s;
+    } else {
+      // Already protected.
+      return Status::OK();
+    }
+  }
+  return Status::NotSupported(
+      "WriteBatch protection info must be zero or eight bytes/key");
+}
+
+}  // namespace ROCKSDB_NAMESPACE
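+// Usage sketch for UpdateProtectionInfo() (illustrative only; actual callers
+// live elsewhere in the DB layer, and the batch contents are invented):
+//   WriteBatch wb;
+//   Status s = wb.Put("key", "value");
+//   // Enable 8-bytes-per-key protection without verifying a content hash:
+//   if (s.ok()) {
+//     s = WriteBatchInternal::UpdateProtectionInfo(
+//         &wb, 8 /* bytes_per_key */, nullptr /* checksum */);
+//   }
+//   // Passing 0 strips protection info; any other size is NotSupported.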