From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- .../utilities/blob_db/blob_compaction_filter.cc | 490 ++++ .../utilities/blob_db/blob_compaction_filter.h | 204 ++ src/rocksdb/utilities/blob_db/blob_db.cc | 114 + src/rocksdb/utilities/blob_db/blob_db.h | 266 +++ src/rocksdb/utilities/blob_db/blob_db_gc_stats.h | 56 + src/rocksdb/utilities/blob_db/blob_db_impl.cc | 2177 ++++++++++++++++++ src/rocksdb/utilities/blob_db/blob_db_impl.h | 503 ++++ .../utilities/blob_db/blob_db_impl_filesnapshot.cc | 113 + src/rocksdb/utilities/blob_db/blob_db_iterator.h | 150 ++ src/rocksdb/utilities/blob_db/blob_db_listener.h | 71 + src/rocksdb/utilities/blob_db/blob_db_test.cc | 2407 ++++++++++++++++++++ src/rocksdb/utilities/blob_db/blob_dump_tool.cc | 282 +++ src/rocksdb/utilities/blob_db/blob_dump_tool.h | 58 + src/rocksdb/utilities/blob_db/blob_file.cc | 318 +++ src/rocksdb/utilities/blob_db/blob_file.h | 246 ++ 15 files changed, 7455 insertions(+) create mode 100644 src/rocksdb/utilities/blob_db/blob_compaction_filter.cc create mode 100644 src/rocksdb/utilities/blob_db/blob_compaction_filter.h create mode 100644 src/rocksdb/utilities/blob_db/blob_db.cc create mode 100644 src/rocksdb/utilities/blob_db/blob_db.h create mode 100644 src/rocksdb/utilities/blob_db/blob_db_gc_stats.h create mode 100644 src/rocksdb/utilities/blob_db/blob_db_impl.cc create mode 100644 src/rocksdb/utilities/blob_db/blob_db_impl.h create mode 100644 src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc create mode 100644 src/rocksdb/utilities/blob_db/blob_db_iterator.h create mode 100644 src/rocksdb/utilities/blob_db/blob_db_listener.h create mode 100644 src/rocksdb/utilities/blob_db/blob_db_test.cc create mode 100644 src/rocksdb/utilities/blob_db/blob_dump_tool.cc create mode 100644 src/rocksdb/utilities/blob_db/blob_dump_tool.h create mode 100644 src/rocksdb/utilities/blob_db/blob_file.cc create mode 100644 src/rocksdb/utilities/blob_db/blob_file.h (limited to 'src/rocksdb/utilities/blob_db') diff --git a/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc new file mode 100644 index 000000000..86907e979 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc @@ -0,0 +1,490 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_compaction_filter.h" + +#include + +#include "db/dbformat.h" +#include "logging/logging.h" +#include "rocksdb/system_clock.h" +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { +namespace blob_db { + +BlobIndexCompactionFilterBase::~BlobIndexCompactionFilterBase() { + if (blob_file_) { + CloseAndRegisterNewBlobFile(); + } + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_); +} + +CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2( + int level, const Slice& key, ValueType value_type, const Slice& value, + std::string* new_value, std::string* skip_until) const { + const CompactionFilter* ucf = user_comp_filter(); + if (value_type != kBlobIndex) { + if (ucf == nullptr) { + return Decision::kKeep; + } + // Apply user compaction filter for inlined data. + CompactionFilter::Decision decision = + ucf->FilterV2(level, key, value_type, value, new_value, skip_until); + if (decision == Decision::kChangeValue) { + return HandleValueChange(key, new_value); + } + return decision; + } + BlobIndex blob_index; + Status s = blob_index.DecodeFrom(value); + if (!s.ok()) { + // Unable to decode blob index. Keeping the value. + return Decision::kKeep; + } + if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) { + // Expired + expired_count_++; + expired_size_ += key.size() + value.size(); + return Decision::kRemove; + } + if (!blob_index.IsInlined() && + blob_index.file_number() < context_.next_file_number && + context_.current_blob_files.count(blob_index.file_number()) == 0) { + // Corresponding blob file gone (most likely, evicted by FIFO eviction). + evicted_count_++; + evicted_size_ += key.size() + value.size(); + return Decision::kRemove; + } + if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() && + blob_index.expiration() < context_.evict_expiration_up_to) { + // Hack: Internal key is passed to BlobIndexCompactionFilter for it to + // get sequence number. + ParsedInternalKey ikey; + if (!ParseInternalKey( + key, &ikey, + context_.blob_db_impl->db_options_.allow_data_in_errors) + .ok()) { + assert(false); + return Decision::kKeep; + } + // Remove keys that could have been remove by last FIFO eviction. + // If get error while parsing key, ignore and continue. + if (ikey.sequence < context_.fifo_eviction_seq) { + evicted_count_++; + evicted_size_ += key.size() + value.size(); + return Decision::kRemove; + } + } + // Apply user compaction filter for all non-TTL blob data. + if (ucf != nullptr && !blob_index.HasTTL()) { + // Hack: Internal key is passed to BlobIndexCompactionFilter for it to + // get sequence number. + ParsedInternalKey ikey; + if (!ParseInternalKey( + key, &ikey, + context_.blob_db_impl->db_options_.allow_data_in_errors) + .ok()) { + assert(false); + return Decision::kKeep; + } + // Read value from blob file. 
+ PinnableSlice blob; + CompressionType compression_type = kNoCompression; + constexpr bool need_decompress = true; + if (!ReadBlobFromOldFile(ikey.user_key, blob_index, &blob, need_decompress, + &compression_type)) { + return Decision::kIOError; + } + CompactionFilter::Decision decision = ucf->FilterV2( + level, ikey.user_key, kValue, blob, new_value, skip_until); + if (decision == Decision::kChangeValue) { + return HandleValueChange(ikey.user_key, new_value); + } + return decision; + } + return Decision::kKeep; +} + +CompactionFilter::Decision BlobIndexCompactionFilterBase::HandleValueChange( + const Slice& key, std::string* new_value) const { + BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + if (new_value->size() < blob_db_impl->bdb_options_.min_blob_size) { + // Keep new_value inlined. + return Decision::kChangeValue; + } + if (!OpenNewBlobFileIfNeeded()) { + return Decision::kIOError; + } + Slice new_blob_value(*new_value); + std::string compression_output; + if (blob_db_impl->bdb_options_.compression != kNoCompression) { + new_blob_value = + blob_db_impl->GetCompressedSlice(new_blob_value, &compression_output); + } + uint64_t new_blob_file_number = 0; + uint64_t new_blob_offset = 0; + if (!WriteBlobToNewFile(key, new_blob_value, &new_blob_file_number, + &new_blob_offset)) { + return Decision::kIOError; + } + if (!CloseAndRegisterNewBlobFileIfNeeded()) { + return Decision::kIOError; + } + BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset, + new_blob_value.size(), + blob_db_impl->bdb_options_.compression); + return Decision::kChangeBlobIndex; +} + +BlobIndexCompactionFilterGC::~BlobIndexCompactionFilterGC() { + assert(context().blob_db_impl); + + ROCKS_LOG_INFO(context().blob_db_impl->db_options_.info_log, + "GC pass finished %s: encountered %" PRIu64 " blobs (%" PRIu64 + " bytes), relocated %" PRIu64 " blobs (%" PRIu64 + " bytes), created %" PRIu64 " new blob file(s)", + !gc_stats_.HasError() ? 
"successfully" : "with failure", + gc_stats_.AllBlobs(), gc_stats_.AllBytes(), + gc_stats_.RelocatedBlobs(), gc_stats_.RelocatedBytes(), + gc_stats_.NewFiles()); + + RecordTick(statistics(), BLOB_DB_GC_NUM_KEYS_RELOCATED, + gc_stats_.RelocatedBlobs()); + RecordTick(statistics(), BLOB_DB_GC_BYTES_RELOCATED, + gc_stats_.RelocatedBytes()); + RecordTick(statistics(), BLOB_DB_GC_NUM_NEW_FILES, gc_stats_.NewFiles()); + RecordTick(statistics(), BLOB_DB_GC_FAILURES, gc_stats_.HasError()); +} + +bool BlobIndexCompactionFilterBase::IsBlobFileOpened() const { + if (blob_file_) { + assert(writer_); + return true; + } + return false; +} + +bool BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded() const { + if (IsBlobFileOpened()) { + return true; + } + + BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + const Status s = blob_db_impl->CreateBlobFileAndWriter( + /* has_ttl */ false, ExpirationRange(), "compaction/GC", &blob_file_, + &writer_); + if (!s.ok()) { + ROCKS_LOG_ERROR( + blob_db_impl->db_options_.info_log, + "Error opening new blob file during compaction/GC, status: %s", + s.ToString().c_str()); + blob_file_.reset(); + writer_.reset(); + return false; + } + + assert(blob_file_); + assert(writer_); + + return true; +} + +bool BlobIndexCompactionFilterBase::ReadBlobFromOldFile( + const Slice& key, const BlobIndex& blob_index, PinnableSlice* blob, + bool need_decompress, CompressionType* compression_type) const { + BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + Status s = blob_db_impl->GetRawBlobFromFile( + key, blob_index.file_number(), blob_index.offset(), blob_index.size(), + blob, compression_type); + + if (!s.ok()) { + ROCKS_LOG_ERROR( + blob_db_impl->db_options_.info_log, + "Error reading blob during compaction/GC, key: %s (%s), status: %s", + key.ToString(/* output_hex */ true).c_str(), + blob_index.DebugString(/* output_hex */ true).c_str(), + s.ToString().c_str()); + + return false; + } + + if (need_decompress && *compression_type != kNoCompression) { + s = blob_db_impl->DecompressSlice(*blob, *compression_type, blob); + if (!s.ok()) { + ROCKS_LOG_ERROR( + blob_db_impl->db_options_.info_log, + "Uncompression error during blob read from file: %" PRIu64 + " blob_offset: %" PRIu64 " blob_size: %" PRIu64 + " key: %s status: '%s'", + blob_index.file_number(), blob_index.offset(), blob_index.size(), + key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str()); + + return false; + } + } + + return true; +} + +bool BlobIndexCompactionFilterBase::WriteBlobToNewFile( + const Slice& key, const Slice& blob, uint64_t* new_blob_file_number, + uint64_t* new_blob_offset) const { + TEST_SYNC_POINT("BlobIndexCompactionFilterBase::WriteBlobToNewFile"); + assert(new_blob_file_number); + assert(new_blob_offset); + + assert(blob_file_); + *new_blob_file_number = blob_file_->BlobFileNumber(); + + assert(writer_); + uint64_t new_key_offset = 0; + const Status s = writer_->AddRecord(key, blob, kNoExpiration, &new_key_offset, + new_blob_offset); + + if (!s.ok()) { + const BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log, + "Error writing blob to new file %s during compaction/GC, " + "key: %s, status: %s", + blob_file_->PathName().c_str(), + key.ToString(/* output_hex */ true).c_str(), + s.ToString().c_str()); + return false; + } + + const uint64_t new_size = + BlobLogRecord::kHeaderSize + key.size() + blob.size(); + 
blob_file_->BlobRecordAdded(new_size); + + BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + blob_db_impl->total_blob_size_ += new_size; + + return true; +} + +bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFileIfNeeded() + const { + const BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + assert(blob_file_); + if (blob_file_->GetFileSize() < blob_db_impl->bdb_options_.blob_file_size) { + return true; + } + + return CloseAndRegisterNewBlobFile(); +} + +bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFile() const { + BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + assert(blob_file_); + + Status s; + + { + WriteLock wl(&blob_db_impl->mutex_); + + s = blob_db_impl->CloseBlobFile(blob_file_); + + // Note: we delay registering the new blob file until it's closed to + // prevent FIFO eviction from processing it during compaction/GC. + blob_db_impl->RegisterBlobFile(blob_file_); + } + + assert(blob_file_->Immutable()); + + if (!s.ok()) { + ROCKS_LOG_ERROR( + blob_db_impl->db_options_.info_log, + "Error closing new blob file %s during compaction/GC, status: %s", + blob_file_->PathName().c_str(), s.ToString().c_str()); + } + + blob_file_.reset(); + return s.ok(); +} + +CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput( + const Slice& key, const Slice& existing_value, + std::string* new_value) const { + assert(new_value); + + const BlobDBImpl* const blob_db_impl = context().blob_db_impl; + (void)blob_db_impl; + + assert(blob_db_impl); + assert(blob_db_impl->bdb_options_.enable_garbage_collection); + + BlobIndex blob_index; + const Status s = blob_index.DecodeFrom(existing_value); + if (!s.ok()) { + gc_stats_.SetError(); + return BlobDecision::kCorruption; + } + + if (blob_index.IsInlined()) { + gc_stats_.AddBlob(blob_index.value().size()); + + return BlobDecision::kKeep; + } + + gc_stats_.AddBlob(blob_index.size()); + + if (blob_index.HasTTL()) { + return BlobDecision::kKeep; + } + + if (blob_index.file_number() >= context_gc_.cutoff_file_number) { + return BlobDecision::kKeep; + } + + // Note: each compaction generates its own blob files, which, depending on the + // workload, might result in many small blob files. The total number of files + // is bounded though (determined by the number of compactions and the blob + // file size option). + if (!OpenNewBlobFileIfNeeded()) { + gc_stats_.SetError(); + return BlobDecision::kIOError; + } + + PinnableSlice blob; + CompressionType compression_type = kNoCompression; + std::string compression_output; + if (!ReadBlobFromOldFile(key, blob_index, &blob, false, &compression_type)) { + gc_stats_.SetError(); + return BlobDecision::kIOError; + } + + // If the compression_type is changed, re-compress it with the new compression + // type. 
+ if (compression_type != blob_db_impl->bdb_options_.compression) { + if (compression_type != kNoCompression) { + const Status status = + blob_db_impl->DecompressSlice(blob, compression_type, &blob); + if (!status.ok()) { + gc_stats_.SetError(); + return BlobDecision::kCorruption; + } + } + if (blob_db_impl->bdb_options_.compression != kNoCompression) { + blob_db_impl->GetCompressedSlice(blob, &compression_output); + blob = PinnableSlice(&compression_output); + blob.PinSelf(); + } + } + + uint64_t new_blob_file_number = 0; + uint64_t new_blob_offset = 0; + if (!WriteBlobToNewFile(key, blob, &new_blob_file_number, &new_blob_offset)) { + gc_stats_.SetError(); + return BlobDecision::kIOError; + } + + if (!CloseAndRegisterNewBlobFileIfNeeded()) { + gc_stats_.SetError(); + return BlobDecision::kIOError; + } + + BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset, + blob.size(), compression_type); + + gc_stats_.AddRelocatedBlob(blob_index.size()); + + return BlobDecision::kChangeValue; +} + +bool BlobIndexCompactionFilterGC::OpenNewBlobFileIfNeeded() const { + if (IsBlobFileOpened()) { + return true; + } + bool result = BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded(); + if (result) { + gc_stats_.AddNewFile(); + } + return result; +} + +std::unique_ptr +BlobIndexCompactionFilterFactoryBase::CreateUserCompactionFilterFromFactory( + const CompactionFilter::Context& context) const { + std::unique_ptr user_comp_filter_from_factory; + if (user_comp_filter_factory_) { + user_comp_filter_from_factory = + user_comp_filter_factory_->CreateCompactionFilter(context); + } + return user_comp_filter_from_factory; +} + +std::unique_ptr +BlobIndexCompactionFilterFactory::CreateCompactionFilter( + const CompactionFilter::Context& _context) { + assert(clock()); + + int64_t current_time = 0; + Status s = clock()->GetCurrentTime(¤t_time); + if (!s.ok()) { + return nullptr; + } + assert(current_time >= 0); + + assert(blob_db_impl()); + + BlobCompactionContext context; + blob_db_impl()->GetCompactionContext(&context); + + std::unique_ptr user_comp_filter_from_factory = + CreateUserCompactionFilterFromFactory(_context); + + return std::unique_ptr(new BlobIndexCompactionFilter( + std::move(context), user_comp_filter(), + std::move(user_comp_filter_from_factory), current_time, statistics())); +} + +std::unique_ptr +BlobIndexCompactionFilterFactoryGC::CreateCompactionFilter( + const CompactionFilter::Context& _context) { + assert(clock()); + + int64_t current_time = 0; + Status s = clock()->GetCurrentTime(¤t_time); + if (!s.ok()) { + return nullptr; + } + assert(current_time >= 0); + + assert(blob_db_impl()); + + BlobCompactionContext context; + BlobCompactionContextGC context_gc; + blob_db_impl()->GetCompactionContext(&context, &context_gc); + + std::unique_ptr user_comp_filter_from_factory = + CreateUserCompactionFilterFromFactory(_context); + + return std::unique_ptr(new BlobIndexCompactionFilterGC( + std::move(context), std::move(context_gc), user_comp_filter(), + std::move(user_comp_filter_from_factory), current_time, statistics())); +} + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_compaction_filter.h b/src/rocksdb/utilities/blob_db/blob_compaction_filter.h new file mode 100644 index 000000000..1493cfc1a --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_compaction_filter.h @@ -0,0 +1,204 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <unordered_set>
+
+#include "db/blob/blob_index.h"
+#include "monitoring/statistics.h"
+#include "rocksdb/compaction_filter.h"
+#include "utilities/blob_db/blob_db_gc_stats.h"
+#include "utilities/blob_db/blob_db_impl.h"
+#include "utilities/compaction_filters/layered_compaction_filter_base.h"
+
+namespace ROCKSDB_NAMESPACE {
+class SystemClock;
+namespace blob_db {
+
+struct BlobCompactionContext {
+  BlobDBImpl* blob_db_impl = nullptr;
+  uint64_t next_file_number = 0;
+  std::unordered_set<uint64_t> current_blob_files;
+  SequenceNumber fifo_eviction_seq = 0;
+  uint64_t evict_expiration_up_to = 0;
+};
+
+struct BlobCompactionContextGC {
+  uint64_t cutoff_file_number = 0;
+};
+
+// Compaction filter that deletes expired blob indexes from the base DB.
+// Comes in two varieties, one for the non-GC case and one for the GC case.
+class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase {
+ public:
+  BlobIndexCompactionFilterBase(
+      BlobCompactionContext&& _context,
+      const CompactionFilter* _user_comp_filter,
+      std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
+      uint64_t current_time, Statistics* stats)
+      : LayeredCompactionFilterBase(_user_comp_filter,
+                                    std::move(_user_comp_filter_from_factory)),
+        context_(std::move(_context)),
+        current_time_(current_time),
+        statistics_(stats) {}
+
+  ~BlobIndexCompactionFilterBase() override;
+
+  // Filter expired blob indexes regardless of snapshots.
+  bool IgnoreSnapshots() const override { return true; }
+
+  Decision FilterV2(int level, const Slice& key, ValueType value_type,
+                    const Slice& value, std::string* new_value,
+                    std::string* skip_until) const override;
+
+  bool IsStackedBlobDbInternalCompactionFilter() const override { return true; }
+
+ protected:
+  bool IsBlobFileOpened() const;
+  virtual bool OpenNewBlobFileIfNeeded() const;
+  bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
+                           PinnableSlice* blob, bool need_decompress,
+                           CompressionType* compression_type) const;
+  bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
+                          uint64_t* new_blob_file_number,
+                          uint64_t* new_blob_offset) const;
+  bool CloseAndRegisterNewBlobFileIfNeeded() const;
+  bool CloseAndRegisterNewBlobFile() const;
+
+  Statistics* statistics() const { return statistics_; }
+  const BlobCompactionContext& context() const { return context_; }
+
+ private:
+  Decision HandleValueChange(const Slice& key, std::string* new_value) const;
+
+ private:
+  BlobCompactionContext context_;
+  const uint64_t current_time_;
+  Statistics* statistics_;
+
+  mutable std::shared_ptr<BlobFile> blob_file_;
+  mutable std::shared_ptr<BlobLogWriter> writer_;
+
+  // It is safe not to use std::atomic here since the compaction filter,
+  // created from a compaction filter factory, will not be called from
+  // multiple threads.
+ mutable uint64_t expired_count_ = 0; + mutable uint64_t expired_size_ = 0; + mutable uint64_t evicted_count_ = 0; + mutable uint64_t evicted_size_ = 0; +}; + +class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase { + public: + BlobIndexCompactionFilter( + BlobCompactionContext&& _context, + const CompactionFilter* _user_comp_filter, + std::unique_ptr _user_comp_filter_from_factory, + uint64_t current_time, Statistics* stats) + : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter, + std::move(_user_comp_filter_from_factory), + current_time, stats) {} + + const char* Name() const override { return "BlobIndexCompactionFilter"; } +}; + +class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase { + public: + BlobIndexCompactionFilterGC( + BlobCompactionContext&& _context, BlobCompactionContextGC&& context_gc, + const CompactionFilter* _user_comp_filter, + std::unique_ptr _user_comp_filter_from_factory, + uint64_t current_time, Statistics* stats) + : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter, + std::move(_user_comp_filter_from_factory), + current_time, stats), + context_gc_(std::move(context_gc)) {} + + ~BlobIndexCompactionFilterGC() override; + + const char* Name() const override { return "BlobIndexCompactionFilterGC"; } + + BlobDecision PrepareBlobOutput(const Slice& key, const Slice& existing_value, + std::string* new_value) const override; + + private: + bool OpenNewBlobFileIfNeeded() const override; + + private: + BlobCompactionContextGC context_gc_; + mutable BlobDBGarbageCollectionStats gc_stats_; +}; + +// Compaction filter factory; similarly to the filters above, it comes +// in two flavors, one that creates filters that support GC, and one +// that creates non-GC filters. 
+class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory { + public: + BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl, + SystemClock* _clock, + const ColumnFamilyOptions& _cf_options, + Statistics* _statistics) + : blob_db_impl_(_blob_db_impl), + clock_(_clock), + statistics_(_statistics), + user_comp_filter_(_cf_options.compaction_filter), + user_comp_filter_factory_(_cf_options.compaction_filter_factory) {} + + protected: + std::unique_ptr CreateUserCompactionFilterFromFactory( + const CompactionFilter::Context& context) const; + + BlobDBImpl* blob_db_impl() const { return blob_db_impl_; } + SystemClock* clock() const { return clock_; } + Statistics* statistics() const { return statistics_; } + const CompactionFilter* user_comp_filter() const { return user_comp_filter_; } + + private: + BlobDBImpl* blob_db_impl_; + SystemClock* clock_; + Statistics* statistics_; + const CompactionFilter* user_comp_filter_; + std::shared_ptr user_comp_filter_factory_; +}; + +class BlobIndexCompactionFilterFactory + : public BlobIndexCompactionFilterFactoryBase { + public: + BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl, + SystemClock* _clock, + const ColumnFamilyOptions& _cf_options, + Statistics* _statistics) + : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options, + _statistics) {} + + const char* Name() const override { + return "BlobIndexCompactionFilterFactory"; + } + + std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override; +}; + +class BlobIndexCompactionFilterFactoryGC + : public BlobIndexCompactionFilterFactoryBase { + public: + BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl, + SystemClock* _clock, + const ColumnFamilyOptions& _cf_options, + Statistics* _statistics) + : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options, + _statistics) {} + + const char* Name() const override { + return "BlobIndexCompactionFilterFactoryGC"; + } + + std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override; +}; + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db.cc b/src/rocksdb/utilities/blob_db/blob_db.cc new file mode 100644 index 000000000..cbd02e68e --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db.cc @@ -0,0 +1,114 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
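The filter and factory classes above layer an optional user compaction filter beneath BlobDB's own blob-index filter; as FilterV2 shows, that user filter is only applied to inlined values and to non-TTL blob values. As a rough sketch (not part of this patch; the class name and the "obsolete:" prefix are made up for illustration), a user filter is wired in through the column family options, which BlobDBImpl::Open later folds into one of the factories above:

#include <string>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/options.h"

// Hypothetical user filter: drops plain values carrying an "obsolete:" prefix.
// BlobDB invokes it only for inlined values and for non-TTL blob values.
class DropObsoleteValuesFilter : public ROCKSDB_NAMESPACE::CompactionFilter {
 public:
  Decision FilterV2(int /*level*/, const ROCKSDB_NAMESPACE::Slice& /*key*/,
                    ValueType value_type,
                    const ROCKSDB_NAMESPACE::Slice& existing_value,
                    std::string* /*new_value*/,
                    std::string* /*skip_until*/) const override {
    if (value_type == ValueType::kValue &&
        existing_value.starts_with("obsolete:")) {
      return Decision::kRemove;
    }
    return Decision::kKeep;
  }

  const char* Name() const override { return "DropObsoleteValuesFilter"; }
};

// Wiring: BlobDBImpl::Open() picks the filter up from the column family
// options and wraps it in BlobIndexCompactionFilterFactory(GC) shown above.
// ROCKSDB_NAMESPACE::Options options;
// static DropObsoleteValuesFilter user_filter;
// options.compaction_filter = &user_filter;
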
+// +#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_db.h" + +#include + +#include "logging/logging.h" +#include "utilities/blob_db/blob_db_impl.h" + +namespace ROCKSDB_NAMESPACE { +namespace blob_db { + +Status BlobDB::Open(const Options& options, const BlobDBOptions& bdb_options, + const std::string& dbname, BlobDB** blob_db) { + *blob_db = nullptr; + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); + std::vector handles; + Status s = BlobDB::Open(db_options, bdb_options, dbname, column_families, + &handles, blob_db); + if (s.ok()) { + assert(handles.size() == 1); + // i can delete the handle since DBImpl is always holding a reference to + // default column family + delete handles[0]; + } + return s; +} + +Status BlobDB::Open(const DBOptions& db_options, + const BlobDBOptions& bdb_options, const std::string& dbname, + const std::vector& column_families, + std::vector* handles, + BlobDB** blob_db) { + assert(handles); + + if (column_families.size() != 1 || + column_families[0].name != kDefaultColumnFamilyName) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + + BlobDBImpl* blob_db_impl = new BlobDBImpl(dbname, bdb_options, db_options, + column_families[0].options); + Status s = blob_db_impl->Open(handles); + if (s.ok()) { + *blob_db = static_cast(blob_db_impl); + } else { + if (!handles->empty()) { + for (ColumnFamilyHandle* cfh : *handles) { + blob_db_impl->DestroyColumnFamilyHandle(cfh); + } + + handles->clear(); + } + + delete blob_db_impl; + *blob_db = nullptr; + } + return s; +} + +BlobDB::BlobDB() : StackableDB(nullptr) {} + +void BlobDBOptions::Dump(Logger* log) const { + ROCKS_LOG_HEADER( + log, " BlobDBOptions.blob_dir: %s", + blob_dir.c_str()); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.path_relative: %d", + path_relative); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.is_fifo: %d", + is_fifo); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.max_db_size: %" PRIu64, + max_db_size); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.ttl_range_secs: %" PRIu64, + ttl_range_secs); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.min_blob_size: %" PRIu64, + min_blob_size); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.bytes_per_sync: %" PRIu64, + bytes_per_sync); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.blob_file_size: %" PRIu64, + blob_file_size); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.compression: %d", + static_cast(compression)); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.enable_garbage_collection: %d", + enable_garbage_collection); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.garbage_collection_cutoff: %f", + garbage_collection_cutoff); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.disable_background_tasks: %d", + disable_background_tasks); +} + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif diff --git a/src/rocksdb/utilities/blob_db/blob_db.h b/src/rocksdb/utilities/blob_db/blob_db.h new file mode 100644 index 000000000..e9d92486f --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db.h @@ -0,0 +1,266 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
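For orientation, here is a minimal usage sketch of the simple Open() overload implemented above, together with the Put/Get API declared in blob_db.h below. It is illustrative only: the include path assumes building inside the RocksDB source tree, and the database path is arbitrary.

#include <cassert>
#include <string>

#include "rocksdb/options.h"
#include "utilities/blob_db/blob_db.h"

int main() {
  using ROCKSDB_NAMESPACE::Options;
  using ROCKSDB_NAMESPACE::PinnableSlice;
  using ROCKSDB_NAMESPACE::ReadOptions;
  using ROCKSDB_NAMESPACE::Status;
  using ROCKSDB_NAMESPACE::WriteOptions;
  namespace blob_db = ROCKSDB_NAMESPACE::blob_db;

  Options options;
  options.create_if_missing = true;

  blob_db::BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 64;  // values of 64+ bytes go to blob files

  blob_db::BlobDB* db = nullptr;
  Status s = blob_db::BlobDB::Open(options, bdb_options,
                                   "/tmp/blobdb_example", &db);
  assert(s.ok());

  s = db->Put(WriteOptions(), "key", std::string(1024, 'v'));
  assert(s.ok());

  PinnableSlice value;
  s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "key", &value);
  assert(s.ok() && value.size() == 1024);

  delete db;  // closes the base DB and syncs the blob files
  return 0;
}
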
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <functional>
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "rocksdb/db.h"
+#include "rocksdb/status.h"
+#include "rocksdb/utilities/stackable_db.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace blob_db {
+
+// A wrapped database which puts the values of KV pairs in a separate log
+// and stores the location within the log in the underlying DB.
+//
+// The factory needs to be moved to include/rocksdb/utilities to allow
+// users to use blob DB.
+
+constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();
+
+struct BlobDBOptions {
+  // Name of the directory under the base DB where blobs will be stored. Using
+  // a directory where the base DB stores its SST files is not supported.
+  // Default is "blob_dir".
+  std::string blob_dir = "blob_dir";
+
+  // Whether the blob_dir path is relative or absolute.
+  bool path_relative = true;
+
+  // When max_db_size is reached, evict blob files to free up space
+  // instead of returning a NoSpace error on write. Blob files are
+  // evicted from oldest to newest, based on file creation time.
+  bool is_fifo = false;
+
+  // Maximum size of the database (including SST files and blob files).
+  //
+  // Default: 0 (no limit)
+  uint64_t max_db_size = 0;
+
+  // TTL blobs are bucketed by expiration time, and a new blob file is opened
+  // for each bucket of ttl_range_secs seconds. For example, if ttl_range_secs
+  // is 600 (10 minutes) and the first bucket starts at 1471542000, then the
+  // blob buckets are
+  //   [1471542000, 1471542600), [1471542600, 1471543200), and so on.
+  uint64_t ttl_range_secs = 3600;
+
+  // The smallest value to store in the blob log. Values smaller than this
+  // threshold will be inlined in the base DB together with the key.
+  uint64_t min_blob_size = 0;
+
+  // Allows the OS to incrementally sync blob files to disk for every
+  // bytes_per_sync bytes written. Users shouldn't rely on it for
+  // persistence guarantees.
+  uint64_t bytes_per_sync = 512 * 1024;
+
+  // The target size of each blob file. A file becomes immutable
+  // after it exceeds that size.
+  uint64_t blob_file_size = 256 * 1024 * 1024;
+
+  // Which compression to use for blobs.
+  CompressionType compression = kNoCompression;
+
+  // If enabled, BlobDB cleans up stale blobs in non-TTL files during
+  // compaction by rewriting the remaining live blobs to new files.
+  bool enable_garbage_collection = false;
+
+  // The cutoff in terms of blob file age for garbage collection. Blobs in
+  // the oldest N non-TTL blob files will be rewritten when encountered during
+  // compaction, where N = garbage_collection_cutoff * number_of_non_TTL_files.
+  double garbage_collection_cutoff = 0.25;
+
+  // Disable all background jobs. Used for testing only.
+ bool disable_background_tasks = false; + + void Dump(Logger* log) const; +}; + +class BlobDB : public StackableDB { + public: + using ROCKSDB_NAMESPACE::StackableDB::Put; + virtual Status Put(const WriteOptions& options, const Slice& key, + const Slice& value) override = 0; + virtual Status Put(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override { + if (column_family->GetID() != DefaultColumnFamily()->GetID()) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + return Put(options, key, value); + } + + using ROCKSDB_NAMESPACE::StackableDB::Delete; + virtual Status Delete(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key) override { + if (column_family->GetID() != DefaultColumnFamily()->GetID()) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + assert(db_ != nullptr); + return db_->Delete(options, column_family, key); + } + + virtual Status PutWithTTL(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t ttl) = 0; + virtual Status PutWithTTL(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, uint64_t ttl) { + if (column_family->GetID() != DefaultColumnFamily()->GetID()) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + return PutWithTTL(options, key, value, ttl); + } + + // Put with expiration. Key with expiration time equal to + // std::numeric_limits::max() means the key don't expire. + virtual Status PutUntil(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration) = 0; + virtual Status PutUntil(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, uint64_t expiration) { + if (column_family->GetID() != DefaultColumnFamily()->GetID()) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + return PutUntil(options, key, value, expiration); + } + + using ROCKSDB_NAMESPACE::StackableDB::Get; + virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value) override = 0; + + // Get value and expiration. 
+ virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value, uint64_t* expiration) = 0; + virtual Status Get(const ReadOptions& options, const Slice& key, + PinnableSlice* value, uint64_t* expiration) { + return Get(options, DefaultColumnFamily(), key, value, expiration); + } + + using ROCKSDB_NAMESPACE::StackableDB::MultiGet; + virtual std::vector MultiGet( + const ReadOptions& options, const std::vector& keys, + std::vector* values) override = 0; + virtual std::vector MultiGet( + const ReadOptions& options, + const std::vector& column_families, + const std::vector& keys, + std::vector* values) override { + for (auto column_family : column_families) { + if (column_family->GetID() != DefaultColumnFamily()->GetID()) { + return std::vector( + column_families.size(), + Status::NotSupported( + "Blob DB doesn't support non-default column family.")); + } + } + return MultiGet(options, keys, values); + } + virtual void MultiGet(const ReadOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const size_t num_keys, const Slice* /*keys*/, + PinnableSlice* /*values*/, Status* statuses, + const bool /*sorted_input*/ = false) override { + for (size_t i = 0; i < num_keys; ++i) { + statuses[i] = + Status::NotSupported("Blob DB doesn't support batched MultiGet"); + } + } + + using ROCKSDB_NAMESPACE::StackableDB::SingleDelete; + virtual Status SingleDelete(const WriteOptions& /*wopts*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*key*/) override { + return Status::NotSupported("Not supported operation in blob db."); + } + + using ROCKSDB_NAMESPACE::StackableDB::Merge; + virtual Status Merge(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*key*/, const Slice& /*value*/) override { + return Status::NotSupported("Not supported operation in blob db."); + } + + virtual Status Write(const WriteOptions& opts, + WriteBatch* updates) override = 0; + + using ROCKSDB_NAMESPACE::StackableDB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& options) override = 0; + virtual Iterator* NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) override { + if (column_family->GetID() != DefaultColumnFamily()->GetID()) { + // Blob DB doesn't support non-default column family. + return nullptr; + } + return NewIterator(options); + } + + Status CompactFiles( + const CompactionOptions& compact_options, + const std::vector& input_file_names, const int output_level, + const int output_path_id = -1, + std::vector* const output_file_names = nullptr, + CompactionJobInfo* compaction_job_info = nullptr) override = 0; + Status CompactFiles( + const CompactionOptions& compact_options, + ColumnFamilyHandle* column_family, + const std::vector& input_file_names, const int output_level, + const int output_path_id = -1, + std::vector* const output_file_names = nullptr, + CompactionJobInfo* compaction_job_info = nullptr) override { + if (column_family->GetID() != DefaultColumnFamily()->GetID()) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + + return CompactFiles(compact_options, input_file_names, output_level, + output_path_id, output_file_names, compaction_job_info); + } + + using ROCKSDB_NAMESPACE::StackableDB::Close; + virtual Status Close() override = 0; + + // Opening blob db. 
+ static Status Open(const Options& options, const BlobDBOptions& bdb_options, + const std::string& dbname, BlobDB** blob_db); + + static Status Open(const DBOptions& db_options, + const BlobDBOptions& bdb_options, + const std::string& dbname, + const std::vector& column_families, + std::vector* handles, + BlobDB** blob_db); + + virtual BlobDBOptions GetBlobDBOptions() const = 0; + + virtual Status SyncBlobFiles() = 0; + + virtual ~BlobDB() {} + + protected: + explicit BlobDB(); +}; + +// Destroy the content of the database. +Status DestroyBlobDB(const std::string& dbname, const Options& options, + const BlobDBOptions& bdb_options); + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db_gc_stats.h b/src/rocksdb/utilities/blob_db/blob_db_gc_stats.h new file mode 100644 index 000000000..fea6b0032 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_gc_stats.h @@ -0,0 +1,56 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once + +#include + +#include "rocksdb/rocksdb_namespace.h" + +#ifndef ROCKSDB_LITE + +namespace ROCKSDB_NAMESPACE { + +namespace blob_db { + +/** + * Statistics related to a single garbage collection pass (i.e. a single + * (sub)compaction). + */ +class BlobDBGarbageCollectionStats { + public: + uint64_t AllBlobs() const { return all_blobs_; } + uint64_t AllBytes() const { return all_bytes_; } + uint64_t RelocatedBlobs() const { return relocated_blobs_; } + uint64_t RelocatedBytes() const { return relocated_bytes_; } + uint64_t NewFiles() const { return new_files_; } + bool HasError() const { return error_; } + + void AddBlob(uint64_t size) { + ++all_blobs_; + all_bytes_ += size; + } + + void AddRelocatedBlob(uint64_t size) { + ++relocated_blobs_; + relocated_bytes_ += size; + } + + void AddNewFile() { ++new_files_; } + + void SetError() { error_ = true; } + + private: + uint64_t all_blobs_ = 0; + uint64_t all_bytes_ = 0; + uint64_t relocated_blobs_ = 0; + uint64_t relocated_bytes_ = 0; + uint64_t new_files_ = 0; + bool error_ = false; +}; + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl.cc b/src/rocksdb/utilities/blob_db/blob_db_impl.cc new file mode 100644 index 000000000..87e294c5c --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_impl.cc @@ -0,0 +1,2177 @@ + +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
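Before diving into the implementation, a short sketch of how the TTL-oriented API declared in blob_db.h above (PutWithTTL, PutUntil, and the Get overload that also returns the expiration) fits together. It is illustrative only, not part of the patch; `db` is assumed to be an already-open blob_db::BlobDB* (see the earlier Open() sketch).

#include <cassert>
#include <cstdint>

#include "utilities/blob_db/blob_db.h"

// One write with a relative TTL, one with an absolute expiration, then a read
// that also returns the stored expiration.
void TtlUsageSketch(ROCKSDB_NAMESPACE::blob_db::BlobDB* db) {
  using ROCKSDB_NAMESPACE::PinnableSlice;
  using ROCKSDB_NAMESPACE::ReadOptions;
  using ROCKSDB_NAMESPACE::Status;
  using ROCKSDB_NAMESPACE::WriteOptions;

  // Expires roughly `ttl` seconds after the write.
  Status s = db->PutWithTTL(WriteOptions(), "session", "payload",
                            /* ttl */ 600);
  assert(s.ok());

  // Expires at an absolute time; kNoExpiration means the key never expires.
  s = db->PutUntil(WriteOptions(), "permanent", "payload",
                   ROCKSDB_NAMESPACE::blob_db::kNoExpiration);
  assert(s.ok());

  PinnableSlice value;
  uint64_t expiration = 0;
  s = db->Get(ReadOptions(), "session", &value, &expiration);
  assert(s.ok());
  // For TTL writes, `expiration` holds the absolute expiration time
  // (approximately write time + ttl).
  (void)expiration;
}
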
+#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_db_impl.h" + +#include +#include +#include +#include +#include + +#include "db/blob/blob_index.h" +#include "db/db_impl/db_impl.h" +#include "db/write_batch_internal.h" +#include "file/file_util.h" +#include "file/filename.h" +#include "file/random_access_file_reader.h" +#include "file/sst_file_manager_impl.h" +#include "file/writable_file_writer.h" +#include "logging/logging.h" +#include "monitoring/instrumented_mutex.h" +#include "monitoring/statistics.h" +#include "rocksdb/convenience.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/utilities/stackable_db.h" +#include "rocksdb/utilities/transaction.h" +#include "table/block_based/block.h" +#include "table/block_based/block_based_table_builder.h" +#include "table/block_based/block_builder.h" +#include "table/meta_blocks.h" +#include "test_util/sync_point.h" +#include "util/cast_util.h" +#include "util/crc32c.h" +#include "util/mutexlock.h" +#include "util/random.h" +#include "util/stop_watch.h" +#include "util/timer_queue.h" +#include "utilities/blob_db/blob_compaction_filter.h" +#include "utilities/blob_db/blob_db_iterator.h" +#include "utilities/blob_db/blob_db_listener.h" + +namespace { +int kBlockBasedTableVersionFormat = 2; +} // end namespace + +namespace ROCKSDB_NAMESPACE { +namespace blob_db { + +bool BlobFileComparator::operator()( + const std::shared_ptr& lhs, + const std::shared_ptr& rhs) const { + return lhs->BlobFileNumber() > rhs->BlobFileNumber(); +} + +bool BlobFileComparatorTTL::operator()( + const std::shared_ptr& lhs, + const std::shared_ptr& rhs) const { + assert(lhs->HasTTL() && rhs->HasTTL()); + if (lhs->expiration_range_.first < rhs->expiration_range_.first) { + return true; + } + if (lhs->expiration_range_.first > rhs->expiration_range_.first) { + return false; + } + return lhs->BlobFileNumber() < rhs->BlobFileNumber(); +} + +BlobDBImpl::BlobDBImpl(const std::string& dbname, + const BlobDBOptions& blob_db_options, + const DBOptions& db_options, + const ColumnFamilyOptions& cf_options) + : BlobDB(), + dbname_(dbname), + db_impl_(nullptr), + env_(db_options.env), + bdb_options_(blob_db_options), + db_options_(db_options), + cf_options_(cf_options), + file_options_(db_options), + statistics_(db_options_.statistics.get()), + next_file_number_(1), + flush_sequence_(0), + closed_(true), + open_file_count_(0), + total_blob_size_(0), + live_sst_size_(0), + fifo_eviction_seq_(0), + evict_expiration_up_to_(0), + debug_level_(0) { + clock_ = env_->GetSystemClock().get(); + blob_dir_ = (bdb_options_.path_relative) + ? dbname + "/" + bdb_options_.blob_dir + : bdb_options_.blob_dir; + file_options_.bytes_per_sync = blob_db_options.bytes_per_sync; +} + +BlobDBImpl::~BlobDBImpl() { + tqueue_.shutdown(); + // CancelAllBackgroundWork(db_, true); + Status s __attribute__((__unused__)) = Close(); + assert(s.ok()); +} + +Status BlobDBImpl::Close() { + if (closed_) { + return Status::OK(); + } + closed_ = true; + + // Close base DB before BlobDBImpl destructs to stop event listener and + // compaction filter call. + Status s = db_->Close(); + // delete db_ anyway even if close failed. + delete db_; + // Reset pointers to avoid StackableDB delete the pointer again. 
+ db_ = nullptr; + db_impl_ = nullptr; + if (!s.ok()) { + return s; + } + + s = SyncBlobFiles(); + return s; +} + +BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; } + +Status BlobDBImpl::Open(std::vector* handles) { + assert(handles != nullptr); + assert(db_ == nullptr); + + if (blob_dir_.empty()) { + return Status::NotSupported("No blob directory in options"); + } + + if (bdb_options_.garbage_collection_cutoff < 0.0 || + bdb_options_.garbage_collection_cutoff > 1.0) { + return Status::InvalidArgument( + "Garbage collection cutoff must be in the interval [0.0, 1.0]"); + } + + // Temporarily disable compactions in the base DB during open; save the user + // defined value beforehand so we can restore it once BlobDB is initialized. + // Note: this is only needed if garbage collection is enabled. + const bool disable_auto_compactions = cf_options_.disable_auto_compactions; + + if (bdb_options_.enable_garbage_collection) { + cf_options_.disable_auto_compactions = true; + } + + Status s; + + // Create info log. + if (db_options_.info_log == nullptr) { + s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log); + if (!s.ok()) { + return s; + } + } + + ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB..."); + + if ((cf_options_.compaction_filter != nullptr || + cf_options_.compaction_filter_factory != nullptr)) { + ROCKS_LOG_INFO(db_options_.info_log, + "BlobDB only support compaction filter on non-TTL values."); + } + + // Open blob directory. + s = env_->CreateDirIfMissing(blob_dir_); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to create blob_dir %s, status: %s", + blob_dir_.c_str(), s.ToString().c_str()); + } + s = env_->GetFileSystem()->NewDirectory(blob_dir_, IOOptions(), &dir_ent_, + nullptr); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(), + s.ToString().c_str()); + return s; + } + + // Open blob files. + s = OpenAllBlobFiles(); + if (!s.ok()) { + return s; + } + + // Update options + if (bdb_options_.enable_garbage_collection) { + db_options_.listeners.push_back(std::make_shared(this)); + cf_options_.compaction_filter_factory = + std::make_shared( + this, clock_, cf_options_, statistics_); + } else { + db_options_.listeners.push_back(std::make_shared(this)); + cf_options_.compaction_filter_factory = + std::make_shared( + this, clock_, cf_options_, statistics_); + } + + // Reset user compaction filter after building into compaction factory. + cf_options_.compaction_filter = nullptr; + + // Open base db. + ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_); + s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_); + if (!s.ok()) { + return s; + } + db_impl_ = static_cast_with_check(db_->GetRootDB()); + + // Sanitize the blob_dir provided. Using a directory where the + // base DB stores its files for the default CF is not supported. 
+ const ColumnFamilyData* const cfd = + static_cast(DefaultColumnFamily())->cfd(); + assert(cfd); + + const ImmutableCFOptions* const ioptions = cfd->ioptions(); + assert(ioptions); + + assert(env_); + + for (const auto& cf_path : ioptions->cf_paths) { + bool blob_dir_same_as_cf_dir = false; + s = env_->AreFilesSame(blob_dir_, cf_path.path, &blob_dir_same_as_cf_dir); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Error while sanitizing blob_dir %s, status: %s", + blob_dir_.c_str(), s.ToString().c_str()); + return s; + } + + if (blob_dir_same_as_cf_dir) { + return Status::NotSupported( + "Using the base DB's storage directories for BlobDB files is not " + "supported."); + } + } + + // Initialize SST file <-> oldest blob file mapping if garbage collection + // is enabled. + if (bdb_options_.enable_garbage_collection) { + std::vector live_files; + db_->GetLiveFilesMetaData(&live_files); + + InitializeBlobFileToSstMapping(live_files); + + MarkUnreferencedBlobFilesObsoleteDuringOpen(); + + if (!disable_auto_compactions) { + s = db_->EnableAutoCompaction(*handles); + if (!s.ok()) { + ROCKS_LOG_ERROR( + db_options_.info_log, + "Failed to enable automatic compactions during open, status: %s", + s.ToString().c_str()); + return s; + } + } + } + + // Add trash files in blob dir to file delete scheduler. + SstFileManagerImpl* sfm = static_cast( + db_impl_->immutable_db_options().sst_file_manager.get()); + DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_); + + UpdateLiveSSTSize(); + + // Start background jobs. + if (!bdb_options_.disable_background_tasks) { + StartBackgroundTasks(); + } + + ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this); + bdb_options_.Dump(db_options_.info_log.get()); + closed_ = false; + return s; +} + +void BlobDBImpl::StartBackgroundTasks() { + // store a call to a member function and object + tqueue_.add( + kReclaimOpenFilesPeriodMillisecs, + std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1)); + tqueue_.add( + kDeleteObsoleteFilesPeriodMillisecs, + std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1)); + tqueue_.add(kSanityCheckPeriodMillisecs, + std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1)); + tqueue_.add( + kEvictExpiredFilesPeriodMillisecs, + std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1)); +} + +Status BlobDBImpl::GetAllBlobFiles(std::set* file_numbers) { + assert(file_numbers != nullptr); + std::vector all_files; + Status s = env_->GetChildren(blob_dir_, &all_files); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to get list of blob files, status: %s", + s.ToString().c_str()); + return s; + } + + for (const auto& file_name : all_files) { + uint64_t file_number; + FileType type; + bool success = ParseFileName(file_name, &file_number, &type); + if (success && type == kBlobFile) { + file_numbers->insert(file_number); + } else { + ROCKS_LOG_WARN(db_options_.info_log, + "Skipping file in blob directory: %s", file_name.c_str()); + } + } + + return s; +} + +Status BlobDBImpl::OpenAllBlobFiles() { + std::set file_numbers; + Status s = GetAllBlobFiles(&file_numbers); + if (!s.ok()) { + return s; + } + + if (!file_numbers.empty()) { + next_file_number_.store(*file_numbers.rbegin() + 1); + } + + std::ostringstream blob_file_oss; + std::ostringstream live_imm_oss; + std::ostringstream obsolete_file_oss; + + for (auto& file_number : file_numbers) { + std::shared_ptr blob_file = std::make_shared( + this, blob_dir_, file_number, 
db_options_.info_log.get()); + blob_file->MarkImmutable(/* sequence */ 0); + + // Read file header and footer + Status read_metadata_status = + blob_file->ReadMetadata(env_->GetFileSystem(), file_options_); + if (read_metadata_status.IsCorruption()) { + // Remove incomplete file. + if (!obsolete_files_.empty()) { + obsolete_file_oss << ", "; + } + obsolete_file_oss << file_number; + + ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/); + continue; + } else if (!read_metadata_status.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Unable to read metadata of blob file %" PRIu64 + ", status: '%s'", + file_number, read_metadata_status.ToString().c_str()); + return read_metadata_status; + } + + total_blob_size_ += blob_file->GetFileSize(); + + if (!blob_files_.empty()) { + blob_file_oss << ", "; + } + blob_file_oss << file_number; + + blob_files_[file_number] = blob_file; + + if (!blob_file->HasTTL()) { + if (!live_imm_non_ttl_blob_files_.empty()) { + live_imm_oss << ", "; + } + live_imm_oss << file_number; + + live_imm_non_ttl_blob_files_[file_number] = blob_file; + } + } + + ROCKS_LOG_INFO(db_options_.info_log, + "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(), + blob_file_oss.str().c_str()); + ROCKS_LOG_INFO( + db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s", + live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str()); + ROCKS_LOG_INFO(db_options_.info_log, + "Found %" ROCKSDB_PRIszt + " incomplete or corrupted blob files: %s", + obsolete_files_.size(), obsolete_file_oss.str().c_str()); + return s; +} + +template +void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number, + uint64_t blob_file_number, + Linker linker) { + assert(bdb_options_.enable_garbage_collection); + assert(blob_file_number != kInvalidBlobFileNumber); + + auto it = blob_files_.find(blob_file_number); + if (it == blob_files_.end()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Blob file %" PRIu64 + " not found while trying to link " + "SST file %" PRIu64, + blob_file_number, sst_file_number); + return; + } + + BlobFile* const blob_file = it->second.get(); + assert(blob_file); + + linker(blob_file, sst_file_number); + + ROCKS_LOG_INFO(db_options_.info_log, + "Blob file %" PRIu64 " linked to SST file %" PRIu64, + blob_file_number, sst_file_number); +} + +void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number, + uint64_t blob_file_number) { + auto linker = [](BlobFile* blob_file, uint64_t sst_file) { + WriteLock file_lock(&blob_file->mutex_); + blob_file->LinkSstFile(sst_file); + }; + + LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker); +} + +void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number, + uint64_t blob_file_number) { + auto linker = [](BlobFile* blob_file, uint64_t sst_file) { + blob_file->LinkSstFile(sst_file); + }; + + LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker); +} + +void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number, + uint64_t blob_file_number) { + assert(bdb_options_.enable_garbage_collection); + assert(blob_file_number != kInvalidBlobFileNumber); + + auto it = blob_files_.find(blob_file_number); + if (it == blob_files_.end()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Blob file %" PRIu64 + " not found while trying to unlink " + "SST file %" PRIu64, + blob_file_number, sst_file_number); + return; + } + + BlobFile* const blob_file = it->second.get(); + assert(blob_file); + + { + WriteLock file_lock(&blob_file->mutex_); + blob_file->UnlinkSstFile(sst_file_number); + } + + 
ROCKS_LOG_INFO(db_options_.info_log, + "Blob file %" PRIu64 " unlinked from SST file %" PRIu64, + blob_file_number, sst_file_number); +} + +void BlobDBImpl::InitializeBlobFileToSstMapping( + const std::vector& live_files) { + assert(bdb_options_.enable_garbage_collection); + + for (const auto& live_file : live_files) { + const uint64_t sst_file_number = live_file.file_number; + const uint64_t blob_file_number = live_file.oldest_blob_file_number; + + if (blob_file_number == kInvalidBlobFileNumber) { + continue; + } + + LinkSstToBlobFileNoLock(sst_file_number, blob_file_number); + } +} + +void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) { + assert(bdb_options_.enable_garbage_collection); + + WriteLock lock(&mutex_); + + if (info.oldest_blob_file_number != kInvalidBlobFileNumber) { + LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number); + } + + assert(flush_sequence_ < info.largest_seqno); + flush_sequence_ = info.largest_seqno; + + MarkUnreferencedBlobFilesObsolete(); +} + +void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) { + assert(bdb_options_.enable_garbage_collection); + + if (!info.status.ok()) { + return; + } + + // Note: the same SST file may appear in both the input and the output + // file list in case of a trivial move. We walk through the two lists + // below in a fashion that's similar to merge sort to detect this. + + auto cmp = [](const CompactionFileInfo& lhs, const CompactionFileInfo& rhs) { + return lhs.file_number < rhs.file_number; + }; + + auto inputs = info.input_file_infos; + auto iit = inputs.begin(); + const auto iit_end = inputs.end(); + + std::sort(iit, iit_end, cmp); + + auto outputs = info.output_file_infos; + auto oit = outputs.begin(); + const auto oit_end = outputs.end(); + + std::sort(oit, oit_end, cmp); + + WriteLock lock(&mutex_); + + while (iit != iit_end && oit != oit_end) { + const auto& input = *iit; + const auto& output = *oit; + + if (input.file_number == output.file_number) { + ++iit; + ++oit; + } else if (input.file_number < output.file_number) { + if (input.oldest_blob_file_number != kInvalidBlobFileNumber) { + UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number); + } + + ++iit; + } else { + assert(output.file_number < input.file_number); + + if (output.oldest_blob_file_number != kInvalidBlobFileNumber) { + LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number); + } + + ++oit; + } + } + + while (iit != iit_end) { + const auto& input = *iit; + + if (input.oldest_blob_file_number != kInvalidBlobFileNumber) { + UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number); + } + + ++iit; + } + + while (oit != oit_end) { + const auto& output = *oit; + + if (output.oldest_blob_file_number != kInvalidBlobFileNumber) { + LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number); + } + + ++oit; + } + + MarkUnreferencedBlobFilesObsolete(); +} + +bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded( + const std::shared_ptr& blob_file, SequenceNumber obsolete_seq) { + assert(blob_file); + assert(!blob_file->HasTTL()); + assert(blob_file->Immutable()); + assert(bdb_options_.enable_garbage_collection); + + // Note: FIFO eviction could have marked this file obsolete already. + if (blob_file->Obsolete()) { + return true; + } + + // We cannot mark this file (or any higher-numbered files for that matter) + // obsolete if it is referenced by any memtables or SSTs. We keep track of + // the SSTs explicitly. 
To account for memtables, we keep track of the highest + // sequence number received in flush notifications, and we do not mark the + // blob file obsolete if there are still unflushed memtables from before + // the time the blob file was closed. + if (blob_file->GetImmutableSequence() > flush_sequence_ || + !blob_file->GetLinkedSstFiles().empty()) { + return false; + } + + ROCKS_LOG_INFO(db_options_.info_log, + "Blob file %" PRIu64 " is no longer needed, marking obsolete", + blob_file->BlobFileNumber()); + + ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true); + return true; +} + +template +void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) { + assert(bdb_options_.enable_garbage_collection); + + // Iterate through all live immutable non-TTL blob files, and mark them + // obsolete assuming no SST files or memtables rely on the blobs in them. + // Note: we need to stop as soon as we find a blob file that has any + // linked SSTs (or one potentially referenced by memtables). + + uint64_t obsoleted_files = 0; + + auto it = live_imm_non_ttl_blob_files_.begin(); + while (it != live_imm_non_ttl_blob_files_.end()) { + const auto& blob_file = it->second; + assert(blob_file); + assert(blob_file->BlobFileNumber() == it->first); + assert(!blob_file->HasTTL()); + assert(blob_file->Immutable()); + + // Small optimization: Obsolete() does an atomic read, so we can do + // this check without taking a lock on the blob file's mutex. + if (blob_file->Obsolete()) { + it = live_imm_non_ttl_blob_files_.erase(it); + continue; + } + + if (!mark_if_needed(blob_file)) { + break; + } + + it = live_imm_non_ttl_blob_files_.erase(it); + + ++obsoleted_files; + } + + if (obsoleted_files > 0) { + ROCKS_LOG_INFO(db_options_.info_log, + "%" PRIu64 " blob file(s) marked obsolete by GC", + obsoleted_files); + RecordTick(statistics_, BLOB_DB_GC_NUM_FILES, obsoleted_files); + } +} + +void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() { + const SequenceNumber obsolete_seq = GetLatestSequenceNumber(); + + MarkUnreferencedBlobFilesObsoleteImpl( + [this, obsolete_seq](const std::shared_ptr& blob_file) { + WriteLock file_lock(&blob_file->mutex_); + return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq); + }); +} + +void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() { + MarkUnreferencedBlobFilesObsoleteImpl( + [this](const std::shared_ptr& blob_file) { + return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0); + }); +} + +void BlobDBImpl::CloseRandomAccessLocked( + const std::shared_ptr& bfile) { + bfile->CloseRandomAccessLocked(); + open_file_count_--; +} + +Status BlobDBImpl::GetBlobFileReader( + const std::shared_ptr& blob_file, + std::shared_ptr* reader) { + assert(reader != nullptr); + bool fresh_open = false; + Status s = blob_file->GetReader(env_, file_options_, reader, &fresh_open); + if (s.ok() && fresh_open) { + assert(*reader != nullptr); + open_file_count_++; + } + return s; +} + +std::shared_ptr BlobDBImpl::NewBlobFile( + bool has_ttl, const ExpirationRange& expiration_range, + const std::string& reason) { + assert(has_ttl == (expiration_range.first || expiration_range.second)); + + uint64_t file_num = next_file_number_++; + + const uint32_t column_family_id = + static_cast(DefaultColumnFamily())->GetID(); + auto blob_file = std::make_shared( + this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id, + bdb_options_.compression, has_ttl, expiration_range); + + ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s 
reason='%s'", + blob_file->PathName().c_str(), reason.c_str()); + LogFlush(db_options_.info_log); + + return blob_file; +} + +void BlobDBImpl::RegisterBlobFile(std::shared_ptr blob_file) { + const uint64_t blob_file_number = blob_file->BlobFileNumber(); + + auto it = blob_files_.lower_bound(blob_file_number); + assert(it == blob_files_.end() || it->first != blob_file_number); + + blob_files_.insert(it, + std::map>::value_type( + blob_file_number, std::move(blob_file))); +} + +Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { + std::string fpath(bfile->PathName()); + std::unique_ptr wfile; + const auto& fs = env_->GetFileSystem(); + + Status s = fs->ReopenWritableFile(fpath, file_options_, &wfile, nullptr); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to open blob file for write: %s status: '%s'" + " exists: '%s'", + fpath.c_str(), s.ToString().c_str(), + fs->FileExists(fpath, file_options_.io_options, nullptr) + .ToString() + .c_str()); + return s; + } + + std::unique_ptr fwriter; + fwriter.reset(new WritableFileWriter(std::move(wfile), fpath, file_options_)); + + uint64_t boffset = bfile->GetFileSize(); + if (debug_level_ >= 2 && boffset) { + ROCKS_LOG_DEBUG(db_options_.info_log, + "Open blob file: %s with offset: %" PRIu64, fpath.c_str(), + boffset); + } + + BlobLogWriter::ElemType et = BlobLogWriter::kEtNone; + if (bfile->file_size_ == BlobLogHeader::kSize) { + et = BlobLogWriter::kEtFileHdr; + } else if (bfile->file_size_ > BlobLogHeader::kSize) { + et = BlobLogWriter::kEtRecord; + } else if (bfile->file_size_) { + ROCKS_LOG_WARN(db_options_.info_log, + "Open blob file: %s with wrong size: %" PRIu64, + fpath.c_str(), boffset); + return Status::Corruption("Invalid blob file size"); + } + + constexpr bool do_flush = true; + + bfile->log_writer_ = std::make_shared( + std::move(fwriter), clock_, statistics_, bfile->file_number_, + db_options_.use_fsync, do_flush, boffset); + bfile->log_writer_->last_elem_type_ = et; + + return s; +} + +std::shared_ptr BlobDBImpl::FindBlobFileLocked( + uint64_t expiration) const { + if (open_ttl_files_.empty()) { + return nullptr; + } + + std::shared_ptr tmp = std::make_shared(); + tmp->SetHasTTL(true); + tmp->expiration_range_ = std::make_pair(expiration, 0); + tmp->file_number_ = std::numeric_limits::max(); + + auto citr = open_ttl_files_.equal_range(tmp); + if (citr.first == open_ttl_files_.end()) { + assert(citr.second == open_ttl_files_.end()); + + std::shared_ptr check = *(open_ttl_files_.rbegin()); + return (check->expiration_range_.second <= expiration) ? nullptr : check; + } + + if (citr.first != citr.second) { + return *(citr.first); + } + + auto finditr = citr.second; + if (finditr != open_ttl_files_.begin()) { + --finditr; + } + + bool b2 = (*finditr)->expiration_range_.second <= expiration; + bool b1 = (*finditr)->expiration_range_.first > expiration; + + return (b1 || b2) ? 
nullptr : (*finditr); +} + +Status BlobDBImpl::CheckOrCreateWriterLocked( + const std::shared_ptr& blob_file, + std::shared_ptr* writer) { + assert(writer != nullptr); + *writer = blob_file->GetWriter(); + if (*writer != nullptr) { + return Status::OK(); + } + Status s = CreateWriterLocked(blob_file); + if (s.ok()) { + *writer = blob_file->GetWriter(); + } + return s; +} + +Status BlobDBImpl::CreateBlobFileAndWriter( + bool has_ttl, const ExpirationRange& expiration_range, + const std::string& reason, std::shared_ptr* blob_file, + std::shared_ptr* writer) { + TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter"); + assert(has_ttl == (expiration_range.first || expiration_range.second)); + assert(blob_file); + assert(writer); + + *blob_file = NewBlobFile(has_ttl, expiration_range, reason); + assert(*blob_file); + + // file not visible, hence no lock + Status s = CheckOrCreateWriterLocked(*blob_file, writer); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to get writer for blob file: %s, error: %s", + (*blob_file)->PathName().c_str(), s.ToString().c_str()); + return s; + } + + assert(*writer); + + s = (*writer)->WriteHeader((*blob_file)->header_); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to write header to new blob file: %s" + " status: '%s'", + (*blob_file)->PathName().c_str(), s.ToString().c_str()); + return s; + } + + (*blob_file)->SetFileSize(BlobLogHeader::kSize); + total_blob_size_ += BlobLogHeader::kSize; + + return s; +} + +Status BlobDBImpl::SelectBlobFile(std::shared_ptr* blob_file) { + assert(blob_file); + + { + ReadLock rl(&mutex_); + + if (open_non_ttl_file_) { + assert(!open_non_ttl_file_->Immutable()); + *blob_file = open_non_ttl_file_; + return Status::OK(); + } + } + + // Check again + WriteLock wl(&mutex_); + + if (open_non_ttl_file_) { + assert(!open_non_ttl_file_->Immutable()); + *blob_file = open_non_ttl_file_; + return Status::OK(); + } + + std::shared_ptr writer; + const Status s = CreateBlobFileAndWriter( + /* has_ttl */ false, ExpirationRange(), + /* reason */ "SelectBlobFile", blob_file, &writer); + if (!s.ok()) { + return s; + } + + RegisterBlobFile(*blob_file); + open_non_ttl_file_ = *blob_file; + + return s; +} + +Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration, + std::shared_ptr* blob_file) { + assert(blob_file); + assert(expiration != kNoExpiration); + + { + ReadLock rl(&mutex_); + + *blob_file = FindBlobFileLocked(expiration); + if (*blob_file != nullptr) { + assert(!(*blob_file)->Immutable()); + return Status::OK(); + } + } + + // Check again + WriteLock wl(&mutex_); + + *blob_file = FindBlobFileLocked(expiration); + if (*blob_file != nullptr) { + assert(!(*blob_file)->Immutable()); + return Status::OK(); + } + + const uint64_t exp_low = + (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs; + const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs; + const ExpirationRange expiration_range(exp_low, exp_high); + + std::ostringstream oss; + oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')'; + + std::shared_ptr writer; + const Status s = + CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range, + /* reason */ oss.str(), blob_file, &writer); + if (!s.ok()) { + return s; + } + + RegisterBlobFile(*blob_file); + open_ttl_files_.insert(*blob_file); + + return s; +} + +class BlobDBImpl::BlobInserter : public WriteBatch::Handler { + private: + const WriteOptions& options_; + BlobDBImpl* blob_db_impl_; + uint32_t default_cf_id_; + WriteBatch batch_; + + 
public: + BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl, + uint32_t default_cf_id) + : options_(options), + blob_db_impl_(blob_db_impl), + default_cf_id_(default_cf_id) {} + + WriteBatch* batch() { return &batch_; } + + Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + if (column_family_id != default_cf_id_) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration, + &batch_); + return s; + } + + Status DeleteCF(uint32_t column_family_id, const Slice& key) override { + if (column_family_id != default_cf_id_) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key); + return s; + } + + virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key, + const Slice& end_key) { + if (column_family_id != default_cf_id_) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id, + begin_key, end_key); + return s; + } + + Status SingleDeleteCF(uint32_t /*column_family_id*/, + const Slice& /*key*/) override { + return Status::NotSupported("Not supported operation in blob db."); + } + + Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/, + const Slice& /*value*/) override { + return Status::NotSupported("Not supported operation in blob db."); + } + + void LogData(const Slice& blob) override { batch_.PutLogData(blob); } +}; + +Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) { + StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_WRITE); + uint32_t default_cf_id = + static_cast_with_check(DefaultColumnFamily()) + ->GetID(); + Status s; + BlobInserter blob_inserter(options, this, default_cf_id); + { + // Release write_mutex_ before DB write to avoid race condition with + // flush begin listener, which also require write_mutex_ to sync + // blob files. + MutexLock l(&write_mutex_); + s = updates->Iterate(&blob_inserter); + } + if (!s.ok()) { + return s; + } + return db_->Write(options, blob_inserter.batch()); +} + +Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key, + const Slice& value) { + return PutUntil(options, key, value, kNoExpiration); +} + +Status BlobDBImpl::PutWithTTL(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t ttl) { + uint64_t now = EpochNow(); + uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration; + return PutUntil(options, key, value, expiration); +} + +Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration) { + StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_PUT); + Status s; + WriteBatch batch; + { + // Release write_mutex_ before DB write to avoid race condition with + // flush begin listener, which also require write_mutex_ to sync + // blob files. 
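The write entry points in this stretch of the file (Write(), Put(), PutWithTTL() and PutUntil()) are reached through the public BlobDB handle declared in blob_db.h. Below is a minimal, self-contained usage sketch, not part of the patch; the database path and option values are illustrative assumptions, and the include paths assume a build inside the RocksDB source tree.

#include <cassert>
#include <string>

#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "utilities/blob_db/blob_db.h"

int main() {
  using namespace ROCKSDB_NAMESPACE;
  using blob_db::BlobDB;
  using blob_db::BlobDBOptions;

  Options options;
  options.create_if_missing = true;

  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 256;        // smaller values stay inlined in the LSM tree
  bdb_options.blob_file_size = 64 << 20;  // roll blob files at roughly 64 MB

  BlobDB* db = nullptr;
  Status s = BlobDB::Open(options, bdb_options, "/tmp/blobdb_example", &db);
  assert(s.ok());

  // Non-TTL write: dispatched to Put(), which calls PutUntil() with no expiration.
  s = db->Put(WriteOptions(), "key1", std::string(1024, 'v'));
  assert(s.ok());

  // TTL write: dispatched to PutWithTTL(), which computes expiration = now + ttl.
  s = db->PutWithTTL(WriteOptions(), "key2", std::string(1024, 'w'),
                     /* ttl in seconds */ 60);
  assert(s.ok());

  PinnableSlice value;
  s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "key1", &value);
  assert(s.ok() && value.size() == 1024);

  delete db;
  return 0;
}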
+ MutexLock l(&write_mutex_); + s = PutBlobValue(options, key, value, expiration, &batch); + } + if (s.ok()) { + s = db_->Write(options, &batch); + } + return s; +} + +Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/, + const Slice& key, const Slice& value, + uint64_t expiration, WriteBatch* batch) { + write_mutex_.AssertHeld(); + Status s; + std::string index_entry; + uint32_t column_family_id = + static_cast_with_check(DefaultColumnFamily()) + ->GetID(); + if (value.size() < bdb_options_.min_blob_size) { + if (expiration == kNoExpiration) { + // Put as normal value + s = batch->Put(key, value); + RecordTick(statistics_, BLOB_DB_WRITE_INLINED); + } else { + // Inlined with TTL + BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value); + s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key, + index_entry); + RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL); + } + } else { + std::string compression_output; + Slice value_compressed = GetCompressedSlice(value, &compression_output); + + std::string headerbuf; + BlobLogWriter::ConstructBlobHeader(&headerbuf, key, value_compressed, + expiration); + + // Check DB size limit before selecting blob file to + // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be + // done before calling SelectBlobFile(). + s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() + + value_compressed.size()); + if (!s.ok()) { + return s; + } + + std::shared_ptr blob_file; + if (expiration != kNoExpiration) { + s = SelectBlobFileTTL(expiration, &blob_file); + } else { + s = SelectBlobFile(&blob_file); + } + if (s.ok()) { + assert(blob_file != nullptr); + assert(blob_file->GetCompressionType() == bdb_options_.compression); + s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration, + &index_entry); + } + if (s.ok()) { + if (expiration != kNoExpiration) { + WriteLock file_lock(&blob_file->mutex_); + blob_file->ExtendExpirationRange(expiration); + } + s = CloseBlobFileIfNeeded(blob_file); + } + if (s.ok()) { + s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key, + index_entry); + } + if (s.ok()) { + if (expiration == kNoExpiration) { + RecordTick(statistics_, BLOB_DB_WRITE_BLOB); + } else { + RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL); + } + } else { + ROCKS_LOG_ERROR( + db_options_.info_log, + "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt + " status: '%s' blob_file: '%s'", + blob_file->PathName().c_str(), key.ToString().c_str(), value.size(), + s.ToString().c_str(), blob_file->DumpState().c_str()); + } + } + + RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN); + RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size()); + RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size()); + RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size()); + + return s; +} + +Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, + std::string* compression_output) const { + if (bdb_options_.compression == kNoCompression) { + return raw; + } + StopWatch compression_sw(clock_, statistics_, BLOB_DB_COMPRESSION_MICROS); + CompressionType type = bdb_options_.compression; + CompressionOptions opts; + CompressionContext context(type); + CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type, + 0 /* sample_for_compression */); + CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false, + compression_output, nullptr, nullptr); + return *compression_output; +} + +Status BlobDBImpl::DecompressSlice(const Slice& compressed_value, 
+ CompressionType compression_type, + PinnableSlice* value_output) const { + assert(compression_type != kNoCompression); + + BlockContents contents; + auto cfh = static_cast(DefaultColumnFamily()); + + { + StopWatch decompression_sw(clock_, statistics_, + BLOB_DB_DECOMPRESSION_MICROS); + UncompressionContext context(compression_type); + UncompressionInfo info(context, UncompressionDict::GetEmptyDict(), + compression_type); + Status s = UncompressBlockData( + info, compressed_value.data(), compressed_value.size(), &contents, + kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions())); + if (!s.ok()) { + return Status::Corruption("Unable to decompress blob."); + } + } + + value_output->PinSelf(contents.data); + + return Status::OK(); +} + +Status BlobDBImpl::CompactFiles( + const CompactionOptions& compact_options, + const std::vector& input_file_names, const int output_level, + const int output_path_id, std::vector* const output_file_names, + CompactionJobInfo* compaction_job_info) { + // Note: we need CompactionJobInfo to be able to track updates to the + // blob file <-> SST mappings, so we provide one if the user hasn't, + // assuming that GC is enabled. + CompactionJobInfo info{}; + if (bdb_options_.enable_garbage_collection && !compaction_job_info) { + compaction_job_info = &info; + } + + const Status s = + db_->CompactFiles(compact_options, input_file_names, output_level, + output_path_id, output_file_names, compaction_job_info); + if (!s.ok()) { + return s; + } + + if (bdb_options_.enable_garbage_collection) { + assert(compaction_job_info); + ProcessCompactionJobInfo(*compaction_job_info); + } + + return s; +} + +void BlobDBImpl::GetCompactionContextCommon(BlobCompactionContext* context) { + assert(context); + + context->blob_db_impl = this; + context->next_file_number = next_file_number_.load(); + context->current_blob_files.clear(); + for (auto& p : blob_files_) { + context->current_blob_files.insert(p.first); + } + context->fifo_eviction_seq = fifo_eviction_seq_; + context->evict_expiration_up_to = evict_expiration_up_to_; +} + +void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) { + assert(context); + + ReadLock l(&mutex_); + GetCompactionContextCommon(context); +} + +void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context, + BlobCompactionContextGC* context_gc) { + assert(context); + assert(context_gc); + + ReadLock l(&mutex_); + GetCompactionContextCommon(context); + + if (!live_imm_non_ttl_blob_files_.empty()) { + auto it = live_imm_non_ttl_blob_files_.begin(); + std::advance(it, bdb_options_.garbage_collection_cutoff * + live_imm_non_ttl_blob_files_.size()); + context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end() + ? it->first + : std::numeric_limits::max(); + } +} + +void BlobDBImpl::UpdateLiveSSTSize() { + uint64_t live_sst_size = 0; + bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size); + if (ok) { + live_sst_size_.store(live_sst_size); + ROCKS_LOG_INFO(db_options_.info_log, + "Updated total SST file size: %" PRIu64 " bytes.", + live_sst_size); + } else { + ROCKS_LOG_ERROR( + db_options_.info_log, + "Failed to update total SST file size after flush or compaction."); + } + { + // Trigger FIFO eviction if needed. + MutexLock l(&write_mutex_); + Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/); + if (s.IsNoSpace()) { + ROCKS_LOG_WARN(db_options_.info_log, + "DB grow out-of-space after SST size updated. 
Current live" + " SST size: %" PRIu64 + " , current blob files size: %" PRIu64 ".", + live_sst_size_.load(), total_blob_size_.load()); + } + } +} + +Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size, + bool force_evict) { + write_mutex_.AssertHeld(); + + uint64_t live_sst_size = live_sst_size_.load(); + if (bdb_options_.max_db_size == 0 || + live_sst_size + total_blob_size_.load() + blob_size <= + bdb_options_.max_db_size) { + return Status::OK(); + } + + if (bdb_options_.is_fifo == false || + (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) { + // FIFO eviction is disabled, or no space to insert new blob even we evict + // all blob files. + return Status::NoSpace( + "Write failed, as writing it would exceed max_db_size limit."); + } + + std::vector> candidate_files; + CopyBlobFiles(&candidate_files); + std::sort(candidate_files.begin(), candidate_files.end(), + BlobFileComparator()); + fifo_eviction_seq_ = GetLatestSequenceNumber(); + + WriteLock l(&mutex_); + + while (!candidate_files.empty() && + live_sst_size + total_blob_size_.load() + blob_size > + bdb_options_.max_db_size) { + std::shared_ptr blob_file = candidate_files.back(); + candidate_files.pop_back(); + WriteLock file_lock(&blob_file->mutex_); + if (blob_file->Obsolete()) { + // File already obsoleted by someone else. + assert(blob_file->Immutable()); + continue; + } + // FIFO eviction can evict open blob files. + if (!blob_file->Immutable()) { + Status s = CloseBlobFile(blob_file); + if (!s.ok()) { + return s; + } + } + assert(blob_file->Immutable()); + auto expiration_range = blob_file->GetExpirationRange(); + ROCKS_LOG_INFO(db_options_.info_log, + "Evict oldest blob file since DB out of space. Current " + "live SST file size: %" PRIu64 ", total blob size: %" PRIu64 + ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64 + ".", + live_sst_size, total_blob_size_.load(), + bdb_options_.max_db_size, blob_file->BlobFileNumber()); + ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/); + evict_expiration_up_to_ = expiration_range.first; + RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED); + RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED, + blob_file->BlobCount()); + RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED, + blob_file->GetFileSize()); + TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted"); + } + if (live_sst_size + total_blob_size_.load() + blob_size > + bdb_options_.max_db_size) { + return Status::NoSpace( + "Write failed, as writing it would exceed max_db_size limit."); + } + return Status::OK(); +} + +Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, + const std::string& headerbuf, const Slice& key, + const Slice& value, uint64_t expiration, + std::string* index_entry) { + Status s; + uint64_t blob_offset = 0; + uint64_t key_offset = 0; + { + WriteLock lockbfile_w(&bfile->mutex_); + std::shared_ptr writer; + s = CheckOrCreateWriterLocked(bfile, &writer); + if (!s.ok()) { + return s; + } + + // write the blob to the blob log. 
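CheckSizeAndEvictBlobFiles() above gates every blob write on the combined size of live SSTs and blob files. The following tiny helper is an illustrative assumption for exposition only (not part of the patch) and mirrors its admission test.

#include <cstdint>

// Mirrors the admission check in CheckSizeAndEvictBlobFiles(): a new blob of
// `blob_bytes` is admitted only if SST + blob usage stays within max_db_size;
// a max_db_size of 0 means "no limit".
bool ExceedsMaxDbSize(uint64_t live_sst_bytes, uint64_t total_blob_bytes,
                      uint64_t blob_bytes, uint64_t max_db_size) {
  if (max_db_size == 0) {
    return false;
  }
  return live_sst_bytes + total_blob_bytes + blob_bytes > max_db_size;
}

When the check fails and is_fifo is set, the implementation evicts the oldest blob files until the write fits, otherwise it returns Status::NoSpace().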
+ s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset, + &blob_offset); + } + + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Invalid status in AppendBlob: %s status: '%s'", + bfile->PathName().c_str(), s.ToString().c_str()); + return s; + } + + uint64_t size_put = headerbuf.size() + key.size() + value.size(); + bfile->BlobRecordAdded(size_put); + total_blob_size_ += size_put; + + if (expiration == kNoExpiration) { + BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset, + value.size(), bdb_options_.compression); + } else { + BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(), + blob_offset, value.size(), + bdb_options_.compression); + } + + return s; +} + +std::vector BlobDBImpl::MultiGet(const ReadOptions& read_options, + const std::vector& keys, + std::vector* values) { + StopWatch multiget_sw(clock_, statistics_, BLOB_DB_MULTIGET_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_MULTIGET); + // Get a snapshot to avoid blob file get deleted between we + // fetch and index entry and reading from the file. + ReadOptions ro(read_options); + bool snapshot_created = SetSnapshotIfNeeded(&ro); + + std::vector statuses; + statuses.reserve(keys.size()); + values->clear(); + values->reserve(keys.size()); + PinnableSlice value; + for (size_t i = 0; i < keys.size(); i++) { + statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value)); + values->push_back(value.ToString()); + value.Reset(); + } + if (snapshot_created) { + db_->ReleaseSnapshot(ro.snapshot); + } + return statuses; +} + +bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) { + assert(read_options != nullptr); + if (read_options->snapshot != nullptr) { + return false; + } + read_options->snapshot = db_->GetSnapshot(); + return true; +} + +Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value, uint64_t* expiration) { + assert(value); + + BlobIndex blob_index; + Status s = blob_index.DecodeFrom(index_entry); + if (!s.ok()) { + return s; + } + + if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) { + return Status::NotFound("Key expired"); + } + + if (expiration != nullptr) { + if (blob_index.HasTTL()) { + *expiration = blob_index.expiration(); + } else { + *expiration = kNoExpiration; + } + } + + if (blob_index.IsInlined()) { + // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same + // memory buffer to avoid extra copy. 
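GetBlobValue() above reports the stored expiration through its out-parameter, which surfaces in the public Get() overload that takes a uint64_t* expiration. A hedged caller-side sketch follows; the helper name is an assumption, and keys written without a TTL are reported with the "no expiration" sentinel rather than a real timestamp.

#include <cstdint>
#include <iostream>

#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "utilities/blob_db/blob_db.h"

// Reads a key and prints its expiration, if any.
void PrintExpiration(ROCKSDB_NAMESPACE::blob_db::BlobDB* db,
                     const ROCKSDB_NAMESPACE::Slice& key) {
  ROCKSDB_NAMESPACE::PinnableSlice value;
  uint64_t expiration = 0;
  const ROCKSDB_NAMESPACE::Status s =
      db->Get(ROCKSDB_NAMESPACE::ReadOptions(), db->DefaultColumnFamily(), key,
              &value, &expiration);
  if (s.ok()) {
    std::cout << key.ToString() << " expires at " << expiration << std::endl;
  } else if (s.IsNotFound()) {
    std::cout << key.ToString() << " not found (possibly expired)" << std::endl;
  }
}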
+ value->PinSelf(blob_index.value()); + return Status::OK(); + } + + CompressionType compression_type = kNoCompression; + s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(), + blob_index.size(), value, &compression_type); + if (!s.ok()) { + return s; + } + + if (compression_type != kNoCompression) { + s = DecompressSlice(*value, compression_type, value); + if (!s.ok()) { + if (debug_level_ >= 2) { + ROCKS_LOG_ERROR( + db_options_.info_log, + "Uncompression error during blob read from file: %" PRIu64 + " blob_offset: %" PRIu64 " blob_size: %" PRIu64 + " key: %s status: '%s'", + blob_index.file_number(), blob_index.offset(), blob_index.size(), + key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str()); + } + return s; + } + } + + return Status::OK(); +} + +Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number, + uint64_t offset, uint64_t size, + PinnableSlice* value, + CompressionType* compression_type) { + assert(value); + assert(compression_type); + assert(*compression_type == kNoCompression); + + if (!size) { + value->PinSelf(""); + return Status::OK(); + } + + // offset has to have certain min, as we will read CRC + // later from the Blob Header, which needs to be also a + // valid offset. + if (offset < + (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) { + if (debug_level_ >= 2) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Invalid blob index file_number: %" PRIu64 + " blob_offset: %" PRIu64 " blob_size: %" PRIu64 + " key: %s", + file_number, offset, size, + key.ToString(/* output_hex */ true).c_str()); + } + + return Status::NotFound("Invalid blob offset"); + } + + std::shared_ptr blob_file; + + { + ReadLock rl(&mutex_); + auto it = blob_files_.find(file_number); + + // file was deleted + if (it == blob_files_.end()) { + return Status::NotFound("Blob Not Found as blob file missing"); + } + + blob_file = it->second; + } + + *compression_type = blob_file->GetCompressionType(); + + // takes locks when called + std::shared_ptr reader; + Status s = GetBlobFileReader(blob_file, &reader); + if (!s.ok()) { + return s; + } + + assert(offset >= key.size() + sizeof(uint32_t)); + const uint64_t record_offset = offset - key.size() - sizeof(uint32_t); + const uint64_t record_size = sizeof(uint32_t) + key.size() + size; + + // Allocate the buffer. This is safe in C++11 + std::string buf; + AlignedBuf aligned_buf; + + // A partial blob record contain checksum, key and value. + Slice blob_record; + + { + StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); + // TODO: rate limit old blob DB file reads. 
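GetRawBlobFromFile() above just computed the span to read: the on-disk partial record is a 32-bit CRC followed by the key and then the value, while a BlobIndex offset points at the value itself. A small sketch of that arithmetic; the struct and function names are illustrative assumptions.

#include <cstdint>

// Layout of the partial record read back for a Get: [crc32 | key | value].
// The index stores the value offset, so the record starts
// sizeof(uint32_t) + key_size bytes earlier and spans the CRC, key and value.
struct BlobRecordSpan {
  uint64_t record_offset;
  uint64_t record_size;
};

BlobRecordSpan ComputeRecordSpan(uint64_t value_offset, uint64_t key_size,
                                 uint64_t value_size) {
  BlobRecordSpan span;
  span.record_offset = value_offset - key_size - sizeof(uint32_t);
  span.record_size = sizeof(uint32_t) + key_size + value_size;
  return span;
}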
+ if (reader->use_direct_io()) { + s = reader->Read(IOOptions(), record_offset, + static_cast(record_size), &blob_record, nullptr, + &aligned_buf, Env::IO_TOTAL /* rate_limiter_priority */); + } else { + buf.reserve(static_cast(record_size)); + s = reader->Read(IOOptions(), record_offset, + static_cast(record_size), &blob_record, &buf[0], + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); + } + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size()); + } + + if (!s.ok()) { + ROCKS_LOG_DEBUG( + db_options_.info_log, + "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64 + ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'", + file_number, offset, size, key.size(), s.ToString().c_str()); + return s; + } + + if (blob_record.size() != record_size) { + ROCKS_LOG_DEBUG( + db_options_.info_log, + "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64 + ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt + ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes", + file_number, offset, size, key.size(), blob_record.size(), record_size); + + return Status::Corruption("Failed to retrieve blob from blob index."); + } + + Slice crc_slice(blob_record.data(), sizeof(uint32_t)); + Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(), + static_cast(size)); + + uint32_t crc_exp = 0; + if (!GetFixed32(&crc_slice, &crc_exp)) { + ROCKS_LOG_DEBUG( + db_options_.info_log, + "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64 + ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'", + file_number, offset, size, key.size(), s.ToString().c_str()); + return Status::Corruption("Unable to decode checksum."); + } + + uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t), + blob_record.size() - sizeof(uint32_t)); + crc = crc32c::Mask(crc); // Adjust for storage + if (crc != crc_exp) { + if (debug_level_ >= 2) { + ROCKS_LOG_ERROR( + db_options_.info_log, + "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64 + " blob_size: %" PRIu64 " key: %s status: '%s'", + file_number, offset, size, + key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str()); + } + + return Status::Corruption("Corruption. Blob CRC mismatch"); + } + + value->PinSelf(blob_value); + + return Status::OK(); +} + +Status BlobDBImpl::Get(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value) { + return Get(read_options, column_family, key, value, + static_cast(nullptr) /*expiration*/); +} + +Status BlobDBImpl::Get(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value, uint64_t* expiration) { + StopWatch get_sw(clock_, statistics_, BLOB_DB_GET_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_GET); + return GetImpl(read_options, column_family, key, value, expiration); +} + +Status BlobDBImpl::GetImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value, uint64_t* expiration) { + if (column_family->GetID() != DefaultColumnFamily()->GetID()) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + // Get a snapshot to avoid blob file get deleted between we + // fetch and index entry and reading from the file. + // TODO(yiwu): For Get() retry if file not found would be a simpler strategy. 
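The checksum comparison above can be read as a standalone predicate: the CRC stored at the front of the record is the masked crc32c of the key and value bytes that follow it. A hedged sketch using the same utilities; the function name is an assumption and the include paths assume the RocksDB source tree.

#include <cstdint>

#include "rocksdb/slice.h"
#include "util/coding.h"
#include "util/crc32c.h"

// Returns true if the record's stored (masked) CRC matches the crc32c of the
// key+value bytes that follow it, mirroring the check in GetRawBlobFromFile().
bool VerifyBlobRecordCrc(const ROCKSDB_NAMESPACE::Slice& blob_record) {
  if (blob_record.size() < sizeof(uint32_t)) {
    return false;
  }
  ROCKSDB_NAMESPACE::Slice crc_slice(blob_record.data(), sizeof(uint32_t));
  uint32_t crc_exp = 0;
  if (!ROCKSDB_NAMESPACE::GetFixed32(&crc_slice, &crc_exp)) {
    return false;
  }
  const uint32_t crc = ROCKSDB_NAMESPACE::crc32c::Value(
      blob_record.data() + sizeof(uint32_t),
      blob_record.size() - sizeof(uint32_t));
  return ROCKSDB_NAMESPACE::crc32c::Mask(crc) == crc_exp;
}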
+ ReadOptions ro(read_options); + bool snapshot_created = SetSnapshotIfNeeded(&ro); + + PinnableSlice index_entry; + Status s; + bool is_blob_index = false; + DBImpl::GetImplOptions get_impl_options; + get_impl_options.column_family = column_family; + get_impl_options.value = &index_entry; + get_impl_options.is_blob_index = &is_blob_index; + s = db_impl_->GetImpl(ro, key, get_impl_options); + if (expiration != nullptr) { + *expiration = kNoExpiration; + } + RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ); + if (s.ok()) { + if (is_blob_index) { + s = GetBlobValue(key, index_entry, value, expiration); + } else { + // The index entry is the value itself in this case. + value->PinSelf(index_entry); + } + RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size()); + } + if (snapshot_created) { + db_->ReleaseSnapshot(ro.snapshot); + } + return s; +} + +std::pair BlobDBImpl::SanityCheck(bool aborted) { + if (aborted) { + return std::make_pair(false, -1); + } + + ReadLock rl(&mutex_); + + ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check"); + ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt, + blob_files_.size()); + ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt, + open_ttl_files_.size()); + + for (const auto& blob_file : open_ttl_files_) { + (void)blob_file; + assert(!blob_file->Immutable()); + } + + for (const auto& pair : live_imm_non_ttl_blob_files_) { + const auto& blob_file = pair.second; + (void)blob_file; + assert(!blob_file->HasTTL()); + assert(blob_file->Immutable()); + } + + uint64_t now = EpochNow(); + + for (auto blob_file_pair : blob_files_) { + auto blob_file = blob_file_pair.second; + std::ostringstream buf; + + buf << "Blob file " << blob_file->BlobFileNumber() << ", size " + << blob_file->GetFileSize() << ", blob count " << blob_file->BlobCount() + << ", immutable " << blob_file->Immutable(); + + if (blob_file->HasTTL()) { + ExpirationRange expiration_range; + { + ReadLock file_lock(&blob_file->mutex_); + expiration_range = blob_file->GetExpirationRange(); + } + buf << ", expiration range (" << expiration_range.first << ", " + << expiration_range.second << ")"; + + if (!blob_file->Obsolete()) { + buf << ", expire in " << (expiration_range.second - now) << "seconds"; + } + } + if (blob_file->Obsolete()) { + buf << ", obsolete at " << blob_file->GetObsoleteSequence(); + } + buf << "."; + ROCKS_LOG_INFO(db_options_.info_log, "%s", buf.str().c_str()); + } + + // reschedule + return std::make_pair(true, -1); +} + +Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile) { + TEST_SYNC_POINT("BlobDBImpl::CloseBlobFile"); + assert(bfile); + assert(!bfile->Immutable()); + assert(!bfile->Obsolete()); + + if (bfile->HasTTL() || bfile == open_non_ttl_file_) { + write_mutex_.AssertHeld(); + } + + ROCKS_LOG_INFO(db_options_.info_log, + "Closing blob file %" PRIu64 ". 
Path: %s", + bfile->BlobFileNumber(), bfile->PathName().c_str()); + + const SequenceNumber sequence = GetLatestSequenceNumber(); + + const Status s = bfile->WriteFooterAndCloseLocked(sequence); + + if (s.ok()) { + total_blob_size_ += BlobLogFooter::kSize; + } else { + bfile->MarkImmutable(sequence); + + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to close blob file %" PRIu64 "with error: %s", + bfile->BlobFileNumber(), s.ToString().c_str()); + } + + if (bfile->HasTTL()) { + size_t erased __attribute__((__unused__)); + erased = open_ttl_files_.erase(bfile); + } else { + if (bfile == open_non_ttl_file_) { + open_non_ttl_file_ = nullptr; + } + + const uint64_t blob_file_number = bfile->BlobFileNumber(); + auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number); + assert(it == live_imm_non_ttl_blob_files_.end() || + it->first != blob_file_number); + live_imm_non_ttl_blob_files_.insert( + it, std::map>::value_type( + blob_file_number, bfile)); + } + + return s; +} + +Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr& bfile) { + write_mutex_.AssertHeld(); + + // atomic read + if (bfile->GetFileSize() < bdb_options_.blob_file_size) { + return Status::OK(); + } + + WriteLock lock(&mutex_); + WriteLock file_lock(&bfile->mutex_); + + assert(!bfile->Obsolete() || bfile->Immutable()); + if (bfile->Immutable()) { + return Status::OK(); + } + + return CloseBlobFile(bfile); +} + +void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr blob_file, + SequenceNumber obsolete_seq, + bool update_size) { + assert(blob_file->Immutable()); + assert(!blob_file->Obsolete()); + + // Should hold write lock of mutex_ or during DB open. + blob_file->MarkObsolete(obsolete_seq); + obsolete_files_.push_back(blob_file); + assert(total_blob_size_.load() >= blob_file->GetFileSize()); + if (update_size) { + total_blob_size_ -= blob_file->GetFileSize(); + } +} + +bool BlobDBImpl::VisibleToActiveSnapshot( + const std::shared_ptr& bfile) { + assert(bfile->Obsolete()); + + // We check whether the oldest snapshot is no less than the last sequence + // by the time the blob file become obsolete. If so, the blob file is not + // visible to all existing snapshots. + // + // If we keep track of the earliest sequence of the keys in the blob file, + // we could instead check if there's a snapshot falls in range + // [earliest_sequence, obsolete_sequence). But doing so will make the + // implementation more complicated. + SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence(); + SequenceNumber oldest_snapshot = kMaxSequenceNumber; + { + // Need to lock DBImpl mutex before access snapshot list. 
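VisibleToActiveSnapshot() reduces to a single comparison between the oldest live snapshot and the sequence number at which the file was obsoleted. An illustrative predicate, with the name being an assumption:

#include <cstdint>

using SequenceNumber = uint64_t;

// A blob file obsoleted at `obsolete_sequence` can still be reached through a
// snapshot taken before that point; it may only be deleted once the oldest
// live snapshot has advanced past the obsolete sequence.
bool StillVisibleToSomeSnapshot(SequenceNumber oldest_snapshot,
                                SequenceNumber obsolete_sequence) {
  return oldest_snapshot < obsolete_sequence;
}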
+ InstrumentedMutexLock l(db_impl_->mutex()); + auto& snapshots = db_impl_->snapshots(); + if (!snapshots.empty()) { + oldest_snapshot = snapshots.oldest()->GetSequenceNumber(); + } + } + bool visible = oldest_snapshot < obsolete_sequence; + if (visible) { + ROCKS_LOG_INFO(db_options_.info_log, + "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64 + ") visible to oldest snapshot %" PRIu64 ".", + bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot); + } + return visible; +} + +std::pair BlobDBImpl::EvictExpiredFiles(bool aborted) { + if (aborted) { + return std::make_pair(false, -1); + } + + TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0"); + TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1"); + + std::vector> process_files; + uint64_t now = EpochNow(); + { + ReadLock rl(&mutex_); + for (auto p : blob_files_) { + auto& blob_file = p.second; + ReadLock file_lock(&blob_file->mutex_); + if (blob_file->HasTTL() && !blob_file->Obsolete() && + blob_file->GetExpirationRange().second <= now) { + process_files.push_back(blob_file); + } + } + } + + TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2"); + TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3"); + TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr); + + SequenceNumber seq = GetLatestSequenceNumber(); + { + MutexLock l(&write_mutex_); + WriteLock lock(&mutex_); + for (auto& blob_file : process_files) { + WriteLock file_lock(&blob_file->mutex_); + + // Need to double check if the file is obsolete. + if (blob_file->Obsolete()) { + assert(blob_file->Immutable()); + continue; + } + + if (!blob_file->Immutable()) { + CloseBlobFile(blob_file); + } + + assert(blob_file->Immutable()); + + ObsoleteBlobFile(blob_file, seq, true /*update_size*/); + } + } + + return std::make_pair(true, -1); +} + +Status BlobDBImpl::SyncBlobFiles() { + MutexLock l(&write_mutex_); + + std::vector> process_files; + { + ReadLock rl(&mutex_); + for (auto fitr : open_ttl_files_) { + process_files.push_back(fitr); + } + if (open_non_ttl_file_ != nullptr) { + process_files.push_back(open_non_ttl_file_); + } + } + + Status s; + for (auto& blob_file : process_files) { + s = blob_file->Fsync(); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to sync blob file %" PRIu64 ", status: %s", + blob_file->BlobFileNumber(), s.ToString().c_str()); + return s; + } + } + + s = dir_ent_->FsyncWithDirOptions(IOOptions(), nullptr, DirFsyncOptions()); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to sync blob directory, status: %s", + s.ToString().c_str()); + } + return s; +} + +std::pair BlobDBImpl::ReclaimOpenFiles(bool aborted) { + if (aborted) return std::make_pair(false, -1); + + if (open_file_count_.load() < kOpenFilesTrigger) { + return std::make_pair(true, -1); + } + + // in the future, we should sort by last_access_ + // instead of closing every file + ReadLock rl(&mutex_); + for (auto const& ent : blob_files_) { + auto bfile = ent.second; + if (bfile->last_access_.load() == -1) continue; + + WriteLock lockbfile_w(&bfile->mutex_); + CloseRandomAccessLocked(bfile); + } + + return std::make_pair(true, -1); +} + +std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { + if (aborted) { + return std::make_pair(false, -1); + } + + MutexLock delete_file_lock(&delete_file_mutex_); + if (disable_file_deletions_ > 0) { + return std::make_pair(true, -1); + } + + std::list> tobsolete; + { + WriteLock wl(&mutex_); + if (obsolete_files_.empty()) { + return std::make_pair(true, -1); + } + tobsolete.swap(obsolete_files_); + } + + 
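DeleteObsoleteFiles() just swapped the shared obsolete_files_ list into a local variable so the actual deletions can proceed without holding mutex_. The same pattern in isolation, simplified to std::mutex as a sketch rather than the implementation:

#include <list>
#include <mutex>

// Swap the shared work list out under the lock, then process the local copy
// without holding it; entries that cannot be handled are pushed back later.
template <typename T>
std::list<T> DrainUnderLock(std::mutex& mu, std::list<T>& shared_list) {
  std::list<T> local;
  {
    std::lock_guard<std::mutex> guard(mu);
    local.swap(shared_list);
  }
  return local;
}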
bool file_deleted = false; + for (auto iter = tobsolete.begin(); iter != tobsolete.end();) { + auto bfile = *iter; + { + ReadLock lockbfile_r(&bfile->mutex_); + if (VisibleToActiveSnapshot(bfile)) { + ROCKS_LOG_INFO(db_options_.info_log, + "Could not delete file due to snapshot failure %s", + bfile->PathName().c_str()); + ++iter; + continue; + } + } + ROCKS_LOG_INFO(db_options_.info_log, + "Will delete file due to snapshot success %s", + bfile->PathName().c_str()); + + { + WriteLock wl(&mutex_); + blob_files_.erase(bfile->BlobFileNumber()); + } + + Status s = DeleteDBFile(&(db_impl_->immutable_db_options()), + bfile->PathName(), blob_dir_, true, + /*force_fg=*/false); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "File failed to be deleted as obsolete %s", + bfile->PathName().c_str()); + ++iter; + continue; + } + + file_deleted = true; + ROCKS_LOG_INFO(db_options_.info_log, + "File deleted as obsolete from blob dir %s", + bfile->PathName().c_str()); + + iter = tobsolete.erase(iter); + } + + // directory change. Fsync + if (file_deleted) { + Status s = dir_ent_->FsyncWithDirOptions( + IOOptions(), nullptr, + DirFsyncOptions(DirFsyncOptions::FsyncReason::kFileDeleted)); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s", + blob_dir_.c_str(), s.ToString().c_str()); + } + } + + // put files back into obsolete if for some reason, delete failed + if (!tobsolete.empty()) { + WriteLock wl(&mutex_); + for (auto bfile : tobsolete) { + blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile)); + obsolete_files_.push_front(bfile); + } + } + + return std::make_pair(!aborted, -1); +} + +void BlobDBImpl::CopyBlobFiles( + std::vector>* bfiles_copy) { + ReadLock rl(&mutex_); + for (auto const& p : blob_files_) { + bfiles_copy->push_back(p.second); + } +} + +Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) { + auto* cfd = + static_cast_with_check(DefaultColumnFamily()) + ->cfd(); + // Get a snapshot to avoid blob file get deleted between we + // fetch and index entry and reading from the file. + ManagedSnapshot* own_snapshot = nullptr; + const Snapshot* snapshot = read_options.snapshot; + if (snapshot == nullptr) { + own_snapshot = new ManagedSnapshot(db_); + snapshot = own_snapshot->snapshot(); + } + auto* iter = db_impl_->NewIteratorImpl( + read_options, cfd, snapshot->GetSequenceNumber(), + nullptr /*read_callback*/, true /*expose_blob_index*/); + return new BlobDBIterator(own_snapshot, iter, this, clock_, statistics_); +} + +Status DestroyBlobDB(const std::string& dbname, const Options& options, + const BlobDBOptions& bdb_options) { + const ImmutableDBOptions soptions(SanitizeOptions(dbname, options)); + Env* env = soptions.env; + + Status status; + std::string blobdir; + blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir + : bdb_options.blob_dir; + + std::vector filenames; + if (env->GetChildren(blobdir, &filenames).ok()) { + for (const auto& f : filenames) { + uint64_t number; + FileType type; + if (ParseFileName(f, &number, &type) && type == kBlobFile) { + Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true, + /*force_fg=*/false); + if (status.ok() && !del.ok()) { + status = del; + } + } + } + // TODO: What to do if we cannot delete the directory? 
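NewIterator() above pins a snapshot and wraps the base DB iterator in a BlobDBIterator, so blob indices are resolved to blob values transparently during iteration. A hedged usage sketch; the function name is an assumption.

#include <iostream>
#include <memory>

#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "utilities/blob_db/blob_db.h"

// Scans the whole database; the values returned by the iterator are the blob
// contents, not the blob index entries stored in the LSM tree.
void DumpAllEntries(ROCKSDB_NAMESPACE::blob_db::BlobDB* db) {
  std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it(
      db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions()));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    std::cout << it->key().ToString() << " => " << it->value().size()
              << " bytes" << std::endl;
  }
  if (!it->status().ok()) {
    std::cout << "scan failed: " << it->status().ToString() << std::endl;
  }
}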
+ env->DeleteDir(blobdir).PermitUncheckedError(); + } + Status destroy = DestroyDB(dbname, options); + if (status.ok() && !destroy.ok()) { + status = destroy; + } + + return status; +} + +#ifndef NDEBUG +Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value) { + return GetBlobValue(key, index_entry, value); +} + +void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number, + SequenceNumber immutable_sequence) { + auto blob_file = std::make_shared(this, blob_dir_, blob_file_number, + db_options_.info_log.get()); + blob_file->MarkImmutable(immutable_sequence); + + blob_files_[blob_file_number] = blob_file; + live_imm_non_ttl_blob_files_[blob_file_number] = blob_file; +} + +std::vector> BlobDBImpl::TEST_GetBlobFiles() const { + ReadLock l(&mutex_); + std::vector> blob_files; + for (auto& p : blob_files_) { + blob_files.emplace_back(p.second); + } + return blob_files; +} + +std::vector> BlobDBImpl::TEST_GetLiveImmNonTTLFiles() + const { + ReadLock l(&mutex_); + std::vector> live_imm_non_ttl_files; + for (const auto& pair : live_imm_non_ttl_blob_files_) { + live_imm_non_ttl_files.emplace_back(pair.second); + } + return live_imm_non_ttl_files; +} + +std::vector> BlobDBImpl::TEST_GetObsoleteFiles() + const { + ReadLock l(&mutex_); + std::vector> obsolete_files; + for (auto& bfile : obsolete_files_) { + obsolete_files.emplace_back(bfile); + } + return obsolete_files; +} + +void BlobDBImpl::TEST_DeleteObsoleteFiles() { + DeleteObsoleteFiles(false /*abort*/); +} + +Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr& bfile) { + MutexLock l(&write_mutex_); + WriteLock lock(&mutex_); + WriteLock file_lock(&bfile->mutex_); + + return CloseBlobFile(bfile); +} + +void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr& blob_file, + SequenceNumber obsolete_seq, + bool update_size) { + return ObsoleteBlobFile(blob_file, obsolete_seq, update_size); +} + +void BlobDBImpl::TEST_EvictExpiredFiles() { + EvictExpiredFiles(false /*abort*/); +} + +uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); } + +void BlobDBImpl::TEST_InitializeBlobFileToSstMapping( + const std::vector& live_files) { + InitializeBlobFileToSstMapping(live_files); +} + +void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo& info) { + ProcessFlushJobInfo(info); +} + +void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info) { + ProcessCompactionJobInfo(info); +} + +#endif // !NDEBUG + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl.h b/src/rocksdb/utilities/blob_db/blob_db_impl.h new file mode 100644 index 000000000..0b4dbf5e5 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_impl.h @@ -0,0 +1,503 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "db/blob/blob_log_format.h" +#include "db/blob/blob_log_writer.h" +#include "db/db_iter.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/db.h" +#include "rocksdb/file_system.h" +#include "rocksdb/listener.h" +#include "rocksdb/options.h" +#include "rocksdb/statistics.h" +#include "rocksdb/wal_filter.h" +#include "util/mutexlock.h" +#include "util/timer_queue.h" +#include "utilities/blob_db/blob_db.h" +#include "utilities/blob_db/blob_file.h" + +namespace ROCKSDB_NAMESPACE { + +class DBImpl; +class ColumnFamilyHandle; +class ColumnFamilyData; +class SystemClock; + +struct FlushJobInfo; + +namespace blob_db { + +struct BlobCompactionContext; +struct BlobCompactionContextGC; +class BlobDBImpl; +class BlobFile; + +// Comparator to sort "TTL" aware Blob files based on the lower value of +// TTL range. +struct BlobFileComparatorTTL { + bool operator()(const std::shared_ptr& lhs, + const std::shared_ptr& rhs) const; +}; + +struct BlobFileComparator { + bool operator()(const std::shared_ptr& lhs, + const std::shared_ptr& rhs) const; +}; + +/** + * The implementation class for BlobDB. It manages the blob logs, which + * are sequentially written files. Blob logs can be of the TTL or non-TTL + * varieties; the former are cleaned up when they expire, while the latter + * are (optionally) garbage collected. + */ +class BlobDBImpl : public BlobDB { + friend class BlobFile; + friend class BlobDBIterator; + friend class BlobDBListener; + friend class BlobDBListenerGC; + friend class BlobIndexCompactionFilterBase; + friend class BlobIndexCompactionFilterGC; + + public: + // deletions check period + static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000; + + // sanity check task + static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000; + + // how many random access open files can we tolerate + static constexpr uint32_t kOpenFilesTrigger = 100; + + // how often to schedule reclaim open files. + static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000; + + // how often to schedule delete obs files periods + static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000; + + // how often to schedule expired files eviction. 
+ static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000; + + // when should oldest file be evicted: + // on reaching 90% of blob_dir_size + static constexpr double kEvictOldestFileAtSize = 0.9; + + using BlobDB::Put; + Status Put(const WriteOptions& options, const Slice& key, + const Slice& value) override; + + using BlobDB::Get; + Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value) override; + + Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value, + uint64_t* expiration) override; + + using BlobDB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& read_options) override; + + using BlobDB::NewIterators; + virtual Status NewIterators( + const ReadOptions& /*read_options*/, + const std::vector& /*column_families*/, + std::vector* /*iterators*/) override { + return Status::NotSupported("Not implemented"); + } + + using BlobDB::MultiGet; + virtual std::vector MultiGet( + const ReadOptions& read_options, const std::vector& keys, + std::vector* values) override; + + using BlobDB::Write; + virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; + + virtual Status Close() override; + + using BlobDB::PutWithTTL; + Status PutWithTTL(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t ttl) override; + + using BlobDB::PutUntil; + Status PutUntil(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration) override; + + using BlobDB::CompactFiles; + Status CompactFiles( + const CompactionOptions& compact_options, + const std::vector& input_file_names, const int output_level, + const int output_path_id = -1, + std::vector* const output_file_names = nullptr, + CompactionJobInfo* compaction_job_info = nullptr) override; + + BlobDBOptions GetBlobDBOptions() const override; + + BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options, + const DBOptions& db_options, + const ColumnFamilyOptions& cf_options); + + virtual Status DisableFileDeletions() override; + + virtual Status EnableFileDeletions(bool force) override; + + virtual Status GetLiveFiles(std::vector&, + uint64_t* manifest_file_size, + bool flush_memtable = true) override; + virtual void GetLiveFilesMetaData(std::vector*) override; + + ~BlobDBImpl(); + + Status Open(std::vector* handles); + + Status SyncBlobFiles() override; + + // Common part of the two GetCompactionContext methods below. 
+ // REQUIRES: read lock on mutex_ + void GetCompactionContextCommon(BlobCompactionContext* context); + + void GetCompactionContext(BlobCompactionContext* context); + void GetCompactionContext(BlobCompactionContext* context, + BlobCompactionContextGC* context_gc); + +#ifndef NDEBUG + Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value); + + void TEST_AddDummyBlobFile(uint64_t blob_file_number, + SequenceNumber immutable_sequence); + + std::vector> TEST_GetBlobFiles() const; + + std::vector> TEST_GetLiveImmNonTTLFiles() const; + + std::vector> TEST_GetObsoleteFiles() const; + + Status TEST_CloseBlobFile(std::shared_ptr& bfile); + + void TEST_ObsoleteBlobFile(std::shared_ptr& blob_file, + SequenceNumber obsolete_seq = 0, + bool update_size = true); + + void TEST_EvictExpiredFiles(); + + void TEST_DeleteObsoleteFiles(); + + uint64_t TEST_live_sst_size(); + + const std::string& TEST_blob_dir() const { return blob_dir_; } + + void TEST_InitializeBlobFileToSstMapping( + const std::vector& live_files); + + void TEST_ProcessFlushJobInfo(const FlushJobInfo& info); + + void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info); + +#endif // !NDEBUG + + private: + class BlobInserter; + + // Create a snapshot if there isn't one in read options. + // Return true if a snapshot is created. + bool SetSnapshotIfNeeded(ReadOptions* read_options); + + Status GetImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value, uint64_t* expiration = nullptr); + + Status GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value, uint64_t* expiration = nullptr); + + Status GetRawBlobFromFile(const Slice& key, uint64_t file_number, + uint64_t offset, uint64_t size, + PinnableSlice* value, + CompressionType* compression_type); + + Slice GetCompressedSlice(const Slice& raw, + std::string* compression_output) const; + + Status DecompressSlice(const Slice& compressed_value, + CompressionType compression_type, + PinnableSlice* value_output) const; + + // Close a file by appending a footer, and removes file from open files list. + // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_ + // and the blob file's mutex_. If called on a blob file which is visible only + // to a single thread (like in the case of new files written during + // compaction/GC), the locks on write_mutex_ and the blob file's mutex_ can be + // avoided. + Status CloseBlobFile(std::shared_ptr bfile); + + // Close a file if its size exceeds blob_file_size + // REQUIRES: lock held on write_mutex_. + Status CloseBlobFileIfNeeded(std::shared_ptr& bfile); + + // Mark file as obsolete and move the file to obsolete file list. + // + // REQUIRED: hold write lock of mutex_ or during DB open. + void ObsoleteBlobFile(std::shared_ptr blob_file, + SequenceNumber obsolete_seq, bool update_size); + + Status PutBlobValue(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration, + WriteBatch* batch); + + Status AppendBlob(const std::shared_ptr& bfile, + const std::string& headerbuf, const Slice& key, + const Slice& value, uint64_t expiration, + std::string* index_entry); + + // Create a new blob file and associated writer. + Status CreateBlobFileAndWriter(bool has_ttl, + const ExpirationRange& expiration_range, + const std::string& reason, + std::shared_ptr* blob_file, + std::shared_ptr* writer); + + // Get the open non-TTL blob log file, or create a new one if no such file + // exists. 
+ Status SelectBlobFile(std::shared_ptr* blob_file); + + // Get the open TTL blob log file for a certain expiration, or create a new + // one if no such file exists. + Status SelectBlobFileTTL(uint64_t expiration, + std::shared_ptr* blob_file); + + std::shared_ptr FindBlobFileLocked(uint64_t expiration) const; + + // periodic sanity check. Bunch of checks + std::pair SanityCheck(bool aborted); + + // Delete files that have been marked obsolete (either because of TTL + // or GC). Check whether any snapshots exist which refer to the same. + std::pair DeleteObsoleteFiles(bool aborted); + + // periodically check if open blob files and their TTL's has expired + // if expired, close the sequential writer and make the file immutable + std::pair EvictExpiredFiles(bool aborted); + + // if the number of open files, approaches ULIMIT's this + // task will close random readers, which are kept around for + // efficiency + std::pair ReclaimOpenFiles(bool aborted); + + std::pair RemoveTimerQ(TimerQueue* tq, bool aborted); + + // Adds the background tasks to the timer queue + void StartBackgroundTasks(); + + // add a new Blob File + std::shared_ptr NewBlobFile(bool has_ttl, + const ExpirationRange& expiration_range, + const std::string& reason); + + // Register a new blob file. + // REQUIRES: write lock on mutex_. + void RegisterBlobFile(std::shared_ptr blob_file); + + // collect all the blob log files from the blob directory + Status GetAllBlobFiles(std::set* file_numbers); + + // Open all blob files found in blob_dir. + Status OpenAllBlobFiles(); + + // Link an SST to a blob file. Comes in locking and non-locking varieties + // (the latter is used during Open). + template + void LinkSstToBlobFileImpl(uint64_t sst_file_number, + uint64_t blob_file_number, Linker linker); + + void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number); + + void LinkSstToBlobFileNoLock(uint64_t sst_file_number, + uint64_t blob_file_number); + + // Unlink an SST from a blob file. + void UnlinkSstFromBlobFile(uint64_t sst_file_number, + uint64_t blob_file_number); + + // Initialize the mapping between blob files and SSTs during Open. + void InitializeBlobFileToSstMapping( + const std::vector& live_files); + + // Update the mapping between blob files and SSTs after a flush and mark + // any unneeded blob files obsolete. + void ProcessFlushJobInfo(const FlushJobInfo& info); + + // Update the mapping between blob files and SSTs after a compaction and + // mark any unneeded blob files obsolete. + void ProcessCompactionJobInfo(const CompactionJobInfo& info); + + // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs + // linked to it, and all memtables from before the blob file became immutable + // have been flushed. Note: should only be called if the condition holds for + // all lower-numbered non-TTL blob files as well. + bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr& blob_file, + SequenceNumber obsolete_seq); + + // Mark all immutable non-TTL blob files that aren't needed by any SSTs as + // obsolete. Comes in two varieties; the version used during Open need not + // worry about locking or snapshots. + template + void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed); + + void MarkUnreferencedBlobFilesObsolete(); + void MarkUnreferencedBlobFilesObsoleteDuringOpen(); + + void UpdateLiveSSTSize(); + + Status GetBlobFileReader(const std::shared_ptr& blob_file, + std::shared_ptr* reader); + + // hold write mutex on file and call. 
+ // Close the above Random Access reader + void CloseRandomAccessLocked(const std::shared_ptr& bfile); + + // hold write mutex on file and call + // creates a sequential (append) writer for this blobfile + Status CreateWriterLocked(const std::shared_ptr& bfile); + + // returns a BlobLogWriter object for the file. If writer is not + // already present, creates one. Needs Write Mutex to be held + Status CheckOrCreateWriterLocked(const std::shared_ptr& blob_file, + std::shared_ptr* writer); + + // checks if there is no snapshot which is referencing the + // blobs + bool VisibleToActiveSnapshot(const std::shared_ptr& file); + bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr& bfile); + + void CopyBlobFiles(std::vector>* bfiles_copy); + + uint64_t EpochNow() { return clock_->NowMicros() / 1000000; } + + // Check if inserting a new blob will make DB grow out of space. + // If is_fifo = true, FIFO eviction will be triggered to make room for the + // new blob. If force_evict = true, FIFO eviction will evict blob files + // even eviction will not make enough room for the new blob. + Status CheckSizeAndEvictBlobFiles(uint64_t blob_size, + bool force_evict = false); + + // name of the database directory + std::string dbname_; + + // the base DB + DBImpl* db_impl_; + Env* env_; + SystemClock* clock_; + // the options that govern the behavior of Blob Storage + BlobDBOptions bdb_options_; + DBOptions db_options_; + ColumnFamilyOptions cf_options_; + FileOptions file_options_; + + // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold + // ownership. + Statistics* statistics_; + + // by default this is "blob_dir" under dbname_ + // but can be configured + std::string blob_dir_; + + // pointer to directory + std::unique_ptr dir_ent_; + + // Read Write Mutex, which protects all the data structures + // HEAVILY TRAFFICKED + mutable port::RWMutex mutex_; + + // Writers has to hold write_mutex_ before writing. + mutable port::Mutex write_mutex_; + + // counter for blob file number + std::atomic next_file_number_; + + // entire metadata of all the BLOB files memory + std::map> blob_files_; + + // All live immutable non-TTL blob files. + std::map> live_imm_non_ttl_blob_files_; + + // The largest sequence number that has been flushed. + SequenceNumber flush_sequence_; + + // opened non-TTL blob file. + std::shared_ptr open_non_ttl_file_; + + // all the blob files which are currently being appended to based + // on variety of incoming TTL's + std::set, BlobFileComparatorTTL> open_ttl_files_; + + // Flag to check whether Close() has been called on this DB + bool closed_; + + // timer based queue to execute tasks + TimerQueue tqueue_; + + // number of files opened for random access/GET + // counter is used to monitor and close excess RA files. + std::atomic open_file_count_; + + // Total size of all live blob files (i.e. exclude obsolete files). + std::atomic total_blob_size_; + + // total size of SST files. + std::atomic live_sst_size_; + + // Latest FIFO eviction timestamp + // + // REQUIRES: access with metex_ lock held. + uint64_t fifo_eviction_seq_; + + // The expiration up to which latest FIFO eviction evicts. + // + // REQUIRES: access with metex_ lock held. + uint64_t evict_expiration_up_to_; + + std::list> obsolete_files_; + + // DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block + // on the mutex to avoid contention. + // + // While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note + // the difference. 
mutex_ only needs to be held when access the + // data-structure, and delete_file_mutex_ needs to be held the whole time + // during DeleteObsoleteFiles to avoid being run simultaneously with + // DisableFileDeletions. + // + // If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced + // to hold delete_file_mutex_ first to avoid deadlock. + mutable port::Mutex delete_file_mutex_; + + // Each call of DisableFileDeletions will increase disable_file_deletion_ + // by 1. EnableFileDeletions will either decrease the count by 1 or reset + // it to zeor, depending on the force flag. + // + // REQUIRES: access with delete_file_mutex_ held. + int disable_file_deletions_ = 0; + + uint32_t debug_level_; +}; + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc b/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc new file mode 100644 index 000000000..87e3f33cc --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc @@ -0,0 +1,113 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "file/filename.h" +#include "logging/logging.h" +#include "util/cast_util.h" +#include "util/mutexlock.h" +#include "utilities/blob_db/blob_db_impl.h" + +// BlobDBImpl methods to get snapshot of files, e.g. for replication. + +namespace ROCKSDB_NAMESPACE { +namespace blob_db { + +Status BlobDBImpl::DisableFileDeletions() { + // Disable base DB file deletions. + Status s = db_impl_->DisableFileDeletions(); + if (!s.ok()) { + return s; + } + + int count = 0; + { + // Hold delete_file_mutex_ to make sure no DeleteObsoleteFiles job + // is running. + MutexLock l(&delete_file_mutex_); + count = ++disable_file_deletions_; + } + + ROCKS_LOG_INFO(db_options_.info_log, + "Disabled blob file deletions. count: %d", count); + return Status::OK(); +} + +Status BlobDBImpl::EnableFileDeletions(bool force) { + // Enable base DB file deletions. + Status s = db_impl_->EnableFileDeletions(force); + if (!s.ok()) { + return s; + } + + int count = 0; + { + MutexLock l(&delete_file_mutex_); + if (force) { + disable_file_deletions_ = 0; + } else if (disable_file_deletions_ > 0) { + count = --disable_file_deletions_; + } + assert(count >= 0); + } + + ROCKS_LOG_INFO(db_options_.info_log, "Enabled blob file deletions. count: %d", + count); + // Consider trigger DeleteobsoleteFiles once after re-enabled, if we are to + // make DeleteobsoleteFiles re-run interval configuration. + return Status::OK(); +} + +Status BlobDBImpl::GetLiveFiles(std::vector& ret, + uint64_t* manifest_file_size, + bool flush_memtable) { + if (!bdb_options_.path_relative) { + return Status::NotSupported( + "Not able to get relative blob file path from absolute blob_dir."); + } + // Hold a lock in the beginning to avoid updates to base DB during the call + ReadLock rl(&mutex_); + Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable); + if (!s.ok()) { + return s; + } + ret.reserve(ret.size() + blob_files_.size()); + for (auto bfile_pair : blob_files_) { + auto blob_file = bfile_pair.second; + // Path should be relative to db_name, but begin with slash. 
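+    // For example, with blob_dir "blob_dir" and blob file number 1 this
+    // yields "/blob_dir/000001.blob", which is what the GetLiveFilesMetaData
+    // test in blob_db_test.cc expects.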
+ ret.emplace_back( + BlobFileName("", bdb_options_.blob_dir, blob_file->BlobFileNumber())); + } + return Status::OK(); +} + +void BlobDBImpl::GetLiveFilesMetaData(std::vector* metadata) { + // Path should be relative to db_name. + assert(bdb_options_.path_relative); + // Hold a lock in the beginning to avoid updates to base DB during the call + ReadLock rl(&mutex_); + db_->GetLiveFilesMetaData(metadata); + for (auto bfile_pair : blob_files_) { + auto blob_file = bfile_pair.second; + LiveFileMetaData filemetadata; + filemetadata.size = blob_file->GetFileSize(); + const uint64_t file_number = blob_file->BlobFileNumber(); + // Path should be relative to db_name, but begin with slash. + filemetadata.name = BlobFileName("", bdb_options_.blob_dir, file_number); + filemetadata.file_number = file_number; + if (blob_file->HasTTL()) { + filemetadata.oldest_ancester_time = blob_file->GetExpirationRange().first; + } + auto cfh = + static_cast_with_check(DefaultColumnFamily()); + filemetadata.column_family_name = cfh->GetName(); + metadata->emplace_back(filemetadata); + } +} + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db_iterator.h b/src/rocksdb/utilities/blob_db/blob_db_iterator.h new file mode 100644 index 000000000..fd2b2f8f5 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_iterator.h @@ -0,0 +1,150 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include "db/arena_wrapped_db_iter.h" +#include "rocksdb/iterator.h" +#include "util/stop_watch.h" +#include "utilities/blob_db/blob_db_impl.h" + +namespace ROCKSDB_NAMESPACE { +class Statistics; +class SystemClock; + +namespace blob_db { + +using ROCKSDB_NAMESPACE::ManagedSnapshot; + +class BlobDBIterator : public Iterator { + public: + BlobDBIterator(ManagedSnapshot* snapshot, ArenaWrappedDBIter* iter, + BlobDBImpl* blob_db, SystemClock* clock, + Statistics* statistics) + : snapshot_(snapshot), + iter_(iter), + blob_db_(blob_db), + clock_(clock), + statistics_(statistics) {} + + virtual ~BlobDBIterator() = default; + + bool Valid() const override { + if (!iter_->Valid()) { + return false; + } + return status_.ok(); + } + + Status status() const override { + if (!iter_->status().ok()) { + return iter_->status(); + } + return status_; + } + + void SeekToFirst() override { + StopWatch seek_sw(clock_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); + iter_->SeekToFirst(); + while (UpdateBlobValue()) { + iter_->Next(); + } + } + + void SeekToLast() override { + StopWatch seek_sw(clock_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); + iter_->SeekToLast(); + while (UpdateBlobValue()) { + iter_->Prev(); + } + } + + void Seek(const Slice& target) override { + StopWatch seek_sw(clock_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); + iter_->Seek(target); + while (UpdateBlobValue()) { + iter_->Next(); + } + } + + void SeekForPrev(const Slice& target) override { + StopWatch seek_sw(clock_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); + iter_->SeekForPrev(target); + while (UpdateBlobValue()) { + iter_->Prev(); + } + } + + void Next() override { + assert(Valid()); + StopWatch 
next_sw(clock_, statistics_, BLOB_DB_NEXT_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_NEXT); + iter_->Next(); + while (UpdateBlobValue()) { + iter_->Next(); + } + } + + void Prev() override { + assert(Valid()); + StopWatch prev_sw(clock_, statistics_, BLOB_DB_PREV_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_PREV); + iter_->Prev(); + while (UpdateBlobValue()) { + iter_->Prev(); + } + } + + Slice key() const override { + assert(Valid()); + return iter_->key(); + } + + Slice value() const override { + assert(Valid()); + if (!iter_->IsBlob()) { + return iter_->value(); + } + return value_; + } + + // Iterator::Refresh() not supported. + + private: + // Return true if caller should continue to next value. + bool UpdateBlobValue() { + value_.Reset(); + status_ = Status::OK(); + if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) { + Status s = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_); + if (s.IsNotFound()) { + return true; + } else { + if (!s.ok()) { + status_ = s; + } + return false; + } + } else { + return false; + } + } + + std::unique_ptr snapshot_; + std::unique_ptr iter_; + BlobDBImpl* blob_db_; + SystemClock* clock_; + Statistics* statistics_; + Status status_; + PinnableSlice value_; +}; +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db_listener.h b/src/rocksdb/utilities/blob_db/blob_db_listener.h new file mode 100644 index 000000000..d17d29853 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_listener.h @@ -0,0 +1,71 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#ifndef ROCKSDB_LITE + +#include + +#include "rocksdb/listener.h" +#include "util/mutexlock.h" +#include "utilities/blob_db/blob_db_impl.h" + +namespace ROCKSDB_NAMESPACE { +namespace blob_db { + +class BlobDBListener : public EventListener { + public: + explicit BlobDBListener(BlobDBImpl* blob_db_impl) + : blob_db_impl_(blob_db_impl) {} + + void OnFlushBegin(DB* /*db*/, const FlushJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->SyncBlobFiles(); + } + + void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->UpdateLiveSSTSize(); + } + + void OnCompactionCompleted(DB* /*db*/, + const CompactionJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->UpdateLiveSSTSize(); + } + + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "BlobDBListener"; } + + protected: + BlobDBImpl* blob_db_impl_; +}; + +class BlobDBListenerGC : public BlobDBListener { + public: + explicit BlobDBListenerGC(BlobDBImpl* blob_db_impl) + : BlobDBListener(blob_db_impl) {} + + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "BlobDBListenerGC"; } + void OnFlushCompleted(DB* db, const FlushJobInfo& info) override { + BlobDBListener::OnFlushCompleted(db, info); + + assert(blob_db_impl_); + blob_db_impl_->ProcessFlushJobInfo(info); + } + + void OnCompactionCompleted(DB* db, const CompactionJobInfo& info) override { + BlobDBListener::OnCompactionCompleted(db, info); + + assert(blob_db_impl_); + blob_db_impl_->ProcessCompactionJobInfo(info); + } +}; + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db_test.cc b/src/rocksdb/utilities/blob_db/blob_db_test.cc new file mode 100644 index 000000000..e392962b2 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_test.cc @@ -0,0 +1,2407 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_db.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "db/blob/blob_index.h" +#include "db/db_test_util.h" +#include "env/composite_env_wrapper.h" +#include "file/file_util.h" +#include "file/sst_file_manager_impl.h" +#include "port/port.h" +#include "rocksdb/utilities/debug.h" +#include "test_util/mock_time_env.h" +#include "test_util/sync_point.h" +#include "test_util/testharness.h" +#include "util/random.h" +#include "util/string_util.h" +#include "utilities/blob_db/blob_db_impl.h" +#include "utilities/fault_injection_env.h" + +namespace ROCKSDB_NAMESPACE { +namespace blob_db { + +class BlobDBTest : public testing::Test { + public: + const int kMaxBlobSize = 1 << 14; + + struct BlobIndexVersion { + BlobIndexVersion() = default; + BlobIndexVersion(std::string _user_key, uint64_t _file_number, + uint64_t _expiration, SequenceNumber _sequence, + ValueType _type) + : user_key(std::move(_user_key)), + file_number(_file_number), + expiration(_expiration), + sequence(_sequence), + type(_type) {} + + std::string user_key; + uint64_t file_number = kInvalidBlobFileNumber; + uint64_t expiration = kNoExpiration; + SequenceNumber sequence = 0; + ValueType type = kTypeValue; + }; + + BlobDBTest() + : dbname_(test::PerThreadDBPath("blob_db_test")), blob_db_(nullptr) { + mock_clock_ = std::make_shared(SystemClock::Default()); + mock_env_.reset(new CompositeEnvWrapper(Env::Default(), mock_clock_)); + fault_injection_env_.reset(new FaultInjectionTestEnv(Env::Default())); + + Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions()); + assert(s.ok()); + } + + ~BlobDBTest() override { + SyncPoint::GetInstance()->ClearAllCallBacks(); + Destroy(); + } + + Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(), + Options options = Options()) { + options.create_if_missing = true; + if (options.env == mock_env_.get()) { + // Need to disable stats dumping and persisting which also use + // RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal. + // With mocked time, this can hang on some platforms (MacOS) + // because (a) on some platforms, pthread_cond_timedwait does not appear + // to release the lock for other threads to operate if the deadline time + // is already passed, and (b) TimedWait calls are currently a bad + // abstraction because the deadline parameter is usually computed from + // Env time, but is interpreted in real clock time. 
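+      // The TTL-related tests below therefore advance time explicitly via
+      // mock_clock_->SetCurrentTime(...) instead of waiting on wall-clock
+      // time.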
+ options.stats_dump_period_sec = 0; + options.stats_persist_period_sec = 0; + } + return BlobDB::Open(options, bdb_options, dbname_, &blob_db_); + } + + void Open(BlobDBOptions bdb_options = BlobDBOptions(), + Options options = Options()) { + ASSERT_OK(TryOpen(bdb_options, options)); + } + + void Reopen(BlobDBOptions bdb_options = BlobDBOptions(), + Options options = Options()) { + assert(blob_db_ != nullptr); + delete blob_db_; + blob_db_ = nullptr; + Open(bdb_options, options); + } + + void Close() { + assert(blob_db_ != nullptr); + delete blob_db_; + blob_db_ = nullptr; + } + + void Destroy() { + if (blob_db_) { + Options options = blob_db_->GetOptions(); + BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions(); + delete blob_db_; + blob_db_ = nullptr; + ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options)); + } + } + + BlobDBImpl *blob_db_impl() { + return reinterpret_cast(blob_db_); + } + + Status Put(const Slice &key, const Slice &value, + std::map *data = nullptr) { + Status s = blob_db_->Put(WriteOptions(), key, value); + if (data != nullptr) { + (*data)[key.ToString()] = value.ToString(); + } + return s; + } + + void Delete(const std::string &key, + std::map *data = nullptr) { + ASSERT_OK(blob_db_->Delete(WriteOptions(), key)); + if (data != nullptr) { + data->erase(key); + } + } + + Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl, + std::map *data = nullptr) { + Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl); + if (data != nullptr) { + (*data)[key.ToString()] = value.ToString(); + } + return s; + } + + Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) { + return blob_db_->PutUntil(WriteOptions(), key, value, expiration); + } + + void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd, + std::map *data = nullptr) { + int len = rnd->Next() % kMaxBlobSize + 1; + std::string value = rnd->HumanReadableString(len); + ASSERT_OK( + blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl)); + if (data != nullptr) { + (*data)[key] = value; + } + } + + void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd, + std::map *data = nullptr) { + int len = rnd->Next() % kMaxBlobSize + 1; + std::string value = rnd->HumanReadableString(len); + ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value), + expiration)); + if (data != nullptr) { + (*data)[key] = value; + } + } + + void PutRandom(const std::string &key, Random *rnd, + std::map *data = nullptr) { + PutRandom(blob_db_, key, rnd, data); + } + + void PutRandom(DB *db, const std::string &key, Random *rnd, + std::map *data = nullptr) { + int len = rnd->Next() % kMaxBlobSize + 1; + std::string value = rnd->HumanReadableString(len); + ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value))); + if (data != nullptr) { + (*data)[key] = value; + } + } + + void PutRandomToWriteBatch( + const std::string &key, Random *rnd, WriteBatch *batch, + std::map *data = nullptr) { + int len = rnd->Next() % kMaxBlobSize + 1; + std::string value = rnd->HumanReadableString(len); + ASSERT_OK(batch->Put(key, value)); + if (data != nullptr) { + (*data)[key] = value; + } + } + + // Verify blob db contain expected data and nothing more. 
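+  // A typical call pattern in the tests below (sketch):
+  //   std::map<std::string, std::string> data;
+  //   PutRandom("key1", &rnd, &data);
+  //   VerifyDB(data);  // checks both Get() and iterator results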
+ void VerifyDB(const std::map &data) { + VerifyDB(blob_db_, data); + } + + void VerifyDB(DB *db, const std::map &data) { + // Verify normal Get + auto *cfh = db->DefaultColumnFamily(); + for (auto &p : data) { + PinnableSlice value_slice; + ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice)); + ASSERT_EQ(p.second, value_slice.ToString()); + std::string value; + ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value)); + ASSERT_EQ(p.second, value); + } + + // Verify iterators + Iterator *iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + for (auto &p : data) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(p.first, iter->key().ToString()); + ASSERT_EQ(p.second, iter->value().ToString()); + iter->Next(); + } + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + delete iter; + } + + void VerifyBaseDB( + const std::map &expected_versions) { + auto *bdb_impl = static_cast(blob_db_); + DB *db = blob_db_->GetRootDB(); + const size_t kMaxKeys = 10000; + std::vector versions; + ASSERT_OK(GetAllKeyVersions(db, "", "", kMaxKeys, &versions)); + ASSERT_EQ(expected_versions.size(), versions.size()); + size_t i = 0; + for (auto &key_version : expected_versions) { + const KeyVersion &expected_version = key_version.second; + ASSERT_EQ(expected_version.user_key, versions[i].user_key); + ASSERT_EQ(expected_version.sequence, versions[i].sequence); + ASSERT_EQ(expected_version.type, versions[i].type); + if (versions[i].type == kTypeValue) { + ASSERT_EQ(expected_version.value, versions[i].value); + } else { + ASSERT_EQ(kTypeBlobIndex, versions[i].type); + PinnableSlice value; + ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key, + versions[i].value, &value)); + ASSERT_EQ(expected_version.value, value.ToString()); + } + i++; + } + } + + void VerifyBaseDBBlobIndex( + const std::map &expected_versions) { + const size_t kMaxKeys = 10000; + std::vector versions; + ASSERT_OK( + GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions)); + ASSERT_EQ(versions.size(), expected_versions.size()); + + size_t i = 0; + for (const auto &expected_pair : expected_versions) { + const BlobIndexVersion &expected_version = expected_pair.second; + + ASSERT_EQ(versions[i].user_key, expected_version.user_key); + ASSERT_EQ(versions[i].sequence, expected_version.sequence); + ASSERT_EQ(versions[i].type, expected_version.type); + if (versions[i].type != kTypeBlobIndex) { + ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number); + ASSERT_EQ(kNoExpiration, expected_version.expiration); + + ++i; + continue; + } + + BlobIndex blob_index; + ASSERT_OK(blob_index.DecodeFrom(versions[i].value)); + + const uint64_t file_number = !blob_index.IsInlined() + ? blob_index.file_number() + : kInvalidBlobFileNumber; + ASSERT_EQ(file_number, expected_version.file_number); + + const uint64_t expiration = + blob_index.HasTTL() ? 
blob_index.expiration() : kNoExpiration; + ASSERT_EQ(expiration, expected_version.expiration); + + ++i; + } + } + + void InsertBlobs() { + WriteOptions wo; + std::string value; + + Random rnd(301); + for (size_t i = 0; i < 100000; i++) { + uint64_t ttl = rnd.Next() % 86400; + PutRandomWithTTL("key" + std::to_string(i % 500), ttl, &rnd, nullptr); + } + + for (size_t i = 0; i < 10; i++) { + Delete("key" + std::to_string(i % 500)); + } + } + + const std::string dbname_; + std::shared_ptr mock_clock_; + std::unique_ptr mock_env_; + std::unique_ptr fault_injection_env_; + BlobDB *blob_db_; +}; // class BlobDBTest + +TEST_F(BlobDBTest, Put) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map data; + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + std::to_string(i), &rnd, &data); + } + VerifyDB(data); +} + +TEST_F(BlobDBTest, PutWithTTL) { + Random rnd(301); + Options options; + options.env = mock_env_.get(); + BlobDBOptions bdb_options; + bdb_options.ttl_range_secs = 1000; + bdb_options.min_blob_size = 0; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + std::map data; + mock_clock_->SetCurrentTime(50); + for (size_t i = 0; i < 100; i++) { + uint64_t ttl = rnd.Next() % 100; + PutRandomWithTTL("key" + std::to_string(i), ttl, &rnd, + (ttl <= 50 ? nullptr : &data)); + } + mock_clock_->SetCurrentTime(100); + auto *bdb_impl = static_cast(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_TRUE(blob_files[0]->HasTTL()); + ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); + VerifyDB(data); +} + +TEST_F(BlobDBTest, PutUntil) { + Random rnd(301); + Options options; + options.env = mock_env_.get(); + BlobDBOptions bdb_options; + bdb_options.ttl_range_secs = 1000; + bdb_options.min_blob_size = 0; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + std::map data; + mock_clock_->SetCurrentTime(50); + for (size_t i = 0; i < 100; i++) { + uint64_t expiration = rnd.Next() % 100 + 50; + PutRandomUntil("key" + std::to_string(i), expiration, &rnd, + (expiration <= 100 ? 
nullptr : &data)); + } + mock_clock_->SetCurrentTime(100); + auto *bdb_impl = static_cast(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_TRUE(blob_files[0]->HasTTL()); + ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); + VerifyDB(data); +} + +TEST_F(BlobDBTest, StackableDBGet) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map data; + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + std::to_string(i), &rnd, &data); + } + for (size_t i = 0; i < 100; i++) { + StackableDB *db = blob_db_; + ColumnFamilyHandle *column_family = db->DefaultColumnFamily(); + std::string key = "key" + std::to_string(i); + PinnableSlice pinnable_value; + ASSERT_OK(db->Get(ReadOptions(), column_family, key, &pinnable_value)); + std::string string_value; + ASSERT_OK(db->Get(ReadOptions(), column_family, key, &string_value)); + ASSERT_EQ(string_value, pinnable_value.ToString()); + ASSERT_EQ(string_value, data[key]); + } +} + +TEST_F(BlobDBTest, GetExpiration) { + Options options; + options.env = mock_env_.get(); + BlobDBOptions bdb_options; + bdb_options.disable_background_tasks = true; + mock_clock_->SetCurrentTime(100); + Open(bdb_options, options); + ASSERT_OK(Put("key1", "value1")); + ASSERT_OK(PutWithTTL("key2", "value2", 200)); + PinnableSlice value; + uint64_t expiration; + ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration)); + ASSERT_EQ("value1", value.ToString()); + ASSERT_EQ(kNoExpiration, expiration); + ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration)); + ASSERT_EQ("value2", value.ToString()); + ASSERT_EQ(300 /* = 100 + 200 */, expiration); +} + +TEST_F(BlobDBTest, GetIOError) { + Options options; + options.env = fault_injection_env_.get(); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; // Make sure value write to blob file + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily(); + PinnableSlice value; + ASSERT_OK(Put("foo", "bar")); + fault_injection_env_->SetFilesystemActive(false, Status::IOError()); + Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value); + ASSERT_TRUE(s.IsIOError()); + // Reactivate file system to allow test to close DB. 
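+  // Otherwise the Destroy() performed by the test destructor would also hit
+  // the injected IOError.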
+ fault_injection_env_->SetFilesystemActive(true); +} + +TEST_F(BlobDBTest, PutIOError) { + Options options; + options.env = fault_injection_env_.get(); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; // Make sure value write to blob file + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + fault_injection_env_->SetFilesystemActive(false, Status::IOError()); + ASSERT_TRUE(Put("foo", "v1").IsIOError()); + fault_injection_env_->SetFilesystemActive(true, Status::IOError()); + ASSERT_OK(Put("bar", "v1")); +} + +TEST_F(BlobDBTest, WriteBatch) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map data; + for (size_t i = 0; i < 100; i++) { + WriteBatch batch; + for (size_t j = 0; j < 10; j++) { + PutRandomToWriteBatch("key" + std::to_string(j * 100 + i), &rnd, &batch, + &data); + } + + ASSERT_OK(blob_db_->Write(WriteOptions(), &batch)); + } + VerifyDB(data); +} + +TEST_F(BlobDBTest, Delete) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map data; + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + std::to_string(i), &rnd, &data); + } + for (size_t i = 0; i < 100; i += 5) { + Delete("key" + std::to_string(i), &data); + } + VerifyDB(data); +} + +TEST_F(BlobDBTest, DeleteBatch) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + std::to_string(i), &rnd); + } + WriteBatch batch; + for (size_t i = 0; i < 100; i++) { + ASSERT_OK(batch.Delete("key" + std::to_string(i))); + } + ASSERT_OK(blob_db_->Write(WriteOptions(), &batch)); + // DB should be empty. 
+ VerifyDB({}); +} + +TEST_F(BlobDBTest, Override) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map data; + for (int i = 0; i < 10000; i++) { + PutRandom("key" + std::to_string(i), &rnd, nullptr); + } + // override all the keys + for (int i = 0; i < 10000; i++) { + PutRandom("key" + std::to_string(i), &rnd, &data); + } + VerifyDB(data); +} + +#ifdef SNAPPY +TEST_F(BlobDBTest, Compression) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + bdb_options.compression = CompressionType::kSnappyCompression; + Open(bdb_options); + std::map data; + for (size_t i = 0; i < 100; i++) { + PutRandom("put-key" + std::to_string(i), &rnd, &data); + } + for (int i = 0; i < 100; i++) { + WriteBatch batch; + for (size_t j = 0; j < 10; j++) { + PutRandomToWriteBatch("write-batch-key" + std::to_string(j * 100 + i), + &rnd, &batch, &data); + } + ASSERT_OK(blob_db_->Write(WriteOptions(), &batch)); + } + VerifyDB(data); +} + +TEST_F(BlobDBTest, DecompressAfterReopen) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + bdb_options.compression = CompressionType::kSnappyCompression; + Open(bdb_options); + std::map data; + for (size_t i = 0; i < 100; i++) { + PutRandom("put-key" + std::to_string(i), &rnd, &data); + } + VerifyDB(data); + bdb_options.compression = CompressionType::kNoCompression; + Reopen(bdb_options); + VerifyDB(data); +} + +TEST_F(BlobDBTest, EnableDisableCompressionGC) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.garbage_collection_cutoff = 1.0; + bdb_options.disable_background_tasks = true; + bdb_options.compression = kSnappyCompression; + Open(bdb_options); + std::map data; + size_t data_idx = 0; + for (; data_idx < 100; data_idx++) { + PutRandom("put-key" + std::to_string(data_idx), &rnd, &data); + } + VerifyDB(data); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_EQ(kSnappyCompression, blob_files[0]->GetCompressionType()); + + // disable compression + bdb_options.compression = kNoCompression; + Reopen(bdb_options); + + // Add more data with new compression type + for (; data_idx < 200; data_idx++) { + PutRandom("put-key" + std::to_string(data_idx), &rnd, &data); + } + VerifyDB(data); + + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + ASSERT_EQ(kNoCompression, blob_files[1]->GetCompressionType()); + + // Enable GC. If we do it earlier the snapshot release triggered compaction + // may compact files and trigger GC before we can verify there are two files. 
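+  // With garbage_collection_cutoff = 1.0 every blob file falls within the GC
+  // cutoff, so the manual CompactRange() below relocates all surviving blobs
+  // into new files that use the currently configured compression type.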
+ bdb_options.enable_garbage_collection = true; + Reopen(bdb_options); + + // Trigger compaction + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + VerifyDB(data); + + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + for (auto bfile : blob_files) { + ASSERT_EQ(kNoCompression, bfile->GetCompressionType()); + } + + // enabling the compression again + bdb_options.compression = kSnappyCompression; + Reopen(bdb_options); + + // Add more data with new compression type + for (; data_idx < 300; data_idx++) { + PutRandom("put-key" + std::to_string(data_idx), &rnd, &data); + } + VerifyDB(data); + + // Trigger compaction + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + VerifyDB(data); + + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + for (auto bfile : blob_files) { + ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType()); + } +} + +#ifdef LZ4 +// Test switch compression types and run GC, it needs both Snappy and LZ4 +// support. +TEST_F(BlobDBTest, ChangeCompressionGC) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.garbage_collection_cutoff = 1.0; + bdb_options.disable_background_tasks = true; + bdb_options.compression = kLZ4Compression; + Open(bdb_options); + std::map data; + size_t data_idx = 0; + for (; data_idx < 100; data_idx++) { + PutRandom("put-key" + std::to_string(data_idx), &rnd, &data); + } + VerifyDB(data); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_EQ(kLZ4Compression, blob_files[0]->GetCompressionType()); + + // Change compression type + bdb_options.compression = kSnappyCompression; + Reopen(bdb_options); + + // Add more data with Snappy compression type + for (; data_idx < 200; data_idx++) { + PutRandom("put-key" + std::to_string(data_idx), &rnd, &data); + } + VerifyDB(data); + + // Verify blob file compression type + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + ASSERT_EQ(kSnappyCompression, blob_files[1]->GetCompressionType()); + + // Enable GC. If we do it earlier the snapshot release triggered compaction + // may compact files and trigger GC before we can verify there are two files. 
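+  // As in the previous test, it is GC during the CompactRange() calls below
+  // that rewrites old blobs into files using the newly configured compression
+  // type, which the GetCompressionType() assertions rely on.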
+ bdb_options.enable_garbage_collection = true; + Reopen(bdb_options); + + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + VerifyDB(data); + + blob_db_impl()->TEST_DeleteObsoleteFiles(); + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + for (auto bfile : blob_files) { + ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType()); + } + + // Disable compression + bdb_options.compression = kNoCompression; + Reopen(bdb_options); + for (; data_idx < 300; data_idx++) { + PutRandom("put-key" + std::to_string(data_idx), &rnd, &data); + } + VerifyDB(data); + + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + VerifyDB(data); + + blob_db_impl()->TEST_DeleteObsoleteFiles(); + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + for (auto bfile : blob_files) { + ASSERT_EQ(kNoCompression, bfile->GetCompressionType()); + } + + // switching different compression types to generate mixed compression types + bdb_options.compression = kSnappyCompression; + Reopen(bdb_options); + for (; data_idx < 400; data_idx++) { + PutRandom("put-key" + std::to_string(data_idx), &rnd, &data); + } + VerifyDB(data); + + bdb_options.compression = kLZ4Compression; + Reopen(bdb_options); + for (; data_idx < 500; data_idx++) { + PutRandom("put-key" + std::to_string(data_idx), &rnd, &data); + } + VerifyDB(data); + + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + VerifyDB(data); + + blob_db_impl()->TEST_DeleteObsoleteFiles(); + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + for (auto bfile : blob_files) { + ASSERT_EQ(kLZ4Compression, bfile->GetCompressionType()); + } +} +#endif // LZ4 +#endif // SNAPPY + +TEST_F(BlobDBTest, MultipleWriters) { + Open(BlobDBOptions()); + + std::vector workers; + std::vector> data_set(10); + for (uint32_t i = 0; i < 10; i++) + workers.push_back(port::Thread( + [&](uint32_t id) { + Random rnd(301 + id); + for (int j = 0; j < 100; j++) { + std::string key = + "key" + std::to_string(id) + "_" + std::to_string(j); + if (id < 5) { + PutRandom(key, &rnd, &data_set[id]); + } else { + WriteBatch batch; + PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]); + ASSERT_OK(blob_db_->Write(WriteOptions(), &batch)); + } + } + }, + i)); + std::map data; + for (size_t i = 0; i < 10; i++) { + workers[i].join(); + data.insert(data_set[i].begin(), data_set[i].end()); + } + VerifyDB(data); +} + +TEST_F(BlobDBTest, SstFileManager) { + // run the same test for Get(), MultiGet() and Iterator each. + std::shared_ptr sst_file_manager( + NewSstFileManager(mock_env_.get())); + sst_file_manager->SetDeleteRateBytesPerSecond(1); + SstFileManagerImpl *sfm = + static_cast(sst_file_manager.get()); + + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.enable_garbage_collection = true; + bdb_options.garbage_collection_cutoff = 1.0; + Options db_options; + + int files_scheduled_to_delete = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) { + assert(arg); + const std::string *const file_path = + static_cast(arg); + if (file_path->find(".blob") != std::string::npos) { + ++files_scheduled_to_delete; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + db_options.sst_file_manager = sst_file_manager; + + Open(bdb_options, db_options); + + // Create one obselete file and clean it. 
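+  // Sequence: write one blob, close its file, run a full compaction so the
+  // file becomes obsolete, then delete it. The SyncPoint callback above
+  // counts how many ".blob" files are routed through the SstFileManager's
+  // deletion scheduler.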
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar")); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + std::shared_ptr bfile = blob_files[0]; + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile)); + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + + // Even if SSTFileManager is not set, DB is creating a dummy one. + ASSERT_EQ(1, files_scheduled_to_delete); + Destroy(); + // Make sure that DestroyBlobDB() also goes through delete scheduler. + ASSERT_EQ(2, files_scheduled_to_delete); + SyncPoint::GetInstance()->DisableProcessing(); + sfm->WaitForEmptyTrash(); +} + +TEST_F(BlobDBTest, SstFileManagerRestart) { + int files_scheduled_to_delete = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) { + assert(arg); + const std::string *const file_path = + static_cast(arg); + if (file_path->find(".blob") != std::string::npos) { + ++files_scheduled_to_delete; + } + }); + + // run the same test for Get(), MultiGet() and Iterator each. + std::shared_ptr sst_file_manager( + NewSstFileManager(mock_env_.get())); + sst_file_manager->SetDeleteRateBytesPerSecond(1); + SstFileManagerImpl *sfm = + static_cast(sst_file_manager.get()); + + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + Options db_options; + + SyncPoint::GetInstance()->EnableProcessing(); + db_options.sst_file_manager = sst_file_manager; + + Open(bdb_options, db_options); + std::string blob_dir = blob_db_impl()->TEST_blob_dir(); + ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar")); + Close(); + + // Create 3 dummy trash files under the blob_dir + const auto &fs = db_options.env->GetFileSystem(); + ASSERT_OK(CreateFile(fs, blob_dir + "/000666.blob.trash", "", false)); + ASSERT_OK(CreateFile(fs, blob_dir + "/000888.blob.trash", "", true)); + ASSERT_OK(CreateFile(fs, blob_dir + "/something_not_match.trash", "", false)); + + // Make sure that reopening the DB rescan the existing trash files + Open(bdb_options, db_options); + ASSERT_EQ(files_scheduled_to_delete, 2); + + sfm->WaitForEmptyTrash(); + + // There should be exact one file under the blob dir now. 
+ std::vector all_files; + ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files)); + int nfiles = 0; + for (const auto &f : all_files) { + assert(!f.empty()); + if (f[0] == '.') { + continue; + } + nfiles++; + } + ASSERT_EQ(nfiles, 1); + + SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(BlobDBTest, SnapshotAndGarbageCollection) { + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.enable_garbage_collection = true; + bdb_options.garbage_collection_cutoff = 1.0; + bdb_options.disable_background_tasks = true; + + Options options; + options.disable_auto_compactions = true; + + // i = when to take snapshot + for (int i = 0; i < 4; i++) { + Destroy(); + Open(bdb_options, options); + + const Snapshot *snapshot = nullptr; + + // First file + ASSERT_OK(Put("key1", "value")); + if (i == 0) { + snapshot = blob_db_->GetSnapshot(); + } + + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); + + // Second file + ASSERT_OK(Put("key2", "value")); + if (i == 1) { + snapshot = blob_db_->GetSnapshot(); + } + + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + auto bfile = blob_files[1]; + ASSERT_FALSE(bfile->Immutable()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile)); + + // Third file + ASSERT_OK(Put("key3", "value")); + if (i == 2) { + snapshot = blob_db_->GetSnapshot(); + } + + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_TRUE(bfile->Obsolete()); + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), + bfile->GetObsoleteSequence()); + + Delete("key2"); + if (i == 3) { + snapshot = blob_db_->GetSnapshot(); + } + + ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size()); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + + if (i >= 2) { + // The snapshot shouldn't see data in bfile + ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size()); + blob_db_->ReleaseSnapshot(snapshot); + } else { + // The snapshot will see data in bfile, so the file shouldn't be deleted + ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size()); + blob_db_->ReleaseSnapshot(snapshot); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size()); + } + } +} + +TEST_F(BlobDBTest, ColumnFamilyNotSupported) { + Options options; + options.env = mock_env_.get(); + mock_clock_->SetCurrentTime(0); + Open(BlobDBOptions(), options); + ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily(); + ColumnFamilyHandle *handle = nullptr; + std::string value; + std::vector values; + // The call simply pass through to base db. It should succeed. 
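+  // Creating the column family itself succeeds because it is forwarded to the
+  // base DB, but every blob-aware operation issued against the non-default
+  // handle below is expected to be rejected (NotSupported statuses, or a null
+  // iterator).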
+ ASSERT_OK( + blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle)); + ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported()); + ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60) + .IsNotSupported()); + ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100) + .IsNotSupported()); + WriteBatch batch; + ASSERT_OK(batch.Put("k1", "v1")); + ASSERT_OK(batch.Put(handle, "k2", "v2")); + ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported()); + ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + ASSERT_TRUE( + blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported()); + auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle}, + {"k1", "k2"}, &values); + ASSERT_EQ(2, statuses.size()); + ASSERT_TRUE(statuses[0].IsNotSupported()); + ASSERT_TRUE(statuses[1].IsNotSupported()); + ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle)); + delete handle; +} + +TEST_F(BlobDBTest, GetLiveFilesMetaData) { + Random rnd(301); + + BlobDBOptions bdb_options; + bdb_options.blob_dir = "blob_dir"; + bdb_options.path_relative = true; + bdb_options.ttl_range_secs = 10; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + + Options options; + options.env = mock_env_.get(); + + Open(bdb_options, options); + + std::map data; + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + std::to_string(i), &rnd, &data); + } + + constexpr uint64_t expiration = 1000ULL; + PutRandomUntil("key100", expiration, &rnd, &data); + + std::vector metadata; + blob_db_->GetLiveFilesMetaData(&metadata); + + ASSERT_EQ(2U, metadata.size()); + // Path should be relative to db_name, but begin with slash. + const std::string filename1("/blob_dir/000001.blob"); + ASSERT_EQ(filename1, metadata[0].name); + ASSERT_EQ(1, metadata[0].file_number); + ASSERT_EQ(0, metadata[0].oldest_ancester_time); + ASSERT_EQ(kDefaultColumnFamilyName, metadata[0].column_family_name); + + const std::string filename2("/blob_dir/000002.blob"); + ASSERT_EQ(filename2, metadata[1].name); + ASSERT_EQ(2, metadata[1].file_number); + ASSERT_EQ(expiration, metadata[1].oldest_ancester_time); + ASSERT_EQ(kDefaultColumnFamilyName, metadata[1].column_family_name); + + std::vector livefile; + uint64_t mfs; + ASSERT_OK(blob_db_->GetLiveFiles(livefile, &mfs, false)); + ASSERT_EQ(5U, livefile.size()); + ASSERT_EQ(filename1, livefile[3]); + ASSERT_EQ(filename2, livefile[4]); + VerifyDB(data); +} + +TEST_F(BlobDBTest, MigrateFromPlainRocksDB) { + constexpr size_t kNumKey = 20; + constexpr size_t kNumIteration = 10; + Random rnd(301); + std::map data; + std::vector is_blob(kNumKey, false); + + // Write to plain rocksdb. + Options options; + options.create_if_missing = true; + DB *db = nullptr; + ASSERT_OK(DB::Open(options, dbname_, &db)); + for (size_t i = 0; i < kNumIteration; i++) { + auto key_index = rnd.Next() % kNumKey; + std::string key = "key" + std::to_string(key_index); + PutRandom(db, key, &rnd, &data); + } + VerifyDB(db, data); + delete db; + db = nullptr; + + // Open as blob db. Verify it can read existing data. + Open(); + VerifyDB(blob_db_, data); + for (size_t i = 0; i < kNumIteration; i++) { + auto key_index = rnd.Next() % kNumKey; + std::string key = "key" + std::to_string(key_index); + is_blob[key_index] = true; + PutRandom(blob_db_, key, &rnd, &data); + } + VerifyDB(blob_db_, data); + delete blob_db_; + blob_db_ = nullptr; + + // Verify plain db return error for keys written by blob db. 
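+  // Keys written through BlobDB are stored as blob-index entries that a plain
+  // RocksDB instance cannot interpret, hence the IsCorruption() expectations
+  // below, while keys written before the migration still read back normally.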
+ ASSERT_OK(DB::Open(options, dbname_, &db)); + std::string value; + for (size_t i = 0; i < kNumKey; i++) { + std::string key = "key" + std::to_string(i); + Status s = db->Get(ReadOptions(), key, &value); + if (data.count(key) == 0) { + ASSERT_TRUE(s.IsNotFound()); + } else if (is_blob[i]) { + ASSERT_TRUE(s.IsCorruption()); + } else { + ASSERT_OK(s); + ASSERT_EQ(data[key], value); + } + } + delete db; +} + +// Test to verify that a NoSpace IOError Status is returned on reaching +// max_db_size limit. +TEST_F(BlobDBTest, OutOfSpace) { + // Use mock env to stop wall clock. + Options options; + options.env = mock_env_.get(); + BlobDBOptions bdb_options; + bdb_options.max_db_size = 200; + bdb_options.is_fifo = false; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + + // Each stored blob has an overhead of about 42 bytes currently. + // So a small key + a 100 byte blob should take up ~150 bytes in the db. + std::string value(100, 'v'); + ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60)); + + // Putting another blob should fail as ading it would exceed the max_db_size + // limit. + Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60); + ASSERT_TRUE(s.IsIOError()); + ASSERT_TRUE(s.IsNoSpace()); +} + +TEST_F(BlobDBTest, FIFOEviction) { + BlobDBOptions bdb_options; + bdb_options.max_db_size = 200; + bdb_options.blob_file_size = 100; + bdb_options.is_fifo = true; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + + std::atomic evict_count{0}; + SyncPoint::GetInstance()->SetCallBack( + "BlobDBImpl::EvictOldestBlobFile:Evicted", + [&](void *) { evict_count++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Each stored blob has an overhead of 32 bytes currently. + // So a 100 byte blob should take up 132 bytes. + std::string value(100, 'v'); + ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10)); + VerifyDB({{"key1", value}}); + + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + + // Adding another 100 bytes blob would take the total size to 264 bytes + // (2*132). max_db_size will be exceeded + // than max_db_size and trigger FIFO eviction. + ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60)); + ASSERT_EQ(1, evict_count); + // key1 will exist until corresponding file be deleted. + VerifyDB({{"key1", value}, {"key2", value}}); + + // Adding another 100 bytes blob without TTL. + ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value)); + ASSERT_EQ(2, evict_count); + // key1 and key2 will exist until corresponding file be deleted. + VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}}); + + // The fourth blob file, without TTL. 
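+  // Each additional ~132-byte blob pushes the DB past max_db_size (200), so
+  // the oldest remaining file is evicted every time; evict_count should reach
+  // 3 after this write.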
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value)); + ASSERT_EQ(3, evict_count); + VerifyDB( + {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}}); + + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(4, blob_files.size()); + ASSERT_TRUE(blob_files[0]->Obsolete()); + ASSERT_TRUE(blob_files[1]->Obsolete()); + ASSERT_TRUE(blob_files[2]->Obsolete()); + ASSERT_FALSE(blob_files[3]->Obsolete()); + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); + ASSERT_EQ(3, obsolete_files.size()); + ASSERT_EQ(blob_files[0], obsolete_files[0]); + ASSERT_EQ(blob_files[1], obsolete_files[1]); + ASSERT_EQ(blob_files[2], obsolete_files[2]); + + blob_db_impl()->TEST_DeleteObsoleteFiles(); + obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); + ASSERT_TRUE(obsolete_files.empty()); + VerifyDB({{"key4", value}}); +} + +TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) { + Options options; + BlobDBOptions bdb_options; + bdb_options.max_db_size = 1000; + bdb_options.blob_file_size = 5000; + bdb_options.is_fifo = true; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + + std::atomic evict_count{0}; + SyncPoint::GetInstance()->SetCallBack( + "BlobDBImpl::EvictOldestBlobFile:Evicted", + [&](void *) { evict_count++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + std::string value(2000, 'v'); + ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace()); + ASSERT_EQ(0, evict_count); +} + +TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) { + BlobDBOptions bdb_options; + bdb_options.is_fifo = true; + bdb_options.min_blob_size = 100; + bdb_options.disable_background_tasks = true; + Options options; + // Use mock env to stop wall clock. + options.env = mock_env_.get(); + options.disable_auto_compactions = true; + auto statistics = CreateDBStatistics(); + options.statistics = statistics; + Open(bdb_options, options); + + ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size()); + std::string small_value(50, 'v'); + std::map data; + // Insert some data into LSM tree to make sure FIFO eviction take SST + // file size into account. + for (int i = 0; i < 1000; i++) { + ASSERT_OK(Put("key" + std::to_string(i), small_value, &data)); + } + ASSERT_OK(blob_db_->Flush(FlushOptions())); + uint64_t live_sst_size = 0; + ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize, + &live_sst_size)); + ASSERT_TRUE(live_sst_size > 0); + ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size()); + + bdb_options.max_db_size = live_sst_size + 2000; + Reopen(bdb_options, options); + ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size()); + + std::string value_1k(1000, 'v'); + ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data)); + ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + VerifyDB(data); + // large_key2 evicts large_key1 + ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data)); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + data.erase("large_key1"); + VerifyDB(data); + // large_key3 get no enough space even after evicting large_key2, so it + // instead return no space error. + std::string value_2k(2000, 'v'); + ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace()); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + // Verify large_key2 still exists. + VerifyDB(data); +} + +// Test flush or compaction will trigger FIFO eviction since they update +// total SST file size. 
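+// FIFO eviction compares the live SST size plus the total blob file size
+// against max_db_size, so a flush that grows the SST footprint can by itself
+// push the DB over the limit and evict the oldest blob file, which is what
+// the test below verifies.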
+TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) { + BlobDBOptions bdb_options; + bdb_options.max_db_size = 1000; + bdb_options.is_fifo = true; + bdb_options.min_blob_size = 100; + bdb_options.disable_background_tasks = true; + Options options; + // Use mock env to stop wall clock. + options.env = mock_env_.get(); + auto statistics = CreateDBStatistics(); + options.statistics = statistics; + options.compression = kNoCompression; + Open(bdb_options, options); + + std::string value(800, 'v'); + ASSERT_OK(PutWithTTL("large_key", value, 60)); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + VerifyDB({{"large_key", value}}); + + // Insert some small keys and flush to bring DB out of space. + std::map data; + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put("key" + std::to_string(i), "v", &data)); + } + ASSERT_OK(blob_db_->Flush(FlushOptions())); + + // Verify large_key is deleted by FIFO eviction. + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + VerifyDB(data); +} + +TEST_F(BlobDBTest, InlineSmallValues) { + constexpr uint64_t kMaxExpiration = 1000; + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.ttl_range_secs = kMaxExpiration; + bdb_options.min_blob_size = 100; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.disable_background_tasks = true; + Options options; + options.env = mock_env_.get(); + mock_clock_->SetCurrentTime(0); + Open(bdb_options, options); + std::map data; + std::map versions; + for (size_t i = 0; i < 1000; i++) { + bool is_small_value = rnd.Next() % 2; + bool has_ttl = rnd.Next() % 2; + uint64_t expiration = rnd.Next() % kMaxExpiration; + int len = is_small_value ? 50 : 200; + std::string key = "key" + std::to_string(i); + std::string value = rnd.HumanReadableString(len); + std::string blob_index; + data[key] = value; + SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1; + if (!has_ttl) { + ASSERT_OK(blob_db_->Put(WriteOptions(), key, value)); + } else { + ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration)); + } + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence); + versions[key] = + KeyVersion(key, value, sequence, + (is_small_value && !has_ttl) ? 
kTypeValue : kTypeBlobIndex); + } + VerifyDB(data); + VerifyBaseDB(versions); + auto *bdb_impl = static_cast(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + std::shared_ptr non_ttl_file; + std::shared_ptr ttl_file; + if (blob_files[0]->HasTTL()) { + ttl_file = blob_files[0]; + non_ttl_file = blob_files[1]; + } else { + non_ttl_file = blob_files[0]; + ttl_file = blob_files[1]; + } + ASSERT_FALSE(non_ttl_file->HasTTL()); + ASSERT_TRUE(ttl_file->HasTTL()); +} + +TEST_F(BlobDBTest, UserCompactionFilter) { + class CustomerFilter : public CompactionFilter { + public: + bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value, + std::string *new_value, bool *value_changed) const override { + *value_changed = false; + // changing value size to test value transitions between inlined data + // and stored-in-blob data + if (value.size() % 4 == 1) { + *new_value = value.ToString(); + // double size by duplicating value + *new_value += *new_value; + *value_changed = true; + return false; + } else if (value.size() % 3 == 1) { + *new_value = value.ToString(); + // trancate value size by half + *new_value = new_value->substr(0, new_value->size() / 2); + *value_changed = true; + return false; + } else if (value.size() % 2 == 1) { + return true; + } + return false; + } + bool IgnoreSnapshots() const override { return true; } + const char *Name() const override { return "CustomerFilter"; } + }; + class CustomerFilterFactory : public CompactionFilterFactory { + const char *Name() const override { return "CustomerFilterFactory"; } + std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context & /*context*/) override { + return std::unique_ptr(new CustomerFilter()); + } + }; + + constexpr size_t kNumPuts = 1 << 10; + // Generate both inlined and blob value + constexpr uint64_t kMinValueSize = 1 << 6; + constexpr uint64_t kMaxValueSize = 1 << 8; + constexpr uint64_t kMinBlobSize = 1 << 7; + static_assert(kMinValueSize < kMinBlobSize, ""); + static_assert(kMaxValueSize > kMinBlobSize, ""); + + BlobDBOptions bdb_options; + bdb_options.min_blob_size = kMinBlobSize; + bdb_options.blob_file_size = kMaxValueSize * 10; + bdb_options.disable_background_tasks = true; + if (Snappy_Supported()) { + bdb_options.compression = CompressionType::kSnappyCompression; + } + // case_num == 0: Test user defined compaction filter + // case_num == 1: Test user defined compaction filter factory + for (int case_num = 0; case_num < 2; case_num++) { + Options options; + if (case_num == 0) { + options.compaction_filter = new CustomerFilter(); + } else { + options.compaction_filter_factory.reset(new CustomerFilterFactory()); + } + options.disable_auto_compactions = true; + options.env = mock_env_.get(); + options.statistics = CreateDBStatistics(); + Open(bdb_options, options); + + std::map data; + std::map data_after_compact; + Random rnd(301); + uint64_t value_size = kMinValueSize; + int drop_record = 0; + for (size_t i = 0; i < kNumPuts; ++i) { + std::ostringstream oss; + oss << "key" << std::setw(4) << std::setfill('0') << i; + + const std::string key(oss.str()); + const std::string value = rnd.HumanReadableString((int)value_size); + const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1; + + ASSERT_OK(Put(key, value)); + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence); + + data[key] = value; + if (value.length() % 4 == 1) { + data_after_compact[key] = value + value; + } else if (value.length() % 3 == 1) { + data_after_compact[key] = 
value.substr(0, value.size() / 2); + } else if (value.length() % 2 == 1) { + ++drop_record; + } else { + data_after_compact[key] = value; + } + + if (++value_size > kMaxValueSize) { + value_size = kMinValueSize; + } + } + // Verify full data set + VerifyDB(data); + // Applying compaction filter for records + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + // Verify data after compaction, only value with even length left. + VerifyDB(data_after_compact); + ASSERT_EQ(drop_record, + options.statistics->getTickerCount(COMPACTION_KEY_DROP_USER)); + delete options.compaction_filter; + Destroy(); + } +} + +// Test user comapction filter when there is IO error on blob data. +TEST_F(BlobDBTest, UserCompactionFilter_BlobIOError) { + class CustomerFilter : public CompactionFilter { + public: + bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value, + std::string *new_value, bool *value_changed) const override { + *new_value = value.ToString() + "_new"; + *value_changed = true; + return false; + } + bool IgnoreSnapshots() const override { return true; } + const char *Name() const override { return "CustomerFilter"; } + }; + + constexpr size_t kNumPuts = 100; + constexpr int kValueSize = 100; + + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.blob_file_size = kValueSize * 10; + bdb_options.disable_background_tasks = true; + bdb_options.compression = CompressionType::kNoCompression; + + std::vector io_failure_cases = { + "BlobDBImpl::CreateBlobFileAndWriter", + "BlobIndexCompactionFilterBase::WriteBlobToNewFile", + "BlobDBImpl::CloseBlobFile"}; + + for (size_t case_num = 0; case_num < io_failure_cases.size(); case_num++) { + Options options; + options.compaction_filter = new CustomerFilter(); + options.disable_auto_compactions = true; + options.env = fault_injection_env_.get(); + options.statistics = CreateDBStatistics(); + Open(bdb_options, options); + + std::map data; + Random rnd(301); + for (size_t i = 0; i < kNumPuts; ++i) { + std::ostringstream oss; + oss << "key" << std::setw(4) << std::setfill('0') << i; + + const std::string key(oss.str()); + const std::string value = rnd.HumanReadableString(kValueSize); + const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1; + + ASSERT_OK(Put(key, value)); + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence); + data[key] = value; + } + + // Verify full data set + VerifyDB(data); + + SyncPoint::GetInstance()->SetCallBack( + io_failure_cases[case_num], [&](void * /*arg*/) { + fault_injection_env_->SetFilesystemActive(false, Status::IOError()); + }); + SyncPoint::GetInstance()->EnableProcessing(); + auto s = blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + ASSERT_TRUE(s.IsIOError()); + + // Reactivate file system to allow test to verify and close DB. + fault_injection_env_->SetFilesystemActive(true); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Verify full data set after compaction failure + VerifyDB(data); + + delete options.compaction_filter; + Destroy(); + } +} + +// Test comapction filter should remove any expired blob index. 
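+// Entries whose expiration is at or before kCompactTime should be dropped by
+// the manual compaction even though a snapshot taken beforehand is still
+// live; the surviving key versions are then compared against
+// data_after_compact.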
+TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
+  constexpr size_t kNumKeys = 100;
+  constexpr size_t kNumPuts = 1000;
+  constexpr uint64_t kMaxExpiration = 1000;
+  constexpr uint64_t kCompactTime = 500;
+  constexpr uint64_t kMinBlobSize = 100;
+  Random rnd(301);
+  mock_clock_->SetCurrentTime(0);
+  BlobDBOptions bdb_options;
+  bdb_options.min_blob_size = kMinBlobSize;
+  bdb_options.disable_background_tasks = true;
+  Options options;
+  options.env = mock_env_.get();
+  Open(bdb_options, options);
+
+  std::map<std::string, std::string> data;
+  std::map<std::string, std::string> data_after_compact;
+  for (size_t i = 0; i < kNumPuts; i++) {
+    bool is_small_value = rnd.Next() % 2;
+    bool has_ttl = rnd.Next() % 2;
+    uint64_t expiration = rnd.Next() % kMaxExpiration;
+    int len = is_small_value ? 10 : 200;
+    std::string key = "key" + std::to_string(rnd.Next() % kNumKeys);
+    std::string value = rnd.HumanReadableString(len);
+    if (!has_ttl) {
+      if (is_small_value) {
+        std::string blob_entry;
+        BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
+        // Fake blob index with TTL. See what it will do.
+        ASSERT_GT(kMinBlobSize, blob_entry.size());
+        value = blob_entry;
+      }
+      ASSERT_OK(Put(key, value));
+      data_after_compact[key] = value;
+    } else {
+      ASSERT_OK(PutUntil(key, value, expiration));
+      if (expiration <= kCompactTime) {
+        data_after_compact.erase(key);
+      } else {
+        data_after_compact[key] = value;
+      }
+    }
+    data[key] = value;
+  }
+  VerifyDB(data);
+
+  mock_clock_->SetCurrentTime(kCompactTime);
+  // Take a snapshot before compaction. Make sure expired blob indexes are
+  // filtered regardless of the snapshot.
+  const Snapshot *snapshot = blob_db_->GetSnapshot();
+  // Issue a manual compaction to trigger the compaction filter.
+  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  blob_db_->ReleaseSnapshot(snapshot);
+  // Verify that expired blob indexes are filtered.
+  std::vector<KeyVersion> versions;
+  const size_t kMaxKeys = 10000;
+  ASSERT_OK(GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions));
+  ASSERT_EQ(data_after_compact.size(), versions.size());
+  for (auto &version : versions) {
+    ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
+  }
+  VerifyDB(data_after_compact);
+}
+
+// Test that the compaction filter removes any blob index whose corresponding
+// blob file has been removed.
+TEST_F(BlobDBTest, FilterFileNotAvailable) {
+  BlobDBOptions bdb_options;
+  bdb_options.min_blob_size = 0;
+  bdb_options.disable_background_tasks = true;
+  Options options;
+  options.disable_auto_compactions = true;
+  Open(bdb_options, options);
+
+  ASSERT_OK(Put("foo", "v1"));
+  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+  ASSERT_EQ(1, blob_files.size());
+  ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
+  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+
+  ASSERT_OK(Put("bar", "v2"));
+  blob_files = blob_db_impl()->TEST_GetBlobFiles();
+  ASSERT_EQ(2, blob_files.size());
+  ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
+  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
+
+  const size_t kMaxKeys = 10000;
+
+  DB *base_db = blob_db_->GetRootDB();
+  std::vector<KeyVersion> versions;
+  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+  ASSERT_EQ(2, versions.size());
+  ASSERT_EQ("bar", versions[0].user_key);
+  ASSERT_EQ("foo", versions[1].user_key);
+  VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
+
+  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+  ASSERT_EQ(2, versions.size());
+  ASSERT_EQ("bar", versions[0].user_key);
+  ASSERT_EQ("foo", versions[1].user_key);
+  VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
+
+  // Remove the first blob file and compact. foo should be removed from the
+  // base db.
+  blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
+  blob_db_impl()->TEST_DeleteObsoleteFiles();
+  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+  ASSERT_EQ(1, versions.size());
+  ASSERT_EQ("bar", versions[0].user_key);
+  VerifyDB({{"bar", "v2"}});
+
+  // Remove the second blob file and compact. bar should be removed from the
+  // base db.
+  blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
+  blob_db_impl()->TEST_DeleteObsoleteFiles();
+  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+  ASSERT_EQ(0, versions.size());
+  VerifyDB({});
+}
+
+// Test that the compaction filter filters out any inlined TTL keys that would
+// have been dropped by the last FIFO eviction if they were stored out-of-line.
+TEST_F(BlobDBTest, FilterForFIFOEviction) {
+  Random rnd(215);
+  BlobDBOptions bdb_options;
+  bdb_options.min_blob_size = 100;
+  bdb_options.ttl_range_secs = 60;
+  bdb_options.max_db_size = 0;
+  bdb_options.disable_background_tasks = true;
+  Options options;
+  // Use mock env to stop wall clock.
+  mock_clock_->SetCurrentTime(0);
+  options.env = mock_env_.get();
+  auto statistics = CreateDBStatistics();
+  options.statistics = statistics;
+  options.disable_auto_compactions = true;
+  Open(bdb_options, options);
+
+  std::map<std::string, std::string> data;
+  std::map<std::string, std::string> data_after_compact;
+  // Insert some small values that will be inlined.
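+  // Each value below is 50 bytes, below the 100-byte min_blob_size, so it is
+  // kept inline in the LSM tree with a TTL drawn from [1, 120]. Of these keys,
+  // only the ones with ttl >= 60 are expected to survive the compaction at the
+  // end of the test.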
+  for (int i = 0; i < 1000; i++) {
+    std::string key = "key" + std::to_string(i);
+    std::string value = rnd.HumanReadableString(50);
+    uint64_t ttl = rnd.Next() % 120 + 1;
+    ASSERT_OK(PutWithTTL(key, value, ttl, &data));
+    if (ttl >= 60) {
+      data_after_compact[key] = value;
+    }
+  }
+  uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
+  ASSERT_OK(blob_db_->Flush(FlushOptions()));
+  uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
+  ASSERT_GT(live_sst_size, 0);
+  VerifyDB(data);
+
+  bdb_options.max_db_size = live_sst_size + 30000;
+  bdb_options.is_fifo = true;
+  Reopen(bdb_options, options);
+  VerifyDB(data);
+
+  // Put two large values, each on a different blob file.
+  std::string large_value(10000, 'v');
+  ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
+  ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
+  ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
+  ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+  data["large_key1"] = large_value;
+  data["large_key2"] = large_value;
+  VerifyDB(data);
+
+  // Put a third large value, which will push the DB over its size limit.
+  // FIFO eviction will evict the file of large_key1.
+  ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
+  ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+  ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
+  blob_db_impl()->TEST_DeleteObsoleteFiles();
+  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+  data.erase("large_key1");
+  data["large_key3"] = large_value;
+  VerifyDB(data);
+
+  // Put some more small values. These shouldn't be evicted by the compaction
+  // filter since they are inserted after the FIFO eviction.
+  ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
+  ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
+
+  // FIFO eviction doesn't trigger again since there is enough room for the
+  // flush.
+  ASSERT_OK(blob_db_->Flush(FlushOptions()));
+  ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+
+  // Manually compact and check that the compaction filter evicts those keys
+  // with expiration < 60.
+  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  // All keys with expiration < 60, plus large_key1, are filtered by the
+  // compaction filter.
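+  // Note: the extra +1 below is large_key1, whose blob file was dropped by the
+  // FIFO eviction above; its leftover blob index is therefore also counted as
+  // evicted by the compaction filter.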
+ ASSERT_EQ(num_keys_to_evict + 1, + statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT)); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + data_after_compact["large_key2"] = large_value; + data_after_compact["large_key3"] = large_value; + VerifyDB(data_after_compact); +} + +TEST_F(BlobDBTest, GarbageCollection) { + constexpr size_t kNumPuts = 1 << 10; + + constexpr uint64_t kExpiration = 1000; + constexpr uint64_t kCompactTime = 500; + + constexpr uint64_t kKeySize = 7; // "key" + 4 digits + + constexpr uint64_t kSmallValueSize = 1 << 6; + constexpr uint64_t kLargeValueSize = 1 << 8; + constexpr uint64_t kMinBlobSize = 1 << 7; + static_assert(kSmallValueSize < kMinBlobSize, ""); + static_assert(kLargeValueSize > kMinBlobSize, ""); + + constexpr size_t kBlobsPerFile = 8; + constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile; + constexpr uint64_t kBlobFileSize = + BlobLogHeader::kSize + + (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile; + + BlobDBOptions bdb_options; + bdb_options.min_blob_size = kMinBlobSize; + bdb_options.blob_file_size = kBlobFileSize; + bdb_options.enable_garbage_collection = true; + bdb_options.garbage_collection_cutoff = 0.25; + bdb_options.disable_background_tasks = true; + + Options options; + options.env = mock_env_.get(); + options.statistics = CreateDBStatistics(); + + Open(bdb_options, options); + + std::map data; + std::map blob_value_versions; + std::map blob_index_versions; + + Random rnd(301); + + // Add a bunch of large non-TTL values. These will be written to non-TTL + // blob files and will be subject to GC. + for (size_t i = 0; i < kNumPuts; ++i) { + std::ostringstream oss; + oss << "key" << std::setw(4) << std::setfill('0') << i; + + const std::string key(oss.str()); + const std::string value = rnd.HumanReadableString(kLargeValueSize); + const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1; + + ASSERT_OK(Put(key, value)); + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence); + + data[key] = value; + blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex); + blob_index_versions[key] = + BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration, + sequence, kTypeBlobIndex); + } + + // Add some small and/or TTL values that will be ignored during GC. + // First, add a large TTL value will be written to its own TTL blob file. + { + const std::string key("key2000"); + const std::string value = rnd.HumanReadableString(kLargeValueSize); + const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1; + + ASSERT_OK(PutUntil(key, value, kExpiration)); + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence); + + data[key] = value; + blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex); + blob_index_versions[key] = + BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration, + sequence, kTypeBlobIndex); + } + + // Now add a small TTL value (which will be inlined). 
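+  // (kSmallValueSize == 64 is below kMinBlobSize == 128, so this value is kept
+  // inline in the blob index instead of being written to a blob file; hence
+  // kInvalidBlobFileNumber in the expected version below.)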
+ { + const std::string key("key3000"); + const std::string value = rnd.HumanReadableString(kSmallValueSize); + const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1; + + ASSERT_OK(PutUntil(key, value, kExpiration)); + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence); + + data[key] = value; + blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex); + blob_index_versions[key] = BlobIndexVersion( + key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex); + } + + // Finally, add a small non-TTL value (which will be stored as a regular + // value). + { + const std::string key("key4000"); + const std::string value = rnd.HumanReadableString(kSmallValueSize); + const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1; + + ASSERT_OK(Put(key, value)); + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence); + + data[key] = value; + blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue); + blob_index_versions[key] = BlobIndexVersion( + key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue); + } + + VerifyDB(data); + VerifyBaseDB(blob_value_versions); + VerifyBaseDBBlobIndex(blob_index_versions); + + // At this point, we should have 128 immutable non-TTL files with file numbers + // 1..128. + { + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), kNumBlobFiles); + for (size_t i = 0; i < kNumBlobFiles; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1); + ASSERT_EQ(live_imm_files[i]->GetFileSize(), + kBlobFileSize + BlobLogFooter::kSize); + } + } + + mock_clock_->SetCurrentTime(kCompactTime); + + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + // We expect the data to remain the same and the blobs from the oldest N files + // to be moved to new files. Sequence numbers get zeroed out during the + // compaction. + VerifyDB(data); + + for (auto &pair : blob_value_versions) { + KeyVersion &version = pair.second; + version.sequence = 0; + } + + VerifyBaseDB(blob_value_versions); + + const uint64_t cutoff = static_cast( + bdb_options.garbage_collection_cutoff * kNumBlobFiles); + for (auto &pair : blob_index_versions) { + BlobIndexVersion &version = pair.second; + + version.sequence = 0; + + if (version.file_number == kInvalidBlobFileNumber) { + continue; + } + + if (version.file_number > cutoff) { + continue; + } + + version.file_number += kNumBlobFiles + 1; + } + + VerifyBaseDBBlobIndex(blob_index_versions); + + const Statistics *const statistics = options.statistics.get(); + assert(statistics); + + ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff); + ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff); + ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0); + ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), + cutoff * kBlobsPerFile); + ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), + cutoff * kBlobsPerFile * kLargeValueSize); + + // At this point, we should have 128 immutable non-TTL files with file numbers + // 33..128 and 130..161. (129 was taken by the TTL blob file.) 
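+  // With garbage_collection_cutoff == 0.25 and kNumBlobFiles == 128, the
+  // cutoff is 32 files, so GC relocated the blobs from files 1..32 into 32
+  // newly written files (numbered 130..161, since 129 is the TTL file),
+  // i.e. 32 * 8 == 256 keys and 256 * 256 == 65536 bytes, matching the
+  // ticker values checked above.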
+  {
+    auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+    ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
+    for (size_t i = 0; i < kNumBlobFiles; ++i) {
+      uint64_t expected_file_number = i + cutoff + 1;
+      if (expected_file_number > kNumBlobFiles) {
+        ++expected_file_number;
+      }
+
+      ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
+      ASSERT_EQ(live_imm_files[i]->GetFileSize(),
+                kBlobFileSize + BlobLogFooter::kSize);
+    }
+  }
+}
+
+TEST_F(BlobDBTest, GarbageCollectionFailure) {
+  BlobDBOptions bdb_options;
+  bdb_options.min_blob_size = 0;
+  bdb_options.enable_garbage_collection = true;
+  bdb_options.garbage_collection_cutoff = 1.0;
+  bdb_options.disable_background_tasks = true;
+
+  Options db_options;
+  db_options.statistics = CreateDBStatistics();
+
+  Open(bdb_options, db_options);
+
+  // Write a couple of valid blobs.
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Put("dead", "beef"));
+
+  // Write a fake blob reference into the base DB that points to a non-existing
+  // blob file.
+  std::string blob_index;
+  BlobIndex::EncodeBlob(&blob_index, /* file_number */ 1000, /* offset */ 1234,
+                        /* size */ 5678, kNoCompression);
+
+  WriteBatch batch;
+  ASSERT_OK(WriteBatchInternal::PutBlobIndex(
+      &batch, blob_db_->DefaultColumnFamily()->GetID(), "key", blob_index));
+  ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
+
+  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+  ASSERT_EQ(blob_files.size(), 1);
+  auto blob_file = blob_files[0];
+  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
+
+  ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+                  .IsIOError());
+
+  const Statistics *const statistics = db_options.statistics.get();
+  assert(statistics);
+
+  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
+  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
+  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
+  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
+  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
+}
+
+// File should be evicted after expiration.
+TEST_F(BlobDBTest, EvictExpiredFile) {
+  BlobDBOptions bdb_options;
+  bdb_options.ttl_range_secs = 100;
+  bdb_options.min_blob_size = 0;
+  bdb_options.disable_background_tasks = true;
+  Options options;
+  options.env = mock_env_.get();
+  Open(bdb_options, options);
+  mock_clock_->SetCurrentTime(50);
+  std::map<std::string, std::string> data;
+  ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
+  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+  ASSERT_EQ(1, blob_files.size());
+  auto blob_file = blob_files[0];
+  ASSERT_FALSE(blob_file->Immutable());
+  ASSERT_FALSE(blob_file->Obsolete());
+  VerifyDB(data);
+  mock_clock_->SetCurrentTime(250);
+  // The key should have expired by now.
+  blob_db_impl()->TEST_EvictExpiredFiles();
+  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+  ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
+  ASSERT_TRUE(blob_file->Immutable());
+  ASSERT_TRUE(blob_file->Obsolete());
+  blob_db_impl()->TEST_DeleteObsoleteFiles();
+  ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
+  ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
+  // Make sure we don't return a garbage value after the blob file has been
+  // evicted while the blob index still exists in the LSM tree.
+ std::string val = ""; + ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &val).IsNotFound()); + ASSERT_EQ("", val); +} + +TEST_F(BlobDBTest, DisableFileDeletions) { + BlobDBOptions bdb_options; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map data; + for (bool force : {true, false}) { + ASSERT_OK(Put("foo", "v", &data)); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + auto blob_file = blob_files[0]; + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file)); + blob_db_impl()->TEST_ObsoleteBlobFile(blob_file); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size()); + // Call DisableFileDeletions twice. + ASSERT_OK(blob_db_->DisableFileDeletions()); + ASSERT_OK(blob_db_->DisableFileDeletions()); + // File deletions should be disabled. + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size()); + VerifyDB(data); + // Enable file deletions once. If force=true, file deletion is enabled. + // Otherwise it needs to enable it for a second time. + ASSERT_OK(blob_db_->EnableFileDeletions(force)); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + if (!force) { + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size()); + VerifyDB(data); + // Call EnableFileDeletions a second time. + ASSERT_OK(blob_db_->EnableFileDeletions(false)); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + } + // Regardless of value of `force`, file should be deleted by now. + ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size()); + VerifyDB({}); + } +} + +TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) { + BlobDBOptions bdb_options; + bdb_options.enable_garbage_collection = true; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + + // Register some dummy blob files. + blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200); + blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300); + blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400); + blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500); + blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600); + + // Initialize the blob <-> SST file mapping. First, add some SST files with + // blob file references, then some without. + std::vector live_files; + + for (uint64_t i = 1; i <= 10; ++i) { + LiveFileMetaData live_file; + live_file.file_number = i; + live_file.oldest_blob_file_number = ((i - 1) % 5) + 1; + + live_files.emplace_back(live_file); + } + + for (uint64_t i = 11; i <= 20; ++i) { + LiveFileMetaData live_file; + live_file.file_number = i; + + live_files.emplace_back(live_file); + } + + blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files); + + // Check that the blob <-> SST mappings have been correctly initialized. 
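+  // Per the loop above, SST i (1..10) references blob file ((i - 1) % 5) + 1,
+  // so blob file 1 should be linked to SSTs {1, 6}, blob file 2 to {2, 7},
+  // ..., blob file 5 to {5, 10}; SSTs 11..20 reference no blob file.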
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + + ASSERT_EQ(blob_files.size(), 5); + + { + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 5); + for (size_t i = 0; i < 5; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1); + } + + ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty()); + } + + { + const std::vector> expected_sst_files{ + {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}}; + const std::vector expected_obsolete{false, false, false, false, + false}; + for (size_t i = 0; i < 5; ++i) { + const auto &blob_file = blob_files[i]; + ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); + } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 5); + for (size_t i = 0; i < 5; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1); + } + + ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty()); + } + + // Simulate a flush where the SST does not reference any blob files. + { + FlushJobInfo info{}; + info.file_number = 21; + info.smallest_seqno = 1; + info.largest_seqno = 100; + + blob_db_impl()->TEST_ProcessFlushJobInfo(info); + + const std::vector> expected_sst_files{ + {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}}; + const std::vector expected_obsolete{false, false, false, false, + false}; + for (size_t i = 0; i < 5; ++i) { + const auto &blob_file = blob_files[i]; + ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); + } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 5); + for (size_t i = 0; i < 5; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1); + } + + ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty()); + } + + // Simulate a flush where the SST references a blob file. + { + FlushJobInfo info{}; + info.file_number = 22; + info.oldest_blob_file_number = 5; + info.smallest_seqno = 101; + info.largest_seqno = 200; + + blob_db_impl()->TEST_ProcessFlushJobInfo(info); + + const std::vector> expected_sst_files{ + {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}}; + const std::vector expected_obsolete{false, false, false, false, + false}; + for (size_t i = 0; i < 5; ++i) { + const auto &blob_file = blob_files[i]; + ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); + } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 5); + for (size_t i = 0; i < 5; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1); + } + + ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty()); + } + + // Simulate a compaction. Some inputs and outputs have blob file references, + // some don't. There is also a trivial move (which means the SST appears on + // both the input and the output list). Blob file 1 loses all its linked SSTs, + // and since it got marked immutable at sequence number 200 which has already + // been flushed, it can be marked obsolete. 
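+  // The braced initializers below are {level, file_number,
+  // oldest_blob_file_number}; note that SST 22 appears both as an input and
+  // as an output, which is how the trivial move shows up here.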
+ { + CompactionJobInfo info{}; + info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1}); + info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2}); + info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1}); + info.input_file_infos.emplace_back( + CompactionFileInfo{1, 11, kInvalidBlobFileNumber}); + info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5}); + info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5}); + info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3}); + info.output_file_infos.emplace_back( + CompactionFileInfo{2, 24, kInvalidBlobFileNumber}); + + blob_db_impl()->TEST_ProcessCompactionJobInfo(info); + + const std::vector> expected_sst_files{ + {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}}; + const std::vector expected_obsolete{true, false, false, false, false}; + for (size_t i = 0; i < 5; ++i) { + const auto &blob_file = blob_files[i]; + ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); + } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 4); + for (size_t i = 0; i < 4; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2); + } + + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); + ASSERT_EQ(obsolete_files.size(), 1); + ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1); + } + + // Simulate a failed compaction. No mappings should be updated. + { + CompactionJobInfo info{}; + info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2}); + info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5}); + info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3}); + info.status = Status::Corruption(); + + blob_db_impl()->TEST_ProcessCompactionJobInfo(info); + + const std::vector> expected_sst_files{ + {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}}; + const std::vector expected_obsolete{true, false, false, false, false}; + for (size_t i = 0; i < 5; ++i) { + const auto &blob_file = blob_files[i]; + ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); + } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 4); + for (size_t i = 0; i < 4; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2); + } + + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); + ASSERT_EQ(obsolete_files.size(), 1); + ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1); + } + + // Simulate another compaction. Blob file 2 loses all its linked SSTs + // but since it got marked immutable at sequence number 300 which hasn't + // been flushed yet, it cannot be marked obsolete at this point. 
+ { + CompactionJobInfo info{}; + info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2}); + info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5}); + info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3}); + + blob_db_impl()->TEST_ProcessCompactionJobInfo(info); + + const std::vector> expected_sst_files{ + {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}}; + const std::vector expected_obsolete{true, false, false, false, false}; + for (size_t i = 0; i < 5; ++i) { + const auto &blob_file = blob_files[i]; + ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); + } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 4); + for (size_t i = 0; i < 4; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2); + } + + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); + ASSERT_EQ(obsolete_files.size(), 1); + ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1); + } + + // Simulate a flush with largest sequence number 300. This will make it + // possible to mark blob file 2 obsolete. + { + FlushJobInfo info{}; + info.file_number = 26; + info.smallest_seqno = 201; + info.largest_seqno = 300; + + blob_db_impl()->TEST_ProcessFlushJobInfo(info); + + const std::vector> expected_sst_files{ + {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}}; + const std::vector expected_obsolete{true, true, false, false, false}; + for (size_t i = 0; i < 5; ++i) { + const auto &blob_file = blob_files[i]; + ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]); + ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]); + } + + auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles(); + ASSERT_EQ(live_imm_files.size(), 3); + for (size_t i = 0; i < 3; ++i) { + ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 3); + } + + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); + ASSERT_EQ(obsolete_files.size(), 2); + ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1); + ASSERT_EQ(obsolete_files[1]->BlobFileNumber(), 2); + } +} + +TEST_F(BlobDBTest, ShutdownWait) { + BlobDBOptions bdb_options; + bdb_options.ttl_range_secs = 100; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = false; + Options options; + options.env = mock_env_.get(); + + SyncPoint::GetInstance()->LoadDependency({ + {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"}, + {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"}, + {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"}, + {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"}, + }); + // Force all tasks to be scheduled immediately. + SyncPoint::GetInstance()->SetCallBack( + "TimeQueue::Add:item.end", [&](void *arg) { + std::chrono::steady_clock::time_point *tp = + static_cast(arg); + *tp = + std::chrono::steady_clock::now() - std::chrono::milliseconds(10000); + }); + + SyncPoint::GetInstance()->SetCallBack( + "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) { + // Sleep 3 ms to increase the chance of data race. + // We've synced up the code so that EvictExpiredFiles() + // is called concurrently with ~BlobDBImpl(). + // ~BlobDBImpl() is supposed to wait for all background + // task to shutdown before doing anything else. In order + // to use the same test to reproduce a bug of the waiting + // logic, we wait a little bit here, so that TSAN can + // catch the data race. + // We should improve the test if we find a better way. 
+ Env::Default()->SleepForMicroseconds(3000); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + Open(bdb_options, options); + mock_clock_->SetCurrentTime(50); + std::map data; + ASSERT_OK(PutWithTTL("foo", "bar", 100, &data)); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + auto blob_file = blob_files[0]; + ASSERT_FALSE(blob_file->Immutable()); + ASSERT_FALSE(blob_file->Obsolete()); + VerifyDB(data); + + TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0"); + mock_clock_->SetCurrentTime(250); + // The key should expired now. + TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1"); + + TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2"); + TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3"); + Close(); + + SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(BlobDBTest, SyncBlobFileBeforeClose) { + Options options; + options.statistics = CreateDBStatistics(); + + BlobDBOptions blob_options; + blob_options.min_blob_size = 0; + blob_options.bytes_per_sync = 1 << 20; + blob_options.disable_background_tasks = true; + + Open(blob_options, options); + + ASSERT_OK(Put("foo", "bar")); + + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(blob_files.size(), 1); + + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); + ASSERT_EQ(options.statistics->getTickerCount(BLOB_DB_BLOB_FILE_SYNCED), 1); +} + +TEST_F(BlobDBTest, SyncBlobFileBeforeCloseIOError) { + Options options; + options.env = fault_injection_env_.get(); + + BlobDBOptions blob_options; + blob_options.min_blob_size = 0; + blob_options.bytes_per_sync = 1 << 20; + blob_options.disable_background_tasks = true; + + Open(blob_options, options); + + ASSERT_OK(Put("foo", "bar")); + + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(blob_files.size(), 1); + + SyncPoint::GetInstance()->SetCallBack( + "BlobLogWriter::Sync", [this](void * /* arg */) { + fault_injection_env_->SetFilesystemActive(false, Status::IOError()); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + const Status s = blob_db_impl()->TEST_CloseBlobFile(blob_files[0]); + + fault_injection_env_->SetFilesystemActive(true); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_TRUE(s.IsIOError()); +} + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE + +// A black-box test for the ttl wrapper around rocksdb +int main(int argc, char **argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_dump_tool.cc b/src/rocksdb/utilities/blob_db/blob_dump_tool.cc new file mode 100644 index 000000000..1e0632990 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_dump_tool.cc @@ -0,0 +1,282 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_dump_tool.h" + +#include + +#include +#include +#include +#include + +#include "file/random_access_file_reader.h" +#include "file/readahead_raf.h" +#include "port/port.h" +#include "rocksdb/convenience.h" +#include "rocksdb/file_system.h" +#include "table/format.h" +#include "util/coding.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { +namespace blob_db { + +BlobDumpTool::BlobDumpTool() + : reader_(nullptr), buffer_(nullptr), buffer_size_(0) {} + +Status BlobDumpTool::Run(const std::string& filename, DisplayType show_key, + DisplayType show_blob, + DisplayType show_uncompressed_blob, + bool show_summary) { + constexpr size_t kReadaheadSize = 2 * 1024 * 1024; + Status s; + const auto fs = FileSystem::Default(); + IOOptions io_opts; + s = fs->FileExists(filename, io_opts, nullptr); + if (!s.ok()) { + return s; + } + uint64_t file_size = 0; + s = fs->GetFileSize(filename, io_opts, &file_size, nullptr); + if (!s.ok()) { + return s; + } + std::unique_ptr file; + s = fs->NewRandomAccessFile(filename, FileOptions(), &file, nullptr); + if (!s.ok()) { + return s; + } + file = NewReadaheadRandomAccessFile(std::move(file), kReadaheadSize); + if (file_size == 0) { + return Status::Corruption("File is empty."); + } + reader_.reset(new RandomAccessFileReader(std::move(file), filename)); + uint64_t offset = 0; + uint64_t footer_offset = 0; + CompressionType compression = kNoCompression; + s = DumpBlobLogHeader(&offset, &compression); + if (!s.ok()) { + return s; + } + s = DumpBlobLogFooter(file_size, &footer_offset); + if (!s.ok()) { + return s; + } + uint64_t total_records = 0; + uint64_t total_key_size = 0; + uint64_t total_blob_size = 0; + uint64_t total_uncompressed_blob_size = 0; + if (show_key != DisplayType::kNone || show_summary) { + while (offset < footer_offset) { + s = DumpRecord(show_key, show_blob, show_uncompressed_blob, show_summary, + compression, &offset, &total_records, &total_key_size, + &total_blob_size, &total_uncompressed_blob_size); + if (!s.ok()) { + break; + } + } + } + if (show_summary) { + fprintf(stdout, "Summary:\n"); + fprintf(stdout, " total records: %" PRIu64 "\n", total_records); + fprintf(stdout, " total key size: %" PRIu64 "\n", total_key_size); + fprintf(stdout, " total blob size: %" PRIu64 "\n", total_blob_size); + if (compression != kNoCompression) { + fprintf(stdout, " total raw blob size: %" PRIu64 "\n", + total_uncompressed_blob_size); + } + } + return s; +} + +Status BlobDumpTool::Read(uint64_t offset, size_t size, Slice* result) { + if (buffer_size_ < size) { + if (buffer_size_ == 0) { + buffer_size_ = 4096; + } + while (buffer_size_ < size) { + buffer_size_ *= 2; + } + buffer_.reset(new char[buffer_size_]); + } + Status s = reader_->Read(IOOptions(), offset, size, result, buffer_.get(), + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); + if (!s.ok()) { + return s; + } + if (result->size() != size) { + return Status::Corruption("Reach the end of the file unexpectedly."); + } + return s; +} + +Status BlobDumpTool::DumpBlobLogHeader(uint64_t* offset, + CompressionType* compression) { + Slice slice; + Status s = Read(0, BlobLogHeader::kSize, &slice); + if (!s.ok()) { + return s; + } + BlobLogHeader header; + s = header.DecodeFrom(slice); + if (!s.ok()) { + return s; + } + fprintf(stdout, "Blob log header:\n"); + fprintf(stdout, " Version : %" PRIu32 "\n", header.version); + fprintf(stdout, " Column Family ID : %" PRIu32 "\n", + header.column_family_id); + std::string compression_str; 
+ if (!GetStringFromCompressionType(&compression_str, header.compression) + .ok()) { + compression_str = "Unrecongnized compression type (" + + std::to_string((int)header.compression) + ")"; + } + fprintf(stdout, " Compression : %s\n", compression_str.c_str()); + fprintf(stdout, " Expiration range : %s\n", + GetString(header.expiration_range).c_str()); + *offset = BlobLogHeader::kSize; + *compression = header.compression; + return s; +} + +Status BlobDumpTool::DumpBlobLogFooter(uint64_t file_size, + uint64_t* footer_offset) { + auto no_footer = [&]() { + *footer_offset = file_size; + fprintf(stdout, "No blob log footer.\n"); + return Status::OK(); + }; + if (file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) { + return no_footer(); + } + Slice slice; + *footer_offset = file_size - BlobLogFooter::kSize; + Status s = Read(*footer_offset, BlobLogFooter::kSize, &slice); + if (!s.ok()) { + return s; + } + BlobLogFooter footer; + s = footer.DecodeFrom(slice); + if (!s.ok()) { + return no_footer(); + } + fprintf(stdout, "Blob log footer:\n"); + fprintf(stdout, " Blob count : %" PRIu64 "\n", footer.blob_count); + fprintf(stdout, " Expiration Range : %s\n", + GetString(footer.expiration_range).c_str()); + return s; +} + +Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob, + DisplayType show_uncompressed_blob, + bool show_summary, CompressionType compression, + uint64_t* offset, uint64_t* total_records, + uint64_t* total_key_size, + uint64_t* total_blob_size, + uint64_t* total_uncompressed_blob_size) { + if (show_key != DisplayType::kNone) { + fprintf(stdout, "Read record with offset 0x%" PRIx64 " (%" PRIu64 "):\n", + *offset, *offset); + } + Slice slice; + Status s = Read(*offset, BlobLogRecord::kHeaderSize, &slice); + if (!s.ok()) { + return s; + } + BlobLogRecord record; + s = record.DecodeHeaderFrom(slice); + if (!s.ok()) { + return s; + } + uint64_t key_size = record.key_size; + uint64_t value_size = record.value_size; + if (show_key != DisplayType::kNone) { + fprintf(stdout, " key size : %" PRIu64 "\n", key_size); + fprintf(stdout, " value size : %" PRIu64 "\n", value_size); + fprintf(stdout, " expiration : %" PRIu64 "\n", record.expiration); + } + *offset += BlobLogRecord::kHeaderSize; + s = Read(*offset, static_cast(key_size + value_size), &slice); + if (!s.ok()) { + return s; + } + // Decompress value + std::string uncompressed_value; + if (compression != kNoCompression && + (show_uncompressed_blob != DisplayType::kNone || show_summary)) { + BlockContents contents; + UncompressionContext context(compression); + UncompressionInfo info(context, UncompressionDict::GetEmptyDict(), + compression); + s = UncompressBlockData( + info, slice.data() + key_size, static_cast(value_size), + &contents, 2 /*compress_format_version*/, ImmutableOptions(Options())); + if (!s.ok()) { + return s; + } + uncompressed_value = contents.data.ToString(); + } + if (show_key != DisplayType::kNone) { + fprintf(stdout, " key : "); + DumpSlice(Slice(slice.data(), static_cast(key_size)), show_key); + if (show_blob != DisplayType::kNone) { + fprintf(stdout, " blob : "); + DumpSlice(Slice(slice.data() + static_cast(key_size), + static_cast(value_size)), + show_blob); + } + if (show_uncompressed_blob != DisplayType::kNone) { + fprintf(stdout, " raw blob : "); + DumpSlice(Slice(uncompressed_value), show_uncompressed_blob); + } + } + *offset += key_size + value_size; + *total_records += 1; + *total_key_size += key_size; + *total_blob_size += value_size; + *total_uncompressed_blob_size += 
uncompressed_value.size(); + return s; +} + +void BlobDumpTool::DumpSlice(const Slice s, DisplayType type) { + if (type == DisplayType::kRaw) { + fprintf(stdout, "%s\n", s.ToString().c_str()); + } else if (type == DisplayType::kHex) { + fprintf(stdout, "%s\n", s.ToString(true /*hex*/).c_str()); + } else if (type == DisplayType::kDetail) { + char buf[100]; + for (size_t i = 0; i < s.size(); i += 16) { + memset(buf, 0, sizeof(buf)); + for (size_t j = 0; j < 16 && i + j < s.size(); j++) { + unsigned char c = s[i + j]; + snprintf(buf + j * 3 + 15, 2, "%x", c >> 4); + snprintf(buf + j * 3 + 16, 2, "%x", c & 0xf); + snprintf(buf + j + 65, 2, "%c", (0x20 <= c && c <= 0x7e) ? c : '.'); + } + for (size_t p = 0; p + 1 < sizeof(buf); p++) { + if (buf[p] == 0) { + buf[p] = ' '; + } + } + fprintf(stdout, "%s\n", i == 0 ? buf + 15 : buf); + } + } +} + +template +std::string BlobDumpTool::GetString(std::pair p) { + if (p.first == 0 && p.second == 0) { + return "nil"; + } + return "(" + std::to_string(p.first) + ", " + std::to_string(p.second) + ")"; +} + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_dump_tool.h b/src/rocksdb/utilities/blob_db/blob_dump_tool.h new file mode 100644 index 000000000..bece564e1 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_dump_tool.h @@ -0,0 +1,58 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once +#ifndef ROCKSDB_LITE + +#include +#include +#include + +#include "db/blob/blob_log_format.h" +#include "file/random_access_file_reader.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" + +namespace ROCKSDB_NAMESPACE { +namespace blob_db { + +class BlobDumpTool { + public: + enum class DisplayType { + kNone, + kRaw, + kHex, + kDetail, + }; + + BlobDumpTool(); + + Status Run(const std::string& filename, DisplayType show_key, + DisplayType show_blob, DisplayType show_uncompressed_blob, + bool show_summary); + + private: + std::unique_ptr reader_; + std::unique_ptr buffer_; + size_t buffer_size_; + + Status Read(uint64_t offset, size_t size, Slice* result); + Status DumpBlobLogHeader(uint64_t* offset, CompressionType* compression); + Status DumpBlobLogFooter(uint64_t file_size, uint64_t* footer_offset); + Status DumpRecord(DisplayType show_key, DisplayType show_blob, + DisplayType show_uncompressed_blob, bool show_summary, + CompressionType compression, uint64_t* offset, + uint64_t* total_records, uint64_t* total_key_size, + uint64_t* total_blob_size, + uint64_t* total_uncompressed_blob_size); + void DumpSlice(const Slice s, DisplayType type); + + template + std::string GetString(std::pair p); +}; + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_file.cc b/src/rocksdb/utilities/blob_db/blob_file.cc new file mode 100644 index 000000000..c68e557c6 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_file.cc @@ -0,0 +1,318 @@ + +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+#ifndef ROCKSDB_LITE +#include "utilities/blob_db/blob_file.h" + +#include + +#include +#include +#include + +#include "db/column_family.h" +#include "db/db_impl/db_impl.h" +#include "db/dbformat.h" +#include "file/filename.h" +#include "file/readahead_raf.h" +#include "logging/logging.h" +#include "utilities/blob_db/blob_db_impl.h" + +namespace ROCKSDB_NAMESPACE { + +namespace blob_db { + +BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, + Logger* info_log) + : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {} + +BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, + Logger* info_log, uint32_t column_family_id, + CompressionType compression, bool has_ttl, + const ExpirationRange& expiration_range) + : parent_(p), + path_to_dir_(bdir), + file_number_(fn), + info_log_(info_log), + column_family_id_(column_family_id), + compression_(compression), + has_ttl_(has_ttl), + expiration_range_(expiration_range), + header_(column_family_id, compression, has_ttl, expiration_range), + header_valid_(true) {} + +BlobFile::~BlobFile() { + if (obsolete_) { + std::string pn(PathName()); + Status s = Env::Default()->DeleteFile(PathName()); + if (!s.ok()) { + // ROCKS_LOG_INFO(db_options_.info_log, + // "File could not be deleted %s", pn.c_str()); + } + } +} + +uint32_t BlobFile::GetColumnFamilyId() const { return column_family_id_; } + +std::string BlobFile::PathName() const { + return BlobFileName(path_to_dir_, file_number_); +} + +std::string BlobFile::DumpState() const { + char str[1000]; + snprintf( + str, sizeof(str), + "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64 + " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64 + "), writer: %d reader: %d", + path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(), + closed_.load(), obsolete_.load(), expiration_range_.first, + expiration_range_.second, (!!log_writer_), (!!ra_file_reader_)); + return str; +} + +void BlobFile::MarkObsolete(SequenceNumber sequence) { + assert(Immutable()); + obsolete_sequence_ = sequence; + obsolete_.store(true); +} + +Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) { + BlobLogFooter footer; + footer.blob_count = blob_count_; + if (HasTTL()) { + footer.expiration_range = expiration_range_; + } + + // this will close the file and reset the Writable File Pointer. + Status s = log_writer_->AppendFooter(footer, /* checksum_method */ nullptr, + /* checksum_value */ nullptr); + if (s.ok()) { + closed_ = true; + immutable_sequence_ = sequence; + file_size_ += BlobLogFooter::kSize; + } + // delete the sequential writer + log_writer_.reset(); + return s; +} + +Status BlobFile::ReadFooter(BlobLogFooter* bf) { + if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) { + return Status::IOError("File does not have footer", PathName()); + } + + uint64_t footer_offset = file_size_ - BlobLogFooter::kSize; + // assume that ra_file_reader_ is valid before we enter this + assert(ra_file_reader_); + + Slice result; + std::string buf; + AlignedBuf aligned_buf; + Status s; + // TODO: rate limit reading footers from blob files. 
+ if (ra_file_reader_->use_direct_io()) { + s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, + &result, nullptr, &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */); + } else { + buf.reserve(BlobLogFooter::kSize + 10); + s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, + &result, &buf[0], nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); + } + if (!s.ok()) return s; + if (result.size() != BlobLogFooter::kSize) { + // should not happen + return Status::IOError("EOF reached before footer"); + } + + s = bf->DecodeFrom(result); + return s; +} + +Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) { + blob_count_ = footer.blob_count; + expiration_range_ = footer.expiration_range; + closed_ = true; + return Status::OK(); +} + +Status BlobFile::Fsync() { + Status s; + if (log_writer_.get()) { + s = log_writer_->Sync(); + } + return s; +} + +void BlobFile::CloseRandomAccessLocked() { + ra_file_reader_.reset(); + last_access_ = -1; +} + +Status BlobFile::GetReader(Env* env, const FileOptions& file_options, + std::shared_ptr* reader, + bool* fresh_open) { + assert(reader != nullptr); + assert(fresh_open != nullptr); + *fresh_open = false; + int64_t current_time = 0; + if (env->GetCurrentTime(¤t_time).ok()) { + last_access_.store(current_time); + } + Status s; + + { + ReadLock lockbfile_r(&mutex_); + if (ra_file_reader_) { + *reader = ra_file_reader_; + return s; + } + } + + WriteLock lockbfile_w(&mutex_); + // Double check. + if (ra_file_reader_) { + *reader = ra_file_reader_; + return s; + } + + std::unique_ptr rfile; + s = env->GetFileSystem()->NewRandomAccessFile(PathName(), file_options, + &rfile, nullptr); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to open blob file for random-read: %s status: '%s'" + " exists: '%s'", + PathName().c_str(), s.ToString().c_str(), + env->FileExists(PathName()).ToString().c_str()); + return s; + } + + ra_file_reader_ = + std::make_shared(std::move(rfile), PathName()); + *reader = ra_file_reader_; + *fresh_open = true; + return s; +} + +Status BlobFile::ReadMetadata(const std::shared_ptr& fs, + const FileOptions& file_options) { + assert(Immutable()); + // Get file size. + uint64_t file_size = 0; + Status s = + fs->GetFileSize(PathName(), file_options.io_options, &file_size, nullptr); + if (s.ok()) { + file_size_ = file_size; + } else { + ROCKS_LOG_ERROR(info_log_, + "Failed to get size of blob file %" PRIu64 ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + if (file_size < BlobLogHeader::kSize) { + ROCKS_LOG_ERROR( + info_log_, "Incomplete blob file blob file %" PRIu64 ", size: %" PRIu64, + file_number_, file_size); + return Status::Corruption("Incomplete blob file header."); + } + + // Create file reader. + std::unique_ptr file_reader; + s = RandomAccessFileReader::Create(fs, PathName(), file_options, &file_reader, + nullptr); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to open blob file %" PRIu64 ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + + // Read file header. + std::string header_buf; + AlignedBuf aligned_buf; + Slice header_slice; + // TODO: rate limit reading headers from blob files. 
+ if (file_reader->use_direct_io()) { + s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, + nullptr, &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */); + } else { + header_buf.reserve(BlobLogHeader::kSize); + s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, + &header_buf[0], nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); + } + if (!s.ok()) { + ROCKS_LOG_ERROR( + info_log_, "Failed to read header of blob file %" PRIu64 ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + BlobLogHeader header; + s = header.DecodeFrom(header_slice); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to decode header of blob file %" PRIu64 + ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + column_family_id_ = header.column_family_id; + compression_ = header.compression; + has_ttl_ = header.has_ttl; + if (has_ttl_) { + expiration_range_ = header.expiration_range; + } + header_valid_ = true; + + // Read file footer. + if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) { + // OK not to have footer. + assert(!footer_valid_); + return Status::OK(); + } + std::string footer_buf; + Slice footer_slice; + // TODO: rate limit reading footers from blob files. + if (file_reader->use_direct_io()) { + s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, + BlobLogFooter::kSize, &footer_slice, nullptr, + &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */); + } else { + footer_buf.reserve(BlobLogFooter::kSize); + s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, + BlobLogFooter::kSize, &footer_slice, &footer_buf[0], + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); + } + if (!s.ok()) { + ROCKS_LOG_ERROR( + info_log_, "Failed to read footer of blob file %" PRIu64 ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + BlobLogFooter footer; + s = footer.DecodeFrom(footer_slice); + if (!s.ok()) { + // OK not to have footer. + assert(!footer_valid_); + return Status::OK(); + } + blob_count_ = footer.blob_count; + if (has_ttl_) { + assert(header.expiration_range.first <= footer.expiration_range.first); + assert(header.expiration_range.second >= footer.expiration_range.second); + expiration_range_ = footer.expiration_range; + } + footer_valid_ = true; + return Status::OK(); +} + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_file.h b/src/rocksdb/utilities/blob_db/blob_file.h new file mode 100644 index 000000000..6f3f2bea7 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_file.h @@ -0,0 +1,246 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+#pragma once +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include + +#include "db/blob/blob_log_format.h" +#include "db/blob/blob_log_writer.h" +#include "file/random_access_file_reader.h" +#include "port/port.h" +#include "rocksdb/env.h" +#include "rocksdb/file_system.h" +#include "rocksdb/options.h" + +namespace ROCKSDB_NAMESPACE { +namespace blob_db { + +class BlobDBImpl; + +class BlobFile { + friend class BlobDBImpl; + friend struct BlobFileComparator; + friend struct BlobFileComparatorTTL; + friend class BlobIndexCompactionFilterBase; + friend class BlobIndexCompactionFilterGC; + + private: + // access to parent + const BlobDBImpl* parent_{nullptr}; + + // path to blob directory + std::string path_to_dir_; + + // the id of the file. + // the above 2 are created during file creation and never changed + // after that + uint64_t file_number_{0}; + + // The file numbers of the SST files whose oldest blob file reference + // points to this blob file. + std::unordered_set linked_sst_files_; + + // Info log. + Logger* info_log_{nullptr}; + + // Column family id. + uint32_t column_family_id_{std::numeric_limits::max()}; + + // Compression type of blobs in the file + CompressionType compression_{kNoCompression}; + + // If true, the keys in this file all has TTL. Otherwise all keys don't + // have TTL. + bool has_ttl_{false}; + + // TTL range of blobs in the file. + ExpirationRange expiration_range_; + + // number of blobs in the file + std::atomic blob_count_{0}; + + // size of the file + std::atomic file_size_{0}; + + BlobLogHeader header_; + + // closed_ = true implies the file is no more mutable + // no more blobs will be appended and the footer has been written out + std::atomic closed_{false}; + + // The latest sequence number when the file was closed/made immutable. + SequenceNumber immutable_sequence_{0}; + + // Whether the file was marked obsolete (due to either TTL or GC). + // obsolete_ still needs to do iterator/snapshot checks + std::atomic obsolete_{false}; + + // The last sequence number by the time the file marked as obsolete. + // Data in this file is visible to a snapshot taken before the sequence. + SequenceNumber obsolete_sequence_{0}; + + // Sequential/Append writer for blobs + std::shared_ptr log_writer_; + + // random access file reader for GET calls + std::shared_ptr ra_file_reader_; + + // This Read-Write mutex is per file specific and protects + // all the datastructures + mutable port::RWMutex mutex_; + + // time when the random access reader was last created. + std::atomic last_access_{-1}; + + bool header_valid_{false}; + + bool footer_valid_{false}; + + public: + BlobFile() = default; + + BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum, + Logger* info_log); + + BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum, + Logger* info_log, uint32_t column_family_id, + CompressionType compression, bool has_ttl, + const ExpirationRange& expiration_range); + + ~BlobFile(); + + uint32_t GetColumnFamilyId() const; + + // Returns log file's absolute pathname. + std::string PathName() const; + + // Primary identifier for blob file. + // once the file is created, this never changes + uint64_t BlobFileNumber() const { return file_number_; } + + // Get the set of SST files whose oldest blob file reference points to + // this file. + const std::unordered_set& GetLinkedSstFiles() const { + return linked_sst_files_; + } + + // Link an SST file whose oldest blob file reference points to this file. 
+ void LinkSstFile(uint64_t sst_file_number) { + assert(linked_sst_files_.find(sst_file_number) == linked_sst_files_.end()); + linked_sst_files_.insert(sst_file_number); + } + + // Unlink an SST file whose oldest blob file reference points to this file. + void UnlinkSstFile(uint64_t sst_file_number) { + auto it = linked_sst_files_.find(sst_file_number); + assert(it != linked_sst_files_.end()); + linked_sst_files_.erase(it); + } + + // the following functions are atomic, and don't need + // read lock + uint64_t BlobCount() const { + return blob_count_.load(std::memory_order_acquire); + } + + std::string DumpState() const; + + // if the file is not taking any more appends. + bool Immutable() const { return closed_.load(); } + + // Mark the file as immutable. + // REQUIRES: write lock held, or access from single thread (on DB open). + void MarkImmutable(SequenceNumber sequence) { + closed_ = true; + immutable_sequence_ = sequence; + } + + SequenceNumber GetImmutableSequence() const { + assert(Immutable()); + return immutable_sequence_; + } + + // Whether the file was marked obsolete (due to either TTL or GC). + bool Obsolete() const { + assert(Immutable() || !obsolete_.load()); + return obsolete_.load(); + } + + // Mark file as obsolete (due to either TTL or GC). The file is not visible to + // snapshots with sequence greater or equal to the given sequence. + void MarkObsolete(SequenceNumber sequence); + + SequenceNumber GetObsoleteSequence() const { + assert(Obsolete()); + return obsolete_sequence_; + } + + Status Fsync(); + + uint64_t GetFileSize() const { + return file_size_.load(std::memory_order_acquire); + } + + // All Get functions which are not atomic, will need ReadLock on the mutex + + const ExpirationRange& GetExpirationRange() const { + return expiration_range_; + } + + void ExtendExpirationRange(uint64_t expiration) { + expiration_range_.first = std::min(expiration_range_.first, expiration); + expiration_range_.second = std::max(expiration_range_.second, expiration); + } + + bool HasTTL() const { return has_ttl_; } + + void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; } + + CompressionType GetCompressionType() const { return compression_; } + + std::shared_ptr GetWriter() const { return log_writer_; } + + // Read blob file header and footer. Return corruption if file header is + // malform or incomplete. If footer is malform or incomplete, set + // footer_valid_ to false and return Status::OK. + Status ReadMetadata(const std::shared_ptr& fs, + const FileOptions& file_options); + + Status GetReader(Env* env, const FileOptions& file_options, + std::shared_ptr* reader, + bool* fresh_open); + + private: + Status ReadFooter(BlobLogFooter* footer); + + Status WriteFooterAndCloseLocked(SequenceNumber sequence); + + void CloseRandomAccessLocked(); + + // this is used, when you are reading only the footer of a + // previously closed file + Status SetFromFooterLocked(const BlobLogFooter& footer); + + void set_expiration_range(const ExpirationRange& expiration_range) { + expiration_range_ = expiration_range; + } + + // The following functions are atomic, and don't need locks + void SetFileSize(uint64_t fs) { file_size_ = fs; } + + void SetBlobCount(uint64_t bc) { blob_count_ = bc; } + + void BlobRecordAdded(uint64_t record_size) { + ++blob_count_; + file_size_ += record_size; + } +}; +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE -- cgit v1.2.3