From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/rocksdb/db/memtable.h | 542 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 542 insertions(+) create mode 100644 src/rocksdb/db/memtable.h (limited to 'src/rocksdb/db/memtable.h') diff --git a/src/rocksdb/db/memtable.h b/src/rocksdb/db/memtable.h new file mode 100644 index 000000000..f4e4b98a9 --- /dev/null +++ b/src/rocksdb/db/memtable.h @@ -0,0 +1,542 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include "db/dbformat.h" +#include "db/range_tombstone_fragmenter.h" +#include "db/read_callback.h" +#include "db/version_edit.h" +#include "memory/allocator.h" +#include "memory/concurrent_arena.h" +#include "monitoring/instrumented_mutex.h" +#include "options/cf_options.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/memtablerep.h" +#include "table/multiget_context.h" +#include "util/dynamic_bloom.h" +#include "util/hash.h" + +namespace ROCKSDB_NAMESPACE { + +struct FlushJobInfo; +class Mutex; +class MemTableIterator; +class MergeContext; + +struct ImmutableMemTableOptions { + explicit ImmutableMemTableOptions(const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options); + size_t arena_block_size; + uint32_t memtable_prefix_bloom_bits; + size_t memtable_huge_page_size; + bool memtable_whole_key_filtering; + bool inplace_update_support; + size_t inplace_update_num_locks; + UpdateStatus (*inplace_callback)(char* existing_value, + uint32_t* existing_value_size, + Slice delta_value, + std::string* merged_value); + size_t max_successive_merges; + Statistics* statistics; + MergeOperator* merge_operator; + Logger* info_log; +}; + +// Batched counters to updated when inserting keys in one write batch. +// In post process of the write batch, these can be updated together. +// Only used in concurrent memtable insert case. +struct MemTablePostProcessInfo { + uint64_t data_size = 0; + uint64_t num_entries = 0; + uint64_t num_deletes = 0; +}; + +using MultiGetRange = MultiGetContext::Range; +// Note: Many of the methods in this class have comments indicating that +// external synchronization is required as these methods are not thread-safe. +// It is up to higher layers of code to decide how to prevent concurrent +// invokation of these methods. This is usually done by acquiring either +// the db mutex or the single writer thread. +// +// Some of these methods are documented to only require external +// synchronization if this memtable is immutable. Calling MarkImmutable() is +// not sufficient to guarantee immutability. It is up to higher layers of +// code to determine if this MemTable can still be modified by other threads. +// Eg: The Superversion stores a pointer to the current MemTable (that can +// be modified) and a separate list of the MemTables that can no longer be +// written to (aka the 'immutable memtables'). +class MemTable { + public: + struct KeyComparator : public MemTableRep::KeyComparator { + const InternalKeyComparator comparator; + explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } + virtual int operator()(const char* prefix_len_key1, + const char* prefix_len_key2) const override; + virtual int operator()(const char* prefix_len_key, + const DecodedType& key) const override; + }; + + // MemTables are reference counted. The initial reference count + // is zero and the caller must call Ref() at least once. + // + // earliest_seq should be the current SequenceNumber in the db such that any + // key inserted into this memtable will have an equal or larger seq number. + // (When a db is first created, the earliest sequence number will be 0). + // If the earliest sequence number is not known, kMaxSequenceNumber may be + // used, but this may prevent some transactions from succeeding until the + // first key is inserted into the memtable. + explicit MemTable(const InternalKeyComparator& comparator, + const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options, + WriteBufferManager* write_buffer_manager, + SequenceNumber earliest_seq, uint32_t column_family_id); + // No copying allowed + MemTable(const MemTable&) = delete; + MemTable& operator=(const MemTable&) = delete; + + // Do not delete this MemTable unless Unref() indicates it not in use. + ~MemTable(); + + // Increase reference count. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + void Ref() { ++refs_; } + + // Drop reference count. + // If the refcount goes to zero return this memtable, otherwise return null. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + MemTable* Unref() { + --refs_; + assert(refs_ >= 0); + if (refs_ <= 0) { + return this; + } + return nullptr; + } + + // Returns an estimate of the number of bytes of data in use by this + // data structure. + // + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + size_t ApproximateMemoryUsage(); + + // As a cheap version of `ApproximateMemoryUsage()`, this function doens't + // require external synchronization. The value may be less accurate though + size_t ApproximateMemoryUsageFast() const { + return approximate_memory_usage_.load(std::memory_order_relaxed); + } + + // This method heuristically determines if the memtable should continue to + // host more data. + bool ShouldScheduleFlush() const { + return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED; + } + + // Returns true if a flush should be scheduled and the caller should + // be the one to schedule it + bool MarkFlushScheduled() { + auto before = FLUSH_REQUESTED; + return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED, + std::memory_order_relaxed, + std::memory_order_relaxed); + } + + // Return an iterator that yields the contents of the memtable. + // + // The caller must ensure that the underlying MemTable remains live + // while the returned iterator is live. The keys returned by this + // iterator are internal keys encoded by AppendInternalKey in the + // db/dbformat.{h,cc} module. + // + // By default, it returns an iterator for prefix seek if prefix_extractor + // is configured in Options. + // arena: If not null, the arena needs to be used to allocate the Iterator. + // Calling ~Iterator of the iterator will destroy all the states but + // those allocated in arena. + InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena); + + FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator( + const ReadOptions& read_options, SequenceNumber read_seq); + + // Add an entry into memtable that maps key to value at the + // specified sequence number and with the specified type. + // Typically value will be empty if type==kTypeDeletion. + // + // REQUIRES: if allow_concurrent = false, external synchronization to prevent + // simultaneous operations on the same MemTable. + // + // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and + // the already exists. + bool Add(SequenceNumber seq, ValueType type, const Slice& key, + const Slice& value, bool allow_concurrent = false, + MemTablePostProcessInfo* post_process_info = nullptr, + void** hint = nullptr); + + // Used to Get value associated with key or Get Merge Operands associated + // with key. + // If do_merge = true the default behavior which is Get value for key is + // executed. Expected behavior is described right below. + // If memtable contains a value for key, store it in *value and return true. + // If memtable contains a deletion for key, store a NotFound() error + // in *status and return true. + // If memtable contains Merge operation as the most recent entry for a key, + // and the merge process does not stop (not reaching a value or delete), + // prepend the current merge operand to *operands. + // store MergeInProgress in s, and return false. + // Else, return false. + // If any operation was found, its most recent sequence number + // will be stored in *seq on success (regardless of whether true/false is + // returned). Otherwise, *seq will be set to kMaxSequenceNumber. + // On success, *s may be set to OK, NotFound, or MergeInProgress. Any other + // status returned indicates a corruption or other unexpected error. + // If do_merge = false then any Merge Operands encountered for key are simply + // stored in merge_context.operands_list and never actually merged to get a + // final value. The raw Merge Operands are eventually returned to the user. + bool Get(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, + const ReadOptions& read_opts, ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr, bool do_merge = true); + + bool Get(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, + const ReadOptions& read_opts, ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr, bool do_merge = true) { + SequenceNumber seq; + return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq, + read_opts, callback, is_blob_index, do_merge); + } + + void MultiGet(const ReadOptions& read_options, MultiGetRange* range, + ReadCallback* callback, bool* is_blob); + + // Attempts to update the new_value inplace, else does normal Add + // Pseudocode + // if key exists in current memtable && prev_value is of type kTypeValue + // if new sizeof(new_value) <= sizeof(prev_value) + // update inplace + // else add(key, new_value) + // else add(key, new_value) + // + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + void Update(SequenceNumber seq, + const Slice& key, + const Slice& value); + + // If prev_value for key exists, attempts to update it inplace. + // else returns false + // Pseudocode + // if key exists in current memtable && prev_value is of type kTypeValue + // new_value = delta(prev_value) + // if sizeof(new_value) <= sizeof(prev_value) + // update inplace + // else add(key, new_value) + // else return false + // + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + bool UpdateCallback(SequenceNumber seq, + const Slice& key, + const Slice& delta); + + // Returns the number of successive merge entries starting from the newest + // entry for the key up to the last non-merge entry or last entry for the + // key in the memtable. + size_t CountSuccessiveMergeEntries(const LookupKey& key); + + // Update counters and flush status after inserting a whole write batch + // Used in concurrent memtable inserts. + void BatchPostProcess(const MemTablePostProcessInfo& update_counters) { + num_entries_.fetch_add(update_counters.num_entries, + std::memory_order_relaxed); + data_size_.fetch_add(update_counters.data_size, std::memory_order_relaxed); + if (update_counters.num_deletes != 0) { + num_deletes_.fetch_add(update_counters.num_deletes, + std::memory_order_relaxed); + } + UpdateFlushState(); + } + + // Get total number of entries in the mem table. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + uint64_t num_entries() const { + return num_entries_.load(std::memory_order_relaxed); + } + + // Get total number of deletes in the mem table. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + uint64_t num_deletes() const { + return num_deletes_.load(std::memory_order_relaxed); + } + + uint64_t get_data_size() const { + return data_size_.load(std::memory_order_relaxed); + } + + // Dynamically change the memtable's capacity. If set below the current usage, + // the next key added will trigger a flush. Can only increase size when + // memtable prefix bloom is disabled, since we can't easily allocate more + // space. + void UpdateWriteBufferSize(size_t new_write_buffer_size) { + if (bloom_filter_ == nullptr || + new_write_buffer_size < write_buffer_size_) { + write_buffer_size_.store(new_write_buffer_size, + std::memory_order_relaxed); + } + } + + // Returns the edits area that is needed for flushing the memtable + VersionEdit* GetEdits() { return &edit_; } + + // Returns if there is no entry inserted to the mem table. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + bool IsEmpty() const { return first_seqno_ == 0; } + + // Returns the sequence number of the first element that was inserted + // into the memtable. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + SequenceNumber GetFirstSequenceNumber() { + return first_seqno_.load(std::memory_order_relaxed); + } + + // Returns the sequence number that is guaranteed to be smaller than or equal + // to the sequence number of any key that could be inserted into this + // memtable. It can then be assumed that any write with a larger(or equal) + // sequence number will be present in this memtable or a later memtable. + // + // If the earliest sequence number could not be determined, + // kMaxSequenceNumber will be returned. + SequenceNumber GetEarliestSequenceNumber() { + return earliest_seqno_.load(std::memory_order_relaxed); + } + + // DB's latest sequence ID when the memtable is created. This number + // may be updated to a more recent one before any key is inserted. + SequenceNumber GetCreationSeq() const { return creation_seq_; } + + void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; } + + // Returns the next active logfile number when this memtable is about to + // be flushed to storage + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + uint64_t GetNextLogNumber() { return mem_next_logfile_number_; } + + // Sets the next active logfile number when this memtable is about to + // be flushed to storage + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; } + + // if this memtable contains data from a committed + // two phase transaction we must take note of the + // log which contains that data so we can know + // when to relese that log + void RefLogContainingPrepSection(uint64_t log); + uint64_t GetMinLogContainingPrepSection(); + + // Notify the underlying storage that no more items will be added. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + // After MarkImmutable() is called, you should not attempt to + // write anything to this MemTable(). (Ie. do not call Add() or Update()). + void MarkImmutable() { + table_->MarkReadOnly(); + mem_tracker_.DoneAllocating(); + } + + // Notify the underlying storage that all data it contained has been + // persisted. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + void MarkFlushed() { + table_->MarkFlushed(); + } + + // return true if the current MemTableRep supports merge operator. + bool IsMergeOperatorSupported() const { + return table_->IsMergeOperatorSupported(); + } + + // return true if the current MemTableRep supports snapshots. + // inplace update prevents snapshots, + bool IsSnapshotSupported() const { + return table_->IsSnapshotSupported() && !moptions_.inplace_update_support; + } + + struct MemTableStats { + uint64_t size; + uint64_t count; + }; + + MemTableStats ApproximateStats(const Slice& start_ikey, + const Slice& end_ikey); + + // Get the lock associated for the key + port::RWMutex* GetLock(const Slice& key); + + const InternalKeyComparator& GetInternalKeyComparator() const { + return comparator_.comparator; + } + + const ImmutableMemTableOptions* GetImmutableMemTableOptions() const { + return &moptions_; + } + + uint64_t ApproximateOldestKeyTime() const { + return oldest_key_time_.load(std::memory_order_relaxed); + } + + // REQUIRES: db_mutex held. + void SetID(uint64_t id) { id_ = id; } + + uint64_t GetID() const { return id_; } + + void SetFlushCompleted(bool completed) { flush_completed_ = completed; } + + uint64_t GetFileNumber() const { return file_number_; } + + void SetFileNumber(uint64_t file_num) { file_number_ = file_num; } + + void SetFlushInProgress(bool in_progress) { + flush_in_progress_ = in_progress; + } + +#ifndef ROCKSDB_LITE + void SetFlushJobInfo(std::unique_ptr&& info) { + flush_job_info_ = std::move(info); + } + + std::unique_ptr ReleaseFlushJobInfo() { + return std::move(flush_job_info_); + } +#endif // !ROCKSDB_LITE + + private: + enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; + + friend class MemTableIterator; + friend class MemTableBackwardIterator; + friend class MemTableList; + + KeyComparator comparator_; + const ImmutableMemTableOptions moptions_; + int refs_; + const size_t kArenaBlockSize; + AllocTracker mem_tracker_; + ConcurrentArena arena_; + std::unique_ptr table_; + std::unique_ptr range_del_table_; + std::atomic_bool is_range_del_table_empty_; + + // Total data size of all data inserted + std::atomic data_size_; + std::atomic num_entries_; + std::atomic num_deletes_; + + // Dynamically changeable memtable option + std::atomic write_buffer_size_; + + // These are used to manage memtable flushes to storage + bool flush_in_progress_; // started the flush + bool flush_completed_; // finished the flush + uint64_t file_number_; // filled up after flush is complete + + // The updates to be applied to the transaction log when this + // memtable is flushed to storage. + VersionEdit edit_; + + // The sequence number of the kv that was inserted first + std::atomic first_seqno_; + + // The db sequence number at the time of creation or kMaxSequenceNumber + // if not set. + std::atomic earliest_seqno_; + + SequenceNumber creation_seq_; + + // The log files earlier than this number can be deleted. + uint64_t mem_next_logfile_number_; + + // the earliest log containing a prepared section + // which has been inserted into this memtable. + std::atomic min_prep_log_referenced_; + + // rw locks for inplace updates + std::vector locks_; + + const SliceTransform* const prefix_extractor_; + std::unique_ptr bloom_filter_; + + std::atomic flush_state_; + + Env* env_; + + // Extract sequential insert prefixes. + const SliceTransform* insert_with_hint_prefix_extractor_; + + // Insert hints for each prefix. + std::unordered_map insert_hints_; + + // Timestamp of oldest key + std::atomic oldest_key_time_; + + // Memtable id to track flush. + uint64_t id_ = 0; + + // Sequence number of the atomic flush that is responsible for this memtable. + // The sequence number of atomic flush is a seq, such that no writes with + // sequence numbers greater than or equal to seq are flushed, while all + // writes with sequence number smaller than seq are flushed. + SequenceNumber atomic_flush_seqno_; + + // keep track of memory usage in table_, arena_, and range_del_table_. + // Gets refrshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow` + std::atomic approximate_memory_usage_; + +#ifndef ROCKSDB_LITE + // Flush job info of the current memtable. + std::unique_ptr flush_job_info_; +#endif // !ROCKSDB_LITE + + // Returns a heuristic flush decision + bool ShouldFlushNow(); + + // Updates flush_state_ using ShouldFlushNow() + void UpdateFlushState(); + + void UpdateOldestKeyTime(); + + void GetFromTable(const LookupKey& key, + SequenceNumber max_covering_tombstone_seq, bool do_merge, + ReadCallback* callback, bool* is_blob_index, + std::string* value, Status* s, MergeContext* merge_context, + SequenceNumber* seq, bool* found_final_value, + bool* merge_in_progress); +}; + +extern const char* EncodeKey(std::string* scratch, const Slice& target); + +} // namespace ROCKSDB_NAMESPACE -- cgit v1.2.3