diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/write_batch_internal.h | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/db/write_batch_internal.h | 401 |
1 files changed, 401 insertions, 0 deletions
diff --git a/src/rocksdb/db/write_batch_internal.h b/src/rocksdb/db/write_batch_internal.h new file mode 100644 index 000000000..1be0bd140 --- /dev/null +++ b/src/rocksdb/db/write_batch_internal.h @@ -0,0 +1,401 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include <array> +#include <vector> + +#include "db/flush_scheduler.h" +#include "db/kv_checksum.h" +#include "db/trim_history_scheduler.h" +#include "db/write_thread.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/types.h" +#include "rocksdb/write_batch.h" +#include "util/autovector.h" +#include "util/cast_util.h" + +namespace ROCKSDB_NAMESPACE { + +class MemTable; +class FlushScheduler; +class ColumnFamilyData; + +class ColumnFamilyMemTables { + public: + virtual ~ColumnFamilyMemTables() {} + virtual bool Seek(uint32_t column_family_id) = 0; + // returns true if the update to memtable should be ignored + // (useful when recovering from log whose updates have already + // been processed) + virtual uint64_t GetLogNumber() const = 0; + virtual MemTable* GetMemTable() const = 0; + virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0; + virtual ColumnFamilyData* current() { return nullptr; } +}; + +class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { + public: + explicit ColumnFamilyMemTablesDefault(MemTable* mem) + : ok_(false), mem_(mem) {} + + bool Seek(uint32_t column_family_id) override { + ok_ = (column_family_id == 0); + return ok_; + } + + uint64_t GetLogNumber() const override { return 0; } + + MemTable* GetMemTable() const override { + assert(ok_); + return mem_; + } + + ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; } + + private: + bool ok_; + MemTable* mem_; +}; + +struct WriteBatch::ProtectionInfo { + // `WriteBatch` usually doesn't contain a huge number of keys so protecting + // with a fixed, non-configurable eight bytes per key may work well enough. + autovector<ProtectionInfoKVOC64> entries_; + + size_t GetBytesPerKey() const { return 8; } +}; + +// WriteBatchInternal provides static methods for manipulating a +// WriteBatch that we don't want in the public WriteBatch interface. +class WriteBatchInternal { + public: + // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. + static constexpr size_t kHeader = 12; + + // WriteBatch methods with column_family_id instead of ColumnFamilyHandle* + static Status Put(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); + + static Status Put(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key, const SliceParts& value); + + static Status PutEntity(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const WideColumns& columns); + + static Status Delete(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key); + + static Status Delete(WriteBatch* batch, uint32_t column_family_id, + const Slice& key); + + static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key); + + static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id, + const Slice& key); + + static Status DeleteRange(WriteBatch* b, uint32_t column_family_id, + const Slice& begin_key, const Slice& end_key); + + static Status DeleteRange(WriteBatch* b, uint32_t column_family_id, + const SliceParts& begin_key, + const SliceParts& end_key); + + static Status Merge(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); + + static Status Merge(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key, const SliceParts& value); + + static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); + + static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid, + const bool write_after_commit = true, + const bool unprepared_batch = false); + + static Status MarkRollback(WriteBatch* batch, const Slice& xid); + + static Status MarkCommit(WriteBatch* batch, const Slice& xid); + + static Status MarkCommitWithTimestamp(WriteBatch* batch, const Slice& xid, + const Slice& commit_ts); + + static Status InsertNoop(WriteBatch* batch); + + // Return the number of entries in the batch. + static uint32_t Count(const WriteBatch* batch); + + // Set the count for the number of entries in the batch. + static void SetCount(WriteBatch* batch, uint32_t n); + + // Return the sequence number for the start of this batch. + static SequenceNumber Sequence(const WriteBatch* batch); + + // Store the specified number as the sequence number for the start of + // this batch. + static void SetSequence(WriteBatch* batch, SequenceNumber seq); + + // Returns the offset of the first entry in the batch. + // This offset is only valid if the batch is not empty. + static size_t GetFirstOffset(WriteBatch* batch); + + static Slice Contents(const WriteBatch* batch) { return Slice(batch->rep_); } + + static size_t ByteSize(const WriteBatch* batch) { return batch->rep_.size(); } + + static Status SetContents(WriteBatch* batch, const Slice& contents); + + static Status CheckSlicePartsLength(const SliceParts& key, + const SliceParts& value); + + // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive. + // + // If ignore_missing_column_families == true. WriteBatch + // referencing non-existing column family will be ignored. + // If ignore_missing_column_families == false, processing of the + // batches will be stopped if a reference is found to a non-existing + // column family and InvalidArgument() will be returned. The writes + // in batches may be only partially applied at that point. + // + // If log_number is non-zero, the memtable will be updated only if + // memtables->GetLogNumber() >= log_number. + // + // If flush_scheduler is non-null, it will be invoked if the memtable + // should be flushed. + // + // Under concurrent use, the caller is responsible for making sure that + // the memtables object itself is thread-local. + static Status InsertInto( + WriteThread::WriteGroup& write_group, SequenceNumber sequence, + ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, + bool ignore_missing_column_families = false, uint64_t log_number = 0, + DB* db = nullptr, bool concurrent_memtable_writes = false, + bool seq_per_batch = false, bool batch_per_txn = true); + + // Convenience form of InsertInto when you have only one batch + // next_seq returns the seq after last sequence number used in MemTable insert + static Status InsertInto( + const WriteBatch* batch, ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, + bool ignore_missing_column_families = false, uint64_t log_number = 0, + DB* db = nullptr, bool concurrent_memtable_writes = false, + SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr, + bool seq_per_batch = false, bool batch_per_txn = true); + + static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence, + ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, + TrimHistoryScheduler* trim_history_scheduler, + bool ignore_missing_column_families = false, + uint64_t log_number = 0, DB* db = nullptr, + bool concurrent_memtable_writes = false, + bool seq_per_batch = false, size_t batch_cnt = 0, + bool batch_per_txn = true, + bool hint_per_batch = false); + + // Appends src write batch to dst write batch and updates count in dst + // write batch. Returns OK if the append is successful. Checks number of + // checksum against count in dst and src write batches, and returns Corruption + // if the count is inconsistent. + static Status Append(WriteBatch* dst, const WriteBatch* src, + const bool WAL_only = false); + + // Returns the byte size of appending a WriteBatch with ByteSize + // leftByteSize and a WriteBatch with ByteSize rightByteSize + static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize); + + // Iterate over [begin, end) range of a write batch + static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler, + size_t begin, size_t end); + + // This write batch includes the latest state that should be persisted. Such + // state meant to be used only during recovery. + static void SetAsLatestPersistentState(WriteBatch* b); + static bool IsLatestPersistentState(const WriteBatch* b); + + static std::tuple<Status, uint32_t, size_t> GetColumnFamilyIdAndTimestampSize( + WriteBatch* b, ColumnFamilyHandle* column_family); + + static bool TimestampsUpdateNeeded(const WriteBatch& wb) { + return wb.needs_in_place_update_ts_; + } + + static bool HasKeyWithTimestamp(const WriteBatch& wb) { + return wb.has_key_with_ts_; + } + + // Update per-key value protection information on this write batch. + // If checksum is provided, the batch content is verfied against the checksum. + static Status UpdateProtectionInfo(WriteBatch* wb, size_t bytes_per_key, + uint64_t* checksum = nullptr); +}; + +// LocalSavePoint is similar to a scope guard +class LocalSavePoint { + public: + explicit LocalSavePoint(WriteBatch* batch) + : batch_(batch), + savepoint_(batch->GetDataSize(), batch->Count(), + batch->content_flags_.load(std::memory_order_relaxed)) +#ifndef NDEBUG + , + committed_(false) +#endif + { + } + +#ifndef NDEBUG + ~LocalSavePoint() { assert(committed_); } +#endif + Status commit() { +#ifndef NDEBUG + committed_ = true; +#endif + if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) { + batch_->rep_.resize(savepoint_.size); + WriteBatchInternal::SetCount(batch_, savepoint_.count); + if (batch_->prot_info_ != nullptr) { + batch_->prot_info_->entries_.resize(savepoint_.count); + } + batch_->content_flags_.store(savepoint_.content_flags, + std::memory_order_relaxed); + return Status::MemoryLimit(); + } + return Status::OK(); + } + + private: + WriteBatch* batch_; + SavePoint savepoint_; +#ifndef NDEBUG + bool committed_; +#endif +}; + +template <typename TimestampSizeFuncType> +class TimestampUpdater : public WriteBatch::Handler { + public: + explicit TimestampUpdater(WriteBatch::ProtectionInfo* prot_info, + TimestampSizeFuncType&& ts_sz_func, const Slice& ts) + : prot_info_(prot_info), + ts_sz_func_(std::move(ts_sz_func)), + timestamp_(ts) { + assert(!timestamp_.empty()); + } + + ~TimestampUpdater() override {} + + Status PutCF(uint32_t cf, const Slice& key, const Slice&) override { + return UpdateTimestamp(cf, key); + } + + Status DeleteCF(uint32_t cf, const Slice& key) override { + return UpdateTimestamp(cf, key); + } + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + return UpdateTimestamp(cf, key); + } + + Status DeleteRangeCF(uint32_t cf, const Slice& begin_key, + const Slice& end_key) override { + Status s = UpdateTimestamp(cf, begin_key, true /* is_key */); + if (s.ok()) { + s = UpdateTimestamp(cf, end_key, false /* is_key */); + } + return s; + } + + Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { + return UpdateTimestamp(cf, key); + } + + Status PutBlobIndexCF(uint32_t cf, const Slice& key, const Slice&) override { + return UpdateTimestamp(cf, key); + } + + Status MarkBeginPrepare(bool) override { return Status::OK(); } + + Status MarkEndPrepare(const Slice&) override { return Status::OK(); } + + Status MarkCommit(const Slice&) override { return Status::OK(); } + + Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { + return Status::OK(); + } + + Status MarkRollback(const Slice&) override { return Status::OK(); } + + Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } + + private: + // @param is_key specifies whether the update is for key or value. + Status UpdateTimestamp(uint32_t cf, const Slice& buf, bool is_key = true) { + Status s = UpdateTimestampImpl(cf, buf, idx_, is_key); + ++idx_; + return s; + } + + Status UpdateTimestampImpl(uint32_t cf, const Slice& buf, size_t /*idx*/, + bool is_key) { + if (timestamp_.empty()) { + return Status::InvalidArgument("Timestamp is empty"); + } + size_t cf_ts_sz = ts_sz_func_(cf); + if (0 == cf_ts_sz) { + // Skip this column family. + return Status::OK(); + } else if (std::numeric_limits<size_t>::max() == cf_ts_sz) { + // Column family timestamp info not found. + return Status::NotFound(); + } else if (cf_ts_sz != timestamp_.size()) { + return Status::InvalidArgument("timestamp size mismatch"); + } + UpdateProtectionInformationIfNeeded(buf, timestamp_, is_key); + + char* ptr = const_cast<char*>(buf.data() + buf.size() - cf_ts_sz); + assert(ptr); + memcpy(ptr, timestamp_.data(), timestamp_.size()); + return Status::OK(); + } + + void UpdateProtectionInformationIfNeeded(const Slice& buf, const Slice& ts, + bool is_key) { + if (prot_info_ != nullptr) { + const size_t ts_sz = ts.size(); + SliceParts old(&buf, 1); + Slice old_no_ts(buf.data(), buf.size() - ts_sz); + std::array<Slice, 2> new_key_cmpts{{old_no_ts, ts}}; + SliceParts new_parts(new_key_cmpts.data(), 2); + if (is_key) { + prot_info_->entries_[idx_].UpdateK(old, new_parts); + } else { + prot_info_->entries_[idx_].UpdateV(old, new_parts); + } + } + } + + // No copy or move. + TimestampUpdater(const TimestampUpdater&) = delete; + TimestampUpdater(TimestampUpdater&&) = delete; + TimestampUpdater& operator=(const TimestampUpdater&) = delete; + TimestampUpdater& operator=(TimestampUpdater&&) = delete; + + WriteBatch::ProtectionInfo* const prot_info_ = nullptr; + const TimestampSizeFuncType ts_sz_func_{}; + const Slice timestamp_; + size_t idx_ = 0; +}; + +} // namespace ROCKSDB_NAMESPACE |