summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/blob_db
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/utilities/blob_db
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/blob_db')
-rw-r--r--src/rocksdb/utilities/blob_db/blob_compaction_filter.cc490
-rw-r--r--src/rocksdb/utilities/blob_db/blob_compaction_filter.h204
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db.cc114
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db.h266
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_gc_stats.h56
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_impl.cc2177
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_impl.h503
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc113
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_iterator.h150
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_listener.h71
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_test.cc2407
-rw-r--r--src/rocksdb/utilities/blob_db/blob_dump_tool.cc282
-rw-r--r--src/rocksdb/utilities/blob_db/blob_dump_tool.h58
-rw-r--r--src/rocksdb/utilities/blob_db/blob_file.cc318
-rw-r--r--src/rocksdb/utilities/blob_db/blob_file.h246
15 files changed, 7455 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc
new file mode 100644
index 000000000..86907e979
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc
@@ -0,0 +1,490 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_compaction_filter.h"
+
+#include <cinttypes>
+
+#include "db/dbformat.h"
+#include "logging/logging.h"
+#include "rocksdb/system_clock.h"
+#include "test_util/sync_point.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+BlobIndexCompactionFilterBase::~BlobIndexCompactionFilterBase() {
+ if (blob_file_) {
+ CloseAndRegisterNewBlobFile();
+ }
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
+}
+
+CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2(
+ int level, const Slice& key, ValueType value_type, const Slice& value,
+ std::string* new_value, std::string* skip_until) const {
+ const CompactionFilter* ucf = user_comp_filter();
+ if (value_type != kBlobIndex) {
+ if (ucf == nullptr) {
+ return Decision::kKeep;
+ }
+ // Apply user compaction filter for inlined data.
+ CompactionFilter::Decision decision =
+ ucf->FilterV2(level, key, value_type, value, new_value, skip_until);
+ if (decision == Decision::kChangeValue) {
+ return HandleValueChange(key, new_value);
+ }
+ return decision;
+ }
+ BlobIndex blob_index;
+ Status s = blob_index.DecodeFrom(value);
+ if (!s.ok()) {
+ // Unable to decode blob index. Keeping the value.
+ return Decision::kKeep;
+ }
+ if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
+ // Expired
+ expired_count_++;
+ expired_size_ += key.size() + value.size();
+ return Decision::kRemove;
+ }
+ if (!blob_index.IsInlined() &&
+ blob_index.file_number() < context_.next_file_number &&
+ context_.current_blob_files.count(blob_index.file_number()) == 0) {
+ // Corresponding blob file gone (most likely, evicted by FIFO eviction).
+ evicted_count_++;
+ evicted_size_ += key.size() + value.size();
+ return Decision::kRemove;
+ }
+ if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() &&
+ blob_index.expiration() < context_.evict_expiration_up_to) {
+ // Hack: Internal key is passed to BlobIndexCompactionFilter for it to
+ // get sequence number.
+ ParsedInternalKey ikey;
+ if (!ParseInternalKey(
+ key, &ikey,
+ context_.blob_db_impl->db_options_.allow_data_in_errors)
+ .ok()) {
+ assert(false);
+ return Decision::kKeep;
+ }
+ // Remove keys that could have been remove by last FIFO eviction.
+ // If get error while parsing key, ignore and continue.
+ if (ikey.sequence < context_.fifo_eviction_seq) {
+ evicted_count_++;
+ evicted_size_ += key.size() + value.size();
+ return Decision::kRemove;
+ }
+ }
+ // Apply user compaction filter for all non-TTL blob data.
+ if (ucf != nullptr && !blob_index.HasTTL()) {
+ // Hack: Internal key is passed to BlobIndexCompactionFilter for it to
+ // get sequence number.
+ ParsedInternalKey ikey;
+ if (!ParseInternalKey(
+ key, &ikey,
+ context_.blob_db_impl->db_options_.allow_data_in_errors)
+ .ok()) {
+ assert(false);
+ return Decision::kKeep;
+ }
+ // Read value from blob file.
+ PinnableSlice blob;
+ CompressionType compression_type = kNoCompression;
+ constexpr bool need_decompress = true;
+ if (!ReadBlobFromOldFile(ikey.user_key, blob_index, &blob, need_decompress,
+ &compression_type)) {
+ return Decision::kIOError;
+ }
+ CompactionFilter::Decision decision = ucf->FilterV2(
+ level, ikey.user_key, kValue, blob, new_value, skip_until);
+ if (decision == Decision::kChangeValue) {
+ return HandleValueChange(ikey.user_key, new_value);
+ }
+ return decision;
+ }
+ return Decision::kKeep;
+}
+
+CompactionFilter::Decision BlobIndexCompactionFilterBase::HandleValueChange(
+ const Slice& key, std::string* new_value) const {
+ BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ if (new_value->size() < blob_db_impl->bdb_options_.min_blob_size) {
+ // Keep new_value inlined.
+ return Decision::kChangeValue;
+ }
+ if (!OpenNewBlobFileIfNeeded()) {
+ return Decision::kIOError;
+ }
+ Slice new_blob_value(*new_value);
+ std::string compression_output;
+ if (blob_db_impl->bdb_options_.compression != kNoCompression) {
+ new_blob_value =
+ blob_db_impl->GetCompressedSlice(new_blob_value, &compression_output);
+ }
+ uint64_t new_blob_file_number = 0;
+ uint64_t new_blob_offset = 0;
+ if (!WriteBlobToNewFile(key, new_blob_value, &new_blob_file_number,
+ &new_blob_offset)) {
+ return Decision::kIOError;
+ }
+ if (!CloseAndRegisterNewBlobFileIfNeeded()) {
+ return Decision::kIOError;
+ }
+ BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
+ new_blob_value.size(),
+ blob_db_impl->bdb_options_.compression);
+ return Decision::kChangeBlobIndex;
+}
+
+BlobIndexCompactionFilterGC::~BlobIndexCompactionFilterGC() {
+ assert(context().blob_db_impl);
+
+ ROCKS_LOG_INFO(context().blob_db_impl->db_options_.info_log,
+ "GC pass finished %s: encountered %" PRIu64 " blobs (%" PRIu64
+ " bytes), relocated %" PRIu64 " blobs (%" PRIu64
+ " bytes), created %" PRIu64 " new blob file(s)",
+ !gc_stats_.HasError() ? "successfully" : "with failure",
+ gc_stats_.AllBlobs(), gc_stats_.AllBytes(),
+ gc_stats_.RelocatedBlobs(), gc_stats_.RelocatedBytes(),
+ gc_stats_.NewFiles());
+
+ RecordTick(statistics(), BLOB_DB_GC_NUM_KEYS_RELOCATED,
+ gc_stats_.RelocatedBlobs());
+ RecordTick(statistics(), BLOB_DB_GC_BYTES_RELOCATED,
+ gc_stats_.RelocatedBytes());
+ RecordTick(statistics(), BLOB_DB_GC_NUM_NEW_FILES, gc_stats_.NewFiles());
+ RecordTick(statistics(), BLOB_DB_GC_FAILURES, gc_stats_.HasError());
+}
+
+bool BlobIndexCompactionFilterBase::IsBlobFileOpened() const {
+ if (blob_file_) {
+ assert(writer_);
+ return true;
+ }
+ return false;
+}
+
+bool BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded() const {
+ if (IsBlobFileOpened()) {
+ return true;
+ }
+
+ BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ const Status s = blob_db_impl->CreateBlobFileAndWriter(
+ /* has_ttl */ false, ExpirationRange(), "compaction/GC", &blob_file_,
+ &writer_);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ blob_db_impl->db_options_.info_log,
+ "Error opening new blob file during compaction/GC, status: %s",
+ s.ToString().c_str());
+ blob_file_.reset();
+ writer_.reset();
+ return false;
+ }
+
+ assert(blob_file_);
+ assert(writer_);
+
+ return true;
+}
+
+bool BlobIndexCompactionFilterBase::ReadBlobFromOldFile(
+ const Slice& key, const BlobIndex& blob_index, PinnableSlice* blob,
+ bool need_decompress, CompressionType* compression_type) const {
+ BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ Status s = blob_db_impl->GetRawBlobFromFile(
+ key, blob_index.file_number(), blob_index.offset(), blob_index.size(),
+ blob, compression_type);
+
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ blob_db_impl->db_options_.info_log,
+ "Error reading blob during compaction/GC, key: %s (%s), status: %s",
+ key.ToString(/* output_hex */ true).c_str(),
+ blob_index.DebugString(/* output_hex */ true).c_str(),
+ s.ToString().c_str());
+
+ return false;
+ }
+
+ if (need_decompress && *compression_type != kNoCompression) {
+ s = blob_db_impl->DecompressSlice(*blob, *compression_type, blob);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ blob_db_impl->db_options_.info_log,
+ "Uncompression error during blob read from file: %" PRIu64
+ " blob_offset: %" PRIu64 " blob_size: %" PRIu64
+ " key: %s status: '%s'",
+ blob_index.file_number(), blob_index.offset(), blob_index.size(),
+ key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
+
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool BlobIndexCompactionFilterBase::WriteBlobToNewFile(
+ const Slice& key, const Slice& blob, uint64_t* new_blob_file_number,
+ uint64_t* new_blob_offset) const {
+ TEST_SYNC_POINT("BlobIndexCompactionFilterBase::WriteBlobToNewFile");
+ assert(new_blob_file_number);
+ assert(new_blob_offset);
+
+ assert(blob_file_);
+ *new_blob_file_number = blob_file_->BlobFileNumber();
+
+ assert(writer_);
+ uint64_t new_key_offset = 0;
+ const Status s = writer_->AddRecord(key, blob, kNoExpiration, &new_key_offset,
+ new_blob_offset);
+
+ if (!s.ok()) {
+ const BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
+ "Error writing blob to new file %s during compaction/GC, "
+ "key: %s, status: %s",
+ blob_file_->PathName().c_str(),
+ key.ToString(/* output_hex */ true).c_str(),
+ s.ToString().c_str());
+ return false;
+ }
+
+ const uint64_t new_size =
+ BlobLogRecord::kHeaderSize + key.size() + blob.size();
+ blob_file_->BlobRecordAdded(new_size);
+
+ BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ blob_db_impl->total_blob_size_ += new_size;
+
+ return true;
+}
+
+bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFileIfNeeded()
+ const {
+ const BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ assert(blob_file_);
+ if (blob_file_->GetFileSize() < blob_db_impl->bdb_options_.blob_file_size) {
+ return true;
+ }
+
+ return CloseAndRegisterNewBlobFile();
+}
+
+bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFile() const {
+ BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+ assert(blob_file_);
+
+ Status s;
+
+ {
+ WriteLock wl(&blob_db_impl->mutex_);
+
+ s = blob_db_impl->CloseBlobFile(blob_file_);
+
+ // Note: we delay registering the new blob file until it's closed to
+ // prevent FIFO eviction from processing it during compaction/GC.
+ blob_db_impl->RegisterBlobFile(blob_file_);
+ }
+
+ assert(blob_file_->Immutable());
+
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ blob_db_impl->db_options_.info_log,
+ "Error closing new blob file %s during compaction/GC, status: %s",
+ blob_file_->PathName().c_str(), s.ToString().c_str());
+ }
+
+ blob_file_.reset();
+ return s.ok();
+}
+
+CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
+ const Slice& key, const Slice& existing_value,
+ std::string* new_value) const {
+ assert(new_value);
+
+ const BlobDBImpl* const blob_db_impl = context().blob_db_impl;
+ (void)blob_db_impl;
+
+ assert(blob_db_impl);
+ assert(blob_db_impl->bdb_options_.enable_garbage_collection);
+
+ BlobIndex blob_index;
+ const Status s = blob_index.DecodeFrom(existing_value);
+ if (!s.ok()) {
+ gc_stats_.SetError();
+ return BlobDecision::kCorruption;
+ }
+
+ if (blob_index.IsInlined()) {
+ gc_stats_.AddBlob(blob_index.value().size());
+
+ return BlobDecision::kKeep;
+ }
+
+ gc_stats_.AddBlob(blob_index.size());
+
+ if (blob_index.HasTTL()) {
+ return BlobDecision::kKeep;
+ }
+
+ if (blob_index.file_number() >= context_gc_.cutoff_file_number) {
+ return BlobDecision::kKeep;
+ }
+
+ // Note: each compaction generates its own blob files, which, depending on the
+ // workload, might result in many small blob files. The total number of files
+ // is bounded though (determined by the number of compactions and the blob
+ // file size option).
+ if (!OpenNewBlobFileIfNeeded()) {
+ gc_stats_.SetError();
+ return BlobDecision::kIOError;
+ }
+
+ PinnableSlice blob;
+ CompressionType compression_type = kNoCompression;
+ std::string compression_output;
+ if (!ReadBlobFromOldFile(key, blob_index, &blob, false, &compression_type)) {
+ gc_stats_.SetError();
+ return BlobDecision::kIOError;
+ }
+
+ // If the compression_type is changed, re-compress it with the new compression
+ // type.
+ if (compression_type != blob_db_impl->bdb_options_.compression) {
+ if (compression_type != kNoCompression) {
+ const Status status =
+ blob_db_impl->DecompressSlice(blob, compression_type, &blob);
+ if (!status.ok()) {
+ gc_stats_.SetError();
+ return BlobDecision::kCorruption;
+ }
+ }
+ if (blob_db_impl->bdb_options_.compression != kNoCompression) {
+ blob_db_impl->GetCompressedSlice(blob, &compression_output);
+ blob = PinnableSlice(&compression_output);
+ blob.PinSelf();
+ }
+ }
+
+ uint64_t new_blob_file_number = 0;
+ uint64_t new_blob_offset = 0;
+ if (!WriteBlobToNewFile(key, blob, &new_blob_file_number, &new_blob_offset)) {
+ gc_stats_.SetError();
+ return BlobDecision::kIOError;
+ }
+
+ if (!CloseAndRegisterNewBlobFileIfNeeded()) {
+ gc_stats_.SetError();
+ return BlobDecision::kIOError;
+ }
+
+ BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
+ blob.size(), compression_type);
+
+ gc_stats_.AddRelocatedBlob(blob_index.size());
+
+ return BlobDecision::kChangeValue;
+}
+
+bool BlobIndexCompactionFilterGC::OpenNewBlobFileIfNeeded() const {
+ if (IsBlobFileOpened()) {
+ return true;
+ }
+ bool result = BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded();
+ if (result) {
+ gc_stats_.AddNewFile();
+ }
+ return result;
+}
+
+std::unique_ptr<CompactionFilter>
+BlobIndexCompactionFilterFactoryBase::CreateUserCompactionFilterFromFactory(
+ const CompactionFilter::Context& context) const {
+ std::unique_ptr<CompactionFilter> user_comp_filter_from_factory;
+ if (user_comp_filter_factory_) {
+ user_comp_filter_from_factory =
+ user_comp_filter_factory_->CreateCompactionFilter(context);
+ }
+ return user_comp_filter_from_factory;
+}
+
+std::unique_ptr<CompactionFilter>
+BlobIndexCompactionFilterFactory::CreateCompactionFilter(
+ const CompactionFilter::Context& _context) {
+ assert(clock());
+
+ int64_t current_time = 0;
+ Status s = clock()->GetCurrentTime(&current_time);
+ if (!s.ok()) {
+ return nullptr;
+ }
+ assert(current_time >= 0);
+
+ assert(blob_db_impl());
+
+ BlobCompactionContext context;
+ blob_db_impl()->GetCompactionContext(&context);
+
+ std::unique_ptr<CompactionFilter> user_comp_filter_from_factory =
+ CreateUserCompactionFilterFromFactory(_context);
+
+ return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter(
+ std::move(context), user_comp_filter(),
+ std::move(user_comp_filter_from_factory), current_time, statistics()));
+}
+
+std::unique_ptr<CompactionFilter>
+BlobIndexCompactionFilterFactoryGC::CreateCompactionFilter(
+ const CompactionFilter::Context& _context) {
+ assert(clock());
+
+ int64_t current_time = 0;
+ Status s = clock()->GetCurrentTime(&current_time);
+ if (!s.ok()) {
+ return nullptr;
+ }
+ assert(current_time >= 0);
+
+ assert(blob_db_impl());
+
+ BlobCompactionContext context;
+ BlobCompactionContextGC context_gc;
+ blob_db_impl()->GetCompactionContext(&context, &context_gc);
+
+ std::unique_ptr<CompactionFilter> user_comp_filter_from_factory =
+ CreateUserCompactionFilterFromFactory(_context);
+
+ return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilterGC(
+ std::move(context), std::move(context_gc), user_comp_filter(),
+ std::move(user_comp_filter_from_factory), current_time, statistics()));
+}
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_compaction_filter.h b/src/rocksdb/utilities/blob_db/blob_compaction_filter.h
new file mode 100644
index 000000000..1493cfc1a
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_compaction_filter.h
@@ -0,0 +1,204 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <unordered_set>
+
+#include "db/blob/blob_index.h"
+#include "monitoring/statistics.h"
+#include "rocksdb/compaction_filter.h"
+#include "utilities/blob_db/blob_db_gc_stats.h"
+#include "utilities/blob_db/blob_db_impl.h"
+#include "utilities/compaction_filters/layered_compaction_filter_base.h"
+
+namespace ROCKSDB_NAMESPACE {
+class SystemClock;
+namespace blob_db {
+
+struct BlobCompactionContext {
+ BlobDBImpl* blob_db_impl = nullptr;
+ uint64_t next_file_number = 0;
+ std::unordered_set<uint64_t> current_blob_files;
+ SequenceNumber fifo_eviction_seq = 0;
+ uint64_t evict_expiration_up_to = 0;
+};
+
+struct BlobCompactionContextGC {
+ uint64_t cutoff_file_number = 0;
+};
+
+// Compaction filter that deletes expired blob indexes from the base DB.
+// Comes into two varieties, one for the non-GC case and one for the GC case.
+class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase {
+ public:
+ BlobIndexCompactionFilterBase(
+ BlobCompactionContext&& _context,
+ const CompactionFilter* _user_comp_filter,
+ std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
+ uint64_t current_time, Statistics* stats)
+ : LayeredCompactionFilterBase(_user_comp_filter,
+ std::move(_user_comp_filter_from_factory)),
+ context_(std::move(_context)),
+ current_time_(current_time),
+ statistics_(stats) {}
+
+ ~BlobIndexCompactionFilterBase() override;
+
+ // Filter expired blob indexes regardless of snapshots.
+ bool IgnoreSnapshots() const override { return true; }
+
+ Decision FilterV2(int level, const Slice& key, ValueType value_type,
+ const Slice& value, std::string* new_value,
+ std::string* skip_until) const override;
+
+ bool IsStackedBlobDbInternalCompactionFilter() const override { return true; }
+
+ protected:
+ bool IsBlobFileOpened() const;
+ virtual bool OpenNewBlobFileIfNeeded() const;
+ bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
+ PinnableSlice* blob, bool need_decompress,
+ CompressionType* compression_type) const;
+ bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
+ uint64_t* new_blob_file_number,
+ uint64_t* new_blob_offset) const;
+ bool CloseAndRegisterNewBlobFileIfNeeded() const;
+ bool CloseAndRegisterNewBlobFile() const;
+
+ Statistics* statistics() const { return statistics_; }
+ const BlobCompactionContext& context() const { return context_; }
+
+ private:
+ Decision HandleValueChange(const Slice& key, std::string* new_value) const;
+
+ private:
+ BlobCompactionContext context_;
+ const uint64_t current_time_;
+ Statistics* statistics_;
+
+ mutable std::shared_ptr<BlobFile> blob_file_;
+ mutable std::shared_ptr<BlobLogWriter> writer_;
+
+ // It is safe to not using std::atomic since the compaction filter, created
+ // from a compaction filter factroy, will not be called from multiple threads.
+ mutable uint64_t expired_count_ = 0;
+ mutable uint64_t expired_size_ = 0;
+ mutable uint64_t evicted_count_ = 0;
+ mutable uint64_t evicted_size_ = 0;
+};
+
+class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
+ public:
+ BlobIndexCompactionFilter(
+ BlobCompactionContext&& _context,
+ const CompactionFilter* _user_comp_filter,
+ std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
+ uint64_t current_time, Statistics* stats)
+ : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
+ std::move(_user_comp_filter_from_factory),
+ current_time, stats) {}
+
+ const char* Name() const override { return "BlobIndexCompactionFilter"; }
+};
+
+class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
+ public:
+ BlobIndexCompactionFilterGC(
+ BlobCompactionContext&& _context, BlobCompactionContextGC&& context_gc,
+ const CompactionFilter* _user_comp_filter,
+ std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
+ uint64_t current_time, Statistics* stats)
+ : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
+ std::move(_user_comp_filter_from_factory),
+ current_time, stats),
+ context_gc_(std::move(context_gc)) {}
+
+ ~BlobIndexCompactionFilterGC() override;
+
+ const char* Name() const override { return "BlobIndexCompactionFilterGC"; }
+
+ BlobDecision PrepareBlobOutput(const Slice& key, const Slice& existing_value,
+ std::string* new_value) const override;
+
+ private:
+ bool OpenNewBlobFileIfNeeded() const override;
+
+ private:
+ BlobCompactionContextGC context_gc_;
+ mutable BlobDBGarbageCollectionStats gc_stats_;
+};
+
+// Compaction filter factory; similarly to the filters above, it comes
+// in two flavors, one that creates filters that support GC, and one
+// that creates non-GC filters.
+class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
+ public:
+ BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl,
+ SystemClock* _clock,
+ const ColumnFamilyOptions& _cf_options,
+ Statistics* _statistics)
+ : blob_db_impl_(_blob_db_impl),
+ clock_(_clock),
+ statistics_(_statistics),
+ user_comp_filter_(_cf_options.compaction_filter),
+ user_comp_filter_factory_(_cf_options.compaction_filter_factory) {}
+
+ protected:
+ std::unique_ptr<CompactionFilter> CreateUserCompactionFilterFromFactory(
+ const CompactionFilter::Context& context) const;
+
+ BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
+ SystemClock* clock() const { return clock_; }
+ Statistics* statistics() const { return statistics_; }
+ const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
+
+ private:
+ BlobDBImpl* blob_db_impl_;
+ SystemClock* clock_;
+ Statistics* statistics_;
+ const CompactionFilter* user_comp_filter_;
+ std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
+};
+
+class BlobIndexCompactionFilterFactory
+ : public BlobIndexCompactionFilterFactoryBase {
+ public:
+ BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl,
+ SystemClock* _clock,
+ const ColumnFamilyOptions& _cf_options,
+ Statistics* _statistics)
+ : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options,
+ _statistics) {}
+
+ const char* Name() const override {
+ return "BlobIndexCompactionFilterFactory";
+ }
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& context) override;
+};
+
+class BlobIndexCompactionFilterFactoryGC
+ : public BlobIndexCompactionFilterFactoryBase {
+ public:
+ BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl,
+ SystemClock* _clock,
+ const ColumnFamilyOptions& _cf_options,
+ Statistics* _statistics)
+ : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options,
+ _statistics) {}
+
+ const char* Name() const override {
+ return "BlobIndexCompactionFilterFactoryGC";
+ }
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& context) override;
+};
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db.cc b/src/rocksdb/utilities/blob_db/blob_db.cc
new file mode 100644
index 000000000..cbd02e68e
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db.cc
@@ -0,0 +1,114 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_db.h"
+
+#include <cinttypes>
+
+#include "logging/logging.h"
+#include "utilities/blob_db/blob_db_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+Status BlobDB::Open(const Options& options, const BlobDBOptions& bdb_options,
+ const std::string& dbname, BlobDB** blob_db) {
+ *blob_db = nullptr;
+ DBOptions db_options(options);
+ ColumnFamilyOptions cf_options(options);
+ std::vector<ColumnFamilyDescriptor> column_families;
+ column_families.push_back(
+ ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+ std::vector<ColumnFamilyHandle*> handles;
+ Status s = BlobDB::Open(db_options, bdb_options, dbname, column_families,
+ &handles, blob_db);
+ if (s.ok()) {
+ assert(handles.size() == 1);
+ // i can delete the handle since DBImpl is always holding a reference to
+ // default column family
+ delete handles[0];
+ }
+ return s;
+}
+
+Status BlobDB::Open(const DBOptions& db_options,
+ const BlobDBOptions& bdb_options, const std::string& dbname,
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles,
+ BlobDB** blob_db) {
+ assert(handles);
+
+ if (column_families.size() != 1 ||
+ column_families[0].name != kDefaultColumnFamilyName) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+
+ BlobDBImpl* blob_db_impl = new BlobDBImpl(dbname, bdb_options, db_options,
+ column_families[0].options);
+ Status s = blob_db_impl->Open(handles);
+ if (s.ok()) {
+ *blob_db = static_cast<BlobDB*>(blob_db_impl);
+ } else {
+ if (!handles->empty()) {
+ for (ColumnFamilyHandle* cfh : *handles) {
+ blob_db_impl->DestroyColumnFamilyHandle(cfh);
+ }
+
+ handles->clear();
+ }
+
+ delete blob_db_impl;
+ *blob_db = nullptr;
+ }
+ return s;
+}
+
+BlobDB::BlobDB() : StackableDB(nullptr) {}
+
+void BlobDBOptions::Dump(Logger* log) const {
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.blob_dir: %s",
+ blob_dir.c_str());
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.path_relative: %d",
+ path_relative);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.is_fifo: %d",
+ is_fifo);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.max_db_size: %" PRIu64,
+ max_db_size);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.ttl_range_secs: %" PRIu64,
+ ttl_range_secs);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.min_blob_size: %" PRIu64,
+ min_blob_size);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.bytes_per_sync: %" PRIu64,
+ bytes_per_sync);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.blob_file_size: %" PRIu64,
+ blob_file_size);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.compression: %d",
+ static_cast<int>(compression));
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.enable_garbage_collection: %d",
+ enable_garbage_collection);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.garbage_collection_cutoff: %f",
+ garbage_collection_cutoff);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.disable_background_tasks: %d",
+ disable_background_tasks);
+}
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif
diff --git a/src/rocksdb/utilities/blob_db/blob_db.h b/src/rocksdb/utilities/blob_db/blob_db.h
new file mode 100644
index 000000000..e9d92486f
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db.h
@@ -0,0 +1,266 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <functional>
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "rocksdb/db.h"
+#include "rocksdb/status.h"
+#include "rocksdb/utilities/stackable_db.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace blob_db {
+
+// A wrapped database which puts values of KV pairs in a separate log
+// and store location to the log in the underlying DB.
+//
+// The factory needs to be moved to include/rocksdb/utilities to allow
+// users to use blob DB.
+
+constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();
+
+struct BlobDBOptions {
+ // Name of the directory under the base DB where blobs will be stored. Using
+ // a directory where the base DB stores its SST files is not supported.
+ // Default is "blob_dir"
+ std::string blob_dir = "blob_dir";
+
+ // whether the blob_dir path is relative or absolute.
+ bool path_relative = true;
+
+ // When max_db_size is reached, evict blob files to free up space
+ // instead of returnning NoSpace error on write. Blob files will be
+ // evicted from oldest to newest, based on file creation time.
+ bool is_fifo = false;
+
+ // Maximum size of the database (including SST files and blob files).
+ //
+ // Default: 0 (no limits)
+ uint64_t max_db_size = 0;
+
+ // a new bucket is opened, for ttl_range. So if ttl_range is 600seconds
+ // (10 minutes), and the first bucket starts at 1471542000
+ // then the blob buckets will be
+ // first bucket is 1471542000 - 1471542600
+ // second bucket is 1471542600 - 1471543200
+ // and so on
+ uint64_t ttl_range_secs = 3600;
+
+ // The smallest value to store in blob log. Values smaller than this threshold
+ // will be inlined in base DB together with the key.
+ uint64_t min_blob_size = 0;
+
+ // Allows OS to incrementally sync blob files to disk for every
+ // bytes_per_sync bytes written. Users shouldn't rely on it for
+ // persistency guarantee.
+ uint64_t bytes_per_sync = 512 * 1024;
+
+ // the target size of each blob file. File will become immutable
+ // after it exceeds that size
+ uint64_t blob_file_size = 256 * 1024 * 1024;
+
+ // what compression to use for Blob's
+ CompressionType compression = kNoCompression;
+
+ // If enabled, BlobDB cleans up stale blobs in non-TTL files during compaction
+ // by rewriting the remaining live blobs to new files.
+ bool enable_garbage_collection = false;
+
+ // The cutoff in terms of blob file age for garbage collection. Blobs in
+ // the oldest N non-TTL blob files will be rewritten when encountered during
+ // compaction, where N = garbage_collection_cutoff * number_of_non_TTL_files.
+ double garbage_collection_cutoff = 0.25;
+
+ // Disable all background job. Used for test only.
+ bool disable_background_tasks = false;
+
+ void Dump(Logger* log) const;
+};
+
+class BlobDB : public StackableDB {
+ public:
+ using ROCKSDB_NAMESPACE::StackableDB::Put;
+ virtual Status Put(const WriteOptions& options, const Slice& key,
+ const Slice& value) override = 0;
+ virtual Status Put(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value) override {
+ if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ return Put(options, key, value);
+ }
+
+ using ROCKSDB_NAMESPACE::StackableDB::Delete;
+ virtual Status Delete(const WriteOptions& options,
+ ColumnFamilyHandle* column_family,
+ const Slice& key) override {
+ if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ assert(db_ != nullptr);
+ return db_->Delete(options, column_family, key);
+ }
+
+ virtual Status PutWithTTL(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t ttl) = 0;
+ virtual Status PutWithTTL(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value, uint64_t ttl) {
+ if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ return PutWithTTL(options, key, value, ttl);
+ }
+
+ // Put with expiration. Key with expiration time equal to
+ // std::numeric_limits<uint64_t>::max() means the key don't expire.
+ virtual Status PutUntil(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t expiration) = 0;
+ virtual Status PutUntil(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value, uint64_t expiration) {
+ if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ return PutUntil(options, key, value, expiration);
+ }
+
+ using ROCKSDB_NAMESPACE::StackableDB::Get;
+ virtual Status Get(const ReadOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value) override = 0;
+
+ // Get value and expiration.
+ virtual Status Get(const ReadOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value, uint64_t* expiration) = 0;
+ virtual Status Get(const ReadOptions& options, const Slice& key,
+ PinnableSlice* value, uint64_t* expiration) {
+ return Get(options, DefaultColumnFamily(), key, value, expiration);
+ }
+
+ using ROCKSDB_NAMESPACE::StackableDB::MultiGet;
+ virtual std::vector<Status> MultiGet(
+ const ReadOptions& options, const std::vector<Slice>& keys,
+ std::vector<std::string>* values) override = 0;
+ virtual std::vector<Status> MultiGet(
+ const ReadOptions& options,
+ const std::vector<ColumnFamilyHandle*>& column_families,
+ const std::vector<Slice>& keys,
+ std::vector<std::string>* values) override {
+ for (auto column_family : column_families) {
+ if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
+ return std::vector<Status>(
+ column_families.size(),
+ Status::NotSupported(
+ "Blob DB doesn't support non-default column family."));
+ }
+ }
+ return MultiGet(options, keys, values);
+ }
+ virtual void MultiGet(const ReadOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const size_t num_keys, const Slice* /*keys*/,
+ PinnableSlice* /*values*/, Status* statuses,
+ const bool /*sorted_input*/ = false) override {
+ for (size_t i = 0; i < num_keys; ++i) {
+ statuses[i] =
+ Status::NotSupported("Blob DB doesn't support batched MultiGet");
+ }
+ }
+
+ using ROCKSDB_NAMESPACE::StackableDB::SingleDelete;
+ virtual Status SingleDelete(const WriteOptions& /*wopts*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice& /*key*/) override {
+ return Status::NotSupported("Not supported operation in blob db.");
+ }
+
+ using ROCKSDB_NAMESPACE::StackableDB::Merge;
+ virtual Status Merge(const WriteOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice& /*key*/, const Slice& /*value*/) override {
+ return Status::NotSupported("Not supported operation in blob db.");
+ }
+
+ virtual Status Write(const WriteOptions& opts,
+ WriteBatch* updates) override = 0;
+
+ using ROCKSDB_NAMESPACE::StackableDB::NewIterator;
+ virtual Iterator* NewIterator(const ReadOptions& options) override = 0;
+ virtual Iterator* NewIterator(const ReadOptions& options,
+ ColumnFamilyHandle* column_family) override {
+ if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
+ // Blob DB doesn't support non-default column family.
+ return nullptr;
+ }
+ return NewIterator(options);
+ }
+
+ Status CompactFiles(
+ const CompactionOptions& compact_options,
+ const std::vector<std::string>& input_file_names, const int output_level,
+ const int output_path_id = -1,
+ std::vector<std::string>* const output_file_names = nullptr,
+ CompactionJobInfo* compaction_job_info = nullptr) override = 0;
+ Status CompactFiles(
+ const CompactionOptions& compact_options,
+ ColumnFamilyHandle* column_family,
+ const std::vector<std::string>& input_file_names, const int output_level,
+ const int output_path_id = -1,
+ std::vector<std::string>* const output_file_names = nullptr,
+ CompactionJobInfo* compaction_job_info = nullptr) override {
+ if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+
+ return CompactFiles(compact_options, input_file_names, output_level,
+ output_path_id, output_file_names, compaction_job_info);
+ }
+
+ using ROCKSDB_NAMESPACE::StackableDB::Close;
+ virtual Status Close() override = 0;
+
+ // Opening blob db.
+ static Status Open(const Options& options, const BlobDBOptions& bdb_options,
+ const std::string& dbname, BlobDB** blob_db);
+
+ static Status Open(const DBOptions& db_options,
+ const BlobDBOptions& bdb_options,
+ const std::string& dbname,
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles,
+ BlobDB** blob_db);
+
+ virtual BlobDBOptions GetBlobDBOptions() const = 0;
+
+ virtual Status SyncBlobFiles() = 0;
+
+ virtual ~BlobDB() {}
+
+ protected:
+ explicit BlobDB();
+};
+
+// Destroy the content of the database.
+Status DestroyBlobDB(const std::string& dbname, const Options& options,
+ const BlobDBOptions& bdb_options);
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_gc_stats.h b/src/rocksdb/utilities/blob_db/blob_db_gc_stats.h
new file mode 100644
index 000000000..fea6b0032
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_gc_stats.h
@@ -0,0 +1,56 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#include <cstdint>
+
+#include "rocksdb/rocksdb_namespace.h"
+
+#ifndef ROCKSDB_LITE
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace blob_db {
+
+/**
+ * Statistics related to a single garbage collection pass (i.e. a single
+ * (sub)compaction).
+ */
+class BlobDBGarbageCollectionStats {
+ public:
+ uint64_t AllBlobs() const { return all_blobs_; }
+ uint64_t AllBytes() const { return all_bytes_; }
+ uint64_t RelocatedBlobs() const { return relocated_blobs_; }
+ uint64_t RelocatedBytes() const { return relocated_bytes_; }
+ uint64_t NewFiles() const { return new_files_; }
+ bool HasError() const { return error_; }
+
+ void AddBlob(uint64_t size) {
+ ++all_blobs_;
+ all_bytes_ += size;
+ }
+
+ void AddRelocatedBlob(uint64_t size) {
+ ++relocated_blobs_;
+ relocated_bytes_ += size;
+ }
+
+ void AddNewFile() { ++new_files_; }
+
+ void SetError() { error_ = true; }
+
+ private:
+ uint64_t all_blobs_ = 0;
+ uint64_t all_bytes_ = 0;
+ uint64_t relocated_blobs_ = 0;
+ uint64_t relocated_bytes_ = 0;
+ uint64_t new_files_ = 0;
+ bool error_ = false;
+};
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl.cc b/src/rocksdb/utilities/blob_db/blob_db_impl.cc
new file mode 100644
index 000000000..87e294c5c
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_impl.cc
@@ -0,0 +1,2177 @@
+
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_db_impl.h"
+
+#include <algorithm>
+#include <cinttypes>
+#include <iomanip>
+#include <memory>
+#include <sstream>
+
+#include "db/blob/blob_index.h"
+#include "db/db_impl/db_impl.h"
+#include "db/write_batch_internal.h"
+#include "file/file_util.h"
+#include "file/filename.h"
+#include "file/random_access_file_reader.h"
+#include "file/sst_file_manager_impl.h"
+#include "file/writable_file_writer.h"
+#include "logging/logging.h"
+#include "monitoring/instrumented_mutex.h"
+#include "monitoring/statistics.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/utilities/stackable_db.h"
+#include "rocksdb/utilities/transaction.h"
+#include "table/block_based/block.h"
+#include "table/block_based/block_based_table_builder.h"
+#include "table/block_based/block_builder.h"
+#include "table/meta_blocks.h"
+#include "test_util/sync_point.h"
+#include "util/cast_util.h"
+#include "util/crc32c.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
+#include "util/stop_watch.h"
+#include "util/timer_queue.h"
+#include "utilities/blob_db/blob_compaction_filter.h"
+#include "utilities/blob_db/blob_db_iterator.h"
+#include "utilities/blob_db/blob_db_listener.h"
+
+namespace {
+int kBlockBasedTableVersionFormat = 2;
+} // end namespace
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+bool BlobFileComparator::operator()(
+ const std::shared_ptr<BlobFile>& lhs,
+ const std::shared_ptr<BlobFile>& rhs) const {
+ return lhs->BlobFileNumber() > rhs->BlobFileNumber();
+}
+
+bool BlobFileComparatorTTL::operator()(
+ const std::shared_ptr<BlobFile>& lhs,
+ const std::shared_ptr<BlobFile>& rhs) const {
+ assert(lhs->HasTTL() && rhs->HasTTL());
+ if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
+ return true;
+ }
+ if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
+ return false;
+ }
+ return lhs->BlobFileNumber() < rhs->BlobFileNumber();
+}
+
+BlobDBImpl::BlobDBImpl(const std::string& dbname,
+ const BlobDBOptions& blob_db_options,
+ const DBOptions& db_options,
+ const ColumnFamilyOptions& cf_options)
+ : BlobDB(),
+ dbname_(dbname),
+ db_impl_(nullptr),
+ env_(db_options.env),
+ bdb_options_(blob_db_options),
+ db_options_(db_options),
+ cf_options_(cf_options),
+ file_options_(db_options),
+ statistics_(db_options_.statistics.get()),
+ next_file_number_(1),
+ flush_sequence_(0),
+ closed_(true),
+ open_file_count_(0),
+ total_blob_size_(0),
+ live_sst_size_(0),
+ fifo_eviction_seq_(0),
+ evict_expiration_up_to_(0),
+ debug_level_(0) {
+ clock_ = env_->GetSystemClock().get();
+ blob_dir_ = (bdb_options_.path_relative)
+ ? dbname + "/" + bdb_options_.blob_dir
+ : bdb_options_.blob_dir;
+ file_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
+}
+
+BlobDBImpl::~BlobDBImpl() {
+ tqueue_.shutdown();
+ // CancelAllBackgroundWork(db_, true);
+ Status s __attribute__((__unused__)) = Close();
+ assert(s.ok());
+}
+
+Status BlobDBImpl::Close() {
+ if (closed_) {
+ return Status::OK();
+ }
+ closed_ = true;
+
+ // Close base DB before BlobDBImpl destructs to stop event listener and
+ // compaction filter call.
+ Status s = db_->Close();
+ // delete db_ anyway even if close failed.
+ delete db_;
+ // Reset pointers to avoid StackableDB delete the pointer again.
+ db_ = nullptr;
+ db_impl_ = nullptr;
+ if (!s.ok()) {
+ return s;
+ }
+
+ s = SyncBlobFiles();
+ return s;
+}
+
+BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
+
+Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
+ assert(handles != nullptr);
+ assert(db_ == nullptr);
+
+ if (blob_dir_.empty()) {
+ return Status::NotSupported("No blob directory in options");
+ }
+
+ if (bdb_options_.garbage_collection_cutoff < 0.0 ||
+ bdb_options_.garbage_collection_cutoff > 1.0) {
+ return Status::InvalidArgument(
+ "Garbage collection cutoff must be in the interval [0.0, 1.0]");
+ }
+
+ // Temporarily disable compactions in the base DB during open; save the user
+ // defined value beforehand so we can restore it once BlobDB is initialized.
+ // Note: this is only needed if garbage collection is enabled.
+ const bool disable_auto_compactions = cf_options_.disable_auto_compactions;
+
+ if (bdb_options_.enable_garbage_collection) {
+ cf_options_.disable_auto_compactions = true;
+ }
+
+ Status s;
+
+ // Create info log.
+ if (db_options_.info_log == nullptr) {
+ s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
+
+ if ((cf_options_.compaction_filter != nullptr ||
+ cf_options_.compaction_filter_factory != nullptr)) {
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "BlobDB only support compaction filter on non-TTL values.");
+ }
+
+ // Open blob directory.
+ s = env_->CreateDirIfMissing(blob_dir_);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to create blob_dir %s, status: %s",
+ blob_dir_.c_str(), s.ToString().c_str());
+ }
+ s = env_->GetFileSystem()->NewDirectory(blob_dir_, IOOptions(), &dir_ent_,
+ nullptr);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
+ s.ToString().c_str());
+ return s;
+ }
+
+ // Open blob files.
+ s = OpenAllBlobFiles();
+ if (!s.ok()) {
+ return s;
+ }
+
+ // Update options
+ if (bdb_options_.enable_garbage_collection) {
+ db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
+ cf_options_.compaction_filter_factory =
+ std::make_shared<BlobIndexCompactionFilterFactoryGC>(
+ this, clock_, cf_options_, statistics_);
+ } else {
+ db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
+ cf_options_.compaction_filter_factory =
+ std::make_shared<BlobIndexCompactionFilterFactory>(
+ this, clock_, cf_options_, statistics_);
+ }
+
+ // Reset user compaction filter after building into compaction factory.
+ cf_options_.compaction_filter = nullptr;
+
+ // Open base db.
+ ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
+ s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
+ if (!s.ok()) {
+ return s;
+ }
+ db_impl_ = static_cast_with_check<DBImpl>(db_->GetRootDB());
+
+ // Sanitize the blob_dir provided. Using a directory where the
+ // base DB stores its files for the default CF is not supported.
+ const ColumnFamilyData* const cfd =
+ static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
+ assert(cfd);
+
+ const ImmutableCFOptions* const ioptions = cfd->ioptions();
+ assert(ioptions);
+
+ assert(env_);
+
+ for (const auto& cf_path : ioptions->cf_paths) {
+ bool blob_dir_same_as_cf_dir = false;
+ s = env_->AreFilesSame(blob_dir_, cf_path.path, &blob_dir_same_as_cf_dir);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Error while sanitizing blob_dir %s, status: %s",
+ blob_dir_.c_str(), s.ToString().c_str());
+ return s;
+ }
+
+ if (blob_dir_same_as_cf_dir) {
+ return Status::NotSupported(
+ "Using the base DB's storage directories for BlobDB files is not "
+ "supported.");
+ }
+ }
+
+ // Initialize SST file <-> oldest blob file mapping if garbage collection
+ // is enabled.
+ if (bdb_options_.enable_garbage_collection) {
+ std::vector<LiveFileMetaData> live_files;
+ db_->GetLiveFilesMetaData(&live_files);
+
+ InitializeBlobFileToSstMapping(live_files);
+
+ MarkUnreferencedBlobFilesObsoleteDuringOpen();
+
+ if (!disable_auto_compactions) {
+ s = db_->EnableAutoCompaction(*handles);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ db_options_.info_log,
+ "Failed to enable automatic compactions during open, status: %s",
+ s.ToString().c_str());
+ return s;
+ }
+ }
+ }
+
+ // Add trash files in blob dir to file delete scheduler.
+ SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
+ db_impl_->immutable_db_options().sst_file_manager.get());
+ DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_);
+
+ UpdateLiveSSTSize();
+
+ // Start background jobs.
+ if (!bdb_options_.disable_background_tasks) {
+ StartBackgroundTasks();
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
+ bdb_options_.Dump(db_options_.info_log.get());
+ closed_ = false;
+ return s;
+}
+
+void BlobDBImpl::StartBackgroundTasks() {
+ // store a call to a member function and object
+ tqueue_.add(
+ kReclaimOpenFilesPeriodMillisecs,
+ std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
+ tqueue_.add(
+ kDeleteObsoleteFilesPeriodMillisecs,
+ std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
+ tqueue_.add(kSanityCheckPeriodMillisecs,
+ std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
+ tqueue_.add(
+ kEvictExpiredFilesPeriodMillisecs,
+ std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
+}
+
+Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
+ assert(file_numbers != nullptr);
+ std::vector<std::string> all_files;
+ Status s = env_->GetChildren(blob_dir_, &all_files);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to get list of blob files, status: %s",
+ s.ToString().c_str());
+ return s;
+ }
+
+ for (const auto& file_name : all_files) {
+ uint64_t file_number;
+ FileType type;
+ bool success = ParseFileName(file_name, &file_number, &type);
+ if (success && type == kBlobFile) {
+ file_numbers->insert(file_number);
+ } else {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Skipping file in blob directory: %s", file_name.c_str());
+ }
+ }
+
+ return s;
+}
+
+Status BlobDBImpl::OpenAllBlobFiles() {
+ std::set<uint64_t> file_numbers;
+ Status s = GetAllBlobFiles(&file_numbers);
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (!file_numbers.empty()) {
+ next_file_number_.store(*file_numbers.rbegin() + 1);
+ }
+
+ std::ostringstream blob_file_oss;
+ std::ostringstream live_imm_oss;
+ std::ostringstream obsolete_file_oss;
+
+ for (auto& file_number : file_numbers) {
+ std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
+ this, blob_dir_, file_number, db_options_.info_log.get());
+ blob_file->MarkImmutable(/* sequence */ 0);
+
+ // Read file header and footer
+ Status read_metadata_status =
+ blob_file->ReadMetadata(env_->GetFileSystem(), file_options_);
+ if (read_metadata_status.IsCorruption()) {
+ // Remove incomplete file.
+ if (!obsolete_files_.empty()) {
+ obsolete_file_oss << ", ";
+ }
+ obsolete_file_oss << file_number;
+
+ ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
+ continue;
+ } else if (!read_metadata_status.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Unable to read metadata of blob file %" PRIu64
+ ", status: '%s'",
+ file_number, read_metadata_status.ToString().c_str());
+ return read_metadata_status;
+ }
+
+ total_blob_size_ += blob_file->GetFileSize();
+
+ if (!blob_files_.empty()) {
+ blob_file_oss << ", ";
+ }
+ blob_file_oss << file_number;
+
+ blob_files_[file_number] = blob_file;
+
+ if (!blob_file->HasTTL()) {
+ if (!live_imm_non_ttl_blob_files_.empty()) {
+ live_imm_oss << ", ";
+ }
+ live_imm_oss << file_number;
+
+ live_imm_non_ttl_blob_files_[file_number] = blob_file;
+ }
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
+ blob_file_oss.str().c_str());
+ ROCKS_LOG_INFO(
+ db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s",
+ live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str());
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Found %" ROCKSDB_PRIszt
+ " incomplete or corrupted blob files: %s",
+ obsolete_files_.size(), obsolete_file_oss.str().c_str());
+ return s;
+}
+
+template <typename Linker>
+void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number,
+ uint64_t blob_file_number,
+ Linker linker) {
+ assert(bdb_options_.enable_garbage_collection);
+ assert(blob_file_number != kInvalidBlobFileNumber);
+
+ auto it = blob_files_.find(blob_file_number);
+ if (it == blob_files_.end()) {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Blob file %" PRIu64
+ " not found while trying to link "
+ "SST file %" PRIu64,
+ blob_file_number, sst_file_number);
+ return;
+ }
+
+ BlobFile* const blob_file = it->second.get();
+ assert(blob_file);
+
+ linker(blob_file, sst_file_number);
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Blob file %" PRIu64 " linked to SST file %" PRIu64,
+ blob_file_number, sst_file_number);
+}
+
+void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number,
+ uint64_t blob_file_number) {
+ auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
+ WriteLock file_lock(&blob_file->mutex_);
+ blob_file->LinkSstFile(sst_file);
+ };
+
+ LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
+}
+
+void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number,
+ uint64_t blob_file_number) {
+ auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
+ blob_file->LinkSstFile(sst_file);
+ };
+
+ LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
+}
+
+void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number,
+ uint64_t blob_file_number) {
+ assert(bdb_options_.enable_garbage_collection);
+ assert(blob_file_number != kInvalidBlobFileNumber);
+
+ auto it = blob_files_.find(blob_file_number);
+ if (it == blob_files_.end()) {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Blob file %" PRIu64
+ " not found while trying to unlink "
+ "SST file %" PRIu64,
+ blob_file_number, sst_file_number);
+ return;
+ }
+
+ BlobFile* const blob_file = it->second.get();
+ assert(blob_file);
+
+ {
+ WriteLock file_lock(&blob_file->mutex_);
+ blob_file->UnlinkSstFile(sst_file_number);
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Blob file %" PRIu64 " unlinked from SST file %" PRIu64,
+ blob_file_number, sst_file_number);
+}
+
+void BlobDBImpl::InitializeBlobFileToSstMapping(
+ const std::vector<LiveFileMetaData>& live_files) {
+ assert(bdb_options_.enable_garbage_collection);
+
+ for (const auto& live_file : live_files) {
+ const uint64_t sst_file_number = live_file.file_number;
+ const uint64_t blob_file_number = live_file.oldest_blob_file_number;
+
+ if (blob_file_number == kInvalidBlobFileNumber) {
+ continue;
+ }
+
+ LinkSstToBlobFileNoLock(sst_file_number, blob_file_number);
+ }
+}
+
+void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) {
+ assert(bdb_options_.enable_garbage_collection);
+
+ WriteLock lock(&mutex_);
+
+ if (info.oldest_blob_file_number != kInvalidBlobFileNumber) {
+ LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number);
+ }
+
+ assert(flush_sequence_ < info.largest_seqno);
+ flush_sequence_ = info.largest_seqno;
+
+ MarkUnreferencedBlobFilesObsolete();
+}
+
+void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
+ assert(bdb_options_.enable_garbage_collection);
+
+ if (!info.status.ok()) {
+ return;
+ }
+
+ // Note: the same SST file may appear in both the input and the output
+ // file list in case of a trivial move. We walk through the two lists
+ // below in a fashion that's similar to merge sort to detect this.
+
+ auto cmp = [](const CompactionFileInfo& lhs, const CompactionFileInfo& rhs) {
+ return lhs.file_number < rhs.file_number;
+ };
+
+ auto inputs = info.input_file_infos;
+ auto iit = inputs.begin();
+ const auto iit_end = inputs.end();
+
+ std::sort(iit, iit_end, cmp);
+
+ auto outputs = info.output_file_infos;
+ auto oit = outputs.begin();
+ const auto oit_end = outputs.end();
+
+ std::sort(oit, oit_end, cmp);
+
+ WriteLock lock(&mutex_);
+
+ while (iit != iit_end && oit != oit_end) {
+ const auto& input = *iit;
+ const auto& output = *oit;
+
+ if (input.file_number == output.file_number) {
+ ++iit;
+ ++oit;
+ } else if (input.file_number < output.file_number) {
+ if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
+ UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
+ }
+
+ ++iit;
+ } else {
+ assert(output.file_number < input.file_number);
+
+ if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
+ LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
+ }
+
+ ++oit;
+ }
+ }
+
+ while (iit != iit_end) {
+ const auto& input = *iit;
+
+ if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
+ UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
+ }
+
+ ++iit;
+ }
+
+ while (oit != oit_end) {
+ const auto& output = *oit;
+
+ if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
+ LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
+ }
+
+ ++oit;
+ }
+
+ MarkUnreferencedBlobFilesObsolete();
+}
+
+bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
+ const std::shared_ptr<BlobFile>& blob_file, SequenceNumber obsolete_seq) {
+ assert(blob_file);
+ assert(!blob_file->HasTTL());
+ assert(blob_file->Immutable());
+ assert(bdb_options_.enable_garbage_collection);
+
+ // Note: FIFO eviction could have marked this file obsolete already.
+ if (blob_file->Obsolete()) {
+ return true;
+ }
+
+ // We cannot mark this file (or any higher-numbered files for that matter)
+ // obsolete if it is referenced by any memtables or SSTs. We keep track of
+ // the SSTs explicitly. To account for memtables, we keep track of the highest
+ // sequence number received in flush notifications, and we do not mark the
+ // blob file obsolete if there are still unflushed memtables from before
+ // the time the blob file was closed.
+ if (blob_file->GetImmutableSequence() > flush_sequence_ ||
+ !blob_file->GetLinkedSstFiles().empty()) {
+ return false;
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Blob file %" PRIu64 " is no longer needed, marking obsolete",
+ blob_file->BlobFileNumber());
+
+ ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true);
+ return true;
+}
+
+template <class Functor>
+void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
+ assert(bdb_options_.enable_garbage_collection);
+
+ // Iterate through all live immutable non-TTL blob files, and mark them
+ // obsolete assuming no SST files or memtables rely on the blobs in them.
+ // Note: we need to stop as soon as we find a blob file that has any
+ // linked SSTs (or one potentially referenced by memtables).
+
+ uint64_t obsoleted_files = 0;
+
+ auto it = live_imm_non_ttl_blob_files_.begin();
+ while (it != live_imm_non_ttl_blob_files_.end()) {
+ const auto& blob_file = it->second;
+ assert(blob_file);
+ assert(blob_file->BlobFileNumber() == it->first);
+ assert(!blob_file->HasTTL());
+ assert(blob_file->Immutable());
+
+ // Small optimization: Obsolete() does an atomic read, so we can do
+ // this check without taking a lock on the blob file's mutex.
+ if (blob_file->Obsolete()) {
+ it = live_imm_non_ttl_blob_files_.erase(it);
+ continue;
+ }
+
+ if (!mark_if_needed(blob_file)) {
+ break;
+ }
+
+ it = live_imm_non_ttl_blob_files_.erase(it);
+
+ ++obsoleted_files;
+ }
+
+ if (obsoleted_files > 0) {
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "%" PRIu64 " blob file(s) marked obsolete by GC",
+ obsoleted_files);
+ RecordTick(statistics_, BLOB_DB_GC_NUM_FILES, obsoleted_files);
+ }
+}
+
+void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
+ const SequenceNumber obsolete_seq = GetLatestSequenceNumber();
+
+ MarkUnreferencedBlobFilesObsoleteImpl(
+ [this, obsolete_seq](const std::shared_ptr<BlobFile>& blob_file) {
+ WriteLock file_lock(&blob_file->mutex_);
+ return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
+ });
+}
+
+void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
+ MarkUnreferencedBlobFilesObsoleteImpl(
+ [this](const std::shared_ptr<BlobFile>& blob_file) {
+ return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
+ });
+}
+
+void BlobDBImpl::CloseRandomAccessLocked(
+ const std::shared_ptr<BlobFile>& bfile) {
+ bfile->CloseRandomAccessLocked();
+ open_file_count_--;
+}
+
+Status BlobDBImpl::GetBlobFileReader(
+ const std::shared_ptr<BlobFile>& blob_file,
+ std::shared_ptr<RandomAccessFileReader>* reader) {
+ assert(reader != nullptr);
+ bool fresh_open = false;
+ Status s = blob_file->GetReader(env_, file_options_, reader, &fresh_open);
+ if (s.ok() && fresh_open) {
+ assert(*reader != nullptr);
+ open_file_count_++;
+ }
+ return s;
+}
+
+std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
+ bool has_ttl, const ExpirationRange& expiration_range,
+ const std::string& reason) {
+ assert(has_ttl == (expiration_range.first || expiration_range.second));
+
+ uint64_t file_num = next_file_number_++;
+
+ const uint32_t column_family_id =
+ static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+ auto blob_file = std::make_shared<BlobFile>(
+ this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id,
+ bdb_options_.compression, has_ttl, expiration_range);
+
+ ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
+ blob_file->PathName().c_str(), reason.c_str());
+ LogFlush(db_options_.info_log);
+
+ return blob_file;
+}
+
+void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
+ const uint64_t blob_file_number = blob_file->BlobFileNumber();
+
+ auto it = blob_files_.lower_bound(blob_file_number);
+ assert(it == blob_files_.end() || it->first != blob_file_number);
+
+ blob_files_.insert(it,
+ std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
+ blob_file_number, std::move(blob_file)));
+}
+
+Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
+ std::string fpath(bfile->PathName());
+ std::unique_ptr<FSWritableFile> wfile;
+ const auto& fs = env_->GetFileSystem();
+
+ Status s = fs->ReopenWritableFile(fpath, file_options_, &wfile, nullptr);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to open blob file for write: %s status: '%s'"
+ " exists: '%s'",
+ fpath.c_str(), s.ToString().c_str(),
+ fs->FileExists(fpath, file_options_.io_options, nullptr)
+ .ToString()
+ .c_str());
+ return s;
+ }
+
+ std::unique_ptr<WritableFileWriter> fwriter;
+ fwriter.reset(new WritableFileWriter(std::move(wfile), fpath, file_options_));
+
+ uint64_t boffset = bfile->GetFileSize();
+ if (debug_level_ >= 2 && boffset) {
+ ROCKS_LOG_DEBUG(db_options_.info_log,
+ "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
+ boffset);
+ }
+
+ BlobLogWriter::ElemType et = BlobLogWriter::kEtNone;
+ if (bfile->file_size_ == BlobLogHeader::kSize) {
+ et = BlobLogWriter::kEtFileHdr;
+ } else if (bfile->file_size_ > BlobLogHeader::kSize) {
+ et = BlobLogWriter::kEtRecord;
+ } else if (bfile->file_size_) {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Open blob file: %s with wrong size: %" PRIu64,
+ fpath.c_str(), boffset);
+ return Status::Corruption("Invalid blob file size");
+ }
+
+ constexpr bool do_flush = true;
+
+ bfile->log_writer_ = std::make_shared<BlobLogWriter>(
+ std::move(fwriter), clock_, statistics_, bfile->file_number_,
+ db_options_.use_fsync, do_flush, boffset);
+ bfile->log_writer_->last_elem_type_ = et;
+
+ return s;
+}
+
+std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
+ uint64_t expiration) const {
+ if (open_ttl_files_.empty()) {
+ return nullptr;
+ }
+
+ std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
+ tmp->SetHasTTL(true);
+ tmp->expiration_range_ = std::make_pair(expiration, 0);
+ tmp->file_number_ = std::numeric_limits<uint64_t>::max();
+
+ auto citr = open_ttl_files_.equal_range(tmp);
+ if (citr.first == open_ttl_files_.end()) {
+ assert(citr.second == open_ttl_files_.end());
+
+ std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
+ return (check->expiration_range_.second <= expiration) ? nullptr : check;
+ }
+
+ if (citr.first != citr.second) {
+ return *(citr.first);
+ }
+
+ auto finditr = citr.second;
+ if (finditr != open_ttl_files_.begin()) {
+ --finditr;
+ }
+
+ bool b2 = (*finditr)->expiration_range_.second <= expiration;
+ bool b1 = (*finditr)->expiration_range_.first > expiration;
+
+ return (b1 || b2) ? nullptr : (*finditr);
+}
+
+Status BlobDBImpl::CheckOrCreateWriterLocked(
+ const std::shared_ptr<BlobFile>& blob_file,
+ std::shared_ptr<BlobLogWriter>* writer) {
+ assert(writer != nullptr);
+ *writer = blob_file->GetWriter();
+ if (*writer != nullptr) {
+ return Status::OK();
+ }
+ Status s = CreateWriterLocked(blob_file);
+ if (s.ok()) {
+ *writer = blob_file->GetWriter();
+ }
+ return s;
+}
+
+Status BlobDBImpl::CreateBlobFileAndWriter(
+ bool has_ttl, const ExpirationRange& expiration_range,
+ const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
+ std::shared_ptr<BlobLogWriter>* writer) {
+ TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter");
+ assert(has_ttl == (expiration_range.first || expiration_range.second));
+ assert(blob_file);
+ assert(writer);
+
+ *blob_file = NewBlobFile(has_ttl, expiration_range, reason);
+ assert(*blob_file);
+
+ // file not visible, hence no lock
+ Status s = CheckOrCreateWriterLocked(*blob_file, writer);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to get writer for blob file: %s, error: %s",
+ (*blob_file)->PathName().c_str(), s.ToString().c_str());
+ return s;
+ }
+
+ assert(*writer);
+
+ s = (*writer)->WriteHeader((*blob_file)->header_);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to write header to new blob file: %s"
+ " status: '%s'",
+ (*blob_file)->PathName().c_str(), s.ToString().c_str());
+ return s;
+ }
+
+ (*blob_file)->SetFileSize(BlobLogHeader::kSize);
+ total_blob_size_ += BlobLogHeader::kSize;
+
+ return s;
+}
+
+Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
+ assert(blob_file);
+
+ {
+ ReadLock rl(&mutex_);
+
+ if (open_non_ttl_file_) {
+ assert(!open_non_ttl_file_->Immutable());
+ *blob_file = open_non_ttl_file_;
+ return Status::OK();
+ }
+ }
+
+ // Check again
+ WriteLock wl(&mutex_);
+
+ if (open_non_ttl_file_) {
+ assert(!open_non_ttl_file_->Immutable());
+ *blob_file = open_non_ttl_file_;
+ return Status::OK();
+ }
+
+ std::shared_ptr<BlobLogWriter> writer;
+ const Status s = CreateBlobFileAndWriter(
+ /* has_ttl */ false, ExpirationRange(),
+ /* reason */ "SelectBlobFile", blob_file, &writer);
+ if (!s.ok()) {
+ return s;
+ }
+
+ RegisterBlobFile(*blob_file);
+ open_non_ttl_file_ = *blob_file;
+
+ return s;
+}
+
+Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
+ std::shared_ptr<BlobFile>* blob_file) {
+ assert(blob_file);
+ assert(expiration != kNoExpiration);
+
+ {
+ ReadLock rl(&mutex_);
+
+ *blob_file = FindBlobFileLocked(expiration);
+ if (*blob_file != nullptr) {
+ assert(!(*blob_file)->Immutable());
+ return Status::OK();
+ }
+ }
+
+ // Check again
+ WriteLock wl(&mutex_);
+
+ *blob_file = FindBlobFileLocked(expiration);
+ if (*blob_file != nullptr) {
+ assert(!(*blob_file)->Immutable());
+ return Status::OK();
+ }
+
+ const uint64_t exp_low =
+ (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
+ const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
+ const ExpirationRange expiration_range(exp_low, exp_high);
+
+ std::ostringstream oss;
+ oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
+
+ std::shared_ptr<BlobLogWriter> writer;
+ const Status s =
+ CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
+ /* reason */ oss.str(), blob_file, &writer);
+ if (!s.ok()) {
+ return s;
+ }
+
+ RegisterBlobFile(*blob_file);
+ open_ttl_files_.insert(*blob_file);
+
+ return s;
+}
+
+class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
+ private:
+ const WriteOptions& options_;
+ BlobDBImpl* blob_db_impl_;
+ uint32_t default_cf_id_;
+ WriteBatch batch_;
+
+ public:
+ BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
+ uint32_t default_cf_id)
+ : options_(options),
+ blob_db_impl_(blob_db_impl),
+ default_cf_id_(default_cf_id) {}
+
+ WriteBatch* batch() { return &batch_; }
+
+ Status PutCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) override {
+ if (column_family_id != default_cf_id_) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
+ &batch_);
+ return s;
+ }
+
+ Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
+ if (column_family_id != default_cf_id_) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
+ return s;
+ }
+
+ virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
+ const Slice& end_key) {
+ if (column_family_id != default_cf_id_) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
+ begin_key, end_key);
+ return s;
+ }
+
+ Status SingleDeleteCF(uint32_t /*column_family_id*/,
+ const Slice& /*key*/) override {
+ return Status::NotSupported("Not supported operation in blob db.");
+ }
+
+ Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
+ const Slice& /*value*/) override {
+ return Status::NotSupported("Not supported operation in blob db.");
+ }
+
+ void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
+};
+
+Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
+ StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_WRITE);
+ uint32_t default_cf_id =
+ static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
+ ->GetID();
+ Status s;
+ BlobInserter blob_inserter(options, this, default_cf_id);
+ {
+ // Release write_mutex_ before DB write to avoid race condition with
+ // flush begin listener, which also require write_mutex_ to sync
+ // blob files.
+ MutexLock l(&write_mutex_);
+ s = updates->Iterate(&blob_inserter);
+ }
+ if (!s.ok()) {
+ return s;
+ }
+ return db_->Write(options, blob_inserter.batch());
+}
+
+Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
+ const Slice& value) {
+ return PutUntil(options, key, value, kNoExpiration);
+}
+
+Status BlobDBImpl::PutWithTTL(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t ttl) {
+ uint64_t now = EpochNow();
+ uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
+ return PutUntil(options, key, value, expiration);
+}
+
+Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t expiration) {
+ StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_PUT);
+ Status s;
+ WriteBatch batch;
+ {
+ // Release write_mutex_ before DB write to avoid race condition with
+ // flush begin listener, which also require write_mutex_ to sync
+ // blob files.
+ MutexLock l(&write_mutex_);
+ s = PutBlobValue(options, key, value, expiration, &batch);
+ }
+ if (s.ok()) {
+ s = db_->Write(options, &batch);
+ }
+ return s;
+}
+
+Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
+ const Slice& key, const Slice& value,
+ uint64_t expiration, WriteBatch* batch) {
+ write_mutex_.AssertHeld();
+ Status s;
+ std::string index_entry;
+ uint32_t column_family_id =
+ static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
+ ->GetID();
+ if (value.size() < bdb_options_.min_blob_size) {
+ if (expiration == kNoExpiration) {
+ // Put as normal value
+ s = batch->Put(key, value);
+ RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
+ } else {
+ // Inlined with TTL
+ BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
+ s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
+ index_entry);
+ RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
+ }
+ } else {
+ std::string compression_output;
+ Slice value_compressed = GetCompressedSlice(value, &compression_output);
+
+ std::string headerbuf;
+ BlobLogWriter::ConstructBlobHeader(&headerbuf, key, value_compressed,
+ expiration);
+
+ // Check DB size limit before selecting blob file to
+ // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
+ // done before calling SelectBlobFile().
+ s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
+ value_compressed.size());
+ if (!s.ok()) {
+ return s;
+ }
+
+ std::shared_ptr<BlobFile> blob_file;
+ if (expiration != kNoExpiration) {
+ s = SelectBlobFileTTL(expiration, &blob_file);
+ } else {
+ s = SelectBlobFile(&blob_file);
+ }
+ if (s.ok()) {
+ assert(blob_file != nullptr);
+ assert(blob_file->GetCompressionType() == bdb_options_.compression);
+ s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration,
+ &index_entry);
+ }
+ if (s.ok()) {
+ if (expiration != kNoExpiration) {
+ WriteLock file_lock(&blob_file->mutex_);
+ blob_file->ExtendExpirationRange(expiration);
+ }
+ s = CloseBlobFileIfNeeded(blob_file);
+ }
+ if (s.ok()) {
+ s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
+ index_entry);
+ }
+ if (s.ok()) {
+ if (expiration == kNoExpiration) {
+ RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
+ } else {
+ RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
+ }
+ } else {
+ ROCKS_LOG_ERROR(
+ db_options_.info_log,
+ "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
+ " status: '%s' blob_file: '%s'",
+ blob_file->PathName().c_str(), key.ToString().c_str(), value.size(),
+ s.ToString().c_str(), blob_file->DumpState().c_str());
+ }
+ }
+
+ RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
+ RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
+ RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size());
+ RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size());
+
+ return s;
+}
+
+Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
+ std::string* compression_output) const {
+ if (bdb_options_.compression == kNoCompression) {
+ return raw;
+ }
+ StopWatch compression_sw(clock_, statistics_, BLOB_DB_COMPRESSION_MICROS);
+ CompressionType type = bdb_options_.compression;
+ CompressionOptions opts;
+ CompressionContext context(type);
+ CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type,
+ 0 /* sample_for_compression */);
+ CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false,
+ compression_output, nullptr, nullptr);
+ return *compression_output;
+}
+
+Status BlobDBImpl::DecompressSlice(const Slice& compressed_value,
+ CompressionType compression_type,
+ PinnableSlice* value_output) const {
+ assert(compression_type != kNoCompression);
+
+ BlockContents contents;
+ auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
+
+ {
+ StopWatch decompression_sw(clock_, statistics_,
+ BLOB_DB_DECOMPRESSION_MICROS);
+ UncompressionContext context(compression_type);
+ UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
+ compression_type);
+ Status s = UncompressBlockData(
+ info, compressed_value.data(), compressed_value.size(), &contents,
+ kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
+ if (!s.ok()) {
+ return Status::Corruption("Unable to decompress blob.");
+ }
+ }
+
+ value_output->PinSelf(contents.data);
+
+ return Status::OK();
+}
+
+Status BlobDBImpl::CompactFiles(
+ const CompactionOptions& compact_options,
+ const std::vector<std::string>& input_file_names, const int output_level,
+ const int output_path_id, std::vector<std::string>* const output_file_names,
+ CompactionJobInfo* compaction_job_info) {
+ // Note: we need CompactionJobInfo to be able to track updates to the
+ // blob file <-> SST mappings, so we provide one if the user hasn't,
+ // assuming that GC is enabled.
+ CompactionJobInfo info{};
+ if (bdb_options_.enable_garbage_collection && !compaction_job_info) {
+ compaction_job_info = &info;
+ }
+
+ const Status s =
+ db_->CompactFiles(compact_options, input_file_names, output_level,
+ output_path_id, output_file_names, compaction_job_info);
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (bdb_options_.enable_garbage_collection) {
+ assert(compaction_job_info);
+ ProcessCompactionJobInfo(*compaction_job_info);
+ }
+
+ return s;
+}
+
+void BlobDBImpl::GetCompactionContextCommon(BlobCompactionContext* context) {
+ assert(context);
+
+ context->blob_db_impl = this;
+ context->next_file_number = next_file_number_.load();
+ context->current_blob_files.clear();
+ for (auto& p : blob_files_) {
+ context->current_blob_files.insert(p.first);
+ }
+ context->fifo_eviction_seq = fifo_eviction_seq_;
+ context->evict_expiration_up_to = evict_expiration_up_to_;
+}
+
+void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
+ assert(context);
+
+ ReadLock l(&mutex_);
+ GetCompactionContextCommon(context);
+}
+
+void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
+ BlobCompactionContextGC* context_gc) {
+ assert(context);
+ assert(context_gc);
+
+ ReadLock l(&mutex_);
+ GetCompactionContextCommon(context);
+
+ if (!live_imm_non_ttl_blob_files_.empty()) {
+ auto it = live_imm_non_ttl_blob_files_.begin();
+ std::advance(it, bdb_options_.garbage_collection_cutoff *
+ live_imm_non_ttl_blob_files_.size());
+ context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end()
+ ? it->first
+ : std::numeric_limits<uint64_t>::max();
+ }
+}
+
+void BlobDBImpl::UpdateLiveSSTSize() {
+ uint64_t live_sst_size = 0;
+ bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
+ if (ok) {
+ live_sst_size_.store(live_sst_size);
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Updated total SST file size: %" PRIu64 " bytes.",
+ live_sst_size);
+ } else {
+ ROCKS_LOG_ERROR(
+ db_options_.info_log,
+ "Failed to update total SST file size after flush or compaction.");
+ }
+ {
+ // Trigger FIFO eviction if needed.
+ MutexLock l(&write_mutex_);
+ Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/);
+ if (s.IsNoSpace()) {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "DB grow out-of-space after SST size updated. Current live"
+ " SST size: %" PRIu64
+ " , current blob files size: %" PRIu64 ".",
+ live_sst_size_.load(), total_blob_size_.load());
+ }
+ }
+}
+
+Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
+ bool force_evict) {
+ write_mutex_.AssertHeld();
+
+ uint64_t live_sst_size = live_sst_size_.load();
+ if (bdb_options_.max_db_size == 0 ||
+ live_sst_size + total_blob_size_.load() + blob_size <=
+ bdb_options_.max_db_size) {
+ return Status::OK();
+ }
+
+ if (bdb_options_.is_fifo == false ||
+ (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
+ // FIFO eviction is disabled, or no space to insert new blob even we evict
+ // all blob files.
+ return Status::NoSpace(
+ "Write failed, as writing it would exceed max_db_size limit.");
+ }
+
+ std::vector<std::shared_ptr<BlobFile>> candidate_files;
+ CopyBlobFiles(&candidate_files);
+ std::sort(candidate_files.begin(), candidate_files.end(),
+ BlobFileComparator());
+ fifo_eviction_seq_ = GetLatestSequenceNumber();
+
+ WriteLock l(&mutex_);
+
+ while (!candidate_files.empty() &&
+ live_sst_size + total_blob_size_.load() + blob_size >
+ bdb_options_.max_db_size) {
+ std::shared_ptr<BlobFile> blob_file = candidate_files.back();
+ candidate_files.pop_back();
+ WriteLock file_lock(&blob_file->mutex_);
+ if (blob_file->Obsolete()) {
+ // File already obsoleted by someone else.
+ assert(blob_file->Immutable());
+ continue;
+ }
+ // FIFO eviction can evict open blob files.
+ if (!blob_file->Immutable()) {
+ Status s = CloseBlobFile(blob_file);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ assert(blob_file->Immutable());
+ auto expiration_range = blob_file->GetExpirationRange();
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Evict oldest blob file since DB out of space. Current "
+ "live SST file size: %" PRIu64 ", total blob size: %" PRIu64
+ ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
+ ".",
+ live_sst_size, total_blob_size_.load(),
+ bdb_options_.max_db_size, blob_file->BlobFileNumber());
+ ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
+ evict_expiration_up_to_ = expiration_range.first;
+ RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
+ RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
+ blob_file->BlobCount());
+ RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
+ blob_file->GetFileSize());
+ TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
+ }
+ if (live_sst_size + total_blob_size_.load() + blob_size >
+ bdb_options_.max_db_size) {
+ return Status::NoSpace(
+ "Write failed, as writing it would exceed max_db_size limit.");
+ }
+ return Status::OK();
+}
+
+Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
+ const std::string& headerbuf, const Slice& key,
+ const Slice& value, uint64_t expiration,
+ std::string* index_entry) {
+ Status s;
+ uint64_t blob_offset = 0;
+ uint64_t key_offset = 0;
+ {
+ WriteLock lockbfile_w(&bfile->mutex_);
+ std::shared_ptr<BlobLogWriter> writer;
+ s = CheckOrCreateWriterLocked(bfile, &writer);
+ if (!s.ok()) {
+ return s;
+ }
+
+ // write the blob to the blob log.
+ s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
+ &blob_offset);
+ }
+
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Invalid status in AppendBlob: %s status: '%s'",
+ bfile->PathName().c_str(), s.ToString().c_str());
+ return s;
+ }
+
+ uint64_t size_put = headerbuf.size() + key.size() + value.size();
+ bfile->BlobRecordAdded(size_put);
+ total_blob_size_ += size_put;
+
+ if (expiration == kNoExpiration) {
+ BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
+ value.size(), bdb_options_.compression);
+ } else {
+ BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
+ blob_offset, value.size(),
+ bdb_options_.compression);
+ }
+
+ return s;
+}
+
+std::vector<Status> BlobDBImpl::MultiGet(const ReadOptions& read_options,
+ const std::vector<Slice>& keys,
+ std::vector<std::string>* values) {
+ StopWatch multiget_sw(clock_, statistics_, BLOB_DB_MULTIGET_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
+ // Get a snapshot to avoid blob file get deleted between we
+ // fetch and index entry and reading from the file.
+ ReadOptions ro(read_options);
+ bool snapshot_created = SetSnapshotIfNeeded(&ro);
+
+ std::vector<Status> statuses;
+ statuses.reserve(keys.size());
+ values->clear();
+ values->reserve(keys.size());
+ PinnableSlice value;
+ for (size_t i = 0; i < keys.size(); i++) {
+ statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
+ values->push_back(value.ToString());
+ value.Reset();
+ }
+ if (snapshot_created) {
+ db_->ReleaseSnapshot(ro.snapshot);
+ }
+ return statuses;
+}
+
+bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
+ assert(read_options != nullptr);
+ if (read_options->snapshot != nullptr) {
+ return false;
+ }
+ read_options->snapshot = db_->GetSnapshot();
+ return true;
+}
+
+Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
+ PinnableSlice* value, uint64_t* expiration) {
+ assert(value);
+
+ BlobIndex blob_index;
+ Status s = blob_index.DecodeFrom(index_entry);
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
+ return Status::NotFound("Key expired");
+ }
+
+ if (expiration != nullptr) {
+ if (blob_index.HasTTL()) {
+ *expiration = blob_index.expiration();
+ } else {
+ *expiration = kNoExpiration;
+ }
+ }
+
+ if (blob_index.IsInlined()) {
+ // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
+ // memory buffer to avoid extra copy.
+ value->PinSelf(blob_index.value());
+ return Status::OK();
+ }
+
+ CompressionType compression_type = kNoCompression;
+ s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
+ blob_index.size(), value, &compression_type);
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (compression_type != kNoCompression) {
+ s = DecompressSlice(*value, compression_type, value);
+ if (!s.ok()) {
+ if (debug_level_ >= 2) {
+ ROCKS_LOG_ERROR(
+ db_options_.info_log,
+ "Uncompression error during blob read from file: %" PRIu64
+ " blob_offset: %" PRIu64 " blob_size: %" PRIu64
+ " key: %s status: '%s'",
+ blob_index.file_number(), blob_index.offset(), blob_index.size(),
+ key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
+ }
+ return s;
+ }
+ }
+
+ return Status::OK();
+}
+
+Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
+ uint64_t offset, uint64_t size,
+ PinnableSlice* value,
+ CompressionType* compression_type) {
+ assert(value);
+ assert(compression_type);
+ assert(*compression_type == kNoCompression);
+
+ if (!size) {
+ value->PinSelf("");
+ return Status::OK();
+ }
+
+ // offset has to have certain min, as we will read CRC
+ // later from the Blob Header, which needs to be also a
+ // valid offset.
+ if (offset <
+ (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
+ if (debug_level_ >= 2) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Invalid blob index file_number: %" PRIu64
+ " blob_offset: %" PRIu64 " blob_size: %" PRIu64
+ " key: %s",
+ file_number, offset, size,
+ key.ToString(/* output_hex */ true).c_str());
+ }
+
+ return Status::NotFound("Invalid blob offset");
+ }
+
+ std::shared_ptr<BlobFile> blob_file;
+
+ {
+ ReadLock rl(&mutex_);
+ auto it = blob_files_.find(file_number);
+
+ // file was deleted
+ if (it == blob_files_.end()) {
+ return Status::NotFound("Blob Not Found as blob file missing");
+ }
+
+ blob_file = it->second;
+ }
+
+ *compression_type = blob_file->GetCompressionType();
+
+ // takes locks when called
+ std::shared_ptr<RandomAccessFileReader> reader;
+ Status s = GetBlobFileReader(blob_file, &reader);
+ if (!s.ok()) {
+ return s;
+ }
+
+ assert(offset >= key.size() + sizeof(uint32_t));
+ const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
+ const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
+
+ // Allocate the buffer. This is safe in C++11
+ std::string buf;
+ AlignedBuf aligned_buf;
+
+ // A partial blob record contain checksum, key and value.
+ Slice blob_record;
+
+ {
+ StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
+ // TODO: rate limit old blob DB file reads.
+ if (reader->use_direct_io()) {
+ s = reader->Read(IOOptions(), record_offset,
+ static_cast<size_t>(record_size), &blob_record, nullptr,
+ &aligned_buf, Env::IO_TOTAL /* rate_limiter_priority */);
+ } else {
+ buf.reserve(static_cast<size_t>(record_size));
+ s = reader->Read(IOOptions(), record_offset,
+ static_cast<size_t>(record_size), &blob_record, &buf[0],
+ nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
+ }
+ RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
+ }
+
+ if (!s.ok()) {
+ ROCKS_LOG_DEBUG(
+ db_options_.info_log,
+ "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
+ ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
+ file_number, offset, size, key.size(), s.ToString().c_str());
+ return s;
+ }
+
+ if (blob_record.size() != record_size) {
+ ROCKS_LOG_DEBUG(
+ db_options_.info_log,
+ "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
+ ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
+ ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
+ file_number, offset, size, key.size(), blob_record.size(), record_size);
+
+ return Status::Corruption("Failed to retrieve blob from blob index.");
+ }
+
+ Slice crc_slice(blob_record.data(), sizeof(uint32_t));
+ Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
+ static_cast<size_t>(size));
+
+ uint32_t crc_exp = 0;
+ if (!GetFixed32(&crc_slice, &crc_exp)) {
+ ROCKS_LOG_DEBUG(
+ db_options_.info_log,
+ "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
+ ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
+ file_number, offset, size, key.size(), s.ToString().c_str());
+ return Status::Corruption("Unable to decode checksum.");
+ }
+
+ uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
+ blob_record.size() - sizeof(uint32_t));
+ crc = crc32c::Mask(crc); // Adjust for storage
+ if (crc != crc_exp) {
+ if (debug_level_ >= 2) {
+ ROCKS_LOG_ERROR(
+ db_options_.info_log,
+ "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
+ " blob_size: %" PRIu64 " key: %s status: '%s'",
+ file_number, offset, size,
+ key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
+ }
+
+ return Status::Corruption("Corruption. Blob CRC mismatch");
+ }
+
+ value->PinSelf(blob_value);
+
+ return Status::OK();
+}
+
+Status BlobDBImpl::Get(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value) {
+ return Get(read_options, column_family, key, value,
+ static_cast<uint64_t*>(nullptr) /*expiration*/);
+}
+
+Status BlobDBImpl::Get(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value, uint64_t* expiration) {
+ StopWatch get_sw(clock_, statistics_, BLOB_DB_GET_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_GET);
+ return GetImpl(read_options, column_family, key, value, expiration);
+}
+
+Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value, uint64_t* expiration) {
+ if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ // Get a snapshot to avoid blob file get deleted between we
+ // fetch and index entry and reading from the file.
+ // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
+ ReadOptions ro(read_options);
+ bool snapshot_created = SetSnapshotIfNeeded(&ro);
+
+ PinnableSlice index_entry;
+ Status s;
+ bool is_blob_index = false;
+ DBImpl::GetImplOptions get_impl_options;
+ get_impl_options.column_family = column_family;
+ get_impl_options.value = &index_entry;
+ get_impl_options.is_blob_index = &is_blob_index;
+ s = db_impl_->GetImpl(ro, key, get_impl_options);
+ if (expiration != nullptr) {
+ *expiration = kNoExpiration;
+ }
+ RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
+ if (s.ok()) {
+ if (is_blob_index) {
+ s = GetBlobValue(key, index_entry, value, expiration);
+ } else {
+ // The index entry is the value itself in this case.
+ value->PinSelf(index_entry);
+ }
+ RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
+ }
+ if (snapshot_created) {
+ db_->ReleaseSnapshot(ro.snapshot);
+ }
+ return s;
+}
+
+std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
+ if (aborted) {
+ return std::make_pair(false, -1);
+ }
+
+ ReadLock rl(&mutex_);
+
+ ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
+ ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt,
+ blob_files_.size());
+ ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
+ open_ttl_files_.size());
+
+ for (const auto& blob_file : open_ttl_files_) {
+ (void)blob_file;
+ assert(!blob_file->Immutable());
+ }
+
+ for (const auto& pair : live_imm_non_ttl_blob_files_) {
+ const auto& blob_file = pair.second;
+ (void)blob_file;
+ assert(!blob_file->HasTTL());
+ assert(blob_file->Immutable());
+ }
+
+ uint64_t now = EpochNow();
+
+ for (auto blob_file_pair : blob_files_) {
+ auto blob_file = blob_file_pair.second;
+ std::ostringstream buf;
+
+ buf << "Blob file " << blob_file->BlobFileNumber() << ", size "
+ << blob_file->GetFileSize() << ", blob count " << blob_file->BlobCount()
+ << ", immutable " << blob_file->Immutable();
+
+ if (blob_file->HasTTL()) {
+ ExpirationRange expiration_range;
+ {
+ ReadLock file_lock(&blob_file->mutex_);
+ expiration_range = blob_file->GetExpirationRange();
+ }
+ buf << ", expiration range (" << expiration_range.first << ", "
+ << expiration_range.second << ")";
+
+ if (!blob_file->Obsolete()) {
+ buf << ", expire in " << (expiration_range.second - now) << "seconds";
+ }
+ }
+ if (blob_file->Obsolete()) {
+ buf << ", obsolete at " << blob_file->GetObsoleteSequence();
+ }
+ buf << ".";
+ ROCKS_LOG_INFO(db_options_.info_log, "%s", buf.str().c_str());
+ }
+
+ // reschedule
+ return std::make_pair(true, -1);
+}
+
+Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
+ TEST_SYNC_POINT("BlobDBImpl::CloseBlobFile");
+ assert(bfile);
+ assert(!bfile->Immutable());
+ assert(!bfile->Obsolete());
+
+ if (bfile->HasTTL() || bfile == open_non_ttl_file_) {
+ write_mutex_.AssertHeld();
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Closing blob file %" PRIu64 ". Path: %s",
+ bfile->BlobFileNumber(), bfile->PathName().c_str());
+
+ const SequenceNumber sequence = GetLatestSequenceNumber();
+
+ const Status s = bfile->WriteFooterAndCloseLocked(sequence);
+
+ if (s.ok()) {
+ total_blob_size_ += BlobLogFooter::kSize;
+ } else {
+ bfile->MarkImmutable(sequence);
+
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to close blob file %" PRIu64 "with error: %s",
+ bfile->BlobFileNumber(), s.ToString().c_str());
+ }
+
+ if (bfile->HasTTL()) {
+ size_t erased __attribute__((__unused__));
+ erased = open_ttl_files_.erase(bfile);
+ } else {
+ if (bfile == open_non_ttl_file_) {
+ open_non_ttl_file_ = nullptr;
+ }
+
+ const uint64_t blob_file_number = bfile->BlobFileNumber();
+ auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number);
+ assert(it == live_imm_non_ttl_blob_files_.end() ||
+ it->first != blob_file_number);
+ live_imm_non_ttl_blob_files_.insert(
+ it, std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
+ blob_file_number, bfile));
+ }
+
+ return s;
+}
+
+Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
+ write_mutex_.AssertHeld();
+
+ // atomic read
+ if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
+ return Status::OK();
+ }
+
+ WriteLock lock(&mutex_);
+ WriteLock file_lock(&bfile->mutex_);
+
+ assert(!bfile->Obsolete() || bfile->Immutable());
+ if (bfile->Immutable()) {
+ return Status::OK();
+ }
+
+ return CloseBlobFile(bfile);
+}
+
+void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
+ SequenceNumber obsolete_seq,
+ bool update_size) {
+ assert(blob_file->Immutable());
+ assert(!blob_file->Obsolete());
+
+ // Should hold write lock of mutex_ or during DB open.
+ blob_file->MarkObsolete(obsolete_seq);
+ obsolete_files_.push_back(blob_file);
+ assert(total_blob_size_.load() >= blob_file->GetFileSize());
+ if (update_size) {
+ total_blob_size_ -= blob_file->GetFileSize();
+ }
+}
+
+bool BlobDBImpl::VisibleToActiveSnapshot(
+ const std::shared_ptr<BlobFile>& bfile) {
+ assert(bfile->Obsolete());
+
+ // We check whether the oldest snapshot is no less than the last sequence
+ // by the time the blob file become obsolete. If so, the blob file is not
+ // visible to all existing snapshots.
+ //
+ // If we keep track of the earliest sequence of the keys in the blob file,
+ // we could instead check if there's a snapshot falls in range
+ // [earliest_sequence, obsolete_sequence). But doing so will make the
+ // implementation more complicated.
+ SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
+ SequenceNumber oldest_snapshot = kMaxSequenceNumber;
+ {
+ // Need to lock DBImpl mutex before access snapshot list.
+ InstrumentedMutexLock l(db_impl_->mutex());
+ auto& snapshots = db_impl_->snapshots();
+ if (!snapshots.empty()) {
+ oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
+ }
+ }
+ bool visible = oldest_snapshot < obsolete_sequence;
+ if (visible) {
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
+ ") visible to oldest snapshot %" PRIu64 ".",
+ bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
+ }
+ return visible;
+}
+
+std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
+ if (aborted) {
+ return std::make_pair(false, -1);
+ }
+
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
+
+ std::vector<std::shared_ptr<BlobFile>> process_files;
+ uint64_t now = EpochNow();
+ {
+ ReadLock rl(&mutex_);
+ for (auto p : blob_files_) {
+ auto& blob_file = p.second;
+ ReadLock file_lock(&blob_file->mutex_);
+ if (blob_file->HasTTL() && !blob_file->Obsolete() &&
+ blob_file->GetExpirationRange().second <= now) {
+ process_files.push_back(blob_file);
+ }
+ }
+ }
+
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
+ TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
+
+ SequenceNumber seq = GetLatestSequenceNumber();
+ {
+ MutexLock l(&write_mutex_);
+ WriteLock lock(&mutex_);
+ for (auto& blob_file : process_files) {
+ WriteLock file_lock(&blob_file->mutex_);
+
+ // Need to double check if the file is obsolete.
+ if (blob_file->Obsolete()) {
+ assert(blob_file->Immutable());
+ continue;
+ }
+
+ if (!blob_file->Immutable()) {
+ CloseBlobFile(blob_file);
+ }
+
+ assert(blob_file->Immutable());
+
+ ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
+ }
+ }
+
+ return std::make_pair(true, -1);
+}
+
+Status BlobDBImpl::SyncBlobFiles() {
+ MutexLock l(&write_mutex_);
+
+ std::vector<std::shared_ptr<BlobFile>> process_files;
+ {
+ ReadLock rl(&mutex_);
+ for (auto fitr : open_ttl_files_) {
+ process_files.push_back(fitr);
+ }
+ if (open_non_ttl_file_ != nullptr) {
+ process_files.push_back(open_non_ttl_file_);
+ }
+ }
+
+ Status s;
+ for (auto& blob_file : process_files) {
+ s = blob_file->Fsync();
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to sync blob file %" PRIu64 ", status: %s",
+ blob_file->BlobFileNumber(), s.ToString().c_str());
+ return s;
+ }
+ }
+
+ s = dir_ent_->FsyncWithDirOptions(IOOptions(), nullptr, DirFsyncOptions());
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to sync blob directory, status: %s",
+ s.ToString().c_str());
+ }
+ return s;
+}
+
+std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
+ if (aborted) return std::make_pair(false, -1);
+
+ if (open_file_count_.load() < kOpenFilesTrigger) {
+ return std::make_pair(true, -1);
+ }
+
+ // in the future, we should sort by last_access_
+ // instead of closing every file
+ ReadLock rl(&mutex_);
+ for (auto const& ent : blob_files_) {
+ auto bfile = ent.second;
+ if (bfile->last_access_.load() == -1) continue;
+
+ WriteLock lockbfile_w(&bfile->mutex_);
+ CloseRandomAccessLocked(bfile);
+ }
+
+ return std::make_pair(true, -1);
+}
+
+std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
+ if (aborted) {
+ return std::make_pair(false, -1);
+ }
+
+ MutexLock delete_file_lock(&delete_file_mutex_);
+ if (disable_file_deletions_ > 0) {
+ return std::make_pair(true, -1);
+ }
+
+ std::list<std::shared_ptr<BlobFile>> tobsolete;
+ {
+ WriteLock wl(&mutex_);
+ if (obsolete_files_.empty()) {
+ return std::make_pair(true, -1);
+ }
+ tobsolete.swap(obsolete_files_);
+ }
+
+ bool file_deleted = false;
+ for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
+ auto bfile = *iter;
+ {
+ ReadLock lockbfile_r(&bfile->mutex_);
+ if (VisibleToActiveSnapshot(bfile)) {
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Could not delete file due to snapshot failure %s",
+ bfile->PathName().c_str());
+ ++iter;
+ continue;
+ }
+ }
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Will delete file due to snapshot success %s",
+ bfile->PathName().c_str());
+
+ {
+ WriteLock wl(&mutex_);
+ blob_files_.erase(bfile->BlobFileNumber());
+ }
+
+ Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
+ bfile->PathName(), blob_dir_, true,
+ /*force_fg=*/false);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "File failed to be deleted as obsolete %s",
+ bfile->PathName().c_str());
+ ++iter;
+ continue;
+ }
+
+ file_deleted = true;
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "File deleted as obsolete from blob dir %s",
+ bfile->PathName().c_str());
+
+ iter = tobsolete.erase(iter);
+ }
+
+ // directory change. Fsync
+ if (file_deleted) {
+ Status s = dir_ent_->FsyncWithDirOptions(
+ IOOptions(), nullptr,
+ DirFsyncOptions(DirFsyncOptions::FsyncReason::kFileDeleted));
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
+ blob_dir_.c_str(), s.ToString().c_str());
+ }
+ }
+
+ // put files back into obsolete if for some reason, delete failed
+ if (!tobsolete.empty()) {
+ WriteLock wl(&mutex_);
+ for (auto bfile : tobsolete) {
+ blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
+ obsolete_files_.push_front(bfile);
+ }
+ }
+
+ return std::make_pair(!aborted, -1);
+}
+
+void BlobDBImpl::CopyBlobFiles(
+ std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
+ ReadLock rl(&mutex_);
+ for (auto const& p : blob_files_) {
+ bfiles_copy->push_back(p.second);
+ }
+}
+
+Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
+ auto* cfd =
+ static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
+ ->cfd();
+ // Get a snapshot to avoid blob file get deleted between we
+ // fetch and index entry and reading from the file.
+ ManagedSnapshot* own_snapshot = nullptr;
+ const Snapshot* snapshot = read_options.snapshot;
+ if (snapshot == nullptr) {
+ own_snapshot = new ManagedSnapshot(db_);
+ snapshot = own_snapshot->snapshot();
+ }
+ auto* iter = db_impl_->NewIteratorImpl(
+ read_options, cfd, snapshot->GetSequenceNumber(),
+ nullptr /*read_callback*/, true /*expose_blob_index*/);
+ return new BlobDBIterator(own_snapshot, iter, this, clock_, statistics_);
+}
+
+Status DestroyBlobDB(const std::string& dbname, const Options& options,
+ const BlobDBOptions& bdb_options) {
+ const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
+ Env* env = soptions.env;
+
+ Status status;
+ std::string blobdir;
+ blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir
+ : bdb_options.blob_dir;
+
+ std::vector<std::string> filenames;
+ if (env->GetChildren(blobdir, &filenames).ok()) {
+ for (const auto& f : filenames) {
+ uint64_t number;
+ FileType type;
+ if (ParseFileName(f, &number, &type) && type == kBlobFile) {
+ Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
+ /*force_fg=*/false);
+ if (status.ok() && !del.ok()) {
+ status = del;
+ }
+ }
+ }
+ // TODO: What to do if we cannot delete the directory?
+ env->DeleteDir(blobdir).PermitUncheckedError();
+ }
+ Status destroy = DestroyDB(dbname, options);
+ if (status.ok() && !destroy.ok()) {
+ status = destroy;
+ }
+
+ return status;
+}
+
+#ifndef NDEBUG
+Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
+ PinnableSlice* value) {
+ return GetBlobValue(key, index_entry, value);
+}
+
+void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number,
+ SequenceNumber immutable_sequence) {
+ auto blob_file = std::make_shared<BlobFile>(this, blob_dir_, blob_file_number,
+ db_options_.info_log.get());
+ blob_file->MarkImmutable(immutable_sequence);
+
+ blob_files_[blob_file_number] = blob_file;
+ live_imm_non_ttl_blob_files_[blob_file_number] = blob_file;
+}
+
+std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
+ ReadLock l(&mutex_);
+ std::vector<std::shared_ptr<BlobFile>> blob_files;
+ for (auto& p : blob_files_) {
+ blob_files.emplace_back(p.second);
+ }
+ return blob_files;
+}
+
+std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetLiveImmNonTTLFiles()
+ const {
+ ReadLock l(&mutex_);
+ std::vector<std::shared_ptr<BlobFile>> live_imm_non_ttl_files;
+ for (const auto& pair : live_imm_non_ttl_blob_files_) {
+ live_imm_non_ttl_files.emplace_back(pair.second);
+ }
+ return live_imm_non_ttl_files;
+}
+
+std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
+ const {
+ ReadLock l(&mutex_);
+ std::vector<std::shared_ptr<BlobFile>> obsolete_files;
+ for (auto& bfile : obsolete_files_) {
+ obsolete_files.emplace_back(bfile);
+ }
+ return obsolete_files;
+}
+
+void BlobDBImpl::TEST_DeleteObsoleteFiles() {
+ DeleteObsoleteFiles(false /*abort*/);
+}
+
+Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
+ MutexLock l(&write_mutex_);
+ WriteLock lock(&mutex_);
+ WriteLock file_lock(&bfile->mutex_);
+
+ return CloseBlobFile(bfile);
+}
+
+void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
+ SequenceNumber obsolete_seq,
+ bool update_size) {
+ return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
+}
+
+void BlobDBImpl::TEST_EvictExpiredFiles() {
+ EvictExpiredFiles(false /*abort*/);
+}
+
+uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
+
+void BlobDBImpl::TEST_InitializeBlobFileToSstMapping(
+ const std::vector<LiveFileMetaData>& live_files) {
+ InitializeBlobFileToSstMapping(live_files);
+}
+
+void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo& info) {
+ ProcessFlushJobInfo(info);
+}
+
+void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info) {
+ ProcessCompactionJobInfo(info);
+}
+
+#endif // !NDEBUG
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl.h b/src/rocksdb/utilities/blob_db/blob_db_impl.h
new file mode 100644
index 000000000..0b4dbf5e5
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_impl.h
@@ -0,0 +1,503 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+#include <condition_variable>
+#include <limits>
+#include <list>
+#include <memory>
+#include <set>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "db/blob/blob_log_format.h"
+#include "db/blob/blob_log_writer.h"
+#include "db/db_iter.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/db.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/listener.h"
+#include "rocksdb/options.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/wal_filter.h"
+#include "util/mutexlock.h"
+#include "util/timer_queue.h"
+#include "utilities/blob_db/blob_db.h"
+#include "utilities/blob_db/blob_file.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class DBImpl;
+class ColumnFamilyHandle;
+class ColumnFamilyData;
+class SystemClock;
+
+struct FlushJobInfo;
+
+namespace blob_db {
+
+struct BlobCompactionContext;
+struct BlobCompactionContextGC;
+class BlobDBImpl;
+class BlobFile;
+
+// Comparator to sort "TTL" aware Blob files based on the lower value of
+// TTL range.
+struct BlobFileComparatorTTL {
+ bool operator()(const std::shared_ptr<BlobFile>& lhs,
+ const std::shared_ptr<BlobFile>& rhs) const;
+};
+
+struct BlobFileComparator {
+ bool operator()(const std::shared_ptr<BlobFile>& lhs,
+ const std::shared_ptr<BlobFile>& rhs) const;
+};
+
+/**
+ * The implementation class for BlobDB. It manages the blob logs, which
+ * are sequentially written files. Blob logs can be of the TTL or non-TTL
+ * varieties; the former are cleaned up when they expire, while the latter
+ * are (optionally) garbage collected.
+ */
+class BlobDBImpl : public BlobDB {
+ friend class BlobFile;
+ friend class BlobDBIterator;
+ friend class BlobDBListener;
+ friend class BlobDBListenerGC;
+ friend class BlobIndexCompactionFilterBase;
+ friend class BlobIndexCompactionFilterGC;
+
+ public:
+ // deletions check period
+ static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000;
+
+ // sanity check task
+ static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000;
+
+ // how many random access open files can we tolerate
+ static constexpr uint32_t kOpenFilesTrigger = 100;
+
+ // how often to schedule reclaim open files.
+ static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;
+
+ // how often to schedule delete obs files periods
+ static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
+
+ // how often to schedule expired files eviction.
+ static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000;
+
+ // when should oldest file be evicted:
+ // on reaching 90% of blob_dir_size
+ static constexpr double kEvictOldestFileAtSize = 0.9;
+
+ using BlobDB::Put;
+ Status Put(const WriteOptions& options, const Slice& key,
+ const Slice& value) override;
+
+ using BlobDB::Get;
+ Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* value) override;
+
+ Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* value,
+ uint64_t* expiration) override;
+
+ using BlobDB::NewIterator;
+ virtual Iterator* NewIterator(const ReadOptions& read_options) override;
+
+ using BlobDB::NewIterators;
+ virtual Status NewIterators(
+ const ReadOptions& /*read_options*/,
+ const std::vector<ColumnFamilyHandle*>& /*column_families*/,
+ std::vector<Iterator*>* /*iterators*/) override {
+ return Status::NotSupported("Not implemented");
+ }
+
+ using BlobDB::MultiGet;
+ virtual std::vector<Status> MultiGet(
+ const ReadOptions& read_options, const std::vector<Slice>& keys,
+ std::vector<std::string>* values) override;
+
+ using BlobDB::Write;
+ virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
+
+ virtual Status Close() override;
+
+ using BlobDB::PutWithTTL;
+ Status PutWithTTL(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t ttl) override;
+
+ using BlobDB::PutUntil;
+ Status PutUntil(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t expiration) override;
+
+ using BlobDB::CompactFiles;
+ Status CompactFiles(
+ const CompactionOptions& compact_options,
+ const std::vector<std::string>& input_file_names, const int output_level,
+ const int output_path_id = -1,
+ std::vector<std::string>* const output_file_names = nullptr,
+ CompactionJobInfo* compaction_job_info = nullptr) override;
+
+ BlobDBOptions GetBlobDBOptions() const override;
+
+ BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
+ const DBOptions& db_options,
+ const ColumnFamilyOptions& cf_options);
+
+ virtual Status DisableFileDeletions() override;
+
+ virtual Status EnableFileDeletions(bool force) override;
+
+ virtual Status GetLiveFiles(std::vector<std::string>&,
+ uint64_t* manifest_file_size,
+ bool flush_memtable = true) override;
+ virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
+
+ ~BlobDBImpl();
+
+ Status Open(std::vector<ColumnFamilyHandle*>* handles);
+
+ Status SyncBlobFiles() override;
+
+ // Common part of the two GetCompactionContext methods below.
+ // REQUIRES: read lock on mutex_
+ void GetCompactionContextCommon(BlobCompactionContext* context);
+
+ void GetCompactionContext(BlobCompactionContext* context);
+ void GetCompactionContext(BlobCompactionContext* context,
+ BlobCompactionContextGC* context_gc);
+
+#ifndef NDEBUG
+ Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
+ PinnableSlice* value);
+
+ void TEST_AddDummyBlobFile(uint64_t blob_file_number,
+ SequenceNumber immutable_sequence);
+
+ std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
+
+ std::vector<std::shared_ptr<BlobFile>> TEST_GetLiveImmNonTTLFiles() const;
+
+ std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
+
+ Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
+
+ void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
+ SequenceNumber obsolete_seq = 0,
+ bool update_size = true);
+
+ void TEST_EvictExpiredFiles();
+
+ void TEST_DeleteObsoleteFiles();
+
+ uint64_t TEST_live_sst_size();
+
+ const std::string& TEST_blob_dir() const { return blob_dir_; }
+
+ void TEST_InitializeBlobFileToSstMapping(
+ const std::vector<LiveFileMetaData>& live_files);
+
+ void TEST_ProcessFlushJobInfo(const FlushJobInfo& info);
+
+ void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info);
+
+#endif // !NDEBUG
+
+ private:
+ class BlobInserter;
+
+ // Create a snapshot if there isn't one in read options.
+ // Return true if a snapshot is created.
+ bool SetSnapshotIfNeeded(ReadOptions* read_options);
+
+ Status GetImpl(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value, uint64_t* expiration = nullptr);
+
+ Status GetBlobValue(const Slice& key, const Slice& index_entry,
+ PinnableSlice* value, uint64_t* expiration = nullptr);
+
+ Status GetRawBlobFromFile(const Slice& key, uint64_t file_number,
+ uint64_t offset, uint64_t size,
+ PinnableSlice* value,
+ CompressionType* compression_type);
+
+ Slice GetCompressedSlice(const Slice& raw,
+ std::string* compression_output) const;
+
+ Status DecompressSlice(const Slice& compressed_value,
+ CompressionType compression_type,
+ PinnableSlice* value_output) const;
+
+ // Close a file by appending a footer, and removes file from open files list.
+ // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_
+ // and the blob file's mutex_. If called on a blob file which is visible only
+ // to a single thread (like in the case of new files written during
+ // compaction/GC), the locks on write_mutex_ and the blob file's mutex_ can be
+ // avoided.
+ Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
+
+ // Close a file if its size exceeds blob_file_size
+ // REQUIRES: lock held on write_mutex_.
+ Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
+
+ // Mark file as obsolete and move the file to obsolete file list.
+ //
+ // REQUIRED: hold write lock of mutex_ or during DB open.
+ void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
+ SequenceNumber obsolete_seq, bool update_size);
+
+ Status PutBlobValue(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t expiration,
+ WriteBatch* batch);
+
+ Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
+ const std::string& headerbuf, const Slice& key,
+ const Slice& value, uint64_t expiration,
+ std::string* index_entry);
+
+ // Create a new blob file and associated writer.
+ Status CreateBlobFileAndWriter(bool has_ttl,
+ const ExpirationRange& expiration_range,
+ const std::string& reason,
+ std::shared_ptr<BlobFile>* blob_file,
+ std::shared_ptr<BlobLogWriter>* writer);
+
+ // Get the open non-TTL blob log file, or create a new one if no such file
+ // exists.
+ Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file);
+
+ // Get the open TTL blob log file for a certain expiration, or create a new
+ // one if no such file exists.
+ Status SelectBlobFileTTL(uint64_t expiration,
+ std::shared_ptr<BlobFile>* blob_file);
+
+ std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
+
+ // periodic sanity check. Bunch of checks
+ std::pair<bool, int64_t> SanityCheck(bool aborted);
+
+ // Delete files that have been marked obsolete (either because of TTL
+ // or GC). Check whether any snapshots exist which refer to the same.
+ std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);
+
+ // periodically check if open blob files and their TTL's has expired
+ // if expired, close the sequential writer and make the file immutable
+ std::pair<bool, int64_t> EvictExpiredFiles(bool aborted);
+
+ // if the number of open files, approaches ULIMIT's this
+ // task will close random readers, which are kept around for
+ // efficiency
+ std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted);
+
+ std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted);
+
+ // Adds the background tasks to the timer queue
+ void StartBackgroundTasks();
+
+ // add a new Blob File
+ std::shared_ptr<BlobFile> NewBlobFile(bool has_ttl,
+ const ExpirationRange& expiration_range,
+ const std::string& reason);
+
+ // Register a new blob file.
+ // REQUIRES: write lock on mutex_.
+ void RegisterBlobFile(std::shared_ptr<BlobFile> blob_file);
+
+ // collect all the blob log files from the blob directory
+ Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);
+
+ // Open all blob files found in blob_dir.
+ Status OpenAllBlobFiles();
+
+ // Link an SST to a blob file. Comes in locking and non-locking varieties
+ // (the latter is used during Open).
+ template <typename Linker>
+ void LinkSstToBlobFileImpl(uint64_t sst_file_number,
+ uint64_t blob_file_number, Linker linker);
+
+ void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number);
+
+ void LinkSstToBlobFileNoLock(uint64_t sst_file_number,
+ uint64_t blob_file_number);
+
+ // Unlink an SST from a blob file.
+ void UnlinkSstFromBlobFile(uint64_t sst_file_number,
+ uint64_t blob_file_number);
+
+ // Initialize the mapping between blob files and SSTs during Open.
+ void InitializeBlobFileToSstMapping(
+ const std::vector<LiveFileMetaData>& live_files);
+
+ // Update the mapping between blob files and SSTs after a flush and mark
+ // any unneeded blob files obsolete.
+ void ProcessFlushJobInfo(const FlushJobInfo& info);
+
+ // Update the mapping between blob files and SSTs after a compaction and
+ // mark any unneeded blob files obsolete.
+ void ProcessCompactionJobInfo(const CompactionJobInfo& info);
+
+ // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs
+ // linked to it, and all memtables from before the blob file became immutable
+ // have been flushed. Note: should only be called if the condition holds for
+ // all lower-numbered non-TTL blob files as well.
+ bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile>& blob_file,
+ SequenceNumber obsolete_seq);
+
+ // Mark all immutable non-TTL blob files that aren't needed by any SSTs as
+ // obsolete. Comes in two varieties; the version used during Open need not
+ // worry about locking or snapshots.
+ template <class Functor>
+ void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed);
+
+ void MarkUnreferencedBlobFilesObsolete();
+ void MarkUnreferencedBlobFilesObsoleteDuringOpen();
+
+ void UpdateLiveSSTSize();
+
+ Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
+ std::shared_ptr<RandomAccessFileReader>* reader);
+
+ // hold write mutex on file and call.
+ // Close the above Random Access reader
+ void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile);
+
+ // hold write mutex on file and call
+ // creates a sequential (append) writer for this blobfile
+ Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile);
+
+ // returns a BlobLogWriter object for the file. If writer is not
+ // already present, creates one. Needs Write Mutex to be held
+ Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file,
+ std::shared_ptr<BlobLogWriter>* writer);
+
+ // checks if there is no snapshot which is referencing the
+ // blobs
+ bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
+ bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
+
+ void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);
+
+ uint64_t EpochNow() { return clock_->NowMicros() / 1000000; }
+
+ // Check if inserting a new blob will make DB grow out of space.
+ // If is_fifo = true, FIFO eviction will be triggered to make room for the
+ // new blob. If force_evict = true, FIFO eviction will evict blob files
+ // even eviction will not make enough room for the new blob.
+ Status CheckSizeAndEvictBlobFiles(uint64_t blob_size,
+ bool force_evict = false);
+
+ // name of the database directory
+ std::string dbname_;
+
+ // the base DB
+ DBImpl* db_impl_;
+ Env* env_;
+ SystemClock* clock_;
+ // the options that govern the behavior of Blob Storage
+ BlobDBOptions bdb_options_;
+ DBOptions db_options_;
+ ColumnFamilyOptions cf_options_;
+ FileOptions file_options_;
+
+ // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold
+ // ownership.
+ Statistics* statistics_;
+
+ // by default this is "blob_dir" under dbname_
+ // but can be configured
+ std::string blob_dir_;
+
+ // pointer to directory
+ std::unique_ptr<FSDirectory> dir_ent_;
+
+ // Read Write Mutex, which protects all the data structures
+ // HEAVILY TRAFFICKED
+ mutable port::RWMutex mutex_;
+
+ // Writers has to hold write_mutex_ before writing.
+ mutable port::Mutex write_mutex_;
+
+ // counter for blob file number
+ std::atomic<uint64_t> next_file_number_;
+
+ // entire metadata of all the BLOB files memory
+ std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_;
+
+ // All live immutable non-TTL blob files.
+ std::map<uint64_t, std::shared_ptr<BlobFile>> live_imm_non_ttl_blob_files_;
+
+ // The largest sequence number that has been flushed.
+ SequenceNumber flush_sequence_;
+
+ // opened non-TTL blob file.
+ std::shared_ptr<BlobFile> open_non_ttl_file_;
+
+ // all the blob files which are currently being appended to based
+ // on variety of incoming TTL's
+ std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_;
+
+ // Flag to check whether Close() has been called on this DB
+ bool closed_;
+
+ // timer based queue to execute tasks
+ TimerQueue tqueue_;
+
+ // number of files opened for random access/GET
+ // counter is used to monitor and close excess RA files.
+ std::atomic<uint32_t> open_file_count_;
+
+ // Total size of all live blob files (i.e. exclude obsolete files).
+ std::atomic<uint64_t> total_blob_size_;
+
+ // total size of SST files.
+ std::atomic<uint64_t> live_sst_size_;
+
+ // Latest FIFO eviction timestamp
+ //
+ // REQUIRES: access with metex_ lock held.
+ uint64_t fifo_eviction_seq_;
+
+ // The expiration up to which latest FIFO eviction evicts.
+ //
+ // REQUIRES: access with metex_ lock held.
+ uint64_t evict_expiration_up_to_;
+
+ std::list<std::shared_ptr<BlobFile>> obsolete_files_;
+
+ // DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block
+ // on the mutex to avoid contention.
+ //
+ // While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note
+ // the difference. mutex_ only needs to be held when access the
+ // data-structure, and delete_file_mutex_ needs to be held the whole time
+ // during DeleteObsoleteFiles to avoid being run simultaneously with
+ // DisableFileDeletions.
+ //
+ // If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced
+ // to hold delete_file_mutex_ first to avoid deadlock.
+ mutable port::Mutex delete_file_mutex_;
+
+ // Each call of DisableFileDeletions will increase disable_file_deletion_
+ // by 1. EnableFileDeletions will either decrease the count by 1 or reset
+ // it to zeor, depending on the force flag.
+ //
+ // REQUIRES: access with delete_file_mutex_ held.
+ int disable_file_deletions_ = 0;
+
+ uint32_t debug_level_;
+};
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc b/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc
new file mode 100644
index 000000000..87e3f33cc
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc
@@ -0,0 +1,113 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "file/filename.h"
+#include "logging/logging.h"
+#include "util/cast_util.h"
+#include "util/mutexlock.h"
+#include "utilities/blob_db/blob_db_impl.h"
+
+// BlobDBImpl methods to get snapshot of files, e.g. for replication.
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+Status BlobDBImpl::DisableFileDeletions() {
+ // Disable base DB file deletions.
+ Status s = db_impl_->DisableFileDeletions();
+ if (!s.ok()) {
+ return s;
+ }
+
+ int count = 0;
+ {
+ // Hold delete_file_mutex_ to make sure no DeleteObsoleteFiles job
+ // is running.
+ MutexLock l(&delete_file_mutex_);
+ count = ++disable_file_deletions_;
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Disabled blob file deletions. count: %d", count);
+ return Status::OK();
+}
+
+Status BlobDBImpl::EnableFileDeletions(bool force) {
+ // Enable base DB file deletions.
+ Status s = db_impl_->EnableFileDeletions(force);
+ if (!s.ok()) {
+ return s;
+ }
+
+ int count = 0;
+ {
+ MutexLock l(&delete_file_mutex_);
+ if (force) {
+ disable_file_deletions_ = 0;
+ } else if (disable_file_deletions_ > 0) {
+ count = --disable_file_deletions_;
+ }
+ assert(count >= 0);
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log, "Enabled blob file deletions. count: %d",
+ count);
+ // Consider trigger DeleteobsoleteFiles once after re-enabled, if we are to
+ // make DeleteobsoleteFiles re-run interval configuration.
+ return Status::OK();
+}
+
+Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
+ uint64_t* manifest_file_size,
+ bool flush_memtable) {
+ if (!bdb_options_.path_relative) {
+ return Status::NotSupported(
+ "Not able to get relative blob file path from absolute blob_dir.");
+ }
+ // Hold a lock in the beginning to avoid updates to base DB during the call
+ ReadLock rl(&mutex_);
+ Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
+ if (!s.ok()) {
+ return s;
+ }
+ ret.reserve(ret.size() + blob_files_.size());
+ for (auto bfile_pair : blob_files_) {
+ auto blob_file = bfile_pair.second;
+ // Path should be relative to db_name, but begin with slash.
+ ret.emplace_back(
+ BlobFileName("", bdb_options_.blob_dir, blob_file->BlobFileNumber()));
+ }
+ return Status::OK();
+}
+
+void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
+ // Path should be relative to db_name.
+ assert(bdb_options_.path_relative);
+ // Hold a lock in the beginning to avoid updates to base DB during the call
+ ReadLock rl(&mutex_);
+ db_->GetLiveFilesMetaData(metadata);
+ for (auto bfile_pair : blob_files_) {
+ auto blob_file = bfile_pair.second;
+ LiveFileMetaData filemetadata;
+ filemetadata.size = blob_file->GetFileSize();
+ const uint64_t file_number = blob_file->BlobFileNumber();
+ // Path should be relative to db_name, but begin with slash.
+ filemetadata.name = BlobFileName("", bdb_options_.blob_dir, file_number);
+ filemetadata.file_number = file_number;
+ if (blob_file->HasTTL()) {
+ filemetadata.oldest_ancester_time = blob_file->GetExpirationRange().first;
+ }
+ auto cfh =
+ static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily());
+ filemetadata.column_family_name = cfh->GetName();
+ metadata->emplace_back(filemetadata);
+ }
+}
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_iterator.h b/src/rocksdb/utilities/blob_db/blob_db_iterator.h
new file mode 100644
index 000000000..fd2b2f8f5
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_iterator.h
@@ -0,0 +1,150 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include "db/arena_wrapped_db_iter.h"
+#include "rocksdb/iterator.h"
+#include "util/stop_watch.h"
+#include "utilities/blob_db/blob_db_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+class Statistics;
+class SystemClock;
+
+namespace blob_db {
+
+using ROCKSDB_NAMESPACE::ManagedSnapshot;
+
+class BlobDBIterator : public Iterator {
+ public:
+ BlobDBIterator(ManagedSnapshot* snapshot, ArenaWrappedDBIter* iter,
+ BlobDBImpl* blob_db, SystemClock* clock,
+ Statistics* statistics)
+ : snapshot_(snapshot),
+ iter_(iter),
+ blob_db_(blob_db),
+ clock_(clock),
+ statistics_(statistics) {}
+
+ virtual ~BlobDBIterator() = default;
+
+ bool Valid() const override {
+ if (!iter_->Valid()) {
+ return false;
+ }
+ return status_.ok();
+ }
+
+ Status status() const override {
+ if (!iter_->status().ok()) {
+ return iter_->status();
+ }
+ return status_;
+ }
+
+ void SeekToFirst() override {
+ StopWatch seek_sw(clock_, statistics_, BLOB_DB_SEEK_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_SEEK);
+ iter_->SeekToFirst();
+ while (UpdateBlobValue()) {
+ iter_->Next();
+ }
+ }
+
+ void SeekToLast() override {
+ StopWatch seek_sw(clock_, statistics_, BLOB_DB_SEEK_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_SEEK);
+ iter_->SeekToLast();
+ while (UpdateBlobValue()) {
+ iter_->Prev();
+ }
+ }
+
+ void Seek(const Slice& target) override {
+ StopWatch seek_sw(clock_, statistics_, BLOB_DB_SEEK_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_SEEK);
+ iter_->Seek(target);
+ while (UpdateBlobValue()) {
+ iter_->Next();
+ }
+ }
+
+ void SeekForPrev(const Slice& target) override {
+ StopWatch seek_sw(clock_, statistics_, BLOB_DB_SEEK_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_SEEK);
+ iter_->SeekForPrev(target);
+ while (UpdateBlobValue()) {
+ iter_->Prev();
+ }
+ }
+
+ void Next() override {
+ assert(Valid());
+ StopWatch next_sw(clock_, statistics_, BLOB_DB_NEXT_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_NEXT);
+ iter_->Next();
+ while (UpdateBlobValue()) {
+ iter_->Next();
+ }
+ }
+
+ void Prev() override {
+ assert(Valid());
+ StopWatch prev_sw(clock_, statistics_, BLOB_DB_PREV_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_PREV);
+ iter_->Prev();
+ while (UpdateBlobValue()) {
+ iter_->Prev();
+ }
+ }
+
+ Slice key() const override {
+ assert(Valid());
+ return iter_->key();
+ }
+
+ Slice value() const override {
+ assert(Valid());
+ if (!iter_->IsBlob()) {
+ return iter_->value();
+ }
+ return value_;
+ }
+
+ // Iterator::Refresh() not supported.
+
+ private:
+ // Return true if caller should continue to next value.
+ bool UpdateBlobValue() {
+ value_.Reset();
+ status_ = Status::OK();
+ if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) {
+ Status s = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_);
+ if (s.IsNotFound()) {
+ return true;
+ } else {
+ if (!s.ok()) {
+ status_ = s;
+ }
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ std::unique_ptr<ManagedSnapshot> snapshot_;
+ std::unique_ptr<ArenaWrappedDBIter> iter_;
+ BlobDBImpl* blob_db_;
+ SystemClock* clock_;
+ Statistics* statistics_;
+ Status status_;
+ PinnableSlice value_;
+};
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_listener.h b/src/rocksdb/utilities/blob_db/blob_db_listener.h
new file mode 100644
index 000000000..d17d29853
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_listener.h
@@ -0,0 +1,71 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+
+#include "rocksdb/listener.h"
+#include "util/mutexlock.h"
+#include "utilities/blob_db/blob_db_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+class BlobDBListener : public EventListener {
+ public:
+ explicit BlobDBListener(BlobDBImpl* blob_db_impl)
+ : blob_db_impl_(blob_db_impl) {}
+
+ void OnFlushBegin(DB* /*db*/, const FlushJobInfo& /*info*/) override {
+ assert(blob_db_impl_ != nullptr);
+ blob_db_impl_->SyncBlobFiles();
+ }
+
+ void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& /*info*/) override {
+ assert(blob_db_impl_ != nullptr);
+ blob_db_impl_->UpdateLiveSSTSize();
+ }
+
+ void OnCompactionCompleted(DB* /*db*/,
+ const CompactionJobInfo& /*info*/) override {
+ assert(blob_db_impl_ != nullptr);
+ blob_db_impl_->UpdateLiveSSTSize();
+ }
+
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "BlobDBListener"; }
+
+ protected:
+ BlobDBImpl* blob_db_impl_;
+};
+
+class BlobDBListenerGC : public BlobDBListener {
+ public:
+ explicit BlobDBListenerGC(BlobDBImpl* blob_db_impl)
+ : BlobDBListener(blob_db_impl) {}
+
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "BlobDBListenerGC"; }
+ void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
+ BlobDBListener::OnFlushCompleted(db, info);
+
+ assert(blob_db_impl_);
+ blob_db_impl_->ProcessFlushJobInfo(info);
+ }
+
+ void OnCompactionCompleted(DB* db, const CompactionJobInfo& info) override {
+ BlobDBListener::OnCompactionCompleted(db, info);
+
+ assert(blob_db_impl_);
+ blob_db_impl_->ProcessCompactionJobInfo(info);
+ }
+};
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_test.cc b/src/rocksdb/utilities/blob_db/blob_db_test.cc
new file mode 100644
index 000000000..e392962b2
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_test.cc
@@ -0,0 +1,2407 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_db.h"
+
+#include <algorithm>
+#include <chrono>
+#include <cstdlib>
+#include <iomanip>
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "db/blob/blob_index.h"
+#include "db/db_test_util.h"
+#include "env/composite_env_wrapper.h"
+#include "file/file_util.h"
+#include "file/sst_file_manager_impl.h"
+#include "port/port.h"
+#include "rocksdb/utilities/debug.h"
+#include "test_util/mock_time_env.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "utilities/blob_db/blob_db_impl.h"
+#include "utilities/fault_injection_env.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+class BlobDBTest : public testing::Test {
+ public:
+ const int kMaxBlobSize = 1 << 14;
+
+ struct BlobIndexVersion {
+ BlobIndexVersion() = default;
+ BlobIndexVersion(std::string _user_key, uint64_t _file_number,
+ uint64_t _expiration, SequenceNumber _sequence,
+ ValueType _type)
+ : user_key(std::move(_user_key)),
+ file_number(_file_number),
+ expiration(_expiration),
+ sequence(_sequence),
+ type(_type) {}
+
+ std::string user_key;
+ uint64_t file_number = kInvalidBlobFileNumber;
+ uint64_t expiration = kNoExpiration;
+ SequenceNumber sequence = 0;
+ ValueType type = kTypeValue;
+ };
+
+ BlobDBTest()
+ : dbname_(test::PerThreadDBPath("blob_db_test")), blob_db_(nullptr) {
+ mock_clock_ = std::make_shared<MockSystemClock>(SystemClock::Default());
+ mock_env_.reset(new CompositeEnvWrapper(Env::Default(), mock_clock_));
+ fault_injection_env_.reset(new FaultInjectionTestEnv(Env::Default()));
+
+ Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
+ assert(s.ok());
+ }
+
+ ~BlobDBTest() override {
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ Destroy();
+ }
+
+ Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
+ Options options = Options()) {
+ options.create_if_missing = true;
+ if (options.env == mock_env_.get()) {
+ // Need to disable stats dumping and persisting which also use
+ // RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
+ // With mocked time, this can hang on some platforms (MacOS)
+ // because (a) on some platforms, pthread_cond_timedwait does not appear
+ // to release the lock for other threads to operate if the deadline time
+ // is already passed, and (b) TimedWait calls are currently a bad
+ // abstraction because the deadline parameter is usually computed from
+ // Env time, but is interpreted in real clock time.
+ options.stats_dump_period_sec = 0;
+ options.stats_persist_period_sec = 0;
+ }
+ return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
+ }
+
+ void Open(BlobDBOptions bdb_options = BlobDBOptions(),
+ Options options = Options()) {
+ ASSERT_OK(TryOpen(bdb_options, options));
+ }
+
+ void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
+ Options options = Options()) {
+ assert(blob_db_ != nullptr);
+ delete blob_db_;
+ blob_db_ = nullptr;
+ Open(bdb_options, options);
+ }
+
+ void Close() {
+ assert(blob_db_ != nullptr);
+ delete blob_db_;
+ blob_db_ = nullptr;
+ }
+
+ void Destroy() {
+ if (blob_db_) {
+ Options options = blob_db_->GetOptions();
+ BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions();
+ delete blob_db_;
+ blob_db_ = nullptr;
+ ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options));
+ }
+ }
+
+ BlobDBImpl *blob_db_impl() {
+ return reinterpret_cast<BlobDBImpl *>(blob_db_);
+ }
+
+ Status Put(const Slice &key, const Slice &value,
+ std::map<std::string, std::string> *data = nullptr) {
+ Status s = blob_db_->Put(WriteOptions(), key, value);
+ if (data != nullptr) {
+ (*data)[key.ToString()] = value.ToString();
+ }
+ return s;
+ }
+
+ void Delete(const std::string &key,
+ std::map<std::string, std::string> *data = nullptr) {
+ ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
+ if (data != nullptr) {
+ data->erase(key);
+ }
+ }
+
+ Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
+ std::map<std::string, std::string> *data = nullptr) {
+ Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
+ if (data != nullptr) {
+ (*data)[key.ToString()] = value.ToString();
+ }
+ return s;
+ }
+
+ Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
+ return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
+ }
+
+ void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
+ std::map<std::string, std::string> *data = nullptr) {
+ int len = rnd->Next() % kMaxBlobSize + 1;
+ std::string value = rnd->HumanReadableString(len);
+ ASSERT_OK(
+ blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl));
+ if (data != nullptr) {
+ (*data)[key] = value;
+ }
+ }
+
+ void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
+ std::map<std::string, std::string> *data = nullptr) {
+ int len = rnd->Next() % kMaxBlobSize + 1;
+ std::string value = rnd->HumanReadableString(len);
+ ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
+ expiration));
+ if (data != nullptr) {
+ (*data)[key] = value;
+ }
+ }
+
+ void PutRandom(const std::string &key, Random *rnd,
+ std::map<std::string, std::string> *data = nullptr) {
+ PutRandom(blob_db_, key, rnd, data);
+ }
+
+ void PutRandom(DB *db, const std::string &key, Random *rnd,
+ std::map<std::string, std::string> *data = nullptr) {
+ int len = rnd->Next() % kMaxBlobSize + 1;
+ std::string value = rnd->HumanReadableString(len);
+ ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
+ if (data != nullptr) {
+ (*data)[key] = value;
+ }
+ }
+
+ void PutRandomToWriteBatch(
+ const std::string &key, Random *rnd, WriteBatch *batch,
+ std::map<std::string, std::string> *data = nullptr) {
+ int len = rnd->Next() % kMaxBlobSize + 1;
+ std::string value = rnd->HumanReadableString(len);
+ ASSERT_OK(batch->Put(key, value));
+ if (data != nullptr) {
+ (*data)[key] = value;
+ }
+ }
+
+ // Verify blob db contain expected data and nothing more.
+ void VerifyDB(const std::map<std::string, std::string> &data) {
+ VerifyDB(blob_db_, data);
+ }
+
+ void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
+ // Verify normal Get
+ auto *cfh = db->DefaultColumnFamily();
+ for (auto &p : data) {
+ PinnableSlice value_slice;
+ ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
+ ASSERT_EQ(p.second, value_slice.ToString());
+ std::string value;
+ ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value));
+ ASSERT_EQ(p.second, value);
+ }
+
+ // Verify iterators
+ Iterator *iter = db->NewIterator(ReadOptions());
+ iter->SeekToFirst();
+ for (auto &p : data) {
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(p.first, iter->key().ToString());
+ ASSERT_EQ(p.second, iter->value().ToString());
+ iter->Next();
+ }
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+ delete iter;
+ }
+
+ void VerifyBaseDB(
+ const std::map<std::string, KeyVersion> &expected_versions) {
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ DB *db = blob_db_->GetRootDB();
+ const size_t kMaxKeys = 10000;
+ std::vector<KeyVersion> versions;
+ ASSERT_OK(GetAllKeyVersions(db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(expected_versions.size(), versions.size());
+ size_t i = 0;
+ for (auto &key_version : expected_versions) {
+ const KeyVersion &expected_version = key_version.second;
+ ASSERT_EQ(expected_version.user_key, versions[i].user_key);
+ ASSERT_EQ(expected_version.sequence, versions[i].sequence);
+ ASSERT_EQ(expected_version.type, versions[i].type);
+ if (versions[i].type == kTypeValue) {
+ ASSERT_EQ(expected_version.value, versions[i].value);
+ } else {
+ ASSERT_EQ(kTypeBlobIndex, versions[i].type);
+ PinnableSlice value;
+ ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
+ versions[i].value, &value));
+ ASSERT_EQ(expected_version.value, value.ToString());
+ }
+ i++;
+ }
+ }
+
+ void VerifyBaseDBBlobIndex(
+ const std::map<std::string, BlobIndexVersion> &expected_versions) {
+ const size_t kMaxKeys = 10000;
+ std::vector<KeyVersion> versions;
+ ASSERT_OK(
+ GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions));
+ ASSERT_EQ(versions.size(), expected_versions.size());
+
+ size_t i = 0;
+ for (const auto &expected_pair : expected_versions) {
+ const BlobIndexVersion &expected_version = expected_pair.second;
+
+ ASSERT_EQ(versions[i].user_key, expected_version.user_key);
+ ASSERT_EQ(versions[i].sequence, expected_version.sequence);
+ ASSERT_EQ(versions[i].type, expected_version.type);
+ if (versions[i].type != kTypeBlobIndex) {
+ ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
+ ASSERT_EQ(kNoExpiration, expected_version.expiration);
+
+ ++i;
+ continue;
+ }
+
+ BlobIndex blob_index;
+ ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
+
+ const uint64_t file_number = !blob_index.IsInlined()
+ ? blob_index.file_number()
+ : kInvalidBlobFileNumber;
+ ASSERT_EQ(file_number, expected_version.file_number);
+
+ const uint64_t expiration =
+ blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
+ ASSERT_EQ(expiration, expected_version.expiration);
+
+ ++i;
+ }
+ }
+
+ void InsertBlobs() {
+ WriteOptions wo;
+ std::string value;
+
+ Random rnd(301);
+ for (size_t i = 0; i < 100000; i++) {
+ uint64_t ttl = rnd.Next() % 86400;
+ PutRandomWithTTL("key" + std::to_string(i % 500), ttl, &rnd, nullptr);
+ }
+
+ for (size_t i = 0; i < 10; i++) {
+ Delete("key" + std::to_string(i % 500));
+ }
+ }
+
+ const std::string dbname_;
+ std::shared_ptr<MockSystemClock> mock_clock_;
+ std::unique_ptr<Env> mock_env_;
+ std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
+ BlobDB *blob_db_;
+}; // class BlobDBTest
+
+TEST_F(BlobDBTest, Put) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + std::to_string(i), &rnd, &data);
+ }
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, PutWithTTL) {
+ Random rnd(301);
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = 1000;
+ bdb_options.min_blob_size = 0;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ std::map<std::string, std::string> data;
+ mock_clock_->SetCurrentTime(50);
+ for (size_t i = 0; i < 100; i++) {
+ uint64_t ttl = rnd.Next() % 100;
+ PutRandomWithTTL("key" + std::to_string(i), ttl, &rnd,
+ (ttl <= 50 ? nullptr : &data));
+ }
+ mock_clock_->SetCurrentTime(100);
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->HasTTL());
+ ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, PutUntil) {
+ Random rnd(301);
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = 1000;
+ bdb_options.min_blob_size = 0;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ std::map<std::string, std::string> data;
+ mock_clock_->SetCurrentTime(50);
+ for (size_t i = 0; i < 100; i++) {
+ uint64_t expiration = rnd.Next() % 100 + 50;
+ PutRandomUntil("key" + std::to_string(i), expiration, &rnd,
+ (expiration <= 100 ? nullptr : &data));
+ }
+ mock_clock_->SetCurrentTime(100);
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->HasTTL());
+ ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, StackableDBGet) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + std::to_string(i), &rnd, &data);
+ }
+ for (size_t i = 0; i < 100; i++) {
+ StackableDB *db = blob_db_;
+ ColumnFamilyHandle *column_family = db->DefaultColumnFamily();
+ std::string key = "key" + std::to_string(i);
+ PinnableSlice pinnable_value;
+ ASSERT_OK(db->Get(ReadOptions(), column_family, key, &pinnable_value));
+ std::string string_value;
+ ASSERT_OK(db->Get(ReadOptions(), column_family, key, &string_value));
+ ASSERT_EQ(string_value, pinnable_value.ToString());
+ ASSERT_EQ(string_value, data[key]);
+ }
+}
+
+TEST_F(BlobDBTest, GetExpiration) {
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.disable_background_tasks = true;
+ mock_clock_->SetCurrentTime(100);
+ Open(bdb_options, options);
+ ASSERT_OK(Put("key1", "value1"));
+ ASSERT_OK(PutWithTTL("key2", "value2", 200));
+ PinnableSlice value;
+ uint64_t expiration;
+ ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
+ ASSERT_EQ("value1", value.ToString());
+ ASSERT_EQ(kNoExpiration, expiration);
+ ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
+ ASSERT_EQ("value2", value.ToString());
+ ASSERT_EQ(300 /* = 100 + 200 */, expiration);
+}
+
+TEST_F(BlobDBTest, GetIOError) {
+ Options options;
+ options.env = fault_injection_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0; // Make sure value write to blob file
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
+ PinnableSlice value;
+ ASSERT_OK(Put("foo", "bar"));
+ fault_injection_env_->SetFilesystemActive(false, Status::IOError());
+ Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
+ ASSERT_TRUE(s.IsIOError());
+ // Reactivate file system to allow test to close DB.
+ fault_injection_env_->SetFilesystemActive(true);
+}
+
+TEST_F(BlobDBTest, PutIOError) {
+ Options options;
+ options.env = fault_injection_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0; // Make sure value write to blob file
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ fault_injection_env_->SetFilesystemActive(false, Status::IOError());
+ ASSERT_TRUE(Put("foo", "v1").IsIOError());
+ fault_injection_env_->SetFilesystemActive(true, Status::IOError());
+ ASSERT_OK(Put("bar", "v1"));
+}
+
+TEST_F(BlobDBTest, WriteBatch) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ WriteBatch batch;
+ for (size_t j = 0; j < 10; j++) {
+ PutRandomToWriteBatch("key" + std::to_string(j * 100 + i), &rnd, &batch,
+ &data);
+ }
+
+ ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
+ }
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, Delete) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + std::to_string(i), &rnd, &data);
+ }
+ for (size_t i = 0; i < 100; i += 5) {
+ Delete("key" + std::to_string(i), &data);
+ }
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, DeleteBatch) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + std::to_string(i), &rnd);
+ }
+ WriteBatch batch;
+ for (size_t i = 0; i < 100; i++) {
+ ASSERT_OK(batch.Delete("key" + std::to_string(i)));
+ }
+ ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
+ // DB should be empty.
+ VerifyDB({});
+}
+
+TEST_F(BlobDBTest, Override) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (int i = 0; i < 10000; i++) {
+ PutRandom("key" + std::to_string(i), &rnd, nullptr);
+ }
+ // override all the keys
+ for (int i = 0; i < 10000; i++) {
+ PutRandom("key" + std::to_string(i), &rnd, &data);
+ }
+ VerifyDB(data);
+}
+
+#ifdef SNAPPY
+TEST_F(BlobDBTest, Compression) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ bdb_options.compression = CompressionType::kSnappyCompression;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("put-key" + std::to_string(i), &rnd, &data);
+ }
+ for (int i = 0; i < 100; i++) {
+ WriteBatch batch;
+ for (size_t j = 0; j < 10; j++) {
+ PutRandomToWriteBatch("write-batch-key" + std::to_string(j * 100 + i),
+ &rnd, &batch, &data);
+ }
+ ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
+ }
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, DecompressAfterReopen) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ bdb_options.compression = CompressionType::kSnappyCompression;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("put-key" + std::to_string(i), &rnd, &data);
+ }
+ VerifyDB(data);
+ bdb_options.compression = CompressionType::kNoCompression;
+ Reopen(bdb_options);
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, EnableDisableCompressionGC) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.garbage_collection_cutoff = 1.0;
+ bdb_options.disable_background_tasks = true;
+ bdb_options.compression = kSnappyCompression;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ size_t data_idx = 0;
+ for (; data_idx < 100; data_idx++) {
+ PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
+ }
+ VerifyDB(data);
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_EQ(kSnappyCompression, blob_files[0]->GetCompressionType());
+
+ // disable compression
+ bdb_options.compression = kNoCompression;
+ Reopen(bdb_options);
+
+ // Add more data with new compression type
+ for (; data_idx < 200; data_idx++) {
+ PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
+ }
+ VerifyDB(data);
+
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ ASSERT_EQ(kNoCompression, blob_files[1]->GetCompressionType());
+
+ // Enable GC. If we do it earlier the snapshot release triggered compaction
+ // may compact files and trigger GC before we can verify there are two files.
+ bdb_options.enable_garbage_collection = true;
+ Reopen(bdb_options);
+
+ // Trigger compaction
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ VerifyDB(data);
+
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ for (auto bfile : blob_files) {
+ ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
+ }
+
+ // enabling the compression again
+ bdb_options.compression = kSnappyCompression;
+ Reopen(bdb_options);
+
+ // Add more data with new compression type
+ for (; data_idx < 300; data_idx++) {
+ PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
+ }
+ VerifyDB(data);
+
+ // Trigger compaction
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ VerifyDB(data);
+
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ for (auto bfile : blob_files) {
+ ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
+ }
+}
+
+#ifdef LZ4
+// Test switch compression types and run GC, it needs both Snappy and LZ4
+// support.
+TEST_F(BlobDBTest, ChangeCompressionGC) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.garbage_collection_cutoff = 1.0;
+ bdb_options.disable_background_tasks = true;
+ bdb_options.compression = kLZ4Compression;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ size_t data_idx = 0;
+ for (; data_idx < 100; data_idx++) {
+ PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
+ }
+ VerifyDB(data);
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_EQ(kLZ4Compression, blob_files[0]->GetCompressionType());
+
+ // Change compression type
+ bdb_options.compression = kSnappyCompression;
+ Reopen(bdb_options);
+
+ // Add more data with Snappy compression type
+ for (; data_idx < 200; data_idx++) {
+ PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
+ }
+ VerifyDB(data);
+
+ // Verify blob file compression type
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ ASSERT_EQ(kSnappyCompression, blob_files[1]->GetCompressionType());
+
+ // Enable GC. If we do it earlier the snapshot release triggered compaction
+ // may compact files and trigger GC before we can verify there are two files.
+ bdb_options.enable_garbage_collection = true;
+ Reopen(bdb_options);
+
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ VerifyDB(data);
+
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ for (auto bfile : blob_files) {
+ ASSERT_EQ(kSnappyCompression, bfile->GetCompressionType());
+ }
+
+ // Disable compression
+ bdb_options.compression = kNoCompression;
+ Reopen(bdb_options);
+ for (; data_idx < 300; data_idx++) {
+ PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
+ }
+ VerifyDB(data);
+
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ VerifyDB(data);
+
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ for (auto bfile : blob_files) {
+ ASSERT_EQ(kNoCompression, bfile->GetCompressionType());
+ }
+
+ // switching different compression types to generate mixed compression types
+ bdb_options.compression = kSnappyCompression;
+ Reopen(bdb_options);
+ for (; data_idx < 400; data_idx++) {
+ PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
+ }
+ VerifyDB(data);
+
+ bdb_options.compression = kLZ4Compression;
+ Reopen(bdb_options);
+ for (; data_idx < 500; data_idx++) {
+ PutRandom("put-key" + std::to_string(data_idx), &rnd, &data);
+ }
+ VerifyDB(data);
+
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ VerifyDB(data);
+
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ for (auto bfile : blob_files) {
+ ASSERT_EQ(kLZ4Compression, bfile->GetCompressionType());
+ }
+}
+#endif // LZ4
+#endif // SNAPPY
+
+TEST_F(BlobDBTest, MultipleWriters) {
+ Open(BlobDBOptions());
+
+ std::vector<port::Thread> workers;
+ std::vector<std::map<std::string, std::string>> data_set(10);
+ for (uint32_t i = 0; i < 10; i++)
+ workers.push_back(port::Thread(
+ [&](uint32_t id) {
+ Random rnd(301 + id);
+ for (int j = 0; j < 100; j++) {
+ std::string key =
+ "key" + std::to_string(id) + "_" + std::to_string(j);
+ if (id < 5) {
+ PutRandom(key, &rnd, &data_set[id]);
+ } else {
+ WriteBatch batch;
+ PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
+ ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
+ }
+ }
+ },
+ i));
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 10; i++) {
+ workers[i].join();
+ data.insert(data_set[i].begin(), data_set[i].end());
+ }
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, SstFileManager) {
+ // run the same test for Get(), MultiGet() and Iterator each.
+ std::shared_ptr<SstFileManager> sst_file_manager(
+ NewSstFileManager(mock_env_.get()));
+ sst_file_manager->SetDeleteRateBytesPerSecond(1);
+ SstFileManagerImpl *sfm =
+ static_cast<SstFileManagerImpl *>(sst_file_manager.get());
+
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.enable_garbage_collection = true;
+ bdb_options.garbage_collection_cutoff = 1.0;
+ Options db_options;
+
+ int files_scheduled_to_delete = 0;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
+ assert(arg);
+ const std::string *const file_path =
+ static_cast<const std::string *>(arg);
+ if (file_path->find(".blob") != std::string::npos) {
+ ++files_scheduled_to_delete;
+ }
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+ db_options.sst_file_manager = sst_file_manager;
+
+ Open(bdb_options, db_options);
+
+ // Create one obselete file and clean it.
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ std::shared_ptr<BlobFile> bfile = blob_files[0];
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+
+ // Even if SSTFileManager is not set, DB is creating a dummy one.
+ ASSERT_EQ(1, files_scheduled_to_delete);
+ Destroy();
+ // Make sure that DestroyBlobDB() also goes through delete scheduler.
+ ASSERT_EQ(2, files_scheduled_to_delete);
+ SyncPoint::GetInstance()->DisableProcessing();
+ sfm->WaitForEmptyTrash();
+}
+
+TEST_F(BlobDBTest, SstFileManagerRestart) {
+ int files_scheduled_to_delete = 0;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
+ assert(arg);
+ const std::string *const file_path =
+ static_cast<const std::string *>(arg);
+ if (file_path->find(".blob") != std::string::npos) {
+ ++files_scheduled_to_delete;
+ }
+ });
+
+ // run the same test for Get(), MultiGet() and Iterator each.
+ std::shared_ptr<SstFileManager> sst_file_manager(
+ NewSstFileManager(mock_env_.get()));
+ sst_file_manager->SetDeleteRateBytesPerSecond(1);
+ SstFileManagerImpl *sfm =
+ static_cast<SstFileManagerImpl *>(sst_file_manager.get());
+
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ Options db_options;
+
+ SyncPoint::GetInstance()->EnableProcessing();
+ db_options.sst_file_manager = sst_file_manager;
+
+ Open(bdb_options, db_options);
+ std::string blob_dir = blob_db_impl()->TEST_blob_dir();
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
+ Close();
+
+ // Create 3 dummy trash files under the blob_dir
+ const auto &fs = db_options.env->GetFileSystem();
+ ASSERT_OK(CreateFile(fs, blob_dir + "/000666.blob.trash", "", false));
+ ASSERT_OK(CreateFile(fs, blob_dir + "/000888.blob.trash", "", true));
+ ASSERT_OK(CreateFile(fs, blob_dir + "/something_not_match.trash", "", false));
+
+ // Make sure that reopening the DB rescan the existing trash files
+ Open(bdb_options, db_options);
+ ASSERT_EQ(files_scheduled_to_delete, 2);
+
+ sfm->WaitForEmptyTrash();
+
+ // There should be exact one file under the blob dir now.
+ std::vector<std::string> all_files;
+ ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
+ int nfiles = 0;
+ for (const auto &f : all_files) {
+ assert(!f.empty());
+ if (f[0] == '.') {
+ continue;
+ }
+ nfiles++;
+ }
+ ASSERT_EQ(nfiles, 1);
+
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.enable_garbage_collection = true;
+ bdb_options.garbage_collection_cutoff = 1.0;
+ bdb_options.disable_background_tasks = true;
+
+ Options options;
+ options.disable_auto_compactions = true;
+
+ // i = when to take snapshot
+ for (int i = 0; i < 4; i++) {
+ Destroy();
+ Open(bdb_options, options);
+
+ const Snapshot *snapshot = nullptr;
+
+ // First file
+ ASSERT_OK(Put("key1", "value"));
+ if (i == 0) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+
+ // Second file
+ ASSERT_OK(Put("key2", "value"));
+ if (i == 1) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ auto bfile = blob_files[1];
+ ASSERT_FALSE(bfile->Immutable());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
+
+ // Third file
+ ASSERT_OK(Put("key3", "value"));
+ if (i == 2) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_TRUE(bfile->Obsolete());
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
+ bfile->GetObsoleteSequence());
+
+ Delete("key2");
+ if (i == 3) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+
+ ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+
+ if (i >= 2) {
+ // The snapshot shouldn't see data in bfile
+ ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_->ReleaseSnapshot(snapshot);
+ } else {
+ // The snapshot will see data in bfile, so the file shouldn't be deleted
+ ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_->ReleaseSnapshot(snapshot);
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
+ }
+ }
+}
+
+TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
+ Options options;
+ options.env = mock_env_.get();
+ mock_clock_->SetCurrentTime(0);
+ Open(BlobDBOptions(), options);
+ ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
+ ColumnFamilyHandle *handle = nullptr;
+ std::string value;
+ std::vector<std::string> values;
+ // The call simply pass through to base db. It should succeed.
+ ASSERT_OK(
+ blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
+ ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
+ ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
+ .IsNotSupported());
+ ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
+ .IsNotSupported());
+ WriteBatch batch;
+ ASSERT_OK(batch.Put("k1", "v1"));
+ ASSERT_OK(batch.Put(handle, "k2", "v2"));
+ ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
+ ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
+ ASSERT_TRUE(
+ blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
+ auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
+ {"k1", "k2"}, &values);
+ ASSERT_EQ(2, statuses.size());
+ ASSERT_TRUE(statuses[0].IsNotSupported());
+ ASSERT_TRUE(statuses[1].IsNotSupported());
+ ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
+ delete handle;
+}
+
+TEST_F(BlobDBTest, GetLiveFilesMetaData) {
+ Random rnd(301);
+
+ BlobDBOptions bdb_options;
+ bdb_options.blob_dir = "blob_dir";
+ bdb_options.path_relative = true;
+ bdb_options.ttl_range_secs = 10;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+
+ Options options;
+ options.env = mock_env_.get();
+
+ Open(bdb_options, options);
+
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + std::to_string(i), &rnd, &data);
+ }
+
+ constexpr uint64_t expiration = 1000ULL;
+ PutRandomUntil("key100", expiration, &rnd, &data);
+
+ std::vector<LiveFileMetaData> metadata;
+ blob_db_->GetLiveFilesMetaData(&metadata);
+
+ ASSERT_EQ(2U, metadata.size());
+ // Path should be relative to db_name, but begin with slash.
+ const std::string filename1("/blob_dir/000001.blob");
+ ASSERT_EQ(filename1, metadata[0].name);
+ ASSERT_EQ(1, metadata[0].file_number);
+ ASSERT_EQ(0, metadata[0].oldest_ancester_time);
+ ASSERT_EQ(kDefaultColumnFamilyName, metadata[0].column_family_name);
+
+ const std::string filename2("/blob_dir/000002.blob");
+ ASSERT_EQ(filename2, metadata[1].name);
+ ASSERT_EQ(2, metadata[1].file_number);
+ ASSERT_EQ(expiration, metadata[1].oldest_ancester_time);
+ ASSERT_EQ(kDefaultColumnFamilyName, metadata[1].column_family_name);
+
+ std::vector<std::string> livefile;
+ uint64_t mfs;
+ ASSERT_OK(blob_db_->GetLiveFiles(livefile, &mfs, false));
+ ASSERT_EQ(5U, livefile.size());
+ ASSERT_EQ(filename1, livefile[3]);
+ ASSERT_EQ(filename2, livefile[4]);
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
+ constexpr size_t kNumKey = 20;
+ constexpr size_t kNumIteration = 10;
+ Random rnd(301);
+ std::map<std::string, std::string> data;
+ std::vector<bool> is_blob(kNumKey, false);
+
+ // Write to plain rocksdb.
+ Options options;
+ options.create_if_missing = true;
+ DB *db = nullptr;
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ for (size_t i = 0; i < kNumIteration; i++) {
+ auto key_index = rnd.Next() % kNumKey;
+ std::string key = "key" + std::to_string(key_index);
+ PutRandom(db, key, &rnd, &data);
+ }
+ VerifyDB(db, data);
+ delete db;
+ db = nullptr;
+
+ // Open as blob db. Verify it can read existing data.
+ Open();
+ VerifyDB(blob_db_, data);
+ for (size_t i = 0; i < kNumIteration; i++) {
+ auto key_index = rnd.Next() % kNumKey;
+ std::string key = "key" + std::to_string(key_index);
+ is_blob[key_index] = true;
+ PutRandom(blob_db_, key, &rnd, &data);
+ }
+ VerifyDB(blob_db_, data);
+ delete blob_db_;
+ blob_db_ = nullptr;
+
+ // Verify plain db return error for keys written by blob db.
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ std::string value;
+ for (size_t i = 0; i < kNumKey; i++) {
+ std::string key = "key" + std::to_string(i);
+ Status s = db->Get(ReadOptions(), key, &value);
+ if (data.count(key) == 0) {
+ ASSERT_TRUE(s.IsNotFound());
+ } else if (is_blob[i]) {
+ ASSERT_TRUE(s.IsCorruption());
+ } else {
+ ASSERT_OK(s);
+ ASSERT_EQ(data[key], value);
+ }
+ }
+ delete db;
+}
+
+// Test to verify that a NoSpace IOError Status is returned on reaching
+// max_db_size limit.
+TEST_F(BlobDBTest, OutOfSpace) {
+ // Use mock env to stop wall clock.
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 200;
+ bdb_options.is_fifo = false;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+
+ // Each stored blob has an overhead of about 42 bytes currently.
+ // So a small key + a 100 byte blob should take up ~150 bytes in the db.
+ std::string value(100, 'v');
+ ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));
+
+ // Putting another blob should fail as ading it would exceed the max_db_size
+ // limit.
+ Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
+ ASSERT_TRUE(s.IsIOError());
+ ASSERT_TRUE(s.IsNoSpace());
+}
+
+TEST_F(BlobDBTest, FIFOEviction) {
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 200;
+ bdb_options.blob_file_size = 100;
+ bdb_options.is_fifo = true;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+
+ std::atomic<int> evict_count{0};
+ SyncPoint::GetInstance()->SetCallBack(
+ "BlobDBImpl::EvictOldestBlobFile:Evicted",
+ [&](void *) { evict_count++; });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ // Each stored blob has an overhead of 32 bytes currently.
+ // So a 100 byte blob should take up 132 bytes.
+ std::string value(100, 'v');
+ ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
+ VerifyDB({{"key1", value}});
+
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+
+ // Adding another 100 bytes blob would take the total size to 264 bytes
+ // (2*132). max_db_size will be exceeded
+ // than max_db_size and trigger FIFO eviction.
+ ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
+ ASSERT_EQ(1, evict_count);
+ // key1 will exist until corresponding file be deleted.
+ VerifyDB({{"key1", value}, {"key2", value}});
+
+ // Adding another 100 bytes blob without TTL.
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
+ ASSERT_EQ(2, evict_count);
+ // key1 and key2 will exist until corresponding file be deleted.
+ VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});
+
+ // The fourth blob file, without TTL.
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
+ ASSERT_EQ(3, evict_count);
+ VerifyDB(
+ {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});
+
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(4, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->Obsolete());
+ ASSERT_TRUE(blob_files[1]->Obsolete());
+ ASSERT_TRUE(blob_files[2]->Obsolete());
+ ASSERT_FALSE(blob_files[3]->Obsolete());
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(3, obsolete_files.size());
+ ASSERT_EQ(blob_files[0], obsolete_files[0]);
+ ASSERT_EQ(blob_files[1], obsolete_files[1]);
+ ASSERT_EQ(blob_files[2], obsolete_files[2]);
+
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_TRUE(obsolete_files.empty());
+ VerifyDB({{"key4", value}});
+}
+
+TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
+ Options options;
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 1000;
+ bdb_options.blob_file_size = 5000;
+ bdb_options.is_fifo = true;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+
+ std::atomic<int> evict_count{0};
+ SyncPoint::GetInstance()->SetCallBack(
+ "BlobDBImpl::EvictOldestBlobFile:Evicted",
+ [&](void *) { evict_count++; });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ std::string value(2000, 'v');
+ ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace());
+ ASSERT_EQ(0, evict_count);
+}
+
+TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
+ BlobDBOptions bdb_options;
+ bdb_options.is_fifo = true;
+ bdb_options.min_blob_size = 100;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ // Use mock env to stop wall clock.
+ options.env = mock_env_.get();
+ options.disable_auto_compactions = true;
+ auto statistics = CreateDBStatistics();
+ options.statistics = statistics;
+ Open(bdb_options, options);
+
+ ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());
+ std::string small_value(50, 'v');
+ std::map<std::string, std::string> data;
+ // Insert some data into LSM tree to make sure FIFO eviction take SST
+ // file size into account.
+ for (int i = 0; i < 1000; i++) {
+ ASSERT_OK(Put("key" + std::to_string(i), small_value, &data));
+ }
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+ uint64_t live_sst_size = 0;
+ ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
+ &live_sst_size));
+ ASSERT_TRUE(live_sst_size > 0);
+ ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
+
+ bdb_options.max_db_size = live_sst_size + 2000;
+ Reopen(bdb_options, options);
+ ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
+
+ std::string value_1k(1000, 'v');
+ ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data));
+ ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ VerifyDB(data);
+ // large_key2 evicts large_key1
+ ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ data.erase("large_key1");
+ VerifyDB(data);
+ // large_key3 get no enough space even after evicting large_key2, so it
+ // instead return no space error.
+ std::string value_2k(2000, 'v');
+ ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace());
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ // Verify large_key2 still exists.
+ VerifyDB(data);
+}
+
+// Test flush or compaction will trigger FIFO eviction since they update
+// total SST file size.
+TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 1000;
+ bdb_options.is_fifo = true;
+ bdb_options.min_blob_size = 100;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ // Use mock env to stop wall clock.
+ options.env = mock_env_.get();
+ auto statistics = CreateDBStatistics();
+ options.statistics = statistics;
+ options.compression = kNoCompression;
+ Open(bdb_options, options);
+
+ std::string value(800, 'v');
+ ASSERT_OK(PutWithTTL("large_key", value, 60));
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ VerifyDB({{"large_key", value}});
+
+ // Insert some small keys and flush to bring DB out of space.
+ std::map<std::string, std::string> data;
+ for (int i = 0; i < 10; i++) {
+ ASSERT_OK(Put("key" + std::to_string(i), "v", &data));
+ }
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+
+ // Verify large_key is deleted by FIFO eviction.
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, InlineSmallValues) {
+ constexpr uint64_t kMaxExpiration = 1000;
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = kMaxExpiration;
+ bdb_options.min_blob_size = 100;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ options.env = mock_env_.get();
+ mock_clock_->SetCurrentTime(0);
+ Open(bdb_options, options);
+ std::map<std::string, std::string> data;
+ std::map<std::string, KeyVersion> versions;
+ for (size_t i = 0; i < 1000; i++) {
+ bool is_small_value = rnd.Next() % 2;
+ bool has_ttl = rnd.Next() % 2;
+ uint64_t expiration = rnd.Next() % kMaxExpiration;
+ int len = is_small_value ? 50 : 200;
+ std::string key = "key" + std::to_string(i);
+ std::string value = rnd.HumanReadableString(len);
+ std::string blob_index;
+ data[key] = value;
+ SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+ if (!has_ttl) {
+ ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
+ } else {
+ ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
+ }
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+ versions[key] =
+ KeyVersion(key, value, sequence,
+ (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
+ }
+ VerifyDB(data);
+ VerifyBaseDB(versions);
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ std::shared_ptr<BlobFile> non_ttl_file;
+ std::shared_ptr<BlobFile> ttl_file;
+ if (blob_files[0]->HasTTL()) {
+ ttl_file = blob_files[0];
+ non_ttl_file = blob_files[1];
+ } else {
+ non_ttl_file = blob_files[0];
+ ttl_file = blob_files[1];
+ }
+ ASSERT_FALSE(non_ttl_file->HasTTL());
+ ASSERT_TRUE(ttl_file->HasTTL());
+}
+
+TEST_F(BlobDBTest, UserCompactionFilter) {
+ class CustomerFilter : public CompactionFilter {
+ public:
+ bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
+ std::string *new_value, bool *value_changed) const override {
+ *value_changed = false;
+ // changing value size to test value transitions between inlined data
+ // and stored-in-blob data
+ if (value.size() % 4 == 1) {
+ *new_value = value.ToString();
+ // double size by duplicating value
+ *new_value += *new_value;
+ *value_changed = true;
+ return false;
+ } else if (value.size() % 3 == 1) {
+ *new_value = value.ToString();
+ // trancate value size by half
+ *new_value = new_value->substr(0, new_value->size() / 2);
+ *value_changed = true;
+ return false;
+ } else if (value.size() % 2 == 1) {
+ return true;
+ }
+ return false;
+ }
+ bool IgnoreSnapshots() const override { return true; }
+ const char *Name() const override { return "CustomerFilter"; }
+ };
+ class CustomerFilterFactory : public CompactionFilterFactory {
+ const char *Name() const override { return "CustomerFilterFactory"; }
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context & /*context*/) override {
+ return std::unique_ptr<CompactionFilter>(new CustomerFilter());
+ }
+ };
+
+ constexpr size_t kNumPuts = 1 << 10;
+ // Generate both inlined and blob value
+ constexpr uint64_t kMinValueSize = 1 << 6;
+ constexpr uint64_t kMaxValueSize = 1 << 8;
+ constexpr uint64_t kMinBlobSize = 1 << 7;
+ static_assert(kMinValueSize < kMinBlobSize, "");
+ static_assert(kMaxValueSize > kMinBlobSize, "");
+
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = kMinBlobSize;
+ bdb_options.blob_file_size = kMaxValueSize * 10;
+ bdb_options.disable_background_tasks = true;
+ if (Snappy_Supported()) {
+ bdb_options.compression = CompressionType::kSnappyCompression;
+ }
+ // case_num == 0: Test user defined compaction filter
+ // case_num == 1: Test user defined compaction filter factory
+ for (int case_num = 0; case_num < 2; case_num++) {
+ Options options;
+ if (case_num == 0) {
+ options.compaction_filter = new CustomerFilter();
+ } else {
+ options.compaction_filter_factory.reset(new CustomerFilterFactory());
+ }
+ options.disable_auto_compactions = true;
+ options.env = mock_env_.get();
+ options.statistics = CreateDBStatistics();
+ Open(bdb_options, options);
+
+ std::map<std::string, std::string> data;
+ std::map<std::string, std::string> data_after_compact;
+ Random rnd(301);
+ uint64_t value_size = kMinValueSize;
+ int drop_record = 0;
+ for (size_t i = 0; i < kNumPuts; ++i) {
+ std::ostringstream oss;
+ oss << "key" << std::setw(4) << std::setfill('0') << i;
+
+ const std::string key(oss.str());
+ const std::string value = rnd.HumanReadableString((int)value_size);
+ const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+ ASSERT_OK(Put(key, value));
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+ data[key] = value;
+ if (value.length() % 4 == 1) {
+ data_after_compact[key] = value + value;
+ } else if (value.length() % 3 == 1) {
+ data_after_compact[key] = value.substr(0, value.size() / 2);
+ } else if (value.length() % 2 == 1) {
+ ++drop_record;
+ } else {
+ data_after_compact[key] = value;
+ }
+
+ if (++value_size > kMaxValueSize) {
+ value_size = kMinValueSize;
+ }
+ }
+ // Verify full data set
+ VerifyDB(data);
+ // Applying compaction filter for records
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ // Verify data after compaction, only value with even length left.
+ VerifyDB(data_after_compact);
+ ASSERT_EQ(drop_record,
+ options.statistics->getTickerCount(COMPACTION_KEY_DROP_USER));
+ delete options.compaction_filter;
+ Destroy();
+ }
+}
+
+// Test user comapction filter when there is IO error on blob data.
+TEST_F(BlobDBTest, UserCompactionFilter_BlobIOError) {
+ class CustomerFilter : public CompactionFilter {
+ public:
+ bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
+ std::string *new_value, bool *value_changed) const override {
+ *new_value = value.ToString() + "_new";
+ *value_changed = true;
+ return false;
+ }
+ bool IgnoreSnapshots() const override { return true; }
+ const char *Name() const override { return "CustomerFilter"; }
+ };
+
+ constexpr size_t kNumPuts = 100;
+ constexpr int kValueSize = 100;
+
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.blob_file_size = kValueSize * 10;
+ bdb_options.disable_background_tasks = true;
+ bdb_options.compression = CompressionType::kNoCompression;
+
+ std::vector<std::string> io_failure_cases = {
+ "BlobDBImpl::CreateBlobFileAndWriter",
+ "BlobIndexCompactionFilterBase::WriteBlobToNewFile",
+ "BlobDBImpl::CloseBlobFile"};
+
+ for (size_t case_num = 0; case_num < io_failure_cases.size(); case_num++) {
+ Options options;
+ options.compaction_filter = new CustomerFilter();
+ options.disable_auto_compactions = true;
+ options.env = fault_injection_env_.get();
+ options.statistics = CreateDBStatistics();
+ Open(bdb_options, options);
+
+ std::map<std::string, std::string> data;
+ Random rnd(301);
+ for (size_t i = 0; i < kNumPuts; ++i) {
+ std::ostringstream oss;
+ oss << "key" << std::setw(4) << std::setfill('0') << i;
+
+ const std::string key(oss.str());
+ const std::string value = rnd.HumanReadableString(kValueSize);
+ const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+ ASSERT_OK(Put(key, value));
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+ data[key] = value;
+ }
+
+ // Verify full data set
+ VerifyDB(data);
+
+ SyncPoint::GetInstance()->SetCallBack(
+ io_failure_cases[case_num], [&](void * /*arg*/) {
+ fault_injection_env_->SetFilesystemActive(false, Status::IOError());
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+ auto s = blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_TRUE(s.IsIOError());
+
+ // Reactivate file system to allow test to verify and close DB.
+ fault_injection_env_->SetFilesystemActive(true);
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ // Verify full data set after compaction failure
+ VerifyDB(data);
+
+ delete options.compaction_filter;
+ Destroy();
+ }
+}
+
+// Test comapction filter should remove any expired blob index.
+TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
+ constexpr size_t kNumKeys = 100;
+ constexpr size_t kNumPuts = 1000;
+ constexpr uint64_t kMaxExpiration = 1000;
+ constexpr uint64_t kCompactTime = 500;
+ constexpr uint64_t kMinBlobSize = 100;
+ Random rnd(301);
+ mock_clock_->SetCurrentTime(0);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = kMinBlobSize;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ options.env = mock_env_.get();
+ Open(bdb_options, options);
+
+ std::map<std::string, std::string> data;
+ std::map<std::string, std::string> data_after_compact;
+ for (size_t i = 0; i < kNumPuts; i++) {
+ bool is_small_value = rnd.Next() % 2;
+ bool has_ttl = rnd.Next() % 2;
+ uint64_t expiration = rnd.Next() % kMaxExpiration;
+ int len = is_small_value ? 10 : 200;
+ std::string key = "key" + std::to_string(rnd.Next() % kNumKeys);
+ std::string value = rnd.HumanReadableString(len);
+ if (!has_ttl) {
+ if (is_small_value) {
+ std::string blob_entry;
+ BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
+ // Fake blob index with TTL. See what it will do.
+ ASSERT_GT(kMinBlobSize, blob_entry.size());
+ value = blob_entry;
+ }
+ ASSERT_OK(Put(key, value));
+ data_after_compact[key] = value;
+ } else {
+ ASSERT_OK(PutUntil(key, value, expiration));
+ if (expiration <= kCompactTime) {
+ data_after_compact.erase(key);
+ } else {
+ data_after_compact[key] = value;
+ }
+ }
+ data[key] = value;
+ }
+ VerifyDB(data);
+
+ mock_clock_->SetCurrentTime(kCompactTime);
+ // Take a snapshot before compaction. Make sure expired blob indexes is
+ // filtered regardless of snapshot.
+ const Snapshot *snapshot = blob_db_->GetSnapshot();
+ // Issue manual compaction to trigger compaction filter.
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ blob_db_->ReleaseSnapshot(snapshot);
+ // Verify expired blob index are filtered.
+ std::vector<KeyVersion> versions;
+ const size_t kMaxKeys = 10000;
+ ASSERT_OK(GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(data_after_compact.size(), versions.size());
+ for (auto &version : versions) {
+ ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
+ }
+ VerifyDB(data_after_compact);
+}
+
+// Test compaction filter should remove any blob index where corresponding
+// blob file has been removed.
+TEST_F(BlobDBTest, FilterFileNotAvailable) {
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ options.disable_auto_compactions = true;
+ Open(bdb_options, options);
+
+ ASSERT_OK(Put("foo", "v1"));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+
+ ASSERT_OK(Put("bar", "v2"));
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
+
+ const size_t kMaxKeys = 10000;
+
+ DB *base_db = blob_db_->GetRootDB();
+ std::vector<KeyVersion> versions;
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(2, versions.size());
+ ASSERT_EQ("bar", versions[0].user_key);
+ ASSERT_EQ("foo", versions[1].user_key);
+ VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
+
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(2, versions.size());
+ ASSERT_EQ("bar", versions[0].user_key);
+ ASSERT_EQ("foo", versions[1].user_key);
+ VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
+
+ // Remove the first blob file and compact. foo should be remove from base db.
+ blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(1, versions.size());
+ ASSERT_EQ("bar", versions[0].user_key);
+ VerifyDB({{"bar", "v2"}});
+
+ // Remove the second blob file and compact. bar should be remove from base db.
+ blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(0, versions.size());
+ VerifyDB({});
+}
+
+// Test compaction filter should filter any inlined TTL keys that would have
+// been dropped by last FIFO eviction if they are store out-of-line.
+TEST_F(BlobDBTest, FilterForFIFOEviction) {
+ Random rnd(215);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 100;
+ bdb_options.ttl_range_secs = 60;
+ bdb_options.max_db_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ // Use mock env to stop wall clock.
+ mock_clock_->SetCurrentTime(0);
+ options.env = mock_env_.get();
+ auto statistics = CreateDBStatistics();
+ options.statistics = statistics;
+ options.disable_auto_compactions = true;
+ Open(bdb_options, options);
+
+ std::map<std::string, std::string> data;
+ std::map<std::string, std::string> data_after_compact;
+ // Insert some small values that will be inlined.
+ for (int i = 0; i < 1000; i++) {
+ std::string key = "key" + std::to_string(i);
+ std::string value = rnd.HumanReadableString(50);
+ uint64_t ttl = rnd.Next() % 120 + 1;
+ ASSERT_OK(PutWithTTL(key, value, ttl, &data));
+ if (ttl >= 60) {
+ data_after_compact[key] = value;
+ }
+ }
+ uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+ uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
+ ASSERT_GT(live_sst_size, 0);
+ VerifyDB(data);
+
+ bdb_options.max_db_size = live_sst_size + 30000;
+ bdb_options.is_fifo = true;
+ Reopen(bdb_options, options);
+ VerifyDB(data);
+
+ // Put two large values, each on a different blob file.
+ std::string large_value(10000, 'v');
+ ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
+ ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
+ ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ data["large_key1"] = large_value;
+ data["large_key2"] = large_value;
+ VerifyDB(data);
+
+ // Put a third large value which will bring the DB out of space.
+ // FIFO eviction will evict the file of large_key1.
+ ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ data.erase("large_key1");
+ data["large_key3"] = large_value;
+ VerifyDB(data);
+
+ // Putting some more small values. These values shouldn't be evicted by
+ // compaction filter since they are inserted after FIFO eviction.
+ ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
+ ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
+
+ // FIFO eviction doesn't trigger again since there enough room for the flush.
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+
+ // Manual compact and check if compaction filter evict those keys with
+ // expiration < 60.
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ // All keys with expiration < 60, plus large_key1 is filtered by
+ // compaction filter.
+ ASSERT_EQ(num_keys_to_evict + 1,
+ statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ data_after_compact["large_key2"] = large_value;
+ data_after_compact["large_key3"] = large_value;
+ VerifyDB(data_after_compact);
+}
+
+TEST_F(BlobDBTest, GarbageCollection) {
+ constexpr size_t kNumPuts = 1 << 10;
+
+ constexpr uint64_t kExpiration = 1000;
+ constexpr uint64_t kCompactTime = 500;
+
+ constexpr uint64_t kKeySize = 7; // "key" + 4 digits
+
+ constexpr uint64_t kSmallValueSize = 1 << 6;
+ constexpr uint64_t kLargeValueSize = 1 << 8;
+ constexpr uint64_t kMinBlobSize = 1 << 7;
+ static_assert(kSmallValueSize < kMinBlobSize, "");
+ static_assert(kLargeValueSize > kMinBlobSize, "");
+
+ constexpr size_t kBlobsPerFile = 8;
+ constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
+ constexpr uint64_t kBlobFileSize =
+ BlobLogHeader::kSize +
+ (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
+
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = kMinBlobSize;
+ bdb_options.blob_file_size = kBlobFileSize;
+ bdb_options.enable_garbage_collection = true;
+ bdb_options.garbage_collection_cutoff = 0.25;
+ bdb_options.disable_background_tasks = true;
+
+ Options options;
+ options.env = mock_env_.get();
+ options.statistics = CreateDBStatistics();
+
+ Open(bdb_options, options);
+
+ std::map<std::string, std::string> data;
+ std::map<std::string, KeyVersion> blob_value_versions;
+ std::map<std::string, BlobIndexVersion> blob_index_versions;
+
+ Random rnd(301);
+
+ // Add a bunch of large non-TTL values. These will be written to non-TTL
+ // blob files and will be subject to GC.
+ for (size_t i = 0; i < kNumPuts; ++i) {
+ std::ostringstream oss;
+ oss << "key" << std::setw(4) << std::setfill('0') << i;
+
+ const std::string key(oss.str());
+ const std::string value = rnd.HumanReadableString(kLargeValueSize);
+ const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+ ASSERT_OK(Put(key, value));
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+ data[key] = value;
+ blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
+ blob_index_versions[key] =
+ BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
+ sequence, kTypeBlobIndex);
+ }
+
+ // Add some small and/or TTL values that will be ignored during GC.
+ // First, add a large TTL value will be written to its own TTL blob file.
+ {
+ const std::string key("key2000");
+ const std::string value = rnd.HumanReadableString(kLargeValueSize);
+ const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+ ASSERT_OK(PutUntil(key, value, kExpiration));
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+ data[key] = value;
+ blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
+ blob_index_versions[key] =
+ BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
+ sequence, kTypeBlobIndex);
+ }
+
+ // Now add a small TTL value (which will be inlined).
+ {
+ const std::string key("key3000");
+ const std::string value = rnd.HumanReadableString(kSmallValueSize);
+ const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+ ASSERT_OK(PutUntil(key, value, kExpiration));
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+ data[key] = value;
+ blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
+ blob_index_versions[key] = BlobIndexVersion(
+ key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
+ }
+
+ // Finally, add a small non-TTL value (which will be stored as a regular
+ // value).
+ {
+ const std::string key("key4000");
+ const std::string value = rnd.HumanReadableString(kSmallValueSize);
+ const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+
+ ASSERT_OK(Put(key, value));
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+
+ data[key] = value;
+ blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
+ blob_index_versions[key] = BlobIndexVersion(
+ key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
+ }
+
+ VerifyDB(data);
+ VerifyBaseDB(blob_value_versions);
+ VerifyBaseDBBlobIndex(blob_index_versions);
+
+ // At this point, we should have 128 immutable non-TTL files with file numbers
+ // 1..128.
+ {
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
+ for (size_t i = 0; i < kNumBlobFiles; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
+ ASSERT_EQ(live_imm_files[i]->GetFileSize(),
+ kBlobFileSize + BlobLogFooter::kSize);
+ }
+ }
+
+ mock_clock_->SetCurrentTime(kCompactTime);
+
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+ // We expect the data to remain the same and the blobs from the oldest N files
+ // to be moved to new files. Sequence numbers get zeroed out during the
+ // compaction.
+ VerifyDB(data);
+
+ for (auto &pair : blob_value_versions) {
+ KeyVersion &version = pair.second;
+ version.sequence = 0;
+ }
+
+ VerifyBaseDB(blob_value_versions);
+
+ const uint64_t cutoff = static_cast<uint64_t>(
+ bdb_options.garbage_collection_cutoff * kNumBlobFiles);
+ for (auto &pair : blob_index_versions) {
+ BlobIndexVersion &version = pair.second;
+
+ version.sequence = 0;
+
+ if (version.file_number == kInvalidBlobFileNumber) {
+ continue;
+ }
+
+ if (version.file_number > cutoff) {
+ continue;
+ }
+
+ version.file_number += kNumBlobFiles + 1;
+ }
+
+ VerifyBaseDBBlobIndex(blob_index_versions);
+
+ const Statistics *const statistics = options.statistics.get();
+ assert(statistics);
+
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
+ cutoff * kBlobsPerFile);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
+ cutoff * kBlobsPerFile * kLargeValueSize);
+
+ // At this point, we should have 128 immutable non-TTL files with file numbers
+ // 33..128 and 130..161. (129 was taken by the TTL blob file.)
+ {
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
+ for (size_t i = 0; i < kNumBlobFiles; ++i) {
+ uint64_t expected_file_number = i + cutoff + 1;
+ if (expected_file_number > kNumBlobFiles) {
+ ++expected_file_number;
+ }
+
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
+ ASSERT_EQ(live_imm_files[i]->GetFileSize(),
+ kBlobFileSize + BlobLogFooter::kSize);
+ }
+ }
+}
+
+TEST_F(BlobDBTest, GarbageCollectionFailure) {
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.enable_garbage_collection = true;
+ bdb_options.garbage_collection_cutoff = 1.0;
+ bdb_options.disable_background_tasks = true;
+
+ Options db_options;
+ db_options.statistics = CreateDBStatistics();
+
+ Open(bdb_options, db_options);
+
+ // Write a couple of valid blobs.
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_OK(Put("dead", "beef"));
+
+ // Write a fake blob reference into the base DB that points to a non-existing
+ // blob file.
+ std::string blob_index;
+ BlobIndex::EncodeBlob(&blob_index, /* file_number */ 1000, /* offset */ 1234,
+ /* size */ 5678, kNoCompression);
+
+ WriteBatch batch;
+ ASSERT_OK(WriteBatchInternal::PutBlobIndex(
+ &batch, blob_db_->DefaultColumnFamily()->GetID(), "key", blob_index));
+ ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
+
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(blob_files.size(), 1);
+ auto blob_file = blob_files[0];
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
+
+ ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+ .IsIOError());
+
+ const Statistics *const statistics = db_options.statistics.get();
+ assert(statistics);
+
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
+ ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
+}
+
+// File should be evicted after expiration.
+TEST_F(BlobDBTest, EvictExpiredFile) {
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = 100;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ options.env = mock_env_.get();
+ Open(bdb_options, options);
+ mock_clock_->SetCurrentTime(50);
+ std::map<std::string, std::string> data;
+ ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ auto blob_file = blob_files[0];
+ ASSERT_FALSE(blob_file->Immutable());
+ ASSERT_FALSE(blob_file->Obsolete());
+ VerifyDB(data);
+ mock_clock_->SetCurrentTime(250);
+ // The key should expired now.
+ blob_db_impl()->TEST_EvictExpiredFiles();
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ ASSERT_TRUE(blob_file->Immutable());
+ ASSERT_TRUE(blob_file->Obsolete());
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ // Make sure we don't return garbage value after blob file being evicted,
+ // but the blob index still exists in the LSM tree.
+ std::string val = "";
+ ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &val).IsNotFound());
+ ASSERT_EQ("", val);
+}
+
+TEST_F(BlobDBTest, DisableFileDeletions) {
+ BlobDBOptions bdb_options;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (bool force : {true, false}) {
+ ASSERT_OK(Put("foo", "v", &data));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ auto blob_file = blob_files[0];
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
+ blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ // Call DisableFileDeletions twice.
+ ASSERT_OK(blob_db_->DisableFileDeletions());
+ ASSERT_OK(blob_db_->DisableFileDeletions());
+ // File deletions should be disabled.
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ VerifyDB(data);
+ // Enable file deletions once. If force=true, file deletion is enabled.
+ // Otherwise it needs to enable it for a second time.
+ ASSERT_OK(blob_db_->EnableFileDeletions(force));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ if (!force) {
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ VerifyDB(data);
+ // Call EnableFileDeletions a second time.
+ ASSERT_OK(blob_db_->EnableFileDeletions(false));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ }
+ // Regardless of value of `force`, file should be deleted by now.
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ VerifyDB({});
+ }
+}
+
+TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
+ BlobDBOptions bdb_options;
+ bdb_options.enable_garbage_collection = true;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+
+ // Register some dummy blob files.
+ blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
+ blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
+ blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
+ blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
+ blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);
+
+ // Initialize the blob <-> SST file mapping. First, add some SST files with
+ // blob file references, then some without.
+ std::vector<LiveFileMetaData> live_files;
+
+ for (uint64_t i = 1; i <= 10; ++i) {
+ LiveFileMetaData live_file;
+ live_file.file_number = i;
+ live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;
+
+ live_files.emplace_back(live_file);
+ }
+
+ for (uint64_t i = 11; i <= 20; ++i) {
+ LiveFileMetaData live_file;
+ live_file.file_number = i;
+
+ live_files.emplace_back(live_file);
+ }
+
+ blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);
+
+ // Check that the blob <-> SST mappings have been correctly initialized.
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+
+ ASSERT_EQ(blob_files.size(), 5);
+
+ {
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 5);
+ for (size_t i = 0; i < 5; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
+ }
+
+ ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
+ }
+
+ {
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
+ const std::vector<bool> expected_obsolete{false, false, false, false,
+ false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 5);
+ for (size_t i = 0; i < 5; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
+ }
+
+ ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
+ }
+
+ // Simulate a flush where the SST does not reference any blob files.
+ {
+ FlushJobInfo info{};
+ info.file_number = 21;
+ info.smallest_seqno = 1;
+ info.largest_seqno = 100;
+
+ blob_db_impl()->TEST_ProcessFlushJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
+ const std::vector<bool> expected_obsolete{false, false, false, false,
+ false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 5);
+ for (size_t i = 0; i < 5; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
+ }
+
+ ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
+ }
+
+ // Simulate a flush where the SST references a blob file.
+ {
+ FlushJobInfo info{};
+ info.file_number = 22;
+ info.oldest_blob_file_number = 5;
+ info.smallest_seqno = 101;
+ info.largest_seqno = 200;
+
+ blob_db_impl()->TEST_ProcessFlushJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}};
+ const std::vector<bool> expected_obsolete{false, false, false, false,
+ false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 5);
+ for (size_t i = 0; i < 5; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
+ }
+
+ ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
+ }
+
+ // Simulate a compaction. Some inputs and outputs have blob file references,
+ // some don't. There is also a trivial move (which means the SST appears on
+ // both the input and the output list). Blob file 1 loses all its linked SSTs,
+ // and since it got marked immutable at sequence number 200 which has already
+ // been flushed, it can be marked obsolete.
+ {
+ CompactionJobInfo info{};
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1});
+ info.input_file_infos.emplace_back(
+ CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
+ info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
+ info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
+ info.output_file_infos.emplace_back(
+ CompactionFileInfo{2, 24, kInvalidBlobFileNumber});
+
+ blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
+ const std::vector<bool> expected_obsolete{true, false, false, false, false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 4);
+ for (size_t i = 0; i < 4; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
+ }
+
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(obsolete_files.size(), 1);
+ ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
+ }
+
+ // Simulate a failed compaction. No mappings should be updated.
+ {
+ CompactionJobInfo info{};
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
+ info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
+ info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
+ info.status = Status::Corruption();
+
+ blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
+ const std::vector<bool> expected_obsolete{true, false, false, false, false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 4);
+ for (size_t i = 0; i < 4; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
+ }
+
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(obsolete_files.size(), 1);
+ ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
+ }
+
+ // Simulate another compaction. Blob file 2 loses all its linked SSTs
+ // but since it got marked immutable at sequence number 300 which hasn't
+ // been flushed yet, it cannot be marked obsolete at this point.
+ {
+ CompactionJobInfo info{};
+ info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
+ info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
+ info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
+
+ blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
+ const std::vector<bool> expected_obsolete{true, false, false, false, false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 4);
+ for (size_t i = 0; i < 4; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
+ }
+
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(obsolete_files.size(), 1);
+ ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
+ }
+
+ // Simulate a flush with largest sequence number 300. This will make it
+ // possible to mark blob file 2 obsolete.
+ {
+ FlushJobInfo info{};
+ info.file_number = 26;
+ info.smallest_seqno = 201;
+ info.largest_seqno = 300;
+
+ blob_db_impl()->TEST_ProcessFlushJobInfo(info);
+
+ const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
+ {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
+ const std::vector<bool> expected_obsolete{true, true, false, false, false};
+ for (size_t i = 0; i < 5; ++i) {
+ const auto &blob_file = blob_files[i];
+ ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
+ ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
+ }
+
+ auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
+ ASSERT_EQ(live_imm_files.size(), 3);
+ for (size_t i = 0; i < 3; ++i) {
+ ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 3);
+ }
+
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(obsolete_files.size(), 2);
+ ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
+ ASSERT_EQ(obsolete_files[1]->BlobFileNumber(), 2);
+ }
+}
+
+TEST_F(BlobDBTest, ShutdownWait) {
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = 100;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = false;
+ Options options;
+ options.env = mock_env_.get();
+
+ SyncPoint::GetInstance()->LoadDependency({
+ {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
+ {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
+ {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
+ {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
+ });
+ // Force all tasks to be scheduled immediately.
+ SyncPoint::GetInstance()->SetCallBack(
+ "TimeQueue::Add:item.end", [&](void *arg) {
+ std::chrono::steady_clock::time_point *tp =
+ static_cast<std::chrono::steady_clock::time_point *>(arg);
+ *tp =
+ std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
+ });
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
+ // Sleep 3 ms to increase the chance of data race.
+ // We've synced up the code so that EvictExpiredFiles()
+ // is called concurrently with ~BlobDBImpl().
+ // ~BlobDBImpl() is supposed to wait for all background
+ // task to shutdown before doing anything else. In order
+ // to use the same test to reproduce a bug of the waiting
+ // logic, we wait a little bit here, so that TSAN can
+ // catch the data race.
+ // We should improve the test if we find a better way.
+ Env::Default()->SleepForMicroseconds(3000);
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Open(bdb_options, options);
+ mock_clock_->SetCurrentTime(50);
+ std::map<std::string, std::string> data;
+ ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ auto blob_file = blob_files[0];
+ ASSERT_FALSE(blob_file->Immutable());
+ ASSERT_FALSE(blob_file->Obsolete());
+ VerifyDB(data);
+
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
+ mock_clock_->SetCurrentTime(250);
+ // The key should expired now.
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
+
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
+ Close();
+
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(BlobDBTest, SyncBlobFileBeforeClose) {
+ Options options;
+ options.statistics = CreateDBStatistics();
+
+ BlobDBOptions blob_options;
+ blob_options.min_blob_size = 0;
+ blob_options.bytes_per_sync = 1 << 20;
+ blob_options.disable_background_tasks = true;
+
+ Open(blob_options, options);
+
+ ASSERT_OK(Put("foo", "bar"));
+
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(blob_files.size(), 1);
+
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+ ASSERT_EQ(options.statistics->getTickerCount(BLOB_DB_BLOB_FILE_SYNCED), 1);
+}
+
+TEST_F(BlobDBTest, SyncBlobFileBeforeCloseIOError) {
+ Options options;
+ options.env = fault_injection_env_.get();
+
+ BlobDBOptions blob_options;
+ blob_options.min_blob_size = 0;
+ blob_options.bytes_per_sync = 1 << 20;
+ blob_options.disable_background_tasks = true;
+
+ Open(blob_options, options);
+
+ ASSERT_OK(Put("foo", "bar"));
+
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(blob_files.size(), 1);
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "BlobLogWriter::Sync", [this](void * /* arg */) {
+ fault_injection_env_->SetFilesystemActive(false, Status::IOError());
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ const Status s = blob_db_impl()->TEST_CloseBlobFile(blob_files[0]);
+
+ fault_injection_env_->SetFilesystemActive(true);
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+
+ ASSERT_TRUE(s.IsIOError());
+}
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+
+// A black-box test for the ttl wrapper around rocksdb
+int main(int argc, char **argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr, "SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n");
+ return 0;
+}
+
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_dump_tool.cc b/src/rocksdb/utilities/blob_db/blob_dump_tool.cc
new file mode 100644
index 000000000..1e0632990
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_dump_tool.cc
@@ -0,0 +1,282 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_dump_tool.h"
+
+#include <stdio.h>
+
+#include <cinttypes>
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include "file/random_access_file_reader.h"
+#include "file/readahead_raf.h"
+#include "port/port.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/file_system.h"
+#include "table/format.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+BlobDumpTool::BlobDumpTool()
+ : reader_(nullptr), buffer_(nullptr), buffer_size_(0) {}
+
+Status BlobDumpTool::Run(const std::string& filename, DisplayType show_key,
+ DisplayType show_blob,
+ DisplayType show_uncompressed_blob,
+ bool show_summary) {
+ constexpr size_t kReadaheadSize = 2 * 1024 * 1024;
+ Status s;
+ const auto fs = FileSystem::Default();
+ IOOptions io_opts;
+ s = fs->FileExists(filename, io_opts, nullptr);
+ if (!s.ok()) {
+ return s;
+ }
+ uint64_t file_size = 0;
+ s = fs->GetFileSize(filename, io_opts, &file_size, nullptr);
+ if (!s.ok()) {
+ return s;
+ }
+ std::unique_ptr<FSRandomAccessFile> file;
+ s = fs->NewRandomAccessFile(filename, FileOptions(), &file, nullptr);
+ if (!s.ok()) {
+ return s;
+ }
+ file = NewReadaheadRandomAccessFile(std::move(file), kReadaheadSize);
+ if (file_size == 0) {
+ return Status::Corruption("File is empty.");
+ }
+ reader_.reset(new RandomAccessFileReader(std::move(file), filename));
+ uint64_t offset = 0;
+ uint64_t footer_offset = 0;
+ CompressionType compression = kNoCompression;
+ s = DumpBlobLogHeader(&offset, &compression);
+ if (!s.ok()) {
+ return s;
+ }
+ s = DumpBlobLogFooter(file_size, &footer_offset);
+ if (!s.ok()) {
+ return s;
+ }
+ uint64_t total_records = 0;
+ uint64_t total_key_size = 0;
+ uint64_t total_blob_size = 0;
+ uint64_t total_uncompressed_blob_size = 0;
+ if (show_key != DisplayType::kNone || show_summary) {
+ while (offset < footer_offset) {
+ s = DumpRecord(show_key, show_blob, show_uncompressed_blob, show_summary,
+ compression, &offset, &total_records, &total_key_size,
+ &total_blob_size, &total_uncompressed_blob_size);
+ if (!s.ok()) {
+ break;
+ }
+ }
+ }
+ if (show_summary) {
+ fprintf(stdout, "Summary:\n");
+ fprintf(stdout, " total records: %" PRIu64 "\n", total_records);
+ fprintf(stdout, " total key size: %" PRIu64 "\n", total_key_size);
+ fprintf(stdout, " total blob size: %" PRIu64 "\n", total_blob_size);
+ if (compression != kNoCompression) {
+ fprintf(stdout, " total raw blob size: %" PRIu64 "\n",
+ total_uncompressed_blob_size);
+ }
+ }
+ return s;
+}
+
+Status BlobDumpTool::Read(uint64_t offset, size_t size, Slice* result) {
+ if (buffer_size_ < size) {
+ if (buffer_size_ == 0) {
+ buffer_size_ = 4096;
+ }
+ while (buffer_size_ < size) {
+ buffer_size_ *= 2;
+ }
+ buffer_.reset(new char[buffer_size_]);
+ }
+ Status s = reader_->Read(IOOptions(), offset, size, result, buffer_.get(),
+ nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
+ if (!s.ok()) {
+ return s;
+ }
+ if (result->size() != size) {
+ return Status::Corruption("Reach the end of the file unexpectedly.");
+ }
+ return s;
+}
+
+Status BlobDumpTool::DumpBlobLogHeader(uint64_t* offset,
+ CompressionType* compression) {
+ Slice slice;
+ Status s = Read(0, BlobLogHeader::kSize, &slice);
+ if (!s.ok()) {
+ return s;
+ }
+ BlobLogHeader header;
+ s = header.DecodeFrom(slice);
+ if (!s.ok()) {
+ return s;
+ }
+ fprintf(stdout, "Blob log header:\n");
+ fprintf(stdout, " Version : %" PRIu32 "\n", header.version);
+ fprintf(stdout, " Column Family ID : %" PRIu32 "\n",
+ header.column_family_id);
+ std::string compression_str;
+ if (!GetStringFromCompressionType(&compression_str, header.compression)
+ .ok()) {
+ compression_str = "Unrecongnized compression type (" +
+ std::to_string((int)header.compression) + ")";
+ }
+ fprintf(stdout, " Compression : %s\n", compression_str.c_str());
+ fprintf(stdout, " Expiration range : %s\n",
+ GetString(header.expiration_range).c_str());
+ *offset = BlobLogHeader::kSize;
+ *compression = header.compression;
+ return s;
+}
+
+Status BlobDumpTool::DumpBlobLogFooter(uint64_t file_size,
+ uint64_t* footer_offset) {
+ auto no_footer = [&]() {
+ *footer_offset = file_size;
+ fprintf(stdout, "No blob log footer.\n");
+ return Status::OK();
+ };
+ if (file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) {
+ return no_footer();
+ }
+ Slice slice;
+ *footer_offset = file_size - BlobLogFooter::kSize;
+ Status s = Read(*footer_offset, BlobLogFooter::kSize, &slice);
+ if (!s.ok()) {
+ return s;
+ }
+ BlobLogFooter footer;
+ s = footer.DecodeFrom(slice);
+ if (!s.ok()) {
+ return no_footer();
+ }
+ fprintf(stdout, "Blob log footer:\n");
+ fprintf(stdout, " Blob count : %" PRIu64 "\n", footer.blob_count);
+ fprintf(stdout, " Expiration Range : %s\n",
+ GetString(footer.expiration_range).c_str());
+ return s;
+}
+
+Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
+ DisplayType show_uncompressed_blob,
+ bool show_summary, CompressionType compression,
+ uint64_t* offset, uint64_t* total_records,
+ uint64_t* total_key_size,
+ uint64_t* total_blob_size,
+ uint64_t* total_uncompressed_blob_size) {
+ if (show_key != DisplayType::kNone) {
+ fprintf(stdout, "Read record with offset 0x%" PRIx64 " (%" PRIu64 "):\n",
+ *offset, *offset);
+ }
+ Slice slice;
+ Status s = Read(*offset, BlobLogRecord::kHeaderSize, &slice);
+ if (!s.ok()) {
+ return s;
+ }
+ BlobLogRecord record;
+ s = record.DecodeHeaderFrom(slice);
+ if (!s.ok()) {
+ return s;
+ }
+ uint64_t key_size = record.key_size;
+ uint64_t value_size = record.value_size;
+ if (show_key != DisplayType::kNone) {
+ fprintf(stdout, " key size : %" PRIu64 "\n", key_size);
+ fprintf(stdout, " value size : %" PRIu64 "\n", value_size);
+ fprintf(stdout, " expiration : %" PRIu64 "\n", record.expiration);
+ }
+ *offset += BlobLogRecord::kHeaderSize;
+ s = Read(*offset, static_cast<size_t>(key_size + value_size), &slice);
+ if (!s.ok()) {
+ return s;
+ }
+ // Decompress value
+ std::string uncompressed_value;
+ if (compression != kNoCompression &&
+ (show_uncompressed_blob != DisplayType::kNone || show_summary)) {
+ BlockContents contents;
+ UncompressionContext context(compression);
+ UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
+ compression);
+ s = UncompressBlockData(
+ info, slice.data() + key_size, static_cast<size_t>(value_size),
+ &contents, 2 /*compress_format_version*/, ImmutableOptions(Options()));
+ if (!s.ok()) {
+ return s;
+ }
+ uncompressed_value = contents.data.ToString();
+ }
+ if (show_key != DisplayType::kNone) {
+ fprintf(stdout, " key : ");
+ DumpSlice(Slice(slice.data(), static_cast<size_t>(key_size)), show_key);
+ if (show_blob != DisplayType::kNone) {
+ fprintf(stdout, " blob : ");
+ DumpSlice(Slice(slice.data() + static_cast<size_t>(key_size),
+ static_cast<size_t>(value_size)),
+ show_blob);
+ }
+ if (show_uncompressed_blob != DisplayType::kNone) {
+ fprintf(stdout, " raw blob : ");
+ DumpSlice(Slice(uncompressed_value), show_uncompressed_blob);
+ }
+ }
+ *offset += key_size + value_size;
+ *total_records += 1;
+ *total_key_size += key_size;
+ *total_blob_size += value_size;
+ *total_uncompressed_blob_size += uncompressed_value.size();
+ return s;
+}
+
+void BlobDumpTool::DumpSlice(const Slice s, DisplayType type) {
+ if (type == DisplayType::kRaw) {
+ fprintf(stdout, "%s\n", s.ToString().c_str());
+ } else if (type == DisplayType::kHex) {
+ fprintf(stdout, "%s\n", s.ToString(true /*hex*/).c_str());
+ } else if (type == DisplayType::kDetail) {
+ char buf[100];
+ for (size_t i = 0; i < s.size(); i += 16) {
+ memset(buf, 0, sizeof(buf));
+ for (size_t j = 0; j < 16 && i + j < s.size(); j++) {
+ unsigned char c = s[i + j];
+ snprintf(buf + j * 3 + 15, 2, "%x", c >> 4);
+ snprintf(buf + j * 3 + 16, 2, "%x", c & 0xf);
+ snprintf(buf + j + 65, 2, "%c", (0x20 <= c && c <= 0x7e) ? c : '.');
+ }
+ for (size_t p = 0; p + 1 < sizeof(buf); p++) {
+ if (buf[p] == 0) {
+ buf[p] = ' ';
+ }
+ }
+ fprintf(stdout, "%s\n", i == 0 ? buf + 15 : buf);
+ }
+ }
+}
+
+template <class T>
+std::string BlobDumpTool::GetString(std::pair<T, T> p) {
+ if (p.first == 0 && p.second == 0) {
+ return "nil";
+ }
+ return "(" + std::to_string(p.first) + ", " + std::to_string(p.second) + ")";
+}
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_dump_tool.h b/src/rocksdb/utilities/blob_db/blob_dump_tool.h
new file mode 100644
index 000000000..bece564e1
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_dump_tool.h
@@ -0,0 +1,58 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "db/blob/blob_log_format.h"
+#include "file/random_access_file_reader.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+class BlobDumpTool {
+ public:
+ enum class DisplayType {
+ kNone,
+ kRaw,
+ kHex,
+ kDetail,
+ };
+
+ BlobDumpTool();
+
+ Status Run(const std::string& filename, DisplayType show_key,
+ DisplayType show_blob, DisplayType show_uncompressed_blob,
+ bool show_summary);
+
+ private:
+ std::unique_ptr<RandomAccessFileReader> reader_;
+ std::unique_ptr<char[]> buffer_;
+ size_t buffer_size_;
+
+ Status Read(uint64_t offset, size_t size, Slice* result);
+ Status DumpBlobLogHeader(uint64_t* offset, CompressionType* compression);
+ Status DumpBlobLogFooter(uint64_t file_size, uint64_t* footer_offset);
+ Status DumpRecord(DisplayType show_key, DisplayType show_blob,
+ DisplayType show_uncompressed_blob, bool show_summary,
+ CompressionType compression, uint64_t* offset,
+ uint64_t* total_records, uint64_t* total_key_size,
+ uint64_t* total_blob_size,
+ uint64_t* total_uncompressed_blob_size);
+ void DumpSlice(const Slice s, DisplayType type);
+
+ template <class T>
+ std::string GetString(std::pair<T, T> p);
+};
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_file.cc b/src/rocksdb/utilities/blob_db/blob_file.cc
new file mode 100644
index 000000000..c68e557c6
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_file.cc
@@ -0,0 +1,318 @@
+
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+#include "utilities/blob_db/blob_file.h"
+
+#include <stdio.h>
+
+#include <algorithm>
+#include <cinttypes>
+#include <memory>
+
+#include "db/column_family.h"
+#include "db/db_impl/db_impl.h"
+#include "db/dbformat.h"
+#include "file/filename.h"
+#include "file/readahead_raf.h"
+#include "logging/logging.h"
+#include "utilities/blob_db/blob_db_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace blob_db {
+
+BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
+ Logger* info_log)
+ : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {}
+
+BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
+ Logger* info_log, uint32_t column_family_id,
+ CompressionType compression, bool has_ttl,
+ const ExpirationRange& expiration_range)
+ : parent_(p),
+ path_to_dir_(bdir),
+ file_number_(fn),
+ info_log_(info_log),
+ column_family_id_(column_family_id),
+ compression_(compression),
+ has_ttl_(has_ttl),
+ expiration_range_(expiration_range),
+ header_(column_family_id, compression, has_ttl, expiration_range),
+ header_valid_(true) {}
+
+BlobFile::~BlobFile() {
+ if (obsolete_) {
+ std::string pn(PathName());
+ Status s = Env::Default()->DeleteFile(PathName());
+ if (!s.ok()) {
+ // ROCKS_LOG_INFO(db_options_.info_log,
+ // "File could not be deleted %s", pn.c_str());
+ }
+ }
+}
+
+uint32_t BlobFile::GetColumnFamilyId() const { return column_family_id_; }
+
+std::string BlobFile::PathName() const {
+ return BlobFileName(path_to_dir_, file_number_);
+}
+
+std::string BlobFile::DumpState() const {
+ char str[1000];
+ snprintf(
+ str, sizeof(str),
+ "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64
+ " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64
+ "), writer: %d reader: %d",
+ path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(),
+ closed_.load(), obsolete_.load(), expiration_range_.first,
+ expiration_range_.second, (!!log_writer_), (!!ra_file_reader_));
+ return str;
+}
+
+void BlobFile::MarkObsolete(SequenceNumber sequence) {
+ assert(Immutable());
+ obsolete_sequence_ = sequence;
+ obsolete_.store(true);
+}
+
+Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) {
+ BlobLogFooter footer;
+ footer.blob_count = blob_count_;
+ if (HasTTL()) {
+ footer.expiration_range = expiration_range_;
+ }
+
+ // this will close the file and reset the Writable File Pointer.
+ Status s = log_writer_->AppendFooter(footer, /* checksum_method */ nullptr,
+ /* checksum_value */ nullptr);
+ if (s.ok()) {
+ closed_ = true;
+ immutable_sequence_ = sequence;
+ file_size_ += BlobLogFooter::kSize;
+ }
+ // delete the sequential writer
+ log_writer_.reset();
+ return s;
+}
+
+Status BlobFile::ReadFooter(BlobLogFooter* bf) {
+ if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) {
+ return Status::IOError("File does not have footer", PathName());
+ }
+
+ uint64_t footer_offset = file_size_ - BlobLogFooter::kSize;
+ // assume that ra_file_reader_ is valid before we enter this
+ assert(ra_file_reader_);
+
+ Slice result;
+ std::string buf;
+ AlignedBuf aligned_buf;
+ Status s;
+ // TODO: rate limit reading footers from blob files.
+ if (ra_file_reader_->use_direct_io()) {
+ s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize,
+ &result, nullptr, &aligned_buf,
+ Env::IO_TOTAL /* rate_limiter_priority */);
+ } else {
+ buf.reserve(BlobLogFooter::kSize + 10);
+ s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize,
+ &result, &buf[0], nullptr,
+ Env::IO_TOTAL /* rate_limiter_priority */);
+ }
+ if (!s.ok()) return s;
+ if (result.size() != BlobLogFooter::kSize) {
+ // should not happen
+ return Status::IOError("EOF reached before footer");
+ }
+
+ s = bf->DecodeFrom(result);
+ return s;
+}
+
+Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
+ blob_count_ = footer.blob_count;
+ expiration_range_ = footer.expiration_range;
+ closed_ = true;
+ return Status::OK();
+}
+
+Status BlobFile::Fsync() {
+ Status s;
+ if (log_writer_.get()) {
+ s = log_writer_->Sync();
+ }
+ return s;
+}
+
+void BlobFile::CloseRandomAccessLocked() {
+ ra_file_reader_.reset();
+ last_access_ = -1;
+}
+
+Status BlobFile::GetReader(Env* env, const FileOptions& file_options,
+ std::shared_ptr<RandomAccessFileReader>* reader,
+ bool* fresh_open) {
+ assert(reader != nullptr);
+ assert(fresh_open != nullptr);
+ *fresh_open = false;
+ int64_t current_time = 0;
+ if (env->GetCurrentTime(&current_time).ok()) {
+ last_access_.store(current_time);
+ }
+ Status s;
+
+ {
+ ReadLock lockbfile_r(&mutex_);
+ if (ra_file_reader_) {
+ *reader = ra_file_reader_;
+ return s;
+ }
+ }
+
+ WriteLock lockbfile_w(&mutex_);
+ // Double check.
+ if (ra_file_reader_) {
+ *reader = ra_file_reader_;
+ return s;
+ }
+
+ std::unique_ptr<FSRandomAccessFile> rfile;
+ s = env->GetFileSystem()->NewRandomAccessFile(PathName(), file_options,
+ &rfile, nullptr);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(info_log_,
+ "Failed to open blob file for random-read: %s status: '%s'"
+ " exists: '%s'",
+ PathName().c_str(), s.ToString().c_str(),
+ env->FileExists(PathName()).ToString().c_str());
+ return s;
+ }
+
+ ra_file_reader_ =
+ std::make_shared<RandomAccessFileReader>(std::move(rfile), PathName());
+ *reader = ra_file_reader_;
+ *fresh_open = true;
+ return s;
+}
+
+Status BlobFile::ReadMetadata(const std::shared_ptr<FileSystem>& fs,
+ const FileOptions& file_options) {
+ assert(Immutable());
+ // Get file size.
+ uint64_t file_size = 0;
+ Status s =
+ fs->GetFileSize(PathName(), file_options.io_options, &file_size, nullptr);
+ if (s.ok()) {
+ file_size_ = file_size;
+ } else {
+ ROCKS_LOG_ERROR(info_log_,
+ "Failed to get size of blob file %" PRIu64 ", status: %s",
+ file_number_, s.ToString().c_str());
+ return s;
+ }
+ if (file_size < BlobLogHeader::kSize) {
+ ROCKS_LOG_ERROR(
+ info_log_, "Incomplete blob file blob file %" PRIu64 ", size: %" PRIu64,
+ file_number_, file_size);
+ return Status::Corruption("Incomplete blob file header.");
+ }
+
+ // Create file reader.
+ std::unique_ptr<RandomAccessFileReader> file_reader;
+ s = RandomAccessFileReader::Create(fs, PathName(), file_options, &file_reader,
+ nullptr);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(info_log_,
+ "Failed to open blob file %" PRIu64 ", status: %s",
+ file_number_, s.ToString().c_str());
+ return s;
+ }
+
+ // Read file header.
+ std::string header_buf;
+ AlignedBuf aligned_buf;
+ Slice header_slice;
+ // TODO: rate limit reading headers from blob files.
+ if (file_reader->use_direct_io()) {
+ s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice,
+ nullptr, &aligned_buf,
+ Env::IO_TOTAL /* rate_limiter_priority */);
+ } else {
+ header_buf.reserve(BlobLogHeader::kSize);
+ s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice,
+ &header_buf[0], nullptr,
+ Env::IO_TOTAL /* rate_limiter_priority */);
+ }
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ info_log_, "Failed to read header of blob file %" PRIu64 ", status: %s",
+ file_number_, s.ToString().c_str());
+ return s;
+ }
+ BlobLogHeader header;
+ s = header.DecodeFrom(header_slice);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(info_log_,
+ "Failed to decode header of blob file %" PRIu64
+ ", status: %s",
+ file_number_, s.ToString().c_str());
+ return s;
+ }
+ column_family_id_ = header.column_family_id;
+ compression_ = header.compression;
+ has_ttl_ = header.has_ttl;
+ if (has_ttl_) {
+ expiration_range_ = header.expiration_range;
+ }
+ header_valid_ = true;
+
+ // Read file footer.
+ if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) {
+ // OK not to have footer.
+ assert(!footer_valid_);
+ return Status::OK();
+ }
+ std::string footer_buf;
+ Slice footer_slice;
+ // TODO: rate limit reading footers from blob files.
+ if (file_reader->use_direct_io()) {
+ s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize,
+ BlobLogFooter::kSize, &footer_slice, nullptr,
+ &aligned_buf,
+ Env::IO_TOTAL /* rate_limiter_priority */);
+ } else {
+ footer_buf.reserve(BlobLogFooter::kSize);
+ s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize,
+ BlobLogFooter::kSize, &footer_slice, &footer_buf[0],
+ nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
+ }
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ info_log_, "Failed to read footer of blob file %" PRIu64 ", status: %s",
+ file_number_, s.ToString().c_str());
+ return s;
+ }
+ BlobLogFooter footer;
+ s = footer.DecodeFrom(footer_slice);
+ if (!s.ok()) {
+ // OK not to have footer.
+ assert(!footer_valid_);
+ return Status::OK();
+ }
+ blob_count_ = footer.blob_count;
+ if (has_ttl_) {
+ assert(header.expiration_range.first <= footer.expiration_range.first);
+ assert(header.expiration_range.second >= footer.expiration_range.second);
+ expiration_range_ = footer.expiration_range;
+ }
+ footer_valid_ = true;
+ return Status::OK();
+}
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_file.h b/src/rocksdb/utilities/blob_db/blob_file.h
new file mode 100644
index 000000000..6f3f2bea7
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_file.h
@@ -0,0 +1,246 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+#include <limits>
+#include <memory>
+#include <unordered_set>
+
+#include "db/blob/blob_log_format.h"
+#include "db/blob/blob_log_writer.h"
+#include "file/random_access_file_reader.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/options.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+class BlobDBImpl;
+
+class BlobFile {
+ friend class BlobDBImpl;
+ friend struct BlobFileComparator;
+ friend struct BlobFileComparatorTTL;
+ friend class BlobIndexCompactionFilterBase;
+ friend class BlobIndexCompactionFilterGC;
+
+ private:
+ // access to parent
+ const BlobDBImpl* parent_{nullptr};
+
+ // path to blob directory
+ std::string path_to_dir_;
+
+ // the id of the file.
+ // the above 2 are created during file creation and never changed
+ // after that
+ uint64_t file_number_{0};
+
+ // The file numbers of the SST files whose oldest blob file reference
+ // points to this blob file.
+ std::unordered_set<uint64_t> linked_sst_files_;
+
+ // Info log.
+ Logger* info_log_{nullptr};
+
+ // Column family id.
+ uint32_t column_family_id_{std::numeric_limits<uint32_t>::max()};
+
+ // Compression type of blobs in the file
+ CompressionType compression_{kNoCompression};
+
+ // If true, the keys in this file all has TTL. Otherwise all keys don't
+ // have TTL.
+ bool has_ttl_{false};
+
+ // TTL range of blobs in the file.
+ ExpirationRange expiration_range_;
+
+ // number of blobs in the file
+ std::atomic<uint64_t> blob_count_{0};
+
+ // size of the file
+ std::atomic<uint64_t> file_size_{0};
+
+ BlobLogHeader header_;
+
+ // closed_ = true implies the file is no more mutable
+ // no more blobs will be appended and the footer has been written out
+ std::atomic<bool> closed_{false};
+
+ // The latest sequence number when the file was closed/made immutable.
+ SequenceNumber immutable_sequence_{0};
+
+ // Whether the file was marked obsolete (due to either TTL or GC).
+ // obsolete_ still needs to do iterator/snapshot checks
+ std::atomic<bool> obsolete_{false};
+
+ // The last sequence number by the time the file marked as obsolete.
+ // Data in this file is visible to a snapshot taken before the sequence.
+ SequenceNumber obsolete_sequence_{0};
+
+ // Sequential/Append writer for blobs
+ std::shared_ptr<BlobLogWriter> log_writer_;
+
+ // random access file reader for GET calls
+ std::shared_ptr<RandomAccessFileReader> ra_file_reader_;
+
+ // This Read-Write mutex is per file specific and protects
+ // all the datastructures
+ mutable port::RWMutex mutex_;
+
+ // time when the random access reader was last created.
+ std::atomic<std::int64_t> last_access_{-1};
+
+ bool header_valid_{false};
+
+ bool footer_valid_{false};
+
+ public:
+ BlobFile() = default;
+
+ BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
+ Logger* info_log);
+
+ BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
+ Logger* info_log, uint32_t column_family_id,
+ CompressionType compression, bool has_ttl,
+ const ExpirationRange& expiration_range);
+
+ ~BlobFile();
+
+ uint32_t GetColumnFamilyId() const;
+
+ // Returns log file's absolute pathname.
+ std::string PathName() const;
+
+ // Primary identifier for blob file.
+ // once the file is created, this never changes
+ uint64_t BlobFileNumber() const { return file_number_; }
+
+ // Get the set of SST files whose oldest blob file reference points to
+ // this file.
+ const std::unordered_set<uint64_t>& GetLinkedSstFiles() const {
+ return linked_sst_files_;
+ }
+
+ // Link an SST file whose oldest blob file reference points to this file.
+ void LinkSstFile(uint64_t sst_file_number) {
+ assert(linked_sst_files_.find(sst_file_number) == linked_sst_files_.end());
+ linked_sst_files_.insert(sst_file_number);
+ }
+
+ // Unlink an SST file whose oldest blob file reference points to this file.
+ void UnlinkSstFile(uint64_t sst_file_number) {
+ auto it = linked_sst_files_.find(sst_file_number);
+ assert(it != linked_sst_files_.end());
+ linked_sst_files_.erase(it);
+ }
+
+ // the following functions are atomic, and don't need
+ // read lock
+ uint64_t BlobCount() const {
+ return blob_count_.load(std::memory_order_acquire);
+ }
+
+ std::string DumpState() const;
+
+ // if the file is not taking any more appends.
+ bool Immutable() const { return closed_.load(); }
+
+ // Mark the file as immutable.
+ // REQUIRES: write lock held, or access from single thread (on DB open).
+ void MarkImmutable(SequenceNumber sequence) {
+ closed_ = true;
+ immutable_sequence_ = sequence;
+ }
+
+ SequenceNumber GetImmutableSequence() const {
+ assert(Immutable());
+ return immutable_sequence_;
+ }
+
+ // Whether the file was marked obsolete (due to either TTL or GC).
+ bool Obsolete() const {
+ assert(Immutable() || !obsolete_.load());
+ return obsolete_.load();
+ }
+
+ // Mark file as obsolete (due to either TTL or GC). The file is not visible to
+ // snapshots with sequence greater or equal to the given sequence.
+ void MarkObsolete(SequenceNumber sequence);
+
+ SequenceNumber GetObsoleteSequence() const {
+ assert(Obsolete());
+ return obsolete_sequence_;
+ }
+
+ Status Fsync();
+
+ uint64_t GetFileSize() const {
+ return file_size_.load(std::memory_order_acquire);
+ }
+
+ // All Get functions which are not atomic, will need ReadLock on the mutex
+
+ const ExpirationRange& GetExpirationRange() const {
+ return expiration_range_;
+ }
+
+ void ExtendExpirationRange(uint64_t expiration) {
+ expiration_range_.first = std::min(expiration_range_.first, expiration);
+ expiration_range_.second = std::max(expiration_range_.second, expiration);
+ }
+
+ bool HasTTL() const { return has_ttl_; }
+
+ void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; }
+
+ CompressionType GetCompressionType() const { return compression_; }
+
+ std::shared_ptr<BlobLogWriter> GetWriter() const { return log_writer_; }
+
+ // Read blob file header and footer. Return corruption if file header is
+ // malform or incomplete. If footer is malform or incomplete, set
+ // footer_valid_ to false and return Status::OK.
+ Status ReadMetadata(const std::shared_ptr<FileSystem>& fs,
+ const FileOptions& file_options);
+
+ Status GetReader(Env* env, const FileOptions& file_options,
+ std::shared_ptr<RandomAccessFileReader>* reader,
+ bool* fresh_open);
+
+ private:
+ Status ReadFooter(BlobLogFooter* footer);
+
+ Status WriteFooterAndCloseLocked(SequenceNumber sequence);
+
+ void CloseRandomAccessLocked();
+
+ // this is used, when you are reading only the footer of a
+ // previously closed file
+ Status SetFromFooterLocked(const BlobLogFooter& footer);
+
+ void set_expiration_range(const ExpirationRange& expiration_range) {
+ expiration_range_ = expiration_range;
+ }
+
+ // The following functions are atomic, and don't need locks
+ void SetFileSize(uint64_t fs) { file_size_ = fs; }
+
+ void SetBlobCount(uint64_t bc) { blob_count_ = bc; }
+
+ void BlobRecordAdded(uint64_t record_size) {
+ ++blob_count_;
+ file_size_ += record_size;
+ }
+};
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE