Diffstat (limited to 'src/rocksdb/db/memtable_list.h')
-rw-r--r--  src/rocksdb/db/memtable_list.h  471
1 file changed, 471 insertions, 0 deletions
diff --git a/src/rocksdb/db/memtable_list.h b/src/rocksdb/db/memtable_list.h
new file mode 100644
index 000000000..1ad28a59e
--- /dev/null
+++ b/src/rocksdb/db/memtable_list.h
@@ -0,0 +1,471 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#include <deque>
+#include <limits>
+#include <list>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "db/logs_with_prep_tracker.h"
+#include "db/memtable.h"
+#include "db/range_del_aggregator.h"
+#include "file/filename.h"
+#include "logging/log_buffer.h"
+#include "monitoring/instrumented_mutex.h"
+#include "rocksdb/db.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/types.h"
+#include "util/autovector.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class ColumnFamilyData;
+class InternalKeyComparator;
+class InstrumentedMutex;
+class MergeIteratorBuilder;
+class MemTableList;
+
+struct FlushJobInfo;
+
+// Keeps a list of immutable memtables. The list is immutable if the
+// refcount is greater than one. It is used as state for the Get() and
+// Iterator code paths.
+//
+// This class is not thread-safe. External synchronization is required
+// (such as holding the db mutex or being on the write thread).
+class MemTableListVersion {
+ public:
+ explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
+ const MemTableListVersion& old);
+ explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
+ int max_write_buffer_number_to_maintain,
+ int64_t max_write_buffer_size_to_maintain);
+
+ void Ref();
+ void Unref(autovector<MemTable*>* to_delete = nullptr);
+
+ // Search all the memtables starting from the most recent one.
+ // Return the most recent value found, if any.
+ //
+ // If any operation was found for this key, its most recent sequence number
+ // will be stored in *seq on success (regardless of whether true/false is
+ // returned). Otherwise, *seq will be set to kMaxSequenceNumber.
+ bool Get(const LookupKey& key, std::string* value,
+ PinnableWideColumns* columns, std::string* timestamp, Status* s,
+ MergeContext* merge_context,
+ SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
+ const ReadOptions& read_opts, ReadCallback* callback = nullptr,
+ bool* is_blob_index = nullptr);
+
+ bool Get(const LookupKey& key, std::string* value,
+ PinnableWideColumns* columns, std::string* timestamp, Status* s,
+ MergeContext* merge_context,
+ SequenceNumber* max_covering_tombstone_seq,
+ const ReadOptions& read_opts, ReadCallback* callback = nullptr,
+ bool* is_blob_index = nullptr) {
+ SequenceNumber seq;
+ return Get(key, value, columns, timestamp, s, merge_context,
+ max_covering_tombstone_seq, &seq, read_opts, callback,
+ is_blob_index);
+ }
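+
+  // Illustrative sketch (not part of the API; names such as `version`,
+  // `lkey`, and `read_opts` are hypothetical): a point lookup against a
+  // pinned version using the overload above.
+  //
+  //   std::string value;
+  //   Status s;
+  //   MergeContext merge_context;
+  //   SequenceNumber max_covering_tombstone_seq = 0;
+  //   if (version->Get(lkey, &value, /*columns=*/nullptr,
+  //                    /*timestamp=*/nullptr, &s, &merge_context,
+  //                    &max_covering_tombstone_seq, read_opts)) {
+  //     // An entry for the key was found in an immutable memtable.
+  //   }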
+
+ void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
+ ReadCallback* callback);
+
+ // Returns all the merge operands corresponding to the key by searching all
+ // memtables starting from the most recent one.
+ bool GetMergeOperands(const LookupKey& key, Status* s,
+ MergeContext* merge_context,
+ SequenceNumber* max_covering_tombstone_seq,
+ const ReadOptions& read_opts);
+
+  // Similar to Get(), but searches the history of memtables that have
+  // already been flushed. Should only be used for in-memory-only queries
+  // (such as Transaction validation), as the history may contain writes
+  // that are also present in the SST files.
+ bool GetFromHistory(const LookupKey& key, std::string* value,
+ PinnableWideColumns* columns, std::string* timestamp,
+ Status* s, MergeContext* merge_context,
+ SequenceNumber* max_covering_tombstone_seq,
+ SequenceNumber* seq, const ReadOptions& read_opts,
+ bool* is_blob_index = nullptr);
+ bool GetFromHistory(const LookupKey& key, std::string* value,
+ PinnableWideColumns* columns, std::string* timestamp,
+ Status* s, MergeContext* merge_context,
+ SequenceNumber* max_covering_tombstone_seq,
+ const ReadOptions& read_opts,
+ bool* is_blob_index = nullptr) {
+ SequenceNumber seq;
+ return GetFromHistory(key, value, columns, timestamp, s, merge_context,
+ max_covering_tombstone_seq, &seq, read_opts,
+ is_blob_index);
+ }
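+
+  // Illustrative sketch (an assumed usage, not a documented pattern): a
+  // transaction-validation style check against the flushed history;
+  // `snapshot_seq` and the other names are hypothetical.
+  //
+  //   SequenceNumber seq = kMaxSequenceNumber;
+  //   version->GetFromHistory(lkey, &value, /*columns=*/nullptr,
+  //                           /*timestamp=*/nullptr, &s, &merge_context,
+  //                           &max_covering_tombstone_seq, &seq, read_opts);
+  //   if (seq != kMaxSequenceNumber && seq > snapshot_seq) {
+  //     // The key was written after the snapshot: a conflict.
+  //   }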
+
+ Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
+ RangeDelAggregator* range_del_agg);
+
+ void AddIterators(const ReadOptions& options,
+ std::vector<InternalIterator*>* iterator_list,
+ Arena* arena);
+
+ void AddIterators(const ReadOptions& options,
+ MergeIteratorBuilder* merge_iter_builder,
+ bool add_range_tombstone_iter);
+
+ uint64_t GetTotalNumEntries() const;
+
+ uint64_t GetTotalNumDeletes() const;
+
+ MemTable::MemTableStats ApproximateStats(const Slice& start_ikey,
+ const Slice& end_ikey);
+
+ // Returns the value of MemTable::GetEarliestSequenceNumber() on the most
+ // recent MemTable in this list or kMaxSequenceNumber if the list is empty.
+ // If include_history=true, will also search Memtables in MemTableList
+ // History.
+ SequenceNumber GetEarliestSequenceNumber(bool include_history = false) const;
+
+  // Return the first sequence number of the memtable list, i.e. the smallest
+  // first-sequence-number among all of its memtables.
+  // Return kMaxSequenceNumber if the list is empty.
+ SequenceNumber GetFirstSequenceNumber() const;
+
+ private:
+ friend class MemTableList;
+
+ friend Status InstallMemtableAtomicFlushResults(
+ const autovector<MemTableList*>* imm_lists,
+ const autovector<ColumnFamilyData*>& cfds,
+ const autovector<const MutableCFOptions*>& mutable_cf_options_list,
+ const autovector<const autovector<MemTable*>*>& mems_list,
+ VersionSet* vset, LogsWithPrepTracker* prep_tracker,
+ InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
+ const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
+ committed_flush_jobs_info,
+ autovector<MemTable*>* to_delete, FSDirectory* db_directory,
+ LogBuffer* log_buffer);
+
+ // REQUIRE: m is an immutable memtable
+ void Add(MemTable* m, autovector<MemTable*>* to_delete);
+ // REQUIRE: m is an immutable memtable
+ void Remove(MemTable* m, autovector<MemTable*>* to_delete);
+
+  // Returns true if the memtable list was trimmed.
+ bool TrimHistory(autovector<MemTable*>* to_delete, size_t usage);
+
+ bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
+ std::string* value, PinnableWideColumns* columns,
+ std::string* timestamp, Status* s,
+ MergeContext* merge_context,
+ SequenceNumber* max_covering_tombstone_seq,
+ SequenceNumber* seq, const ReadOptions& read_opts,
+ ReadCallback* callback = nullptr,
+ bool* is_blob_index = nullptr);
+
+ void AddMemTable(MemTable* m);
+
+ void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m);
+
+  // Calculate the total amount of memory used by memlist_ and memlist_history_
+  // excluding the last MemTable in memlist_history_. The reason for excluding
+  // the last MemTable is to see whether dropping it would keep total memory
+  // usage at or above max_write_buffer_size_to_maintain_.
+ size_t MemoryAllocatedBytesExcludingLast() const;
+
+ // Whether this version contains flushed memtables that are only kept around
+ // for transaction conflict checking.
+ bool HasHistory() const { return !memlist_history_.empty(); }
+
+ bool MemtableLimitExceeded(size_t usage);
+
+ // Immutable MemTables that have not yet been flushed.
+ std::list<MemTable*> memlist_;
+
+ // MemTables that have already been flushed
+ // (used during Transaction validation)
+ std::list<MemTable*> memlist_history_;
+
+  // Maximum number of MemTables to keep in memory (including both flushed
+  // and not-yet-flushed tables).
+ const int max_write_buffer_number_to_maintain_;
+ // Maximum size of MemTables to keep in memory (including both flushed
+ // and not-yet-flushed tables).
+ const int64_t max_write_buffer_size_to_maintain_;
+
+ int refs_ = 0;
+
+ size_t* parent_memtable_list_memory_usage_;
+};
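+
+// Illustrative sketch (not prescriptive; `imm` and the surrounding setup are
+// hypothetical): the reference-counting protocol for a version, typically
+// driven under the DB mutex.
+//
+//   MemTableListVersion* v = imm->current();
+//   v->Ref();                        // pin the version while reading
+//   // ... read through v->Get() / v->AddIterators() ...
+//   autovector<MemTable*> to_delete;
+//   v->Unref(&to_delete);            // may hand back memtables to free
+//   for (MemTable* m : to_delete) {
+//     delete m;                      // reclaimed by the caller
+//   }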
+
+// This class stores references to all the immutable memtables.
+// The memtables are flushed to L0 as soon as possible and in
+// any order. If there are more than one immutable memtable, their
+// flushes can occur concurrently. However, they are 'committed'
+// to the manifest in FIFO order to maintain correctness and
+// recoverability from a crash.
+//
+// Other than imm_flush_needed and imm_trim_needed, this class is not
+// thread-safe and requires external synchronization (such as holding the db
+// mutex or being on the write thread).
+class MemTableList {
+ public:
+ // A list of memtables.
+ explicit MemTableList(int min_write_buffer_number_to_merge,
+ int max_write_buffer_number_to_maintain,
+ int64_t max_write_buffer_size_to_maintain)
+ : imm_flush_needed(false),
+ imm_trim_needed(false),
+ min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
+ current_(new MemTableListVersion(&current_memory_usage_,
+ max_write_buffer_number_to_maintain,
+ max_write_buffer_size_to_maintain)),
+ num_flush_not_started_(0),
+ commit_in_progress_(false),
+ flush_requested_(false),
+ current_memory_usage_(0),
+ current_memory_allocted_bytes_excluding_last_(0),
+ current_has_history_(false) {
+ current_->Ref();
+ }
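+
+  // Illustrative sketch (hypothetical option values): constructing a list
+  // that merges at least two write buffers per flush and maintains up to
+  // four memtables for conflict checking.
+  //
+  //   MemTableList imm(/*min_write_buffer_number_to_merge=*/2,
+  //                    /*max_write_buffer_number_to_maintain=*/4,
+  //                    /*max_write_buffer_size_to_maintain=*/0);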
+
+ // Should not delete MemTableList without making sure MemTableList::current()
+ // is Unref()'d.
+ ~MemTableList() {}
+
+ MemTableListVersion* current() const { return current_; }
+
+  // So that background threads can check this flag to determine whether
+  // there is anything more to start flushing.
+ std::atomic<bool> imm_flush_needed;
+
+ std::atomic<bool> imm_trim_needed;
+
+ // Returns the total number of memtables in the list that haven't yet
+ // been flushed and logged.
+ int NumNotFlushed() const;
+
+ // Returns total number of memtables in the list that have been
+ // completely flushed and logged.
+ int NumFlushed() const;
+
+ // Returns true if there is at least one memtable on which flush has
+ // not yet started.
+ bool IsFlushPending() const;
+
+ // Returns true if there is at least one memtable that is pending flush or
+ // flushing.
+ bool IsFlushPendingOrRunning() const;
+
+  // Returns the earliest memtables that need to be flushed. The returned
+  // memtables are guaranteed to be in ascending order of creation time.
+ void PickMemtablesToFlush(uint64_t max_memtable_id,
+ autovector<MemTable*>* mems,
+ uint64_t* max_next_log_number = nullptr);
+
+  // Reset the status of the given memtables back to the pending state so
+  // that they can get picked up again on the next round of flush.
+ void RollbackMemtableFlush(const autovector<MemTable*>& mems,
+ uint64_t file_number);
+
+  // Try to commit a successful flush in the manifest file. It might just
+  // return Status::OK, letting a concurrent flush do the actual recording.
+ Status TryInstallMemtableFlushResults(
+ ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
+ const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker,
+ VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
+ autovector<MemTable*>* to_delete, FSDirectory* db_directory,
+ LogBuffer* log_buffer,
+ std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
+ bool write_edits = true);
+
+ // New memtables are inserted at the front of the list.
+  // Takes ownership of the reference held on *m by the caller of Add().
+ // By default, adding memtables will flag that the memtable list needs to be
+ // flushed, but in certain situations, like after a mempurge, we may want to
+ // avoid flushing the memtable list upon addition of a memtable.
+ void Add(MemTable* m, autovector<MemTable*>* to_delete);
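+
+  // Illustrative sketch (an assumption about the calling convention, per the
+  // comment above): the caller holds a reference on `mem` which Add() takes
+  // over; `mem` and `to_delete` are hypothetical names.
+  //
+  //   mem->Ref();
+  //   autovector<MemTable*> to_delete;
+  //   imm->Add(mem, &to_delete);   // the list now owns the reference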
+
+ // Returns an estimate of the number of bytes of data in use.
+ size_t ApproximateMemoryUsage();
+
+ // Returns the cached current_memory_allocted_bytes_excluding_last_ value.
+ size_t MemoryAllocatedBytesExcludingLast() const;
+
+ // Returns the cached current_has_history_ value.
+ bool HasHistory() const;
+
+ // Updates current_memory_allocted_bytes_excluding_last_ and
+ // current_has_history_ from MemTableListVersion. Must be called whenever
+ // InstallNewVersion is called.
+ void UpdateCachedValuesFromMemTableListVersion();
+
+  // `usage` is the current size of the mutable Memtable. When
+  // max_write_buffer_size_to_maintain is used, the total size of the mutable
+  // and immutable memtables is checked against it to decide whether to trim
+  // the memtable list.
+  //
+  // Returns true if the memtable list was trimmed.
+ bool TrimHistory(autovector<MemTable*>* to_delete, size_t usage);
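+
+  // Illustrative sketch (hypothetical call site): trimming history against
+  // the mutable memtable's current size after a write.
+  //
+  //   autovector<MemTable*> to_delete;
+  //   if (imm->TrimHistory(&to_delete,
+  //                        mutable_mem->ApproximateMemoryUsage())) {
+  //     // One or more history memtables were moved into `to_delete`.
+  //   }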
+
+ // Returns an estimate of the number of bytes of data used by
+ // the unflushed mem-tables.
+ size_t ApproximateUnflushedMemTablesMemoryUsage();
+
+ // Returns an estimate of the timestamp of the earliest key.
+ uint64_t ApproximateOldestKeyTime() const;
+
+ // Request a flush of all existing memtables to storage. This will
+ // cause future calls to IsFlushPending() to return true if this list is
+ // non-empty (regardless of the min_write_buffer_number_to_merge
+ // parameter). This flush request will persist until the next time
+ // PickMemtablesToFlush() is called.
+ void FlushRequested() {
+ flush_requested_ = true;
+    // If there are some memtables stored in imm() that don't trigger a
+    // flush (e.g. a mempurge output memtable), then update imm_flush_needed.
+    // Note: if a race causes imm_flush_needed to be set to true while
+    // num_flush_not_started_ == 0, there is no impact whatsoever;
+    // imm_flush_needed is only used in an assert in IsFlushPending().
+ if (num_flush_not_started_ > 0) {
+ imm_flush_needed.store(true, std::memory_order_release);
+ }
+ }
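+
+  // Illustrative sketch (a simplified version of the flush flow, not the
+  // exact DB implementation): observing and consuming a flush request.
+  //
+  //   imm->FlushRequested();
+  //   if (imm->IsFlushPending()) {
+  //     autovector<MemTable*> mems;
+  //     imm->PickMemtablesToFlush(std::numeric_limits<uint64_t>::max(),
+  //                               &mems);
+  //     // ... write `mems` to an L0 file, then commit the result with
+  //     // TryInstallMemtableFlushResults(), or undo the pick with
+  //     // RollbackMemtableFlush() on failure ...
+  //   }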
+
+ bool HasFlushRequested() { return flush_requested_; }
+
+  // Returns true if a history trim should be scheduled and the caller should
+  // be the one to schedule it.
+ bool MarkTrimHistoryNeeded() {
+ auto expected = false;
+ return imm_trim_needed.compare_exchange_strong(
+ expected, true, std::memory_order_relaxed, std::memory_order_relaxed);
+ }
+
+ void ResetTrimHistoryNeeded() {
+ auto expected = true;
+ imm_trim_needed.compare_exchange_strong(
+ expected, false, std::memory_order_relaxed, std::memory_order_relaxed);
+ }
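+
+  // Illustrative sketch: the compare-and-swap in MarkTrimHistoryNeeded()
+  // elects exactly one caller to schedule the trim work; `ScheduleTrimJob`
+  // is a hypothetical function.
+  //
+  //   if (imm->MarkTrimHistoryNeeded()) {
+  //     ScheduleTrimJob();   // only the winning caller schedules
+  //   }
+  //   // The scheduled job calls ResetTrimHistoryNeeded() when it runs, so
+  //   // a later trim request can be scheduled again (an assumption about
+  //   // the intended protocol).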
+
+  // Copying allowed (the usual copy-disabling declarations are deliberately
+  // left commented out):
+  // MemTableList(const MemTableList&);
+  // void operator=(const MemTableList&);
+
+ size_t* current_memory_usage() { return &current_memory_usage_; }
+
+  // Returns the min log containing the prep section after memtables listed in
+  // `memtables_to_flush` are flushed and their status is persisted in the
+  // manifest.
+ uint64_t PrecomputeMinLogContainingPrepSection(
+ const std::unordered_set<MemTable*>* memtables_to_flush = nullptr);
+
+ uint64_t GetEarliestMemTableID() const {
+ auto& memlist = current_->memlist_;
+ if (memlist.empty()) {
+ return std::numeric_limits<uint64_t>::max();
+ }
+ return memlist.back()->GetID();
+ }
+
+ uint64_t GetLatestMemTableID() const {
+ auto& memlist = current_->memlist_;
+ if (memlist.empty()) {
+ return 0;
+ }
+ return memlist.front()->GetID();
+ }
+
+ void AssignAtomicFlushSeq(const SequenceNumber& seq) {
+ const auto& memlist = current_->memlist_;
+ // Scan the memtable list from new to old
+ for (auto it = memlist.begin(); it != memlist.end(); ++it) {
+ MemTable* mem = *it;
+ if (mem->atomic_flush_seqno_ == kMaxSequenceNumber) {
+ mem->atomic_flush_seqno_ = seq;
+ } else {
+        // Earlier memtables must have been assigned an atomic flush seq; no
+        // need to continue the scan.
+ break;
+ }
+ }
+ }
+
+ // Used only by DBImplSecondary during log replay.
+  // Remove memtables whose data were written before the WAL with the given
+  // log_number was created, i.e. mem->GetNextLogNumber() <= log_number. The
+  // memtables are not freed, but put into a vector for future deref and
+  // reclamation.
+ void RemoveOldMemTables(uint64_t log_number,
+ autovector<MemTable*>* to_delete);
+
+ private:
+ friend Status InstallMemtableAtomicFlushResults(
+ const autovector<MemTableList*>* imm_lists,
+ const autovector<ColumnFamilyData*>& cfds,
+ const autovector<const MutableCFOptions*>& mutable_cf_options_list,
+ const autovector<const autovector<MemTable*>*>& mems_list,
+ VersionSet* vset, LogsWithPrepTracker* prep_tracker,
+ InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
+ const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
+ committed_flush_jobs_info,
+ autovector<MemTable*>* to_delete, FSDirectory* db_directory,
+ LogBuffer* log_buffer);
+
+ // DB mutex held
+ void InstallNewVersion();
+
+ // DB mutex held
+ // Called after writing to MANIFEST
+ void RemoveMemTablesOrRestoreFlags(const Status& s, ColumnFamilyData* cfd,
+ size_t batch_count, LogBuffer* log_buffer,
+ autovector<MemTable*>* to_delete,
+ InstrumentedMutex* mu);
+
+ const int min_write_buffer_number_to_merge_;
+
+ MemTableListVersion* current_;
+
+ // the number of elements that still need flushing
+ int num_flush_not_started_;
+
+ // committing in progress
+ bool commit_in_progress_;
+
+ // Requested a flush of memtables to storage. It's possible to request that
+ // a subset of memtables be flushed.
+ bool flush_requested_;
+
+ // The current memory usage.
+ size_t current_memory_usage_;
+
+ // Cached value of current_->MemoryAllocatedBytesExcludingLast().
+ std::atomic<size_t> current_memory_allocted_bytes_excluding_last_;
+
+ // Cached value of current_->HasHistory().
+ std::atomic<bool> current_has_history_;
+};
+
+// Installs memtable atomic flush results.
+// In most cases, imm_lists is nullptr, and the function simply uses the
+// immutable memtable lists associated with the cfds. There are unit tests
+// that install flush results for external immutable memtable lists other than
+// the cfds' own immutable memtable lists, e.g. MemTableListTest. In that
+// case, the imm_lists parameter is not nullptr.
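+//
+// Illustrative sketch (hypothetical single-CF call; the containers are
+// assumed, from the signature, to be parallel with one entry per column
+// family in the atomic flush):
+//
+//   autovector<ColumnFamilyData*> cfds = {cfd};
+//   autovector<const MutableCFOptions*> options_list = {&mutable_cf_options};
+//   autovector<const autovector<MemTable*>*> mems_list = {&picked_mems};
+//   autovector<FileMetaData*> file_meta = {&meta};
+//   Status s = InstallMemtableAtomicFlushResults(
+//       /*imm_lists=*/nullptr, cfds, options_list, mems_list, vset,
+//       prep_tracker, mu, file_meta, committed_flush_jobs_info, &to_delete,
+//       db_directory, log_buffer);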
+extern Status InstallMemtableAtomicFlushResults(
+ const autovector<MemTableList*>* imm_lists,
+ const autovector<ColumnFamilyData*>& cfds,
+ const autovector<const MutableCFOptions*>& mutable_cf_options_list,
+ const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
+ LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
+ const autovector<FileMetaData*>& file_meta,
+ const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
+ committed_flush_jobs_info,
+ autovector<MemTable*>* to_delete, FSDirectory* db_directory,
+ LogBuffer* log_buffer);
+} // namespace ROCKSDB_NAMESPACE