From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/rocksdb/utilities/blob_db/blob_db_impl.h | 503 +++++++++++++++++++++++++++ 1 file changed, 503 insertions(+) create mode 100644 src/rocksdb/utilities/blob_db/blob_db_impl.h (limited to 'src/rocksdb/utilities/blob_db/blob_db_impl.h') diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl.h b/src/rocksdb/utilities/blob_db/blob_db_impl.h new file mode 100644 index 000000000..0b4dbf5e5 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_impl.h @@ -0,0 +1,503 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "db/blob/blob_log_format.h" +#include "db/blob/blob_log_writer.h" +#include "db/db_iter.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/db.h" +#include "rocksdb/file_system.h" +#include "rocksdb/listener.h" +#include "rocksdb/options.h" +#include "rocksdb/statistics.h" +#include "rocksdb/wal_filter.h" +#include "util/mutexlock.h" +#include "util/timer_queue.h" +#include "utilities/blob_db/blob_db.h" +#include "utilities/blob_db/blob_file.h" + +namespace ROCKSDB_NAMESPACE { + +class DBImpl; +class ColumnFamilyHandle; +class ColumnFamilyData; +class SystemClock; + +struct FlushJobInfo; + +namespace blob_db { + +struct BlobCompactionContext; +struct BlobCompactionContextGC; +class BlobDBImpl; +class BlobFile; + +// Comparator to sort "TTL" aware Blob files based on the lower value of +// TTL range. +struct BlobFileComparatorTTL { + bool operator()(const std::shared_ptr& lhs, + const std::shared_ptr& rhs) const; +}; + +struct BlobFileComparator { + bool operator()(const std::shared_ptr& lhs, + const std::shared_ptr& rhs) const; +}; + +/** + * The implementation class for BlobDB. It manages the blob logs, which + * are sequentially written files. Blob logs can be of the TTL or non-TTL + * varieties; the former are cleaned up when they expire, while the latter + * are (optionally) garbage collected. + */ +class BlobDBImpl : public BlobDB { + friend class BlobFile; + friend class BlobDBIterator; + friend class BlobDBListener; + friend class BlobDBListenerGC; + friend class BlobIndexCompactionFilterBase; + friend class BlobIndexCompactionFilterGC; + + public: + // deletions check period + static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000; + + // sanity check task + static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000; + + // how many random access open files can we tolerate + static constexpr uint32_t kOpenFilesTrigger = 100; + + // how often to schedule reclaim open files. + static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000; + + // how often to schedule delete obs files periods + static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000; + + // how often to schedule expired files eviction. + static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000; + + // when should oldest file be evicted: + // on reaching 90% of blob_dir_size + static constexpr double kEvictOldestFileAtSize = 0.9; + + using BlobDB::Put; + Status Put(const WriteOptions& options, const Slice& key, + const Slice& value) override; + + using BlobDB::Get; + Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value) override; + + Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value, + uint64_t* expiration) override; + + using BlobDB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& read_options) override; + + using BlobDB::NewIterators; + virtual Status NewIterators( + const ReadOptions& /*read_options*/, + const std::vector& /*column_families*/, + std::vector* /*iterators*/) override { + return Status::NotSupported("Not implemented"); + } + + using BlobDB::MultiGet; + virtual std::vector MultiGet( + const ReadOptions& read_options, const std::vector& keys, + std::vector* values) override; + + using BlobDB::Write; + virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; + + virtual Status Close() override; + + using BlobDB::PutWithTTL; + Status PutWithTTL(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t ttl) override; + + using BlobDB::PutUntil; + Status PutUntil(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration) override; + + using BlobDB::CompactFiles; + Status CompactFiles( + const CompactionOptions& compact_options, + const std::vector& input_file_names, const int output_level, + const int output_path_id = -1, + std::vector* const output_file_names = nullptr, + CompactionJobInfo* compaction_job_info = nullptr) override; + + BlobDBOptions GetBlobDBOptions() const override; + + BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options, + const DBOptions& db_options, + const ColumnFamilyOptions& cf_options); + + virtual Status DisableFileDeletions() override; + + virtual Status EnableFileDeletions(bool force) override; + + virtual Status GetLiveFiles(std::vector&, + uint64_t* manifest_file_size, + bool flush_memtable = true) override; + virtual void GetLiveFilesMetaData(std::vector*) override; + + ~BlobDBImpl(); + + Status Open(std::vector* handles); + + Status SyncBlobFiles() override; + + // Common part of the two GetCompactionContext methods below. + // REQUIRES: read lock on mutex_ + void GetCompactionContextCommon(BlobCompactionContext* context); + + void GetCompactionContext(BlobCompactionContext* context); + void GetCompactionContext(BlobCompactionContext* context, + BlobCompactionContextGC* context_gc); + +#ifndef NDEBUG + Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value); + + void TEST_AddDummyBlobFile(uint64_t blob_file_number, + SequenceNumber immutable_sequence); + + std::vector> TEST_GetBlobFiles() const; + + std::vector> TEST_GetLiveImmNonTTLFiles() const; + + std::vector> TEST_GetObsoleteFiles() const; + + Status TEST_CloseBlobFile(std::shared_ptr& bfile); + + void TEST_ObsoleteBlobFile(std::shared_ptr& blob_file, + SequenceNumber obsolete_seq = 0, + bool update_size = true); + + void TEST_EvictExpiredFiles(); + + void TEST_DeleteObsoleteFiles(); + + uint64_t TEST_live_sst_size(); + + const std::string& TEST_blob_dir() const { return blob_dir_; } + + void TEST_InitializeBlobFileToSstMapping( + const std::vector& live_files); + + void TEST_ProcessFlushJobInfo(const FlushJobInfo& info); + + void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info); + +#endif // !NDEBUG + + private: + class BlobInserter; + + // Create a snapshot if there isn't one in read options. + // Return true if a snapshot is created. + bool SetSnapshotIfNeeded(ReadOptions* read_options); + + Status GetImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value, uint64_t* expiration = nullptr); + + Status GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value, uint64_t* expiration = nullptr); + + Status GetRawBlobFromFile(const Slice& key, uint64_t file_number, + uint64_t offset, uint64_t size, + PinnableSlice* value, + CompressionType* compression_type); + + Slice GetCompressedSlice(const Slice& raw, + std::string* compression_output) const; + + Status DecompressSlice(const Slice& compressed_value, + CompressionType compression_type, + PinnableSlice* value_output) const; + + // Close a file by appending a footer, and removes file from open files list. + // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_ + // and the blob file's mutex_. If called on a blob file which is visible only + // to a single thread (like in the case of new files written during + // compaction/GC), the locks on write_mutex_ and the blob file's mutex_ can be + // avoided. + Status CloseBlobFile(std::shared_ptr bfile); + + // Close a file if its size exceeds blob_file_size + // REQUIRES: lock held on write_mutex_. + Status CloseBlobFileIfNeeded(std::shared_ptr& bfile); + + // Mark file as obsolete and move the file to obsolete file list. + // + // REQUIRED: hold write lock of mutex_ or during DB open. + void ObsoleteBlobFile(std::shared_ptr blob_file, + SequenceNumber obsolete_seq, bool update_size); + + Status PutBlobValue(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration, + WriteBatch* batch); + + Status AppendBlob(const std::shared_ptr& bfile, + const std::string& headerbuf, const Slice& key, + const Slice& value, uint64_t expiration, + std::string* index_entry); + + // Create a new blob file and associated writer. + Status CreateBlobFileAndWriter(bool has_ttl, + const ExpirationRange& expiration_range, + const std::string& reason, + std::shared_ptr* blob_file, + std::shared_ptr* writer); + + // Get the open non-TTL blob log file, or create a new one if no such file + // exists. + Status SelectBlobFile(std::shared_ptr* blob_file); + + // Get the open TTL blob log file for a certain expiration, or create a new + // one if no such file exists. + Status SelectBlobFileTTL(uint64_t expiration, + std::shared_ptr* blob_file); + + std::shared_ptr FindBlobFileLocked(uint64_t expiration) const; + + // periodic sanity check. Bunch of checks + std::pair SanityCheck(bool aborted); + + // Delete files that have been marked obsolete (either because of TTL + // or GC). Check whether any snapshots exist which refer to the same. + std::pair DeleteObsoleteFiles(bool aborted); + + // periodically check if open blob files and their TTL's has expired + // if expired, close the sequential writer and make the file immutable + std::pair EvictExpiredFiles(bool aborted); + + // if the number of open files, approaches ULIMIT's this + // task will close random readers, which are kept around for + // efficiency + std::pair ReclaimOpenFiles(bool aborted); + + std::pair RemoveTimerQ(TimerQueue* tq, bool aborted); + + // Adds the background tasks to the timer queue + void StartBackgroundTasks(); + + // add a new Blob File + std::shared_ptr NewBlobFile(bool has_ttl, + const ExpirationRange& expiration_range, + const std::string& reason); + + // Register a new blob file. + // REQUIRES: write lock on mutex_. + void RegisterBlobFile(std::shared_ptr blob_file); + + // collect all the blob log files from the blob directory + Status GetAllBlobFiles(std::set* file_numbers); + + // Open all blob files found in blob_dir. + Status OpenAllBlobFiles(); + + // Link an SST to a blob file. Comes in locking and non-locking varieties + // (the latter is used during Open). + template + void LinkSstToBlobFileImpl(uint64_t sst_file_number, + uint64_t blob_file_number, Linker linker); + + void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number); + + void LinkSstToBlobFileNoLock(uint64_t sst_file_number, + uint64_t blob_file_number); + + // Unlink an SST from a blob file. + void UnlinkSstFromBlobFile(uint64_t sst_file_number, + uint64_t blob_file_number); + + // Initialize the mapping between blob files and SSTs during Open. + void InitializeBlobFileToSstMapping( + const std::vector& live_files); + + // Update the mapping between blob files and SSTs after a flush and mark + // any unneeded blob files obsolete. + void ProcessFlushJobInfo(const FlushJobInfo& info); + + // Update the mapping between blob files and SSTs after a compaction and + // mark any unneeded blob files obsolete. + void ProcessCompactionJobInfo(const CompactionJobInfo& info); + + // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs + // linked to it, and all memtables from before the blob file became immutable + // have been flushed. Note: should only be called if the condition holds for + // all lower-numbered non-TTL blob files as well. + bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr& blob_file, + SequenceNumber obsolete_seq); + + // Mark all immutable non-TTL blob files that aren't needed by any SSTs as + // obsolete. Comes in two varieties; the version used during Open need not + // worry about locking or snapshots. + template + void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed); + + void MarkUnreferencedBlobFilesObsolete(); + void MarkUnreferencedBlobFilesObsoleteDuringOpen(); + + void UpdateLiveSSTSize(); + + Status GetBlobFileReader(const std::shared_ptr& blob_file, + std::shared_ptr* reader); + + // hold write mutex on file and call. + // Close the above Random Access reader + void CloseRandomAccessLocked(const std::shared_ptr& bfile); + + // hold write mutex on file and call + // creates a sequential (append) writer for this blobfile + Status CreateWriterLocked(const std::shared_ptr& bfile); + + // returns a BlobLogWriter object for the file. If writer is not + // already present, creates one. Needs Write Mutex to be held + Status CheckOrCreateWriterLocked(const std::shared_ptr& blob_file, + std::shared_ptr* writer); + + // checks if there is no snapshot which is referencing the + // blobs + bool VisibleToActiveSnapshot(const std::shared_ptr& file); + bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr& bfile); + + void CopyBlobFiles(std::vector>* bfiles_copy); + + uint64_t EpochNow() { return clock_->NowMicros() / 1000000; } + + // Check if inserting a new blob will make DB grow out of space. + // If is_fifo = true, FIFO eviction will be triggered to make room for the + // new blob. If force_evict = true, FIFO eviction will evict blob files + // even eviction will not make enough room for the new blob. + Status CheckSizeAndEvictBlobFiles(uint64_t blob_size, + bool force_evict = false); + + // name of the database directory + std::string dbname_; + + // the base DB + DBImpl* db_impl_; + Env* env_; + SystemClock* clock_; + // the options that govern the behavior of Blob Storage + BlobDBOptions bdb_options_; + DBOptions db_options_; + ColumnFamilyOptions cf_options_; + FileOptions file_options_; + + // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold + // ownership. + Statistics* statistics_; + + // by default this is "blob_dir" under dbname_ + // but can be configured + std::string blob_dir_; + + // pointer to directory + std::unique_ptr dir_ent_; + + // Read Write Mutex, which protects all the data structures + // HEAVILY TRAFFICKED + mutable port::RWMutex mutex_; + + // Writers has to hold write_mutex_ before writing. + mutable port::Mutex write_mutex_; + + // counter for blob file number + std::atomic next_file_number_; + + // entire metadata of all the BLOB files memory + std::map> blob_files_; + + // All live immutable non-TTL blob files. + std::map> live_imm_non_ttl_blob_files_; + + // The largest sequence number that has been flushed. + SequenceNumber flush_sequence_; + + // opened non-TTL blob file. + std::shared_ptr open_non_ttl_file_; + + // all the blob files which are currently being appended to based + // on variety of incoming TTL's + std::set, BlobFileComparatorTTL> open_ttl_files_; + + // Flag to check whether Close() has been called on this DB + bool closed_; + + // timer based queue to execute tasks + TimerQueue tqueue_; + + // number of files opened for random access/GET + // counter is used to monitor and close excess RA files. + std::atomic open_file_count_; + + // Total size of all live blob files (i.e. exclude obsolete files). + std::atomic total_blob_size_; + + // total size of SST files. + std::atomic live_sst_size_; + + // Latest FIFO eviction timestamp + // + // REQUIRES: access with metex_ lock held. + uint64_t fifo_eviction_seq_; + + // The expiration up to which latest FIFO eviction evicts. + // + // REQUIRES: access with metex_ lock held. + uint64_t evict_expiration_up_to_; + + std::list> obsolete_files_; + + // DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block + // on the mutex to avoid contention. + // + // While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note + // the difference. mutex_ only needs to be held when access the + // data-structure, and delete_file_mutex_ needs to be held the whole time + // during DeleteObsoleteFiles to avoid being run simultaneously with + // DisableFileDeletions. + // + // If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced + // to hold delete_file_mutex_ first to avoid deadlock. + mutable port::Mutex delete_file_mutex_; + + // Each call of DisableFileDeletions will increase disable_file_deletion_ + // by 1. EnableFileDeletions will either decrease the count by 1 or reset + // it to zeor, depending on the force flag. + // + // REQUIRES: access with delete_file_mutex_ held. + int disable_file_deletions_ = 0; + + uint32_t debug_level_; +}; + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE -- cgit v1.2.3